[CARBONDATA-4296]: schema evolution, enforcement and deduplication utilities added

Why is this PR needed?
This PR adds schema enforcement, schema evolution and deduplication capabilities
specifically for the carbondata streamer tool. For the existing IUD scenarios, some
work still needs to be done to handle them completely, for example -
1. Passing default values and storing them in table properties.

Changes proposed for phase 2 -
1. Handling delete use cases with the upsert operation/command itself. Right now we
treat an update as delete + insert. With the new streamer tool, it is possible that the
user sets upsert as the operation type and the incoming stream has delete records as well.
What changes were proposed in this PR?

Configs and utility methods are added for the following use cases (a configuration
sketch follows this list) -
1. Schema enforcement
2. Schema evolution - add column, delete column, and data type change scenarios
3. Deduplicate the incoming dataset against the incoming dataset itself. This is useful
in scenarios where the incoming stream of data has multiple updates for the same record
and we want to pick the latest one.
4. Deduplicate the incoming dataset against the existing target dataset. This is useful
when the operation type is set to INSERT and the user does not want to insert duplicate
records.

This closes #4227
pratyakshsharma authored and kunal642 committed Nov 15, 2021
1 parent 07b41a5 commit 3be05d2a44d805cf763df05cbeacce2d90a44da0
Showing 6 changed files with 824 additions and 100 deletions.
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.common.exceptions.sql;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;

@InterfaceAudience.User
@InterfaceStability.Stable
public class CarbonSchemaException extends Exception {

private static final long serialVersionUID = 1L;

private final String message;

public CarbonSchemaException(String message) {
super(message);
this.message = message;
}

public String getMessage() {
return this.message;
}
}
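As a hedged illustration (not part of this diff), a schema validation helper could surface
mismatches through the new exception as sketched below; the check and the names used are
hypothetical.

import org.apache.carbondata.common.exceptions.sql.CarbonSchemaException
import org.apache.spark.sql.types.StructType

object SchemaCheckExample {
  // Throws when the source batch and the target table disagree on column count.
  def assertSameColumnCount(source: StructType, target: StructType): Unit = {
    if (source.fields.length != target.fields.length) {
      throw new CarbonSchemaException(
        s"Schema enforcement failed: source has ${source.fields.length} columns, " +
          s"target has ${target.fields.length}")
    }
  }
}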
@@ -2681,4 +2681,58 @@ private CarbonCommonConstants() {

public static final String CARBON_CDC_MINMAX_PRUNING_ENABLED_DEFAULT = "false";

//////////////////////////////////////////////////////////////////////////////////////////
// CDC streamer configs start here
//////////////////////////////////////////////////////////////////////////////////////////

/**
* Name of the field from source schema whose value can be used for picking the latest updates for
* a particular record in the incoming batch in case of duplicate record keys. Useful if the
* write operation type is UPDATE or UPSERT. This will be used only if
* carbon.streamer.upsert.deduplicate is enabled.
*/
@CarbonProperty public static final String CARBON_STREAMER_SOURCE_ORDERING_FIELD =
"carbon.streamer.source.ordering.field";

public static final String CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT = "";

/**
* This property specifies if the incoming batch needs to be deduplicated in case of INSERT
* operation type. If set to true, the incoming batch will be deduplicated against the existing
* data in the target carbondata table.
*/
@CarbonProperty public static final String CARBON_STREAMER_INSERT_DEDUPLICATE =
"carbon.streamer.insert.deduplicate";

public static final String CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT = "false";

/**
* This property specifies if the incoming batch needs to be deduplicated (when multiple updates
* for the same record key are present in the incoming batch) in case of UPSERT/UPDATE operation
* type. If set to true, the user needs to provide proper value for the source ordering field as
* well.
*/
@CarbonProperty public static final String CARBON_STREAMER_UPSERT_DEDUPLICATE =
"carbon.streamer.upsert.deduplicate";

public static final String CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT = "true";

/**
* The metadata columns coming from the source stream data, which should not be included in the
* target data.
*/
@CarbonProperty public static final String CARBON_STREAMER_META_COLUMNS =
"carbon.streamer.meta.columns";

/**
* This flag decides whether schema enforcement is enabled, i.e. whether the table schema
* must stay fixed regardless of the incoming batch schema. If set to true, the incoming
* schema is validated against the existing table schema; if the schema has evolved, the
* incoming batch cannot be ingested and the job simply fails.
*/
@CarbonProperty public static final String CARBON_ENABLE_SCHEMA_ENFORCEMENT =
"carbon.enable.schema.enforcement";

public static final String CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT = "true";

}
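The upsert deduplication described by these configs amounts to keeping one row per record
key, choosing the row with the greatest value of the source ordering field. A minimal Spark
sketch under that assumption follows; it is not the actual utility added in this PR, and
the key and ordering column names passed in are placeholders.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

def latestPerKey(incoming: DataFrame, keyColumn: String, orderingField: String): DataFrame = {
  // Rank rows per key by the ordering field, newest first, and keep only the top row.
  val window = Window.partitionBy(col(keyColumn)).orderBy(col(orderingField).desc)
  incoming
    .withColumn("_row_num", row_number().over(window))
    .filter(col("_row_num") === 1)
    .drop("_row_num")
}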
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, CarbonToSparkAdapter, Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.avro.AvroFileFormatFactory
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -43,6 +43,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructT
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LongAccumulator}

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -98,8 +99,35 @@ case class CarbonMergeDataSetCommand(
throw new UnsupportedOperationException(
"Carbon table supposed to be present in merge dataset")
}

val properties = CarbonProperties.getInstance()
if (operationType != null) {
val filterDupes = properties
.getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
val isSchemaEnforcementEnabled = properties
.getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
if (
!MergeOperationType.withName(operationType.toUpperCase).equals(MergeOperationType.INSERT) &&
filterDupes) {
throw new MalformedCarbonCommandException("property CARBON_STREAMER_INSERT_DEDUPLICATE" +
" should only be set with operation type INSERT")
}
if (isSchemaEnforcementEnabled) {
// call the util function to verify if incoming schema matches with target schema
CarbonMergeDataSetUtil.verifySourceAndTargetSchemas(targetDsOri, srcDS)
} else {
CarbonMergeDataSetUtil.handleSchemaEvolution(
targetDsOri, srcDS, sparkSession)
}
}

// Target dataset must be backed by carbondata table.
val targetCarbonTable = relations.head.carbonRelation.carbonTable
val tgtTable = relations.head.carbonRelation.carbonTable
val targetCarbonTable: CarbonTable = CarbonEnv.getCarbonTable(Option(tgtTable.getDatabaseName),
tgtTable.getTableName)(sparkSession)

// select only the required columns, as it can avoid a lot of shuffling.
val targetDs = if (mergeMatches == null && operationType != null) {
targetDsOri.select(keyColumn)
@@ -149,11 +177,29 @@ case class CarbonMergeDataSetCommand(
joinColumns.map(srcDS.col): _*)
} else {
srcDS
}
}

// deduplicate the incoming dataset
// TODO: handle the case for partial updates
val orderingField = properties.getProperty(
CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD,
CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT)
val deduplicatedSrcDs = if (keyColumn != null) {
CarbonMergeDataSetUtil.deduplicateBeforeWriting(repartitionedSrcDs,
targetDs,
sparkSession,
sourceAliasName,
targetDsAliasName,
keyColumn,
orderingField,
targetCarbonTable)
} else {
repartitionedSrcDs
}

// cache the source data as we will be scanning multiple times
repartitionedSrcDs.cache()
val deDuplicatedRecords = repartitionedSrcDs.count()
deduplicatedSrcDs.cache()
val deDuplicatedRecords = deduplicatedSrcDs.count()
LOGGER.info(s"Number of records from source data: $deDuplicatedRecords")
// Create accumulators to log the stats
val stats = Stats(createLongAccumulator("insertedRows"),
@@ -221,7 +267,7 @@ case class CarbonMergeDataSetCommand(
new util.LinkedHashMap[String, util.List[FilePathMinMaxVO]]
val colToSplitsFilePathAndMinMaxMap: mutable.Map[String, util.List[FilePathMinMaxVO]] =
CarbonMergeDataSetUtil.getSplitsAndLoadToCache(targetCarbonTable,
repartitionedSrcDs,
deduplicatedSrcDs,
columnMinMaxInBlocklet,
columnToIndexMap,
sparkSession)
@@ -281,7 +327,7 @@ case class CarbonMergeDataSetCommand(
// find the file paths to scan.
finalCarbonFilesToScan = CarbonMergeDataSetUtil.getFilesToScan(joinCarbonColumns,
joinColumnToTreeMapping,
repartitionedSrcDs)
deduplicatedSrcDs)

LOGGER.info(s"Finished min-max pruning. Carbondata files to scan during merge is: ${
finalCarbonFilesToScan.length}")
@@ -298,7 +344,7 @@ case class CarbonMergeDataSetCommand(
targetDs
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
.where(s"getBlockPaths('${finalCarbonFilesToScan.mkString(",")}')")
.join(repartitionedSrcDs.select(keyColumn),
.join(deduplicatedSrcDs.select(keyColumn),
expr(s"$targetDsAliasName.$keyColumn = $sourceAliasName.$keyColumn"),
joinType)
} else {
@@ -308,7 +354,7 @@ case class CarbonMergeDataSetCommand(
if (!isInsertOperation) {
targetDs
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
.join(repartitionedSrcDs.select(keyColumn),
.join(deduplicatedSrcDs.select(keyColumn),
expr(s"$targetDsAliasName.$keyColumn = $sourceAliasName.$keyColumn"),
joinType)
} else {
@@ -318,21 +364,21 @@ case class CarbonMergeDataSetCommand(
val mergeHandler: MergeHandler =
MergeOperationType.withName(operationType.toUpperCase) match {
case MergeOperationType.UPSERT =>
UpsertHandler(sparkSession, frame, targetCarbonTable, stats, repartitionedSrcDs)
UpsertHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
case MergeOperationType.UPDATE =>
UpdateHandler(sparkSession, frame, targetCarbonTable, stats, repartitionedSrcDs)
UpdateHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
case MergeOperationType.DELETE =>
DeleteHandler(sparkSession, frame, targetCarbonTable, stats, repartitionedSrcDs)
DeleteHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
case MergeOperationType.INSERT =>
InsertHandler(sparkSession, frame, targetCarbonTable, stats, repartitionedSrcDs)
InsertHandler(sparkSession, frame, targetCarbonTable, stats, deduplicatedSrcDs)
}

// execute merge handler
mergeHandler.handleMerge()
LOGGER.info(
" Time taken to merge data :: " + (System.currentTimeMillis() - st))
// clear the cached src
repartitionedSrcDs.unpersist()
deduplicatedSrcDs.unpersist()
return Seq()
}
// validate the merge matches and actions.
@@ -354,15 +400,15 @@ case class CarbonMergeDataSetCommand(
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
.withColumn("exist_on_target", lit(1))
.where(s"getBlockPaths('${finalCarbonFilesToScan.mkString(",")}')")
.join(repartitionedSrcDs.withColumn("exist_on_src", lit(1)),
.join(deduplicatedSrcDs.withColumn("exist_on_src", lit(1)),
mergeMatches.joinExpr,
joinType)
.withColumn(status_on_mergeds, condition)
} else {
targetDs
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
.withColumn("exist_on_target", lit(1))
.join(repartitionedSrcDs.withColumn("exist_on_src", lit(1)),
.join(deduplicatedSrcDs.withColumn("exist_on_src", lit(1)),
mergeMatches.joinExpr,
joinType)
.withColumn(status_on_mergeds, condition)
@@ -451,7 +497,7 @@ case class CarbonMergeDataSetCommand(
HorizontalCompaction.tryHorizontalCompaction(
sparkSession, targetCarbonTable)
// clear the cached src
repartitionedSrcDs.unpersist()
deduplicatedSrcDs.unpersist()
Seq.empty
}
