block rebuilding for bloomfilter/lucene/preagg datamap
xuchuanyin committed Aug 6, 2018
1 parent e26a742 commit 1da08df
Showing 13 changed files with 151 additions and 19 deletions.
@@ -125,4 +125,5 @@ public DataMapCatalog createDataMapCatalog() {

public abstract DataMapFactory getDataMapFactory();

public abstract boolean supportRebuild();
}
@@ -173,4 +173,10 @@ public boolean isOperationBlocked(TableOperation operation, Object... targets) {
return false;
}

/**
 * Whether this datamap supports rebuild.
 */
public boolean supportRebuild() {
return false;
}
}
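
Together with the abstract declaration in the provider above, this default gives the command layer a simple capability check: a factory or provider answers `supportRebuild()`, and the rebuild command refuses to proceed on `false`. Below is a self-contained Scala sketch of that dispatch; every name is an illustrative stand-in, not CarbonData's real API:

```scala
// Minimal, self-contained model of the supportRebuild() dispatch this commit
// introduces. All names are illustrative stand-ins, not CarbonData's classes.
case class MalformedDataMapCommandException(msg: String) extends Exception(msg)

abstract class Provider {
  def name: String
  def rebuild(): Unit
  // Mirrors the new default: rebuild stays blocked unless a subclass opts in.
  def supportRebuild(): Boolean = false
}

class MvProvider extends Provider {
  val name = "mv"
  def rebuild(): Unit = println("rebuilding mv datamap")
  override def supportRebuild(): Boolean = true // opts in, like MVDataMapProvider
}

class BloomProvider extends Provider {
  val name = "bloomfilter"
  def rebuild(): Unit = println("rebuilding bloom index")
  // Inherits supportRebuild() == false, so manual rebuild is rejected.
}

object RebuildDispatchDemo extends App {
  // Mirrors the guard added to CarbonDataMapRebuildCommand in this commit.
  def rebuildDataMap(dataMapName: String, provider: Provider): Unit = {
    if (!provider.supportRebuild()) {
      throw MalformedDataMapCommandException(
        s"Datamap with name $dataMapName and provider '${provider.name}' does not support rebuild")
    }
    provider.rebuild()
  }

  rebuildDataMap("dm_mv", new MvProvider)       // prints: rebuilding mv datamap
  rebuildDataMap("dm_bloom", new BloomProvider) // throws: ... does not support rebuild
}
```

The later hunks show the real opt-ins: MVDataMapProvider and TestDataMapFactory return true, IndexDataMapProvider delegates to its factory, and providers that cannot rebuild keep (or explicitly return) false.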
@@ -122,4 +122,6 @@ class MVDataMapProvider(
override def getDataMapFactory: DataMapFactory[_ <: DataMap[_ <: Blocklet]] = {
throw new UnsupportedOperationException
}

override def supportRebuild(): Boolean = true
}
19 changes: 11 additions & 8 deletions docs/datamap/datamap-management.md
@@ -1,17 +1,17 @@
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements. See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to you under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License. You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->

@@ -38,8 +38,8 @@ Currently, there are 5 DataMap implementations in CarbonData.
| preaggregate | single table pre-aggregate table | No DMPROPERTY is required | Automatic |
| timeseries | time dimension rollup table | event_time, xx_granularity; please refer to [Timeseries DataMap](https://github.com/apache/carbondata/blob/master/docs/datamap/timeseries-datamap-guide.md) | Automatic |
| mv | multi-table pre-aggregate table | No DMPROPERTY is required | Manual |
- | lucene | lucene indexing for text column | index_columns to specify the index columns | Manual/Automatic |
- | bloomfilter | bloom filter for high cardinality column, geospatial column | index_columns to specify the index columns | Manual/Automatic |
+ | lucene | lucene indexing for text column | index_columns to specify the index columns | Automatic |
+ | bloomfilter | bloom filter for high cardinality column, geospatial column | index_columns to specify the index columns | Automatic |

## DataMap Management

@@ -48,6 +48,9 @@ There are two kinds of management semantics for DataMap.
1. Automatic Refresh: Create datamap without `WITH DEFERRED REBUILD` in the statement, which is the default.
2. Manual Refresh: Create datamap with `WITH DEFERRED REBUILD` in the statement.

**CAUTION:**
Manual refresh currently works well only for MV; it has bugs with other types of datamap in CarbonData 1.4.1.
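
For illustration, a minimal sketch of both modes, assuming a hypothetical `sales` table (exact `CREATE DATAMAP` syntax varies by provider; see the individual datamap guides):

```scala
// Hypothetical spark-shell session; table and datamap names are illustrative.
// Automatic refresh (default): the system keeps the datamap in sync on each load.
spark.sql(
  """
    | CREATE DATAMAP dm_bloom ON TABLE sales
    | USING 'bloomfilter'
    | DMPROPERTIES('INDEX_COLUMNS'='city')
  """.stripMargin)

// Manual refresh: created WITH DEFERRED REBUILD, disabled until explicitly rebuilt.
spark.sql(
  """
    | CREATE DATAMAP dm_mv
    | USING 'mv'
    | WITH DEFERRED REBUILD
    | AS SELECT city, sum(amount) FROM sales GROUP BY city
  """.stripMargin)
spark.sql("REBUILD DATAMAP dm_mv")
```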

### Automatic Refresh

When a user creates a datamap on the main table without using the `WITH DEFERRED REBUILD` syntax, the datamap will be managed by the system automatically.
@@ -132,7 +132,22 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
sql("drop datamap dm on table datamap_test")
}

test("test lucene rebuild data map") {
// for CARBONDATA-2820, we will first block deferred rebuild for lucene
test("test block rebuild for lucene") {
sql(
s"""
| CREATE DATAMAP dm ON TABLE datamap_test
| USING 'lucene'
| DMProperties('INDEX_COLUMNS'='city')
""".stripMargin)
val exception = intercept[MalformedDataMapCommandException] {
sql(s"REBUILD DATAMAP dm ON TABLE datamap_test")
}
sql("drop datamap dm on table datamap_test")
assert(exception.getMessage.contains("provider 'lucene' does not support rebuild"))
}

ignore("test lucene rebuild data map") {
sql("DROP TABLE IF EXISTS datamap_test4")
sql(
"""
@@ -658,7 +673,7 @@
assert(ex6.getMessage.contains("Delete operation is not supported"))
}

test("test lucene fine grain multiple data map on table") {
ignore("test lucene fine grain multiple data map on table") {
sql("DROP TABLE IF EXISTS datamap_test5")
sql(
"""
@@ -691,7 +706,7 @@
sql("DROP TABLE IF EXISTS datamap_test5")
}

test("test lucene fine grain datamap rebuild") {
ignore("test lucene fine grain datamap rebuild") {
sql("DROP TABLE IF EXISTS datamap_test5")
sql(
"""
@@ -964,4 +964,28 @@ test("check load and select for avg double datatype") {
.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
}

test("test rebuild is not supported for preagg") {
val baseTable = "maintable"
val preagg = "preaggtable"
sql(s"DROP TABLE IF EXISTS $baseTable")
sql(
s"""
| CREATE TABLE $baseTable(id int, name string, city string, age int)
| STORED BY 'org.apache.carbondata.format'
""".stripMargin)
sql(
s"""
| CREATE DATAMAP $preagg ON TABLE $baseTable
| USING 'preaggregate'
| AS select id, sum(age) from $baseTable group by id
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$testData' into table $baseTable")
checkExistence(sql(s"SHOW DATAMAP ON TABLE $baseTable"), true, preagg, "preaggregate")
val exception = intercept[MalformedDataMapCommandException] {
sql(s"REBUILD DATAMAP $preagg ON TABLE $baseTable").show()
}
assert(exception.getMessage.contains("provider 'preaggregate' does not support rebuild"))
sql(s"DROP TABLE IF EXISTS $baseTable")
}
}
@@ -310,4 +310,6 @@ class TestDataMapFactory(
}
}
}

override def supportRebuild(): Boolean = true
}
@@ -127,4 +127,9 @@ private DataMapFactory<? extends DataMap> createDataMapFactory()
public DataMapFactory getDataMapFactory() {
return dataMapFactory;
}

@Override
public boolean supportRebuild() {
return dataMapFactory.supportRebuild();
}
}
@@ -104,4 +104,9 @@ public void rebuild() {
public DataMapFactory getDataMapFactory() {
throw new UnsupportedOperationException();
}

@Override
public boolean supportRebuild() {
return false;
}
}
@@ -21,10 +21,10 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.DataCommand

- import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager}
+ import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.util.CarbonProperties
- import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRebuildRDD}
+ import org.apache.carbondata.datamap.DataMapManager
import org.apache.carbondata.events.{UpdateDataMapPostExecutionEvent, _}

/**
@@ -36,7 +36,15 @@ case class CarbonDataMapRebuildCommand(
tableIdentifier: Option[TableIdentifier]) extends DataCommand {

override def processData(sparkSession: SparkSession): Seq[Row] = {
- val schema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
+ import scala.collection.JavaConverters._
+ val schemaOption = CarbonDataMapShowCommand(tableIdentifier).getAllDataMaps(sparkSession)
+   .asScala
+   .find(p => p.getDataMapName.equalsIgnoreCase(dataMapName))
+ if (schemaOption.isEmpty) {
+   throw new MalformedDataMapCommandException(
+     s"Datamap with name $dataMapName does not exist on table ${tableIdentifier.get.table}")
+ }
+ val schema = schemaOption.get

val table = tableIdentifier match {
case Some(identifier) =>
@@ -47,7 +55,13 @@ case class CarbonDataMapRebuildCommand(
schema.getRelationIdentifier.getTableName
)(sparkSession)
}

val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession)
if (!provider.supportRebuild()) {
throw new MalformedDataMapCommandException(
s"Datamap with name $dataMapName and provider '${schema.getProviderName}'" +
s" does not support rebuild")
}
provider.rebuild()

// After a successful rebuild, enable the datamap.
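
In user terms, the guard above makes a manual rebuild fail fast for providers that do not opt in; a hypothetical session (names are illustrative, the message text comes from the guard above):

```scala
// Hypothetical session: bloomfilter datamaps no longer accept REBUILD DATAMAP.
spark.sql(
  """
    | CREATE DATAMAP dm_bloom ON TABLE sales
    | USING 'bloomfilter'
    | DMPROPERTIES('INDEX_COLUMNS'='city')
  """.stripMargin)
spark.sql("REBUILD DATAMAP dm_bloom ON TABLE sales")
// => MalformedDataMapCommandException:
//    Datamap with name dm_bloom and provider 'bloomfilter' does not support rebuild
```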
@@ -47,6 +47,13 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
}

override def processData(sparkSession: SparkSession): Seq[Row] = {
convertToRow(getAllDataMaps(sparkSession))
}

/**
 * Get all datamaps for this table, including preagg, index datamaps and mv.
 */
def getAllDataMaps(sparkSession: SparkSession): util.List[DataMapSchema] = {
val dataMapSchemaList: util.List[DataMapSchema] = new util.ArrayList[DataMapSchema]()
tableIdentifier match {
case Some(table) =>
@@ -59,10 +66,10 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
if (!indexSchemas.isEmpty) {
dataMapSchemaList.addAll(indexSchemas)
}
- convertToRow(dataMapSchemaList)
case _ =>
- convertToRow(DataMapStoreManager.getInstance().getAllDataMapSchemas)
+ dataMapSchemaList.addAll(DataMapStoreManager.getInstance().getAllDataMapSchemas)
}
+ dataMapSchemaList
}

private def convertToRow(schemaList: util.List[DataMapSchema]) = {
28 changes: 28 additions & 0 deletions integration/spark2/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
log4j.rootLogger=INFO,CON
#log4j.appender.R5=org.apache.carbondata.common.logging.impl.AuditExtendedRollingFileAppender
#log4j.appender.R5.File=./carbondataaudit.log
#log4j.appender.R5.threshold=INFO
#log4j.appender.R5.layout=org.apache.log4j.PatternLayout
#log4j.appender.R5.layout.ConversionPattern=%d [%t] %p [%c] %X{CLIENT_IP} %X{USER_NAME} %X{MODULE} %X{OPERATRION}- %m%n

log4j.appender.CON=org.apache.log4j.ConsoleAppender
log4j.appender.CON.layout=org.apache.log4j.PatternLayout
log4j.appender.CON.layout.ConversionPattern=%d [%t] %p [%c] %X{CLIENT_IP} %X{USER_NAME} %X{MODULE} %X{OPERATRION}- %m%n
log4j.appender.CON.threshold=DEBUG
@@ -177,7 +177,27 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
}

test("test create bloom datamap and REBUILD DATAMAP") {
// for CARBONDATA-2820, we will first block deferred rebuild for bloom
test("test block rebuild for bloom") {
sql(
s"""
| CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
| s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
| STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
| """.stripMargin)
sql(
s"""
| CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
| USING 'bloomfilter'
| DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
""".stripMargin)
val exception = intercept[MalformedDataMapCommandException] {
sql(s"REBUILD DATAMAP $dataMapName ON TABLE $bloomDMSampleTable")
}
assert(exception.getMessage.contains("provider 'bloomfilter' does not support rebuild"))
}

ignore("test create bloom datamap and REBUILD DATAMAP") {
sql(
s"""
| CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
@@ -219,7 +239,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
}

test("test create bloom datamap with DEFERRED REBUILD, query hit datamap") {
ignore("test create bloom datamap with DEFERRED REBUILD, query hit datamap") {
sql(
s"""
| CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
@@ -297,7 +317,7 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
}

test("test create bloom datamap with DEFERRED REBUILD, query not hit datamap") {
ignore("test create bloom datamap with DEFERRED REBUILD, query not hit datamap") {
sql(
s"""
| CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,