Appenderators, DataSource metadata, KafkaIndexTask #2220

Merged: 6 commits into apache:master from appenderator-kafka on Mar 14, 2016

Conversation

gianm
Contributor

gianm commented Jan 7, 2016

See also #1642. FYI, we have been running this in our cluster since 2016-03-02 and things seem to work so far…

Set of three related things (each one a separate commit):

kafka-indexing-service core extension
Includes KafkaIndexTask, which reads a specific offset range from specific partitions, and can use dataSource metadata transactions to guarantee exactly-once ingestion.

Each task has a finite lifecycle, so it is expected that a process will be supervising existing tasks and creating new ones when needed. @dclim is working on this process in https://github.com/dclim/druid/tree/kafka-supervisor.

This extension requires the other two features (DataSource metadata and Appenderators).
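
As a rough illustration of the finite-lifecycle idea (hypothetical names only, not the actual KafkaIndexTask code), a task assigned fixed per-partition offset ranges is finished once every partition has been read up to its end offset:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the actual KafkaIndexTask: the task is done once every
// assigned partition has been read up to its (exclusive) end offset, which is what
// gives each task a finite lifecycle.
public class FiniteTaskSketch
{
  public static void main(String[] args)
  {
    // partition -> end offset (exclusive) assigned to this task
    final Map<Integer, Long> endOffsets = Map.of(0, 200L, 1, 250L);

    // partition -> next offset to read, advanced as records are consumed
    final Map<Integer, Long> nextOffsets = new HashMap<>(Map.of(0, 100L, 1, 150L));

    // Simulated read loop: in the real task, records between the start and end offsets
    // are indexed, and the end offsets are committed as dataSource metadata together
    // with the published segments.
    while (nextOffsets.entrySet().stream().anyMatch(e -> e.getValue() < endOffsets.get(e.getKey()))) {
      nextOffsets.replaceAll((partition, offset) -> Math.min(offset + 1, endOffsets.get(partition)));
    }

    System.out.println("All partitions reached their end offsets; the task can publish and exit.");
  }
}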

DataSource metadata
Geared towards supporting transactional inserts of new segments. This involves an interface DataSourceMetadata that allows combining of partially specified metadata (useful for partitioned ingestion). It also involves changes to the SegmentInsertAction to allow it to take a startMetadata and endMetadata for compare-and-swap.

DataSource metadata is stored in a new "dataSource" table.
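
To make the compare-and-swap idea concrete, here is a minimal sketch (hypothetical types and method names; the real DataSourceMetadata and SegmentInsertAction differ) that models metadata as a map of partition -> committed offset, combines partially specified metadata, and checks a startMetadata before allowing the swap to endMetadata:

import java.util.HashMap;
import java.util.Map;

// Minimal sketch, not the actual Druid interfaces: models dataSource metadata as a
// map of partition -> last committed offset, with "combine" for partially specified
// metadata and "matches" for the compare part of compare-and-swap.
public class MetadataSketch
{
  static Map<Integer, Long> combine(Map<Integer, Long> a, Map<Integer, Long> b)
  {
    // Partitions present in only one side are carried over, which is what makes
    // partially specified (per-partition) metadata useful for partitioned ingestion.
    final Map<Integer, Long> merged = new HashMap<>(a);
    merged.putAll(b);
    return merged;
  }

  static boolean matches(Map<Integer, Long> stored, Map<Integer, Long> expectedStart)
  {
    // The insert is only allowed if the stored metadata agrees with the startMetadata
    // supplied by the task (the "compare" in compare-and-swap).
    return expectedStart.entrySet().stream()
                        .allMatch(e -> e.getValue().equals(stored.get(e.getKey())));
  }

  public static void main(String[] args)
  {
    final Map<Integer, Long> stored = combine(Map.of(0, 100L), Map.of(1, 250L));
    System.out.println(matches(stored, Map.of(0, 100L)));  // true: the swap may proceed
    System.out.println(matches(stored, Map.of(0, 99L)));   // false: another writer got there first
  }
}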

Appenderators
Like Plumbers, but different. Appenderators are a way of getting more control over the ingestion process than a Plumber allows. They are less ambitious than Plumbers, but more flexible. In particular, they offer facilities to deal with:

  • Indexing and persisting data
  • Checkpointing and restoring ingestion state
  • Merging and pushing segments to deep storage
  • Serving queries

But they do not do any of these things:

  • Decide which rows to index and which to drop
  • Decide which segments to put your rows into
  • Decide when to push and publish segments
  • Monitor handoff

So you can think of Appenderators as a way of separating out the mechanical functionality of Plumbers from their decision-making processes. The idea is that existing Plumbers could be implemented using Appenderators, but you could also use them to implement workflows that the existing Plumbers can't support.
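
For intuition, here is a minimal sketch of that division of labor (hypothetical interface and method names, not the actual Appenderator API): the caller decides which segment each row belongs to and when to persist or push, while the appenderator-like component only does the mechanical work.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch only; the real Appenderator API differs. The point is the split:
// the caller chooses segments and timing, and the "appenderator" indexes, persists,
// and pushes on request.
interface SimpleAppenderator
{
  void add(String segmentId, Map<String, Object> row);  // index a row into a caller-chosen segment

  void persist();                                       // checkpoint in-memory data to disk

  void push(List<String> segmentIds);                   // merge and push the named segments to deep storage
}

public class AppenderatorSketch implements SimpleAppenderator
{
  private final Map<String, List<Map<String, Object>>> segments = new HashMap<>();

  @Override
  public void add(String segmentId, Map<String, Object> row)
  {
    segments.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(row);
  }

  @Override
  public void persist()
  {
    System.out.println("persisting " + segments.size() + " segment(s) to disk");
  }

  @Override
  public void push(List<String> segmentIds)
  {
    System.out.println("pushing " + segmentIds + " to deep storage");
  }

  public static void main(String[] args)
  {
    final SimpleAppenderator app = new AppenderatorSketch();
    // The caller, not the appenderator, decides which segment this row belongs to and
    // when it is time to persist and push.
    app.add("ds_2016-01-01_2016-01-02_v1_0", Map.of("ts", "2016-01-01T00:00:00Z", "metric", 1));
    app.persist();
    app.push(List.of("ds_2016-01-01_2016-01-02_v1_0"));
  }
}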

Discussion?
Some open questions (and reasons this is still marked Discuss):

  • Should RealtimePlumber be adapted to use an Appenderator? Pros of doing this are that the AppenderatorImpl actually has a lot of functionality and code overlap with RealtimePlumber (the query-runner stuff is particularly annoying since it is mostly similar but also kinda different). I think the main stumbling block is that the RealtimePlumber and the AppenderatorImpl have different persist directory layouts (mostly because the RealtimePlumber never has to have more than one shard per interval, but the AppenderatorImpl might) and so the code will need to be able to migrate existing data.
  • Should the other existing indexy things be adapted to use an Appenderator too? Like IndexTask and IndexGeneratorReducer.

BTW, I think if we do either of these things, they should be in future PRs rather than this one.

@gianm changed the title from "Appenderators, DataSource ,etadata, KafkaIndexTask" to "Appenderators, DataSource metadata, KafkaIndexTask" on Jan 7, 2016
@fjy
Contributor

fjy commented Jan 7, 2016

I am so excited!

@gianm added the Discuss label on Jan 7, 2016
@gianm changed the title from "Appenderators, DataSource metadata, KafkaIndexTask" to "[wip] Appenderators, DataSource metadata, KafkaIndexTask" on Jan 7, 2016
@gianm
Contributor Author

gianm commented Jan 7, 2016

[wip] because TODOs remain, but would really appreciate comments

@gianm force-pushed the appenderator-kafka branch 3 times, most recently from f5e4f44 to 1fa64c3, on January 10, 2016 at 02:50
toolbox.getEmitter().emit(metricBuilder.build("segment/txn/success", 1));
} else {
toolbox.getEmitter().emit(metricBuilder.build("segment/txn/failure", 1));
}
Contributor

I understand the usefulness of segment/txn/failure, but I'm not sure how segment/txn/success is going to be useful.

Contributor Author

maybe you want to confirm that txns are actually happening? I dunno? It seemed like if one existed then the other should too

@himanshug
Contributor

Did a scan; looks like a step in the right direction. Waiting to see the impl for SegmentAllocator.
I'm not sure about the name kafka-indexing-service, which gave me the notion that this was some kind of "indexing service"; that brings the Overlord to mind because of the renaming history.

Also, I believe we should set the "scope" for this PR to at least result in a working (even if experimental) version of no-window-period realtime ingestion with Tranquility.

@gianm
Contributor Author

gianm commented Jan 11, 2016

@himanshug SegmentAllocator has an implementation in KafkaIndexTask.

Do you have a suggestion for a better name for kafka-indexing-service? I just called it that because it has Kafka stuff for the indexing service.

Hmm, IMO this PR should be scoped to be the base for both Tranquility and Kafka working. It doesn't fully achieve either one, but it was getting big enough that I thought it made sense to cut it off at this point and do the rest in follow-on PRs.

I think the follow-on PRs would be:

  1. One supporting Kafka that introduces a supervisor that actually spawns a series of KafkaIndexTasks
  2. One supporting Tranquility that introduces a push-oriented task based on the FiniteAppenderatorDriver, which Tranquility can push to
  3. A Tranquility PR (in the Tranquility repo) that coordinates that new task in (2)

@himanshug
Contributor

We can rename kafka-indexing-service to just kafka-indexing, or maybe I'm overthinking it?

Regarding scope, I would move the Kafka module to a separate PR and do (2) in this PR itself. That said, I will let you take the final call on that.

@gianm
Contributor Author

gianm commented Jan 11, 2016

@himanshug kafka-indexing sounds cool

Hmm, I would prefer to keep this PR to the stuff it has for now, mostly because this stuff is "done" (ish) and the follow-on stuff is not done :). I did start working on (2) though, so depending on how long this PR stays open, we could potentially include that instead of the Kafka stuff.

Out of curiosity why do you prefer to have the tranquility stuff here instead of the kafka stuff?

@himanshug
Contributor

"Out of curiosity why do you prefer to have the tranquility stuff here instead of the kafka stuff?"
To get you focused on that and have stuff working for Tranquility as early as possible.

Also, I didn't realize that (2) was going to be "Tranquility-specific" code; I thought that was a core Druid change and not a new module.

@gianm
Contributor Author

gianm commented Jan 11, 2016

Haha, ok :)

I had actually been doing the Tranquility stuff first and then took a detour through the Kafka stuff, since I think it is a lot simpler and I wanted to get some kind of initial thing working. The Tranquility stuff ended up getting kind of complicated; I plan to go back to it soon. In the meantime, some review on this stuff would be great.

final long sleepMillis = computeNextRetrySleep(++nTry);
log.warn(e, "Failed publishAll (try %d), retrying in %,dms.", nTry, sleepMillis);
Thread.sleep(sleepMillis);
}
Contributor

In various places where we have long-running stuff, we need to check the thread's interrupt status and finish early, so that processes stop properly.

Contributor Author

An interrupt will cause publishAll to throw an InterruptedException (due to the Thread.sleep); is that enough?

Contributor

what if the Exception e caught here is an InterruptedException?

Contributor Author

good point, will fix.
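
A minimal sketch of the fix being discussed (not the actual patch; publishAll and computeNextRetrySleep are hypothetical stand-ins for the methods in the snippet above): InterruptedException is rethrown immediately instead of being treated as a retryable failure, so the task can shut down promptly.

// Sketch only: rethrow interrupts instead of sleeping and retrying on them.
public class RetrySketch
{
  void publishAll() throws Exception
  {
    // stand-in for the real publish step, which may fail transiently or be interrupted
  }

  long computeNextRetrySleep(int nTry)
  {
    return Math.min(60_000L, 1_000L * (1L << Math.min(nTry, 6)));  // capped exponential backoff
  }

  void publishWithRetries() throws InterruptedException
  {
    int nTry = 0;
    while (true) {
      try {
        publishAll();
        return;
      }
      catch (InterruptedException e) {
        // Do not swallow interrupts: propagate so the caller can stop the task early.
        throw e;
      }
      catch (Exception e) {
        final long sleepMillis = computeNextRetrySleep(++nTry);
        System.err.printf("Failed publishAll (try %d), retrying in %,dms.%n", nTry, sleepMillis);
        Thread.sleep(sleepMillis);
      }
    }
  }
}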

@gianm mentioned this pull request on Jan 14, 2016
log.makeAlert("Unknown query type, [%s]", query.getClass())
.addData("dataSource", query.getDataSource())
.emit();
return new NoopQueryRunner<>();
Contributor

This should probably be an exception instead of responding with an empty result.

Contributor Author

sounds good, will change to ISE
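
i.e., something along these lines, replacing the return of NoopQueryRunner in the snippet above (sketch only; the actual change would use Druid's ISE helper, while plain IllegalStateException is shown here to keep the sketch dependency-free):

// Sketch of the agreed change: fail loudly on an unknown query type instead of
// silently returning empty results. `query` refers to the query in the snippet above.
throw new IllegalStateException(
    String.format("Unknown query type [%s]", query.getClass().getName())
);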

@fjy
Contributor

fjy commented Mar 8, 2016

👍

handoffNotifier.start();

final FiniteAppenderatorDriverMetadata metadata = objectMapper.convertValue(
appenderator.startJob(),
Contributor

appenderator.startJob() might return null in many cases.

Contributor Author

that's ok, convertValue turns null into null, and then there's a null check in this method.

Contributor

OK, didn't know that convertValue could handle null.
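
For reference, a quick self-contained illustration of the behavior being relied on (assumes Jackson's ObjectMapper; the Metadata class here is a hypothetical stand-in for FiniteAppenderatorDriverMetadata):

import com.fasterxml.jackson.databind.ObjectMapper;

// Quick illustration of the behavior relied on above: Jackson's convertValue passes a
// null input through as null, so the caller's own null check is sufficient.
public class ConvertValueNullExample
{
  static class Metadata {}  // hypothetical stand-in; no fields needed for this demo

  public static void main(String[] args)
  {
    final ObjectMapper objectMapper = new ObjectMapper();
    final Metadata metadata = objectMapper.convertValue(null, Metadata.class);
    System.out.println(metadata == null);  // true: null in, null out
  }
}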

@himanshug
Contributor

@gianm had some comments, but overall looks good. From what I see, this PR does not change anything in RealtimeIndexTask, so it should have no impact on the existing ingestion mechanism.

Regarding the discussion points:

  1. I think we can update RealtimePlumber at some point if/when possible.
  2. I'm not sure if we need to update IndexGeneratorReducer with this.

Commit message excerpts:

Appenderators are a way of getting more control over the ingestion process than a Plumber allows. The idea is that existing Plumbers could be implemented using Appenderators, but you could also implement things that Plumbers can't do.

FiniteAppenderatorDrivers help simplify indexing a finite stream of data.

Also:
- Sink: Ability to consider itself "finished" vs "still writable".
- Sink: Ability to return the number of rows contained within the sink.

Geared towards supporting transactional inserts of new segments. This involves an interface "DataSourceMetadata" that allows combining of partially specified metadata (useful for partitioned ingestion).

DataSource metadata is stored in a new "dataSource" table.
@gianm
Contributor Author

gianm commented Mar 11, 2016

@himanshug updated with most of your review comments addressed, but I am not entirely sure what you mean on the thread w/ #2220 (comment).

Commit message excerpt:

Reads a specific offset range from specific partitions, and can use dataSource metadata transactions to guarantee exactly-once ingestion.

Each task has a finite lifecycle, so it is expected that some process will be supervising existing tasks and creating new ones when needed.
@himanshug
Contributor

👍 @gianm looks good overall; can you please clean up your commit history as needed?

@gianm
Contributor Author

gianm commented Mar 14, 2016

@himanshug I split it up into 6 commits on purpose (I think the 6 commits are pretty distinct); do you think I should combine some of them?

@himanshug
Contributor

I did say "as needed" in there. If it's already done, then great. 👍

@gianm
Contributor Author

gianm commented Mar 14, 2016

cool, just checking 😄

himanshug added a commit that referenced this pull request Mar 14, 2016
Appenderators, DataSource metadata, KafkaIndexTask
@himanshug merged commit d51a0a0 into apache:master on Mar 14, 2016
@gianm deleted the appenderator-kafka branch on March 15, 2016 at 03:23
@gianm mentioned this pull request on May 4, 2016