diff --git a/cogrouping-streams/kstreams/.gitattributes b/cogrouping-streams/kstreams/.gitattributes deleted file mode 100644 index 097f9f98..00000000 --- a/cogrouping-streams/kstreams/.gitattributes +++ /dev/null @@ -1,9 +0,0 @@ -# -# https://help.github.com/articles/dealing-with-line-endings/ -# -# Linux start script should use lf -/gradlew text eol=lf - -# These are Windows script files and should use crlf -*.bat text eol=crlf - diff --git a/cogrouping-streams/kstreams/gradle.properties b/cogrouping-streams/kstreams/gradle.properties deleted file mode 100644 index 18f452c7..00000000 --- a/cogrouping-streams/kstreams/gradle.properties +++ /dev/null @@ -1,6 +0,0 @@ -# This file was generated by the Gradle 'init' task. -# https://docs.gradle.org/current/userguide/build_environment.html#sec:gradle_configuration_properties - -org.gradle.parallel=true -org.gradle.caching=true - diff --git a/column-difference/ksql/README.md b/column-difference/ksql/README.md new file mode 100644 index 00000000..7132aa15 --- /dev/null +++ b/column-difference/ksql/README.md @@ -0,0 +1,175 @@ +# Column difference + +This tutorial demonstrates how to calculate the difference between two columns. + +## Setup + +The first thing we do is to create a stream named `PURCHASE_STREAM` + +```sql +CREATE STREAM PURCHASE_STREAM ( + ID VARCHAR, + PREVIOUS_PURCHASE DOUBLE, + CURRENT_PURCHASE DOUBLE, + TXN_TS VARCHAR, + FIRST_NAME VARCHAR, + LAST_NAME VARCHAR) + + WITH (KAFKA_TOPIC='customer_purchases', + VALUE_FORMAT='JSON', + PARTITIONS=1); +``` + +## Calculate the difference between two columns + +Now create a query to determine the difference between two columns: + +```sql +CREATE STREAM PURCHASE_HISTORY_STREAM AS + SELECT FIRST_NAME, + LAST_NAME, + CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF +FROM PURCHASE_STREAM; +``` + +## Running the example + +
+ ksqlDB CLI + +#### Prerequisites + +* Docker running via [Docker Desktop](https://docs.docker.com/desktop/) or [Docker Engine](https://docs.docker.com/engine/install/) +* [Docker Compose](https://docs.docker.com/compose/install/). Ensure that the command `docker compose version` succeeds. + +#### Run the commands + +First, start ksqlDB and Kafka: + + ```shell + docker compose -f ./docker/docker-compose-ksqldb.yml up -d + ``` +Next, open the ksqlDB CLI: + + ```shell + docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 + ``` + +Finally, run following SQL statements to create the `PURCHASE_STREAM` stream backed by Kafka running in Docker, populate it with +test data, and run the query that calculates the column difference. + +```sql + CREATE STREAM PURCHASE_STREAM ( + ID VARCHAR, + PREVIOUS_PURCHASE DOUBLE, + CURRENT_PURCHASE DOUBLE, + TXN_TS VARCHAR, + FIRST_NAME VARCHAR, + LAST_NAME VARCHAR) + + WITH (KAFKA_TOPIC='customer_purchases', + VALUE_FORMAT='JSON', + PARTITIONS=1); +``` + +Before we get too far, let’s set the `auto.offset.reset` configuration parameter to earliest. This means all new ksqlDB queries will +automatically compute their results from the beginning of a stream, rather than the end. This isn’t always what you’ll want to do in +production, but it makes query results much easier to see in examples like this. + +`SET 'auto.offset.reset' = 'earliest';` + +And let's adjust the column width, so we can easily see the results: + +`SET CLI COLUMN-WIDTH 20` + +```sql +INSERT INTO PURCHASE_STREAM (ID, PREVIOUS_PURCHASE, CURRENT_PURCHASE, TXN_TS, FIRST_NAME, LAST_NAME) VALUES ('1', 8000.54, 5004.89, '2020-12-04 02:35:43', 'Art', 'Vandeley'); +INSERT INTO PURCHASE_STREAM (ID, PREVIOUS_PURCHASE, CURRENT_PURCHASE, TXN_TS, FIRST_NAME, LAST_NAME) VALUES ('2', 500.33, 1000.89, '2020-12-04 02:35:44', 'Nick', 'Fury'); +INSERT INTO PURCHASE_STREAM (ID, PREVIOUS_PURCHASE, CURRENT_PURCHASE, TXN_TS, FIRST_NAME, LAST_NAME) VALUES ('3', 333.18, 804.89, '2020-12-04 02:35:45', 'Natasha', 'Romanov'); +INSERT INTO PURCHASE_STREAM (ID, PREVIOUS_PURCHASE, CURRENT_PURCHASE, TXN_TS, FIRST_NAME, LAST_NAME) VALUES ('4', 72848.11, 60040.89, '2020-12-04 02:35:46', 'Wanda', 'Maximoff'); +``` + +```sql +CREATE STREAM PURCHASE_HISTORY_STREAM AS +SELECT FIRST_NAME, + LAST_NAME, + CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF +FROM PURCHASE_STREAM; +``` + +```sql + SELECT * from PURCHASE_HISTORY_STREAM; +``` + +The query output should look something like this: + +```plaintext ++--------------------+--------------------+--------------------+ +|FIRST_NAME |LAST_NAME |PURCHASE_DIFF | ++--------------------+--------------------+--------------------+ +|Art |Vandeley |-2995.6499999999996 | +|Nick |Fury |500.56 | +|Natasha |Romanov |471.71 | +|Wanda |Maximoff |-12807.220000000001 | +``` +When you are finished, clean up the containers used for this tutorial by running: + + ```shell + docker compose -f ./docker/docker-compose-ksqldb.yml down -v + ``` +
+
### Confluent Cloud

#### Prerequisites

* A [Confluent Cloud](https://confluent.cloud/signup) account
* A ksqlDB cluster created in Confluent Cloud. Follow [this quick start](https://docs.confluent.io/cloud/current/get-started/index.html#section-2-add-ksql-cloud-to-the-cluster) to create one.

#### Run the commands

In the Confluent Cloud Console, navigate to your environment and then click the `ksqlDB` link in the left-side menu. Then click the name of the ksqlDB cluster you created.

Finally, run the following SQL statements in the ksqlDB UI `Editor` tab to create the `PURCHASE_STREAM` stream, populate it with test data, and run the column difference query.

```sql
CREATE STREAM PURCHASE_STREAM (
    ID VARCHAR,
    PREVIOUS_PURCHASE DOUBLE,
    CURRENT_PURCHASE DOUBLE,
    TXN_TS VARCHAR,
    FIRST_NAME VARCHAR,
    LAST_NAME VARCHAR)

    WITH (KAFKA_TOPIC='customer_purchases',
          VALUE_FORMAT='JSON',
          PARTITIONS=1);
```

```sql
INSERT INTO PURCHASE_STREAM (ID, PREVIOUS_PURCHASE, CURRENT_PURCHASE, TXN_TS, FIRST_NAME, LAST_NAME) VALUES ('1', 8000.54, 5004.89, '2020-12-04 02:35:43', 'Art', 'Vandeley');
INSERT INTO PURCHASE_STREAM (ID, PREVIOUS_PURCHASE, CURRENT_PURCHASE, TXN_TS, FIRST_NAME, LAST_NAME) VALUES ('2', 500.33, 1000.89, '2020-12-04 02:35:44', 'Nick', 'Fury');
INSERT INTO PURCHASE_STREAM (ID, PREVIOUS_PURCHASE, CURRENT_PURCHASE, TXN_TS, FIRST_NAME, LAST_NAME) VALUES ('3', 333.18, 804.89, '2020-12-04 02:35:45', 'Natasha', 'Romanov');
INSERT INTO PURCHASE_STREAM (ID, PREVIOUS_PURCHASE, CURRENT_PURCHASE, TXN_TS, FIRST_NAME, LAST_NAME) VALUES ('4', 72848.11, 60040.89, '2020-12-04 02:35:46', 'Wanda', 'Maximoff');
```

```sql
CREATE STREAM PURCHASE_HISTORY_STREAM AS
SELECT FIRST_NAME,
       LAST_NAME,
       CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF
FROM PURCHASE_STREAM;
```

```sql
SELECT * from PURCHASE_HISTORY_STREAM;
```

The query output should look like this:

![column difference](img/column-diff.png)
\ No newline at end of file diff --git a/column-difference/ksql/code/.gitignore b/column-difference/ksql/code/.gitignore deleted file mode 100644 index 18caaf96..00000000 --- a/column-difference/ksql/code/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -extensions/ -tutorial-steps/dev/outputs/ -tutorial-steps/test/outputs/ -tutorial-steps/prod/outputs/ diff --git a/column-difference/ksql/code/Makefile b/column-difference/ksql/code/Makefile deleted file mode 100644 index 28c7bf36..00000000 --- a/column-difference/ksql/code/Makefile +++ /dev/null @@ -1,16 +0,0 @@ -STEPS_DIR := tutorial-steps -DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs -TEST_OUTPUTS_DIR := $(STEPS_DIR)/test/outputs -PROD_OUTPUTS_DIR := $(STEPS_DIR)/prod/outputs -TEMP_DIR := $(shell mktemp -d) -SEQUENCE := "dev, test, prod, ccloud" - -tutorial: - rm -r $(DEV_OUTPUTS_DIR) || true - rm -r $(TEST_OUTPUTS_DIR) || true - mkdir $(DEV_OUTPUTS_DIR) - mkdir -p $(TEST_OUTPUTS_DIR) - harness-runner ../../../../../_data/harnesses/column-difference/ksql.yml $(TEMP_DIR) $(SEQUENCE) - diff --strip-trailing-cr $(STEPS_DIR)/dev/expected-transient-reporting.log $(DEV_OUTPUTS_DIR)/transient-reporting/output-0.log - diff --strip-trailing-cr $(STEPS_DIR)/test/expected-results.log $(TEST_OUTPUTS_DIR)/test-results.log - reset diff --git a/column-difference/ksql/code/docker-compose.yml b/column-difference/ksql/code/docker-compose.yml deleted file mode 100644 index eb6aca0d..00000000 --- a/column-difference/ksql/code/docker-compose.yml +++ /dev/null @@ -1,66 +0,0 @@ -version: '2' -services: - broker: - image: confluentinc/cp-kafka:7.4.1 - hostname: broker - container_name: broker - ports: - - 29092:29092 - environment: - KAFKA_BROKER_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_PROCESS_ROLES: broker,controller - KAFKA_NODE_ID: 1 - KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093 - KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_LOG_DIRS: /tmp/kraft-combined-logs - CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk - schema-registry: - image: confluentinc/cp-schema-registry:7.3.0 - hostname: schema-registry - container_name: schema-registry - depends_on: - - broker - ports: - - 8081:8081 - environment: - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092 - ksqldb-server: - image: confluentinc/ksqldb-server:0.28.2 - hostname: ksqldb-server - container_name: ksqldb-server - depends_on: - - broker - - schema-registry - ports: - - 8088:8088 - environment: - KSQL_CONFIG_DIR: /etc/ksqldb - KSQL_LOG4J_OPTS: -Dlog4j.configuration=file:/etc/ksqldb/log4j.properties - KSQL_BOOTSTRAP_SERVERS: broker:9092 - KSQL_HOST_NAME: ksqldb-server - KSQL_LISTENERS: http://0.0.0.0:8088 - KSQL_CACHE_MAX_BYTES_BUFFERING: 0 - KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081 - KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: earliest - ksqldb-cli: - image: confluentinc/ksqldb-cli:0.28.2 - container_name: ksqldb-cli - depends_on: - - broker - - ksqldb-server - entrypoint: /bin/sh - tty: true - environment: - KSQL_CONFIG_DIR: /etc/ksqldb - volumes: - - ./src:/opt/app/src - 
- ./test:/opt/app/test diff --git a/column-difference/ksql/code/src/statements.sql b/column-difference/ksql/code/src/statements.sql deleted file mode 100644 index a71fa90d..00000000 --- a/column-difference/ksql/code/src/statements.sql +++ /dev/null @@ -1,18 +0,0 @@ -CREATE STREAM PURCHASE_STREAM ( - ID VARCHAR, - PREVIOUS_PURCHASE DOUBLE, - CURRENT_PURCHASE DOUBLE, - TXN_TS VARCHAR, - FIRST_NAME VARCHAR, - LAST_NAME VARCHAR) - - WITH (KAFKA_TOPIC='customer_purchases', - VALUE_FORMAT='JSON', - PARTITIONS=1); - - -CREATE STREAM PURCHASE_HISTORY_STREAM AS - SELECT FIRST_NAME, - LAST_NAME, - CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF -FROM PURCHASE_STREAM; \ No newline at end of file diff --git a/column-difference/ksql/code/test/input.json b/column-difference/ksql/code/test/input.json deleted file mode 100644 index 96cf5da6..00000000 --- a/column-difference/ksql/code/test/input.json +++ /dev/null @@ -1,48 +0,0 @@ -{ - "inputs": [ - { - "topic": "customer_purchases", - "value": { - "id": "1", - "previous_purchase": 8000.54, - "current_purchase": 5004.89, - "txn_ts": "2020-12-04 02:35:43", - "first_name": "Art", - "last_name": "Vandeley" - } - }, - { - "topic": "customer_purchases", - "value": { - "id": "2", - "previous_purchase": 500.33, - "current_purchase": 1000.89, - "txn_ts": "2020-12-04 02:35:44", - "first_name": "Nick", - "last_name": "Fury" - } - }, - { - "topic": "customer_purchases", - "value": { - "id": "3", - "previous_purchase": 333.18, - "current_purchase": 804.89, - "txn_ts": "2020-12-04 02:35:45", - "first_name": "Natasha", - "last_name": "Romanov" - } - }, - { - "topic": "customer_purchases", - "value": { - "id": "4", - "previous_purchase": 72848.11, - "current_purchase": 60040.89, - "txn_ts": "2020-12-04 02:35:46", - "first_name": "Wanda", - "last_name": "Maximoff" - } - } - ] -} \ No newline at end of file diff --git a/column-difference/ksql/code/test/output.json b/column-difference/ksql/code/test/output.json deleted file mode 100644 index e52ec876..00000000 --- a/column-difference/ksql/code/test/output.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "outputs": [ - { - "topic": "PURCHASE_HISTORY_STREAM", - "value": { - "FIRST_NAME" : "Art", - "LAST_NAME" : "Vandeley", - "PURCHASE_DIFF" : -2995.6499999999996 - } - }, - { - "topic": "PURCHASE_HISTORY_STREAM", - "value": { - "FIRST_NAME" : "Nick", - "LAST_NAME" : "Fury", - "PURCHASE_DIFF" : 500.56 - } - }, - { - "topic": "PURCHASE_HISTORY_STREAM", - "value": { - "FIRST_NAME" : "Natasha", - "LAST_NAME" : "Romanov", - "PURCHASE_DIFF" : 471.71 - } - }, - { - "topic": "PURCHASE_HISTORY_STREAM", - "value": { - "FIRST_NAME" : "Wanda", - "LAST_NAME" : "Maximoff", - "PURCHASE_DIFF" : -12807.220000000001 - } - } - ] -} diff --git a/column-difference/ksql/code/tutorial-steps/dev/clean-up.sh b/column-difference/ksql/code/tutorial-steps/dev/clean-up.sh deleted file mode 100644 index 36f5aa98..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/clean-up.sh +++ /dev/null @@ -1 +0,0 @@ -docker compose down diff --git a/column-difference/ksql/code/tutorial-steps/dev/console-producer.sh b/column-difference/ksql/code/tutorial-steps/dev/console-producer.sh deleted file mode 100644 index 901b0c6e..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/console-producer.sh +++ /dev/null @@ -1 +0,0 @@ -docker exec -i broker /usr/bin/kafka-console-producer --bootstrap-server broker:9092 --topic customer_purchases diff --git a/column-difference/ksql/code/tutorial-steps/dev/continuous-reporting.sql 
b/column-difference/ksql/code/tutorial-steps/dev/continuous-reporting.sql deleted file mode 100644 index ec3996f8..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/continuous-reporting.sql +++ /dev/null @@ -1,5 +0,0 @@ -CREATE STREAM PURCHASE_HISTORY_STREAM AS - SELECT FIRST_NAME, - LAST_NAME, - CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF -FROM PURCHASE_STREAM; \ No newline at end of file diff --git a/column-difference/ksql/code/tutorial-steps/dev/create-activity-stream.sql b/column-difference/ksql/code/tutorial-steps/dev/create-activity-stream.sql deleted file mode 100644 index 3471bf25..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/create-activity-stream.sql +++ /dev/null @@ -1,12 +0,0 @@ -CREATE STREAM PURCHASE_STREAM ( - ID VARCHAR, - PREVIOUS_PURCHASE DOUBLE, - CURRENT_PURCHASE DOUBLE, - TXN_TS VARCHAR, - FIRST_NAME VARCHAR, - LAST_NAME VARCHAR) - - WITH (KAFKA_TOPIC='customer_purchases', - VALUE_FORMAT='JSON', - PARTITIONS=1); - \ No newline at end of file diff --git a/column-difference/ksql/code/tutorial-steps/dev/docker-compose-up.sh b/column-difference/ksql/code/tutorial-steps/dev/docker-compose-up.sh deleted file mode 100644 index e6fb3f19..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/docker-compose-up.sh +++ /dev/null @@ -1 +0,0 @@ -docker compose up -d diff --git a/column-difference/ksql/code/tutorial-steps/dev/init.sh b/column-difference/ksql/code/tutorial-steps/dev/init.sh deleted file mode 100644 index f5b9c83e..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/init.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir column-difference && cd column-difference diff --git a/column-difference/ksql/code/tutorial-steps/dev/input-events.json b/column-difference/ksql/code/tutorial-steps/dev/input-events.json deleted file mode 100644 index 2f680d84..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/input-events.json +++ /dev/null @@ -1,4 +0,0 @@ -{"id": "1", "previous_purchase": 8000.54, "current_purchase": 5004.89,"txn_ts": "2020-12-04 02:35:43", "first_name": "Art","last_name": "Vandeley"} -{"id": "2", "previous_purchase": 500.33, "current_purchase": 1000.89,"txn_ts": "2020-12-04 02:35:44", "first_name": "Nick","last_name": "Fury"} -{"id": "3", "previous_purchase": 333.18, "current_purchase": 804.89,"txn_ts": "2020-12-04 02:35:45", "first_name": "Natasha","last_name": "Romanov"} -{"id": "4", "previous_purchase": 72848.11, "current_purchase": 60040.89,"txn_ts": "2020-12-04 02:35:46", "first_name": "Wanda","last_name": "Maximoff"} diff --git a/column-difference/ksql/code/tutorial-steps/dev/make-dirs.sh b/column-difference/ksql/code/tutorial-steps/dev/make-dirs.sh deleted file mode 100644 index 6cd5156d..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/make-dirs.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir src test diff --git a/column-difference/ksql/code/tutorial-steps/dev/query-with-column-difference.sql b/column-difference/ksql/code/tutorial-steps/dev/query-with-column-difference.sql deleted file mode 100644 index 8c81134f..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/query-with-column-difference.sql +++ /dev/null @@ -1,3 +0,0 @@ -SELECT FIRST_NAME, - LAST_NAME, - CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF \ No newline at end of file diff --git a/column-difference/ksql/code/tutorial-steps/dev/set-column-width.sql b/column-difference/ksql/code/tutorial-steps/dev/set-column-width.sql deleted file mode 100644 index 76f6aea6..00000000 --- 
a/column-difference/ksql/code/tutorial-steps/dev/set-column-width.sql +++ /dev/null @@ -1 +0,0 @@ -SET CLI COLUMN-WIDTH 20 diff --git a/column-difference/ksql/code/tutorial-steps/dev/set-properties.sql b/column-difference/ksql/code/tutorial-steps/dev/set-properties.sql deleted file mode 100644 index cbbce015..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/set-properties.sql +++ /dev/null @@ -1 +0,0 @@ -SET 'auto.offset.reset' = 'earliest'; diff --git a/column-difference/ksql/code/tutorial-steps/dev/start-cli.sh b/column-difference/ksql/code/tutorial-steps/dev/start-cli.sh deleted file mode 100644 index b2e4c397..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/start-cli.sh +++ /dev/null @@ -1 +0,0 @@ -docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 diff --git a/column-difference/ksql/code/tutorial-steps/dev/transient-reporting.sql b/column-difference/ksql/code/tutorial-steps/dev/transient-reporting.sql deleted file mode 100644 index d4c7a4dd..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/transient-reporting.sql +++ /dev/null @@ -1,6 +0,0 @@ - SELECT FIRST_NAME, - LAST_NAME, - CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF -FROM PURCHASE_STREAM -EMIT CHANGES -LIMIT 4; diff --git a/column-difference/ksql/code/tutorial-steps/dev/wait-for-containers.sh b/column-difference/ksql/code/tutorial-steps/dev/wait-for-containers.sh deleted file mode 100644 index 4b450eba..00000000 --- a/column-difference/ksql/code/tutorial-steps/dev/wait-for-containers.sh +++ /dev/null @@ -1,3 +0,0 @@ -while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8088/) -eq 000 ] ; do sleep 5 ; done; -# Back off for ksqlDB server to get out of the initialization phase. -sleep 5 diff --git a/column-difference/ksql/code/tutorial-steps/test/run-tests.sh b/column-difference/ksql/code/tutorial-steps/test/run-tests.sh deleted file mode 100644 index 19cbea0d..00000000 --- a/column-difference/ksql/code/tutorial-steps/test/run-tests.sh +++ /dev/null @@ -1 +0,0 @@ -docker exec ksqldb-cli ksql-test-runner -i /opt/app/test/input.json -s /opt/app/src/statements.sql -o /opt/app/test/output.json diff --git a/column-difference/ksql/img/column-diff.png b/column-difference/ksql/img/column-diff.png new file mode 100644 index 00000000..ffc7def0 Binary files /dev/null and b/column-difference/ksql/img/column-diff.png differ diff --git a/column-difference/ksql/markup/answer/column-difference-answer.adoc b/column-difference/ksql/markup/answer/column-difference-answer.adoc deleted file mode 100644 index f42e0400..00000000 --- a/column-difference/ksql/markup/answer/column-difference-answer.adoc +++ /dev/null @@ -1,7 +0,0 @@ -Take the fields you want to calculate the difference between and use the `-` operator between them `field_1 - field_2` - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/query-with-column-difference.sql %}
-+++++ - -Note that the `-` operator expects numerical values. So if have columns where the numbers are stored as `VARCHAR` you'll have to use a `CAST` operation to convert them to a numerical type, otherwise you'll get an error in your query. diff --git a/column-difference/ksql/markup/dev/create-financial-transaction-stream.adoc b/column-difference/ksql/markup/dev/create-financial-transaction-stream.adoc deleted file mode 100644 index 770b4ec0..00000000 --- a/column-difference/ksql/markup/dev/create-financial-transaction-stream.adoc +++ /dev/null @@ -1,8 +0,0 @@ -The first thing we do is to create a stream named `PURCHASE_STREAM`. This statement creates the `customer_purchases` topic, since it doesn't already exist. For more details check out the https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/create-stream/#create-strea[ksqlDB documentation on the CREATE STREAM] statement. The data contained in the topic is just plain, schemaless JSON. - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/create-activity-stream.sql %}
-+++++ - - -Go ahead and create the stream now by pasting this statement into the ksqlDB window you opened at the beginning of this step. After you've created the stream, quit the ksqlDB CLI for now by typing `exit`. diff --git a/column-difference/ksql/markup/dev/init.adoc b/column-difference/ksql/markup/dev/init.adoc deleted file mode 100644 index dbd821b2..00000000 --- a/column-difference/ksql/markup/dev/init.adoc +++ /dev/null @@ -1,5 +0,0 @@ -To get started, make a new directory anywhere you'd like for this project: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/init.sh %}
-+++++ \ No newline at end of file diff --git a/column-difference/ksql/markup/dev/make-dirs.adoc b/column-difference/ksql/markup/dev/make-dirs.adoc deleted file mode 100644 index e4a3ab72..00000000 --- a/column-difference/ksql/markup/dev/make-dirs.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Then make the following directories to set up its structure: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/make-dirs.sh %}
-+++++ diff --git a/column-difference/ksql/markup/dev/make-docker-compose.adoc b/column-difference/ksql/markup/dev/make-docker-compose.adoc deleted file mode 100644 index 190f96ba..00000000 --- a/column-difference/ksql/markup/dev/make-docker-compose.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Next, create the following `docker-compose.yml` file to obtain Confluent Platform (for Kafka in the cloud, see https://www.confluent.io/confluent-cloud/tryfree/[Confluent Cloud]): - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/docker-compose.yml %}
-+++++ diff --git a/column-difference/ksql/markup/dev/make-src-file.adoc b/column-difference/ksql/markup/dev/make-src-file.adoc deleted file mode 100644 index 2aa07617..00000000 --- a/column-difference/ksql/markup/dev/make-src-file.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Now that you have a series of statements that's doing the right thing, the last step is to put them into a file so that they can be used outside the CLI session. Create a file at `src/statements.sql` with the following content: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/src/statements.sql %}
-+++++ diff --git a/column-difference/ksql/markup/dev/run-producer.adoc b/column-difference/ksql/markup/dev/run-producer.adoc deleted file mode 100644 index 7e22c754..00000000 --- a/column-difference/ksql/markup/dev/run-producer.adoc +++ /dev/null @@ -1,14 +0,0 @@ -Now let's produce some records for the `PURCHASE_STREAM` stream - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/console-producer.sh %}
-+++++ - -After starting the console producer it will wait for your input. -To send all send all the stock transactions click on the clipboard icon on the right, then paste the following into the terminal and press enter: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/input-events.json %}
-+++++ - -After you've sent the records above, you can close the console producer with `Ctrl-C`. diff --git a/column-difference/ksql/markup/dev/set-properties.adoc b/column-difference/ksql/markup/dev/set-properties.adoc deleted file mode 100644 index 51d58497..00000000 --- a/column-difference/ksql/markup/dev/set-properties.adoc +++ /dev/null @@ -1,14 +0,0 @@ -Set ksqlDB to process data from the beginning of each Kafka topic. - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/set-properties.sql %}
-+++++ - -Then let's adjust the column width so we can easily see the results of the query - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/set-column-width.sql %}
-+++++ - - - diff --git a/column-difference/ksql/markup/dev/start-cli.adoc b/column-difference/ksql/markup/dev/start-cli.adoc deleted file mode 100644 index e0129125..00000000 --- a/column-difference/ksql/markup/dev/start-cli.adoc +++ /dev/null @@ -1,5 +0,0 @@ -To begin developing interactively, open up the ksqlDB CLI: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/start-cli.sh %}
-+++++ diff --git a/column-difference/ksql/markup/dev/start-compose.adoc b/column-difference/ksql/markup/dev/start-compose.adoc deleted file mode 100644 index ff0aeed7..00000000 --- a/column-difference/ksql/markup/dev/start-compose.adoc +++ /dev/null @@ -1,5 +0,0 @@ -And launch it by running: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/docker-compose-up.sh %}
-+++++ \ No newline at end of file diff --git a/column-difference/ksql/markup/dev/transient-reporting.adoc b/column-difference/ksql/markup/dev/transient-reporting.adoc deleted file mode 100644 index 35c3a6db..00000000 --- a/column-difference/ksql/markup/dev/transient-reporting.adoc +++ /dev/null @@ -1,33 +0,0 @@ -Now we write a query to concatenate multiple columns. To achieve this, we will use the `-` operator to calculate the difference between two columns. - -[source, sql] ----- - -SELECT FIRST_NAME, - LAST_NAME, - CURRENT_PURCHASE - PREVIOUS_PURCHASE as PURCHASE_DIFF <1> -FROM PURCHASE_STREAM -EMIT CHANGES -LIMIT 4; - - ----- - -<1> Using the `-` operator to calculate the difference between two columns. - -NOTE: The `-` operator expects numerical values. So if have columns where the numbers are stored as `VARCHAR` you'll have to use a `CAST` operation to convert them to a numerical type, otherwise you'll get an error in your query. - - -This query should produce the following output: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/expected-transient-reporting.log %}
-+++++ - -Now that the reporting query works, let's update it to create a continuous query for your reporting scenario: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/dev/continuous-reporting.sql %}
-+++++ - -We're done with the ksqlDB CLI for now so go ahead and type `exit` to quit. diff --git a/column-difference/ksql/markup/test/make-test-input.adoc b/column-difference/ksql/markup/test/make-test-input.adoc deleted file mode 100644 index 7706c96f..00000000 --- a/column-difference/ksql/markup/test/make-test-input.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Create a file at `test/input.json` with the inputs for testing: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/test/input.json %}
-+++++ diff --git a/column-difference/ksql/markup/test/make-test-output.adoc b/column-difference/ksql/markup/test/make-test-output.adoc deleted file mode 100644 index 2b501cc3..00000000 --- a/column-difference/ksql/markup/test/make-test-output.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Create a file at `test/output.json` with the expected outputs: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/test/output.json %}
-+++++ diff --git a/column-difference/ksql/markup/test/run-tests.adoc b/column-difference/ksql/markup/test/run-tests.adoc deleted file mode 100644 index 072eb0c0..00000000 --- a/column-difference/ksql/markup/test/run-tests.adoc +++ /dev/null @@ -1,11 +0,0 @@ -Invoke the tests using the ksqlDB test runner and the statements file that you created earlier: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/test/run-tests.sh %}
-+++++ - -Which should pass: - -+++++ -
{% include_raw tutorials/column-difference/ksql/code/tutorial-steps/test/expected-results.log %}
-+++++ diff --git a/concatenation/ksql/README.md b/concatenation/ksql/README.md new file mode 100644 index 00000000..5a0ae525 --- /dev/null +++ b/concatenation/ksql/README.md @@ -0,0 +1,199 @@ +# Concatenation + +In this tutorial, we'll show how to use the concatenation operator to create a single value from multiple columns. + + +## Setup + +The first thing we do is create a stream named `ACTIVITY_STREAM` which simulates stock purchases and serves as our example of concatenating two columns together. + +```sql +CREATE STREAM ACTIVITY_STREAM ( + ID VARCHAR, + NUM_SHARES INT, + AMOUNT DOUBLE, + TXN_TS VARCHAR, + FIRST_NAME VARCHAR, + LAST_NAME VARCHAR, + SYMBOL VARCHAR ) + + WITH (KAFKA_TOPIC='stock_purchases', + VALUE_FORMAT='JSON', + PARTITIONS=1); +``` +## Concatenating columns + +Now let's create a stream that concatenates several columns to create a summary of activity. + +```sql +CREATE STREAM SUMMARY_RESULTS AS + SELECT FIRST_NAME + ' ' + LAST_NAME + + ' purchased ' + + CAST(NUM_SHARES AS VARCHAR) + + ' shares of ' + + SYMBOL AS SUMMARY +FROM ACTIVITY_STREAM; +``` + +## Running the example + +
+ ksqlDB CLI + +#### Prerequisites + +* Docker running via [Docker Desktop](https://docs.docker.com/desktop/) or [Docker Engine](https://docs.docker.com/engine/install/) +* [Docker Compose](https://docs.docker.com/compose/install/). Ensure that the command `docker compose version` succeeds. + +#### Run the commands + +First, start ksqlDB and Kafka: + + ```shell + docker compose -f ./docker/docker-compose-ksqldb.yml up -d + ``` +Next, open the ksqlDB CLI: + + ```shell + docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 + ``` + +Finally, run following SQL statements to create the `ACTIVITY_STREAM` stream backed by Kafka running in Docker, populate it with +test data, and run the query that concatenates several columns. + +```sql + CREATE STREAM ACTIVITY_STREAM ( + ID VARCHAR, + NUM_SHARES INT, + AMOUNT DOUBLE, + TXN_TS VARCHAR, + FIRST_NAME VARCHAR, + LAST_NAME VARCHAR, + SYMBOL VARCHAR ) + + WITH (KAFKA_TOPIC='stock_purchases', + VALUE_FORMAT='JSON', + PARTITIONS=1); +``` + +Before we get too far, let’s set the `auto.offset.reset` configuration parameter to earliest. This means all new ksqlDB queries will +automatically compute their results from the beginning of a stream, rather than the end. This isn’t always what you’ll want to do in +production, but it makes query results much easier to see in examples like this. + +`SET 'auto.offset.reset' = 'earliest';` + +And let's adjust the column width, so we can easily see the results: + +`SET CLI COLUMN-WIDTH 50` + +```sql +INSERT INTO ACTIVITY_STREAM (id, num_shares, amount, txn_ts, first_name, last_name, symbol) +VALUES ('1', 30000, 5004.89, '2020-12-04 02:35:43', 'Art', 'Vandeley', 'IMEP'); + +INSERT INTO ACTIVITY_STREAM (id, num_shares, amount, txn_ts, first_name, last_name, symbol) +VALUES ('2', 500, 1000.89, '2020-12-04 02:35:44', 'Nick', 'Fury', 'IMEP'); + +INSERT INTO ACTIVITY_STREAM (id, num_shares, amount, txn_ts, first_name, last_name, symbol) +VALUES ('3', 45729, 804.89, '2020-12-04 02:35:45', 'Natasha', 'Romanov', 'STRK'); + +INSERT INTO ACTIVITY_STREAM (id, num_shares, amount, txn_ts, first_name, last_name, symbol) +VALUES ('4', 72848, 60040.89, '2020-12-04 02:35:46', 'Wanda', 'Maximoff', 'STRK'); +``` + +```sql +CREATE STREAM SUMMARY_RESULTS AS +SELECT FIRST_NAME + ' ' + LAST_NAME + + ' purchased ' + + CAST(NUM_SHARES AS VARCHAR) + + ' shares of ' + + SYMBOL AS SUMMARY +FROM ACTIVITY_STREAM; +``` + +```sql + SELECT * from SUMMARY_RESULTS; +``` + +The query output should look something like this: + +```plaintext ++--------------------------------------------------+ +|SUMMARY | ++--------------------------------------------------+ +|Art Vandeley purchased 30000 shares of IMEP | +|Nick Fury purchased 500 shares of IMEP | +|Natasha Romanov purchased 45729 shares of STRK | +|Wanda Maximoff purchased 72848 shares of STRK | +``` + +When you are finished, clean up the containers used for this tutorial by running: + + ```shell + docker compose -f ./docker/docker-compose-ksqldb.yml down -v + ``` +
+
### Confluent Cloud

#### Prerequisites

* A [Confluent Cloud](https://confluent.cloud/signup) account
* A ksqlDB cluster created in Confluent Cloud. Follow [this quick start](https://docs.confluent.io/cloud/current/get-started/index.html#section-2-add-ksql-cloud-to-the-cluster) to create one.

#### Run the commands

In the Confluent Cloud Console, navigate to your environment and then click the `ksqlDB` link in the left-side menu. Then click the name of the ksqlDB cluster you created.

Finally, run the following SQL statements in the ksqlDB UI `Editor` tab to create the `ACTIVITY_STREAM` stream, populate it with test data, and run the concatenation query.

```sql
CREATE STREAM ACTIVITY_STREAM (
    ID VARCHAR,
    NUM_SHARES INT,
    AMOUNT DOUBLE,
    TXN_TS VARCHAR,
    FIRST_NAME VARCHAR,
    LAST_NAME VARCHAR,
    SYMBOL VARCHAR )

    WITH (KAFKA_TOPIC='stock_purchases',
          VALUE_FORMAT='JSON',
          PARTITIONS=1);
```

```sql
INSERT INTO ACTIVITY_STREAM (id, num_shares, amount, txn_ts, first_name, last_name, symbol)
VALUES ('1', 30000, 5004.89, '2020-12-04 02:35:43', 'Art', 'Vandeley', 'IMEP');

INSERT INTO ACTIVITY_STREAM (id, num_shares, amount, txn_ts, first_name, last_name, symbol)
VALUES ('2', 500, 1000.89, '2020-12-04 02:35:44', 'Nick', 'Fury', 'IMEP');

INSERT INTO ACTIVITY_STREAM (id, num_shares, amount, txn_ts, first_name, last_name, symbol)
VALUES ('3', 45729, 804.89, '2020-12-04 02:35:45', 'Natasha', 'Romanov', 'STRK');

INSERT INTO ACTIVITY_STREAM (id, num_shares, amount, txn_ts, first_name, last_name, symbol)
VALUES ('4', 72848, 60040.89, '2020-12-04 02:35:46', 'Wanda', 'Maximoff', 'STRK');
```

```sql
CREATE STREAM SUMMARY_RESULTS AS
SELECT FIRST_NAME + ' ' + LAST_NAME +
       ' purchased ' +
       CAST(NUM_SHARES AS VARCHAR) +
       ' shares of ' +
       SYMBOL AS SUMMARY
FROM ACTIVITY_STREAM;
```

```sql
SELECT * from SUMMARY_RESULTS;
```

The query output should look like this:

![column concatenation](img/column-concatenation.png)
diff --git a/concatenation/ksql/code/.gitignore b/concatenation/ksql/code/.gitignore deleted file mode 100644 index 18caaf96..00000000 --- a/concatenation/ksql/code/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -extensions/ -tutorial-steps/dev/outputs/ -tutorial-steps/test/outputs/ -tutorial-steps/prod/outputs/ diff --git a/concatenation/ksql/code/Makefile b/concatenation/ksql/code/Makefile deleted file mode 100644 index 122bf4b4..00000000 --- a/concatenation/ksql/code/Makefile +++ /dev/null @@ -1,16 +0,0 @@ -STEPS_DIR := tutorial-steps -DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs -TEST_OUTPUTS_DIR := $(STEPS_DIR)/test/outputs -PROD_OUTPUTS_DIR := $(STEPS_DIR)/prod/outputs -TEMP_DIR := $(shell mktemp -d) -SEQUENCE := "dev, test, prod, ccloud" - -tutorial: - rm -r $(DEV_OUTPUTS_DIR) || true - rm -r $(TEST_OUTPUTS_DIR) || true - mkdir $(DEV_OUTPUTS_DIR) - mkdir -p $(TEST_OUTPUTS_DIR) - harness-runner ../../../../../_data/harnesses/concatenation/ksql.yml $(TEMP_DIR) $(SEQUENCE) - diff --strip-trailing-cr $(STEPS_DIR)/dev/expected-transient-reporting.log $(DEV_OUTPUTS_DIR)/transient-reporting/output-0.log - diff --strip-trailing-cr $(STEPS_DIR)/test/expected-results.log $(TEST_OUTPUTS_DIR)/test-results.log - reset diff --git a/concatenation/ksql/code/docker-compose.yml b/concatenation/ksql/code/docker-compose.yml deleted file mode 100644 index eb6aca0d..00000000 --- a/concatenation/ksql/code/docker-compose.yml +++ /dev/null @@ -1,66 +0,0 @@ -version: '2' -services: - broker: - image: confluentinc/cp-kafka:7.4.1 - hostname: broker - container_name: broker - ports: - - 29092:29092 - environment: - KAFKA_BROKER_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_PROCESS_ROLES: broker,controller - KAFKA_NODE_ID: 1 - KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093 - KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_LOG_DIRS: /tmp/kraft-combined-logs - CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk - schema-registry: - image: confluentinc/cp-schema-registry:7.3.0 - hostname: schema-registry - container_name: schema-registry - depends_on: - - broker - ports: - - 8081:8081 - environment: - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092 - ksqldb-server: - image: confluentinc/ksqldb-server:0.28.2 - hostname: ksqldb-server - container_name: ksqldb-server - depends_on: - - broker - - schema-registry - ports: - - 8088:8088 - environment: - KSQL_CONFIG_DIR: /etc/ksqldb - KSQL_LOG4J_OPTS: -Dlog4j.configuration=file:/etc/ksqldb/log4j.properties - KSQL_BOOTSTRAP_SERVERS: broker:9092 - KSQL_HOST_NAME: ksqldb-server - KSQL_LISTENERS: http://0.0.0.0:8088 - KSQL_CACHE_MAX_BYTES_BUFFERING: 0 - KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081 - KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: earliest - ksqldb-cli: - image: confluentinc/ksqldb-cli:0.28.2 - container_name: ksqldb-cli - depends_on: - - broker - - ksqldb-server - entrypoint: /bin/sh - tty: true - environment: - KSQL_CONFIG_DIR: /etc/ksqldb - volumes: - - ./src:/opt/app/src - - ./test:/opt/app/test diff --git 
a/concatenation/ksql/code/src/statements.sql b/concatenation/ksql/code/src/statements.sql deleted file mode 100644 index b2222e45..00000000 --- a/concatenation/ksql/code/src/statements.sql +++ /dev/null @@ -1,21 +0,0 @@ -CREATE STREAM ACTIVITY_STREAM ( - ID VARCHAR, - NUM_SHARES INT, - AMOUNT DOUBLE, - TXN_TS VARCHAR, - FIRST_NAME VARCHAR, - LAST_NAME VARCHAR, - SYMBOL VARCHAR ) - - WITH (KAFKA_TOPIC='stock_purchases', - VALUE_FORMAT='JSON', - PARTITIONS=1); - - -CREATE STREAM SUMMARY_RESULTS AS - SELECT FIRST_NAME + ' ' + LAST_NAME + - ' purchased ' + - CAST(NUM_SHARES AS VARCHAR) + - ' shares of ' + - SYMBOL AS SUMMARY -FROM ACTIVITY_STREAM; \ No newline at end of file diff --git a/concatenation/ksql/code/test/input.json b/concatenation/ksql/code/test/input.json deleted file mode 100644 index 0bf1878d..00000000 --- a/concatenation/ksql/code/test/input.json +++ /dev/null @@ -1,52 +0,0 @@ -{ - "inputs": [ - { - "topic": "stock_purchases", - "value": { - "id": "1", - "num_shares": 30000, - "amount": 5004.89, - "txn_ts": "2020-12-04 02:35:43", - "first_name": "Art", - "last_name": "Vandeley", - "symbol": "IMEP" - } - }, - { - "topic": "stock_purchases", - "value": { - "id": "2", - "num_shares": 500, - "amount": 1000.89, - "txn_ts": "2020-12-04 02:35:44", - "first_name": "Nick", - "last_name": "Fury", - "symbol": "IMEP" - } - }, - { - "topic": "stock_purchases", - "value": { - "id": "3", - "num_shares": 45729, - "amount": 804.89, - "txn_ts": "2020-12-04 02:35:45", - "first_name": "Natasha", - "last_name": "Romanov", - "symbol": "STRK" - } - }, - { - "topic": "stock_purchases", - "value": { - "id": "4", - "num_shares": 72848, - "amount": 60040.89, - "txn_ts": "2020-12-04 02:35:46", - "first_name": "Wanda", - "last_name": "Maximoff", - "symbol": "STRK" - } - } - ] -} \ No newline at end of file diff --git a/concatenation/ksql/code/test/output.json b/concatenation/ksql/code/test/output.json deleted file mode 100644 index fd021160..00000000 --- a/concatenation/ksql/code/test/output.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "outputs": [ - { - "topic": "SUMMARY_RESULTS", - "value": { - "SUMMARY" : "Art Vandeley purchased 30000 shares of IMEP" - } - }, - { - "topic": "SUMMARY_RESULTS", - "value": { - "SUMMARY" : "Nick Fury purchased 500 shares of IMEP" - } - }, - { - "topic": "SUMMARY_RESULTS", - "value": { - "SUMMARY" : "Natasha Romanov purchased 45729 shares of STRK" - } - }, - { - "topic": "SUMMARY_RESULTS", - "value": { - "SUMMARY" : "Wanda Maximoff purchased 72848 shares of STRK" - } - } - ] -} diff --git a/concatenation/ksql/code/tutorial-steps/dev/clean-up.sh b/concatenation/ksql/code/tutorial-steps/dev/clean-up.sh deleted file mode 100644 index 36f5aa98..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/clean-up.sh +++ /dev/null @@ -1 +0,0 @@ -docker compose down diff --git a/concatenation/ksql/code/tutorial-steps/dev/console-producer.sh b/concatenation/ksql/code/tutorial-steps/dev/console-producer.sh deleted file mode 100644 index 3e78209a..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/console-producer.sh +++ /dev/null @@ -1 +0,0 @@ -docker exec -i broker /usr/bin/kafka-console-producer --bootstrap-server broker:9092 --topic stock_purchases diff --git a/concatenation/ksql/code/tutorial-steps/dev/continuous-reporting.sql b/concatenation/ksql/code/tutorial-steps/dev/continuous-reporting.sql deleted file mode 100644 index 06c9cdff..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/continuous-reporting.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE STREAM SUMMARY_RESULTS 
AS - SELECT FIRST_NAME + ' ' + LAST_NAME + - ' purchased ' + - CAST(NUM_SHARES AS VARCHAR) + - ' shares of ' + - SYMBOL AS SUMMARY -FROM ACTIVITY_STREAM; \ No newline at end of file diff --git a/concatenation/ksql/code/tutorial-steps/dev/create-activity-stream.sql b/concatenation/ksql/code/tutorial-steps/dev/create-activity-stream.sql deleted file mode 100644 index 74704cb9..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/create-activity-stream.sql +++ /dev/null @@ -1,13 +0,0 @@ -CREATE STREAM ACTIVITY_STREAM ( - ID VARCHAR, - NUM_SHARES INT, - AMOUNT DOUBLE, - TXN_TS VARCHAR, - FIRST_NAME VARCHAR, - LAST_NAME VARCHAR, - SYMBOL VARCHAR ) - - WITH (KAFKA_TOPIC='stock_purchases', - VALUE_FORMAT='JSON', - PARTITIONS=1); - \ No newline at end of file diff --git a/concatenation/ksql/code/tutorial-steps/dev/docker-compose-up.sh b/concatenation/ksql/code/tutorial-steps/dev/docker-compose-up.sh deleted file mode 100644 index e6fb3f19..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/docker-compose-up.sh +++ /dev/null @@ -1 +0,0 @@ -docker compose up -d diff --git a/concatenation/ksql/code/tutorial-steps/dev/init.sh b/concatenation/ksql/code/tutorial-steps/dev/init.sh deleted file mode 100644 index fdad3239..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/init.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir concatenation && cd concatenation diff --git a/concatenation/ksql/code/tutorial-steps/dev/input-events.json b/concatenation/ksql/code/tutorial-steps/dev/input-events.json deleted file mode 100644 index 04c68718..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/input-events.json +++ /dev/null @@ -1,4 +0,0 @@ -{"id": "1", "num_shares": 30000, "amount": 5004.89,"txn_ts": "2020-12-04 02:35:43", "first_name": "Art","last_name": "Vandeley", "symbol": "IMEP"} -{"id": "2", "num_shares": 500, "amount": 1000.89,"txn_ts": "2020-12-04 02:35:44", "first_name": "Nick","last_name": "Fury", "symbol": "IMEP"} -{"id": "3", "num_shares": 45729, "amount": 804.89,"txn_ts": "2020-12-04 02:35:45", "first_name": "Natasha","last_name": "Romanov", "symbol": "STRK"} -{"id": "4", "num_shares": 72848, "amount": 60040.89,"txn_ts": "2020-12-04 02:35:46", "first_name": "Wanda","last_name": "Maximoff", "symbol": "STRK"} diff --git a/concatenation/ksql/code/tutorial-steps/dev/make-dirs.sh b/concatenation/ksql/code/tutorial-steps/dev/make-dirs.sh deleted file mode 100644 index 6cd5156d..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/make-dirs.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir src test diff --git a/concatenation/ksql/code/tutorial-steps/dev/query-with-concatenation.sql b/concatenation/ksql/code/tutorial-steps/dev/query-with-concatenation.sql deleted file mode 100644 index 6092c82a..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/query-with-concatenation.sql +++ /dev/null @@ -1,5 +0,0 @@ -SELECT FIRST_NAME + ' ' + LAST_NAME + - ' purchased ' + - CAST(NUM_SHARES AS VARCHAR) + - ' shares of ' + - SYMBOL AS SUMMARY \ No newline at end of file diff --git a/concatenation/ksql/code/tutorial-steps/dev/set-column-width.sql b/concatenation/ksql/code/tutorial-steps/dev/set-column-width.sql deleted file mode 100644 index ab28f15d..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/set-column-width.sql +++ /dev/null @@ -1 +0,0 @@ -SET CLI COLUMN-WIDTH 50 diff --git a/concatenation/ksql/code/tutorial-steps/dev/set-properties.sql b/concatenation/ksql/code/tutorial-steps/dev/set-properties.sql deleted file mode 100644 index cbbce015..00000000 --- 
a/concatenation/ksql/code/tutorial-steps/dev/set-properties.sql +++ /dev/null @@ -1 +0,0 @@ -SET 'auto.offset.reset' = 'earliest'; diff --git a/concatenation/ksql/code/tutorial-steps/dev/start-cli.sh b/concatenation/ksql/code/tutorial-steps/dev/start-cli.sh deleted file mode 100644 index b2e4c397..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/start-cli.sh +++ /dev/null @@ -1 +0,0 @@ -docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 diff --git a/concatenation/ksql/code/tutorial-steps/dev/transient-reporting.sql b/concatenation/ksql/code/tutorial-steps/dev/transient-reporting.sql deleted file mode 100644 index 5396812a..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/transient-reporting.sql +++ /dev/null @@ -1,8 +0,0 @@ -SELECT FIRST_NAME + ' ' + LAST_NAME + - ' purchased ' + - CAST(NUM_SHARES AS VARCHAR) + - ' shares of ' + - SYMBOL AS SUMMARY -FROM ACTIVITY_STREAM -EMIT CHANGES -LIMIT 4; diff --git a/concatenation/ksql/code/tutorial-steps/dev/wait-for-containers.sh b/concatenation/ksql/code/tutorial-steps/dev/wait-for-containers.sh deleted file mode 100644 index 4b450eba..00000000 --- a/concatenation/ksql/code/tutorial-steps/dev/wait-for-containers.sh +++ /dev/null @@ -1,3 +0,0 @@ -while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8088/) -eq 000 ] ; do sleep 5 ; done; -# Back off for ksqlDB server to get out of the initialization phase. -sleep 5 diff --git a/concatenation/ksql/code/tutorial-steps/test/run-tests.sh b/concatenation/ksql/code/tutorial-steps/test/run-tests.sh deleted file mode 100644 index 19cbea0d..00000000 --- a/concatenation/ksql/code/tutorial-steps/test/run-tests.sh +++ /dev/null @@ -1 +0,0 @@ -docker exec ksqldb-cli ksql-test-runner -i /opt/app/test/input.json -s /opt/app/src/statements.sql -o /opt/app/test/output.json diff --git a/concatenation/ksql/img/column-concatenation.png b/concatenation/ksql/img/column-concatenation.png new file mode 100644 index 00000000..da0bd7e6 Binary files /dev/null and b/concatenation/ksql/img/column-concatenation.png differ diff --git a/concatenation/ksql/markup/answer/concatenation-answer.adoc b/concatenation/ksql/markup/answer/concatenation-answer.adoc deleted file mode 100644 index 8f2add89..00000000 --- a/concatenation/ksql/markup/answer/concatenation-answer.adoc +++ /dev/null @@ -1,7 +0,0 @@ -Select the fields you want to combine and use the `+` operator to concatenate them into one field: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/query-with-concatenation.sql %}
-+++++ - -Note that concatenation only works with `STRING` values, so you'll have to use a `CAST` operation on non-string fields as demonstrated above, otherwise your query will result in an error. diff --git a/concatenation/ksql/markup/dev/create-financial-transaction-stream.adoc b/concatenation/ksql/markup/dev/create-financial-transaction-stream.adoc deleted file mode 100644 index 9588d4df..00000000 --- a/concatenation/ksql/markup/dev/create-financial-transaction-stream.adoc +++ /dev/null @@ -1,8 +0,0 @@ -The first thing we do is to create a stream named `ACTIVITY_STREAM`. This statement creates the `stock_purchases` topic, since it doesn't already exist. For more details check out the https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/create-stream/#create-strea[ksqlDB documentation on the CREATE STREAM] statement. The data contained in the topic is just plain, schemaless JSON. - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/create-activity-stream.sql %}
-+++++ - - -Go ahead and create the stream now by pasting this statement into the ksqlDB window you opened at the beginning of this step. After you've created the stream, quit the ksqlDB CLI for now by typing `exit`. diff --git a/concatenation/ksql/markup/dev/init.adoc b/concatenation/ksql/markup/dev/init.adoc deleted file mode 100644 index 1392caf3..00000000 --- a/concatenation/ksql/markup/dev/init.adoc +++ /dev/null @@ -1,5 +0,0 @@ -To get started, make a new directory anywhere you'd like for this project: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/init.sh %}
-+++++ \ No newline at end of file diff --git a/concatenation/ksql/markup/dev/make-dirs.adoc b/concatenation/ksql/markup/dev/make-dirs.adoc deleted file mode 100644 index 70b83616..00000000 --- a/concatenation/ksql/markup/dev/make-dirs.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Then make the following directories to set up its structure: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/make-dirs.sh %}
-+++++ diff --git a/concatenation/ksql/markup/dev/make-docker-compose.adoc b/concatenation/ksql/markup/dev/make-docker-compose.adoc deleted file mode 100644 index 76465246..00000000 --- a/concatenation/ksql/markup/dev/make-docker-compose.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Next, create the following `docker-compose.yml` file to obtain Confluent Platform (for Kafka in the cloud, see https://www.confluent.io/confluent-cloud/tryfree/[Confluent Cloud]): - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/docker-compose.yml %}
-+++++ diff --git a/concatenation/ksql/markup/dev/make-src-file.adoc b/concatenation/ksql/markup/dev/make-src-file.adoc deleted file mode 100644 index c5a5d492..00000000 --- a/concatenation/ksql/markup/dev/make-src-file.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Now that you have a series of statements that's doing the right thing, the last step is to put them into a file so that they can be used outside the CLI session. Create a file at `src/statements.sql` with the following content: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/src/statements.sql %}
-+++++ diff --git a/concatenation/ksql/markup/dev/run-producer.adoc b/concatenation/ksql/markup/dev/run-producer.adoc deleted file mode 100644 index 441e6924..00000000 --- a/concatenation/ksql/markup/dev/run-producer.adoc +++ /dev/null @@ -1,14 +0,0 @@ -Now let's produce some records for the `ACTIVITY_STREAM` stream - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/console-producer.sh %}
-+++++ - -After starting the console producer it will wait for your input. -To send all send all the stock transactions click on the clipboard icon on the right, then paste the following into the terminal and press enter: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/input-events.json %}
-+++++ - -After you've sent the records above, you can close the console producer with `Ctrl-C`. diff --git a/concatenation/ksql/markup/dev/set-properties.adoc b/concatenation/ksql/markup/dev/set-properties.adoc deleted file mode 100644 index eeecfee1..00000000 --- a/concatenation/ksql/markup/dev/set-properties.adoc +++ /dev/null @@ -1,14 +0,0 @@ -Set ksqlDB to process data from the beginning of each Kafka topic. - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/set-properties.sql %}
-+++++ - -Then let's adjust the column width so we can easily see the results of the query - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/set-column-width.sql %}
-+++++ - - - diff --git a/concatenation/ksql/markup/dev/start-cli.adoc b/concatenation/ksql/markup/dev/start-cli.adoc deleted file mode 100644 index dc274ce0..00000000 --- a/concatenation/ksql/markup/dev/start-cli.adoc +++ /dev/null @@ -1,5 +0,0 @@ -To begin developing interactively, open up the ksqlDB CLI: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/start-cli.sh %}
-+++++ diff --git a/concatenation/ksql/markup/dev/start-compose.adoc b/concatenation/ksql/markup/dev/start-compose.adoc deleted file mode 100644 index b7766887..00000000 --- a/concatenation/ksql/markup/dev/start-compose.adoc +++ /dev/null @@ -1,5 +0,0 @@ -And launch it by running: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/docker-compose-up.sh %}
-+++++ \ No newline at end of file diff --git a/concatenation/ksql/markup/dev/transient-reporting.adoc b/concatenation/ksql/markup/dev/transient-reporting.adoc deleted file mode 100644 index bcfd57de..00000000 --- a/concatenation/ksql/markup/dev/transient-reporting.adoc +++ /dev/null @@ -1,35 +0,0 @@ -Now we write a query to concatenate multiple columns. To achieve this, we will use the `+` operator between the fields in our `SELECT` statement rather than a comma. - -[source, sql] ----- - -SELECT FIRST_NAME + ' ' + LAST_NAME + - ' purchased ' + - CAST(NUM_SHARES AS VARCHAR) + <1> - ' shares of ' + - SYMBOL AS SUMMARY -FROM ACTIVITY_STREAM -EMIT CHANGES -LIMIT 4; - - ----- - -<1> The NUM_SHARES field is an `INT` so we need to cast it to a `VARCHAR` as `concatenate` only works with `STRING` types - -NOTE: You can also SELECT fields you don't want to concatenate. In that case you use a comma to separate the field from those you concatenate. For example, you can SELECT individual fields `field_1` and `field_2` at the same time that you concatenate `field_3` with `field_4`. For example -`SELECT field_1, field_2, field_3 + field_4` - -This query should produce the following output: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/expected-transient-reporting.log %}
-+++++ - -Now that the reporting query works, let's update it to create a continuous query for your reporting scenario - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/dev/continuous-reporting.sql %}
-+++++ - -We're done with the ksqlDB CLI for now so go ahead and type `exit` to quit. diff --git a/concatenation/ksql/markup/test/make-test-input.adoc b/concatenation/ksql/markup/test/make-test-input.adoc deleted file mode 100644 index 8324eaac..00000000 --- a/concatenation/ksql/markup/test/make-test-input.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Create a file at `test/input.json` with the inputs for testing: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/test/input.json %}
-+++++ diff --git a/concatenation/ksql/markup/test/make-test-output.adoc b/concatenation/ksql/markup/test/make-test-output.adoc deleted file mode 100644 index 050577a4..00000000 --- a/concatenation/ksql/markup/test/make-test-output.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Create a file at `test/output.json` with the expected outputs: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/test/output.json %}
-+++++ diff --git a/concatenation/ksql/markup/test/run-tests.adoc b/concatenation/ksql/markup/test/run-tests.adoc deleted file mode 100644 index ca3025a7..00000000 --- a/concatenation/ksql/markup/test/run-tests.adoc +++ /dev/null @@ -1,11 +0,0 @@ -Invoke the tests using the ksqlDB test runner and the statements file that you created earlier: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/test/run-tests.sh %}
-+++++ - -Which should pass: - -+++++ -
{% include_raw tutorials/concatenation/ksql/code/tutorial-steps/test/expected-results.log %}
-+++++ diff --git a/confluent-parallel-consumer-application/.gitignore b/confluent-parallel-consumer-application/.gitignore deleted file mode 100644 index 096b979e..00000000 --- a/confluent-parallel-consumer-application/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -kafka/code/tutorial-steps/dev/outputs/ -consumer-records.out diff --git a/confluent-parallel-consumer-application/confluent/code/configuration/dev.properties b/confluent-parallel-consumer-application/confluent/code/configuration/dev.properties deleted file mode 100644 index 33294831..00000000 --- a/confluent-parallel-consumer-application/confluent/code/configuration/dev.properties +++ /dev/null @@ -1,10 +0,0 @@ -# Consumer properties -key.deserializer=org.apache.kafka.common.serialization.StringDeserializer -value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -max.poll.interval.ms=300000 -enable.auto.commit=false -auto.offset.reset=earliest - -# Application-specific properties -input.topic.name=parallel-consumer-input-topic -file.path=topic-output.txt diff --git a/confluent-parallel-consumer-application/confluent/markup/dev/ccloud-run-producer.adoc b/confluent-parallel-consumer-application/confluent/markup/dev/ccloud-run-producer.adoc deleted file mode 100644 index d98c9f04..00000000 --- a/confluent-parallel-consumer-application/confluent/markup/dev/ccloud-run-producer.adoc +++ /dev/null @@ -1,13 +0,0 @@ -Using a terminal window, run the following command to start a Confluent CLI producer: - -```plaintext -confluent kafka topic produce parallel-consumer-input-topic --parse-key -``` - -Each line represents input data for the Confluent Parallel Consumer application. To send all of the events below, paste the following into the prompt and press enter: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/input.txt %}
-+++++ - -Enter `Ctrl-C` to exit. diff --git a/confluent-parallel-consumer-application/confluent/markup/dev/make-config-file.adoc b/confluent-parallel-consumer-application/confluent/markup/dev/make-config-file.adoc deleted file mode 100644 index e799a647..00000000 --- a/confluent-parallel-consumer-application/confluent/markup/dev/make-config-file.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Then create a development configuration file at `configuration/dev.properties`: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/confluent/code/configuration/dev.properties %}
-+++++ diff --git a/confluent-parallel-consumer-application/confluent/markup/dev/make-topic.adoc b/confluent-parallel-consumer-application/confluent/markup/dev/make-topic.adoc deleted file mode 100644 index edf03071..00000000 --- a/confluent-parallel-consumer-application/confluent/markup/dev/make-topic.adoc +++ /dev/null @@ -1,5 +0,0 @@ -In this step we’re going to create a topic for use during this tutorial. Use the following command to create the topic: - -```plaintext -confluent kafka topic create parallel-consumer-input-topic -``` \ No newline at end of file diff --git a/confluent-parallel-consumer-application/confluent/markup/perftest/ccloud-run-producer.adoc b/confluent-parallel-consumer-application/confluent/markup/perftest/ccloud-run-producer.adoc deleted file mode 100644 index e7fef46b..00000000 --- a/confluent-parallel-consumer-application/confluent/markup/perftest/ccloud-run-producer.adoc +++ /dev/null @@ -1,8 +0,0 @@ -Using a terminal window, run the following command to write 10,000 small dummy records to the input topic: - -``` -seq 1 10000 | confluent kafka topic produce perftest-parallel-consumer-input-topic -``` - -Let's kick off this command and let it run. It'll take a few minutes to produce all 10,000 records. -In the meantime, let's continue with the tutorial. diff --git a/confluent-parallel-consumer-application/confluent/markup/perftest/make-topic.adoc b/confluent-parallel-consumer-application/confluent/markup/perftest/make-topic.adoc deleted file mode 100644 index 5b4870ea..00000000 --- a/confluent-parallel-consumer-application/confluent/markup/perftest/make-topic.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Use the following command to create a topic that we'll use for performance testing: - -```plaintext -confluent kafka topic create perftest-parallel-consumer-input-topic -``` \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/README.md b/confluent-parallel-consumer-application/kafka/README.md new file mode 100644 index 00000000..5a50698b --- /dev/null +++ b/confluent-parallel-consumer-application/kafka/README.md @@ -0,0 +1,39 @@ +# Confluent parallel consumer + +The Confluent Parallel Consumer is an open-source Apache 2.0-licensed Java library that enables you to consume from a Kafka topic with more parallelism than the number of partitions. In an Apache Kafka consumer group, the number of partitions is the parallelism limit. +Increasing the level of parallelism beyond the partition count is desirable in many situations. For example, when there are fixed partition counts for a reason beyond your control or if you need to make a high-latency call out to a database or microservice while consuming and want to increase throughput. + +In this tutorial, you'll build a small "hello world" application that uses the Confluent Parallel Consumer library. There are also some performance tests at a larger scale to compare the Confluent Parallel Consumer with a baseline built using a vanilla Apache Kafka consumer group you can explore on your own. + +## ParallelStreamProcessor + +For parallel record consuming, you'll use the [ParallelStreamProcessor](https://javadoc.io/doc/io.confluent.parallelconsumer/parallel-consumer-core/latest/io/confluent/parallelconsumer/ParallelStreamProcessor.html) which wraps a [KafkaConsumer](https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html). 
+ +You create a new instance of a `KafkaConsumer`, create a `ParallelConsumerOptions` configuration object, then use the configuration to create a new `ParallelStreamProcessor` instance: + +```java +final Consumer consumer = new KafkaConsumer<>(appProperties); + final ParallelConsumerOptions options = ParallelConsumerOptions.builder() + .ordering(KEY) + .maxConcurrency(16) + .consumer(consumer) + .commitMode(PERIODIC_CONSUMER_SYNC) + .build(); + ParallelStreamProcessor eosStreamProcessor = createEosStreamProcessor(options); + + + eosStreamProcessor.poll(context -> recordHandler.processRecord(context.getSingleConsumerRecord())); +``` + +In this example, you're specifying ordering by key with a maximum concurrency of 16, and `PERIODIC_CONSUMER_SYNC` for committing offsets. `PERIODIC_CONSUMER_SYNC` blocks the Parallel Consumer's processing loop until a successful commit response is received. An asynchronous commit mode is also supported, which optimizes for consumption throughput (the downside being a higher risk of having to process duplicate messages in error-recovery scenarios). + +Then you start consuming records in parallel with the `ParallelStreamProcessor.poll` method, which takes a `java.util.function.Consumer` instance to work with each record it consumes. + +## Performance tests + +There are two performance tests, `ParallelConsumerPerfTest` and `MultithreadedKafkaConsumerPerfTest`, that you can run to observe the power of parallel consumption first-hand. + + + diff --git a/confluent-parallel-consumer-application/kafka/build.gradle new file mode 100644 index 00000000..2741d61c --- /dev/null +++ b/confluent-parallel-consumer-application/kafka/build.gradle @@ -0,0 +1,70 @@ +/* + * This file was generated by the Gradle 'init' task. + * + * This is a general purpose Gradle build. + * To learn more about Gradle by exploring our Samples at https://docs.gradle.org/8.5/samples + * This project uses @Incubating APIs which are subject to change. 
+ */ + +buildscript { + repositories { + mavenCentral() + } +} + +plugins { + id "java" + id 'com.github.johnrengelman.shadow' version '8.1.1' +} + +java { + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} + +repositories { + mavenCentral() + maven { url 'https://packages.confluent.io/maven/' } +} + + +dependencies { + implementation project(':common') + implementation "org.slf4j:slf4j-simple:2.0.7" + implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4" + implementation "org.apache.commons:commons-lang3:3.12.0" + implementation "me.tongfei:progressbar:0.9.3" + implementation 'org.awaitility:awaitility:4.2.0' + + implementation "com.typesafe:config:1.4.2" + + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2' + testImplementation 'org.hamcrest:hamcrest:2.2' + testImplementation 'org.awaitility:awaitility:4.2.0' + testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4:tests" // for LongPollingMockConsumer + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2' +} + +test { + useJUnitPlatform() + testLogging { + outputs.upToDateWhen { false } + showStandardStreams = true + events "PASSED", "SKIPPED", "FAILED", "STANDARD_OUT", "STANDARD_ERROR" + exceptionFormat = "full" + } +} + +jar { + manifest { + attributes( + "Class-Path": configurations.runtimeClasspath.collect { it.getName() }.join(" ") + ) + } +} + +shadowJar { + archiveBaseName = "parallel-consumer-standalone" + archiveClassifier = '' +} diff --git a/confluent-parallel-consumer-application/kafka/code/Makefile b/confluent-parallel-consumer-application/kafka/code/Makefile deleted file mode 100644 index a2d23ff6..00000000 --- a/confluent-parallel-consumer-application/kafka/code/Makefile +++ /dev/null @@ -1,10 +0,0 @@ -STEPS_DIR := tutorial-steps -DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs -TEMP_DIR := $(shell mktemp -d) - -tutorial: - rm -r $(DEV_OUTPUTS_DIR) || true - mkdir $(DEV_OUTPUTS_DIR) - harness-runner ../../../../../_data/harnesses/confluent-parallel-consumer-application/kafka.yml $(TEMP_DIR) - bash -c 'diff --strip-trailing-cr <(sort $(STEPS_DIR)/dev/expected-output.txt) <(sort $(DEV_OUTPUTS_DIR)/actual-output.txt)' - reset diff --git a/confluent-parallel-consumer-application/kafka/code/build.gradle b/confluent-parallel-consumer-application/kafka/code/build.gradle deleted file mode 100644 index cb840eb4..00000000 --- a/confluent-parallel-consumer-application/kafka/code/build.gradle +++ /dev/null @@ -1,54 +0,0 @@ -buildscript { - repositories { - mavenCentral() - } - dependencies { - classpath "com.github.jengelman.gradle.plugins:shadow:6.1.0" - } -} - -plugins { - id "java" -} - -sourceCompatibility = JavaVersion.VERSION_17 -targetCompatibility = JavaVersion.VERSION_17 -version = "0.0.1" - -repositories { - mavenCentral() -} - -apply plugin: "com.github.johnrengelman.shadow" - -dependencies { - implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4" - implementation "org.apache.commons:commons-lang3:3.12.0" - implementation "org.slf4j:slf4j-simple:2.0.0" - implementation "me.tongfei:progressbar:0.9.3" - implementation 'org.awaitility:awaitility:4.2.0' - - testImplementation "junit:junit:4.13.2" - testImplementation 'org.awaitility:awaitility:4.2.0' - testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4:tests" // for LongPollingMockConsumer -} - -test { - testLogging { - outputs.upToDateWhen { false 
} - showStandardStreams = true - exceptionFormat = "full" - } -} - -jar { - manifest { - attributes( - "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "), - ) - } -} - -shadowJar { - archiveBaseName = "confluent-parallel-consumer-application-standalone" -} diff --git a/confluent-parallel-consumer-application/kafka/code/docker-compose.yml b/confluent-parallel-consumer-application/kafka/code/docker-compose.yml deleted file mode 100644 index 809eefdf..00000000 --- a/confluent-parallel-consumer-application/kafka/code/docker-compose.yml +++ /dev/null @@ -1,24 +0,0 @@ -version: '2' -services: - broker: - image: confluentinc/cp-kafka:7.4.1 - hostname: broker - container_name: broker - ports: - - 29092:29092 - environment: - KAFKA_BROKER_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_PROCESS_ROLES: broker,controller - KAFKA_NODE_ID: 1 - KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093 - KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_LOG_DIRS: /tmp/kraft-combined-logs - CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk diff --git a/confluent-parallel-consumer-application/kafka/code/settings.gradle b/confluent-parallel-consumer-application/kafka/code/settings.gradle deleted file mode 100644 index 856202d1..00000000 --- a/confluent-parallel-consumer-application/kafka/code/settings.gradle +++ /dev/null @@ -1,10 +0,0 @@ -/* - * This file was generated by the Gradle 'init' task. - * - * The settings file is used to specify which projects to include in your build. 
- * - * Detailed information about configuring a multi-project build in Gradle can be found - * in the user manual at https://docs.gradle.org/6.7.1/userguide/multi_project_builds.html - */ - -rootProject.name = 'confluent-parallel-consumer-application' diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/build-project.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/build-project.sh deleted file mode 100644 index 81875953..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/build-project.sh +++ /dev/null @@ -1 +0,0 @@ -./gradlew build diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/build-uberjar.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/build-uberjar.sh deleted file mode 100644 index 12ffd144..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/build-uberjar.sh +++ /dev/null @@ -1 +0,0 @@ -./gradlew shadowJar diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/clean-up.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/clean-up.sh deleted file mode 100644 index 36f5aa98..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/clean-up.sh +++ /dev/null @@ -1 +0,0 @@ -docker compose down diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/console-consumer.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/console-consumer.sh deleted file mode 100644 index dd0fb0bd..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/console-consumer.sh +++ /dev/null @@ -1 +0,0 @@ -docker exec -it schema-registry /usr/bin/kafka-avro-console-consumer --topic output-topic --bootstrap-server broker:9092 --from-beginning diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/console-producer.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/console-producer.sh deleted file mode 100644 index b3e99bf0..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/console-producer.sh +++ /dev/null @@ -1 +0,0 @@ -kafka-console-producer --topic parallel-consumer-input-topic --bootstrap-server broker:9092 --property "parse.key=true" --property "key.separator=:" \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/create-topic.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/create-topic.sh deleted file mode 100644 index b95d5340..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/create-topic.sh +++ /dev/null @@ -1 +0,0 @@ -kafka-topics --create --topic parallel-consumer-input-topic --bootstrap-server broker:9092 --replication-factor 1 --partitions 1 \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/docker-compose-up.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/docker-compose-up.sh deleted file mode 100644 index e6fb3f19..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/docker-compose-up.sh +++ /dev/null @@ -1 +0,0 @@ -docker compose up -d diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/expected-output.txt b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/expected-output.txt deleted file mode 
100644 index a479dde9..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/expected-output.txt +++ /dev/null @@ -1,4 +0,0 @@ -All streams lead to Kafka -Go to Current -Go to Kafka Summit -Consume gently down the stream diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/gradle-wrapper.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/gradle-wrapper.sh deleted file mode 100644 index 28d02d7b..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/gradle-wrapper.sh +++ /dev/null @@ -1 +0,0 @@ -gradle wrapper diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/harness-console-producer.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/harness-console-producer.sh deleted file mode 100644 index 5cc20ac4..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/harness-console-producer.sh +++ /dev/null @@ -1 +0,0 @@ -docker exec -i broker kafka-console-producer --topic parallel-consumer-input-topic --bootstrap-server broker:9092 --property "parse.key=true" --property "key.separator=:" \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/harness-create-topic.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/harness-create-topic.sh deleted file mode 100644 index 71cbdeae..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/harness-create-topic.sh +++ /dev/null @@ -1 +0,0 @@ -docker exec -t broker kafka-topics --create --topic example-topic --bootstrap-server broker:9092 --replication-factor 1 --partitions 1 \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/init.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/init.sh deleted file mode 100644 index 31a1e1c4..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/init.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir confluent-parallel-consumer-application && cd confluent-parallel-consumer-application diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/input.txt b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/input.txt deleted file mode 100644 index 9fd9e112..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/input.txt +++ /dev/null @@ -1,4 +0,0 @@ -fun-line:All streams lead to Kafka -event-promo:Go to Current -event-promo:Go to Kafka Summit -fun-line:Consume gently down the stream diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/make-avro-dir.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/make-avro-dir.sh deleted file mode 100644 index 65bfafb4..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/make-avro-dir.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir -p src/main/avro diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/make-configuration-dir.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/make-configuration-dir.sh deleted file mode 100644 index 878943c6..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/make-configuration-dir.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir configuration diff --git 
a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/make-src-dir.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/make-src-dir.sh deleted file mode 100644 index 4caa4290..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/make-src-dir.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir -p src/main/java/io/confluent/developer diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/open-docker-shell.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/open-docker-shell.sh deleted file mode 100644 index 4b1bccf7..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/open-docker-shell.sh +++ /dev/null @@ -1 +0,0 @@ -docker exec -it broker bash \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/print-consumer-file-results.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/print-consumer-file-results.sh deleted file mode 100644 index 66841158..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/print-consumer-file-results.sh +++ /dev/null @@ -1 +0,0 @@ -cat topic-output.txt \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/run-dev-app.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/run-dev-app.sh deleted file mode 100644 index e27255c1..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/run-dev-app.sh +++ /dev/null @@ -1 +0,0 @@ -java -cp build/libs/confluent-parallel-consumer-application-standalone-0.0.1-all.jar io.confluent.developer.ParallelConsumerApplication configuration/dev.properties diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/wait-for-containers.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/wait-for-containers.sh deleted file mode 100755 index 9d7e648d..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/wait-for-containers.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -function readiness_probe { - nc -z -w 2 0.0.0.0 29092 -} - -echo "Waiting for the broker to become available ..." - -readiness_probe - -while [[ $? 
!= 0 ]]; do - sleep 5 - readiness_probe -done diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/ccloud-cat-config.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/ccloud-cat-config.sh deleted file mode 100644 index 59cbbca2..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/ccloud-cat-config.sh +++ /dev/null @@ -1,2 +0,0 @@ -cat configuration/ccloud.properties >> configuration/perftest-kafka-consumer.properties -cat configuration/ccloud.properties >> configuration/perftest-parallel-consumer.properties diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/console-producer.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/console-producer.sh deleted file mode 100644 index fd11946a..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/console-producer.sh +++ /dev/null @@ -1 +0,0 @@ -seq 1 10000 | kafka-console-producer --topic perftest-parallel-consumer-input-topic --bootstrap-server broker:9092 \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/create-perftest-topic.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/create-perftest-topic.sh deleted file mode 100644 index c8d1a0b6..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/create-perftest-topic.sh +++ /dev/null @@ -1 +0,0 @@ -kafka-topics --create --topic perftest-parallel-consumer-input-topic --bootstrap-server broker:9092 --replication-factor 1 --partitions 6 \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/kafka-cat-config.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/kafka-cat-config.sh deleted file mode 100644 index 8ecd5da5..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/kafka-cat-config.sh +++ /dev/null @@ -1,2 +0,0 @@ -cat configuration/dev.properties >> configuration/perftest-kafka-consumer.properties -cat configuration/dev.properties >> configuration/perftest-parallel-consumer.properties diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/run-kafka-consumer-perftest.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/run-kafka-consumer-perftest.sh deleted file mode 100644 index 6c4b725a..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/run-kafka-consumer-perftest.sh +++ /dev/null @@ -1 +0,0 @@ -java -cp build/libs/confluent-parallel-consumer-application-standalone-0.0.1-all.jar io.confluent.developer.MultithreadedKafkaConsumerPerfTest configuration/perftest-kafka-consumer.properties diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/run-parallel-consumer-perftest.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/run-parallel-consumer-perftest.sh deleted file mode 100644 index 3c6f03f7..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/run-parallel-consumer-perftest.sh +++ /dev/null @@ -1 +0,0 @@ -java -cp build/libs/confluent-parallel-consumer-application-standalone-0.0.1-all.jar io.confluent.developer.ParallelConsumerPerfTest configuration/perftest-parallel-consumer.properties diff --git 
a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/test/invoke-tests.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/test/invoke-tests.sh deleted file mode 100644 index d2b34833..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/test/invoke-tests.sh +++ /dev/null @@ -1 +0,0 @@ -./gradlew test diff --git a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/test/make-test-dir.sh b/confluent-parallel-consumer-application/kafka/code/tutorial-steps/test/make-test-dir.sh deleted file mode 100644 index 589222d8..00000000 --- a/confluent-parallel-consumer-application/kafka/code/tutorial-steps/test/make-test-dir.sh +++ /dev/null @@ -1 +0,0 @@ -mkdir -p src/test/java/io/confluent/developer diff --git a/confluent-parallel-consumer-application/kafka/gradle/libs.versions.toml b/confluent-parallel-consumer-application/kafka/gradle/libs.versions.toml new file mode 100644 index 00000000..4ac3234a --- /dev/null +++ b/confluent-parallel-consumer-application/kafka/gradle/libs.versions.toml @@ -0,0 +1,2 @@ +# This file was generated by the Gradle 'init' task. +# https://docs.gradle.org/current/userguide/platforms.html#sub::toml-dependencies-format diff --git a/confluent-parallel-consumer-application/kafka/code/gradle/wrapper/gradle-wrapper.properties b/confluent-parallel-consumer-application/kafka/gradle/wrapper/gradle-wrapper.properties similarity index 74% rename from confluent-parallel-consumer-application/kafka/code/gradle/wrapper/gradle-wrapper.properties rename to confluent-parallel-consumer-application/kafka/gradle/wrapper/gradle-wrapper.properties index aa991fce..1af9e093 100644 --- a/confluent-parallel-consumer-application/kafka/code/gradle/wrapper/gradle-wrapper.properties +++ b/confluent-parallel-consumer-application/kafka/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip +networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/confluent-parallel-consumer-application/kafka/code/gradlew b/confluent-parallel-consumer-application/kafka/gradlew similarity index 85% rename from confluent-parallel-consumer-application/kafka/code/gradlew rename to confluent-parallel-consumer-application/kafka/gradlew index 1b6c7873..1aa94a42 100755 --- a/confluent-parallel-consumer-application/kafka/code/gradlew +++ b/confluent-parallel-consumer-application/kafka/gradlew @@ -55,7 +55,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -80,13 +80,11 @@ do esac done -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit - -APP_NAME="Gradle" +# This is normally unused +# shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. 
-DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -133,22 +131,29 @@ location of your Java installation." fi else JAVACMD=java - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac case $MAX_FD in #( '' | soft) :;; #( *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -193,11 +198,15 @@ if "$cygwin" || "$msys" ; then done fi -# Collect all arguments for the java command; -# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of -# shell script including quotes and variable substitutions, so put them in -# double quotes to make sure that they get re-expanded; and -# * put everything else in single quotes, so that it's not re-expanded. + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ @@ -205,6 +214,12 @@ set -- \ org.gradle.wrapper.GradleWrapperMain \ "$@" +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + # Use "xargs" to parse quoted args. # # With -n1 it outputs one arg per line, with the quotes and backslashes removed. diff --git a/confluent-parallel-consumer-application/kafka/code/gradlew.bat b/confluent-parallel-consumer-application/kafka/gradlew.bat similarity index 89% rename from confluent-parallel-consumer-application/kafka/code/gradlew.bat rename to confluent-parallel-consumer-application/kafka/gradlew.bat index 107acd32..93e3f59f 100644 --- a/confluent-parallel-consumer-application/kafka/code/gradlew.bat +++ b/confluent-parallel-consumer-application/kafka/gradlew.bat @@ -14,7 +14,7 @@ @rem limitations under the License. @rem -@if "%DEBUG%" == "" @echo off +@if "%DEBUG%"=="" @echo off @rem ########################################################################## @rem @rem Gradle startup script for Windows @@ -25,7 +25,8 @@ if "%OS%"=="Windows_NT" setlocal set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. +if "%DIRNAME%"=="" set DIRNAME=. 
+@rem This is normally unused set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% @@ -40,7 +41,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto execute +if %ERRORLEVEL% equ 0 goto execute echo. echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. @@ -75,13 +76,15 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar :end @rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd +if %ERRORLEVEL% equ 0 goto mainEnd :fail rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% :mainEnd if "%OS%"=="Windows_NT" endlocal diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/build-uberjar.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/build-uberjar.adoc deleted file mode 100644 index 1664eac8..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/build-uberjar.adoc +++ /dev/null @@ -1,5 +0,0 @@ -In your terminal, run: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/build-uberjar.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/create-topic.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/create-topic.adoc deleted file mode 100644 index 400481d7..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/create-topic.adoc +++ /dev/null @@ -1,17 +0,0 @@ - -In this step we're going to create a topic for use during this tutorial. - -But first, you're going to open a shell on the broker docker container. - -Open a new terminal and window then run this command: -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/open-docker-shell.sh %}
-+++++ - -Now use the following command to create the topic: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/create-topic.sh %}
-+++++ - -Keep this terminal window open as you'll need to run a console-producer in a few steps. diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/explain-properties.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/explain-properties.adoc deleted file mode 100644 index c2da3154..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/explain-properties.adoc +++ /dev/null @@ -1,9 +0,0 @@ -Let's do a quick overview of some of the more important properties here: - -The `key.deserializer` and `value.deserializer` properties provide a class implementing the `Deserializer` interface for converting `byte` arrays into the expected object type of the key and value respectively. - -The `max.poll.interval.ms` is the maximum amount of time a consumer may take between calls to `Consumer.poll()`. If a consumer instance takes longer than the specified time, it's considered non-responsive and removed from the consumer-group triggering a rebalance. - -Setting `enable.auto.commit` configuration to `false` is required because the Confluent Parallel Consumer handles committing offsets in order to achieve fault tolerance. - -`auto.offset.reset` - If a consumer instance can't locate any offsets for its topic-partition assignment(s), it will resume processing from the _**earliest**_ available offset. diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/init.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/init.adoc deleted file mode 100644 index abb9e137..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/init.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Make a local directory anywhere you'd like for this project: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/init.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/make-build-file.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/make-build-file.adoc deleted file mode 100644 index 5bae1813..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/make-build-file.adoc +++ /dev/null @@ -1,9 +0,0 @@ -In order to build the project, first https://gradle.org/install/[install Gradle] 7.5 or later if you don't already have it. -Create the following Gradle build file, named `build.gradle` for the project. Note the `parallel-consumer-core` dependency, -which is available in Maven Central. This artifact includes the Confluent Parallel Consumer's core API. -There are also separate modules for using the Confluent Parallel Consumer with reactive API frameworks like Vert.x (`parallel-consumer-vertx`) -and Reactor (`parallel-consumer-reactor`). These modules are out of scope for this introductory tutorial. - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/build.gradle %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/make-config-dir.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/make-config-dir.adoc deleted file mode 100644 index ab3fcbc5..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/make-config-dir.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Next, create a directory for configuration data: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/make-configuration-dir.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/make-config-file.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/make-config-file.adoc deleted file mode 100644 index 965cb899..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/make-config-file.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Then create a development configuration file at `configuration/dev.properties`: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/configuration/dev.properties %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/make-consumer-app.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/make-consumer-app.adoc deleted file mode 100644 index d174a3cb..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/make-consumer-app.adoc +++ /dev/null @@ -1,79 +0,0 @@ - -To complete this introductory application, you'll build a main application class and a couple of supporting classes. - - -First, you'll create the main application,`ParallelConsumerApplication`, which is the focal point of this tutorial; consuming records from a Kafka topic using the Confluent Parallel Consumer. - -Go ahead and copy the following into a file `src/main/java/io/confluent/developer/ParallelConsumerApplication.java`: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/ParallelConsumerApplication.java %}
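For orientation, here is a minimal, self-contained sketch of the same flow end to end: load the properties file, wrap a `KafkaConsumer` in a `ParallelStreamProcessor`, subscribe, and hand `poll` a per-record callback. It is illustrative only and is not the class included above (which splits the work across a record handler and a couple of helper classes); the only configuration key assumed here is `input.topic.name` from `configuration/dev.properties`. The detailed walkthrough of the real class follows.

[source, java]
----
import java.io.FileInputStream;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;

public class ParallelConsumerSketch {

  public static void main(String[] args) throws Exception {
    // Load consumer and application properties, e.g. configuration/dev.properties.
    final Properties props = new Properties();
    try (FileInputStream input = new FileInputStream(args[0])) {
      props.load(input);
    }

    final ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
        .ordering(KEY)                      // order by key rather than by partition
        .maxConcurrency(16)                 // worker threads, independent of the partition count
        .commitMode(PERIODIC_CONSUMER_SYNC)
        .consumer(new KafkaConsumer<>(props))
        .build();

    final ParallelStreamProcessor<String, String> processor =
        ParallelStreamProcessor.createEosStreamProcessor(options);

    processor.subscribe(Collections.singletonList(props.getProperty("input.topic.name")));

    // poll is called once; the library keeps invoking this handler for every record it fetches.
    processor.poll(context -> System.out.println(context.getSingleConsumerRecord().value()));
    // The processor's threads keep running until the application is stopped (for example with Ctrl-C).
  }
}
----

The `<String, String>` generics match the `StringDeserializer` settings in the development configuration file.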
-+++++ - -Let's go over some of the key parts of the `ParallelConsumerApplication` starting with the constructor: - -[source, java] -.ParallelConsumerApplication constructor ----- - public ParallelConsumerApplication(final ParallelStreamProcessor parallelConsumer, - final ConsumerRecordHandler recordHandler) { - this.parallelConsumer = parallelConsumer; - this.recordHandler = recordHandler; - } ----- - -Her we supply instances of the Confluent Parallel Consumer's `ParallelStreamProcessor` and the application's `ConsumerRecordHandler` via constructor parameters. - -The abstract `ConsumerRecordHandler` class makes it simple to change `ConsumerRecord` handling without having to change much code. - -In this tutorial you'll inject the dependencies in the `ParallelConsumerApplication.main()` method, but in practice you may want to use a dependency injection framework library, such as the https://spring.io/projects/spring-framework[Spring Framework]. - - -Next, let's review the `ParallelConsumerApplication.runConsume()` method, which provides the core functionality of this tutorial. - -[source, java] -.ParallelConsumerApplication.runConsume ----- - public void runConsume(final Properties appProperties) { - String topic = appProperties.getProperty("input.topic.name"); - - LOGGER.info("Subscribing Parallel Consumer to consume from {} topic", topic); - parallelConsumer.subscribe(Collections.singletonList(topic)); // <1> - - LOGGER.info("Polling for records. This method blocks", topic); - parallelConsumer.poll(context -> recordHandler.processRecord(context.getSingleConsumerRecord())); // <2> - } ----- - -<1> Subscribing to the Kafka topic. -<2> Simply `poll` once. With the Confluent Parallel Consumer, you call `poll` only once and it will poll indefinitely, -calling the lambda that you supply for each message. The library handles everything for you subject to how you configure -the Parallel Consumer. - -Speaking of configuration, this snippet instantiates the `ParallelStreamProcessor` that our application's -constructor expects: - -[source, java] ----- - final Consumer consumer = new KafkaConsumer<>(appProperties); // <1> - final ParallelConsumerOptions options = ParallelConsumerOptions.builder() // <2> - .ordering(KEY) // <3> - .maxConcurrency(16) // <4> - .consumer(consumer) // <5> - .commitMode(PERIODIC_CONSUMER_SYNC) // <6> - .build(); - ParallelStreamProcessor eosStreamProcessor = createEosStreamProcessor(options); // <7> ----- - -<1> Create the Apache Kafka Consumer that the Confluent Parallel Consumer wraps. -<2> Create the Parallel Consumer configuration via builder pattern. -<3> Specify consumption ordering by key. -<4> Specify the degree of parallelism. Here we specify 16 threads for illustrative purposes only (the application only consumes 3 records). -<5> The Apache Kafka Consumer that we are wrapping. -<6> Here we specify how to commit offsets. `PERIODIC_CONSUMER_SYNC` will block the Parallel Consumer's processing loop until a successful commit response is received. Asynchronous is also supported, which optimizes for -consumption throughput (the downside being higher risk of needing to process duplicate messages in error recovery scenarios). -<7> Create a `ParallelStreamProcessor` with the previously created configuration. This is the object we use to consume in lieu of a `KafkaConsumer`. - -Note that, by ordering by key, we can consume with a much higher degree of parallelism than we can with a vanilla consumer group (i.e., the number of topic partitions). 
-While a given input topic may not have many partitions, it may have a large number of unique keys. Each of these key → message sets can actually be processed concurrently. In other -words, regardless of the number of input partitions, the effective concurrency limit achievable with the Confluent Parallel Consumer is the number of unique keys across all messages in a topic. \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/make-docker-compose.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/make-docker-compose.adoc deleted file mode 100644 index 9dd2951a..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/make-docker-compose.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Next, create the following `docker-compose.yml` file to obtain Confluent Platform (for Kafka in the cloud, see https://www.confluent.io/confluent-cloud/tryfree/[Confluent Cloud]): - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/docker-compose.yml %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/make-gradle-wrapper.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/make-gradle-wrapper.adoc deleted file mode 100644 index 26afc9e5..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/make-gradle-wrapper.adoc +++ /dev/null @@ -1,5 +0,0 @@ -And be sure to run the following command to obtain the Gradle wrapper, which we will use to execute the build. The Gradle wrapper is a best practice ancillary build script that enables developers to more easily collaborate on Gradle projects by ensuring that developers all use the same correct Gradle version for the project (downloading Gradle at build time if necessary). - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/gradle-wrapper.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/make-src-dir.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/make-src-dir.adoc deleted file mode 100644 index dc0a105a..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/make-src-dir.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Create a directory for the Java files in this project: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/make-src-dir.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/make-supporting-classes.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/make-supporting-classes.adoc deleted file mode 100644 index 1f4d5605..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/make-supporting-classes.adoc +++ /dev/null @@ -1,45 +0,0 @@ -To complete this tutorial, you'll need to also create an abstract class that we will extend to process messages as we consume them. This -abstract class, `ConsumerRecordHandler`, encapsulates tracking the number of records processed, which will be useful later on when we run -performance tests and want to terminate the test application after consuming an expected number of records. - -First create the abstract class at `src/main/java/io/confluent/developer/ConsumerRecordHandler.java` - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/ConsumerRecordHandler.java %}
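The included file is the class the tutorial actually uses. As a rough sketch of the pattern it describes, count records in the base class and leave the per-record work to subclasses, it boils down to something like this (class and method names here are illustrative, not the tutorial's exact signatures):

[source, java]
----
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch of the base-class pattern: count every record processed and delegate
// the actual per-record work to a subclass.
public abstract class CountingRecordHandlerSketch<K, V> {

  private final AtomicInteger recordsProcessed = new AtomicInteger(0);

  // Subclasses define what "processing" a single record means.
  protected abstract void processRecordImpl(ConsumerRecord<K, V> consumerRecord);

  public final void processRecord(final ConsumerRecord<K, V> consumerRecord) {
    processRecordImpl(consumerRecord);
    recordsProcessed.incrementAndGet();
  }

  // The performance tests can check this counter to decide when to stop consuming.
  public int getNumberOfRecordsProcessed() {
    return recordsProcessed.get();
  }
}
----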
-+++++ - -Using this abstract class will make it easier to change how you want to work with a `ConsumerRecord` without having to modify all of your existing code. - -Next you'll extend the `ConsumerRecordHandler` abstract class with a concrete class named `FileWritingRecordHandler`. Copy the following into file `src/main/java/io/confluent/developer/FileWritingRecordHandler.java`: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/FileWritingRecordHandler.java %}
-+++++ - -Let's take a peek under the hood at this class's `processRecordImpl` method, which gets called for each record consumed: - -[source, java] -.FileWritingRecordHandler.processRecordImpl ----- - @Override - protected void processRecordImpl(final ConsumerRecord consumerRecord) { - try { - Files.write(path, singletonList(consumerRecord.value()), CREATE, WRITE, APPEND); // <1> - } catch (IOException e) { - throw new RuntimeException("unable to write record to file", e); - } - } ----- -<1> Simply write the record value to a file. - -In practice, you're certain to perform a more realistic task for each record. - -Finally, create a utility class `PropertiesUtil` that we use in our consumer application to load Kafka Consumer and -application-specific properties. We'll also use this class in the two performance testing applications that we will create -later in this tutorial. - -Go ahead and create the `src/main/java/io/confluent/developer/PropertiesUtil.java` file: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/PropertiesUtil.java %}
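The included file is the tutorial's helper. A minimal sketch of what such a properties loader amounts to (the class and method names below are placeholders, not the tutorial's exact signatures) looks like this:

[source, java]
----
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

// Sketch of a properties-loading helper: read a .properties file from disk into a
// java.util.Properties instance that can be handed to both the KafkaConsumer and the application.
public final class PropertiesLoaderSketch {

  private PropertiesLoaderSketch() {
  }

  public static Properties loadProperties(final String fileName) throws IOException {
    final Properties properties = new Properties();
    try (FileInputStream input = new FileInputStream(fileName)) {
      properties.load(input);
    }
    return properties;
  }
}
----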
-+++++ \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/print-consumer-file-results.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/print-consumer-file-results.adoc deleted file mode 100644 index 7c6be1ff..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/print-consumer-file-results.adoc +++ /dev/null @@ -1,19 +0,0 @@ -Your Confluent Parallel Consumer application should have consumed all the records sent and written them out to a file. - -In a new terminal, run this command to print the results to the console: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/print-consumer-file-results.sh %}
-+++++ - -You should see something like this: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/expected-output.txt %}
-+++++ - -Note that because we configured the Confluent Parallel Consumer to use `KEY` ordering, `Go to Current` appears before `Go to Kafka Summit` -because these values have the same `event-promo` key. Similarly, `All streams lead to Kafka` appears before `Consume gently down the stream` -because these values have the same `fun-line` key. - -At this point you can stop the Confluent Parallel Consumer application with `Ctrl-C` in the terminal window where it's running. diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/run-dev-app.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/run-dev-app.adoc deleted file mode 100644 index 96d14a14..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/run-dev-app.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Now that you have an uberjar for the `ParallelConsumerApplication`, you can launch it locally. When you run the following, the prompt won't return, because the application will run until you exit it. There is always another message to process, so streaming applications don't exit until you force them. - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/run-dev-app.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/run-producer.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/run-producer.adoc deleted file mode 100644 index b4dab05b..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/run-producer.adoc +++ /dev/null @@ -1,22 +0,0 @@ -//// - Example content file for how to include a console producer(s) in the tutorial. - Usually you'll include a line referencing the script to run the console producer and also include some content - describing how to input data as shown below. - - Again modify this file as you need for your tutorial, as this is just sample content. You also may have more than one - console producer to run depending on how you structure your tutorial - -//// - -Using the terminal window you opened in step three, run the following command to start a console-producer: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/console-producer.sh %}
-+++++ - - -Each line represents input data for the Confluent Parallel Consumer application. To send all of the events below, paste the following into the prompt and press enter: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/input.txt %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/dev/start-compose.adoc b/confluent-parallel-consumer-application/kafka/markup/dev/start-compose.adoc deleted file mode 100644 index 0d9b9a3e..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/dev/start-compose.adoc +++ /dev/null @@ -1,5 +0,0 @@ -And launch it by running: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/docker-compose-up.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/perftest/append-ccloud-config.adoc b/confluent-parallel-consumer-application/kafka/markup/perftest/append-ccloud-config.adoc deleted file mode 100644 index f51e63ca..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/perftest/append-ccloud-config.adoc +++ /dev/null @@ -1,7 +0,0 @@ -Using the command below, append the contents of `configuration/ccloud.properties` (with your Confluent Cloud configuration) -to `configuration/perftest-kafka-consumer.properties` and `configuration/perftest-parallel-consumer.properties`: - - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/ccloud-cat-config.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/perftest/append-kafka-config.adoc b/confluent-parallel-consumer-application/kafka/markup/perftest/append-kafka-config.adoc deleted file mode 100644 index 732138e0..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/perftest/append-kafka-config.adoc +++ /dev/null @@ -1,7 +0,0 @@ -Using the command below, append the contents of `configuration/dev.properties` -to `configuration/perftest-kafka-consumer.properties` and `configuration/perftest-parallel-consumer.properties`: - - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/kafka-cat-config.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/perftest/build-uberjar.adoc b/confluent-parallel-consumer-application/kafka/markup/perftest/build-uberjar.adoc deleted file mode 100644 index 40313061..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/perftest/build-uberjar.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Let's rebuild the uberjar to include this performance test. In your terminal, run: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/build-uberjar.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/perftest/create-perftest-topic.adoc b/confluent-parallel-consumer-application/kafka/markup/perftest/create-perftest-topic.adoc deleted file mode 100644 index 0bc8090e..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/perftest/create-perftest-topic.adoc +++ /dev/null @@ -1,17 +0,0 @@ - -In this step we're going to create a topic for use during this tutorial. - -But first, you're going to open a shell on the broker docker container. - -Open a new terminal and window then run this command: -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/dev/open-docker-shell.sh %}
-+++++ - -Now use the following command to create the topic: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/create-perftest-topic.sh %}
-+++++ - -Keep this terminal window open as you'll need to run a console-producer in a few steps. diff --git a/confluent-parallel-consumer-application/kafka/markup/perftest/make-config-file.adoc b/confluent-parallel-consumer-application/kafka/markup/perftest/make-config-file.adoc deleted file mode 100644 index 55bac717..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/perftest/make-config-file.adoc +++ /dev/null @@ -1,25 +0,0 @@ -Then create two performance test configuration files. The first is for performance testing a multi-threaded `KafkaConsumer`-based -performance test that we'll use to set a baseline. Create this file at `configuration/perftest-kafka-consumer.properties`: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/configuration/perftest-kafka-consumer.properties %}
-+++++ - -Then create this file at `configuration/perftest-parallel-consumer.properties`: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/configuration/perftest-parallel-consumer.properties %}
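As a hedged sketch of how a throughput-oriented configuration like this one typically maps onto the Parallel Consumer's options builder: `parallel.consumer.max.concurrency` is a property name used in the file above, while the rest of the mapping is illustrative rather than the perf test's exact code (the actual test classes are created below).

[source, java]
----
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.UNORDERED;

public class PerfTestOptionsSketch {

  public static ParallelStreamProcessor<String, String> buildProcessor(final Properties props) {
    final int maxConcurrency =
        Integer.parseInt(props.getProperty("parallel.consumer.max.concurrency", "256"));

    final ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
        .ordering(UNORDERED)                        // no ordering constraint: maximize throughput
        .maxConcurrency(maxConcurrency)             // many more workers than the topic has partitions
        .commitMode(PERIODIC_CONSUMER_ASYNCHRONOUS) // don't block processing while offsets are committed
        .consumer(new KafkaConsumer<>(props))
        .build();

    return ParallelStreamProcessor.createEosStreamProcessor(options);
  }
}
----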
-+++++ - -Let's look at some of the more important properties in these configuration files: - -. We specify `fetch.min.bytes` to be 100000 in order to https://docs.confluent.io/cloud/current/client-apps/optimizing/throughput.html#consumer-fetching[optimize for consumer throughput] -. The application-specific property `records.to.consume` is set to `10000` to match the number of records that we produced in the previous step. This will cause the application to terminate upon consuming this many records. -. The application-specific property `record.handler.sleep.ms` is used to simulate a nontrivial amount of work to perform per record. In this case, we sleep for 20ms to simulate a low-but-nontrivial latency operation like a call to a database or REST API. - -In the configuration file for the Confluent Parallel Consumer performance test, there are a few Confluent Parallel Consumer-specific properties. - -. `parallel.consumer.max.concurrency` is set to `256`, much higher than the number of partitions in our topic -. We use `UNORDERED` ordering, `PERIODIC_CONSUMER_ASYNCHRONOUS` offset commit mode, and a high `parallel.consumer.seconds.between.commits` value of 60 seconds. - Together, these values optimize for throughput. This keeps our test analogous to the `KafkaConsumer`-based baseline. You may have noticed that, - because we are aiming to maximize throughput in these performance tests while ignoring the overhead of offsets handling, the baseline doesn't even commit offsets! \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/markup/perftest/make-kafka-consumer-perftest.adoc b/confluent-parallel-consumer-application/kafka/markup/perftest/make-kafka-consumer-perftest.adoc deleted file mode 100644 index 75e2e43e..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/perftest/make-kafka-consumer-perftest.adoc +++ /dev/null @@ -1,20 +0,0 @@ -Here you'll build a performance test application and supporting classes that implement -multi-threaded consuming (one `KafkaConsumer` per-partition to maximize parallelism). - -First, you'll create the main performance test application, `src/main/java/io/confluent/developer/MultithreadedKafkaConsumerPerfTest.java`: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/MultithreadedKafkaConsumerPerfTest.java %}
-+++++ - -Second, create the class that implements multi-threaded consuming, `src/main/java/io/confluent/developer/MultithreadedKafkaConsumer.java`: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/MultithreadedKafkaConsumer.java %}
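The included class does the real work; as a rough, self-contained sketch of the same pattern (the class name, topic handling, and progress tracking here are simplified assumptions rather than the tutorial's actual code), one `KafkaConsumer` per partition with explicit assignment looks roughly like this:

[source,java]
.PerPartitionConsumerSketch.java (illustrative only)
----
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class PerPartitionConsumerSketch {

  public static void consumeAll(Properties consumerProps, String topic,
                                int numPartitions, long expectedRecords) {
    ExecutorService pool = Executors.newFixedThreadPool(numPartitions);
    AtomicLong remaining = new AtomicLong(expectedRecords);

    for (int partition = 0; partition < numPartitions; partition++) {
      final int p = partition;
      pool.submit(() -> {
        // One KafkaConsumer per partition, explicitly assigned rather than subscribed,
        // so consumption is as parallel as the topic allows.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
          consumer.assign(Collections.singletonList(new TopicPartition(topic, p)));
          while (remaining.get() > 0) {
            // A short poll timeout lets consumers that finish first report promptly
            // without busy-waiting too aggressively.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
              handle(record);                  // stand-in for the 20 ms record handler
              remaining.decrementAndGet();
            }
          }
        }
      });
    }
    pool.shutdown();
  }

  private static void handle(ConsumerRecord<String, String> record) {
    try {
      Thread.sleep(20); // simulate a low-but-nontrivial latency operation per record
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
----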
-+++++ - -Finally, create the record handler that sleeps 20ms per record consumed, `src/main/java/io/confluent/developer/SleepingRecordHandler.java`: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/SleepingRecordHandler.java %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/perftest/make-parallel-consumer-perftest.adoc b/confluent-parallel-consumer-application/kafka/markup/perftest/make-parallel-consumer-perftest.adoc deleted file mode 100644 index c2faf9f3..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/perftest/make-parallel-consumer-perftest.adoc +++ /dev/null @@ -1,27 +0,0 @@ -Here you'll build a performance test application based on the Confluent Parallel Consumer. This test reuses a couple of classes -that we created previously: `PropertiesUtil` for loading consumer and application-specific properties, and `SleepingRecordHandler` -for simulating a nontrivial workload per-record just as we did in `MultithreadedKafkaConsumerPerfTest`. Please rewind -and create these if you skipped the parts of the tutorial that create these two classes. - -Because the Confluent Parallel Consumer API is much lighter weight than the lift required to multi-thread `KafkaConsumer` instances -per partition, let's knock out the entire thing in one class. Create the file `src/main/java/io/confluent/developer/ParallelConsumerPerfTest.java`: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/ParallelConsumerPerfTest.java %}
-+++++ - -Take a look at the code and note the simplicity. Most of the code is for properties file handling and tracking progress. The interesting part relevant to the Confluent Parallel Consumer -is in the four-line `runConsume()` method: - -[source, java] -.ParallelConsumerPerfTest.runConsume ----- - private void runConsume(final Properties appProperties) { - parallelConsumer.subscribe(Collections.singletonList(appProperties.getProperty("input.topic.name"))); - parallelConsumer.poll(context -> { - recordHandler.processRecord(context.getSingleConsumerRecord()); - }); - } ----- - -Bellissimo! diff --git a/confluent-parallel-consumer-application/kafka/markup/perftest/perf-test-extensions.adoc b/confluent-parallel-consumer-application/kafka/markup/perftest/perf-test-extensions.adoc deleted file mode 100644 index 690a1c80..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/perftest/perf-test-extensions.adoc +++ /dev/null @@ -1,19 +0,0 @@ -In this section of the tutorial, we created a performance test for the Confluent Parallel Consumer, and a `KafkaConsumer` baseline to compare against. - -This gave us a couple of data points, but only for one specific test context: each test aimed to consume records as quickly as possible in a single JVM while simulating a 20ms workload per-record. - -We can turn a few knobs and pull some levers to gather more performance test results in other application contexts. Since we used helper classes and parameterized configuration in this tutorial, you can easily choose other performance test adventures. -Some questions you might explore: - -. How does performance compare if we increase or decrease the simulated workload time? -. What if we commit offsets more frequently or even synchronously or transactionally in each test? - In the case of the Confluent Parallel Consumer, this entails setting `parallel.consumer.seconds.between.commits` to a value lower than 60 seconds, - and using a `parallel.consumer.commit.mode` of `PERIODIC_CONSUMER_SYNC` or `PERIODIC_TRANSACTIONAL_PRODUCER`. - These commit modes better simulate an application designed to more easily pick up where it left off when recovering from an error. -. What if we change the properties of the `KafkaConsumer` instance(s) most relevant to throughput (`fetch.min.bytes` and `max.poll.records`)? -. What if we use `KEY` or `PARTITION` ordering when configuring the Confluent Parallel Consumer (as opposed to `UNORDERED`)? -. How does the throughput comparison change if we create `perftest-parallel-consumer-input-topic` with more (or fewer) partitions? -. What if we use larger, more realistic records and not just integers from 1 to 10,000? What if we also play with different - key spaces? - -Have fun with it! \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/markup/perftest/run-kafka-consumer-perftest.adoc b/confluent-parallel-consumer-application/kafka/markup/perftest/run-kafka-consumer-perftest.adoc deleted file mode 100644 index 10d25c56..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/perftest/run-kafka-consumer-perftest.adoc +++ /dev/null @@ -1,25 +0,0 @@ -Now that you have an uberjar containing `MultithreadedKafkaConsumerPerfTest`, you can launch it locally. -This will run until the expected 10,000 records have been consumed. Ensure that the `seq` command that you ran previously to -produce 10,000 records has completed before running this so that we can accurately test consumption throughput. - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/run-kafka-consumer-perftest.sh %}
-+++++ - -While the performance test runs, take a few sips of the beverage that you previously poured. It will take a minute or -two to complete, and the final line output will show you the latency for consuming all 10,000 records, e.g.: - -+++++ -
[main] INFO io.confluent.developer.MultithreadedKafkaConsumer - Total time to consume 10000 records: 40.46 seconds
-+++++ - -Before we build and run a Confluent Parallel Consumer analogue to this `KafkaConsumer` baseline, let's summarize what we've seen so far: - -. We populated a topic with default properties and produced 10,000 small records to it -. We maxed out the size of our consumer group by running a `KafkaConsumer` per partition, with each instance explicitly assigned to one partition -. We optimized each `KafkaConsumer` for throughput by setting high values for `max.poll.records` and `fetch.min.bytes` -. We struck a balance between latency accuracy and the instrumentation overhead needed to track progress and - end when expected by using a 0.5-second `poll` timeout. (We want to report consumption latency shortly after consumption finishes, - but we also want to minimize busy waiting of the `KafkaConsumer` instances that finish first.) -. We scratched our heads writing some tricky multi-threaded code. By the way, is any multi-threaded code not tricky? -. The reported performance test latency was *40.46 seconds* in our case (your number is surely different). \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/markup/perftest/run-parallel-consumer-perftest.adoc b/confluent-parallel-consumer-application/kafka/markup/perftest/run-parallel-consumer-perftest.adoc deleted file mode 100644 index 48ea8076..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/perftest/run-parallel-consumer-perftest.adoc +++ /dev/null @@ -1,18 +0,0 @@ -Now that you have an uberjar containing `ParallelConsumerPerfTest`, you can launch it locally. -This will run until the expected 10,000 records have been consumed. Ensure that the `seq` command that you ran previously to -produce 10,000 records has completed before running this so that we can accurately test consumption throughput. - -As you kick this off, bear in mind the latency that you recorded when you ran `MultithreadedKafkaConsumerPerfTest` (40.46 seconds in the run performed for the tutorial). - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/run-parallel-consumer-perftest.sh %}
-+++++ - -While the performance test runs, take a few sips of the beverage... actually never mind. It's done: - -+++++ -
[main] INFO io.confluent.developer.ParallelConsumerPerfTest - Time to consume 10000 records: 1.78 seconds
-+++++ - -Your latency will surely be different from the `1.78 seconds` shown here. But, assuming you are running the test on reasonable hardware and you aren't running any -extremely noisy neighbors on your machine, it should be just a few seconds. diff --git a/confluent-parallel-consumer-application/kafka/markup/perftest/run-producer.adoc b/confluent-parallel-consumer-application/kafka/markup/perftest/run-producer.adoc deleted file mode 100644 index acd5dbe2..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/perftest/run-producer.adoc +++ /dev/null @@ -1,8 +0,0 @@ -Using the terminal window you opened in step three, run the following command to write 10,000 small dummy records to the input topic: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/perftest/console-producer.sh %}
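The included command uses `seq` and the console producer to write the 10,000 small records. Purely for illustration (this is not a tutorial step), producing the same integer-valued records from Java might look like the following; the broker address is an assumption, and the topic name is the one referenced elsewhere in this tutorial.

[source,java]
.DummyRecordProducerSketch.java (illustrative only)
----
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class DummyRecordProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");              // assumption
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // 10,000 small records whose values are just the integers 1..10000,
      // in the spirit of the seq-based console-producer step.
      for (int i = 1; i <= 10_000; i++) {
        producer.send(new ProducerRecord<>("perftest-parallel-consumer-input-topic", Integer.toString(i)));
      }
      producer.flush();
    }
  }
}
----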
-+++++ - -Let's kick off this command and let it run. It'll take a few minutes to produce all 10,000 records. -In the meantime, let's continue with the tutorial. \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/markup/test/invoke-tests.adoc b/confluent-parallel-consumer-application/kafka/markup/test/invoke-tests.adoc deleted file mode 100644 index ddeca7f8..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/test/invoke-tests.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Now run the test, which is as simple as: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/test/invoke-tests.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/test/make-consumer-application-test.adoc b/confluent-parallel-consumer-application/kafka/markup/test/make-consumer-application-test.adoc deleted file mode 100644 index 5742aa76..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/test/make-consumer-application-test.adoc +++ /dev/null @@ -1,12 +0,0 @@ - - -Testing a Confluent Parallel Consumer application is not too complicated thanks to the https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/test/java/io/confluent/csid/utils/LongPollingMockConsumer.java[LongPollingMockConsumer] that is based on Apache Kafka's https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/org/apache/kafka/clients/consumer/MockConsumer.html[MockConsumer]. Since the Confluent Parallel Consumer's https://github.com/confluentinc/parallel-consumer[codebase] is well tested, we don't need to use a _live_ consumer and Kafka broker to test our application. We can simply use a mock consumer to process some data that you'll feed into it. - - -There is only one method in `ParallelConsumerApplicationTest` annotated with `@Test`, and that is `consumerTest()`. This method actually runs your `ParallelConsumerApplication` with the mock consumer. - - -Now create the following file at `src/test/java/io/confluent/developer/ParallelConsumerApplicationTest.java`: -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/src/test/java/io/confluent/developer/ParallelConsumerApplicationTest.java %}
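As a general illustration of the mock-consumer approach (the class below is hypothetical and much simpler than the tutorial's actual test, which builds on `LongPollingMockConsumer` and drives the real `ParallelConsumerApplication`), feeding a record into Kafka's `MockConsumer` usually follows this shape:

[source,java]
.MockConsumerFeedSketch.java (illustrative only)
----
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;

public class MockConsumerFeedSketch {

  /**
   * Returns a MockConsumer primed to hand back one record on its first poll:
   * the scheduled poll task "rebalances" onto the partition and adds the record,
   * so no live broker is needed.
   */
  public static MockConsumer<String, String> primedConsumer(String topic) {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition partition = new TopicPartition(topic, 0);

    consumer.schedulePollTask(() -> {
      consumer.rebalance(Collections.singletonList(partition));
      consumer.addRecord(new ConsumerRecord<>(topic, 0, 0L, "key", "expected value"));
    });
    consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
    return consumer;
  }
}
----

A test can then hand such a consumer to the application under test and assert on the observable side effect (here, the file written by the record handler), waiting for it with a timeout rather than sleeping.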
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/test/make-consumer-record-handler-test.adoc b/confluent-parallel-consumer-application/kafka/markup/test/make-consumer-record-handler-test.adoc deleted file mode 100644 index 04c22219..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/test/make-consumer-record-handler-test.adoc +++ /dev/null @@ -1,8 +0,0 @@ - -Now let's build a test for the `ConsumerRecordHandler` implementation used in your application. Even though we have a test for the `ParallelConsumerApplication`, it's -important that you can test this helper class in isolation. - -Create the following file at `src/test/java/io/confluent/developer/FileWritingRecordHandlerTest.java`: -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/src/test/java/io/confluent/developer/FileWritingRecordHandlerTest.java %}
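A minimal sketch of such an isolation test, assuming the handler exposes a `processRecord` method and is constructed with the path it writes to (the real constructor and output format may differ), could look like this:

[source,java]
.RecordHandlerIsolationSketchTest.java (illustrative only)
----
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;

import java.nio.file.Files;
import java.nio.file.Path;

import static org.junit.jupiter.api.Assertions.assertTrue;

class RecordHandlerIsolationSketchTest {

  @Test
  void handlerWritesRecordValueToFile() throws Exception {
    Path out = Files.createTempFile("handler-test", ".out");

    // Assumed constructor taking the target path -- check the actual class for its real API.
    FileWritingRecordHandler handler = new FileWritingRecordHandler(out);

    handler.processRecord(new ConsumerRecord<>("input-topic", 0, 0L, "key", "some value"));

    assertTrue(Files.readString(out).contains("some value"));
  }
}
----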
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/test/make-test-dir.adoc b/confluent-parallel-consumer-application/kafka/markup/test/make-test-dir.adoc deleted file mode 100644 index 65c58d30..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/test/make-test-dir.adoc +++ /dev/null @@ -1,5 +0,0 @@ -Create a directory for the tests to live in: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/tutorial-steps/test/make-test-dir.sh %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/markup/test/make-test-file.adoc b/confluent-parallel-consumer-application/kafka/markup/test/make-test-file.adoc deleted file mode 100644 index ee814ebe..00000000 --- a/confluent-parallel-consumer-application/kafka/markup/test/make-test-file.adoc +++ /dev/null @@ -1,5 +0,0 @@ -First, create a test file at `configuration/test.properties`: - -+++++ -
{% include_raw tutorials/confluent-parallel-consumer-application/kafka/code/configuration/test.properties %}
-+++++ diff --git a/confluent-parallel-consumer-application/kafka/settings.gradle b/confluent-parallel-consumer-application/kafka/settings.gradle new file mode 100644 index 00000000..f118aff1 --- /dev/null +++ b/confluent-parallel-consumer-application/kafka/settings.gradle @@ -0,0 +1,11 @@ +/* + * This file was generated by the Gradle 'init' task. + * + * The settings file is used to specify which projects to include in your build. + * For more detailed information on multi-project builds, please refer to https://docs.gradle.org/8.5/userguide/building_swift_projects.html in the Gradle documentation. + * This project uses @Incubating APIs which are subject to change. + */ + +rootProject.name = 'parallel-consumer' +include ':common' +project(':common').projectDir = file('../../common') \ No newline at end of file diff --git a/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/ConsumerRecordHandler.java b/confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/ConsumerRecordHandler.java similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/ConsumerRecordHandler.java rename to confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/ConsumerRecordHandler.java diff --git a/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/FileWritingRecordHandler.java b/confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/FileWritingRecordHandler.java similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/FileWritingRecordHandler.java rename to confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/FileWritingRecordHandler.java diff --git a/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/MultithreadedKafkaConsumer.java b/confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/MultithreadedKafkaConsumer.java similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/MultithreadedKafkaConsumer.java rename to confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/MultithreadedKafkaConsumer.java diff --git a/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/MultithreadedKafkaConsumerPerfTest.java b/confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/MultithreadedKafkaConsumerPerfTest.java similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/MultithreadedKafkaConsumerPerfTest.java rename to confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/MultithreadedKafkaConsumerPerfTest.java diff --git a/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/ParallelConsumerApplication.java b/confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/ParallelConsumerApplication.java similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/ParallelConsumerApplication.java rename to confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/ParallelConsumerApplication.java diff --git 
a/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/ParallelConsumerPerfTest.java b/confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/ParallelConsumerPerfTest.java similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/ParallelConsumerPerfTest.java rename to confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/ParallelConsumerPerfTest.java diff --git a/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/PropertiesUtil.java b/confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/PropertiesUtil.java similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/PropertiesUtil.java rename to confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/PropertiesUtil.java diff --git a/confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/SleepingRecordHandler.java b/confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/SleepingRecordHandler.java similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/src/main/java/io/confluent/developer/SleepingRecordHandler.java rename to confluent-parallel-consumer-application/kafka/src/main/java/io/confluent/developer/SleepingRecordHandler.java diff --git a/confluent-parallel-consumer-application/kafka/code/configuration/dev.properties b/confluent-parallel-consumer-application/kafka/src/main/resources/dev.properties similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/configuration/dev.properties rename to confluent-parallel-consumer-application/kafka/src/main/resources/dev.properties diff --git a/confluent-parallel-consumer-application/kafka/code/configuration/perftest-kafka-consumer.properties b/confluent-parallel-consumer-application/kafka/src/main/resources/perftest-kafka-consumer.properties similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/configuration/perftest-kafka-consumer.properties rename to confluent-parallel-consumer-application/kafka/src/main/resources/perftest-kafka-consumer.properties diff --git a/confluent-parallel-consumer-application/kafka/code/configuration/perftest-parallel-consumer.properties b/confluent-parallel-consumer-application/kafka/src/main/resources/perftest-parallel-consumer.properties similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/configuration/perftest-parallel-consumer.properties rename to confluent-parallel-consumer-application/kafka/src/main/resources/perftest-parallel-consumer.properties diff --git a/confluent-parallel-consumer-application/kafka/code/src/test/java/io/confluent/developer/FileWritingRecordHandlerTest.java b/confluent-parallel-consumer-application/kafka/src/test/java/io/confluent/developer/FileWritingRecordHandlerTest.java similarity index 96% rename from confluent-parallel-consumer-application/kafka/code/src/test/java/io/confluent/developer/FileWritingRecordHandlerTest.java rename to confluent-parallel-consumer-application/kafka/src/test/java/io/confluent/developer/FileWritingRecordHandlerTest.java index dca24d41..14403510 100644 --- a/confluent-parallel-consumer-application/kafka/code/src/test/java/io/confluent/developer/FileWritingRecordHandlerTest.java +++ 
b/confluent-parallel-consumer-application/kafka/src/test/java/io/confluent/developer/FileWritingRecordHandlerTest.java @@ -2,7 +2,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.file.Files; diff --git a/confluent-parallel-consumer-application/kafka/code/src/test/java/io/confluent/developer/ParallelConsumerApplicationTest.java b/confluent-parallel-consumer-application/kafka/src/test/java/io/confluent/developer/ParallelConsumerApplicationTest.java similarity index 95% rename from confluent-parallel-consumer-application/kafka/code/src/test/java/io/confluent/developer/ParallelConsumerApplicationTest.java rename to confluent-parallel-consumer-application/kafka/src/test/java/io/confluent/developer/ParallelConsumerApplicationTest.java index 8f1ec072..6266f96f 100644 --- a/confluent-parallel-consumer-application/kafka/code/src/test/java/io/confluent/developer/ParallelConsumerApplicationTest.java +++ b/confluent-parallel-consumer-application/kafka/src/test/java/io/confluent/developer/ParallelConsumerApplicationTest.java @@ -7,7 +7,7 @@ import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.awaitility.Awaitility; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.nio.file.Files; import java.nio.file.Path; @@ -19,7 +19,7 @@ import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.*; /** @@ -27,7 +27,7 @@ */ public class ParallelConsumerApplicationTest { - private static final String TEST_CONFIG_FILE = "configuration/test.properties"; + private static final String TEST_CONFIG_FILE = "src/test/resources/test.properties"; /** * Test the app end to end with a few records consumable via a mock consumer. The app diff --git a/confluent-parallel-consumer-application/kafka/code/configuration/test.properties b/confluent-parallel-consumer-application/kafka/src/test/resources/test.properties similarity index 100% rename from confluent-parallel-consumer-application/kafka/code/configuration/test.properties rename to confluent-parallel-consumer-application/kafka/src/test/resources/test.properties diff --git a/settings.gradle b/settings.gradle index 8dddd8e7..1d55864e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,6 +15,7 @@ include 'aggregating-minmax:kstreams' include 'aggregating-minmax:flinksql' include 'aggregating-sum:kstreams' include 'cogrouping-streams:kstreams' +include 'confluent-parallel-consumer-application:kafka' include 'common' include 'cumulating-windows:flinksql' include 'filtering:flinksql'