From ea1052afae48aaae3dfd7ff445973eb88bb67a4f Mon Sep 17 00:00:00 2001 From: Indhumathi27 Date: Tue, 18 Dec 2018 16:08:42 +0530 Subject: [PATCH] [CARBONDATA-3184]Fix DataLoad Failure with 'using carbondata' --- .../org/apache/spark/sql/CarbonSource.scala | 3 ++- .../apache/spark/sql/CarbonToSparkAdapater.scala | 9 +++++++++ .../apache/spark/sql/CarbonToSparkAdapater.scala | 9 +++++++++ .../apache/spark/sql/CarbonToSparkAdapater.scala | 9 +++++++++ .../spark/util/AllDictionaryTestCase.scala | 16 ++++++++++++++-- 5 files changed, 43 insertions(+), 3 deletions(-) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 9899e8bdb8f..7f72d424730 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -331,7 +331,8 @@ object CarbonSource { properties, query) // updating params - val updatedFormat = storageFormat.copy(properties = map) + val updatedFormat = CarbonToSparkAdapater + .getUpdatedStorageFormat(storageFormat, map, tablePath) tableDesc.copy(storage = updatedFormat) } else { val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava) diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala index d5fe6a45c7e..52c27eeb78a 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala @@ -18,8 +18,11 @@ package org.apache.spark.sql +import java.net.URI + import org.apache.spark.SparkContext import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import 
org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation @@ -72,4 +75,10 @@ object CarbonToSparkAdapater { ExpressionSet(filterPredicates) .filter(_.references.subsetOf(partitionSet))) } + + def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat, + map: Map[String, String], + tablePath: String): CatalogStorageFormat = { + storageFormat.copy(properties = map, locationUri = Some(tablePath)) + } } diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala index 7a68e3e5ee9..244b0979393 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala @@ -18,8 +18,11 @@ package org.apache.spark.sql +import java.net.URI + import org.apache.spark.SparkContext import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen @@ -79,4 +82,10 @@ object CarbonToSparkAdapater { def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = { Seq(OptimizeCodegen(conf)) } + + def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat, + map: Map[String, String], + tablePath: String): CatalogStorageFormat = { + storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath))) + } } diff --git 
a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala index cec4c367b5e..4c4ce4dbbe0 100644 --- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala +++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala @@ -18,8 +18,11 @@ package org.apache.spark.sql +import java.net.URI + import org.apache.spark.SparkContext import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} import org.apache.spark.sql.catalyst.rules.Rule @@ -81,4 +84,10 @@ object CarbonToSparkAdapater { def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = { Seq.empty } + + def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat, + map: Map[String, String], + tablePath: String): CatalogStorageFormat = { + storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath))) + } } diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala index bceb0fced02..58e5665bbed 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala @@ -18,7 +18,7 @@ package org.apache.carbondata.spark.util import org.apache.spark.sql.common.util.Spark2QueryTest import org.apache.spark.sql.hive.CarbonRelation -import org.apache.spark.sql.{CarbonEnv, SparkSession} +import 
org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -82,6 +82,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll { override def beforeAll { sql("drop table if exists sample") sql("drop table if exists complextypes") + sql("drop table if exists tabletest") buildTestData // second time comment this line buildTable @@ -167,9 +168,20 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll { DictionaryTestCaseUtil. checkDictionary(complexRelation, "channelsId", "1650") } - + + test("test create table through 'using carbondata' and load data") { + sql( + "CREATE TABLE tabletest (empno INT, workgroupcategory STRING, deptno INT, projectcode INT, " + + "attendance INT) USING carbondata") + sql( + s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE tabletest OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""".stripMargin) + checkAnswer(sql("select count(*) from tabletest"), Row(10)) + } + override def afterAll { sql("drop table sample") sql("drop table complextypes") + sql("drop table tabletest") } }