Skip to content

Commit

Permalink
Adding support for UserDefinedBulkInsertPartitioner
Browse files Browse the repository at this point in the history
  • Loading branch information
ovj committed Sep 8, 2017
1 parent 63f1b12 commit 43638a8
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 3 deletions.
35 changes: 32 additions & 3 deletions hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.HoodieCommitArchiveLog;
import com.uber.hoodie.metrics.HoodieMetrics;
import com.uber.hoodie.table.UserDefinedBulkInsertPartitioner;
import com.uber.hoodie.table.HoodieTable;
import com.uber.hoodie.table.WorkloadProfile;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -215,6 +216,27 @@ public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final Strin
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
// No user-defined partitioner here: passing an empty Option makes the three-argument overload
// fall back to its built-in sortBy-based repartitioning.
return bulkInsert(records, commitTime, Option.empty());
}

/**
* Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk
* loads into a Hoodie table for the very first time (e.g: converting an existing dataset to
* Hoodie).
*
* By default this implementation uses sortBy (which does range partitioning based on reservoir
* sampling) and attempts to control the number of files while using less memory than {@link
* HoodieWriteClient#insert(JavaRDD, String)}. Optionally, users may supply their own partitioner; if
* one is specified, it is used to repartition the records instead of sortBy. See {@link
* UserDefinedBulkInsertPartitioner}.
*
* @param records HoodieRecords to insert
* @param commitTime Commit Time handle
* @param bulkInsertPartitioner If specified then it will be used to partition input records before they are
* inserted into hoodie.
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
writeContext = metrics.getCommitCtx();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable
Expand All @@ -225,16 +247,23 @@ public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final S
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());

// Now, sort the records and line them up nicely for loading.
JavaRDD<HoodieRecord<T>> sortedRecords = dedupedRecords
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
if (bulkInsertPartitioner.isDefined()) {
repartitionedRecords =
bulkInsertPartitioner.get().repartitionRecords(dedupedRecords,
config.getBulkInsertShuffleParallelism());
} else {
// Now, sort the records and line them up nicely for loading.
repartitionedRecords = dedupedRecords
.sortBy(record -> {
// Let's use "partitionPath + key" as the sort key. Spark, will ensure
// the records split evenly across RDD partitions, such that small partitions fit
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String
.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
}, true, config.getBulkInsertShuffleParallelism());
JavaRDD<WriteStatus> writeStatusRDD = sortedRecords
}
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table), true)
.flatMap(writeStatuses -> writeStatuses.iterator());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.table;

import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import org.apache.spark.api.java.JavaRDD;

/**
* Repartitions input records into at least the expected number of output Spark partitions.
* Implementations should provide the following guarantees:
* <ul>
* <li>Each output Spark partition contains records from only one hoodie partition.</li>
* <li>The average number of records per output Spark partition is roughly
* (#inputRecords / #outputSparkPartitions), to avoid possible skews.</li>
* </ul>
*
* @param <T> payload type carried by the records being repartitioned
*/
public interface UserDefinedBulkInsertPartitioner<T extends HoodieRecordPayload> {

/**
* Repartitions the given records ahead of a bulk insert.
*
* @param records input records to repartition
* @param outputSparkPartitions expected number of output Spark partitions; the result should have
* at least this many
* @return the repartitioned records, satisfying the guarantees listed on this interface
*/
JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions);
}

0 comments on commit 43638a8

Please sign in to comment.