
Change RealtimePlumber to use SegmentAllocate API #4774

Closed

Conversation

kevinconaway
Contributor

@kevinconaway commented Sep 10, 2017

@gianm per our conversation on the mailing list, I'm submitting a PR that changes the RealtimePlumber / RealtimeIndexTask to use the "SegmentAllocate" API. Some notes:

I refactored the "segment tracking" code out of AppenderatorDriver and into a standalone class, SegmentTracker, so that it can be reused.

I struggled with the best way to add this to the Plumber API, given that the RealtimePlumber is used by both the RealtimeIndexTask and the RealtimeManager but is instantiated in different scenarios. I ended up passing the SegmentAllocator to PlumberSchool#findPlumber and passing the sequence name as an argument to Plumber#add. I'm open to suggestions for improving this, but it seemed like the best approach to me.
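
Roughly, the interface changes look like this (the existing parameters are copied from the 0.11-era io.druid interfaces; only the SegmentAllocator and sequence-name additions are new, so treat the exact shape as a sketch rather than the precise code in this PR):

// PlumberSchool.java (sketch)
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.appenderator.SegmentAllocator;

public interface PlumberSchool
{
  // The SegmentAllocator is now supplied by the caller (RealtimeIndexTask or RealtimeManager)
  // instead of the plumber generating its own segment identifiers.
  Plumber findPlumber(
      DataSchema schema,
      RealtimeTuningConfig config,
      FireDepartmentMetrics metrics,
      SegmentAllocator segmentAllocator
  );
}

// Plumber.java (sketch)
import com.google.common.base.Supplier;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.segment.incremental.IndexSizeExceededException;

public interface Plumber
{
  // The sequence name identifies the stream of rows being allocated against and
  // must stay stable for the lifetime of the task.
  int add(InputRow row, String sequenceName, Supplier<Committer> committerSupplier)
      throws IndexSizeExceededException;

  // startJob, getSink, persist, finishJob, etc. are unchanged.
}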

In RealtimePlumber, I'm using the task ID as the sequence name. My understanding from reading the existing code, particularly the KafkaIndexTask, is that the sequence name should be unique for the duration of the task; please correct me if that's wrong. I'm less certain about how best to generate a sequence name for the RealtimeManager, though. The test coverage here is pretty basic, and I don't have experience running realtime nodes, so I'm not sure if it's implemented properly.
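
To make the sequence-name choice concrete, this is the kind of thing I mean (the RealtimeManager variant is only a placeholder idea to illustrate the open question, not necessarily what this PR ends up doing):

// SequenceNames.java (illustration only)
import io.druid.indexing.common.task.Task;

import java.util.UUID;

public class SequenceNames
{
  // The task ID is already unique for the lifetime of the task, so it can be used
  // directly as the sequence name in RealtimeIndexTask.
  public static String forTask(Task task)
  {
    return task.getId();
  }

  // RealtimeManager has no task ID; a generated name per firehose is one possible
  // stand-in (an assumption for illustration).
  public static String forRealtimeManager(String dataSource)
  {
    return dataSource + "_" + UUID.randomUUID();
  }
}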

In RealtimePlumber, I'm persisting the segment tracking metadata and the segment identifiers as JSON files, similar to what is done for the identifiers in AppenderatorDriver. Please let me know if this is improper. The other option would be to store them along with the index metadata, but that didn't seem like the right place.
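
Concretely, something along these lines (a minimal sketch using Jackson; the file name and the shape of the tracker state are placeholders rather than the exact code in this PR):

// SegmentTrackerPersistence.java (sketch)
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.File;
import java.io.IOException;
import java.util.Map;

public class SegmentTrackerPersistence
{
  private final ObjectMapper objectMapper = new ObjectMapper();

  // Write the tracker state (sequence name -> allocated segment identifiers) next to
  // the persist directory so that a restarted task can restore it.
  public void persist(File basePersistDir, Map<String, Object> trackerState) throws IOException
  {
    objectMapper.writeValue(new File(basePersistDir, "segment_tracker.json"), trackerState);
  }

  @SuppressWarnings("unchecked")
  public Map<String, Object> restore(File basePersistDir) throws IOException
  {
    final File file = new File(basePersistDir, "segment_tracker.json");
    return file.exists() ? objectMapper.readValue(file, Map.class) : null;
  }
}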

Please let me know what feedback you have, thank you!

@kevinconaway
Contributor Author

@gianm Should this behavior be on by default for all tasks or should it be configurable?

@kevinconaway
Contributor Author

@gianm or @jihoonson do you have any thoughts on this one?

@gianm
Contributor

gianm commented Sep 18, 2017

Sorry for not taking a look yet @kevinconaway, we're in the middle of getting master ready for an 0.11.0 branch cut and should be able to take a look after that.

Have you been running this patch in your own cluster?

@kevinconaway
Contributor Author

We're currently testing it out, not on our full production workload yet.

@KenjiTakahashi
Contributor

We seem to have developed a need for this as well. I'm gonna build it and test it locally, then possibly push to our staging instance for some real load tests. Just FYI :-).

@kevinconaway
Contributor Author

@KenjiTakahashi this is currently up to date with the 0.11.0 branch.

I'm not going to sync it with master for the time being to keep it stable for our testing.

@KenjiTakahashi
Contributor

@kevinconaway Thanks for the heads up, that should be fine with us. I've just tested it locally and it seems to be working as expected so far :-). We're gonna push it to try it out on staging probably tomorrow/early next week.

@KenjiTakahashi
Contributor

We've been running it on staging for a couple of days and for some time it worked fine. But now tasks are consistently failing with

2017-10-06T13:39:35,888 WARN [task-runner-0-priority-0] io.druid.segment.realtime.SegmentTracker - Cannot allocate segment for timestamp[2017-09-05T00:00:00.000Z], sequenceName[index_realtime_logdb_data_0_2017-10-06T13:39:26.855Z_ohinnidg].
2017-10-06T13:39:35,910 ERROR [task-runner-0-priority-0] io.druid.indexing.common.task.RealtimeIndexTask - Exception aborted realtime processing[logdb_data]: {class=io.druid.indexing.common.task.RealtimeIndexTask, exceptionType=class java.lang.NullPointerException, exceptionMessage=null}
java.lang.NullPointerException
        at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) ~[?:1.8.0_131]
        at io.druid.segment.realtime.plumber.RealtimePlumber.getSink(RealtimePlumber.java:255) ~[druid-server-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
        at io.druid.segment.realtime.plumber.RealtimePlumber.add(RealtimePlumber.java:238) ~[druid-server-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
        at io.druid.segment.realtime.plumber.Plumbers.addNextRow(Plumbers.java:77) ~[druid-server-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
        at io.druid.indexing.common.task.RealtimeIndexTask.run(RealtimeIndexTask.java:350) [druid-indexing-service-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
        at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
        at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]

@kevinconaway Any idea what might be causing this?

@kevinconaway
Contributor Author

We've been running this for ~2 weeks now and haven't experienced any issues since I fixed 5ef8fa5, but that was related to segments not being handed off.

There should be some more logging around why the segments can't be allocated. Do you want to create a gist with the entire task log and share it here?

@KenjiTakahashi
Contributor

We do have that fix anyway, so it's not that. Here's the whole log: https://gist.github.com/KenjiTakahashi/602a3d29055a18855e3cb6e9ccc254ac

@KenjiTakahashi
Contributor

KenjiTakahashi commented Oct 12, 2017

@kevinconaway I've found the culprit. It turned out that the failures were caused by some older data arriving for segments that were first created with "stock" Druid 0.10, and those segments had a different shardSpec config.
So it was a mistake on our side in the end, although it would be nice if Druid reported something more meaningful than NullPointerException :-).

@kevinconaway
Contributor Author

although it would be nice if Druid reported something more meaningful than NullPointerException

For sure. That said, I do believe this should fail fast and throw an exception rather than silently ignoring or rejecting the data. If Druid hadn't thrown an error, you might not have noticed the missing data until much later, which would have made it difficult to figure out what happened.
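
To illustrate, the kind of fail-fast check I have in mind looks something like this (names and the allocate() signature follow the 0.11-era io.druid interfaces, so treat it as a sketch rather than the exact code in this PR):

// AllocationChecks.java (sketch)
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.segment.realtime.appenderator.SegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;

import java.io.IOException;

public class AllocationChecks
{
  // If allocation returns null (for example because the interval already holds segments
  // with an incompatible shardSpec), fail immediately with a descriptive message instead
  // of letting a NullPointerException surface later in getSink().
  public static SegmentIdentifier allocateOrFail(
      SegmentAllocator allocator,
      InputRow row,
      String sequenceName,
      String previousSegmentId
  ) throws IOException
  {
    final SegmentIdentifier identifier =
        allocator.allocate(row.getTimestamp(), sequenceName, previousSegmentId);
    if (identifier == null) {
      throw new ISE(
          "Could not allocate segment for timestamp[%s], sequenceName[%s]",
          row.getTimestamp(),
          sequenceName
      );
    }
    return identifier;
  }
}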

Thoughts?

@kevinconaway
Contributor Author

Sorry for not taking a look yet @kevinconaway, we're in the middle of getting master ready for an 0.11.0 branch cut and should be able to take a look after that.

@gianm It looks like 0.11.0 is getting more stable. Do you have time to take a look at this PR now?

We've been running it for a few weeks on our production load without issue.

@KenjiTakahashi
Contributor

@kevinconaway Sorry, I somehow missed your comment earlier. Yeah, failing fast is fine in this case, I think. It's ultimately a server misconfiguration, and the client alone can't do much about it.

We've now been running this for a couple of days on two instances, with no problems thus far.

@kevinconaway
Contributor Author

@gianm is there a path to getting this reviewed and merged?

@gianm
Contributor

gianm commented Nov 15, 2017

@kevinconaway, I'm sorry it took so long to look at this.

At any rate I have one main question first. I see you've worked on making the RealtimePlumber use a SegmentAllocator rather than come up with its own identifiers. As an alternate approach did you look at trying to make the RealtimeIndexTask use an AppenderatorDriver rather than a Plumber? The Appenderator system is a bit newer and I was hoping that it would eventually replace Plumbers. They should be just as capable. It should also reduce the need for refactoring anything outside of RealtimeIndexTask.

The main issue I was thinking of with that approach is that the on-disk directory structures aren't exactly the same, so that would affect the ability of tasks to restore. I think it should be ok if it's just one time, though. If we really felt strongly about preserving the ability to restore across the upgrade, then I guess the task could peek at its on-disk structure and migrate it if needed.

@kevinconaway
Contributor Author

I see you've worked on making the RealtimePlumber use a SegmentAllocator rather than come up with its own identifiers.

Yep, the main reason here is to allow the RealtimeIndexTask to append data to existing intervals and to handle late-arriving data, similar to how the Kafka indexing service works.

As an alternate approach did you look at trying to make the RealtimeIndexTask use an AppenderatorDriver rather than a Plumber? The Appenderator system is a bit newer and I was hoping that it would eventually replace Plumbers. They should be just as capable. It should also reduce the need for refactoring anything outside of RealtimeIndexTask.

I did look at that, specifically the AppenderatorPlumber. I opted not to use it because it wasn't really being used in any of the existing code paths, and since I'm not as familiar with the code base, I thought it best to use the code that's been battle-tested, so to speak.

The main issue I was thinking of with that approach is that the on-disk directory structures aren't exactly the same, so that would affect the ability of tasks to restore. I think it should be ok if it's just one time, though. If we really felt strongly about preserving the ability to restore across the upgrade, then I guess the task could peek at its on-disk structure and migrate it if needed.

Are you saying that the on-disk directory structures between the RealtimePlumber and the AppenderatorDriver are different?

I don't have a strong opinion on what's best here in terms of preserving that, so I'll defer to you on that.

@gianm
Contributor

gianm commented Nov 16, 2017

I did look at that, specifically the AppenderatorPlumber. I opted not to use it because it wasn't really being used in any of the existing code paths, and since I'm not as familiar with the code base, I thought it best to use the code that's been battle-tested, so to speak.

That is right, the AppenderatorPlumber is not used anywhere -- it was meant to be a bridge from the (old) Plumber interface to the (newer) Appenderator layer but it never actually got linked anywhere. Mostly because I thought it would make more sense to port over the callers (like RealtimeIndexTask) to directly use an AppenderatorDriver. Does that look like it would work for you?

If so, I think that'd be the cleanest thing -- it would lead to eventually just removing Plumbers completely.

Are you saying that the on-disk directory structures between the RealtimePlumber and the AppenderatorDriver are different?

That is what I'm saying. They are similar but slightly different, enough that they aren't compatible. You can see it if you run both code paths side by side, or if you look at the differences in bootstrapping code in the respective startJob methods.

I don't have a strong opinion on what's best here in terms of preserving that, so I'll defer to you on that.

If the task could do a migration of the older structure to the newer structure when starting up, I think that's best. It would just run one time but it would make people's lives easier when upgrading.
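
Very roughly, something like the following at task startup (a minimal sketch; the file and directory names are placeholders, since the real on-disk layouts differ in more ways than a single file):

// PersistDirMigration.java (sketch)
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class PersistDirMigration
{
  // One-time conversion: if the base persist directory still holds the old
  // RealtimePlumber-style state, relocate it to where the AppenderatorDriver
  // bootstrap expects it before startJob() runs.
  public static void migrateIfNeeded(File baseDir) throws IOException
  {
    final File legacyState = new File(baseDir, "plumber_state.json");   // hypothetical legacy file
    final File driverDir = new File(baseDir, "appenderator");           // hypothetical new location

    if (legacyState.exists() && !driverDir.exists()) {
      Files.createDirectories(driverDir.toPath());
      Files.move(legacyState.toPath(), new File(driverDir, "plumber_state.json").toPath());
    }
  }
}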

@kevinconaway
Contributor Author

Mostly because I thought it would make more sense to port over the callers (like RealtimeIndexTask) to directly use an AppenderatorDriver. Does that look like it would work for you?

I think so. It would be less code to refactor as it seems like the AppenderatorDriver implementation keeps changing.

I'll work on this in the coming weeks.

Does that timeline work?

@gianm
Contributor

gianm commented Nov 27, 2017

I think so. It would be less code to refactor as it seems like the AppenderatorDriver implementation keeps changing.

Yeah, that is definitely an advantage. The internals are still evolving but the interface should be relatively more stable.

I'll work on this in the coming weeks.

Does that timeline work?

Sure, that sounds good to me!

kevinconaway pushed a commit to kevinconaway/druid that referenced this pull request Jan 15, 2018
kevinconaway pushed a commit to kevinconaway/druid that referenced this pull request Jan 15, 2018
@kevinconaway
Contributor Author

@gianm Per our conversation, I've gone ahead and opened a separate PR for updating the RealtimeIndexTask to use the AppenderatorDriver: #5261

jihoonson pushed a commit that referenced this pull request Feb 6, 2018
* Change RealtimeIndexTask to use AppenderatorDriver instead of RealtimePlumber.  Related to #4774

* Remove unused throwableDuringPublishing

* Fix usage of forbidden API

* Update realtime index IT to account for not skipping older data any more

* Separate out waiting on publish futures and handoff futures to avoid a race condition where the handoff timeout expires before the segment is published

* #5261 Add separate AppenderatorDriverRealtimeIndexTask and revert changes to RealtimeIndexTask

* #5261 Readability improvements in AppenderatorDriverRealtimeIndexTask.  Combine publish and handoff futures in to single future

* #5261 Add separate tuningConfig for RealtimeAppenderatorIndexTask.  Revert changes to RealtimeTuningConfig

* #5261 Change JSON type to realtime_appenderator to keep the same naming pattern as RealtimeIndexTask
@kevinconaway
Contributor Author

Closing this out now that #5261 has been merged.

@KenjiTakahashi FYI
