[CARBONDATA-4308]: added docs for streamer tool configs
Why is this PR needed?
Documentation for the CDC streamer tool is missing

What changes were proposed in this PR?
Add the documentation for the CDC streamer tool, covering the configs,
the pipeline image and example commands to try out.

Does this PR introduce any user interface change?
No

Is any new testcase added?
No

This closes #4243
pratyakshsharma authored and akashrn5 committed Dec 28, 2021
1 parent a072e7a commit 970f11d7dc7e4f11938ad0163cc416ed4b7b456d
Showing 4 changed files with 85 additions and 36 deletions.
@@ -2745,25 +2745,6 @@ private CarbonCommonConstants() {

public static final String CARBON_STREAMER_KAFKA_INITIAL_OFFSET_TYPE_DEFAULT = "earliest";

/**
* Key deserializer for kafka. Mandatory for Kafka source.
*/
@CarbonProperty
public static final String KAFKA_KEY_DESERIALIZER = "key.deserializer";

// TODO: check how to take this value, class name or one wrapper above the deserializer
public static final String KAFKA_KEY_DESERIALIZER_DEFAULT =
"org.apache.kafka.common.serialization.StringDeserializer";

/**
* Value deserializer for Kafka. Mandatory for Kafka source
*/
@CarbonProperty
public static final String KAFKA_VALUE_DESERIALIZER = "value.deserializer";

public static final String KAFKA_VALUE_DESERIALIZER_DEFAULT =
"io.confluent.kafka.serializers.KafkaAvroDeserializer";

public static final String AVRO_SCHEMA = "carbon.streamer.avro.schema.deserialize";

/**
@@ -2831,7 +2812,7 @@ private CarbonCommonConstants() {

/**
* Name of the field from source schema whose value can be used for picking the latest updates for
* a particular record in the incoming batch in case of duplicates record keys. Useful if the
* a particular record in the incoming batch in case of duplicate record keys. Useful if the
* write operation type is UPDATE or UPSERT. This will be used only if
* carbon.streamer.upsert.deduplicate is enabled.
*/
@@ -27,7 +27,7 @@ than changing regularly on a time basis.

SCD and CDC data changes can be merged to a carbon dataset online using the data frame level `MERGE`, `UPSERT`, `UPDATE`, `DELETE` and `INSERT` APIs.

#### MERGE API
### MERGE API

Below API merges the datasets online and applies the actions as per the conditions.

@@ -131,4 +131,86 @@ clauses can have at most one UPDATE and one DELETE action, These clauses have th

* Please refer example class [MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala) to understand and implement scd and cdc scenarios using APIs.
* Please refer example class [DataMergeIntoExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala) to understand and implement scd and cdc scenarios using sql.
* Please refer example class [DataUPSERTExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala) to understand and implement cdc using UPSERT APIs.

### Streamer Tool

Carbondata streamer tool is a powerful tool for incrementally capturing change events from varied sources like kafka or DFS and merging them into the target carbondata table. This means one needs to integrate with external solutions like Debezium or Maxwell to move the change events into kafka, if one wishes to capture changes from primary databases like mysql. The tool currently requires the incoming data to be in avro format and the incoming schema to evolve in a backwards-compatible way.

Below is a high-level architecture of the overall pipeline -

![Carbondata streamer tool pipeline](../docs/images/carbondata-streamer-tool-pipeline.png?raw=true)
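
For reference only, since the streamer tool does not itself set up this part of the pipeline: the sketch below shows one possible way to register a Debezium MySQL connector with the Kafka Connect REST API so that change events land in kafka as avro messages. All hosts, credentials, connector, topic and table names are placeholders, and the exact connector properties depend on the Debezium and Kafka Connect versions in use.

```
# Illustrative only: register a Debezium MySQL connector on a Kafka Connect
# worker (assumed to be listening on localhost:8083) so that change events
# from a source table are published to kafka in avro format.
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "mysql-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-host",
    "database.port": "3306",
    "database.user": "cdc_user",
    "database.password": "cdc_password",
    "database.server.id": "1",
    "database.server.name": "testdb_server",
    "table.include.list": "testdb.source_table",
    "database.history.kafka.bootstrap.servers": "kafka-broker:9092",
    "database.history.kafka.topic": "schema-changes.testdb",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}'
```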

#### Configs

Streamer tool exposes the below configs for users to cater to their CDC use cases -

| Parameter | Default Value | Description |
|-----------------------------------|------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| carbon.streamer.target.database | Current database from spark session | The database where the target table for merging the incoming data is present. If not given by the user, the current database of the spark session is taken. |
| carbon.streamer.target.table | (none) | The target carbondata table where the data has to be merged. If this is not configured by the user, the operation will fail. |
| carbon.streamer.source.type | kafka | Streamer tool currently supports two types of data sources. One can ingest data from either kafka or DFS into target carbondata table using streamer tool. |
| carbon.streamer.dfs.input.path | (none) | An absolute path on a given file system from where data needs to be read to ingest into the target carbondata table. Mandatory if the ingestion source type is DFS. |
| schema.registry.url | (none) | Streamer tool supports 2 different ways to supply schema of incoming data. Schemas can be supplied using avro files (file based schema provider) or using schema registry. This property defines the url to connect to in case schema registry is used as the schema source. |
| carbon.streamer.input.kafka.topic | (none) | This is a mandatory property to be set in case kafka is chosen as the source of data. This property defines the topics from where the streamer tool will consume the data. |
| bootstrap.servers | (none) | This is another mandatory property in case kafka is chosen as the source of data. This defines the endpoints for the kafka brokers. |
| auto.offset.reset | earliest | Streamer tool maintains checkpoints to keep track of the incoming messages which are already consumed. In case of the first ingestion using the kafka source, this property defines the offset from where ingestion will start. This property can take only 2 valid values - `latest` and `earliest`. |
| enable.auto.commit | false | Kafka maintains an internal topic for storing offsets corresponding to the consumer groups. This property determines if kafka should actually go forward and commit the offsets consumed in this internal topic. We recommend keeping it as false, since spark streaming checkpointing already takes care of this. |
| group.id | (none) | Streamer tool is ultimately a consumer for kafka. This property determines the consumer group id the streamer tool belongs to. |
| carbon.streamer.input.payload.format | avro | This determines the format of the incoming messages from the source. Currently only avro is supported; support for json is planned in the near future. Avro is the preferred format for CDC use cases since it keeps the message size compact and has good support for schema evolution. |
| carbon.streamer.schema.provider | SchemaRegistry | As discussed earlier, streamer tool supports 2 ways of supplying schema for incoming messages - schema registry and avro files. Confluent schema registry is the preferred way when using avro as the input format. |
| carbon.streamer.source.schema.path | (none) | This property defines the absolute path where files containing schemas for incoming messages are present. |
| carbon.streamer.merge.operation.type | upsert | This defines the operation that needs to be performed on the incoming batch of data while writing it to the target dataset. |
| carbon.streamer.merge.operation.field | (none) | This property defines the field in the incoming schema which contains the type of operation performed at the source. For example, Debezium includes a field called `op` when reading change events from the primary database. Do not confuse this property with `carbon.streamer.merge.operation.type`, which defines the operation to be performed on the incoming batch of data. This property is needed so that the streamer tool can identify rows deleted at the source when the operation type is `upsert`. |
| carbon.streamer.record.key.field | (none) | This defines the record key for a particular incoming record, which the streamer tool uses for performing deduplication. If this is not defined, the operation will fail. |
| carbon.streamer.batch.interval | 10 | Minimum interval between two consecutive ingestion batches in continuous mode. Should be specified in seconds. |
| carbon.streamer.source.ordering.field | (none) | Name of the field from the source schema whose value can be used for picking the latest update for a particular record in the incoming batch in case of multiple updates for the same record key. Useful if the write operation type is UPDATE or UPSERT. This will be used only if `carbon.streamer.upsert.deduplicate` is enabled. |
| carbon.streamer.insert.deduplicate | false | This property specifies if the incoming batch needs to be deduplicated in case of INSERT operation type. If set to true, the incoming batch will be deduplicated against the existing data in the target carbondata table. |
| carbon.streamer.upsert.deduplicate | true | This property specifies if the incoming batch needs to be deduplicated (when multiple updates for the same record key are present in the incoming batch) in case of UPSERT/UPDATE operation type. If set to true, the user needs to provide proper value for the source ordering field as well. |
| carbon.streamer.meta.columns | (none) | Generally when performing CDC operations on primary databases, a few metadata columns are added along with the actual columns for bookkeeping purposes. This property enables users to list all such metadata fields (comma separated) which should not be merged with the target carbondata table. |
| carbon.enable.schema.enforcement | true | This flag decides whether the incoming batch schema is strictly enforced against the target table schema. If set to true, the incoming schema will be validated against the existing table schema; if the schema has evolved, the incoming batch cannot be ingested and the job will simply fail. |
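
To illustrate how the configs above map onto the incoming data, below is a hypothetical avro schema file of the kind the file-based schema provider (`carbon.streamer.source.schema.path`) would read for the example commands in the next section, where the record key field is `name` and the source ordering field is `age`. The `__`-prefixed fields are only indicative of Debezium-style bookkeeping columns that would be listed under `carbon.streamer.meta.columns`, with an operation field such as `__op` supplied via `carbon.streamer.merge.operation.field`; the actual field names depend on how the upstream CDC pipeline is configured.

```
{
  "type": "record",
  "name": "employee",
  "namespace": "com.example.cdc",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "__op", "type": ["null", "string"], "default": null},
    {"name": "__table", "type": ["null", "string"], "default": null},
    {"name": "__db", "type": ["null", "string"], "default": null},
    {"name": "__ts_ms", "type": ["null", "long"], "default": null},
    {"name": "__deleted", "type": ["null", "string"], "default": null}
  ]
}
```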

#### Commands

1. For kafka source -

```
bin/spark-submit --class org.apache.carbondata.streamer.CarbonDataStreamer \
--master <spark_master_url> \
<carbondata_assembly_jar_path> \
--database-name testdb \
--target-table target \
--record-key-field name \
--source-ordering-field age \
--source-type kafka \
--deduplicate false \
--input-kafka-topic test_topic \
--brokers <comma_separated_list_of_brokers> \
--schema-registry-url <schema_registry_url> \
--group-id testgroup \
--meta-columns __table,__db,__ts_ms,__file,__pos,__deleted
```

2. For DFS source -

```
bin/spark-submit --class org.apache.carbondata.streamer.CarbonDataStreamer \
--master <spark_master_url> \
<carbondata_assembly_jar_path> \
--database-name carbondb \
--target-table test \
--record-key-field name \
--source-ordering-field age \
--source-type dfs \
--dfs-source-input-path /home/root1/Projects/avrodata \
--schema-provider-type FileSchema \
--deduplicate false \
--source-schema-file-path /home/root1/Projects/avroschema/
```
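
After a run completes, the merged data can be inspected with any Spark SQL client configured for carbondata. Below is a minimal sketch, assuming the same carbondata assembly jar and the `org.apache.spark.sql.CarbonExtensions` session extension; the database and table names match the DFS example above.

```
bin/spark-sql --master <spark_master_url> \
--jars <carbondata_assembly_jar_path> \
--conf spark.sql.extensions=org.apache.spark.sql.CarbonExtensions \
-e "SELECT count(*) FROM carbondb.test"
```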

#### Future Scope

Contributions are welcome from community members for supporting the below use cases and proposing new scenarios for the streamer tool -

1. Support JSON format for incoming messages
2. Support full schema evolution
