[FLINK-19145][walkthroughs] Add PyFlink-walkthrough to Flink playground
alpinegizmo committed Oct 12, 2020
1 parent 8b1cadb commit a0c07e99dc9c8f621dcdee8f2b8aa534dbf0f94e
Showing 15 changed files with 548 additions and 0 deletions.
@@ -15,6 +15,8 @@ Flink job. The playground is presented in detail in

* The **Table Walkthrough** (in the `table-walkthrough` folder) shows how to use the Table API to build an analytics pipeline that reads streaming data from Kafka and writes results to MySQL, along with a real-time dashboard in Grafana. The walkthrough is presented in detail in ["Real Time Reporting with the Table API"](https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/table_api.html), which is part of the _Try Flink_ section of the Flink documentation.

* The **PyFlink Walkthrough** (in the `pyflink-walkthrough` folder) provides a complete example that uses the Python API, and guides you through the steps needed to run and manage PyFlink jobs. The pipeline used in this walkthrough reads data from Kafka, performs aggregations, and writes the results to Elasticsearch, where they are visualized with Kibana. This walkthrough is presented in detail in the [pyflink-walkthrough README](pyflink-walkthrough).

## About

Apache Flink is an open source project of The Apache Software Foundation (ASF).
@@ -0,0 +1,48 @@
###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

###############################################################################
# Build PyFlink Playground Image
###############################################################################

FROM flink:1.11.0-scala_2.11
# Keep the connector and PyFlink versions in sync with the Flink version of the base image.
ARG FLINK_VERSION=1.11.0

# Install pyflink
RUN set -ex; \
    apt-get update; \
    apt-get -y install python3 python3-pip python3-dev; \
    ln -s /usr/bin/python3 /usr/bin/python; \
    ln -s /usr/bin/pip3 /usr/bin/pip; \
    python -m pip install --upgrade pip; \
    pip install apache-flink==1.11.0; \
    pip install kafka-python;


# Download connector libraries
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/${FLINK_VERSION}/flink-json-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/${FLINK_VERSION}/flink-sql-connector-kafka_2.11-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/${FLINK_VERSION}/flink-sql-connector-elasticsearch7_2.11-${FLINK_VERSION}.jar;


RUN echo "taskmanager.memory.jvm-metaspace.size: 512m" >> /opt/flink/conf/flink-conf.yaml;

WORKDIR /opt/flink
@@ -0,0 +1,156 @@
# pyflink-walkthrough

## Background

In this playground, you will learn how to build and run an end-to-end PyFlink pipeline for data analytics, covering the following steps:

* Reading data from a Kafka source;
* Transforming data using a [UDF](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html);
* Performing a simple aggregation over the source data;
* Writing the results to Elasticsearch and visualizing them in Kibana.

The environment is based on Docker Compose, so the only requirement is that you have [Docker](https://docs.docker.com/get-docker/)
installed on your machine.

### Kafka
You will be using Kafka to store sample input data about payment transactions. A simple data generator, [generate_source_data.py](generator/generate_source_data.py), is provided to
continuously write new records to the `payment_msg` Kafka topic (a sketch of the producer loop follows the field list below). Each record is structured as follows:

`{"createTime": "2020-08-12 06:29:02", "orderId": 1597213797, "payAmount": 28306.44976403719, "payPlatform": 0, "provinceId": 4}`

* `createTime`: The creation time of the transaction.
* `orderId`: The id of the current transaction.
* `payAmount`: The amount being paid with this transaction.
* `payPlatform`: The platform used to create this payment: pc or mobile.
* `provinceId`: The id of the province for the user.
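
For reference, here is a minimal sketch of what the generator's producer loop might look like, using `kafka-python`'s `KafkaProducer`. The value ranges and the `kafka:9092` address are illustrative assumptions; see [generate_source_data.py](generator/generate_source_data.py) for the actual script.

```python
import json
import random
import time
from datetime import datetime

from kafka import KafkaProducer

# Assumed broker address: inside the Docker network, the broker is reachable as kafka:9092.
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))

while True:
    record = {
        "createTime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "orderId": int(time.time() * 1000),      # a unique-ish transaction id
        "payAmount": random.uniform(1, 100000),  # illustrative amount range
        "payPlatform": random.randint(0, 1),     # 0 = pc, 1 = mobile
        "provinceId": random.randint(0, 5),      # illustrative province id range
    }
    producer.send("payment_msg", value=record)
    time.sleep(0.5)
```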

### PyFlink

The transaction data is processed with PyFlink using the Python script [payment_msg_proccessing.py](payment_msg_proccessing.py).
This script first maps the `provinceId` in each input record to its corresponding province name using a Python UDF,
and then computes the sum of the transaction amounts for each province. The following code snippet shows the main processing logic in [payment_msg_proccessing.py](payment_msg_proccessing.py).

```python
(t_env
    .from_path("payment_msg")                                           # the Kafka source table named payment_msg
    .select("province_id_to_name(provinceId) as province, payAmount")   # map provinceId to a province name via the UDF
    .group_by("province")                                               # group the selected fields by province
    .select("province, sum(payAmount) as pay_amount")                   # sum up payAmount for each province
    .execute_insert("es_sink"))                                         # write the result into the Elasticsearch sink
```
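
The `province_id_to_name` UDF used in the snippet above is a Python function registered with the table environment. A minimal sketch, assuming PyFlink 1.11's `udf` decorator (the province list here is illustrative; the actual script defines its own mapping):

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Illustrative id-to-name mapping.
province_names = ["Beijing", "Shanghai", "Hangzhou", "Shenzhen", "Jiangxi", "Chongqing"]

@udf(input_types=[DataTypes.INT()], result_type=DataTypes.STRING())
def province_id_to_name(province_id):
    return province_names[province_id]

# Register the UDF under the name used in the query.
t_env.register_function("province_id_to_name", province_id_to_name)
```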


### Elasticsearch

Elasticsearch is used to store the results and to provide an efficient query service.
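
Once the pipeline is running, you can query the results directly over Elasticsearch's REST API. The index name depends on how the `es_sink` table is configured, so list the indices first (both endpoints below are standard Elasticsearch APIs):

```shell script
$ curl "http://localhost:9200/_cat/indices?v"
$ curl "http://localhost:9200/<INDEX-NAME>/_search?pretty"
```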

### Kibana

Kibana is an open source data visualization dashboard for Elasticsearch. You will use it to visualize
the total transaction payment amount and the per-province proportions in this PyFlink pipeline through a dashboard.

## Setup

As mentioned, the environment for this walkthrough is based on Docker Compose; it uses a custom image
for the Flink containers (JobManager and TaskManager), plus containers for Kafka and Zookeeper, the data generator, and Elasticsearch and Kibana.

You can find the [docker-compose.yml](docker-compose.yml) file of the pyflink-walkthrough in the `pyflink-walkthrough` root directory.

### Building the Docker image

First, build the Docker image by running:

```bash
$ cd pyflink-walkthrough
$ docker-compose build
```

### Starting the Playground

Once the Docker image build is complete, run the following command to start the playground:

```bash
$ docker-compose up -d
```

One way of checking if the playground was successfully started is to access some of the services that are exposed:

1. Visit the Flink Web UI at [http://localhost:8081](http://localhost:8081).
2. Visit Elasticsearch at [http://localhost:9200](http://localhost:9200).
3. Visit Kibana at [http://localhost:5601](http://localhost:5601).

**Note:** you may need to wait a minute or so for all of the services to come up.
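
If you prefer to wait programmatically, a simple poll against the exposed endpoints works, e.g.:

```shell script
$ until curl -sf http://localhost:8081 > /dev/null && curl -sf http://localhost:9200 > /dev/null; do sleep 5; done
```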

### Checking the Kafka service

You can use the following command to read data from the Kafka topic and check whether it's generated correctly:
```shell script
$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic payment_msg
{"createTime":"2020-07-27 09:25:32.77","orderId":1595841867217,"payAmount":7732.44,"payPlatform":0,"provinceId":3}
{"createTime":"2020-07-27 09:25:33.231","orderId":1595841867218,"payAmount":75774.05,"payPlatform":0,"provinceId":3}
{"createTime":"2020-07-27 09:25:33.72","orderId":1595841867219,"payAmount":65908.55,"payPlatform":0,"provinceId":0}
{"createTime":"2020-07-27 09:25:34.216","orderId":1595841867220,"payAmount":15341.11,"payPlatform":0,"provinceId":1}
{"createTime":"2020-07-27 09:25:34.698","orderId":1595841867221,"payAmount":37504.42,"payPlatform":0,"provinceId":0}
```
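
Note that the console consumer only shows records produced after it attaches; to replay everything in the topic, add the standard `--from-beginning` flag:

```shell script
$ docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic payment_msg --from-beginning
```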
You can also create a new topic by executing the following command:
```shell script
$ docker-compose exec kafka kafka-topics.sh --bootstrap-server kafka:9092 --create --topic <YOUR-TOPIC-NAME> --partitions 8 --replication-factor 1
```

## Running the PyFlink job

1. Submit the PyFlink job.
```shell script
$ docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink-walkthrough/payment_msg_proccessing.py -d
```
Navigate to the [Flink Web UI](http://localhost:8081) after the job is submitted successfully. There should be a job in the running job list.
Click the job to get more details. You should see that the `StreamGraph` of the `payment_msg_proccessing` job consists of two nodes, each with a parallelism of 1.
There is also a table at the bottom of the page that shows some metrics for each node (e.g. bytes received/sent, records received/sent). Note that Flink's metrics only
report bytes and records communicated within the Flink cluster, and so will always report 0 bytes and 0 records received by sources, and 0 bytes and 0 records
sent to sinks; don't be confused that nothing is reported as being read from Kafka, or written to Elasticsearch.

![image](pic/submitted.png)

![image](pic/detail.png)
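
You can also inspect the job from the command line via Flink's REST API, which is served on the same port as the Web UI; `/jobs` lists the job ids and statuses, and `/jobs/<JOB-ID>` returns the details of a single job:

```shell script
$ curl http://localhost:8081/jobs
$ curl http://localhost:8081/jobs/<JOB-ID>
```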

2. Navigate to the [Kibana UI](http://localhost:5601), open the menu by clicking the menu button in the upper left corner, choose the Dashboard item to go to the dashboard page, and open the pre-created dashboard `payment_dashboard`.
It contains a vertical bar chart and a pie chart showing the total payment amount and the proportion for each province.

![image](pic/dash_board.png)

![image](pic/chart.png)

3. Stop the PyFlink job:

Visit the Flink Web UI at [http://localhost:8081/#/overview](http://localhost:8081/#/overview), select the job, and click `Cancel Job` in the upper right corner.

![image](pic/cancel.png)
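
Alternatively, you can cancel the job with the Flink CLI inside the `jobmanager` container; `flink list` prints the running jobs together with their ids:

```shell script
$ docker-compose exec jobmanager ./bin/flink list
$ docker-compose exec jobmanager ./bin/flink cancel <JOB-ID>
```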

### Stopping the Playground

To stop the playground, run the following command:

```bash
$ docker-compose down
```

## Further Explorations

If you would like to explore this example more deeply, you can edit [payment_msg_proccessing.py](payment_msg_proccessing.py)
or create new PyFlink projects that perform more complex processing. You can do this locally under
the `pyflink-walkthrough` directory, since it is mounted into the `jobmanager` Docker container.

Ideas:
* Add your own Kafka source table;
* Create a new index for the Elasticsearch sink;
* Count the number of transactions, grouped by a 1-minute [tumbling window](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#tumble-tumbling-windows) and `payPlatform` (see the sketch below).
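
For the last idea, here is a sketch of the windowed aggregation, assuming the Kafka source table exposes an event-time attribute (called `rowtime` below) and that you have defined a sink table for the counts (called `platform_counts_sink` below; both names are illustrative):

```python
from pyflink.table.window import Tumble

t_env.from_path("payment_msg") \
    .window(Tumble.over("1.minutes").on("rowtime").alias("w")) \
    .group_by("w, payPlatform") \
    .select("payPlatform, w.start as window_start, count(orderId) as transaction_count") \
    .execute_insert("platform_counts_sink")
```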

After making a modification, you can submit the new job by executing the same command mentioned in
[Running the PyFlink job](#running-the-pyflink-job):
```shell script
$ docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink-walkthrough/payment_msg_proccessing.py -d
```

Furthermore, you can [create new Kibana dashboards](https://www.elastic.co/guide/en/kibana/7.8/dashboard-create-new-dashboard.html)
to visualize other aspects of the data in Elasticsearch.
@@ -0,0 +1,96 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

version: '2.1'
services:
  jobmanager:
    build: .
    image: pyflink/pyflink:1.11.0-scala_2.11
    volumes:
      - .:/opt/pyflink-walkthrough
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: pyflink/pyflink:1.11.0-scala_2.11
    volumes:
      - .:/opt/pyflink-walkthrough
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092"
    depends_on:
      - zookeeper
    environment:
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "payment_msg:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  generator:
    build: generator
    image: generator:1.0
    depends_on:
      - kafka
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: docker.elastic.co/kibana/kibana:7.8.0
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
  load-kibana-dashboard:
    build: ./kibana
    command: ['/bin/bash', '-c', 'cat /tmp/load/load_ndjson.sh | tr -d "\r" | bash']
    depends_on:
      - kibana
@@ -0,0 +1,27 @@
###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

FROM python:3.7-alpine

RUN set -ex; \
    python -m pip install --upgrade pip; \
    pip install kafka-python;

ADD generate_source_data.py /

CMD ["python", "-u", "./generate_source_data.py"]
