Skip to content

lawrencejones/pgsink

Repository files navigation

pgsink CircleCI Go Reference

Path to v1.0.0: https://github.com/lawrencejones/pgsink/projects/1

Draft docs can be seen at: docs

pgsink is a Postgres change-capture device that supports high-throughput and low-latency capture to a variety of sinks.

You'd use this project if your primary database is Postgres and you want a stress-free, quick-to-setup and easy-to-operate tool to replicate your data to other stores such as BigQuery or Elasticsearch, which works with any size Postgres database.

Screen capture showing how to import a table

Similar projects

There are many change-capture projects out there, and several support Postgres.

As an example, pgsink is similar to debezium in performance and durability goals, but with a much simpler setup (no Kafka required). We also bear similarity to Netflix's dblog, with the benefit of being open-source and available for use.

We win in these comparisons when you want a simple, no additional dependencies setup. We also benefit from the sole focus on Postgres instead of many upstream sources, as we can optimise our data-access pattern for large, high-transaction volume Postgres databases. Examples of this are keeping transactions short to help vacuums, and traversing tables using efficient indexes.

This makes pgsink a much safer bet for people managing production critical Postgres databases.

Developing

As an overview of important packages, for those understanding the source code:

  • changelog the input to sinks, produced by subscription or import
  • decode configures decoding of Postgres types into Golang (text -> int64)
  • imports create, manage and work import jobs, producing changelog entries
  • logical parsing of the pgoutput logical encoding, used by subscription
  • sinks implements different types of sink, from files to Google BigQuery
  • subscription Postgres change capture via replication, generating a changelog

This project comes with a docker-compose development environment. Boot the environment like so:

$ docker-compose up -d
docker-compose up -d
pgsink_prometheus_1 is up-to-date
pgsink_postgres_1 is up-to-date
pgsink_grafana_1 is up-to-date

Then run make recreatedb to create a pgsink database. You can now access your database like so:

$ psql --host localhost --user pgsink pgsink
pgsink=> \q

pgsink will work with this database: try pgsink --sink=file --decode-only.

Database migrations

We use goose to run database migrations. Create new migrations like so:

$ go run internal/migration/cmd/goose.go --dir internal/migration create create_import_jobs_table go
2019/12/29 14:59:51 Created new file: internal/migration/20191229145951_create_import_jobs_table.go

Running migrations is done using the make target:

$ make migrate structure.sql
$ go run internal/migration/cmd/goose.go --install up
2021/01/09 15:38:29 requested --install, so creating schema 'pgsink'
2021/01/09 15:38:29 goose: no migrations to run. current version: 20210102200953
docker-compose --env-file=/dev/null exec -T postgres pg_dump -U postgres pgsink --schema-only --schema=pgsink >structure.sql

Getting started

Boot a Postgres database, then create an example table.

$ createdb pgsink
$ psql pgsink
psql (11.5)
Type "help" for help.

pgsink=# create table public.example (id bigserial primary key, msg text);
CREATE TABLE

pgsink=# insert into public.example (msg) values ('hello world');
INSERT 1

pgsink will stream these changes from the database and send it to the configured sink. Changes are expressed as a stream of messages, either a Schema that describes the structure of a Postgres table, or a Modification corresponding to an insert/update/delete of a row in Postgres.

Our example would produce the following modification, where timestamp is the time at which the change was committed and sequence the operation index within the transaction:

{
  "timestamp": "2019-10-04T16:05:55.123456+01:00",
  "sequence": 1,
  "namespace": "public",
  "name": "example",
  "before": null,
  "after": {
    "id": "1",
    "msg": "hello world"
  }
}

Also sent, arriving before the modification element, will be a schema entry that describes the public.example table. We represent these as Avro schemas, built from the Postgres catalog information.

{
  "timestamp": "2019-10-04T16:05:55.123456+01:00",
  "schema": {
    "namespace": "public.example",
    "type": "record",
    "name": "value",
    "fields": [
      {
        "name": "id",
        "type": ["long", "null"],
        "default": null
      },
      {
        "name": "msg",
        "type": ["string", "null"],
        "default": null
      }
    ]
  }
}

Schemas are published whenever we first discover a relation. Use the timestamp field to order each successive schema event to ensure stale messages don't override more recent data.