[HUDI-575] Support Async Compaction for spark streaming writes to hudi table #1752
Conversation
* In case of deltastreamer, Spark job scheduling configs are automatically set.
* As the configs need to be set before the Spark context is initialized, this is
* not automated for Structured Streaming.
* https://spark.apache.org/docs/latest/job-scheduling.html
https://jira.apache.org/jira/browse/HUDI-1031 to add to docs
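For reference, the scheduling configs that the code comment refers to have to be supplied at application submit time, before the SparkContext is created. A sketch of what that looks like (the allocation-file path is illustrative):

```sh
spark-submit \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.scheduler.allocation.file=/path/to/fairscheduler.xml \
  ... # rest of the structured-streaming job arguments
```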
Just some preliminary comments. I rebased this PR against the latest master. Will continue to review.
hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
val asyncCompactionEnabled = isAsyncCompactionEnabled(client, parameters, jsc.hadoopConfiguration())
val compactionInstant: common.util.Option[java.lang.String] =
  if (asyncCompactionEnabled) {
    client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
is it possible that nothing is actually scheduled here, since there is nothing to compact?
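As a side note on the question above: `scheduleCompaction` returns an Option-typed instant, so the caller has to be prepared for an empty result when there is nothing to compact. A minimal sketch of that contract, using `java.util.Optional` in place of Hudi's `common.util.Option` (the method body and the instant format are illustrative, not Hudi's actual implementation):

```java
import java.util.Optional;

public class CompactionScheduler {
    /** Pretends to schedule a compaction plan; empty means there was no work. */
    public static Optional<String> scheduleCompaction(int pendingDeltaFiles) {
        if (pendingDeltaFiles == 0) {
            return Optional.empty();  // nothing to compact, so nothing is scheduled
        }
        return Optional.of("20200618120000");  // instant time of the scheduled plan
    }

    public static void main(String[] args) {
        // The write path should only hand an instant to the async compactor when present.
        scheduleCompaction(0).ifPresent(instant ->
                System.out.println("trigger async compaction for " + instant));
    }
}
```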
private def isAsyncCompactionEnabled(client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
                                     parameters: Map[String, String], configuration: Configuration): Boolean = {
  log.info(s"Config.isInlineCompaction ? ${client.getConfig.isInlineCompaction}")
  if (!client.getConfig.isInlineCompaction
what if the user sets the writeClient config for inline compaction = false and does not set the async compaction datasource option? should we control this at a single level?
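One way to address the single-level-of-control question is to resolve the flag in one place, letting an explicit datasource option win and falling back to the write-client setting otherwise. A hedged sketch (the option key mirrors Hudi's async-compaction datasource option, but treat the exact names and precedence as assumptions, not the PR's final behavior):

```java
import java.util.Map;

public class CompactionConfigResolver {
    /**
     * Single point of control: an explicit datasource option wins; otherwise
     * fall back to the write client's inline-compaction setting.
     */
    public static boolean isAsyncCompactionEnabled(Map<String, String> options,
                                                   boolean inlineCompactionEnabled) {
        String opt = options.get("hoodie.datasource.compaction.async.enable");
        if (opt != null) {
            return Boolean.parseBoolean(opt);
        }
        // No explicit option: async compaction only makes sense when inline is off.
        return !inlineCompactionEnabled;
    }

    public static void main(String[] args) {
        System.out.println(isAsyncCompactionEnabled(Map.of(), true));
    }
}
```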
 * active. If there is no activity for a sufficient period, the async compactor shuts down. If the sink was indeed
 * active, a subsequent batch will re-trigger async compaction.
 */
public class SparkStreamingWriterActivityDetector {
basic question.. if there are no writes, no compaction gets scheduled, right? so async compaction is a no-op, i.e. it will check if there is some work to do and, if not, won't trigger anything?
Looks close, but we need to iron out the activity detection / async compaction thread shutdown.
}

/**
 * Spark Structured Streaming Sink implementations do not have a mechanism to know when the stream is shut down.
move comments that refer to a sub-class impl to that class itself?
Done
long currTime = System.nanoTime();
long elapsedTimeSecs = Double.valueOf(Math.ceil(1.0 * (currTime - lastEndBatchTime) / SECS_TO_NANOS)).longValue();
if (elapsedTimeSecs > sinkInactivityTimeoutSecs) {
  LOG.warn("Streaming Sink has been idle for " + elapsedTimeSecs + " seconds");
this does not mean there is no work for compaction, right?
This code is deleted.
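For what it's worth, the deleted elapsed-time arithmetic can be written more directly with `java.util.concurrent.TimeUnit` (note this truncates to whole seconds rather than rounding up as the original `Math.ceil` did, which is usually fine for an idle-timeout check):

```java
import java.util.concurrent.TimeUnit;

public class IdleDetector {
    /** Elapsed whole seconds between two System.nanoTime() readings. */
    public static long elapsedSecs(long startNanos, long endNanos) {
        return TimeUnit.NANOSECONDS.toSeconds(endNanos - startNanos);
    }

    public static void main(String[] args) {
        long start = 0L;
        long end = TimeUnit.SECONDS.toNanos(90);
        System.out.println(elapsedSecs(start, end));  // 90
    }
}
```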
@@ -58,6 +58,7 @@
protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh";
protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_streaming_app.sh";
more importantly, we should also re-enable the test in TestDataSource
Enabled it after adding timed retry logic to wait for commits
mode,
options,
data)
sqlContext, mode, options, data, writeClient, Some(triggerAsyncCompactor))
just confirming that reuse of writeClient across batches is fine..
Yes, this worked fine.
})

// Add Shutdown Hook
Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
this alone should be good enough to prevent the JVM from hanging during exit? do we really need the lastStart/lastEnd logic?
Yes, this and setting daemon mode were good enough.
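The daemon-thread-plus-shutdown-hook pattern the reply refers to can be sketched as follows (method and thread names are illustrative, not the PR's actual code):

```java
public class DaemonCompactorSketch {
    /** Starts a background loop as a daemon thread and registers a cleanup hook. */
    public static Thread startDaemonCompactor(Runnable pollOnce) {
        Thread compactor = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                pollOnce.run();  // e.g. run any pending compaction instants
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "async-compactor");
        // Daemon: JVM exit does not block on this thread, so the streaming
        // driver cannot hang at shutdown waiting for the compactor loop.
        compactor.setDaemon(true);
        compactor.start();
        // Best-effort cleanup when the driver exits normally.
        Runtime.getRuntime().addShutdownHook(new Thread(compactor::interrupt));
        return compactor;
    }

    public static void main(String[] args) {
        Thread t = startDaemonCompactor(() -> { });
        System.out.println(t.isDaemon());  // true
    }
}
```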
}))

// First time, scan .hoodie folder and get all pending compactions
val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration,
Seems like this will happen on each trigger, not just the first time?
Now it happens only the first time, when the async compactor is null.
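The fix described here — scanning only when the async compactor has not been created yet — is the usual lazy one-time initialization pattern. A simplified sketch (class and method names are illustrative):

```java
import java.util.function.Supplier;

public class LazyCompactorInit {
    private Object asyncCompactor = null;

    /** Runs the expensive scan only on the first micro-batch trigger. */
    public synchronized Object getOrCreate(Supplier<Object> scanPendingCompactions) {
        if (asyncCompactor == null) {
            // e.g. scan the .hoodie folder for pending compactions exactly once
            asyncCompactor = scanPendingCompactions.get();
        }
        return asyncCompactor;
    }

    public static void main(String[] args) {
        LazyCompactorInit sink = new LazyCompactorInit();
        Object first = sink.getOrCreate(Object::new);
        Object second = sink.getOrCreate(Object::new);
        System.out.println(first == second);  // true: scan ran only once
    }
}
```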
@@ -68,7 +74,7 @@
private String tableName = "hoodie_test";

@Parameter(names = {"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ")
- private String tableType = HoodieTableType.MERGE_ON_READ.name();
+ private String tableType = HoodieTableType.COPY_ON_WRITE.name();
why move to COW?
Reverted.
@@ -38,46 +50,65 @@ class HoodieStreamingSink(sqlContext: SQLContext,
private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS_OPT_KEY).toLong
private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY).toBoolean

private var isAsyncCompactorServiceShutdownAbnormally = false
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-StreamingQueryManager.html seems like there are some listeners we can exploit to know of a StreamingQuery?
Thanks. Fixed by setting daemon mode for the async compactor thread.
Codecov Report
@@ Coverage Diff @@
## master #1752 +/- ##
============================================
- Coverage 62.82% 59.95% -2.87%
- Complexity 3437 3609 +172
============================================
Files 401 439 +38
Lines 17091 19088 +1997
Branches 1698 1943 +245
============================================
+ Hits 10737 11445 +708
- Misses 5623 6850 +1227
- Partials 731 793 +62
Force-pushed from d542689 to 54e2e25.
All tests pass now. Will run this in a setup for a few hours before merging.
Force-pushed from 8d515f9 to f3736c2.
For the remaining 2 questions, here is the answer:

> what if the user sets the writeClient config for inline compaction = false and does not set the async compaction datasource option?

> basic question.. if there are no writes, no compaction gets scheduled, right? so async compaction is a no-op, i.e. it will check if there is some work to do and, if not, won't trigger anything?
Merging as was previously discussed.
This PR is dependent on #1577. It has 2 commits: the first commit corresponds to #1577 and the second commit is for this PR.
Contains: