- Main repository at GitLab: https://gitlab.com/hgdeoro/popyka
- Mirror at GitHub: https://github.com/hgdeoro/popyka
Effortless and extendable change data capture (CDC) with Python.
Change data capture (CDC) refers to the process of capturing changes made to the data in a database and then delivering those changes in real-time to a downstream system.
- PoC: README-PoC.md & From Zero to CDC: A 3-days Agile Journey to the PoC.
- Implementing a system test for Popyka.
Existing CDC options can be overly complex and resource-intensive, hindering adoption by smaller systems.
⚡ Popyka is a simpler CDC solution that gets you started in minutes.
- Built for agility and ease of use.
- Simplifies CDC operations.
- Works with PostgreSql (12, 13, 14, 15, 16) and wal2json.
- Works with Python 3.10, 3.11, 3.12.
- Built-in Filters and Processors.
- Tailor it to your needs: easily extensible through custom Filters and Processors, letting you adapt it to your specific workflows.
⚡ Streamline Data Delivery with Custom Filters and Processors
Popyka empowers you to write custom filters and processors in Python to tailor data messages for downstream Kafka consumers. This functionality seamlessly integrates your own business logic, giving you complete control over the data transformation process.
⚡ Unleash Data Delivery Flexibility
Popyka's custom processors empower you to write data to various destinations, including Redis, S3, Snowflake, Cassandra, and more. Go beyond Kafka and tailor your data pipeline to your specific needs.
Popyka leverages PostgreSQL's logical replication protocol to capture all data changes (BEGIN, COMMIT, INSERT, UPDATE, DELETE, TRUNCATE) efficiently.
Filters allow you to focus on specific messages based on your requirements.
Processors are the workhorses of Popyka. They take the captured data, transform it as needed (think of it as data wrangling), and then send it to other systems to keep everyone in sync.
Popyka ensures you only process the data you care about. It achieves this by letting you configure filters that act like smart sieves, allowing only relevant changes to proceed.
Popyka filters offer three actions:
- IGNORE: discards the change (any remaining filter is ignored).
- PROCESS: accepts the change (any remaining filter is ignored).
- CONTINUE: sends the change to subsequent filters for evaluation.
Popyka provides two built-in filters: IgnoreTxFilter and TableNameIgnoreFilter.
Extending Popyka
Extend Popyka's filtering capabilities by inheriting from the Filter class and implementing the filter() method. This method should return a value from the Result enum, indicating whether to IGNORE, PROCESS, or CONTINUE filtering the data change.
class MyCustomFilter(Filter):
    def filter(self, change) -> Result:
        ...
The base Filter class is defined in api.py.
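For illustration, here is a minimal sketch of a custom filter that skips changes on a hypothetical audit_log table. The import path (popyka.api), the Result member names, and the way the affected table is read from the change payload are assumptions based on the description above; check api.py for the actual interface.

from popyka.api import Filter, Result  # assumed import path, see api.py

class IgnoreAuditLogTableFilter(Filter):
    # Sketch only: reading the table name via change.get("table") assumes a
    # wal2json-style dict payload; adapt this to the real change object.
    def filter(self, change) -> Result:
        if change.get("table") == "audit_log":
            return Result.IGNORE  # discard; any remaining filter is ignored
        return Result.CONTINUE  # hand the change to subsequent filters

Note that the built-in TableNameIgnoreFilter likely covers this particular case already; a custom filter is only needed for logic the built-ins don't express.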
Processors take the filtered changes and manipulate them according to your needs, then export the transformed data to external systems.
Popyka simplifies sending data changes to Kafka. It includes a built-in processor, ProduceToKafkaProcessor, that lets you easily export the database changes to your Kafka cluster, keeping your other systems in sync.
Extending Popyka
You can extend Popyka's processing capabilities by inheriting from the Processor class and implementing the process_change() method.
class MyCustomProcessor(Processor):
    def process_change(self, change):
        ...
The base Processor class is defined in api.py.
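As a sketch of the "write to Redis" idea mentioned earlier, the following processor pushes every captured change onto a Redis list using redis-py. The popyka.api import path is an assumption (see api.py), redis-py is not a Popyka dependency, and the connection details and list name are placeholders.

import json

import redis  # redis-py, used here only for illustration

from popyka.api import Processor  # assumed import path, see api.py

_REDIS = redis.Redis(host="localhost", port=6379)  # placeholder connection details

class PushToRedisProcessor(Processor):
    # Sketch only: assumes the change payload is JSON-serializable.
    def process_change(self, change):
        _REDIS.rpush("popyka-changes", json.dumps(change, default=str))

A custom processor like this is wired into the pipeline the same way as the built-in one: reference its dotted path from the processors section of the configuration file, and make its module importable (for example via POPYKA_PYTHONPATH).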
Popyka provides built-in error handlers for common scenarios within the popyka.builtin.error_handlers module.
These handlers offer a convenient way to manage errors without writing custom logic.
- Abort: terminates execution and causes Popyka to exit with an error. This is the default behavior.
- ContinueNextProcessor: gracefully ignores the error and allows the next processor in the pipeline to process the change.
- ContinueNextMessage: skips any remaining processors configured in the pipeline and continues processing the next message.
- RetryProcessor: attempts to re-process the failed change using the same processor that encountered the error.
By using these built-in handlers, you can implement basic error handling strategies without writing complex custom code.
For example:
processors:
  - class: popyka.builtin.processors.ProduceToKafkaProcessor
    error_handlers:
      - class: popyka.builtin.error_handlers.RetryProcessor
      - class: popyka.builtin.error_handlers.Abort
If ProduceToKafkaProcessor fails, Popyka will automatically retry processing the message according to the number of retries configured by the POPYKA_DEFAULT_RETRIES environment variable. However, if the processor keeps failing after exceeding the configured retries, Popyka will stop retrying and delegate the error to the next configured error handler.
To configure Popyka to retry a failing processor like ProduceToKafkaProcessor before continuing with the next processor in the pipeline, you can use the following configuration:
processors:
  - class: popyka.builtin.processors.ProduceToKafkaProcessor
    error_handlers:
      - class: popyka.builtin.error_handlers.RetryProcessor
        config: {}
      - class: popyka.builtin.error_handlers.ContinueNextProcessor
        config: {}
  - class: some.module.SendMessageToDLQ
Unlike the behavior mentioned earlier (where Popyka aborts after exceeding the retry limit), once the retries are exhausted the error is handled by ContinueNextProcessor, so the change is passed on to the next configured processor: SendMessageToDLQ.
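SendMessageToDLQ is only a placeholder name in the example above. A sketch of what such a dead-letter-queue processor could look like, assuming confluent-kafka as the client library and placeholder broker and topic names, might be:

import json

from confluent_kafka import Producer  # assumed client library

from popyka.api import Processor  # assumed import path, see api.py

_PRODUCER = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder brokers

class SendMessageToDLQ(Processor):
    # Hypothetical processor matching the name used in the configuration above.
    def process_change(self, change):
        _PRODUCER.produce("popyka-dlq", value=json.dumps(change, default=str))
        _PRODUCER.flush()  # keep the sketch simple: flush after every message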
The base ErrorHandler class is defined in api.py.
The basic configuration is done using environment variables:
- POPYKA_CONFIG: Path to configuration file. If unset, uses the default configuration: popyka-default.yaml.
  - Example: /etc/popyka/config.yaml
- POPYKA_PYTHONPATH: Directories to add to the python path.
- POPYKA_COMPACT_DUMP
- POPYKA_MAX_PROCESSING_ATTEMPTS
- POPYKA_DEFAULT_RETRIES
The default configuration file is popyka-default.yaml. It uses bash-style environment variable interpolation, and requires setting:
- POPYKA_DB_DSN: libpq-compatible connection URI.
  - Example: postgresql://hostname:5432/database-name
  - See: Connection URIs
- POPYKA_KAFKA_BOOTSTRAP_SERVERS: Kafka bootstrap servers, comma separated.
  - Example: bootstrap-server-1:9092,bootstrap-server-2:9092,bootstrap-server-3:9092
  - See: Apache Kafka Python Client
Other environment variables that you can use with the default configuration file:
- POPYKA_DB_SLOT_NAME: A unique, cluster-wide identifier for the PostgreSql logical replication slot.
- POPYKA_KAFKA_TOPIC: Kafka topic to which the changes should be published.
To launch the Django Admin demo:
$ cd samples/django-admin # cd into sample project
$ docker compose up -d # bring all the services up: PostgreSql, Kafka, popyka.
# You might need to repeat this command.
$ docker compose logs -f demo-popyka # to see the CDC working
- Navigate to the Django admin and log in using admin:admin.
- You can see the Kafka contents using the RedPanda console.
You might find these files of interest:
- Code & README: samples/django-admin
- docker-compose.yml
- Default config: popyka-default.yaml
- Alternative config: popyka-config-ignore-tables.yaml
Popyka leverages PostgreSQL logical replication for data streaming. This approach is powerful, but if you're unfamiliar with streaming replication, consider setting up robust monitoring. Managing new replication slots can introduce complexities, and monitoring helps ensure smooth operation.
If Popyka is no longer in use, remember to remove these replication slots to avoid unnecessary resource consumption.
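For example, a standalone psycopg2 snippet (not part of Popyka; the DSN is a placeholder and 'popyka' stands in for whatever slot name you configured via POPYKA_DB_SLOT_NAME) can be used to inspect the slots and drop Popyka's slot once it is decommissioned:

import psycopg2  # standalone helper, used here only for illustration

# Placeholder DSN; in practice reuse the value of POPYKA_DB_DSN.
with psycopg2.connect("postgresql://hostname:5432/database-name") as conn:
    with conn.cursor() as cur:
        # Inspect existing replication slots and their WAL retention.
        cur.execute("SELECT slot_name, active, restart_lsn FROM pg_replication_slots")
        for slot_name, active, restart_lsn in cur.fetchall():
            print(slot_name, active, restart_lsn)
        # Once Popyka is decommissioned, drop its slot to stop WAL retention:
        # cur.execute("SELECT pg_drop_replication_slot('popyka')")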
To enable logical replication for Popyka to function, you might need to adjust your PostgreSQL configuration parameter wal_level to logical. This setting ensures the write-ahead log (WAL) captures the necessary information for Popyka to track changes in your database.
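A quick way to verify the setting before starting Popyka (again a standalone psycopg2 snippet, not part of Popyka; the DSN is a placeholder):

import psycopg2  # standalone check, used here only for illustration

with psycopg2.connect("postgresql://hostname:5432/database-name") as conn:  # placeholder DSN
    with conn.cursor() as cur:
        cur.execute("SHOW wal_level")
        (wal_level,) = cur.fetchone()
        print(wal_level)  # must be 'logical' for Popyka to stream changes
        # If it is not, set wal_level = logical in postgresql.conf (or run
        # ALTER SYSTEM SET wal_level = logical) and restart PostgreSQL.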
The v1 is under development on the main branch.
- create mechanism to allow inclusion of user's Python code (docker)
- dev experience: generate documentation of public API
- create mechanism to allow inclusion of user's Python code (docker compose) (DONE)
- document usage (DONE)
- ci/cd: run system tests (DONE)
- ci/cd: publish docker image to public repository (DONE)
- ci/cd: add coverage & badge
- ci/cd: build image from release tag (DONE)
- ci/cd: run unittests (DONE)
- add mechanism to overwrite path to configuration file (DONE)
- implement e2e test (DONE)
- improve configuration mechanism to support real world scenarios (DONE)
- dev experience: create sample projects using popyka (DONE)
- fix issues on MacOS (for local development, it requires --network=host) (DONE)
- improve automated testing (DONE)
- define supported Python versions and run tests on all supported versions (DONE)
- implement semantic versioning (DONE)
At the moment, Popyka streams changes, but doesn't handle pre-existing data. There are probably simple ways to implement this. Some ideas:
- Can 'copy_data' from subscription be used?
- Use xmin and skip locked to incrementally copy data before starting consumption?
See: https://www.morling.dev/blog/insatiable-postgres-replication-slot/