Revert "[CARBONDATA-2532][Integration] Carbon to support spark 2.3 ve…
Browse files Browse the repository at this point in the history
…rsion, ColumnVector Interface"

This reverts commit 2b8ae26.
jackylk committed Jul 18, 2018
1 parent 0aab4e7 commit 96fe233
Showing 12 changed files with 177 additions and 987 deletions.
@@ -60,7 +60,7 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.InitInputMetrics
import org.apache.carbondata.spark.format.{CsvReadSupport, VectorCsvReadSupport}
import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
import org.apache.carbondata.streaming.CarbonStreamInputFormat
import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}

/**
* This RDD is used to perform query on CarbonData file. Before sending tasks to scan
@@ -431,13 +431,13 @@ class CarbonScanRDD[T: ClassTag](
// create record reader for row format
DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
val inputFormat = new CarbonStreamInputFormat
inputFormat.setVectorReader(vectorReader)
inputFormat.setInputMetricsStats(inputMetricsStats)
val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
.asInstanceOf[CarbonStreamRecordReader]
streamReader.setVectorReader(vectorReader)
streamReader.setInputMetricsStats(inputMetricsStats)
model.setStatisticsRecorder(
CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
inputFormat.setModel(model)
val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
.asInstanceOf[RecordReader[Void, Object]]
streamReader.setQueryModel(model)
streamReader
case FileFormat.EXTERNAL =>
require(storageFormat.equals("csv"),
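For orientation, here is a consolidated sketch of the row-format branch of CarbonScanRDD as it stands after this revert. All identifiers are taken from the diff above and are assumed to be in scope in the surrounding compute path; this is not a standalone program. The key point of the revert is that reader configuration moves off CarbonStreamInputFormat and back onto the concrete CarbonStreamRecordReader.

    // Restored wiring for row-format stream splits (sketch, identifiers from the diff above).
    DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
    val inputFormat = new CarbonStreamInputFormat
    val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
      .asInstanceOf[CarbonStreamRecordReader]           // concrete reader type is used again
    streamReader.setVectorReader(vectorReader)           // vectorized vs. row-by-row decoding
    streamReader.setInputMetricsStats(inputMetricsStats) // Spark input-metrics hook
    model.setStatisticsRecorder(
      CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
    streamReader.setQueryModel(model)                    // query model is set on the reader, not the format
    streamReader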
@@ -22,7 +22,7 @@ import java.util
import java.util.{Date, UUID}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{Job, RecordReader, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
import org.apache.spark.sql.SparkSession
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.model.QueryModel
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
@@ -49,7 +48,7 @@ import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, C
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
import org.apache.carbondata.spark.util.{CommonUtil, SparkDataTypeConverterImpl}
import org.apache.carbondata.streaming.CarbonStreamInputFormat
import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}


/**
@@ -75,7 +74,7 @@ class HandoffPartition(
* and we can extract it later
*/
class StreamingRawResultIterator(
recordReader: RecordReader[Void, Any]
recordReader: CarbonStreamRecordReader
) extends RawResultIterator(null, null, null, true) {

override def hasNext: Boolean = {
@@ -164,10 +163,10 @@ class StreamHandoffRDD[K, V](
val model = format.createQueryModel(inputSplit, attemptContext)
val inputFormat = new CarbonStreamInputFormat
val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
.asInstanceOf[RecordReader[Void, Any]]
inputFormat.setVectorReader(false)
inputFormat.setModel(model)
inputFormat.setUseRawRow(true)
.asInstanceOf[CarbonStreamRecordReader]
streamReader.setVectorReader(false)
streamReader.setQueryModel(model)
streamReader.setUseRawRow(true)
streamReader.initialize(inputSplit, attemptContext)
val iteratorList = new util.ArrayList[RawResultIterator](1)
iteratorList.add(new StreamingRawResultIterator(streamReader))
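The handoff path follows the same pattern: StreamingRawResultIterator again wraps a concrete CarbonStreamRecordReader rather than a generic RecordReader[Void, Any], and the reader is configured directly. A sketch of the construction restored above (identifiers as in the diff, shown only for orientation):

    // Raw-row reader used during streaming-segment handoff: vector decoding is off and
    // raw rows are requested so they can be handed to the sort/compaction processor
    // (cf. the CompactionResultSortProcessor import above).
    val inputFormat = new CarbonStreamInputFormat
    val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
      .asInstanceOf[CarbonStreamRecordReader]
    streamReader.setVectorReader(false)      // handoff reads row by row
    streamReader.setQueryModel(model)
    streamReader.setUseRawRow(true)          // keep rows in raw, unconverted form
    streamReader.initialize(inputSplit, attemptContext)
    val iteratorList = new util.ArrayList[RawResultIterator](1)
    iteratorList.add(new StreamingRawResultIterator(streamReader))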
@@ -23,177 +23,173 @@
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.spark.util.CarbonScalaUtil;

import org.apache.spark.sql.CarbonVectorProxy;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.Decimal;

class ColumnarVectorWrapper implements CarbonColumnVector {

private CarbonVectorProxy writableColumnVector;
private ColumnVector columnVector;

private boolean[] filteredRows;

private int counter;

private int ordinal;

private boolean filteredRowsExist;

private DataType blockDataType;

ColumnarVectorWrapper(CarbonVectorProxy writableColumnVector,
boolean[] filteredRows, int ordinal) {
this.writableColumnVector = writableColumnVector;
ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
this.columnVector = columnVector;
this.filteredRows = filteredRows;
this.ordinal = ordinal;
}

@Override public void putBoolean(int rowId, boolean value) {
if (!filteredRows[rowId]) {
writableColumnVector.putBoolean(counter++, value, ordinal);
columnVector.putBoolean(counter++, value);
}
}

@Override public void putFloat(int rowId, float value) {
if (!filteredRows[rowId]) {
writableColumnVector.putFloat(counter++, value,ordinal);
columnVector.putFloat(counter++, value);
}
}

@Override public void putShort(int rowId, short value) {
if (!filteredRows[rowId]) {
writableColumnVector.putShort(counter++, value, ordinal);
columnVector.putShort(counter++, value);
}
}

@Override public void putShorts(int rowId, int count, short value) {
if (filteredRowsExist) {
for (int i = 0; i < count; i++) {
if (!filteredRows[rowId]) {
writableColumnVector.putShort(counter++, value, ordinal);
columnVector.putShort(counter++, value);
}
rowId++;
}
} else {
writableColumnVector.putShorts(rowId, count, value, ordinal);
columnVector.putShorts(rowId, count, value);
}
}

@Override public void putInt(int rowId, int value) {
if (!filteredRows[rowId]) {
writableColumnVector.putInt(counter++, value, ordinal);
columnVector.putInt(counter++, value);
}
}

@Override public void putInts(int rowId, int count, int value) {
if (filteredRowsExist) {
for (int i = 0; i < count; i++) {
if (!filteredRows[rowId]) {
writableColumnVector.putInt(counter++, value, ordinal);
columnVector.putInt(counter++, value);
}
rowId++;
}
} else {
writableColumnVector.putInts(rowId, count, value, ordinal);
columnVector.putInts(rowId, count, value);
}
}

@Override public void putLong(int rowId, long value) {
if (!filteredRows[rowId]) {
writableColumnVector.putLong(counter++, value, ordinal);
columnVector.putLong(counter++, value);
}
}

@Override public void putLongs(int rowId, int count, long value) {
if (filteredRowsExist) {
for (int i = 0; i < count; i++) {
if (!filteredRows[rowId]) {
writableColumnVector.putLong(counter++, value, ordinal);
columnVector.putLong(counter++, value);
}
rowId++;
}
} else {
writableColumnVector.putLongs(rowId, count, value, ordinal);
columnVector.putLongs(rowId, count, value);
}
}

@Override public void putDecimal(int rowId, BigDecimal value, int precision) {
if (!filteredRows[rowId]) {
Decimal toDecimal = Decimal.apply(value);
writableColumnVector.putDecimal(counter++, toDecimal, precision, ordinal);
columnVector.putDecimal(counter++, toDecimal, precision);
}
}

@Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
Decimal decimal = Decimal.apply(value);
for (int i = 0; i < count; i++) {
if (!filteredRows[rowId]) {
writableColumnVector.putDecimal(counter++, decimal, precision, ordinal);
columnVector.putDecimal(counter++, decimal, precision);
}
rowId++;
}
}

@Override public void putDouble(int rowId, double value) {
if (!filteredRows[rowId]) {
writableColumnVector.putDouble(counter++, value, ordinal);
columnVector.putDouble(counter++, value);
}
}

@Override public void putDoubles(int rowId, int count, double value) {
if (filteredRowsExist) {
for (int i = 0; i < count; i++) {
if (!filteredRows[rowId]) {
writableColumnVector.putDouble(counter++, value, ordinal);
columnVector.putDouble(counter++, value);
}
rowId++;
}
} else {
writableColumnVector.putDoubles(rowId, count, value, ordinal);
columnVector.putDoubles(rowId, count, value);
}
}

@Override public void putBytes(int rowId, byte[] value) {
if (!filteredRows[rowId]) {
writableColumnVector.putByteArray(counter++, value, ordinal);
columnVector.putByteArray(counter++, value);
}
}

@Override public void putBytes(int rowId, int count, byte[] value) {
for (int i = 0; i < count; i++) {
if (!filteredRows[rowId]) {
writableColumnVector.putByteArray(counter++, value, ordinal);
columnVector.putByteArray(counter++, value);
}
rowId++;
}
}

@Override public void putBytes(int rowId, int offset, int length, byte[] value) {
if (!filteredRows[rowId]) {
writableColumnVector.putByteArray(counter++, value, offset, length, ordinal);
columnVector.putByteArray(counter++, value, offset, length);
}
}

@Override public void putNull(int rowId) {
if (!filteredRows[rowId]) {
writableColumnVector.putNull(counter++, ordinal);
columnVector.putNull(counter++);
}
}

@Override public void putNulls(int rowId, int count) {
if (filteredRowsExist) {
for (int i = 0; i < count; i++) {
if (!filteredRows[rowId]) {
writableColumnVector.putNull(counter++, ordinal);
columnVector.putNull(counter++);
}
rowId++;
}
} else {
writableColumnVector.putNulls(rowId, count,ordinal);
columnVector.putNulls(rowId, count);
}
}

@Override public boolean isNull(int rowId) {
return writableColumnVector.isNullAt(rowId,ordinal);
return columnVector.isNullAt(rowId);
}

@Override public void putObject(int rowId, Object obj) {
@@ -211,7 +207,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
}

@Override public DataType getType() {
return CarbonScalaUtil.convertSparkToCarbonDataType(writableColumnVector.dataType(ordinal));
return CarbonScalaUtil.convertSparkToCarbonDataType(columnVector.dataType());
}

@Override
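The change in this file is mechanical: every put*/isNull call goes back to delegating directly to Spark's pre-2.3 writable ColumnVector instead of routing through CarbonVectorProxy with an extra column ordinal, and bulk puts are forwarded unchanged whenever no rows were filtered. A minimal Scala sketch of the delegate-and-compact pattern the wrapper implements; the real class is Java, covers all CarbonColumnVector methods, and the name FilteredColumnWriter here is hypothetical.

    import org.apache.spark.sql.execution.vectorized.ColumnVector

    // Sketch: rows flagged in filteredRows are dropped; surviving values are written at a
    // compacted position tracked by `counter`, exactly as in putInt/putLong in the diff above.
    // Assumes the pre-2.3 Spark API, where put* methods live on ColumnVector itself.
    class FilteredColumnWriter(column: ColumnVector, filteredRows: Array[Boolean]) {
      private var counter = 0
      def putInt(rowId: Int, value: Int): Unit = {
        if (!filteredRows(rowId)) {       // skip rows eliminated by the filter
          column.putInt(counter, value)   // write at the compacted position
          counter += 1
        }
      }
    }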
@@ -52,9 +52,10 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.CarbonVectorProxy;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
* A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -67,11 +68,9 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {

private int batchIdx = 0;

private static final int DEFAULT_BATCH_SIZE = 4 * 1024;

private int numBatched = 0;

private CarbonVectorProxy vectorProxy;
private ColumnarBatch columnarBatch;

private CarbonColumnarBatch carbonColumnarBatch;

@@ -155,9 +154,9 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
@Override
public void close() throws IOException {
logStatistics(rowCount, queryModel.getStatisticsRecorder());
if (vectorProxy != null) {
vectorProxy.close();
vectorProxy = null;
if (columnarBatch != null) {
columnarBatch.close();
columnarBatch = null;
}
// clear dictionary cache
Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
@@ -191,15 +190,15 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
if (returnColumnarBatch) {
int value = vectorProxy.numRows();
int value = columnarBatch.numValidRows();
rowCount += value;
if (inputMetricsStats != null) {
inputMetricsStats.incrementRecordRead((long) value);
}
return vectorProxy.getColumnarBatch();
return columnarBatch;
}
rowCount += 1;
return vectorProxy.getRow(batchIdx - 1);
return columnarBatch.getRow(batchIdx - 1);
}

@Override
@@ -261,21 +260,22 @@ private void initBatch(MemoryMode memMode) {
CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null);
}
}
vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP,DEFAULT_BATCH_SIZE,fields);

columnarBatch = ColumnarBatch.allocate(new StructType(fields), memMode);
CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
boolean[] filteredRows = new boolean[vectorProxy.numRows()];
boolean[] filteredRows = new boolean[columnarBatch.capacity()];
for (int i = 0; i < fields.length; i++) {
vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i);
vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i), filteredRows);
}
carbonColumnarBatch = new CarbonColumnarBatch(vectors, vectorProxy.numRows(), filteredRows);
carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
}

private void initBatch() {
initBatch(DEFAULT_MEMORY_MODE);
}

private void resultBatch() {
if (vectorProxy == null) initBatch();
if (columnarBatch == null) initBatch();
}


@@ -284,12 +284,12 @@ private void resultBatch() {
* Advances to the next batch of rows. Returns false if there are no more.
*/
private boolean nextBatch() {
vectorProxy.reset();
columnarBatch.reset();
carbonColumnarBatch.reset();
if (iterator.hasNext()) {
iterator.processNextBatch(carbonColumnarBatch);
int actualSize = carbonColumnarBatch.getActualSize();
vectorProxy.setNumRows(actualSize);
columnarBatch.setNumRows(actualSize);
numBatched = actualSize;
batchIdx = 0;
return true;
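After the revert, the reader allocates a plain Spark ColumnarBatch (pre-2.3 API) instead of a CarbonVectorProxy, and the batch-advance loop operates on it directly. A Scala rendering of the restored nextBatch step, for orientation; the authoritative version is the Java code above, and iterator, columnarBatch, carbonColumnarBatch, numBatched and batchIdx are fields of the reader.

    // Advance to the next batch of rows; returns false when the scan is exhausted.
    def nextBatch(): Boolean = {
      columnarBatch.reset()             // clear Spark's batch for reuse
      carbonColumnarBatch.reset()       // clear Carbon's wrapper batch
      if (iterator.hasNext) {
        iterator.processNextBatch(carbonColumnarBatch)      // decode into the wrapped vectors
        val actualSize = carbonColumnarBatch.getActualSize  // rows actually filled in this batch
        columnarBatch.setNumRows(actualSize)                // expose only the filled rows to Spark
        numBatched = actualSize
        batchIdx = 0
        true
      } else {
        false
      }
    }

As the getCurrentValue change above shows, when returnColumnarBatch is set the whole ColumnarBatch is handed to Spark at once; otherwise rows are pulled one at a time via columnarBatch.getRow(batchIdx - 1).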
