[CARBONDATA-4305] Support Carbondata Streamer tool for incremental fetch and merge from Kafka and DFS sources

Why is this PR needed?
In the current Carbondata CDC solution, if a user wants to integrate it with a streaming source, they
need to write a separate Spark application to capture changes, which is an overhead. We should be able to
incrementally capture the data changes from primary databases and incrementally ingest them into the
data lake so that the overall latency decreases. The former is taken care of by log-based CDC systems
like Maxwell and Debezium. This PR provides a solution for the second aspect using Apache Carbondata.

What changes were proposed in this PR?
The Carbondata Streamer tool is a Spark streaming application which enables users to incrementally ingest data
from various sources, such as Kafka (a standard pipeline would be MySQL => Debezium => (Kafka + Schema Registry) => Carbondata Streamer tool)
and DFS, into their data lakes. The tool comes with out-of-the-box support for almost all types of schema
evolution use cases. In this PR only add-column support is provided; drop column and other schema change
capabilities will follow in upcoming changes. Please refer to the design document for more details about
the usage and working of the tool; a minimal configuration sketch follows below.
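
For illustration, here is a minimal configuration sketch in Scala using the constants added in this PR. The values (broker address, schema registry URL, topic and key field) are placeholder examples, not documented defaults, and this is not the tool's actual launch procedure:

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// Target table and merge settings (values below are examples only).
val props = CarbonProperties.getInstance()
props.addProperty(CarbonCommonConstants.CARBON_STREAMER_DATABASE_NAME, "default")
props.addProperty(CarbonCommonConstants.CARBON_STREAMER_TABLE_NAME, "customers_target")
props.addProperty(CarbonCommonConstants.CARBON_STREAMER_MERGE_OPERATION_TYPE, "upsert")
props.addProperty(CarbonCommonConstants.CARBON_STREAMER_KEY_FIELD, "id")
// Kafka source settings (topic and broker are hypothetical).
props.addProperty(CarbonCommonConstants.CARBON_STREAMER_SOURCE_TYPE, "kafka")
props.addProperty(CarbonCommonConstants.CARBON_STREAMER_KAFKA_INPUT_TOPIC, "maxwell.customers")
props.addProperty(CarbonCommonConstants.KAFKA_BROKERS, "localhost:9092")
props.addProperty(CarbonCommonConstants.CARBON_STREAMER_SCHEMA_REGISTRY_URL, "http://localhost:8081")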

This closes #4235
akashrn5 authored and kunal642 committed Nov 26, 2021
1 parent 3be05d2 commit 18840af9c1f7154b58e3c397dfc5a4440674bcee
Showing 15 changed files with 1,312 additions and 1 deletion.
@@ -2684,6 +2684,150 @@ private CarbonCommonConstants() {
//////////////////////////////////////////////////////////////////////////////////////////
// CDC streamer configs start here
//////////////////////////////////////////////////////////////////////////////////////////
/**
* The database name of the target table into which the incoming data is merged. If not given by
* the user, the system takes the current database of the Spark session.
*/
@CarbonProperty
public static final String CARBON_STREAMER_DATABASE_NAME = "carbon.streamer.target.database";

/**
* The target carbondata table into which the data has to be merged. If this is not configured by
* the user, the operation will fail.
*/
@CarbonProperty
public static final String CARBON_STREAMER_TABLE_NAME = "carbon.streamer.target.table";

/**
* Source type to ingest data from. It can be either Kafka or DFS.
*/
@CarbonProperty
public static final String CARBON_STREAMER_SOURCE_TYPE = "carbon.streamer.source.type";

public static final String CARBON_STREAMER_SOURCE_TYPE_DEFAULT = "kafka";

/**
* An absolute path on a given file system from where data needs to be read to ingest into the
* target carbondata table. Mandatory if the ingestion source type is DFS.
*/
@CarbonProperty
public static final String CARBON_STREAMER_DFS_INPUT_PATH = "carbon.streamer.dfs.input.path";

/**
* Schema registry URL in case schema registry is selected as the schema provider.
*/
@CarbonProperty
public static final String CARBON_STREAMER_SCHEMA_REGISTRY_URL = "schema.registry.url";

// **************** kafka properties constants *********************
/**
* Kafka topics to consume data from. Mandatory if Kafka is selected as the ingestion source.
* If multiple topics are present, the value of the property can be a comma-separated list of
* topic names. If this is not set for a Kafka source, the operation will fail.
*/
@CarbonProperty
public static final String CARBON_STREAMER_KAFKA_INPUT_TOPIC =
"carbon.streamer.input.kafka.topic";

/**
* Kafka brokers to connect to in case Kafka is selected as the ingestion source. If this is not
* set for a Kafka source, the operation will fail.
*/
@CarbonProperty
public static final String KAFKA_BROKERS = "bootstrap.servers";

/**
* Kafka offset to fall back to in case no checkpoint is available for starting ingestion.
* Valid values: earliest and latest.
*/
@CarbonProperty
public static final String KAFKA_INITIAL_OFFSET_TYPE = "auto.offset.reset";

public static final String CARBON_STREAMER_KAFKA_INITIAL_OFFSET_TYPE_DEFAULT = "earliest";

/**
* Key deserializer for Kafka. Mandatory for a Kafka source.
*/
@CarbonProperty
public static final String KAFKA_KEY_DESERIALIZER = "key.deserializer";

// TODO: check how to take this value, class name or one wrapper above the deserializer
public static final String KAFKA_KEY_DESERIALIZER_DEFAULT =
"org.apache.kafka.common.serialization.StringDeserializer";

/**
* Value deserializer for Kafka. Mandatory for a Kafka source.
*/
@CarbonProperty
public static final String KAFKA_VALUE_DESERIALIZER = "value.deserializer";

public static final String KAFKA_VALUE_DESERIALIZER_DEFAULT =
"io.confluent.kafka.serializers.KafkaAvroDeserializer";

public static final String AVRO_SCHEMA = "carbon.streamer.avro.schema.deserialize";

/**
* Auto commit for Kafka. If enabled, Kafka blindly commits the offsets to the offsets topic
* whether or not the respective operation succeeded, so by default it is kept false. Since Spark
* Streaming checkpointing is enabled, it takes care of committing the consumed offsets,
* including in failure scenarios.
*/
public static final String KAFKA_ENABLE_AUTO_COMMIT = "enable.auto.commit";

public static final String KAFKA_ENABLE_AUTO_COMMIT_DEFAULT = "false";

/**
* The carbondata streamer tool acts as a Kafka consumer in case of Kafka ingestion. This
* property assigns the consumer group id to the streamer tool.
*/
@CarbonProperty
public static final String KAFKA_GROUP_ID = "group.id";

// ***************************************************************
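
To make the pass-through behaviour concrete, here is a minimal sketch of the Kafka consumer parameters implied by the constants above. The broker address and group id values are assumed examples; at runtime the streamer tool derives these entries from the configured properties rather than hard-coding them:

import org.apache.carbondata.core.constants.CarbonCommonConstants

// Kafka consumer parameters built from the constants and defaults declared above.
val kafkaParams: Map[String, String] = Map(
  CarbonCommonConstants.KAFKA_BROKERS -> "localhost:9092",
  CarbonCommonConstants.KAFKA_KEY_DESERIALIZER -> CarbonCommonConstants.KAFKA_KEY_DESERIALIZER_DEFAULT,
  CarbonCommonConstants.KAFKA_VALUE_DESERIALIZER -> CarbonCommonConstants.KAFKA_VALUE_DESERIALIZER_DEFAULT,
  CarbonCommonConstants.KAFKA_INITIAL_OFFSET_TYPE -> CarbonCommonConstants.CARBON_STREAMER_KAFKA_INITIAL_OFFSET_TYPE_DEFAULT,
  CarbonCommonConstants.KAFKA_ENABLE_AUTO_COMMIT -> CarbonCommonConstants.KAFKA_ENABLE_AUTO_COMMIT_DEFAULT,
  CarbonCommonConstants.KAFKA_GROUP_ID -> "carbondata-streamer-group"
)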

/**
* Format of the incoming data/payload.
*/
@CarbonProperty
public static final String CARBON_STREAMER_INPUT_PAYLOAD_FORMAT =
"carbon.streamer.input.payload.format";

public static final String CARBON_STREAMER_INPUT_PAYLOAD_FORMAT_DEFAULT = "avro";

/**
* Schema provider for the incoming batch of data. Currently two types of schema providers are
* supported: FileBasedProvider and SchemaRegistryProvider.
*/
@CarbonProperty
public static final String CARBON_STREAMER_SCHEMA_PROVIDER = "carbon.streamer.schema.provider";

public static final String CARBON_STREAMER_SCHEMA_PROVIDER_DEFAULT = "SchemaRegistry";

public static final String CARBON_STREAMER_FILE_SCHEMA_PROVIDER = "FileSchema";

/**
* Path to file/folder containing the schema of incoming data. Mandatory if file-based schema
* provider is selected.
*/
@CarbonProperty
public static final String CARBON_STREAMER_SOURCE_SCHEMA_PATH =
"carbon.streamer.source.schema.path";

/**
* Merge operation type. The supported operations are INSERT, UPDATE, DELETE and UPSERT.
*/
@CarbonProperty
public static final String CARBON_STREAMER_MERGE_OPERATION_TYPE =
"carbon.streamer.merge.operation.type";

public static final String CARBON_STREAMER_MERGE_OPERATION_TYPE_DEFAULT = "upsert";

/**
* Name of the field in source schema reflecting the IUD operation types on source data rows.
*/
@CarbonProperty
public static final String CARBON_STREAMER_MERGE_OPERATION_FIELD =
"carbon.streamer.merge.operation.field";

/**
* Name of the field from source schema whose value can be used for picking the latest updates for
@@ -2696,6 +2840,13 @@ private CarbonCommonConstants() {

public static final String CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT = "";

/**
* Join key/record key for a particular record, used for deduplication of the incoming
* batch. If this is not present, the operation will fail.
*/
@CarbonProperty
public static final String CARBON_STREAMER_KEY_FIELD = "carbon.streamer.record.key.field";

/**
* This property specifies if the incoming batch needs to be deduplicated in case of INSERT
* operation type. If set to true, the incoming batch will be deduplicated against the existing
@@ -2717,9 +2868,18 @@ private CarbonCommonConstants() {

public static final String CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT = "true";

/**
* Minimum batch interval between two consecutive ingestions in continuous mode. Should be
* specified in seconds.
*/
@CarbonProperty
public static final String CARBON_STREAMER_BATCH_INTERVAL = "carbon.streamer.batch.interval";

public static final String CARBON_STREAMER_BATCH_INTERVAL_DEFAULT = "10";

/**
* The metadata columns coming from the source stream data which should not be included in the
* target data. The value should be a comma-separated list of column names.
*/
@CarbonProperty public static final String CARBON_STREAMER_META_COLUMNS =
"carbon.streamer.meta.columns";
@@ -119,6 +119,10 @@ public static CarbonProperties getInstance() {
return INSTANCE;
}

/**
* Returns the underlying Properties instance holding all the properties currently loaded into
* CarbonProperties. The streamer tool uses it to extract the passthrough (non-carbon) entries
* such as the Kafka consumer configs.
*/
public Properties getAllPropertiesInstance() {
return carbonProperties;
}

/**
* This method is to validate only a specific key added to carbonProperties using addProperty
*
@@ -107,6 +107,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<version>1.72</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-cli</artifactId>
@@ -174,6 +179,48 @@
</exclusions>
</dependency>
<!-- spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>com.twitter</groupId>
<artifactId>chill-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.3.4</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>common-config</artifactId>
<version>5.3.4</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>common-utils</artifactId>
<version>5.3.4</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>5.3.4</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.streamer

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties

/**
* This class handles preparing the DStream and merging the data onto the target carbondata
* table for the DFS source containing Avro data.
* @param carbonTable target carbondata table.
*/
class AvroDFSSource(carbonTable: CarbonTable) extends Source with Serializable {

override
def getStream(
ssc: StreamingContext,
sparkSession: SparkSession): CarbonDStream = {
val dfsFilePath = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_STREAMER_DFS_INPUT_PATH)
// Set the reader schema in the hadoop conf so that the AvroKeyInputFormat reads using the
// reader schema and populates the default values for the columns where data is not present.
// This helps apply the schema changes to the target carbondata table.
val value = ssc.fileStream[AvroKey[Any], NullWritable, AvroKeyInputFormat[Any]](FileFactory
.getUpdatedFilePath(dfsFilePath))
.map(rec => rec._1.datum().asInstanceOf[GenericRecord])
CarbonDStream(value.asInstanceOf[DStream[Any]])
}

override
def prepareDFAndMerge(inputStream: CarbonDStream): Unit = {
prepareDSForAvroSourceAndMerge(inputStream, carbonTable)
}
}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.streamer

import scala.collection.JavaConverters._

import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties

/**
* This class handles preparing the DStream and merging the data onto the target carbondata
* table for the Kafka source containing Avro data.
* @param carbonTable target carbondata table.
*/
class AvroKafkaSource(carbonTable: CarbonTable) extends Source with Serializable {

override
def getStream(
ssc: StreamingContext,
sparkSession: SparkSession): CarbonDStream = {
// separate out the non-carbon properties and prepare the kafka params
val kafkaParams = CarbonProperties.getInstance()
.getAllPropertiesInstance
.asScala
.filter { prop => !prop._1.startsWith("carbon") }
kafkaParams.put(CarbonCommonConstants.AVRO_SCHEMA, schema.toString())
val topics = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_STREAMER_KAFKA_INPUT_TOPIC)
.split(CarbonCommonConstants.COMMA)
val value = KafkaUtils
.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
.map(obj => obj.value().asInstanceOf[GenericRecord])
CarbonDStream(value.asInstanceOf[DStream[Any]])
}

override
def prepareDFAndMerge(inputStream: CarbonDStream): Unit = {
prepareDSForAvroSourceAndMerge(inputStream, carbonTable)
}

}
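
For context, here is a hypothetical driver-side sketch (not part of this diff) of how the streamer tool could pick between the two sources above and trigger the merge. The entry-point name and wiring are assumptions; getStream and prepareDFAndMerge come from the Source implementations shown in this commit:

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.streamer.{AvroDFSSource, AvroKafkaSource}

// Resolve the source type (default "kafka") and run one prepare-and-merge cycle.
// Assumes the SparkSession, StreamingContext and resolved target CarbonTable are passed in.
def buildAndMerge(spark: SparkSession, ssc: StreamingContext, carbonTable: CarbonTable): Unit = {
  val sourceType = CarbonProperties.getInstance()
    .getProperty(CarbonCommonConstants.CARBON_STREAMER_SOURCE_TYPE,
      CarbonCommonConstants.CARBON_STREAMER_SOURCE_TYPE_DEFAULT)
  val source = if (sourceType.equalsIgnoreCase("kafka")) {
    new AvroKafkaSource(carbonTable)
  } else {
    new AvroDFSSource(carbonTable)
  }
  // getStream prepares the CarbonDStream from the source; prepareDFAndMerge merges
  // each incoming batch onto the target carbondata table.
  val inputStream = source.getStream(ssc, spark)
  source.prepareDFAndMerge(inputStream)
}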
