Skip to content

Commit

Permalink
[CARBONDATA-1838] Refactor SortStepRowUtil to make it more readable
Browse files Browse the repository at this point in the history
Refactor and optimize SortRowStepUtil to make it efficient and more readable.

Firstly we get all the indices for the 3 groups: dictionary columns, non dictionary dimension columns and measures;
Then for each group, just iterate the source row and copy data to each group without any if-else branch.

This closes apache#1594
  • Loading branch information
xuchuanyin authored and anubhav100 committed Dec 7, 2017
1 parent 4b69eae commit 130055c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ object DataLoadProcessorStepOnSpark {
val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
val conf = DataLoadProcessBuilder.createConfiguration(model)
val sortParameters = SortParameters.createSortParameters(conf)

val sortStepRowUtil = new SortStepRowUtil(sortParameters)
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
wrapException(e, model)
}
Expand All @@ -138,7 +138,7 @@ object DataLoadProcessorStepOnSpark {

override def next(): CarbonRow = {
val row =
new CarbonRow(SortStepRowUtil.convertRow(rows.next().getData, sortParameters))
new CarbonRow(sortStepRowUtil.convertRow(rows.next().getData))
rowCounter.add(1)
row
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,82 @@
import org.apache.carbondata.processing.sort.sortdata.SortParameters;

public class SortStepRowUtil {
public static Object[] convertRow(Object[] data, SortParameters parameters) {
int measureCount = parameters.getMeasureColCount();
int dimensionCount = parameters.getDimColCount();
int complexDimensionCount = parameters.getComplexDimColCount();
int noDictionaryCount = parameters.getNoDictionaryCount();
boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
private int measureCount;
private int dimensionCount;
private int complexDimensionCount;
private int noDictionaryCount;
private int[] dictDimIdx;
private int[] nonDictIdx;
private int[] measureIdx;

// create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
public SortStepRowUtil(SortParameters parameters) {
this.measureCount = parameters.getMeasureColCount();
this.dimensionCount = parameters.getDimColCount();
this.complexDimensionCount = parameters.getComplexDimColCount();
this.noDictionaryCount = parameters.getNoDictionaryCount();
boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();

Object[] holder = new Object[3];
int index = 0;
int nonDicIndex = 0;
int allCount = 0;
int[] dim = new int[dimensionCount];
byte[][] nonDicArray = new byte[noDictionaryCount + complexDimensionCount][];
Object[] measures = new Object[measureCount];
try {
// read dimension values
for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
if (isNoDictionaryDimensionColumn[i]) {
nonDicArray[nonDicIndex++] = (byte[]) data[i];
} else {
dim[index++] = (int) data[allCount];
}
allCount++;
}

for (int i = 0; i < complexDimensionCount; i++) {
nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
allCount++;
// be careful that the default value is 0
this.dictDimIdx = new int[dimensionCount - noDictionaryCount];
this.nonDictIdx = new int[noDictionaryCount + complexDimensionCount];
this.measureIdx = new int[measureCount];

// indices for dict dim columns
for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
if (isNoDictionaryDimensionColumn[i]) {
nonDictIdx[nonDicIndex++] = i;
} else {
dictDimIdx[index++] = allCount;
}
allCount++;
}

index = 0;
// indices for non dict dim/complex columns
for (int i = 0; i < complexDimensionCount; i++) {
nonDictIdx[nonDicIndex++] = allCount;
allCount++;
}

// read measure values
for (int i = 0; i < measureCount; i++) {
measures[index++] = data[allCount];
allCount++;
// indices for measure columns
for (int i = 0; i < measureCount; i++) {
measureIdx[i] = allCount;
allCount++;
}
}

public Object[] convertRow(Object[] data) {
// create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
Object[] holder = new Object[3];
try {

int[] dictDims = new int[dimensionCount - noDictionaryCount];
byte[][] nonDictArray = new byte[noDictionaryCount + complexDimensionCount][];
Object[] measures = new Object[measureCount];

// write dict dim data
for (int idx = 0; idx < dictDimIdx.length; idx++) {
dictDims[idx] = (int) data[dictDimIdx[idx]];
}

NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
// write non dict dim data
for (int idx = 0; idx < nonDictIdx.length; idx++) {
nonDictArray[idx] = (byte[]) data[nonDictIdx[idx]];
}

// write measure data
for (int idx = 0; idx < measureIdx.length; idx++) {
measures[idx] = data[measureIdx[idx]];
}
NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures);

// increment number if record read
} catch (Exception e) {
throw new RuntimeException("Problem while converting row ", e);
}

//return out row
return holder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal;

private SortParameters parameters;

private SortStepRowUtil sortStepRowUtil;
/**
* tempFileLocation
*/
Expand All @@ -68,6 +68,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
String[] tempFileLocation) {
this.parameters = parameters;
this.sortStepRowUtil = new SortStepRowUtil(parameters);
this.tempFileLocation = tempFileLocation;
this.tableName = parameters.getTableName();
}
Expand Down Expand Up @@ -184,7 +185,7 @@ private void createRecordHolderQueue() {
* @return sorted row
*/
public Object[] next() {
return SortStepRowUtil.convertRow(getSortedRecordFromFile(), parameters);
return sortStepRowUtil.convertRow(getSortedRecordFromFile());
}

/**
Expand Down

0 comments on commit 130055c

Please sign in to comment.