Commit d0aaa0a

Jdbc sink example (#133)

* Jdbc Sink example

1 parent 306fe17 commit d0aaa0a

File tree

13 files changed: +935 -0 lines changed

README.md

Lines changed: 1 addition & 0 deletions

@@ -20,6 +20,7 @@ Example applications in Java, Python, Scala and SQL for Amazon Managed Service for Apache Flink
  - [**SQS Sink**](./java/SQSSink) - Writing data to Amazon SQS
  - [**Prometheus Sink**](./java/PrometheusSink) - Sending metrics to Prometheus
  - [**Flink CDC**](./java/FlinkCDC) - Change Data Capture examples using Flink CDC
+ - [**JdbcSink**](./java/JdbcSink) - Writes to a relational database executing upsert statements

  #### Reading and writing files and transactional data lake formats
  - [**Iceberg**](./java/Iceberg) - Working with Apache Iceberg and Amazon S3 Tables

java/JdbcSink/README.md

Lines changed: 180 additions & 0 deletions
@@ -0,0 +1,180 @@

## Flink JDBC Sink

This example demonstrates how to use the DataStream API JdbcSink to write to a relational database.

* Flink version: 1.20
* Flink API: DataStream
* Language: Java (11)
* Flink connectors: JDBC sink, DataGen

This example demonstrates how to UPSERT into a relational database. It uses the UPSERT syntax of PostgreSQL, but it can easily be adapted to the syntax of other databases, or turned into an append-only sink with an INSERT INTO statement.

#### Which JdbcSink?

At the time of publishing this example (August 2025) there are two different DataStream API JdbcSink implementations, available with version `3.3.0-1.20` of the JDBC connector:

1. The new `org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink`, which uses the Sink API V2 and is initialized using a builder: `JdbcSink.<StockPrice>builder()...build()`
2. The legacy `org.apache.flink.connector.jdbc.JdbcSink`, which uses the legacy `SinkFunction` API, now deprecated. The legacy sink is initialized with the syntax `JdbcSink.sink(...)`

This example uses the new sink; a minimal construction sketch is shown below.

At the time of publishing this example (August 2025) the Apache Flink documentation [still refers to the deprecated sink](https://nightlies.apache.org/flink/flink-docs-lts/docs/connectors/datastream/jdbc/#jdbcsinksink).
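Initialization of the new sink looks roughly like the following sketch. The builder method names follow the `3.3.0-1.20` connector API, but the `StockPrice` getters and the connection values are illustrative assumptions; check the example source and the connector javadoc for the exact signatures.

```java
import java.sql.Timestamp;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;

public class UpsertSinkSketch {

    // Builds an at-least-once upsert sink for StockPrice records (sketch, not the example's exact code)
    static JdbcSink<StockPrice> buildSink(String url, String user, String password) {
        return JdbcSink.<StockPrice>builder()
                .withQueryStatement(
                        "INSERT INTO prices (symbol, price, timestamp) VALUES (?, ?, ?) "
                                + "ON CONFLICT(symbol) DO UPDATE SET price = ?, timestamp = ?",
                        (statement, stock) -> {
                            // Parameters 1-3 bind the INSERT values, 4-5 the DO UPDATE branch
                            statement.setString(1, stock.getSymbol());
                            statement.setDouble(2, stock.getPrice());
                            statement.setTimestamp(3, Timestamp.valueOf(stock.getTimestamp()));
                            statement.setDouble(4, stock.getPrice());
                            statement.setTimestamp(5, Timestamp.valueOf(stock.getTimestamp()));
                        })
                .withExecutionOptions(JdbcExecutionOptions.builder()
                        .withBatchSize(100)       // see the `batch.size` runtime property
                        .withBatchIntervalMs(200) // see `batch.interval.ms`
                        .withMaxRetries(5)        // see `max.retries`
                        .build())
                .buildAtLeastOnce(new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName("org.postgresql.Driver")
                        .withUsername(user)
                        .withPassword(password)
                        .build());
    }
}
```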
### Data

The application generates `StockPrice` objects with realistic fake data:

```json
{
  "symbol": "AAPL",
  "timestamp": "2025-08-07T10:30:45",
  "price": 150.25
}
```
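For reference, a plain POJO with this shape could look like the sketch below. The actual `StockPrice` class in this example may differ; the `LocalDateTime` field type is an assumption based on the timestamp format above.

```java
import java.time.LocalDateTime;

// Assumed shape of the StockPrice record; the class shipped with this example may differ.
public class StockPrice {
    private String symbol;           // e.g. "AAPL"
    private LocalDateTime timestamp; // e.g. 2025-08-07T10:30:45
    private double price;            // e.g. 150.25

    public StockPrice() {} // no-arg constructor, required for Flink POJO serialization

    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
}
```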
This data is upserted into a database table containing the latest price for every symbol (see the schema under [Database prerequisites](#database-prerequisites)).

The sink uses the PostgreSQL upsert syntax:

```sql
INSERT INTO prices (symbol, price, timestamp) VALUES (?, ?, ?)
ON CONFLICT(symbol) DO UPDATE SET price = ?, timestamp = ?
```

This is specific to PostgreSQL, but the code can be adjusted to other databases, as long as the SQL syntax supports doing an upsert with a single SQL statement.
### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.

When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file, located in the resources folder.

Runtime parameters:

| Group ID   | Key                  | Description                                                                                                                   |
|------------|----------------------|-------------------------------------------------------------------------------------------------------------------------------|
| `DataGen`  | `records.per.second` | Number of stock price records to generate per second (default: 10)                                                            |
| `JdbcSink` | `url`                | PostgreSQL JDBC URL, e.g. `jdbc:postgresql://your-rds-endpoint:5432/your-database`. Note: the URL includes the database name. |
| `JdbcSink` | `table.name`         | Destination table, e.g. `prices` (default: `prices`)                                                                          |
| `JdbcSink` | `username`           | Database user with INSERT and UPDATE permissions                                                                              |
| `JdbcSink` | `password`           | Database password                                                                                                             |
| `JdbcSink` | `batch.size`         | Number of records to batch before executing the SQL statement (default: 100)                                                  |
| `JdbcSink` | `batch.interval.ms`  | Maximum time, in milliseconds, to wait before executing a batch (default: 200)                                                |
| `JdbcSink` | `max.retries`        | Maximum number of retries for failed database operations (default: 5)                                                         |
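For local development, the `flink-application-properties-dev.json` file follows the standard Runtime Properties layout. A sketch matching the parameters above (the values are illustrative and line up with the local Docker database shown later, not necessarily with the file shipped in the example):

```json
[
  {
    "PropertyGroupId": "DataGen",
    "PropertyMap": {
      "records.per.second": "10"
    }
  },
  {
    "PropertyGroupId": "JdbcSink",
    "PropertyMap": {
      "url": "jdbc:postgresql://localhost:5432/testdb",
      "table.name": "prices",
      "username": "flinkuser",
      "password": "flinkpassword",
      "batch.size": "100",
      "batch.interval.ms": "200",
      "max.retries": "5"
    }
  }
]
```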
### Database prerequisites

When running on Amazon Managed Service for Apache Flink with databases on AWS, you need to set up the database manually, ensuring you set up all of the following:

> You can find the SQL script that sets up the dockerized database in the init script for
> [PostgreSQL](docker/postgres-init/init.sql).

1. **PostgreSQL Database**
   1. The database name must match the `url` configured in the JDBC sink
   2. The destination table must have the following schema:
      ```sql
      CREATE TABLE prices (
          symbol VARCHAR(10) PRIMARY KEY,
          timestamp TIMESTAMP NOT NULL,
          price DECIMAL(10,2) NOT NULL
      );
      ```
   3. The database user must have SELECT, INSERT, and UPDATE permissions on the `prices` table (see the example grant below)
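If your application connects with a dedicated user rather than the table owner, a minimal grant could look like this (the user name is illustrative):

```sql
-- Illustrative: minimum permissions for the sink user (user name is hypothetical)
GRANT SELECT, INSERT, UPDATE ON prices TO flink_app_user;
```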
### Testing with local database using Docker Compose

This example can be run locally using Docker.

A [Docker Compose file](./docker/docker-compose.yml) is provided to run a local PostgreSQL database. The local database is initialized by creating the database, the user, and the `prices` table with sample data.

You can run the Flink application inside your IDE, following the instructions in [Running in IntelliJ](#running-in-intellij).

To start the local database, run `docker compose up -d` in the `./docker` folder.

Use `docker compose down -v` to shut it down, also removing the data volumes.
### Running in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. Run the database locally using Docker Compose, as described [above](#testing-with-local-database-using-docker-compose).

See [Running examples locally](../running-examples-locally.md) for details about running the application in the IDE.
### Running on Amazon Managed Service for Apache Flink

To run the application in Amazon Managed Service for Apache Flink, ensure the application configuration has the following:
* VPC networking
* The selected subnets can route traffic to the PostgreSQL database
* The Security Group allows traffic from the application to the database
### Security considerations

For production deployments:
1. Store database credentials in AWS Secrets Manager.
2. Use VPC endpoints for secure database connectivity.
3. Enable SSL/TLS for database connections.

> ⚠️ **Password rotation**: if the password of your database is rotated, the JdbcSink fails, causing the job to restart.
> If you fetch the password dynamically on application start (when you create the JdbcSink object), the job will be able
> to restart with the new password. Fetching the password on start is not implemented in this example; a minimal sketch of the idea follows.
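For illustration only, a sketch of fetching the credentials on start with the AWS SDK for Java v2. The secret name and its JSON layout are assumptions, not part of this example:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;

public class DbCredentialsFetcher {

    // Called once in main(), before the JdbcSink is built, so that a job restart picks up a rotated password.
    // "my-db-secret" and the {"username": ..., "password": ...} layout are hypothetical.
    static String[] fetchUsernameAndPassword() throws Exception {
        try (SecretsManagerClient secrets = SecretsManagerClient.create()) {
            String secretJson = secrets.getSecretValue(
                            GetSecretValueRequest.builder().secretId("my-db-secret").build())
                    .secretString();
            JsonNode node = new ObjectMapper().readTree(secretJson);
            return new String[]{node.get("username").asText(), node.get("password").asText()};
        }
    }
}
```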
### Implementation considerations

#### At-least-once or exactly-once

This implementation leverages the at-least-once mode of the JdbcSink. This is normally sufficient when the sink executes a single idempotent statement, such as an UPSERT: any duplicate will simply overwrite the same record.

The JdbcSink also supports an exactly-once mode, which leverages XA transactions synchronized with Flink checkpoints and relies on an XADataSource. This prevents duplicate writes in case of failure and restart from a checkpoint. Note that it does not prevent duplicates if you restart the application from an older Snapshot (Flink Savepoint), unless your SQL statement implements some form of idempotency.
#### No connection pooler?

The JdbcSink does not support using a database connection pooler, such as HikariCP.

The reason is that no connection pooling is required: the sink opens one database connection per parallel subtask and reuses these connections unless they get closed.
#### Batching

The JdbcSink batches writes to reduce the number of requests to the database. The batch size and interval used in this example are for demonstration purposes only.

You should test your actual application with realistic throughput and realistic data to optimize these values for your workload.
#### Which flink-connector-jdbc-* dependency?

To use the JdbcSink in the DataStream API, you need `flink-connector-jdbc-core` and the JDBC driver of the specific database. For example:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc-core</artifactId>
    <version>3.3.0-1.20</version>
</dependency>

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.7.2</version>
</dependency>
```

Including `flink-connector-jdbc` would bring in unnecessary dependencies and increase the size of the uber-jar file.
java/JdbcSink/docker/docker-compose.yml

Lines changed: 25 additions & 0 deletions

@@ -0,0 +1,25 @@
```yaml
services:
  # PostgreSQL database
  postgres:
    image: postgres:15
    container_name: postgres
    restart: always
    environment:
      POSTGRES_DB: testdb
      POSTGRES_USER: flinkuser
      POSTGRES_PASSWORD: flinkpassword
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./postgres-init:/docker-entrypoint-initdb.d
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U flinkuser -d testdb"]
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 10s

volumes:
  postgres_data:
    driver: local
```
java/JdbcSink/docker/postgres-init/init.sql

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
```sql
-- Create prices table for JDBC sink
CREATE TABLE prices (
    symbol VARCHAR(10) PRIMARY KEY,
    timestamp TIMESTAMP NOT NULL,
    price DECIMAL(10,2) NOT NULL
);

-- Insert some sample data for testing
INSERT INTO prices (symbol, timestamp, price) VALUES
    ('AAPL', NOW(), 150.25);

-- Display table structure
\d prices;

-- Display sample data
SELECT * FROM prices;

-- Show table statistics
SELECT
    schemaname,
    tablename,
    attname as column_name,
    n_distinct,
    correlation
FROM pg_stats
WHERE tablename = 'prices';
```
