
[SPARK-28869][CORE] Roll over event log files #25670

Closed
wants to merge 15 commits

Conversation

HeartSaVioR
Contributor

What changes were proposed in this pull request?

This patch is a part of SPARK-28594 and design doc for SPARK-28594 is linked here: https://docs.google.com/document/d/12bdCC4nA58uveRxpeo8k7kGOI2NRTXmXyBOweSi4YcY/edit?usp=sharing

This patch proposes adding a new feature to event logging: rolling event log files over at a configured file size.

Previously, event logging was done with a single file, and the related codebase (EventLoggingListener/FsHistoryProvider) was tightly coupled to that assumption. This patch adds a layer on both the reader (EventLogFileReader) and writer (EventLogFileWriter) sides to decouple "handling events" from "how to read/write events from/to files".
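For illustration only, a rough sketch of the writer-side abstraction (not the actual Spark code; the real classes carry more parameters, and the names match the classes listed in the test-bot summary further down):

abstract class EventLogFileWriter {
  def start(): Unit                                              // create the log dir / first file
  def writeEvent(eventJson: String, flushLogger: Boolean): Unit  // append one serialized event
  def stop(): Unit                                               // finalize and mark the log complete
}

class SingleEventLogFileWriter extends EventLogFileWriter {
  // one file per application, same behavior as before this patch
  def start(): Unit = ???
  def writeEvent(eventJson: String, flushLogger: Boolean): Unit = ???
  def stop(): Unit = ???
}

class RollingEventLogFilesWriter extends EventLogFileWriter {
  // writes into a directory of sequenced files, rolling over at the configured size
  def start(): Unit = ???
  def writeEvent(eventJson: String, flushLogger: Boolean): Unit = ???
  def stop(): Unit = ???
}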

This patch adds two properties, spark.eventLog.rollLog and spark.eventLog.rollLog.maxFileSize, which provide configurable rolling behavior. The feature is disabled by default, as we only expect huge event logs for huge or long-running applications. For other cases a single event log file is sufficient and simpler.
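For context, a minimal sketch of how the feature might be enabled via SparkConf. The property names shown are the ones this thread eventually settled on (spark.eventLog.rolling.*), and the directory and size values are illustrative assumptions:

import org.apache.spark.SparkConf

// Sketch only: enable event logging plus rolling; values are examples, not recommendations.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-events")     // example log directory
  .set("spark.eventLog.rolling.enabled", "true")         // feature is off by default
  .set("spark.eventLog.rolling.maxFileSize", "128m")     // roll over at roughly 128 MiB per file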

Why are the changes needed?

This is a part of SPARK-28594, which addresses the event log growing infinitely for long-running applications.

This patch also provides an option for the situation where the event log file gets huge and consumes storage. End users may give up on replaying the events and want to delete the event log file, but while the application is still running and writing to it, deleting the file is not safe. With rolling enabled, end users will be able to delete some of the older files.

Does this PR introduce any user-facing change?

No, as the new feature is turned off by default.

How was this patch tested?

Added unit tests, as well as basic manual tests.

Basic manual tests: ran SHS, ran a structured streaming query with rolling event log enabled, and verified that split files are generated and that SHS can load them, handling the app status as incomplete/complete.

@HeartSaVioR
Contributor Author

  1. Given we've added a layer in this patch, would we still want to convert the existing tests (EventLoggingListenerSuite/FsHistoryProviderSuite/etc.) to test both cases (single/rolling)?

  2. I'm seeing a weird exception while testing manually:

19/09/04 06:00:11 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
	at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:237)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.util.JsonProtocol$.mapToJson(JsonProtocol.scala:514)
	at org.apache.spark.util.JsonProtocol$.$anonfun$propertiesToJson$1(JsonProtocol.scala:520)
	at scala.Option.map(Option.scala:163)
	at org.apache.spark.util.JsonProtocol$.propertiesToJson(JsonProtocol.scala:519)
	at org.apache.spark.util.JsonProtocol$.jobStartToJson(JsonProtocol.scala:155)
	at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:79)
	at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:96)
	at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:158)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:99)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:84)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:102)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:102)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:97)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:93)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:93)

Fixing it would be simple (clone the properties and include the copy in SparkListenerJobStart), but I just want to know whether it occurred before this patch. A rough sketch of the idea follows.
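A rough sketch of the fix idea; the helper name is illustrative, not the actual method added in the follow-up patch:

import java.util.Properties

// Sketch: snapshot the caller's mutable Properties before they are captured by
// SparkListenerJobStart, so the event-logging thread serializes a stable copy
// instead of racing with concurrent modifications by the running job.
def cloneProperties(props: Properties): Properties = {
  if (props == null) {
    null
  } else {
    val copied = new Properties()
    copied.putAll(props)   // copy the entries; later mutations are not seen
    copied
  }
}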

@HeartSaVioR
Contributor Author

cc. @felixcheung (as shepherd of SPARK-28594) @vanzin @squito @gengliangwang @dongjoon-hyun

also cc @Ngone51, who might be interested in this.

@SparkQA

SparkQA commented Sep 3, 2019

Test build #110065 has finished for PR 25670 at commit f79f42f.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class EventLogFileReader(
  • class SingleFileEventLogFileReader(
  • class RollingEventLogFilesFileReader(
  • abstract class EventLogFileWriter(
  • class SingleEventLogFileWriter(
  • class RollingEventLogFilesWriter(

@SparkQA

SparkQA commented Sep 4, 2019

Test build #110068 has finished for PR 25670 at commit 8c613d5.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

java.util.ConcurrentModificationException

This also occurs with the current master branch. You can reproduce it with the query below in spark-shell, while continuously pushing records to topic1 and topic2.

val bootstrapServers = "localhost:9092"
val checkpointLocation = "/tmp/SPARK-28869-testing"
val sourceTopics = "topic1"
val sourceTopics2 = "topic2"
val targetTopic = "topic3"

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", sourceTopics).option("startingOffsets", "earliest").load()

val df2 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", sourceTopics2).option("startingOffsets", "earliest").load()

df.union(df2).writeStream.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("checkpointLocation", checkpointLocation).option("topic", targetTopic).start()

@SparkQA

SparkQA commented Sep 4, 2019

Test build #110073 has finished for PR 25670 at commit 4bb9de0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung
Member

java.util.ConcurrentModificationException


Is there a separate JIRA on this?

@HeartSaVioR
Contributor Author

Yes, I filed https://issues.apache.org/jira/browse/SPARK-28967 and submitted a patch: #25672

@SparkQA

SparkQA commented Sep 4, 2019

Test build #110141 has finished for PR 25670 at commit 082cf16.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 5, 2019

Test build #110157 has finished for PR 25670 at commit 72a6253.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 5, 2019

Test build #110202 has finished for PR 25670 at commit 9b4d53d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 6, 2019

Test build #110208 has finished for PR 25670 at commit 9b4d53d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

First failure: a known flaky test, SPARK-26989 (submitted a patch: #25706).

Second failure: a SparkContext is leaked in a test, which makes subsequent tests fail.

[info] *** 4 SUITES ABORTED ***
[info] *** 131 TESTS FAILED ***
[error] Error: Total 418, Failed 131, Errors 4, Passed 283, Ignored 1
[error] Failed tests:
[error] 	org.apache.spark.streaming.scheduler.JobGeneratorSuite
[error] 	org.apache.spark.streaming.ReceiverInputDStreamSuite
[error] 	org.apache.spark.streaming.WindowOperationsSuite
[error] 	org.apache.spark.streaming.StreamingContextSuite
[error] 	org.apache.spark.streaming.scheduler.ReceiverTrackerSuite
[error] 	org.apache.spark.streaming.CheckpointSuite
[error] 	org.apache.spark.streaming.UISeleniumSuite
[error] 	org.apache.spark.streaming.scheduler.ExecutorAllocationManagerSuite
[error] 	org.apache.spark.streaming.ReceiverSuite
[error] 	org.apache.spark.streaming.BasicOperationsSuite
[error] 	org.apache.spark.streaming.InputStreamsSuite
[error] Error during tests:
[error] 	org.apache.spark.streaming.MapWithStateSuite
[error] 	org.apache.spark.streaming.DStreamScopeSuite
[error] 	org.apache.spark.streaming.rdd.MapWithStateRDDSuite
[error] 	org.apache.spark.streaming.scheduler.InputInfoTrackerSuite

One of the stack traces follows:

[info] JobGeneratorSuite:
[info] - SPARK-6222: Do not clear received block data too soon *** FAILED *** (2 milliseconds)
[info]   org.apache.spark.SparkException: Only one SparkContext should be running in this JVM (see SPARK-2243).The currently running SparkContext was created at:
[info] org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
[info] org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:851)
[info] org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:85)
[info] org.apache.spark.streaming.TestSuiteBase.setupStreams(TestSuiteBase.scala:317)
[info] org.apache.spark.streaming.TestSuiteBase.setupStreams$(TestSuiteBase.scala:311)
[info] org.apache.spark.streaming.CheckpointSuite.setupStreams(CheckpointSuite.scala:209)
[info] org.apache.spark.streaming.CheckpointSuite.$anonfun$new$3(CheckpointSuite.scala:258)
[info] scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info] org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] org.scalatest.Transformer.apply(Transformer.scala:22)
[info] org.scalatest.Transformer.apply(Transformer.scala:20)
[info] org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info] org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
[info] org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
[info] org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
[info] org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info] org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
[info] org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
[info]   at org.apache.spark.SparkContext$.$anonfun$assertNoOtherContextIsRunning$2(SparkContext.scala:2512)
[info]   at scala.Option.foreach(Option.scala:274)
[info]   at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2509)
[info]   at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2586)
[info]   at org.apache.spark.SparkContext.<init>(SparkContext.scala:87)
[info]   at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:851)
[info]   at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:85)
[info]   at org.apache.spark.streaming.scheduler.JobGeneratorSuite.$anonfun$new$1(JobGeneratorSuite.scala:65)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
[info]   at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
[info]   at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info]   at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
[info]   at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
[info]   at org.apache.spark.streaming.scheduler.JobGeneratorSuite.org$scalatest$BeforeAndAfter$$super$runTest(JobGeneratorSuite.scala:30)
[info]   at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:203)
[info]   at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:192)
[info]   at org.apache.spark.streaming.scheduler.JobGeneratorSuite.runTest(JobGeneratorSuite.scala:30)
[info]   at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
[info]   at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
[info]   at org.scalatest.Suite.run(Suite.scala:1147)
[info]   at org.scalatest.Suite.run$(Suite.scala:1129)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
[info]   at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info]   at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
[info]   at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.streaming.scheduler.JobGeneratorSuite.org$scalatest$BeforeAndAfter$$super$run(JobGeneratorSuite.scala:30)
[info]   at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:258)
[info]   at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:256)
[info]   at org.apache.spark.streaming.scheduler.JobGeneratorSuite.run(JobGeneratorSuite.scala:30)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:507)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 6, 2019

Test build #110220 has finished for PR 25670 at commit 9b4d53d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

I'd like to put in some effort if that would accelerate reviewing. Would it help review if I split this into two parts: the writer (EventLoggingListener) and the reader (SHS)?

@HeartSaVioR
Contributor Author

Btw, I'll start working on the next piece, which doesn't depend on this patch. Maybe I'll split it into smaller parts than I originally planned. Some parts may not be used until the following part comes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 17, 2019

Test build #110704 has finished for PR 25670 at commit 9b4d53d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

UT failure: SPARK-29104, not relevant to this patch.

@HeartSaVioR
Contributor Author

retest this, please

@HeartSaVioR
Contributor Author

FYI, #25811 has been submitted to cover snapshot/restore support for the KVStore.

@SparkQA

SparkQA commented Sep 17, 2019

Test build #110730 has finished for PR 25670 at commit 9b4d53d.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 17, 2019

Test build #110733 has finished for PR 25670 at commit 9b4d53d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 15, 2019

Test build #112065 has finished for PR 25670 at commit 540052d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

writer.foreach { w =>
  val currentLen = countingOutputStream.get.getBytesWritten
  if (currentLen + eventJson.length > eventFileMaxLength) {
    rollEventLogFile()
Contributor

There's an edge case where a single event may be larger than the configured limit. (currentLen would be 0 in that case.) That's bad configuration, but in that case you should probably log something and not roll the file. Or maybe add a lower limit to the configuration so that the app doesn't even start or something.
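A hedged sketch of how that edge case could be handled; identifiers are taken from the diff above, logWarning is assumed from Spark's Logging trait, and this is not the committed code:

// Sketch: if a single event already exceeds the limit, rolling would only produce
// an empty file, so warn and keep writing to the current file instead of rolling.
writer.foreach { w =>
  val currentLen = countingOutputStream.get.getBytesWritten
  if (currentLen + eventJson.length > eventFileMaxLength) {
    if (currentLen == 0) {
      logWarning(s"Single event of ${eventJson.length} bytes exceeds the configured " +
        s"max file size ($eventFileMaxLength); not rolling over.")
    } else {
      rollEventLogFile()
    }
  }
  // ... then write eventJson to the (possibly new) current file ...
}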

Contributor Author

Nice find! Setting the lower limit to 10 MiB seems OK; I can't imagine a single event being larger than 10 MiB. Will make the change.

@SparkQA

SparkQA commented Oct 16, 2019

Test build #112134 has finished for PR 25670 at commit 2ff349b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment

Just a small nit.

.doc("The max size of event log file to be rolled over.")
.bytesConf(ByteUnit.BYTE)
.checkValue(_ >= (1024 * 1024 * 10), "Max file size of event log should be configured to" +
Contributor

ByteUnit.MiB.toBytes(10)
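Applying that suggestion, the declaration might read roughly like this (a sketch; the default value shown is an assumption):

// Sketch: express the 10 MiB lower bound via ByteUnit instead of a hand-computed constant.
private[spark] val EVENT_LOG_ROLLING_MAX_FILE_SIZE =
  ConfigBuilder("spark.eventLog.rolling.maxFileSize")
    .doc("The max size of event log file to be rolled over.")
    .bytesConf(ByteUnit.BYTE)
    .checkValue(_ >= ByteUnit.MiB.toBytes(10),
      "Max file size of event log should be configured to be at least 10 MiB.")
    .createWithDefaultString("128m")   // assumed default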

@vanzin
Contributor

vanzin commented Oct 16, 2019

Looks ok, will merge tomorrow if no one else comments.

@SparkQA

SparkQA commented Oct 16, 2019

Test build #112190 has finished for PR 25670 at commit a2f631d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Oct 17, 2019

Merging to master.

@vanzin vanzin closed this in 100fc58 Oct 17, 2019
@HeartSaVioR
Contributor Author

Thanks all for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-28869 branch November 4, 2019 15:05
@gaborgsomogyi
Contributor

@HeartSaVioR I've just gone through this PR and plan to join the later developments. You can ping me if this feature goes forward. One question I have now: do I see correctly that the current implementation measures the event size before compression? If yes, maybe my suggestion can be considered.

Namely, I see two possibilities to overcome this, and in both cases the basic idea is the same: the dstream variable can be wrapped with a CountingOutputStream, which would measure the file size after compression.

  1. If we can NOT treat spark.eventLog.rolling.maxFileSize as a soft threshold, we can listen at this point (with a custom CountingOutputStream) for writes and initiate rolling there.
  2. If we CAN treat spark.eventLog.rolling.maxFileSize as a soft threshold, we can just use dstream.getBytesWritten() in the existing condition.

I've tested the second approach with lz4, lzf, snappy and zstd, and only lz4 didn't flush its buffer immediately. Of course this doesn't mean the second approach is advised; I just wanted to give more info. A rough sketch of the wrapping idea follows.
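Sketch of the wrapping idea; CountingOutputStream here is the Apache Commons IO class, and Spark's writer may use its own counter, so treat the names as assumptions:

import java.io.OutputStream
import org.apache.commons.io.output.CountingOutputStream

// Count bytes *after* compression by placing the counter between the codec's
// output stream and the underlying file stream.
def openCountedCompressedStream(
    rawFileStream: OutputStream,
    compress: OutputStream => OutputStream): (OutputStream, CountingOutputStream) = {
  val counting = new CountingOutputStream(rawFileStream)  // counts compressed bytes
  val dstream = compress(counting)                        // codec writes into the counter
  (dstream, counting)
}

// The soft-threshold check (approach 2) could then be roughly:
//   if (counting.getByteCount >= eventFileMaxLength) rollEventLogFile()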


private[spark] val EVENT_LOG_ROLLING_MAX_FILE_SIZE =
  ConfigBuilder("spark.eventLog.rolling.maxFileSize")
    .doc("The max size of event log file to be rolled over.")
Member

Sorry for leaving a comment this late, but it would have been better to say that this configuration is only effective when spark.eventLog.rolling.enabled is enabled.

Contributor Author

I think there are counterexamples among Spark configurations to the convention that, once there is a .enabled configuration, the others are effective only when it is enabled.

Even if we only look at the SHS configurations, spark.history.fs.cleaner.*, spark.history.kerberos.*, and spark.history.ui.acls.* fall into this case.

Member

Hm, I tend to disagree with omitting such dependent configurations from their documentation. Can we add and link the related configurations in the documentation?

Contributor Author

Sorry, looks like we'll have to agree to disagree then. No one has the privilege to make someone do work under their authorship that they disagree with; it would end up putting the wrong authorship on the commit.

Member

@HeartSaVioR, it had to be reviewed; I just happened to review and leave some comments late. Logically, if that's not documented, how do users know which configuration is effective when? At least I had to read the code to confirm.

Also, I am trying to make sure we're on the same page so that I won't have to leave this comment again, since you are a regular contributor. I don't think leaving the relationship between configurations undocumented is a good pattern. I am going to send an email to the dev list.

Contributor Author

@HyukjinKwon
Thanks for initiating the thread on the dev mailing list. I'm following the thread and will be back once we reach some sort of consensus.

HyukjinKwon pushed a commit that referenced this pull request Feb 17, 2020
… for rolling event log

### What changes were proposed in this pull request?

This patch addresses the post-hoc review comment linked here - #25670 (comment)

### Why are the changes needed?

We would like to explicitly document the direct relationship before we finish up structuring the configurations.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

N/A

Closes #27576 from HeartSaVioR/SPARK-28869-FOLLOWUP-doc.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Feb 17, 2020
… for rolling event log


Closes #27576 from HeartSaVioR/SPARK-28869-FOLLOWUP-doc.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit 446b2d2)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
… for rolling event log


Closes apache#27576 from HeartSaVioR/SPARK-28869-FOLLOWUP-doc.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
PavithraRamachandran pushed a commit to PavithraRamachandran/spark that referenced this pull request Jun 7, 2022

Closes apache#25670 from HeartSaVioR/SPARK-28869.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
dongjoon-hyun added a commit that referenced this pull request Nov 2, 2023
### What changes were proposed in this pull request?

This PR aims to enable `spark.eventLog.rolling.enabled` by default for Apache Spark 4.0.0.

### Why are the changes needed?

Since Apache Spark 3.0.0, we have been using event log rolling not only for **long-running jobs**, but also for **some failed jobs** to archive the partial event logs incrementally.
- #25670

### Does this PR introduce _any_ user-facing change?

- No because `spark.eventLog.enabled` is disabled by default.
- For the users with `spark.eventLog.enabled=true`, yes, `spark-events` directory will have different layouts. However, all 3.3+ `Spark History Server` can read both old and new event logs. I believe that the event log users are already using this configuration to avoid the loss of event logs for long-running jobs and some failed jobs.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43638 from dongjoon-hyun/SPARK-45771.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024

Closes apache#43638 from dongjoon-hyun/SPARK-45771.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>