Skip to content
Permalink
Browse files
[CARBONDATA-4247][CARBONDATA-4241] Fix wrong timestamp value query results for data before the year 1900 with Spark 3.1

Why is this PR needed?
1. Spark 3.1 stores timestamp values as Julian micros and rebases the timestamp value from
Julian to Gregorian micros during query.
-> Since Carbon parses and formats timestamp values with SimpleDateFormat, queries give
incorrect results when the values are rebased from Julian to Gregorian micros by Spark.
2. CARBONDATA-4241 -> Global sort load and compaction fails on table having timestamp column

What changes were proposed in this PR?
1. Use Java Instant to parse new timestamp values. For old stores queried with Spark 3.1,
rebase the timestamp value from Julian to Gregorian micros.
2. If a timestamp value is of type Instant, convert the value to a Java timestamp.

Does this PR introduce any user interface change?
No

Is any new testcase added?
No (Existing testcase is sufficient)

This closes #4177
  • Loading branch information
Indhumathi27 authored and akashrn5 committed Jul 29, 2021
1 parent f2698fe commit feb052173b5c834dfa3632b01df275ed061aea1d
Showing 34 changed files with 392 additions and 41 deletions.
@@ -122,6 +122,16 @@ private CarbonCommonConstants() {
*/
public static final String CARBON_TIMESTAMP_MILLIS = "dd-MM-yyyy HH:mm:ss:SSS";

/**
* CARBON Default format - time segment
*/
public static final String CARBON_TIME_SEGMENT_DEFAULT_FORMAT = " HH:mm:ss";

/**
* CARBON Default data - time segment
*/
public static final String CARBON_TIME_SEGMENT_DATA_DEFAULT_FORMAT = " 00:00:00";

/**
* Property for specifying the format of DATE data type column.
* e.g. yyyy/MM/dd , or using default value
@@ -2648,4 +2658,16 @@ private CarbonCommonConstants() {

public static final String CARBON_SDK_EMPTY_METADATA_PATH = "emptyMetadataFolder";

/**
* Property to identify if the spark version is above 3.x version
*/
public static final String CARBON_SPARK_VERSION_SPARK3 = "carbon.spark.version.spark3";

public static final String CARBON_SPARK_VERSION_SPARK3_DEFAULT = "false";

/**
* Carbon Spark 3.x supported data file written version
*/
public static final String CARBON_SPARK3_VERSION = "2.2.0";

}
@@ -17,6 +17,7 @@

package org.apache.carbondata.core.datastore.block;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

@@ -51,6 +52,12 @@ public abstract class AbstractIndex implements Cacheable {
*/
private long deleteDeltaTimestamp;

public List<TableBlockInfo> getBlockInfos() {
return blockInfos;
}

protected List<TableBlockInfo> blockInfos;

/**
* map of blockletIdAndPageId to deleted rows
*/
@@ -87,6 +87,11 @@ public class TableBlockInfo implements Distributable, Serializable {

private transient DataFileFooter dataFileFooter;

/**
* Carbon Data file written version
*/
private String carbonDataFileWrittenVersion = null;

/**
* comparator to sort by block size in descending order.
* Since each line is not exactly the same, the size of a InputSplit may differs,
@@ -132,6 +137,7 @@ public TableBlockInfo copy() {
info.deletedDeltaFilePath = deletedDeltaFilePath;
info.detailInfo = detailInfo.copy();
info.indexWriterPath = indexWriterPath;
info.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
return info;
}

@@ -353,4 +359,13 @@ public String toString() {
sb.append('}');
return sb.toString();
}

public String getCarbonDataFileWrittenVersion() {
return carbonDataFileWrittenVersion;
}

public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
this.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
}

}
@@ -256,6 +256,7 @@ private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata, ByteBuffer pag
if (vectorInfo != null) {
// set encodings of current page in the vectorInfo, used for decoding the complex child page
vectorInfo.encodings = encodings;
vectorInfo.vector.setCarbonDataFileWrittenVersion(vectorInfo.carbonDataFileWrittenVersion);
decoder
.decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
nullBitSet, isLocalDictEncodedPage, pageMetadata.numberOfRowsInpage,
@@ -245,6 +245,7 @@ protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData,
ColumnPageDecoder codec =
encodingFactory.createDecoder(encodings, encoderMetas, compressorName, vectorInfo != null);
if (vectorInfo != null) {
vectorInfo.vector.setCarbonDataFileWrittenVersion(vectorInfo.carbonDataFileWrittenVersion);
codec.decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
nullBitSet, false, pageMetadata.numberOfRowsInpage, reusableDataBuffer);
return null;
@@ -31,8 +31,6 @@
*/
public class IndexWrapper extends AbstractIndex {

private List<TableBlockInfo> blockInfos;

public IndexWrapper(List<TableBlockInfo> blockInfos, SegmentProperties segmentProperties) {
this.blockInfos = blockInfos;
this.segmentProperties = segmentProperties;
@@ -75,6 +75,11 @@ public class DataFileFooter implements Serializable {
*/
private Boolean isSorted = true;

/**
* carbon data file written version
*/
private String carbonDataFileWrittenVersion = null;

/**
* @return the versionId
*/
@@ -174,4 +179,12 @@ public void setSorted(Boolean isSorted) {
public Boolean isSorted() {
return isSorted;
}

public String getCarbonDataFileWrittenVersion() {
return carbonDataFileWrittenVersion;
}

public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
this.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
}
}
@@ -259,6 +259,8 @@ private void fillColumnVectorDetails(CarbonColumnarBatch columnarBatch,
BitSet deltaBitSet) {
for (int i = 0; i < allColumnInfo.length; i++) {
allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
allColumnInfo[i].carbonDataFileWrittenVersion =
columnarBatch.getCarbonDataFileWrittenVersion();
allColumnInfo[i].vector = columnarBatch.columnVectors[i];
allColumnInfo[i].deletedRows = deltaBitSet;
if (null != allColumnInfo[i].dimension) {
@@ -224,6 +224,7 @@ private List<AbstractIndex> getDataBlocks(QueryModel queryModel) throws IOExcept
if (null == blockletDetailInfo) {
blockletDetailInfo = QueryUtil.getBlockletDetailInfo(fileFooter, blockInfo);
}
blockInfo.setCarbonDataFileWrittenVersion(fileFooter.getCarbonDataFileWrittenVersion());
blockInfo.setDetailInfo(blockletDetailInfo);
}
if (null == segmentProperties) {
@@ -244,6 +244,9 @@ public RawBlockletColumnChunks call() throws Exception {
}

public void processNextBatch(CarbonColumnarBatch columnarBatch) {
columnarBatch.setCarbonDataFileWrittenVersion(
this.blockExecutionInfo.getDataBlock().getDataRefNode().getTableBlockInfo()
.getCarbonDataFileWrittenVersion());
if (updateScanner()) {
this.scannerResultAggregator.collectResultInColumnarBatch(scannedResult, columnarBatch);
}
@@ -38,6 +38,11 @@ public class RowBatch extends CarbonIterator<Object[]> {
*/
protected int counter;

/**
* Carbon data file written version
*/
private String carbonDataFileWrittenVersion = null;

public RowBatch() {
this.rows = new ArrayList<>();
}
@@ -115,4 +120,12 @@ public List<Object[]> nextBatch() {
counter = counter + rows.size();
return rows;
}

public String getCarbonDataFileWrittenVersion() {
return carbonDataFileWrittenVersion;
}

public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
this.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
}
}
@@ -87,6 +87,8 @@
*/
private QueryStatisticsModel queryStatisticsModel;

String carbonDataFileWrittenVersion;

AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
ExecutorService execService) {
batchSize = CarbonProperties.getQueryBatchSize();
@@ -222,6 +224,8 @@ private DataBlockIterator getDataBlockIterator() {
}
if (blockExecutionInfos.size() > 0) {
BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
carbonDataFileWrittenVersion = executionInfo.getDataBlock().getDataRefNode()
.getTableBlockInfo().getCarbonDataFileWrittenVersion();
blockExecutionInfos.remove(executionInfo);
return new DataBlockIterator(executionInfo, fileReader, batchSize, queryStatisticsModel,
execService);
@@ -69,6 +69,10 @@ public Object[] next() {
return currentChunk.next();
}

public String getCarbonDataFileWrittenVersion() {
return currentChunk.getCarbonDataFileWrittenVersion();
}

/**
* read next batch
*
@@ -56,6 +56,7 @@ private RowBatch getBatchResult() {
updateDataBlockIterator();
if (dataBlockIterator != null) {
rowBatch.setRows(dataBlockIterator.next());
rowBatch.setCarbonDataFileWrittenVersion(carbonDataFileWrittenVersion);
}
}
return rowBatch;
@@ -116,6 +116,8 @@ public interface CarbonColumnVector {

void setLazyPage(LazyPageLoader lazyPage);

void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion);

// Added default implementation for interface,
// to avoid implementing presto required functions for spark or core module.
default List<CarbonColumnVector> getChildrenVector() {
@@ -33,6 +33,11 @@ public class CarbonColumnarBatch {

private int rowsFiltered;

/**
* Carbon Data file written version
*/
private String carbonDataFileWrittenVersion = null;

public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize,
boolean[] filteredRows) {
this.columnVectors = columnVectors;
@@ -92,4 +97,12 @@ public void markFiltered(int rowId) {
}
}
}

public String getCarbonDataFileWrittenVersion() {
return carbonDataFileWrittenVersion;
}

public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
this.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
}
}
@@ -35,6 +35,7 @@ public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
public int size;
public CarbonColumnVector vector;
public int vectorOffset;
public String carbonDataFileWrittenVersion;
public ProjectionDimension dimension;
public ProjectionMeasure measure;
public int ordinal;
@@ -500,6 +500,11 @@ public void putArray(int rowId, int offset, int length) {
lengths[rowId] = length;
}

@Override
public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
// do nothing here
}

@Override
public void putAllByteArray(byte[] data, int offset, int length) {
byteArr = data;
@@ -246,4 +246,9 @@ public void putArray(int rowId, int offset, int length) {
public CarbonColumnVector getColumnVector() {
return this.columnVector;
}

@Override
public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
this.columnVector.setCarbonDataFileWrittenVersion(carbonDataFileWrittenVersion);
}
}
@@ -104,6 +104,11 @@ public void putNotNull(int rowId) {
// nothing to do
}

@Override
public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
this.columnVector.setCarbonDataFileWrittenVersion(carbonDataFileWrittenVersion);
}

@Override
public void putFloats(int rowId, int count, float[] src, int srcIndex) {
for (int i = srcIndex; i < count; i++) {
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -73,6 +74,10 @@ public DataFileFooter convertDataFileFooter(FileHeader fileHeader, FileFooter3 f
DataFileFooter dataFileFooter = new DataFileFooter();
dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) fileHeader.getVersion()));
dataFileFooter.setNumberOfRows(footer.getNum_rows());
if (null != footer.getExtra_info()) {
dataFileFooter.setCarbonDataFileWrittenVersion(
footer.getExtra_info().get(CarbonCommonConstants.CARBON_WRITTEN_VERSION));
}
dataFileFooter.setSchemaUpdatedTimeStamp(fileHeader.getTime_stamp());
if (footer.isSetIs_sort()) {
dataFileFooter.setSorted(footer.isIs_sort());

0 comments on commit feb0521

Please sign in to comment.