This repository contains the code for the book *Stream Processing: Hands-on with Apache Flink*.
In order to run the code samples you will need a Kafka and a Flink cluster up and running. You can also run the Flink examples from within your favorite IDE, in which case you don't need a Flink cluster.
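When run directly from the IDE, a Flink job spins up an embedded mini-cluster, so no external cluster is needed. As a minimal sketch of such an entry point (the class and data below are illustrative, not from the repo):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Minimal sketch of running a Flink job straight from the IDE: the local
// execution environment starts an embedded mini-cluster automatically.
public class IdeQuickstart {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("deposit", "withdrawal", "transfer")
                .map(String::toUpperCase)
                .print();

        env.execute("ide-quickstart");
    }
}
```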
If you want to run the examples inside a Flink cluster, run the following command to start the Kafka (Redpanda) and Flink clusters:
```shell
docker-compose up
```
When the cluster is up and running, run the following command for Redpanda:
```shell
./redpanda-setup.sh
```
or this command for a Kafka setup:
```shell
./kafka-setup.sh
```
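These scripts provision the topics the examples read from and write to. If you'd rather create the topics programmatically, a rough Java equivalent looks like the sketch below (topic names come from the table definitions further down; the partition and replication counts are assumptions, not necessarily what the scripts use):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;

// Sketch: create the topics used by the examples. Partition and replication
// counts here are assumptions, not the setup scripts' actual values.
public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // adjust to your compose setup

        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(List.of(
                    new NewTopic("transactions", 1, (short) 1),
                    new NewTopic("customers", 1, (short) 1),
                    new NewTopic("accounts", 1, (short) 1)
            )).all().get();
        }
    }
}
```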
From within the Flink SQL client, register the user-defined functions the following queries rely on:

```sql
CREATE FUNCTION maskfn AS 'io.streamingledger.udfs.MaskingFn' LANGUAGE JAVA USING JAR '/opt/flink/jars/spf-0.1.0.jar';
CREATE FUNCTION splitfn AS 'io.streamingledger.udfs.SplitFn' LANGUAGE JAVA USING JAR '/opt/flink/jars/spf-0.1.0.jar';
CREATE FUNCTION lookup AS 'io.streamingledger.udfs.AsyncLookupFn' LANGUAGE JAVA USING JAR '/opt/flink/jars/spf-0.1.0.jar';
```
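For reference, a scalar UDF like `maskfn` extends Flink's `ScalarFunction`. The sketch below is a minimal stand-in, not the book's actual `MaskingFn` implementation:

```java
import org.apache.flink.table.functions.ScalarFunction;

// Minimal sketch of a masking scalar UDF; the real io.streamingledger.udfs.MaskingFn
// may mask differently -- this version keeps only the last four characters.
public class MaskingFn extends ScalarFunction {
    public String eval(String s) {
        if (s == null || s.length() <= 4) {
            return s;
        }
        return "*".repeat(s.length() - 4) + s.substring(s.length() - 4);
    }
}
```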
Create a temporary view over a small sample of the transactions and try the masking function on it:

```sql
CREATE TEMPORARY VIEW sample AS
SELECT *
FROM transactions
LIMIT 10;

SELECT transactionId, maskfn(UUID()) AS maskedCN FROM sample;
```
Use the table function to split the operation column into multiple rows:

```sql
SELECT * FROM transactions, LATERAL TABLE(splitfn(operation));
```
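A table function like `splitfn` extends `TableFunction` and emits zero or more rows per input row via `collect()`. A minimal sketch (again a stand-in for the repo's `SplitFn`, which may split differently):

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Minimal sketch of a table UDF: emits one row per whitespace-separated
// token of the input. The real io.streamingledger.udfs.SplitFn may differ.
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class SplitFn extends TableFunction<Row> {
    public void eval(String input) {
        if (input == null) {
            return;
        }
        for (String token : input.split("\\s+")) {
            collect(Row.of(token));
        }
    }
}
```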
Enrich each sampled transaction with the asynchronous lookup function:

```sql
SELECT
    transactionId,
    serviceResponse,
    responseTime
FROM sample, LATERAL TABLE(lookup(transactionId));
```
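An asynchronous function like `lookup` is typically built on Flink's `AsyncTableFunction`, which completes a future instead of returning rows synchronously. The sketch below is a hypothetical stand-in: the real `AsyncLookupFn` presumably calls an external service, and the output fields are taken from the query above:

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

// Stand-in for io.streamingledger.udfs.AsyncLookupFn: completes the future
// off the caller thread with a fake service response and latency.
@FunctionHint(output = @DataTypeHint("ROW<serviceResponse STRING, responseTime INT>"))
public class AsyncLookupFn extends AsyncTableFunction<Row> {
    public void eval(CompletableFuture<Collection<Row>> future, String transactionId) {
        CompletableFuture
                .supplyAsync(() -> Row.of("response-for-" + transactionId, 100))
                .thenAccept(row -> future.complete(Collections.singleton(row)));
    }
}
```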
- Package the application and create an executable jar file:

  ```shell
  mvn clean package
  ```

- Copy the jar under the `jars/` directory so it is included in the custom Flink images.
- Start the cluster to build the new images:

  ```shell
  docker-compose up
  ```

- Deploy the Flink job:

  ```shell
  docker exec -it jobmanager ./bin/flink run \
    --class io.streamingledger.datastream.BufferingStream \
    jars/spf-0.1.0.jar
  ```
The transactions table:
```sql
CREATE TABLE transactions (
    transactionId STRING,
    accountId STRING,
    customerId STRING,
    eventTime BIGINT,
    eventTime_ltz AS TO_TIMESTAMP_LTZ(eventTime, 3),
    eventTimeFormatted STRING,
    type STRING,
    operation STRING,
    amount DOUBLE,
    balance DOUBLE,
    WATERMARK FOR eventTime_ltz AS eventTime_ltz
) WITH (
    'connector' = 'kafka',
    'topic' = 'transactions',
    'properties.bootstrap.servers' = 'redpanda:9092',
    'properties.group.id' = 'group.transactions',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);
```
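If you prefer to register tables programmatically instead of through the SQL client, a minimal sketch using Flink's `TableEnvironment` looks like this (same DDL as above, inlined):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: register the transactions table and query it from Java,
// assuming the same Redpanda/Flink setup as the compose file.
public class RegisterTransactions {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tableEnv.executeSql(
                "CREATE TABLE transactions ("
                        + " transactionId STRING,"
                        + " accountId STRING,"
                        + " customerId STRING,"
                        + " eventTime BIGINT,"
                        + " eventTime_ltz AS TO_TIMESTAMP_LTZ(eventTime, 3),"
                        + " eventTimeFormatted STRING,"
                        + " type STRING,"
                        + " operation STRING,"
                        + " amount DOUBLE,"
                        + " balance DOUBLE,"
                        + " WATERMARK FOR eventTime_ltz AS eventTime_ltz"
                        + ") WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'transactions',"
                        + " 'properties.bootstrap.servers' = 'redpanda:9092',"
                        + " 'properties.group.id' = 'group.transactions',"
                        + " 'format' = 'json',"
                        + " 'scan.startup.mode' = 'earliest-offset')");

        tableEnv.executeSql("SELECT transactionId, amount FROM transactions").print();
    }
}
```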
The customers table:
```sql
CREATE TABLE customers (
    customerId STRING,
    sex STRING,
    social STRING,
    fullName STRING,
    phone STRING,
    email STRING,
    address1 STRING,
    address2 STRING,
    city STRING,
    state STRING,
    zipcode STRING,
    districtId STRING,
    birthDate STRING,
    updateTime BIGINT,
    eventTime_ltz AS TO_TIMESTAMP_LTZ(updateTime, 3),
    WATERMARK FOR eventTime_ltz AS eventTime_ltz,
    PRIMARY KEY (customerId) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'customers',
    'properties.bootstrap.servers' = 'redpanda:9092',
    'key.format' = 'raw',
    'value.format' = 'json',
    'properties.group.id' = 'group.customers'
);
```
The accounts table:
```sql
CREATE TABLE accounts (
    accountId STRING,
    districtId INT,
    frequency STRING,
    creationDate STRING,
    updateTime BIGINT,
    eventTime_ltz AS TO_TIMESTAMP_LTZ(updateTime, 3),
    WATERMARK FOR eventTime_ltz AS eventTime_ltz,
    PRIMARY KEY (accountId) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'accounts',
    'properties.bootstrap.servers' = 'redpanda:9092',
    'key.format' = 'raw',
    'value.format' = 'json',
    'properties.group.id' = 'group.accounts'
);
```
To load data into the topics, run `StateProducer` and `TransactionsProducer` from within IntelliJ (or any IDE).
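At their core, these producers serialize records as JSON and send them to the topics above. A minimal hand-rolled equivalent (the payload below is made up, not the book's exact producer) might look like:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Sketch of what TransactionsProducer does in essence: write one JSON
// transaction to the 'transactions' topic. Payload and address are examples.
public class SimpleTransactionsProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // adjust to your compose setup
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String value = "{\"transactionId\":\"t-1\",\"accountId\":\"a-1\","
                    + "\"customerId\":\"c-1\",\"eventTime\":1690000000000,"
                    + "\"type\":\"CREDIT\",\"operation\":\"deposit\","
                    + "\"amount\":42.0,\"balance\":1042.0}";
            producer.send(new ProducerRecord<>("transactions", "t-1", value));
            producer.flush();
        }
    }
}
```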
You can start the Flink SQL client with the following command:
```shell
docker exec -it jobmanager ./bin/sql-client.sh
```
```sql
-- Create and switch to a database
CREATE DATABASE bank;
USE bank;

-- See the table definitions above or the queries package for the SQL
```