From 6f49d80ae53ed2036d215842e8f4e0062408df10 Mon Sep 17 00:00:00 2001
From: Indhumathi27
Date: Tue, 18 Dec 2018 16:08:42 +0530
Subject: [PATCH] [CARBONDATA-3184] Fix DataLoad Failure with 'using carbondata'

---
 .../org/apache/spark/sql/CarbonSource.scala        | 13 +++++++++++--
 .../apache/spark/sql/CarbonToSparkAdapater.scala   |  9 +++++++++
 .../apache/spark/sql/CarbonToSparkAdapater.scala   |  9 +++++++++
 .../apache/spark/sql/CarbonToSparkAdapater.scala   |  9 +++++++++
 .../spark/util/AllDictionaryTestCase.scala         | 16 ++++++++++++++--
 5 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 9899e8bdb8f..1d5c71dc3c3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import java.net.URI
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -25,7 +26,7 @@ import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
@@ -37,6 +38,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -331,7 +333,14 @@ object CarbonSource {
       properties,
       query)
     // updating params
-    val updatedFormat = storageFormat.copy(properties = map)
+    var updatedFormat: CatalogStorageFormat = null
+    // For Spark 2.2 and above, if the catalog table's locationUri is empty, assign the
+    // value of tablePath to locationUri
+    if (SparkUtil.isSparkVersionXandAbove("2.2") && tableDesc.storage.locationUri.isEmpty) {
+      updatedFormat = CarbonToSparkAdapater.getUpdatedStorageFormat(storageFormat, map, tablePath)
+    } else {
+      updatedFormat = storageFormat.copy(properties = map)
+    }
     tableDesc.copy(storage = updatedFormat)
   } else {
     val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
index d5fe6a45c7e..52c27eeb78a 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -18,8 +18,11 @@
 
 package org.apache.spark.sql
 
+import java.net.URI
+
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
@@ -72,4 +75,10 @@ object CarbonToSparkAdapater {
     ExpressionSet(filterPredicates)
       .filter(_.references.subsetOf(partitionSet)))
   }
+
+  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+      map: Map[String, String],
+      tablePath: String): CatalogStorageFormat = {
+    storageFormat.copy(properties = map, locationUri = Some(tablePath))
+  }
 }
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
index 7a68e3e5ee9..244b0979393 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -18,8 +18,11 @@
 
 package org.apache.spark.sql
 
+import java.net.URI
+
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
@@ -79,4 +82,10 @@ object CarbonToSparkAdapater {
   def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
     Seq(OptimizeCodegen(conf))
   }
+
+  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+      map: Map[String, String],
+      tablePath: String): CatalogStorageFormat = {
+    storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
+  }
 }
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
index cec4c367b5e..4c4ce4dbbe0 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapater.scala
@@ -18,8 +18,11 @@
 
 package org.apache.spark.sql
 
+import java.net.URI
+
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -81,4 +84,10 @@ object CarbonToSparkAdapater {
   def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
     Seq.empty
   }
+
+  def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
+      map: Map[String, String],
+      tablePath: String): CatalogStorageFormat = {
+    storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
+  }
 }
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index bceb0fced02..58e5665bbed 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -18,7 +18,7 @@ package org.apache.carbondata.spark.util
 
 import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -82,6 +82,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
   override def beforeAll {
     sql("drop table if exists sample")
     sql("drop table if exists complextypes")
+    sql("drop table if exists tabletest")
     buildTestData
     // second time comment this line
     buildTable
@@ -167,9 +168,20 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     DictionaryTestCaseUtil.
       checkDictionary(complexRelation, "channelsId", "1650")
   }
-
+
+  test("test create table through 'using carbondata' and load data") {
+    sql(
+      "CREATE TABLE tabletest (empno INT, workgroupcategory STRING, deptno INT, projectcode INT, " +
+      "attendance INT) USING carbondata")
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE tabletest OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""".stripMargin)
+    checkAnswer(sql("select count(*) from tabletest"), Row(10))
+  }
+
   override def afterAll {
     sql("drop table sample")
     sql("drop table complextypes")
+    sql("drop table tabletest")
   }
 }
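
Note on the fix (an illustrative sketch, not part of the patch): Spark 2.1 declares
CatalogStorageFormat.locationUri as Option[String], while Spark 2.2 and 2.3 changed it to
Option[java.net.URI]. That is why getUpdatedStorageFormat lives in the per-version
CarbonToSparkAdapater objects (Some(tablePath) in 2.1, Some(new URI(tablePath)) in 2.2/2.3)
and why CarbonSource guards the call with isSparkVersionXandAbove("2.2"). The Scala sketch
below is hypothetical: StorageFormat and updatedStorageFormat are stand-ins for Spark's
CatalogStorageFormat and the adapter method, showing how an empty locationUri is filled from
the carbon table path so that LOAD DATA on a table created with 'USING carbondata' has a
resolvable write location.

import java.net.URI

// Hypothetical stand-in for Spark 2.2+'s CatalogStorageFormat, where
// locationUri is Option[URI] (Spark 2.1 used Option[String]).
case class StorageFormat(locationUri: Option[URI], properties: Map[String, String])

object LocationUriSketch {
  // Mirrors the intent of getUpdatedStorageFormat in the 2.2/2.3 adapters:
  // update the table properties and, when the catalog entry carries no
  // location, derive locationUri from the carbon table path.
  def updatedStorageFormat(storage: StorageFormat,
      map: Map[String, String],
      tablePath: String): StorageFormat = {
    if (storage.locationUri.isEmpty) {
      storage.copy(properties = map, locationUri = Some(new URI(tablePath)))
    } else {
      storage.copy(properties = map)
    }
  }

  def main(args: Array[String]): Unit = {
    // A table created via 'USING carbondata' reached CarbonSource with no
    // location in its storage format, so the load had no target directory.
    val before = StorageFormat(None, Map.empty)
    val after = updatedStorageFormat(before, Map("k" -> "v"), "/tmp/store/default/tabletest")
    println(after.locationUri) // Some(/tmp/store/default/tabletest)
  }
}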