Skip to content

Commit

Permalink
KYLIN-4417 Use hash rather than random to avoid potential issue in ConvergeCuboidDataPartitioner
Browse files Browse the repository at this point in the history
  • Loading branch information
kyotoYaho authored and nichunen committed May 18, 2020
1 parent 8d2a53f commit beb976a
Showing 1 changed file with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.kylin.engine.mr.steps;

import java.util.Random;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
Expand All @@ -28,10 +26,12 @@
import org.apache.kylin.engine.mr.common.BatchConstants;

import com.google.common.base.Preconditions;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> implements Configurable {

private Random rand = new Random();
private static final HashFunction hashFunc = Hashing.murmur3_128();

private Configuration conf;
private boolean enableSharding;
Expand All @@ -40,12 +40,14 @@ public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> imple

/**
 * Chooses the reducer for a record deterministically from a hash of its key,
 * so the same key is always routed to the same partition.
 *
 * <p>Records whose cuboid id equals {@code baseCuboidID} are spread over the
 * partitions {@code [0, numReduceBaseCuboid)}; all other records are spread
 * over {@code [numReduceBaseCuboid, numReduceTasks)}.
 *
 * @param key            the row key; only its first {@code getLength()} bytes are hashed
 * @param value          the record value (not used for partitioning)
 * @param numReduceTasks total number of reduce tasks; NOTE(review): presumably must be
 *                       greater than {@code numReduceBaseCuboid}, otherwise the non-base
 *                       branch has no partitions to choose from - confirm against the job setup
 * @return the partition index for this record
 */
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
    // Text.getBytes() exposes the backing buffer, which may be longer than the
    // logical content; hash only the valid prefix so stale trailing bytes from a
    // reused Text instance cannot change the partition of an identical key.
    long hash = hashFunc.hashBytes(key.getBytes(), 0, key.getLength()).asLong();

    long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), enableSharding);
    // the first numReduceBaseCuboid are for base cuboid
    if (cuboidID == baseCuboidID) {
        return getRemainder(hash, numReduceBaseCuboid);
    } else {
        return numReduceBaseCuboid + getRemainder(hash, numReduceTasks - numReduceBaseCuboid);
    }
}

Expand All @@ -64,4 +66,9 @@ public void setConf(Configuration conf) {
public Configuration getConf() {
    // Hand back the configuration currently held in the conf field.
    return this.conf;
}

/**
 * Returns the non-negative remainder of {@code val} divided by {@code base}.
 *
 * <p>The modulo is taken on the full 64-bit value. Casting {@code val} to
 * {@code int} first (as {@code (int) val % base} does, because the cast binds
 * tighter than {@code %}) would silently drop the upper 32 bits.
 *
 * @param val  the value to reduce, may be negative
 * @param base the modulus; expected to be positive
 * @return a value in {@code [0, base)} when {@code base} is positive
 * @throws ArithmeticException if {@code base} is zero
 */
private static int getRemainder(long val, int base) {
    // floorMod takes the sign of the divisor, so a positive base always yields
    // a result in [0, base), which is guaranteed to fit in an int.
    return (int) Math.floorMod(val, (long) base);
}
}

0 comments on commit beb976a

Please sign in to comment.