From 081f224e53db433fe70af83f3561d26f62d8e977 Mon Sep 17 00:00:00 2001 From: kumarvishal09 Date: Tue, 19 Mar 2019 19:22:30 +0530 Subject: [PATCH 1/9] Fixed performance issue --- .../datamap/DistributableDataMapFormat.java | 1 + .../carbondata/core/datamap/TableDataMap.java | 58 +-- .../core/datastore/impl/FileFactory.java | 28 +- .../core/indexstore/ExtendedBlocklet.java | 97 +++-- .../indexstore/SegmentPropertiesFetcher.java | 3 + .../TableBlockIndexUniqueIdentifier.java | 5 +- .../core/indexstore/UnsafeMemoryDMStore.java | 161 +++++--- .../blockletindex/BlockDataMap.java | 103 +++-- .../blockletindex/BlockletDataMap.java | 25 +- .../blockletindex/BlockletDataMapFactory.java | 9 +- .../core/indexstore/row/DataMapRow.java | 9 - .../core/indexstore/row/UnsafeDataMapRow.java | 217 ++--------- .../indexstore/schema/CarbonRowSchema.java | 8 + .../indexstore/schema/SchemaGenerator.java | 70 ++++ .../core/scan/model/QueryModel.java | 30 -- .../carbondata/hadoop/CarbonInputSplit.java | 352 ++++++++++++------ .../hadoop/internal/ObjectArrayWritable.java | 0 .../hadoop/internal/index/Block.java | 0 .../hadoop/CarbonMultiBlockSplit.java | 17 +- .../carbondata/hadoop/CarbonRecordReader.java | 2 +- .../hadoop/api/CarbonFileInputFormat.java | 6 +- .../hadoop/api/CarbonInputFormat.java | 55 +-- .../hadoop/api/CarbonTableInputFormat.java | 44 +-- .../util/CarbonVectorizedRecordReader.java | 2 +- .../presto/impl/CarbonLocalInputSplit.java | 4 +- .../apache/carbondata/spark/util/Util.java | 3 +- .../spark/rdd/CarbonMergerRDD.scala | 4 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 11 +- .../datasources/SparkCarbonFileFormat.scala | 6 +- 29 files changed, 716 insertions(+), 614 deletions(-) rename {hadoop => core}/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java (57%) rename {hadoop => core}/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java (100%) rename {hadoop => core}/src/main/java/org/apache/carbondata/hadoop/internal/index/Block.java (100%) diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java index 007541dbc81..4c2300812e2 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java @@ -110,6 +110,7 @@ public RecordReader createRecordReader(InputSplit inputS distributable.getDistributable(), dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions); for (ExtendedBlocklet blocklet : blocklets) { + blocklet.getDetailInfo(); blocklet.setDataMapUniqueId(distributable.getUniqueId()); } blockletIterator = blocklets.iterator(); diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 0d46fd8af9a..d37f5525c0e 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -125,7 +125,7 @@ public List prune(List segments, Expression filterExp } blocklets.addAll(addSegmentId( blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment), - segment.toString())); + segment)); } return blocklets; } @@ -146,15 +146,11 @@ public List prune(List segments, final FilterResolver final List blocklets = new ArrayList<>(); final Map> dataMaps = dataMapFactory.getDataMaps(segments); 
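// The prune() changes below first count the index entries (carbondata files) across all
// datamaps of the selected segments and only then decide between single-threaded and
// multi-threaded pruning. A minimal, self-contained sketch of that dispatch is given here;
// the threshold value, the use of Callable tasks and the class name are assumptions made
// purely for illustration, not the TableDataMap API.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PruningDispatchSketch {
  // assumed threshold: below this many files, serial pruning on the driver is cheap enough
  private static final int MULTI_THREAD_FILE_THRESHOLD = 100_000;

  static List<String> prune(List<Callable<List<String>>> perSegmentTasks, int totalFiles)
      throws Exception {
    List<String> blocklets = new ArrayList<>();
    if (totalFiles < MULTI_THREAD_FILE_THRESHOLD) {
      // serial path: run every pruning task on the calling (driver) thread
      for (Callable<List<String>> task : perSegmentTasks) {
        blocklets.addAll(task.call());
      }
      return blocklets;
    }
    // multi-threaded path: fan the per-segment tasks out over a small fixed pool
    ExecutorService pool =
        Executors.newFixedThreadPool(Math.max(1, Math.min(4, perSegmentTasks.size())));
    try {
      List<Future<List<String>>> futures = new ArrayList<>();
      for (Callable<List<String>> task : perSegmentTasks) {
        futures.add(pool.submit(task));
      }
      for (Future<List<String>> future : futures) {
        blocklets.addAll(future.get());
      }
    } finally {
      pool.shutdown();
    }
    return blocklets;
  }
}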
// for non-filter queries - if (filterExp == null) { - // if filter is not passed, then return all the blocklets. - return pruneWithoutFilter(segments, partitions, blocklets); - } // for filter queries int totalFiles = 0; int datamapsCount = 0; for (Segment segment : segments) { - for (DataMap dataMap : dataMaps.get(segment)) { + for (DataMap dataMap: dataMaps.get(segment)) { totalFiles += dataMap.getNumberOfEntries(); datamapsCount++; } @@ -166,11 +162,16 @@ public List prune(List segments, final FilterResolver // As 0.1 million files block pruning can take only 1 second. // Doing multi-thread for smaller values is not recommended as // driver should have minimum threads opened to support multiple concurrent queries. + if (filterExp == null) { + // if filter is not passed, then return all the blocklets. + return pruneWithoutFilter(segments, partitions, blocklets); + } return pruneWithFilter(segments, filterExp, partitions, blocklets, dataMaps); } // handle by multi-thread - return pruneWithFilterMultiThread(segments, filterExp, partitions, blocklets, dataMaps, - totalFiles); + List extendedBlocklets = + pruneMultiThread(segments, filterExp, partitions, blocklets, dataMaps, totalFiles); + return extendedBlocklets; } private List pruneWithoutFilter(List segments, @@ -179,7 +180,7 @@ private List pruneWithoutFilter(List segments, List allBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions); blocklets.addAll( addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(allBlocklets, segment), - segment.toString())); + segment)); } return blocklets; } @@ -195,12 +196,12 @@ private List pruneWithFilter(List segments, } blocklets.addAll( addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment), - segment.toString())); + segment)); } return blocklets; } - private List pruneWithFilterMultiThread(List segments, + private List pruneMultiThread(List segments, final FilterResolverIntf filterExp, final List partitions, List blocklets, final Map> dataMaps, int totalFiles) { @@ -277,7 +278,8 @@ private List pruneWithFilterMultiThread(List segments throw new RuntimeException(" not all the files processed "); } List> results = new ArrayList<>(numOfThreadsForPruning); - final Map> prunedBlockletMap = new ConcurrentHashMap<>(segments.size()); + final Map> prunedBlockletMap = + new ConcurrentHashMap<>(segments.size()); final ExecutorService executorService = Executors.newFixedThreadPool(numOfThreadsForPruning); final String threadName = Thread.currentThread().getName(); for (int i = 0; i < numOfThreadsForPruning; i++) { @@ -286,16 +288,22 @@ private List pruneWithFilterMultiThread(List segments @Override public Void call() throws IOException { Thread.currentThread().setName(threadName); for (SegmentDataMapGroup segmentDataMapGroup : segmentDataMapGroups) { - List pruneBlocklets = new ArrayList<>(); + List pruneBlocklets = new ArrayList<>(); List dataMapList = dataMaps.get(segmentDataMapGroup.getSegment()); + SegmentProperties segmentProperties = + segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0)); + Segment segment = segmentDataMapGroup.getSegment(); for (int i = segmentDataMapGroup.getFromIndex(); i <= segmentDataMapGroup.getToIndex(); i++) { - pruneBlocklets.addAll(dataMapList.get(i).prune(filterExp, - segmentPropertiesFetcher.getSegmentProperties(segmentDataMapGroup.getSegment()), - partitions)); + List dmPruneBlocklets = dataMapList.get(i).prune(filterExp, + segmentProperties, + partitions); + 
pruneBlocklets.addAll(addSegmentId(blockletDetailsFetcher + .getExtendedBlocklets(dmPruneBlocklets, segment), + segment)); } synchronized (prunedBlockletMap) { - List pruneBlockletsExisting = + List pruneBlockletsExisting = prunedBlockletMap.get(segmentDataMapGroup.getSegment()); if (pruneBlockletsExisting != null) { pruneBlockletsExisting.addAll(pruneBlocklets); @@ -322,14 +330,8 @@ private List pruneWithFilterMultiThread(List segments throw new RuntimeException(e); } } - for (Map.Entry> entry : prunedBlockletMap.entrySet()) { - try { - blocklets.addAll(addSegmentId( - blockletDetailsFetcher.getExtendedBlocklets(entry.getValue(), entry.getKey()), - entry.getKey().toString())); - } catch (IOException e) { - throw new RuntimeException(e); - } + for (Map.Entry> entry : prunedBlockletMap.entrySet()) { + blocklets.addAll(entry.getValue()); } return blocklets; } @@ -351,9 +353,9 @@ private int getNumOfThreadsForPruning() { } private List addSegmentId(List pruneBlocklets, - String segmentId) { + Segment segment) { for (ExtendedBlocklet blocklet : pruneBlocklets) { - blocklet.setSegmentId(segmentId); + blocklet.setSegment(segment); } return pruneBlocklets; } @@ -423,7 +425,7 @@ public List prune(List dataMaps, DataMapDistributable detailedBlocklet.setDataMapWriterPath(blockletwritePath); serializer.serializeBlocklet((FineGrainBlocklet) blocklet, blockletwritePath); } - detailedBlocklet.setSegmentId(distributable.getSegment().toString()); + detailedBlocklet.setSegment(distributable.getSegment()); detailedBlocklets.add(detailedBlocklet); } return detailedBlocklets; diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index 7dbbe2ac53a..1182b497c6b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -81,16 +81,30 @@ public static FileReader getFileHolder(FileType fileType, Configuration configur } public static FileType getFileType(String path) { - String lowerPath = path.toLowerCase(); - if (lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) { + if (path.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) || path.toLowerCase() + .startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) { + return FileType.HDFS; + } else if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX) || path.toLowerCase() + .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) { + return FileType.ALLUXIO; + } else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || path.toLowerCase() + .startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) { + return FileType.VIEWFS; + } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX) || path + .startsWith(CarbonCommonConstants.S3A_PREFIX) || path + .startsWith(CarbonCommonConstants.S3_PREFIX)) { + return FileType.S3; + } + String lowerCase = path.toLowerCase(); + if (lowerCase.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) { return FileType.HDFS; - } else if (lowerPath.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) { + } else if (lowerCase.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) { return FileType.ALLUXIO; - } else if (lowerPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) { + } else if (lowerCase.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) { return FileType.VIEWFS; - } else if (lowerPath.startsWith(CarbonCommonConstants.S3N_PREFIX) || - lowerPath.startsWith(CarbonCommonConstants.S3A_PREFIX) 
|| - lowerPath.startsWith(CarbonCommonConstants.S3_PREFIX)) { + } else if (lowerCase.startsWith(CarbonCommonConstants.S3N_PREFIX) || lowerCase + .startsWith(CarbonCommonConstants.S3A_PREFIX) || lowerCase + .startsWith(CarbonCommonConstants.S3_PREFIX)) { return FileType.S3; } return FileType.LOCAL; diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java index 22dff8e0971..8c4ea060e7e 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java @@ -16,58 +16,67 @@ */ package org.apache.carbondata.core.indexstore; +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.indexstore.row.DataMapRow; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.hadoop.CarbonInputSplit; + /** * Detailed blocklet information */ public class ExtendedBlocklet extends Blocklet { - private String segmentId; - - private BlockletDetailInfo detailInfo; - - private long length; - - private String[] location; - - private String dataMapWriterPath; - private String dataMapUniqueId; - public ExtendedBlocklet(String filePath, String blockletId) { - super(filePath, blockletId); - } + private CarbonInputSplit inputSplit; public ExtendedBlocklet(String filePath, String blockletId, - boolean compareBlockletIdForObjectMatching) { + boolean compareBlockletIdForObjectMatching, ColumnarFormatVersion version) { super(filePath, blockletId, compareBlockletIdForObjectMatching); + try { + this.inputSplit = CarbonInputSplit.from(null, blockletId, filePath, 0, 0, version, null); + } catch (IOException e) { + throw new RuntimeException(e); + } } - public BlockletDetailInfo getDetailInfo() { - return detailInfo; + public ExtendedBlocklet(String filePath, String blockletId, ColumnarFormatVersion version) { + this(filePath, blockletId, true, version); } - public void setDetailInfo(BlockletDetailInfo detailInfo) { - this.detailInfo = detailInfo; + public BlockletDetailInfo getDetailInfo() { + return this.inputSplit.getDetailInfo(); } - public void setLocation(String[] location) { - this.location = location; + public void setDataMapRow(DataMapRow dataMapRow) { + this.inputSplit.setDataMapRow(dataMapRow); } public String[] getLocations() { - return location; + try { + return this.inputSplit.getLocations(); + } catch (IOException e) { + throw new RuntimeException(e); + } } public long getLength() { - return length; + return this.inputSplit.getLength(); } public String getSegmentId() { - return segmentId; + return this.inputSplit.getSegmentId(); } - public void setSegmentId(String segmentId) { - this.segmentId = segmentId; + public Segment getSegment() { + return this.inputSplit.getSegment(); + } + public void setSegment(Segment segment) { + this.inputSplit.setSegment(segment); } public String getPath() { @@ -75,11 +84,11 @@ public String getPath() { } public String getDataMapWriterPath() { - return dataMapWriterPath; + return this.inputSplit.getDataMapWritePath(); } public void setDataMapWriterPath(String dataMapWriterPath) { - this.dataMapWriterPath = dataMapWriterPath; + this.inputSplit.setDataMapWritePath(dataMapWriterPath); } public String getDataMapUniqueId() { @@ -98,13 +107,41 @@ public void 
setDataMapUniqueId(String dataMapUniqueId) { } ExtendedBlocklet that = (ExtendedBlocklet) o; - - return segmentId != null ? segmentId.equals(that.segmentId) : that.segmentId == null; + return inputSplit.getSegmentId() != null ? + inputSplit.getSegmentId().equals(that.inputSplit.getSegmentId()) : + that.inputSplit.getSegmentId() == null; } @Override public int hashCode() { int result = super.hashCode(); - result = 31 * result + (segmentId != null ? segmentId.hashCode() : 0); + result = 31 * result + (inputSplit.getSegmentId() != null ? + inputSplit.getSegmentId().hashCode() : + 0); return result; } + + public CarbonInputSplit getInputSplit() { + return inputSplit; + } + + public void setColumnCardinality(int[] cardinality) { + inputSplit.setColumnCardinality(cardinality); + } + + public void setLegacyStore(boolean isLegacyStore) { + inputSplit.setLegacyStore(isLegacyStore); + } + + public void setUseMinMaxForPruning(boolean useMinMaxForPruning) { + this.inputSplit.setUseMinMaxForPruning(useMinMaxForPruning); + } + + public void setIsBlockCache(boolean isBlockCache) { + this.inputSplit.setIsBlockCache(isBlockCache); + } + + public void setColumnSchema(List columnSchema) { + this.inputSplit.setColumnSchema(columnSchema); + } + } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java index b7fb98c155f..03f8a1ddbe9 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java @@ -20,6 +20,7 @@ import java.io.IOException; import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datastore.block.SegmentProperties; /** @@ -35,4 +36,6 @@ public interface SegmentPropertiesFetcher { */ SegmentProperties getSegmentProperties(Segment segment) throws IOException; + + SegmentProperties getSegmentPropertiesFromDataMap(DataMap coarseGrainDataMap) throws IOException; } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java index 3226cebccf7..9f6a76e4913 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java @@ -37,12 +37,15 @@ public class TableBlockIndexUniqueIdentifier implements Serializable { private String segmentId; + private String uniqueName; + public TableBlockIndexUniqueIdentifier(String indexFilePath, String indexFileName, String mergeIndexFileName, String segmentId) { this.indexFilePath = indexFilePath; this.indexFileName = indexFileName; this.mergeIndexFileName = mergeIndexFileName; this.segmentId = segmentId; + this.uniqueName = indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + indexFileName; } /** @@ -51,7 +54,7 @@ public TableBlockIndexUniqueIdentifier(String indexFilePath, String indexFileNam * @return */ public String getUniqueTableSegmentIdentifier() { - return indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + indexFileName; + return this.uniqueName; } public String getIndexFilePath() { diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java 
b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index 0db1b0a0ec0..8185c250ccd 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -16,6 +16,7 @@ */ package org.apache.carbondata.core.indexstore; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow; import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; @@ -52,7 +53,7 @@ public UnsafeMemoryDMStore() throws MemoryException { this.allocatedSize = capacity; this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, allocatedSize); - this.pointers = new int[1000]; + this.pointers = new int[100]; } /** @@ -66,7 +67,7 @@ private void ensureSize(int rowSize) throws MemoryException { increaseMemory(runningLength + rowSize); } if (this.pointers.length <= rowCount + 1) { - int[] newPointer = new int[pointers.length + 1000]; + int[] newPointer = new int[pointers.length + 100]; System.arraycopy(pointers, 0, newPointer, 0, pointers.length); this.pointers = newPointer; } @@ -84,9 +85,33 @@ private void increaseMemory(int requiredMemory) throws MemoryException { /** * Add the index row to unsafe. + * Below format is used to store data in the memory block + * WRITE: + * <FD><FD><FD><VO><VO><VO><LO><VD><VD><VD> + * FD: Fixed Column data + * VO: Variable column data offset + * VD: Variable column data + * LO: Last Offset + * + * Read: + * FD: Read directly based on byte position added in CarbonRowSchema + * + * VD: Read based on below logic + * if not last variable column schema + * X = read actual variable column offset based on byte position added in CarbonRowSchema + * Y = read next variable column offset (next 4 bytes) + * get the length + * len = (Y - X) + * read data from offset X of size len + * + * if last variable column + * X = read actual variable column offset based on byte position added in CarbonRowSchema + * Y = read last offset (next 4 bytes) + * get the length + * len = (Y - X) + * read data from offset X of size len * * @param indexRow - * @return */ public void addIndexRow(CarbonRowSchema[] schema, DataMapRow indexRow) throws MemoryException { // First calculate the required memory to keep the row in unsafe @@ -94,88 +119,122 @@ public void addIndexRow(CarbonRowSchema[] schema, DataMapRow indexRow) throws Me // Check whether allocated memory is sufficient or not.
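// A compact, standalone sketch of the row layout documented in the comment above, using a
// java.nio.ByteBuffer in place of the unsafe memory block purely for illustration: fixed
// fields sit at precomputed byte positions, each variable field has a 4-byte offset slot,
// a trailing "last offset" marks the end of the data, and a variable field's length is
// recovered as (next offset - its offset). All names and values below are assumed.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RowLayoutSketch {
  public static void main(String[] args) {
    byte[] fileName = "part-0-0_batchno0-0-1.carbondata".getBytes(StandardCharsets.UTF_8);
    int rowCount = 42;                     // one fixed-length (4 byte) column
    // layout: [FD:int][VO:int][LO:int][VD:bytes]
    int fixedSize = 4;                     // FD occupies bytes 0..3
    int varOffsetPos = fixedSize;          // VO at bytes 4..7  -> start offset of VD
    int lastOffsetPos = varOffsetPos + 4;  // LO at bytes 8..11 -> end offset of VD
    int varDataStart = lastOffsetPos + 4;  // VD begins right after the offset slots
    ByteBuffer row = ByteBuffer.allocate(varDataStart + fileName.length);
    row.putInt(0, rowCount);
    row.putInt(varOffsetPos, varDataStart);
    row.putInt(lastOffsetPos, varDataStart + fileName.length);
    row.position(varDataStart);
    row.put(fileName);

    // READ: X = offset of the variable column, Y = next (here: last) offset, len = Y - X
    int x = row.getInt(varOffsetPos);
    int y = row.getInt(lastOffsetPos);
    byte[] data = new byte[y - x];
    row.position(x);
    row.get(data);
    System.out.println(row.getInt(0) + " / " + new String(data, StandardCharsets.UTF_8));
  }
}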
ensureSize(rowSize); int pointer = runningLength; - + int bytePosition = 0; for (int i = 0; i < schema.length; i++) { - addToUnsafe(schema[i], indexRow, i); + switch (schema[i].getSchemaType()) { + case STRUCT: + CarbonRowSchema[] childSchemas = + ((CarbonRowSchema.StructCarbonRowSchema) schema[i]).getChildSchemas(); + for (int j = 0; j < childSchemas.length; j++) { + if (childSchemas[j].getBytePosition() > bytePosition) { + bytePosition = childSchemas[j].getBytePosition(); + } + } + break; + default: + if (schema[i].getBytePosition() > bytePosition) { + bytePosition = schema[i].getBytePosition(); + } + } } + // byte position of Last offset + bytePosition += CarbonCommonConstants.INT_SIZE_IN_BYTE; + // start byte position of variable length data + int varColPosition = bytePosition + CarbonCommonConstants.INT_SIZE_IN_BYTE; + // current position refers to current byte position in memory block + int currentPosition; + for (int i = 0; i < schema.length; i++) { + switch (schema[i].getSchemaType()) { + case STRUCT: + CarbonRowSchema[] childSchemas = + ((CarbonRowSchema.StructCarbonRowSchema) schema[i]).getChildSchemas(); + DataMapRow row = indexRow.getRow(i); + for (int j = 0; j < childSchemas.length; j++) { + currentPosition = addToUnsafe(childSchemas[j], row, j, pointer, varColPosition); + if (currentPosition > 0) { + varColPosition = currentPosition; + } + } + break; + default: + currentPosition = addToUnsafe(schema[i], indexRow, i, pointer, varColPosition); + if (currentPosition > 0) { + varColPosition = currentPosition; + } + break; + } + } + // writing the last offset + getUnsafe() + .putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + pointer + bytePosition, + varColPosition); + // after adding last offset increment the length by 4 bytes as last position + // is written as INT + runningLength += CarbonCommonConstants.INT_SIZE_IN_BYTE; pointers[rowCount++] = pointer; } - private void addToUnsafe(CarbonRowSchema schema, DataMapRow row, int index) { + private int addToUnsafe(CarbonRowSchema schema, DataMapRow row, int index, int startOffset, + int varPosition) { switch (schema.getSchemaType()) { case FIXED: DataType dataType = schema.getDataType(); if (dataType == DataTypes.BYTE) { - getUnsafe() - .putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, - row.getByte(index)); + getUnsafe().putByte(memoryBlock.getBaseObject(), + memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), + row.getByte(index)); runningLength += row.getSizeInBytes(index); } else if (dataType == DataTypes.BOOLEAN) { - getUnsafe() - .putBoolean(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, - row.getBoolean(index)); + getUnsafe().putBoolean(memoryBlock.getBaseObject(), + memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), + row.getBoolean(index)); runningLength += row.getSizeInBytes(index); } else if (dataType == DataTypes.SHORT) { - getUnsafe() - .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, - row.getShort(index)); + getUnsafe().putShort(memoryBlock.getBaseObject(), + memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), + row.getShort(index)); runningLength += row.getSizeInBytes(index); } else if (dataType == DataTypes.INT) { - getUnsafe() - .putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, - row.getInt(index)); + getUnsafe().putInt(memoryBlock.getBaseObject(), + memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), +
row.getInt(index)); runningLength += row.getSizeInBytes(index); } else if (dataType == DataTypes.LONG) { - getUnsafe() - .putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, - row.getLong(index)); + getUnsafe().putLong(memoryBlock.getBaseObject(), + memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), + row.getLong(index)); runningLength += row.getSizeInBytes(index); } else if (dataType == DataTypes.FLOAT) { - getUnsafe() - .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, - row.getFloat(index)); + getUnsafe().putFloat(memoryBlock.getBaseObject(), + memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), + row.getFloat(index)); runningLength += row.getSizeInBytes(index); } else if (dataType == DataTypes.DOUBLE) { - getUnsafe() - .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength, - row.getDouble(index)); + getUnsafe().putDouble(memoryBlock.getBaseObject(), + memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), + row.getDouble(index)); runningLength += row.getSizeInBytes(index); } else if (dataType == DataTypes.BYTE_ARRAY) { byte[] data = row.getByteArray(index); getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), - memoryBlock.getBaseOffset() + runningLength, data.length); + memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), data.length); runningLength += row.getSizeInBytes(index); } else { throw new UnsupportedOperationException( "unsupported data type for unsafe storage: " + schema.getDataType()); } - break; + return 0; case VARIABLE_SHORT: - byte[] data = row.getByteArray(index); - getUnsafe().putShort(memoryBlock.getBaseObject(), - memoryBlock.getBaseOffset() + runningLength, (short) data.length); - runningLength += 2; - getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), - memoryBlock.getBaseOffset() + runningLength, data.length); - runningLength += data.length; - break; case VARIABLE_INT: - byte[] data2 = row.getByteArray(index); + byte[] data = row.getByteArray(index); getUnsafe().putInt(memoryBlock.getBaseObject(), - memoryBlock.getBaseOffset() + runningLength, data2.length); + memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), varPosition); runningLength += 4; - getUnsafe().copyMemory(data2, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), - memoryBlock.getBaseOffset() + runningLength, data2.length); - runningLength += data2.length; - break; - case STRUCT: - CarbonRowSchema[] childSchemas = - ((CarbonRowSchema.StructCarbonRowSchema) schema).getChildSchemas(); - DataMapRow struct = row.getRow(index); - for (int i = 0; i < childSchemas.length; i++) { - addToUnsafe(childSchemas[i], struct, i); - } - break; + getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), + memoryBlock.getBaseOffset() + startOffset + varPosition, data.length); + runningLength += data.length; + varPosition += data.length; + return varPosition; default: throw new UnsupportedOperationException( "unsupported data type for unsafe storage: " + schema.getDataType()); diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java index a7818c210f7..279120433b3 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java +++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java @@ -36,7 +36,6 @@ import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore; import org.apache.carbondata.core.indexstore.BlockMetaInfo; import org.apache.carbondata.core.indexstore.Blocklet; -import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.indexstore.SafeMemoryDMStore; @@ -45,6 +44,7 @@ import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex; import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; @@ -128,7 +128,7 @@ public class BlockDataMap extends CoarseGrainDataMap filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); isFilePathStored = true; } - byte[] fileName = path.getName().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); + byte[] fileName = path.getName().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); byte[] segmentId = blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); if (!indexInfo.isEmpty()) { @@ -466,6 +466,13 @@ protected void addMinMaxFlagValues(DataMapRow row, CarbonRowSchema carbonRowSche row.setRow(minMaxFlagRow, ordinal); } + protected String getFileNameWithFilePath(DataMapRow dataMapRow, String filePath) { + String fileName = filePath + CarbonCommonConstants.FILE_SEPARATOR + new String( + dataMapRow.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + + CarbonTablePath.getCarbonDataExtension(); + return FileFactory.getUpdatedFilePath(fileName); + } + protected String getFileNameFromPath(String filePath) { return CarbonTablePath.getCarbonDataFileName(filePath); } @@ -481,13 +488,6 @@ protected String getFilePath() { return CarbonTablePath.getSegmentPath(tablePath, segmentId); } - protected String getFileNameWithFilePath(DataMapRow dataMapRow, String filePath) { - String fileName = filePath + CarbonCommonConstants.FILE_SEPARATOR + new String( - dataMapRow.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS) - + CarbonTablePath.getCarbonDataExtension(); - return FileFactory.getUpdatedFilePath(fileName); - } - private void addTaskSummaryRowToUnsafeMemoryStore(CarbonRowSchema[] taskSummarySchema, DataMapRow summaryRow, byte[] filePath, byte[] fileName, byte[] segmentId) { // write the task summary info to unsafe memory store @@ -648,6 +648,7 @@ protected int getTotalBlocklets() { return sum; } + private List prune(FilterResolverIntf filterExp) { if (memoryDMStore.getRowCount() == 0) { return new ArrayList<>(); @@ -656,13 +657,16 @@ private List prune(FilterResolverIntf filterExp) { CarbonRowSchema[] schema = getFileFooterEntrySchema(); String filePath = getFilePath(); int numEntries = memoryDMStore.getRowCount(); - int totalBlocklets = getTotalBlocklets(); + int totalBlocklets = 0; + if (ExplainCollector.enabled()) { + totalBlocklets = getTotalBlocklets(); + } int hitBlocklets = 0; if (filterExp == null) { for (int i = 0; i < numEntries; i++) { - DataMapRow safeRow = memoryDMStore.getDataMapRow(schema, 
i).convertToSafeRow(); - blocklets.add(createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath), - getBlockletId(safeRow), false)); + DataMapRow dataMapRow = memoryDMStore.getDataMapRow(schema, i); + blocklets.add(createBlocklet(dataMapRow, getFileNameWithFilePath(dataMapRow, filePath), + getBlockletId(dataMapRow), false)); } hitBlocklets = totalBlocklets; } else { @@ -675,28 +679,31 @@ private List prune(FilterResolverIntf filterExp) { boolean useMinMaxForPruning = useMinMaxForExecutorPruning(filterExp); // min and max for executor pruning while (entryIndex < numEntries) { - DataMapRow safeRow = memoryDMStore.getDataMapRow(schema, entryIndex).convertToSafeRow(); - boolean[] minMaxFlag = getMinMaxFlag(safeRow, BLOCK_MIN_MAX_FLAG); - String fileName = getFileNameWithFilePath(safeRow, filePath); - short blockletId = getBlockletId(safeRow); + DataMapRow row = memoryDMStore.getDataMapRow(schema, entryIndex); + boolean[] minMaxFlag = getMinMaxFlag(row, BLOCK_MIN_MAX_FLAG); + String fileName = getFileNameWithFilePath(row, filePath); + short blockletId = getBlockletId(row); boolean isValid = - addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(safeRow, MAX_VALUES_INDEX), - getMinMaxValue(safeRow, MIN_VALUES_INDEX), minMaxFlag, fileName, blockletId); + addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(row, MAX_VALUES_INDEX), + getMinMaxValue(row, MIN_VALUES_INDEX), minMaxFlag, fileName, blockletId); if (isValid) { - blocklets.add(createBlocklet(safeRow, fileName, blockletId, useMinMaxForPruning)); - hitBlocklets += getBlockletNumOfEntry(entryIndex); + blocklets.add(createBlocklet(row, fileName, blockletId, useMinMaxForPruning)); + if (ExplainCollector.enabled()) { + hitBlocklets += getBlockletNumOfEntry(entryIndex); + } } entryIndex++; } } - - if (isLegacyStore) { - ExplainCollector.setShowPruningInfo(false); - } else { - ExplainCollector.setShowPruningInfo(true); - ExplainCollector.addTotalBlocklets(totalBlocklets); - ExplainCollector.addTotalBlocks(getTotalBlocks()); - ExplainCollector.addDefaultDataMapPruningHit(hitBlocklets); + if (ExplainCollector.enabled()) { + if (isLegacyStore) { + ExplainCollector.setShowPruningInfo(false); + } else { + ExplainCollector.setShowPruningInfo(true); + ExplainCollector.addTotalBlocklets(totalBlocklets); + ExplainCollector.addTotalBlocks(getTotalBlocks()); + ExplainCollector.addDefaultDataMapPruningHit(hitBlocklets); + } } return blocklets; } @@ -852,10 +859,10 @@ private ExtendedBlocklet createBlockletFromRelativeBlockletId(int absoluteBlockl rowIndex++; } } - DataMapRow safeRow = - memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), rowIndex).convertToSafeRow(); + DataMapRow row = + memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), rowIndex); String filePath = getFilePath(); - return createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath), relativeBlockletId, + return createBlocklet(row, getFileNameWithFilePath(row, filePath), relativeBlockletId, false); } @@ -906,34 +913,16 @@ protected short getBlockletId(DataMapRow dataMapRow) { protected ExtendedBlocklet createBlocklet(DataMapRow row, String fileName, short blockletId, boolean useMinMaxForPruning) { - ExtendedBlocklet blocklet = new ExtendedBlocklet(fileName, blockletId + "", false); - BlockletDetailInfo detailInfo = getBlockletDetailInfo(row, blockletId, blocklet); - detailInfo.setBlockletInfoBinary(new byte[0]); - blocklet.setDetailInfo(detailInfo); + short versionNumber = row.getShort(VERSION_INDEX); + ExtendedBlocklet blocklet = new 
ExtendedBlocklet(fileName, blockletId + "", false, + ColumnarFormatVersion.valueOf(versionNumber)); + blocklet.setDataMapRow(row); + blocklet.setColumnCardinality(getColumnCardinality()); + blocklet.setLegacyStore(isLegacyStore); + blocklet.setUseMinMaxForPruning(useMinMaxForPruning); return blocklet; } - protected BlockletDetailInfo getBlockletDetailInfo(DataMapRow row, short blockletId, - ExtendedBlocklet blocklet) { - BlockletDetailInfo detailInfo = new BlockletDetailInfo(); - detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX)); - detailInfo.setVersionNumber(row.getShort(VERSION_INDEX)); - detailInfo.setBlockletId(blockletId); - detailInfo.setDimLens(getColumnCardinality()); - detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX)); - try { - blocklet.setLocation( - new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET) - .split(",")); - } catch (IOException e) { - throw new RuntimeException(e); - } - detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET)); - detailInfo.setBlockSize(row.getLong(BLOCK_LENGTH)); - detailInfo.setLegacyStore(isLegacyStore); - return detailInfo; - } - private String[] getFileDetails() { try { String[] fileDetails = new String[3]; diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 191056dd6df..571e7208faf 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -30,12 +30,12 @@ import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.indexstore.BlockMetaInfo; -import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; @@ -231,11 +231,10 @@ public ExtendedBlocklet getDetailedBlocklet(String blockletId) { return super.getDetailedBlocklet(blockletId); } int absoluteBlockletId = Integer.parseInt(blockletId); - DataMapRow safeRow = memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), absoluteBlockletId) - .convertToSafeRow(); - short relativeBlockletId = safeRow.getShort(BLOCKLET_ID_INDEX); + DataMapRow row = memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), absoluteBlockletId); + short relativeBlockletId = row.getShort(BLOCKLET_ID_INDEX); String filePath = getFilePath(); - return createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath), relativeBlockletId, + return createBlocklet(row, getFileNameWithFilePath(row, filePath), relativeBlockletId, false); } @@ -261,13 +260,15 @@ protected ExtendedBlocklet createBlocklet(DataMapRow row, String fileName, short if (isLegacyStore) { return super.createBlocklet(row, fileName, blockletId, useMinMaxForPruning); } - ExtendedBlocklet 
blocklet = new ExtendedBlocklet(fileName, blockletId + ""); - BlockletDetailInfo detailInfo = getBlockletDetailInfo(row, blockletId, blocklet); - detailInfo.setColumnSchemas(getColumnSchema()); - detailInfo.setBlockletInfoBinary(row.getByteArray(BLOCKLET_INFO_INDEX)); - detailInfo.setPagesCount(row.getShort(BLOCKLET_PAGE_COUNT_INDEX)); - detailInfo.setUseMinMaxForPruning(useMinMaxForPruning); - blocklet.setDetailInfo(detailInfo); + short versionNumber = row.getShort(VERSION_INDEX); + ExtendedBlocklet blocklet = new ExtendedBlocklet(fileName, blockletId + "", + ColumnarFormatVersion.valueOf(versionNumber)); + blocklet.setColumnSchema(getColumnSchema()); + blocklet.setUseMinMaxForPruning(useMinMaxForPruning); + blocklet.setIsBlockCache(false); + blocklet.setColumnCardinality(getColumnCardinality()); + blocklet.setLegacyStore(isLegacyStore); + blocklet.setDataMapRow(row); return blocklet; } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 5892f78a6bd..2ef7b8809bb 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -192,7 +192,7 @@ private Set getTableBlockIndexUniqueIdentifiers @Override public List getExtendedBlocklets(List blocklets, Segment segment) throws IOException { - List detailedBlocklets = new ArrayList<>(); + List detailedBlocklets = new ArrayList<>(blocklets.size() + 1); // If it is already detailed blocklet then type cast and return same if (blocklets.size() > 0 && blocklets.get(0) instanceof ExtendedBlocklet) { for (Blocklet blocklet : blocklets) { @@ -379,6 +379,13 @@ public void deleteDatamapData(Segment segment) throws IOException { return dataMap.getSegmentProperties(); } + @Override public SegmentProperties getSegmentPropertiesFromDataMap(DataMap coarseGrainDataMap) + throws IOException { + assert (coarseGrainDataMap instanceof BlockDataMap); + BlockDataMap dataMap = (BlockDataMap) coarseGrainDataMap; + return dataMap.getSegmentProperties(); + } + @Override public List getAllBlocklets(Segment segment, List partitions) throws IOException { List blocklets = new ArrayList<>(); diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java index c0ea0a0d24a..c79735fbcb0 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java @@ -105,15 +105,6 @@ public int getColumnCount() { return schemas.length; } - /** - * default implementation - * - * @return - */ - public DataMapRow convertToSafeRow() { - return this; - } - public void setSchemas(CarbonRowSchema[] schemas) { if (null == this.schemas) { this.schemas = schemas; diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java index 70f0e0db9d3..e77d11ceb59 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java @@ -19,8 +19,6 @@ import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; import 
org.apache.carbondata.core.memory.MemoryBlock; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET; import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe; @@ -47,38 +45,39 @@ public UnsafeDataMapRow(CarbonRowSchema[] schemas, MemoryBlock block, int pointe @Override public byte[] getByteArray(int ordinal) { int length; - int position = getPosition(ordinal); + int currentOffset = 0; switch (schemas[ordinal].getSchemaType()) { case VARIABLE_SHORT: - length = getUnsafe().getShort(block.getBaseObject(), - block.getBaseOffset() + pointer + position); - position += 2; - break; case VARIABLE_INT: - length = getUnsafe().getInt(block.getBaseObject(), - block.getBaseOffset() + pointer + position); - position += 4; + final int schemaOrdinal = schemas[ordinal].getBytePosition(); + currentOffset = getUnsafe().getInt(block.getBaseObject(), + block.getBaseOffset() + pointer + schemaOrdinal); + int nextOffset = getUnsafe().getInt(block.getBaseObject(), + block.getBaseOffset() + pointer + schemaOrdinal + 4); + length = nextOffset - currentOffset; break; default: + currentOffset = schemas[ordinal].getBytePosition(); length = schemas[ordinal].getLength(); } byte[] data = new byte[length]; - getUnsafe().copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + position, data, - BYTE_ARRAY_OFFSET, data.length); + getUnsafe() + .copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + currentOffset, data, + BYTE_ARRAY_OFFSET, data.length); return data; } @Override public int getLengthInBytes(int ordinal) { int length; - int position = getPosition(ordinal); + int schemaOrdinal = schemas[ordinal].getBytePosition(); switch (schemas[ordinal].getSchemaType()) { case VARIABLE_SHORT: - length = getUnsafe().getShort(block.getBaseObject(), - block.getBaseOffset() + pointer + position); - break; case VARIABLE_INT: - length = getUnsafe().getInt(block.getBaseObject(), - block.getBaseOffset() + pointer + position); + int currentOffset = getUnsafe().getInt(block.getBaseObject(), + block.getBaseOffset() + pointer + schemaOrdinal); + int nextOffset = getUnsafe().getInt(block.getBaseObject(), + block.getBaseOffset() + pointer + schemaOrdinal + 4); + length = nextOffset - currentOffset; break; default: length = schemas[ordinal].getLength(); @@ -91,31 +90,14 @@ public UnsafeDataMapRow(CarbonRowSchema[] schemas, MemoryBlock block, int pointe } @Override public boolean getBoolean(int ordinal) { - return getUnsafe() - .getBoolean(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); - } - - private int getLengthInBytes(int ordinal, int position) { - int length; - switch (schemas[ordinal].getSchemaType()) { - case VARIABLE_SHORT: - length = getUnsafe().getShort(block.getBaseObject(), - block.getBaseOffset() + pointer + position); - break; - case VARIABLE_INT: - length = getUnsafe().getInt(block.getBaseObject(), - block.getBaseOffset() + pointer + position); - break; - default: - length = schemas[ordinal].getLength(); - } - return length; + return getUnsafe().getBoolean(block.getBaseObject(), + block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition()); } @Override public DataMapRow getRow(int ordinal) { CarbonRowSchema[] childSchemas = ((CarbonRowSchema.StructCarbonRowSchema) schemas[ordinal]).getChildSchemas(); - return new UnsafeDataMapRow(childSchemas, block, pointer + getPosition(ordinal)); + return new 
UnsafeDataMapRow(childSchemas, block, pointer); } @Override public void setByteArray(byte[] byteArray, int ordinal) { @@ -123,8 +105,8 @@ private int getLengthInBytes(int ordinal, int position) { } @Override public int getInt(int ordinal) { - return getUnsafe() - .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + return getUnsafe().getInt(block.getBaseObject(), + block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition()); } @Override public void setInt(int value, int ordinal) { @@ -136,8 +118,8 @@ private int getLengthInBytes(int ordinal, int position) { } @Override public byte getByte(int ordinal) { - return getUnsafe() - .getByte(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + return getUnsafe().getByte(block.getBaseObject(), + block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition()); } @Override public void setShort(short value, int ordinal) { @@ -145,8 +127,8 @@ private int getLengthInBytes(int ordinal, int position) { } @Override public short getShort(int ordinal) { - return getUnsafe() - .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + return getUnsafe().getShort(block.getBaseObject(), + block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition()); } @Override public void setLong(long value, int ordinal) { @@ -154,8 +136,8 @@ private int getLengthInBytes(int ordinal, int position) { } @Override public long getLong(int ordinal) { - return getUnsafe() - .getLong(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + return getUnsafe().getLong(block.getBaseObject(), + block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition()); } @Override public void setFloat(float value, int ordinal) { @@ -163,8 +145,8 @@ private int getLengthInBytes(int ordinal, int position) { } @Override public float getFloat(int ordinal) { - return getUnsafe() - .getFloat(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + return getUnsafe().getFloat(block.getBaseObject(), + block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition()); } @Override public void setDouble(double value, int ordinal) { @@ -172,146 +154,11 @@ private int getLengthInBytes(int ordinal, int position) { } @Override public double getDouble(int ordinal) { - return getUnsafe() - .getDouble(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal)); + return getUnsafe().getDouble(block.getBaseObject(), + block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition()); } @Override public void setRow(DataMapRow row, int ordinal) { throw new UnsupportedOperationException("Not supported to set on unsafe row"); } - - /** - * Convert unsafe to safe row. 
- * - * @return - */ - public DataMapRow convertToSafeRow() { - DataMapRowImpl row = new DataMapRowImpl(schemas); - int runningLength = 0; - for (int i = 0; i < schemas.length; i++) { - CarbonRowSchema schema = schemas[i]; - switch (schema.getSchemaType()) { - case FIXED: - DataType dataType = schema.getDataType(); - if (dataType == DataTypes.BYTE) { - row.setByte( - getUnsafe().getByte( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); - runningLength += schema.getLength(); - } else if (dataType == DataTypes.BOOLEAN) { - row.setBoolean( - getUnsafe().getBoolean( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); - runningLength += schema.getLength(); - } else if (dataType == DataTypes.SHORT) { - row.setShort( - getUnsafe().getShort( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); - runningLength += schema.getLength(); - } else if (dataType == DataTypes.INT) { - row.setInt( - getUnsafe().getInt( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); - runningLength += schema.getLength(); - } else if (dataType == DataTypes.LONG) { - row.setLong( - getUnsafe().getLong( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); - runningLength += schema.getLength(); - } else if (dataType == DataTypes.FLOAT) { - row.setFloat( - getUnsafe().getFloat(block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); - runningLength += schema.getLength(); - } else if (dataType == DataTypes.DOUBLE) { - row.setDouble( - getUnsafe().getDouble(block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); - runningLength += schema.getLength(); - } else if (dataType == DataTypes.BYTE_ARRAY) { - byte[] data = new byte[schema.getLength()]; - getUnsafe().copyMemory( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength, - data, - BYTE_ARRAY_OFFSET, - data.length); - row.setByteArray(data, i); - runningLength += data.length; - } else { - throw new UnsupportedOperationException( - "unsupported data type for unsafe storage: " + schema.getDataType()); - } - break; - case VARIABLE_SHORT: - int length = getUnsafe() - .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength); - runningLength += 2; - byte[] data = new byte[length]; - getUnsafe().copyMemory(block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength, - data, BYTE_ARRAY_OFFSET, data.length); - runningLength += data.length; - row.setByteArray(data, i); - break; - case VARIABLE_INT: - int length2 = getUnsafe() - .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength); - runningLength += 4; - byte[] data2 = new byte[length2]; - getUnsafe().copyMemory(block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength, - data2, BYTE_ARRAY_OFFSET, data2.length); - runningLength += data2.length; - row.setByteArray(data2, i); - break; - case STRUCT: - DataMapRow structRow = ((UnsafeDataMapRow) getRow(i)).convertToSafeRow(); - row.setRow(structRow, i); - runningLength += structRow.getTotalSizeInBytes(); - break; - default: - throw new UnsupportedOperationException( - "unsupported data type for unsafe storage: " + schema.getDataType()); - } - } - row.setTotalLengthInBytes(runningLength); - - return row; - } - - private int getSizeInBytes(int ordinal, int position) { - switch (schemas[ordinal].getSchemaType()) { - case FIXED: - return schemas[ordinal].getLength(); - case VARIABLE_SHORT: - 
return getLengthInBytes(ordinal, position) + 2; - case VARIABLE_INT: - return getLengthInBytes(ordinal, position) + 4; - case STRUCT: - return getRow(ordinal).getTotalSizeInBytes(); - default: - throw new UnsupportedOperationException("wrong type"); - } - } - - private int getPosition(int ordinal) { - int position = 0; - for (int i = 0; i < ordinal; i++) { - position += getSizeInBytes(i, position); - } - return position; - } } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java index 7f47c000f54..30a7a9cff0a 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java @@ -28,6 +28,7 @@ public abstract class CarbonRowSchema implements Serializable { private static final long serialVersionUID = -8061282029097686495L; protected DataType dataType; + private int bytePosition = -1; public CarbonRowSchema(DataType dataType) { this.dataType = dataType; @@ -55,6 +56,13 @@ public int getLength() { return dataType.getSizeInBytes(); } + public void setBytePosition(int bytePosition) { + this.bytePosition = bytePosition; + } + + public int getBytePosition() { + return this.bytePosition; + } /** * schema type * @return diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java index 7a2e13a5f91..c88e1bebc59 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -60,9 +61,76 @@ public static CarbonRowSchema[] createBlockSchema(SegmentProperties segmentPrope // written in the metadata or not. 
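// A small worked example of the byte positions that the updateBytePosition logic added
// below is expected to assign: fixed-length columns are laid out first, then every
// variable-length column gets a 4-byte slot holding the offset of its data. The example
// schema (an INT, a SHORT and two variable byte arrays) and all names are assumptions
// chosen only to make the arithmetic concrete.
final class BytePositionSketch {
  public static void main(String[] args) {
    int[] fixedLengths = {4, 2};      // e.g. an INT column and a SHORT column
    int variableColumns = 2;          // e.g. min key and max key byte arrays
    int pos = 0;
    // pass 1: fixed-length columns sit back to back at the start of the row
    for (int length : fixedLengths) {
      System.out.println("fixed column at byte position " + pos);
      pos += length;                  // -> positions 0 and 4
    }
    // pass 2: each variable column gets a 4-byte slot that stores its data offset
    for (int i = 0; i < variableColumns; i++) {
      System.out.println("offset slot of variable column " + i + " at byte position " + pos);
      pos += 4;                       // -> positions 6 and 10
    }
    // one more 4-byte "last offset" follows (position 14), so the last variable column's
    // length can be computed at read time as (lastOffset - itsOffset)
    System.out.println("last offset at byte position " + pos);
  }
}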
addMinMaxFlagSchema(segmentProperties, indexSchemas, minMaxCacheColumns); CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]); + updateBytePosition(schema); return schema; } + /** + * Method to update the byte position which will be used in case of unsafe dm store + * @see org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java:87 + * + * @param schema + */ + private static void updateBytePosition(CarbonRowSchema[] schema) { + int currentSize; + int bytePosition = 0; + // First assign byte position to all the fixed length schema + for (int i = 0; i < schema.length; i++) { + switch (schema[i].getSchemaType()) { + case STRUCT: + CarbonRowSchema[] childSchemas = + ((CarbonRowSchema.StructCarbonRowSchema) schema[i]).getChildSchemas(); + for (int j = 0; j < childSchemas.length; j++) { + currentSize = getSchemaSize(childSchemas[j]); + if (currentSize != -1) { + childSchemas[j].setBytePosition(bytePosition); + bytePosition += currentSize; + } + } + break; + default: + currentSize = getSchemaSize(schema[i]); + if (currentSize != -1) { + schema[i].setBytePosition(bytePosition); + bytePosition += currentSize; + } + break; + } + } + // adding byte position for storing offset in case of variable length columns + for (int i = 0; i < schema.length; i++) { + switch (schema[i].getSchemaType()) { + case STRUCT: + CarbonRowSchema[] childSchemas = + ((CarbonRowSchema.StructCarbonRowSchema) schema[i]).getChildSchemas(); + for (int j = 0; j < childSchemas.length; j++) { + if (childSchemas[j].getBytePosition() == -1) { + childSchemas[j].setBytePosition(bytePosition); + bytePosition += CarbonCommonConstants.INT_SIZE_IN_BYTE; + } + } + break; + default: + if (schema[i].getBytePosition() == -1) { + schema[i].setBytePosition(bytePosition); + bytePosition += CarbonCommonConstants.INT_SIZE_IN_BYTE; + } + break; + } + } + } + private static int getSchemaSize(CarbonRowSchema schema) { + switch (schema.getSchemaType()) { + case FIXED: + return schema.getLength(); + case VARIABLE_SHORT: + case VARIABLE_INT: + return -1; + default: + throw new UnsupportedOperationException("Invalid Type"); + } + } + /** * Method for creating blocklet Schema. Each blocklet row will share the same schema * @@ -98,6 +166,7 @@ public static CarbonRowSchema[] createBlockletSchema(SegmentProperties segmentPr // for relative blocklet id i.e.
blocklet id that belongs to a particular part file indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]); + updateBytePosition(schema); return schema; } @@ -138,6 +207,7 @@ public static CarbonRowSchema[] createTaskSummarySchema(SegmentProperties segmen } CarbonRowSchema[] schema = taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]); + updateBytePosition(schema); return schema; } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java index d6017f57b02..267527f8a23 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java @@ -18,19 +18,16 @@ package org.apache.carbondata.core.scan.model; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; -import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.expression.UnknownExpression; @@ -92,15 +89,6 @@ public class QueryModel { private DataTypeConverter converter; - /** - * Invalid table blocks, which need to be removed from - * memory, invalid blocks can be segment which are deleted - * or compacted - */ - private List invalidSegmentIds; - private Map invalidSegmentBlockIdMap = - new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - private boolean[] isFilterDimensions; private boolean[] isFilterMeasures; @@ -135,7 +123,6 @@ public class QueryModel { private QueryModel(CarbonTable carbonTable) { tableBlockInfos = new ArrayList(); - invalidSegmentIds = new ArrayList<>(); this.table = carbonTable; this.queryId = String.valueOf(System.nanoTime()); } @@ -350,14 +337,6 @@ public void setStatisticsRecorder(QueryStatisticsRecorder statisticsRecorder) { this.statisticsRecorder = statisticsRecorder; } - public List getInvalidSegmentIds() { - return invalidSegmentIds; - } - - public void setInvalidSegmentIds(List invalidSegmentIds) { - this.invalidSegmentIds = invalidSegmentIds; - } - public boolean isVectorReader() { return vectorReader; } @@ -365,15 +344,6 @@ public boolean isVectorReader() { public void setVectorReader(boolean vectorReader) { this.vectorReader = vectorReader; } - public void setInvalidBlockForSegmentId(List invalidSegmentTimestampList) { - for (UpdateVO anUpdateVO : invalidSegmentTimestampList) { - this.invalidSegmentBlockIdMap.put(anUpdateVO.getSegmentId(), anUpdateVO); - } - } - - public Map getInvalidBlockVOForSegmentId() { - return invalidSegmentBlockIdMap; - } public DataTypeConverter getConverter() { return converter; diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java 
b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java similarity index 57% rename from hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java rename to core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index bcf703cbd4e..bb1742cad99 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -33,11 +33,13 @@ import org.apache.carbondata.core.datastore.block.BlockletInfos; import org.apache.carbondata.core.datastore.block.Distributable; import org.apache.carbondata.core.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailInfo; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapRowIndexes; +import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; -import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.statusmanager.FileFormat; +import org.apache.carbondata.core.util.BlockletDataMapUtil; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -45,6 +47,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.lib.input.FileSplit; /** @@ -61,10 +64,6 @@ public class CarbonInputSplit extends FileSplit private String bucketId; private String blockletId; - /* - * Invalid segments that need to be removed in task side index - */ - private List invalidSegments; /* * Number of BlockLets in a block @@ -73,14 +72,6 @@ public class CarbonInputSplit extends FileSplit private ColumnarFormatVersion version; - /** - * map of blocklocation and storage id - */ - private Map blockStorageIdMap = - new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - private List invalidTimestampsList; - /** * list of delete delta files for split */ @@ -97,90 +88,115 @@ public class CarbonInputSplit extends FileSplit */ private Set validBlockletIds; + private transient DataMapRow dataMapRow; + + private transient int[] columnCardinality; + + private transient boolean isLegacyStore; + + private transient List columnSchema; + + private transient boolean useMinMaxForPruning; + + private boolean isBlockCache = true; + + private String filePath; + + private long start; + + private long length; + + private String[] location; + + private transient SplitLocationInfo[] hostInfos; + + private transient Path path; + + private transient String blockPath; + public CarbonInputSplit() { segment = null; taskId = "0"; bucketId = "0"; blockletId = "0"; numberOfBlocklets = 0; - invalidSegments = new ArrayList<>(); version = CarbonProperties.getInstance().getFormatVersion(); } - private CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length, - String[] locations, ColumnarFormatVersion version, String[] deleteDeltaFiles, + private CarbonInputSplit(String segmentId, String blockletId, String filePath, long start, + long length, ColumnarFormatVersion version, String[] deleteDeltaFiles, String dataMapWritePath) { - super(path, start, length, locations); - this.segment = Segment.toSegment(segmentId); - String taskNo = 
CarbonTablePath.DataFileUtil.getTaskNo(path.getName()); + this.filePath = filePath; + this.start = start; + this.length = length; + if (null != segmentId) { + this.segment = Segment.toSegment(segmentId); + } + String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(this.filePath); if (taskNo.contains("_")) { taskNo = taskNo.split("_")[0]; } this.taskId = taskNo; - this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName()); + this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(this.filePath); this.blockletId = blockletId; - this.invalidSegments = new ArrayList<>(); this.version = version; this.deleteDeltaFiles = deleteDeltaFiles; this.dataMapWritePath = dataMapWritePath; } - public CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length, - String[] locations, int numberOfBlocklets, ColumnarFormatVersion version, + public CarbonInputSplit(String segmentId, String blockletId, String filePath, long start, + long length, String[] locations, int numberOfBlocklets, ColumnarFormatVersion version, String[] deleteDeltaFiles) { - this(segmentId, blockletId, path, start, length, locations, version, deleteDeltaFiles, null); + this(segmentId, blockletId, filePath, start, length, version, deleteDeltaFiles, null); + this.location = locations; this.numberOfBlocklets = numberOfBlocklets; } - - public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations, - FileFormat fileFormat) { - super(path, start, length, locations); + public CarbonInputSplit(String segmentId, String filePath, long start, long length, + String[] locations, FileFormat fileFormat) { + this.filePath = filePath; + this.start = start; + this.length = length; + this.location = locations; this.segment = Segment.toSegment(segmentId); this.fileFormat = fileFormat; taskId = "0"; bucketId = "0"; blockletId = "0"; numberOfBlocklets = 0; - invalidSegments = new ArrayList<>(); version = CarbonProperties.getInstance().getFormatVersion(); } - public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations, - String[] inMemoryHosts, FileFormat fileFormat) { - super(path, start, length, locations, inMemoryHosts); + public CarbonInputSplit(String segmentId, String filePath, long start, long length, + String[] locations, String[] inMemoryHosts, FileFormat fileFormat) { + this.filePath = filePath; + this.start = start; + this.length = length; + this.location = locations; + this.hostInfos = new SplitLocationInfo[inMemoryHosts.length]; + for (int i = 0; i < inMemoryHosts.length; i++) { + // because N will be tiny, scanning is probably faster than a HashSet + boolean inMemory = false; + for (String inMemoryHost : inMemoryHosts) { + if (inMemoryHost.equals(inMemoryHosts[i])) { + inMemory = true; + break; + } + } + hostInfos[i] = new SplitLocationInfo(inMemoryHosts[i], inMemory); + } this.segment = Segment.toSegment(segmentId); this.fileFormat = fileFormat; taskId = "0"; bucketId = "0"; blockletId = "0"; numberOfBlocklets = 0; - invalidSegments = new ArrayList<>(); version = CarbonProperties.getInstance().getFormatVersion(); } - /** - * Constructor to initialize the CarbonInputSplit with blockStorageIdMap - * @param segmentId - * @param path - * @param start - * @param length - * @param locations - * @param numberOfBlocklets - * @param version - * @param blockStorageIdMap - */ - public CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length, - String[] locations, int numberOfBlocklets, ColumnarFormatVersion 
version, - Map blockStorageIdMap, String[] deleteDeltaFiles) { - this(segmentId, blockletId, path, start, length, locations, numberOfBlocklets, version, - deleteDeltaFiles); - this.blockStorageIdMap = blockStorageIdMap; - } - - public static CarbonInputSplit from(String segmentId, String blockletId, FileSplit split, - ColumnarFormatVersion version, String dataMapWritePath) throws IOException { - return new CarbonInputSplit(segmentId, blockletId, split.getPath(), split.getStart(), - split.getLength(), split.getLocations(), version, null, dataMapWritePath); + public static CarbonInputSplit from(String segmentId, String blockletId, String path, long start, + long length, ColumnarFormatVersion version, String dataMapWritePath) throws IOException { + return new CarbonInputSplit(segmentId, blockletId, path, start, length, version, null, + dataMapWritePath); } public static List createBlocks(List splitList) { @@ -190,7 +206,7 @@ public static List createBlocks(List splitList new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets()); try { TableBlockInfo blockInfo = - new TableBlockInfo(split.getPath().toString(), split.blockletId, split.getStart(), + new TableBlockInfo(split.getFilePath(), split.blockletId, split.getStart(), split.getSegment().toString(), split.getLocations(), split.getLength(), blockletInfos, split.getVersion(), split.getDeleteDeltaFiles()); blockInfo.setDetailInfo(split.getDetailInfo()); @@ -211,7 +227,7 @@ public static TableBlockInfo getTableBlockInfo(CarbonInputSplit inputSplit) { new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets()); try { TableBlockInfo blockInfo = - new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.blockletId, + new TableBlockInfo(inputSplit.getFilePath(), inputSplit.blockletId, inputSplit.getStart(), inputSplit.getSegment().toString(), inputSplit.getLocations(), inputSplit.getLength(), blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles()); @@ -237,16 +253,13 @@ public Segment getSegment() { @Override public void readFields(DataInput in) throws IOException { - super.readFields(in); + this.filePath = in.readUTF(); + this.start = in.readLong(); + this.length = in.readLong(); this.segment = Segment.toSegment(in.readUTF()); this.version = ColumnarFormatVersion.valueOf(in.readShort()); this.bucketId = in.readUTF(); this.blockletId = in.readUTF(); - int numInvalidSegment = in.readInt(); - invalidSegments = new ArrayList<>(numInvalidSegment); - for (int i = 0; i < numInvalidSegment; i++) { - invalidSegments.add(in.readUTF()); - } int numberOfDeleteDeltaFiles = in.readInt(); deleteDeltaFiles = new String[numberOfDeleteDeltaFiles]; for (int i = 0; i < numberOfDeleteDeltaFiles; i++) { @@ -269,24 +282,24 @@ public Segment getSegment() { } @Override public void write(DataOutput out) throws IOException { - super.write(out); + out.writeUTF(filePath); + out.writeLong(start); + out.writeLong(length); out.writeUTF(segment.toString()); out.writeShort(version.number()); out.writeUTF(bucketId); out.writeUTF(blockletId); - out.writeInt(invalidSegments.size()); - for (String invalidSegment : invalidSegments) { - out.writeUTF(invalidSegment); - } out.writeInt(null != deleteDeltaFiles ? 
deleteDeltaFiles.length : 0); if (null != deleteDeltaFiles) { for (int i = 0; i < deleteDeltaFiles.length; i++) { out.writeUTF(deleteDeltaFiles[i]); } } - out.writeBoolean(detailInfo != null); + out.writeBoolean(detailInfo != null || dataMapRow != null); if (detailInfo != null) { detailInfo.write(out); + } else if (dataMapRow != null) { + writeBlockletDetailsInfo(out); } out.writeBoolean(dataMapWritePath != null); if (dataMapWritePath != null) { @@ -298,26 +311,6 @@ public Segment getSegment() { } } - public List getInvalidSegments() { - return invalidSegments; - } - - public void setInvalidSegments(List invalidSegments) { - List invalidSegmentIds = new ArrayList<>(); - for (Segment segment: invalidSegments) { - invalidSegmentIds.add(segment.getSegmentNo()); - } - this.invalidSegments = invalidSegmentIds; - } - - public void setInvalidTimestampRange(List invalidTimestamps) { - invalidTimestampsList = invalidTimestamps; - } - - public List getInvalidTimestampRange() { - return invalidTimestampsList; - } - /** * returns the number of blocklets * @@ -351,7 +344,7 @@ public String getBucketId() { // converr seg ID to double. double seg1 = Double.parseDouble(segment.getSegmentNo()); - double seg2 = Double.parseDouble(other.getSegmentId()); + double seg2 = Double.parseDouble(other.segment.getSegmentNo()); if (seg1 - seg2 < 0) { return -1; } @@ -363,8 +356,8 @@ public String getBucketId() { // if both the task id of the file is same then we need to compare the // offset of // the file - String filePath1 = this.getPath().getName(); - String filePath2 = other.getPath().getName(); + String filePath1 = this.getFilePath(); + String filePath2 = other.getFilePath(); if (CarbonTablePath.isCarbonDataFile(filePath1)) { byte[] firstTaskId = CarbonTablePath.DataFileUtil.getTaskNo(filePath1) .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); @@ -410,13 +403,15 @@ public String getBucketId() { int result = taskId.hashCode(); result = 31 * result + segment.hashCode(); result = 31 * result + bucketId.hashCode(); - result = 31 * result + invalidSegments.hashCode(); result = 31 * result + numberOfBlocklets; return result; } @Override public String getBlockPath() { - return getPath().getName(); + if (null == blockPath) { + blockPath = getPath().getName(); + } + return blockPath; } @Override public List getMatchedBlocklets() { @@ -429,10 +424,11 @@ public String getBucketId() { /** * returns map of blocklocation and storage id + * * @return */ public Map getBlockStorageIdMap() { - return blockStorageIdMap; + return new HashMap<>(); } public String[] getDeleteDeltaFiles() { @@ -443,10 +439,6 @@ public void setDeleteDeltaFiles(String[] deleteDeltaFiles) { this.deleteDeltaFiles = deleteDeltaFiles; } - public BlockletDetailInfo getDetailInfo() { - return detailInfo; - } - public void setDetailInfo(BlockletDetailInfo detailInfo) { this.detailInfo = detailInfo; } @@ -459,10 +451,6 @@ public void setFormat(FileFormat fileFormat) { this.fileFormat = fileFormat; } - public Blocklet makeBlocklet() { - return new Blocklet(getPath().getName(), blockletId); - } - public Set getValidBlockletIds() { if (null == validBlockletIds) { validBlockletIds = new HashSet<>(); @@ -474,4 +462,158 @@ public void setValidBlockletIds(Set validBlockletIds) { this.validBlockletIds = validBlockletIds; } + public void setDataMapWritePath(String dataMapWritePath) { + this.dataMapWritePath = dataMapWritePath; + } + + public void setSegment(Segment segment) { + this.segment = segment; + } + + public String getDataMapWritePath() { + 
return dataMapWritePath; + } + + public void setDataMapRow(DataMapRow dataMapRow) { + this.dataMapRow = dataMapRow; + } + + public void setColumnCardinality(int[] columnCardinality) { + this.columnCardinality = columnCardinality; + } + + public void setLegacyStore(boolean legacyStore) { + isLegacyStore = legacyStore; + } + + public void setColumnSchema(List columnSchema) { + this.columnSchema = columnSchema; + } + + public void setUseMinMaxForPruning(boolean useMinMaxForPruning) { + this.useMinMaxForPruning = useMinMaxForPruning; + } + + public void setIsBlockCache(boolean isBlockCache) { + this.isBlockCache = isBlockCache; + } + + private void writeBlockletDetailsInfo(DataOutput out) throws IOException { + out.writeInt(this.dataMapRow.getInt(BlockletDataMapRowIndexes.ROW_COUNT_INDEX)); + if (this.isBlockCache) { + out.writeShort(0); + } else { + out.writeShort(this.dataMapRow.getShort(BlockletDataMapRowIndexes.BLOCKLET_PAGE_COUNT_INDEX)); + } + out.writeShort(this.dataMapRow.getShort(BlockletDataMapRowIndexes.VERSION_INDEX)); + out.writeShort(Short.parseShort(this.blockletId)); + out.writeShort(this.columnCardinality.length); + for (int i = 0; i < this.columnCardinality.length; i++) { + out.writeInt(this.columnCardinality[i]); + } + out.writeLong(this.dataMapRow.getLong(BlockletDataMapRowIndexes.SCHEMA_UPADATED_TIME_INDEX)); + out.writeBoolean(false); + out.writeLong(this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_FOOTER_OFFSET)); + // write -1 if columnSchemaBinary is null so that at the time of reading it can distinguish + // whether schema is written or not + if (null != this.columnSchema) { + byte[] columnSchemaBinary = BlockletDataMapUtil.convertSchemaToBinary(this.columnSchema); + out.writeInt(columnSchemaBinary.length); + out.write(columnSchemaBinary); + } else { + // write -1 if columnSchemaBinary is null so that at the time of reading it can distinguish + // whether schema is written or not + out.writeInt(-1); + } + if (this.isBlockCache) { + out.writeInt(0); + out.write(new byte[0]); + } else { + byte[] blockletInfoBinary = + this.dataMapRow.getByteArray(BlockletDataMapRowIndexes.BLOCKLET_INFO_INDEX); + out.writeInt(blockletInfoBinary.length); + out.write(blockletInfoBinary); + } + out.writeLong(this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH)); + out.writeBoolean(this.isLegacyStore); + out.writeBoolean(this.useMinMaxForPruning); + } + + public BlockletDetailInfo getDetailInfo() { + if (null != dataMapRow && detailInfo == null) { + detailInfo = new BlockletDetailInfo(); + detailInfo + .setRowCount(this.dataMapRow.getInt(BlockletDataMapRowIndexes.ROW_COUNT_INDEX)); + detailInfo + .setVersionNumber(this.dataMapRow.getShort(BlockletDataMapRowIndexes.VERSION_INDEX)); + detailInfo.setBlockletId(Short.parseShort(this.blockletId)); + detailInfo.setDimLens(this.columnCardinality); + detailInfo.setSchemaUpdatedTimeStamp( + this.dataMapRow.getLong(BlockletDataMapRowIndexes.SCHEMA_UPADATED_TIME_INDEX)); + detailInfo.setBlockFooterOffset( + this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_FOOTER_OFFSET)); + detailInfo + .setBlockSize(this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH)); + detailInfo.setLegacyStore(isLegacyStore); + detailInfo.setUseMinMaxForPruning(useMinMaxForPruning); + if (!this.isBlockCache) { + detailInfo.setColumnSchemas(this.columnSchema); + detailInfo.setPagesCount( + this.dataMapRow.getShort(BlockletDataMapRowIndexes.BLOCKLET_PAGE_COUNT_INDEX)); + detailInfo.setBlockletInfoBinary( + 
this.dataMapRow.getByteArray(BlockletDataMapRowIndexes.BLOCKLET_INFO_INDEX)); + } else { + detailInfo.setBlockletInfoBinary(new byte[0]); + } + if (location == null) { + try { + location = new String(dataMapRow.getByteArray(BlockletDataMapRowIndexes.LOCATIONS), + CarbonCommonConstants.DEFAULT_CHARSET).split(","); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + dataMapRow = null; + } + return detailInfo; + } + + @Override + public SplitLocationInfo[] getLocationInfo() throws IOException { + return hostInfos; + } + + /** + * The file containing this split's data. + */ + public Path getPath() { + if (path == null) { + path = new Path(filePath); + return path; + } + return path; + } + + public String getFilePath() { + return this.filePath; + } + + /** The position of the first byte in the file to process. */ + public long getStart() { return start; } + + @Override + public long getLength() { return length; } + + @Override + public String toString() { return filePath + ":" + start + "+" + length; } + + @Override public String[] getLocations() throws IOException { + if (this.location == null && dataMapRow == null) { + return new String[] {}; + } else if (dataMapRow != null) { + location = new String(dataMapRow.getByteArray(BlockletDataMapRowIndexes.LOCATIONS), + CarbonCommonConstants.DEFAULT_CHARSET).split(","); + } + return this.location; + } } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java b/core/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java similarity index 100% rename from hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java rename to core/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/Block.java b/core/src/main/java/org/apache/carbondata/hadoop/internal/index/Block.java similarity index 100% rename from hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/Block.java rename to core/src/main/java/org/apache/carbondata/hadoop/internal/index/Block.java diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java index 0b991cb9f9f..4c99c4f89ac 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java @@ -100,7 +100,7 @@ public void setLength(long length) { public void calculateLength() { long total = 0; - if (splitList.size() > 0 && splitList.get(0).getDetailInfo() != null) { + if (splitList.size() > 1 && splitList.get(0).getDetailInfo() != null) { Map blockSizes = new HashMap<>(); for (CarbonInputSplit split : splitList) { blockSizes.put(split.getBlockPath(), split.getDetailInfo().getBlockSize()); @@ -116,11 +116,21 @@ public void calculateLength() { length = total; } - @Override - public String[] getLocations() { + @Override public String[] getLocations() { + getLocationIfNull(); return locations; } + private void getLocationIfNull() { + try { + if (locations == null && splitList.size() == 1) { + this.locations = this.splitList.get(0).getLocations(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Override public void write(DataOutput out) throws IOException { // write number of splits and then write all splits @@ -128,6 +138,7 @@ public void write(DataOutput out) throws IOException { for 
(CarbonInputSplit split: splitList) { split.write(out); } + getLocationIfNull(); out.writeInt(locations.length); for (int i = 0; i < locations.length; i++) { out.writeUTF(locations[i]); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java index 1dfead3c252..d6ffb2edcef 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java @@ -81,7 +81,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context) List splitList; if (inputSplit instanceof CarbonInputSplit) { splitList = new ArrayList<>(1); - String splitPath = ((CarbonInputSplit) inputSplit).getPath().toString(); + String splitPath = ((CarbonInputSplit) inputSplit).getFilePath(); // BlockFooterOffSet will be null in case of CarbonVectorizedReader as this has to be set // where multiple threads are able to read small set of files to calculate footer instead // of the main thread setting this for all the files. diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java index 7c08dd91380..d81b02c3749 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java @@ -48,7 +48,6 @@ import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; @@ -167,7 +166,7 @@ public List getSplits(JobContext job) throws IOException { // Segment id is set to null because SDK does not write carbondata files with respect // to segments. So no specific name is present for this load. 
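          // Illustrative note, written as a comment so it reads in context (not part of the
          // original change): the split below is now constructed from the plain String returned by
          // carbonFile.getAbsolutePath() instead of wrapping it in a Hadoop Path. The Path/URI
          // objects are built lazily, only when a caller actually asks for them, roughly as the
          // patched CarbonInputSplit does:
          //
          //   private String filePath;          // serialized with the split
          //   private transient Path path;      // built on demand, never serialized
          //
          //   public Path getPath() {
          //     if (path == null) {
          //       path = new Path(filePath);    // parse the URI only once, and only if needed
          //     }
          //     return path;
          //   }
          //
          // This avoids constructing a Path object per carbondata file while listing splits.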
CarbonInputSplit split = - new CarbonInputSplit("null", new Path(carbonFile.getAbsolutePath()), 0, + new CarbonInputSplit("null", carbonFile.getAbsolutePath(), 0, carbonFile.getLength(), carbonFile.getLocations(), FileFormat.COLUMNAR_V3); split.setVersion(ColumnarFormatVersion.V3); BlockletDetailInfo info = new BlockletDetailInfo(); @@ -179,7 +178,8 @@ public List getSplits(JobContext job) throws IOException { } Collections.sort(splits, new Comparator() { @Override public int compare(InputSplit o1, InputSplit o2) { - return ((CarbonInputSplit) o1).getPath().compareTo(((CarbonInputSplit) o2).getPath()); + return ((CarbonInputSplit) o1).getFilePath() + .compareTo(((CarbonInputSplit) o2).getFilePath()); } }); } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 26144e27950..aba0ab74dba 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -21,7 +21,14 @@ import java.io.DataInputStream; import java.io.IOException; import java.lang.reflect.Constructor; -import java.util.*; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -38,13 +45,11 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.schema.PartitionInfo; import org.apache.carbondata.core.metadata.schema.partition.PartitionType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.profiler.ExplainCollector; import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.core.scan.expression.Expression; @@ -80,7 +85,6 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.log4j.Logger; @@ -408,7 +412,6 @@ protected List getDataBlocksOfSegment(JobContext job, CarbonTa new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration()); List prunedBlocklets = getPrunedBlocklets(job, carbonTable, expression, segmentIds); - List resultFilteredBlocks = new ArrayList<>(); int partitionIndex = 0; List partitionIdList = new ArrayList<>(); @@ -416,13 +419,13 @@ protected List getDataBlocksOfSegment(JobContext job, CarbonTa partitionIdList = partitionInfo.getPartitionIds(); } for (ExtendedBlocklet blocklet : prunedBlocklets) { - long partitionId = CarbonTablePath.DataFileUtil - .getTaskIdFromTaskNo(CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath())); // OldPartitionIdList is only used in alter table partition command because it change // partition 
info first and then read data. // For other normal query should use newest partitionIdList if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { + long partitionId = CarbonTablePath.DataFileUtil + .getTaskIdFromTaskNo(CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath())); if (oldPartitionIdList != null) { partitionIndex = oldPartitionIdList.indexOf((int) partitionId); } else { @@ -436,10 +439,7 @@ protected List getDataBlocksOfSegment(JobContext job, CarbonTa // for partition table, the task id of carbaondata file name is the partition id. // if this partition is not required, here will skip it. if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) { - CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet); - if (inputSplit != null) { - resultFilteredBlocks.add(inputSplit); - } + resultFilteredBlocks.add(blocklet.getInputSplit()); } } } @@ -493,7 +493,9 @@ private List getPrunedBlocklets(JobContext job, CarbonTable ca prunedBlocklets = defaultDataMap.prune(segmentIds, expression, partitionsToPrune); } - ExplainCollector.setDefaultDataMapPruningBlockHit(getBlockCount(prunedBlocklets)); + if (ExplainCollector.enabled()) { + ExplainCollector.setDefaultDataMapPruningBlockHit(getBlockCount(prunedBlocklets)); + } if (prunedBlocklets.size() == 0) { return prunedBlocklets; @@ -577,7 +579,7 @@ private void pruneSegments(List segments, List pruned segment.getFilteredIndexShardNames().clear(); // Check the segment exist in any of the pruned blocklets. for (ExtendedBlocklet blocklet : prunedBlocklets) { - if (blocklet.getSegmentId().equals(segment.toString())) { + if (blocklet.getSegment().toString().equals(segment.toString())) { found = true; // Set the pruned index file to the segment for further pruning. 
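        // Illustrative sketch (comment only, not part of the original change): both sides of the
        // equality check above now compare Segment.toString() values. A later patch in this series
        // precomputes that string once per Segment, so pruning loops like this one stop rebuilding
        // "segmentNo#segmentFileName" for every blocklet. A minimal sketch of that idea, with
        // field names taken from that patch:
        //
        //   private String segmentString;                        // cached string representation
        //
        //   public Segment(String segmentNo, String segmentFileName) {
        //     this.segmentString = (segmentFileName != null)
        //         ? segmentNo + "#" + segmentFileName
        //         : segmentNo;
        //   }
        //
        //   @Override public String toString() {
        //     return segmentString;
        //   }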
String shardName = CarbonTablePath.getShardName(blocklet.getFilePath()); @@ -593,17 +595,6 @@ private void pruneSegments(List segments, List pruned segments.removeAll(toBeRemovedSegments); } - private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException { - CarbonInputSplit split = CarbonInputSplit - .from(blocklet.getSegmentId(), blocklet.getBlockletId(), - new FileSplit(new Path(blocklet.getPath()), 0, blocklet.getLength(), - blocklet.getLocations()), - ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()), - blocklet.getDataMapWriterPath()); - split.setDetailInfo(blocklet.getDetailInfo()); - return split; - } - @Override public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); @@ -639,20 +630,6 @@ public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext tas .filterExpression(filterExpression) .dataConverter(getDataTypeConverter(configuration)) .build(); - - // update the file level index store if there are invalid segment - if (inputSplit instanceof CarbonMultiBlockSplit) { - CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit; - List invalidSegments = split.getAllSplits().get(0).getInvalidSegments(); - if (invalidSegments.size() > 0) { - queryModel.setInvalidSegmentIds(invalidSegments); - } - List invalidTimestampRangeList = - split.getAllSplits().get(0).getInvalidTimestampRange(); - if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) { - queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList); - } - } return queryModel; } @@ -672,7 +649,7 @@ private void checkAndAddImplicitExpression(Expression expression, InputSplit inp for (CarbonInputSplit carbonInputSplit : splits) { Set validBlockletIds = carbonInputSplit.getValidBlockletIds(); if (null != validBlockletIds && !validBlockletIds.isEmpty()) { - String uniqueBlockPath = carbonInputSplit.getPath().toString(); + String uniqueBlockPath = carbonInputSplit.getFilePath(); String shortBlockPath = CarbonTablePath .getShortBlockId(uniqueBlockPath.substring(uniqueBlockPath.lastIndexOf("/Part") + 1)); blockIdToBlockletIdMapping.put(shortBlockPath, validBlockletIds); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 281143b0ce3..494831f6ad7 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -142,7 +142,6 @@ public List getSplits(JobContext job) throws IOException { SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails); List invalidSegments = new ArrayList<>(); - List invalidTimestampsList = new ArrayList<>(); List streamSegments = null; // get all valid segments and set them into the configuration SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier, @@ -180,7 +179,6 @@ public List getSplits(JobContext job) throws IOException { } // remove entry in the segment index if there are invalid segments invalidSegments.addAll(segments.getInvalidSegments()); - invalidTimestampsList.addAll(updateStatusManager.getInvalidTimestampRange()); if (invalidSegments.size() > 0) { DataMapStoreManager.getInstance() 
.clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()), invalidSegments); @@ -220,15 +218,6 @@ public List getSplits(JobContext job) throws IOException { List splits = getSplits(job, filter, filteredSegmentToAccess, matchedPartitions, partitionInfo, null, updateStatusManager); - // pass the invalid segment to task side in order to remove index entry in task side - if (invalidSegments.size() > 0) { - for (InputSplit split : splits) { - ((org.apache.carbondata.hadoop.CarbonInputSplit) split).setInvalidSegments(invalidSegments); - ((org.apache.carbondata.hadoop.CarbonInputSplit) split) - .setInvalidTimestampRange(invalidTimestampsList); - } - } - // add all splits of streaming List splitsOfStreaming = getSplitsOfStreaming(job, streamSegments, carbonTable); if (!splitsOfStreaming.isEmpty()) { @@ -384,16 +373,15 @@ public List getSplitsOfStreaming(JobContext job, List strea // there is 10% slop to avoid to generate very small split in the end while (((double) bytesRemaining) / splitSize > 1.1) { int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add( - makeSplit(streamFile.getSegmentNo(), path, length - bytesRemaining, - splitSize, blkLocations[blkIndex].getHosts(), + splits.add(makeSplit(streamFile.getSegmentNo(), streamFile.getFilePath(), + length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1)); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(makeSplit(streamFile.getSegmentNo(), path, length - bytesRemaining, - bytesRemaining, blkLocations[blkIndex].getHosts(), + splits.add(makeSplit(streamFile.getSegmentNo(), streamFile.getFilePath(), + length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1)); } } @@ -402,15 +390,10 @@ public List getSplitsOfStreaming(JobContext job, List strea return splits; } - protected FileSplit makeSplit(String segmentId, Path file, long start, long length, - String[] hosts, FileFormat fileFormat) { - return new CarbonInputSplit(segmentId, file, start, length, hosts, fileFormat); - } - - - protected FileSplit makeSplit(String segmentId, Path file, long start, long length, + protected FileSplit makeSplit(String segmentId, String filePath, long start, long length, String[] hosts, String[] inMemoryHosts, FileFormat fileFormat) { - return new CarbonInputSplit(segmentId, file, start, length, hosts, inMemoryHosts, fileFormat); + return new CarbonInputSplit(segmentId, filePath, start, length, hosts, inMemoryHosts, + fileFormat); } /** @@ -422,10 +405,6 @@ protected FileSplit makeSplit(String segmentId, Path file, long start, long leng */ public List getSplitsOfOneSegment(JobContext job, String targetSegment, List oldPartitionIdList, PartitionInfo partitionInfo) { - List invalidSegments = new ArrayList<>(); - List invalidTimestampsList = new ArrayList<>(); - - try { carbonTable = getOrCreateCarbonTable(job.getConfiguration()); ReadCommittedScope readCommittedScope = @@ -465,13 +444,6 @@ public List getSplitsOfOneSegment(JobContext job, String targetSegme // do block filtering and get split List splits = getSplits(job, filter, segmentList, matchedPartitions, partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(carbonTable)); - // pass the invalid segment to task side in order to remove index entry in task side - if (invalidSegments.size() > 0) { - for (InputSplit split : splits) { 
- ((CarbonInputSplit) split).setInvalidSegments(invalidSegments); - ((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList); - } - } return splits; } catch (IOException e) { throw new RuntimeException("Can't get splits of the target segment ", e); @@ -542,7 +514,7 @@ private List getSplits(JobContext job, Expression expression, // In case IUD is not performed in this table avoid searching for // invalidated blocks. if (CarbonUtil - .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(), + .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getFilePath(), invalidBlockVOForSegmentId, updateStatusManager)) { continue; } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java index e18a4d48af1..1c11275d224 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java @@ -81,7 +81,7 @@ public CarbonVectorizedRecordReader(QueryModel queryModel) { List splitList; if (inputSplit instanceof CarbonInputSplit) { // Read the footer offset and set. - String splitPath = ((CarbonInputSplit) inputSplit).getPath().toString(); + String splitPath = ((CarbonInputSplit) inputSplit).getFilePath(); if (((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() == 0L) { FileReader reader = FileFactory.getFileHolder(FileFactory.getFileType(splitPath), taskAttemptContext.getConfiguration()); diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java index f4f50a57918..f68234c19f1 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java @@ -29,8 +29,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.gson.Gson; -import org.apache.hadoop.fs.Path; - /** * CarbonLocalInputSplit represents a block, it contains a set of blocklet. 
*/ @@ -136,7 +134,7 @@ public void setDetailInfo(BlockletDetailInfo blockletDetailInfo) { public static CarbonInputSplit convertSplit(CarbonLocalInputSplit carbonLocalInputSplit) { CarbonInputSplit inputSplit = new CarbonInputSplit(carbonLocalInputSplit.getSegmentId(), - carbonLocalInputSplit.getBlockletId(), new Path(carbonLocalInputSplit.getPath()), + carbonLocalInputSplit.getBlockletId(), carbonLocalInputSplit.getPath(), carbonLocalInputSplit.getStart(), carbonLocalInputSplit.getLength(), carbonLocalInputSplit.getLocations() .toArray(new String[carbonLocalInputSplit.getLocations().size()]), diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java index d1193f5b2aa..20a2d39e859 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java @@ -52,7 +52,8 @@ public static String[] getConfiguredLocalDirs(SparkConf conf) { */ public static boolean isBlockWithoutBlockletInfoExists(List splitList) { for (CarbonInputSplit inputSplit : splitList) { - if (null == inputSplit.getDetailInfo().getBlockletInfo()) { + if (null == inputSplit.getDetailInfo() || null == inputSplit.getDetailInfo() + .getBlockletInfo()) { return true; } } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 96d288f4dc6..0e44f6d305b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -330,11 +330,11 @@ class CarbonMergerRDD[K, V]( .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava } carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry => - val blockInfo = new TableBlockInfo(entry.getPath.toString, + val blockInfo = new TableBlockInfo(entry.getFilePath, entry.getStart, entry.getSegmentId, entry.getLocations, entry.getLength, entry.getVersion, updateStatusManager.getDeleteDeltaFilePath( - entry.getPath.toString, + entry.getFilePath, Segment.toSegment(entry.getSegmentId).getSegmentNo) ) (!updated || (updated && (!CarbonUtil diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 0ab6a3aaba7..9e661396066 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -344,13 +344,12 @@ class CarbonScanRDD[T: ClassTag]( closePartition() } else { // Use block distribution - splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy { f => - f.getSegmentId.concat(f.getBlockPath) - }.values.zipWithIndex.foreach { splitWithIndex => + splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).zipWithIndex.foreach { + splitWithIndex => val multiBlockSplit = new CarbonMultiBlockSplit( - splitWithIndex._1.asJava, - splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray) + Seq(splitWithIndex._1).asJava, + null) val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) result.add(partition) } @@ -704,7 +703,7 
@@ class CarbonScanRDD[T: ClassTag]( }.asInstanceOf[java.util.List[CarbonInputSplit]] // for each split and given block path set all the valid blocklet ids splitList.asScala.map { split => - val uniqueBlockPath = split.getPath.toString + val uniqueBlockPath = split.getFilePath val shortBlockPath = CarbonTablePath .getShortBlockId(uniqueBlockPath .substring(uniqueBlockPath.lastIndexOf("/Part") + 1)) diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala index f725de37b35..6819a4c97b0 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala +++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala @@ -383,7 +383,7 @@ class SparkCarbonFileFormat extends FileFormat if (file.filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)) { val split = new CarbonInputSplit("null", - new Path(new URI(file.filePath)), + new Path(new URI(file.filePath)).toString, file.start, file.length, file.locations, @@ -394,10 +394,10 @@ class SparkCarbonFileFormat extends FileFormat split.setDetailInfo(info) info.setBlockSize(file.length) // Read the footer offset and set. - val reader = FileFactory.getFileHolder(FileFactory.getFileType(split.getPath.toString), + val reader = FileFactory.getFileHolder(FileFactory.getFileType(split.getFilePath), broadcastedHadoopConf.value.value) val buffer = reader - .readByteBuffer(FileFactory.getUpdatedFilePath(split.getPath.toString), + .readByteBuffer(FileFactory.getUpdatedFilePath(split.getFilePath), file.length - 8, 8) info.setBlockFooterOffset(buffer.getLong) From 5085f6ef992f725514627cfd342f94597ca581f7 Mon Sep 17 00:00:00 2001 From: kumarvishal09 Date: Wed, 20 Mar 2019 18:56:49 +0530 Subject: [PATCH 2/9] Fixed Comment --- .../core/datastore/impl/FileFactory.java | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index 1182b497c6b..a27023fc159 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -81,20 +81,18 @@ public static FileReader getFileHolder(FileType fileType, Configuration configur } public static FileType getFileType(String path) { - if (path.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) || path.toLowerCase() - .startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) { - return FileType.HDFS; - } else if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX) || path.toLowerCase() - .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) { - return FileType.ALLUXIO; - } else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || path.toLowerCase() - .startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) { - return FileType.VIEWFS; - } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX) || path - .startsWith(CarbonCommonConstants.S3A_PREFIX) || path - .startsWith(CarbonCommonConstants.S3_PREFIX)) { - return FileType.S3; + FileType fileType = getFileTypeWithActualPath(path); + if (fileType != null) { + return fileType; + } + fileType = getFileTypeWithLowerCase(path); + if (fileType != 
null) { + return fileType; } + return FileType.LOCAL; + } + + private static FileType getFileTypeWithLowerCase(String path) { String lowerCase = path.toLowerCase(); if (lowerCase.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) { return FileType.HDFS; @@ -107,7 +105,22 @@ public static FileType getFileType(String path) { .startsWith(CarbonCommonConstants.S3_PREFIX)) { return FileType.S3; } - return FileType.LOCAL; + return null; + } + + private static FileType getFileTypeWithActualPath(String path) { + if (path.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) { + return FileType.HDFS; + } else if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) { + return FileType.ALLUXIO; + } else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) { + return FileType.VIEWFS; + } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX) || path + .startsWith(CarbonCommonConstants.S3A_PREFIX) || path + .startsWith(CarbonCommonConstants.S3_PREFIX)) { + return FileType.S3; + } + return null; } public static CarbonFile getCarbonFile(String path) { From d847a8a4bd076f75bc0b08c86f746feb0ee1e30d Mon Sep 17 00:00:00 2001 From: kumarvishal09 Date: Wed, 20 Mar 2019 23:35:04 +0530 Subject: [PATCH 3/9] remove testcase --- .../datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala index f6e5eab1a8f..11f10b94926 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala @@ -7,7 +7,7 @@ import org.apache.commons.io.FileUtils import org.apache.spark.sql.{CarbonEnv, SaveMode} import org.apache.spark.sql.test.Spark2TestQueryExecutor import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Ignore} import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonV3DataFormatConstants} import org.apache.carbondata.core.datamap.status.DataMapStatusManager @@ -16,7 +16,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapTestUtil.deleteFile import org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapTestUtil.createFile import org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapTestUtil.checkBasicQuery - +@Ignore class BloomCoarseGrainDataMapFunctionSuite extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach { val bigFile = s"$resourcesPath/bloom_datamap_function_test_big.csv" val normalTable = "carbon_normal" From 1c986bc87c9d2538b89a434c7f2e23992f09e1bc Mon Sep 17 00:00:00 2001 From: kumarvishal09 Date: Thu, 21 Mar 2019 13:39:56 +0530 Subject: [PATCH 4/9] updated code testcase --- .../carbondata/core/datamap/Segment.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java index 85445eb0ec1..4797b533c8f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java @@ -62,6 +62,8 @@ public class Segment implements Serializable { */ private LoadMetadataDetails loadMetadataDetails; + private String segmentString; + public Segment(String segmentNo) { this.segmentNo = segmentNo; } @@ -69,6 +71,7 @@ public Segment(String segmentNo) { public Segment(String segmentNo, ReadCommittedScope readCommittedScope) { this.segmentNo = segmentNo; this.readCommittedScope = readCommittedScope; + segmentString = segmentNo; } /** @@ -82,6 +85,11 @@ public Segment(String segmentNo, String segmentFileName) { this.segmentNo = segmentNo; this.segmentFileName = segmentFileName; this.readCommittedScope = null; + if (segmentFileName != null) { + segmentString = segmentNo + "#" + segmentFileName; + } else { + segmentString = segmentNo; + } } /** @@ -94,6 +102,11 @@ public Segment(String segmentNo, String segmentFileName, ReadCommittedScope read this.segmentNo = segmentNo; this.segmentFileName = segmentFileName; this.readCommittedScope = readCommittedScope; + if (segmentFileName != null) { + segmentString = segmentNo + "#" + segmentFileName; + } else { + segmentString = segmentNo; + } } /** @@ -107,6 +120,11 @@ public Segment(String segmentNo, String segmentFileName, ReadCommittedScope read this.segmentFileName = segmentFileName; this.readCommittedScope = readCommittedScope; this.loadMetadataDetails = loadMetadataDetails; + if (segmentFileName != null) { + segmentString = segmentNo + "#" + segmentFileName; + } else { + segmentString = segmentNo; + } } /** @@ -233,11 +251,7 @@ public void setFilteredIndexShardName(String filteredIndexShardName) { } @Override public String toString() { - if (segmentFileName != null) { - return segmentNo + "#" + segmentFileName; - } else { - return segmentNo; - } + return segmentString; } public LoadMetadataDetails getLoadMetadataDetails() { From c712473a53877325e1d12b2ec0dd19353a0deb3c Mon Sep 17 00:00:00 2001 From: kumarvishal09 Date: Thu, 21 Mar 2019 14:07:57 +0530 Subject: [PATCH 5/9] fixed failures --- .../carbondata/hadoop/api/CarbonTableInputFormat.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 494831f6ad7..68d4ce7ddfd 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -310,13 +310,6 @@ private List getFilteredSegment(JobContext job, List validSegm } } } - if (filteredSegmentToAccess.size() != segmentToAccessSet.size() && !validationRequired) { - for (Segment segment : segmentToAccessSet) { - if (!filteredSegmentToAccess.contains(segment)) { - filteredSegmentToAccess.add(segment); - } - } - } if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) { List filteredSegmentToAccessTemp = new ArrayList<>(filteredSegmentToAccess); filteredSegmentToAccessTemp.removeAll(segmentToAccessSet); From b4e4995095154f041be50f5f7b40d4dd82ad1ef4 Mon Sep 17 00:00:00 2001 From: kumarvishal09 Date: Thu, 21 Mar 2019 15:02:21 +0530 Subject: [PATCH 6/9] added test cases --- .../datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala 
b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala index 11f10b94926..4010ef658db 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala @@ -16,7 +16,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapTestUtil.deleteFile import org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapTestUtil.createFile import org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapTestUtil.checkBasicQuery -@Ignore + class BloomCoarseGrainDataMapFunctionSuite extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach { val bigFile = s"$resourcesPath/bloom_datamap_function_test_big.csv" val normalTable = "carbon_normal" From 848f57cd8737194b6a9a80175ed53285cfef8c23 Mon Sep 17 00:00:00 2001 From: kumarvishal09 Date: Thu, 21 Mar 2019 15:43:58 +0530 Subject: [PATCH 7/9] fixed unsafe issue --- .../org/apache/carbondata/core/indexstore/row/DataMapRow.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java index c79735fbcb0..18adc06334b 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java @@ -78,6 +78,8 @@ public int getTotalSizeInBytes() { for (int i = 0; i < schemas.length; i++) { len += getSizeInBytes(i); } + // for last offset in unsafe data map row + len += 4; return len; } @@ -86,7 +88,6 @@ public int getSizeInBytes(int ordinal) { case FIXED: return schemas[ordinal].getLength(); case VARIABLE_SHORT: - return getLengthInBytes(ordinal) + 2; case VARIABLE_INT: return getLengthInBytes(ordinal) + 4; case STRUCT: From b3c8f3e6a925e7321348040932a3e7a2f2b405a9 Mon Sep 17 00:00:00 2001 From: kumarvishal09 Date: Thu, 21 Mar 2019 19:11:56 +0530 Subject: [PATCH 8/9] fixed unsafe issue --- .../carbondata/core/indexstore/UnsafeMemoryDMStore.java | 4 ++-- .../carbondata/core/indexstore/row/UnsafeDataMapRow.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index 8185c250ccd..161730d2975 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -140,7 +140,7 @@ public void addIndexRow(CarbonRowSchema[] schema, DataMapRow indexRow) throws Me // byte position of Last offset bytePosition += CarbonCommonConstants.INT_SIZE_IN_BYTE; // start byte position of variable length data - int varColPosition = bytePosition + CarbonCommonConstants.INT_SIZE_IN_BYTE; + int varColPosition = pointer + bytePosition + CarbonCommonConstants.INT_SIZE_IN_BYTE; // current position refers to current byte postion in memory block int currentPosition; for (int i = 0; i < schema.length; i++) { @@ -231,7 +231,7 @@ private int addToUnsafe(CarbonRowSchema schema, DataMapRow row, int index, int s memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), varPosition); runningLength += 4; 
getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), - memoryBlock.getBaseOffset() + startOffset + varPosition, data.length); + memoryBlock.getBaseOffset() + varPosition, data.length); runningLength += data.length; varPosition += data.length; return varPosition; diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java index e77d11ceb59..e16ff286902 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java @@ -45,7 +45,7 @@ public UnsafeDataMapRow(CarbonRowSchema[] schemas, MemoryBlock block, int pointe @Override public byte[] getByteArray(int ordinal) { int length; - int currentOffset = 0; + int currentOffset; switch (schemas[ordinal].getSchemaType()) { case VARIABLE_SHORT: case VARIABLE_INT: @@ -62,7 +62,7 @@ public UnsafeDataMapRow(CarbonRowSchema[] schemas, MemoryBlock block, int pointe } byte[] data = new byte[length]; getUnsafe() - .copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + currentOffset, data, + .copyMemory(block.getBaseObject(), block.getBaseOffset() + currentOffset, data, BYTE_ARRAY_OFFSET, data.length); return data; } From 4df0ea269b4178a07219af06407d87be071e904b Mon Sep 17 00:00:00 2001 From: kumarvishal09 Date: Thu, 21 Mar 2019 22:20:56 +0530 Subject: [PATCH 9/9] fixed dm issue --- .../carbondata/core/datamap/DataMapStoreManager.java | 11 ++++++++++- .../core/indexstore/UnsafeMemoryDMStore.java | 4 ++-- .../core/indexstore/row/UnsafeDataMapRow.java | 2 +- .../apache/carbondata/hadoop/CarbonRecordReader.java | 2 +- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index 085d98ac8ef..524d8b0411e 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -473,6 +473,15 @@ public void clearInvalidSegments(CarbonTable carbonTable, List segments * @param identifier Table identifier */ public void clearDataMaps(AbsoluteTableIdentifier identifier) { + clearDataMaps(identifier, true); + } + + /** + * Clear the datamap/datamaps of a table from memory + * + * @param identifier Table identifier + */ + public void clearDataMaps(AbsoluteTableIdentifier identifier, boolean launchJob) { CarbonTable carbonTable = getCarbonTable(identifier); String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName(); List tableIndices = allDataMaps.get(tableUniqueName); @@ -483,7 +492,7 @@ public void clearDataMaps(AbsoluteTableIdentifier identifier) { tableIndices = allDataMaps.get(tableUniqueName); } } - if (null != carbonTable && tableIndices != null) { + if (null != carbonTable && tableIndices != null && launchJob) { try { DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable); } catch (IOException e) { diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index 161730d2975..8185c250ccd 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ 
-140,7 +140,7 @@ public void addIndexRow(CarbonRowSchema[] schema, DataMapRow indexRow) throws Me // byte position of Last offset bytePosition += CarbonCommonConstants.INT_SIZE_IN_BYTE; // start byte position of variable length data - int varColPosition = pointer + bytePosition + CarbonCommonConstants.INT_SIZE_IN_BYTE; + int varColPosition = bytePosition + CarbonCommonConstants.INT_SIZE_IN_BYTE; // current position refers to current byte postion in memory block int currentPosition; for (int i = 0; i < schema.length; i++) { @@ -231,7 +231,7 @@ private int addToUnsafe(CarbonRowSchema schema, DataMapRow row, int index, int s memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), varPosition); runningLength += 4; getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), - memoryBlock.getBaseOffset() + varPosition, data.length); + memoryBlock.getBaseOffset() + startOffset + varPosition, data.length); runningLength += data.length; varPosition += data.length; return varPosition; diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java index e16ff286902..5f6c4dc6d97 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java @@ -62,7 +62,7 @@ public UnsafeDataMapRow(CarbonRowSchema[] schemas, MemoryBlock block, int pointe } byte[] data = new byte[length]; getUnsafe() - .copyMemory(block.getBaseObject(), block.getBaseOffset() + currentOffset, data, + .copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + currentOffset, data, BYTE_ARRAY_OFFSET, data.length); return data; } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java index d6ffb2edcef..1a529e3cdb4 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java @@ -162,7 +162,7 @@ public List getBatchValue() { if (!skipClearDataMapAtClose) { // Clear the datamap cache DataMapStoreManager.getInstance().clearDataMaps( - queryModel.getTable().getAbsoluteTableIdentifier()); + queryModel.getTable().getAbsoluteTableIdentifier(), false); } // close read support readSupport.close();
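    // Illustrative summary (comment only, not part of the original change): the reader now calls
    // clearDataMaps(identifier, false) so that closing a single record reader only drops its local
    // datamap cache instead of also launching the distributed clear job. The overload added to
    // DataMapStoreManager above keeps the old single-argument behaviour intact, roughly:
    //
    //   public void clearDataMaps(AbsoluteTableIdentifier identifier) {
    //     clearDataMaps(identifier, true);    // original behaviour: also trigger the clear job
    //   }
    //
    //   public void clearDataMaps(AbsoluteTableIdentifier identifier, boolean launchJob) {
    //     // drop the in-memory datamaps for the table; call
    //     // DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable) only when launchJob is true
    //   }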