Skip to content

Commit

Permalink
Merge 6f49d80 into 96b2ea3
Browse files Browse the repository at this point in the history
  • Loading branch information
Indhumathi27 committed Dec 21, 2018
2 parents 96b2ea3 + 6f49d80 commit fa7e242
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 4 deletions.
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql

import java.net.URI
import java.util.Locale

import scala.collection.JavaConverters._
Expand All @@ -25,7 +26,7 @@ import scala.language.implicitConversions

import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
Expand All @@ -37,6 +38,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.SparkUtil

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
Expand Down Expand Up @@ -331,7 +333,14 @@ object CarbonSource {
properties,
query)
// updating params
val updatedFormat = storageFormat.copy(properties = map)
var updatedFormat: CatalogStorageFormat = null
// For Spark version 2.2 and above, check if catalog table locationUri is empty, then assign
// the value of tablepath to locationUri
if (SparkUtil.isSparkVersionXandAbove("2.2") && tableDesc.storage.locationUri.isEmpty) {
updatedFormat = CarbonToSparkAdapater.getUpdatedStorageFormat(storageFormat, map, tablePath)
} else {
updatedFormat = storageFormat.copy(properties = map)
}
tableDesc.copy(storage = updatedFormat)
} else {
val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
Expand Down
Expand Up @@ -18,8 +18,11 @@

package org.apache.spark.sql

import java.net.URI

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
Expand Down Expand Up @@ -72,4 +75,10 @@ object CarbonToSparkAdapater {
ExpressionSet(filterPredicates)
.filter(_.references.subsetOf(partitionSet)))
}

/**
 * Returns a copy of the given storage descriptor with the Carbon table
 * properties applied and its location pointed at the table path.
 *
 * NOTE(review): this Spark-version adapter assigns the raw path string to
 * `locationUri` (no URI wrapping) — presumably `locationUri` is
 * `Option[String]` in this Spark profile; confirm against the matching
 * Spark release.
 */
def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
    map: Map[String, String],
    tablePath: String): CatalogStorageFormat = {
  val withProperties = storageFormat.copy(properties = map)
  withProperties.copy(locationUri = Some(tablePath))
}
}
Expand Up @@ -18,8 +18,11 @@

package org.apache.spark.sql

import java.net.URI

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
Expand Down Expand Up @@ -79,4 +82,10 @@ object CarbonToSparkAdapater {
def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
Seq(OptimizeCodegen(conf))
}

/**
 * Returns a copy of the given storage descriptor with the Carbon table
 * properties applied and its location set to the table path.
 *
 * In this Spark profile `locationUri` is a `java.net.URI`, so the path
 * string is wrapped before assignment.
 * NOTE(review): `new URI(tablePath)` throws `URISyntaxException` for paths
 * containing spaces or other characters illegal in a URI — confirm the
 * table path is always URI-safe at this call site.
 */
def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
    map: Map[String, String],
    tablePath: String): CatalogStorageFormat = {
  val location = new URI(tablePath)
  storageFormat.copy(properties = map, locationUri = Some(location))
}
}
Expand Up @@ -18,8 +18,11 @@

package org.apache.spark.sql

import java.net.URI

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -81,4 +84,10 @@ object CarbonToSparkAdapater {
def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = {
Seq.empty
}

/**
 * Builds an updated storage descriptor: the Carbon table properties replace
 * the existing ones and the location is set from the table path.
 *
 * NOTE(review): `new URI(tablePath)` will reject paths with spaces or other
 * URI-illegal characters (`URISyntaxException`) — verify callers always pass
 * an already-escaped path.
 */
def getUpdatedStorageFormat(storageFormat: CatalogStorageFormat,
    map: Map[String, String],
    tablePath: String): CatalogStorageFormat = {
  storageFormat.copy(
    properties = map,
    locationUri = Option(new URI(tablePath)))
}
}
Expand Up @@ -18,7 +18,7 @@ package org.apache.carbondata.spark.util

import org.apache.spark.sql.common.util.Spark2QueryTest
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
Expand Down Expand Up @@ -82,6 +82,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
override def beforeAll {
sql("drop table if exists sample")
sql("drop table if exists complextypes")
sql("drop table if exists tabletest")
buildTestData
// second time comment this line
buildTable
Expand Down Expand Up @@ -167,9 +168,20 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
DictionaryTestCaseUtil.
checkDictionary(complexRelation, "channelsId", "1650")
}


// Verifies the datasource-API path: a table created via `USING carbondata`
// must accept a LOAD DATA and expose the loaded rows through SQL.
// (Fixed typo in the test name: "thorugh" -> "through".)
test("test create table through 'using carbondata' and load data") {
  sql(
    "CREATE TABLE tabletest (empno INT, workgroupcategory STRING, deptno INT, projectcode INT, " +
    "attendance INT) USING carbondata")
  // Load the shared CSV fixture; empty FILEHEADER means the file's own
  // header row supplies the column mapping.
  sql(
    s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE tabletest OPTIONS
       |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""".stripMargin)
  // data.csv carries 10 data rows — assumes the standard fixture; confirm if it changes.
  checkAnswer(sql("select count(*) from tabletest"), Row(10))
}

// Test-suite teardown: drop every table the suite may have created.
// Uses `if exists` (matching beforeAll) so cleanup never throws when a
// table was already dropped or its creating test failed — a bare
// `drop table` here would mask the original test failure.
override def afterAll {
  sql("drop table if exists sample")
  sql("drop table if exists complextypes")
  sql("drop table if exists tabletest")
}
}

0 comments on commit fa7e242

Please sign in to comment.