# Description

This notebook investigates how to keep a database connection open when it is used inside a generator that runs several executions.

In `pangres` we have always passed the engine pretty much everywhere, which means we created lots of connections, sometimes even one connection per chunk.

When we pass a connection everywhere instead, everything works except when we want to iterate over the results of upserts (i.e. when the upsert function returns a generator). See for instance [issue 32 of pangres](https://github.com/ThibTrip/pangres/issues/32).

In [1]:
from typing import Tuple

from sqlalchemy import create_engine, text

# Functions that reproduce my library pangres' workflow

In [2]:
def create_table(connection, table_name):
    """
    Create `table_name` with the columns `id` (BIGINT primary key) and `name` (TEXT),
    unless a table with that name already exists.

    `table_name` is formatted straight into the DDL, so only pass trusted identifiers.
    """
    create_sql = f"CREATE TABLE IF NOT EXISTS {table_name} (id BIGINT PRIMARY KEY, name TEXT);"
    connection.execute(text(create_sql))


def commit_if_needed(connection):
    """
    Commit on `connection`, but only if it exposes a `commit` method.

    Connections from future versions of sqlalchemy (or from an engine created with
    `future=True`) have `commit`. Legacy connections do not, and then nothing happens.
    """
    try:
        commit = connection.commit
    except AttributeError:
        # legacy connection: there is no explicit commit to call
        return
    commit()


def generate_upsert_query(table_name, future:bool=False) -> Tuple[str, dict]:
    """
    Build an upsert of two hard-coded rows (id=0/'Foo' and id=1/'Bar') into `table_name`.

    Parameters
    ----------
    table_name : str
        Name of the target table. It is interpolated into the SQL as-is (it is not a
        bind parameter), so only pass trusted table names.
    future : bool, default False
        If True, the bind parameters are wrapped as ``{'parameters': {...}}`` so that
        ``connection.execute(text(query), **parameters)`` passes them through the
        ``parameters`` keyword used by sqlalchemy's ``future=True`` connections.

    Returns
    -------
    tuple of (str, dict)
        The query (with ``:name`` placeholders) and the keyword arguments to unpack
        into ``connection.execute``.
    """
    query = f"""INSERT INTO {table_name} (id, name) VALUES (:id0, :name0), (:id1, :name1)
                ON CONFLICT (id) DO UPDATE SET name=EXCLUDED.name"""
    parameters = dict(id0=0, name0='Foo', id1=1, name1='Bar')
    if future:
        return query, {'parameters': parameters}
    return query, parameters


def execute_upsert(table_name:str, connection, yield_chunks:bool=False, future:bool=False,
                   n_executions:int=10):
    """
    Execute the same two-row upsert `n_executions` times on `connection`, calling
    `commit_if_needed` after each execution. The connection is never closed here.

    Parameters
    ----------
    table_name : str
        Table to upsert into (interpolated into the SQL as-is by `generate_upsert_query`).
    connection
        sqlalchemy connection the statements are executed with.
    yield_chunks : bool, default False
        If False, every execution runs immediately and None is returned.
        If True, a generator is returned instead. Nothing is executed until it is
        iterated, and each iteration yields the result of one execution.
    future : bool, default False
        Passed to `generate_upsert_query` to shape the parameters for
        sqlalchemy's `future=True` `Connection.execute`.
    n_executions : int, default 10
        How many times the upsert is executed.
    """
    query, parameters = generate_upsert_query(table_name=table_name, future=future)
    # the statement is identical for every execution: build it once instead of per loop
    statement = text(query)
    if not yield_chunks:
        for _ in range(n_executions):
            connection.execute(statement, **parameters)
            commit_if_needed(connection)
        return None

    def as_generator():
        print('entering upsert execution with generator')
        for _ in range(n_executions):
            result = connection.execute(statement, **parameters)
            commit_if_needed(connection)
            yield result
        # only reached when the generator is fully exhausted (not on error or early close)
        print('leaving upsert execution with generator')
    return as_generator()


def upsert(engine, table_name:str, yield_chunks:bool=False, simulate_error:bool=False, future:bool=False):
    """
    Create `table_name` if needed, then upsert into it (mimics pangres' workflow).

    Parameters
    ----------
    engine
        sqlalchemy engine a new connection is opened from. The connection is always
        closed when the work is done or fails.
    table_name : str
        Target table.
    yield_chunks : bool, default False
        If False, the table creation and all upserts run immediately inside a single
        transaction, and the result of `execute_upsert` (None in this mode) is returned.
        If True, a generator is returned. The connection is only opened once the
        generator starts, and each iteration yields the result of one upsert execution.
    simulate_error : bool, default False
        Raise a ValueError to check that the connection still gets closed.
    future : bool, default False
        Passed on to `execute_upsert`.

    Raises
    ------
    ValueError
        When `simulate_error` is True. Without `yield_chunks` it is raised at call
        time; with `yield_chunks` it is raised on the first iteration.
    """
    print('entering')  # the prints mimic the enter/exit steps of a context manager
    if not yield_chunks:
        connection = engine.connect()
        try:
            # The transaction must end (commit or rollback) BEFORE the connection is
            # closed. Closing first releases the transaction with a rollback, which
            # would silently discard the upserts.
            # NOTE(review): with future=True, `commit_if_needed` commits inside this
            # `begin()` block; confirm this against sqlalchemy's transaction rules.
            with connection.begin():
                create_table(connection=connection, table_name=table_name)
                if simulate_error:
                    raise ValueError('an error was raised')
                return execute_upsert(table_name=table_name,
                                      connection=connection,
                                      yield_chunks=yield_chunks,
                                      future=future)
        finally:
            print('closing')
            connection.close()

    def as_generator():
        # The connection is opened inside the generator, so a generator that is never
        # iterated never opens (and therefore never leaks) a connection.
        connection = engine.connect()
        try:
            # Only the table creation runs in this explicit transaction. The upserts
            # run afterwards, while the caller iterates, and rely on `commit_if_needed`
            # (future mode) or on sqlalchemy's legacy autocommit (presumably;
            # NOTE(review): confirm) to be persisted.
            with connection.begin():
                create_table(connection=connection, table_name=table_name)
            gen = execute_upsert(table_name=table_name,
                                 connection=connection,
                                 yield_chunks=yield_chunks,
                                 future=future)
            print('entering generator within main function')
            for result in gen:
                if simulate_error:
                    raise ValueError('an error was raised')
                yield result
        finally:
            print('closing generator within main function')
            connection.close()
    return as_generator()

# Tests with `yield_chunks=True` and with `yield_chunks=False`

We also simulate errors (`simulate_error=True`) to check that the connection gets closed in every case.

In [3]:
# toggle to run the same experiments with sqlalchemy's `future=True` engine/connection API
future = False
# in-memory SQLite database (nothing is written to disk)
engine = create_engine('sqlite:///:memory:', future=future)
# yield_chunks=False: everything runs immediately and nothing is returned (no Out[] cell)
upsert(engine=engine, table_name='test', yield_chunks=False, future=future)

entering
closing


In [4]:
chunk_results = upsert(engine=engine, table_name='test', yield_chunks=True, future=future)
for result in chunk_results:
    # each item is the result of one upsert execution
    print(f'{result.rowcount} rows updated')

entering
entering generator within main function
entering upsert execution with generator
2 rows updated
2 rows updated
2 rows updated
2 rows updated
2 rows updated
2 rows updated
2 rows updated
2 rows updated
2 rows updated
2 rows updated
leaving upsert execution with generator
closing generator within main function


In [5]:
# Fresh in-memory database, created with the same `future` flag as the first engine.
# `upsert` shapes its parameters according to `future`, so the engine must match it.
engine = create_engine('sqlite:///:memory:', future=future)
try:
    upsert(engine=engine, table_name='test', yield_chunks=False, simulate_error=True, future=future)
except ValueError as e:
    # expected: the error simulated by `upsert` (any other error should surface)
    print(e)

entering
closing
an error was raised


In [6]:
try:
    for result in upsert(engine=engine, table_name='test', yield_chunks=True, simulate_error=True, future=future):
        print(f'{result.rowcount} rows updated')
except ValueError as e:
    # expected: raised by the generator before the first result is yielded;
    # the connection is closed by the generator's `finally` beforehand
    print(e)

entering
entering generator within main function
entering upsert execution with generator
closing generator within main function
an error was raised
