# Output RDBMS Alchemy

This component loads data from a CSV file into a given table of any RDBMS supported by SQLAlchemy. Parameters such as host, database, user, password and table name need to be set.

Currently only append mode is supported (rows are appended via pandas `DataFrame.to_sql`). Optionally, the table can be flushed (truncated) before loading.

The current implementation first loads all data into main memory (via pandas). More efficient approaches are described in the article linked below — PRs welcome!

https://hakibenita.com/fast-load-data-python-postgresql

In [None]:
!pip install sqlalchemy==1.4.23 pandas==1.3.1 psycopg2-binary==2.9.1

In [None]:
import logging
import os
import pandas as pd
import psycopg2 
import re
from sqlalchemy import create_engine
import sys

In [None]:
# Settings are read from environment variables; any of them can be
# overridden later by key=value command-line arguments.

# data to load (expects CSV file with header)
data_csv = os.environ.get('data_csv', 'data.csv')

# type of database server (sqlalchemy dialect), e.g. postgresql
db_type = os.environ.get('db_type', 'postgresql')

# hostname of database server
host = os.environ.get('host')

# database name
database = os.environ.get('database')

# db user
user = os.environ.get('user')

# db password
password = os.environ.get('password')

# db port
port = int(os.environ.get('port', 5432))

# schema name (optional; when unset the connection's default schema is used)
schema = os.environ.get('schema')

# table name
table = os.environ.get('table')

# truncate table before insert.
# Parsed explicitly: bool() of any non-empty string (including "False" or "0")
# is True, which would silently wipe the table.
truncate = os.environ.get('truncate', 'false').strip().lower() in ('1', 'true', 'yes', 'y', 'on')

# temporary data storage for local execution
data_dir = os.environ.get('data_dir', '../../data/')

In [None]:
# Allow overriding any of the settings above from the command line with
# key=value arguments (e.g. `table=my_table truncate=true`).
#
# Values are assigned as plain strings into the module namespace rather than
# exec()'d: the previous approach validated only a prefix of each argument,
# so an argument like `x=1;import os` could run arbitrary code, and values
# containing '=' or '"' (common in passwords) raised a SyntaxError.
_SECRET_PARAMETERS = {'password'}

parameters = []
for _arg in sys.argv:
    _key, _sep, _value = _arg.partition('=')
    # Only accept key=value where the key is a plain ASCII identifier; this
    # also skips interpreter/kernel arguments that carry no '='.
    if not _sep or not (_key.isascii() and _key.isidentifier()):
        continue
    parameters.append(f'{_key}="{_value}"')
    # Never write credentials to the log.
    _shown = '***' if _key in _SECRET_PARAMETERS else _value
    logging.warning('Parameter: %s="%s"', _key, _shown)
    globals()[_key] = _value

# Command-line values arrive as strings; bool("False") would be True, so parse
# the flag explicitly (str() also handles the bool default from the env cell).
truncate = str(truncate).strip().lower() in ('1', 'true', 'yes', 'y', 'on')
port = int(port)

In [None]:
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from sqlalchemy.orm import sessionmaker

# Build the connection URL from its parts instead of string formatting, so
# credentials containing reserved URL characters ('@', ':', '/', '#', ...)
# are escaped correctly instead of corrupting the URL.
engine = create_engine(
    URL.create(
        drivername=db_type,
        username=user,
        password=password,
        host=host,
        port=port,
        database=database,
    )
)

In [None]:
from sqlalchemy import text

if truncate:
    # Quote identifiers using the dialect's own rules rather than splicing
    # names from the environment / command line into SQL verbatim.
    # When no schema is configured, address the bare table name instead of
    # emitting the literal "None.<table>".
    _preparer = engine.dialect.identifier_preparer
    _qualified_table = _preparer.quote(table)
    if schema:
        _qualified_table = f'{_preparer.quote_schema(schema)}.{_qualified_table}'
    # engine.begin() opens a transaction and commits it when the block exits
    # without an exception.
    with engine.begin() as con:
        con.execute(text(f'TRUNCATE TABLE {_qualified_table}'))

In [None]:
Session = sessionmaker(bind=engine)

# Read the whole CSV (header row expected) into memory and append it to the
# target table. The schema is passed through so rows land in the same table
# that the truncate step operates on; os.path.join tolerates a data_dir given
# with or without a trailing slash.
df = pd.read_csv(os.path.join(data_dir, data_csv))
df.to_sql(table, con=engine, schema=schema, if_exists='append', index=False)