
[WIP][CORE][SPARK-28867] InMemoryStore checkpoint to speed up replay log file in HistoryServer #25577

Closed

Conversation

@Ngone51 (Member) commented Aug 25, 2019

What changes were proposed in this pull request?

This PR aims to improve replay performance in the HistoryServer by periodically checkpointing the InMemoryStore of an incomplete application, enabling incremental replay. The main idea is: for an incomplete application, we periodically (normally every N events) checkpoint the InMemoryStore, together with the number of processed events (X), into the event log directory. The HistoryServer then reconstructs the InMemoryStore from the checkpoint file and reads X, so it can skip the first X events when replaying the log file on top of the partial InMemoryStore. Note that we must also recover the live entities in AppStatusListener from the partial InMemoryStore to perform incremental replay. For a completed application, the HistoryServer can simply reconstruct the InMemoryStore, with no replay needed at all.
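To make the flow concrete, here is a minimal sketch of the SHS-side skip-and-replay described above - readCheckpoint and applyEvent are hypothetical stand-ins for the PR's checkpoint deserialization and listener dispatch, not actual code from this patch:

```scala
// Restore the partial InMemoryStore plus the processed-event count X from the
// checkpoint, then replay only the events the checkpoint doesn't already cover.
def incrementalReplay(
    logLines: Iterator[String],               // JSON lines of the event log
    readCheckpoint: () => (AnyRef, Int),      // hypothetical: (partial store, X)
    applyEvent: String => Unit): Unit = {     // hypothetical listener dispatch
  val (_, alreadyProcessed) = readCheckpoint()
  logLines
    .drop(alreadyProcessed) // skip the first X events, already in the checkpoint
    .foreach(applyEvent)    // incremental replay of the tail only
}
```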

In this PR, we focus only on handling InMemoryStore in the HistoryServer; LevelDB is planned to be handled in a similar way in a follow-up PR.

A basic experiment on a completed application with 20055 events shows the improvement from this optimization:

| Without optimization | With optimization (including deserialization time) |
| --- | --- |
| 4343 | 78 (70) |
| 4512 | 92 (85) |
| 4475 | 74 (68) |
| 4254 | 93 (78) |
| 4126 | 81 (71) |

Work TODO

  • compression support when checkpointing InMemoryStore
  • more accurate conversion from wrapper data to live entities
  • checkpoint file cleanup in the HistoryServer
  • overcoming frequent StackOverflowError in deserialization
  • SparkListenerStageExecutorMetrics synchronization between EventLoggingListener and AppStatusListener when logging stage executor metrics is enabled
  • unit tests

Why are the changes needed?

The change is needed because the HistoryServer can currently be very slow replaying a large log file for the first time, and it always re-replays an in-progress log file from the beginning whenever it changes, which is inefficient.

Does this PR introduce any user-facing change?

Yes; users can opt into this optimization via several new configurations.

How was this patch tested?

Only tested manually so far; still a work in progress.

@SparkQA commented Aug 25, 2019

Test build #109707 has finished for PR 25577 at commit 62df913.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ArrayWrappers implements Serializable
  • public class InMemoryStore implements KVStore, Serializable
  • public class KVTypeInfo implements Serializable
  • case class InMemoryStoreSnapshot(

@Ngone51 (Member, Author) commented Aug 26, 2019

```diff
@@ -345,6 +345,7 @@ private[spark] class EventLoggingListener(
 }

 private[spark] object EventLoggingListener extends Logging {
+  val CKP = ".ckp"
```
Member commented:

Is CKP short for "checkpoint"? How about using the full name, "checkpoint"?

```scala
private[spark] object Status {

  val IMS_CHECKPOINT_ENABLED =
    ConfigBuilder("spark.appStateStore.ims.checkpoint.enabled")
```
Member commented:

Is ims short for InMemoryStore? How about just spark.appStateStore.checkpoint.enabled?


```scala
def eventInc(finish: Boolean = false): Unit = {
  processedEventsNum += 1
  val shouldCheckpoint = !finished && (processedEventsNum - lastRecordEventsNum >=
```
Member commented:

```scala
finish || processedEventsNum - lastRecordEventsNum >= batchSize
```

```scala
if (shouldCheckpoint) {
  // flush to make sure all processed events' data has been written into the InMemoryStore
  listener.flush(listener.update(_, System.nanoTime()))
  latch = new CountDownLatch(1)
```
Member commented:
Is it thread-safe to create a new CountDownLatch here?

Author replied:

Since all events are processed in a single dispatch thread, only that one thread creates new CountDownLatch instances. But I just realized that the latch variable may be overwritten if the number of backlogged events exceeds the checkpoint batch size while the last checkpoint is still in progress. So I've updated the shouldCheckpoint condition to isDone && (finish || processedEventsNum - lastRecordEventsNum >= batchSize) to cover that corner case.
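Putting the quoted pieces together, a sketch of the revised flow - assembled from the snippets in this thread, not the final code; isDone is assumed to report whether the previous asynchronous checkpoint completed, and the lastRecordEventsNum bookkeeping is an assumption:

```scala
def eventInc(finish: Boolean = false): Unit = {
  processedEventsNum += 1
  // Only checkpoint when the previous one is done, and either we're finishing or
  // batchSize new events have accumulated since the last recorded checkpoint.
  val shouldCheckpoint =
    isDone && (finish || processedEventsNum - lastRecordEventsNum >= batchSize)
  if (shouldCheckpoint) {
    // flush so all processed events' data has been written into the InMemoryStore
    listener.flush(listener.update(_, System.nanoTime()))
    lastRecordEventsNum = processedEventsNum // assumed bookkeeping, not in the quoted code
    latch = new CountDownLatch(1)
    // ... hand the InMemoryStore off to the snapshot thread ...
  }
}
```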

@gengliangwang (Member) commented Aug 27, 2019

Also cc @zsxwing @vanzin

@HeartSaVioR (Contributor) commented Aug 27, 2019

Hi @Ngone51, I came across this PR as I've been working on similar things: SPARK-28594.

While the goals of the two issues seem orthogonal (you want to speed up the current event log, whereas I'm proposing another version of the event log that supports rolling), the basic approach looks similar: snapshotting ("checkpointing" here). I've experimented with various approaches to snapshotting in a POC (HeartSaVioR@d2f4c89) and came up with a snapshot approach that is cross-compatible with any KVStore implementation, with only a small modification to the KVStore interface.

Could you please go through the design doc, especially the parts on how to snapshot the KVStore instance and the format of the snapshot file? The design doc is still under review, but given that this piece is common to both goals, I'll work on the snapshot part first if it looks good to you.

It would also be great if you could provide a design doc and a plan for this issue so that we can evaluate it as well.

Thanks in advance!

@gengliangwang (Member) commented Aug 27, 2019

@HeartSaVioR +1 on rolling event logs.

@HeartSaVioR (Contributor) commented Aug 27, 2019

As I described in the design doc and in the comment above, the existing event log and the rolled event log will co-exist, and they will not be compatible with each other. So this issue is still valid as long as it addresses the existing event log, though I guess a similar approach can be applied (snapshotting N files vs. one file with N lines).

The rolling event log is not a replacement for the default event log, as there are many third-party analysis tools that depend on the current event log.

@SparkQA commented Aug 27, 2019

Test build #109810 has finished for PR 25577 at commit a08c3e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member, Author) commented Aug 27, 2019

Hi @HeartSaVioR, thanks for sharing your work here. I've gone through the main parts of your design doc, and it's quite inspiring, especially the part about leveraging KVStoreScalaSerializer to dump objects. In this PR, by contrast, using Java serialization is simple and brute-force, and it has already shown its disadvantages (StackOverflowError) when deserializing deeply nested objects. And I can see the snapshot part could be used for both issues.

One thing I want to point out is that we still need the "recover live entities" piece in AppStatusListener to make incremental replay work. I've elaborated on the details in the doc; please check it. Thanks.

@HeartSaVioR (Contributor) commented Aug 28, 2019

Thanks for pointing out the "recover live entities" issue. I've replied to your comment in the doc, but let me elaborate again here to get some guidance from the experts on the event logging part about what we should do.

I thought the live* fields worked as a "cache" (the code comment confused me a bit). If they were designed as a cache rather than the source of truth, the listener should also try to read from the KVStore when an entity is not in the cache, and fill the cache if found. But it doesn't: AppStatusListener never consults the KVStore if it can't find an entity in the live* fields.

Some parts of SQLAppStatusListener deal with this, though not all of them:

```scala
val exec = Option(liveExecutions.get(executionId))
  .orElse {
    try {
      // Should not overwrite the kvstore with new entry, if it already has the SQLExecution
      // data corresponding to the execId.
      val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId)
      val executionData = new LiveExecutionData(executionId)
      executionData.description = sqlStoreData.description
      executionData.details = sqlStoreData.details
      executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
      executionData.metrics = sqlStoreData.metrics
      executionData.submissionTime = sqlStoreData.submissionTime
      executionData.completionTime = sqlStoreData.completionTime
      executionData.jobs = sqlStoreData.jobs
      executionData.stages = sqlStoreData.stages
      executionData.metricsValues = sqlStoreData.metricValues
      executionData.endEvents = sqlStoreData.jobs.size + 1
      liveExecutions.put(executionId, executionData)
      Some(executionData)
    } catch {
      case _: NoSuchElementException => None
    }
  }.getOrElse(getOrCreateExecution(executionId))
```

It seems that AppStatusListener always assumes the KVStore is empty and that we're replaying all events. Since we're breaking that assumption, I'd say it's a bug, at least as of now, and we should fix it regardless of how we snapshot - once we load a KVStore and hand it to AppStatusListener, it has to be fixed.

"initLiveEntities" in your PR seems to deal with this, so hopefully you can create a smaller PR to fix the bug first. (You could test the functionality without snapshotting: once you know the entity types, you can retrieve the entities from the KVStore.)

One thing I would like to change: I don't think we should initialize only under certain conditions. We should just initialize every time AppStatusListener starts up. It should be fast enough if the KVStore is empty, and it must be done regardless of flags if the KVStore is not empty - correctness matters; speed matters less. SQLAppStatusListener may need to be fixed as well.

Once the live entities can be restored from the KVStore, this would also enable incremental replaying, as well as incremental snapshotting (replaying some more events and snapshotting again). The live flag makes things a bit complicated due to "maybeUpdate" and "liveUpdate", but AppStatusListener guarantees to flush these live entities to the KVStore eventually, at least when the ElementTrackingStore is closed. I guess it would be very helpful if ElementTrackingStore provided a "flush" to force a flush without closing it.

@vanzin
As you're an expert in this area, could you go through this and provide guidance on the approach?

It feels like we could proceed with the following steps:

  1. address live entity initialization in AppStatusListener/SQLAppStatusListener with a non-empty KVStore (based on that part of this PR; maybe @Ngone51 is best placed to handle this)
  2. address snapshotting of the KVStore
  3. based on 1 and 2, address both issues concurrently: SPARK-28867 (@Ngone51) and SPARK-28594 (me)

This approach would also be much better than the original plan for SPARK-28594 if you prefer smaller PRs for review, as SPARK-28594 will be split into 3 sub-issues instead of 2.

Let me know if this makes sense to you. Thanks in advance!

@vanzin (Contributor) commented Aug 28, 2019

Sorry guys, but I don't have unlimited bandwidth, and I have to be picky about what I spend my time on. Right now I don't have time to go into the details of yet another approach to handle this, so you'll have to reach a consensus among yourselves.

What I'll say is that snapshotting live entities is a bad idea, and that the snapshot and the source of the events should be in agreement about what data lives in the snapshot and how new events are applied to it afterwards (and I have no idea whether that comment applies to this approach).

AppStatusListener has code to "flush" live entities to the kvstore, so you should never need to snapshot any live data.

@HeartSaVioR (Contributor) commented:

Thanks for providing valuable input, and sorry to pull you into different topics (you're already spending effort reviewing #22138 for me). I feel it's highly likely you'd be the one reviewing and signing off on the event logging work, so I'd like to make sure what I propose also makes sense to you before starting (especially since I'm proposing the plan to another contributor in the community), but I also completely understand that you're busy handling multiple things.

Still, I'd feel safer if an expert in this area reviewed the proposed plan, so let me find other committers expected to have knowledge of this topic. If I can't get help within a couple of days (maybe this week), I'll just go ahead and work on my plan with @Ngone51.

@gengliangwang and @squito, could you help review the proposed plan?

@Ngone51 (Member, Author) commented Aug 29, 2019

AppStatusListener has code to "flush" live entities to the kvstore, so you should never need to snapshot any live data.

That's true, but the KVStore still lacks some info necessary to recover live entities, because live entities don't always write all of their fields out. E.g. LiveStage doesn't write the savedTasks field out, while that field indicates whether we should clean up tasks that exceed the configured task retention threshold in AppStatusListener. Of course, we could write that field out, but that would break api.v1.StageData (not sure what standard we have for these API data types - maybe @vanzin has a suggestion?) and introduce an unnecessary field into UI-related data.

Also, LiveStage doesn't write out completedIndices, activeTasksPerExecutor, or blackListedExecutors (this seems like a defect? The SHS can't show blacklist info for executors, even though we have it in the live UI).

Other entities have similar problems. Some fields can be recovered indirectly, and some are not really needed. But fields like savedTasks are necessary.

I think if we could write all those fields out one way or another, then recovering live entities wouldn't be a big problem.

@Ngone51 (Member, Author) commented Aug 29, 2019

We should just initialize every time AppStatusListener starts up.

At least for now, the KVStore is always empty when AppStatusListener starts up, right? It would be fast to initialize entities from an empty KVStore, but the logic itself doesn't make sense to me. I mean, what do we expect to get from the KVStore since we already know it's empty?

The roadmap looks good to me, but some details of step 3 may still need full discussion. E.g.: Do we only need snapshots for completed apps in the SHS, since snapshots for incomplete apps could be generated by the live AppStatusListener?

@HeartSaVioR (Contributor) commented Aug 29, 2019

At least for now, the KVStore is always empty when AppStatusListener starts up, right?
I mean, what do we expect to get from the KVStore since we already know it's empty?

From my perspective, that assumption shouldn't exist (there isn't even an assertion for it), or it should at least be documented. We do neither. (Let's at least document it if we don't want to address it soon.)

We're planning to break the assumption soon, so I'm not sure there's value in continuing to rely on it. You could just rely on the current behavior for now, and I'd have to modify it afterwards - it's just a matter of who deals with it. If you're thinking of adding a flag to hint to AppStatusListener whether the KVStore is empty: please, no more flags, unless we find that initialization with an empty KVStore is considerably slow (a couple of seconds, say). Let's not do premature optimization.

Do we only need snapshots for completed apps in the SHS, since snapshots for incomplete apps could be generated by the live AppStatusListener?

Assuming you're referring to a single huge application event log, you may be right if there are no failures at all. But suppose the SHS fails, or is simply restarted for maintenance: without snapshotting incomplete apps, we lose the live AppStatusListener state and end up replaying everything again. I guess that's not what we expect from incremental reload, right?

@HeartSaVioR (Contributor) commented Aug 29, 2019

That's true, but the KVStore still lacks some info necessary to recover live entities, because live entities don't always write all of their fields out.

Good point, thanks for raising it. Once we want to reconstruct AppStatusListener by restoring the KVStore, there should be no difference between live entities and stored entities. If there is any difference, it should be fixed (otherwise it makes no sense to snapshot the KVStore - we'd have to snapshot AppStatusListener along with the entities in the KVStore). Ideally, AppStatusListener should not be stateful except for the entities in the underlying KVStore. We can leverage some caching for performance, but the source of truth should be the KVStore, not AppStatusListener. (The same applies to SQLAppStatusListener.)

@HeartSaVioR (Contributor) commented Aug 29, 2019

I'd like to ask you to come up with a detailed design doc for SPARK-28867, as it would be very helpful for cross-reviewing the designs and keeping the approaches consistent where we address similar things. My design doc for SPARK-28594 only covers its own goal, and given that our common interest is how to snapshot (checkpoint), I'd like to see a design doc covering the details of the remaining parts.

@HeartSaVioR (Contributor) commented:

@Ngone51
Bump. I've finished the first part of my work (#25670) and plan to start on the next part - snapshotting. That requires dealing with restoring/storing live entities, so I'd like to know your availability for dealing with this.
Please feel free to let me know if you don't have time for it right now - I'll just go ahead, and you can leverage the output from here. Thanks in advance!

@HeartSaVioR (Contributor) commented:

I just went through snapshotting/restoring the KVStore and submitted a patch. As @Ngone51 stated previously, all instances writing to the KVStore should synchronize their state with the KVStore so that nothing is missed. As there's been no response for a couple of weeks, I'll also try to deal with this issue.

@Ngone51 (Member, Author) commented Sep 17, 2019

Hey @HeartSaVioR, sorry for the late reply. I'd like to try the "recover live entities" part to support our follow-up work in the coming days. I'll post an initial version as soon as possible.

And sorry, I can't provide a detailed design doc: the idea in this PR is quite simple, and I've already elaborated on it in the PR description. My approach doesn't involve a carefully designed snapshot; it simply uses Java serialization.

I also just realized a big high-level difference between our two issues: in mine, I always need to accurately track the number of processed events in AppStatusListener, while you don't. That's because, in my issue, SHS state comes both from AppStatusListener (the snapshot/checkpoint of the InMemoryStore) and from EventLoggingListener (the event log file), while in yours it comes only from EventLoggingListener (the snapshot is also generated from the event log file). However, accurately tracking the number of processed events in AppStatusListener is difficult, at least under the current framework. For example, SparkListenerStageExecutorMetrics is generated only in EventLoggingListener and logged to the file, but it never reaches a live AppStatusListener. This results in inconsistent state between AppStatusListener and EventLoggingListener.

Anyway, I'll try to implement the live entity recovery part first.

@Ngone51 (Member, Author) commented Sep 18, 2019

For snapshotting in the driver, I also considered both sides (driver/SHS) in the design phase, but took the SHS side because the listener must not be blocked by long operations.

Well, for SPARK-28867, I don't think there's a choice between creating the snapshot on the driver or in the SHS. As I mentioned above, there can be a case where the SHS isn't running before an application finishes. So I think the snapshot can only be created on the driver.

Either you'll end up cloning the InMemoryStore and snapshotting the clone (which requires more memory on the driver, but may be applicable unless a deep copy is required), or you block the listener from processing the next event until the snapshot has been taken. You can't take a snapshot asynchronously with a live AppStatusListener.

Yes, you're right. And in this PR we don't copy the InMemoryStore; we just block (with some optimizations) during snapshotting. I also realized that the listener can't take too much time processing events. So in this PR we use a separate thread to do the snapshot, and we don't block on frequent events (e.g. SparkListenerTaskStart, SparkListenerTaskEnd). Snapshotting is more likely to be triggered amid frequent incoming events, which makes blocking less likely.

Ideally you'd ensure your design is already reviewed internally in your team, but that's optional.

Actually, I reached an agreement with @gengliangwang before I opened this PR.

I can see the major benefits of taking the snapshot on the driver, so if you can show that it doesn't bring considerable latency on the listener side and no memory pressure, it would be the better approach.

As you know, this is still a work in progress and I've only written a prototype so far, in order to validate the idea. There's still work to be done before this PR works properly. One piece is correctly handling the behavioral differences between AppStatusListener and EventLoggingListener, which I mentioned above. Maybe we can resolve that with more careful control at the code level, but I'm afraid the overall idea isn't good enough and the code would end up buggy. All in all, this isn't a complete implementation yet, so for me it's still far from latency and memory testing.

@zsxwing (Member) commented Sep 18, 2019

I'm +1 on taking the snapshot on the driver rather than in the SHS. One of the issues I hit in the past is that the SHS cannot render the UI for a long-running Spark application because replaying the events takes too long. For example, if you have a streaming query running for 7 days, the event logs will be huge and it may take the SHS several days to replay the events. If we can take the snapshot on the driver, the number of events that need to be replayed in the SHS will be small.

You can't take a snapshot asynchronously with a live AppStatusListener.

I think we can take snapshots of the InMemoryStore asynchronously. For example, we can have two maps in the InMemoryStore. First, we write to one map. When flushing out, we freeze the current map and let new updates go to the other one. We can write out the frozen map asynchronously, and any query against the InMemoryStore just checks both maps. After flushing, we add all the items in the backup map to the frozen map and re-activate it. The number of items to copy here should be small.
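A minimal sketch of this two-map idea, assuming a simplified key-value interface - DoubleBufferedStore and its dump parameter are illustrative names, not Spark's InMemoryStore API; deletes are ignored here (they're discussed below):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

// Writes normally go to `active`; while a snapshot is in flight they go to `backup`,
// so the frozen `active` map can be dumped asynchronously without being mutated.
class DoubleBufferedStore[K, V](dump: ConcurrentHashMap[K, V] => Unit)
                               (implicit ec: ExecutionContext) {
  private val active = new ConcurrentHashMap[K, V]()
  private var backup = new ConcurrentHashMap[K, V]()
  @volatile private var snapshotting = false

  def write(k: K, v: V): Unit = synchronized {
    if (snapshotting) backup.put(k, v) else active.put(k, v)
  }

  // Queries see both maps as a single view; the backup holds the freshest entries.
  def read(k: K): Option[V] =
    Option(backup.get(k)).orElse(Option(active.get(k)))

  def snapshot(): Future[Unit] = {
    synchronized { snapshotting = true }   // freeze: new writes go to backup
    Future(dump(active)).map { _ =>        // dump the frozen map asynchronously
      synchronized {
        // merge the (small) backlog accumulated during the dump and re-activate
        backup.asScala.foreach { case (k, v) => active.put(k, v) }
        backup = new ConcurrentHashMap[K, V]()
        snapshotting = false
      }
    }
  }
}
```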

@zsxwing (Member) commented Sep 18, 2019

In addition, the SHS needs to support multiple applications. If applications can generate snapshots on the driver, it will reduce the load on the SHS a lot.

@HeartSaVioR (Contributor) commented:

One of the issues I hit in the past is that the SHS cannot render the UI for a long-running Spark application because replaying the events takes too long. For example, if you have a streaming query running for 7 days, the event logs will be huge and it may take the SHS several days to replay the events. If we can take the snapshot on the driver, the number of events that need to be replayed in the SHS will be small.

I guess SPARK-28594 would be the preferred approach for streaming queries in the end. I agree with you on the issue you described, but retaining a huge single event log file is itself also challenging.

Unlike SPARK-28867, where Spark just needs to count events in AppStatusListener (assuming events arrive in the same order in EventLoggingListener and AppStatusListener), SPARK-28594 has to snapshot from EventLoggingListener, which knows nothing about AppStatusListener, so it would be harder to take a snapshot from the driver (syncing the two up). That's an advantage of SPARK-28867, but it still doesn't deal with the major issue.

For example, we can have two maps in the InMemoryStore. First, we write to one map. When flushing out, we freeze the current map and let new updates go to the other one. We can write out the frozen map asynchronously, and any query against the InMemoryStore just checks both maps. After flushing, we add all the items in the backup map to the frozen map and re-activate it. The number of items to copy here should be small.

This seems to assume there are only "appends", but in reality there are also "updates". This requires special care when updating an existing object, and we'd need to choose one of: 1) simply cloning all events, 2) copying the map and cloning an object whenever it is updated, or 3) making updates synchronous.

@zsxwing (Member) commented Sep 18, 2019

I guess SPARK-28594 would be the preferred approach for streaming queries in the end. I agree with you on the issue you described, but retaining a huge single event log file is itself also challenging.

I think that's a separate issue, since it doesn't change how long it takes to replay the logs. It will produce multiple event log files, but the number of events that need to be replayed doesn't change.

@zsxwing (Member) commented Sep 18, 2019

This seems to assume there are only "appends", but in reality there are also "updates". This requires special care when updating an existing object, and we'd need to choose one of: 1) simply cloning all events, 2) copying the map and cloning an object whenever it is updated, or 3) making updates synchronous.

The tricky part we need to deal with is deleting objects. But I think that's doable: we can keep a delete flag in the backup map and delete those objects when merging the two maps. I think the major latency comes from the lock held while copying items from the second map back into the first one. But if flushing the snapshot isn't very slow, we won't accumulate many objects in the second map, so the number of objects to copy should be small and the latency should be acceptable. IIRC, AppStatusListener has some code to avoid flushing items to the InMemoryStore too frequently, which also helps here.
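A small sketch of this merge-with-delete-flag idea, with hypothetical types (not Spark code): writes made while the map was frozen are recorded as puts or delete tombstones, and merging applies them to the frozen map:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical op log for the backup map: a put, or a tombstone marking a delete.
sealed trait Op[+V]
final case class Put[V](value: V) extends Op[V]
case object Delete extends Op[Nothing]

// Apply the (small) backlog accumulated during the snapshot to the frozen map:
// puts overwrite, tombstones remove the stale entry.
def merge[K, V](frozen: ConcurrentHashMap[K, V], backlog: Map[K, Op[V]]): Unit =
  backlog.foreach {
    case (k, Put(v)) => frozen.put(k, v)
    case (k, Delete) => frozen.remove(k)
  }
```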

@zsxwing (Member) commented Sep 18, 2019

By the way, I'm totally +1 on SPARK-28594. But I think that without doing snapshots on the driver, SPARK-28594 can't really solve the problem.

@HeartSaVioR (Contributor) commented:

I think that's a separate issue, since it doesn't change how long it takes to replay the logs.

Agreed. So ideally we'd do both things together: 1) roll the event log files, and 2) snapshot events on the driver side. But without adding complexity we may have to let end users pick one. There might still be ways to do both, but I couldn't think of one that doesn't bring latency issues or additional (considerable) CPU/memory consumption.

@zsxwing (Member) commented Sep 18, 2019

additional (considerable) CPU/memory consumption.

Additional memory consumption should be small, since the part to copy is not large. And the additional CPU consumption is acceptable, since most of the heavy work should be done in the executors. I don't expect the driver to need to do heavy CPU work; otherwise the cluster's resources are simply wasted while it sits idle waiting for the driver to do something.

@HeartSaVioR (Contributor) commented Sep 18, 2019

I guess we're talking about different things: I was referring to SPARK-28594, whereas you seem to be referring to your suggestion. I agree that your suggestion doesn't bring considerable memory consumption (and probably not much CPU either). I'm just trying not to let the two approaches diverge (approach A for the single event log, approach B for the rolling event log).

My point about your suggestion was that when there's an update to an object, even with the map frozen, the contents of elements in the map could change, which would be out of sync with the count of events handled for the frozen map. That should be prevented.

By the way, I still feel this shows the need for a design doc for SPARK-28867. We're talking on top of a WIP and adding ideas to something still uncertain. This kind of discussion would ideally happen in the design phase.

@HeartSaVioR (Contributor) commented:

Never mind my point if you meant starting with a fresh map instead of sharing objects. Handling two maps as a single view would bring some overhead (though I don't expect it to be considerable), but it doesn't have the issue I pointed out.

@zsxwing (Member) commented Sep 18, 2019

Never mind my point if you meant starting with a fresh map instead of sharing objects. Handling two maps as a single view would bring some overhead (though I don't expect it to be considerable), but it doesn't have the issue I pointed out.

Yep, that's what I meant.

@Ngone51 (Member, Author) commented Sep 20, 2019

I think we can take snapshots of the InMemoryStore asynchronously. For example, we can have two maps in the InMemoryStore.

This really sounds like a good idea! @zsxwing

@Ngone51 (Member, Author) commented Sep 20, 2019

I'm just trying not to let the two approaches diverge (approach A for the single event log, approach B for the rolling event log).

I remember that in SPARK-28594, EventLoggingListener would generate multiple log files in a format like:

[sequenceId-1, fileSize-1] [sequenceId-2, fileSize-2] ... [sequenceId-n, fileSize-n]

And measuring the size of the file currently being written could be a tricky part of rolling the log file.

What if we also used the number of processed events to roll the log file? For example, we configure rolling every 1000 events. Then we'd have multiple log files in a format like:

[sequenceId-1, 1000] [sequenceId-2, 1000] ... [sequenceId-n, 123]

Then we'd have two cases for integrating SPARK-28867 with SPARK-28594 when snapshotting on the driver is enabled:

  1. The SHS replays event log files for a completed application

In this case, the SHS would first load [driver-snapshot, X-events-num] and then locate the event log file by the processed count X (see the sketch below). For example, if X is 1500, the SHS would start replaying from file [sequenceId-2, 1000] (rather than [sequenceId-1, 1000]) and skip the first 500 events of file 2. The subsequent behavior would follow SPARK-28594's rolling mechanism as described in its design doc (e.g. snapshotting in the SHS).

  2. The SHS replays event log files for an incomplete application

In this case, a driver snapshot would be generated continuously at every interval (e.g. every 1000 events). The SHS could always first load the newest [driver-snapshot, X-events-num], then replay the event log files, and finally generate [SHS-snapshot, Y-events-num]. The next time the driver generates a newer [driver-snapshot, Z-events-num], the SHS needs to decide which snapshot to load, depending on whether Y > Z or Y < Z, and then repeat the replay steps. But if Z < Y, the SHS needs to re-replay out-of-date event log files, which may have already been deleted. So I'd actually prefer not to snapshot in the SHS and to always use the driver snapshot in this case.
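For illustration, a tiny sketch of locating the replay start point from X under the scheme above - locateStart is a hypothetical helper, not SPARK-28594 code:

```scala
// files: rolled log files as (sequenceId, eventCount), in order.
// x: number of events already covered by the driver snapshot.
// Returns the file to start replaying from and how many events to skip within it,
// or None if the snapshot already covers every logged event.
def locateStart(files: Seq[(Int, Long)], x: Long): Option[(Int, Long)] = {
  val eventsBefore = files.scanLeft(0L)(_ + _._2) // running totals before each file
  files.zip(eventsBefore).collectFirst {
    case ((seqId, count), before) if x < before + count => (seqId, x - before)
  }
}

// e.g. locateStart(Seq((1, 1000L), (2, 1000L), (3, 123L)), 1500L) == Some((2, 500))
```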

WDYT, @HeartSaVioR?

That said, while there may be a way to integrate SPARK-28867 with SPARK-28594, I think it's fine for us to focus on SPARK-28594 for now. As mentioned above, they really are separate issues. So, @HeartSaVioR, I think you don't need to pull too many SPARK-28867 details into SPARK-28594; SPARK-28867 can introduce adjustments to the finished SPARK-28594 later to make them compatible with each other.

This kind of discussion would ideally happen in the design phase.

I think we'll later have a new design that includes @zsxwing's idea of two maps in the InMemoryStore, the way to work with SPARK-28594, and the way to accurately record the processed event count. Personally, I didn't have a good design for this issue initially, but these discussions keep making the design better.

@HeartSaVioR (Contributor) commented Sep 20, 2019

One of the main goals of SPARK-28594 is limiting the overall size of the log directory per application (end users have been concerned about this). That means we should provide a way to roll the event log file within a deterministic size, which rolling per number of lines cannot guarantee. In the following patch I'll introduce a maximum number of files (the maximum file size was introduced in #25670) and clean up old event files by replacing them with a snapshot file - so it takes a snapshot for a different purpose, though it also helps faster reading.

Given that the two issues take snapshots for different purposes, I'm okay with going with different approaches and consolidating them later (assuming the snapshot file is compatible).

One thing I'm concerned about is that we're only discussing the new approach for the in-memory store, while Spark hides the KVStore implementation by wrapping it with ElementTrackingStore. The change should be reflected in the KVStore API so that the caller side can handle snapshotting properly. (Right now we only add some necessary methods to KVStore to snapshot from the outside, but if we have both sync and async snapshots for KVStore, that should be reflected in the KVStore API.)

To add some context: previously (in internal review) I proposed snapshotting the underlying LevelDB for the LevelDB KVStore implementation - archiving the directory would just work - and it was suggested that I find a way to support snapshotting for all implementations of KVStore, since we want to let users choose the implementation freely (on the SHS side). That's why the current snapshot mechanism is based on the KVStore interface. As long as we respect the snapshot file format, both sync and async snapshots would be compatible, but in the same spirit, ideally we should support both approaches cleanly at the KVStore interface level.

@Ngone51 (Member, Author) commented Sep 21, 2019

That means we should provide a way to roll the event log file within a deterministic size, which rolling per number of lines cannot guarantee.

All right. We could attach both the size and the event count to the log file (e.g. [sequenceId, size, eventsNum]); then it does no harm to SPARK-28594.

consolidating them later (assuming the snapshot file is compatible).

I think SPARK-28594 and SPARK-28867 are both going to leverage #25811 to dump the KVStore, so the snapshot file should be compatible.

if we have both sync and async snapshots for KVStore, that should be reflected in the KVStore API.

As for "sync/async snapshot", do you mean the blocking way and the "two maps" way of snapshotting the InMemoryStore? If so, I'd say yes, we need a KVStore-level API to let the user take a snapshot, e.g. dump(), since InMemoryStore would behave differently from LevelDB in dump(). But whichever way we implement for InMemoryStore, the output snapshot file should always have the same format - and, very likely, leverage #25811 too.

@HeartSaVioR (Contributor) commented:

Sorry, but we need to remember that my overall design passed review both internally and in the community. I can revisit it if you succeed in designing something based on the new idea and getting it reviewed. It's not even verified via a POC, so I can't see any reason to adopt it right now.

If so, I'd say yes, we need a KVStore-level API to let the user take a snapshot, e.g. dump(), since InMemoryStore would behave differently from LevelDB in dump().

The thing is, the caller's logic should differ based on whether sync or async snapshots are supported. We'll have dump()/dumpAsync() and supportAsyncDump() and let the caller deal with this. If the caller needs an asynchronous dump to respect some SLA (like AppStatusListener on the driver), the caller should fail if the KVStore implementation doesn't support async dump.
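To picture the shape of that API - a hypothetical sketch, since no such interface exists in Spark's KVStore today:

```scala
import java.nio.file.Path
import scala.concurrent.Future

// Hypothetical dump extension of the KVStore interface per this comment. A caller
// with a latency SLA (e.g. a live AppStatusListener) checks supportAsyncDump() up
// front and fails fast if only synchronous dumping is available.
trait DumpableKVStore {
  def supportAsyncDump(): Boolean
  def dump(path: Path): Unit                  // blocking; every implementation has this
  def dumpAsync(path: Path): Future[Unit] =   // optional; default refuses
    throw new UnsupportedOperationException("async dump not supported")
}
```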

@Ngone51 (Member, Author) commented Sep 22, 2019

It's not even verified via a POC, so I can't see any reason to adopt it right now.

Oh, sorry. I was just discussing a possible way to integrate them in a follow-up design, not asking you to make the change right now.

The thing is, the caller's logic should differ based on whether sync or async snapshots are supported. We'll have dump()/dumpAsync() and supportAsyncDump() and let the caller deal with this.

Sorry, I don't know this part well. Why do we need to implement sync and async dump separately? And what factor should the caller use to decide which one to choose? If we have both a sync and an async way to dump the KVStore, why not just choose the async way to reduce latency?

I've always thought that KVStore should just implement dump() and hide the details - e.g. whether it's sync or async - from the caller. KVStore should always expose a single view of its interface, without the caller having to consider whether it supports sync or async dump.

@HeartSaVioR (Contributor) commented Sep 23, 2019

Oh, sorry. I was just discussing a possible way to integrate them in a follow-up design, not asking you to make the change right now.

Yeah, I see. I feel I overreacted; sorry about that. I just didn't want to go back and spend more time waiting for something new.

Sorry, I don't know this part well. Why do we need to implement sync and async dump separately?

It's not always easy to implement an async dump for every possible implementation of KVStore (though KVStore is a private API, so for now we only have InMemoryStore and LevelDB...). We've only discussed an idea for async snapshots of InMemoryStore - LevelDB isn't covered. So for now, even in the happy case, InMemoryStore will support asynchronous dump and LevelDB will not.

Suppose LevelDB is provided to AppStatusListener as the underlying KVStore. (That wouldn't be true for a live application, but let's look at it at the interface level - the listener receives an ElementTrackingStore.) If you call dump(), it runs synchronously, and the listener is stuck until dump() finishes, opening up the chance to drop events due to the heavy latency. That's what we want to avoid, and the reason we discussed the asynchronous approach, right? For a live application's AppStatusListener, the underlying KVStore must support asynchronous dump; otherwise, the caller should detect the availability early (in the very first phase) and decide to either fail or give up dumping the KVStore. So the caller has its own requirements on the KVStore when dumping.

@squito (Contributor) commented Oct 14, 2019

On taking the snapshot in the driver -- I have nothing against that if we can get it to work, but I wouldn't ignore making this work in the SHS. (a) The driver is already managing a lot -- it's a single point of failure for Spark applications, it can easily get overwhelmed with other things, and all sorts of things will go wrong if we take too long to make that snapshot. (b) While I think @zsxwing has a good idea on how to make it work, note that it's a lot more complicated than doing it in the SHS, where, as @HeartSaVioR has already pointed out, the same concerns don't apply.

Of course, the SHS also gets overwhelmed easily -- but there are other things we can do to improve that without putting pressure on the driver. You could create multiple instances of the SHS (with a master serving the application listing, but sharding the individual app UIs among the slaves); you could generate the "pre-parsed" version in a standalone process which doesn't even serve a UI at all; or you could enable faster re-parsing for the UI while still leveraging in-memory state, which is much simpler (SPARK-20656).

@Ngone51 (Member, Author) commented Oct 15, 2019

@squito We aren't ignoring snapshots in the SHS. As you can see, @HeartSaVioR and I are currently working together on SPARK-28594, and snapshotting in the SHS is a necessary part of it.

And though snapshotting in the driver is more complicated than doing it in the SHS, we may still want to give it a try. Because, IIUC, SPARK-28594 and SPARK-20656 only help with replaying incomplete event log files (though SPARK-28594 has more objectives than that). For a completed event log file, they don't reduce replay time at all (even with a snapshot). So snapshotting in the driver is really aimed at optimizing replay of completed event log files, which is what SPARK-28867 is trying to do. Of course, snapshotting in the driver would also optimize replay of incomplete event log files.

Our plan now is to address SPARK-28594 first. After that, we'll move to SPARK-28867. As you know, there are more complications in doing it, so it will still require a new, detailed design and discussion in the community.

@squito (Contributor) commented Oct 15, 2019

Hmm, now I'm very confused. I don't think anything needs to be done to speed up the replay of completed applications in the SHS. As long as you have the SHS configured to use local disk, after it parses the logs once, it'll just read the LevelDB kvstore, which is very fast.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L341-L343
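(For context, that local-disk caching is enabled by pointing spark.history.store.path at a local directory; the path below is just an example:)

```
spark.history.store.path  /var/spark/history-store
```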

Is your goal to avoid having the SHS even parse the file one time? Trying to do it from the driver seems much more complicated than having another dedicated process that just parses the event log files and produces the leveldb kvstore. If you really wanted to do that, I'd have the driver just write out the leveldb kvstore when the application terminates. You could update AppStatusStore and SQLAppStatusStore to dump the contents of one kvstore to another.

SPARK-28594 could also make the first parse faster, depending on the exact implementation in the end -- because the "first" parse of the completed application would only need to read the end of the event logs, since a lot of parsing was already done on the incomplete application's logs to produce the snapshot and rolled logs.

minor: can I ask you to use the terms "in-progress" vs. "completed" applications? There are a few times in the discussion where you say "in-complete" that don't really seem to refer to "in-progress", and I'm not sure if that's a typo or my misunderstanding, etc. (E.g. the PR description seems to mostly focus on in-progress, so I'm surprised you're saying this is primarily for completed applications.)

@Ngone51 (Member, Author) commented Oct 16, 2019

As long as you have the SHS configured to use local disk, after it parses the logs once, it'll just read the LevelDB kvstore, which is very fast.

Yeah, but that has a prerequisite: the SHS must always be running alongside those in-progress applications. If the SHS is not running (e.g. not started, or crashed) while in-progress applications are running, we end up with completed event log files. Then a newly running SHS would need to parse those completed event log files from start to end.

To be honest, though, I think this may be a rare case, since users who really want to use the SHS should have it running alongside their in-progress applications most of the time. Still, whenever the SHS shuts down, or a user brings in completed event log files from somewhere else, the slow-replay problem remains.

And I agree that with SPARK-28594 the first parse (or replay) in the SHS will be much better than today, since currently we always parse files from start to end whenever an in-progress file is updated. Of course, we should later extend SPARK-28594 (or start a new task) to optimize the single-file case, since it only supports multiple rolled files for now.

All in all, if we can ignore that rare case, SPARK-28867 wouldn't really be necessary once SPARK-28594 is done.

Is your goal to avoid having the SHS even parse the file one time?

If an application finishes normally, then we can avoid parsing the file. If not (e.g. a crash), we'd need to do an incremental parse based on the snapshot taken before the crash.

If you really wanted to do that, I'd have the driver just write out the leveldb kvstore when the application terminates.

Actually, this was the original plan when we set out to do this. But as @gengliangwang pointed out, an application may crash unexpectedly and the LevelDB could be corrupted. So we made the current plan: snapshot periodically and do an incremental parse if the application crashes.

the PR description seems to mostly focus on in-progress, so I'm surprised you're saying this is primarily for completed applications.

Sorry for the confusion. As mentioned above, if an application finishes normally we don't need to parse the file, so that path is simpler. But if the application crashes, we need to do an incremental parse, which requires more work (e.g. recovering live entities, deciding where to continue from). So I spent more effort explaining how we handle in-progress applications.

@squito (Contributor) commented Oct 17, 2019

If you really wanted to do that, I'd have the driver just write out the leveldb kvstore when the application terminates.

Actually, this was the original plan when we set out to do this. But as @gengliangwang pointed out, an application may crash unexpectedly and the LevelDB could be corrupted.

Yes, that is true. However, we're now considering quite a bit of additional complexity, to help only when the SHS isn't running (nor some separate "pre-processor" independent of the SHS which produces the LevelDB store) and your application crashes. And even then, without this change, things would still work correctly, just not as efficiently.

I think with all that in mind, I'm leaning against a change like this; it doesn't seem worth the complexity.

@Ngone51 (Member, Author) commented Oct 19, 2019

I think with all that in mind, I'm leaning against a change like this; it doesn't seem worth the complexity.

I think I agree with this point. As I mentioned above, SPARK-28594 will optimize the most common use cases, while SPARK-28867 would only cover a few rare cases while adding more complexity.

Anyway, let's set SPARK-28867 (snapshot in the driver) aside until we come up with a more viable idea. @squito @HeartSaVioR @vanzin

@github-actions (bot) commented:

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 28, 2020
@github-actions github-actions bot closed this Jan 29, 2020