PR apache#176: Kafka to PubSub template
Please approve this CL. It will be submitted automatically, and its GitHub pull request will be marked as merged.

Imported from GitHub PR apache#176

This PR adds Kafka to PubSub Flex template.

Copybara import of the project:

  - 7aceebf9219d17becb789b6ccff647ee6ded6420 Implemented Kafka to Pub/Sub template by Artur Khanin <artur.khanin@akvelon.com>

COPYBARA_INTEGRATE_REVIEW=GoogleCloudPlatform/DataflowTemplates#176 from akvelon:KafkaToPubsubTemplate 7aceebf9219d17becb789b6ccff647ee6ded6420
PiperOrigin-RevId: 343484865
cloud-teleport authored and prathapreddy123 committed Dec 3, 2020
1 parent b870f38 commit cae5d2b
Showing 15 changed files with 1,192 additions and 1 deletion.
205 changes: 205 additions & 0 deletions v2/kafka-to-pubsub/README.md
@@ -0,0 +1,205 @@
# Dataflow Flex Template to ingest data from Apache Kafka to Google Cloud Pub/Sub

This directory contains a Dataflow Flex Template that creates a pipeline
to read data from one or more topics in
[Apache Kafka](https://kafka.apache.org/) and write it to a single topic
in [Google Cloud Pub/Sub](https://cloud.google.com/pubsub).

Supported data formats:
- Serializable plaintext formats, such as JSON
- [PubSubMessage](https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage)

Supported input source configurations:
- Single or multiple Apache Kafka bootstrap servers
- Apache Kafka SASL/SCRAM authentication over a plaintext or SSL connection
- [HashiCorp Vault](https://www.vaultproject.io/) as the secrets vault service

Supported destination configuration:
- Single Google Pub/Sub topic

Supported SSL certificate location:
- Bucket in [Google Cloud Storage](https://cloud.google.com/storage)

In the simplest scenario, the template creates an Apache Beam pipeline that reads messages
from a source topic on a source Kafka server and streams the text messages
into the specified Pub/Sub destination topic.
Other scenarios may require Kafka SASL/SCRAM authentication, which can be performed over a plaintext or SSL-encrypted connection.
The template supports using a single Kafka user account to authenticate against the provided source Kafka servers and topics.
To support SASL authentication over SSL, the template needs access to a secrets vault service holding the
Kafka username and password as well as the location of the SSL certificate in a Google Cloud Storage bucket; HashiCorp Vault is currently supported.

## Requirements

- Java 8
- Kafka Bootstrap Server(s) up and running
- Source Kafka Topic(s)
- Destination Pub/Sub output topic created
- (Optional) An existing HashiCorp Vault
- (Optional) A configured secure SSL connection for Kafka

## Getting Started

This section describes what is needed to get the template up and running.
- Set up the environment
- Build Apache Kafka to Google Pub/Sub Dataflow Flex Template
- Create a Dataflow job to ingest data using the template

### Setting Up Project Environment

#### Pipeline variables:

```
PROJECT=<my-project>
BUCKET_NAME=<my-bucket>
REGION=<my-region>
```
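
For example (hypothetical values, shown purely for illustration):

```
PROJECT=my-gcp-project
BUCKET_NAME=my-templates-bucket
REGION=us-central1
```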

#### Template Metadata Storage Bucket Creation

The Dataflow Flex Template has to store its metadata in a bucket in
[Google Cloud Storage](https://cloud.google.com/storage) so that it can be executed from Google Cloud Platform.
Create the bucket in Google Cloud Storage if it doesn't exist yet:

```
gsutil mb gs://${BUCKET_NAME}
```

#### Containerization variables:

```
IMAGE_NAME=<my-image-name>
TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
BASE_CONTAINER_IMAGE=<my-base-container-image>
BASE_CONTAINER_IMAGE_VERSION=<my-base-container-image-version>
TEMPLATE_PATH="gs://${BUCKET_NAME}/templates/kafka-pubsub.json"
```
Optional variables:
```
JS_PATH=gs://path/to/udf
JS_FUNC_NAME=my-js-function
```

## Build the Apache Kafka to Google Cloud Pub/Sub Dataflow Flex Template

Dataflow Flex Templates package the pipeline as a Docker image and stage the image
in your project's [Container Registry](https://cloud.google.com/container-registry).

### Assembling the Uber-JAR

The Dataflow Flex Templates require your Java project to be built into
an Uber JAR file.

Navigate to the v2 folder:

```
cd /path/to/DataflowTemplates/v2
```

Build the Uber JAR:

```
mvn package -am -pl kafka-to-pubsub
```

ℹ️ An **Uber JAR** - also known as a **fat JAR** - is a single JAR file that contains
both the target package *and* all of its dependencies.

Running the `package` goal produces a `kafka-to-pubsub-1.0-SNAPSHOT.jar`
file under the `target` folder in the `kafka-to-pubsub` directory.
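
You can quickly verify that the build produced the expected artifact (from the `v2` directory):

```
ls kafka-to-pubsub/target/kafka-to-pubsub-1.0-SNAPSHOT.jar
```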

### Creating the Dataflow Flex Template

To execute the template, you need to create a template spec file containing all
the information necessary to run the job. This template already provides the required
[metadata file](src/main/resources/kafka_to_pubsub_metadata.json) in its resources.
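
For orientation, a Flex Template metadata file generally has the following shape. The snippet below is an abridged, illustrative sketch (the labels and help texts are made up); the metadata file in resources remains the authoritative definition:

```
{
  "name": "Kafka to Pub/Sub",
  "description": "Streaming pipeline that reads from Apache Kafka and writes to Google Cloud Pub/Sub.",
  "parameters": [
    {
      "name": "bootstrapServers",
      "label": "Kafka bootstrap servers",
      "helpText": "Comma-separated Kafka bootstrap servers in the format ip:port.",
      "isOptional": false
    },
    {
      "name": "inputTopics",
      "label": "Kafka input topics",
      "helpText": "Comma-separated list of Kafka topics to read from.",
      "isOptional": false
    }
  ]
}
```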

Navigate to the template folder:

```
cd /path/to/DataflowTemplates/v2/kafka-to-pubsub
```

Build the Dataflow Flex Template:

```
gcloud dataflow flex-template build ${TEMPLATE_PATH} \
--image-gcr-path "${TARGET_GCR_IMAGE}" \
--sdk-language "JAVA" \
--flex-template-base-image ${BASE_CONTAINER_IMAGE} \
--metadata-file "src/main/resources/kafka_to_pubsub_metadata.json" \
--jar "target/kafka-to-pubsub-1.0-SNAPSHOT.jar" \
--env FLEX_TEMPLATE_JAVA_MAIN_CLASS="com.google.cloud.teleport.v2.templates.KafkaToPubsub"
```
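
As an optional sanity check, you can confirm that the template spec file was written to Cloud Storage:

```
gsutil cat "${TEMPLATE_PATH}"
```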

### Executing the Template

To deploy the pipeline, you should refer to the template file and pass the
[parameters](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-dataflow-pipeline-options)
required by the pipeline.

The template requires the following parameters:
- bootstrapServers: Comma-separated Kafka bootstrap servers in the format ip:port
- inputTopics: Comma-separated list of Kafka topics to read from
- outputTopic: Pub/Sub topic to write the output to, in the format 'projects/yourproject/topics/yourtopic'

The template also accepts the following optional parameters:
- javascriptTextTransformGcsPath: Path to the JavaScript function in GCS
- javascriptTextTransformFunctionName: Name of the JavaScript function
- outputDeadLetterTopic: Topic for messages that failed to reach the output topic (i.e. the dead-letter topic)
- secretStoreUrl: URL to the Kafka credentials in HashiCorp Vault secret storage, in the format
  'http(s)://vaultip:vaultport/path/to/credentials'
- vaultToken: Token to access HashiCorp Vault secret storage

You can run the template in three different ways:
1. Using the [Dataflow Google Cloud Console](https://console.cloud.google.com/dataflow/jobs)

2. Using the `gcloud` CLI tool
```bash
gcloud dataflow flex-template run "kafka-to-pubsub-`date +%Y%m%d-%H%M%S`" \
--template-file-gcs-location "${TEMPLATE_PATH}" \
--parameters bootstrapServers="broker_1:9092,broker_2:9092" \
--parameters inputTopics="topic1,topic2" \
--parameters outputTopic="projects/${PROJECT}/topics/your-topic-name" \
--parameters outputDeadLetterTopic="projects/${PROJECT}/topics/your-topic-name" \
--parameters javascriptTextTransformGcsPath=${JS_PATH} \
--parameters javascriptTextTransformFunctionName=${JS_FUNC_NAME} \
--parameters secretStoreUrl="http(s)://host:port/path/to/credentials" \
--parameters vaultToken="your-token" \
--region "${REGION}"
```
3. With a REST API request
```
API_ROOT_URL="https://dataflow.googleapis.com"
TEMPLATES_LAUNCH_API="${API_ROOT_URL}/v1b3/projects/${PROJECT}/locations/${REGION}/flexTemplates:launch"
JOB_NAME="kafka-to-pubsub-`date +%Y%m%d-%H%M%S-%N`"
time curl -X POST -H "Content-Type: application/json" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-d '
{
  "launch_parameter": {
    "jobName": "'$JOB_NAME'",
    "containerSpecGcsPath": "'$TEMPLATE_PATH'",
    "parameters": {
      "bootstrapServers": "broker_1:9092,broker_2:9092",
      "inputTopics": "topic1,topic2",
      "outputTopic": "projects/'$PROJECT'/topics/your-topic-name",
      "outputDeadLetterTopic": "projects/'$PROJECT'/topics/your-dead-letter-topic-name",
      "javascriptTextTransformGcsPath": "'$JS_PATH'",
      "javascriptTextTransformFunctionName": "'$JS_FUNC_NAME'",
      "secretStoreUrl": "http(s)://host:port/path/to/credentials",
      "vaultToken": "your-token"
    }
  }
}
' \
"${TEMPLATES_LAUNCH_API}"
```

_Note_: Credentials inside the secret storage should have an appropriate SSL configuration with the following parameters:
- `bucket` - the bucket in Google Cloud Storage with SSL certificate
- `ssl.truststore.location` - the location of the trust store file
- `ssl.truststore.password` - the password for the trust store file
- `ssl.keystore.location` - the location of the key store file
- `ssl.keystore.password` - the store password for the key store file
- `ssl.key.password` - the password of the private key in the key store file
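
For illustration only, the SSL-related portion of such a secret might look roughly like the following; the exact structure of the secret, and the fields holding the Kafka SASL username and password, depend on how your Vault secrets engine and the template's Vault integration are configured:

```
{
  "bucket": "your-ssl-certificate-bucket",
  "ssl.truststore.location": "path/to/kafka.truststore.jks",
  "ssl.truststore.password": "your-truststore-password",
  "ssl.keystore.location": "path/to/kafka.keystore.jks",
  "ssl.keystore.password": "your-keystore-password",
  "ssl.key.password": "your-key-password"
}
```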
50 changes: 50 additions & 0 deletions v2/kafka-to-pubsub/pom.xml
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~ Copyright (C) 2020 Google Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>dynamic-templates</artifactId>
    <groupId>com.google.cloud.teleport.v2</groupId>
    <version>1.0-SNAPSHOT</version>
  </parent>
  <artifactId>kafka-to-pubsub</artifactId>
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <kafka-clients.version>2.3.0</kafka-clients.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.google.cloud.teleport.v2</groupId>
      <artifactId>common</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-kafka</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka-clients.version}</version>
    </dependency>
  </dependencies>
</project>

106 changes: 106 additions & 0 deletions v2/kafka-to-pubsub/src/main/java/com/google/cloud/teleport/v2/kafka/consumer/SslConsumerFactoryFn.java
@@ -0,0 +1,106 @@
/*
* Copyright (C) 2020 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.teleport.v2.kafka.consumer;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class to create a Kafka Consumer with configured SSL.
 */
public class SslConsumerFactoryFn
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

  private final Map<String, String> sslConfig;
  private static final String TRUSTSTORE_LOCAL_PATH = "/tmp/kafka.truststore.jks";
  private static final String KEYSTORE_LOCAL_PATH = "/tmp/kafka.keystore.jks";

  /* Logger for the class. */
  private static final Logger LOG = LoggerFactory.getLogger(SslConsumerFactoryFn.class);

  public SslConsumerFactoryFn(Map<String, String> sslConfig) {
    this.sslConfig = sslConfig;
  }

  @Override
  public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
    String bucket = sslConfig.get("bucket");
    String trustStorePath = sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
    String keyStorePath = sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
    String trustStorePassword = sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
    String keyStorePassword = sslConfig.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
    String keyPassword = sslConfig.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
    String outputTrustStoreFilePath;
    String outputKeyStoreFilePath;
    try {
      // Copy the trust store and key store from GCS to the local filesystem of the worker.
      outputTrustStoreFilePath = TRUSTSTORE_LOCAL_PATH;
      outputKeyStoreFilePath = KEYSTORE_LOCAL_PATH;
      getGcsFileAsLocal(bucket, trustStorePath, outputTrustStoreFilePath);
      getGcsFileAsLocal(bucket, keyStorePath, outputKeyStoreFilePath);
    } catch (IOException e) {
      LOG.error("Failed to retrieve data for SSL", e);
      // Fall back to a consumer without SSL configuration if the certificates cannot be retrieved.
      return new KafkaConsumer<>(config);
    }

    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name());
    config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, outputTrustStoreFilePath);
    config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, outputKeyStoreFilePath);
    config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
    config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword);
    config.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);

    return new KafkaConsumer<>(config);
  }

  /**
   * Reads a file from GCS and writes it locally.
   *
   * @param bucket GCS bucket name
   * @param filePath path to the file in GCS
   * @param outputFilePath local path to save the file to
   * @throws IOException thrown if the file cannot be read or written
   */
  public static void getGcsFileAsLocal(String bucket, String filePath, String outputFilePath)
      throws IOException {
    String gcsFilePath = String.format("gs://%s/%s", bucket, filePath);
    LOG.info("Reading contents from GCS file: {}", gcsFilePath);
    Set<StandardOpenOption> options = new HashSet<>(2);
    options.add(StandardOpenOption.CREATE);
    options.add(StandardOpenOption.APPEND);
    // Copy the GCS file into a local file; throws an I/O exception if the file is not found.
    try (ReadableByteChannel readerChannel =
        FileSystems.open(FileSystems.matchSingleFileSpec(gcsFilePath).resourceId())) {
      try (FileChannel writeChannel = FileChannel.open(Paths.get(outputFilePath), options)) {
        writeChannel.transferFrom(readerChannel, 0, Long.MAX_VALUE);
      }
    }
  }
}
