
InfoQ - Apache Kafka and KSQL article - code samples

Overview

These are the code samples and snippets used in writing this InfoQ article. You can find a related supplemental article on the Confluent blog, "The Changing Face of ETL".

Startup

Run the stack:

docker-compose up -d
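
To check that all the containers have started:

docker-compose ps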

Deploy the connectors etc.:

./scripts/setup.sh

Check that the connectors are running:

curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort

Should show:

mysql-source-demo-customers      |  RUNNING  |  RUNNING
mysql-source-demo-customers-raw  |  RUNNING  |  RUNNING
curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort

Should show:

es_sink_ratings-with-customer-data  |  RUNNING  |  RUNNING
es_sink_unhappy_platinum_customers  |  RUNNING  |  RUNNING

Note that the AWS S3 sink requires further setup (see "Streaming data to S3 from Kafka" below), so it will initially be in the FAILED state.

KSQL

To run the KSQL CLI:

docker run --network infoq-kafka-ksql_default --interactive --tty \
     confluentinc/cp-ksql-cli:5.0.0-beta180702222458 \
     http://ksql-server:8088
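
The CLI should also accept statements piped on stdin, which is handy for one-off commands. A sketch (drop --tty so the piped input is read; SHOW TOPICS; is just an illustrative statement):

echo "SHOW TOPICS;" | docker run --network infoq-kafka-ksql_default --interactive \
     confluentinc/cp-ksql-cli:5.0.0-beta180702222458 \
     http://ksql-server:8088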

Kafka Console Consumer

To run kafka-console-consumer within the Docker setup, use:

docker-compose exec kafka \
    kafka-console-consumer \
    --bootstrap-server kafka:29092 \
    --topic UNHAPPY_PLATINUM_CUSTOMERS
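
By default this shows only newly-arriving messages; to replay the topic from the start, add the standard --from-beginning flag:

docker-compose exec kafka \
    kafka-console-consumer \
    --bootstrap-server kafka:29092 \
    --topic UNHAPPY_PLATINUM_CUSTOMERS \
    --from-beginning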

Running the Python Slack/Kafka app

Virtualenv is useful for isolating the Python libraries:

virtualenv slack-kafka
# Source the activation file appropriate to your shell, e.g. for bash:
source ./slack-kafka/bin/activate
# or for fish:
source ./slack-kafka/bin/activate.fish

Install the required libraries:

pip install slackclient confluent_kafka

Set your API token as an env var:

export SLACK_API_TOKEN=your-token-goes-here

Run the app:

python python_kafka_notify.py

Each message that’s processed and sent to Slack is echoed to stdout:

End of partition reached UNHAPPY_PLATINUM_CUSTOMERS/0
Received message: {"CLUB_STATUS":"platinum","EMAIL":"hcoda4@senate.gov","STARS":1,"MESSAGE":"Exceeded all my expectations. Thank you !"}

Sending message "`hcoda4@senate.gov` just left a bad review :disappointed:
> Exceeded all my expectations. Thank you !

_Please contact them immediately and see if we can fix the issue *right here, right now*_" to channel unhappy-customers
(Screenshot: the resulting notification in Slack)

Set up Kibana

In a browser, go to Kibana at http://localhost:5601/app/kibana#/management/kibana/indices/ and set an index (any index) as the default (click the star icon in the top right)

Import the dashboard file kibana_objects.json using the Import button at http://localhost:5601/app/kibana#/management/kibana/objects

(Screenshot: Kibana index patterns)
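
If no indexes are listed, it is worth confirming that data has actually arrived in Elasticsearch first. A quick check, assuming Elasticsearch is exposed on the default port 9200:

curl -s "http://localhost:9200/_cat/indices?v"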

Streaming data to S3 from Kafka

Create a file called aws_creds.txt in the same folder as the docker-compose.yml. This will be mounted and passed to Kafka Connect on startup. It should look like this:

[default]
aws_access_key_id=xxxxxxxxxxxxxxx
aws_secret_access_key=yyyyyyyyyyyyyy

Create the target bucket to write to, and update scripts/create-s3-sink.sh with the region and bucket name.
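
If you need to create the bucket, the AWS CLI can do it; the bucket name and region here are placeholders:

aws s3 mb s3://your-bucket-name --region us-west-2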

Deploy the connector:

docker-compose exec kafka-connect-cp bash -c '/scripts/create-s3-sink.sh'

You should see the sink successfully running:

curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
es_sink_ratings-with-customer-data  |  RUNNING  |  RUNNING
es_sink_unhappy_platinum_customers  |  RUNNING  |  RUNNING
s3-sink-ratings                     |  RUNNING  |  RUNNING

In S3 you should see data:

$ aws s3 ls rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/
2018-07-20 13:26:29        569 ratings-with-customer-data+0+0000000000.json
2018-07-20 13:26:31        615 ratings-with-customer-data+0+0000000003.json
2018-07-20 13:26:32        587 ratings-with-customer-data+0+0000000006.json
2018-07-20 13:26:33        592 ratings-with-customer-data+0+0000000009.json
(Screenshot: the ratings data in the S3 bucket)

Streaming data to Google Cloud Storage (GCS) from Kafka

Install the GCS connector:

docker-compose exec kafka-connect-cp bash -c 'confluent-hub install --no-prompt confluentinc/kafka-connect-gcs:5.0.0'
docker-compose restart kafka-connect-cp
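
After the restart, you can confirm that the GCS plugin has been picked up, using the standard Kafka Connect REST API connector-plugins endpoint:

curl -s "http://localhost:18083/connector-plugins" | jq '.[].class'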

Download your service account JSON credentials from GCP to a file called gcp_creds.json. This will be mounted and passed to Kafka Connect on startup.

Create the target bucket to write to, and update scripts/create-gcs-sink.sh with the region and bucket name.
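
If you need to create the bucket, gsutil can do it; the bucket name and location here are placeholders:

gsutil mb -l US gs://your-bucket-name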

Deploy the connector:

docker-compose exec kafka-connect-cp bash -c '/scripts/create-gcs-sink.sh'

You should see the sink successfully running:

curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
…
gcs-sink-ratings                     |  RUNNING  |  RUNNING

In GCS you should see data:

$ gsutil ls gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000000.json
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000064.json
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000128.json
gs://rmoff-demo-ratings/topics/ratings-with-customer-data/partition=0/ratings-with-customer-data+0+0000000192.json
(Screenshot: the ratings data in the GCS bucket)

You can also send CSV data to GCS for use in Data Studio; see scripts/create-gcs-sink-csv.sh, deployed as shown below. A neater approach is probably to go via BigQuery, though, since you don’t lose the schema that way.
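
Assuming the CSV sink script follows the same deployment pattern as the other connector scripts in this repo, it would be run as:

docker-compose exec kafka-connect-cp bash -c '/scripts/create-gcs-sink-csv.sh'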

(Screenshot: the CSV data charted in Google Data Studio)

Streaming data to Google BigQuery from Kafka

Install the BigQuery connector:

docker-compose exec kafka-connect-cp bash -c 'confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.1.0'
docker-compose restart kafka-connect-cp

Download your service account JSON credentials from GCP to a file called gcp_creds.json. This will be mounted and passed to Kafka Connect on startup.

Make sure the dataset exists; create it through the web UI or with the CLI (./google-cloud-sdk/bin/bq mk ksql_demo).

Update scripts/create-gbq-sink.sh with your BigQuery project name and dataset.
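
For reference, those values map onto the connector’s project and datasets settings. A minimal sketch of the kind of payload the script submits to the Kafka Connect REST API; the project name is a placeholder, the topic is assumed to match the other ratings sinks, and the real config (including credentials) lives in scripts/create-gbq-sink.sh:

curl -X POST -H "Content-Type: application/json" http://localhost:18083/connectors -d '{
  "name": "gbq-sink-ratings",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "ratings-with-customer-data",
    "project": "your-gcp-project",
    "datasets": ".*=ksql_demo"
  }
}'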

Deploy the connector:

docker-compose exec kafka-connect-cp bash -c '/scripts/create-gbq-sink.sh'

You should see the sink successfully running:

curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
…
gbq-sink-ratings                     |  RUNNING  |  RUNNING
(Screenshot: the ratings data in BigQuery)