Skip to content

Commit

Permalink
KYLIN-4962 Fix NPE in ShrunkDict step
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengshengjun authored and hit-lacus committed Apr 13, 2021
1 parent 4921031 commit 69368b0
Showing 1 changed file with 28 additions and 30 deletions.
Expand Up @@ -199,7 +199,7 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
if (cubeDesc.isShrunkenDictFromGlobalEnabled() && shrunkInputPath != null) {
recordInputRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable).cache();
recordInputRDD
.foreachPartition(new CreateShrunkenDictionary(cubeName, cubeDesc, cubeSegment, envConfig, sConf));
.foreachPartition(new CreateShrunkenDictionary(cubeName, segmentId, metaUrl, sConf));
encodedBaseRDD = recordInputRDD.mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
} else {
encodedBaseRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
Expand Down Expand Up @@ -518,12 +518,11 @@ public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<B

public static class CreateShrunkenDictionary implements VoidFunction<Iterator<String[]>> {
private String cubeName;
private CubeDesc cubeDesc;
private CubeSegment cubeSeg;

private KylinConfig kylinConfig;
private String segmentId;
private String metaUrl;
private SerializableConfiguration scof;
private CubeJoinedFlatTableEnrich intermediateTableDesc;

private CubeSegment cubeSeg;

private List<TblColRef> globalColumns;
private int[] globalColumnIndex;
Expand All @@ -533,33 +532,34 @@ public static class CreateShrunkenDictionary implements VoidFunction<Iterator<St

private String splitKey;

public CreateShrunkenDictionary(String cubeName, CubeDesc cubeDesc, CubeSegment cubeSegment, KylinConfig kylinConfig,
SerializableConfiguration serializableConfiguration) {
/**
 * Builds per-partition shrunken dictionaries from global dictionaries.
 *
 * <p>Only lightweight, serializable identifiers are captured here; the heavyweight
 * cube metadata (CubeDesc, CubeSegment, flat-table descriptor) is resolved lazily on
 * the executor in {@code init()} from {@code metaUrl}. Capturing the resolved objects
 * in the constructor (driver side) is what previously caused the NPE this class fixes:
 * they are not reliably serializable/initialized on executors.
 *
 * @param cubeName  name of the cube being built
 * @param segmentId id of the segment whose dictionaries are shrunk
 * @param metaUrl   HDFS metadata URL used to load KylinConfig on the executor
 * @param conf      serializable Hadoop configuration for metadata access
 */
public CreateShrunkenDictionary(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
    this.cubeName = cubeName;
    this.scof = conf;
    this.segmentId = segmentId;
    this.metaUrl = metaUrl;
}

public void init() {
try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
.setAndUnsetThreadLocalConfig(kylinConfig)) {
globalColumns = cubeDesc.getAllGlobalDictColumnsNeedBuilt();
globalColumnIndex = new int[globalColumns.size()];
globalColumnValues = Lists.newArrayListWithExpectedSize(globalColumns.size());

splitKey = String.valueOf(TaskContext.getPartitionId());

for (int i = 0; i < globalColumns.size(); i++) {
TblColRef colRef = globalColumns.get(i);
int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
globalColumnIndex[i] = columnIndexOnFlatTbl;
globalColumnValues.add(Sets.<String>newHashSet());
}
KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(scof, metaUrl);
CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
CubeDesc cubeDesc = cubeInstance.getDescriptor();
cubeSeg = cubeInstance.getSegmentById(segmentId);
CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(
EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);

globalColumns = cubeDesc.getAllGlobalDictColumnsNeedBuilt();
globalColumnIndex = new int[globalColumns.size()];
globalColumnValues = Lists.newArrayListWithExpectedSize(globalColumns.size());

splitKey = String.valueOf(TaskContext.getPartitionId());

for (int i = 0; i < globalColumns.size(); i++) {
TblColRef colRef = globalColumns.get(i);
int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
globalColumnIndex[i] = columnIndexOnFlatTbl;
globalColumnValues.add(Sets.<String>newHashSet());
}

}

@Override
Expand All @@ -573,9 +573,7 @@ public void call(Iterator<String[]> iter) throws Exception {
}
}
}
int count = 0;
while (iter.hasNext()) {
count++;
String[] rowArray = iter.next();
for (int i = 0; i < globalColumnIndex.length; i++) {
String fieldValue = rowArray[globalColumnIndex[i]];
Expand Down

0 comments on commit 69368b0

Please sign in to comment.