Postgresql To Kinesis For Java
A tool for publishing inserts, updates, and deletes made on a Postgresql database to an Amazon Kinesis Stream. pg2k4j may be run as a stand-alone application from the command line, or used as a Java library where its functionality can be extended and customized.
First, set up your Postgres database to support logical replication and create an AWS Kinesis Stream.
Run pg2k4j as a Stand-alone Application
docker run -v /path/to/.aws/creds/:/aws_creds disneystreaming/pg2k4j --awsconfiglocation=/aws_creds --awsprofile=default --pgdatabase=<your_postgres_db> --pghost=<your_postgres_host> --pguser=<your_postgres_user> --pgpassword=<your_postgres_pw> --streamname=<your_kinesis_streamname>
Once you see the log line below, pg2k4j is publishing changes to Kinesis.
[main] INFO com.disneystreaming.pg2k4j.SlotReaderKinesisWriter - Consuming from slot pg2k4j
Use as a Java Library
pg2k4j artifacts are published to Maven Central.
<dependency>
    <groupId>com.disneystreaming.pg2k4j</groupId>
    <artifactId>pg2k4j</artifactId>
    <version>LATEST</version>
</dependency>
compile group: 'com.disneystreaming.pg2k4j', name: 'pg2k4j', version: 'LATEST'
libraryDependencies += "com.disneystreaming.pg2k4j" % "pg2k4j" % "LATEST"
Why We Wrote pg2k4j
pg2k4j is an implementation of a powerful design pattern called Change Data Capture. By using pg2k4j, anyone can know the state of your database at any point in time by consuming from the Kinesis Stream. At DSS we have rapidly changing datasets that many teams need to access and process in their own way. pg2k4j alleviates the need to grant database access to each team or to stand up an API on top of the dataset. This keeps the load down on your database, making it possible to max out its write throughput.
Benefits Over Existing Solutions
Before writing pg2k4j, we explored existing solutions. We used pg2kinesis but found that this implementation simply couldn't keep up with the write throughput that we required. As a JVM app, pg2k4j can natively integrate with Amazon's Kinesis Producer Library allowing it to achieve write speeds of over 1 million records per minute, which is orders of magnitude faster than the performance we observed with its Python counterpart.
How it Works
1. pg2k4j opens up a Logical Replication Slot on the Postgresql database.
A replication slot streams changes made on the database to the slot's listener, in the format specified by the plugin used for that replication slot. By default pg2k4j uses the wal2json plugin, which outputs a json representation of a SlotMessage to the listening thread.

Postgres writes all data changes to the Write Ahead Log (WAL), which, as well as ensuring data integrity and crash safety, makes logical replication possible. Each replication slot maintains a pointer to a position in the WAL, indicating the last sequence number that slot has processed. This pointer allows Postgres to flush all sections of the WAL which occurred before that sequence number. Crucially, if the application maintaining the replication slot does not update this sequence number, the storage space on the database will fill up, because Postgres won't be able to clear any sections of the WAL. To view this sequence number, run the query below on your database.
select * from pg_replication_slots
Details of how pg2k4j manages this pointer are outlined later in this section.
2. pg2k4j deserializes the json output sent by the wal2json plugin to a SlotMessage.
This method should be overridden when using any plugin besides wal2json as the contents from the WAL would not be json representations of a SlotMessage.
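To make steps 1 and 2 concrete, here is the general shape of a wal2json payload for a single insert, together with a minimal Java sketch that pulls fields out of it. The payload shape follows wal2json's documented output; the extraction code is illustrative only, since pg2k4j itself deserializes the payload into a SlotMessage with a proper JSON mapper.

```java
public class Wal2JsonExample {

    // Example payload in the shape wal2json emits for one committed insert.
    public static final String PAYLOAD =
        "{\"change\":[{\"kind\":\"insert\",\"schema\":\"public\"," +
        "\"table\":\"users\",\"columnnames\":[\"id\",\"name\"]," +
        "\"columnvalues\":[1,\"alice\"]}]}";

    // Naive string-based field extraction, for illustration only.
    // pg2k4j uses a real JSON mapper to build a SlotMessage from this.
    public static String extract(String json, String field) {
        String key = "\"" + field + "\":\"";
        int start = json.indexOf(key) + key.length();
        return json.substring(start, json.indexOf('"', start));
    }

    public static void main(String[] args) {
        System.out.println(extract(PAYLOAD, "kind"));  // insert
        System.out.println(extract(PAYLOAD, "table")); // users
    }
}
```

Each element of the `change` array describes one row-level operation (insert, update, or delete) with its schema, table, column names, and column values.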
3. pg2k4j writes these contents to the Kinesis Stream.
4. The callback is invoked when the records succeed or fail to make it to the stream.
On a successful write to the stream pg2k4j will advance the replication slot's sequence number, indicating that any data before this point may be flushed by the database. By advancing the sequence number after receiving confirmation that the record arrived on the stream, pg2k4j guarantees that each data change reaches Kinesis. Even on Postgres restart or pg2k4j restart this guarantee is preserved.
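The guarantee above can be sketched as a small bookkeeping class: LSNs are recorded when records are sent, and the pointer reported back to Postgres never moves past the oldest record that has not yet been confirmed by Kinesis. The class and method names here are illustrative, not pg2k4j's actual internals.

```java
import java.util.TreeSet;

// Illustrative bookkeeping for the at-least-once guarantee: the WAL
// pointer only advances past an LSN once Kinesis confirms the record.
public class LsnTracker {
    private final TreeSet<Long> pending = new TreeSet<>();
    private long maxConfirmed = 0L;

    // Record an LSN when its record is sent to the stream.
    public synchronized void sent(long lsn) { pending.add(lsn); }

    // Success callback: the record reached the stream.
    public synchronized void confirmed(long lsn) {
        pending.remove(lsn);
        maxConfirmed = Math.max(maxConfirmed, lsn);
    }

    // The sequence number it is safe to report back to Postgres: never
    // past the oldest record that has not yet reached Kinesis, so a
    // restart can never skip an unconfirmed change.
    public synchronized long flushedLsn() {
        return pending.isEmpty() ? maxConfirmed
                                 : Math.min(maxConfirmed, pending.first() - 1);
    }
}
```

Because the flushed position is capped by the oldest unconfirmed record, out-of-order callbacks cannot cause Postgres to discard WAL that still needs replaying after a crash.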
There is one other scenario wherein pg2k4j will advance the sequence number. It's important to note that each Postgres instance may have many databases, but a replication slot is configured against a single database. In the scenario where the replication slot database is idle but the other databases are active, it's important that pg2k4j still advances its pointer into the WAL so that Postgres doesn't hang onto these sections of the WAL. That's why pg2k4j advances the sequence number after a certain period of inactivity which defaults to 5 minutes.
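That idle-timeout behaviour can be sketched as follows. The class is illustrative, with the five-minute default taken from the description above.

```java
// Illustrative policy: advance the WAL pointer after a period with no
// changes on the slot's database, so Postgres can reclaim WAL produced
// by other databases on the same instance.
public class IdleAdvancePolicy {
    public static final long IDLE_FLUSH_MILLIS = 5 * 60 * 1000; // default: 5 minutes

    private long lastChangeMillis;

    public IdleAdvancePolicy(long nowMillis) { lastChangeMillis = nowMillis; }

    // Called whenever a change arrives from the replication slot.
    public void changeReceived(long nowMillis) { lastChangeMillis = nowMillis; }

    // True once the slot's database has been idle long enough that the
    // sequence number should be advanced anyway.
    public boolean shouldAdvance(long nowMillis) {
        return nowMillis - lastChangeMillis >= IDLE_FLUSH_MILLIS;
    }
}
```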
This section is a walkthrough of how to create your Postgresql instance, configured for logical replication, as an RDS instance. pg2k4j by no means requires that your Postgresql instance be an RDS instance, but since Kinesis is an AWS product, many users will likely also be running their Postgres instance on AWS. For an example of how to configure a non-RDS instance of Postgres, refer to the integration tests. This section also walks through how to set up a Kinesis Stream, for which pg2k4j requires no special configuration.
Start AWS RDS Postgres
Create a Parameter Group
In the AWS console, navigate to RDS then Parameter Groups and create a new parameter group.
In this parameter group, set the following values:
rds.logical_replication  1
max_wal_senders          10
max_replication_slots    10
In the AWS console, navigate to RDS->Instances and select Launch Instance. Follow the creation wizard, choosing Postgresql for the DB engine and Postgresql 10.3-R1 for the DB engine version.
Associate the parameter group you created in the previous step with this instance.
Create Kinesis Stream
In AWS console navigate to Kinesis, and create a stream.
Fork the repo and submit a PR with a description detailing what the code does and what bug or feature it addresses. Any methods containing substantial logic should include Javadocs.
Be sure that both unit and integration tests pass, and that any new code introduced has corresponding tests. Run unit tests with
mvn clean test
Tests run: 13, Failures: 0, Errors: 0, Skipped: 0
and integration tests with
mvn clean verify
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
Releasing is automatic and is driven by tags. To release, simply tag the master branch with a semantic version (e.g. 1.0.0).
This will update the pom with the version, publish the artifacts to Maven Central, and build and push the Docker image to Docker Hub.