 - Write a table using the Google Cloud SQL connector
 - Write a table using a regular IP connection
 - Restart the DB between runs?

## Using Google connection with Dagster

In [None]:
%%time
import numpy as np
import pandas as pd

# 100 x 4 frame of random integers in [0, 100) used as the test payload.
df = pd.DataFrame(
    data=np.random.randint(low=0, high=100, size=(100, 4)),
    columns=["A", "B", "C", "D"],
)

from usage_metrics.resources.postgres import PostgresManager

# Build the engine through the project's PostgresManager resource.
engine = PostgresManager()._create_engine()

# Write the frame to "new_table", replacing the table if it already exists.
with engine.connect() as con:
    df.to_sql(name="new_table", con=con, if_exists="replace")

## Using a regular SQLAlchemy connection

In [None]:
%%time
import os

import sqlalchemy as sa


def get_sql_engine(db_ip: str = "35.193.54.179", port: int = 5432) -> sa.engine.Engine:
    """Create a sql alchemy engine from environment vars.

    The user, password and database name are read from the
    ``POSTGRES_USER``, ``POSTGRES_PASS`` and ``POSTGRES_DB`` environment
    variables.

    Parameters
    ----------
    db_ip : str
        Host address of the Postgres server.
    port : int
        TCP port of the Postgres server.

    Returns
    -------
    sqlalchemy.engine.Engine
        Engine built from a ``postgresql://`` URL.

    Raises
    ------
    KeyError
        If any of the three environment variables is not set.
    """
    from urllib.parse import quote

    # Percent-encode the credentials so characters such as "@", ":" or "/"
    # in the user name or password cannot corrupt the URL structure.
    user = quote(os.environ["POSTGRES_USER"], safe="")
    password = quote(os.environ["POSTGRES_PASS"], safe="")
    db = os.environ["POSTGRES_DB"]
    return sa.create_engine(f"postgresql://{user}:{password}@{db_ip}:{port}/{db}")


# Engine that talks to the database directly over its IP address.
engine = get_sql_engine()

# Write the frame to "new_table", replacing the table if it already exists.
with engine.connect() as con:
    df.to_sql(name="new_table", con=con, if_exists="replace")

In [None]:
# Bare expression: shows the currently bound `engine` object as the cell output.
engine

## Using Google connection **without** Dagster Resource

In [None]:
import csv
from io import StringIO

import pg8000
from google.cloud.sql.connector import Connector


def _init_connection_engine() -> sa.engine.Engine:
    """Create a SqlAlchemy engine using Cloud SQL connection client.

    Connection settings are read from the ``POSTGRES_CONNECTION_NAME``,
    ``POSTGRES_USER``, ``POSTGRES_PASS`` and ``POSTGRES_DB`` environment
    variables each time the engine opens a new DBAPI connection.

    Returns
    -------
    sqlalchemy.engine.Engine
        Engine using the ``postgresql+pg8000`` dialect whose connections are
        produced by the Cloud SQL connector.
    """
    # A single connector is shared by every connection the engine creates.
    # Previously this object was never used, and a second connector was
    # built and closed again around every individual connection.
    connector = Connector()

    def getconn() -> pg8000.dbapi.Connection:
        """Open one pg8000 connection through the shared connector."""
        conn: pg8000.dbapi.Connection = connector.connect(
            os.environ["POSTGRES_CONNECTION_NAME"],
            "pg8000",
            user=os.environ["POSTGRES_USER"],
            password=os.environ["POSTGRES_PASS"],
            db=os.environ["POSTGRES_DB"],
            enable_iam_auth=True,
        )
        return conn

    # No host in the URL: SQLAlchemy calls `getconn` for each new connection.
    engine = sa.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )
    engine.dialect.description_encoding = None
    return engine


def psql_insert_copy(table, conn, keys, data_iter):
    """Execute SQL statement inserting data

    Serialises the rows to an in-memory CSV buffer and loads them with a
    single ``COPY ... FROM STDIN WITH CSV`` statement.

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    # The cursor, not the connection itself, is what exposes copy_expert.
    # NOTE(review): copy_expert is the psycopg2 cursor API; confirm the
    # driver behind `conn` provides it.
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        # Rewind so the COPY reads the buffer from the beginning.
        s_buf.seek(0)

        columns = ", ".join([f'"{k}"' for k in keys])
        if table.schema:
            table_name = f"{table.schema}.{table.name}"
        else:
            table_name = table.name

        sql = f"COPY {table_name} ({columns}) FROM STDIN WITH CSV"
        cur.copy_expert(sql=sql, file=s_buf)

In [None]:
# Smoke test: open a connection on whichever `engine` is currently bound and
# print the connection object; leaving the `with` block closes it again.
with engine.connect() as con:
    print(con)

In [None]:
%%time

# Engine whose connections are created by the Cloud SQL connector.
engine = _init_connection_engine()

# Replace "new_table" with the frame, loading rows via psql_insert_copy.
with engine.connect() as con:
    df.to_sql(
        name="new_table",
        con=con,
        if_exists="replace",
        method=psql_insert_copy,
    )

In [None]:
%%time
import numpy as np
import pandas as pd


def psql_insert_copy(table, conn, keys, data_iter):
    """Bulk-insert rows with a single ``COPY ... FROM STDIN WITH CSV``.

    The rows are written to an in-memory CSV buffer, which is then handed
    to the cursor's ``copy_expert`` method.

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
        Target table; its ``schema`` (when set) and ``name`` form the
        table reference.
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
        Its ``connection`` attribute supplies the DBAPI connection.
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    raw_connection = conn.connection
    with raw_connection.cursor() as cursor:
        # Serialise every row to CSV in memory, then rewind for reading.
        csv_buffer = StringIO()
        csv.writer(csv_buffer).writerows(data_iter)
        csv_buffer.seek(0)

        quoted_columns = ", ".join(f'"{key}"' for key in keys)
        # Prefix the schema only when the table declares one.
        target = f"{table.schema}.{table.name}" if table.schema else table.name

        cursor.copy_expert(
            sql=f"COPY {target} ({quoted_columns}) FROM STDIN WITH CSV",
            file=csv_buffer,
        )


# Fresh 100 x 4 frame of random integers in [0, 100) for this run.
df = pd.DataFrame(
    data=np.random.randint(low=0, high=100, size=(100, 4)),
    columns=["A", "B", "C", "D"],
)


# Replace "new_table" with the frame, passing the engine itself and sending
# rows through psql_insert_copy in chunks of 10.
df.to_sql(
    "new_table",
    engine,
    if_exists="replace",
    chunksize=10,
    method=psql_insert_copy,
)