diff --git a/kafka-plugins-0.9/docs/KAFKABATCHSOURCE.md b/kafka-plugins-0.10/docs/KAFKABATCHSOURCE.md similarity index 92% rename from kafka-plugins-0.9/docs/KAFKABATCHSOURCE.md rename to kafka-plugins-0.10/docs/KAFKABATCHSOURCE.md index eafab36..5d0683a 100644 --- a/kafka-plugins-0.9/docs/KAFKABATCHSOURCE.md +++ b/kafka-plugins-0.10/docs/KAFKABATCHSOURCE.md @@ -10,7 +10,7 @@ Kafka batch source that emits records with user specified schema. Usage Notes ----------- -Kafka Batch Source can be used to read events from a kafka topic. It uses kafka consumer [0.9.1 apis](https://kafka.apache.org/090/documentation.html) to read events from a kafka topic. The Kafka Batch Source supports providing additional kafka properties for the kafka consumer, reading from kerberos-enabled kafka and limiting the number of records read. Kafka Batch Source converts incoming kafka events into cdap structured records which then can be used for further transformations. +Kafka Batch Source can be used to read events from a kafka topic. It uses kafka consumer [0.10.2 apis](https://kafka.apache.org/0100/documentation.html) to read events from a kafka topic. The Kafka Batch Source supports providing additional kafka properties for the kafka consumer, reading from kerberos-enabled kafka and limiting the number of records read. Kafka Batch Source converts incoming kafka events into cdap structured records which then can be used for further transformations. The source will read from the earliest available offset or the initial offset specified in the config for the first run, remember the last offset it read in the last run, and continue from that offset for the next run. diff --git a/kafka-plugins-0.10/docs/KAFKASOURCE.md b/kafka-plugins-0.10/docs/KAFKASOURCE.md new file mode 100644 index 0000000..4d00321 --- /dev/null +++ b/kafka-plugins-0.10/docs/KAFKASOURCE.md @@ -0,0 +1,85 @@ +[![Build Status](https://travis-ci.org/hydrator/kafka-plugins.svg?branch=master)](https://travis-ci.org/hydrator/kafka-plugins) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) + +Kafka Source +=========== + +Kafka streaming source that emits records with user specified schema. + +plugin configuration + +Usage Notes +----------- + +Kafka Streaming Source can be used to read events from a kafka topic. It uses kafka consumer [0.10.2 apis](https://kafka.apache.org/0100/documentation.html) to read events from a kafka topic. Kafka Source converts incoming kafka events into cdap structured records which then can be used for further transformations. + +The source provides capabilities to read from the latest offset, from the beginning, or from a provided kafka offset. The plugin relies on Spark Streaming offset [storage capabilities](https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html) to manage offsets and checkpoints. + +Plugin Configuration +--------------------- + +| Configuration | Required | Default | Description | +| :------------ | :------: | :----- | :---------- | +| **Kafka Brokers** | **Y** | N/A | List of Kafka brokers specified in host1:port1,host2:port2 form. | +| **Kafka Topic** | **Y** | N/A | The Kafka topic to read from. | +| **Topic Partition** | **N** | N/A | List of topic partitions to read from. If not specified, all partitions will be read. | +| **Default Initial Offset** | **N** | N/A | The default initial offset for all topic partitions. An offset of -2 means the smallest offset. An offset of -1 means the latest offset. Defaults to -1. 
Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. If you wish to set different initial offsets for different partitions, use the initialPartitionOffsets property. | +| **Initial Partition Offsets** | **N** | N/A | The initial offset for each topic partition. If this is not specified, all partitions will use the same initial offset, which is determined by the defaultInitialOffset property. Any partitions specified in the partitions property, but not in this property will use the defaultInitialOffset. An offset of -2 means the smallest offset. An offset of -1 means the latest offset. Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. | +| **Time Field** | **N** | N/A | Optional name of the field containing the read time of the batch. If this is not set, no time field will be added to output records. If set, this field must be present in the schema property and must be a long. | +| **Key Field** | **N** | N/A | Optional name of the field containing the message key. If this is not set, no key field will be added to output records. If set, this field must be present in the schema property and must be bytes. | +| **Partition Field** | **N** | N/A | Optional name of the field containing the partition the message was read from. If this is not set, no partition field will be added to output records. If set, this field must be present in the schema property and must be an int. | +| **Offset Field** | **N** | N/A | Optional name of the field containing the partition offset the message was read from. If this is not set, no offset field will be added to output records. If set, this field must be present in the schema property and must be a long. | +| **Format** | **N** | N/A | Optional format of the Kafka event message. Any format supported by CDAP is supported. For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values. If no format is given, Kafka message payloads will be treated as bytes. | +| **Kerberos Principal** | **N** | N/A | The kerberos principal used for the source when kerberos security is enabled for kafka. | +| **Keytab Location** | **N** | N/A | The keytab location for the kerberos principal when kerberos security is enabled for kafka. | + + +Build +----- +To build this plugin: + +``` + mvn clean package +``` + +The build will create a .jar and .json file under the ``target`` directory. +These files can be used to deploy your plugins. + +Deployment +---------- +You can deploy your plugins using the CDAP CLI: + + > load artifact <target/plugin>.jar config-file <target/plugin>.json + +For example, if your artifact is named 'kafka-plugins-<version>': + + > load artifact target/kafka-plugins-<version>.jar config-file target/kafka-plugins-<version>.json + +## Mailing Lists + +CDAP User Group and Development Discussions: + +* `cdap-user@googlegroups.com ` + +The *cdap-user* mailing list is primarily for users using the product to develop +applications or building plugins for applications. You can expect questions from +users, release announcements, and any other discussions that we think will be helpful +to the users. + +## License and Trademarks + +Copyright © 2018 Cask Data, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +in compliance with the License. 
You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. See the License for the specific language governing permissions +and limitations under the License. + +Cask is a trademark of Cask Data, Inc. All rights reserved. + +Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with +permission. No endorsement by The Apache Software Foundation is implied by the use of these marks. diff --git a/kafka-plugins-0.9/docs/KAFKAWRITER-SINK.md b/kafka-plugins-0.10/docs/KAFKAWRITER-SINK.md similarity index 93% rename from kafka-plugins-0.9/docs/KAFKAWRITER-SINK.md rename to kafka-plugins-0.10/docs/KAFKAWRITER-SINK.md index 54374a2..99273c4 100644 --- a/kafka-plugins-0.9/docs/KAFKAWRITER-SINK.md +++ b/kafka-plugins-0.10/docs/KAFKAWRITER-SINK.md @@ -12,14 +12,14 @@ The sink also allows you to write events into kerberos-enabled kafka. Usage Notes ----------- -Kafka sink emits events in realtime to configured kafka topic and partition. It uses kafka producer [0.8.2 apis](https://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html) to write events into kafka. +Kafka sink emits events in real time to the configured kafka topic and partition. It uses kafka producer [0.10.2 apis](https://kafka.apache.org/0100/documentation.html) to write events into kafka. This sink can be configured to operate in synchronous or asynchronous mode. In synchronous mode, each event will be sent to the broker synchronously on the thread that calls it. This is not sufficient for most high volume environments. In async mode, the kafka producer will batch together all the kafka events for greater throughput, but that opens up the possibility of dropping unsent events if the client machine fails. Since the kafka producer uses synchronous mode by default, this sink also uses a synchronous producer by default. It uses a String partitioner and String serializers for key and value to write events to kafka. Optionally, if a kafka key is provided, the producer will use that key to partition events across multiple partitions in a given topic. This sink also allows compression configuration. By default compression is none. -Kafka producer can be tuned using many properties as shown [here](https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html). This sink allows user to configure any property supported by kafka 0.8.2 Producer. +Kafka producer can be tuned using many properties as shown [here](https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html). This sink allows the user to configure any property supported by the kafka 0.10.2.0 producer. 
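For example, extra producer settings can be passed to this sink as comma-separated `key:value` pairs in its kafka properties field (assuming the same `kafkaProperties` field shown in the batch sink example elsewhere in these docs); the values below are only an illustrative sketch, not recommended settings:

    "kafkaProperties": "acks:all,retries:3,linger.ms:5"

Each pair is handed to the underlying 0.10.2 producer configuration, so any key from the ProducerConfig documentation linked above can be used.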
Plugin Configuration diff --git a/kafka-plugins-0.10/docs/Kafka-alert-publisher.md b/kafka-plugins-0.10/docs/Kafka-alert-publisher.md new file mode 100644 index 0000000..2d53d74 --- /dev/null +++ b/kafka-plugins-0.10/docs/Kafka-alert-publisher.md @@ -0,0 +1,63 @@ +# kafka-alert-plugin + +Join CDAP community [![Build Status](https://travis-ci.org/hydrator/kafka-alert-plugin.svg?branch=master)](https://travis-ci.org/hydrator/kafka-alert-plugin) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) + +Kafka Alert Publisher that allows you to publish alerts to kafka as json objects. The plugin internally uses kafka producer apis to publish alerts. +The plugin allows you to specify the kafka topic to use for publishing and other additional kafka producer properties. +This plugin uses kafka 0.10.2 java apis. + +Build +----- +To build this plugin: + +``` + mvn clean package +``` + +The build will create a .jar and .json file under the ``target`` directory. +These files can be used to deploy your plugins. + +Deployment +---------- +You can deploy your plugins using the CDAP CLI: + + > load artifact <target/plugin>.jar config-file <target/plugin>.json + +For example, if your artifact is named 'kafka-alert-plugin-<version>': + + > load artifact target/kafka-alert-plugin-<version>.jar config-file target/kafka-alert-plugin-<version>.json + +## Mailing Lists + +CDAP User Group and Development Discussions: + +* `cdap-user@googlegroups.com ` + +The *cdap-user* mailing list is primarily for users using the product to develop +applications or building plugins for applications. You can expect questions from +users, release announcements, and any other discussions that we think will be helpful +to the users. + +## IRC Channel + +CDAP IRC Channel: #cdap on irc.freenode.net + + +## License and Trademarks + +Copyright © 2018 Cask Data, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +in compliance with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. See the License for the specific language governing permissions +and limitations under the License. + +Cask is a trademark of Cask Data, Inc. All rights reserved. + +Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with +permission. No endorsement by The Apache Software Foundation is implied by the use of these marks. diff --git a/kafka-plugins-0.9/docs/Kafka-batchsink.md b/kafka-plugins-0.10/docs/Kafka-batchsink.md similarity index 96% rename from kafka-plugins-0.9/docs/Kafka-batchsink.md rename to kafka-plugins-0.10/docs/Kafka-batchsink.md index 18ec604..d966a3e 100644 --- a/kafka-plugins-0.9/docs/Kafka-batchsink.md +++ b/kafka-plugins-0.10/docs/Kafka-batchsink.md @@ -7,7 +7,7 @@ Kafka sink that allows you to write events in CSV or JSON format to kafka. Plugin has the capability to push the data to a Kafka topic. It can also be configured to partition events being written to kafka based on a configurable key. The sink can also be configured to operate in sync or async mode and apply different -compression types to events. Kafka sink is compatible with Kafka 0.9 and 0.10 +compression types to events. This plugin uses kafka 0.10.2 java apis. 
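As a quick orientation before the property reference, a minimal sink configuration might look like the sketch below; the broker, topic and key values are placeholders, and the complete example at the end of this document shows the remaining options such as kafkaProperties:

    {
        "name": "Kafka",
        "type": "batchsink",
        "properties": {
            "brokers": "host1.example.com:9092",
            "topic": "purchases",
            "format": "CSV",
            "async": "FALSE",
            "compressionType": "none",
            "key": "message"
        }
    }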
Configuration @@ -55,4 +55,4 @@ Additional properties like number of acknowledgements and client id can also be "kafkaProperties": "acks:2,client.id:myclient", "key": "message" } - } \ No newline at end of file + } diff --git a/kafka-plugins-0.9/docs/Kafka-batchsource.md b/kafka-plugins-0.10/docs/Kafka-batchsource.md similarity index 98% rename from kafka-plugins-0.9/docs/Kafka-batchsource.md rename to kafka-plugins-0.10/docs/Kafka-batchsource.md index 48714f4..8242579 100644 --- a/kafka-plugins-0.9/docs/Kafka-batchsource.md +++ b/kafka-plugins-0.10/docs/Kafka-batchsource.md @@ -7,7 +7,7 @@ Kafka batch source. Emits the record from kafka. It will emit a record based on you use, or if no schema or format is specified, the message payload will be emitted. The source will remember the offset it read last run and continue from that offset for the next run. The Kafka batch source supports providing additional kafka properties for the kafka consumer, -reading from kerberos-enabled kafka and limiting the number of records read +reading from kerberos-enabled kafka and limiting the number of records read. This plugin uses kafka 0.10.2 java apis. Use Case -------- @@ -106,4 +106,3 @@ For each Kafka message read, it will output a record with the schema: | count | int | | price | double | +================================+ - \ No newline at end of file diff --git a/kafka-plugins-0.10/docs/Kafka-streamingsource.md b/kafka-plugins-0.10/docs/Kafka-streamingsource.md new file mode 100644 index 0000000..51fffdb --- /dev/null +++ b/kafka-plugins-0.10/docs/Kafka-streamingsource.md @@ -0,0 +1,118 @@ +# Kafka Streaming Source + + +Description +----------- +Kafka streaming source. Emits a record with the schema specified by the user. If no schema +is specified, it will emit a record with two fields: 'key' (nullable string) and 'message' +(bytes). This plugin uses kafka 0.10.2 java apis. + + +Use Case +-------- +This source is used whenever you want to read from Kafka. For example, you may want to read messages +from Kafka and write them to a Table. + + +Properties +---------- +**referenceName:** This will be used to uniquely identify this source for lineage, annotating metadata, etc. + +**brokers:** List of Kafka brokers specified in host1:port1,host2:port2 form. (Macro-enabled) + +**topic:** The Kafka topic to read from. (Macro-enabled) + +**partitions:** List of topic partitions to read from. If not specified, all partitions will be read. (Macro-enabled) + +**defaultInitialOffset:** The default initial offset for all topic partitions. +An offset of -2 means the smallest offset. An offset of -1 means the latest offset. Defaults to -1. +Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. +If you wish to set different initial offsets for different partitions, use the initialPartitionOffsets property. (Macro-enabled) + +**initialPartitionOffsets:** The initial offset for each topic partition. If this is not specified, +all partitions will use the same initial offset, which is determined by the defaultInitialOffset property. +Any partitions specified in the partitions property, but not in this property will use the defaultInitialOffset. +An offset of -2 means the smallest offset. An offset of -1 means the latest offset. +Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. (Macro-enabled) + +**schema:** Output schema of the source. 
If you would like the output records to contain a field with the +Kafka message key, the schema must include a field of type bytes or nullable bytes, and you must set the +keyField property to that field's name. Similarly, if you would like the output records to contain a field with +the timestamp of when the record was read, the schema must include a field of type long or nullable long, and you +must set the timeField property to that field's name. Any field that is not the timeField or keyField will be used +in conjunction with the format to parse Kafka message payloads. + +**format:** Optional format of the Kafka event message. Any format supported by CDAP is supported. +For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values. +If no format is given, Kafka message payloads will be treated as bytes. + +**timeField:** Optional name of the field containing the read time of the batch. +If this is not set, no time field will be added to output records. +If set, this field must be present in the schema property and must be a long. + +**keyField:** Optional name of the field containing the message key. +If this is not set, no key field will be added to output records. +If set, this field must be present in the schema property and must be bytes. + +**partitionField:** Optional name of the field containing the partition the message was read from. +If this is not set, no partition field will be added to output records. +If set, this field must be present in the schema property and must be an int. + +**offsetField:** Optional name of the field containing the partition offset the message was read from. +If this is not set, no offset field will be added to output records. +If set, this field must be present in the schema property and must be a long. + +**maxRatePerPartition:** Maximum number of records to read per second per partition. Defaults to 1000. + +**principal:** The kerberos principal used for the source when kerberos security is enabled for kafka. + +**keytabLocation:** The keytab location for the kerberos principal when kerberos security is enabled for kafka. + +Example +------- +This example reads from the 'purchases' topic of a Kafka instance running +on brokers host1.example.com:9092 and host2.example.com:9092. The source will add +a time field named 'readTime' that contains a timestamp corresponding to the micro +batch when the record was read. It will also contain a field named 'key' which will have +the message key in it. It parses the Kafka messages using the 'csv' format +with 'user', 'item', 'count', and 'price' as the message schema. 
+ + { + "name": "Kafka", + "type": "streamingsource", + "properties": { + "topic": "purchases", + "brokers": "host1.example.com:9092,host2.example.com:9092", + "format": "csv", + "timeField": "readTime", + "keyField": "key", + "schema": "{ + \"type\":\"record\", + \"name\":\"purchase\", + \"fields\":[ + {\"name\":\"readTime\",\"type\":\"long\"}, + {\"name\":\"key\",\"type\":\"bytes\"}, + {\"name\":\"user\",\"type\":\"string\"}, + {\"name\":\"item\",\"type\":\"string\"}, + {\"name\":\"count\",\"type\":\"int\"}, + {\"name\":\"price\",\"type\":\"double\"} + ] + }" + } + } + +For each Kafka message read, it will output a record with the schema: + + +================================+ + | field name | type | + +================================+ + | readTime | long | + | key | bytes | + | user | string | + | item | string | + | count | int | + | price | double | + +================================+ + +Note that the readTime field is not derived from the Kafka message, but from the time that the +message was read. diff --git a/kafka-plugins-0.10/docs/KafkaAlerts-alertpublisher.md b/kafka-plugins-0.10/docs/KafkaAlerts-alertpublisher.md new file mode 100644 index 0000000..47c3f13 --- /dev/null +++ b/kafka-plugins-0.10/docs/KafkaAlerts-alertpublisher.md @@ -0,0 +1,39 @@ +# Kafka Alert Publisher + + +Description +----------- +Kafka Alert Publisher that allows you to publish alerts to kafka as json objects. +The plugin internally uses kafka producer apis to publish alerts. +The plugin allows you to specify the kafka topic to use for publishing and other additional +kafka producer properties. This plugin uses kafka 0.10.2 java apis. + + +Configuration +------------- +**brokers:** List of Kafka brokers specified in host1:port1,host2:port2 form. + +**topic:** The Kafka topic to write to. This topic should already exist in kafka. + +**producerProperties:** Specifies additional kafka producer properties like acks and client.id as key-value pairs. + +**Kerberos Principal:** The kerberos principal used for the source when kerberos security is enabled for kafka. + +**Keytab Location:** The keytab location for the kerberos principal when kerberos security is enabled for kafka. + +Example +------- +This example publishes alerts to the already existing kafka topic 'alarm' as json objects. +The kafka broker is running at localhost and port 9092. Additional kafka producer properties +like acks and client.id are specified as well. 
+ + + { + "name": "Kafka", + "type": "alertpublisher", + "properties": { + "brokers": "localhost:9092", + "topic": "alarm", + "producerProperties": "acks:2,client.id:myclient" + } + } diff --git a/kafka-plugins-0.9/docs/kafka-batch-source-plugins-config.png b/kafka-plugins-0.10/docs/kafka-batch-source-plugins-config.png similarity index 100% rename from kafka-plugins-0.9/docs/kafka-batch-source-plugins-config.png rename to kafka-plugins-0.10/docs/kafka-batch-source-plugins-config.png diff --git a/kafka-plugins-0.9/docs/kafka-sink-plugin-config.png b/kafka-plugins-0.10/docs/kafka-sink-plugin-config.png similarity index 100% rename from kafka-plugins-0.9/docs/kafka-sink-plugin-config.png rename to kafka-plugins-0.10/docs/kafka-sink-plugin-config.png diff --git a/kafka-plugins-0.10/docs/kafka-source-plugin-config.png b/kafka-plugins-0.10/docs/kafka-source-plugin-config.png new file mode 100644 index 0000000..c9b36bd Binary files /dev/null and b/kafka-plugins-0.10/docs/kafka-source-plugin-config.png differ diff --git a/kafka-plugins-0.9/icons/Kafka-batchsink.png b/kafka-plugins-0.10/icons/Kafka-batchsink.png similarity index 100% rename from kafka-plugins-0.9/icons/Kafka-batchsink.png rename to kafka-plugins-0.10/icons/Kafka-batchsink.png diff --git a/kafka-plugins-0.9/icons/Kafka-batchsource.png b/kafka-plugins-0.10/icons/Kafka-batchsource.png similarity index 100% rename from kafka-plugins-0.9/icons/Kafka-batchsource.png rename to kafka-plugins-0.10/icons/Kafka-batchsource.png diff --git a/kafka-plugins-0.10/icons/KafkaAlerts-alertpublisher.png b/kafka-plugins-0.10/icons/KafkaAlerts-alertpublisher.png new file mode 100644 index 0000000..041cdb4 Binary files /dev/null and b/kafka-plugins-0.10/icons/KafkaAlerts-alertpublisher.png differ diff --git a/kafka-plugins-0.10/pom.xml b/kafka-plugins-0.10/pom.xml new file mode 100644 index 0000000..a579aa8 --- /dev/null +++ b/kafka-plugins-0.10/pom.xml @@ -0,0 +1,172 @@ + + + + kafka-plugins + co.cask.hydrator + 1.8.2-SNAPSHOT + + 4.0.0 + + Apache Kafka 0.10 plugins + kafka-plugins + 1.8.2-SNAPSHOT-0.10.2.0 + + + + org.apache.kafka + kafka_2.11 + ${kafka10.version} + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.spark + spark-streaming-kafka-0-10_2.11 + ${spark2.version} + + + org.apache.kafka + kafka_2.11 + + + org.apache.spark + spark-tags_2.11 + + + + + org.apache.spark + spark-mllib_2.11 + ${spark2.version} + provided + + + org.apache.xbean + xbean-asm5-shaded + + + + + org.apache.spark + spark-streaming_2.11 + ${spark2.version} + provided + + + org.apache.spark + spark-core_2.11 + ${spark2.version} + provided + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + org.apache.hadoop + hadoop-client + + + com.esotericsoftware.reflectasm + reflectasm + + + org.apache.curator + curator-recipes + + + org.tachyonproject + tachyon-client + + + org.scala-lang + scala-compiler + + + org.eclipse.jetty.orbit + javax.servlet + + + + net.java.dev.jets3t + jets3t + + + org.apache.xbean + xbean-asm5-shaded + + + + + co.cask.cdap + cdap-spark-core2_2.11 + ${cdap.version} + test + + + co.cask.cdap + cdap-data-pipeline2_2.11 + ${cdap.version} + test + + + co.cask.cdap + cdap-data-streams2_2.11 + ${cdap.version} + test + + + + + + + org.apache.felix + maven-bundle-plugin + 3.3.0 + + + <_exportcontents>co.cask.hydrator.plugin.*;org.apache.spark.streaming.kafka010.*; + org.apache.kafka.*;com.google.common.base.*; + *;inline=false;scope=compile + true + lib + + + + + co.cask + cdap-maven-plugin + 1.0.0 + + + 
system:cdap-data-pipeline[4.3.0-SNAPSHOT,6.0.0-SNAPSHOT) + system:cdap-data-streams[4.3.0-SNAPSHOT,6.0.0-SNAPSHOT) + + + + + create-artifact-config + prepare-package + + create-plugin-json + + + + + + + + diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/alertpublisher/KafkaAlertPublisher.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/alertpublisher/KafkaAlertPublisher.java new file mode 100644 index 0000000..5f71edd --- /dev/null +++ b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/alertpublisher/KafkaAlertPublisher.java @@ -0,0 +1,177 @@ +/* + * Copyright © 2018 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.hydrator.plugin.alertpublisher; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Macro; +import co.cask.cdap.api.annotation.Name; +import co.cask.cdap.api.annotation.Plugin; +import co.cask.cdap.api.dataset.lib.KeyValue; +import co.cask.cdap.api.plugin.PluginConfig; +import co.cask.cdap.etl.api.Alert; +import co.cask.cdap.etl.api.AlertPublisher; +import co.cask.cdap.etl.api.AlertPublisherContext; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.hydrator.common.KeyValueListParser; +import co.cask.hydrator.plugin.common.KafkaHelpers; +import com.google.common.base.Strings; +import com.google.gson.Gson; +import kafka.common.Topic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +/** + * Kafka Alert Publisher + */ +@Plugin(type = AlertPublisher.PLUGIN_TYPE) +@Name("KafkaAlerts") +public class KafkaAlertPublisher extends AlertPublisher { + private static final Logger LOG = LoggerFactory.getLogger(KafkaAlertPublisher.class); + private static final Gson GSON = new Gson(); + private final Config config; + + private KafkaProducer producer; + + public KafkaAlertPublisher(Config config) { + this.config = config; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException { + config.validate(); + } + + @Override + public void initialize(AlertPublisherContext context) throws Exception { + super.initialize(context); + config.validate(); + Properties props = new Properties(); + // Add client id property with stage name as value. 
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getStageName()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put("producer.type", "sync"); + + // Override any property set above with user specified producer properties + for (Map.Entry producerProperty : config.getProducerProperties().entrySet()) { + props.put(producerProperty.getKey(), producerProperty.getValue()); + } + + this.producer = new KafkaProducer<>(props); + } + + @Override + public void publish(Iterator iterator) throws Exception { + while (iterator.hasNext()) { + String alert = GSON.toJson(iterator.next()); + try { + // We do not specify key here. So the topic partitions will be chosen in round robin fashion. + ProducerRecord record = new ProducerRecord<>(config.topic, alert); + producer.send(record); + } catch (Exception e) { + // catch the exception and continue processing rest of the alerts + LOG.error("Exception while emitting alert {}", alert, e); + } + + } + } + + @Override + public void destroy() { + super.destroy(); + producer.close(); + } + + /** + * Kafka Producer Configuration. + */ + public static class Config extends PluginConfig { + + @Name("brokers") + @Description("Specifies the connection string where Producer can find one or more brokers to " + + "determine the leader for each topic.") + @Macro + private String brokers; + + @Name("topic") + @Description("Topic to which message needs to be published. The topic should already exist on kafka.") + @Macro + private String topic; + + @Name("producerProperties") + @Nullable + @Description("Additional kafka producer properties to set.") + private String producerProperties; + + @Description("The kerberos principal used for the source when kerberos security is enabled for kafka.") + @Macro + @Nullable + private String principal; + + @Description("The keytab location for the kerberos principal when kerberos security is enabled for kafka.") + @Macro + @Nullable + private String keytabLocation; + + public Config(String brokers, String topic, String producerProperties) { + this.brokers = brokers; + this.topic = topic; + this.producerProperties = producerProperties; + } + + private Map getProducerProperties() { + KeyValueListParser kvParser = new KeyValueListParser("\\s*,\\s*", ":"); + Map producerProps = new HashMap<>(); + if (!Strings.isNullOrEmpty(producerProperties)) { + for (KeyValue keyVal : kvParser.parse(producerProperties)) { + String key = keyVal.getKey(); + String val = keyVal.getValue(); + producerProps.put(key, val); + } + } + return producerProps; + } + + private void validate() { + // If the topic or brokers are macros they would not be available at config time. So do not perform + // validations yet. + if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(brokers)) { + return; + } + + try { + Topic.validate(topic); + } catch (InvalidTopicException e) { + throw new IllegalArgumentException(String.format("Topic name %s is not a valid kafka topic. Please provide " + + "valid kafka topic name. 
%s", topic, e.getMessage())); + } + + KafkaHelpers.validateKerberosSetting(principal, keytabLocation); + } + } +} diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java similarity index 96% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java rename to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java index 5855fa7..3eda57e 100644 --- a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java +++ b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaBatchSource.java @@ -41,6 +41,7 @@ import co.cask.hydrator.common.ReferencePluginConfig; import co.cask.hydrator.common.SourceInputFormatProvider; import co.cask.hydrator.common.batch.JobUtils; +import co.cask.hydrator.plugin.common.KafkaHelpers; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.base.Strings; @@ -49,6 +50,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.kafka.clients.consumer.ConsumerConfig; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -56,8 +58,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import javax.annotation.Nullable; - /** * Kafka batch source. */ @@ -397,6 +397,8 @@ public void validate() { format, messageSchema, e.getMessage()), e); } } + + KafkaHelpers.validateKerberosSetting(principal, keytabLocation); } } @@ -422,18 +424,13 @@ public void prepareRun(BatchSourceContext context) throws Exception { context.createDataset(tableName, KeyValueTable.class.getName(), DatasetProperties.EMPTY); } table = context.getDataset(tableName); + Map kafkaConf = new HashMap<>(); kafkaConf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokers()); + // We save offsets in datasets, no need for Kafka to save them + kafkaConf.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + KafkaHelpers.setupKerberosLogin(kafkaConf, config.getPrincipal(), config.getKeytabLocation()); kafkaConf.putAll(config.getKafkaProperties()); - if (config.getKeytabLocation() != null && config.getPrincipal() != null) { - kafkaConf.put("sasl.jaas.config", String.format("com.sun.security.auth.module.Krb5LoginModule required \n" + - " useKeyTab=true \n" + - " storeKey=true \n" + - " useTicketCache=false \n" + - " keyTab=\"%s\" \n" + - " principal=\"%s\";", config.getKeytabLocation(), - config.getPrincipal())); - } kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf, config.getPartitions(), config.getInitialPartitionOffsets(), config.getMaxNumberRecords(), table); @@ -455,6 +452,7 @@ public void onRunFinish(boolean succeeded, BatchSourceContext context) { @Override public void initialize(BatchRuntimeContext context) throws Exception { super.initialize(context); + schema = config.getSchema(); Schema messageSchema = config.getMessageSchema(); for (Schema.Field field : schema.getFields()) { diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java similarity index 76% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java rename to 
kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java index aaa1e39..1aa4265 100644 --- a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java +++ b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaInputFormat.java @@ -18,6 +18,7 @@ import co.cask.cdap.api.common.Bytes; import co.cask.cdap.api.dataset.lib.KeyValueTable; +import co.cask.hydrator.plugin.common.KafkaHelpers; import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.Collections2; @@ -40,11 +41,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -61,14 +60,13 @@ public class KafkaInputFormat extends InputFormat { private static final Type LIST_TYPE = new TypeToken>() { }.getType(); @Override - public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { + public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { return new KafkaRecordReader(); } @Override - public List getSplits(JobContext context) throws IOException, InterruptedException { + public List getSplits(JobContext context) { Gson gson = new Gson(); List finalRequests = gson.fromJson(context.getConfiguration().get(KAFKA_REQUEST), LIST_TYPE); List kafkaSplits = new ArrayList<>(); @@ -84,12 +82,13 @@ public List getSplits(JobContext context) throws IOException, Interr static List saveKafkaRequests(Configuration conf, String topic, Map kafkaConf, final Set partitions, Map initOffsets, - long maxNumberRecords, KeyValueTable table) throws Exception { + long maxNumberRecords, KeyValueTable table) { Properties properties = new Properties(); properties.putAll(kafkaConf); - try (Consumer consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + try (Consumer consumer = + new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { // Get Metadata for all topics - @SuppressWarnings("unchecked") List partitionInfos = consumer.partitionsFor(topic); + List partitionInfos = consumer.partitionsFor(topic); if (!partitions.isEmpty()) { Collection filteredPartitionInfos = Collections2.filter(partitionInfos, @@ -111,7 +110,8 @@ public boolean apply(PartitionInfo input) { } } - private static List createKafkaRequests(Consumer consumer, Map kafkaConf, + private static List createKafkaRequests(Consumer consumer, + Map kafkaConf, List partitionInfos, Map offsets, long maxNumberRecords, KeyValueTable table) { @@ -123,8 +123,8 @@ public TopicPartition apply(PartitionInfo input) { return new TopicPartition(input.topic(), input.partition()); } }); - Map latestOffsets = getLatestOffsets(consumer, topicPartitions); - Map earliestOffsets = getEarliestOffsets(consumer, topicPartitions); + Map latestOffsets = KafkaHelpers.getLatestOffsets(consumer, topicPartitions); + Map earliestOffsets = KafkaHelpers.getEarliestOffsets(consumer, topicPartitions); List requests = new ArrayList<>(); for (PartitionInfo partitionInfo : partitionInfos) { @@ -157,34 +157,4 @@ public TopicPartition apply(PartitionInfo input) { } return requests; } - - private static Map getLatestOffsets(Consumer consumer, - List topicAndPartitions) { - 
consumer.assign(topicAndPartitions); - for (TopicPartition topicPartition : topicAndPartitions) { - consumer.seekToEnd(topicPartition); - } - - Map offsets = new HashMap<>(); - for (TopicPartition topicAndPartition : topicAndPartitions) { - long offset = consumer.position(topicAndPartition); - offsets.put(topicAndPartition, offset); - } - return offsets; - } - - private static Map getEarliestOffsets(Consumer consumer, - List topicAndPartitions) { - consumer.assign(topicAndPartitions); - for (TopicPartition topicPartition : topicAndPartitions) { - consumer.seekToBeginning(topicPartition); - } - - Map offsets = new HashMap<>(); - for (TopicPartition topicAndPartition : topicAndPartitions) { - long offset = consumer.position(topicAndPartition); - offsets.put(topicAndPartition, offset); - } - return offsets; - } } diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java similarity index 100% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java rename to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaKey.java diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaMessage.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaMessage.java similarity index 100% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaMessage.java rename to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaMessage.java diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java similarity index 100% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java rename to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaReader.java diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java similarity index 100% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java rename to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRecordReader.java diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java similarity index 100% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java rename to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaRequest.java diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java similarity index 100% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java rename to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batchSource/KafkaSplit.java diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/common/KafkaHelpers.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/common/KafkaHelpers.java new file mode 100644 index 0000000..3ea2704 --- /dev/null +++ 
b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/common/KafkaHelpers.java @@ -0,0 +1,123 @@ +/* + * Copyright © 2018 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.hydrator.plugin.common; + +import com.google.common.base.Strings; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Utility class for Kafka operations + */ +public final class KafkaHelpers { + private static final Logger LOG = LoggerFactory.getLogger(KafkaHelpers.class); + public static final String SASL_JAAS_CONFIG = "sasl.jaas.config"; + + // This class cannot be instantiated + private KafkaHelpers() { + } + + /** + * Fetch the latest offsets for the given topic-partitions + * + * @param consumer The Kafka consumer + * @param topicAndPartitions topic-partitions to fetch the offsets for + * @return Mapping of topic-partiton to its latest offset + */ + public static Map getLatestOffsets(Consumer consumer, + List topicAndPartitions) { + consumer.assign(topicAndPartitions); + consumer.seekToEnd(topicAndPartitions); + + Map offsets = new HashMap<>(); + for (TopicPartition topicAndPartition : topicAndPartitions) { + long offset = consumer.position(topicAndPartition); + offsets.put(topicAndPartition, offset); + } + return offsets; + } + + /** + * Fetch the earliest offsets for the given topic-partitions + * + * @param consumer The Kafka consumer + * @param topicAndPartitions topic-partitions to fetch the offsets for + * @return Mapping of topic-partiton to its earliest offset + */ + public static Map getEarliestOffsets(Consumer consumer, + List topicAndPartitions) { + consumer.assign(topicAndPartitions); + consumer.seekToBeginning(topicAndPartitions); + + Map offsets = new HashMap<>(); + for (TopicPartition topicAndPartition : topicAndPartitions) { + long offset = consumer.position(topicAndPartition); + offsets.put(topicAndPartition, offset); + } + return offsets; + } + + /** + * Adds the JAAS conf to the Kafka configuration object for Kafka client login, if needed. + * The JAAS conf is not added if either the principal or the keytab is null. 
+ * + * @param conf Kafka configuration object to add the JAAS conf to + * @param principal Kerberos principal + * @param keytabLocation Kerberos keytab for the principal + */ + public static void setupKerberosLogin(Map conf, @Nullable String principal, + @Nullable String keytabLocation) { + if (principal != null && keytabLocation != null) { + LOG.debug("Adding Kerberos login conf to Kafka for principal {} and keytab {}", + principal, keytabLocation); + conf.put(SASL_JAAS_CONFIG, String.format("com.sun.security.auth.module.Krb5LoginModule required \n" + + " useKeyTab=true \n" + + " storeKey=true \n" + + " useTicketCache=false \n" + + " renewTicket=true \n" + + " keyTab=\"%s\" \n" + + " principal=\"%s\";", + keytabLocation, principal)); + } else { + LOG.debug("Not adding Kerberos login conf to Kafka since either the principal {} or the keytab {} is null", + principal, keytabLocation); + } + } + + /** + * Validates whether the principal and keytab are both set or both of them are null/empty + * + * @param principal Kerberos principal + * @param keytab Kerberos keytab for the principal + */ + public static void validateKerberosSetting(@Nullable String principal, @Nullable String keytab) { + if (Strings.isNullOrEmpty(principal) != Strings.isNullOrEmpty(keytab)) { + String emptyField = Strings.isNullOrEmpty(principal) ? "principal" : "keytab"; + String message = emptyField + " is empty. When Kerberos security is enabled for Kafka, " + + "then both the principal and the keytab have " + + "to be specified. If Kerberos is not enabled, then both should be empty."; + throw new IllegalArgumentException(message); + } + } +} diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/sink/KafkaBatchSink.java similarity index 86% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java rename to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/sink/KafkaBatchSink.java index db14de8..f97d6f9 100644 --- a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java +++ b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/sink/KafkaBatchSink.java @@ -33,17 +33,20 @@ import co.cask.hydrator.common.KeyValueListParser; import co.cask.hydrator.common.ReferenceBatchSink; import co.cask.hydrator.common.ReferencePluginConfig; +import co.cask.hydrator.plugin.common.KafkaHelpers; import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.util.HashMap; import java.util.List; import java.util.Map; -import javax.annotation.Nullable; /** * Kafka sink to write to Kafka @@ -51,8 +54,8 @@ @Plugin(type = BatchSink.PLUGIN_TYPE) @Name("Kafka") @Description("KafkaSink to write events to kafka") -public class Kafka extends ReferenceBatchSink { - private static final Logger LOG = LoggerFactory.getLogger(Kafka.class); +public class KafkaBatchSink extends ReferenceBatchSink { + private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchSink.class); // Configuration for the plugin. 
private final Config producerConfig; @@ -60,10 +63,9 @@ public class Kafka extends ReferenceBatchSink { private final KafkaOutputFormatProvider kafkaOutputFormatProvider; // Static constants for configuring Kafka producer. - private static final String BROKER_LIST = "bootstrap.servers"; private static final String ACKS_REQUIRED = "acks"; - public Kafka(Config producerConfig) { + public KafkaBatchSink(Config producerConfig) { super(producerConfig); this.producerConfig = producerConfig; this.kafkaOutputFormatProvider = new KafkaOutputFormatProvider(producerConfig); @@ -73,9 +75,7 @@ public Kafka(Config producerConfig) { public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); - if (!producerConfig.async.equalsIgnoreCase("true") && !producerConfig.async.equalsIgnoreCase("false")) { - throw new IllegalArgumentException("Async flag has to be either TRUE or FALSE."); - } + producerConfig.validate(); } @Override @@ -169,7 +169,7 @@ public static class Config extends ReferencePluginConfig { private String keytabLocation; @Name("compressionType") - @Description("Additional kafka producer properties to set") + @Description("Compression type to be applied on message") @Macro private String compressionType; @@ -184,6 +184,14 @@ public Config(String brokers, String async, String key, String topic, String for this.kafkaProperties = kafkaProperties; this.compressionType = compressionType; } + + private void validate() { + if (!async.equalsIgnoreCase("true") && !async.equalsIgnoreCase("false")) { + throw new IllegalArgumentException("Async flag has to be either TRUE or FALSE."); + } + + KafkaHelpers.validateKerberosSetting(principal, keytabLocation); + } } private static class KafkaOutputFormatProvider implements OutputFormatProvider { @@ -193,21 +201,13 @@ private static class KafkaOutputFormatProvider implements OutputFormatProvider { this.conf = new HashMap<>(); conf.put("topic", kafkaSinkConfig.topic); - conf.put(BROKER_LIST, kafkaSinkConfig.brokers); + conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.brokers); conf.put("compression.type", kafkaSinkConfig.compressionType); + conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); + conf.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); - + KafkaHelpers.setupKerberosLogin(conf, kafkaSinkConfig.principal, kafkaSinkConfig.keytabLocation); addKafkaProperties(kafkaSinkConfig.kafkaProperties); - if (kafkaSinkConfig.principal != null && kafkaSinkConfig.keytabLocation != null) { - conf.put("additional." 
+ "sasl.jaas.config", - String.format("com.sun.security.auth.module.Krb5LoginModule required \n" + - " useKeyTab=true \n" + - " storeKey=true \n" + - " useTicketCache=false \n" + - " keyTab=\"%s\" \n" + - " principal=\"%s\";", kafkaSinkConfig.keytabLocation, - kafkaSinkConfig.principal)); - } conf.put("async", kafkaSinkConfig.async); if (kafkaSinkConfig.async.equalsIgnoreCase("true")) { diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java similarity index 84% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java rename to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java index 7dc52d6..5578629 100644 --- a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java +++ b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java @@ -16,6 +16,7 @@ package co.cask.hydrator.plugin.sink; +import co.cask.hydrator.plugin.common.KafkaHelpers; import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@ -26,7 +27,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,13 +40,8 @@ public class KafkaOutputFormat extends OutputFormat { private static final Logger LOG = LoggerFactory.getLogger(KafkaOutputFormat.class); - // Static constants for configuring Kafka producer. - private static final String BROKER_LIST = "bootstrap.servers"; - private static final String KEY_SERIALIZER = "key.serializer"; - private static final String VAL_SERIALIZER = "value.serializer"; private KafkaProducer producer; - @Override public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { } @@ -92,7 +87,12 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) Properties props = new Properties(); // Configure the properties for kafka. 
- props.put(BROKER_LIST, configuration.get(BROKER_LIST)); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + configuration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + configuration.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + configuration.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); props.put("compression.type", configuration.get("compression.type")); if (!Strings.isNullOrEmpty(configuration.get("hasKey"))) { @@ -115,10 +115,14 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) LOG.info("Property key: {}, value: {}", entry.getKey().substring(11), entry.getValue()); } + // Add Kerberos login information if any + if (!Strings.isNullOrEmpty(configuration.get(KafkaHelpers.SASL_JAAS_CONFIG))) { + props.put(KafkaHelpers.SASL_JAAS_CONFIG, configuration.get(KafkaHelpers.SASL_JAAS_CONFIG)); + } + // CDAP-9178: cached the producer object to avoid being created on every batch interval if (producer == null) { - producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props, new StringSerializer(), - new StringSerializer()); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); } return new KafkaRecordWriter(producer, topic); diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/sink/KafkaRecordWriter.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/sink/KafkaRecordWriter.java similarity index 100% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/sink/KafkaRecordWriter.java rename to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/sink/KafkaRecordWriter.java diff --git a/kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java similarity index 100% rename from kafka-plugins-0.9/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java rename to kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/sink/StringPartitioner.java diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/source/KafkaConfig.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/source/KafkaConfig.java new file mode 100644 index 0000000..fcbbe6e --- /dev/null +++ b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/source/KafkaConfig.java @@ -0,0 +1,408 @@ +/* + * Copyright © 2018 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package co.cask.hydrator.plugin.source; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Macro; +import co.cask.cdap.api.data.format.FormatSpecification; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.dataset.lib.KeyValue; +import co.cask.cdap.format.RecordFormats; +import co.cask.hydrator.common.KeyValueListParser; +import co.cask.hydrator.common.ReferencePluginConfig; +import co.cask.hydrator.plugin.common.KafkaHelpers; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import org.apache.kafka.common.TopicPartition; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Conf for Kafka streaming source. + */ +@SuppressWarnings("unused") +public class KafkaConfig extends ReferencePluginConfig implements Serializable { + + private static final long serialVersionUID = 8069169417140954175L; + + @Description("List of Kafka brokers specified in host1:port1,host2:port2 form. For example, " + + "host1.example.com:9092,host2.example.com:9092.") + @Macro + private String brokers; + + @Description("Kafka topic to read from.") + @Macro + private String topic; + + @Description("The topic partitions to read from. If not specified, all partitions will be read.") + @Nullable + @Macro + private String partitions; + + @Description("The initial offset for each topic partition. If this is not specified, " + + "all partitions will have the same initial offset, which is determined by the defaultInitialOffset property. " + + "An offset of -2 means the smallest offset. An offset of -1 means the latest offset. " + + "Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read.") + @Nullable + @Macro + private String initialPartitionOffsets; + + @Description("The default initial offset for all topic partitions. " + + "An offset of -2 means the smallest offset. An offset of -1 means the latest offset. Defaults to -1. " + + "Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. " + + "If you wish to set different initial offsets for different partitions, use the initialPartitionOffsets property.") + @Nullable + @Macro + private Long defaultInitialOffset; + + @Description("Output schema of the source, including the timeField and keyField. " + + "The fields excluding the timeField and keyField are used in conjunction with the format " + + "to parse Kafka payloads.") + private String schema; + + @Description("Optional format of the Kafka event. Any format supported by CDAP is supported. " + + "For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values. " + + "If no format is given, Kafka message payloads will be treated as bytes.") + @Nullable + private String format; + + @Description("Optional name of the field containing the read time of the batch. " + + "If this is not set, no time field will be added to output records. " + + "If set, this field must be present in the schema property and must be a long.") + @Nullable + private String timeField; + + @Description("Optional name of the field containing the message key. " + + "If this is not set, no key field will be added to output records. 
" + + "If set, this field must be present in the schema property and must be bytes.") + @Nullable + private String keyField; + + @Description("Optional name of the field containing the kafka partition that was read from. " + + "If this is not set, no partition field will be added to output records. " + + "If set, this field must be present in the schema property and must be an integer.") + @Nullable + private String partitionField; + + @Description("Optional name of the field containing the kafka offset that the message was read from. " + + "If this is not set, no offset field will be added to output records. " + + "If set, this field must be present in the schema property and must be a long.") + @Nullable + private String offsetField; + + @Description("Max number of records to read per second per partition. 0 means there is no limit. Defaults to 1000.") + @Nullable + private Integer maxRatePerPartition; + + @Description("Additional kafka consumer properties to set.") + @Macro + @Nullable + private String kafkaProperties; + + @Description("The kerberos principal used for the source when kerberos security is enabled for kafka.") + @Macro + @Nullable + private String principal; + + @Description("The keytab location for the kerberos principal when kerberos security is enabled for kafka.") + @Macro + @Nullable + private String keytabLocation; + + public KafkaConfig() { + super(""); + defaultInitialOffset = -1L; + maxRatePerPartition = 1000; + } + + public String getTopic() { + return topic; + } + + public String getBrokers() { + return brokers; + } + + @Nullable + public String getTimeField() { + return Strings.isNullOrEmpty(timeField) ? null : timeField; + } + + @Nullable + public String getKeyField() { + return Strings.isNullOrEmpty(keyField) ? null : keyField; + } + + @Nullable + public String getPartitionField() { + return Strings.isNullOrEmpty(partitionField) ? null : partitionField; + } + + @Nullable + public String getOffsetField() { + return Strings.isNullOrEmpty(offsetField) ? null : offsetField; + } + + @Nullable + public String getFormat() { + return Strings.isNullOrEmpty(format) ? null : format; + } + + @Nullable + public Integer getMaxRatePerPartition() { + return maxRatePerPartition; + } + + public Schema getSchema() { + try { + return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema); + } catch (IOException e) { + throw new IllegalArgumentException("Unable to parse schema: " + e.getMessage()); + } + } + + // gets the message schema from the schema field. If the time, key, partition, or offset fields are in the configured + // schema, they will be removed. + public Schema getMessageSchema() { + Schema schema = getSchema(); + List messageFields = new ArrayList<>(); + boolean timeFieldExists = false; + boolean keyFieldExists = false; + boolean partitionFieldExists = false; + boolean offsetFieldExists = false; + + for (Schema.Field field : schema.getFields()) { + String fieldName = field.getName(); + Schema fieldSchema = field.getSchema(); + Schema.Type fieldType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType(); + // if the field is not the time field and not the key field, it is a message field. 
+ if (fieldName.equals(timeField)) { + if (fieldType != Schema.Type.LONG) { + throw new IllegalArgumentException("The time field must be of type long or nullable long."); + } + timeFieldExists = true; + } else if (fieldName.equals(keyField)) { + if (fieldType != Schema.Type.BYTES) { + throw new IllegalArgumentException("The key field must be of type bytes or nullable bytes."); + } + keyFieldExists = true; + } else if (fieldName.equals(partitionField)) { + if (fieldType != Schema.Type.INT) { + throw new IllegalArgumentException("The partition field must be of type int."); + } + partitionFieldExists = true; + } else if (fieldName.equals(offsetField)) { + if (fieldType != Schema.Type.LONG) { + throw new IllegalArgumentException("The offset field must be of type long."); + } + offsetFieldExists = true; + } else { + messageFields.add(field); + } + } + if (messageFields.isEmpty()) { + throw new IllegalArgumentException( + "Schema must contain at least one other field besides the time and key fields."); + } + + if (getTimeField() != null && !timeFieldExists) { + throw new IllegalArgumentException(String.format( + "timeField '%s' does not exist in the schema. Please add it to the schema.", timeField)); + } + if (getKeyField() != null && !keyFieldExists) { + throw new IllegalArgumentException(String.format( + "keyField '%s' does not exist in the schema. Please add it to the schema.", keyField)); + } + if (getPartitionField() != null && !partitionFieldExists) { + throw new IllegalArgumentException(String.format( + "partitionField '%s' does not exist in the schema. Please add it to the schema.", partitionField)); + } + if (getOffsetField() != null && !offsetFieldExists) { + throw new IllegalArgumentException(String.format( + "offsetField '%s' does not exist in the schema. Please add it to the schema.", offsetFieldExists)); + } + return Schema.recordOf("kafka.message", messageFields); + } + + /** + * Get the initial partition offsets for the specified partitions. If an initial offset is specified in the + * initialPartitionOffsets property, that value will be used. Otherwise, the defaultInitialOffset will be used. + * + * @param partitionsToRead the partitions to read + * @return initial partition offsets. + */ + public Map getInitialPartitionOffsets(Set partitionsToRead) { + Map partitionOffsets = new HashMap<>(); + + // set default initial partitions + for (Integer partition : partitionsToRead) { + partitionOffsets.put(new TopicPartition(topic, partition), defaultInitialOffset); + } + + // if initial partition offsets are specified, overwrite the defaults. + if (initialPartitionOffsets != null) { + for (KeyValue partitionAndOffset : KeyValueListParser.DEFAULT.parse(initialPartitionOffsets)) { + String partitionStr = partitionAndOffset.getKey(); + String offsetStr = partitionAndOffset.getValue(); + int partition; + try { + partition = Integer.parseInt(partitionStr); + } catch (NumberFormatException e) { + throw new IllegalArgumentException(String.format( + "Invalid partition '%s' in initialPartitionOffsets.", partitionStr)); + } + long offset; + try { + offset = Long.parseLong(offsetStr); + } catch (NumberFormatException e) { + throw new IllegalArgumentException(String.format( + "Invalid offset '%s' in initialPartitionOffsets for partition %d.", partitionStr, partition)); + } + partitionOffsets.put(new TopicPartition(topic, partition), offset); + } + } + + return partitionOffsets; + } + + /** + * @return broker host to broker port mapping. 
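getInitialPartitionOffsets() above applies a simple rule: every partition to read starts at defaultInitialOffset, and explicit partition:offset pairs override it. A self-contained restatement of that rule, using plain String.split() instead of KeyValueListParser so it runs on the JDK alone:

```
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public final class OffsetResolutionExample {
  public static void main(String[] args) {
    long defaultInitialOffset = -1L;                 // -1 = latest, -2 = earliest
    String initialPartitionOffsets = "0:100,2:-2";   // partition 0 from offset 100, partition 2 from earliest
    Set<Integer> partitionsToRead = new LinkedHashSet<>(Arrays.asList(0, 1, 2));

    // Seed every partition with the default, then let explicit entries win.
    Map<Integer, Long> offsets = new TreeMap<>();
    for (int partition : partitionsToRead) {
      offsets.put(partition, defaultInitialOffset);
    }
    for (String pair : initialPartitionOffsets.split(",")) {
      String[] kv = pair.split(":");
      offsets.put(Integer.parseInt(kv[0].trim()), Long.parseLong(kv[1].trim()));
    }
    System.out.println(offsets);                     // {0=100, 1=-1, 2=-2}
  }
}
```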
+ */ + public Map getBrokerMap() { + Map brokerMap = new HashMap<>(); + for (KeyValue hostAndPort : KeyValueListParser.DEFAULT.parse(brokers)) { + String host = hostAndPort.getKey(); + String portStr = hostAndPort.getValue(); + try { + brokerMap.put(host, Integer.parseInt(portStr)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException(String.format( + "Invalid port '%s' for host '%s'.", portStr, host)); + } + } + if (brokerMap.isEmpty()) { + throw new IllegalArgumentException("Must specify kafka brokers."); + } + return brokerMap; + } + + /** + * @return set of partitions to read from. Returns an empty list if no partitions were specified. + */ + public Set getPartitions() { + Set partitionSet = new HashSet<>(); + if (partitions == null) { + return partitionSet; + } + for (String partition : Splitter.on(',').trimResults().split(partitions)) { + try { + partitionSet.add(Integer.parseInt(partition)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + String.format("Invalid partition '%s'. Partitions must be integers.", partition)); + } + } + return partitionSet; + } + + @Nullable + public String getPrincipal() { + return principal; + } + + @Nullable + public String getKeytabLocation() { + return keytabLocation; + } + + public Map getKafkaProperties() { + KeyValueListParser kvParser = new KeyValueListParser("\\s*,\\s*", ":"); + Map conf = new HashMap<>(); + if (!Strings.isNullOrEmpty(kafkaProperties)) { + for (KeyValue keyVal : kvParser.parse(kafkaProperties)) { + conf.put(keyVal.getKey(), keyVal.getValue()); + } + } + return conf; + } + + public void validate() { + // brokers can be null since it is macro enabled. + if (brokers != null) { + getBrokerMap(); + } + getPartitions(); + getInitialPartitionOffsets(getPartitions()); + + if (maxRatePerPartition == null || maxRatePerPartition < 0) { + throw new IllegalArgumentException(String.format("Invalid maxRatePerPartition %d. Rate must be 0 or greater.", + maxRatePerPartition)); + } + + if (!Strings.isNullOrEmpty(timeField) && !Strings.isNullOrEmpty(keyField) && timeField.equals(keyField)) { + throw new IllegalArgumentException(String.format( + "The timeField and keyField cannot both have the same name (%s).", timeField)); + } + + Schema messageSchema = getMessageSchema(); + // if format is empty, there must be just a single message field of type bytes or nullable types. + if (Strings.isNullOrEmpty(format)) { + List messageFields = messageSchema.getFields(); + if (messageFields.size() > 1) { + List fieldNames = new ArrayList<>(); + for (Schema.Field messageField : messageFields) { + fieldNames.add(messageField.getName()); + } + throw new IllegalArgumentException(String.format( + "Without a format, the schema must contain just a single message field of type bytes or nullable bytes. " + + "Found %s message fields (%s).", messageFields.size(), Joiner.on(',').join(fieldNames))); + } + + Schema.Field messageField = messageFields.get(0); + Schema messageFieldSchema = messageField.getSchema(); + Schema.Type messageFieldType = messageFieldSchema.isNullable() ? + messageFieldSchema.getNonNullable().getType() : messageFieldSchema.getType(); + if (messageFieldType != Schema.Type.BYTES) { + throw new IllegalArgumentException(String.format( + "Without a format, the message field must be of type bytes or nullable bytes, but field %s is of type %s.", + messageField.getName(), messageField.getSchema())); + } + } else { + // otherwise, if there is a format, make sure we can instantiate it. 
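The schema rules enforced by getMessageSchema() and validate() are easier to see with concrete schemas. The two sketches below use the same CDAP Schema API this class already works with; field names are illustrative, and the first assumes the plugin is configured with timeField=ts, keyField=key, partitionField=partition, offsetField=offset and format=csv.

```
import co.cask.cdap.api.data.schema.Schema;

public final class ExampleOutputSchemas {

  // Accepted with a 'csv' format: the four special fields are stripped by
  // getMessageSchema(), and the remaining fields (id, first, last) are parsed
  // from the Kafka payload.
  static Schema withCsvFormat() {
    return Schema.recordOf(
      "kafkaRecord",
      Schema.Field.of("ts", Schema.of(Schema.Type.LONG)),
      Schema.Field.of("key", Schema.nullableOf(Schema.of(Schema.Type.BYTES))),
      Schema.Field.of("partition", Schema.of(Schema.Type.INT)),
      Schema.Field.of("offset", Schema.of(Schema.Type.LONG)),
      Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
      Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("last", Schema.of(Schema.Type.STRING)));
  }

  // Accepted with no format: exactly one message field, and it must be bytes or
  // nullable bytes. This matches the default schema in the Kafka-streamingsource
  // widget added later in this change.
  static Schema withoutFormat() {
    return Schema.recordOf(
      "etlSchemaBody",
      Schema.Field.of("message", Schema.nullableOf(Schema.of(Schema.Type.BYTES))));
  }
}
```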
+ FormatSpecification formatSpec = new FormatSpecification(format, messageSchema, new HashMap()); + + try { + RecordFormats.createInitializedFormat(formatSpec); + } catch (Exception e) { + throw new IllegalArgumentException(String.format( + "Unable to instantiate a message parser from format '%s' and message schema '%s': %s", + format, messageSchema, e.getMessage()), e); + } + } + + KafkaHelpers.validateKerberosSetting(principal, keytabLocation); + } +} diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/source/KafkaStreamingSource.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/source/KafkaStreamingSource.java new file mode 100644 index 0000000..58f67d3 --- /dev/null +++ b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/source/KafkaStreamingSource.java @@ -0,0 +1,287 @@ +/* + * Copyright © 2018 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.hydrator.plugin.source; + +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Name; +import co.cask.cdap.api.annotation.Plugin; +import co.cask.cdap.api.data.format.FormatSpecification; +import co.cask.cdap.api.data.format.RecordFormat; +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.flow.flowlet.StreamEvent; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.streaming.StreamingContext; +import co.cask.cdap.etl.api.streaming.StreamingSource; +import co.cask.cdap.format.RecordFormats; +import co.cask.hydrator.plugin.common.KafkaHelpers; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import kafka.api.OffsetRequest; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.streaming.Time; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.kafka010.ConsumerStrategies; +import org.apache.spark.streaming.kafka010.KafkaUtils; +import org.apache.spark.streaming.kafka010.LocationStrategies; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Kafka Streaming source + */ +@Plugin(type = StreamingSource.PLUGIN_TYPE) +@Name("Kafka") +@Description("Kafka streaming 
source.") +public class KafkaStreamingSource extends ReferenceStreamingSource { + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamingSource.class); + private final KafkaConfig conf; + + public KafkaStreamingSource(KafkaConfig conf) { + super(conf); + this.conf = conf; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException { + super.configurePipeline(pipelineConfigurer); + conf.validate(); + pipelineConfigurer.getStageConfigurer().setOutputSchema(conf.getSchema()); + if (conf.getMaxRatePerPartition() != null && conf.getMaxRatePerPartition() > 0) { + Map pipelineProperties = new HashMap<>(); + pipelineProperties.put("spark.streaming.kafka.maxRatePerPartition", conf.getMaxRatePerPartition().toString()); + pipelineConfigurer.setPipelineProperties(pipelineProperties); + } + } + + @Override + public JavaDStream getStream(StreamingContext context) throws Exception { + context.registerLineage(conf.referenceName); + + Map kafkaParams = new HashMap<>(); + kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBrokers()); + // Spark saves the offsets in checkpoints, no need for Kafka to save them + kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + kafkaParams.put("key.deserializer", ByteArrayDeserializer.class.getCanonicalName()); + kafkaParams.put("value.deserializer", ByteArrayDeserializer.class.getCanonicalName()); + KafkaHelpers.setupKerberosLogin(kafkaParams, conf.getPrincipal(), conf.getKeytabLocation()); + // Create a unique string for the group.id using the pipeline name and the topic. + // group.id is a Kafka consumer property that uniquely identifies the group of + // consumer processes to which this consumer belongs. + kafkaParams.put("group.id", Joiner.on("-").join(context.getPipelineName().length(), conf.getTopic().length(), + context.getPipelineName(), conf.getTopic())); + kafkaParams.putAll(conf.getKafkaProperties()); + + Properties properties = new Properties(); + properties.putAll(kafkaParams); + try (Consumer consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(), + new ByteArrayDeserializer())) { + Map offsets = conf.getInitialPartitionOffsets(getPartitions(consumer)); + // KafkaUtils doesn't understand -1 and -2 as smallest offset and latest offset. + // so we have to replace them with the actual smallest and latest + List earliestOffsetRequest = new ArrayList<>(); + List latestOffsetRequest = new ArrayList<>(); + for (Map.Entry entry : offsets.entrySet()) { + TopicPartition topicAndPartition = entry.getKey(); + Long offset = entry.getValue(); + if (offset == OffsetRequest.EarliestTime()) { + earliestOffsetRequest.add(topicAndPartition); + } else if (offset == OffsetRequest.LatestTime()) { + latestOffsetRequest.add(topicAndPartition); + } + } + + Set allOffsetRequest = + Sets.newHashSet(Iterables.concat(earliestOffsetRequest, latestOffsetRequest)); + Map offsetsFound = new HashMap<>(); + offsetsFound.putAll(KafkaHelpers.getEarliestOffsets(consumer, earliestOffsetRequest)); + offsetsFound.putAll(KafkaHelpers.getLatestOffsets(consumer, latestOffsetRequest)); + for (TopicPartition topicAndPartition : allOffsetRequest) { + offsets.put(topicAndPartition, offsetsFound.get(topicAndPartition)); + } + + Set missingOffsets = Sets.difference(allOffsetRequest, offsetsFound.keySet()); + if (!missingOffsets.isEmpty()) { + throw new IllegalStateException(String.format( + "Could not find offsets for %s. 
Please check all brokers were included in the broker list.", missingOffsets)); + } + LOG.info("Using initial offsets {}", offsets); + + return KafkaUtils.createDirectStream( + context.getSparkStreamingContext(), LocationStrategies.PreferConsistent(), + ConsumerStrategies.Subscribe(Collections.singleton(conf.getTopic()), kafkaParams, offsets) + ).transform(new RecordTransform(conf)); + } + } + + private Set getPartitions(Consumer consumer) { + Set partitions = conf.getPartitions(); + if (!partitions.isEmpty()) { + return partitions; + } + + partitions = new HashSet<>(); + for (PartitionInfo partitionInfo : consumer.partitionsFor(conf.getTopic())) { + partitions.add(partitionInfo.partition()); + } + return partitions; + } + + /** + * Applies the format function to each rdd. + */ + private static class RecordTransform + implements Function2>, Time, JavaRDD> { + + private final KafkaConfig conf; + + RecordTransform(KafkaConfig conf) { + this.conf = conf; + } + + @Override + public JavaRDD call(JavaRDD> input, Time batchTime) { + Function, StructuredRecord> recordFunction = conf.getFormat() == null ? + new BytesFunction(batchTime.milliseconds(), conf) : new FormatFunction(batchTime.milliseconds(), conf); + return input.map(recordFunction); + } + } + + /** + * Common logic for transforming kafka key, message, partition, and offset into a structured record. + * Everything here should be serializable, as Spark Streaming will serialize all functions. + */ + private abstract static class BaseFunction implements Function, StructuredRecord> { + private final long ts; + protected final KafkaConfig conf; + private transient String messageField; + private transient String timeField; + private transient String keyField; + private transient String partitionField; + private transient String offsetField; + private transient Schema schema; + + BaseFunction(long ts, KafkaConfig conf) { + this.ts = ts; + this.conf = conf; + } + + @Override + public StructuredRecord call(ConsumerRecord in) throws Exception { + // first time this was called, initialize schema and time, key, and message fields. + if (schema == null) { + schema = conf.getSchema(); + timeField = conf.getTimeField(); + keyField = conf.getKeyField(); + partitionField = conf.getPartitionField(); + offsetField = conf.getOffsetField(); + for (Schema.Field field : schema.getFields()) { + String name = field.getName(); + if (!name.equals(timeField) && !name.equals(keyField)) { + messageField = name; + break; + } + } + } + + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + if (timeField != null) { + builder.set(timeField, ts); + } + if (keyField != null) { + builder.set(keyField, in.key()); + } + if (partitionField != null) { + builder.set(partitionField, in.partition()); + } + if (offsetField != null) { + builder.set(offsetField, in.offset()); + } + addMessage(builder, messageField, in.value()); + return builder.build(); + } + + protected abstract void addMessage(StructuredRecord.Builder builder, String messageField, + byte[] message) throws Exception; + } + + /** + * Transforms kafka key and message into a structured record when message format is not given. + * Everything here should be serializable, as Spark Streaming will serialize all functions. 
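KafkaHelpers.getEarliestOffsets() and getLatestOffsets() used above live in the shared common module and are not part of this diff. As a hypothetical sketch only, not the actual helper code, the -2/-1 sentinels can be resolved on the 0.10 consumer API with beginningOffsets() and endOffsets():

```
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public final class OffsetLookupSketch {

  // Earliest available offset for each of the given partitions.
  static <K, V> Map<TopicPartition, Long> getEarliestOffsets(Consumer<K, V> consumer,
                                                             Collection<TopicPartition> partitions) {
    return consumer.beginningOffsets(partitions);
  }

  // Offset one past the last written record, i.e. where a "latest" read starts.
  static <K, V> Map<TopicPartition, Long> getLatestOffsets(Consumer<K, V> consumer,
                                                           Collection<TopicPartition> partitions) {
    return consumer.endOffsets(partitions);
  }
}
```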
+ */ + private static class BytesFunction extends BaseFunction { + + BytesFunction(long ts, KafkaConfig conf) { + super(ts, conf); + } + + @Override + protected void addMessage(StructuredRecord.Builder builder, String messageField, byte[] message) { + builder.set(messageField, message); + } + } + + /** + * Transforms kafka key and message into a structured record when message format and schema are given. + * Everything here should be serializable, as Spark Streaming will serialize all functions. + */ + private static class FormatFunction extends BaseFunction { + private transient RecordFormat recordFormat; + + FormatFunction(long ts, KafkaConfig conf) { + super(ts, conf); + } + + @Override + protected void addMessage(StructuredRecord.Builder builder, String messageField, byte[] message) throws Exception { + // first time this was called, initialize record format + if (recordFormat == null) { + Schema messageSchema = conf.getMessageSchema(); + FormatSpecification spec = + new FormatSpecification(conf.getFormat(), messageSchema, new HashMap()); + recordFormat = RecordFormats.createInitializedFormat(spec); + } + + StructuredRecord messageRecord = recordFormat.read(new StreamEvent(ByteBuffer.wrap(message))); + for (Schema.Field field : messageRecord.getSchema().getFields()) { + String fieldName = field.getName(); + builder.set(fieldName, messageRecord.get(fieldName)); + } + } + } + +} diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/source/ReferenceStreamingSource.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/source/ReferenceStreamingSource.java new file mode 100644 index 0000000..6fb6d96 --- /dev/null +++ b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/source/ReferenceStreamingSource.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2018 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.hydrator.plugin.source; + +import co.cask.cdap.api.dataset.DatasetProperties; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.streaming.StreamingSource; +import co.cask.hydrator.common.Constants; +import co.cask.hydrator.common.IdUtils; +import co.cask.hydrator.common.ReferencePluginConfig; + +/** + * Base streaming source that adds an External Dataset for a reference name, and performs a single getDataset() + * call to make sure CDAP records that it was accessed. + * + * @param type of object read by the source. 
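The parsing path taken by FormatFunction above can be exercised on its own. This usage sketch feeds a single CSV payload through the same FormatSpecification and RecordFormats calls; the schema and payload are made up for illustration.

```
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.RecordFormat;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.format.RecordFormats;

public final class CsvParseExample {
  public static void main(String[] args) throws Exception {
    Schema messageSchema = Schema.recordOf(
      "user",
      Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
      Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("last", Schema.of(Schema.Type.STRING)));

    // Same calls FormatFunction makes: a 'csv' format over the message schema,
    // reading the raw Kafka payload wrapped in a StreamEvent.
    FormatSpecification spec =
      new FormatSpecification("csv", messageSchema, new HashMap<String, String>());
    RecordFormat recordFormat = RecordFormats.createInitializedFormat(spec);

    byte[] payload = "1,samuel,jackson".getBytes(StandardCharsets.UTF_8);
    StructuredRecord record =
      (StructuredRecord) recordFormat.read(new StreamEvent(ByteBuffer.wrap(payload)));
    System.out.println(record.get("first") + " " + record.get("last")); // prints: samuel jackson
  }
}
```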
+ */ +public abstract class ReferenceStreamingSource extends StreamingSource { + private final ReferencePluginConfig conf; + + public ReferenceStreamingSource(ReferencePluginConfig conf) { + this.conf = conf; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException { + super.configurePipeline(pipelineConfigurer); + // Verify that reference name meets dataset id constraints + IdUtils.validateId(conf.referenceName); + pipelineConfigurer.createDataset(conf.referenceName, Constants.EXTERNAL_DATASET_TYPE, DatasetProperties.EMPTY); + } +} diff --git a/kafka-plugins-0.9/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java b/kafka-plugins-0.10/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java similarity index 89% rename from kafka-plugins-0.9/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java rename to kafka-plugins-0.10/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java index 068185b..e0721d2 100644 --- a/kafka-plugins-0.9/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java +++ b/kafka-plugins-0.10/src/test/java/co/cask/hydrator/EmbeddedKafkaServer.java @@ -19,16 +19,20 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.util.concurrent.AbstractIdleService; +import kafka.metrics.KafkaMetricsReporter; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; -import kafka.utils.Time; import org.I0Itec.zkclient.exception.ZkTimeoutException; +import org.apache.kafka.common.utils.Time; import org.apache.twill.internal.utils.Networks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; +import scala.collection.JavaConverters; +import scala.collection.Seq; import java.net.BindException; +import java.util.Collections; import java.util.Properties; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -97,6 +101,9 @@ protected void shutDown() throws Exception { } private KafkaServer createKafkaServer(KafkaConfig kafkaConfig) { + Seq metricsReporters = + JavaConverters.collectionAsScalaIterableConverter( + Collections.emptyList()).asScala().toSeq(); return new KafkaServer(kafkaConfig, new Time() { @Override @@ -117,7 +124,12 @@ public void sleep(long ms) { Thread.interrupted(); } } - }, Option.apply("embedded-server")); + + @Override + public long hiResClockMs() { + return System.currentTimeMillis(); + } + }, Option.apply("embedded-server"), metricsReporters); } /** diff --git a/kafka-plugins-0.9/src/test/java/co/cask/hydrator/Kafka9BatchSourceTest.java b/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java similarity index 97% rename from kafka-plugins-0.9/src/test/java/co/cask/hydrator/Kafka9BatchSourceTest.java rename to kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java index 65acc7c..f6505ce 100644 --- a/kafka-plugins-0.9/src/test/java/co/cask/hydrator/Kafka9BatchSourceTest.java +++ b/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java @@ -52,6 +52,7 @@ import org.apache.twill.kafka.client.KafkaClientService; import org.apache.twill.kafka.client.KafkaPublisher; import org.apache.twill.zookeeper.ZKClientService; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -68,7 +69,7 @@ /** * Unit tests for our plugins. 
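The EmbeddedKafkaServer changes above exist because the 0.10 broker constructor expects an org.apache.kafka.common.utils.Time implementation (now including hiResClockMs) plus a Scala Seq of KafkaMetricsReporter. The empty-Seq conversion the test helper performs, shown in isolation:

```
import java.util.Collections;

import kafka.metrics.KafkaMetricsReporter;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public final class EmptyReporters {
  // No metrics reporters are registered for the embedded test broker.
  static Seq<KafkaMetricsReporter> empty() {
    return JavaConverters.collectionAsScalaIterableConverter(
      Collections.<KafkaMetricsReporter>emptyList()).asScala().toSeq();
  }
}
```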
*/ -public class Kafka9BatchSourceTest extends HydratorTestBase { +public class KafkaBatchSourceTest extends HydratorTestBase { private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-pipeline", "1.0.0"); @ClassRule public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false); @@ -108,6 +109,14 @@ public static void setupTestClass() throws Exception { kafkaClient.startAndWait(); } + @AfterClass + public static void cleanup() { + kafkaClient.stopAndWait(); + kafkaServer.stopAndWait(); + zkClient.stopAndWait(); + zkServer.stopAndWait(); + } + @Test public void testKafkaSource() throws Exception { Schema schema = Schema.recordOf( diff --git a/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaSinkAndAlertsPublisherTest.java b/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaSinkAndAlertsPublisherTest.java new file mode 100644 index 0000000..d9dc825 --- /dev/null +++ b/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaSinkAndAlertsPublisherTest.java @@ -0,0 +1,271 @@ +/* + * Copyright © 2018 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.hydrator; + +import co.cask.cdap.api.artifact.ArtifactSummary; +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.dataset.table.Table; +import co.cask.cdap.datapipeline.DataPipelineApp; +import co.cask.cdap.datapipeline.SmartWorkflow; +import co.cask.cdap.etl.api.Alert; +import co.cask.cdap.etl.mock.alert.NullAlertTransform; +import co.cask.cdap.etl.mock.batch.MockSource; +import co.cask.cdap.etl.mock.test.HydratorTestBase; +import co.cask.cdap.etl.proto.v2.ETLBatchConfig; +import co.cask.cdap.etl.proto.v2.ETLPlugin; +import co.cask.cdap.etl.proto.v2.ETLStage; +import co.cask.cdap.proto.ProgramRunStatus; +import co.cask.cdap.proto.artifact.AppRequest; +import co.cask.cdap.proto.id.ApplicationId; +import co.cask.cdap.proto.id.ArtifactId; +import co.cask.cdap.proto.id.NamespaceId; +import co.cask.cdap.test.ApplicationManager; +import co.cask.cdap.test.DataSetManager; +import co.cask.cdap.test.TestConfiguration; +import co.cask.cdap.test.WorkflowManager; +import co.cask.hydrator.plugin.alertpublisher.KafkaAlertPublisher; +import co.cask.hydrator.plugin.sink.KafkaBatchSink; +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.gson.Gson; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.twill.common.Cancellable; +import org.apache.twill.internal.kafka.client.ZKKafkaClientService; +import org.apache.twill.internal.utils.Networks; +import org.apache.twill.internal.zookeeper.InMemoryZKServer; +import org.apache.twill.kafka.client.FetchedMessage; +import org.apache.twill.kafka.client.KafkaClientService; +import 
org.apache.twill.kafka.client.KafkaConsumer; +import org.apache.twill.zookeeper.ZKClientService; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.File; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Kafka Sink and Alerts Publisher test + */ +public class KafkaSinkAndAlertsPublisherTest extends HydratorTestBase { + private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-pipeline", "1.0.0"); + @ClassRule + public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false); + + private static final Gson GSON = new Gson(); + + private static ZKClientService zkClient; + private static KafkaClientService kafkaClient; + private static InMemoryZKServer zkServer; + private static EmbeddedKafkaServer kafkaServer; + private static int kafkaPort; + + @BeforeClass + public static void setupTestClass() throws Exception { + ArtifactId parentArtifact = NamespaceId.DEFAULT.artifact(APP_ARTIFACT.getName(), APP_ARTIFACT.getVersion()); + + // add the data-pipeline artifact and mock plugins + setupBatchArtifacts(parentArtifact, DataPipelineApp.class); + + // add our plugins artifact with the data-pipeline artifact as its parent. + // this will make our plugins available to data-pipeline. + addPluginArtifact(NamespaceId.DEFAULT.artifact("example-plugins", "1.0.0"), + parentArtifact, + KafkaBatchSink.class, + KafkaAlertPublisher.class, + RangeAssignor.class, + StringSerializer.class); + + zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build(); + zkServer.startAndWait(); + + kafkaPort = Networks.getRandomPort(); + kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr(), + kafkaPort, TMP_FOLDER.newFolder())); + kafkaServer.startAndWait(); + + zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); + zkClient.startAndWait(); + + kafkaClient = new ZKKafkaClientService(zkClient); + kafkaClient.startAndWait(); + } + + @AfterClass + public static void cleanup() { + kafkaClient.stopAndWait(); + kafkaServer.stopAndWait(); + zkClient.stopAndWait(); + zkServer.stopAndWait(); + } + + @Test + public void testKafkaSinkAndAlertsPublisher() throws Exception { + Schema schema = Schema.recordOf( + "user", + Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.LONG))), + Schema.Field.of("first", Schema.of(Schema.Type.STRING)), + Schema.Field.of("last", Schema.of(Schema.Type.STRING))); + + // create the pipeline config + String inputName = "sinkTestInput"; + + String usersTopic = "records"; + String alertsTopic = "alerts"; + Map sinkProperties = new HashMap<>(); + sinkProperties.put("brokers", "localhost:" + kafkaPort); + sinkProperties.put("referenceName", "kafkaTest"); + sinkProperties.put("topic", usersTopic); + sinkProperties.put("schema", schema.toString()); + sinkProperties.put("format", "csv"); + sinkProperties.put("key", "last"); + sinkProperties.put("async", "FALSE"); + sinkProperties.put("compressionType", "none"); + + Map alertProperties = new HashMap<>(); + alertProperties.put("brokers", "localhost:" + kafkaPort); + alertProperties.put("topic", alertsTopic); + + ETLStage source = new ETLStage("source", MockSource.getPlugin(inputName)); + ETLStage sink = + new ETLStage("sink", new ETLPlugin("Kafka", KafkaBatchSink.PLUGIN_TYPE, sinkProperties, null)); + ETLStage transform = new ETLStage("nullAlert", 
NullAlertTransform.getPlugin("id")); + ETLStage alert = + new ETLStage("alert", new ETLPlugin("KafkaAlerts", KafkaAlertPublisher.PLUGIN_TYPE, alertProperties)); + + ETLBatchConfig pipelineConfig = ETLBatchConfig.builder("* * * * *") + .addStage(source) + .addStage(transform) + .addStage(sink) + .addStage(alert) + .addConnection(source.getName(), transform.getName()) + .addConnection(transform.getName(), sink.getName()) + .addConnection(transform.getName(), alert.getName()) + .build(); + + // create the pipeline + ApplicationId pipelineId = NamespaceId.DEFAULT.app("testKafkaSink"); + ApplicationManager appManager = deployApplication(pipelineId, new AppRequest<>(APP_ARTIFACT, pipelineConfig)); + + + Set expected = ImmutableSet.of("100,samuel,jackson", + "200,dwayne,johnson", + "300,christopher,walken", + "400,donald,trump"); + + List records = new ArrayList<>(); + for (String e : expected) { + String[] splits = e.split(","); + StructuredRecord record = + StructuredRecord.builder(schema) + .set("id", Long.parseLong(splits[0])) + .set("first", splits[1]) + .set("last", splits[2]) + .build(); + records.add(record); + } + + // Add a null record to get an alert + StructuredRecord nullRecord = + StructuredRecord.builder(schema) + .set("first", "terry") + .set("last", "crews") + .build(); + records.add(nullRecord); + + DataSetManager sourceTable = getDataset(inputName); + MockSource.writeInput(sourceTable, records); + + WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); + workflowManager.start(); + workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 1, TimeUnit.MINUTES); + + // Assert users + Set actual = readKafkaRecords(usersTopic, expected.size()); + Assert.assertEquals(expected, actual); + + // Assert alerts + Set actualAlerts = readKafkaRecords(alertsTopic, 1); + // NullAlertTransform always returns empty hash map in alert + Assert.assertEquals(ImmutableSet.of(new Alert(transform.getName(), new HashMap())), + ImmutableSet.copyOf(Iterables.transform(actualAlerts, + new Function() { + @Override + public Alert apply(String s) { + return GSON.fromJson(s, Alert.class); + } + } + ))); + } + + private Set readKafkaRecords(String topic, final int maxMessages) throws InterruptedException { + KafkaConsumer kafkaConsumer = kafkaClient.getConsumer(); + + final Set kafkaMessages = new HashSet<>(); + KafkaConsumer.Preparer preparer = kafkaConsumer.prepare(); + preparer.addFromBeginning(topic, 0); + + final CountDownLatch stopLatch = new CountDownLatch(1); + Cancellable cancellable = preparer.consume(new KafkaConsumer.MessageCallback() { + @Override + public long onReceived(Iterator messages) { + long nextOffset = 0; + while (messages.hasNext()) { + FetchedMessage message = messages.next(); + nextOffset = message.getNextOffset(); + String payload = Charsets.UTF_8.decode(message.getPayload()).toString(); + kafkaMessages.add(payload); + } + // We are done when maxMessages are received + if (kafkaMessages.size() >= maxMessages) { + stopLatch.countDown(); + } + return nextOffset; + } + + @Override + public void finished() { + // nothing to do + } + }); + + stopLatch.await(30, TimeUnit.SECONDS); + cancellable.cancel(); + return kafkaMessages; + } + + private static Properties generateKafkaConfig(String zkConnectStr, int port, File logDir) { + Properties prop = new Properties(); + prop.setProperty("log.dir", logDir.getAbsolutePath()); + prop.setProperty("port", Integer.toString(port)); + prop.setProperty("broker.id", "1"); + prop.setProperty("num.partitions", "1"); + 
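The Twill-based readKafkaRecords() above is one way to check what landed in the topic. An alternative sketch using the plain 0.10 kafka-clients consumer is below; the broker address is a placeholder for "localhost:" + kafkaPort, and this is a test-only illustration rather than part of the plugin.

```
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class ReadBackWithKafkaClients {
  static Set<String> read(String topic, int expected) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder for "localhost:" + kafkaPort
    props.put("group.id", "read-back-check");
    props.put("auto.offset.reset", "earliest");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    Set<String> payloads = new HashSet<>();
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Arrays.asList(topic));
      long deadline = System.currentTimeMillis() + 30_000;
      // Poll until the expected number of distinct payloads arrives or the deadline passes.
      while (payloads.size() < expected && System.currentTimeMillis() < deadline) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
          payloads.add(record.value());
        }
      }
    }
    return payloads;
  }
}
```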
prop.setProperty("zookeeper.connect", zkConnectStr); + prop.setProperty("zookeeper.connection.timeout.ms", "1000000"); + prop.setProperty("default.replication.factor", "1"); + return prop; + } + +} diff --git a/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaStreamingSourceTest.java b/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaStreamingSourceTest.java new file mode 100644 index 0000000..e124cc6 --- /dev/null +++ b/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaStreamingSourceTest.java @@ -0,0 +1,266 @@ +/* + * Copyright © 2018 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.hydrator; + +import co.cask.cdap.api.artifact.ArtifactRange; +import co.cask.cdap.api.artifact.ArtifactSummary; +import co.cask.cdap.api.artifact.ArtifactVersion; +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.dataset.table.Table; +import co.cask.cdap.common.utils.Networks; +import co.cask.cdap.common.utils.Tasks; +import co.cask.cdap.datapipeline.DataPipelineApp; +import co.cask.cdap.datastreams.DataStreamsApp; +import co.cask.cdap.datastreams.DataStreamsSparkLauncher; +import co.cask.cdap.etl.api.streaming.StreamingSource; +import co.cask.cdap.etl.mock.batch.MockSink; +import co.cask.cdap.etl.mock.test.HydratorTestBase; +import co.cask.cdap.etl.proto.v2.DataStreamsConfig; +import co.cask.cdap.etl.proto.v2.ETLPlugin; +import co.cask.cdap.etl.proto.v2.ETLStage; +import co.cask.cdap.proto.artifact.AppRequest; +import co.cask.cdap.proto.id.ApplicationId; +import co.cask.cdap.proto.id.ArtifactId; +import co.cask.cdap.proto.id.NamespaceId; +import co.cask.cdap.test.ApplicationManager; +import co.cask.cdap.test.DataSetManager; +import co.cask.cdap.test.SparkManager; +import co.cask.cdap.test.TestConfiguration; +import co.cask.hydrator.common.http.HTTPPollConfig; +import co.cask.hydrator.plugin.source.KafkaStreamingSource; +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.spark.streaming.kafka010.KafkaUtils; +import org.apache.twill.internal.kafka.client.ZKKafkaClientService; +import org.apache.twill.internal.zookeeper.InMemoryZKServer; +import org.apache.twill.kafka.client.Compression; +import org.apache.twill.kafka.client.KafkaClientService; +import org.apache.twill.kafka.client.KafkaPublisher; +import org.apache.twill.zookeeper.ZKClientService; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import 
java.util.concurrent.TimeUnit; + +/** + * Tests for Spark plugins. + */ +public class KafkaStreamingSourceTest extends HydratorTestBase { + + @ClassRule + public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false); + + private static final ArtifactId DATAPIPELINE_ARTIFACT_ID = + NamespaceId.DEFAULT.artifact("data-pipeline", "4.3.2"); + private static final ArtifactId DATASTREAMS_ARTIFACT_ID = + NamespaceId.DEFAULT.artifact("data-streams", "4.3.2"); + private static final ArtifactSummary DATASTREAMS_ARTIFACT = + new ArtifactSummary("data-streams", "4.3.2"); + + private static ZKClientService zkClient; + private static KafkaClientService kafkaClient; + private static InMemoryZKServer zkServer; + private static EmbeddedKafkaServer kafkaServer; + private static int kafkaPort; + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + @BeforeClass + public static void setupTest() throws Exception { + // add the artifact for data pipeline app + setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class); + + setupStreamingArtifacts(DATASTREAMS_ARTIFACT_ID, DataStreamsApp.class); + + // add artifact for spark plugins + Set parents = ImmutableSet.of( + new ArtifactRange(NamespaceId.DEFAULT.getNamespace(), DATAPIPELINE_ARTIFACT_ID.getArtifact(), + new ArtifactVersion(DATAPIPELINE_ARTIFACT_ID.getVersion()), true, + new ArtifactVersion(DATAPIPELINE_ARTIFACT_ID.getVersion()), true), + new ArtifactRange(NamespaceId.DEFAULT.getNamespace(), DATASTREAMS_ARTIFACT_ID.getArtifact(), + new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true, + new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true) + ); + addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), parents, + KafkaStreamingSource.class, KafkaUtils.class, ByteArrayDeserializer.class, TopicPartition.class, + HTTPPollConfig.class); + + zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build(); + zkServer.startAndWait(); + + kafkaPort = Networks.getRandomPort(); + kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr(), + kafkaPort, TMP_FOLDER.newFolder())); + kafkaServer.startAndWait(); + + zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); + zkClient.startAndWait(); + + kafkaClient = new ZKKafkaClientService(zkClient); + kafkaClient.startAndWait(); + } + + @AfterClass + public static void cleanup() { + kafkaClient.stopAndWait(); + kafkaServer.stopAndWait(); + zkClient.stopAndWait(); + zkServer.stopAndWait(); + } + + @Test + public void testKafkaStreamingSource() throws Exception { + Schema schema = Schema.recordOf( + "user", + Schema.Field.of("id", Schema.of(Schema.Type.LONG)), + Schema.Field.of("first", Schema.of(Schema.Type.STRING)), + Schema.Field.of("last", Schema.of(Schema.Type.STRING))); + + Map properties = new HashMap<>(); + properties.put("referenceName", "kafkaPurchases"); + properties.put("brokers", "localhost:" + kafkaPort); + properties.put("topic", "users"); + properties.put("defaultInitialOffset", "-2"); + properties.put("format", "csv"); + properties.put("schema", schema.toString()); + + ETLStage source = new ETLStage("source", new ETLPlugin("Kafka", StreamingSource.PLUGIN_TYPE, properties, null)); + + DataStreamsConfig etlConfig = DataStreamsConfig.builder() + .addStage(source) + .addStage(new ETLStage("sink", MockSink.getPlugin("kafkaOutput"))) + .addConnection("source", "sink") + .setBatchInterval("1s") + .setStopGracefully(true) + .build(); + + 
AppRequest appRequest = new AppRequest<>(DATASTREAMS_ARTIFACT, etlConfig); + ApplicationId appId = NamespaceId.DEFAULT.app("KafkaSourceApp"); + ApplicationManager appManager = deployApplication(appId, appRequest); + + // write some messages to kafka + Map messages = new HashMap<>(); + messages.put("a", "1,samuel,jackson"); + messages.put("b", "2,dwayne,johnson"); + messages.put("c", "3,christopher,walken"); + sendKafkaMessage("users", messages); + + SparkManager sparkManager = appManager.getSparkManager(DataStreamsSparkLauncher.NAME); + sparkManager.start(); + sparkManager.waitForStatus(true, 10, 1); + + final DataSetManager
outputManager = getDataset("kafkaOutput"); + Tasks.waitFor( + ImmutableMap.of(1L, "samuel jackson", 2L, "dwayne johnson", 3L, "christopher walken"), + new Callable>() { + @Override + public Map call() throws Exception { + outputManager.flush(); + Map actual = new HashMap<>(); + for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) { + actual.put((Long) outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last")); + } + return actual; + } + }, + 2, + TimeUnit.MINUTES); + + sparkManager.stop(); + sparkManager.waitForStatus(false, 10, 1); + + // clear the output table + MockSink.clear(outputManager); + + // now write some more messages to kafka and start the program again to make sure it picks up where it left off + messages = new HashMap<>(); + messages.put("d", "4,terry,crews"); + messages.put("e", "5,sylvester,stallone"); + sendKafkaMessage("users", messages); + + sparkManager.start(); + sparkManager.waitForStatus(true, 10, 1); + + Tasks.waitFor( + ImmutableMap.of(4L, "terry crews", 5L, "sylvester stallone"), + new Callable>() { + @Override + public Map call() throws Exception { + outputManager.flush(); + Map actual = new HashMap<>(); + for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) { + actual.put((Long) outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last")); + } + return actual; + } + }, + 2, + TimeUnit.MINUTES); + + sparkManager.stop(); + } + + private static Properties generateKafkaConfig(String zkConnectStr, int port, File logDir) { + Properties prop = new Properties(); + prop.setProperty("log.dir", logDir.getAbsolutePath()); + prop.setProperty("port", Integer.toString(port)); + prop.setProperty("broker.id", "1"); + prop.setProperty("num.partitions", "1"); + prop.setProperty("zookeeper.connect", zkConnectStr); + prop.setProperty("zookeeper.connection.timeout.ms", "1000000"); + prop.setProperty("default.replication.factor", "1"); + return prop; + } + + private void sendKafkaMessage(@SuppressWarnings("SameParameterValue") String topic, Map messages) { + KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, Compression.NONE); + + // If publish failed, retry up to 20 times, with 100ms delay between each retry + // This is because leader election in Kafka 08 takes time when a topic is being created upon publish request. + int count = 0; + do { + KafkaPublisher.Preparer preparer = publisher.prepare(topic); + for (Map.Entry entry : messages.entrySet()) { + preparer.add(Charsets.UTF_8.encode(entry.getValue()), entry.getKey()); + } + try { + preparer.send().get(); + break; + } catch (Exception e) { + // Backoff if send failed. 
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + } while (count++ < 20); + } + + +} diff --git a/kafka-plugins-0.9/widgets/Kafka-batchsink.json b/kafka-plugins-0.10/widgets/Kafka-batchsink.json similarity index 100% rename from kafka-plugins-0.9/widgets/Kafka-batchsink.json rename to kafka-plugins-0.10/widgets/Kafka-batchsink.json diff --git a/kafka-plugins-0.9/widgets/Kafka-batchsource.json b/kafka-plugins-0.10/widgets/Kafka-batchsource.json similarity index 100% rename from kafka-plugins-0.9/widgets/Kafka-batchsource.json rename to kafka-plugins-0.10/widgets/Kafka-batchsource.json diff --git a/kafka-plugins-0.10/widgets/Kafka-streamingsource.json b/kafka-plugins-0.10/widgets/Kafka-streamingsource.json new file mode 100644 index 0000000..56187f5 --- /dev/null +++ b/kafka-plugins-0.10/widgets/Kafka-streamingsource.json @@ -0,0 +1,151 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "Kafka Consumer", + "configuration-groups": [ + { + "label": "Kafka Configuration", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName" + }, + { + "widget-type": "csv", + "label": "Kafka Brokers", + "name": "brokers", + "widget-attributes": { + "delimiter": "," + } + }, + { + "widget-type": "textbox", + "label": "Kafka Topic", + "name": "topic" + }, + { + "widget-type": "csv", + "label": "Topic Partitions", + "name": "partitions", + "widget-attributes": { + "delimiter": "," + } + }, + { + "widget-type": "textbox", + "label": "Default Initial Offset", + "name": "defaultInitialOffset" + }, + { + "widget-type": "keyvalue", + "label": "Initial Partition Offsets", + "name": "initialPartitionOffsets", + "widget-attributes": { + "showDelimiter": "false", + "key-placeholder": "Partition", + "value-placeholder": "Offset" + } + }, + { + "widget-type": "textbox", + "label": "Time Field", + "name": "timeField" + }, + { + "widget-type": "textbox", + "label": "Key Field", + "name": "keyField" + }, + { + "widget-type": "textbox", + "label": "Partition Field", + "name": "partitionField" + }, + { + "widget-type": "textbox", + "label": "Offset Field", + "name": "offsetField" + }, + { + "widget-type": "textbox", + "label": "Max Rate Per Partition", + "name": "maxRatePerPartition", + "widget-attributes": { + "default": "1000" + } + }, + { + "widget-type": "keyvalue", + "label": "Additional Kafka Consumer Properties", + "name": "kafkaProperties", + "widget-attributes": { + "showDelimiter": "false", + "key-placeholder": "Kafka consumer property", + "value-placeholder": "Kafka consumer property value" + } + } + ] + }, + { + "label": "Format", + "properties": [ + { + "widget-type": "select", + "label": "Format", + "name": "format", + "widget-attributes": { + "values": [ + "", + "avro", + "binary", + "clf", + "csv", + "grok", + "syslog", + "text", + "tsv" + ], + "default": "" + } + } + ] + }, + { + "label": "Authentication", + "properties": [ + { + "widget-type": "textbox", + "label": "Kerberos Principal", + "name": "principal" + }, + { + "widget-type": "textbox", + "label": "Keytab Location", + "name": "keytabLocation" + } + ] + } + ], + "outputs": [ + { + "name": "schema", + "widget-type": "schema", + "widget-attributes": { + "default-schema": { + "name": "etlSchemaBody", + "type": "record", + "fields": [ + { + "name": "message", + "type": ["bytes", "null"] + } + ] + }, + "schema-default-type": "string", + "property-watch": "format" + } + } + ] +} diff --git a/kafka-plugins-0.10/widgets/KafkaAlerts-alertpublisher.json 
b/kafka-plugins-0.10/widgets/KafkaAlerts-alertpublisher.json new file mode 100644 index 0000000..3ae173f --- /dev/null +++ b/kafka-plugins-0.10/widgets/KafkaAlerts-alertpublisher.json @@ -0,0 +1,52 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "Kafka Alert Publisher", + "configuration-groups": [ + { + "label": "Kafka Alert Publisher Config", + "properties": [ + { + "widget-type": "csv", + "label": "Kafka Brokers", + "name": "brokers", + "widget-attributes": { + "delimiter": "," + } + }, + { + "widget-type": "textbox", + "label": "Kafka Topic", + "name": "topic" + }, + { + "widget-type": "keyvalue", + "label": "Additional Kafka Producer Properties", + "name": "producerProperties", + "widget-attributes": { + "showDelimiter": "false", + "key-placeholder": "Kafka producer property", + "value-placeholder": "Kafka producer property value" + } + } + ] + }, + { + "label": "Authentication", + "properties": [ + { + "widget-type": "textbox", + "label": "Kerberos Principal", + "name": "principal" + }, + { + "widget-type": "textbox", + "label": "Keytab Location", + "name": "keytabLocation" + } + ] + } + ], + "outputs": [ ] +} diff --git a/kafka-plugins-0.8/docs/KAFKASOURCE.md b/kafka-plugins-0.8/docs/KAFKASOURCE.md index 46e95ea..6d1c0ca 100644 --- a/kafka-plugins-0.8/docs/KAFKASOURCE.md +++ b/kafka-plugins-0.8/docs/KAFKASOURCE.md @@ -12,7 +12,7 @@ Usage Notes Kafka Streaming Source can be used to read events from a kafka topic. It uses kafka consumer [0.8.2 apis](https://kafka.apache.org/082/documentation.html) to read events from a kafka topic. Kafka Source converts incoming kafka events into cdap structured records which then can be used for further transformations. -The source provides capabilities to read from latest offset or from beginning or from the provided kafka offset. The plugin relies on Spark Streaming offset [storage capabilities](https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html) to manager offsets and checkpoints. +The source provides capabilities to read from latest offset or from beginning or from the provided kafka offset. The plugin relies on Spark Streaming offset [storage capabilities](https://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html) to manager offsets and checkpoints. Plugin Configuration --------------------- diff --git a/kafka-plugins-0.8/docs/Kafka-alert-publisher.md b/kafka-plugins-0.8/docs/Kafka-alert-publisher.md index bbda8a0..78031fe 100644 --- a/kafka-plugins-0.8/docs/Kafka-alert-publisher.md +++ b/kafka-plugins-0.8/docs/Kafka-alert-publisher.md @@ -4,7 +4,7 @@ Kafka Alert Publisher that allows you to publish alerts to kafka as json objects. The plugin internally uses kafka producer apis to publish alerts. The plugin allows to specify kafka topic to use for publishing and other additional kafka producer properties. -Please note that this plugin uses kafka 0.8.2 java apis, so it may not be compatible with higher versions of kafka. +This plugin uses kafka 0.8.2 java apis. Build ----- @@ -60,4 +60,4 @@ and limitations under the License. Cask is a trademark of Cask Data, Inc. All rights reserved. Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with -permission. No endorsement by The Apache Software Foundation is implied by the use of these marks. \ No newline at end of file +permission. No endorsement by The Apache Software Foundation is implied by the use of these marks. 
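The alert publisher wired up by this widget (and documented above) writes each Alert to the configured topic as a JSON object. A rough sketch of that publish path follows, using the same Gson-plus-KafkaProducer combination the plugin class relies on; the broker and topic values are placeholders, and this is not a copy of the plugin's own publish method.

```
import java.util.HashMap;
import java.util.Properties;

import co.cask.cdap.etl.api.Alert;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public final class AlertPublishSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Gson gson = new Gson();
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // An Alert is a stage name plus a payload map; it is shipped to the topic as JSON.
      Alert alert = new Alert("nullAlert", new HashMap());
      producer.send(new ProducerRecord<String, String>("alarm", gson.toJson(alert)));
    }
  }
}
```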
diff --git a/kafka-plugins-0.8/docs/Kafka-batchsink.md b/kafka-plugins-0.8/docs/Kafka-batchsink.md index 104c1c0..967edea 100644 --- a/kafka-plugins-0.8/docs/Kafka-batchsink.md +++ b/kafka-plugins-0.8/docs/Kafka-batchsink.md @@ -7,7 +7,7 @@ Kafka sink that allows you to write events into CSV or JSON to kafka. Plugin has the capability to push the data to a Kafka topic. It can also be configured to partition events being written to kafka based on a configurable key. The sink can also be configured to operate in sync or async mode and apply different -compression types to events. Kafka sink is compatible with Kafka 0.8, 0.9 and 0.10 +compression types to events. This plugin uses kafka 0.8.2 java apis. Configuration @@ -50,4 +50,4 @@ Additional properties like number of acknowledgements and client id can also be "kafkaProperties": "acks:2,client.id:myclient", "key": "message" } - } \ No newline at end of file + } diff --git a/kafka-plugins-0.8/docs/Kafka-batchsource.md b/kafka-plugins-0.8/docs/Kafka-batchsource.md index 6456a44..fdb062d 100644 --- a/kafka-plugins-0.8/docs/Kafka-batchsource.md +++ b/kafka-plugins-0.8/docs/Kafka-batchsource.md @@ -5,7 +5,7 @@ Description ----------- Kafka batch source. Emits the record from kafka. It will emit a record based on the schema and format you use, or if no schema or format is specified, the message payload will be emitted. The source will -remember the offset it read last run and continue from that offset for the next run. +remember the offset it read last run and continue from that offset for the next run. This plugin uses kafka 0.8.2 java apis. Use Case -------- @@ -94,4 +94,3 @@ For each Kafka message read, it will output a record with the schema: | count | int | | price | double | +================================+ - \ No newline at end of file diff --git a/kafka-plugins-0.8/docs/Kafka-streamingsource.md b/kafka-plugins-0.8/docs/Kafka-streamingsource.md index d047e18..44e0761 100644 --- a/kafka-plugins-0.8/docs/Kafka-streamingsource.md +++ b/kafka-plugins-0.8/docs/Kafka-streamingsource.md @@ -5,7 +5,7 @@ Description ----------- Kafka streaming source. Emits a record with the schema specified by the user. If no schema is specified, it will emit a record with two fields: 'key' (nullable string) and 'message' -(bytes). Kafka source is compatible with Kafka 0.8, 0.9 and 0.10 +(bytes). This plugin uses kafka 0.8.2 java apis. Use Case diff --git a/kafka-plugins-0.8/docs/KafkaAlerts-alertpublisher.md b/kafka-plugins-0.8/docs/KafkaAlerts-alertpublisher.md index 763e2b3..b1000f3 100644 --- a/kafka-plugins-0.8/docs/KafkaAlerts-alertpublisher.md +++ b/kafka-plugins-0.8/docs/KafkaAlerts-alertpublisher.md @@ -6,8 +6,7 @@ Description Kafka Alert Publisher that allows you to publish alerts to kafka as json objects. The plugin internally uses kafka producer apis to publish alerts. The plugin allows to specify kafka topic to use for publishing and other additional -kafka producer properties. Please note that this plugin uses kafka 0.8.2 java apis -so it may not be compatible with higher versions of kafka. +kafka producer properties. This plugin uses kafka 0.8.2 java apis. Configuration @@ -33,4 +32,4 @@ are like acks and client.id are specified as well. 
"topic": "alarm", "producerProperties": "acks:2,client.id:myclient" } - } \ No newline at end of file + } diff --git a/kafka-plugins-0.8/icons/KafkaAlerts-alertpublisher.png b/kafka-plugins-0.8/icons/KafkaAlerts-alertpublisher.png new file mode 100644 index 0000000..041cdb4 Binary files /dev/null and b/kafka-plugins-0.8/icons/KafkaAlerts-alertpublisher.png differ diff --git a/kafka-plugins-0.8/pom.xml b/kafka-plugins-0.8/pom.xml index 85a05f6..c812ab7 100644 --- a/kafka-plugins-0.8/pom.xml +++ b/kafka-plugins-0.8/pom.xml @@ -5,13 +5,13 @@ kafka-plugins co.cask.hydrator - 1.8.1 + 1.8.2-SNAPSHOT 4.0.0 Apache Kafka 0.8 plugins kafka-plugins - 1.8.1-0.8.2.2 + 1.8.2-SNAPSHOT-0.8.2.2 @@ -25,6 +25,93 @@ + + org.apache.spark + spark-streaming-kafka_2.10 + ${spark1.version} + + + org.apache.spark + spark-mllib_2.10 + ${spark1.version} + provided + + + org.apache.spark + spark-streaming_2.10 + ${spark1.version} + provided + + + org.apache.spark + spark-core_2.10 + ${spark1.version} + provided + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + org.apache.hadoop + hadoop-client + + + com.esotericsoftware.reflectasm + reflectasm + + + org.apache.curator + curator-recipes + + + org.tachyonproject + tachyon-client + + + org.scala-lang + scala-compiler + + + org.eclipse.jetty.orbit + javax.servlet + + + + net.java.dev.jets3t + jets3t + + + asm + asm + + + + + co.cask.cdap + cdap-spark-core + ${cdap.version} + test + + + co.cask.cdap + cdap-data-pipeline + ${cdap.version} + test + + + co.cask.cdap + cdap-data-streams + ${cdap.version} + test + @@ -33,24 +120,15 @@ org.apache.felix maven-bundle-plugin 3.3.0 - true <_exportcontents>co.cask.hydrator.plugin.*;org.apache.spark.streaming.kafka.*; - kafka.serializer.*;kafka.common; + kafka.serializer.*;kafka.common;com.google.common.base.*; *;inline=false;scope=compile true lib - - - package - - bundle - - - co.cask @@ -59,6 +137,7 @@ system:cdap-data-pipeline[4.3.0-SNAPSHOT,6.0.0-SNAPSHOT) + system:cdap-data-streams[4.3.0-SNAPSHOT,6.0.0-SNAPSHOT) diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/alertpublisher/KafkaAlertPublisher.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/alertpublisher/KafkaAlertPublisher.java index 340181a..aed09a5 100644 --- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/alertpublisher/KafkaAlertPublisher.java +++ b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/alertpublisher/KafkaAlertPublisher.java @@ -1,3 +1,19 @@ +/* + * Copyright © 2018 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + package co.cask.hydrator.plugin.alertpublisher; import co.cask.cdap.api.annotation.Description; @@ -34,7 +50,7 @@ @Name("KafkaAlerts") public class KafkaAlertPublisher extends AlertPublisher { private static final Logger LOG = LoggerFactory.getLogger(KafkaAlertPublisher.class); - public static final Gson GSON = new Gson(); + private static final Gson GSON = new Gson(); private final Config config; private KafkaProducer producer; diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java index f07637e..185f0d9 100644 --- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java +++ b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/sink/Kafka.java @@ -38,6 +38,8 @@ import org.apache.avro.reflect.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,9 +62,6 @@ public class Kafka extends ReferenceBatchSink { private final KafkaOutputFormatProvider kafkaOutputFormatProvider; // Static constants for configuring Kafka producer. - private static final String BROKER_LIST = "bootstrap.servers"; - private static final String KEY_SERIALIZER = "key.serializer"; - private static final String VAL_SERIALIZER = "value.serializer"; private static final String ACKS_REQUIRED = "acks"; public Kafka(Config producerConfig) { @@ -161,7 +160,7 @@ public static class Config extends ReferencePluginConfig { private String kafkaProperties; @Name("compressionType") - @Description("Additional kafka producer properties to set") + @Description("Compression type to be applied on message") @Macro private String compressionType; @@ -185,10 +184,10 @@ private static class KafkaOutputFormatProvider implements OutputFormatProvider { this.conf = new HashMap<>(); conf.put("topic", kafkaSinkConfig.topic); - conf.put(BROKER_LIST, kafkaSinkConfig.brokers); + conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.brokers); conf.put("compression.type", kafkaSinkConfig.compressionType); - conf.put(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"); - conf.put(VAL_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"); + conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); + conf.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); addKafkaProperties(kafkaSinkConfig.kafkaProperties); diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java index 62ca176..f3a1e42 100644 --- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java +++ b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/sink/KafkaOutputFormat.java @@ -39,13 +39,8 @@ public class KafkaOutputFormat extends OutputFormat { private static final Logger LOG = LoggerFactory.getLogger(KafkaOutputFormat.class); - // Static constants for configuring Kafka producer. 
- private static final String BROKER_LIST = "bootstrap.servers"; - private static final String KEY_SERIALIZER = "key.serializer"; - private static final String VAL_SERIALIZER = "value.serializer"; private KafkaProducer producer; - @Override public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { } @@ -91,9 +86,12 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) Properties props = new Properties(); // Configure the properties for kafka. - props.put(BROKER_LIST, configuration.get(BROKER_LIST)); - props.put(KEY_SERIALIZER, configuration.get(KEY_SERIALIZER)); - props.put(VAL_SERIALIZER, configuration.get(VAL_SERIALIZER)); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + configuration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + configuration.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + configuration.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); props.put("compression.type", configuration.get("compression.type")); if (!Strings.isNullOrEmpty(configuration.get("hasKey"))) { diff --git a/kafka-plugins-0.8/src/test/java/co/cask/hydrator/Kafka8BatchSourceTest.java b/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java similarity index 97% rename from kafka-plugins-0.8/src/test/java/co/cask/hydrator/Kafka8BatchSourceTest.java rename to kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java index 706f773..91f9ea4 100644 --- a/kafka-plugins-0.8/src/test/java/co/cask/hydrator/Kafka8BatchSourceTest.java +++ b/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java @@ -52,6 +52,7 @@ import org.apache.twill.kafka.client.KafkaClientService; import org.apache.twill.kafka.client.KafkaPublisher; import org.apache.twill.zookeeper.ZKClientService; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -68,7 +69,7 @@ /** * Unit tests for our plugins. */ -public class Kafka8BatchSourceTest extends HydratorTestBase { +public class KafkaBatchSourceTest extends HydratorTestBase { private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-pipeline", "1.0.0"); @ClassRule public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false); @@ -107,6 +108,14 @@ public static void setupTestClass() throws Exception { kafkaClient.startAndWait(); } + @AfterClass + public static void cleanup() { + kafkaClient.stopAndWait(); + kafkaServer.stopAndWait(); + zkClient.stopAndWait(); + zkServer.stopAndWait(); + } + @Test public void testKafkaSource() throws Exception { Schema schema = Schema.recordOf( diff --git a/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaSinkAndAlertsPublisherTest.java b/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaSinkAndAlertsPublisherTest.java new file mode 100644 index 0000000..ac6ab15 --- /dev/null +++ b/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaSinkAndAlertsPublisherTest.java @@ -0,0 +1,277 @@ +/* + * Copyright © 2018 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.hydrator; + +import co.cask.cdap.api.artifact.ArtifactSummary; +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.dataset.table.Table; +import co.cask.cdap.datapipeline.DataPipelineApp; +import co.cask.cdap.datapipeline.SmartWorkflow; +import co.cask.cdap.etl.api.Alert; +import co.cask.cdap.etl.mock.alert.NullAlertTransform; +import co.cask.cdap.etl.mock.batch.MockSource; +import co.cask.cdap.etl.mock.test.HydratorTestBase; +import co.cask.cdap.etl.proto.v2.ETLBatchConfig; +import co.cask.cdap.etl.proto.v2.ETLPlugin; +import co.cask.cdap.etl.proto.v2.ETLStage; +import co.cask.cdap.proto.ProgramRunStatus; +import co.cask.cdap.proto.artifact.AppRequest; +import co.cask.cdap.proto.id.ApplicationId; +import co.cask.cdap.proto.id.ArtifactId; +import co.cask.cdap.proto.id.NamespaceId; +import co.cask.cdap.test.ApplicationManager; +import co.cask.cdap.test.DataSetManager; +import co.cask.cdap.test.TestConfiguration; +import co.cask.cdap.test.WorkflowManager; +import co.cask.hydrator.plugin.alertpublisher.KafkaAlertPublisher; +import co.cask.hydrator.plugin.sink.Kafka; +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.gson.Gson; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.twill.common.Cancellable; +import org.apache.twill.internal.kafka.EmbeddedKafkaServer; +import org.apache.twill.internal.kafka.client.ZKKafkaClientService; +import org.apache.twill.internal.utils.Networks; +import org.apache.twill.internal.zookeeper.InMemoryZKServer; +import org.apache.twill.kafka.client.FetchedMessage; +import org.apache.twill.kafka.client.KafkaClientService; +import org.apache.twill.kafka.client.KafkaConsumer; +import org.apache.twill.zookeeper.ZKClientService; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Kafka Sink and Alerts Publisher test + */ +public class KafkaSinkAndAlertsPublisherTest extends HydratorTestBase { + private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-pipeline", "1.0.0"); + @ClassRule + public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false); + + private static final Gson GSON = new Gson(); + + private static ZKClientService zkClient; + private static KafkaClientService kafkaClient; + private static InMemoryZKServer zkServer; + private static EmbeddedKafkaServer kafkaServer; + private static int kafkaPort; + + @BeforeClass + public static void setupTestClass() throws Exception { + ArtifactId parentArtifact = 
NamespaceId.DEFAULT.artifact(APP_ARTIFACT.getName(), APP_ARTIFACT.getVersion()); + + // add the data-pipeline artifact and mock plugins + setupBatchArtifacts(parentArtifact, DataPipelineApp.class); + + // add our plugins artifact with the data-pipeline artifact as its parent. + // this will make our plugins available to data-pipeline. + addPluginArtifact(NamespaceId.DEFAULT.artifact("example-plugins", "1.0.0"), + parentArtifact, + Kafka.class, + KafkaAlertPublisher.class, + StringSerializer.class); + + zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build(); + zkServer.startAndWait(); + + kafkaPort = Networks.getRandomPort(); + kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr(), + kafkaPort, TMP_FOLDER.newFolder())); + kafkaServer.startAndWait(); + + zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); + zkClient.startAndWait(); + + kafkaClient = new ZKKafkaClientService(zkClient); + kafkaClient.startAndWait(); + } + + @AfterClass + public static void cleanup() { + kafkaClient.stopAndWait(); + kafkaServer.stopAndWait(); + zkClient.stopAndWait(); + zkServer.stopAndWait(); + } + + @Test + public void testKafkaSinkAndAlertsPublisher() throws Exception { + Schema schema = Schema.recordOf( + "user", + Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.LONG))), + Schema.Field.of("first", Schema.of(Schema.Type.STRING)), + Schema.Field.of("last", Schema.of(Schema.Type.STRING))); + + // create the pipeline config + String inputName = "sinkTestInput"; + + String usersTopic = "records"; + String alertsTopic = "alerts"; + Map sinkProperties = new HashMap<>(); + sinkProperties.put("brokers", "localhost:" + kafkaPort); + sinkProperties.put("referenceName", "kafkaTest"); + sinkProperties.put("topic", usersTopic); + sinkProperties.put("schema", schema.toString()); + sinkProperties.put("format", "csv"); + sinkProperties.put("key", "last"); + sinkProperties.put("async", "FALSE"); + sinkProperties.put("compressionType", "none"); + + Map alertProperties = new HashMap<>(); + alertProperties.put("brokers", "localhost:" + kafkaPort); + alertProperties.put("topic", alertsTopic); + + ETLStage source = new ETLStage("source", MockSource.getPlugin(inputName)); + ETLStage sink = + new ETLStage("sink", new ETLPlugin("Kafka", Kafka.PLUGIN_TYPE, sinkProperties, null)); + ETLStage transform = new ETLStage("nullAlert", NullAlertTransform.getPlugin("id")); + ETLStage alert = + new ETLStage("alert", new ETLPlugin("KafkaAlerts", KafkaAlertPublisher.PLUGIN_TYPE, alertProperties)); + + ETLBatchConfig pipelineConfig = ETLBatchConfig.builder("* * * * *") + .addStage(source) + .addStage(transform) + .addStage(sink) + .addStage(alert) + .addConnection(source.getName(), transform.getName()) + .addConnection(transform.getName(), sink.getName()) + .addConnection(transform.getName(), alert.getName()) + .build(); + + // create the pipeline + ApplicationId pipelineId = NamespaceId.DEFAULT.app("testKafkaSink"); + ApplicationManager appManager = deployApplication(pipelineId, new AppRequest<>(APP_ARTIFACT, pipelineConfig)); + + + Set expected = ImmutableSet.of("100,samuel,jackson", + "200,dwayne,johnson", + "300,christopher,walken", + "400,donald,trump"); + + List records = new ArrayList<>(); + for (String e : expected) { + String[] splits = e.split(","); + StructuredRecord record = + StructuredRecord.builder(schema) + .set("id", Long.parseLong(splits[0])) + .set("first", splits[1]) + .set("last", splits[2]) + .build(); + records.add(record); + } + 
+ // Add a null record to get an alert + StructuredRecord nullRecord = + StructuredRecord.builder(schema) + .set("first", "terry") + .set("last", "crews") + .build(); + records.add(nullRecord); + + DataSetManager
sourceTable = getDataset(inputName); + MockSource.writeInput(sourceTable, records); + + WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME); + workflowManager.start(); + workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 1, TimeUnit.MINUTES); + + // Assert users + Set actual = readKafkaRecords(usersTopic, expected.size()); + Assert.assertEquals(expected, actual); + + // Assert alerts + Set actualAlerts = readKafkaRecords(alertsTopic, 1); + // NullAlertTransform always returns empty hash map in alert + Assert.assertEquals(ImmutableSet.of(new Alert(transform.getName(), new HashMap())), + ImmutableSet.copyOf(Iterables.transform(actualAlerts, + new Function() { + @Override + public Alert apply(String s) { + return GSON.fromJson(s, Alert.class); + } + } + ))); + } + + private Set readKafkaRecords(String topic, final int maxMessages) throws InterruptedException { + KafkaConsumer kafkaConsumer = kafkaClient.getConsumer(); + + final Set kafkaMessages = new HashSet<>(); + KafkaConsumer.Preparer preparer = kafkaConsumer.prepare(); + preparer.addFromBeginning(topic, 0); + + final CountDownLatch stopLatch = new CountDownLatch(1); + Cancellable cancellable = preparer.consume(new KafkaConsumer.MessageCallback() { + @Override + public long onReceived(Iterator messages) { + long nextOffset = 0; + while (messages.hasNext()) { + FetchedMessage message = messages.next(); + nextOffset = message.getNextOffset(); + String payload = Charsets.UTF_8.decode(message.getPayload()).toString(); + kafkaMessages.add(payload); + } + // We are done when maxMessages are received + if (kafkaMessages.size() >= maxMessages) { + stopLatch.countDown(); + } + return nextOffset; + } + + @Override + public void finished() { + // nothing to do + } + }); + + stopLatch.await(30, TimeUnit.SECONDS); + cancellable.cancel(); + return kafkaMessages; + } + + private static Properties generateKafkaConfig(String zkConnectStr, int port, File logDir) { + Properties prop = new Properties(); + prop.setProperty("log.dir", logDir.getAbsolutePath()); + prop.setProperty("port", Integer.toString(port)); + prop.setProperty("broker.id", "1"); + prop.setProperty("num.partitions", "1"); + prop.setProperty("zookeeper.connect", zkConnectStr); + prop.setProperty("zookeeper.connection.timeout.ms", "1000000"); + prop.setProperty("default.replication.factor", "1"); + return prop; + } + +} diff --git a/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaStreamingSourceTest.java b/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaStreamingSourceTest.java new file mode 100644 index 0000000..0950556 --- /dev/null +++ b/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaStreamingSourceTest.java @@ -0,0 +1,268 @@ +/* + * Copyright © 2018 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package co.cask.hydrator; + +import co.cask.cdap.api.artifact.ArtifactRange; +import co.cask.cdap.api.artifact.ArtifactSummary; +import co.cask.cdap.api.artifact.ArtifactVersion; +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.dataset.table.Table; +import co.cask.cdap.common.utils.Networks; +import co.cask.cdap.common.utils.Tasks; +import co.cask.cdap.datapipeline.DataPipelineApp; +import co.cask.cdap.datastreams.DataStreamsApp; +import co.cask.cdap.datastreams.DataStreamsSparkLauncher; +import co.cask.cdap.etl.api.streaming.StreamingSource; +import co.cask.cdap.etl.mock.batch.MockSink; +import co.cask.cdap.etl.mock.test.HydratorTestBase; +import co.cask.cdap.etl.proto.v2.DataStreamsConfig; +import co.cask.cdap.etl.proto.v2.ETLPlugin; +import co.cask.cdap.etl.proto.v2.ETLStage; +import co.cask.cdap.proto.artifact.AppRequest; +import co.cask.cdap.proto.id.ApplicationId; +import co.cask.cdap.proto.id.ArtifactId; +import co.cask.cdap.proto.id.NamespaceId; +import co.cask.cdap.test.ApplicationManager; +import co.cask.cdap.test.DataSetManager; +import co.cask.cdap.test.SparkManager; +import co.cask.cdap.test.TestConfiguration; +import co.cask.hydrator.common.http.HTTPPollConfig; +import co.cask.hydrator.plugin.source.KafkaStreamingSource; +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Uninterruptibles; +import kafka.common.TopicAndPartition; +import kafka.serializer.DefaultDecoder; +import org.apache.spark.streaming.kafka.KafkaUtils; +import org.apache.twill.internal.kafka.EmbeddedKafkaServer; +import org.apache.twill.internal.kafka.client.ZKKafkaClientService; +import org.apache.twill.internal.zookeeper.InMemoryZKServer; +import org.apache.twill.kafka.client.Compression; +import org.apache.twill.kafka.client.KafkaClientService; +import org.apache.twill.kafka.client.KafkaPublisher; +import org.apache.twill.zookeeper.ZKClientService; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Tests for Spark plugins. 
+ */ +public class KafkaStreamingSourceTest extends HydratorTestBase { + + @ClassRule + public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false); + + private static final ArtifactId DATAPIPELINE_ARTIFACT_ID = + NamespaceId.DEFAULT.artifact("data-pipeline", "4.3.2"); + private static final ArtifactId DATASTREAMS_ARTIFACT_ID = + NamespaceId.DEFAULT.artifact("data-streams", "4.3.2"); + private static final ArtifactSummary DATASTREAMS_ARTIFACT = + new ArtifactSummary("data-streams", "4.3.2"); + + private static ZKClientService zkClient; + private static KafkaClientService kafkaClient; + private static InMemoryZKServer zkServer; + private static EmbeddedKafkaServer kafkaServer; + private static int kafkaPort; + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + + @BeforeClass + public static void setupTest() throws Exception { + // add the artifact for data pipeline app + setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class); + + setupStreamingArtifacts(DATASTREAMS_ARTIFACT_ID, DataStreamsApp.class); + + // add artifact for spark plugins + Set parents = ImmutableSet.of( + new ArtifactRange(NamespaceId.DEFAULT.getNamespace(), DATAPIPELINE_ARTIFACT_ID.getArtifact(), + new ArtifactVersion(DATAPIPELINE_ARTIFACT_ID.getVersion()), true, + new ArtifactVersion(DATAPIPELINE_ARTIFACT_ID.getVersion()), true), + new ArtifactRange(NamespaceId.DEFAULT.getNamespace(), DATASTREAMS_ARTIFACT_ID.getArtifact(), + new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true, + new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true) + ); + addPluginArtifact(NamespaceId.DEFAULT.artifact("spark-plugins", "1.0.0"), parents, + KafkaStreamingSource.class, KafkaUtils.class, DefaultDecoder.class, TopicAndPartition.class, + HTTPPollConfig.class); + + zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build(); + zkServer.startAndWait(); + + kafkaPort = Networks.getRandomPort(); + kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr(), + kafkaPort, TMP_FOLDER.newFolder())); + kafkaServer.startAndWait(); + + zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); + zkClient.startAndWait(); + + kafkaClient = new ZKKafkaClientService(zkClient); + kafkaClient.startAndWait(); + } + + @AfterClass + public static void cleanup() { + kafkaClient.stopAndWait(); + kafkaServer.stopAndWait(); + zkClient.stopAndWait(); + zkServer.stopAndWait(); + } + + @Test + public void testKafkaStreamingSource() throws Exception { + Schema schema = Schema.recordOf( + "user", + Schema.Field.of("id", Schema.of(Schema.Type.LONG)), + Schema.Field.of("first", Schema.of(Schema.Type.STRING)), + Schema.Field.of("last", Schema.of(Schema.Type.STRING))); + + Map properties = new HashMap<>(); + properties.put("referenceName", "kafkaPurchases"); + properties.put("brokers", "localhost:" + kafkaPort); + properties.put("topic", "users"); + properties.put("defaultInitialOffset", "-2"); + properties.put("format", "csv"); + properties.put("schema", schema.toString()); + + ETLStage source = new ETLStage("source", new ETLPlugin("Kafka", StreamingSource.PLUGIN_TYPE, properties, null)); + + DataStreamsConfig etlConfig = DataStreamsConfig.builder() + .addStage(source) + .addStage(new ETLStage("sink", MockSink.getPlugin("kafkaOutput"))) + .addConnection("source", "sink") + .setBatchInterval("1s") + .setStopGracefully(true) + .build(); + + AppRequest appRequest = new AppRequest<>(DATASTREAMS_ARTIFACT, 
etlConfig); + ApplicationId appId = NamespaceId.DEFAULT.app("KafkaSourceApp"); + ApplicationManager appManager = deployApplication(appId, appRequest); + + // write some messages to kafka + Map messages = new HashMap<>(); + messages.put("a", "1,samuel,jackson"); + messages.put("b", "2,dwayne,johnson"); + messages.put("c", "3,christopher,walken"); + sendKafkaMessage("users", messages); + + SparkManager sparkManager = appManager.getSparkManager(DataStreamsSparkLauncher.NAME); + sparkManager.start(); + sparkManager.waitForStatus(true, 10, 1); + + final DataSetManager
outputManager = getDataset("kafkaOutput"); + Tasks.waitFor( + ImmutableMap.of(1L, "samuel jackson", 2L, "dwayne johnson", 3L, "christopher walken"), + new Callable>() { + @Override + public Map call() throws Exception { + outputManager.flush(); + Map actual = new HashMap<>(); + for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) { + actual.put((Long) outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last")); + } + return actual; + } + }, + 2, + TimeUnit.MINUTES); + + sparkManager.stop(); + sparkManager.waitForStatus(false, 10, 1); + + // clear the output table + MockSink.clear(outputManager); + + // now write some more messages to kafka and start the program again to make sure it picks up where it left off + messages = new HashMap<>(); + messages.put("d", "4,terry,crews"); + messages.put("e", "5,sylvester,stallone"); + sendKafkaMessage("users", messages); + + sparkManager.start(); + sparkManager.waitForStatus(true, 10, 1); + + Tasks.waitFor( + ImmutableMap.of(4L, "terry crews", 5L, "sylvester stallone"), + new Callable>() { + @Override + public Map call() throws Exception { + outputManager.flush(); + Map actual = new HashMap<>(); + for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) { + actual.put((Long) outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last")); + } + return actual; + } + }, + 2, + TimeUnit.MINUTES); + + sparkManager.stop(); + } + + private static Properties generateKafkaConfig(String zkConnectStr, int port, File logDir) { + Properties prop = new Properties(); + prop.setProperty("log.dir", logDir.getAbsolutePath()); + prop.setProperty("port", Integer.toString(port)); + prop.setProperty("broker.id", "1"); + prop.setProperty("num.partitions", "1"); + prop.setProperty("zookeeper.connect", zkConnectStr); + prop.setProperty("zookeeper.connection.timeout.ms", "1000000"); + prop.setProperty("default.replication.factor", "1"); + return prop; + } + + private void sendKafkaMessage(@SuppressWarnings("SameParameterValue") String topic, Map messages) { + KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, Compression.NONE); + + // If publish failed, retry up to 20 times, with 100ms delay between each retry + // This is because leader election in Kafka 08 takes time when a topic is being created upon publish request. + int count = 0; + do { + KafkaPublisher.Preparer preparer = publisher.prepare(topic); + for (Map.Entry entry : messages.entrySet()) { + preparer.add(Charsets.UTF_8.encode(entry.getValue()), entry.getKey()); + } + try { + preparer.send().get(); + break; + } catch (Exception e) { + // Backoff if send failed. 
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + } while (count++ < 20); + } + + +} diff --git a/kafka-plugins-0.9/pom.xml b/kafka-plugins-0.9/pom.xml deleted file mode 100644 index 933edb0..0000000 --- a/kafka-plugins-0.9/pom.xml +++ /dev/null @@ -1,78 +0,0 @@ - - - - kafka-plugins - co.cask.hydrator - 1.8.1 - - 4.0.0 - - Apache Kafka 0.9 plugins - kafka-plugins - 1.8.1-0.9.0.1 - - - - org.apache.kafka - kafka_2.10 - ${kafka9.version} - - - org.slf4j - slf4j-log4j12 - - - - - - - - - org.apache.felix - maven-bundle-plugin - 3.3.0 - true - - - <_exportcontents>co.cask.hydrator.plugin.*;org.apache.spark.streaming.kafka.*; - kafka.serializer.*;kafka.common;org.apache.kafka.common.serialization.*; - org.apache.kafka.clients.consumer.* - *;inline=false;scope=compile - true - lib - - - - - package - - bundle - - - - - - co.cask - cdap-maven-plugin - 1.0.0 - - - system:cdap-data-pipeline[4.3.0-SNAPSHOT,6.0.0-SNAPSHOT) - - - - - create-artifact-config - prepare-package - - create-plugin-json - - - - - - - - diff --git a/pom.xml b/pom.xml index ed89620..d10a316 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ co.cask.hydrator kafka-plugins pom - 1.8.1 + 1.8.2-SNAPSHOT @@ -20,7 +20,7 @@ kafka-plugins-0.8 - kafka-plugins-0.9 + kafka-plugins-0.10 @@ -66,11 +66,12 @@ UTF-8 4.3.2 1.8.2 - 1.6.1 + 1.6.1 + 2.2.0 widgets docs 0.8.2.2 - 0.9.0.1 + 0.10.2.0 2.3.0 ${project.basedir} @@ -80,39 +81,50 @@ co.cask.cdap cdap-etl-api ${cdap.version} + provided co.cask.cdap cdap-etl-api-spark ${cdap.version} - - - co.cask.cdap - cdap-common - ${cdap.version} - - - co.cask.cdap - cdap-formats - ${cdap.version} + provided co.cask.cdap - hydrator-test + cdap-common ${cdap.version} - test + provided + + + org.apache.twill + twill-core + + + org.apache.twill + twill-yarn + + co.cask.cdap - cdap-data-pipeline + cdap-formats ${cdap.version} - test co.cask.cdap - cdap-data-streams + hydrator-test ${cdap.version} test + + + org.apache.kafka + kafka_2.10 + + + org.apache.spark + spark-core_2.10 + + co.cask.hydrator @@ -242,69 +254,9 @@ 3.0 - org.apache.spark - spark-streaming-kafka_2.10 - ${spark.version} - - - org.apache.spark - spark-mllib_2.10 - ${spark.version} - provided - - - org.apache.spark - spark-streaming_2.10 - ${spark.version} - provided - - - org.apache.spark - spark-core_2.10 - ${spark.version} - provided - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - org.apache.hadoop - hadoop-client - - - com.esotericsoftware.reflectasm - reflectasm - - - org.apache.curator - curator-recipes - - - org.tachyonproject - tachyon-client - - - org.scala-lang - scala-compiler - - - org.eclipse.jetty.orbit - javax.servlet - - - - net.java.dev.jets3t - jets3t - - + com.google.guava + guava + 13.0.1 @@ -325,15 +277,6 @@ maven-bundle-plugin 2.5.4 true - - - <_exportcontents>co.cask.hydrator.plugin.*;org.apache.spark.streaming.kafka.*; - kafka.serializer.*;kafka.common; - *;inline=false;scope=compile - true - lib - - package @@ -350,6 +293,10 @@ org.apache.maven.plugins maven-surefire-plugin 2.14.1 + + -Xmx2048m -Djava.awt.headless=true -XX:MaxPermSize=256m -XX:+UseConcMarkSweepGC -XX:OnOutOfMemoryError="kill -9 %p" -XX:+HeapDumpOnOutOfMemoryError + false + org.apache.felix