Skip to content
Permalink
Browse files
[CARBONDATA-4292] Spatial index creation using spark dataframe
Why is this PR needed?
To support spatial index creation using spark data frame

What changes were proposed in this PR?
Added spatial properties in carbonOptions and edited existing testcases.

Does this PR introduce any user interface change?
Yes

Is any new testcase added?
Yes

This closes #4222
  • Loading branch information
ShreelekhyaG authored and Indhumathi27 committed Oct 12, 2021
1 parent 8b3d78b commit b8d9a97926db77a8d75c8988b88c9d5f74df3415
Show file tree
Hide file tree
Showing 5 changed files with 467 additions and 250 deletions.
@@ -74,6 +74,30 @@ create table source_index(id BIGINT, latitude long, longitude long) stored by 'c
'SPATIAL_INDEX.mygeohash.gridSize'='50',
'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000');
```

Create a spatial table using a Spark dataframe:

val geoSchema = StructType(Seq(StructField("timevalue", LongType, nullable = true),
StructField("longitude", LongType, nullable = false),
StructField("latitude", LongType, nullable = false)))

val geoDf = sqlContext.read.option("delimiter", ",").option("header", "true").schema(geoSchema)
.csv(s"$resourcesPath/geodata.csv")

geoDf.write
.format("carbondata")
.option("tableName", "geo1")
.option("SPATIAL_INDEX", "mygeohash")
.option("SPATIAL_INDEX.mygeohash.type", "geohash")
.option("SPATIAL_INDEX.mygeohash.sourcecolumns", "longitude, latitude")
.option("SPATIAL_INDEX.mygeohash.originLatitude", "39.832277")
.option("SPATIAL_INDEX.mygeohash.gridSize", "50")
.option("SPATIAL_INDEX.mygeohash.conversionRatio", "1000000")
.option("SPATIAL_INDEX.mygeohash.class", "org.apache.carbondata.geo.GeoHashIndex")
.mode(SaveMode.Overwrite)
.save()


Note:
* `mygeohash` in the above example represents the index name.
* Columns present in spatial_index table properties cannot be altered
@@ -103,6 +127,9 @@ Load/Insert with custom geoId
insert into source_index select 0, 1,116.285807,40.084087;
```

Note:
* Loading custom geoId values using a dataframe is not supported.

### Select Query

Query with Polygon UDF predicate
@@ -84,5 +84,27 @@ class CarbonOption(options: Map[String, String]) {

lazy val dateformat: Option[String] = options.get("dateformat")

// Name of the spatial index declared via the "SPATIAL_INDEX" write option,
// e.g. .option("SPATIAL_INDEX", "mygeohash"). Absent for non-spatial tables.
lazy val SPATIAL_INDEX: Option[String] = options.get("SPATIAL_INDEX")

// Index name used to build the per-index sub-property keys below; empty string
// when no spatial index was requested. Lazy for consistency with the sibling
// members and so that SPATIAL_INDEX is not forced eagerly at construction.
lazy val indexName: String = SPATIAL_INDEX.getOrElse("")

// Looks up a single "SPATIAL_INDEX.<indexName>.<suffix>" sub-property from the
// user-supplied options; factored out to avoid repeating the key construction.
private def spatialIndexProperty(suffix: String): Option[String] =
  options.get(s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.$suffix")

lazy val SPATIAL_INDEX_type: Option[String] = spatialIndexProperty("type")

lazy val SPATIAL_INDEX_sourcecolumns: Option[String] = spatialIndexProperty("sourcecolumns")

lazy val SPATIAL_INDEX_originLatitude: Option[String] = spatialIndexProperty("originLatitude")

lazy val SPATIAL_INDEX_gridSize: Option[String] = spatialIndexProperty("gridSize")

lazy val SPATIAL_INDEX_conversionRatio: Option[String] = spatialIndexProperty("conversionRatio")

lazy val SPATIAL_INDEX_class: Option[String] = spatialIndexProperty("class")

def toMap: Map[String, String] = options
}
@@ -23,6 +23,7 @@ import org.apache.spark.sql.util.CarbonException

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
import org.apache.carbondata.spark.CarbonOption
@@ -82,6 +83,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
}

private def makeCreateTableString(schema: StructType, options: CarbonOption): String = {
val indexName = options.indexName
val property = Map(
"SORT_COLUMNS" -> options.sortColumns,
"SORT_SCOPE" -> options.sortScope,
@@ -91,7 +93,18 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
"TABLE_PAGE_SIZE_INMB" -> options.tablePageSizeInMb,
"STREAMING" -> Option(options.isStreaming.toString),
"DATEFORMAT" -> options.dateformat,
"TIMESTAMPFORMAT" -> options.timestampformat
"TIMESTAMPFORMAT" -> options.timestampformat,
"SPATIAL_INDEX" -> options.SPATIAL_INDEX,
s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.type" -> options.SPATIAL_INDEX_type,
s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.sourcecolumns" ->
options.SPATIAL_INDEX_sourcecolumns,
s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.originLatitude" ->
options.SPATIAL_INDEX_originLatitude,
s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.gridSize" ->
options.SPATIAL_INDEX_gridSize,
s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.conversionRatio" ->
options.SPATIAL_INDEX_conversionRatio,
s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.class" -> options.SPATIAL_INDEX_class
).filter(_._2.isDefined)
.map(property => s"'${property._1}' = '${property._2.get}'").mkString(",")

@@ -922,7 +922,11 @@ object CommonLoadUtils {
.map(columnName => columnName.toLowerCase())
attributes.filterNot(a => staticPartCols.contains(a.name.toLowerCase))
}
if (expectedColumns.length != dfAttributes.length) {
val spatialProperty = catalogTable.properties.get(CarbonCommonConstants.SPATIAL_INDEX)
// For spatial table, dataframe attributes will not contain geoId column.
val isSpatialTable = spatialProperty.isDefined && spatialProperty.nonEmpty &&
dfAttributes.length + 1 == expectedColumns.size
if (expectedColumns.length != dfAttributes.length && !isSpatialTable) {
throw new AnalysisException(
s"Cannot insert into table $loadParams.tableName because the number of columns are " +
s"different: " +

0 comments on commit b8d9a97

Please sign in to comment.