
Use Combiner to merge InputRows at the Mapper during Hadoop Batch Ingestion #1472

Merged: 5 commits into apache:master on Jul 20, 2015

Conversation

himanshug (Contributor)

This patch:
(1) takes control of the serialization format of InputRow between mapper and reducer, which in turn allows the InputFormat to return records of an arbitrary type rather than just Text (the only requirement is that the configured InputRowParser can produce an InputRow from that type);
(2) introduces a combiner that merges input rows on the mapper nodes when possible.
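For readers less familiar with the MR side: (1) means the mapper itself serializes the parsed InputRow to a byte[] before emitting it, so the shuffle only ever carries BytesWritable and the InputFormat's value type no longer has to be Text; (2) uses the standard Hadoop combiner hook to merge rows before the shuffle. A minimal, self-contained sketch of the mapper-side shape under those assumptions is below; class and helper names (SketchMapper, rowToBytes) are illustrative, not code from this patch, and the combiner side is sketched further down in the thread.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Mapper;

    // Illustrative mapper: the InputFormat's value type only needs to be a Writable, and the
    // mapper serializes the row itself, so the shuffle deals purely in BytesWritable.
    public class SketchMapper extends Mapper<Object, Writable, LongWritable, BytesWritable>
    {
      @Override
      protected void map(Object key, Writable value, Context context)
          throws IOException, InterruptedException
      {
        // In the real patch the configured InputRowParser turns `value` (whatever type the
        // InputFormat produced) into an InputRow; here we just take its string form.
        byte[] serializedRow = rowToBytes(value.toString());

        // The real code keys by time bucket/partition; a constant key keeps this sketch small.
        context.write(new LongWritable(0L), new BytesWritable(serializedRow));
      }

      // Illustrative stand-in for a toBytes(row, aggregators) helper.
      private static byte[] rowToBytes(String row)
      {
        return row.getBytes(StandardCharsets.UTF_8);
      }
    }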

while (iter.hasNext()) {
  context.progress();
  // deserialize the InputRow bytes emitted by the mapper and add the row to the in-memory index
  InputRow value = InputRowHelper.fromBytes(iter.next().getBytes(), config.getSchema().getDataSchema().getAggregators());
  index.add(value);
}
Member

Do we need to check index size bounds here?

Contributor Author

@nishantmonu51 Can you point me to some code doing the bound checking? I don't see that happening in IndexGeneratorReducer.reduce(..) either.

Member

The reducer checks whether a row can be appended to the index by calling index.canAppendRow(). If the index is full, it persists the current one and creates another index.
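A rough sketch of the reducer-side pattern being described, with a hypothetical BoundedIndex standing in for Druid's IncrementalIndex and a no-op persist() standing in for the reducer's spill-and-merge step (the row limit is likewise illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class ReducerBoundCheckSketch
    {
      private static final int MAX_ROWS = 75_000; // illustrative in-memory row limit

      // Stand-in for Druid's IncrementalIndex: it can report whether another row still fits.
      static class BoundedIndex
      {
        final List<byte[]> rows = new ArrayList<>();

        boolean canAppendRow()
        {
          return rows.size() < MAX_ROWS;
        }

        void add(byte[] row)
        {
          rows.add(row);
        }
      }

      // Stand-in for the reducer's "persist this index to disk, merge the spills later" step.
      static void persist(BoundedIndex index)
      {
      }

      static void reduceLike(Iterable<byte[]> serializedRows)
      {
        BoundedIndex index = new BoundedIndex();
        for (byte[] row : serializedRows) {
          if (!index.canAppendRow()) {   // index is full
            persist(index);              // persist the current index
            index = new BoundedIndex();  // and start another one
          }
          index.add(row);
        }
        persist(index);                  // persist whatever is left at the end
      }
    }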

Contributor Author

Thanks, I see that now.
In this case persist(..) is not really an option; what we can do instead is flush rows from that index into context.write(..) and create another index if/when index.canAppendRow() returns false.
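Roughly, the combiner variant of the same loop would look like the sketch below: instead of persisting, it flushes the rows it holds through context.write(..) and starts a fresh index. The list and the fixed row bound are illustrative stand-ins for the IncrementalIndex and its canAppendRow() check; this is not the patch code.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    public class FlushingCombinerSketch
        extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>
    {
      private static final int MAX_ROWS = 10_000; // illustrative bound; the real check is canAppendRow()

      @Override
      protected void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
          throws IOException, InterruptedException
      {
        List<byte[]> index = new ArrayList<>(); // stand-in for the in-memory IncrementalIndex
        for (BytesWritable value : values) {
          if (index.size() >= MAX_ROWS) { // the index is "full"
            flush(index, key, context);   // flush its rows through context.write(..)
            index = new ArrayList<>();    // and start a fresh index
          }
          index.add(value.copyBytes());   // the real code merges the deserialized row into the index
        }
        flush(index, key, context);       // flush the remainder
      }

      private void flush(List<byte[]> index, BytesWritable key, Context context)
          throws IOException, InterruptedException
      {
        for (byte[] row : index) {
          context.write(key, new BytesWritable(row));
        }
      }
    }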

Contributor Author

done

@nishantmonu51 (Member)

How much did using the combiner help in improving ingestion time? Do you have any numbers for ingestion time with the combiner?

@himanshug (Contributor Author)

@nishantmonu51 I haven't done much perf testing on this because the performance improvement will really depend on how much merging is possible. However, I have made sure that if there is only one row for a key, the combiner is effectively a no-op with almost no overhead.

@@ -193,14 +200,39 @@ public boolean run()
}
}

public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Writable>
private static IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, StupidPool bufferPool, HadoopDruidIndexerConfig config)
Member

are there any changes in this function or did it just get moved around? It might be easier to leave it in place for review and worry about formatting separately.

Contributor Author

Just moved, so that it can be used in the combiner class too.

@cheddar (Contributor)

cheddar commented Jul 6, 2015

It looks like the tack taken was to create IncrementalIndex objects in the combiner and then persist those. I agree that this will work functionally, but I fear that it will be really difficult to manage and tune from the MR side. There are new configurations to manage, such as how much memory to give the combiner, that will make it difficult for someone who doesn't know MR to have this "just work".

I think we need to move the dimension names and values into the actual Key object and have each combiner work on only a single entry.
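For concreteness, a rough sketch of the alternative being described: a WritableComparable key carrying the (truncated) timestamp and the dimension values, so that Hadoop's sort groups identical rows and a combiner only has to fold the metric values for one key at a time. This illustrates the idea only; it is not code from the patch, and a real key would also need hashCode/equals consistent with the partitioner.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class DimsAndTimeKey implements WritableComparable<DimsAndTimeKey>
    {
      private long truncatedTimestamp;
      private String[] dimValues = new String[0];

      @Override
      public void write(DataOutput out) throws IOException
      {
        out.writeLong(truncatedTimestamp);
        out.writeInt(dimValues.length);
        for (String v : dimValues) {
          out.writeUTF(v);
        }
      }

      @Override
      public void readFields(DataInput in) throws IOException
      {
        truncatedTimestamp = in.readLong();
        dimValues = new String[in.readInt()];
        for (int i = 0; i < dimValues.length; i++) {
          dimValues[i] = in.readUTF();
        }
      }

      @Override
      public int compareTo(DimsAndTimeKey other)
      {
        int cmp = Long.compare(truncatedTimestamp, other.truncatedTimestamp);
        for (int i = 0; cmp == 0 && i < Math.min(dimValues.length, other.dimValues.length); i++) {
          cmp = dimValues[i].compareTo(other.dimValues[i]);
        }
        return cmp != 0 ? cmp : Integer.compare(dimValues.length, other.dimValues.length);
      }
    }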

ByteArrayDataOutput out = ByteStreams.newDataOutput();

List<String> dimList = row.getDimensions();
String[] dims = dimList == null ? EMPTY_STR_ARRAY : dimList.toArray(EMPTY_STR_ARRAY);
Contributor

Is it possible to have dimList be the list used in most parts of the function, and drop dims entirely? The ArrayWritable can take dimList.toArray(new String[dimList.size()]) or similar.

Contributor Author

sure, will do
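For reference, a small sketch of the simplification being suggested, using Hadoop's plain ArrayWritable in place of the patch's StringArrayWritable (class and method names here are illustrative):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.io.ArrayWritable;

    public class DimListSketch
    {
      // Use dimList directly; no separate pre-built dims array is needed.
      public static ArrayWritable toWritable(List<String> dimList)
      {
        return new ArrayWritable(
            dimList == null ? new String[0] : dimList.toArray(new String[dimList.size()])
        );
      }

      public static void main(String[] args)
      {
        System.out.println(Arrays.toString(toWritable(Arrays.asList("page", "country")).toStrings()));
      }
    }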

@fjy fjy added this to the 0.8.1 milestone Jul 9, 2015

private static final String[] EMPTY_STR_ARRAY = new String[0];

public final static byte[] toBytes(InputRow row, AggregatorFactory[] aggs)
Member

Instead of relying on Hadoop serialization here, would it make sense for us to define our own so we can potentially reuse that serde elsewhere? For instance, it might speed up serializing and passing large groupBy results between nodes, and might be useful for on-disk merging of groupBy results in the future. Just a thought.

Contributor Author

That makes sense. However, from the toBytes(..)/fromBytes(..) contract perspective, there is no dependency on Hadoop libs. When I wrote it, the Hadoop libs just felt like the most obvious and simple choice since they were already included in this module.
The important thing is that the serde implementation can be changed completely at any time with no user-facing functional impact. We can do it whenever needed.

@himanshug (Contributor Author)

@drcrallen I have already cleaned up the commit history. The commits now represent key logical stages of the development, which I would like to preserve.

@fjy (Contributor)

fjy commented Jul 17, 2015

@cheddar @xvrl Can someone else who has reviewed this sign off on it?

StringArrayWritable sw = new StringArrayWritable(dims);
sw.write(out);

out.writeLong(row.getTimestampFromEpoch());
Contributor

I'd move this to the beginning. It's the only fixed-width portion of what is being serialized and having it in a known location can make it easier to re-use this format for other things (like partitioning, etc.) if we ever want to.

Contributor Author

Done, moved the timestamp to be first in the serialized byte[].
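In other words, the fixed-width timestamp now sits at a known offset at the front, followed by the variable-width parts. Below is a sketch of that ordering with illustrative field encodings (the real toBytes(..) writes the row's aggregated metric values via the configured aggregators):

    import java.util.List;

    import com.google.common.io.ByteArrayDataOutput;
    import com.google.common.io.ByteStreams;

    public class RowLayoutSketch
    {
      public static byte[] toBytes(long timestampMillis, List<String> dims, double[] metricValues)
      {
        ByteArrayDataOutput out = ByteStreams.newDataOutput();

        out.writeLong(timestampMillis);     // fixed-width timestamp first, at a known offset

        out.writeInt(dims.size());          // then the variable-width dimension block
        for (String dim : dims) {
          out.writeUTF(dim);
        }

        out.writeInt(metricValues.length);  // then the aggregated metric values
        for (double v : metricValues) {
          out.writeDouble(v);
        }
        return out.toByteArray();
      }
    }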

@cheddar (Contributor)

cheddar commented Jul 20, 2015

Aside from the bits in serialization, I'm 👍

@himanshug Can you verify those bits and then merge at will?

Commits added:

…tricSerde

This provides the alternative to using ComplexMetricSerde.getObjectStrategy() and using the serde methods from ObjectStrategy, as that usage pattern is deprecated. It allows an arbitrary InputFormat during Hadoop batch ingestion that can return records of a value type other than Text.

…dd a hadoop combiner to hadoop batch ingestion to do merges on the mappers if possible
@fjy (Contributor)

fjy commented Jul 20, 2015

@himanshug I can merge this since it has 2 +1s now. Do you still want to fix anything else up?

@himanshug (Contributor Author)

@fjy Please go ahead, I'm done.

fjy added a commit that referenced this pull request Jul 20, 2015
Use Combiner to merge InputRows at the Mapper during Hadoop Batch Ingestion
@fjy fjy merged commit c4ed8fe into apache:master Jul 20, 2015
@xvrl (Member)

xvrl commented Jul 20, 2015

@himanshug Did we ever do any performance testing to get a sense of how much speed improvement we can get for things that aggregate well, and how much impact it may have on things that don't aggregate well? It would be useful to know at which point it makes sense to start using the combiner.

@himanshug (Contributor Author)

@xvrl Not yet, but I will do that.

@himanshug himanshug deleted the combiner branch August 21, 2015 03:31
@zhaown (Contributor)

zhaown commented Oct 18, 2015

@xvrl I've just moved my old Avro Hadoop indexing module onto @himanshug's unification of the Hadoop indexing code in 0.8.1; the code is much cleaner compared with our old hacky way. Here are some performance testing results with and without the combiner. The combiner is clearly wonderful for my data, which I think is low cardinality: it cut indexing time by about 75%. Great job!

Avro indexing with map output compressed and without combiner

Used 310 mappers and 2 reducers; average times: map=51s, reduce=43m, shuffle=11m, merge=4m

Counters are:

    File System Counters
            FILE: Number of bytes read=8142706921
            FILE: Number of bytes written=10872950580
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=10162886557
            HDFS: Number of bytes written=38274218
            HDFS: Number of read operations=1252
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=12
    Job Counters 
            Killed map tasks=3
            Launched map tasks=313
            Launched reduce tasks=2
            Data-local map tasks=299
            Rack-local map tasks=14
            Total time spent by all maps in occupied slots (ms)=64539028
            Total time spent by all reduces in occupied slots (ms)=130652370
            Total time spent by all map tasks (ms)=16134757
            Total time spent by all reduce tasks (ms)=7258465
            Total vcore-seconds taken by all map tasks=16134757
            Total vcore-seconds taken by all reduce tasks=7258465
            Total megabyte-seconds taken by all map tasks=129078056000
            Total megabyte-seconds taken by all reduce tasks=254046275000
    Map-Reduce Framework
            Map input records=156015341
            Map output records=156015341
            Map output bytes=87648872970
            Map output materialized bytes=2724142889
            Input split bytes=105124
            Combine input records=0
            Combine output records=0
            Reduce input groups=2
            Reduce shuffle bytes=2724142889
            Reduce input records=156015341
            Reduce output records=0
            Spilled Records=598545609
            Shuffled Maps =620
            Failed Shuffles=0
            Merged Map outputs=620
            GC time elapsed (ms)=291648
            CPU time spent (ms)=34082800
            Physical memory (bytes) snapshot=557086097408
            Virtual memory (bytes) snapshot=2414911942656
            Total committed heap usage (bytes)=727351558144
Avro indexing with map output compressed and with combiner

Used 310 mappers and 2 reducers; average times: map=1m, reduce=10m, shuffle=3m, merge=16s

Counters are:

    File System Counters
            FILE: Number of bytes read=3578916544
            FILE: Number of bytes written=5004288082
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=10162886557
            HDFS: Number of bytes written=38274218
            HDFS: Number of read operations=1252
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=12
    Job Counters 
            Killed map tasks=4
            Launched map tasks=314
            Launched reduce tasks=2
            Data-local map tasks=299
            Rack-local map tasks=15
            Total time spent by all maps in occupied slots (ms)=83878228
            Total time spent by all reduces in occupied slots (ms)=29246580
            Total time spent by all map tasks (ms)=20969557
            Total time spent by all reduce tasks (ms)=1624810
            Total vcore-seconds taken by all map tasks=20969557
            Total vcore-seconds taken by all reduce tasks=1624810
            Total megabyte-seconds taken by all map tasks=167756456000
            Total megabyte-seconds taken by all reduce tasks=56868350000
    Map-Reduce Framework
            Map input records=156015341
            Map output records=156015341
            Map output bytes=87648872970
            Map output materialized bytes=1431268523
            Input split bytes=105124
            Combine input records=195582520
            Combine output records=64974072
            Reduce input groups=2
            Reduce shuffle bytes=1431268523
            Reduce input records=25406893
            Reduce output records=0
            Spilled Records=94043172
            Shuffled Maps =620
            Failed Shuffles=0
            Merged Map outputs=620
            GC time elapsed (ms)=344549
            CPU time spent (ms)=34477640
            Physical memory (bytes) snapshot=623229333504
            Virtual memory (bytes) snapshot=2418919870464
            Total committed heap usage (bytes)=772202823680

@drcrallen (Contributor)

23393222 vs 22594367 total map and reduce vcore time, so a ~3.5% reduction. That seems odd given the other numbers you quoted.

@himanshug (Contributor Author)

@zhaown Thanks for reporting the numbers, glad it's working well for you.

@drcrallen I don't think adding the numbers straight up is an indicator of total job time. You would have to look at something like:
avg map phase time = total map vcore time / num mappers
avg reduce phase time = total reduce vcore time / num reducers
total job time ~ avg map phase time + avg reduce phase time + overhead (sort, shuffle, scheduling, ...)

Ignoring overhead:
for the 1st job, total job time ~ (16134757/310 ms ~ 52 secs) + (7258465/2 ms ~ 3629 secs) = 3681 secs
for the 2nd job, total job time ~ (20969557/310 ms ~ 67 secs) + (1624810/2 ms ~ 812 secs) = 879 secs

% improvement ~ 75%

@zhaown (Contributor)

zhaown commented Oct 20, 2015

@drcrallen @himanshug I think adding the numbers straight up gives the total CPU time rather than the wall-clock time. After all, Druid has to do the same indexing work either way; the combiner does no magic beyond adding some shuffle-like overhead. It essentially moves some work from the reducers to the mappers, and while we cannot have many reducers (we don't want many output files), we can have many mappers, so using the combiner increases parallelism and reduces wall-clock time.

Actually, to further increase parallelism I changed targetPartitionSize from 5,000,000 to 1,000,000, which gave me 6 reducers instead of 2 and cut total indexing time by another ~50%.

@drcrallen (Contributor)

@zhaown @himanshug So is it safe to say that this patch helps an individual job running on its own cluster, but the amortized run time of a multi-tenant cluster which is run at or above capacity might not change (or not change much)?

@himanshug (Contributor Author)

@drcrallen An individual job finishing faster means slots get freed up more quickly, and it's good because your ingestion finishes faster. However, since this mainly reduces the time taken by the reducers, which are far fewer in number than the mappers, overall cluster capacity might not change that much.

@drcrallen (Contributor)

@himanshug As such does it also change the memory requirements of the mapper?

@himanshug (Contributor Author)

@drcrallen Yes, but only by enough to hold one "merged" row in an index object, plus the overhead associated with the combiner. I'm not sure how to calculate the per-mapper overhead from the numbers above.

@drcrallen (Contributor)

@himanshug Sure, no problem. I just want to make sure the potential impact areas are communicated.
