Skip to content

Commit

Permalink
Added UUID as task number for fileformat
Browse files Browse the repository at this point in the history
  • Loading branch information
ravipesala committed Jul 13, 2019
1 parent ebe78dc commit b21bc41
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,13 +305,13 @@ public static String getSegmentPath(String tablePath, String segmentId) {
* @param factUpdateTimeStamp unique identifier to identify an update
* @return gets data file name only with out path
*/
public static String getCarbonDataFileName(Integer filePartNo, Long taskNo, int bucketNumber,
public static String getCarbonDataFileName(Integer filePartNo, String taskNo, int bucketNumber,
int batchNo, String factUpdateTimeStamp, String segmentNo) {
return DATA_PART_PREFIX + filePartNo + "-" + taskNo + BATCH_PREFIX + batchNo + "-"
+ bucketNumber + "-" + segmentNo + "-" + factUpdateTimeStamp + CARBON_DATA_EXT;
}

public static String getShardName(Long taskNo, int bucketNumber, int batchNo,
public static String getShardName(String taskNo, int bucketNumber, int batchNo,
String factUpdateTimeStamp, String segmentNo) {
return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + segmentNo + "-"
+ factUpdateTimeStamp;
Expand All @@ -324,14 +324,14 @@ public static String getShardName(Long taskNo, int bucketNumber, int batchNo,
* @param factUpdatedTimeStamp time stamp
* @return filename
*/
public static String getCarbonIndexFileName(long taskNo, int bucketNumber, int batchNo,
public static String getCarbonIndexFileName(String taskNo, int bucketNumber, int batchNo,
String factUpdatedTimeStamp, String segmentNo) {
return getShardName(taskNo, bucketNumber, batchNo, factUpdatedTimeStamp, segmentNo)
+ INDEX_FILE_EXT;
}

public static String getCarbonStreamIndexFileName() {
return getCarbonIndexFileName(0, 0, 0, "0", "0");
return getCarbonIndexFileName("0", 0, 0, "0", "0");
}

public static String getCarbonStreamIndexFilePath(String segmentDir) {
Expand Down Expand Up @@ -516,8 +516,8 @@ public static String getSegmentNo(String carbonDataFileName) {
/**
* Return the taskId part from taskNo(include taskId + batchNo)
*/
public static long getTaskIdFromTaskNo(String taskNo) {
return Long.parseLong(taskNo.split(BATCH_PREFIX)[0]);
public static String getTaskIdFromTaskNo(String taskNo) {
return taskNo.split(BATCH_PREFIX)[0];
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, CarbonTa
// partition info first and then read data.
// For other normal query should use newest partitionIdList
if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
long partitionId = CarbonTablePath.DataFileUtil
.getTaskIdFromTaskNo(CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
long partitionId = Long.parseLong(CarbonTablePath.DataFileUtil
.getTaskIdFromTaskNo(CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath())));
if (oldPartitionIdList != null) {
partitionIndex = oldPartitionIdList.indexOf((int) partitionId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.carbondata.execution.datasources

import java.net.URI
import java.util.UUID

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -153,7 +154,8 @@ class SparkCarbonFileFormat extends FileFormat
path
}
context.getConfiguration.set("carbon.outputformat.writepath", updatedPath)
context.getConfiguration.set("carbon.outputformat.taskno", System.nanoTime() + "")
context.getConfiguration.set("carbon.outputformat.taskno",
UUID.randomUUID().toString.replace("-", ""))
new CarbonOutputWriter(path, context, dataSchema.fields)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public RowResultProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
segProp, tableName, tempStoreLocation, carbonStoreLocation);
CarbonDataFileAttributes carbonDataFileAttributes =
new CarbonDataFileAttributes(Long.parseLong(loadModel.getTaskNo()),
new CarbonDataFileAttributes(loadModel.getTaskNo(),
loadModel.getFactTimeStamp());
carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
carbonFactDataHandlerModel.setBucketId(bucketId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class CarbonDataFileAttributes {
/**
* task Id which is unique for each spark task
*/
private long taskId;
private String taskId;

/**
* load start time
Expand All @@ -36,15 +36,24 @@ public class CarbonDataFileAttributes {
* @param taskId
* @param factTimeStamp
*/
public CarbonDataFileAttributes(long taskId, long factTimeStamp) {
public CarbonDataFileAttributes(String taskId, long factTimeStamp) {
this.taskId = taskId;
this.factTimeStamp = factTimeStamp;
}

/**
* @param taskId
* @param factTimeStamp
*/
public CarbonDataFileAttributes(long taskId, long factTimeStamp) {
this.taskId = String.valueOf(taskId);
this.factTimeStamp = factTimeStamp;
}

/**
* @return
*/
public long getTaskId() {
public String getTaskId() {
return taskId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
}
}
CarbonDataFileAttributes carbonDataFileAttributes =
new CarbonDataFileAttributes(Long.parseLong(configuration.getTaskNo()),
new CarbonDataFileAttributes(configuration.getTaskNo(),
(Long) configuration.getDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP));
String carbonDataDirectoryPath = getCarbonDataFolderLocation(configuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,8 @@ public void testTaskNo() throws IOException {
Assert.assertNotNull(dataFiles);
Assert.assertTrue(dataFiles.length > 0);
String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(dataFiles[0].getName());
long taskID = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo);
Assert.assertEquals("Task Id is not matched", taskID, 5);
String taskID = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo);
Assert.assertEquals("Task Id is not matched", taskID, "5");
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private void initialize(TaskAttemptContext job) throws IOException {

segmentDir = CarbonTablePath.getSegmentPath(
carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0", segmentId);
fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo + "", 0, 0, "0", segmentId);

// initialize metadata
isNoDictionaryDimensionColumn =
Expand Down

0 comments on commit b21bc41

Please sign in to comment.