# Description

This notebook shows what you should be aware of when using `pangres.aupsert`, the asynchronous variant of `pangres.upsert`.

I had hypotheses on what was going to happen with such asynchronous workflows and this notebook was one of the ways I used to check them.

The notes below are merely observations of what happened in my situation. You might get something slightly different.

The point is to avoid race conditions like the ones you see below. You can do that by **separating the statements that can cause structural changes** (make sure nothing else runs alongside them).
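A minimal sketch of that separation, using only `asyncio` and stand-in coroutines (`setup`, `make_worker` and `run_structural_then_parallel` are hypothetical placeholders, not pangres functions): the step that may change the structure is awaited alone, and only the data-only steps run concurrently. With `aupsert` this would presumably translate to one call that is allowed to create the table, followed by concurrent calls with `create_table=False`.

```python
import asyncio


async def run_structural_then_parallel(structural_step, data_steps):
    """Await the structural step alone, then run the data-only steps concurrently."""
    # 1) alone: nothing else is running while the structure may change
    await structural_step()
    # 2) in parallel: these steps must not change the structure
    return await asyncio.gather(*(step() for step in data_steps))


# --- demo with stand-in coroutines (hypothetical, no database involved) ---
events = []


async def setup():
    events.append('setup:start')
    await asyncio.sleep(0.01)  # pretend to create the schema/table
    events.append('setup:end')


def make_worker(name):
    async def worker():
        events.append(f'{name}:start')
        await asyncio.sleep(0)  # yield control like a real database call would
        events.append(f'{name}:end')
        return name
    return worker


results = asyncio.run(
    run_structural_then_parallel(setup, [make_worker('a'), make_worker('b')])
)
print(events)
```

The recorded events show that both workers start only after the setup step has finished.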


Reminder: `pangres.aupsert` requires an **asynchronous engine or an asynchronous connection**
(a.k.a. an asynchronous connectable) obtainable with `sqlalchemy>=1.4`.

In [1]:
import asyncio
import pandas as pd
import traceback
from enum import Enum
from pangres import aupsert
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# IPython "patch" for async
import nest_asyncio  # pip install nest_asyncio
nest_asyncio.apply()

# hide traceback for privacy reasons
import sys
sys.tracebacklimit = 0

# for informative purposes, this is automatically imported in IPython
from IPython.display import display

# Config

Change the connection string accordingly and do not forget to install the necessary driver:

```
pip install aiosqlite # SQLite
pip install aiomysql # MySQL
pip install asyncpg # PostgreSQL
```

In [2]:
# engine = create_async_engine('sqlite+aiosqlite://')  # in-memory SQLite
# engine = create_async_engine('sqlite+aiosqlite:///test.db')  # SQLite on disk
# engine = create_async_engine('mysql+aiomysql://username:password@localhost:3306/db')  # MySQL
engine = create_async_engine('postgresql+asyncpg://username:password@localhost:5432/postgres')  # PostgreSQL
schema = None  # set this to None if using something else than PostgreSQL!

# Helpers

In [3]:
async def adrop_schema(engine, schema):
    # this function is not relevant if it is not a postgres database
    if 'postgres' not in engine.dialect.dialect_description:
        return

    async with engine.connect() as connection:
        await connection.execute(text(f'DROP SCHEMA IF EXISTS {schema};'))
        await connection.commit()


async def adrop_table(engine, schema, table_name):
    namespace = f'{schema}.{table_name}' if schema is not None else table_name
    async with engine.connect() as connection:
        await connection.execute(text(f'DROP TABLE IF EXISTS {namespace};'))
        await connection.commit()


async def adisplay_table(engine, schema, table_name, index_col='id'):
    namespace = f'{schema}.{table_name}' if schema is not None else table_name
    async with engine.connect() as connection:
        proxy = await connection.execute(text(f'SELECT * FROM {namespace};'))
        results = proxy.all()
        if len(results) == 0:
            df = pd.DataFrame(columns=proxy.keys())
        else:
            df = pd.DataFrame(r._asdict() for r in results)
        df = df.set_index(index_col)
        display(df)


class TableNames(Enum):
    namespace_conflict = 'test_namespace_conflict'
    add_new_columns_conflict = 'test_new_column_conflict'
    alter_column_conflict = 'test_alter_dtype_conflict'  # must match the table name used in test_alter_dtype_conflict
    upsert_conflict = 'test_upsert_conflict'

# Reset tests

For PostgreSQL we expect the `schema` to not exist or be empty. If it is not empty we will get an error.

In [4]:
async def reset_tests(engine, schema, drop_schema=True):
    # PostgreSQL defaults to public schema
    if schema is None and 'postgres' in engine.dialect.dialect_description:
        schema = 'public'
    for item in TableNames:
        await adrop_table(engine=engine, schema=schema, table_name=item.value)
    if drop_schema:
        await adrop_schema(engine=engine, schema=schema)

await reset_tests(engine=engine, schema=schema, drop_schema=True)

# 1. Structural changes

Running **multiple coroutines** at the same time can result in conflicts when **any argument** that makes or is likely to make a **structural change** to the database or the table is set to `True`.

List of arguments that can cause such problems:
* `create_schema`
* `create_table` -> **`True`** by default!
* `add_new_columns`
* `adapt_dtype_of_empty_db_columns`

Here we will cause conflicts on purpose by executing 2 identical coroutines in parallel and trying the various arguments to make structural changes to the database or the table.

Note that we catch the exceptions and print them here; otherwise the notebook would not execute all the way through (unless we ran it cell by cell, obviously).
In a regular Python program, such an uncaught exception would cause an exit.
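As an alternative to wrapping `asyncio.gather` in `try`/`except`, passing `return_exceptions=True` makes `gather` hand back exceptions as ordinary results, which shows the outcome of every coroutine rather than only the first failure. A minimal sketch with stand-in coroutines (`succeed` and `fail` are hypothetical, no database involved):

```python
import asyncio


async def succeed():
    await asyncio.sleep(0)
    return 'inserted'


async def fail():
    await asyncio.sleep(0)
    raise ValueError('simulated conflict')


async def main():
    # exceptions are returned in place of results,
    # in the same order as the awaitables were passed
    return await asyncio.gather(succeed(), fail(), return_exceptions=True)


results = asyncio.run(main())
for position, result in enumerate(results):
    status = 'failed' if isinstance(result, Exception) else 'succeeded'
    print(f'coroutine {position} {status}: {result!r}')
```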

In [5]:
async def test_namespace_conflict(**ddl_kwargs):
    table_name = 'test_namespace_conflict'
    if 'postgres' not in engine.dialect.dialect_description and 'create_schema' in ddl_kwargs:
        print('Irrelevant test for given database type (schemas are a PostgreSQL feature)')
        return

    test_df = pd.DataFrame({'id':[0]}).set_index('id')  # NOTE: on MySQL you may want to start at 1 and not 0 to avoid MySQL creating a serial...
    await adrop_table(engine=engine, schema=schema, table_name=table_name)
    try:
        # execute 2 identical coroutines in parallel
        await asyncio.gather(aupsert(con=engine, schema=schema, df=test_df, table_name=table_name, if_row_exists='update', **ddl_kwargs),
                             aupsert(con=engine, schema=schema, df=test_df, table_name=table_name, if_row_exists='update', **ddl_kwargs))
    except Exception:
        traceback.print_exc()
    await adisplay_table(engine=engine, schema=schema, table_name=table_name)

## 1.1. `create_schema`

This test is **only relevant for PostgreSQL** (a schema is an additional hierarchical layer between databases and tables that kind of acts like a subdatabase).

In our example the two coroutines ask the database at roughly the same time whether the PostgreSQL schema exists. Both get the answer that it does not, and both try to create the schema at the same time.

This results in an **exception** in one of the coroutines. The other coroutine does succeed though, so the schema and table are created and the data gets inserted.
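The check-then-create sequence described above can be reproduced without a database. The sketch below uses an in-memory stand-in (`FakeDatabase` and `ensure_schema` are hypothetical and only mimic the behavior, they are not how pangres is implemented): both coroutines see that the schema is missing before either of them has created it.

```python
import asyncio


class FakeDatabase:
    """In-memory stand-in for a database that rejects duplicate schema creation."""

    def __init__(self):
        self.schemas = set()

    async def schema_exists(self, name):
        await asyncio.sleep(0)  # simulated network round trip
        return name in self.schemas

    async def create_schema(self, name):
        await asyncio.sleep(0)  # simulated network round trip
        if name in self.schemas:
            raise RuntimeError(f'schema "{name}" already exists')
        self.schemas.add(name)


async def ensure_schema(db, name):
    # check-then-act: the answer to the check can be stale by the time we act on it
    if not await db.schema_exists(name):
        await db.create_schema(name)
    return 'ok'


async def main():
    db = FakeDatabase()
    return await asyncio.gather(
        ensure_schema(db, 'tests'),
        ensure_schema(db, 'tests'),
        return_exceptions=True,
    )


outcomes = asyncio.run(main())
print(outcomes)
```

One coroutine completes and the other one ends with an error, which mirrors the traceback shown for PostgreSQL below.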

In [6]:
await test_namespace_conflict(create_schema=True)

asyncpg.exceptions.UniqueViolationError: duplicate key value violates unique constraint "pg_namespace_nspname_index"
DETAIL:  Key (nspname)=(public) already exists.

The above exception was the direct cause of the following exception:

sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.IntegrityError: <class 'asyncpg.exceptions.UniqueViolationError'>: duplicate key value violates unique constraint "pg_namespace_nspname_index"
DETAIL:  Key (nspname)=(public) already exists.

The above exception was the direct cause of the following exception:

sqlalchemy.exc.IntegrityError: (sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class 'asyncpg.exceptions.UniqueViolationError'>: duplicate key value violates unique constraint "pg_namespace_nspname_index"
DETAIL:  Key (nspname)=(public) already exists.
[SQL: CREATE SCHEMA public]
(Background on this error at: https://sqlalche.me/e/14/gkpj)


| id |
|----|
| 0  |


## 1.2. `create_table`

Both coroutines get the answer that the table does not exist and both try to create it at the same time. Here too, one of the two coroutines succeeds and the other one fails.

I have noticed a strange and systematic behavior with MySQL and with an in-memory SQLite database: the coroutine that succeeds will roll back.
This is why we may end up with an empty table (the table creation should also be rolled back, but MySQL does not support that and the SQLite implementation in Python does not allow it either).

I'd be curious to read an explanation for this behavior if you have one.

In [7]:
await test_namespace_conflict(create_table=True)

asyncpg.exceptions.UniqueViolationError: duplicate key value violates unique constraint "pg_type_typname_nsp_index"
DETAIL:  Key (typname, typnamespace)=(test_namespace_conflict, 83945) already exists.

The above exception was the direct cause of the following exception:

sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.IntegrityError: <class 'asyncpg.exceptions.UniqueViolationError'>: duplicate key value violates unique constraint "pg_type_typname_nsp_index"
DETAIL:  Key (typname, typnamespace)=(test_namespace_conflict, 83945) already exists.

The above exception was the direct cause of the following exception:

sqlalchemy.exc.IntegrityError: (sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class 'asyncpg.exceptions.UniqueViolationError'>: duplicate key value violates unique constraint "pg_type_typname_nsp_index"
DETAIL:  Key (typname, typnamespace)=(test_namespace_conflict, 83945) already exists.
[SQL: 
CREATE TABLE public.test_namespace_conflict (
	id BIGINT N

| id |
|----|
| 0  |


## 1.3. `add_new_columns`

Similarly to previous conflicts, both coroutines get the answer that the column does not exist and both try to create it at the same time. One of the coroutines succeeds and the other one fails.

In [8]:
async def test_add_new_column_conflict():
    table_name = 'test_new_column_conflict'
    test_df = pd.DataFrame({'id':[0]}).set_index('id')
    await adrop_table(engine=engine, schema=schema, table_name=table_name)

    # setup table and insert data all at once
    await aupsert(con=engine, schema=schema, df=test_df, table_name=table_name, if_row_exists='update')
    await adisplay_table(engine=engine, schema=schema, table_name=table_name)

    # add a new column in the df so that pangres has to create it in the SQL table when we set `add_new_columns` to `True`
    test_df['new_column'] = 'foo'

    try:
        # execute 2 identical coroutines in parallel
        await asyncio.gather(aupsert(con=engine, schema=schema, df=test_df, table_name=table_name, if_row_exists='update', add_new_columns=True),
                             aupsert(con=engine, schema=schema, df=test_df, table_name=table_name, if_row_exists='update', add_new_columns=True))
    except Exception:
        traceback.print_exc()
    await adisplay_table(engine=engine, schema=schema, table_name=table_name)

In [9]:
await test_add_new_column_conflict()

| id |
|----|
| 0  |


2022-01-17 19:17:34,259 | INFO     | pangres    | logger:log:73 - Added column test_new_column_conflict.new_column (type: TEXT) in table test_new_column_conflict (schema="public")
asyncpg.exceptions.DuplicateColumnError: column "new_column" of relation "test_new_column_conflict" already exists

The above exception was the direct cause of the following exception:

sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.ProgrammingError: <class 'asyncpg.exceptions.DuplicateColumnError'>: column "new_column" of relation "test_new_column_conflict" already exists

The above exception was the direct cause of the following exception:

sqlalchemy.exc.ProgrammingError: (sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError) <class 'asyncpg.exceptions.DuplicateColumnError'>: column "new_column" of relation "test_new_column_conflict" already exists
[SQL: ALTER TABLE public.test_new_column_conflict ADD COLUMN new_column TEXT]
(Background on this error at: https://sqlalche.me/e/14/f405)


| id | new_column |
|----|------------|
| 0  | foo        |


## 1.4. `adapt_dtype_of_empty_db_columns`

This causes problems similar to those in the previous tests. Both coroutines see that the data type has to change and both send a SQL `ALTER` statement.

* SQLite: SQLite does not support this kind of column type alteration, so we get an exception
* PostgreSQL: one of the coroutines manages to lock the table first to update the structure (and runs successfully), which prevents the other coroutine from completing and even creates what is called a **deadlock**
* MySQL: no exceptions despite the same `ALTER` statement being executed twice at roughly the same time 😳!... I'd be curious to read an explanation for this if you have one.
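Within a single process, one possible way to keep such `ALTER` statements from overlapping is to guard the structural step with an `asyncio.Lock`. This is only a sketch with a stand-in table object (`FakeTable` and `adapt_dtype` are hypothetical, not part of pangres). It assumes all coroutines share the same event loop and does nothing against other processes or machines altering the same table.

```python
import asyncio


class FakeTable:
    """Stand-in for a table that fails when two ALTER statements overlap."""

    def __init__(self):
        self.column_type = 'TEXT'
        self._altering = False

    async def alter_column_type(self, new_type):
        if self._altering:
            raise RuntimeError('concurrent ALTER detected')
        self._altering = True
        try:
            await asyncio.sleep(0.01)  # simulated time spent altering the table
            self.column_type = new_type
        finally:
            self._altering = False


async def adapt_dtype(table, lock, new_type):
    # only one coroutine at a time may enter the structural section
    async with lock:
        if table.column_type != new_type:  # re-check once we hold the lock
            await table.alter_column_type(new_type)


async def main():
    table = FakeTable()
    lock = asyncio.Lock()  # must be shared by all coroutines touching the table
    await asyncio.gather(
        adapt_dtype(table, lock, 'BIGINT'),
        adapt_dtype(table, lock, 'BIGINT'),
    )
    return table.column_type


final_type = asyncio.run(main())
print(final_type)
```

The second coroutine waits for the lock, sees that the type has already been changed and skips the `ALTER` altogether.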

In [10]:
async def test_alter_dtype_conflict():
    table_name = 'test_alter_dtype_conflict'
    test_df = pd.DataFrame({'id':[0], 'empty_column':[None]}).set_index('id')
    await adrop_table(engine=engine, schema=schema, table_name=table_name)

    # setup table and insert data all at once
    await aupsert(con=engine, schema=schema, df=test_df, table_name=table_name, if_row_exists='update')
    await adisplay_table(engine=engine, schema=schema, table_name=table_name)

    # add something in the column that will force pangres to change the data type. IIRC an empty column
    # will default to the TEXT data type (we use pandas to create the SQL model for us in the backend of pangres)
    test_df['empty_column'] = 0  # pangres will have to convert from TEXT to INTEGER

    try:
        # execute 2 identical coroutines in parallel
        await asyncio.gather(aupsert(con=engine, schema=schema, df=test_df, table_name=table_name, if_row_exists='update', adapt_dtype_of_empty_db_columns=True),
                             aupsert(con=engine, schema=schema, df=test_df, table_name=table_name, if_row_exists='update', adapt_dtype_of_empty_db_columns=True))
    except Exception:
        traceback.print_exc()
    await adisplay_table(engine=engine, schema=schema, table_name=table_name)

In [11]:
await test_alter_dtype_conflict()

| id | empty_column |
|----|--------------|
| 0  |              |


2022-01-17 19:17:34,408 | INFO     | pangres    | logger:log:73 - Changed type of column empty_column from TEXT to BIGINT in table test_alter_dtype_conflict (schema="public")


| id | empty_column |
|----|--------------|
| 0  | 0            |


# 2. Upsert

What happens if we have multiple coroutines with colliding rows to update that we execute in parallel?

Here for the same user (identified by its id) we will have three coroutines updating the field `name` with a different value each.

Strangely enough we get no exceptions 😳! But there are undesired behaviors:

* PostgreSQL and SQLite (not in memory): we get different values from run to run, which looks a bit "random". This leads me to believe that, for the same primary key, the last coroutine to run sets the value of any given field we defined
* MySQL and SQLite (in memory): the last coroutine in the code seems to systematically set the value, as if everything was running synchronously 🤔. I have no idea why, so feel free to comment.
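If the final value must be predictable, one option is to resolve colliding rows in memory before anything is sent to the database, so that a single write per primary key remains. A minimal sketch with plain dictionaries (`resolve_collisions` is a hypothetical helper, and "the last row in the list wins" is an assumed rule; pick whatever fits your data):

```python
def resolve_collisions(rows, key='id'):
    """Keep one row per key; later rows in the list override earlier ones."""
    resolved = {}
    for row in rows:
        resolved[row[key]] = row  # deterministic: list order decides, not timing
    return list(resolved.values())


pending = [
    {'id': 0, 'name': 'foo'},
    {'id': 0, 'name': 'bar'},
    {'id': 1, 'name': 'qux'},
    {'id': 0, 'name': 'baz'},
]
unique_rows = resolve_collisions(pending)
print(unique_rows)
```

The deduplicated rows could then be turned into a single DataFrame and upserted once, instead of racing several coroutines against the same primary key.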

In [12]:
table_name_test_upsert = 'test_upsert_conflict'


async def asetup_user_table():
    df = pd.DataFrame({'id':[], 'name':[]}).astype({'id':int, 'name':str}).set_index('id')
    await aupsert(con=engine, schema=schema, df=df, table_name=table_name_test_upsert, if_row_exists='update')


async def aadd_user(user_id, user_name):
    df = pd.DataFrame({'id':[user_id], 'name':[user_name]}).set_index('id')
    await aupsert(con=engine, schema=schema, df=df, table_name=table_name_test_upsert, if_row_exists='update')


async def test_upsert_conflict():
    await adrop_table(engine=engine, schema=schema, table_name=table_name_test_upsert)

    # setup table and insert data all at once
    await asetup_user_table()

    try:
        await asyncio.gather(aadd_user(user_id=0, user_name='foo'),
                             aadd_user(user_id=0, user_name='bar'),
                             aadd_user(user_id=0, user_name='baz'))
    except Exception:
        traceback.print_exc()
    await adisplay_table(engine=engine, schema=schema, table_name=table_name_test_upsert)

In [13]:
for i in range(10):
    print(f'Test iteration {i+1}')
    await test_upsert_conflict()

Test iteration 1

| id | name |
|----|------|
| 0  | baz  |

Test iteration 2

| id | name |
|----|------|
| 0  | bar  |

Test iteration 3

| id | name |
|----|------|
| 0  | baz  |

Test iteration 4

| id | name |
|----|------|
| 0  | baz  |

Test iteration 5

| id | name |
|----|------|
| 0  | baz  |

Test iteration 6

| id | name |
|----|------|
| 0  | baz  |

Test iteration 7

| id | name |
|----|------|
| 0  | baz  |

Test iteration 8

| id | name |
|----|------|
| 0  | baz  |

Test iteration 9

| id | name |
|----|------|
| 0  | baz  |

Test iteration 10

| id | name |
|----|------|
| 0  | baz  |
