Fixing UpsertPartitioner to ensure that input records are deterministically assigned to output partitions
ovj committed Sep 2, 2017
1 parent 63f1b12 commit 5aaf353
Showing 2 changed files with 33 additions and 51 deletions.
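Summary of the change: previously, UpsertPartitioner picked an insert bucket for each incoming record by drawing from a seeded java.util.Random, so the record-to-partition mapping depended on the order in which records were processed and could differ between attempts. With this commit, the record key is hashed (Guava MD5), the hash is folded into a fraction in [0, 1) using the workload's total insert count (from the new globalStat), and that fraction is matched against the cumulative bucket weights, so the same record always lands in the same bucket. The snippet below is a minimal standalone sketch of that selection logic, not the actual Hoodie class: the class name DeterministicBucketChooser and the List<Double> weights parameter are illustrative, and the hashString overload with an explicit charset is used here for clarity, whereas the diff uses the single-argument overload.

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Minimal sketch of the deterministic bucket selection introduced by this commit
// (illustrative only; these names do not exist in the Hoodie code base).
class DeterministicBucketChooser {

  // Map a record key to a stable fraction in [0, 1).
  static double keyToFraction(String recordKey, long numInserts) {
    long totalInserts = Math.max(1, numInserts); // guard against a workload with zero inserts
    long hash = Hashing.md5()
        .hashString(recordKey, StandardCharsets.UTF_8)
        .asLong();
    // floorMod keeps the remainder non-negative even when the hash is negative
    return (double) Math.floorMod(hash, totalInserts) / totalInserts;
  }

  // Pick the first bucket whose cumulative weight covers the key's fraction.
  // Weights are assumed to be normalized so they sum to 1.0.
  static int chooseBucket(String recordKey, long numInserts, List<Double> bucketWeights) {
    double r = keyToFraction(recordKey, numInserts);
    double totalWeight = 0.0;
    for (int i = 0; i < bucketWeights.size(); i++) {
      totalWeight += bucketWeights.get(i);
      if (r <= totalWeight) {
        return i;
      }
    }
    return bucketWeights.size() - 1; // floating-point slack: fall back to the last bucket
  }
}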
52 changes: 17 additions & 35 deletions hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -16,37 +16,26 @@

package com.uber.hoodie.table;

import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.google.common.hash.Hashing;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.func.LazyInsertIterable;
import com.uber.hoodie.io.HoodieCleanHelper;
import com.uber.hoodie.io.HoodieMergeHandle;

import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.Partitioner;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -56,13 +45,11 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -73,8 +60,8 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import scala.Option;
import scala.Tuple2;

/**
* Implementation of a very heavily read-optimized Hoodie Table where
@@ -90,12 +77,6 @@ public HoodieCopyOnWriteTable(HoodieWriteConfig config, HoodieTableMetaClient me
super(config, metaClient);
}



// seed for random number generator. No particular significance, just makes testing deterministic
private static final long RANDOM_NUMBER_SEED = 356374L;


private static Logger logger = LogManager.getLogger(HoodieCopyOnWriteTable.class);

enum BucketType {
@@ -169,6 +150,11 @@ class UpsertPartitioner extends Partitioner {
*/
private int totalBuckets = 0;

/**
* Stat for the current workload. Helps in determining total inserts, upserts etc.
*/
private WorkloadStat globalStat;

/**
* Helps decide which bucket an incoming update should go to.
*/
@@ -187,17 +173,11 @@ class UpsertPartitioner extends Partitioner {
*/
private HashMap<Integer, BucketInfo> bucketInfoMap;


/**
* Random number generator to use for splitting inserts into buckets by weight
*/
private Random rand = new Random(RANDOM_NUMBER_SEED);


UpsertPartitioner(WorkloadProfile profile) {
updateLocationToBucket = new HashMap<>();
partitionPathToInsertBuckets = new HashMap<>();
bucketInfoMap = new HashMap<>();
globalStat = profile.getGlobalStat();

assignUpdates(profile);
assignInserts(profile);
@@ -379,7 +359,9 @@ public int getPartition(Object key) {
List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
// pick the target bucket to use based on the weights.
double totalWeight = 0.0;
double r = rand.nextDouble();
final long totalInserts = Math.max(1, globalStat.getNumInserts());
final double r = 1.0 * Math.floorMod(Hashing.md5().hashString(keyLocation._1().getRecordKey()).asLong(),
totalInserts) / totalInserts;
for (InsertBucket insertBucket: targetBuckets) {
totalWeight += insertBucket.weight;
if (r <= totalWeight) {
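A quick, hypothetical illustration (not part of the commit) of why the new getPartition uses Math.floorMod and Math.max(1, ...): an MD5-derived long can be negative, and Java's % operator would then yield a negative remainder, so r would be negative and every record would fall into the first insert bucket, defeating the weighted split. floorMod always returns a value in [0, totalInserts), and the Math.max guard avoids calling floorMod with a zero divisor when the workload has no inserts.

class FloorModDemo {
  public static void main(String[] args) {
    long negativeHash = -7L; // stand-in for a negative MD5-derived long
    System.out.println(negativeHash % 3);                // prints -1
    System.out.println(Math.floorMod(negativeHash, 3L)); // prints 2
    // With the '%' result, r would be negative and the first bucket's cumulative
    // weight would always satisfy r <= totalWeight; floorMod keeps r in [0, 1).
  }
}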
32 changes: 16 additions & 16 deletions hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java
@@ -898,16 +898,16 @@ public void testSmallInsertHandlingForUpserts() throws Exception {

FileSystem fs = FSUtils.getFs();
final String TEST_PARTITION_PATH = "2016/09/26";
final int INSERT_SPLIT_LIMIT = 10;
final int INSERT_SPLIT_LIMIT = 100;
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max
HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH});

HoodieWriteClient client = new HoodieWriteClient(jsc, config);

// Inserts => will write file1
String commitTime1 = "001";
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~500kb
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~5000kb
Set<String> keys1 = HoodieClientTestUtils.getRecordKeys(inserts1);

JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
@@ -917,13 +917,13 @@ public void testSmallInsertHandlingForUpserts() throws Exception {

assertEquals("Just 1 file needs to be added.", 1, statuses.size());
String file1 = statuses.get(0).getFileId();
assertEquals("file should contain 10 records",
assertEquals("file should contain 100 records",
ParquetUtils.readRowKeysFromParquet(new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(),
10);
100);

// Update + Inserts such that they just expand file1
String commitTime2 = "002";
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 4);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
Set<String> keys2 = HoodieClientTestUtils.getRecordKeys(inserts2);
List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
insertsAndUpdates2.addAll(inserts2);
@@ -937,7 +937,7 @@ public void testSmallInsertHandlingForUpserts() throws Exception {
assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId());
assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit());
Path newFile = new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1));
assertEquals("file should contain 14 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 14);
assertEquals("file should contain 140 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 140);

List<GenericRecord> records = ParquetUtils.readAvroRecords(newFile);
for (GenericRecord record: records) {
@@ -948,7 +948,7 @@ public void testSmallInsertHandlingForUpserts() throws Exception {

// update + inserts such that file1 is updated and expanded, a new file2 is created.
String commitTime3 = "003";
List<HoodieRecord> insertsAndUpdates3 = dataGen.generateInserts(commitTime3, 20);
List<HoodieRecord> insertsAndUpdates3 = dataGen.generateInserts(commitTime3, 200);
Set<String> keys3 = HoodieClientTestUtils.getRecordKeys(insertsAndUpdates3);
List<HoodieRecord> updates3 = dataGen.generateUpdates(commitTime3, inserts2);
insertsAndUpdates3.addAll(updates3);
@@ -999,15 +999,15 @@ public void testSmallInsertHandlingForUpserts() throws Exception {
public void testSmallInsertHandlingForInserts() throws Exception {

final String TEST_PARTITION_PATH = "2016/09/26";
final int INSERT_SPLIT_LIMIT = 10;
final int INSERT_SPLIT_LIMIT = 100;
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max
HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH});
HoodieWriteClient client = new HoodieWriteClient(jsc, config);

// Inserts => will write file1
String commitTime1 = "001";
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~500kb
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~5000kb
Set<String> keys1 = HoodieClientTestUtils.getRecordKeys(inserts1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses= client.insert(insertRecordsRDD1, commitTime1).collect();
@@ -1017,13 +1017,13 @@ public void testSmallInsertHandlingForInserts() throws Exception {

assertEquals("Just 1 file needs to be added.", 1, statuses.size());
String file1 = statuses.get(0).getFileId();
assertEquals("file should contain 10 records",
assertEquals("file should contain 100 records",
ParquetUtils.readRowKeysFromParquet(new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(),
10);
100);

// Second, set of Inserts should just expand file1
String commitTime2 = "002";
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 4);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 40);
Set<String> keys2 = HoodieClientTestUtils.getRecordKeys(inserts2);
JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
@@ -1033,7 +1033,7 @@ public void testSmallInsertHandlingForInserts() throws Exception {
assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId());
assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit());
Path newFile = new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1));
assertEquals("file should contain 14 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 14);
assertEquals("file should contain 140 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 140);

List<GenericRecord> records = ParquetUtils.readAvroRecords(newFile);
for (GenericRecord record: records) {
@@ -1045,7 +1045,7 @@ public void testSmallInsertHandlingForInserts() throws Exception {

// Lots of inserts such that file1 is updated and expanded, a new file2 is created.
String commitTime3 = "003";
List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 20);
List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 200);
JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(insert3, 1);
statuses = client.insert(insertRecordsRDD3, commitTime3).collect();
assertNoWriteErrors(statuses);