change to carbon
ravipesala committed Aug 23, 2018
1 parent 9eaef32 commit 41b2aed
Showing 13 changed files with 296 additions and 242 deletions.
Changed: org.apache.carbondata.spark.util.Util
@@ -17,11 +17,22 @@

package org.apache.carbondata.spark.util;

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.hadoop.CarbonInputSplit;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.CarbonMetastoreTypes;
import org.apache.spark.util.SparkTypeConverter;
import org.apache.spark.util.Utils;

public class Util {
@@ -46,4 +57,66 @@ public static boolean isBlockWithoutBlockletInfoExists(List<CarbonInputSplit> sp
}
return false;
}

private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
DataType carbonDataType) {
if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
return DataTypes.StringType;
} else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
return DataTypes.ShortType;
} else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
return DataTypes.IntegerType;
} else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
return DataTypes.LongType;
} else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
return DataTypes.DoubleType;
} else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
return DataTypes.BooleanType;
} else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
return DataTypes.createDecimalType();
} else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
return DataTypes.TimestampType;
} else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
return DataTypes.DateType;
} else {
return null;
}
}

public static StructType convertToSparkSchema(CarbonTable table, ColumnSchema[] carbonColumns) {
List<StructField> fields = new ArrayList<>(carbonColumns.length);
for (int i = 0; i < carbonColumns.length; i++) {
ColumnSchema carbonColumn = carbonColumns[i];
DataType dataType = carbonColumn.getDataType();
if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
fields.add(new StructField(carbonColumn.getColumnName(),
new DecimalType(carbonColumn.getPrecision(), carbonColumn.getScale()),
true, Metadata.empty()));
} else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isStructType(dataType)) {
fields.add(
new StructField(
carbonColumn.getColumnName(),
CarbonMetastoreTypes.toDataType(
String.format("struct<%s>",
SparkTypeConverter.getStructChildren(table, carbonColumn.getColumnName()))),
true,
Metadata.empty()));
} else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isArrayType(dataType)) {
fields.add(
new StructField(
carbonColumn.getColumnName(),
CarbonMetastoreTypes.toDataType(
String.format("array<%s>",
SparkTypeConverter.getArrayChildren(
table,
carbonColumn.getColumnName()))),
true,
Metadata.empty()));
} else {
fields.add(new StructField(carbonColumn.getColumnName(),
convertCarbonToSparkDataType(carbonColumn.getDataType()), true, Metadata.empty()));
}
}
return new StructType(fields.toArray(new StructField[0]));
}
}
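
Not part of the commit: a minimal sketch of how the new Util.convertToSparkSchema helper could be exercised from Scala. The ColumnSchema setters used below are assumed to be the usual bean-style setters on that class, and the table argument is passed as null only because flat (non-complex) columns never dereference it.

import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.spark.util.Util

val idColumn = new ColumnSchema()
idColumn.setColumnName("id")                  // hypothetical column, for illustration only
idColumn.setDataType(CarbonDataTypes.INT)

val nameColumn = new ColumnSchema()
nameColumn.setColumnName("name")
nameColumn.setDataType(CarbonDataTypes.STRING)

// The CarbonTable is only consulted for struct/array children, so it is never touched here.
val table: CarbonTable = null
val sparkSchema = Util.convertToSparkSchema(table, Array(idColumn, nameColumn))
// sparkSchema: StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))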
Changed: org.apache.spark.util.CarbonMetastoreTypes
@@ -17,19 +17,11 @@

package org.apache.spark.util

import java.util
import java.util.{ArrayList, List}

import scala.util.parsing.combinator.RegexParsers

import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CarbonException

import org.apache.carbondata.core.metadata.datatype
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.spark.util.CarbonScalaUtil

object CarbonMetastoreTypes extends RegexParsers {
protected lazy val primitiveType: Parser[DataType] =
"string" ^^^ StringType |
@@ -109,43 +101,4 @@ object CarbonMetastoreTypes extends RegexParsers {
case DateType => "date"
}
}

def convertToSparkSchema(table: CarbonTable, carbonColumns: Array[ColumnSchema]): StructType = {
val fields: util.List[StructField] = new util.ArrayList[StructField](carbonColumns.length)
var i: Int = 0
while ( { i < carbonColumns.length }) {
val carbonColumn: ColumnSchema = carbonColumns(i)
val dataType: datatype.DataType = carbonColumn.getDataType
if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) fields
.add(new StructField(carbonColumn.getColumnName,
new DecimalType(carbonColumn.getPrecision, carbonColumn.getScale),
true,
Metadata.empty))
else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isStructType(dataType)) fields
.add(new StructField(carbonColumn.getColumnName,
CarbonMetastoreTypes
.toDataType(String
.format("struct<%s>",
SparkTypeConverter.getStructChildren(table, carbonColumn.getColumnName))),
true,
Metadata.empty))
else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isArrayType(dataType)) fields
.add(new StructField(carbonColumn.getColumnName,
CarbonMetastoreTypes
.toDataType(String
.format("array<%s>",
SparkTypeConverter.getArrayChildren(table, carbonColumn.getColumnName))),
true,
Metadata.empty))
else fields
.add(new StructField(carbonColumn.getColumnName,
CarbonScalaUtil.convertCarbonToSparkDataType (carbonColumn.getDataType),
true,
Metadata.empty))

{ i += 1; i - 1 }
}
new StructType(fields.toArray(new Array[StructField](0)))
}

}
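
The Java helper added above leans on CarbonMetastoreTypes.toDataType for complex columns. A small sketch of that parser, assuming it follows the same metastore type-string grammar Spark uses; the nullability defaults come from the parser retained in this file.

import org.apache.spark.sql.types._
import org.apache.spark.util.CarbonMetastoreTypes

// Parse metastore-style type strings into Spark DataTypes.
val arrayOfString = CarbonMetastoreTypes.toDataType("array<string>")
// arrayOfString == ArrayType(StringType, true)

val point = CarbonMetastoreTypes.toDataType("struct<x:double,y:double>")
// point == StructType(Seq(
//   StructField("x", DoubleType, nullable = true),
//   StructField("y", DoubleType, nullable = true)))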
Changed: CarbonDictionaryWrapper
@@ -26,7 +26,7 @@ public class CarbonDictionaryWrapper extends Dictionary {

private Binary[] binaries;

public CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
super(encoding);
binaries = new Binary[dictionary.getDictionarySize()];
for (int i = 0; i < binaries.length; i++) {
Changed: CarbonFileIndex
@@ -35,6 +35,10 @@ import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}

/**
* A custom FileIndex implementation that uses carbon's driver-side pruning feature to prune
* carbondata files using the carbonindex.
*/
class CarbonFileIndex(
sparkSession: SparkSession,
dataSchema: StructType,
@@ -72,39 +76,46 @@

private def prune(dataFilters: Seq[Expression],
directories: Seq[PartitionDirectory]) = {
val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
val filters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
val filter: Option[CarbonExpression] = filters.flatMap { filter =>
CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
}.reduceOption(new AndExpression(_, _))

val tablePath = parameters("path")
val model = CarbonSparkDataSourceUtil.prepareLoadModel(parameters, dataSchema)
CarbonInputFormat.setTableInfo(
hadoopConf,
model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
CarbonInputFormat.setTransactionalTable(hadoopConf, false)
if (rootPaths.nonEmpty) {
if (!rootPaths.head.equals(new Path(tablePath)) &&
rootPaths.head.toString.contains(tablePath)) {
CarbonInputFormat.setSubFoldersToRead(hadoopConf, rootPaths.map(_.toUri.toString).toArray)
val tablePath = parameters.get("path")
if (tablePath.nonEmpty) {
val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
// convert to spark's source filters
val filters = dataFilters.flatMap(DataSourceStrategy.translateFilter)

// convert to carbon filter expressions
val filter: Option[CarbonExpression] = filters.flatMap { filter =>
CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
}.reduceOption(new AndExpression(_, _))
val model = CarbonSparkDataSourceUtil.prepareLoadModel(parameters, dataSchema)
CarbonInputFormat.setTableInfo(
hadoopConf,
model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
CarbonInputFormat.setTransactionalTable(hadoopConf, false)
if (rootPaths.nonEmpty) {
// Check whether any subfolders are present here.
if (!rootPaths.head.equals(new Path(tablePath.get)) &&
rootPaths.head.toString.contains(tablePath.get)) {
CarbonInputFormat.setSubFoldersToRead(hadoopConf, rootPaths.map(_.toUri.toString).toArray)
}
}
filter match {
case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
case None => None
}
val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
val jobConf = new JobConf(hadoopConf)
SparkHadoopUtil.get.addCredentials(jobConf)
val splits = format.getSplits(Job.getInstance(jobConf))
.asInstanceOf[util.List[CarbonInputSplit]].asScala
val prunedDirs = directories.map { dir =>
val files = dir.files
.filter(d => splits.exists(_.getBlockPath.equalsIgnoreCase(d.getPath.getName)))
PartitionDirectory(dir.values, files)
}
prunedDirs
} else {
directories
}
filter match {
case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
case None => None
}
val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
val jobConf = new JobConf(hadoopConf)
SparkHadoopUtil.get.addCredentials(jobConf)
val splits = format.getSplits(Job.getInstance(jobConf))
.asInstanceOf[util.List[CarbonInputSplit]].asScala
val prunedDirs = directories.map { dir =>
val files = dir.files
.filter(d => splits.exists(_.getBlockPath.equalsIgnoreCase(d.getPath.getName)))
PartitionDirectory(dir.values, files)
}
prunedDirs
}

override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
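
For context, a hedged sketch (not in the commit) of a read that would exercise this pruning path; the path, column name, and the already-created SparkSession are placeholders. Pushed-down filters are translated via DataSourceStrategy.translateFilter and CarbonSparkDataSourceUtil.createCarbonFilter, then handed to CarbonFileInputFormat so only matching carbondata files survive prune.

// spark is an existing SparkSession; "/tmp/carbon_out" and "id" are placeholders.
val df = spark.read
  .format("carbon")
  .load("/tmp/carbon_out")

// This filter is pushed down; CarbonFileIndex.prune uses the carbonindex to skip
// files whose blocks cannot contain id = 100.
df.filter("id = 100").show()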
Changed: CarbonFileIndexReplaceRule
@@ -26,13 +26,19 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath

/**
* Rule to replace Spark's FileIndex with CarbonFileIndex for better driver-side pruning.
*/
class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] {

val createSubFolder = CarbonProperties.getInstance()
/**
* This property creates a subfolder for every load.
*/
private val createSubFolder = CarbonProperties.getInstance()
.getProperty("carbonfileformat.create.folder.perload", "false").toBoolean

override def apply(plan: LogicalPlan): LogicalPlan = {
val p = plan.transform {
val transformedPlan = plan.transform {
case l: LogicalRelation
if l.relation.isInstanceOf[HadoopFsRelation] &&
l.relation.asInstanceOf[HadoopFsRelation].fileFormat.toString.equals("carbon") &&
@@ -54,10 +60,10 @@ class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] {
val path = new Path(insert.outputPath, System.nanoTime().toString)
insert.copy(outputPath = path)
}
p
transformedPlan
}

def updateFileIndex(fileIndex: InMemoryFileIndex,
private def updateFileIndex(fileIndex: InMemoryFileIndex,
hadoopFsRelation: HadoopFsRelation): InMemoryFileIndex = {
if (fileIndex.rootPaths.length == 1) {
val carbonFile = FileFactory.getCarbonFile(fileIndex.rootPaths.head.toUri.toString)
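
This commit does not show how the rule is registered; below is a hedged sketch of one way to attach such a rule to a session through Spark's experimental extra optimizations. The import of CarbonFileIndexReplaceRule is omitted because its package is not visible in this diff.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("carbon-fileindex-rule")
  .master("local[*]")
  .getOrCreate()

// Once registered, LogicalRelations over the "carbon" file format get their
// InMemoryFileIndex swapped for CarbonFileIndex during optimization.
spark.experimental.extraOptimizations ++= Seq(new CarbonFileIndexReplaceRule)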
Changed: CarbonSparkDataSourceUtil
@@ -32,6 +32,9 @@ import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}

object CarbonSparkDataSourceUtil {

/**
* Convert a carbon datatype to the corresponding spark datatype.
*/
def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
if (CarbonDataTypes.isDecimal(dataType)) {
DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
@@ -51,7 +54,9 @@ object CarbonSparkDataSourceUtil {
}
}

// TODO: move this to spark module
/**
* Convert a spark datatype to the corresponding carbon datatype.
*/
def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
dataType match {
case StringType => CarbonDataTypes.STRING
@@ -185,6 +190,9 @@ object CarbonSparkDataSourceUtil {
javaList
}

/**
* Create a carbon load model from the datasource options and the spark schema.
*/
def prepareLoadModel(options: Map[String, String],
dataSchema: StructType): CarbonLoadModel = {
val schema = new Schema(dataSchema.fields.map { field =>
@@ -206,7 +214,7 @@
})
val builder = new CarbonWriterBuilder
builder.isTransactionalTable(false)
builder.outputPath(options("path"))
builder.outputPath(options.getOrElse("path", ""))
val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
if (blockSize.isDefined) {
builder.withBlockSize(blockSize.get)
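
A short sketch of the two conversion helpers documented in this file, assuming CarbonSparkDataSourceUtil is imported from its package in this module and that the carbon DataTypes factory exposes createDecimalType(precision, scale).

import org.apache.spark.sql.types._
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}

// spark -> carbon, per the match in convertSparkToCarbonDataType
val carbonString = CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(StringType)
// carbonString == CarbonDataTypes.STRING

// carbon -> spark; decimals keep their precision and scale
val sparkDecimal = CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(
  CarbonDataTypes.createDecimalType(10, 2))
// sparkDecimal == DecimalType(10, 2)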