Skip to content

Commit

Permalink
Merge 25942c5 into aa07020
Browse files Browse the repository at this point in the history
  • Loading branch information
akashrn5 committed Jan 28, 2019
2 parents aa07020 + 25942c5 commit 023cb4d
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 17 deletions.
Expand Up @@ -76,6 +76,12 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
spark.sql("drop table if exists testformat")
}

test("test alter") {
// spark.sql("create table t1(name string, age int) using carbon")
// spark.sql("insert into t1 select 'abc',4")
spark.sql("alter table t1 add columns(add string)")
}

test("test add columns for table of using carbon with sql") {
// TODO: should support add columns for carbon dataSource table
// Limit from spark
Expand Down
Expand Up @@ -117,7 +117,7 @@ public Map<String, List<RawResultIterator>> processTableBlocks(Configuration con
resultList.put(CarbonCompactionUtil.SORTED_IDX,
new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE));

List<TableBlockInfo> list = null;
List<TableBlockInfo> tableBlockInfos = null;
QueryModelBuilder builder = new QueryModelBuilder(carbonTable)
.projectAllColumns()
.dataConverter(dataTypeConverter)
Expand All @@ -130,7 +130,6 @@ public Map<String, List<RawResultIterator>> processTableBlocks(Configuration con
for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
String segmentId = taskMap.getKey();
List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
SegmentProperties sourceSegProperties = getSourceSegmentProperties(listMetadata);
// for each segment get taskblock info
TaskBlockInfo taskBlockInfo = taskMap.getValue();
Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
Expand All @@ -139,26 +138,69 @@ public Map<String, List<RawResultIterator>> processTableBlocks(Configuration con
CarbonCompactionUtil.isRestructured(listMetadata, carbonTable.getTableLastUpdatedTime())
|| !CarbonCompactionUtil.isSorted(listMetadata.get(0));
for (String task : taskBlockListMapping) {
list = taskBlockInfo.getTableBlockInfoList(task);
Collections.sort(list);
LOGGER.info(
"for task -" + task + "- in segment id -" + segmentId + "- block size is -" + list
.size());
queryModel.setTableBlockInfos(list);
if (sortingRequired) {
resultList.get(CarbonCompactionUtil.UNSORTED_IDX).add(
new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
sourceSegProperties, destinationSegProperties, false));
} else {
resultList.get(CarbonCompactionUtil.SORTED_IDX).add(
new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
sourceSegProperties, destinationSegProperties, false));
tableBlockInfos = taskBlockInfo.getTableBlockInfoList(task);
// during update there may be a chance that the cardinality may change within the segment
// which may lead to failure while converting the row, so get all the blocks present in a
// task and then split into multiple lists of same key length and create separate
// RawResultIterator for each tableBlockInfo of same key length. If all the blocks have same
// keylength, then make a single RawResultIterator for all the blocks
List<List<TableBlockInfo>> listOfTableBlocksBasedOnKeyLength =
getListOfTableBlocksBasedOnKeyLength(tableBlockInfos);
for (List<TableBlockInfo> tableBlockInfoList : listOfTableBlocksBasedOnKeyLength) {
Collections.sort(tableBlockInfoList);
LOGGER.info("for task -" + task + "- in segment id -" + segmentId + "- block size is -"
+ tableBlockInfos.size());
queryModel.setTableBlockInfos(tableBlockInfoList);
if (sortingRequired) {
resultList.get(CarbonCompactionUtil.UNSORTED_IDX).add(
getRawResultIterator(configuration, segmentId, task, tableBlockInfoList));
} else {
resultList.get(CarbonCompactionUtil.SORTED_IDX).add(
getRawResultIterator(configuration, segmentId, task, tableBlockInfoList));
}
}
}
}
return resultList;
}

private RawResultIterator getRawResultIterator(Configuration configuration, String segmentId,
String task, List<TableBlockInfo> tableBlockInfoList)
throws QueryExecutionException, IOException {
return new RawResultIterator(
executeBlockList(tableBlockInfoList, segmentId, task, configuration),
getSourceSegmentProperties(
Collections.singletonList(tableBlockInfoList.get(0).getDataFileFooter())),
destinationSegProperties, false);
}

/**
* This method returns the List of TableBlockInfoList, where each listOfTableBlockInfos will have
* same keySize
* @param tableBlockInfos List of tableBlockInfos present in each task
*/
private List<List<TableBlockInfo>> getListOfTableBlocksBasedOnKeyLength(
List<TableBlockInfo> tableBlockInfos) {
List<List<TableBlockInfo>> listOfTableBlockInfoListOnKeySize = new ArrayList<>();
Map<Integer, List<TableBlockInfo>> keySizeToTableBlockInfoMap = new HashMap<>();
for (TableBlockInfo tableBlock : tableBlockInfos) {
// get the keySizeInBytes for the dataFileFooter
int keySizeInBytes =
getSourceSegmentProperties(Collections.singletonList(tableBlock.getDataFileFooter()))
.getDimensionKeyGenerator().getKeySizeInBytes();
List<TableBlockInfo> tempBlockInfoList = keySizeToTableBlockInfoMap.get(keySizeInBytes);
if (tempBlockInfoList == null) {
tempBlockInfoList = new ArrayList<>();
keySizeToTableBlockInfoMap.put(keySizeInBytes, tempBlockInfoList);
}
tempBlockInfoList.add(tableBlock);
}
for (Map.Entry<Integer, List<TableBlockInfo>> taskMap : keySizeToTableBlockInfoMap.entrySet()) {
listOfTableBlockInfoListOnKeySize.add(taskMap.getValue());
}
return listOfTableBlockInfoListOnKeySize;
}

/**
* This method will create the source segment properties based on restructured block existence
*
Expand Down
Expand Up @@ -144,10 +144,10 @@ public static Map<String, List<DataFileFooter>> createDataFileFooterMappingForSe
if (null == dataFileMatadata.isSorted()) {
dataFileMatadata.setSorted(isSortedTable);
}
blockInfo.setDataFileFooter(dataFileMatadata);
} else {
dataFileMatadata = CarbonUtil.readMetadataFile(blockInfo);
}
blockInfo.setDataFileFooter(dataFileMatadata);
if (null == metadataList) {
// if it is not present
eachSegmentBlocks.add(dataFileMatadata);
Expand Down

0 comments on commit 023cb4d

Please sign in to comment.