
[HUDI-2325] Add hive sync support to kafka connect #3660

Merged
6 commits, merged on Nov 23, 2021
114 changes: 90 additions & 24 deletions hudi-kafka-connect/README.md
@@ -27,20 +27,14 @@ The first thing you need to do to start using this connector is building it. In
- [Java 1.8+](https://openjdk.java.net/)
- [Apache Maven](https://maven.apache.org/)
- Install [kcat](https://github.com/edenhill/kcat)
- Install jq: `brew install jq`

After installing these dependencies, execute the following commands. This will install all the Hudi dependency jars,
including the fat packaged jar that contains all the dependencies required for a functional Hudi Kafka Connect Sink.

```bash
cd $HUDI_DIR
mvn clean -DskipTests install
```

Henceforth, incremental builds can be performed as follows.

```bash
mvn clean -pl hudi-kafka-connect install -DskipTests
mvn clean -pl packaging/hudi-kafka-connect-bundle install
mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
```
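Either way, the Sink bundle jar should land under the bundle module's target directory. A quick sketch to confirm the build and stage the jar for Connect (the jar name pattern and the plugin directory are assumptions; use whatever directory your worker's `plugin.path` points at):

```bash
# Confirm the packaged Sink bundle exists (name pattern is an assumption)
ls packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-*.jar

# Stage it in a hypothetical plugin directory that is listed in the worker's plugin.path
mkdir -p /usr/local/share/kafka/plugins/hudi
cp packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-*.jar /usr/local/share/kafka/plugins/hudi/
```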

Next, we need to make sure that the Hudi sink connector bundle jar is in the Kafka Connect classpath. Note that the connect
@@ -56,31 +50,58 @@ After building the package, we need to install the Apache Kafka

### 1 - Starting the environment

To try out the Connect Sink locally, set up a Kafka broker locally. Download the latest apache kafka from https://kafka.apache.org/downloads.
Once downloaded and built, run the Zookeeper server and Kafka server using the command line tools.
For runtime dependencies, we encourage using the confluent HDFS connector jars. We have tested our setup with version `10.1.0`.
After downloading the connector, copy the jars from the lib folder to the Kafka Connect classpath.

```bash
export KAFKA_HOME=/path/to/kafka_install_dir
cd $KAFKA_HOME
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0
```
Add `confluentinc-kafka-connect-hdfs-10.1.0/lib` to the `plugin.path` (comma-separated) in `$HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties`.
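
For reference, the resulting entry might look like this (both paths are illustrative; the second assumes where the HDFS connector was unpacked):

```bash
# In $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
plugin.path=/usr/local/share/kafka/plugins,/opt/confluentinc-kafka-connect-hdfs-10.1.0/lib
```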

### 2 - Set up the docker containers

Wait until the kafka cluster is up and running.
To run the connector locally, we need Kafka, ZooKeeper, HDFS, Hive, etc. To make the setup easier, we use the Docker
containers from the Hudi docker demo. Refer to [this link for the setup](https://hudi.apache.org/docs/docker_demo).

### 2 - Set up the schema registry
Essentially, follow the steps listed here:

`/etc/hosts`: The demo references many services running in containers by hostname. Add the following entries to `/etc/hosts`:
```bash
127.0.0.1 adhoc-1
127.0.0.1 adhoc-2
127.0.0.1 namenode
127.0.0.1 datanode1
127.0.0.1 hiveserver
127.0.0.1 hivemetastore
127.0.0.1 kafkabroker
127.0.0.1 sparkmaster
127.0.0.1 zookeeper
```

Bring up the docker containers
```bash
cd $HUDI_DIR/docker
./setup_demo.sh
```
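
A quick sanity check that the demo containers are up (the container names come from the Hudi docker demo and may differ slightly by version):

```bash
docker ps --format '{{.Names}}' | grep -E 'kafkabroker|zookeeper|namenode|hiveserver|hivemetastore'
```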

The schema registry and Kafka connector can be run directly from the host system (Mac/Linux).

### 3 - Set up the schema registry

Hudi leverages a schema registry to obtain the latest schema when writing records. While it supports most popular schema
registries, we use the Confluent Schema Registry here. Download the
latest [Confluent Platform](https://docs.confluent.io/platform/current/installation/index.html) and run the schema
registry service.

NOTE: You might need to change the port from `8081` to `8082`.
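
A minimal sketch of that change, assuming the stock Confluent layout where the registry reads `etc/schema-registry/schema-registry.properties` (on Linux, drop the `''` after `-i`):

```bash
cd $CONFLUENT_DIR
# Point the registry's listener at port 8082 instead of the default 8081
sed -i '' 's|^listeners=.*|listeners=http://0.0.0.0:8082|' etc/schema-registry/schema-registry.properties
```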

```bash
cd $CONFLUENT_DIR
/bin/kafka-configs --zookeeper localhost --entity-type topics --entity-name _schemas --alter --add-config cleanup.policy=compact
./bin/schema-registry-start etc/schema-registry/schema-registry.properties
```

Comment on lines +100 to 101

Contributor: As a follow-up, maybe we can add all of this Kafka-related environment setup, including the schema registry, to the docker demo, to make it easier for users to try out.

Contributor Author: ack.

### 3 - Create the Hudi Control Topic for Coordination of the transactions
### 4 - Create the Hudi Control Topic for Coordination of the transactions

The control topic should only have `1` partition, since it is used to coordinate the Hudi write transactions across the multiple Connect tasks.

@@ -90,7 +111,7 @@ cd $KAFKA_HOME
./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
```
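
To double-check the topic before starting the Sink, a describe call like the following should report a single partition:

```bash
./bin/kafka-topics.sh --describe --topic hudi-control-topic --bootstrap-server localhost:9092
```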

### 4 - Create the Hudi Topic for the Sink and insert data into the topic
### 5 - Create the Hudi Topic for the Sink and insert data into the topic

Open a terminal to execute the following command:

@@ -106,7 +127,7 @@ to generate, with each batch containing a number of messages and idle time betwe
bash setupKafka.sh -n <num_kafka_messages_per_batch> -b <num_batches>
```
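
For example, to publish two small batches and then peek at a few of the produced records (the batch sizes are arbitrary, and `kafkabroker` resolves via the `/etc/hosts` entries added earlier):

```bash
cd $HUDI_DIR/hudi-kafka-connect/demo
bash setupKafka.sh -n 1000 -b 2

# Peek at a handful of records to confirm they landed on the topic
kcat -C -b kafkabroker:9092 -t hudi-test-topic -c 5 -e
```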

### 5 - Run the Sink connector worker (multiple workers can be run)
### 6 - Run the Sink connector worker (multiple workers can be run)

Kafka Connect is a distributed platform, with the ability to run one or more workers (each running multiple tasks)
that process the records from the Kafka partitions for the same topic in parallel. We provide a properties file with
@@ -120,7 +141,7 @@ cd $KAFKA_HOME
./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
```
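
Additional workers join the same Connect cluster as long as they share the `group.id` from the properties file. A sketch for a second worker on the same host (the copied file and the extra `listeners` port are assumptions, added to avoid a REST port clash):

```bash
cp $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties /tmp/connect-distributed-2.properties
# Hypothetical override: give the second worker its own REST listener port
echo "listeners=http://0.0.0.0:8084" >> /tmp/connect-distributed-2.properties
./bin/connect-distributed.sh /tmp/connect-distributed-2.properties
```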

### 6 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
### 7 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)

Once the Connector has started, it will not run the Sink until the Hudi sink is added using the web API. The following
curl APIs can be used to delete and add a new Hudi Sink. Again, a default configuration is provided for the Hudi Sink,
@@ -144,8 +165,8 @@ Note: HUDI-2325 tracks Hive sync, which will unlock pretty much every other quer
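
For reference, a minimal sketch of those calls, assuming Connect's REST API on its default port `8083` and the bundled `config-sink.json` (which names the connector `hudi-sink`):

```bash
# Remove a previously registered Hudi sink, if any, then register it with the provided config
curl -X DELETE http://localhost:8083/connectors/hudi-sink
curl -X POST -H "Content-Type: application/json" \
  --data @$HUDI_DIR/hudi-kafka-connect/demo/config-sink.json \
  http://localhost:8083/connectors
```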

```bash
ls -a /tmp/hoodie/hudi-test-topic
. .hoodie partition-1 partition-3
.. partition-0 partition-2 partition-4
. .hoodie partition_1 partition_3
.. partition_0 partition_2 partition_4

ls -lt /tmp/hoodie/hudi-test-topic/.hoodie
total 72
@@ -160,7 +181,7 @@ total 72
-rw-r--r-- 1 user wheel 0 Sep 13 21:41 20210913214114.commit.requested
drwxr-xr-x 2 user wheel 64 Sep 13 21:41 archived

ls -l /tmp/hoodie/hudi-test-topic/partition-0
ls -l /tmp/hoodie/hudi-test-topic/partition_0
total 5168
-rw-r--r-- 1 user wheel 439332 Sep 13 21:43 2E0E6DB44ACC8479059574A2C71C7A7E-0_0-0-0_20210913214114.parquet
-rw-r--r-- 1 user wheel 440179 Sep 13 21:42 3B56FAAAE2BDD04E480C1CBACD463D3E-0_0-0-0_20210913214114.parquet
@@ -170,7 +191,52 @@ total 5168
-rw-r--r-- 1 user wheel 440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
```

### 7 - Run async compaction and clustering if scheduled
### 8 - Querying via Hive

```bash
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false


# List Tables
0: jdbc:hive2://hiveserver:10000> show tables;
+---------------------+--+
| tab_name |
+---------------------+--+
| huditesttopic_ro |
| huditesttopic_rt |
+---------------------+--+
2 rows selected (1.199 seconds)
0: jdbc:hive2://hiveserver:10000>


# Look at partitions that were added
0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_rt;
+-------------------+--+
| partition |
+-------------------+--+
| date=partition_0 |
| date=partition_1 |
| date=partition_2 |
| date=partition_3 |
| date=partition_4 |
+-------------------+--+
5 rows selected (0.24 seconds)


0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_rt;
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+
```


### 9 - Run async compaction and clustering if scheduled

When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled when the Sink is
running. Inline compaction and clustering are disabled by default due to performance reasons. By default, async
19 changes: 15 additions & 4 deletions hudi-kafka-connect/demo/config-sink.json
@@ -1,7 +1,7 @@
{
"name": "hudi-sink",
"config": {
"bootstrap.servers": "localhost:9092",
"bootstrap.servers": "kafkabroker:9092",
"connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
"tasks.max": "4",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
@@ -11,10 +11,21 @@
"hoodie.table.name": "hudi-test-topic",
"hoodie.table.type": "MERGE_ON_READ",
"hoodie.metadata.enable": "false",
"hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
"hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic",
"hoodie.datasource.write.recordkey.field": "volume",
"hoodie.datasource.write.partitionpath.field": "date",
"hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/hudi-test-topic/versions/latest"
}
"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8082/subjects/hudi-test-topic/versions/latest",
"hoodie.kafka.commit.interval.secs": 60,
"hoodie.meta.sync.enable": "true",
"hoodie.meta.sync.classes": "org.apache.hudi.hive.HiveSyncTool",
"hoodie.datasource.hive_sync.table": "huditesttopic",
"hoodie.datasource.hive_sync.partition_fields": "date",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"dfs.client.use.datanode.hostname": "true",
"hive.metastore.uris": "thrift://hivemetastore:9083",
"hive.metastore.client.socket.timeout": "1500s"
}
}
2 changes: 1 addition & 1 deletion hudi-kafka-connect/demo/connect-distributed.properties
@@ -15,7 +15,7 @@
# limitations under the License.
##

bootstrap.servers=localhost:9092
bootstrap.servers=kafkabroker:9092
group.id=hudi-connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
11 changes: 6 additions & 5 deletions hudi-kafka-connect/demo/setupKafka.sh
@@ -50,6 +50,7 @@ fi

## defaults
rawDataFile=${HUDI_DIR}/docker/demo/data/batch_1.json
kafkaBrokerHostname=kafkabroker
kafkaTopicName=hudi-test-topic
numKafkaPartitions=4
recordKey=volume
@@ -115,23 +116,23 @@ done
if [ $recreateTopic = "Y" ]; then
# First delete the existing topic
echo "Delete Kafka topic $kafkaTopicName ..."
${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server ${kafkaBrokerHostname}:9092

# Create the topic with 4 partitions
echo "Create Kafka topic $kafkaTopicName ..."
${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server ${kafkaBrokerHostname}:9092
fi

# Setup the schema registry
export SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $SCHEMA}" http://localhost:8081/subjects/${kafkaTopicName}/versions
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $SCHEMA}" http://localhost:8082/subjects/${kafkaTopicName}/versions
curl -X GET http://localhost:8082/subjects/${kafkaTopicName}/versions/latest

# Generate kafka messages from raw records
# Each records with unique keys and generate equal messages across each hudi partition
partitions={}
for ((i = 0; i < ${numHudiPartitions}; i++)); do
partitions[$i]="partition-"$i
partitions[$i]="partition_"$i
done

events_file=/tmp/kcat-input.events
@@ -170,5 +171,5 @@ for ((i = 1;i<=numBatch;i++)); do
done

echo "publish to Kafka ..."
grep -v '^$' ${events_file} | kcat -P -b localhost:9092 -t hudi-test-topic
grep -v '^$' ${events_file} | kcat -P -b ${kafkaBrokerHostname}:9092 -t ${kafkaTopicName}
done
20 changes: 19 additions & 1 deletion hudi-kafka-connect/pom.xml
@@ -148,14 +148,19 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.12.1</version>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Protobuf -->
<dependency>
@@ -195,6 +200,19 @@
<version>${hadoop.version}</version>
</dependency>

<!-- Hive -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
</dependency>


<!-- Hudi - Test -->
<dependency>
<groupId>org.apache.hudi</groupId>
@@ -30,6 +30,7 @@
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
@@ -63,6 +64,7 @@
public class KafkaConnectUtils {

private static final Logger LOG = LogManager.getLogger(KafkaConnectUtils.class);
private static final String HOODIE_CONF_PREFIX = "hoodie.";

public static int getLatestNumPartitions(String bootstrapServers, String topicName) {
Properties props = new Properties();
@@ -85,9 +87,15 @@ public static int getLatestNumPartitions(String bootstrapServers, String topicNa
*
* @return
*/
public static Configuration getDefaultHadoopConf() {
public static Configuration getDefaultHadoopConf(KafkaConnectConfigs connectConfigs) {
Configuration hadoopConf = new Configuration();
hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
connectConfigs.getProps().keySet().stream().filter(prop -> {
// In order to prevent printing unnecessary warn logs, here filter out the hoodie
// configuration items before passing to hadoop/hive configs
return !prop.toString().startsWith(HOODIE_CONF_PREFIX);
}).forEach(prop -> {
hadoopConf.set(prop.toString(), connectConfigs.getProps().get(prop.toString()).toString());
});
Comment on lines +92 to +98

Contributor: I see similar code in KafkaOffsetGen::excludeHoodieConfigs(). Could you keep one of them?

Contributor Author: I could, but I have to convert the Kafka map configs to Hadoop configs. Also, we can change this logic later if required. For instance, if we want to have the Hadoop confs start with "conf.hadoop." in Kafka Connect, we can do that independently. wdyt?

Contributor: Got it. Then let's keep this method.

return hadoopConf;
}

@@ -94,8 +94,6 @@ protected KafkaConnectConfigs() {

protected KafkaConnectConfigs(Properties props) {
super(props);
Properties newProps = new Properties();
newProps.putAll(props);
}

public static KafkaConnectConfigs.Builder newBuilder() {