EXTRACT

In [None]:
import collections.abc as collections_abc
import dataclasses
import itertools
import typing

import faker
import psycopg2
import psycopg2.extras as psycopg2_extras

import settings
import utils.profilers as profiler_utils
import utils.psycopg2 as psycopg2_utils

In [None]:
# Number of fake users that gen_fake_users() produces for the benchmark table.
SIZE = 200_000

In [None]:
def create_tables(connection: psycopg2_utils.Connection) -> None:
    """Create the ``users`` table if it does not exist yet, then commit.

    Args:
        connection: Open database connection on which the DDL is executed
            and committed.
    """
    # The statement text (including its whitespace) is kept exactly as sent before.
    ddl = """
            CREATE TABLE IF NOT EXISTS users(
                id serial primary key,
                name text NOT NULL,
                description text NOT NULL
            )
            """
    with connection.cursor() as ddl_cursor:
        ddl_cursor.execute(ddl)
    connection.commit()

In [None]:
# Single PostgreSQL connection reused by every cell below; the connection
# parameters are taken from settings.POSTGRESQL_DATABASE_SETTINGS.
connection = psycopg2.connect(**settings.POSTGRESQL_DATABASE_SETTINGS)

In [None]:
create_tables(connection)

In [None]:
@dataclasses.dataclass
class LoadUser:
    """User record to be loaded into the ``users`` table.

    The fields match the ``name`` and ``description`` columns; there is no
    ``id`` field on this class.
    """

    # Value for the ``name`` column.
    name: str
    # Value for the ``description`` column.
    description: str

In [None]:
def gen_fake_users() -> collections_abc.Iterator[LoadUser]:
    """Return a lazy iterator over ``SIZE`` fake users.

    One ``faker.Faker`` instance is created when this function is called and
    shared by all users; each ``LoadUser`` is only built when the returned
    iterator is advanced.

    Returns:
        A generator yielding ``SIZE`` ``LoadUser`` objects with a fake name
        and a fake text description.
    """
    fake = faker.Faker()

    def _generate(count: int) -> collections_abc.Iterator[LoadUser]:
        for _ in range(count):
            yield LoadUser(name=fake.name(), description=fake.text())

    # SIZE is read here, at call time, not when iteration starts.
    return _generate(SIZE)

In [None]:
def load_data(connection: psycopg2_utils.Connection) -> None:
    """Generate fake users and hand them to the project loader for ``users``.

    Args:
        connection: Open database connection passed to
            ``psycopg2_utils.Loader``.
    """
    # chunk_size presumably controls how many rows the loader writes per batch
    # -- confirm against psycopg2_utils.Loader.
    psycopg2_utils.Loader(connection, chunk_size=500).load_from_iterable(
        gen_fake_users(),
        LoadUser,
        "users",
    )

In [None]:
load_data(connection)

In [None]:
@dataclasses.dataclass
class ExtractUser:
    """User record read back from the ``users`` table.

    The field names match the selected column names, so a row addressed by
    column name can be unpacked straight into the constructor
    (``ExtractUser(**row)``).
    """

    # Value of the ``id`` column.
    id: int
    # Value of the ``name`` column.
    name: str
    # Value of the ``description`` column.
    description: str

In [None]:
@profiler_utils.profile
def run_through_iterable(items: collections_abc.Iterable[ExtractUser]) -> None:
    """Consume ``items`` completely, discarding every element.

    Iterating here, under the profiler decorator, is what actually drives lazy
    iterables to produce their elements.

    Args:
        items: Iterable of extracted users to exhaust.
    """
    for _unused in items:
        pass

In [None]:
# Shape shared by the fetch strategies: take a connection, return an iterable of users.
ExecuteType = collections_abc.Callable[
    [psycopg2_utils.Connection],
    collections_abc.Iterable[ExtractUser],
]


def run_execution(func: ExecuteType, connection: psycopg2_utils.Connection) -> None:
    """Call a fetch strategy and consume everything it returns.

    Args:
        func: Fetch strategy to run; called with ``connection``.
        connection: Open database connection passed to ``func``.
    """
    run_through_iterable(func(connection))

In [None]:
@profiler_utils.profile
def fetch_all_list(connection: psycopg2_utils.Connection) -> list[ExtractUser]:
    """Fetch every user with one ``fetchall()`` call and return them as a list.

    Args:
        connection: Open database connection to query.

    Returns:
        All rows of ``users`` ordered by ``id``, each converted to an
        ``ExtractUser``.
    """
    query = "SELECT id, name, description FROM users ORDER BY id"
    with connection.cursor(cursor_factory=psycopg2_extras.DictCursor) as dict_cursor:
        dict_cursor.execute(query)
        # Rows are addressable by column name, so each one unpacks into keyword arguments.
        return [ExtractUser(**row) for row in dict_cursor.fetchall()]

In [None]:
@profiler_utils.profile
def fetch_all_gen_expression(connection: psycopg2_utils.Connection) -> collections_abc.Iterator[ExtractUser]:
    """Fetch every row with ``fetchall()`` and return a lazy generator of users.

    The query runs and all rows are fetched when this function is called; only
    the conversion to ``ExtractUser`` is deferred until the result is iterated.

    Args:
        connection: Open database connection to query.

    Returns:
        A generator expression yielding one ``ExtractUser`` per fetched row,
        ordered by ``id``.
    """
    query = "SELECT id, name, description FROM users ORDER BY id"
    with connection.cursor(cursor_factory=psycopg2_extras.DictCursor) as dict_cursor:
        dict_cursor.execute(query)
        rows = dict_cursor.fetchall()
    # The rows are already held in ``rows``, so the generator does not touch the cursor.
    return (ExtractUser(**row) for row in rows)

In [None]:
@profiler_utils.profile
def fetch_all_yield(connection: psycopg2_utils.Connection) -> collections_abc.Iterator[ExtractUser]:
    """Yield every user, fetching all rows with one ``fetchall()`` call.

    This is a generator function: the body, including the query itself, runs
    only once iteration begins, and the cursor context stays open until the
    generator finishes.

    Args:
        connection: Open database connection to query.

    Yields:
        One ``ExtractUser`` per row of ``users``, ordered by ``id``.
    """
    query = "SELECT id, name, description FROM users ORDER BY id"
    with connection.cursor(cursor_factory=psycopg2_extras.DictCursor) as dict_cursor:
        dict_cursor.execute(query)
        for row in dict_cursor.fetchall():
            yield ExtractUser(**row)

In [None]:
@profiler_utils.profile
def fetch_one_yield(connection: psycopg2_utils.Connection) -> collections_abc.Iterator[ExtractUser]:
    """Yield users one at a time, pulling each row with ``fetchone()``.

    Args:
        connection: Open database connection to query.

    Yields:
        One ``ExtractUser`` per row of ``users``, ordered by ``id``.
    """
    query = "SELECT id, name, description FROM users ORDER BY id"
    with connection.cursor(cursor_factory=psycopg2_extras.DictCursor) as dict_cursor:
        dict_cursor.execute(query)
        while True:
            row = dict_cursor.fetchone()
            # A falsy result from fetchone() ends the iteration.
            if not row:
                break
            yield ExtractUser(**row)

In [None]:
run_execution(fetch_all_list, connection)

In [None]:
run_execution(fetch_all_gen_expression, connection)

In [None]:
run_execution(fetch_all_yield, connection)

In [None]:
run_execution(fetch_one_yield, connection)

In [None]:
# Rows requested per round trip by the chunked strategies below
# (the fetchmany() size and the SQL LIMIT value).
CHUNK_SIZE = 500

In [None]:
@profiler_utils.profile
def fetch_many_yield(connection: psycopg2_utils.Connection) -> collections_abc.Iterator[ExtractUser]:
    """Yield users, pulling rows from the cursor ``CHUNK_SIZE`` at a time.

    The query is executed once; rows are then taken with repeated
    ``fetchmany(size=CHUNK_SIZE)`` calls until an empty chunk comes back.

    Args:
        connection: Open database connection to query.

    Yields:
        One ``ExtractUser`` per row of ``users``, ordered by ``id``.
    """
    query = "SELECT id, name, description FROM users ORDER BY id"
    with connection.cursor(cursor_factory=psycopg2_extras.DictCursor) as dict_cursor:
        dict_cursor.execute(query)
        while True:
            chunk = dict_cursor.fetchmany(size=CHUNK_SIZE)
            if not chunk:
                break
            for row in chunk:
                yield ExtractUser(**row)

In [None]:
@profiler_utils.profile
def fetch_limit_offset(connection: psycopg2_utils.Connection) -> collections_abc.Iterator[ExtractUser]:
    """Yield users page by page using ``LIMIT``/``OFFSET`` queries.

    A new query is executed for every page of ``CHUNK_SIZE`` rows, with the
    offset advancing by ``CHUNK_SIZE`` each time; iteration stops at the first
    page for which the cursor reports zero rows.

    Args:
        connection: Open database connection to query.

    Yields:
        One ``ExtractUser`` per row of ``users``, ordered by ``id``.
    """
    paged_query = "SELECT id, name, description FROM users ORDER BY id LIMIT %s OFFSET %s"
    with connection.cursor(cursor_factory=psycopg2_extras.DictCursor) as dict_cursor:
        # Offsets 0, CHUNK_SIZE, 2 * CHUNK_SIZE, ...
        for offset in itertools.count(0, CHUNK_SIZE):
            dict_cursor.execute(paged_query, (CHUNK_SIZE, offset))
            if not dict_cursor.rowcount:
                return
            for row in dict_cursor.fetchall():
                yield ExtractUser(**row)

SELECT id, name, description FROM users WHERE id > %s ORDER BY id LIMIT %s

In [None]:
@profiler_utils.profile
def fetch_last_id(connection: psycopg2_utils.Connection) -> collections_abc.Iterator[ExtractUser]:
    """Yield users page by page, resuming each page after the last seen ``id``.

    The first page is selected without a filter; every later page adds
    ``WHERE id > <last id of the previous page>``. Each page holds at most
    ``CHUNK_SIZE`` rows, and iteration stops at the first page for which the
    cursor reports zero rows.

    Args:
        connection: Open database connection to query.

    Yields:
        One ``ExtractUser`` per row of ``users``, ordered by ``id``.
    """
    first_page_query = "SELECT id, name, description FROM users ORDER BY id LIMIT %s"
    next_page_query = "SELECT id, name, description FROM users WHERE id > %s ORDER BY id LIMIT %s"

    previous_id = None
    with connection.cursor(cursor_factory=psycopg2_extras.DictCursor) as dict_cursor:
        while True:
            params: list[typing.Any]
            if previous_id is None:
                query, params = first_page_query, [CHUNK_SIZE]
            else:
                query, params = next_page_query, [previous_id, CHUNK_SIZE]

            dict_cursor.execute(query, params)
            if not dict_cursor.rowcount:
                return

            rows = dict_cursor.fetchall()
            for row in rows:
                yield ExtractUser(**row)
            # Rows are ordered by id, so the last row carries the resume point.
            previous_id = rows[-1]["id"]

In [None]:
run_execution(fetch_one_yield, connection)

In [None]:
run_execution(fetch_many_yield, connection)

In [None]:
run_execution(fetch_limit_offset, connection)

In [None]:
run_execution(fetch_last_id, connection)

In [None]:
def drop_tables(connection: psycopg2_utils.Connection) -> None:
    """Drop the ``users`` table if it exists, then commit.

    ``IF EXISTS`` keeps this cleanup step safe to run more than once (or
    before the table was ever created), in the same way ``create_tables``
    uses ``CREATE TABLE IF NOT EXISTS``. Without it, a repeated run raises
    and the failed statement is left uncommitted on the shared connection.

    Args:
        connection: Open database connection on which the statement is
            executed and committed.
    """
    with connection.cursor() as cursor:
        cursor.execute("DROP TABLE IF EXISTS users")
    connection.commit()

drop_tables(connection)