Feature for hadoop batch re-ingestion and delta ingestion #1374
Conversation
public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema";

@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException
How would you feel about having a DataSegmentInputSplit as per https://github.com/druid-io/druid/pull/1351/files#diff-e3e429f5bc3e9ee67f25b69b7e6c3969R981 ?
Alternatively, we could have a "HadoopDataSegment" which has a hadoopy loadSpec that is only a URI parsable by hadoop (in addition to the other things a DataSegment usually has). That could allow the peon/overlord to set up stuff correctly for hadoop.
We have the data to get all the splits done completely; it is just a matter of ensuring the data gets propagated to Hadoop correctly.
I think your DataSegmentSplit and my DatasourceInputSplit are pretty much the same (and reusable), except I'm just keeping "path" in it while you are keeping the whole DataSegment inside it. Do you really need all of DataSegment? Can we just keep "loadSpec" in the input split? That information should be enough to load the segment.
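For illustration only, a minimal sketch of an input split that carries just the segments' loadSpecs, serialized as JSON strings via the Writable contract; the class and field names here are assumptions for discussion, not the code in this PR or in #1351:

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical split that only knows how to find its segments, not their full metadata.
public class DatasourceInputSplitSketch extends InputSplit implements Writable
{
  private List<String> segmentLoadSpecs = new ArrayList<>();

  public DatasourceInputSplitSketch() {} // no-arg constructor required by Hadoop

  public DatasourceInputSplitSketch(List<String> segmentLoadSpecs)
  {
    this.segmentLoadSpecs = segmentLoadSpecs;
  }

  @Override
  public long getLength() throws IOException, InterruptedException
  {
    return segmentLoadSpecs.size(); // could report total segment bytes instead
  }

  @Override
  public String[] getLocations() throws IOException, InterruptedException
  {
    return new String[]{}; // no locality hints for deep-storage segments
  }

  @Override
  public void write(DataOutput out) throws IOException
  {
    out.writeInt(segmentLoadSpecs.size());
    for (String loadSpec : segmentLoadSpecs) {
      out.writeUTF(loadSpec); // loadSpec JSON is small enough for writeUTF
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException
  {
    int count = in.readInt();
    segmentLoadSpecs = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      segmentLoadSpecs.add(in.readUTF());
    }
  }

  public List<String> getSegmentLoadSpecs()
  {
    return segmentLoadSpecs;
  }
}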
@@ -104,6 +94,8 @@ public final static InputRow parseInputRow(Writable value, InputRowParser parser
  if(parser instanceof StringInputRowParser && value instanceof Text) {
    //Note: This is to ensure backward compatibility with 0.7.0 and before
    return ((StringInputRowParser)parser).parse(value.toString());
  } else if(value instanceof HadoopWritableInputRow) {
It looks like we are adjusting things to make the Mapper understand more and more types of objects. I think it might be cleaner to define the Mapper interface in terms of InputRow objects and move all of the parser stuff to the InputFormat.
This would have the downside of making it a bit more difficult to take other InputFormats, but would have the plus of getting rid of the parser when it is not needed (it becomes a part of the InputFormat instead of the mapper).
If the Mapper takes InputRow, then the row might contain a complex metric object such as HyperLogLogCollector, and then the mapper needs to know how to serialize it.
On the other hand, if we introduce HadoopWritableInputRow and require the mapper to only take those (not done now, to keep this feature PR simple; I decided to put in the if/else instead), then complete ownership of reading/parsing data goes to the InputFormat (which should understand the input data best) and the mapper is guaranteed not to run into serde issues.
The high-level direction seems good. The biggest question in my mind is whether we keep the Writable InputRow or adjust the jobs to pull out the key/value and all that now.
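To make the trade-off concrete, here is a rough sketch of what a Writable wrapper around an already-parsed row could look like, so the InputFormat owns all parsing and the mapper never touches an InputRowParser; the class name, field layout, and JSON-based serialization are assumptions for illustration, not the PR's actual HadoopWritableInputRow:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;

// Hypothetical Writable carrying an already-parsed row between InputFormat and mapper.
public class WritableParsedRowSketch implements Writable
{
  private static final ObjectMapper JSON = new ObjectMapper();

  private long timestampMillis;
  private List<String> dimensions;
  private Map<String, Object> event;

  public WritableParsedRowSketch() {} // no-arg constructor required by Hadoop

  public WritableParsedRowSketch(long timestampMillis, List<String> dimensions, Map<String, Object> event)
  {
    this.timestampMillis = timestampMillis;
    this.dimensions = dimensions;
    this.event = event;
  }

  public InputRow toInputRow()
  {
    return new MapBasedInputRow(timestampMillis, dimensions, event);
  }

  @Override
  public void write(DataOutput out) throws IOException
  {
    out.writeLong(timestampMillis);
    out.writeUTF(JSON.writeValueAsString(dimensions));
    // This only works for JSON-friendly values; complex metric objects such as
    // HyperLogLogCollector would need their own serde, which is exactly the concern above.
    out.writeUTF(JSON.writeValueAsString(event));
  }

  @Override
  @SuppressWarnings("unchecked")
  public void readFields(DataInput in) throws IOException
  {
    timestampMillis = in.readLong();
    dimensions = JSON.readValue(in.readUTF(), List.class);
    event = JSON.readValue(in.readUTF(), Map.class);
  }
}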
Force-pushed from 914b965 to bbe682f
@himanshug @cheddar hey guys, just wondering what the status of this is.
@himanshug #1472 is merged
@fjy thanks, I will update this one soon, just doing some testing of the code to make sure that it works.
Force-pushed from bbe682f to 12222cf
👍 after latest changes
private int rowNum;
private MapBasedRow currRow;

private List<QueryableIndex> indexes = Lists.newArrayList();
Very minor nitpick, but this could be a Closer instead of a list of QueryableIndex.
I'm fine with it as is, but for future reference a Closer would make it more obvious what's happening here.
hmmm, for now, let us keep it as is.
sure
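For future reference, a minimal sketch of the Closer pattern suggested above, written against plain Closeable so it does not depend on the exact QueryableIndex API; openIndex(..) is a hypothetical stand-in for however the reader actually loads a segment:

import com.google.common.io.Closer;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;

// Track every opened resource (e.g. each QueryableIndex) with a Closer instead of a List,
// so the cleanup intent is explicit and a single close() releases everything.
public class CloserSketch implements Closeable
{
  private final Closer closer = Closer.create();

  public Closeable openAndTrack(File segmentDir) throws IOException
  {
    // register(..) returns its argument, so callers can use the resource directly
    return closer.register(openIndex(segmentDir));
  }

  @Override
  public void close() throws IOException
  {
    // closes every registered resource, in reverse registration order
    closer.close();
  }

  // hypothetical stand-in for loading a segment as a closeable index
  private Closeable openIndex(File segmentDir) throws IOException
  {
    return () -> { /* release mmapped segment resources here */ };
  }
}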
@himanshug I do have a concern about concurrency here: if I use delta ingestion but do not have a lock on the interval, then whatever I ingest may or may not actually be representative of the latest data. A simple example is if I run delta ingestion twice at the same time for different input paths to add (or start a second one before the first has finished). The indexing service hadoop indexer should be handling locks correctly, and I think such a race condition should not be able to proceed when the indexing service is used. But the standalone hadoop indexer could very easily encounter a race. Since we in general do not support accounting for race conditions in the standalone stuff, that is just how it's going to be. Does that sound correct?
@drcrallen that sounds right to me. The stance on standalone stuff (hadoop & realtime) has always been that you need to think about consistency and concurrency yourself, since Druid only does meaningful locking when you use the indexing service.
@drcrallen wrt concurrency, that is true. But this PR is not introducing or changing that behavior. The issue you described with the standalone hadoop indexer exists even today and is not really related to this PR.
@himanshug / @gianm Cool, I'm good with maintaining existing behavior, and just wanted to make sure our expectations are documented in case anyone ever comes back to this PR's comment thread.
…ecomes reusable Conflicts: indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
Conflicts: indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java indexing-service/pom.xml
… we can grab data from multiple places in same ingestion Conflicts: indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java Conflicts: indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java
…xer uses overlord action to get list of segments and passes when running as an overlord task. and, uses metadata store directly when running as standalone hadoop indexer also, serialized list of segments is passed to DatasourcePathSpec so that hadoop classloader issues do not creep up
Force-pushed from f3e2c61 to cfd81bf
@gianm @drcrallen @nishantmonu51
)
{
  this.mapper = Preconditions.checkNotNull(mapper, "null mapper");
  this.segments = segments;
Convention in some other places is to check for null and set as ImmutableList.of() if the argument is null. Would that be appropriate to use here? It seems most of the logic will throw errors if this is not set to a non-empty list.
No, it is valid for segments to not be specified; the check is done as the first thing inside addInputPaths(..). It is expected to be set by then.
Ok sure
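For reference, the convention mentioned above looks roughly like this; the class and field names are made up for illustration and are not the constructor in this PR:

import com.google.common.collect.ImmutableList;

import java.util.List;

public class ExampleSpec
{
  private final List<String> segments;

  public ExampleSpec(List<String> segments)
  {
    // default a missing list to an empty immutable list instead of rejecting null
    this.segments = segments == null ? ImmutableList.<String>of() : segments;
  }

  public List<String> getSegments()
  {
    return segments;
  }
}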
I'm 👍 if we can get one other committer to agree with the comment in #1374 (comment)
👍 at a3bab5b
Feature for hadoop batch re-ingestion and delta ingestion
This PR implements the following features.
We implement a "DatasourceInputFormat" and "DatasourcePathSpec" that can be used to batch re-ingest data back into Druid from an existing datasource's segments (something like IngestSegmentFirehose).
Also, we provide changes to allow specifying multiple PathSpecs (via "MultiplePathSpec") in batch ingestion, so that you can combine a DatasourcePathSpec and a StaticPathSpec (or any other) in the same ingestion to add late-arriving data to an already-ingested interval (aka "Delta" Ingestion). A rough sketch of this composite idea follows the proposal link below.
original proposal discussion: https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!msg/druid-development/EXYAPcV6pk4/Nu1uRGKEctAJ
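As a rough illustration of the composite path-spec idea described above (not the exact classes in this PR), a MultiplePathSpec-style spec could simply delegate to an ordered list of child specs, so a datasource-backed spec (re-ingestion of existing segments) and a static spec (late-arriving files) feed the same job; the simplified PathSpec interface below is an assumption, since the real one in indexing-hadoop also takes the indexer config:

import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
import java.util.List;

public class MultiplePathSpecSketch
{
  // simplified stand-in for io.druid.indexer.path.PathSpec
  public interface PathSpec
  {
    Job addInputPaths(Job job) throws IOException;
  }

  // composite spec that lets several child specs contribute input to one job
  public static class MultiplePathSpec implements PathSpec
  {
    private final List<PathSpec> children;

    public MultiplePathSpec(List<PathSpec> children)
    {
      this.children = children;
    }

    @Override
    public Job addInputPaths(Job job) throws IOException
    {
      // each child adds its own inputs: static files, datasource segments, etc.
      for (PathSpec child : children) {
        job = child.addInputPaths(job);
      }
      return job;
    }
  }
}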