This project provides a template for developing Flink applications using the Python API.
- Run and debug Flink jobs locally
- Unit testing using Pytest
- Dependency management using Poetry
- A development environment using the devcontainers standard
To start using this development environment locally, follow the getting started guide for your preferred editor:
Prerequisite: Please ensure Docker is installed and running on your machine.
If you prefer not to install and configure Docker locally, GitHub Codespaces offers a convenient alternative. It allows you to create a fully configured development environment in the cloud. Please refer to the GitHub Codespaces documentation to get started.
This development environment uses Docker Compose to run a minimal local Flink cluster, consisting of a Job Manager and a Task Manager, for developing with Apache Flink's Python API. Because it follows the devcontainers standard, code changes are simple to deploy and jobs are convenient to debug.
graph LR
    subgraph LocalEnvironment["Local Environment"]
        direction LR
        Editor["Editor"]
        subgraph Docker["Docker Compose"]
            direction TB
            JobManager["Job Manager"]
            subgraph Devcontainer["Development Container"]
                direction LR
                RemoteServer["Remote Development Server"]
                Repository["Code Repository"]
                TaskManager["Task Manager"]
            end
        end
    end
    Editor -->|"Accesses"| RemoteServer
    RemoteServer -->|"Accesses"| Repository
    Editor -->|"Starts Job"| JobManager
    JobManager -->|"Starts Task"| TaskManager
    TaskManager -->|"Uses"| Repository
Interact with the code via your preferred editor, connected to the Remote Development Server inside the Task Manager container. The code repository is directly cloned into this container, making any updates immediately ready for deployment. Running a new Flink job involves the Job Manager coordinating with the Task Manager container to execute tasks using the latest code.
The setup allows local code execution in the development environment, enabling debugger use through your editor. This repository provides utilities for connecting the debugger to a running Flink Job, allowing for in-depth inspection of tasks and setting breakpoints directly in the execution flow.
The development environment is structured as follows:
| Folder | Description |
|---|---|
| bin | Contains scripts for common development tasks. |
| examples | Contains example Flink jobs and unit tests. |
| pyflink_dev | The Python package where you can add code for your Flink jobs. Rename this for your project. |
Follow these instructions to execute common development tasks efficiently.
The Flink UI is available at http://localhost:8081.
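The same address also serves Flink's REST API, so job status can be checked from a script as well as from the browser. The following is a minimal sketch, assuming the cluster is reachable at http://localhost:8081 and using the REST API's `/jobs/overview` endpoint. The helper names `fetch_jobs` and `summarize_jobs` are hypothetical and not part of this repository.

```python
import json
from urllib.request import urlopen

FLINK_URL = "http://localhost:8081"  # address of the Flink UI in this environment


def fetch_jobs(base_url=FLINK_URL):
    """Fetch the job overview from the Flink REST API (needs a running cluster)."""
    with urlopen(f"{base_url}/jobs/overview", timeout=5) as response:
        return json.load(response)


def summarize_jobs(overview):
    """Turn the REST payload into sorted 'name: STATE' lines."""
    jobs = overview.get("jobs", [])
    return sorted(f"{job['name']}: {job['state']}" for job in jobs)
```

With the cluster running, `print("\n".join(summarize_jobs(fetch_jobs())))` would list each job with its current state.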
To run a Flink job locally, run the script with Python:

    python examples/word_count.py

Note: To debug a locally running job, run the script in debug mode.
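For orientation, a job script that can be run this way might look roughly like the sketch below. It is not the content of examples/word_count.py; the function names `tokenize`, `add_counts`, and `run_job` are hypothetical, and the sketch assumes the PyFlink DataStream API is installed in the development environment.

```python
def tokenize(line):
    """Split a line into lowercase words, each paired with an initial count of 1."""
    return [(word, 1) for word in line.lower().split()]


def add_counts(left, right):
    """Merge two (word, count) pairs that share the same word."""
    return left[0], left[1] + right[1]


def run_job(lines):
    """Build and execute a small word count pipeline (hypothetical example)."""
    # PyFlink is imported here so the helpers above stay importable without it.
    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    pair_type = Types.TUPLE([Types.STRING(), Types.INT()])
    (
        env.from_collection(lines, type_info=Types.STRING())
        .flat_map(tokenize, output_type=pair_type)
        .key_by(lambda pair: pair[0])
        .reduce(add_counts)
        .print()
    )
    env.execute("word_count_sketch")
```

Calling `run_job(["to be or not to be"])` at the bottom of such a script and starting it with `python` executes the pipeline in a local Flink environment, so breakpoints set in `tokenize` or `add_counts` can be hit from the editor's debugger.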
To submit a job to the Job Manager, use the flink-run command. For instance, to run the word_count job:

    flink-run examples/word_count.py

Note: The flink-run command is automatically registered in the development environment. If the command is not recognized after the initial setup, open a new terminal session or restart the development environment.
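How flink-run is implemented is not described here. As a rough point of comparison only, the stock Flink CLI submits a Python job with `flink run --python <script>`, optionally targeting a cluster with `--jobmanager <host:port>`. The sketch below assembles such a command; the helper name `build_submit_command` is hypothetical, and the assumption that flink-run does something similar is unverified.

```python
import shlex


def build_submit_command(script, jobmanager=None):
    """Assemble a stock Flink CLI call that submits a Python job.

    This is an assumption about what a wrapper could run, not the
    actual implementation of flink-run in this repository.
    """
    command = ["flink", "run"]
    if jobmanager is not None:
        command += ["--jobmanager", jobmanager]  # host:port of the Job Manager
    command += ["--python", script]
    return command


# Render the command as a copy-pasteable shell line.
print(shlex.join(build_submit_command("examples/word_count.py", "localhost:8081")))
```

Passing the returned list to `subprocess.run` would execute it, provided the `flink` binary is on the PATH.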
To debug a live Flink job, use the flink-debug command.
Note: The flink-debug command is automatically registered in the development environment. If the command is not recognized after the initial setup, open a new terminal session or restart the development environment.
To start debugging, execute the following command in your terminal:

    flink-debug <optional_task_id>

Note: Without arguments, flink-debug targets task ID 1-1. Specify a different task ID if needed.
Upon successful execution, the command will output a process ID (e.g., 1234). Make a note of this ID as you will need it to attach your debugger to the correct process.
If you're using VS Code, navigate to the debugger tab. Next, click the green play button to initiate the debugger. Make sure to select the debugger profile provided with your development environment. VS Code will present a list of running processes. Choose the one that corresponds to the process ID you noted earlier.
To put these instructions into practice, consider debugging the word_stream example. This job continuously outputs words to standard output. Begin by starting the job with the following command:
    flink-run examples/word_stream.py

Next, initiate the debugger:

    flink-debug

Remember to note the process ID displayed in your console.
With the job running, proceed to open the examples/word_stream.py file and set a breakpoint at line 39. This is the body of the function that processes messages as they are received from the data source. Finally, attach your debugger to the noted process. The execution will halt at the breakpoint the moment the next message is processed, enabling you to inspect the flow in real-time.
The development environment leverages its local execution capabilities to enable unit testing for Flink jobs. An example unit test for the word count functionality is provided in examples/test__word_count.py. To run the unit tests, simply use the pytest command.
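One pattern that tends to keep such tests fast is to put the transformation logic in plain Python functions and test those directly. The sketch below illustrates the idea; it is not the content of examples/test__word_count.py, and `count_words` is a hypothetical stand-in for your own job logic.

```python
from collections import Counter


def count_words(lines):
    """Hypothetical job logic: count word occurrences across all lines."""
    counts = Counter()
    for line in lines:
        counts.update(line.lower().split())
    return dict(counts)


def test_count_words():
    """Pytest collects this function because its name starts with test_."""
    result = count_words(["Flink runs jobs", "jobs run on Flink"])
    assert result["flink"] == 2
    assert result["jobs"] == 2
    assert result["runs"] == 1
```

Saved in a file whose name starts with `test_`, this test is discovered and run by the `pytest` command along with the rest of the suite.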