Commit

Fix comments

ravipesala committed Aug 24, 2018
1 parent 08032d3 commit 23abf03
Showing 16 changed files with 94 additions and 195 deletions.
@@ -180,11 +180,11 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
|'$writerPath' """.stripMargin)

//org.apache.spark.SparkException: Index file not present to read the carbondata file
val exception = intercept[java.lang.RuntimeException]
val exception = intercept[Exception]
{
sql("select * from sdkOutputTable").show(false)
}
assert(exception.getMessage().contains("Error while taking index snapshot"))
assert(exception.getMessage().contains("No Index files are present in the table location"))

sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
@@ -109,9 +109,9 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {


test("Update operation on carbon table") {
sql("drop database if exists carbon cascade")
sql(s"create database carbon location '$dblocation'")
sql("use carbon")
sql("drop database if exists carbon1 cascade")
sql(s"create database carbon1 location '$dblocation'")
sql("use carbon1")
sql(
"""
CREATE TABLE automerge(id int, name string, city string, age int)
@@ -120,7 +120,7 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
val testData = s"$resourcesPath/sample.csv"
sql(s"LOAD DATA LOCAL INPATH '$testData' into table automerge")
// update operation
sql("""update carbon.automerge d set (d.id) = (d.id + 1) where d.id > 2""").show()
sql("""update carbon1.automerge d set (d.id) = (d.id + 1) where d.id > 2""").show()
checkAnswer(
sql("select count(*) from automerge"),
Seq(Row(6))
@@ -129,16 +129,16 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
}

test("Delete operation on carbon table") {
sql("drop database if exists carbon cascade")
sql(s"create database carbon location '$dblocation'")
sql("use carbon")
sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
sql("drop database if exists carbon1 cascade")
sql(s"create database carbon1 location '$dblocation'")
sql("use carbon1")
sql("""create table carbon1.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
sql("insert into carbontable select 'a',1,'aa','aaa'")
sql("insert into carbontable select 'b',1,'bb','bbb'")
// delete operation
sql("""delete from carbontable where c3 = 'aa'""").show
checkAnswer(
sql("""select c1,c2,c3,c5 from carbon.carbontable"""),
sql("""select c1,c2,c3,c5 from carbon1.carbontable"""),
Seq(Row("b",1,"bb","bbb"))
)
sql("drop table carbontable")
@@ -537,15 +537,15 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {

test("Update operation on carbon table with singlepass") {
sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=true""")
sql("drop database if exists carbon cascade")
sql(s"create database carbon location '$dblocation'")
sql("use carbon")
sql("drop database if exists carbon1 cascade")
sql(s"create database carbon1 location '$dblocation'")
sql("use carbon1")
sql("""CREATE TABLE carbontable(id int, name string, city string, age int)
STORED BY 'org.apache.carbondata.format'""")
val testData = s"$resourcesPath/sample.csv"
sql(s"LOAD DATA LOCAL INPATH '$testData' into table carbontable")
// update operation
sql("""update carbon.carbontable d set (d.id) = (d.id + 1) where d.id > 2""").show()
sql("""update carbon1.carbontable d set (d.id) = (d.id + 1) where d.id > 2""").show()
checkAnswer(
sql("select count(*) from carbontable"),
Seq(Row(6))
@@ -556,15 +556,15 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
test("Update operation on carbon table with persist false") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.isPersistEnabled, "false")
sql("drop database if exists carbon cascade")
sql(s"create database carbon location '$dblocation'")
sql("use carbon")
sql("drop database if exists carbon1 cascade")
sql(s"create database carbon1 location '$dblocation'")
sql("use carbon1")
sql("""CREATE TABLE carbontable(id int, name string, city string, age int)
STORED BY 'org.apache.carbondata.format'""")
val testData = s"$resourcesPath/sample.csv"
sql(s"LOAD DATA LOCAL INPATH '$testData' into table carbontable")
// update operation
sql("""update carbon.carbontable d set (d.id) = (d.id + 1) where d.id > 2""").show()
sql("""update carbon1.carbontable d set (d.id) = (d.id + 1) where d.id > 2""").show()
checkAnswer(
sql("select count(*) from carbontable"),
Seq(Row(6))
@@ -29,6 +29,7 @@ import scala.util.Try
import com.univocity.parsers.common.TextParsingException
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.{Field, UpdateTableModel}
@@ -43,7 +44,7 @@ import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumn
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
import org.apache.carbondata.core.metadata.ColumnIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema}
@@ -58,55 +59,6 @@ object CarbonScalaUtil {

val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

// TODO: move this to spark module
def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
dataType match {
case StringType => CarbonDataTypes.STRING
case ShortType => CarbonDataTypes.SHORT
case IntegerType => CarbonDataTypes.INT
case LongType => CarbonDataTypes.LONG
case DoubleType => CarbonDataTypes.DOUBLE
case FloatType => CarbonDataTypes.FLOAT
case DateType => CarbonDataTypes.DATE
case BooleanType => CarbonDataTypes.BOOLEAN
case TimestampType => CarbonDataTypes.TIMESTAMP
case ArrayType(elementType, _) =>
CarbonDataTypes.createArrayType(CarbonScalaUtil.convertSparkToCarbonDataType(elementType))
case StructType(fields) =>
val carbonFields = new util.ArrayList[CarbonStructField]
fields.map { field =>
carbonFields.add(
new CarbonStructField(
field.name,
CarbonScalaUtil.convertSparkToCarbonDataType(field.dataType)))
}
CarbonDataTypes.createStructType(carbonFields)
case NullType => CarbonDataTypes.NULL
case decimal: DecimalType =>
CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark")
}
}

def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
if (CarbonDataTypes.isDecimal(dataType)) {
DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
dataType.asInstanceOf[CarbonDecimalType].getScale)
} else {
dataType match {
case CarbonDataTypes.STRING => StringType
case CarbonDataTypes.SHORT => ShortType
case CarbonDataTypes.INT => IntegerType
case CarbonDataTypes.LONG => LongType
case CarbonDataTypes.DOUBLE => DoubleType
case CarbonDataTypes.BOOLEAN => BooleanType
case CarbonDataTypes.TIMESTAMP => TimestampType
case CarbonDataTypes.DATE => DateType
case CarbonDataTypes.VARCHAR => StringType
}
}
}

def getString(value: Any,
serializationNullFormat: String,
delimiterLevel1: String,
@@ -150,7 +102,8 @@ object CarbonScalaUtil {
case _ =>
val convertedValue =
DataTypeUtil
.getDataBasedOnDataType(value, convertSparkToCarbonDataType(dataType))
.getDataBasedOnDataType(value,
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType))
if (convertedValue == null) {
if (defaultValue) {
return dataType match {
@@ -301,7 +254,8 @@ object CarbonScalaUtil {
pvalue
}
val carbonColumn = table.getColumnByName(table.getTableName, col.toLowerCase)
val dataType = CarbonScalaUtil.convertCarbonToSparkDataType(carbonColumn.getDataType)
val dataType =
CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(carbonColumn.getDataType)
try {
if (value.equals(hivedefaultpartition)) {
(col, value)
83 changes: 0 additions & 83 deletions integration/spark-datasource/pom.xml
@@ -192,88 +192,5 @@
<maven.test.skip>true</maven.test.skip>
</properties>
</profile>
<profile>
<id>spark-2.1</id>
<properties>
<spark.version>2.1.0</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/main/spark2.2</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/spark2.1</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>spark-2.2</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.version>2.2.1</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/main/spark2.1</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/spark2.2</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
@@ -183,7 +183,7 @@ object CarbonSparkDataSourceUtil {

// Convert scala list to java list, Cannot use scalaList.asJava as while deserializing it is
// not able find the classes inside scala list and gives ClassNotFoundException.
private def convertToJavaList(
def convertToJavaList(
scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
val javaList = new java.util.ArrayList[CarbonExpression]()
scalaList.foreach(javaList.add)
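
For reference, a minimal standalone sketch of the same pattern, assuming nothing beyond plain Scala and Java collections; the generic name toJavaList is illustrative and not part of this patch:

import java.util.{ArrayList => JArrayList, List => JList}

// Populate the java.util.List element by element rather than calling
// scalaList.asJava, so the serialized list carries no Scala wrapper classes
// that could trigger the ClassNotFoundException mentioned in the comment above.
def toJavaList[T](scalaSeq: Seq[T]): JList[T] = {
  val javaList = new JArrayList[T]()
  scalaSeq.foreach(javaList.add)
  javaList
}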
@@ -230,9 +230,22 @@ object CarbonSparkDataSourceUtil {
CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
builder.sortBy(
options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
builder.isTransactionalTable(false)
builder.uniqueIdentifier(System.currentTimeMillis())
val model = builder.buildLoadModel(schema)
val tableInfo = model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
val properties =
tableInfo.getFactTable.getTableProperties
// Add the meta cache level
options.map{ case (key, value) =>
if (key.equalsIgnoreCase(CarbonCommonConstants.COLUMN_META_CACHE)) {
val columnsToBeCached = value.split(",").map(x => x.trim.toLowerCase).toSeq
// make the columns in create table order and then add it to table properties
val createOrder =
tableInfo.getFactTable.getListOfColumns.asScala.map(_.getColumnName).filter(
col => columnsToBeCached.contains(col))
properties.put(CarbonCommonConstants.COLUMN_META_CACHE, createOrder.mkString(","))
}
}
model
}
}
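
For context, a hedged sketch of how the column_meta_cache option might reach this code through the Spark datasource writer; the output path and column names are illustrative, and the pass-through of writer options into the load model is assumed rather than shown in this diff:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("metaCacheDemo").getOrCreate()
val df = spark.range(10).selectExpr("cast(id as int) as id", "'bangalore' as city")

// Hypothetical write; the option value is what the loop above reorders into
// create-table column order before storing it in the table properties.
df.write
  .format("carbon")
  .option("column_meta_cache", "city,id")
  .save("/tmp/carbon_meta_cache_demo")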
@@ -329,8 +329,7 @@ class SparkCarbonFileFormat extends FileFormat
file: PartitionedFile => {
assert(file.partitionValues.numFields == partitionSchema.size)

if (!(file.filePath.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
file.filePath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT))) {
if (file.filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
val split = new CarbonInputSplit("null",
new Path(file.filePath),
file.start,
@@ -23,13 +23,12 @@ import org.apache.spark.sql.types.StructType

import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport

/**
* Read support class which converts carbon row array format to sparks Internal row.
*/
class SparkUnsafeRowReadSuport(requiredSchema: StructType)
extends DictionaryDecodeReadSupport[InternalRow] {
class SparkUnsafeRowReadSuport(requiredSchema: StructType) extends CarbonReadSupport[InternalRow] {
private val unsafeProjection = UnsafeProjection.create(requiredSchema)
override def initialize(carbonColumns: Array[CarbonColumn],
carbonTable: CarbonTable): Unit = {
@@ -38,4 +37,8 @@ class SparkUnsafeRowReadSuport(requiredSchema: StructType)
override def readRow(data: Array[AnyRef]): InternalRow = {
unsafeProjection(new GenericInternalRow(data.asInstanceOf[Array[Any]]))
}

override def close(): Unit = {
// Nothing to close
}
}
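
A minimal sketch of the read-support contract the class above now implements directly; the trait name here is illustrative, and the method shapes are inferred from the overrides shown in this diff:

import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn

// initialize() receives the projected columns and table, readRow() converts one
// carbon row array into the target row type, and close() releases any decoder
// resources (nothing to release in the unsafe-row case above).
trait ReadSupportSketch[T] {
  def initialize(carbonColumns: Array[CarbonColumn], carbonTable: CarbonTable): Unit
  def readRow(data: Array[AnyRef]): T
  def close(): Unit
}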
@@ -108,8 +108,13 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
.format("parquet").partitionBy("c2").saveAsTable("testparquet")
spark.sql("create table carbon_table(c1 string, c2 string, number int) using carbon PARTITIONED by (c2)")
spark.sql("insert into carbon_table select * from testparquet")
assert(spark.sql("select * from carbon_table").count() == 10)
TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from testparquet"))
// TODO fix in 2.1
if (!spark.sparkContext.version.contains("2.1")) {
assert(spark.sql("select * from carbon_table").count() == 10)
TestUtil
.checkAnswer(spark.sql("select * from carbon_table"),
spark.sql("select * from testparquet"))
}
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists testparquet")
}
@@ -316,7 +316,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
{
spark.sql("select * from sdkOutputTable").show(false)
}
assert(exception.getMessage().contains("Error while taking index snapshot"))
assert(exception.getMessage().contains("No Index files are present in the table location"))

spark.sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
@@ -22,6 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import scala.collection.JavaConverters._

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.{StructField, StructType}

@@ -30,7 +31,6 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.spark.StreamingOption
import org.apache.carbondata.spark.util.CarbonScalaUtil
import org.apache.carbondata.streaming.CarbonStreamException

/**
@@ -60,7 +60,7 @@ object StreamJobManager {
}
val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
StructField(column.getColName,
CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType))
CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
}
if (!querySchema.equals(StructType(fields))) {
throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
