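In this example, we're going to build a small version of what `main.py` does. The goal is to:
*   Connect to the database
*   Create the data-warehouse schema (`dssa`)
*   Create the staff dimension table (defined by `DIM_STAFF`, created as `dssa.staff`)
*   Extract the source `staff` table from the `public` schema
*   Transform it into the shape of the staff dimension
*   Load the transformed table into the data warehouse
*   Tear down the connection.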

In [None]:
# Necessary imports
import pandas as pd
from psycopg import Cursor
from pypika import PostgreSQLQuery
from pypika import Schema, Column, PostgreSQLQuery
from dexxy.common.tasks import Task
from dexxy.common.workflows import Pipeline
from dexxy.common.plotting import plot_dag
from dexxy.database.postgres import PostgresClient

In [None]:
################## Parameters ###################
# Connection settings: the config file and the section within it that are
# handed to the database client.
databaseConfig, section = "../config/database.ini", 'postgresql'

# Schema handles: `dw` is the data-warehouse schema that tables are created in
# and loaded into; `dvd` is the schema the source tables are extracted from.
dw, dvd = Schema('dssa'), Schema('public')

In [None]:
############## Table Definitions ################
# Column definitions for the star-schema tables; see star-schema.jpg for a visual reference.
# Every column of the staff dimension is declared NOT NULL (nullable=False).
DIM_STAFF = (
    Column('sk_staff', column_type='INT', nullable=False),
    Column('name', column_type='VARCHAR(100)', nullable=False),
    Column('email', column_type='VARCHAR(100)', nullable=False),
)

In [None]:
################### Functions ####################
def createCursor(path:str, section:str) -> Cursor:
    """Open a database connection from ``section`` of the config at ``path`` and return a new cursor.

    The connection is opened with ``autocommit=True``; only the cursor is
    handed back to the caller.
    """
    connection = PostgresClient().connect_from_config(path, section, autocommit=True)
    return connection.cursor()

# Module-level cursor used directly by readData, loadData and tearDown below.
cursor = createCursor(path=databaseConfig, section=section)

def createSchema(cursor: Cursor, schemaName: str) -> Cursor:
    """Create the schema ``schemaName`` unless it already exists, then return ``cursor``.

    ``schemaName`` is interpolated verbatim into the statement (it is not
    quoted or escaped), so it must be a trusted identifier.
    """
    statement = "CREATE SCHEMA IF NOT EXISTS {};".format(schemaName)
    cursor.execute(statement)
    return cursor

def _quoteIdentifier(name) -> str:
    """Return ``name`` as a double-quoted SQL identifier, doubling any embedded quotes."""
    return '"' + str(name).replace('"', '""') + '"'

def _columnListSql(key) -> str:
    """Render a foreign-key column spec (one column, or a list/tuple of columns) as a quoted, comma-separated list."""
    names = list(key) if isinstance(key, (list, tuple)) else [key]
    # pypika Column objects expose their name via `.name`; plain strings are used as-is.
    return ",".join(_quoteIdentifier(getattr(n, "name", n)) for n in names)

def _tableSql(table) -> str:
    """Render a pypika Table (schema-qualified when it has a schema) or a plain table name as quoted SQL."""
    if hasattr(table, "get_sql"):
        return table.get_sql(quote_char='"')
    return _quoteIdentifier(table)

def createTable(cursor:Cursor, tableName:str, definition:tuple, primaryKey:str=None, foreignKeys:list=None, referenceTables:list=None) -> None:
    """Create ``tableName`` if it does not exist, with the given columns and keys.

    Args:
        cursor: Cursor the generated DDL is executed on.
        tableName: Table to create (a pypika Table or a table name).
        definition: Tuple of pypika ``Column`` objects describing the table.
        primaryKey: Optional column to declare as the primary key.
        foreignKeys: Optional list of foreign-key columns. Each entry is a column
            (or a list/tuple of columns) that references the column(s) of the
            same name in the matching ``referenceTables`` entry.
        referenceTables: Tables referenced by ``foreignKeys``, in the same order.

    Raises:
        ValueError: if ``foreignKeys`` is non-empty and ``referenceTables`` does
            not supply exactly one table per key.
    """
    ddl = PostgreSQLQuery \
        .create_table(tableName) \
        .if_not_exists() \
        .columns(*definition)

    if primaryKey is not None:
        # pypika builders return a modified copy, so the result must be reassigned.
        ddl = ddl.primary_key(primaryKey)

    sql = ddl.get_sql()

    if foreignKeys:
        if referenceTables is None or len(referenceTables) != len(foreignKeys):
            raise ValueError("referenceTables must supply exactly one table per foreign key")
        # Previously each `ddl.foreign_key(...)` result was discarded (pypika
        # builders copy), so no FK was ever created. pypika also supports only a
        # single FOREIGN KEY clause per CREATE TABLE, so every constraint is
        # rendered here and spliced in before the column list's closing paren.
        constraints = [
            "FOREIGN KEY ({cols}) REFERENCES {ref} ({cols})".format(
                cols=_columnListSql(key),
                ref=_tableSql(refTable),
            )
            for key, refTable in zip(foreignKeys, referenceTables)
        ]
        head, _, tail = sql.rpartition(")")
        sql = head + "," + ",".join(constraints) + ")" + tail

    cursor.execute(sql)

def readData(tableName:str, columns:tuple) -> pd.DataFrame:
    """Select ``columns`` from ``tableName`` and return the result as a DataFrame.

    Runs on the module-level ``cursor``; the DataFrame's column labels are taken
    from the first field of each entry in the cursor's result description.
    """
    sql = PostgreSQLQuery.from_(tableName).select(*columns).get_sql()
    result = cursor.execute(sql)
    rows = result.fetchall()
    headers = [field[0] for field in result.description]
    return pd.DataFrame(rows, columns=headers)

def loadData(df:pd.DataFrame, target:str):
    """Insert every row of ``df`` into ``target`` with one INSERT statement.

    No column list is sent, so values are matched to the target table's
    columns by position: ``df``'s column order must match the table's.
    An empty DataFrame is a no-op. Uses the module-level ``cursor``.
    """
    rows = tuple(df.itertuples(index=False, name=None))
    if not rows:
        # Nothing to insert: an INSERT without any VALUES would be invalid SQL.
        return
    query = PostgreSQLQuery \
        .into(target) \
        .insert(*rows) \
        .get_sql()
    cursor.execute(query)

def buildDimStaff(staff_df:pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
    """Shape extracted staff rows into the staff dimension.

    Renames ``staff_id`` to ``sk_staff``, builds ``name`` as
    "<first_name> <last_name>", keeps ``sk_staff``, ``name`` and ``email``,
    and drops duplicate rows. Extra arguments are ignored.

    The input DataFrame is left unmodified; previously it was renamed and
    extended in place, which leaked changes back to the caller.
    """
    staff = staff_df.rename(columns={'staff_id': 'sk_staff'})
    staff['name'] = staff.first_name + " " + staff.last_name
    return staff[['sk_staff', 'name', 'email']].drop_duplicates()

def tearDown(*args, **kwargs) -> None:
    """Close the module-level cursor and the connection it belongs to.

    Any arguments are ignored (presumably accepted so upstream task results can
    be passed in by the pipeline).
    """
    connection = getattr(cursor, "connection", None)
    cursor.close()
    # Closing only the cursor would leave its database connection open.
    if connection is not None:
        connection.close()

Below is what would be placed inside `def main():` in `main.py`.

This is how you define Tasks and group them into Pipelines.

In [None]:
def main():
    """Define the staff-dimension ETL as a single dexxy ``Pipeline`` and return it.

    Every ``dependsOn`` entry names a Task defined in this function. The
    returned workflow still has to be composed, collected and run by the caller.
    """
    setup = Pipeline(
        steps=[
            # NOTE(review): this task opens a second connection via createCursor;
            # tearDown only closes the module-level cursor, so this one appears
            # never to be closed — confirm.
            Task(createCursor,
                kwargs={'path': databaseConfig, 'section': section},
                dependsOn=None,
                name='createCursor'
            ),
            # `cursor` is not in kwargs; presumably it is supplied from the
            # upstream task's result — confirm against dexxy Task semantics.
            Task(createSchema,
                kwargs={"schemaName": dw._name},
                dependsOn=['createCursor'],
                name='createSchema'
            ),
            Task(createTable,
                kwargs={'tableName': dw.staff, 'primaryKey': 'sk_staff', 'definition':DIM_STAFF},
                dependsOn=['createSchema'],
                name='createDimStaff'
            ),
        ]
    )

    # Extraction waits for the last setup task; the original referenced
    # 'createFactRentals', which does not exist in this example.
    extract = Pipeline(
        steps=[Task(readData,
                kwargs={'tableName': dvd.staff,'columns': ('staff_id', 'first_name', 'last_name', 'email')},
                dependsOn=['createDimStaff'],
                name='extractStaff'
            )
        ]
    )

    # Depends only on extractStaff; 'transformCustomer' is not defined here.
    transform = Pipeline(
        steps=[
            Task(buildDimStaff,
                dependsOn=['extractStaff'],
                name='transformStaff'
            )
        ]
    )

    load = Pipeline(
        steps=[
            Task(loadData,
                dependsOn=['transformStaff'],
                kwargs={'target': dw.staff},
                name='loadStaff'
            )
        ]
    )

    # Tear-down tasks close out open connections to the database. dependsOn
    # takes task names, so reference the 'loadStaff' task, not the `load` Pipeline.
    teardown = Pipeline(
        steps =[
            Task(tearDown,
                dependsOn=['loadStaff'],
                name='tearDown',
            )
        ]
    )

    # Merge all the above Pipelines into a single Pipeline containing every Task for the DAG.
    workflow = Pipeline(
        steps=[
            setup,
            extract,
            transform,
            load,
            teardown
        ]
    )

    # Return the workflow so the caller can compose, collect and run it.
    return workflow
    
    

To run this ETL pipeline, call a few methods on the workflow once it has been defined:

In [None]:
# ============================ BUILD ============================ #
# `workflow` only exists inside main(), so take it from main()'s return value.
workflow = main()
if workflow is None:
    raise RuntimeError("main() must `return workflow` so the pipeline can be composed and run")

# ============================ COMPILATION ============================ #
# This section composes the DAG from the provided Tasks
workflow.compose()

# ============================ ENQUEUE ============================ #
# This section uses the .collect() method which enqueues all tasks in the DAG to a task FIFO queue in topological order
workflow.collect()

# ============================ EXECUTION ============================ #
# Runs the workflow locally using a single worker
workflow.run()