AhmetFurkanDEMIR/Flink-Example

Flink Example

This project builds a Flink data architecture using product data previously collected from the Trendyol.com e-commerce site.

Dataset link: https://www.kaggle.com/datasets/ahmetfurkandemr/trendyol-product-comments

Apache Flink is an open-source, unified stream-processing and batch-processing framework developed by the Apache Software Foundation. The core of Apache Flink is a distributed streaming data-flow engine written in Java and Scala. Flink executes arbitrary dataflow programs in a data-parallel and pipelined (hence task parallel) manner. Flink's pipelined runtime system enables the execution of bulk/batch and stream processing programs. Furthermore, Flink's runtime supports the execution of iterative algorithms natively.

Run the project

First, start the Docker services by running the start.sh script.

sudo bash start.sh

This command starts all services. They connect to each other automatically: the data is loaded into the PostgreSQL database, and Debezium then writes the resulting changes to a Kafka topic.

Flink Run

Flink UI | 0.0.0.0:8081

Flink starts automatically along with the other services, but it needs some additional setup.

First, download the JAR file from the link below into the /flink-jar folder.

Link: https://github.com/AhmetFurkanDEMIR/Flink-Example/releases/download/jar/untitled-1.0-SNAPSHOT-jar-with-dependencies.jar

This JAR reads the data from the Kafka topic, simplifies it into a more usable form, and writes the result to a new topic. The job's source code is available in the repository.

Find the container ID of the Flink JobManager with the following command.
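To make the "simplify" step more concrete, here is a minimal Python sketch of what such a transformation might look like. It is not the code of the actual job (which is a Java application, `org.example.App`). It assumes the messages use the standard Debezium envelope with an `after` row image, optionally wrapped in a `payload` key, and that the kept fields are the four columns used by the Flink SQL table in this README. The function name `simplify` is hypothetical.

```python
import json
from typing import Optional

# Column names taken from the Flink SQL table defined in this README.
FIELDS = ("Product_Id", "Product_Name", "Product_Brand", "Product_Link")


def simplify(raw_event: str) -> Optional[str]:
    """Flatten one Debezium change event into a small JSON record.

    Assumption: standard Debezium envelope, optionally wrapped in a
    "payload" key (as produced when the JSON converter includes schemas).
    """
    event = json.loads(raw_event)
    if event is None:
        # Tombstone message: nothing to forward.
        return None
    envelope = event.get("payload", event)
    after = envelope.get("after")
    if after is None:
        # Delete events carry no "after" image.
        return None
    # Keep only the known columns; anything else is dropped.
    return json.dumps({name: after.get(name) for name in FIELDS})


if __name__ == "__main__":
    sample = json.dumps({
        "payload": {
            "op": "c",
            "before": None,
            "after": {
                "Product_Id": "1",
                "Product_Name": "example name",
                "Product_Brand": "example brand",
                "Product_Link": "https://example.com/p/1",
                "extra_column": "dropped by simplify()",
            },
        }
    })
    print(simplify(sample))
```

The output of a step like this is a flat JSON object per product, which is the shape a plain `'format' = 'json'` table in Flink SQL can read directly.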

docker ps
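If you prefer not to read the ID off the table by eye, the lookup can be scripted. The sketch below parses the output of `docker ps --format '{{.ID}} {{.Names}}'` and returns the ID of the first container whose name contains a given substring. The substring `jobmanager` is an assumption about how the container is named in this setup, and `find_container_id` is a hypothetical helper.

```python
from typing import Optional


def find_container_id(ps_output: str, name_part: str = "jobmanager") -> Optional[str]:
    """Return the ID of the first container whose name contains name_part.

    ps_output is expected to hold one "<id> <name>" pair per line, as
    printed by: docker ps --format '{{.ID}} {{.Names}}'
    """
    for line in ps_output.splitlines():
        parts = line.split(maxsplit=1)
        if len(parts) != 2:
            continue  # skip blank or malformed lines
        container_id, name = parts
        if name_part in name:
            return container_id
    return None


if __name__ == "__main__":
    # Illustrative output only; real IDs and names will differ.
    example = "a1b2c3d4e5f6 flink-jobmanager\n0f9e8d7c6b5a kafka\n"
    print(find_container_id(example))
```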

Then enter the container, replacing xxxxxx with the container ID, and submit the job.

docker exec -it xxxxxx /bin/bash
flink run -c org.example.App /flink-example/untitled-1.0-SNAPSHOT-jar-with-dependencies.jar

If the command runs successfully, the job is listed as running in the Flink UI. Keep this terminal window open.

Once the job is running without errors, load the data from the SQL file into the database to start the data flow.

psql -h 0.0.0.0 -d postgres -U demir -f sql/insert_data.sql

After the data is inserted, a new topic named debezium_connector_table.public.trendyol_product-clear-out is created in Kafka.

Next, open a new terminal and enter the Flink JobManager container again, the same way as before.
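The topic name follows a recognizable pattern. Debezium's PostgreSQL connector names change topics `<topic prefix>.<schema>.<table>` by default, and the output topic here looks like that name with a `-clear-out` suffix. The sketch below reconstructs the name under that reading. The assumption that the job simply appends the suffix to the source topic is an inference from the name, not something confirmed by the job's code, and both function names are hypothetical.

```python
def debezium_topic(prefix: str, schema: str, table: str) -> str:
    """Default Debezium topic name for a captured PostgreSQL table."""
    return f"{prefix}.{schema}.{table}"


def output_topic(source_topic: str, suffix: str = "-clear-out") -> str:
    """Assumed naming of the cleaned topic: source topic plus a suffix."""
    return source_topic + suffix


if __name__ == "__main__":
    source = debezium_topic("debezium_connector_table", "public", "trendyol_product")
    print(source)
    print(output_topic(source))
```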

docker exec -it xxxxxx /bin/bash

Then start the Flink SQL Client.

./bin/sql-client.sh -j /flink-example/flink-sql-connector-kafka-3.0.1-1.18.jar

If everything is OK, you are dropped into the Flink SQL Client command prompt.

Create a table backed by the Kafka topic with the following statement in Flink SQL.

CREATE TABLE trendyol_product_flink_table(
  `Product_Id` STRING,
  `Product_Name` STRING,
  `Product_Brand` STRING,
  `Product_Link` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'debezium_connector_table.public.trendyol_product-clear-out',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'demir-flink-sql',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
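To illustrate what `'format' = 'json'` does with each Kafka message, here is a rough Python sketch of the mapping from one JSON message to one table row. It is a simplification, not Flink's implementation: it handles only string values, and it mirrors Flink's default of turning a missing field into NULL (`json.fail-on-missing-field` is `false` by default). The helper name `to_row` is hypothetical.

```python
import json
from typing import Optional, Tuple

# Columns in the order declared by the CREATE TABLE statement.
COLUMNS = ("Product_Id", "Product_Name", "Product_Brand", "Product_Link")


def to_row(message: bytes) -> Tuple[Optional[str], ...]:
    """Map one JSON-encoded Kafka message to a row of the table.

    Fields are matched by name; fields that are absent become None
    (NULL in SQL), and fields not declared as columns are ignored.
    """
    record = json.loads(message)
    return tuple(record.get(column) for column in COLUMNS)


if __name__ == "__main__":
    msg = b'{"Product_Id": "1", "Product_Name": "example", "Product_Brand": "brand"}'
    print(to_row(msg))
```

Because matching is by field name, the column names in the table definition have to match the keys in the topic's JSON exactly, including case.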

And finally, run an SQL query on this table and watch with pleasure as your data flows :)

select * from trendyol_product_flink_table;

You can monitor the jobs and SQL statements you run from the Flink UI.

Ahmet Furkan Demir