Skip to content

Commit

Permalink
test using carbondata
Browse files Browse the repository at this point in the history
  • Loading branch information
Indhumathi27 committed Dec 20, 2018
1 parent d7ff3e6 commit e00319b
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql

import java.net.URI
import java.util.Locale

import scala.collection.JavaConverters._
Expand All @@ -25,7 +26,7 @@ import scala.language.implicitConversions

import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
Expand All @@ -37,6 +38,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.SparkUtil

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
Expand Down Expand Up @@ -331,7 +333,12 @@ object CarbonSource {
properties,
query)
// updating params
val updatedFormat = storageFormat.copy(properties = map)
var updatedFormat: CatalogStorageFormat = null
if (SparkUtil.isSparkVersionXandAbove("2.2") && tableDesc.storage.locationUri.isEmpty) {
updatedFormat = CarbonToSparkAdapater.getStorageFormat(storageFormat, map, tablePath)
} else {
updatedFormat = storageFormat.copy(properties = map)
}
tableDesc.copy(storage = updatedFormat)
} else {
val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
case _ => throw new NoSuchTableException(database, tableIdentifier.table)
}
val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
catalogTable.location.toString, database, tableIdentifier.table)
catalogTable.location.toString, database, tableIdentifier.table)
CarbonEnv.getInstance(sparkSession).carbonMetaStore.
createCarbonRelation(catalogTable.storage.properties, identifier, sparkSession)
case _ => throw new NoSuchTableException(database, tableIdentifier.table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package org.apache.spark.sql

import java.net.URI

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
Expand Down Expand Up @@ -72,4 +75,10 @@ object CarbonToSparkAdapater {
ExpressionSet(filterPredicates)
.filter(_.references.subsetOf(partitionSet)))
}

/**
 * Builds a copy of the given storage format carrying the Carbon table
 * properties and pointing at the table path.
 *
 * NOTE(review): on this Spark version `locationUri` appears to accept an
 * Option[String] (the path is wrapped directly) — confirm against the
 * matching Spark release.
 *
 * @param storageFormat the catalog storage format to copy from
 * @param map           table properties to attach to the copy
 * @param tablePath     filesystem path used as the storage location
 * @return an updated [[CatalogStorageFormat]]
 */
def getStorageFormat(storageFormat: CatalogStorageFormat,
    map: Map[String, String],
    tablePath: String): CatalogStorageFormat = {
  val location = Some(tablePath)
  storageFormat.copy(properties = map, locationUri = location)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package org.apache.spark.sql

import java.net.URI

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen
Expand Down Expand Up @@ -79,4 +82,10 @@ object CarbonToSparkAdapater {
/**
 * Supplies the codegen-optimisation rule set for this Spark version.
 * This adapter contributes the `OptimizeCodegen` rule built from the
 * given SQL configuration.
 *
 * @param conf the session SQL configuration used to construct the rule
 * @return a single-element sequence holding the rule
 */
def getOptimizeCodegenRule(conf: SQLConf): Seq[Rule[LogicalPlan]] =
  Seq(OptimizeCodegen(conf))

/**
 * Builds a copy of the given storage format carrying the Carbon table
 * properties and pointing at the table path.
 *
 * On Spark 2.2+ `CatalogStorageFormat.locationUri` is an `Option[URI]`,
 * so the path string must be converted to a URI and wrapped in `Some`.
 *
 * @param storageFormat the catalog storage format to copy from
 * @param map           table properties to attach to the copy
 * @param tablePath     filesystem path used as the storage location
 * @return an updated [[CatalogStorageFormat]]
 */
def getStorageFormat(storageFormat: CatalogStorageFormat,
    map: Map[String, String],
    tablePath: String): CatalogStorageFormat = {
  // Fix: locationUri expects Option[URI]; a bare `new URI(...)` does not
  // type-check against the Option-typed field.
  storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package org.apache.spark.sql

import java.net.URI

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -81,4 +84,10 @@ object CarbonToSparkAdapater {
/**
 * Supplies the codegen-optimisation rule set for this Spark version.
 * This adapter contributes no rule (the `conf` parameter is kept only to
 * match the adapter interface shared across Spark versions).
 *
 * @param conf the session SQL configuration (unused here)
 * @return an empty rule sequence
 */
def getOptimizeCodegenRule(conf: SQLConf): Seq[Rule[LogicalPlan]] =
  Nil

/**
 * Builds a copy of the given storage format carrying the Carbon table
 * properties and pointing at the table path.
 *
 * On Spark 2.2+ `CatalogStorageFormat.locationUri` is an `Option[URI]`,
 * so the path string must be converted to a URI and wrapped in `Some`.
 *
 * @param storageFormat the catalog storage format to copy from
 * @param map           table properties to attach to the copy
 * @param tablePath     filesystem path used as the storage location
 * @return an updated [[CatalogStorageFormat]]
 */
def getStorageFormat(storageFormat: CatalogStorageFormat,
    map: Map[String, String],
    tablePath: String): CatalogStorageFormat = {
  // Fix: locationUri expects Option[URI]; a bare `new URI(...)` does not
  // type-check against the Option-typed field.
  storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath)))
}
}

0 comments on commit e00319b

Please sign in to comment.