
Experimental kafa simple consumer based firehose #1609

Merged (2 commits, Aug 28, 2015)

Conversation

himanshug (Contributor)

tracks and addresses the review comments on #1482

copied description from #1482

This feature introduces a simple-consumer-based implementation of a realtime firehose. It tracks the current offset by storing offset metadata in metadata.drd alongside the smooshed files, so it can recover the previous offset position after a restart.
On restart, it takes the offset from the sequentially "last" valid persisted file and renames all incomplete persist directories to a path for corrupted data. For example, if there are subdirectories /1, /2, /3, ..., /8 under mydatasource/20150630T10:00:00-11:00:00/ and /7 doesn't contain meta.smoosh, RealtimePlumber will rename both /7 and /8 to corrupted/mydatasource/20150630T10:00:00-11:00:00/* and use the offset from /6 as the starting point.
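The recovery walk described above can be sketched as a pure function over the numbered persist directories; the class and method names here are illustrative, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class PersistDirRecovery {
    /**
     * Given numbered persist dirs in order (e.g. "1".."8") and the set of dirs
     * that contain meta.smoosh, returns every dir from the first incomplete one
     * onward. Those should be renamed under corrupted/, and the starting offset
     * should be taken from the last dir before them.
     */
    public static List<String> dirsToQuarantine(List<String> orderedDirs, Set<String> complete) {
        List<String> quarantine = new ArrayList<>();
        boolean sawIncomplete = false;
        for (String dir : orderedDirs) {
            // Once one dir is incomplete, later dirs can't be trusted either.
            if (sawIncomplete || !complete.contains(dir)) {
                sawIncomplete = true;
                quarantine.add(dir);
            }
        }
        return quarantine;
    }
}
```

With the example above (meta.smoosh missing from /7), this returns /7 and /8 for quarantine, leaving /6 as the offset source.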
When indexing, it follows this logic:

  1. Call start()
  2. Read currRow()
  3. Call advance()
  4. If the index should be committed: commit()
  5. GOTO 2

Note that it only advances once the current row has been successfully processed, which amounts to saving the end offset of the current message.
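The loop above can be sketched as follows; the interface and method names are a simplified stand-in mirroring the steps listed here, not Druid's actual FirehoseV2 contract:

```java
import java.util.ArrayList;
import java.util.List;

public class FirehoseLoopSketch {
    /** Minimal stand-in for the firehose contract described above. */
    interface SimpleFirehose {
        void start();
        String currRow();     // read the current row
        boolean advance();    // move to the next row; false when exhausted
        void commit();        // persist the end offset of the current message
    }

    /** Drives the loop: start, read, advance, commit periodically. */
    public static List<String> drive(SimpleFirehose firehose, int commitEvery) {
        List<String> processed = new ArrayList<>();
        firehose.start();                          // 1. start()
        boolean hasMore = true;
        while (hasMore) {
            processed.add(firehose.currRow());     // 2. currRow()
            hasMore = firehose.advance();          // 3. advance() only after success
            if (processed.size() % commitEvery == 0) {
                firehose.commit();                 // 4. commit()
            }
        }                                          // 5. GOTO 2
        return processed;
    }
}
```

Because advance() is only called after the row is handed off, a crash mid-row replays that row rather than skipping it.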

Google Group reference:
https://groups.google.com/forum/#!topic/druid-development/9HB9hCcqvuI

The goal of this PR is to

  1. Introduce a driver that can handle a new FirehoseV2 interface that should enable better real-time support going forward.
  2. Start persisting metadata along with segments so that a Firehose can potentially use that information to restart.
  3. Help inform what kinds of things we need to think about and deal with as we move forward with rationalizing the real-time ingestion story.

Even after this is merged, FirehoseV2 is expected to be experimental and should not be the go-to firehose for realtime ingestion. That will likely come after more adjustments. Or it's possible that this initial attempt informs things such that we actually go and change the interfaces or add a FirehoseV3. As it stands, the PR does the useful thing that we initially need it to do and is hopefully a good springboard for further evolution.

@@ -352,9 +390,13 @@ public void doRun()
  {
    try {
      for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
-       metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs));
+       metrics.incrementRowOutputCount(
+         persistHydrant(
Contributor Author

outstanding review comment --
@gianm - Will we lose data if one hydrant is persisted with the metadata, then the plumber crashes? If I'm reading the code right, that would cause the next bootstrap to think that all the previously read data was persisted.

@himanshug - hmmm... that sounds correct; still thinking about the right thing to do here...
maybe create a marker file at the end at /persist_dir/datasource/, and use the commit metadata information only if the marker file is present?

Contributor Author

@cheddar what do you think?

Contributor

I think it makes sense to store the metadata outside the segments in a separate file, because the commit metadata isn't really associated with an individual segment; it's associated with a set of segments that are persisted at the same time. So storing it in the segments is asking for problems.

Sort of like this:

{
  "metadata" : {"foo": "bar"},
  "segments": [
    {"id": "datasource_2000_2001_2000_1", "hydrant": 10},
    {"id": "datasource_2001_2002_2001_1", "hydrant": 12}
  ]
}

When a realtime node crashes and starts back up, it would delete any hydrants numbered higher than the ones in the commit file.
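The startup cleanup this suggests could look roughly like the following; the method and parameter names are hypothetical, modeling only the "delete hydrants numbered higher than the committed ones" rule:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CommitFileSketch {
    /**
     * Given the committed hydrant number per segment id (as in the commit file
     * sketched above) and the hydrant numbers found on disk for one segment,
     * returns the hydrants persisted after the commit, which should be deleted
     * on startup before resuming from the committed offset.
     */
    public static List<Integer> hydrantsToDelete(
        Map<String, Integer> committed, String segmentId, List<Integer> onDisk
    ) {
        // A segment absent from the commit file has no committed hydrants at all.
        int lastCommitted = committed.getOrDefault(segmentId, -1);
        List<Integer> toDelete = new ArrayList<>();
        for (int hydrant : onDisk) {
            if (hydrant > lastCommitted) {
                toDelete.add(hydrant);
            }
        }
        return toDelete;
    }
}
```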

Contributor Author

As I look more at this code, I agree that commitMetadata is really associated with the whole datasource at this level, and not with individual segments.

Contributor

We could also just include the set of segments for the same chunk of metadata in more metadata on each of the segments.

I think there is value in storing it inside the segment as a form of lineage.

I also don't feel so strongly about it that I would be against a separate file. I don't think that has to be changed in this initial PR, however; it actually unravels and creeps the scope out quite a bit, because it also requires us to consider hand-off in terms of the full set of segments being handed off instead of individual segments (that is, if one segment of the set succeeds in handing off and the others fail, the real-time node would believe it needs to re-ingest the data).

Contributor Author

Noted the discussion in the comments for the future.

@gianm
Contributor

gianm commented Aug 25, 2015

👍 after merge conflicts / travis are resolved

@himanshug
Contributor Author

@gianm resolved the merge conflict, jdk8 build actually failed on an unrelated test and hopefully will pass this time.

@gianm gianm closed this Aug 26, 2015
@gianm gianm reopened this Aug 26, 2015
@gianm
Contributor

gianm commented Aug 26, 2015

bouncing for travis

@gianm
Contributor

gianm commented Aug 26, 2015

Thanks @himanshug

Hmm, does @cheddar's +1 on #1482 count towards this one? Anyone else want to / available to take a look?

@fjy
Contributor

fjy commented Aug 26, 2015

@cheddar's +1 should count

@gianm
Contributor

gianm commented Aug 26, 2015

Ok, sgtm. Will merge in a bit unless there are further comments. /cc @drcrallen @nishantmonu51 @xvrl who had commented on the previous PR.

- byte[] stringBytes = new byte[length];
- in.get(stringBytes);
- return new String(stringBytes, UTF8);
+ return new String(readBytes(in, length), UTF8);
Contributor

Can StringUtils.fromUtf8 be used?

Contributor Author

will change
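For context, a self-contained version of the reviewed read-and-decode logic, using the JDK's StandardCharsets (the reviewer suggested Druid's StringUtils.fromUtf8, which does the same decode; class and method names here are illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class Utf8Read {
    /** Reads the next length bytes from the buffer, as in the reviewed code. */
    public static byte[] readBytes(ByteBuffer in, int length) {
        byte[] bytes = new byte[length];
        in.get(bytes);  // advances the buffer's position by length
        return bytes;
    }

    /** Decodes the next length bytes as a UTF-8 string. */
    public static String readString(ByteBuffer in, int length) {
        return new String(readBytes(in, length), StandardCharsets.UTF_8);
    }
}
```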

{
return new File(
persistDir.getAbsolutePath()
.replace(schema.getDataSource(), "corrupted/" + schema.getDataSource())
Contributor

Better to use Path to build path than to assume "/" is the proper delimiter.

Contributor Author

will change that to use File.separator to remove the delimiter assumption
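A delimiter-safe version of the corrupted-path computation might look like this (the class and method names are illustrative, not the PR's actual code):

```java
import java.io.File;

public class CorruptedPath {
    /**
     * Rewrites .../dataSource/... to .../corrupted/dataSource/... without
     * assuming "/" is the platform's path separator.
     */
    public static File corruptedFileFor(File persistDir, String dataSource) {
        return new File(
            persistDir.getAbsolutePath()
                      .replace(dataSource, "corrupted" + File.separator + dataSource)
        );
    }
}
```

File.separator resolves to "/" on Unix-like systems and "\\" on Windows, so the rewritten path stays valid on either platform.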

@drcrallen
Contributor

Is it possible to test the corruption code paths?

@himanshug
Contributor Author

@drcrallen addressed all your review comments in the latest commit.
As for tests for the corruption code paths: I think @gianm is blocked on this PR getting merged to make progress on the new plumber, which is going to change some (maybe more than some) of this code. We will add more tests later if needed to cover various scenarios.

@gianm
Contributor

gianm commented Aug 27, 2015

yeah, I was hoping to rebase the other PR off this one and then build on that.

</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
Contributor

(not blocking for this PR) we may need to consider better ways to handle the "different scala version require different artifact IDs" thing.
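One conventional Maven approach to the Scala-versioned artifact ids (an assumption about a possible build setup, not something this PR does) is to factor the Scala binary version into a property so the suffix is set in one place:

```xml
<properties>
  <scala.binary.version>2.10</scala.binary.version>
</properties>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_${scala.binary.version}</artifactId>
</dependency>
```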

@himanshug
Contributor Author

@drcrallen updated code to have messaging around metadata parsing failure in IndexIO

@fjy
Contributor

fjy commented Aug 27, 2015

I'm 👍 on this. It is experimental and not hugely impactful.

@fjy
Contributor

fjy commented Aug 27, 2015

I think we have all the votes we need for this PR. Any more blockers/concerns?

@himanshug @gianm

@drcrallen
Contributor

👍 Provided that it will be revisited to add better corruption testing and to address some of the firehose concerns that can be deferred until later.

@gianm
Contributor

gianm commented Aug 27, 2015

lgtm, @himanshug do you want to squash the commits a little? Maybe into one for the original patch and one for your changes, or however you want to do it

lvjq and others added 2 commits August 27, 2015 20:50
firehoseV2 addition to Realtime[Manager|Plumber],
essential segment metadata persist support,
kafka-simple-consumer-firehose extension patch
@himanshug
Contributor Author

@gianm rebased/squashed into 2 commits: one from the original patch and another with the changes addressing review comments.

@gianm gianm closed this Aug 28, 2015
@gianm gianm reopened this Aug 28, 2015
@gianm gianm closed this Aug 28, 2015
@gianm gianm reopened this Aug 28, 2015
@gianm gianm closed this Aug 28, 2015
@gianm gianm reopened this Aug 28, 2015
@himanshug himanshug closed this Aug 28, 2015
@himanshug himanshug reopened this Aug 28, 2015
@himanshug
Contributor Author

@gianm ok, finally the build has passed :)

gianm added a commit that referenced this pull request Aug 28, 2015
Experimental kafa simple consumer based firehose
@gianm gianm merged commit 19c63a1 into apache:master Aug 28, 2015