[CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, ColumnVector Interface

Column vector and columnar batch interface compatibility issues have been
addressed in this PR. The changes correspond to the following modifications
made in the Spark interface (illustrated by the sketch after this list):
a) Refactoring of the ColumnVector hierarchy and related classes.
b) ColumnVector is made read-only.
c) WritableColumnVector is introduced, carrying the write interface.
d) ReadOnlyColumnVector is removed.
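The split, as it appears to integration code on Spark 2.3, looks roughly like the
sketch below. This is a minimal illustration that only assumes the stock Spark 2.3
classes ColumnVector, WritableColumnVector and OnHeapColumnVector; it is not
CarbonData code:

    // Writes are only possible through WritableColumnVector (and its concrete
    // subclasses such as OnHeapColumnVector); reads go through the read-only
    // ColumnVector parent type.
    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.vectorized.ColumnVector;

    public class ColumnVectorSplitSketch {
      public static void main(String[] args) {
        WritableColumnVector writable = new OnHeapColumnVector(4, DataTypes.IntegerType);
        writable.putInt(0, 42);            // write interface lives on WritableColumnVector
        ColumnVector readOnly = writable;  // the same vector is read through ColumnVector
        System.out.println(readOnly.getInt(0));
        writable.close();
      }
    }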

To hide the compatibility issues of the columnar vector APIs from the existing
common classes, this PR introduces a proxy vector reader interface; the proxy
vector readers take care of the compatibility issues across the different
Spark versions.
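A minimal, self-contained sketch of that proxy idea follows. The names
VersionNeutralVector and ArrayBackedVector are hypothetical stand-ins used only for
illustration; the facade actually introduced by this commit is
org.apache.spark.sql.CarbonVectorProxy (used in the diff below), whose surface is
larger than shown here:

    // Hypothetical illustration of a version-neutral write surface: common code
    // depends only on the interface, while one implementation per supported Spark
    // version performs the actual writes (e.g. via WritableColumnVector on Spark 2.3).
    public class ProxyVectorSketch {

      /** Version-neutral surface that common code would program against. */
      interface VersionNeutralVector {
        void putInt(int rowId, int value, int ordinal);
        void putNull(int rowId, int ordinal);
        int numRows();
      }

      /** Stand-in implementation; a real Spark 2.3 binding would wrap WritableColumnVector. */
      static final class ArrayBackedVector implements VersionNeutralVector {
        private final int[][] data;      // data[ordinal][rowId]
        private final boolean[][] nulls; // nulls[ordinal][rowId]

        ArrayBackedVector(int numRows, int numColumns) {
          data = new int[numColumns][numRows];
          nulls = new boolean[numColumns][numRows];
        }

        @Override public void putInt(int rowId, int value, int ordinal) {
          data[ordinal][rowId] = value;
        }

        @Override public void putNull(int rowId, int ordinal) {
          nulls[ordinal][rowId] = true;
        }

        @Override public int numRows() {
          return data.length == 0 ? 0 : data[0].length;
        }
      }

      public static void main(String[] args) {
        VersionNeutralVector vector = new ArrayBackedVector(4, 2);
        vector.putInt(0, 42, 0);  // callers never see a Spark-version-specific type
        vector.putNull(1, 1);
        System.out.println("rows per batch: " + vector.numRows());
      }
    }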
sujith71955 authored and gvramana committed Jul 10, 2018
1 parent 6fa8638 commit 2b8ae26
Showing 12 changed files with 987 additions and 177 deletions. (Only four of the changed files are reproduced below.)
@@ -59,7 +59,7 @@ import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
-import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
+import org.apache.carbondata.streaming.CarbonStreamInputFormat
 
 /**
  * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
@@ -418,13 +418,13 @@ class CarbonScanRDD[T: ClassTag](
 // create record reader for row format
 DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
 val inputFormat = new CarbonStreamInputFormat
-val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
-.asInstanceOf[CarbonStreamRecordReader]
-streamReader.setVectorReader(vectorReader)
-streamReader.setInputMetricsStats(inputMetricsStats)
+inputFormat.setVectorReader(vectorReader)
+inputFormat.setInputMetricsStats(inputMetricsStats)
 model.setStatisticsRecorder(
 CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
-streamReader.setQueryModel(model)
+inputFormat.setModel(model)
+val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
+.asInstanceOf[RecordReader[Void, Object]]
 streamReader
 case _ =>
 // create record reader for CarbonData file format
@@ -22,7 +22,7 @@ import java.util
 import java.util.{Date, UUID}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.{Job, RecordReader, TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.sql.SparkSession
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
@@ -48,7 +49,7 @@ import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, C
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
 import org.apache.carbondata.spark.util.{CommonUtil, SparkDataTypeConverterImpl}
-import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
+import org.apache.carbondata.streaming.CarbonStreamInputFormat
 
 
 /**
@@ -74,7 +75,7 @@ class HandoffPartition(
 * and we can extract it later
 */
 class StreamingRawResultIterator(
-recordReader: CarbonStreamRecordReader
+recordReader: RecordReader[Void, Any]
 ) extends RawResultIterator(null, null, null, true) {
 
 override def hasNext: Boolean = {
@@ -163,10 +164,10 @@ class StreamHandoffRDD[K, V](
 val model = format.createQueryModel(inputSplit, attemptContext)
 val inputFormat = new CarbonStreamInputFormat
 val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
-.asInstanceOf[CarbonStreamRecordReader]
-streamReader.setVectorReader(false)
-streamReader.setQueryModel(model)
-streamReader.setUseRawRow(true)
+.asInstanceOf[RecordReader[Void, Any]]
+inputFormat.setVectorReader(false)
+inputFormat.setModel(model)
+inputFormat.setUseRawRow(true)
 streamReader.initialize(inputSplit, attemptContext)
 val iteratorList = new util.ArrayList[RawResultIterator](1)
 iteratorList.add(new StreamingRawResultIterator(streamReader))
@@ -23,173 +23,177 @@
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.spark.util.CarbonScalaUtil;
 
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.CarbonVectorProxy;
 import org.apache.spark.sql.types.Decimal;
 
 class ColumnarVectorWrapper implements CarbonColumnVector {
 
-private ColumnVector columnVector;
+private CarbonVectorProxy writableColumnVector;
 
 private boolean[] filteredRows;
 
 private int counter;
 
+private int ordinal;
+
 private boolean filteredRowsExist;
 
 private DataType blockDataType;
 
-ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
-this.columnVector = columnVector;
+ColumnarVectorWrapper(CarbonVectorProxy writableColumnVector,
+boolean[] filteredRows, int ordinal) {
+this.writableColumnVector = writableColumnVector;
 this.filteredRows = filteredRows;
+this.ordinal = ordinal;
 }
 
 @Override public void putBoolean(int rowId, boolean value) {
 if (!filteredRows[rowId]) {
-columnVector.putBoolean(counter++, value);
+writableColumnVector.putBoolean(counter++, value, ordinal);
 }
 }
 
 @Override public void putFloat(int rowId, float value) {
 if (!filteredRows[rowId]) {
-columnVector.putFloat(counter++, value);
+writableColumnVector.putFloat(counter++, value,ordinal);
 }
 }
 
 @Override public void putShort(int rowId, short value) {
 if (!filteredRows[rowId]) {
-columnVector.putShort(counter++, value);
+writableColumnVector.putShort(counter++, value, ordinal);
 }
 }
 
 @Override public void putShorts(int rowId, int count, short value) {
 if (filteredRowsExist) {
 for (int i = 0; i < count; i++) {
 if (!filteredRows[rowId]) {
-columnVector.putShort(counter++, value);
+writableColumnVector.putShort(counter++, value, ordinal);
 }
 rowId++;
 }
 } else {
-columnVector.putShorts(rowId, count, value);
+writableColumnVector.putShorts(rowId, count, value, ordinal);
 }
 }
 
 @Override public void putInt(int rowId, int value) {
 if (!filteredRows[rowId]) {
-columnVector.putInt(counter++, value);
+writableColumnVector.putInt(counter++, value, ordinal);
 }
 }
 
 @Override public void putInts(int rowId, int count, int value) {
 if (filteredRowsExist) {
 for (int i = 0; i < count; i++) {
 if (!filteredRows[rowId]) {
-columnVector.putInt(counter++, value);
+writableColumnVector.putInt(counter++, value, ordinal);
 }
 rowId++;
 }
 } else {
-columnVector.putInts(rowId, count, value);
+writableColumnVector.putInts(rowId, count, value, ordinal);
 }
 }
 
 @Override public void putLong(int rowId, long value) {
 if (!filteredRows[rowId]) {
-columnVector.putLong(counter++, value);
+writableColumnVector.putLong(counter++, value, ordinal);
 }
 }
 
 @Override public void putLongs(int rowId, int count, long value) {
 if (filteredRowsExist) {
 for (int i = 0; i < count; i++) {
 if (!filteredRows[rowId]) {
-columnVector.putLong(counter++, value);
+writableColumnVector.putLong(counter++, value, ordinal);
 }
 rowId++;
 }
 } else {
-columnVector.putLongs(rowId, count, value);
+writableColumnVector.putLongs(rowId, count, value, ordinal);
 }
 }
 
 @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
 if (!filteredRows[rowId]) {
 Decimal toDecimal = Decimal.apply(value);
-columnVector.putDecimal(counter++, toDecimal, precision);
+writableColumnVector.putDecimal(counter++, toDecimal, precision, ordinal);
 }
 }
 
 @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
 Decimal decimal = Decimal.apply(value);
 for (int i = 0; i < count; i++) {
 if (!filteredRows[rowId]) {
-columnVector.putDecimal(counter++, decimal, precision);
+writableColumnVector.putDecimal(counter++, decimal, precision, ordinal);
 }
 rowId++;
 }
 }
 
 @Override public void putDouble(int rowId, double value) {
 if (!filteredRows[rowId]) {
-columnVector.putDouble(counter++, value);
+writableColumnVector.putDouble(counter++, value, ordinal);
 }
 }
 
 @Override public void putDoubles(int rowId, int count, double value) {
 if (filteredRowsExist) {
 for (int i = 0; i < count; i++) {
 if (!filteredRows[rowId]) {
-columnVector.putDouble(counter++, value);
+writableColumnVector.putDouble(counter++, value, ordinal);
 }
 rowId++;
 }
 } else {
-columnVector.putDoubles(rowId, count, value);
+writableColumnVector.putDoubles(rowId, count, value, ordinal);
 }
 }
 
 @Override public void putBytes(int rowId, byte[] value) {
 if (!filteredRows[rowId]) {
-columnVector.putByteArray(counter++, value);
+writableColumnVector.putByteArray(counter++, value, ordinal);
 }
 }
 
 @Override public void putBytes(int rowId, int count, byte[] value) {
 for (int i = 0; i < count; i++) {
 if (!filteredRows[rowId]) {
-columnVector.putByteArray(counter++, value);
+writableColumnVector.putByteArray(counter++, value, ordinal);
 }
 rowId++;
 }
 }
 
 @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
 if (!filteredRows[rowId]) {
-columnVector.putByteArray(counter++, value, offset, length);
+writableColumnVector.putByteArray(counter++, value, offset, length, ordinal);
 }
 }
 
 @Override public void putNull(int rowId) {
 if (!filteredRows[rowId]) {
-columnVector.putNull(counter++);
+writableColumnVector.putNull(counter++, ordinal);
 }
 }
 
 @Override public void putNulls(int rowId, int count) {
 if (filteredRowsExist) {
 for (int i = 0; i < count; i++) {
 if (!filteredRows[rowId]) {
-columnVector.putNull(counter++);
+writableColumnVector.putNull(counter++, ordinal);
 }
 rowId++;
 }
 } else {
-columnVector.putNulls(rowId, count);
+writableColumnVector.putNulls(rowId, count,ordinal);
 }
 }
 
 @Override public boolean isNull(int rowId) {
-return columnVector.isNullAt(rowId);
+return writableColumnVector.isNullAt(rowId,ordinal);
 }
 
 @Override public void putObject(int rowId, Object obj) {
@@ -207,7 +211,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 }
 
 @Override public DataType getType() {
-return CarbonScalaUtil.convertSparkToCarbonDataType(columnVector.dataType());
+return CarbonScalaUtil.convertSparkToCarbonDataType(writableColumnVector.dataType(ordinal));
 }
 
 @Override
@@ -52,10 +52,9 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.CarbonVectorProxy;
 import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 
 /**
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -68,9 +67,11 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
 private int batchIdx = 0;
 
+private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
+
 private int numBatched = 0;
 
-private ColumnarBatch columnarBatch;
+private CarbonVectorProxy vectorProxy;
 
 private CarbonColumnarBatch carbonColumnarBatch;
 
@@ -154,9 +155,9 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
 @Override
 public void close() throws IOException {
 logStatistics(rowCount, queryModel.getStatisticsRecorder());
-if (columnarBatch != null) {
-columnarBatch.close();
-columnarBatch = null;
+if (vectorProxy != null) {
+vectorProxy.close();
+vectorProxy = null;
 }
 // clear dictionary cache
 Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
@@ -190,15 +191,15 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
 @Override
 public Object getCurrentValue() throws IOException, InterruptedException {
 if (returnColumnarBatch) {
-int value = columnarBatch.numValidRows();
+int value = vectorProxy.numRows();
 rowCount += value;
 if (inputMetricsStats != null) {
 inputMetricsStats.incrementRecordRead((long) value);
 }
-return columnarBatch;
+return vectorProxy.getColumnarBatch();
 }
 rowCount += 1;
-return columnarBatch.getRow(batchIdx - 1);
+return vectorProxy.getRow(batchIdx - 1);
 }
 
 @Override
@@ -260,22 +261,21 @@ private void initBatch(MemoryMode memMode) {
 CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null);
 }
 }
-
-columnarBatch = ColumnarBatch.allocate(new StructType(fields), memMode);
+vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP,DEFAULT_BATCH_SIZE,fields);
 CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
-boolean[] filteredRows = new boolean[columnarBatch.capacity()];
+boolean[] filteredRows = new boolean[vectorProxy.numRows()];
 for (int i = 0; i < fields.length; i++) {
-vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i), filteredRows);
+vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i);
 }
-carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
+carbonColumnarBatch = new CarbonColumnarBatch(vectors, vectorProxy.numRows(), filteredRows);
 }
 
 private void initBatch() {
 initBatch(DEFAULT_MEMORY_MODE);
 }
 
 private void resultBatch() {
-if (columnarBatch == null) initBatch();
+if (vectorProxy == null) initBatch();
 }
 
 
@@ -284,12 +284,12 @@ private void resultBatch() {
 * Advances to the next batch of rows. Returns false if there are no more.
 */
 private boolean nextBatch() {
-columnarBatch.reset();
+vectorProxy.reset();
 carbonColumnarBatch.reset();
 if (iterator.hasNext()) {
 iterator.processNextBatch(carbonColumnarBatch);
 int actualSize = carbonColumnarBatch.getActualSize();
-columnarBatch.setNumRows(actualSize);
+vectorProxy.setNumRows(actualSize);
 numBatched = actualSize;
 batchIdx = 0;
 return true;