[CARBONDATA-2472] Fixed: Refactor NonTransactional table code for Index file IO performance

Problem: Currently, for a non-transactional table, the schema of every index file is validated on every query. This causes IO operations on each query.

Root cause: Reading all index files on every query costs a lot of IO time.

Solution: Read and validate all index files on the first query; on subsequent queries, read and validate only the new index files.
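
The gist of the change, as a minimal standalone sketch (class and method names below are illustrative placeholders, not actual CarbonData APIs):

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal sketch: validate a segment's index-file schemas only on the first query that
// touches the segment, then serve the cached identifiers on later queries.
public class SegmentIndexCache {

  // segmentNo -> identifiers of index files that were already read and validated
  private final Map<String, Set<String>> segmentMap = new HashMap<>();

  public Set<String> getIndexFileIdentifiers(String segmentNo, boolean isTransactionalTable)
      throws IOException {
    Set<String> identifiers = segmentMap.get(segmentNo);
    if (identifiers == null) {
      if (!isTransactionalTable) {
        // The expensive IO happens only here, on the first query for this segment.
        validateIndexFileSchemas(segmentNo);
      }
      identifiers = readIndexFileIdentifiers(segmentNo);
      segmentMap.put(segmentNo, identifiers);
    }
    // Subsequent queries reuse the cached identifiers and skip schema re-validation.
    return identifiers;
  }

  private void validateIndexFileSchemas(String segmentNo) throws IOException {
    // Placeholder: infer each index file's schema and compare it with the table schema,
    // throwing IOException on a mismatch (see validateSchemaForNewTranscationalTableFiles below).
  }

  private Set<String> readIndexFileIdentifiers(String segmentNo) {
    // Placeholder: list the segment's index files and build their unique identifiers.
    return new HashSet<>();
  }
}

Keying the cache by segment number is the same design the patch uses via segmentMap in BlockletDataMapFactory below, so index-file IO and schema validation run at most once per segment instead of on every query.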

This closes #2299
ajantha-bhat authored and ravipesala committed May 18, 2018
1 parent b88c097 commit 2f79e14
Showing 4 changed files with 90 additions and 49 deletions.
@@ -49,12 +49,19 @@
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.BlockletDataMapUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.Event;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -66,6 +73,7 @@
public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
implements BlockletDetailsFetcher, SegmentPropertiesFetcher, CacheableDataMap {

private static final Log LOG = LogFactory.getLog(BlockletDataMapFactory.class);
private static final String NAME = "clustered.btree.blocklet";

public static final DataMapSchema DATA_MAP_SCHEMA =
@@ -115,13 +123,57 @@ private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
segmentMap.get(segment.getSegmentNo());
if (tableBlockIndexUniqueIdentifiers == null) {
CarbonTable carbonTable = this.getCarbonTable();
if (!carbonTable.getTableInfo().isTransactionalTable()) {
// For NonTransactional table, compare the schema of all index files with inferred schema.
// If there is a mismatch throw exception. As all files must be of same schema.
validateSchemaForNewTranscationalTableFiles(segment, carbonTable);
}
tableBlockIndexUniqueIdentifiers =
BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment);
segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
}
return tableBlockIndexUniqueIdentifiers;
}

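/**
 * For a non-transactional table, infer the schema from each committed index file of the
 * segment and compare it against the table's schema. Throws IOException on any mismatch,
 * since all index files of such a table must share the same schema.
 */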
private void validateSchemaForNewTranscationalTableFiles(Segment segment, CarbonTable carbonTable)
throws IOException {
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
Map<String, String> indexFiles = segment.getCommittedIndexFile();
for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
Path indexFile = new Path(indexFileEntry.getKey());
org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
indexFile.toString(), carbonTable.getTableName());
TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
tableInfo, identifier.getDatabaseName(),
identifier.getTableName(),
identifier.getTablePath());
List<ColumnSchema> indexFileColumnList =
wrapperTableInfo.getFactTable().getListOfColumns();
List<ColumnSchema> tableColumnList =
carbonTable.getTableInfo().getFactTable().getListOfColumns();
if (!isSameColumnSchemaList(indexFileColumnList, tableColumnList)) {
LOG.error("Schema of " + indexFile.getName()
+ " doesn't match with the table's schema");
throw new IOException("All the files doesn't have same schema. "
+ "Unsupported operation on nonTransactional table. Check logs.");
}
}
}

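/**
 * Returns true only when both column lists have the same size and every column pair matches
 * under ColumnSchema#equalsWithStrictCheck; mismatches are logged before returning false.
 */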
private boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumnList,
List<ColumnSchema> tableColumnList) {
if (indexFileColumnList.size() != tableColumnList.size()) {
LOG.error("Index file's column size is " + indexFileColumnList.size()
+ " but table's column size is " + tableColumnList.size());
return false;
}
for (int i = 0; i < tableColumnList.size(); i++) {
  if (!indexFileColumnList.get(i).equalsWithStrictCheck(tableColumnList.get(i))) {
    return false;
  }
}
return true;
}

/**
* Get the blocklet detail information based on blockletid, blockid and segmentid. This method is
* exclusively for BlockletDataMapFactory as detail information is only available in this
@@ -33,11 +33,15 @@
import org.apache.carbondata.core.metadata.schema.table.WritableUtil;
import org.apache.carbondata.core.preagg.TimeSeriesUDF;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* Store the information about the column meta data present the table
*/
public class ColumnSchema implements Serializable, Writable {

private static final Log LOG = LogFactory.getLog(ColumnSchema.class);
/**
* serialization version
*/
@@ -330,13 +334,17 @@ public void setParentColumnTableRelations(
return false;
}
} else if (!columnName.equals(other.columnName)) {
LOG.error("column name is " + columnName
+ " but other column name is " + other.columnName);
return false;
}
if (dataType == null) {
if (other.dataType != null) {
return false;
}
} else if (!dataType.equals(other.dataType)) {
LOG.error("column name is" + columnName + " data type is " + dataType
+ " but other column data type is " + other.dataType);
return false;
}
return true;
@@ -353,18 +361,40 @@ public boolean equalsWithStrictCheck(Object obj) {
return false;
}
ColumnSchema other = (ColumnSchema) obj;
if (!columnUniqueId.equals(other.columnUniqueId) ||
(isDimensionColumn != other.isDimensionColumn) ||
(scale != other.scale) ||
(precision != other.precision) ||
(isSortColumn != other.isSortColumn)) {
if (!columnUniqueId.equals(other.columnUniqueId)) {
LOG.error("Index file's column " + columnName + " columnUniqueId is " + columnUniqueId
+ " but table's column columnUniqueId is " + other.columnUniqueId);
return false;
}
if (isDimensionColumn != other.isDimensionColumn) {
LOG.error("Index file's column " + columnName + " isDimensionColumn is " + isDimensionColumn
+ " but table's column isDimensionColumn is " + other.isDimensionColumn);
return false;
}
if (scale != other.scale) {
LOG.error("Index file's column " + columnName + " scale is " + scale
+ " but table's column scale is " + other.scale);
return false;
}
if (precision != other.precision) {
LOG.error("Index file's column " + columnName + " precision is " + precision
+ " but table's column precision is " + other.precision);
return false;
}
if (isSortColumn != other.isSortColumn) {
LOG.error("Index file's column " + columnName + " isSortColumn is " + isSortColumn
+ " but table's column isSortColumn is " + other.isSortColumn);
return false;
}
if (encodingList.size() != other.encodingList.size()) {
LOG.error("Index file's column " + columnName + " encoding size is " + encodingList.size()
+ " but table's column encoding size is " + other.encodingList.size());
return false;
}
for (int i = 0; i < encodingList.size(); i++) {
if (encodingList.get(i).compareTo(other.encodingList.get(i)) != 0) {
LOG.error("Index file's column " + columnName + " encoding is " + encodingList.get(i)
+ " but table's column encoding is " + other.encodingList.get(i));
return false;
}
}
@@ -36,14 +36,11 @@
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.UpdateVO;
@@ -153,34 +150,6 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
.getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);

// For NonTransactional table, compare the schema of all index files with inferred schema.
// If there is a mismatch throw exception. As all files must be of same schema.
if (!carbonTable.getTableInfo().isTransactionalTable()) {
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
for (Segment segment : segments.getValidSegments()) {
Map<String, String> indexFiles = segment.getCommittedIndexFile();
for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
Path indexFile = new Path(indexFileEntry.getKey());
org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
indexFile.toString(), carbonTable.getTableName());
TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
tableInfo, identifier.getDatabaseName(),
identifier.getTableName(),
identifier.getTablePath());
List<ColumnSchema> indexFileColumnList =
wrapperTableInfo.getFactTable().getListOfColumns();
List<ColumnSchema> tableColumnList =
carbonTable.getTableInfo().getFactTable().getListOfColumns();
if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
LOG.error("Schema of " + indexFile.getName()
+ " doesn't match with the table's schema");
throw new IOException("All the files doesn't have same schema. "
+ "Unsupported operation on nonTransactional table. Check logs.");
}
}
}
}

// to check whether only streaming segments access is enabled or not,
// if access streaming segment is true then data will be read from streaming segments
boolean accessStreamingSegments = getAccessStreamingSegments(job.getConfiguration());
@@ -296,17 +265,6 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
return splits;
}

private boolean compareColumnSchemaList(List<ColumnSchema> indexFileColumnList,
List<ColumnSchema> tableColumnList) {
if (indexFileColumnList.size() != tableColumnList.size()) {
return false;
}
for (int i = 0; i < tableColumnList.size(); i++) {
return indexFileColumnList.get(i).equalsWithStrictCheck(tableColumnList.get(i));
}
return false;
}

/**
* Below method will be used to get the filter segments when query is fired on pre Aggregate
* and main table in case of streaming.
@@ -262,10 +262,11 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,

val tableInfo = if (external) {
// read table info from schema file in the provided table path
// external table also must convert table name to lower case
val identifier = AbsoluteTableIdentifier.from(
tablePath.get,
CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession),
tableIdentifier.table)
CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(),
tableIdentifier.table.toLowerCase())
val table = try {
val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {