# Introduction

https://github.com/adpatter/sqlalchemy_alembic_notes/blob/main/main.ipynb

## SQLAlchemy and Alembic

SQLAlchemy and Alembic are formal ORM and migration tools - that's not how I use them.  :-)

### Example Application
1. Start with arbitrary CSV received from network.
2. Unsupervised table creation.
3. Unsupervised column creation.
4. Unsupervised type conversions.
5. Unsupervised deduplication of records. 

## Materials

In [1]:
import time
import os
import copy
from decimal import Decimal
import pandas as pd
import sqlalchemy as sa
from alembic import migration
from alembic import operations

Create a PostgreSQL database and a SQLite database.

In [27]:
#  Create a Postgres database in a Docker container.
#  Any container named "postgres" left over from a previous run is stopped and
#  removed first; when none exists these two commands only print an error.
!docker stop postgres
!docker container rm postgres
#  Start a detached container and publish its port 5432 on the host.
!docker run -p 5432:5432 --name postgres -e POSTGRES_PASSWORD=PASSWORD -d postgres

#  Create a SQLite database on disk.
#  Remove a database file left over from a previous run, then create an empty one.
if 'main.db' in os.listdir('./sqlite/db/'): os.remove('./sqlite/db/main.db')
!sqlite3 ./sqlite/db/main.db ""
#  NOTE(review): presumably this pause gives the Postgres container time to start
#  accepting connections before the next cell connects - confirm.
time.sleep(5)

Error response from daemon: No such container: postgres
Error: No such container: postgres
27bebb8a868897dc3a84b33c62c3cf5041602f10d8e7be8864e22ff47927ed5b


Create Connection and Operations objects for the SQLite and the Postgres database.

In [28]:
#  SQLite: engine -> connection -> standalone Alembic Operations object.
sl_engine = sa.create_engine('sqlite:///sqlite/db/main.db')
sl_conn = sl_engine.connect()
sl_op = operations.Operations(
    migration.MigrationContext.configure(connection=sl_conn)
)

#  Postgres: the same three steps against the Docker container.
pg_engine = sa.create_engine('postgresql://postgres:PASSWORD@localhost:5432/postgres')
pg_conn = pg_engine.connect()
pg_op = operations.Operations(
    migration.MigrationContext.configure(connection=pg_conn)
)

###  The Connection Object (SQLAlchemy)
"The Connection object is procured by calling the Engine.connect() method of the Engine object, and **provides services for execution of SQL statements as well as transaction control** (Bayer et al., https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection)."

###  The Operations Object (Alembic)
**"All \[migration\] directives exist as methods on a class called Operations** (Bayer et al., https://alembic.sqlalchemy.org/en/latest/ops.html#alembic.operations.Operations)."

*"While Operations is normally configured as part of the EnvironmentContext.run_migrations() method called from an env.py script, **a standalone Operations instance can be made for use cases external to regular Alembic migrations** by passing in a MigrationContext (Bayer et al., https://alembic.sqlalchemy.org/en/latest/ops.html#alembic.operations.Operations)."*

## Method
What is in the databases?

In [29]:
#  Display tables in the Postgres database.
#  reflect() loads the definitions of the tables found through pg_conn into pg_meta;
#  the last line displays them.
pg_meta = sa.MetaData(bind=pg_conn)
pg_meta.reflect(bind=pg_conn)
pg_meta.tables

FacadeDict({})

In [30]:
#  Display tables in the SQLite database.
#  reflect() loads the definitions of the tables found through sl_conn into sl_meta;
#  the last line displays them.
sl_meta = sa.MetaData(bind=sl_conn)
sl_meta.reflect(bind=sl_conn)
sl_meta.tables

FacadeDict({})

Create a table named table_1 in the SQLite database and the Postgres database.

In [31]:
#  Create a table in the Postgres database.

#  Reflect the current schema first so that the existence check below looks at the
#  database as it is now instead of at metadata left behind by an earlier cell.
pg_meta = sa.MetaData(bind=pg_conn)
pg_meta.reflect(bind=pg_conn)
if 'table_1' in pg_meta.tables.keys():
    pg_op.drop_table('table_1')

#  create_table() returns the Table object, which later cells use to build statements.
pg_table_1 = pg_op.create_table(
    'table_1', 
    sa.schema.Column(name='pk_1', type_=sa.types.Integer(), primary_key=True, nullable=False),
    sa.schema.Column(name='column_1', type_=sa.types.String(length=32)), 
    sa.schema.Column(name='column_2', type_=sa.types.Numeric(2,1))
    )

#  Reflect again to display the schema including the new table.
pg_meta = sa.MetaData(bind=pg_conn)
pg_meta.reflect(bind=pg_conn)
pg_meta.tables

FacadeDict({'table_1': Table('table_1', MetaData(bind=<sqlalchemy.engine.base.Connection object at 0x7fee6d075df0>), Column('pk_1', INTEGER(), table=<table_1>, primary_key=True, nullable=False, server_default=DefaultClause(<sqlalchemy.sql.elements.TextClause object at 0x7fee3b9d5a30>, for_update=False)), Column('column_1', VARCHAR(length=32), table=<table_1>), Column('column_2', NUMERIC(precision=2, scale=1), table=<table_1>), schema=None)})

In [32]:
#  Create a table in the SQLite database.

#  Reflect the current schema and drop table_1 when it already exists.
sl_meta = sa.MetaData(bind=sl_conn)
sl_meta.reflect(bind=sl_conn)
if 'table_1' in sl_meta.tables:
    sl_op.drop_table('table_1')

#  create_table() returns the Table object, which later cells use to build statements.
sl_table_1 = sl_op.create_table(
    'table_1',
    sa.schema.Column(
        name='pk_1',
        type_=sa.types.Integer(),
        primary_key=True,
        nullable=False,
    ),
    sa.schema.Column(
        name='column_1',
        type_=sa.types.String(length=32),
    ),
    sa.schema.Column(
        name='column_2',
        type_=sa.types.Numeric(precision=2, scale=1),
    ),
)

#  Reflect again to display the schema including the new table.
sl_meta = sa.MetaData(bind=sl_conn)
sl_meta.reflect(bind=sl_conn)
sl_meta.tables

FacadeDict({'table_1': Table('table_1', MetaData(bind=<sqlalchemy.engine.base.Connection object at 0x7fee3b9de940>), Column('pk_1', INTEGER(), table=<table_1>, primary_key=True, nullable=False), Column('column_1', VARCHAR(length=32), table=<table_1>), Column('column_2', NUMERIC(precision=2, scale=1), table=<table_1>), schema=None)})

In [33]:
#  Two batches of example rows; each row is a dict keyed by the columns of table_1.
#  Primary keys 1-2 form the first batch and 3-4 the second.
first_set_of_records = [
    {'pk_1': key, 'column_1': 'string_{}'.format(key), 'column_2': Decimal(float(key))}
    for key in (1, 2)
]

second_set_of_records = [
    {'pk_1': key, 'column_1': 'string_{}'.format(key), 'column_2': Decimal(float(key))}
    for key in (3, 4)
]

#  Preview all four rows as one DataFrame.
pd.DataFrame(first_set_of_records + second_set_of_records)

Unnamed: 0,pk_1,column_1,column_2
0,1,string_1,1
1,2,string_2,2
2,3,string_3,3
3,4,string_4,4


In [34]:
#  Insert both batches into the SQLite table inside a single transaction.
#  The context manager commits when the block finishes and rolls back (then
#  re-raises) when a statement fails, so either every row is stored or none is.
#  It also avoids referencing an unbound transaction if begin() itself fails.
with sl_conn.begin():
    sl_op.bulk_insert(table=sl_table_1, rows=first_set_of_records)
    sl_op.bulk_insert(table=sl_table_1, rows=second_set_of_records)

#  Read the table back to show what was stored.
df_sl_table_1 = pd.read_sql('table_1', con=sl_conn)

df_sl_table_1

Unnamed: 0,pk_1,column_1,column_2
0,1,string_1,1
1,2,string_2,2
2,3,string_3,3
3,4,string_4,4


In [35]:
#  Only the first batch goes into Postgres; the next section copies over the rows
#  from SQLite that Postgres does not have yet.
first_set_of_records = [
    {'pk_1':1, 'column_1':'string_1', 'column_2':Decimal(1.0)},
    {'pk_1':2, 'column_1':'string_2', 'column_2':Decimal(2.0)}
]

#  The context manager commits when the block finishes and rolls back (then
#  re-raises) when the insert fails.  It also avoids referencing an unbound
#  transaction if begin() itself fails.
with pg_conn.begin():
    pg_op.bulk_insert(table=pg_table_1, rows=first_set_of_records)

#  Read the table back to show what was stored.
df_pg_table_1 = pd.read_sql('table_1', con=pg_conn)

df_pg_table_1

Unnamed: 0,pk_1,column_1,column_2
0,1,string_1,1.0
1,2,string_2,2.0


##  Method

Copy the rows of a SQLite table that are not already present in a Postgres table into that Postgres table.
1.  Copy the SQLite records from the SQLite database into a temporary table in the Postgres database named temp_table_1.
2.  Use an EXCEPT clause in order to insert records into the table named table_1 from temp_table_1 that are not already in table_1. 

In [36]:
#  Create a temporary table in the Postgres database that will hold the SQLite records.
pg_meta = sa.MetaData(bind=pg_conn)
pg_meta.reflect(bind=pg_conn)
if 'temp_table_1' in pg_meta.tables:
    pg_op.drop_table('temp_table_1')

#  The columns are copies of the SQLite columns.
pg_temp_table_1 = pg_op.create_table(
    'temp_table_1',
    *(copy.copy(sl_column) for sl_column in sl_table_1.columns)
)

#  Reflect again and list the table names now present in Postgres.
pg_meta = sa.MetaData(bind=pg_conn)
pg_meta.reflect(bind=pg_conn)
pg_meta.tables.keys()

dict_keys(['table_1', 'temp_table_1'])

Print a SELECT statement.

In [37]:
#  Select every column of the SQLite table.  The Table is passed directly, the same
#  calling style the EXCEPT cell uses, instead of handing select() a column collection.
sl_select_statement = sa.select(sl_table_1)
print(sl_select_statement)

SELECT table_1.pk_1, table_1.column_1, table_1.column_2 
FROM table_1


Select and display the records to be inserted into temp_table_1.

In [38]:
#  Run the SELECT on the SQLite connection and fetch all of the rows.
sl_conn.execute(sl_select_statement).all()

  sl_conn.execute(sl_select_statement).all()


[(1, 'string_1', Decimal('1.0')),
 (2, 'string_2', Decimal('2.0')),
 (3, 'string_3', Decimal('3.0')),
 (4, 'string_4', Decimal('4.0'))]

Transform the records so that they can be inserted into temp_table_1.

In [39]:
#  Turn each result row into a plain dict keyed by column name.
records = [
    dict(row._mapping)
    for row in sl_conn.execute(sl_select_statement)
]

records

[{'pk_1': 1, 'column_1': 'string_1', 'column_2': Decimal('1.0')},
 {'pk_1': 2, 'column_1': 'string_2', 'column_2': Decimal('2.0')},
 {'pk_1': 3, 'column_1': 'string_3', 'column_2': Decimal('3.0')},
 {'pk_1': 4, 'column_1': 'string_4', 'column_2': Decimal('4.0')}]

Print an INSERT statement.

In [40]:
#  One multi-row INSERT into temp_table_1 carrying every record read from SQLite.
pg_insert_statement = sa.insert(pg_temp_table_1).values(records)
print(pg_insert_statement)

INSERT INTO temp_table_1 (pk_1, column_1, column_2) VALUES (:pk_1_m0, :column_1_m0, :column_2_m0), (:pk_1_m1, :column_1_m1, :column_2_m1), (:pk_1_m2, :column_1_m2, :column_2_m2), (:pk_1_m3, :column_1_m3, :column_2_m3)


In [41]:
#  Run the multi-row INSERT inside a transaction.  The context manager commits when
#  the block finishes and rolls back (then re-raises) when the statement fails.
#  It also avoids referencing an unbound transaction if begin() itself fails.
with pg_conn.begin():
    pg_conn.execute(pg_insert_statement)

#  Show the contents of the temporary table.
pd.read_sql('temp_table_1', con=pg_conn)

Unnamed: 0,pk_1,column_1,column_2
0,1,string_1,1.0
1,2,string_2,2.0
2,3,string_3,3.0
3,4,string_4,4.0


In [42]:
#  Current contents of table_1 in Postgres, for comparison with temp_table_1.
pd.read_sql('table_1', con=pg_conn)

Unnamed: 0,pk_1,column_1,column_2
0,1,string_1,1.0
1,2,string_2,2.0


Display the tables in the Postgres database.

In [43]:
#  Reflect the Postgres schema again and print the names of the tables it contains.
pg_meta = sa.MetaData(bind=pg_conn)
pg_meta.reflect(bind=pg_conn)
print(pg_meta.tables.keys())

dict_keys(['table_1', 'temp_table_1'])


Build the EXCEPT clause.

In [44]:
#  Rows of temp_table_1 that have no identical row in table_1.
select_except_statement = sa.except_(
    sa.select(pg_temp_table_1),
    sa.select(pg_table_1),
)

print(select_except_statement)

SELECT temp_table_1.pk_1, temp_table_1.column_1, temp_table_1.column_2 
FROM temp_table_1 EXCEPT SELECT table_1.pk_1, table_1.column_1, table_1.column_2 
FROM table_1


Build the INSERT statement using the EXCEPT clause.

In [45]:
#  INSERT into table_1 whatever the EXCEPT query returns, column for column.
insert_from_select_except_statement = (
    pg_table_1.insert()
    .from_select(pg_table_1.columns, select_except_statement)
)

print(insert_from_select_except_statement)

INSERT INTO table_1 (pk_1, column_1, column_2) SELECT temp_table_1.pk_1, temp_table_1.column_1, temp_table_1.column_2 
FROM temp_table_1 EXCEPT SELECT table_1.pk_1, table_1.column_1, table_1.column_2 
FROM table_1


Insert the subset of rows from temp_table_1 into table_1 and remove temp_table_1.

In [46]:
try:
    #  The context manager commits when the block finishes and rolls back (then
    #  re-raises) when the insert fails.  It also avoids referencing an unbound
    #  transaction if begin() itself fails.
    with pg_conn.begin():
        pg_conn.execute(insert_from_select_except_statement)
finally:
    #  Drop the temporary table whether or not the insert succeeded.
    pg_meta = sa.MetaData(bind=pg_conn)
    pg_meta.reflect(bind=pg_conn)
    if 'temp_table_1' in pg_meta.tables.keys():
        pg_op.drop_table(table_name='temp_table_1')

#  Reflect again and print the remaining table names.
pg_meta = sa.MetaData(bind=pg_conn)
pg_meta.reflect(bind=pg_conn)
print(pg_meta.tables.keys())

#  Show table_1 after the missing rows were copied in.
pd.read_sql('table_1', con=pg_conn)

dict_keys(['table_1'])


Unnamed: 0,pk_1,column_1,column_2
0,1,string_1,1.0
1,2,string_2,2.0
2,4,string_4,4.0
3,3,string_3,3.0


## Method

Roll back the transaction when an insert uses a duplicate primary key.

In [47]:
try:
    #  Both inserts run in one transaction; the context manager rolls the whole
    #  transaction back and re-raises as soon as one of them fails.
    with pg_conn.begin():
        pg_op.bulk_insert(table=pg_table_1, rows=[{'pk_1':5, 'column_1':'string_5', 'column_2':5.0}])
        #  OK.

        pg_op.bulk_insert(table=pg_table_1, rows=[{'pk_1':5, 'column_1':'string_6', 'column_2':6.0}])
        #  Not OK.  Duplicate primary key.
except sa.exc.IntegrityError:
    #  Expected: the duplicate key violates the primary key constraint.  Only this
    #  error is suppressed; anything else (including KeyboardInterrupt) propagates.
    #  The rollback has already happened, so neither row with pk_1 = 5 is stored.
    pass

#  Reflect again and print the table names.
pg_meta = sa.MetaData(bind=pg_conn)
pg_meta.reflect(bind=pg_conn)
print(pg_meta.tables.keys())

#  Show that table_1 is unchanged by the failed transaction.
df_pg_table_1 = pd.read_sql('table_1', con=pg_conn)

df_pg_table_1

dict_keys(['table_1'])


Unnamed: 0,pk_1,column_1,column_2
0,1,string_1,1.0
1,2,string_2,2.0
2,4,string_4,4.0
3,3,string_3,3.0


## Method
Cast a column from Numeric to Integer.

In [48]:
#  Column dtypes of the DataFrame read from table_1, before the cast.
df_pg_table_1.dtypes

pk_1          int64
column_1     object
column_2    float64
dtype: object

In [49]:
try:
    #  Every step runs in one transaction: the context manager commits when the
    #  block finishes and rolls back (then re-raises) when any step fails.
    with pg_conn.begin():
        #  1. Add an Integer column that will receive the converted values.
        pg_op.add_column(
            table_name='table_1',
            column=sa.schema.Column(name='temp_column_2', type_=sa.types.Integer())
        )

        pg_meta = sa.MetaData(bind=pg_conn)
        pg_meta.reflect(bind=pg_conn)
        pg_table_1 = pg_meta.tables['table_1']
        #  GOTCHA:  Must do a reflection in order to get the changes to the table to show up in pg_table_1.

        #  2. Fill the new column with column_2 cast to Integer.
        cast_statement = sa.cast(pg_table_1.c.column_2, sa.types.Integer())

        update_cast_statement = pg_table_1.update().values(temp_column_2=cast_statement)

        print(update_cast_statement)

        pg_conn.execute(update_cast_statement)

        #  3. Drop the Numeric column and give the Integer column its name.
        pg_op.drop_column(table_name='table_1', column_name='column_2')

        pg_op.alter_column(
            table_name='table_1',
            column_name='temp_column_2',
            new_column_name='column_2'
        )
finally:
    #  Read the table back whether or not the cast succeeded.  The shared connection
    #  is left open here; the final cell of the notebook closes it.
    df_pg_table_1 = pd.read_sql('table_1', con=pg_conn)

df_pg_table_1

UPDATE table_1 SET temp_column_2=CAST(table_1.column_2 AS INTEGER)


Unnamed: 0,pk_1,column_1,column_2
0,1,string_1,1
1,2,string_2,2
2,4,string_4,4
3,3,string_3,3


In [50]:
#  Column dtypes of the DataFrame read from table_1, after the cast.
df_pg_table_1.dtypes

pk_1         int64
column_1    object
column_2     int64
dtype: object

## Results

In [51]:
#  Close both database connections before removing the databases.
sl_conn.close()
pg_conn.close()
#  Stop and remove the Postgres container.
!docker stop postgres
!docker container rm postgres
#  Delete the SQLite database file if it is still present.
if 'main.db' in os.listdir('./sqlite/db/'): os.remove('./sqlite/db/main.db')

postgres
postgres
