LOAD

In [2]:
import collections.abc as collections_abc
import dataclasses
import itertools
import typing

import faker
import more_itertools
import psycopg2

import settings
import utils.profilers as profiler_utils
import utils.psycopg2 as psycopg2_utils

In [3]:
connection = psycopg2.connect(**settings.POSTGRESQL_DATABASE_SETTINGS)

In [4]:
def create_tables(connection: psycopg2_utils.Connection) -> None:
    with connection.cursor() as cursor:
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS users(
                id serial,
                name text NOT NULL,
                description text NOT NULL
            )
            """
        )
    connection.commit()

In [5]:
def truncate_tables(connection: psycopg2_utils.Connection) -> None:
    with connection.cursor() as cursor:
        cursor.execute("TRUNCATE users")
    connection.commit()

In [6]:
create_tables(connection)

In [7]:
@dataclasses.dataclass
class User:
    name: str
    description: str

In [8]:
SIZE = 10_000

def gen_fake_users() -> collections_abc.Iterator[User]:
    fake = faker.Faker()
    return (User(name=fake.name(), description=fake.text()) for _ in range(SIZE))

In [9]:
ExecuteType = collections_abc.Callable[[psycopg2_utils.Connection, typing.Iterator[User]], None]

def run_execution(func: ExecuteType, connection: psycopg2_utils.Connection) -> None:
    users = gen_fake_users()
    func(connection, users)
    truncate_tables(connection)

In [10]:
@profiler_utils.profile
def execute_single(connection: psycopg2_utils.Connection, users: collections_abc.Iterator[User]) -> None:
    with connection.cursor() as cursor:
        for user in users:
            stmt = "INSERT INTO users (name, description) VALUES (%s, %s)"
            data = (user.name, user.description)
            cursor.execute(stmt, data)
    connection.commit()

In [11]:
@profiler_utils.profile
def executemany(connection: psycopg2_utils.Connection, users: collections_abc.Iterator[User]) -> None:
    with connection.cursor() as cursor:
        stmt = "INSERT INTO users (name, description) VALUES (%s, %s)"
        data = [(user.name, user.description) for user in users]
        cursor.executemany(stmt, data)
    connection.commit()

In [12]:
@profiler_utils.profile
def execute_single_query(connection: psycopg2_utils.Connection, users: collections_abc.Iterator[User]) -> None:
    data = list(itertools.chain.from_iterable((user.name, user.description) for user in users))
    stmt = f"INSERT INTO users (name, description) VALUES {','.join('(%s, %s)' for _ in range(len(data)//2))}"
    with connection.cursor() as cursor:
        cursor.execute(stmt, data)
    connection.commit()

In [13]:
run_execution(execute_single, connection)

execute_single:
 max_memory: 0.112 mb
 exec time: 16,300.990 ms


In [14]:
run_execution(executemany, connection)

executemany:
 max_memory: 3.223 mb
 exec time: 12,261.426 ms


In [15]:
run_execution(execute_single_query, connection)

execute_single_query:
 max_memory: 7.174 mb
 exec time: 11,366.399 ms


In [16]:
CHUNK_SIZE = 500

In [17]:
@profiler_utils.profile
def execute_chunks(connection: psycopg2_utils.Connection, users: collections_abc.Iterator[User]) -> None:
    stmt = "INSERT INTO users (name, description) VALUES (%s, %s)"
    with connection.cursor() as cursor:
        for user_chunk in more_itertools.ichunked(users, CHUNK_SIZE):
            for user in user_chunk:
                data = (user.name, user.description)
                cursor.execute(stmt, data)
            connection.commit()

In [18]:
@profiler_utils.profile
def executemany_chunks(connection: psycopg2_utils.Connection, users: collections_abc.Iterator[User]) -> None:
    stmt = "INSERT INTO users (name, description) VALUES (%s, %s)"
    with connection.cursor() as cursor:
        for user_chunk in more_itertools.ichunked(users, CHUNK_SIZE):
            data = [(user.name, user.description) for user in user_chunk]
            cursor.executemany(stmt, data)
            connection.commit()

In [19]:
@profiler_utils.profile
def execute_single_query_chunks(connection: psycopg2_utils.Connection, users: collections_abc.Iterator[User]) -> None:
    with connection.cursor() as cursor:
        for user_chunk in more_itertools.ichunked(users, CHUNK_SIZE):
            data = list(itertools.chain.from_iterable((user.name, user.description) for user in user_chunk))
            stmt = f"INSERT INTO users (name, description) VALUES {','.join('(%s, %s)' for _ in range(len(data)//2))}"
            cursor.execute(stmt, data)
            connection.commit()

In [20]:
run_execution(execute_single, connection)

execute_single:
 max_memory: 0.071 mb
 exec time: 15,890.040 ms


In [21]:
run_execution(execute_chunks, connection)

execute_chunks:
 max_memory: 0.066 mb
 exec time: 15,864.328 ms


In [22]:
run_execution(executemany_chunks, connection)

executemany_chunks:
 max_memory: 0.359 mb
 exec time: 12,616.776 ms


In [23]:
run_execution(execute_single_query_chunks, connection)

execute_single_query_chunks:
 max_memory: 0.476 mb
 exec time: 11,987.098 ms


In [24]:
def drop_tables(connection: psycopg2_utils.Connection) -> None:
    with connection.cursor() as cursor:
        cursor.execute("TRUNCATE users")
    connection.commit()

drop_tables(connection)
