
[SPARK-26872][STREAMING] Use a configurable value for final termination in the JobScheduler.stop() method #23926

Closed
wants to merge 1 commit

Conversation

shivusondur
Contributor

What changes were proposed in this pull request?

This PR lets the user set spark.streaming.jobTimeout; once that timeout elapses, the JobScheduler terminates forcefully.

How was this patch tested?

Tested manually

@AmplabJenkins

Can one of the admins verify this patch?

@srowen
Member

srowen commented Mar 1, 2019

What's the use case for this? I think it would be abnormal for termination to take a long time in this case.

@smrosenberry

Recognizing my use case may take hours, I would still suggest the units for the configurable value be minutes -- not hours.

> What's the use case for this? I think it would be abnormal for termination to take a long time in this case.

From my initial Jira request:

> My use case may take many hours to finish the queued job as it was created from a large data file.

I agree I'm abusing the Spark Streaming Context, but take it as a compliment that the Spark code can be used in flexible, unforeseen ways!

@srowen
Member

srowen commented Mar 2, 2019

Hm, I thought an earlier part of the code is what waited for the batches to complete, but I think this is it. What about waiting for a multiple of the batch interval in this case?

@srowen srowen changed the title [SPARK-26872][Minor]Use a configurable value for final termination in the JobScheduler.stop() method [SPARK-26872][STREAMING] Use a configurable value for final termination in the JobScheduler.stop() method Mar 2, 2019
@smrosenberry

Basically, I found I could process a single batch of file input data through a streaming pipeline by:

  1. Preloading the streaming context queue with an RDD of the records from the file(s): StreamingContext.queueStream(queue,false)
  2. Starting the streaming context: StreamingContext.start()
  3. Immediately and gracefully stopping the streaming context: StreamingContext.stop(true,true)

The batch interval, not unexpectedly, determines when the first (and in my case only) batch actually begins processing. Since I'm impatient (and who among us isn't?), my batch interval is 1 millisecond. Processing begins immediately.

Based upon the size of the input file, my expectation is to set the new spark.streaming.jobTimeout value to twice the guesstimated run time.

I expect my jobs to run for hours, not days. While specifying the jobTimeout in units of hours is acceptable, it may not be granular enough for other potential use cases. Specifying the timeout in minutes feels like the proper compromise between flexibility and awkwardly large numbers.
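
For illustration, a minimal Scala sketch of the three steps above; the input path and the pipeline body are placeholders, and the 1 ms interval matches the impatience noted earlier:

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object SingleBatchFileRun {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("single-batch-file-run")
    // A 1 ms batch interval so the first (and only) batch starts immediately.
    val ssc = new StreamingContext(conf, Milliseconds(1))

    // 1. Preload the queue with a single RDD built from the input file(s).
    val records: RDD[String] = ssc.sparkContext.textFile("/path/to/input") // placeholder path
    val queue = mutable.Queue(records)
    val stream = ssc.queueStream(queue, oneAtATime = false)

    stream.foreachRDD(rdd => rdd.foreach(println)) // stand-in for the real pipeline

    // 2. Start the streaming context.
    ssc.start()

    // 3. Immediately request a graceful stop: wait for the queued batch to
    //    finish, then stop the underlying SparkContext as well.
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
```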

@srowen
Member

srowen commented Mar 2, 2019

In that case, why use streaming at all? Is it for testing?

@smrosenberry

The application processes streaming data from Kafka 24/7. The file processing is a backup mechanism for those "rare" occasions when something goes bump in the night and downstream processing fails. We manually run the same application to pick up the missing output by processing the raw input files that were saved while processing the streaming data.

We have had issues with the manual process using StreamingContext.textFileStream(), including the sheer number of files, the amount of data, and the time to copy the raw input files to the directory being monitored by the textFileStream.

The single batch file data technique I outlined previously allows us to again use the same application to process the input data but now by reading the input data directly from where it sits without the moving and file watching.

The longer-term goal is to restructure the application architecture so either a DStream or DataFrame pipeline can be built: DStream for Kafka streaming processing, DataFrame for file input processing. Unfortunately, the current architecture is such that DStream parameters are passed widely and deeply, and restructuring will not be a quick, easy effort.

@srowen
Member

srowen commented Mar 2, 2019

It still seems much easier to process manually by not using streaming. Running one batch and stopping is exactly what non-streaming running is. I'm not getting it if that's the use case.

@smrosenberry

The problem is that the application currently only supports the DStream processing model and is difficult to modify for DataFrame processing. The issues with DStream monitored directories led to the workaround. The only thing holding back the workaround is the one-hour hard-coded limit currently in the code. Making that configurable provides a processing model that has merit in certain, albeit limited, use cases. Other than yet another configuration item, which I admit has a cost, the changes are benign and beneficial.
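
For context, the hard-coded limit under discussion lives in JobScheduler.stop(). The relevant logic looks roughly like the following paraphrased sketch (not an exact quote of the Spark source):

```scala
// Paraphrased sketch of the shutdown logic in JobScheduler.stop(processAllReceivedData)
jobExecutor.shutdown()

// Wait for queued jobs to finish when a graceful stop was requested.
val terminated = if (processAllReceivedData) {
  jobExecutor.awaitTermination(1, TimeUnit.HOURS) // the hard-coded magic number
} else {
  jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
}
if (!terminated) {
  jobExecutor.shutdownNow() // force termination once the wait expires
}
```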

@srowen
Member

srowen commented Mar 4, 2019

How about you wait for the batch to finish, and then shut it down, possibly with shutdownNow()? If there are no more batches, that should terminate quickly anyway, no?

@smrosenberry

smrosenberry commented Mar 4, 2019 via email

@shivusondur
Contributor Author

shivusondur commented Mar 4, 2019

@smrosenberry
The configuration added is a time-duration type, so the time can also be given in minutes. For example: spark.streaming.jobTimeout=60m
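
For reference, Spark declares time-duration settings with its internal ConfigBuilder API. A hypothetical declaration for this setting could look like the sketch below; the key comes from this PR, but the default and wording are assumptions:

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical declaration; the 1h default mirrors the current hard-coded wait.
private[streaming] val STREAMING_JOB_TIMEOUT =
  ConfigBuilder("spark.streaming.jobTimeout")
    .doc("How long JobScheduler.stop() waits for queued jobs to finish " +
      "before forcing termination. Accepts time strings such as 60m or 1h.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("1h")
```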

@shivusondur
Contributor Author

@srowen and @smrosenberry
Thanks for your comments and the detailed discussion of the use case.

@smrosenberry

@shivusondur Thanks for the support for community requests!

@srowen
Member

srowen commented Mar 4, 2019

I think the only change I'd entertain here is some multiple of the batch interval, like 2. I don't think this warrants a whole new config. I don't see why data is not available at the first batch in your example, but whatever it is, I am not sure Spark should accommodate it with a new config.
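
A rough sketch of that alternative, reusing the batch interval rather than introducing a new config (variable names are illustrative, not the actual PR change):

```scala
// Wait up to two batch intervals for queued jobs, then force shutdown.
val waitMs = 2 * batchDuration.milliseconds // batchDuration: the streaming batch interval
val terminated = jobExecutor.awaitTermination(waitMs, TimeUnit.MILLISECONDS)
if (!terminated) {
  jobExecutor.shutdownNow()
}
```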

@smrosenberry

Unfortunately, the batch interval introduces different issues for this use case (see my previous message) as it controls the ongoing streaming process.

The configuration needed for this use case is to gracefully stop the streaming. From questions on Stack Overflow, I know others besides myself would find it useful to have something that limits the number of batches created and processed, followed by a clean termination of the application.

Without much research, I expect such a change would reach deeper into the core code than the proposed spark.streaming.jobTimeout which simply uses existing code by eliminating a hard-coded magic number.

@shivusondur
Contributor Author

shivusondur commented Mar 8, 2019

@srowen
Isn't it always better to make it configurable than to hard-code it? We can make this configuration internal and not expose it to the end user, so only a developer can configure it. Is that fine?

@smrosenberry

When you write "only developer can configure", are you suggesting changing the value in source and then rebuilding?

Security policy would prevent me from doing that. If I could have done that, I would have done so. ;)

@shivusondur
Contributor Author

@smrosenberry
"only developer can configure" means it is still available for configuration for all.
But corresponding details are not exposed in the user through documentation

@srowen srowen closed this May 28, 2019