Skip to content

Commit

Permalink
Support read multiple sdk writer placed at same path
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Apr 20, 2018
1 parent 84ab077 commit ae07151
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class TableSchemaBuilder {

private String tableName;

// set true if it is NonTransactional set in CarbonWriterBuilder
private boolean isUnmanagedTable = false;

public TableSchemaBuilder blockSize(int blockSize) {
if (blockSize <= 0) {
throw new IllegalArgumentException("blockSize should be greater than 0");
Expand All @@ -75,6 +78,11 @@ public TableSchemaBuilder tableName(String tableName) {
return this;
}

public TableSchemaBuilder isUnmanagedTable(boolean isUnmanagedTable) {
this.isUnmanagedTable = isUnmanagedTable;
return this;
}

public TableSchema build() {
TableSchema schema = new TableSchema();
schema.setTableName(tableName);
Expand All @@ -95,7 +103,6 @@ public TableSchema build() {
if (blockletSize > 0) {
property.put(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, String.valueOf(blockletSize));
}
// TODO: check other table properties
if (property.size() != 0) {
schema.setTableProperties(property);
}
Expand All @@ -119,7 +126,18 @@ public TableSchemaBuilder addColumn(StructField field, boolean isSortColumn) {
}
newColumn.setSchemaOrdinal(ordinal++);
newColumn.setColumnar(true);
newColumn.setColumnUniqueId(UUID.randomUUID().toString());

// For unmanagedTable, multiple sdk writer output with same column name can be placed in
// single folder for query.
// That time many places in code, columnId check will fail. To avoid that
// keep column ID as same as column name.
// Anyhow Alter table is not supported for unmanaged table.
// SO, this will not have any impact.
if (isUnmanagedTable) {
newColumn.setColumnUniqueId(field.getFieldName());
} else {
newColumn.setColumnUniqueId(UUID.randomUUID().toString());
}
newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn));
if (DataTypes.isDecimal(field.getDataType())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
List<ProjectionDimension> projectDimensions = RestructureUtil
.createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
queryModel.getProjectionDimensions(), tableBlockDimensions,
segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size(),
queryModel.getTable().getTableInfo().isUnManagedTable());
segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size());
blockExecutionInfo.setBlockId(
CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId,
queryModel.getTable().getTableInfo().isUnManagedTable()));
Expand Down Expand Up @@ -518,9 +517,7 @@ private List<ProjectionMeasure> getCurrentBlockQueryMeasures(BlockExecutionInfo
// getting the measure info which will be used while filling up measure data
List<ProjectionMeasure> updatedQueryMeasures = RestructureUtil
.createMeasureInfoAndGetCurrentBlockQueryMeasures(executionInfo,
queryModel.getProjectionMeasures(),
tableBlock.getSegmentProperties().getMeasures(),
queryModel.getTable().getTableInfo().isUnManagedTable());
queryModel.getProjectionMeasures(), tableBlock.getSegmentProperties().getMeasures());
// setting the measure aggregator for all aggregation function selected
// in query
executionInfo.getMeasureInfo().setMeasureDataTypes(queryProperties.measureDataTypes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
Expand Down Expand Up @@ -58,13 +57,12 @@ public class RestructureUtil {
* @param queryDimensions
* @param tableBlockDimensions
* @param tableComplexDimension
* @param isUnManagedTable
* @return list of query dimension which is present in the table block
*/
public static List<ProjectionDimension> createDimensionInfoAndGetCurrentBlockQueryDimension(
BlockExecutionInfo blockExecutionInfo, List<ProjectionDimension> queryDimensions,
List<CarbonDimension> tableBlockDimensions, List<CarbonDimension> tableComplexDimension,
int measureCount, boolean isUnManagedTable) {
int measureCount) {
List<ProjectionDimension> presentDimension =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
boolean[] isDimensionExists = new boolean[queryDimensions.size()];
Expand All @@ -84,7 +82,7 @@ public static List<ProjectionDimension> createDimensionInfoAndGetCurrentBlockQue
queryDimension.getDimension().getDataType();
} else {
for (CarbonDimension tableDimension : tableBlockDimensions) {
if (isColumnMatches(isUnManagedTable, queryDimension.getDimension(), tableDimension)) {
if (tableDimension.getColumnId().equals(queryDimension.getDimension().getColumnId())) {
ProjectionDimension currentBlockDimension = new ProjectionDimension(tableDimension);
tableDimension.getColumnSchema()
.setPrecision(queryDimension.getDimension().getColumnSchema().getPrecision());
Expand All @@ -106,7 +104,7 @@ public static List<ProjectionDimension> createDimensionInfoAndGetCurrentBlockQue
continue;
}
for (CarbonDimension tableDimension : tableComplexDimension) {
if (isColumnMatches(isUnManagedTable, queryDimension.getDimension(), tableDimension)) {
if (tableDimension.getColumnId().equals(queryDimension.getDimension().getColumnId())) {
ProjectionDimension currentBlockDimension = new ProjectionDimension(tableDimension);
// TODO: for complex dimension set scale and precision by traversing
// the child dimensions
Expand Down Expand Up @@ -142,21 +140,6 @@ public static List<ProjectionDimension> createDimensionInfoAndGetCurrentBlockQue
return presentDimension;
}

/**
* Match the columns for managed and unmanaged tables
* @param isUnManagedTable
* @param queryColumn
* @param tableColumn
* @return
*/
private static boolean isColumnMatches(boolean isUnManagedTable,
CarbonColumn queryColumn, CarbonColumn tableColumn) {
// If it is unmanaged table just check the column names, no need to validate column id as
// multiple sdk's output placed in a single folder doesn't have same column ID but can
// have same column name
return (tableColumn.getColumnId().equals(queryColumn.getColumnId()) ||
(isUnManagedTable && tableColumn.getColName().equals(queryColumn.getColName())));
}

/**
* This method will validate and return the default value to be
Expand Down Expand Up @@ -355,12 +338,11 @@ public static Object getMeasureDefaultValueByType(ColumnSchema columnSchema,
* @param blockExecutionInfo
* @param queryMeasures measures present in query
* @param currentBlockMeasures current block measures
* @param isUnManagedTable
* @return measures present in the block
*/
public static List<ProjectionMeasure> createMeasureInfoAndGetCurrentBlockQueryMeasures(
BlockExecutionInfo blockExecutionInfo, List<ProjectionMeasure> queryMeasures,
List<CarbonMeasure> currentBlockMeasures, boolean isUnManagedTable) {
List<CarbonMeasure> currentBlockMeasures) {
MeasureInfo measureInfo = new MeasureInfo();
List<ProjectionMeasure> presentMeasure = new ArrayList<>(queryMeasures.size());
int numberOfMeasureInQuery = queryMeasures.size();
Expand All @@ -373,7 +355,7 @@ public static List<ProjectionMeasure> createMeasureInfoAndGetCurrentBlockQueryMe
// then setting measure exists is true
// otherwise adding a default value of a measure
for (CarbonMeasure carbonMeasure : currentBlockMeasures) {
if (isColumnMatches(isUnManagedTable, carbonMeasure, queryMeasure.getMeasure())) {
if (carbonMeasure.getColumnId().equals(queryMeasure.getMeasure().getColumnId())) {
ProjectionMeasure currentBlockMeasure = new ProjectionMeasure(carbonMeasure);
carbonMeasure.getColumnSchema().setDataType(queryMeasure.getMeasure().getDataType());
carbonMeasure.getColumnSchema().setPrecision(queryMeasure.getMeasure().getPrecision());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class RestructureUtilTest {
List<ProjectionDimension> result = null;
result = RestructureUtil
.createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo, queryDimensions,
tableBlockDimensions, tableComplexDimensions, queryMeasures.size(), false);
tableBlockDimensions, tableComplexDimensions, queryMeasures.size());
List<CarbonDimension> resultDimension = new ArrayList<>(result.size());
for (ProjectionDimension queryDimension : result) {
resultDimension.add(queryDimension.getDimension());
Expand Down Expand Up @@ -127,7 +127,7 @@ public class RestructureUtilTest {
List<ProjectionMeasure> queryMeasures = Arrays.asList(queryMeasure1, queryMeasure2, queryMeasure3);
BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
RestructureUtil.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo, queryMeasures,
currentBlockMeasures, false);
currentBlockMeasures);
MeasureInfo measureInfo = blockExecutionInfo.getMeasureInfo();
boolean[] measuresExist = { true, true, false };
assertThat(measureInfo.getMeasureExists(), is(equalTo(measuresExist)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
Row("robot1", 1, 0.5),
Row("robot2", 2, 1.0)))

//test filter query
checkAnswer(sql("select * from sdkOutputTable where age = 1"), Seq(
Row("robot1", 1, 0.5),
Row("robot1", 1, 0.5)))

// test the default sort column behavior in unmanaged table
checkExistence(sql("describe formatted sdkOutputTable"), true,
"SORT_COLUMNS name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ private CarbonTable buildCarbonTable() {
tableSchemaBuilder = tableSchemaBuilder.blockletSize(blockletSize);
}

if (isUnManagedTable) {
tableSchemaBuilder = tableSchemaBuilder.isUnmanagedTable(isUnManagedTable);
}

List<String> sortColumnsList = new ArrayList<>();
if (sortColumns == null) {
// If sort columns are not specified, default set all dimensions to sort column.
Expand Down

0 comments on commit ae07151

Please sign in to comment.