# PyFlink Installation

### By Fatty

### Version 2023.11.15

## Anaconda3 Installation

Download the Anaconda3 package from TUNA first.

```Bash
wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/Anaconda3-2023.09-0-Linux-x86_64.sh
sh Anaconda3-2023.09-0-Linux-x86_64.sh
```
During the installation, please use the default settings.
It should be installed at `~/anaconda3`.

## Python 3.9 Installation

Python 3.9 installed by conda will be easy and reliable.

```Bash
conda create -n pyflink_39 python=3.9
conda activate pyflink_39
```

## Apache-Flink Installation

Then install the apache-flink package with pip.

```Bash
pip install apache-flink
```

## Some Tests

The following code comes from the official [documents version 1.18](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/datastream_tutorial/). 
And there might be some tiny modifications.

In [10]:
# Test with a Flink Python DataStream API Program
# The following code comes from the official [documents version 1.18](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/datastream_tutorial/).
# Save the code below as `DataStream_API_word_count.py`.
# import os
# # Get current absolute path
# current_file_path = os.path.abspath(__file__)
# # Get current dir path
# current_dir_path = os.path.dirname(current_file_path)
# # Change into current dir path
# os.chdir(current_dir_path)

import argparse
import logging
import sys
import numpy as np 
import pandas as pd
from pyflink.table import StreamTableEnvironment
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy


word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    if input_path is not None:
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        ds = env.from_collection(word_count_data)

    def split(line):
        yield from line.split()

    # compute word count
    ds = ds.flat_map(split) \
        .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
        .key_by(lambda i: i[0]) \
        .reduce(lambda i, j: (i[0], i[1] + j[1]))

    # define the sink
    if output_path is not None:
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        # ds.print()

        # Step 1: Create a `StreamTableEnvironment` object.
        t_env = StreamTableEnvironment.create(env)

        # Step 2: Convert the `DataStream` object to a `Table` object.
        table = t_env.from_data_stream(ds)

        # Step 3: Convert the `Table` object to a `pandas` dataframe.
        df = table.to_pandas()
        
        df.to_csv('./DataStream_API_word_count.csv', index=False)
        print(df)

    # submit for execution
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)


NameError: name '__file__' is not defined

In [None]:
# Test with a Flink Python Table API Program
# The following code comes from the official [documents version 1.18](https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/python/table_api_tutorial/).
# Save the code below as `Table_API_word_count.py`.

import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().set("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .build())
                .option('path', input_path)
                .format('csv')
                .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format(FormatDescriptor.for_format('canal-json')
                        .build())
                .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
        .group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert('sink') \
        .wait()
        
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details
    # Convert the `Table` object to a `pandas` dataframe.
    df = tab.to_pandas()
    df.to_csv('./Table_API_word_count.csv', index=False)
    print(df)

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.


Use Docker to build a local Kafka cluster.

1. Install Docker and Docker Compose:
```Bash
sudo apt install Docker Docker-compose
```
2. Create a local `docker-compose.yml` file with following contents:

```yaml
version: '3'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=localhost
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_CREATE_TOPICS=test:1:1
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
```

3. Locate the `docker-compose.yml` and run the following command:

```Bash
docker-compose up -d
```

This will run a local Kafka cluster containing a Zookeeper Instance and a Kafka Instance, which will run on port 9092 of localhost.

Next time after downloading those needed files, you can run the following command to start the Kafka cluster:
```Bash
docker run -d -p 9092:9092 \
-e ALLOW_ANONYMOUS_LOGIN=yes \
-e KAFKA_ADVERTISED_HOST_NAME=localhost \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_CREATE_TOPICS=test:1:1 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
bitnami/kafka:latest
```

In [None]:
#Following code uses kafka-python module to send data to a local Kafka cluster. 
#This code opens a text file named `hamlet.txt` and sends its contents as a stream to a specified Kafka Topic `hamlet`:
#Following code uses kafka-python module to send data to a local Kafka cluster. 
#This code opens a text file named `hamlet.txt` and sends its contents as a stream to a specified Kafka Topic `hamlet`:

from kafka import KafkaProducer
import time
import os

def send_file_to_kafka(file_path: str, topic: str, bootstrap_servers: str):
    # Create a KafkaProducer object with the given bootstrap servers
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    # Get the size of the file in bytes
    file_size = os.path.getsize(file_path)
    # Open the file in read binary mode
    while True:
        with open(file_path, "rb") as f:
            # Read the file in chunks of 1024 bytes
            while True:
                data = f.read(1024)
                # If no data is read, break out of the loop
                if not data:
                    break
                # Send the data to the given topic
                producer.send(topic, data)
                # Print the number of bytes sent to the topic
                bytes_sent = len(data)
                print(f"Sent {bytes_sent} bytes to Kafka topic {topic}")
                # Calculate the percentage of the file sent
                percent_sent = (f.tell() / file_size) * 100
                # Print the percentage of the file sent
                print(f"{percent_sent:.2f}% of the file sent")
                # Wait for 3 seconds
                time.sleep(3)
        # Wait for user input to continue or exit
        user_input = input("Press 'c' to continue sending the file or 'q' to quit: ")
        if user_input == "q":
            break


# Call the function with the file path, topic, and bootstrap servers
send_file_to_kafka("./hamlet.txt",  "hamlet", "localhost:9092")

# In this code, the send_file_to_kafka function accepts three parameters: file_path, topic, and bootstrap_servers. 
# file_path is the path to the local file, topic is the Kafka topic to which the data should be sent, and bootstrap_servers is the address of the Kafka cluster. 
# The function uses a with statement to open the file, reads its contents, and sends them as streaming data to the specified Kafka topic. 
# During the sending process, it prints out the transmission progress and uses the time.sleep method to pause for 0.1 seconds to control the sending rate. 

# Install Kafka on Ubuntu 22.04:

```Bash
sudo apt update
sudo apt install openjdk-11-jdk -y
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar xvf kafka_2.13-3.5.1.tgz
sudo nano /etc/systemd/system/zookeeper.service
sudo nano /etc/systemd/system/kafka.service
sudo systemctl start zookeeper
sudo systemctl start kafka
```

First, download the Kafka connector jar package from the official Apache Flink website or from the Maven repository 1.
Next, copy the downloaded jar file to the lib directory of PyFlink. The path of the lib directory is generally /usr/local/lib/python3.8.2/site-packages/pyflink/lib 2.
Finally, you can use the Kafka connector in PyFlink by importing the required classes and creating an instance of the FlinkKafkaConsumer class 2.


In [None]:
# A easy way to show the stream with kafka-python

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "hamlet",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    group_id="my-group",
    value_deserializer=lambda x: x.decode("utf-8")
)

for message in consumer:
    # Print the number of bytes received from the Kafka topic
    print(f"Received {len(message.value)} bytes from Kafka topic {message.topic}")
    # Print the contents of the message
    print(f"{message.value}")


# In the above code, we use the `KafkaConsumer` class to create a consumer object. 
# We pass `hamlet` as the topic name to the constructor. 
# We also pass `localhost:9092` as the address of the bootstrap server. 
# We use the `value_deserializer` parameter to decode the messages received from the Kafka topic. 
# We use a `for` loop to iterate over the consumer object and use the `print` function to print the contents of the message. 


In [None]:
# A easy way to show the stream with pyflink
# The word counting part is not ready yet
import os
# Get current absolute path
current_file_path = os.path.abspath(__file__)
# Get current dir path
current_dir_path = os.path.dirname(current_file_path)
# Change into current dir path
os.chdir(current_dir_path)
output_path = current_dir_path
import argparse
import logging
import sys
import numpy as np 
import pandas as pd
from pyflink.table import StreamTableEnvironment
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy
from pyflink.common import Types, SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer

def split(line):
    yield from line.split()

def read_from_kafka():
    # Create a Flink execution environment
    env = StreamExecutionEnvironment.get_execution_environment()    

    # Add the Flink SQL Kafka connector jar file to the classpath
    env.add_jars("file:///home/hadoop/Desktop/PyFlink-Tutorial/flink-sql-connector-kafka-3.1-SNAPSHOT.jar")

    # Print a message to indicate that data reading from Kafka has started
    print("start reading data from kafka")

    # Create a Kafka consumer
    kafka_consumer = FlinkKafkaConsumer(
        topics='hamlet', # The topic to consume messages from
        deserialization_schema= SimpleStringSchema('UTF-8'), # The schema to deserialize messages
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'my-group'} # The Kafka broker address and consumer group ID
    )

    # Start reading messages from the earliest offset
    kafka_consumer.set_start_from_earliest()

    # Add the Kafka consumer as a source to the Flink execution environment and print the messages to the console
    env.add_source(kafka_consumer).print()

    # # Start the Flink job
    # env.execute()
    
    # ds = env.from_source(source=kafka_consumer, source_name= "kafka_consumer",watermark_strategy=WatermarkStrategy.for_monotonous_timestamps() )
    # # write all the data to one file
    
    # # compute word count
    # ds = ds.flat_map(split) \
    #     .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
    #     .key_by(lambda i: i[0]) \
    #     .reduce(lambda i, j: (i[0], i[1] + j[1]))

    #     # define the sink
    # if output_path is not None:
    #     ds.sink_to(
    #         sink=FileSink.for_row_format(
    #             base_path=output_path,
    #             encoder=Encoder.simple_string_encoder())
    #         .with_output_file_config(
    #             OutputFileConfig.builder()
    #             .with_part_prefix("prefix")
    #             .with_part_suffix(".ext")
    #             .build())
    #         .with_rolling_policy(RollingPolicy.default_rolling_policy())
    #         .build()
    #     )
    # else:
    #     print("Printing result to stdout. Use --output to specify output path.")
    #     # ds.print()

    #     # Step 1: Create a `StreamTableEnvironment` object.
    #     t_env = StreamTableEnvironment.create(env)

    #     # Step 2: Convert the `DataStream` object to a `Table` object.
    #     table = t_env.from_data_stream(ds)

    #     # Step 3: Convert the `Table` object to a `pandas` dataframe.
    #     df = table.to_pandas()
        
    #     df.to_csv('./DataStream_API_word_count.csv', index=False)
    #     print(df)

    # submit for execution
    env.execute()

if __name__ == '__main__':
    # Set up logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    # Call the read_from_kafka function
    read_from_kafka()
