
Replacing SortBy with custom partitioner #245

Merged
1 commit merged into apache:master on Sep 9, 2017

Conversation

@ovj (Contributor) commented on Aug 16, 2017:

Replacing the bulkInsert sortBy with a custom partitioner such that:

  • A single output Spark partition doesn't have records from more than one hoodie partition.
  • We try to fit as many records with the same hoodie partition as we can from the same input Spark partition into an output Spark partition (a rough sketch of such a partitioner follows this list).

cc @vinothchandar, @prazanna, @n3nash, @jianxu
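
(For illustration only: a minimal sketch of a Spark Partitioner that satisfies the two constraints above. Class, field, and key names are hypothetical, not the PR's actual code; the idea is that each hoodie partition path gets its own contiguous range of output buckets, and the bucket within that range is picked from the input Spark partition id.)

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.Partitioner;
import scala.Tuple2;

public class HoodieAwarePartitioner extends Partitioner {

  // hoodie partition path -> {start bucket, number of buckets}
  private final Map<String, int[]> bucketRanges = new HashMap<>();
  private final int totalBuckets;

  public HoodieAwarePartitioner(List<String> hoodiePartitionPaths, int bucketsPerHoodiePartition) {
    int start = 0;
    for (String path : hoodiePartitionPaths) {
      bucketRanges.put(path, new int[]{start, bucketsPerHoodiePartition});
      start += bucketsPerHoodiePartition;
    }
    this.totalBuckets = start;
  }

  @Override
  public int numPartitions() {
    return totalBuckets;
  }

  @Override
  public int getPartition(Object key) {
    // Key is assumed to be (hoodiePartitionPath, inputSparkPartitionId).
    @SuppressWarnings("unchecked")
    Tuple2<String, Integer> k = (Tuple2<String, Integer>) key;
    int[] range = bucketRanges.get(k._1());
    // Stay inside this hoodie partition's bucket range, so no output partition mixes
    // records from two hoodie partitions; using the input partition id keeps records
    // that arrived together in the same output partition where possible.
    return range[0] + Math.floorMod(k._2(), range[1]);
  }
}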

@vinothchandar (Member):
Can you make the partitioner pluggable? Also, could you share the performance analysis you did, since that's a core aspect of this PR.

@vinothchandar (Member):
And please merge both commits together.

*/
public class BulkInsertPartitioner<T extends HoodieRecordPayload> extends Partitioner {
private static Logger logger = LogManager.getLogger(BulkInsertPartitioner.class);
private static final int BUCKET_MULTIPLIER = 4;
Member:

javadoc/comments

Member:

What's special about 4 that makes it work for all workloads?

private HoodiePartitionMapper hoodiePartitionMapper;
private int numOfPartitions = -1;

private BulkInsertPartitioner(int outputSparkPartitions) {
Member:

Please keep parity between the constructor argument and member variable naming.

Contributor Author:

Removed the constructor once I added the partitioner as an argument. I will have to init it via the "repartitionRecords" method. Can you please look at it once again?


// As we are going over input records twice; we need to cache input records.
JavaRDD<HoodieRecord<T>> cachedRecords = records.persist(StorageLevel.MEMORY_AND_DISK_SER());
BulkInsertPartitioner<T> bulkInsertPartitioner = new BulkInsertPartitioner<>(outputSparkPartitions);
bulkInsertPartitioner.init(records);
Member:

Let's pass the records into the constructor? Is there a specific reason for the init method?

Contributor Author:

Once we pass the Partitioner as an argument, we no longer control object creation in all cases, so the init method is needed. Please see the updated PR.


return cachedRecords.mapPartitionsWithIndex(
(partitionNumber, recordIterator) ->
new Iterator<Tuple2<Tuple2<HoodieRecord, Integer>, Integer>>() {
Member:

Please rework this using the LazyIterableIterator

Contributor Author:

will do.
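
(Aside, for readers unfamiliar with the pattern being discussed: mapPartitionsWithIndex hands the function the input Spark partition number alongside the record iterator, which is what allows keying records by (hoodie partition, input partition). Below is a generic, self-contained sketch, not the PR's code; note that buffering into a list, as done here, materializes the whole partition, which is exactly what a lazy wrapper such as LazyIterableIterator avoids.)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class TagWithPartitionId {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "tag-example");
    JavaRDD<String> records = jsc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

    // The boolean argument preserves the parent RDD's partitioning.
    JavaRDD<Tuple2<String, Integer>> tagged = records.mapPartitionsWithIndex(
        (partitionId, recordIterator) -> {
          // Eagerly buffering the partition like this is what a lazy iterator avoids.
          List<Tuple2<String, Integer>> out = new ArrayList<>();
          while (recordIterator.hasNext()) {
            out.add(new Tuple2<>(recordIterator.next(), partitionId));
          }
          return out.iterator();
        }, true);

    tagged.collect().forEach(t -> System.out.println(t._1() + " came from input partition " + t._2()));
    jsc.stop();
  }
}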

/**
* Helper class for counting.
*/
private class Counter implements Serializable {
Member:

please replace this with a standard AtomicLong

Member:

or similar..

Contributor Author:

Will do
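
(For context, the standard replacement suggested above is the JDK's java.util.concurrent.atomic.AtomicLong, which is already Serializable; a trivial sketch, not taken from the PR:)

import java.util.concurrent.atomic.AtomicLong;

public class CounterSketch {
  public static void main(String[] args) {
    // AtomicLong is Serializable and thread-safe, so it can stand in for a hand-rolled counter.
    AtomicLong recordsSeen = new AtomicLong(0);
    recordsSeen.incrementAndGet();   // e.g. once per record emitted
    recordsSeen.incrementAndGet();
    System.out.println(recordsSeen.get());  // prints 2
  }
}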

@@ -0,0 +1,16 @@
package com.uber.hoodie.table;
Member:

License header is missing. Please ensure things build locally before pushing to the PR.

Contributor Author:

Sorry, my bad. I was running it locally with skip.rat=true.

@vinothchandar (Member) left a review comment:

Reviewing still

public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
init(records, outputSparkPartitions);
// As we are going over input records twice; we need to cache input records.
JavaRDD<HoodieRecord<T>> cachedRecords = records.persist(StorageLevel.MEMORY_AND_DISK_SER());
Member:

The bulkInsert() API targets full loads with multi-TB input. Are you proposing to cache even in those cases? The spilling here (likely to happen) will cause significant extra IO. Have you tested across such workloads?

Contributor Author:

No, I have not tested it for multi-TB input. The maximum I have tested is 800 GB of input.

Member:

Spark does reservoir sampling to handle large inputs such as these
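
(Background on this point, not from the PR: sortBy builds a RangePartitioner, which estimates range boundaries by sampling each input partition rather than reading everything up front. A minimal, standalone illustration of the sortBy call shape:)

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SortByExample {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "sortby-example");
    JavaRDD<Integer> nums = jsc.parallelize(Arrays.asList(5, 3, 9, 1, 7), 2);
    // sortBy(keyFunc, ascending, numPartitions): range boundaries are estimated by sampling the input.
    JavaRDD<Integer> sorted = nums.sortBy(x -> x, true, 2);
    System.out.println(sorted.collect());  // [1, 3, 5, 7, 9]
    jsc.stop();
  }
}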

Contributor Author:

The reason we are caching (MEMORY + DISK) is that we read the data twice, so caching helps even for large datasets.

For sortBy, I see that the initial boundary computation may not need Spark to read the complete data, but to later sort the records it will have to read the entire data (per partition/executor), right? Am I missing anything here?

I have collected results for a 64 GB dataset with 32 executors and 16 hoodie writers (insertParallelism). This is roughly equivalent to 1 TB with 512 executors.

With SortBy:
[screenshot: Spark UI, 2017-08-20]

With the custom partitioner:
[screenshot: Spark UI, 2017-08-20]

Member:

If I grok this correctly, it's ~32 min (27 + 3.2 + 1.7) vs ~20 min (11 + 6.4 + 2). The puzzling thing is that the difference mostly seems to come from RDDWrapper.java. What is this piece of code?

Also, overall I think a lot of what you are trying to accomplish is already in the UpsertPartitioner, so I'm not sure we need a new partitioner for this. Happy to chat more f2f if needed. Still not fully sold.

Contributor Author:

RDDWrapper is actually just waiting for Hoodie to finish writing. This code is exactly the same for both runs.

Yes, you are right. Overall it is very close to UpsertPartitioner, but there are fundamental differences. It's better if we chat f2f. :) Will ping you.

Member:

Can you compare this with simply doing .insert() with the necessary tuning? I'd really like to understand the fundamental difference in approach here before adding a new partitioner.

Member:

Sounds good, let's f2f.

@ovj (Contributor Author) commented on Aug 18, 2017:

"What's special about 4 that makes it work for all workloads?" >> Nothing special; I just wanted to ensure that the number of buckets is some multiple of the number of output partitions.
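
(A purely illustrative reading of that multiplier, not the PR's exact mapping: the hypothetical numbers below just show how a bucket count that is a multiple of the output partition count folds cleanly back onto the partitions.)

public class BucketMultiplierExample {
  public static void main(String[] args) {
    int outputSparkPartitions = 16;
    int bucketMultiplier = 4;                                   // the constant questioned above
    int numBuckets = outputSparkPartitions * bucketMultiplier;  // 64 buckets
    int someBucket = 37;
    // Because 64 is an exact multiple of 16, every output partition receives exactly 4 buckets.
    int outputPartition = someBucket % outputSparkPartitions;   // 37 % 16 = 5
    System.out.println(numBuckets + " buckets -> partition " + outputPartition);
  }
}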

@ovj force-pushed the custom_partitioner branch 4 times, most recently from 71d229f to c93d2b8 on August 25, 2017 at 22:15.
@ovj (Contributor Author) commented on Aug 25, 2017:

Attaching the results.

With SortBy:
[screenshot: sortby]

With UpsertPartitioner:
[screenshot: upsertpartitioner]

@vinothchandar (Member) left a review comment:

Thanks for revising it. Left some comments.
I still have the high-level question: given we are fixing and re-using the existing UpsertPartitioner, can we just leave bulkInsert as it is (using sortBy) and just use insert when you need the faster performance? In that case, this entire PR would add just the fixes to UpsertPartitioner.

/**
* Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition)
*/
public class UpsertPartitioner<T extends HoodieRecordPayload> extends Partitioner {
Member:

Please let's move this back to how the structure was originally; it's easier to see the changes. I'd like to be very prudent here, since this can potentially cause large regressions.

I would appreciate a simple, incremental diff on UpsertPartitioner.

}

/**
* Helper class for an insert bucket along with the weight [0.0, 1.0]
Member:

So you have removed the weights, and are just picking an insert bucket based on floorMod?

return updateLocationToBucket.get(location.getFileId());
} else {
final List<InsertBucket> insertBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
final int insertBucketIndex = Math.floorMod(keyLocation._1().getRecordKey().hashCode(), insertBuckets.size());
Member:

If the weights are gone, how do we ensure that small files are not expanded beyond the maximum configured file size? This will have effects for a future update. We need to handle this case.
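
(To make the concern concrete: floorMod spreads inserts uniformly across buckets regardless of how full their target files already are, whereas a weight-based scheme routes records proportionally to remaining capacity, so nearly-full small files receive few new records. A hedged sketch of weighted selection, with illustrative names only, not the UpsertPartitioner's actual code:)

import java.util.Arrays;
import java.util.List;

public class WeightedBucketChooser {
  // bucketWeights are assumed normalized to sum to 1.0 within one hoodie partition;
  // r is a value in [0, 1), e.g. derived deterministically from the record key.
  static int chooseBucket(List<Double> bucketWeights, double r) {
    double cumulative = 0.0;
    for (int i = 0; i < bucketWeights.size(); i++) {
      cumulative += bucketWeights.get(i);
      if (r < cumulative) {
        return i;
      }
    }
    return bucketWeights.size() - 1;  // guard against floating-point rounding
  }

  public static void main(String[] args) {
    // A small file that should absorb few records gets weight 0.1; a new file gets 0.9.
    List<Double> weights = Arrays.asList(0.1, 0.9);
    System.out.println(chooseBucket(weights, 0.05));  // 0 (rarely chosen)
    System.out.println(chooseBucket(weights, 0.60));  // 1 (chosen most of the time)
  }
}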

@ovj (Contributor Author) commented on Aug 26, 2017:

@vinothchandar is it OK to limit the scope of this PR to just enabling users to define their own partitioner for the bulkInsert API, keeping sortBy as the default for bulkInsert? That way there is zero impact for existing users. Let me know what you think. I will open another issue to fix the UpsertPartitioner random issue.

@vinothchandar (Member):
Yeah, that plan sounds good. Will review both PRs sometime this week.

* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime,
Option<BulkInsertPartitioner> bulkInsertPartitioner) {
Member:

Can you just pass in a plain Spark Partitioner? It seems like all that is being passed in is the input RDD and a parallelism.

* - Output spark partition will have records from only one hoodie partition.
* - Average records per output spark partitions should be almost equal to (#inputRecords / #outputSparkPartitions).
*/
public interface BulkInsertPartitioner<T extends HoodieRecordPayload> {
Member:

Let's get rid of this interface..

@@ -43,23 +48,6 @@
import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
Member:

Please avoid any non-code changes in PRs. It just makes it hard to review.

@@ -158,7 +157,7 @@ public void testFilterExist() throws Exception {

JavaRDD<HoodieRecord> smallRecordsRDD = jsc.parallelize(records.subList(0, 75), 1);
// We create three parquet file, each having one record. (two different partitions)
List<WriteStatus> statuses = writeClient.bulkInsert(smallRecordsRDD, newCommitTime).collect();
List<WriteStatus> statuses = writeClient.bulkInsert(smallRecordsRDD, newCommitTime, Option.empty()).collect();
Member:

Let's provide an overloaded method bulkInsert(rdd, commitTime) in the HoodieWriteClient itself, so it's consistent with the other APIs.
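
(Sketch of the convenience overload being requested; the body below is assumed, not the merged code, and it would live inside HoodieWriteClient next to the three-argument variant shown above:)

// Two-argument form keeps existing callers working and simply defers to the
// partitioner-aware variant with an empty Option.
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
  return bulkInsert(records, commitTime, Option.empty());
}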

@vinothchandar (Member) left a review comment:

The main thing is just passing in a Spark Partitioner object.

@ovj (Contributor Author) commented on Sep 9, 2017:

Spoke with @vinothchandar offline. Updated the PR. Now we are going to let users define their own partitioner logic.

@vinothchandar merged commit 5c639c0 into apache:master on Sep 9, 2017.