# Working with Tasks

In [15]:
from typing import List, Union
import pandas as pd
from tasks import BaseTask as Task
from pandas.testing import assert_frame_equal

In [16]:
# ====================== PARAMETERS ====================== #
# Sample CSV files used in this notebook, relative to the current working directory.
DATA_DIR = 'data'
dimA = f'{DATA_DIR}/dim_a.csv'
dimB = f'{DATA_DIR}/dim_b.csv'
dimC = f'{DATA_DIR}/dim_c.csv'
factTbl = f'{DATA_DIR}/fact_tbl.csv'

In [17]:
# ====================== FUNCTIONS ====================== #
# Now let's create some Python functions that we will use to process the data
def select_target_cols(df: pd.DataFrame, target_schema: List) -> pd.DataFrame:
    """Keep only the columns named in ``target_schema``.

    Args:
        df: Input frame.
        target_schema: Column labels to keep, in the order they should appear.

    Returns:
        ``df[target_schema]``, i.e. the selected columns in the requested order.
    """
    return df[target_schema]


def create_composite_key(df: pd.DataFrame) -> pd.DataFrame:
    """Add a ``CompositeKey`` column of the form ``"<CNo>-<BNo>-<ANo>"``.

    The input frame is left untouched and a new frame is returned. Using
    ``DataFrame.assign`` instead of ``df['CompositeKey'] = ...`` avoids mutating
    the caller's frame. It also avoids pandas' SettingWithCopyWarning when ``df``
    is a column selection of another frame (e.g. the output of
    ``select_target_cols``).

    Args:
        df: Frame containing ``CNo``, ``BNo`` and ``ANo`` columns.

    Returns:
        A new frame equal to ``df`` plus a string ``CompositeKey`` column.
    """
    # map(str) stringifies each value before concatenation so numeric ids join cleanly.
    composite_key = (
        df['CNo'].map(str) + "-"
        + df['BNo'].map(str) + "-"
        + df['ANo'].map(str)
    )
    return df.assign(CompositeKey=composite_key)


def read_table(path: str) -> pd.DataFrame:
    """Load the CSV file at ``path`` into a DataFrame using pandas' default options.

    NOTE(review): a later cell redefines ``read_table`` with the signature
    ``(cursor, table)``. The ``read`` task still wraps this CSV version because
    it is created before that redefinition runs. Consider distinct names.
    """
    return pd.read_csv(path)


def join_dataframes(A: pd.DataFrame, B: pd.DataFrame, on: Union[str, List], how: str) -> pd.DataFrame:
    """Join ``A`` with ``B`` via ``DataFrame.merge``.

    Args:
        A: Left frame.
        B: Right frame.
        on: Join key column name, or list of names, present in both frames.
        how: Join type passed straight to pandas (e.g. ``'left'``, ``'inner'``).

    Returns:
        The merged frame.
    """
    merged = A.merge(right=B, how=how, on=on)
    return merged


def sink_output(df: pd.DataFrame, path: str) -> None:
    """Write ``df`` to ``path`` as CSV with pandas' defaults, so the index is written too."""
    df.to_csv(path)


def drop_duplicates(df: pd.DataFrame, columns: Union[List[str], None] = None) -> pd.DataFrame:
    """Return ``df`` without duplicate rows.

    Args:
        df: Input frame.
        columns: Column labels that define a duplicate. ``None`` (the default)
            compares all columns. The previous signature ``columns=List[str]``
            made the typing object itself the default value, so calling without
            ``columns`` passed ``typing.List[str]`` to pandas as ``subset``.

    Returns:
        A new frame keeping the first occurrence of each duplicate group.
    """
    return df.drop_duplicates(subset=columns)

In [18]:
# ====================== TASKS ====================== #

# Wrap each processing function in a Task so it can take part in a pipeline.
read = Task(read_table)
select = Task(select_target_cols)
# Three independent Task instances around the same join function
# (joinCust / joinBG are presumably intended for later joins — not used in this view).
join, joinCust, joinBG = (Task(join_dataframes) for _ in range(3))
composite = Task(create_composite_key)
sink = Task(sink_output)



In [19]:
# ====================== EXECUTION ====================== #

# With some tasks defined, we can try the methods they provide.
# Judging by the error message in the next cell, `select.validate(read)` checks that
# read_table's output is compatible with select_target_cols's inputs.
is_compatible = select.validate(read)
is_compatible

True

In [20]:
# Let's see what happens when we try to validate two tasks that are not compatible:
# select_target_cols returns a DataFrame, but read_table expects a path string.
# The error is caught and displayed so that "Restart & Run All" does not stop here.
try:
    read.validate(select)
except Exception as exc:  # expected: CompatibilityException
    print(f"{type(exc).__name__}: {exc}")

CompatibilityException: Validation Failed. Output of select_target_cols is incompatible with inputs from read_table

In [21]:
# Let's run some of the tasks we created in a linear (sequential) pipeline.
# Reuse the dimA parameter: a literal like 'data\dim_a.csv' contains the invalid
# escape "\d" (a SyntaxWarning on Python 3.12+) and only resolves on Windows.
df = read.run(path=dimA)
df = select.run(df, ['AId', 'BId'])
# Self-join on BId: pairs each AId with every AId that shares its BId.
df = join.run(df, df, on='BId', how='left')
print(df.head(10))

   AId_x     BId  AId_y
0    300  784955    300
1    300  784955    406
2    300  784955    412
3    300  784955    784
4    300  784955    787
5    300  784955    842
6    300  784955   1030
7    300  784955   1034
8    300  784955   1064
9    300  784955   1156


# Using the Postgres Client with Tasks

In [22]:
import pandas as pd
from psycopg import Connection
from clients.postgres import PostgresClient
from tasks import BaseTask as Task

from typing import List, Tuple


### Let's write some functions for processing data from our DVD Rental database

In [23]:
# Let's wrap each step in a function that we can then encapsulate in a task
def setup_client(config_file, section) -> Connection:
    """Build a PostgresClient and connect using ``section`` of ``config_file``.

    Returns whatever ``PostgresClient.connect_from_config`` returns. Later cells
    use it as a cursor-like object with ``execute``.
    NOTE(review): the annotation says Connection — confirm whether this is a
    psycopg Connection or a Cursor.
    """
    return PostgresClient().connect_from_config(path=config_file, section=section)

def set_search_path(cursor, catalog, schema) -> Connection:
    """Execute ``SET search_path TO <catalog>, <schema>;`` and return ``cursor``.

    NOTE(review): both names are interpolated directly into the SQL text, so
    they must come from trusted configuration only. PostgreSQL's search_path
    lists schema names. A database ("catalog") name here only has an effect if
    a schema with that name exists — confirm the intent.
    """
    statement = f"SET search_path TO {catalog}, {schema};"
    cursor.execute(statement)
    return cursor
    
def create_new_schema(cursor, schema) -> Connection:
    """Execute ``CREATE SCHEMA IF NOT EXISTS <schema>;`` and return ``cursor``.

    No commit is issued here. NOTE(review): ``schema`` is interpolated directly
    into the SQL text, so use trusted input only.
    """
    ddl = f"CREATE SCHEMA IF NOT EXISTS {schema};"
    cursor.execute(ddl)
    return cursor
    
def read_table(cursor, table) -> List[Tuple]:
    """Fetch every row of ``table`` via ``cursor`` as a list of tuples.

    NOTE(review): this shadows the earlier CSV-based ``read_table(path)``, and
    ``table`` is interpolated directly into the SQL text (trusted input only).
    """
    query = f"SELECT * FROM {table};"
    result = cursor.execute(query)
    return result.fetchall()
    
def to_pandas_df(data) -> pd.DataFrame:
    """Wrap ``data`` (e.g. row tuples from ``read_table``) in a DataFrame.

    No column names are supplied, so a sequence of tuples gets the integer
    column labels 0..n-1.
    """
    return pd.DataFrame(data)

## Now Let's Encapsulate Those Functions as Pipeline Tasks

In [24]:
# Wrap each database step in a Task, numbered in pipeline order
# (tsk_0 connect, tsk_1 search path, tsk_2 schema, tsk_3 read, tsk_4 to DataFrame).
tsk_0, tsk_1, tsk_2, tsk_3, tsk_4 = (
    Task(step)
    for step in (setup_client, set_search_path, create_new_schema, read_table, to_pandas_df)
)

## Now let's execute this as a linear pipeline

In [25]:
# Forward slashes (as in the CSV paths earlier) work on Windows and POSIX alike.
# A backslash literal here would contain invalid escapes such as "\." and "\m"
# (SyntaxWarning on Python 3.12+), and it would only resolve on Windows.
cursor = tsk_0.run("../../maellin/.config/.postgres", 'postgresql')
cursor = tsk_1.run(cursor=cursor, catalog='dvdrental', schema='public')
data = tsk_3.run(cursor=cursor, table='actor')
df = tsk_4.run(data)
df.head()

Unnamed: 0,0,1,2,3
0,1,Penelope,Guiness,2013-05-26 14:47:57.620
1,2,Nick,Wahlberg,2013-05-26 14:47:57.620
2,3,Ed,Chase,2013-05-26 14:47:57.620
3,4,Jennifer,Davis,2013-05-26 14:47:57.620
4,5,Johnny,Lollobrigida,2013-05-26 14:47:57.620


# But What About Task 2, Which We Did Not Use?

Task 2 creates a new schema, which could serve as the target (sink) for our processed data.
But putting Task 2 in our linear pipeline feels awkward; it does not fit in well with the other steps.
This is where pipeline branching (splitting the pipeline into separate directions) becomes useful.

## To use branching strategies in data processing, we need a DAG!