[Snap 2828] Serialize the write ops on Column Table #1362
Conversation
Conflicts:
  core/src/dunit/scala/io/snappydata/cluster/SplitClusterDUnitTestBase.scala
  core/src/main/scala/org/apache/spark/sql/SnappySession.scala
  core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala
  store
Conflicts: store
Minor comment and clarifications. Will approve after I get the response.
@@ -370,6 +370,7 @@ trait SplitClusterDUnitTestObject extends Logging {
// val connectionURL = "jdbc:snappydata://localhost:" + locatorClientPort + "/"
val connectionURL = s"localhost:$locatorClientPort"
logInfo(s"Starting spark job using spark://$hostName:7077, connectionURL=$connectionURL")
This looks like an inadvertent change. Please revert if not intended.
logDebug(s" Going to take lock on server for table ${table}," + | ||
s" current Thread ${Thread.currentThread().getId}") | ||
val ps = conn.prepareCall(s"VALUES sys.ACQUIRE_REGION_LOCK(?)") | ||
ps.setString(1, "BULKWRITE_" + table) |
Do you want to make this a constant?
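For illustration, a minimal sketch of the connector-side acquisition with the prefix pulled into a constant, as suggested. The constant name, the helper method and the boolean result handling are assumptions for this sketch, not code from this PR:

object ColumnTableLocks {
  // Hypothetical constant per the reviewer's suggestion; the actual name is not in this PR.
  val BULK_WRITE_LOCK_PREFIX = "BULKWRITE_"
}

def acquireBulkWriteLock(conn: java.sql.Connection, table: String): Boolean = {
  // Smart connector path: the lock is taken on the server through the system procedure.
  val ps = conn.prepareCall("VALUES sys.ACQUIRE_REGION_LOCK(?)")
  try {
    ps.setString(1, ColumnTableLocks.BULK_WRITE_LOCK_PREFIX + table)
    val rs = ps.executeQuery()
    rs.next() && rs.getBoolean(1)
  } finally {
    ps.close()
  }
}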
JdbcExtendedUtils.getTableWithSchema(table, conn = null, Some(sqlContext.sparkSession))
val lock = grabLock(table, schemaName, defaultConnectionProps)

if (lock.isInstanceOf[RegionLock]) lock.asInstanceOf[RegionLock].lock()
For uniformity, I think you can move the lock acquisition inside the grabLock method itself, even for embedded mode. Cosmetic change, not insisting.
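A rough sketch of what that cosmetic change could look like, assuming grabLock branches on the deployment mode; isEmbeddedMode, the RegionLock constructor and acquireLockThroughProcedure are placeholders for the PR's actual code:

def grabLock(table: String, schemaName: String,
    connProps: java.util.Properties): Any = {
  if (isEmbeddedMode) {
    // Embedded mode: acquire here instead of at the call site, so callers in both
    // modes receive an already-held lock and no longer need the RegionLock cast.
    val regionLock = new RegionLock("BULKWRITE_" + table)
    regionLock.lock()
    regionLock
  } else {
    // Connector mode: the system procedure already acquires the lock on the server.
    acquireLockThroughProcedure(table, connProps)
  }
}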
core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala
Conflicts:
  core/src/main/scala/org/apache/spark/sql/SnappySession.scala
  store
Suranjan explained that the while loop won't hang: either the call eventually returns true or it throws an exception, and in either case the loop ends.
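In other words, the loop has only two exits and both terminate it. A simplified sketch of that shape, where ps is the prepared call from the earlier snippet and the result handling is assumed:

var locked = false
while (!locked) {
  // Either this call eventually returns true once the region lock is granted,
  // or it throws (e.g. a SQLException), which also ends the loop by propagating up.
  val rs = ps.executeQuery()
  locked = rs.next() && rs.getBoolean(1)
  rs.close()
}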
Conflicts: store
Changes proposed in this pull request
Take a write lock on the table for Insert, PutInto, Update and Delete.
Release it on completion of the operation; this serializes the write operations to avoid inconsistency.
Handles the smart connector case, where a procedure is used to take and release the lock for a write operation on the table (a sketch of the overall pattern follows below).
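A minimal sketch of that overall pattern, with grabLock/releaseLock standing in for the PR's actual helpers and performWriteOperation() as a placeholder for the insert/put/update/delete execution:

val lock = grabLock(table, schemaName, defaultConnectionProps)
try {
  // The write op (Insert, PutInto, Update or Delete) runs while the table-level
  // write lock is held, so concurrent writers on the column table are serialized.
  performWriteOperation()
} finally {
  // Released on completion, whether the operation succeeded or failed.
  releaseLock(lock)
}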
Patch testing
precheckin and hydra tests, manual tests.
ReleaseNotes.txt changes
(Does this change require an entry in ReleaseNotes.txt? If yes, has it been added to it?)
Other PRs
TIBCOSoftware/snappy-store#488
(Does this change require changes in other projects- store, spark, spark-jobserver, aqp? Add the links of PR of the other subprojects that are related to this change)