In [23]:
import json
from collections.abc import Generator
from contextlib import closing
from functools import wraps
from typing import Any, Dict, Tuple
from urllib.parse import quote_plus

import pandas as pd
import psycopg2
from psycopg2.extras import DictCursor, DictRow
from sqlalchemy import create_engine

# load settings
# settings.json is expected to contain a "DB" object with the keys
# username, password, database, schema and hostname.
with open('settings.json', 'r', encoding='utf-8') as f:
    settings = json.load(f)
username = settings['DB']['username']
password = settings['DB']['password']
database = settings['DB']['database']
schema = settings['DB']['schema']
hostname = settings['DB']['hostname']
# Use the configured `hostname` -- the same host extract() passes to psycopg2 --
# so SQLAlchemy and psycopg2 talk to the same server. Credentials are
# percent-encoded so characters such as '@', ':' or '/' cannot break the URL.
database_loc = (
    f"postgresql://{quote_plus(str(username))}:{quote_plus(str(password))}"
    f"@{hostname}:5432/{database}"
)
engine = create_engine(database_loc)

# NOTE: `schema` comes from the local settings file and is interpolated
# directly into the query text, so that file must be trusted.
SQL = f"""select id, number from {schema}.etl_pileline_2"""

In [19]:

# Ten demo rows: id 0..9 and number = id * 10 (0, 10, ..., 90)
df = pd.DataFrame({'id': range(10), 'number': range(0, 100, 10)})
# Adding data to the database; the displayed value is the row count to_sql reports
df.to_sql('etl_pileline_2', engine, schema=schema, if_exists='replace', index=False)

10

In [20]:
pd.read_sql_table('etl_pileline_2', engine, schema=schema)  # read the table back to check the write

Unnamed: 0,id,number
0,0,0
1,1,10
2,2,20
3,3,30
4,4,40
5,5,50
6,6,60
7,7,70
8,8,80
9,9,90


In [21]:
def coroutine(func):
    """Decorator that primes a generator-based coroutine.

    Calling the decorated function creates the generator and immediately
    advances it to its first ``yield`` with ``next()``, so callers can
    ``send()`` values into it straight away (sending a non-None value to a
    just-created generator raises ``TypeError``).

    Args:
        func: a generator function.

    Returns:
        A wrapper (carrying ``func``'s name and docstring via ``wraps``) that
        returns the primed generator.
    """
    @wraps(func)
    def inner(*args: Any, **kwargs: Any) -> Generator:
        fn: Generator = func(*args, **kwargs)
        next(fn)  # run up to the first `yield` so `.send()` works immediately
        return fn

    return inner


def extract(batch: Generator) -> None:
    """Stream every row returned by ``SQL`` into a coroutine.

    Connects with psycopg2 using the module-level ``database``, ``username``,
    ``password`` and ``hostname`` settings, executes ``SQL`` with a
    ``DictCursor`` and ``send()``s each resulting row (a ``DictRow``, which
    supports access by column name) to ``batch``.

    Args:
        batch: an already-primed generator (e.g. one returned by a
            ``@coroutine``-decorated function) that receives one row per
            ``send()``.
    """
    dbs: Dict = dict(dbname=database, user=username, password=password, host=hostname)
    # In psycopg2, `with connection:` only commits/rolls back the transaction;
    # it does NOT close the connection. `closing` guarantees it is closed.
    with closing(psycopg2.connect(**dbs)) as connection:
        with connection, connection.cursor(cursor_factory=DictCursor) as cursor:
            cursor.execute(SQL)
            # Iterating the cursor yields rows one by one, like repeated fetchone().
            for record in cursor:
                batch.send(record)


@coroutine
def transform(batch: Generator) -> Generator[None, DictRow, None]:
    """Square each incoming row's ``number`` and forward it with a label.

    Rows arrive via ``send()``; the coroutine finishes when a falsy value
    (e.g. ``None``) is sent. For every row, ``(number ** 2, label)`` is sent
    on to ``batch``, where ``label`` is ``"an even number"`` for even numbers
    and ``0`` otherwise. A row whose number is 3 is not forwarded; a notice is
    printed instead.
    """
    label: int | str  # declared up front so mypy accepts both branch types

    while True:
        record = yield
        if not record:
            break

        value = record["number"]
        squared = value ** 2
        if value % 2 == 0:
            label = "an even number"
        elif value == 3:
            print("skip load stage")
            continue
        else:
            label = 0

        batch.send((squared, label))


@coroutine
def load() -> Generator[None, Tuple, None]:
    while subject := (yield):
        if subject == (int(number), str(bar)):
            print("the square of", bar, number)
        elif subject == (int(number), int(bar)):
            print(number)
        else:
            raise SyntaxError(f"Unknown structure of {subject=}")

In [24]:
# Wire the stages from the sink backwards (each stage needs its downstream
# consumer), then let extract() push database rows through the pipeline.
multiplication = transform(unloads := load())
extract(multiplication)

NameError: name 'number' is not defined