PostgreSQL

PostgreSQL to S3

Preamble

DBtoS3 is proud to announce support for full load and replication for all PostgreSQL databases.

Requirements

Operating Systems

On Windows, you will need to install the PostgreSQL client in order to make use of all the available features. All other operating systems simply require the PostgreSQL binary files, which can be installed with:

pip install psycopg2-binary

Database & AWS

  • Read access credentials are needed to connect to your PostgreSQL database
  • A user with read/write access to S3 is also required
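
If you would like to confirm these permissions up front, below is a minimal sketch that checks both with psycopg2 and boto3 directly (neither check is part of DBtoS3, and it assumes the .env credentials described under Usage below have already been filled in):

import os

import boto3
import psycopg2
from dotenv import load_dotenv

load_dotenv()

# Confirm read access to PostgreSQL with a trivial query.
conn = psycopg2.connect(
    host=os.getenv('POSTGRES_HOST'),
    dbname=os.getenv('POSTGRES_DATABASE'),
    user=os.getenv('POSTGRES_USER'),
    password=os.getenv('POSTGRES_PASSWORD'),
    port=os.getenv('POSTGRES_PORT'),
)
with conn.cursor() as cur:
    cur.execute('SELECT 1')
conn.close()

# Confirm the S3 user can reach the bucket.
s3 = boto3.client(
    's3',
    region_name=os.getenv('AWS_REGION'),
    aws_access_key_id=os.getenv('AWS_SECRET_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
)
s3.head_bucket(Bucket=os.getenv('S3_BUCKET'))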

Installation

The first step is to install DBtoS3 in your Python environment:

pip install dbtos3

Usage

Environment Setup

To begin, first create an appropriate Python project. It is crucial that this directory is not moved or tampered with once replication has begun: the DBtoS3 catalogue and logging live in this directory, and both are essential for successful loading, replication and continuous logging of the replication process.

In this directory, create an empty .env file and an empty app.py file, where we will set up our DBtoS3 PostgreSQL full load and replication tasks. The catalog.db file and Logs folder shown below are created by DBtoS3 once the application runs.

ProjectDirectory/
    |_ .env
    |_ app.py
    |_ catalog.db
    |_ Logs/

We will first have a look at our .env file. Copy the text below and paste it into your .env file, then add the relevant credentials for your database and S3 sources.

#[s3 credentials]
AWS_SECRET_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_REGION=
S3_BUCKET=

#[postgres database credentials]
POSTGRES_HOST=
POSTGRES_DATABASE=
POSTGRES_USER=
POSTGRES_PASSWORD=
POSTGRES_S3_MAIN_KEY=
POSTGRES_PORT=

Once you have added your credentials, we will load them in our app.py file before making use of the DBtoS3 features.

import os

from dotenv import load_dotenv

import dbtos3

APP_ROOT = os.path.dirname(os.path.abspath(__file__))  # refers to application_top
load_dotenv(os.path.join(APP_ROOT, '.env'))


if __name__ == '__main__':
    pass
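
DBtoS3 will be handed these values as-is, so it can help to fail fast if anything in .env was left blank. Below is a minimal sketch that can be dropped into the app.py above (REQUIRED_VARS is our own list built from the .env template, not part of DBtoS3):

# Optional sanity check: fail fast if any expected .env value is missing.
REQUIRED_VARS = [
    'AWS_SECRET_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_REGION', 'S3_BUCKET',
    'POSTGRES_HOST', 'POSTGRES_DATABASE', 'POSTGRES_USER',
    'POSTGRES_PASSWORD', 'POSTGRES_S3_MAIN_KEY', 'POSTGRES_PORT',
]
missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    raise RuntimeError('Missing .env values: ' + ', '.join(missing))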

Now that we have our app ready to consume credentials, we can import the PostgreSQL Model and assign a list of tables from our database that we would like to replicate.

It is good practice to set up two methods: one for the initial full load, and a second that will run from then onwards as the standard replication process.

It is important to initiate a full load covering a set number of days in order to set up the catalog, which monitors changes and pushes any new data to S3 every time the application runs.

  • "table" refers to the table in the database you want to load and replicate.
  • "column" refers to the column in the table that is a timestamp that updates every time data is updated or inserted. if such a column does not exist, DBtoS3 will not be able to replicate data successfully, as the application only does lightweight cataloging.
import os

from dotenv import load_dotenv

import dbtos3

APP_ROOT = os.path.dirname(os.path.abspath(__file__))  # refers to application_top
load_dotenv(os.path.join(APP_ROOT, '.env'))

###
# Setting up PostgreSQL Replication and full-load
###

db = dbtos3.ReplicationMethodsPostgreSQL(
    host=os.getenv('POSTGRES_HOST'),
    database=os.getenv('POSTGRES_DATABASE'),
    user=os.getenv('POSTGRES_USER'),
    password=os.getenv('POSTGRES_PASSWORD'),
    port=os.getenv('POSTGRES_PORT'),
    main_key=os.getenv('POSTGRES_S3_MAIN_KEY'),

    region_name=os.getenv('AWS_REGION'),
    aws_access_key_id=os.getenv('AWS_SECRET_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    s3bucket=os.getenv('S3_BUCKET'),
)


def db_full_load_methods(args):
    for a in args:
        db.day_level_full_load(days=10, table=a, column='updated_at')


def db_replicate_methods(args):
    for a in args:
        db.replicate_table(table=a, column='updated_at')



if __name__ == '__main__':

    # list all the tables from the database you want to replicate
    db_tables = ['users']
    
    # we can run this method first...
    db_full_load_methods(db_tables)
    
    # before running this method from then onwards...
    db_replicate_methods(db_tables)
    db.close_connection()

Remember to close the connection once done!

Once you have run the application, you can find logs and processes in the Logs folder. You should then begin to see .json data arriving in the relevant directory of your S3 bucket!
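
Finally, nothing above schedules the replication run for you; replication only happens each time the application runs. A cron job that re-runs app.py is the simplest option, but a timed loop also works. Below is a minimal sketch building on the app.py above (the interval is an arbitrary choice, and we assume the ReplicationMethodsPostgreSQL connection stays valid between runs):

import time

REPLICATION_INTERVAL_SECONDS = 300  # arbitrary; tune to your latency needs

try:
    while True:
        db_replicate_methods(db_tables)
        time.sleep(REPLICATION_INTERVAL_SECONDS)
except KeyboardInterrupt:
    # close the connection once done
    db.close_connection()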