In [1]:
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy import stats

In [2]:
import sqlalchemy as alc
import pandas as pd
import os


# add run method to engine
def run(self: alc.engine.Engine, sql: str) -> pd.DataFrame | None:
    """Execute a raw SQL string in a transaction and return any rows.

    Args:
        self: Engine the statement is executed against; the work happens
            inside ``self.begin()``.
        sql: SQL text; it is wrapped with ``alc.text`` before execution.

    Returns:
        A DataFrame holding every result row, with columns named after the
        result keys, when the statement returns rows; otherwise ``None``.
    """
    with self.begin() as conn:
        result = conn.execute(alc.text(sql))
        if not result.returns_rows:
            # DDL / DML without a result set: nothing to hand back.
            return None
        # Fetch while the connection is still open.
        rows = result.all()
        return pd.DataFrame(rows, columns=result.keys())


# Attach the helper so every Engine instance exposes `engine.run(sql)`.
alc.engine.Engine.run = run

# Credentials should not be hardcoded in the notebook: take the URL from the
# TEST_DB_URL environment variable and fall back to the local test database
# only when that variable is unset.
connection_string = os.environ.get(
    "TEST_DB_URL", "postgresql://test_db_user:1234@localhost/test_db"
)
# Pass echo=True to create_engine to log every emitted SQL statement.
eng = alc.create_engine(connection_string)

### Load the sample data (`taxis.csv`)

In [3]:
# Load the taxi trips and derive the helper columns in one chain: a surrogate
# key taken from the row index, parsed pickup/dropoff timestamps, the pickup
# calendar date and a single-passenger flag. Keyword order matters because
# `pickup_dates` reads the already-parsed `pickup` column.
df = pd.read_csv("taxis.csv").assign(
    pk=lambda d: d.index,
    pickup=lambda d: pd.to_datetime(d["pickup"]),
    pickup_dates=lambda d: d["pickup"].dt.date,
    dropoff=lambda d: pd.to_datetime(d["dropoff"]),
    passenger_is_1=lambda d: d["passengers"] == 1,
)
df.head()

Unnamed: 0,pickup,dropoff,passengers,distance,fare,tip,tolls,total,color,payment,pickup_zone,dropoff_zone,pickup_borough,dropoff_borough,pk,pickup_dates,passenger_is_1
0,2019-03-23 20:21:09,2019-03-23 20:27:24,1,1.6,7.0,2.15,0.0,12.95,yellow,credit card,Lenox Hill West,UN/Turtle Bay South,Manhattan,Manhattan,0,2019-03-23,True
1,2019-03-04 16:11:55,2019-03-04 16:19:00,1,0.79,5.0,0.0,0.0,9.3,yellow,cash,Upper West Side South,Upper West Side South,Manhattan,Manhattan,1,2019-03-04,True
2,2019-03-27 17:53:01,2019-03-27 18:00:25,1,1.37,7.5,2.36,0.0,14.16,yellow,credit card,Alphabet City,West Village,Manhattan,Manhattan,2,2019-03-27,True
3,2019-03-10 01:23:59,2019-03-10 01:49:51,1,7.7,27.0,6.15,0.0,36.95,yellow,credit card,Hudson Sq,Yorkville West,Manhattan,Manhattan,3,2019-03-10,True
4,2019-03-30 13:27:42,2019-03-30 13:37:14,3,2.16,9.0,1.1,0.0,13.4,yellow,credit card,Midtown East,Yorkville West,Manhattan,Manhattan,4,2019-03-30,False


### Test: `PgInserter.insert_non_duplicates`

In [4]:
from pg_inserter import PgInserter

# Start from a clean slate. `if exists` makes the drop a no-op when the table
# is missing, while real failures (connection, permissions, ...) still raise
# instead of being reported as "Table does not exist." by a blanket except.
eng.run("drop table if exists test_schema.taxi")

qry = """
CREATE TABLE test_schema.taxi (
    pk BIGINT PRIMARY KEY,
	pickup TIMESTAMP WITHOUT TIME ZONE, 
	dropoff TIMESTAMP WITHOUT TIME ZONE, 
	passengers BIGINT, 
	distance FLOAT(53), 
	fare FLOAT(53), 
	tip FLOAT(53), 
	tolls FLOAT(53), 
	total FLOAT(53), 
	color TEXT, 
	payment TEXT, 
	pickup_zone TEXT, 
	dropoff_zone TEXT, 
	pickup_borough TEXT, 
	dropoff_borough TEXT, 
	pickup_dates DATE, 
	passenger_is_1 BOOLEAN
)
"""
eng.run(qry)

# Two overlapping batches: the first holds pk <= 1000, the second pk > 950, so
# pk 951..1000 appear in both.
# NOTE(review): PgInserter is defined outside this notebook; presumably
# `insert_non_duplicates` skips rows whose `pk` is already stored and
# `potential_dups_condition` narrows the existing rows it compares against.
# Confirm against pg_inserter.
pi = PgInserter(eng, "test_schema.taxi", ["pk"])
pi.insert_non_duplicates(df=df.query("pk <= 1000"))
pi.insert_non_duplicates(df=df.query("pk > 950"), potential_dups_condition="pk > 700")

In [5]:
qry = """
select
    *
from test_schema.taxi
"""

# Read the table back under its own name so the raw `df` loaded from the CSV
# is not overwritten and stays available to later cells.
taxi_db = eng.run(qry)
# The overlapping inserts above must not have produced duplicate keys.
assert taxi_db["pk"].is_unique, "duplicate pk values found in test_schema.taxi"

### Helper functions

In [7]:
def _parse_schema_table_name(sch_tbl_name:str) -> tuple[str, str]:
    """
    Examples::

    """
    try:
        clean = sch_tbl_name.lower()
        items = clean.split(".")
    except AttributeError:
        schema_name = None
        table_name = None
    else:
        n = len(items)
        if n == 1:
            schema_name = None
            table_name = items[0]
        elif n == 2:
            schema_name = items[0]
            table_name = items[1]
        else:
            schema_name = items[0]
            table_name = ".".join(items[1:n])
    return schema_name, table_name

# Exercise the parser on plain, qualified, multi-dot, empty and non-string
# inputs; each line prints the call next to its result.
for example in ("ab", "Ab", "ab.ab", "Ab.Ab", "Ab.Ab.Ab", "", None, 3):
    print(f"_parse_schema_table_name({example!r}) = {_parse_schema_table_name(example)!r}")

_parse_schema_table_name('ab') = (None, 'ab')
_parse_schema_table_name('Ab') = (None, 'ab')
_parse_schema_table_name('ab.ab') = ('ab', 'ab')
_parse_schema_table_name('Ab.Ab') = ('ab', 'ab')
_parse_schema_table_name('Ab.Ab.Ab') = ('ab', 'ab.ab')
_parse_schema_table_name('') = (None, '')
_parse_schema_table_name(None) = (None, None)
_parse_schema_table_name(3) = (None, None)


In [74]:
def _guess_pg_dtype(dtype: type) -> type:
    if dtype is None:
        raise ValueError(f"Invalid dtype: {dtype}")
    elif np.issubdtype(dtype, np.int64):
        return alc.dialects.postgresql.BIGINT
    elif np.issubdtype(dtype, np.bool_):
        return alc.dialects.postgresql.BOOLEAN
    elif np.issubdtype(dtype, np.float64):
        return alc.Float(8)
    elif np.issubdtype(dtype, np.int64):
        return alc.Integer
    elif np.issubdtype(dtype, np.float64):
        return alc.Float
    elif np.issubdtype(dtype, np.datetime64):
        return alc.DateTime
    else:
        return alc.String

# For every column of `df`, pair its pandas dtype with the SQLAlchemy type
# that `_guess_pg_dtype` picks for it, then print the pairs as a table.
lst = [(col, df[col].dtype, _guess_pg_dtype(df[col].dtype)) for col in df.columns]

dtype_table = pd.DataFrame(lst, columns=["Column", "Pandas Dtype", "PostgreSQL Dtype"])
print(dtype_table.to_string())

             Column    Pandas Dtype                            PostgreSQL Dtype
0            pickup  datetime64[ns]  <class 'sqlalchemy.sql.sqltypes.DateTime'>
1           dropoff  datetime64[ns]  <class 'sqlalchemy.sql.sqltypes.DateTime'>
2        passengers           int64    <class 'sqlalchemy.sql.sqltypes.BIGINT'>
3          distance         float64                                       FLOAT
4              fare         float64                                       FLOAT
5               tip         float64                                       FLOAT
6             tolls         float64                                       FLOAT
7             total         float64                                       FLOAT
8             color          object    <class 'sqlalchemy.sql.sqltypes.String'>
9           payment          object    <class 'sqlalchemy.sql.sqltypes.String'>
10      pickup_zone          object    <class 'sqlalchemy.sql.sqltypes.String'>
11     dropoff_zone          object    <

### Table creation and insertion with SQLAlchemy

In [6]:
qry = """
select table_catalog,table_schema,table_name
    ,is_insertable_into,commit_action
from information_schema.tables
where table_schema
   not in ('information_schema','pg_catalog')
"""

# Keep the listing under its own name: later cells still use `df` as the taxi
# data (`df.copy()`, `df.query("pk <= 1000")`), so overwriting it here would
# break a top-to-bottom run.
user_tables = eng.run(qry)
user_tables

Unnamed: 0,table_catalog,table_schema,table_name,is_insertable_into,commit_action
0,test_db,public,test_schema.taxi,YES,
1,test_db,test_schema,taxi,YES,


In [7]:
# `if not exists` keeps the cell re-runnable: a plain CREATE SCHEMA raises
# once test_schema is already present.
eng.run("create schema if not exists test_schema")

In [105]:
# Drop the table by name rather than through `tbl`: the `tbl` object is only
# built in the "create table" cell further down, so `tbl.drop(eng)` raises
# NameError on a fresh top-to-bottom run. `if exists` also makes the cell safe
# to re-run when the table is already gone.
eng.run("drop table if exists test_schema.taxi")

2025-08-09 19:20:18,136 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-08-09 19:20:18,136 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-09 19:20:18,137 INFO sqlalchemy.engine.Engine select current_schema()
2025-08-09 19:20:18,137 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-09 19:20:18,139 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-08-09 19:20:18,140 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-08-09 19:20:18,141 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-08-09 19:20:18,142 INFO sqlalchemy.engine.Engine 
DROP TABLE test_schema.taxi
2025-08-09 19:20:18,142 INFO sqlalchemy.engine.Engine [no key 0.00045s] {}
2025-08-09 19:20:18,145 INFO sqlalchemy.engine.Engine COMMIT


In [88]:
# Build a SQLAlchemy Table that mirrors the DataFrame's columns and create it.
schema_table_name = "test_schema.taxi"
df2: pd.DataFrame = df.copy()

# Column names are passed verbatim to alc.Column, so insist on lower case and
# no dots before going any further.
for col in df2.columns:
    name_ok = col.islower() and '.' not in col
    assert name_ok, f"Clean up col names: '{col}'. Need lower case and no '.'"

schema_name, table_name = _parse_schema_table_name(schema_table_name)
meta = alc.MetaData(schema=schema_name)

# One column per DataFrame column, typed by the dtype-guessing helper.
columns = [
    alc.Column(name, _guess_pg_dtype(df2[name].dtype))
    for name in df2.columns
]
tbl = alc.Table(table_name, meta, *columns, schema=schema_name)

tbl.create(eng)

In [89]:
# Datetime and float columns are cast to object and their missing entries
# replaced with None before the frame is turned into records.
null_prone_kinds = (np.datetime64, np.float64)
for col in df2.columns:
    col_dtype = df2[col].dtype
    if any(np.issubdtype(col_dtype, kind) for kind in null_prone_kinds):
        df2[col] = df2[col].astype(object).where(df2[col].notna(), None)

# One dict per row, inserted with a single executemany-style call.
df2_dict = df2.to_dict("records")
with eng.begin() as conn:
    conn.execute(tbl.insert(), df2_dict)

In [90]:
# Row count of the freshly filled table (single scalar from the 1x1 result).
eng.run("select count(*) as n from test_schema.taxi").iat[0, 0]

np.int64(6433)

In [107]:
# Peek at the first rows as they come back from PostgreSQL to see which pandas
# dtypes the stored columns round-trip to.
# NOTE(review): `_` is also IPython's "last output" name; the next cell reads
# `_` again, so the name is kept here -- consider a descriptive name.
_ = eng.run("select * from test_schema.taxi limit 5")
print(f"{_.dtypes = }")
print(_.head().to_string())

2025-08-09 19:22:45,055 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-08-09 19:22:45,056 INFO sqlalchemy.engine.Engine select * from test_schema.taxi limit 5
2025-08-09 19:22:45,057 INFO sqlalchemy.engine.Engine [generated in 0.00087s] {}
2025-08-09 19:22:45,061 INFO sqlalchemy.engine.Engine COMMIT
_.dtypes = pickup             datetime64[ns]
dropoff            datetime64[ns]
passengers                  int64
distance                  float64
fare                      float64
tip                       float64
tolls                     float64
total                     float64
color                      object
payment                    object
pickup_zone                object
dropoff_zone               object
pickup_borough             object
dropoff_borough            object
pickup_dates               object
passenger_is_1               bool
dtype: object
               pickup             dropoff  passengers  distance  fare   tip  tolls  total   color      payment            pic

In [108]:
# `pickup_dates` comes back with object dtype; look at one value to see the
# concrete Python type stored in the column.
_['pickup_dates'].iloc[0]

datetime.date(2019, 3, 23)

### pandas `DataFrame.to_sql`

In [106]:
# Same load through pandas: `if_exists="replace"` recreates test_schema.taxi
# and the DataFrame index is not written as a column.
df3: pd.DataFrame = df.copy()
df3.to_sql(
    name="taxi",
    con=eng,
    schema="test_schema",
    if_exists="replace",
    index=False,
)

2025-08-09 19:21:06,496 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-08-09 19:21:06,503 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_namespace.nspname = %(nspname_1)s
2025-08-09 19:21:06,503 INFO sqlalchemy.engine.Engine [generated in 0.00115s] {'table_name': 'taxi', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'test_schema'}
2025-08-09 19:21:06,512 INFO sqlalchemy.engine.Engine 
CREATE TABLE test_schema.taxi (
	pickup TIMESTAMP WITHOUT TIME ZONE, 
	dropoff TIMESTAMP WITHOUT TIME ZONE, 
	passengers BIGINT, 
	distance FLOAT(53), 
	fare FLOAT(53), 
	tip FLOAT(53), 
	tolls FLOAT(53), 
	total FLOAT(53), 
	color TEXT, 
	

433

### Check existing primary keys before appending

In [122]:
# Remove any table left over from the previous sections; `if exists` keeps
# this from failing when the table is already gone.
eng.run("drop table if exists test_schema.taxi")

In [123]:
# Seed the table with the rows whose pk <= 1000 (recreating it), then show the
# resulting row count.
seed_rows = df.query("pk <= 1000")
seed_rows.to_sql(name="taxi", con=eng, schema="test_schema", if_exists="replace", index=False)
eng.run("select count(*) as n from test_schema.taxi").iat[0, 0]

np.int64(1001)

In [124]:
# Append only those rows of `df2` whose primary key is not stored yet.
pkeys: list = ['pk']
where = "pk >= 900"
table_name = "test_schema.taxi"
df2: pd.DataFrame = df.query("950 <= pk <= 2000").copy()

# Fetch just the key columns of the existing rows, optionally narrowed by
# `where` so the whole table does not have to be read.
schema_name, table_name = _parse_schema_table_name(table_name)
where_cls = "" if not where else f"\nwhere {where}"
key_cols = ','.join(pkeys)
db_tbl: pd.DataFrame = eng.run(f"select {key_cols} from {schema_name}.{table_name} {where_cls}")

if db_tbl.empty or df2.empty:
    # Nothing stored to compare against, or nothing to insert: no anti-join.
    dedup_tbl = df2
else:
    # Left anti-join on the keys: keep rows found only in `df2`.
    merged = df2.merge(db_tbl, how="left", on=pkeys, validate="one_to_one", indicator=True)
    dedup_tbl = merged[merged["_merge"] == "left_only"].drop(columns="_merge")

dedup_tbl.to_sql(table_name, con=eng, schema=schema_name, if_exists="append", index=False)

1000

In [125]:
# Row count after the de-duplicated append (single scalar from the 1x1 result).
eng.run("select count(*) as n from test_schema.taxi").iat[0, 0]

np.int64(2001)