Commit 9e87f9f

remove the validation for same schema in a location and fix drop datamap issue

akashrn5 committed Feb 6, 2019
1 parent 2ecf30c commit 9e87f9f
Showing 13 changed files with 109 additions and 58 deletions.
@@ -322,6 +322,16 @@ public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) {
         tableIndices = allDataMaps.get(tableUniqueName);
       }
     }
+    // in case of fileformat or sdk, when table is dropped or schema is changed the datamaps are
+    // not cleared, they need to be cleared by using API, so compare the columns, if not same, clear
+    // the datamaps on that table
+    if (allDataMaps.size() > 0 && null != allDataMaps.get(tableUniqueName)
+        && allDataMaps.get(tableUniqueName).size() > 0 && !allDataMaps.get(tableUniqueName).get(0)
+        .getTable().getTableInfo().getFactTable().getListOfColumns()
+        .equals(table.getTableInfo().getFactTable().getListOfColumns())) {
+      clearDataMaps(tableUniqueName);
+      tableIndices = null;
+    }
     TableDataMap dataMap = null;
     if (tableIndices != null) {
       dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
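Note: the guard added above reduces to a column-list comparison between the cached table and the incoming one. A minimal sketch of the same logic, assuming the cache entry is non-empty (the locals cached, cachedColumns, and currentColumns are illustrative, not from the patch):

    List<TableDataMap> cached = allDataMaps.get(tableUniqueName);
    if (cached != null && !cached.isEmpty()) {
      // SDK/fileformat tables can change schema without the cache being told,
      // so compare the cached schema against the incoming table's schema
      List<ColumnSchema> cachedColumns =
          cached.get(0).getTable().getTableInfo().getFactTable().getListOfColumns();
      List<ColumnSchema> currentColumns =
          table.getTableInfo().getFactTable().getListOfColumns();
      if (!cachedColumns.equals(currentColumns)) {
        clearDataMaps(tableUniqueName); // drop the stale datamaps for this table
        tableIndices = null;            // forces the rebuild path further down
      }
    }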
@@ -422,7 +432,7 @@ public TableDataMap registerDataMap(CarbonTable table,
       blockletDetailsFetcher = getBlockletDetailsFetcher(table);
     }
     segmentPropertiesFetcher = (SegmentPropertiesFetcher) blockletDetailsFetcher;
-    TableDataMap dataMap = new TableDataMap(table.getAbsoluteTableIdentifier(),
+    TableDataMap dataMap = new TableDataMap(table,
         dataMapSchema, dataMapFactory, blockletDetailsFetcher, segmentPropertiesFetcher);
 
     tableIndices.add(dataMap);
@@ -43,6 +43,7 @@
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -64,6 +65,8 @@
 @InterfaceAudience.Internal
 public final class TableDataMap extends OperationEventListener {
 
+  private CarbonTable table;
+
   private AbsoluteTableIdentifier identifier;
 
   private DataMapSchema dataMapSchema;
Expand All @@ -80,10 +83,11 @@ public final class TableDataMap extends OperationEventListener {
/**
* It is called to initialize and load the required table datamap metadata.
*/
TableDataMap(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema,
TableDataMap(CarbonTable table, DataMapSchema dataMapSchema,
DataMapFactory dataMapFactory, BlockletDetailsFetcher blockletDetailsFetcher,
SegmentPropertiesFetcher segmentPropertiesFetcher) {
this.identifier = identifier;
this.identifier = table.getAbsoluteTableIdentifier();
this.table = table;
this.dataMapSchema = dataMapSchema;
this.dataMapFactory = dataMapFactory;
this.blockletDetailsFetcher = blockletDetailsFetcher;
@@ -115,8 +119,8 @@ public List<ExtendedBlocklet> prune(List<Segment> segments, Expression filterExp
       } else {
         segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
         for (DataMap dataMap : dataMaps.get(segment)) {
-          pruneBlocklets
-              .addAll(dataMap.prune(filterExp, segmentProperties, partitions, identifier));
+          pruneBlocklets.addAll(dataMap
+              .prune(filterExp, segmentProperties, partitions, table));
         }
       }
       blocklets.addAll(addSegmentId(
@@ -126,6 +130,10 @@ public List<ExtendedBlocklet> prune(List<Segment> segments, Expression filterExp
     return blocklets;
   }
 
+  public CarbonTable getTable() {
+    return table;
+  }
+
   /**
    * Pass the valid segments and prune the datamap using filter expression
    *
@@ -24,7 +24,7 @@
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
@@ -52,7 +52,7 @@ List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
    * blocklets where these filters can exist.
    */
   List<T> prune(Expression filter, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException;
+      List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException;
 
   // TODO Move this method to Abstract class
   /**
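Note: this is a breaking change for external DataMap implementations, since the expression-based prune now receives the whole CarbonTable rather than just the AbsoluteTableIdentifier. A hypothetical implementation under the new contract (class name and pruning logic invented; the other abstract members of DataMap are omitted for brevity):

    public class ExampleDataMap extends CoarseGrainDataMap {
      @Override
      public List<Blocklet> prune(Expression filter, SegmentProperties segmentProperties,
          List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException {
        // the table is now in scope, so the latest table schema can be consulted
        // even when this segment's files were written with an older schema
        List<ColumnSchema> tableColumns =
            carbonTable.getTableInfo().getFactTable().getListOfColumns();
        // ... decide which blocklets can be skipped using filter and tableColumns ...
        return new ArrayList<>();
      }
    }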
@@ -25,7 +25,7 @@
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
 
 /**
@@ -37,7 +37,7 @@ public abstract class CoarseGrainDataMap implements DataMap<Blocklet> {
 
   @Override
   public List<Blocklet> prune(Expression expression, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
+      List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException {
     throw new UnsupportedOperationException("Filter expression not supported");
   }
 
@@ -24,7 +24,7 @@
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
 
 /**
@@ -36,7 +36,7 @@ public abstract class FineGrainDataMap implements DataMap<FineGrainBlocklet> {
 
   @Override
   public List<FineGrainBlocklet> prune(Expression filter, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
+      List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException {
     throw new UnsupportedOperationException("Filter expression not supported");
   }
 
@@ -45,7 +45,6 @@
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -708,17 +707,18 @@ protected boolean useMinMaxForExecutorPruning(FilterResolverIntf filterResolverI
 
   @Override
   public List<Blocklet> prune(Expression expression, SegmentProperties properties,
-      List<PartitionSpec> partitions, AbsoluteTableIdentifier identifier) throws IOException {
+      List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException {
     FilterResolverIntf filterResolverIntf = null;
     if (expression != null) {
       QueryModel.FilterProcessVO processVO =
           new QueryModel.FilterProcessVO(properties.getDimensions(), properties.getMeasures(),
              new ArrayList<CarbonDimension>());
-      QueryModel.processFilterExpression(processVO, expression, null, null);
+      QueryModel.processFilterExpression(processVO, expression, null, null, carbonTable);
       // Optimize Filter Expression and fit RANGE filters is conditions apply.
       FilterOptimizer rangeFilterOptimizer = new RangeFilterOptmizer(expression);
       rangeFilterOptimizer.optimizeFilter();
-      filterResolverIntf = CarbonTable.resolveFilter(expression, identifier);
+      filterResolverIntf =
+          CarbonTable.resolveFilter(expression, carbonTable.getAbsoluteTableIdentifier());
     }
     return prune(filterResolverIntf, properties, partitions);
   }
@@ -1058,7 +1058,7 @@ public void processFilterExpression(Expression filterExpression,
         new QueryModel.FilterProcessVO(getDimensionByTableName(getTableName()),
             getMeasureByTableName(getTableName()), getImplicitDimensionByTableName(getTableName()));
     QueryModel.processFilterExpression(processVO, filterExpression, isFilterDimensions,
-        isFilterMeasures);
+        isFilterMeasures, this);
 
     if (null != filterExpression) {
       // Optimize Filter Expression and fit RANGE filters is conditions apply.
@@ -273,12 +273,15 @@ private void updateColumns(QueryModel queryModel, List<ColumnSchema> columnsInTa
     if (queryModel.getTable().isTransactionalTable()) {
       return;
     }
-    // First validate the schema of the carbondata file
-    boolean sameColumnSchemaList = BlockletDataMapUtil.isSameColumnSchemaList(columnsInTable,
-        queryModel.getTable().getTableInfo().getFactTable().getListOfColumns());
+    // First validate the schema of the carbondata file if the same column name have different
+    // datatype
+    boolean sameColumnSchemaList = BlockletDataMapUtil
+        .isSameColumnAndDifferentDatatypeInSchema(columnsInTable,
+            queryModel.getTable().getTableInfo().getFactTable().getListOfColumns());
     if (!sameColumnSchemaList) {
-      LOGGER.error("Schema of " + filePath + " doesn't match with the table's schema");
-      throw new IOException("All the files doesn't have same schema. "
+      LOGGER.error("Datatype of the common columns present in " + filePath + " doesn't match with"
+          + "the column's datatype in table schema");
+      throw new IOException("All common columns present in the files doesn't have same datatype. "
           + "Unsupported operation on nonTransactional table. Check logs.");
     }
     List<ProjectionDimension> dimensions = queryModel.getProjectionDimensions();
@@ -331,10 +334,11 @@ private void updateColumns(QueryModel queryModel, List<ColumnSchema> columnsInTa
   private void createFilterExpression(QueryModel queryModel, SegmentProperties properties) {
     Expression expression = queryModel.getFilterExpression();
     if (expression != null) {
-      QueryModel.FilterProcessVO processVO =
-          new QueryModel.FilterProcessVO(properties.getDimensions(), properties.getMeasures(),
-              new ArrayList<CarbonDimension>());
-      QueryModel.processFilterExpression(processVO, expression, null, null);
+      QueryModel.FilterProcessVO processVO = new QueryModel.FilterProcessVO(
+          properties.getDimensions(),
+          properties.getMeasures(),
+          new ArrayList<CarbonDimension>());
+      QueryModel.processFilterExpression(processVO, expression, null, null, queryModel.getTable());
       // Optimize Filter Expression and fit RANGE filters is conditions apply.
       FilterOptimizer rangeFilterOptimizer = new RangeFilterOptmizer(expression);
       rangeFilterOptimizer.optimizeFilter();
@@ -145,29 +145,33 @@ public static QueryModel newInstance(CarbonTable carbonTable) {
   }
 
   public static void processFilterExpression(FilterProcessVO processVO, Expression filterExpression,
-      final boolean[] isFilterDimensions, final boolean[] isFilterMeasures) {
+      final boolean[] isFilterDimensions, final boolean[] isFilterMeasures,
+      CarbonTable carbonTable) {
     if (null != filterExpression) {
       if (null != filterExpression.getChildren() && filterExpression.getChildren().size() == 0) {
         if (filterExpression instanceof ConditionalExpression) {
           List<ColumnExpression> listOfCol =
               ((ConditionalExpression) filterExpression).getColumnList();
           for (ColumnExpression expression : listOfCol) {
-            setDimAndMsrColumnNode(processVO, expression, isFilterDimensions, isFilterMeasures);
+            setDimAndMsrColumnNode(processVO, expression, isFilterDimensions, isFilterMeasures,
+                carbonTable);
           }
         }
       }
       for (Expression expression : filterExpression.getChildren()) {
         if (expression instanceof ColumnExpression) {
           setDimAndMsrColumnNode(processVO, (ColumnExpression) expression, isFilterDimensions,
-              isFilterMeasures);
+              isFilterMeasures, carbonTable);
         } else if (expression instanceof UnknownExpression) {
           UnknownExpression exp = ((UnknownExpression) expression);
           List<ColumnExpression> listOfColExpression = exp.getAllColumnList();
           for (ColumnExpression col : listOfColExpression) {
-            setDimAndMsrColumnNode(processVO, col, isFilterDimensions, isFilterMeasures);
+            setDimAndMsrColumnNode(processVO, col, isFilterDimensions, isFilterMeasures,
+                carbonTable);
           }
         } else {
-          processFilterExpression(processVO, expression, isFilterDimensions, isFilterMeasures);
+          processFilterExpression(processVO, expression, isFilterDimensions, isFilterMeasures,
+              carbonTable);
         }
       }
     }
@@ -184,7 +188,7 @@ private static CarbonMeasure getCarbonMetadataMeasure(String name, List<CarbonMe
   }
 
   private static void setDimAndMsrColumnNode(FilterProcessVO processVO, ColumnExpression col,
-      boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
+      boolean[] isFilterDimensions, boolean[] isFilterMeasures, CarbonTable table) {
     CarbonDimension dim;
     CarbonMeasure msr;
     String columnName;
@@ -209,13 +213,25 @@ private static void setDimAndMsrColumnNode(FilterProcessVO processVO, ColumnExpr
       if (null != isFilterMeasures) {
         isFilterMeasures[msr.getOrdinal()] = true;
       }
-    } else {
+    } else if (null != CarbonUtil.findDimension(processVO.getImplicitDimensions(), columnName)) {
       // check if this is an implicit dimension
-      dim = CarbonUtil
-          .findDimension(processVO.getImplicitDimensions(), columnName);
+      dim = CarbonUtil.findDimension(processVO.getImplicitDimensions(), columnName);
       col.setCarbonColumn(dim);
       col.setDimension(dim);
       col.setDimension(true);
+    } else {
+      // in case of sdk or fileformat, there can be chance that each carbondata file may have
+      // different schema, so every segment properties will have dims and measures based on
+      // corresponding segment. So the filter column may not be present in it. so generate the
+      // dimension and measure from the carbontable
+      CarbonDimension dimension =
+          table.getDimensionByName(table.getTableName(), col.getColumnName());
+      CarbonMeasure measure = table.getMeasureByName(table.getTableName(), col.getColumnName());
+      col.setDimension(dimension);
+      col.setMeasure(measure);
+      col.setCarbonColumn(dimension == null ? measure : dimension);
+      col.setDimension(null != dimension);
+      col.setMeasure(null != measure);
     }
   }
 
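Note: the lookup order in setDimAndMsrColumnNode is now segment dimension, then segment measure, then implicit dimension, and finally the table schema itself. The final fallback condenses to the following (it mirrors the added branch; observe that if the column is missing from the table as well, both lookups return null, the ColumnExpression carries a null carbon column, and the failure presumably surfaces later during filter resolution):

    CarbonDimension dimension =
        table.getDimensionByName(table.getTableName(), col.getColumnName());
    CarbonMeasure measure = table.getMeasureByName(table.getTableName(), col.getColumnName());
    col.setCarbonColumn(dimension == null ? measure : dimension); // null if absent everywhere
    col.setDimension(null != dimension);
    col.setMeasure(null != measure);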
@@ -109,10 +109,10 @@ public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
         isTransactionalTable);
     for (DataFileFooter footer : indexInfo) {
       if ((!isTransactionalTable) && (tableColumnList.size() != 0) &&
-          !isSameColumnSchemaList(footer.getColumnInTable(), tableColumnList)) {
-        LOG.error("Schema of " + identifier.getIndexFileName()
-            + " doesn't match with the table's schema");
-        throw new IOException("All the files doesn't have same schema. "
+          !isSameColumnAndDifferentDatatypeInSchema(footer.getColumnInTable(), tableColumnList)) {
+        LOG.error("Datatype of the common columns present in " + identifier.getIndexFileName()
+            + " doesn't match with the column's datatype in table schema");
+        throw new IOException("All common columns present in the files doesn't have same datatype. "
            + "Unsupported operation on nonTransactional table. Check logs.");
       }
       if ((tableColumnList != null) && (tableColumnList.size() == 0)) {
@@ -252,16 +252,23 @@ public static boolean isCacheLevelBlock(CarbonTable carbonTable) {
     return true;
   }
 
-  public static boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumnList,
-      List<ColumnSchema> tableColumnList) {
-    if (indexFileColumnList.size() != tableColumnList.size()) {
-      LOG.error("Index file's column size is " + indexFileColumnList.size()
-          + " but table's column size is " + tableColumnList.size());
-      return false;
-    }
+  /**
+   * This method validates whether the schema present in index and table contains the same column
+   * name but with different dataType.
+   */
+  public static boolean isSameColumnAndDifferentDatatypeInSchema(
+      List<ColumnSchema> indexFileColumnList, List<ColumnSchema> tableColumnList) {
     for (int i = 0; i < tableColumnList.size(); i++) {
-      if (!tableColumnList.contains(indexFileColumnList.get(i))) {
-        return false;
+      for (int j = 0; j < indexFileColumnList.size(); j++) {
+        if (indexFileColumnList.get(j).getColumnName()
+            .equalsIgnoreCase(tableColumnList.get(i).getColumnName()) && !indexFileColumnList.get(j)
+            .getDataType().getName()
+            .equalsIgnoreCase(tableColumnList.get(i).getDataType().getName())) {
+          LOG.error("Datatype of the Column " + indexFileColumnList.get(j).getColumnName()
+              + " present in index file, is not same as datatype of the column with same name"
+              + "present in table");
+          return false;
+        }
       }
     }
     return true;
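Note: despite the name, isSameColumnAndDifferentDatatypeInSchema returns true when the schemas are compatible, i.e. no same-named column disagrees on datatype; differing column sets alone no longer fail the check. A usage sketch (the two column lists are hypothetical and their ColumnSchema construction is elided):

    // table schema:  name STRING, salary INT
    // index file A:  name STRING, age INT       -> true  (no common-column conflict)
    // index file B:  name STRING, salary STRING -> false ('salary' is INT vs STRING)
    boolean compatible = BlockletDataMapUtil
        .isSameColumnAndDifferentDatatypeInSchema(indexFileColumns, tableColumns);
    if (!compatible) {
      throw new IOException("All common columns present in the files doesn't have same datatype. "
          + "Unsupported operation on nonTransactional table. Check logs.");
    }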
core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java (18 changes: 14 additions & 4 deletions)
@@ -2188,9 +2188,14 @@ public static String getFilePathExternalFilePath(String path, Configuration conf
     CarbonFile segment = FileFactory.getCarbonFile(path, configuration);
 
     CarbonFile[] dataFiles = segment.listFiles();
+    CarbonFile latestCarbonFile = null;
+    long latestDatafileTimestamp = 0L;
+    // get the latest carbondatafile to get the latest schema in the folder
     for (CarbonFile dataFile : dataFiles) {
-      if (dataFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
-        return dataFile.getAbsolutePath();
+      if (dataFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)
+          && dataFile.getLastModifiedTime() > latestDatafileTimestamp) {
+        latestCarbonFile = dataFile;
+        latestDatafileTimestamp = dataFile.getLastModifiedTime();
       } else if (dataFile.isDirectory()) {
         // if the list has directories that doesn't contain data files,
         // continue checking other files/directories in the list.
@@ -2201,8 +2206,13 @@ public static String getFilePathExternalFilePath(String path, Configuration conf
         }
       }
     }
-    //returning null only if the path doesn't have data files.
-    return null;
+
+    if (latestCarbonFile != null) {
+      return latestCarbonFile.getAbsolutePath();
+    } else {
+      //returning null only if the path doesn't have data files.
+      return null;
+    }
   }
 
   /**
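Note: getFilePathExternalFilePath previously returned the first .carbondata file it encountered; it now scans the whole listing and returns the most recently modified one, so schema inference picks up the latest file. Behavioral sketch (paths and timestamps are hypothetical; conf is an ordinary Hadoop Configuration):

    // /ext/t1/part-0-1.carbondata  lastModified = 1549400000000
    // /ext/t1/part-0-2.carbondata  lastModified = 1549500000000  <- newest
    String latest = CarbonUtil.getFilePathExternalFilePath("/ext/t1", conf);
    // latest == "/ext/t1/part-0-2.carbondata"; null only when no data files exist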
@@ -272,7 +272,7 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
         .executeQuery("select count(*) as RESULT from files ")
     }
     assert(exception.getMessage()
-      .contains("All the files doesn't have same schema"))
+      .contains("All common columns present in the files doesn't have same datatype"))
     cleanTestData()
   }
 
@@ -1225,12 +1225,8 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     assert(spark.sql("select * from sdkout").collect().length == 5)
     buildTestDataOtherDataType(5, null, warehouse1+"/sdk1", 2)
     spark.sql("refresh table sdkout")
-    intercept[Exception] {
-      spark.sql("select * from sdkout").show()
-    }
-    intercept[Exception] {
-      spark.sql("select * from sdkout where salary=100").show()
-    }
+    assert(spark.sql("select * from sdkout").count() == 10)
+    assert(spark.sql("select * from sdkout where salary=100").count() == 1)
     FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
   }
 
