[CARBONDATA-2887] Fix complex filters on spark carbon file format
Problem:
Filters on complex types do not work with the carbon fileformat: Spark pushes the not-null filter on complex-type columns down to carbon, but carbon cannot evaluate any kind of filter on complex types.
Solution:
Stop pushing complex-type filters of any kind down from the carbon fileformat; leave them for Spark to evaluate.

This closes #2659
ravipesala authored and kumarvishal09 committed Aug 29, 2018
1 parent d801548 commit 2f537b7
Show file tree
Hide file tree
Showing 12 changed files with 355 additions and 86 deletions.
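
The essence of the patch: any predicate that touches a complex-typed column is kept out of the carbon push-down path and left for Spark to evaluate. Below is a minimal standalone sketch of the idea against Spark's public Filter API; the helper name pushableFilters is illustrative, and the patch itself tests isInstanceOf[AtomicType] from inside the org.apache.spark.sql package, which is equivalent in effect.

    import org.apache.spark.sql.sources.Filter
    import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

    // Keep only filters whose referenced columns are primitive. Carbon cannot
    // evaluate predicates on complex (array/struct/map) columns, so those
    // predicates stay with Spark's own filter operator.
    def pushableFilters(dataSchema: StructType, filters: Seq[Filter]): Seq[Filter] = {
      val dataTypeMap: Map[String, DataType] =
        dataSchema.map(f => f.name -> f.dataType).toMap
      def isComplex(dt: DataType): Boolean = dt match {
        case _: ArrayType | _: StructType | _: MapType => true
        case _ => false
      }
      // Columns absent from dataSchema are conservatively treated as pushable here.
      filters.filterNot(_.references.exists(col => dataTypeMap.get(col).exists(isComplex)))
    }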
@@ -153,6 +153,7 @@ object TestQueryExecutor {
TestQueryExecutor.projectPath + "/core/target",
TestQueryExecutor.projectPath + "/hadoop/target",
TestQueryExecutor.projectPath + "/processing/target",
TestQueryExecutor.projectPath + "/integration/spark-datasource/target",
TestQueryExecutor.projectPath + "/integration/spark-common/target",
TestQueryExecutor.projectPath + "/integration/spark2/target",
TestQueryExecutor.projectPath + "/integration/spark-common/target/jars",
@@ -21,14 +21,13 @@ import java.util

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.{InMemoryFileIndex, _}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.StructType

import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, HDFSCarbonFile}
@@ -79,9 +78,9 @@ class CarbonFileIndex(
}

private def prune(dataFilters: Seq[Expression],
-  directories: Seq[PartitionDirectory]) = {
+  directories: Seq[PartitionDirectory]): Seq[PartitionDirectory] = {
val tablePath = parameters.get("path")
-if (tablePath.nonEmpty) {
+if (tablePath.nonEmpty && dataFilters.nonEmpty) {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
// convert to spark's source filters
val filters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
@@ -104,7 +103,7 @@
map(new HDFSCarbonFile(_))
}.toArray.asInstanceOf[Array[CarbonFile]]
if (indexFiles.length == 0 && totalFiles > 0) {
-  throw new IOException("No Index files are present in the table location :" + tablePath.get)
+  return directories
}
CarbonInputFormat.setReadCommittedScope(
hadoopConf,
@@ -125,7 +124,11 @@
}
prunedDirs
} else {
-directories
+directories.map { dir =>
+  val files = dir.files
+    .filter(_.getPath.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT))
+  PartitionDirectory(dir.values, files)
+}
}
}
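
The revised fallback in prune serves two purposes: a directory that contains data but no index files (for example, output written directly by the SDK) no longer aborts the scan, and the plain-directory branch trims each partition listing to just the .carbondata files so that index and metadata files are never handed to Spark as data splits. A rough standalone sketch of that trimming; dataFilesOnly is an illustrative name, and the patch uses CarbonTablePath.CARBON_DATA_EXT rather than a string literal:

    import org.apache.spark.sql.execution.datasources.PartitionDirectory

    // Keep only the actual data files of a partition; index and metadata
    // files must not be offered to Spark as readable splits.
    def dataFilesOnly(dir: PartitionDirectory): PartitionDirectory =
      PartitionDirectory(dir.values, dir.files.filter(_.getPath.getName.endsWith(".carbondata")))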

@@ -46,7 +46,7 @@ class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] {
val fsRelation = l.relation.asInstanceOf[HadoopFsRelation]
val fileIndex = fsRelation.location
val carbonFileIndex = new CarbonFileIndex(fsRelation.sparkSession,
-  fsRelation.schema,
+  fsRelation.dataSchema,
fsRelation.options,
updateFileIndex(fileIndex, fsRelation))
val fsRelationCopy = fsRelation.copy(location = carbonFileIndex)(fsRelation.sparkSession)
@@ -22,8 +22,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.types._

import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype
-import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
+import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField, StructType => CarbonStructType}
import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
import org.apache.carbondata.core.scan.expression.conditional._
import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
@@ -196,20 +195,11 @@ object CarbonSparkDataSourceUtil {
def prepareLoadModel(options: Map[String, String],
dataSchema: StructType): CarbonLoadModel = {
val schema = new Schema(dataSchema.fields.map { field =>
-field.dataType match {
-  case s: StructType =>
-    new Field(field.name,
-      field.dataType.typeName,
-      s.fields
-        .map(f => new datatype.StructField(f.name,
-          CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava)
-  case a: ArrayType =>
-    new Field(field.name,
-      field.dataType.typeName,
-      Seq(new datatype.StructField(field.name,
-        CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava)
-  case other =>
-    new Field(field.name, field.dataType.simpleString)
+val dataType = convertSparkToCarbonDataType(field.dataType)
+dataType match {
+  case s: CarbonStructType =>
+    new Field(field.name, s, s.getFields)
+  case _ => new Field(field.name, dataType)
}
})
val builder = new CarbonWriterBuilder
@@ -228,8 +218,16 @@
builder.localDictionaryThreshold(
options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
-builder.sortBy(
-  options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
+val sortCols = options.get(CarbonCommonConstants.SORT_COLUMNS) match {
+  case Some(cols) =>
+    if (cols.trim.isEmpty) {
+      Array[String]()
+    } else {
+      cols.split(",").map(_.trim)
+    }
+  case _ => null
+}
+builder.sortBy(sortCols)
builder.uniqueIdentifier(System.currentTimeMillis())
val model = builder.buildLoadModel(schema)
val tableInfo = model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
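
The SORT_COLUMNS handling above now distinguishes an explicitly empty option from an absent one: previously "".split(",") produced Array(""), handing sortBy a single empty column name, whereas an absent option should fall back to carbon's default sort behaviour. A self-contained sketch of the three cases; parseSortColumns is an illustrative name, not from the patch:

    // Resolve the sort-columns option into what the writer's sortBy expects.
    def parseSortColumns(option: Option[String]): Array[String] = option match {
      case Some(cols) if cols.trim.isEmpty => Array.empty[String] // "" => explicitly unsorted
      case Some(cols) => cols.split(",").map(_.trim)              // "a,b" => named sort columns
      case None => null                                           // absent => carbon's default
    }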
@@ -79,7 +79,14 @@ class SparkCarbonFileFormat extends FileFormat
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
val tablePath = options.get("path") match {
-case Some(path) => path
+case Some(path) =>
+  val defaultFsUrl =
+    sparkSession.sparkContext.hadoopConfiguration.get(CarbonCommonConstants.FS_DEFAULT_FS)
+  if (defaultFsUrl == null) {
+    path
+  } else {
+    defaultFsUrl + CarbonCommonConstants.FILE_SEPARATOR + path
+  }
case _ => FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
}
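
The inferSchema change above qualifies a bare "path" option with the cluster's default filesystem, so an unqualified location presumably resolves against fs.defaultFS rather than the local filesystem. A hedged sketch, assuming CarbonCommonConstants.FS_DEFAULT_FS holds Hadoop's standard "fs.defaultFS" key and FILE_SEPARATOR is "/":

    // Qualify an unqualified table path with the default filesystem URL,
    // e.g. "/tmp/table" becomes "hdfs://namenode:8020//tmp/table".
    val defaultFsUrl = hadoopConf.get("fs.defaultFS")
    val qualifiedPath = if (defaultFsUrl == null) path else defaultFsUrl + "/" + path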

@@ -225,6 +232,8 @@ class SparkCarbonFileFormat extends FileFormat
while (i < data.length) {
if (!row.isNullAt(i)) {
dataType match {
+case StringType =>
+  data(i) = row.getUTF8String(i).toString
case d: DecimalType =>
data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
case s: StructType =>
@@ -233,6 +242,8 @@
data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
case d: DateType =>
data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+case d: TimestampType =>
+  data(i) = (row.getLong(i) / 1000).asInstanceOf[AnyRef]
case other => data(i) = row.get(i, dataType)
}
} else {
@@ -281,7 +292,7 @@
*/
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
-conf.wholeStageEnabled &&
+supportVector(sparkSession, schema) && conf.wholeStageEnabled &&
schema.length <= conf.wholeStageMaxNumFields &&
schema.forall(_.dataType.isInstanceOf[AtomicType])
}
@@ -297,21 +308,25 @@
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-val filter: Option[CarbonExpression] = filters.flatMap { filter =>
+val dataTypeMap = dataSchema.map(f => f.name -> f.dataType).toMap
+// Filter out the complex filters as carbon does not support them.
+val filter: Option[CarbonExpression] = filters.filterNot { ref =>
+  ref.references.exists { p =>
+    !dataTypeMap(p).isInstanceOf[AtomicType]
+  }
+}.flatMap { filter =>
CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
}.reduceOption(new AndExpression(_, _))

val projection = requiredSchema.map(_.name).toArray
val carbonProjection = new CarbonProjection
projection.foreach(carbonProjection.addColumn)

-var supportBatchValue: Boolean = false

val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
-val readVector = supportVector(sparkSession, resultSchema)
-if (readVector) {
-  supportBatchValue = supportBatch(sparkSession, resultSchema)
-}

+var supportBatchValue: Boolean = supportBatch(sparkSession, resultSchema)
+val readVector = supportVector(sparkSession, resultSchema) && supportBatchValue

val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
CarbonInputFormat
.setTableInfo(hadoopConf, model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
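
The reader changes above also tie the batch and vector decisions together: supportBatch now requires supportVector, and the vector path is taken only when batch output is actually possible, so the two flags can no longer disagree. Because supportBatch also demands an all-atomic schema, a scan that projects complex columns now falls back cleanly to row-at-a-time reads. Schematically, with the same names the patch uses:

    // One combined decision instead of two that could diverge: columnar batch
    // output requires the vector reader AND Spark's whole-stage constraints
    // (codegen enabled, few enough fields, no complex columns in the schema).
    val supportBatchValue = supportBatch(sparkSession, resultSchema)
    val readVector = supportVector(sparkSession, resultSchema) && supportBatchValue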
21 changes: 21 additions & 0 deletions integration/spark-datasource/src/test/resources/Array.csv
@@ -0,0 +1,21 @@
Cust00000000000000000000,2015,1,20,M,SSC,Y,1234$5678$9101$11121$12357,United Kingdom$England$Basildon$AAID001001$United Kingdom$England$Basildon$AD003001$AAID001001$United Kingdom$England$Basildon$AD003001$United Kingdom$England$Basildon$COUNTY00045,2015-01-01 00:00:00$2014-01-01 00:00:00$2013-01-01 00:00:00$2012-01-01 00:00:00$2011-01-01 00:00:00,21,55,58,337982404.6,989431364.6
Cust00000000000000000001,2015,1,30,F,Degree,N,1235$5679$9102$11122$12358,United States$MO$Parkville$AAID001002$United States$MO$Parkville$AD003002$AAID001002$United States$MO$Parkville$AD003002$United States$MO$Parkville$COUNTY00046,2015-01-02 00:00:00$2014-01-02 00:00:00$2013-01-02 00:00:00$2012-01-02 00:00:00$2011-01-02 00:00:00,104,59,50,686815400.5,157442142.4
Cust00000000000000000002,2015,1,40,M,graduation,D,1236$5680$9103$11123$12359,United States$OR$Astoria$AAID001003$United States$OR$Astoria$AD003003$AAID001003$United States$OR$Astoria$AD003003$United States$OR$Astoria$COUNTY00047,2015-01-03 00:00:00$2014-01-03 00:00:00$2013-01-03 00:00:00$2012-01-03 00:00:00$2011-01-03 00:00:00,141,190,145,106733870.5,182602141
Cust00000000000000000003,2015,1,50,F,PG,Y,1237$5681$9104$11124$12360,Australia$Victoria$Echuca$AAID001004$Australia$Victoria$Echuca$AD003004$AAID001004$Australia$Victoria$Echuca$AD003004$Australia$Victoria$Echuca$COUNTY00048,2015-01-04 00:00:00$2014-01-04 00:00:00$2013-01-04 00:00:00$2012-01-04 00:00:00$2011-01-04 00:00:00,162,162,129,702614376.9,499071850.4
Cust00000000000000000004,2015,1,60,M,MS,N,1238$5682$9105$11125$12361,United States$AL$Cahaba Heights$AAID001005$United States$AL$Cahaba Heights$AD003005$AAID001005$United States$AL$Cahaba Heights$AD003005$United States$AL$Cahaba Heights$COUNTY00049,2015-01-05 00:00:00$2014-01-05 00:00:00$2013-01-05 00:00:00$2012-01-05 00:00:00$2011-01-05 00:00:00,35,139,93,469745206.2,480746358.2
Cust00000000000000000005,2015,1,70,F,Doctor,D,1239$5683$9106$11126$12362,United States$NJ$Mickleton$AAID001006$United States$NJ$Mickleton$AD003006$AAID001006$United States$NJ$Mickleton$AD003006$United States$NJ$Mickleton$COUNTY00050,2015-01-06 00:00:00$2014-01-06 00:00:00$2013-01-06 00:00:00$2012-01-06 00:00:00$2011-01-06 00:00:00,143,11,117,765486177,832680202.8
Cust00000000000000000006,2015,1,80,M,Layer,Y,1240$5684$9107$11127$12363,United States$IL$Peoria$AAID001007$United States$IL$Peoria$AD003007$AAID001007$United States$IL$Peoria$AD003007$United States$IL$Peoria$COUNTY00051,2015-01-07 00:00:00$2014-01-07 00:00:00$2013-01-07 00:00:00$2012-01-07 00:00:00$2011-01-07 00:00:00,69,46,3,384037419,960098816.4
Cust00000000000000000007,2015,1,90,F,Cop,N,1241$5685$9108$11128$12364,United States$TN$Martin$AAID001008$United States$TN$Martin$AD003008$AAID001008$United States$TN$Martin$AD003008$United States$TN$Martin$COUNTY00052,2015-01-08 00:00:00$2014-01-08 00:00:00$2013-01-08 00:00:00$2012-01-08 00:00:00$2011-01-08 00:00:00,31,96,153,370423493.5,378702989.2
Cust00000000000000000008,2015,1,95,M,Bank,D,1242$5686$9109$11129$12365,Israel$Tel Aviv$Tel Aviv$AAID001009$Israel$Tel Aviv$Tel Aviv$AD003009$AAID001009$Israel$Tel Aviv$Tel Aviv$AD003009$Israel$Tel Aviv$Tel Aviv$COUNTY00053,2015-01-09 00:00:00$2014-01-09 00:00:00$2013-01-09 00:00:00$2012-01-09 00:00:00$2011-01-09 00:00:00,74,178,146,84894789.63,241636065.5
Cust00000000000000000009,2015,1,45,F,Group1,Y,1243$5687$9110$11130$12366,France$Ile-de-France$Chatou$AAID001010$France$Ile-de-France$Chatou$AD003010$AAID001010$France$Ile-de-France$Chatou$AD003010$France$Ile-de-France$Chatou$COUNTY00054,2015-01-10 00:00:00$2014-01-10 00:00:00$2013-01-10 00:00:00$2012-01-10 00:00:00$2011-01-10 00:00:00,154,74,113,533819711.4,517387103.8
Cust00000000000000000010,2015,1,20,M,Group2,N,1244$5688$9111$11131$12367,United States$NY$New York$AAID001011$United States$NY$New York$AD003011$AAID001011$United States$NY$New York$AD003011$United States$NY$New York$COUNTY00055,2015-01-11 00:00:00$2014-01-11 00:00:00$2013-01-11 00:00:00$2012-01-11 00:00:00$2011-01-11 00:00:00,90,41,160,29095200.54,541633736.8
Cust00000000000000000011,2015,1,30,F,Group3,D,1245$5689$9112$11132$12368,Netherlands$Noord-Brabant$Eindhoven$AAID001012$Netherlands$Noord-Brabant$Eindhoven$AD003012$AAID001012$Netherlands$Noord-Brabant$Eindhoven$AD003012$Netherlands$Noord-Brabant$Eindhoven$COUNTY00056,2015-01-12 00:00:00$2014-01-12 00:00:00$2013-01-12 00:00:00$2012-01-12 00:00:00$2011-01-12 00:00:00,168,165,135,981582131.7,667832871.1
Cust00000000000000000012,2015,1,40,M,Group4,Y,1246$5690$9113$11133$12369,United States$TX$Shavano Park$AAID001013$United States$TX$Shavano Park$AD003013$AAID001013$United States$TX$Shavano Park$AD003013$United States$TX$Shavano Park$COUNTY00057,2015-01-13 00:00:00$2014-01-13 00:00:00$2013-01-13 00:00:00$2012-01-13 00:00:00$2011-01-13 00:00:00,49,105,4,764456100.7,870094004.8
Cust00000000000000000013,2015,1,50,F,Group5,N,1247$5691$9114$11134$12370,United States$ID$Eagle$AAID001014$United States$ID$Eagle$AD003014$AAID001014$United States$ID$Eagle$AD003014$United States$ID$Eagle$COUNTY00058,2015-01-14 00:00:00$2014-01-14 00:00:00$2013-01-14 00:00:00$2012-01-14 00:00:00$2011-01-14 00:00:00,163,43,29,344022418.1,646897328.3
Cust00000000000000000014,2015,1,60,M,Group6,D,1248$5692$9115$11135$12371,United States$NJ$Riverside$AAID001015$United States$NJ$Riverside$AD003015$AAID001015$United States$NJ$Riverside$AD003015$United States$NJ$Riverside$COUNTY00059,2015-01-15 00:00:00$2014-01-15 00:00:00$2013-01-15 00:00:00$2012-01-15 00:00:00$2011-01-15 00:00:00,195,109,34,483925883.8,961737525
Cust00000000000000000015,2015,1,70,F,SSC,Y,1249$5693$9116$11136$12372,Ireland$Meath$Julianstown$AAID001016$Ireland$Meath$Julianstown$AD003016$AAID001016$Ireland$Meath$Julianstown$AD003016$Ireland$Meath$Julianstown$COUNTY00060,2015-01-16 00:00:00$2014-01-16 00:00:00$2013-01-16 00:00:00$2012-01-16 00:00:00$2011-01-16 00:00:00,149,174,78,315807967.4,274095983.1
Cust00000000000000000016,2015,1,80,M,Degree,N,1250$5694$9117$11137$12373,Canada$Ontario$Ottawa$AAID001017$Canada$Ontario$Ottawa$AD003017$AAID001017$Canada$Ontario$Ottawa$AD003017$Canada$Ontario$Ottawa$COUNTY00061,2015-01-17 00:00:00$2014-01-17 00:00:00$2013-01-17 00:00:00$2012-01-17 00:00:00$2011-01-17 00:00:00,156,114,95,50368016.86,667794471.6
Cust00000000000000000017,2015,1,90,F,graduation,D,1251$5695$9118$11138$12374,India$Andhra Pradesh$Hyderabad$AAID001018$India$Andhra Pradesh$Hyderabad$AD003018$AAID001018$India$Andhra Pradesh$Hyderabad$AD003018$India$Andhra Pradesh$Hyderabad$COUNTY00062,2015-01-18 00:00:00$2014-01-18 00:00:00$2013-01-18 00:00:00$2012-01-18 00:00:00$2011-01-18 00:00:00,48,39,102,504966685.5,786884573.4
Cust00000000000000000018,2015,1,95,M,PG,Y,1252$5696$9119$11139$12375,United Kingdom$England$London$AAID001019$United Kingdom$England$London$AD003019$AAID001019$United Kingdom$England$London$AD003019$United Kingdom$England$London$COUNTY00063,2015-01-19 00:00:00$2014-01-19 00:00:00$2013-01-19 00:00:00$2012-01-19 00:00:00$2011-01-19 00:00:00,3,180,55,781101377.9,422578922.1
Cust00000000000000000019,2015,1,45,F,MS,N,1253$5697$9120$11140$12376,United States$UT$Salt Lake City$AAID001020$United States$UT$Salt Lake City$AD003020$AAID001020$United States$UT$Salt Lake City$AD003020$United States$UT$Salt Lake City$COUNTY00064,2015-01-20 00:00:00$2014-01-20 00:00:00$2013-01-20 00:00:00$2012-01-20 00:00:00$2011-01-20 00:00:00,113,50,30,108832885.3,988692298
Cust00000000000000000020,2015,1,20,M,Doctor,D,1254$5698$9121$11141$12377,United Kingdom$England$Manchester$AAID001021$United Kingdom$England$Manchester$AD003021$AAID001021$United Kingdom$England$Manchester$AD003021$United Kingdom$England$Manchester$COUNTY00065,2015-01-21 00:00:00$2014-01-21 00:00:00$2013-01-21 00:00:00$2012-01-21 00:00:00$2011-01-21 00:00:00,60,145,96,409507003.5,592528603.7
1 change: 1 addition & 0 deletions integration/spark-datasource/src/test/resources/j2.csv
@@ -0,0 +1 @@
1,gb3e5135-5533-4ee7-51b3-F61F1355b471,2,2,,,2,563FXN1S,2016-06-28,OORM1L,,,46315_4,,,,,,,,66116E013000000000000000,66116E013000000000000000,13.143.170.55,0.0.0.1,,1,1,ZOCERS,1,1,,,seach out for star wars starwars starwars@foxmovies.comAA,,,,64,6416557544541,26557544541,560111140564316,64075303555565,504,55,,,,63613316334514,,,,,,211111111111111111,,,1,1,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,11163575,20160628
