
Removing randomization from UpsertPartitioner #253

Merged: 1 commit merged into apache:master on Sep 8, 2017

Conversation

@ovj (Contributor) commented Aug 27, 2017:

Fixing UpsertPartitioner to ensure that input records are deterministically assigned to output partitions

@vinothchandar FYI, I have separated this change out from the custom partitioner work.

@@ -379,7 +359,7 @@ public int getPartition(Object key) {
     List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
     // pick the target bucket to use based on the weights.
     double totalWeight = 0.0;
-    double r = rand.nextDouble();
+    double r = Math.floorMod(keyLocation._1().getRecordKey().hashCode(), MOD_BASE) / (MOD_BASE-1.0);
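
(For context on the surrounding code: r feeds a cumulative-weight lookup over the partition path's insert buckets. The sketch below illustrates that selection logic under assumed field names (weight, bucketNumber); it is not the actual Hudi source.)

  // Illustration: pick a bucket for a record given a deterministic r in [0.0, 1.0).
  // Bucket weights for a partition path are assumed to sum to (roughly) 1.0.
  static int pickBucket(List<InsertBucket> targetBuckets, double r) {
    double totalWeight = 0.0;
    for (InsertBucket bucket : targetBuckets) {
      totalWeight += bucket.weight;
      if (r <= totalWeight) {
        return bucket.bucketNumber;   // first bucket whose cumulative weight covers r
      }
    }
    // guard against floating-point rounding leaving r just above the final cumulative weight
    return targetBuckets.get(targetBuckets.size() - 1).bucketNumber;
  }
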
@vinothchandar (Member) commented:

Not sure if this will give us a double uniformly distributed in the range 0.0-1.0. Thoughts?

scala> Math.floorMod("dd".hashCode, 1015073) / (1015073-1.0);
res0: Double = 0.0031524857350020493

scala> Math.floorMod("cc".hashCode, 1015073) / (1015073-1.0);
res1: Double = 0.0031209608776520286

scala> Math.floorMod("cdjf".hashCode, 1015073) / (1015073-1.0);
res2: Double = 0.0035248731124491663

scala> Math.floorMod("jdha;fhfjs".hashCode, 1015073) / (1015073-1.0);
res3: Double = 0.6503627328898837

scala>

@vinothchandar (Member) commented:

Seems to be a function of the string length?
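
(For illustration of the length effect: java.lang.String.hashCode is the polynomial s[0]*31^(n-1) + ... + s[n-1], so a two-character ASCII key can never exceed roughly 127*31 + 127 = 4064, and dividing by a seven-digit prime then pins r near zero. A small plain-Java check of the same values as the scala session above:)

  // Short ASCII keys have small hashCode values ("dd".hashCode() == 'd' * 31 + 'd' == 3200),
  // so Math.floorMod(hashCode, LARGE_PRIME) / (LARGE_PRIME - 1.0) stays pinned near 0.0 for them.
  public class HashCodeRangeCheck {
    private static final int MOD_BASE = 1015073;  // the prime used in the scala session above

    public static void main(String[] args) {
      for (String key : new String[]{"dd", "cc", "cdjf", "jdha;fhfjs"}) {
        double r = Math.floorMod(key.hashCode(), MOD_BASE) / (MOD_BASE - 1.0);
        System.out.println(key + " -> " + r);   // "dd" -> 0.0031524857350020493, etc.
      }
    }
  }
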

@ovj (Contributor, author) commented:

hashCode returns an integer, so MOD_BASE has to be an integer for a fair distribution (it can't be total_number_of_records, since that is a long). Yeah, the hashCode value is somewhat related to the length of the string. Using a lower value for MOD_BASE (a 3- or 4-digit prime) would reduce this problem. Also, having all keys only 1 or 2 characters long is very unlikely, right? The number of combinations you can get that way is very low. Should we replace the 7-digit prime with a 3/4/5-digit prime? What do you think?

@vinothchandar (Member) commented:

We can assume .hashCode is reasonably uniform, I guess, or use MurmurHash from Guava. That seems more standard than dealing with prime numbers and so forth. What do you think?

@vinothchandar (Member) commented:

I am just concerned that if it's not uniformly distributed, we will generate skew by sending lots of records to a few ranges in 0.0-1.0, which would in turn be mapped to only a few buckets. In this case, if .hashCode() maps to ranges depending on the key length, then it seems like a misfit. Doing a MurmurHash (or MD5) seems like a better option.

Then a simple (1.0 * (hashValue % totalInsertsForPartition)) / totalInsertsForPartition would yield a uniform value between 0.0 and 1.0?
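
(A hedged sketch of how that concern could be sanity-checked: hash a batch of synthetic keys, map each through (hashValue % totalInserts) / totalInserts, bin the results into buckets, and compare the counts. The key format, bucket count, and MD5-to-long folding here are assumptions for illustration only.)

  import java.nio.charset.StandardCharsets;
  import java.security.MessageDigest;
  import java.security.NoSuchAlgorithmException;

  // Sketch: measure how evenly (hashValue % totalInserts) / totalInserts spreads keys across buckets.
  public class SkewCheck {
    public static void main(String[] args) throws NoSuchAlgorithmException {
      final long totalInserts = 1_000_000L;
      final int numBuckets = 10;
      long[] counts = new long[numBuckets];
      MessageDigest md5 = MessageDigest.getInstance("MD5");

      for (int i = 0; i < 100_000; i++) {
        byte[] digest = md5.digest(("key-" + i).getBytes(StandardCharsets.UTF_8));
        long hashValue = 0L;
        for (int b = 0; b < 8; b++) {
          hashValue = (hashValue << 8) | (digest[b] & 0xFF);   // fold the first 8 digest bytes into a long
        }
        double r = (1.0 * Math.floorMod(hashValue, totalInserts)) / totalInserts;
        counts[(int) (r * numBuckets)]++;                      // r is in [0.0, 1.0), so the index is valid
      }
      for (int b = 0; b < numBuckets; b++) {
        System.out.println("bucket " + b + ": " + counts[b]);  // roughly 10,000 per bucket if uniform
      }
    }
  }
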

@ovj (Contributor, author) commented:

MD5 hash makes sense. Updated it to use this; the hash values also look fairly distributed.

Here is the output for the hash values.

  • "key" : "hash_value" : "time in nano seconds"
  • a : 6289574019528802036 : 16492818
  • aa : 9205979493955281985 : 33314
  • aaa : 5232998391707188295 : 27652
  • aaaa : 3170461272618321804 : 29583
  • aaaaa : 4125589970280795993 : 38808
  • aaaaaa : 3221507088467735029 : 36931
  • aaaaaaa : 5198010149255477597 : 30166
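
(The key : hash_value pairs above look like 64-bit values derived from an MD5 digest of the record key. Below is a minimal sketch of one way to produce such a value; the class and method names are assumptions, not the code that was merged.)

  import java.nio.ByteBuffer;
  import java.nio.charset.StandardCharsets;
  import java.security.MessageDigest;
  import java.security.NoSuchAlgorithmException;

  // Sketch: derive a non-negative long from the MD5 digest of a record key.
  public class Md5KeyHash {
    static long md5Hash(String recordKey) {
      try {
        byte[] digest = MessageDigest.getInstance("MD5")
            .digest(recordKey.getBytes(StandardCharsets.UTF_8));
        // Use the first 8 of the 16 digest bytes and clear the sign bit so the result is non-negative.
        return ByteBuffer.wrap(digest).getLong() & Long.MAX_VALUE;
      } catch (NoSuchAlgorithmException e) {
        throw new RuntimeException("MD5 is a required JDK algorithm, so this should not happen", e);
      }
    }
  }
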

@vinothchandar (Member) left a comment:

We may need to use the total number of inserts as the MOD_BASE?

@vinothchandar (Member) left a comment:

Left a suggestion around making this uniformly distributed.

@ovj force-pushed the upsert_partitioner_fix branch 3 times, most recently from 45145a8 to 25cbf56, on September 1, 2017 at 22:51.
@@ -379,7 +359,9 @@ public int getPartition(Object key) {
     List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
     // pick the target bucket to use based on the weights.
     double totalWeight = 0.0;
-    double r = rand.nextDouble();
+    final long totalInserts = Math.max(1, globalStat.getNumInserts());
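
(Putting the pieces together, a hedged sketch of what the deterministic replacement for rand.nextDouble() could look like inside getPartition after this change; md5Hash is the hypothetical helper sketched earlier, and the exact merged code may differ.)

    // Sketch: deterministic r in [0.0, 1.0) derived from the record key, using the
    // total insert count as the modulus, per the review suggestion above.
    final long totalInserts = Math.max(1, globalStat.getNumInserts()); // avoid a zero modulus
    long hashValue = md5Hash(keyLocation._1().getRecordKey());         // hypothetical helper from the earlier sketch
    double r = (1.0 * (hashValue % totalInserts)) / totalInserts;      // replaces rand.nextDouble()
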
@vinothchandar (Member) commented:

Should this be getWorkloadStat(partitionPath).getNumInserts() and not the global one? I guess it does not matter?

@vinothchandar (Member) left a comment:

Changes look good. Have you been able to test on a real dataset for any regressions on skew? Just calling this out, since this is a pretty critical change.

@vinothchandar (Member) commented:

@ovj Could you take a look at the failing tests?

@ovj (Contributor, author) commented Sep 5, 2017:

Hi @vinothchandar, those tests are flaky. If I run them locally they pass ("com.uber.hoodie.TestMergeOnReadTable").
Caused by: com.uber.hoodie.exception.HoodieInsertException: Failed to initialize HoodieStorageWriter for path /tmp/junit6603311014218813808/2016/03/15/933ada6f-cef9-48bc-be61-8eddccc48658_0_001.parquet
    at com.uber.hoodie.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:67)
    at com.uber.hoodie.func.LazyInsertIterable.computeNext(LazyInsertIterable.java:82)
    at com.uber.hoodie.func.LazyInsertIterable.computeNext(LazyInsertIterable.java:39)
    at com.uber.hoodie.func.LazyIterableIterator.next(LazyIterableIterator.java:118)
    ... 19 more
Caused by: java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:857)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1740)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1682)
    at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:405)
    at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:401)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:401)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:344)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:920)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:901)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
    at com.uber.hoodie.io.storage.HoodieWrapperFileSystem.create(HoodieWrapperFileSystem.java:111)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:223)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:266)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:217)
    at com.uber.hoodie.io.storage.HoodieParquetWriter.<init>(HoodieParquetWriter.java:66)
    at com.uber.hoodie.io.storage.HoodieStorageWriterFactory.newParquetStorageWriter(HoodieStorageWriterFactory.java:55)
    at com.uber.hoodie.io.storage.HoodieStorageWriterFactory.getStorageWriter(HoodieStorageWriterFactory.java:40)
    at com.uber.hoodie.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:65)
    ... 22 more

@vinothchandar (Member) commented:

Seems related to #243. Restarted the build.

@vinothchandar (Member) commented:

@ovj I'd like to merge this in, plus the other PR on plugging in the partitioner, to cut a new release. Can you update the PRs with the follow-ups we discussed offline? Thanks!

@ovj (Contributor, author) commented Sep 7, 2017:

@vinothchandar Everything looks good in the testing results. I tested this fix to make sure it handles small-file growth correctly, and there is no noticeable difference from the existing behavior. Also, for new inserts the data is getting distributed fairly evenly.

@vinothchandar merged commit ec40d04 into apache:master on Sep 8, 2017.
@vinothchandar (Member) commented:

@ovj Thanks for the valuable contribution and for testing through this.

Merged. (I am hoping the master CI passes now, given I pulled in @n3nash's PR as well; let's see. I might ping you to fix things right away if there are indeed any regressions.)

@ovj (Contributor, author) commented Sep 8, 2017:

Thanks @vinothchandar. Sure, let me know.

vinishjail97 pushed a commit to vinishjail97/hudi that referenced this pull request Dec 15, 2023