Confluent Platform with Apache Flink

This project will set up the following components:

  • Confluent Platform 7.5.1

    • broker
    • control-center
    • restproxy
    • connect
  • Apache Flink 1.18

    • sql-client
    • jobmanager
    • taskmanager

When the project is running, you'll be able to access the web UIs (such as Control Center and the Flink Dashboard) in your browser, and you have access to a number of ReST APIs; the endpoints exercised in this walkthrough are:

  • ReST Proxy: http://localhost:8082
  • Connect: http://localhost:8083
  • Schema Registry: http://localhost:8084
  • Broker Metadata: http://localhost:8090

Getting Started

Build the container instances and start them:

docker-compose build --pull sql-client
docker-compose build --pull jobmanager
docker-compose up
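
Once the containers are up, an optional quick sanity check is to confirm that every service is in a running state:

docker-compose ps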

Confirm that the various endpoints are responding:

ReST Proxy:

curl -XGET http://localhost:8082/v3/clusters | jq

Connect:

curl -XGET http://localhost:8083/ | jq

Schema Registry:

curl -XGET http://localhost:8084/config | jq

Broker Metadata:

curl -XGET http://localhost:8090/v1/metadata/id | jq
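
Flink's JobManager also exposes a ReST API. Assuming its web port is published to the host on Flink's default 8081 (check docker-compose.yml for the actual mapping), you can hit the cluster overview endpoint:

# port 8081 is an assumption based on Flink's default web port
curl -XGET http://localhost:8081/overview | jq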

Generate some data

Once the containers have started, the first step is to load some data into Kafka; to do this, let's use the Datagen connector to create some pageviews:

curl -i -X PUT http://localhost:8083/connectors/datagen_local_01/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "kafka.topic": "pageviews",
            "quickstart": "pageviews",
            "max.interval": 1000,
            "iterations": 10000000,
            "tasks.max": "1"
        }'
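
To confirm that the connector was created and its task is running, query the Connect ReST API:

curl -XGET http://localhost:8083/connectors/datagen_local_01/status | jq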

As soon as that is done, let's run the consumer to inspect the topic:

docker-compose exec broker kafka-console-consumer --topic pageviews --bootstrap-server broker:29092 --from-beginning --max-messages 10

The messages don't look quite right: the values are Avro-encoded, so the plain console consumer prints them as mangled bytes. Let's use the kafka-avro-console-consumer instead:

docker-compose exec connect kafka-avro-console-consumer \
 --bootstrap-server broker:29092 \
 --property schema.registry.url=http://schemaregistry:8084 \
 --topic pageviews \
 --property print.key=true \
 --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
 --property key.separator=" : " \
 --max-messages 10
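
You can also inspect the Avro schema that the Datagen connector registered for the topic; assuming the default TopicNameStrategy, the subject will be pageviews-value:

# subject name assumes the default TopicNameStrategy (<topic>-value)
curl -XGET http://localhost:8084/subjects/pageviews-value/versions/latest | jq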

The Avro console consumer output looks right, so let's see how Flink handles the Avro data. Start the Flink SQL Client:

docker-compose exec sql-client sql-client.sh

To create the pageviews table, you can use this:

CREATE TABLE pageviews (
    viewtime BIGINT,
    userid STRING,
    pageid STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    `proc_time` AS PROCTIME(),
    WATERMARK FOR `ts` AS `ts` 
) WITH (
    'connector' = 'kafka', 
    'topic' = 'pageviews', 
    'scan.startup.mode' = 'earliest-offset', 
    'properties.bootstrap.servers' = 'broker:29092', 
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://schemaregistry:8084'
);

Now we can run a SELECT statement against the new table, which uses the pageviews topic as its source:

SELECT * FROM pageviews;

We get some output! :)
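
As an optional extra check that the ts event-time column and its watermark behave as expected, here's a sketch of a tumbling-window count of views per page (illustrative only; the one-minute window is arbitrary and not part of the original walkthrough):

-- illustrative sketch: 1-minute tumbling windows over the ts event-time column
SELECT window_start, pageid, COUNT(*) AS views
FROM TABLE(TUMBLE(TABLE pageviews, DESCRIPTOR(`ts`), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, pageid;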

Set up a join

We're going to use the userid column for this join, so let's generate some users data:

curl -i -X PUT http://localhost:8083/connectors/datagen_users/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "kafka.topic": "users",
            "quickstart": "users",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "max.interval": 1000,
            "iterations": 10000000,
            "tasks.max": "1"
        }'
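
Optionally, confirm that both Datagen connectors now exist:

curl -XGET http://localhost:8083/connectors | jq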

docker-compose exec sql-client sql-client.sh

CREATE TABLE users (
    registertime BIGINT,
    userid STRING,
    regionid STRING,
    gender STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    `proc_time` AS PROCTIME(),
    WATERMARK FOR `ts` AS `ts` 
) WITH (
    'connector' = 'kafka', 
    'topic' = 'users', 
    'scan.startup.mode' = 'earliest-offset', 
    'properties.bootstrap.servers' = 'broker:29092', 
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://schemaregistry:8084'
);

You should see:

[INFO] Execute statement succeed.

Now query the new table:

SELECT * FROM users;

Let's create the stock-trades topic and load some data into it:

curl -i -X PUT http://localhost:8083/connectors/datagen_stock/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "kafka.topic": "stock-trades",
            "quickstart": "Stock_Trades",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "max.interval": 100,
            "iterations": 10000000,
            "tasks.max": "1"
        }'
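
Before defining the Flink table, you can spot-check the new topic with the Avro console consumer, as before:

docker-compose exec connect kafka-avro-console-consumer \
 --bootstrap-server broker:29092 \
 --property schema.registry.url=http://schemaregistry:8084 \
 --topic stock-trades \
 --max-messages 5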

docker-compose exec sql-client sql-client.sh

CREATE TABLE stock_trades (
    side STRING,
    quantity INT,
    symbol STRING,
    price INT,
    account STRING,
    userid STRING,
    regionid STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    `proc_time` AS PROCTIME(),
    WATERMARK FOR `ts` AS `ts` 
) WITH (
    'connector' = 'kafka', 
    'topic' = 'stock-trades', 
    'scan.startup.mode' = 'earliest-offset', 
    'properties.bootstrap.servers' = 'broker:29092', 
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://schemaregistry:8084'
);

Query the table to confirm data is flowing:

SELECT * FROM stock_trades;

Let's now attempt a join!

SELECT * FROM stock_trades
  INNER JOIN users
  ON stock_trades.userid = users.userid;

Seems to be working as expected.
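
One caveat: the query above is a regular (unbounded) join, so Flink keeps state for both inputs indefinitely. If you only care about rows that are close together in event time, an interval join bounds that state; a rough sketch using the ts columns defined earlier (the one-hour bound is arbitrary):

-- illustrative sketch: interval join bounded to +/- one hour of event time
SELECT s.symbol, s.side, s.quantity, s.price, u.regionid, u.gender
FROM stock_trades s, users u
WHERE s.userid = u.userid
  AND s.`ts` BETWEEN u.`ts` - INTERVAL '1' HOUR AND u.`ts` + INTERVAL '1' HOUR;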


Notes

Alternatively, build only the sql-client image:

docker-compose build --pull sql-client

Start the containers:

docker-compose up

Make sure there are some messages in the user_behavior topic:

docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'

If you see messages in the Kafka topic, the next thing to do is to access the topic from within Flink:

docker-compose exec sql-client ./sql-client.sh

You should see:

Reading default environment from: file:/opt/flink/conf/sql-client-conf.yaml

After running a SELECT statement in the SQL client, you'll see results paging through.

To make the Kafka SQL connector jar available to the Flink runtime, copy it from the sql-client container's lib directory into Flink's lib directory:

docker-compose exec sql-client bash

cd $SQL_CLIENT_HOME/lib
cp flink-sql-connector-kafka-3.0.0-1.17.jar ../../flink/lib/
cd ../../flink/lib/
chown flink:flink flink-sql-connector-kafka-3.0.0-1.17.jar
exit
docker-compose exec broker kafka-console-consumer --topic user_behavior --bootstrap-server broker:29092 --from-beginning --max-messages 10

Now let's connect to the Flink SQL Client:

docker-compose exec sql-client sql-client.sh
