
[SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for streaming aggregation and deduplication. #21220

Closed

Conversation


@tdas tdas commented May 2, 2018

What changes were proposed in this pull request?

This PR enables MicroBatchExecution to run no-data batches if some SparkPlan requires running another batch to output results based on an updated watermark / processing time. In this PR, I have enabled streaming aggregations and streaming deduplication to automatically run an additional batch even when no new data is available. See https://issues.apache.org/jira/browse/SPARK-24156 for more context.

Major changes/refactoring done in this PR.

  • Refactoring MicroBatchExecution - A major point of confusion in the MicroBatchExecution control flow (at least to me) was that populateStartOffsets internally called constructNextBatch, which was not obvious from the name "populateStartOffsets" and made the control flow from the main trigger execution loop very confusing (the main loop in runActivatedStream called constructNextBatch, but only if populateStartOffsets hadn't already called it). The refactoring makes this cleaner:

    • populateStartOffsets only updates availableOffsets and committedOffsets; it does not call constructNextBatch.
    • The main loop in runActivatedStream calls constructNextBatch, which returns true or false reflecting whether the next batch is ready for execution. This method is now idempotent; once a batch has been constructed, it keeps returning true until that batch has been executed.
    • If the next batch is ready, we call runBatch; otherwise we sleep (see the sketch after this list).
    • That's it.
  • Refactoring watermark management logic - This has been refactored out from MicroBatchExecution in a separate class to simplify MicroBatchExecution.

  • New method shouldRunAnotherBatch in IncrementalExecution - This returns true if any stateful operation in the last execution plan requires another batch for state cleanup, etc. It is used in constructNextBatch to decide whether to construct a batch even when no new data is available (see the sketch after this list).

  • Changes to stream testing framework - Many tests used CheckLastBatch to validate answers. This assumed that there would be no more batches after the last set of input had been processed, so the last batch is the one whose output corresponds to the last input. This is no longer true. To account for that, I made two changes.

    • CheckNewAnswer is a new test action that verifies the rows generated since the last time the answer was checked by CheckAnswer, CheckNewAnswer, or CheckLastBatch. It is agnostic to how many batches occurred between the last check and now. To make this easier, I added a common trait between MemorySink and MemorySinkV2 to abstract out some common methods.
    • assertNumStateRows has been updated in the same way to be agnostic to batches: it checks the total number of state rows and how many state rows were updated, summing up updates since the last check.
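
For concreteness, here is a minimal, self-contained sketch of the refactored trigger loop and the role of shouldRunAnotherBatch described above. This is hypothetical stand-in code, not the actual MicroBatchExecution; the fields newDataAvailable and lastExecutionRequiresAnotherBatch are invented for illustration.

object TriggerLoopSketch {
  // Stand-in state; the real class tracks offsets, logs, and plans.
  private var batchConstructed = false
  private var newDataAvailable = false
  // Whether IncrementalExecution.shouldRunAnotherBatch returned true for the
  // last executed plan (e.g. a watermark advance lets state be cleaned up).
  private var lastExecutionRequiresAnotherBatch = false

  // Idempotent: once a batch has been constructed, this keeps returning true
  // until runBatch() executes it.
  private def constructNextBatch(): Boolean = {
    if (!batchConstructed &&
        (newDataAvailable || lastExecutionRequiresAnotherBatch)) {
      batchConstructed = true // the real code also writes the offset log here
    }
    batchConstructed
  }

  private def runBatch(): Unit = {
    // Execute the planned batch (possibly a no-data batch), then reset.
    batchConstructed = false
    newDataAvailable = false
  }

  def runOneTrigger(): Unit = {
    if (constructNextBatch()) runBatch()
    else Thread.sleep(100) // nothing to run; wait for data to arrive
  }
}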

How was this patch tested?

  • Changes made to existing tests - Tests have been changed in one of the following patterns.
    • Tests where the last input was given again to force another batch to be executed (so that state was cleaned up / output was generated) were simplified by removing the extra input.
    • Tests using aggregation + watermark, where CheckLastBatch was replaced with CheckNewAnswer to make them batch-agnostic.
  • New tests added to check whether the flag works for streaming aggregation and deduplication (see the example below).
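
For illustration, a sketch of the new batch-agnostic test style (modeled on Spark's streaming test suites, not copied verbatim from this PR; it assumes the StreamTest / StateStoreMetricsTest harness, spark.implicits._, and org.apache.spark.sql.functions._ are in scope, and the state-row counts are illustrative):

val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
  .withColumn("eventTime", $"value".cast("timestamp"))
  .withWatermark("eventTime", "10 seconds")
  .groupBy(window($"eventTime", "5 seconds") as "window")
  .agg(count("*") as "count")
  .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])

testStream(windowedAggregation)(
  AddData(inputData, 10, 11, 12),
  CheckNewAnswer(),        // watermark = 2s; the [10, 15) window is still open
  AddData(inputData, 25),  // watermark advances to 15s
  // A no-data batch emits the closed [10, 15) window without further input;
  // CheckNewAnswer does not care how many batches ran since the last check.
  CheckNewAnswer((10, 3)),
  // Only the open [25, 30) window remains in state.
  assertNumStateRows(total = 1, updated = 1)
)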

@tdas

tdas commented May 2, 2018

jenkins retest this please.

@tdas

tdas commented May 2, 2018

@brkyvz @zsxwing @jose-torres

@SparkQA

SparkQA commented May 3, 2018

Test build #90080 has finished for PR 21220 at commit 7fa11c0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class WatermarkTracker extends Logging
  • trait MemorySinkBase extends BaseStreamingSink
  • class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink
  • class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkBase with Logging

@SparkQA

SparkQA commented May 3, 2018

Test build #90078 has finished for PR 21220 at commit 7fa11c0.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class WatermarkTracker extends Logging
  • trait MemorySinkBase extends BaseStreamingSink
  • class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink
  • class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkBase with Logging

@brkyvz brkyvz left a comment:

Looks good generally. Pretty hard to review through GitHub; I had to check it out locally.

Some questions:

  • There are some emptyDF optimizations for aggregations. Are we sure they won't get triggered by zero-data batches, causing this not to work?
  • What about processing time timeouts for flatMapGroupsWithState?

reportTimeTaken("triggerExecution") {
startTrigger()
Contributor:

this used to be out of the timing block

Contributor Author:

correct. fixed.

@@ -266,93 +276,62 @@ class MicroBatchExecution(
}

/**
- * Queries all of the sources to see if any new data is available. When there is new data the
- * batchId counter is incremented and a new log entry is written with the newest offsets.
+ * Attempts to construct the next batch based on whether new data is available and/or updated
Contributor:

this paragraph is highly confusing. Could you please reword? Maybe something like:

Attempts to construct a batch according to:
 - Availability of new data
 - Existence of timeouts in stateful operators

Contributor Author:

updated.

updateStatusMessage("Processing new data")
// Remember whether the current batch has data or not. This will be required later
Contributor:

the status message above isn't completely truthful if we are running a zero-data batch

Contributor Author:

I can argue that we are still processing the effect of the new data from the previous batch. This is something we can fix later if this is confusing.

Contributor Author:

Fixed it.

// is available or not.
currentBatchIsRunnable = constructNextBatch()

currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
Contributor:

why not set currentBatchHadNewData here? It's not immediately clear to me if isNewDataAvailable can update between here and 3 lines below.

Contributor:

then you can do something like:

if (currentBatchIsRunnable && currentBatchHasNewData) {
  updateStatusMessage("Processing new data")
  runBatch(sparkSessionForStream)
} else if (currentBatchIsRunnable) {
  updateStatusMessage("Processing empty trigger to timeout state") // or whatever
  runBatch(sparkSessionForStream)
} else {
  updateStatusMessage("Waiting for data to arrive")
}

Contributor Author:

I could do that, though it would duplicate the line runBatch(sparkSessionForStream).

Contributor Author:

updated.

// If new data is already available that means this method has already been called before
// and it must have already committed the offset range of next batch to the offset log.
// Hence do nothing, just return true.
if (isNewDataAvailable) return true
Contributor:

how is this possible?

Contributor:

I don't see anyone else calling this method

Contributor Author:

This condition is possible when restarting. If the query finds that the last batch was planned but not completed, then new data is already available and committed to the offset log.

Contributor Author:

Regarding the second question, which method are you talking about? isNewDataAvailable? That's being called.

@@ -373,7 +352,7 @@ class MicroBatchExecution(
reader.commit(reader.deserializeOffset(off.json))
}
} else {
- throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
+ throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist")
Contributor:

good catch

}

/**
* Processes any data available between `availableOffsets` and `committedOffsets`.
* @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
*/
private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
logDebug(s"Running batch $currentBatchId")

Contributor:

I guess we're going to see if all sources follow the contract of returning an empty dataframe if the start and end offsets are the same

Contributor Author:

Well a correct source implementation should obviously do that.

Member:

@brkyvz @tdas We don't call getBatch if the start and end offsets are the same.
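
For context, a hedged sketch of the guard being described (assumed shape, not the actual MicroBatchExecution code): a source is asked for a batch only when its available offset differs from its committed offset, so getBatch never sees start == end.

// Hypothetical helper: given per-source offset maps, keep only the sources
// that actually have new offsets to process.
def selectSourcesWithNewData[S, O](
    availableOffsets: Map[S, O],
    committedOffsets: Map[S, O]): Map[S, O] = {
  availableOffsets.filter { case (source, available) =>
    committedOffsets.get(source).forall(_ != available)
  }
}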

}

def updateWatermark(executedPlan: SparkPlan): Unit = synchronized {
val watermarkOperators = executedPlan.collect {
Contributor:

I would comment on the contracts: we expect a certain ordering of stateful operators across triggers; therefore we turn off CBO, etc.

Contributor Author:

I don't think we do anything that depends on the ordering of EventTimeWatermarkExec. We choose to take the minimum watermark calculated across multiple of them, independent of the order.
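
A minimal sketch of this order-independent rule, loosely modeled on the new WatermarkTracker class (the member names are assumptions, not the actual code):

import scala.collection.mutable

class WatermarkTrackerSketch {
  private val operatorToWatermarkMs = mutable.Map[Int, Long]()
  private var globalWatermarkMs = 0L

  def updateWatermark(operatorId: Int, newWatermarkMs: Long): Unit =
    synchronized {
      // Each watermark operator only ever moves forward...
      val previous = operatorToWatermarkMs.getOrElse(operatorId, 0L)
      operatorToWatermarkMs(operatorId) = math.max(previous, newWatermarkMs)
      // ...and the global watermark is the minimum across all operators,
      // regardless of their order in the plan.
      globalWatermarkMs = operatorToWatermarkMs.values.min
    }

  def currentWatermarkMs: Long = synchronized { globalWatermarkMs }
}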

}
}

def watermarkUpdated: Boolean = synchronized { updated }
Contributor:

is this used anywhere?

Contributor Author:

No. I will remove it.

@@ -279,13 +279,10 @@ class FileStreamSinkSuite extends StreamTest {
check() // nothing emitted yet

addTimestamp(104, 123) // watermark = 90 before this, watermark = 123 - 10 = 113 after this
- check() // nothing emitted yet
+ check((100L, 105L) -> 2L) // no-data-batch emits results on 100-105,
Contributor:

I would explicitly test with the flag on for this; in case we want to turn it off, this test shouldn't fail.

Contributor Author:

I think most such feature flags in Spark are designed to be flagged off not in code but at runtime.

finishTrigger(dataAvailable)
if (dataAvailable) {
// Update committed offsets.
commitLog.add(currentBatchId)
Contributor Author:

@brkyvz this also fixes a different bug-ish thing where we were not reporting the time taken to write to the commit log as part of "triggerExecution", since it's outside reportTimeTaken("triggerExecution").

@SparkQA

SparkQA commented May 4, 2018

Test build #90172 has finished for PR 21220 at commit da3fd2f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas

tdas commented May 4, 2018

@brkyvz Answers to your questions.

  1. We have already fixed those emptyDF optimizations. The optimization only kicks in when df.isStreaming = false, and empty DFs generated by sources should have isStreaming = true. That's for v1 sources. For v2 sources, the engine already takes care of this by making sure that StreamingDataSourceV2Relation.isStreaming is true, where StreamingDataSourceV2Relation is the logical plan leaf inserted into the micro-batch logical plan, whether it is empty or not (see the sketch after this list).

  2. I will handle both types of timeouts in flatMapGroupsWithState in a later PR.
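
To illustrate point 1, a toy sketch of the kind of gating described (hypothetical types; the actual Catalyst rule differs in detail): an empty relation may be pruned only when it is not streaming, so the streaming leaves of a zero-data batch survive.

case class Relation(rows: Seq[Any], isStreaming: Boolean)

// Pruning is gated on isStreaming: an empty *streaming* relation is kept.
def canPruneAsEmpty(rel: Relation): Boolean =
  rel.rows.isEmpty && !rel.isStreaming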

@jose-torres

LGTM.

Obviously shouldn't block this PR, but MicroBatchExecution is structured in a way that makes it hard to review changes like this. It seems like changing the condition under which new batches are run should have been a much more local change than it ended up having to be.

@tdas

tdas commented May 4, 2018

Yeah. This refactoring was needed. Now it should be easier to make such changes. Thank you @brkyvz and @jose-torres for your offline and PR review comments.

@asfgit asfgit closed this in 47b5b68 May 4, 2018