# SQLAlchemy Data Model from Reflection

In [1]:
import os

import numpy as np
import pandas as pd
import psycopg2
import sqlalchemy
from sqlalchemy import create_engine, MetaData
from sqlalchemy import inspect, Table, MetaData, Column, Integer, String, ForeignKey
from sqlalchemy import text
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy.orm import mapper, relationship

In [2]:
pg_driver = "postgresql+psycopg2://"
# Connection settings are read from the standard libpq environment variables so
# credentials do not have to live in the notebook. The fallbacks are the values
# this notebook previously hard-coded (a local demo database).
# make_dsn() quotes values as needed (e.g. a password containing spaces).
pg_uri = psycopg2.extensions.make_dsn(
    user=os.environ.get("PGUSER", "pbatch"),
    password=os.environ.get("PGPASSWORD", "Password1"),
    host=os.environ.get("PGHOST", "ds"),
    port=os.environ.get("PGPORT", "5432"),
    dbname=os.environ.get("PGDATABASE", "pagila"),
)
pg_schema = 'public'


def pg_creator():
    """Open a new psycopg2 connection from ``pg_uri``.

    Passed to ``create_engine`` as ``creator``, so the engine calls this
    whenever it needs a fresh DBAPI connection.
    """
    return psycopg2.connect(pg_uri)


# The URL only selects the dialect/driver; actual connections come from pg_creator.
pg_engine = sqlalchemy.create_engine(pg_driver, creator=pg_creator)
pg_Base = automap_base()
# NOTE(review): reflect=True is the SQLAlchemy 1.3 spelling; newer releases
# prefer autoload_with=<engine> — confirm against the installed version.
pg_Base.prepare(pg_engine, schema=pg_schema, reflect=True)
pg_session = Session(pg_engine)

In [3]:
# Reflect table definitions from the database into a fresh MetaData object.
# Later cells look tables up by bare name, e.g. metadata.tables['address'].
metadata = MetaData()
metadata.reflect(bind=pg_engine)

In [4]:
# Names of the reflected tables that get a dynamically created mapped class.
list_tables = ['country', 'city', 'address', 'store', 'customer']

In [5]:
# Registry of the dynamically created mapped classes, keyed by class name.
MAPPERS = {}


def repr_name(t):
    """Return ``t`` with its first character upper-cased and the rest unchanged.

    Used to derive a class name from a table name, e.g. ``'country'`` ->
    ``'Country'``. Unlike ``str.capitalize`` the remaining characters are not
    lower-cased. An empty string is returned unchanged instead of raising.
    """
    # Slicing (t[:1]) rather than indexing (t[0]) keeps the empty string safe.
    return t[:1].upper() + t[1:]

In [6]:
# For every table of interest: create an empty class named after the table,
# map it onto the reflected Table, and register it in MAPPERS.
for table_name in list_tables:
    table = metadata.tables[table_name]

    # 1. create class object
    cls_name = repr_name(str(table))
    print(f"create class {cls_name}")
    # type() builds the class directly, avoiding exec() on generated source.
    cls = type(cls_name, (object,), {})
    # Later cells refer to the classes by bare name (Address, City, ...), so
    # publish each one in the notebook namespace.
    globals()[cls_name] = cls

    # 2. map table to class object
    # No relationship() properties are configured from the foreign keys;
    # related rows are fetched with explicit queries in later cells.
    mapper(cls, table, properties={})
    MAPPERS[cls_name] = cls

create class Country
create class City
create class Address
create class Store
create class Customer


In [7]:
# Display the class-name -> mapped-class registry filled in by the loop above.
MAPPERS

{'Country': __main__.Country,
 'City': __main__.City,
 'Address': __main__.Address,
 'Store': __main__.Store,
 'Customer': __main__.Customer}

In [8]:
# List the Column objects behind the mapped Address class.
address_mapper = inspect(Address)
[column for column in address_mapper.columns]

[Column('address_id', INTEGER(), table=<address>, primary_key=True, nullable=False, server_default=DefaultClause(<sqlalchemy.sql.elements.TextClause object at 0x7f6ac71b7430>, for_update=False)),
 Column('address', TEXT(), table=<address>, nullable=False),
 Column('address2', TEXT(), table=<address>),
 Column('district', TEXT(), table=<address>, nullable=False),
 Column('city_id', SMALLINT(), ForeignKey('city.city_id'), table=<address>, nullable=False),
 Column('postal_code', TEXT(), table=<address>),
 Column('phone', TEXT(), table=<address>, nullable=False),
 Column('last_update', TIMESTAMP(timezone=True), table=<address>, nullable=False, server_default=DefaultClause(<sqlalchemy.sql.elements.TextClause object at 0x7f6ac71b7760>, for_update=False))]

In [9]:
# List the Column objects behind the mapped City class.
city_mapper = inspect(City)
[column for column in city_mapper.columns]

[Column('city_id', INTEGER(), table=<city>, primary_key=True, nullable=False, server_default=DefaultClause(<sqlalchemy.sql.elements.TextClause object at 0x7f6ac71b7e50>, for_update=False)),
 Column('city', TEXT(), table=<city>, nullable=False),
 Column('country_id', SMALLINT(), ForeignKey('country.country_id'), table=<city>, nullable=False),
 Column('last_update', TIMESTAMP(timezone=True), table=<city>, nullable=False, server_default=DefaultClause(<sqlalchemy.sql.elements.TextClause object at 0x7f6ac71b7dc0>, for_update=False))]

In [10]:
# Pick the reflected 'address' Table for a closer look at its foreign key.
table = metadata.tables['address']

In [11]:
# Show which column the city_id foreign key points at, as a "table.column" string.
# target_fullname is the public accessor; the private _colspec attribute is avoided.
next(iter(table.columns.city_id.foreign_keys)).target_fullname

'city.city_id'

In [12]:
# Build a query for addresses with city_id == 1.
# NOTE(review): `data` is reassigned by later cells before this query is
# iterated in the visible notebook — confirm whether this cell is still needed.
data = pg_session.query(Address).filter_by(city_id=1)

In [13]:
# Map each table name to a pair: (query over its mapped class, inspect() result
# for that class). The pair is unpacked by position when records are printed.
tables = {}
for name in list_tables:
    mapped_cls = MAPPERS[repr_name(name)]
    tables[name] = (pg_session.query(mapped_cls), inspect(mapped_cls))
    

In [14]:
table_name = "customer"
# Unpack the stored pair once instead of indexing tables[...][0] / [1] repeatedly.
records, table_insp = tables[table_name]
# The column attribute names are the same for every record, so compute them once.
field_names = list(table_insp.column_attrs.keys())
# Print every record as one tab-separated line of its column values.
for rec in records:
    print("\t".join(str(getattr(rec, field)) for field in field_names))

1	1	MARY	SMITH	MARY.SMITH@sakilacustomer.org	5	True	2020-02-14	2020-02-15 20:57:20+11:00	1
2	1	PATRICIA	JOHNSON	PATRICIA.JOHNSON@sakilacustomer.org	6	True	2020-02-14	2020-02-15 20:57:20+11:00	1
3	1	LINDA	WILLIAMS	LINDA.WILLIAMS@sakilacustomer.org	7	True	2020-02-14	2020-02-15 20:57:20+11:00	1
4	2	BARBARA	JONES	BARBARA.JONES@sakilacustomer.org	8	True	2020-02-14	2020-02-15 20:57:20+11:00	1
5	1	ELIZABETH	BROWN	ELIZABETH.BROWN@sakilacustomer.org	9	True	2020-02-14	2020-02-15 20:57:20+11:00	1
6	2	JENNIFER	DAVIS	JENNIFER.DAVIS@sakilacustomer.org	10	True	2020-02-14	2020-02-15 20:57:20+11:00	1
7	1	MARIA	MILLER	MARIA.MILLER@sakilacustomer.org	11	True	2020-02-14	2020-02-15 20:57:20+11:00	1
8	2	SUSAN	WILSON	SUSAN.WILSON@sakilacustomer.org	12	True	2020-02-14	2020-02-15 20:57:20+11:00	1
9	2	MARGARET	MOORE	MARGARET.MOORE@sakilacustomer.org	13	True	2020-02-14	2020-02-15 20:57:20+11:00	1
10	1	DOROTHY	TAYLOR	DOROTHY.TAYLOR@sakilacustomer.org	14	True	2020-02-14	2020-02-15 20:57:20+11:00	1
11	2	LISA	ANDERS

In [15]:
# Customers whose first name is JOYCE; show the first match's attribute dict.
customer_ret = pg_session.query(Customer).filter(Customer.first_name == 'JOYCE')
vars(customer_ret[0])

{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState at 0x7f6ac6e57b20>,
 'address_id': 53,
 'last_name': 'EDWARDS',
 'store_id': 2,
 'customer_id': 49,
 'last_update': datetime.datetime(2020, 2, 15, 20, 57, 20, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=660, name=None)),
 'activebool': True,
 'email': 'JOYCE.EDWARDS@sakilacustomer.org',
 'first_name': 'JOYCE',
 'active': 1,
 'create_date': datetime.date(2020, 2, 14)}

In [16]:
# Follow customer.store_id by hand: the store that the customer belongs to.
store_ret = pg_session.query(Store).filter(Store.store_id == customer_ret[0].store_id)
vars(store_ret[0])

{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState at 0x7f6ac6e57490>,
 'manager_staff_id': 2,
 'store_id': 2,
 'last_update': datetime.datetime(2020, 2, 15, 20, 57, 12, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=660, name=None)),
 'address_id': 2}

In [17]:
# Follow store.address_id by hand: the address of that store.
address_ret = pg_session.query(Address).filter(Address.address_id == store_ret[0].address_id)
vars(address_ret[0])

{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState at 0x7f6ac6f304c0>,
 'address': '28 MySQL Boulevard',
 'phone': '',
 'city_id': 576,
 'address2': None,
 'address_id': 2,
 'last_update': datetime.datetime(2020, 2, 15, 20, 45, 30, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=660, name=None)),
 'postal_code': '',
 'district': 'QLD'}

In [18]:
# Follow address.city_id by hand: the city of that address.
city_ret = pg_session.query(City).filter(City.city_id == address_ret[0].city_id)
vars(city_ret[0])

{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState at 0x7f6ac6f6fc40>,
 'last_update': datetime.datetime(2020, 2, 15, 20, 45, 25, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=660, name=None)),
 'country_id': 8,
 'city': 'Woodridge',
 'city_id': 576}

In [19]:
# Follow city.country_id by hand: the country of that city.
country_ret = pg_session.query(Country).filter(Country.country_id == city_ret[0].country_id)
vars(country_ret[0])

{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState at 0x7f6ac70f2280>,
 'country_id': 8,
 'last_update': datetime.datetime(2020, 2, 15, 20, 44, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=660, name=None)),
 'country': 'Australia'}

# Raw SQL

In [20]:
# Engine-level reflection: an inspector bound to the engine rather than to a
# mapped class, used below to describe a table by name.
inspector = inspect(pg_engine)

In [21]:
# Describe the columns of the 'payment' table (one dict per column).
inspector.get_columns('payment')

[{'name': 'payment_id',
  'type': INTEGER(),
  'nullable': False,
  'default': "nextval('payment_payment_id_seq'::regclass)",
  'autoincrement': True,
  'comment': None},
 {'name': 'customer_id',
  'type': SMALLINT(),
  'nullable': False,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'staff_id',
  'type': SMALLINT(),
  'nullable': False,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'rental_id',
  'type': INTEGER(),
  'nullable': False,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'amount',
  'type': NUMERIC(precision=5, scale=2),
  'nullable': False,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'payment_date',
  'type': TIMESTAMP(timezone=True),
  'nullable': False,
  'default': None,
  'autoincrement': False,
  'comment': None}]

In [27]:
# The ten payments with the highest payment_id, each with the paying customer's
# full name; payment and customer are matched on customer_id.
select_statement = text("""
    select p.payment_id, 
           concat(cu.first_name, ' ', cu.last_name) "name" 
    from payment p, customer cu 
    where cu.customer_id = p.customer_id 
    order by p.payment_id desc 
    limit 10""")

### select

In [28]:
# Run select_statement and materialise every row as a {column name: value} dict.
data = list()
with pg_engine.connect() as con:
    rs = con.execute(select_statement)
    # Column names are fixed for the whole result, so fetch them once.
    table_fields = rs.keys()
    data.extend(dict(zip(table_fields, row)) for row in rs)
print(table_fields)
# A named loop variable is used instead of `_`, which would shadow IPython's
# last-output variable in the notebook namespace.
for row_dict in data:
    print(row_dict)

['payment_id', 'name']
{'payment_id': 32098, 'name': 'GWENDOLYN MAY'}
{'payment_id': 32097, 'name': 'HILDA HOPKINS'}
{'payment_id': 32096, 'name': 'MATTIE HOFFMAN'}
{'payment_id': 32095, 'name': 'VICKIE BREWER'}
{'payment_id': 32094, 'name': 'COURTNEY DAY'}
{'payment_id': 32093, 'name': 'VIOLA HANSON'}
{'payment_id': 32092, 'name': 'MARCIA DEAN'}
{'payment_id': 32091, 'name': 'CLAUDIA FULLER'}
{'payment_id': 32090, 'name': 'TAMARA NGUYEN'}
{'payment_id': 32089, 'name': 'ALLISON STANLEY'}


### insert

In [24]:
# Fetch the most recent payment row; it serves as the template for the insert.
select_statement = text("""select * from payment order by payment_id desc limit 1""")
data = list()
with pg_engine.connect() as con:
    rs = con.execute(select_statement)
    # Column names are fixed for the whole result, so fetch them once.
    table_fields = rs.keys()
    for row in rs:
        data.append(dict(zip(table_fields, row)))
print("select:", table_fields, data)

# payment_id is left out of the column list so the database assigns a new one.
insert_statement = text(
    "INSERT INTO payment(customer_id, staff_id, rental_id, amount, payment_date) "
    "VALUES(:customer_id, :staff_id, :rental_id, :amount, :payment_date)"
)
# engine.begin() runs the inserts in an explicit transaction that is committed
# on success and rolled back on error, instead of relying on implicit autocommit.
with pg_engine.begin() as con:
    for line in data:
        # Bind parameters are passed as one dict; the statement only uses the
        # keys it names, as with the previous keyword-argument call.
        ret = con.execute(insert_statement, line)

select: ['payment_id', 'customer_id', 'staff_id', 'rental_id', 'amount', 'payment_date'] [{'payment_id': 32098, 'customer_id': 264, 'staff_id': 2, 'rental_id': 14243, 'amount': Decimal('2.99'), 'payment_date': datetime.datetime(2020, 5, 14, 22, 44, 29, 996577, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=600, name=None))}]


### delete

In [25]:
# Remove the payment row with the highest payment_id.
delete_statement = text("""delete from payment where payment_id = (select max(payment_id) from payment)""")
# engine.begin() runs the delete in an explicit transaction that is committed on
# success and rolled back on error, instead of relying on implicit autocommit.
with pg_engine.begin() as con:
    ret = con.execute(delete_statement)