[CARBONDATA-4119][CARBONDATA-4238][CARBONDATA-4237][CARBONDATA-4236] Support geo insert without geoId and document changes

Why is this PR needed?
1. To support insert without geoId (as load already does) on a geo table.
2. [CARBONDATA-4119] : User Input for GeoID column not validated.
3. [CARBONDATA-4238] : Documentation Issue in ddl-of-carbondata.md#add-columns
4. [CARBONDATA-4237] : Documentation issues in streaming-guide.md, file-structure-of-carbondata.md and sdk-guide.md.
5. [CARBONDATA-4236] : Documentation issues in configuration-parameters.md.
6. The ProcessingTime import in streaming-guide.md is wrong.

What changes were proposed in this PR?
1. Made changes to support insert on a geo table with an auto-generated geoId (see the sketch below).
2. [CARBONDATA-4119] : Added documentation about insert with a custom geoId. Changes in docs/spatial-index-guide.md.
3. Other documentation changes added.
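As a rough illustration of the new behavior, a minimal sketch (assuming a spark-shell session `spark` with CarbonData enabled and the geo table `source_index` from docs/spatial-index-guide.md; the values follow the test case in this PR):

```scala
// Sketch only: `spark` is assumed to be a SparkSession with CarbonData support (e.g. spark-shell),
// and `source_index` an existing geo table (timevalue BIGINT, longitude LONG, latitude LONG)
// with a generated geoId column.

// Insert without geoId: the geoId value is generated internally.
spark.sql("insert into source_index select 1575428400000, 116285807, 40084087")

// Insert with a custom geoId: the first projected value is taken as the geoId.
spark.sql("insert into source_index select 0, 1575428400000, 116285807, 40084087")
```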

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #4205
ShreelekhyaG authored and Indhumathi27 committed Aug 24, 2021
1 parent 9f9ea1f commit 8de65a2767a1d20ff3cad947303c686a0017df43
Showing 10 changed files with 60 additions and 25 deletions.
@@ -2527,9 +2527,9 @@ private CarbonCommonConstants() {

/**
* Default value for SI segment Compaction / merge small files
* Making this true degrade the LOAD performance
* Making this true degrades the LOAD performance
* When the number of small files increase for SI segments(it can happen as number of columns will
* be less and we store position id and reference columns), user an either set to true which will
* be less and we store position id and reference columns), user can either set to true which will
* merge the data files for upcoming loads or run SI rebuild command which does this job for all
* segments. (REBUILD INDEX <index_table>)
*/
@@ -116,8 +116,8 @@ This section provides the details of all the configurations required for the Car
| carbon.enable.page.level.reader.in.compaction|false|Enabling page level reader for compaction reduces the memory usage while compacting more number of segments. It allows reading only page by page instead of reading whole blocklet to memory. **NOTE:** Please refer to [file-structure-of-carbondata](./file-structure-of-carbondata.md#carbondata-file-format) to understand the storage format of CarbonData and concepts of pages.|
| carbon.concurrent.compaction | true | Compaction of different tables can be executed concurrently. This configuration determines whether to compact all qualifying tables in parallel or not. **NOTE:** Compacting concurrently is a resource demanding operation and needs more resources there by affecting the query performance also. This configuration is **deprecated** and might be removed in future releases. |
| carbon.compaction.prefetch.enable | false | Compaction operation is similar to Query + data load where in data from qualifying segments are queried and data loading performed to generate a new single segment. This configuration determines whether to query ahead data from segments and feed it for data loading. **NOTE:** This configuration is disabled by default as it needs extra resources for querying extra data. Based on the memory availability on the cluster, user can enable it to improve compaction performance. |
| carbon.enable.range.compaction | true | To configure Ranges-based Compaction to be used or not for RANGE_COLUMN. If true after compaction also the data would be present in ranges. |
| carbon.si.segment.merge | false | Making this true degrade the LOAD performance. When the number of small files increase for SI segments(it can happen as number of columns will be less and we store position id and reference columns), user an either set to true which will merge the data files for upcoming loads or run SI refresh command which does this job for all segments. (REFRESH INDEX <index_table>) |
| carbon.enable.range.compaction | true | To configure Range-based Compaction to be used or not for RANGE_COLUMN. If true after compaction also the data would be present in ranges. |
| carbon.si.segment.merge | false | Making this true degrades the LOAD performance. When the number of small files increase for SI segments(it can happen as number of columns will be less and we store position id and reference columns), user can either set to true which will merge the data files for upcoming loads or run SI refresh command which does this job for all segments. (REFRESH INDEX <index_table>) |
| carbon.partition.data.on.tasklevel | false | When enabled, tasks launched for Local sort partition load will be based on one node one task. Compaction will be performed based on task level for a partition. Load performance might be degraded, because, the number of tasks launched is equal to number of nodes in case of local sort. For compaction, memory consumption will be less, as more number of tasks will be launched for a partition |

## Query Configuration
@@ -778,7 +778,7 @@ CarbonData DDL statements are documented here,which includes:
**NOTE:** Adding of only single-level Complex datatype columns(only array and struct) is supported.
Example -
```
ALTER TABLE <table-name> ADD COLUMNS(arrField array<array<int>>, structField struct<id1:string,name1:string>)
ALTER TABLE <table-name> ADD COLUMNS(arrField array<int>, structField struct<id1:string,name1:string>)
```
Users can specify which columns to include and exclude for local dictionary generation after adding new columns. These will be appended with the already existing local dictionary include and exclude columns of main table respectively.
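A minimal sketch of this (assuming a spark-shell session `spark` and an existing CarbonData table `my_table`; the column names are placeholders and the property names follow the local dictionary section of this guide):

```scala
// Sketch only: add new columns and control local dictionary generation for them.
// `spark` and `my_table` are assumed to exist; column names are placeholders.
spark.sql(
  """
    | ALTER TABLE my_table ADD COLUMNS(city STRING, country STRING)
    | TBLPROPERTIES('LOCAL_DICTIONARY_INCLUDE'='city', 'LOCAL_DICTIONARY_EXCLUDE'='country')
  """.stripMargin)
```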
@@ -46,14 +46,13 @@ The CarbonData files are stored in the location specified by the ***spark.sql.wa

The file directory structure is as below:

![File Directory Structure](../docs/images/2-1_1.png?raw=true)

1. ModifiedTime.mdt records the timestamp of the metadata with the modification time attribute of the file. When the drop table and create table are used, the modification time of the file is updated. This is common to all databases and hence is kept in parallel to databases
2. The **default** is the database name and contains the user tables.default is used when user doesn't specify any database name;else user configured database name will be the directory name. user_table is the table name.
3. Metadata directory stores schema files, tablestatus and dictionary files (including .dict, .dictmeta and .sortindex). There are three types of metadata data information files.
4. data and index files are stored under directory named **Fact**. The Fact directory has a Part0 partition directory, where 0 is the partition number.
5. There is a Segment_0 directory under the Part0 directory, where 0 is the segment number.
6. There are two types of files, carbondata and carbonindex, in the Segment_0 directory.
![File Directory Structure](../docs/images/2-1_1_latest.PNG?raw=true)

1. The **default** is the database name and contains the user tables.default is used when user doesn't specify any database name;else user configured database name will be the directory name. user_table is the table name.
2. Metadata directory stores schema files, tablestatus and segment details (includes .segment file for each segment). There are three types of metadata data information files.
3. data and index files are stored under directory named **Fact**. The Fact directory has a Part0 partition directory, where 0 is the partition number.
4. There is a Segment_0 directory under the Part0 directory, where 0 is the segment number.
5. There are two types of files, carbondata and carbonmergeindex, in the Segment_0 directory.
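For reference, a rough sketch of the layout described above (directory and file names are placeholders):

```
<warehouse>/
  default/                      <- database directory
    user_table/                 <- table directory
      Metadata/                 <- schema, tablestatus and segment files (*.segment)
      Fact/
        Part0/                  <- partition number 0
          Segment_0/            <- segment number 0
            part-*.carbondata
            *.carbonmergeindex
```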



@@ -742,9 +742,7 @@ int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
System.out.println(String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t",
i, row[0], row[1], row[2], row[3], row[4], row[5],
new Date((day * ((int) row[6]))), new Timestamp((long) row[7] / 1000), row[8]
));
i, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8]));
i++;
}
@@ -91,6 +91,15 @@ Note:
| SPATIAL_INDEX.xxx.conversionRatio | Conversion factor. It allows user to translate longitude and latitude to long. For example, if the data to load is longitude = 13.123456, latitude = 101.12356. User can configure conversion ratio sub-property value as 1000000, and change data to load as longitude = 13123456 and latitude = 10112356. Operations on long is much faster compared to floating-point numbers.|
| SPATIAL_INDEX.xxx.class | Optional user custom implementation class. Value is fully qualified class name.|

### Load/Insert
When Load/Insert is performed without the geoId column, the geoId is generated internally:
```
insert into source_index select 1,116.285807,40.084087;
```
Load/Insert with a custom geoId:
```
insert into source_index select 0, 1,116.285807,40.084087;
```

### Select Query

@@ -62,7 +62,8 @@ Start spark-shell in new terminal, type :paste, then copy and run the following
import java.io.File
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.parser.CarbonStreamParser
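For context (not part of this diff), a minimal sketch of how the corrected import is used when starting the streaming query; `readSocketDF`, the checkpoint path and the table names are placeholders:

```scala
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
import org.apache.spark.sql.streaming.StreamingQuery

// Sketch only: `readSocketDF` is assumed to be a streaming DataFrame; paths/names are placeholders.
val query: StreamingQuery = readSocketDF.writeStream
  .format("carbondata")
  .trigger(ProcessingTime("5 seconds"))   // ProcessingTime is now taken from Trigger
  .option("checkpointLocation", "/tmp/streaming_table/checkpoint")
  .option("dbName", "default")
  .option("tableName", "streaming_table")
  .start()
```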
@@ -148,6 +149,8 @@ TBLPROPERTIES('streaming'='true')
DESC FORMATTED streaming_table
```

NOTE: A streaming table doesn't support alter table schema operations such as add column, drop column, rename column, change datatype and rename table.

## Alter streaming property
For an old table, use ALTER TABLE command to set the streaming property.
```sql
@@ -22,14 +22,15 @@ import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.LoadDataCommand
import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.strategy.{CarbonPlanHelper, DMLHelper}
import org.apache.spark.sql.types.NullType
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils

@@ -267,17 +268,31 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
child: LogicalPlan,
containsMultipleInserts: Boolean): LogicalPlan = {
val carbonDSRelation = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
if (carbonDSRelation.carbonRelation.output.size > CarbonCommonConstants
val carbonTable = carbonDSRelation.carbonRelation.carbonTable
val tableProperties = carbonTable.getTableInfo.getFactTable.getTableProperties
val spatialProperty = tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
val expectedOutput = carbonDSRelation.carbonRelation.output
if (expectedOutput.size > CarbonCommonConstants
.DEFAULT_MAX_NUMBER_OF_COLUMNS) {
CarbonException.analysisException(
s"Maximum number of columns supported: " +
s"${CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS}")
}
var newLogicalPlan = child
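// If the table defines a spatial index and the insert supplies one column fewer than the table
// schema, prepend a null literal so the geoId column is auto-generated during load.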
if (spatialProperty != null && !spatialProperty.isEmpty &&
child.output.size + 1 == expectedOutput.size) {
newLogicalPlan = child.transform {
// To support insert sql to automatically generate GeoId if customized input is not given.
case p: Project =>
val geoId = Alias(Literal(null, NullType).asInstanceOf[Expression], "NULL")()
val list = Seq(geoId) ++ p.projectList
Project(list, p.child)
}
}
// In spark, PreprocessTableInsertion rule has below cast logic.
// It was missed in carbon when implemented insert into rules.
val actualOutput = child.output
val expectedOutput = carbonDSRelation.carbonRelation.output
var newChildOutput = child.output.zip(expectedOutput)
val actualOutput = newLogicalPlan.output
var newChildOutput = newLogicalPlan.output.zip(expectedOutput)
.map {
case (actual, expected) =>
if (expected.dataType.sameType(actual.dataType) &&
@@ -292,7 +307,7 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
explicitMetadata = Option(expected.metadata))
}
} ++ actualOutput.takeRight(actualOutput.size - expectedOutput.size)
if (newChildOutput.size >= carbonDSRelation.carbonRelation.output.size ||
if (newChildOutput.size >= expectedOutput.size ||
carbonDSRelation.carbonTable.isHivePartitionTable) {
newChildOutput = newChildOutput.zipWithIndex.map { columnWithIndex =>
columnWithIndex._1 match {
@@ -301,10 +316,10 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
case attr => attr
}
}
val newChild: LogicalPlan = if (newChildOutput == child.output) {
val newChild: LogicalPlan = if (newChildOutput == newLogicalPlan.output) {
throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
} else {
Project(newChildOutput, child)
Project(newChildOutput, newLogicalPlan)
}

val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
@@ -868,6 +868,17 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
)
}

test("test insert with autogenerated geoid") {
createTable()
// insert without geoid
sql(s"insert into $table1 select 1575428400000,116285807,40084087")
// insert with customized geoid
sql(s"insert into $table1 select 0,1575428400000,116285807,40084087")
checkAnswer(sql(s"select *from $table1"),
Seq(Row(855280799612L, 1575428400000L, 116285807, 40084087),
Row(0, 1575428400000L, 116285807, 40084087)))
}

override def afterEach(): Unit = {
drop()
}
