# ETL Workflow

In [None]:
# Notebook setup:
# - lab_black (from nb_black) auto-formats cell code with black
# - autoreload mode 2 reloads imported modules before each cell executes,
#   so edits to imported source files are picked up without a kernel restart
%load_ext lab_black
%load_ext autoreload
%autoreload 2

In [None]:
from datetime import timedelta
from typing import List

import numpy as np
import pandas as pd
from prefect import flow, get_run_logger, task
from prefect.task_runners import SequentialTaskRunner
from prefect.tasks import task_input_hash

## About

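Run a small extract–transform–load (ETL) workflow with Prefect. For each requested chunk size, the workflow generates a chunk of random data, scales the selected columns, and logs a summary of the "loaded" chunk. It then validates the combined output.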

## User Inputs

In [None]:
# Constant multiplier applied to every column named in `cols_to_scale`
scaling_factor = 6
cols_to_scale = list("ADE")
# Row count of each generated chunk; the flow runs one
# extract -> transform -> load pass per entry
nrows = [5, 25, 150, 300, 500, 12, 75, 125, 600, 30]

## Run Basic Workflow

In [None]:
# %%time
# @flow(name="Testing")
# def basic_flow():
#     logger = get_run_logger()
#     logger.info("The fun is about to begin")


# state = basic_flow()

## Run ETL Workflow

In [None]:
@task(
    name="Data Retriever",
    description="Extract raw data.",
    tags=["get-data", "workflow"],
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=1),
)
def extract(nrows: int) -> pd.DataFrame:
    """Generate one chunk of random raw data.

    Parameters
    ----------
    nrows : int
        Number of rows to generate. It is used as an array dimension for
        ``np.random.rand``, so it must be an integer.

    Returns
    -------
    pd.DataFrame
        ``nrows`` rows of uniform random floats in [0, 1) in columns
        ``A``-``E``, plus an ``nrows`` column repeating the requested row
        count on every row.

    Notes
    -----
    The cache key is ``task_input_hash`` with a one-minute expiration.
    Repeat calls with the same ``nrows`` inside that window may therefore
    reuse the earlier (random) result instead of drawing new data.
    """
    logger = get_run_logger()
    df = pd.DataFrame(np.random.rand(nrows, 5), columns=list("ABCDE")).assign(
        nrows=nrows
    )
    logger.info(f"Retrieved {len(df):,} rows of data")
    return df


@task(
    name="Data Transformer",
    description="Transform data.",
    tags=["transform-data", "workflow"],
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=1),
)
def transform(
    df: pd.DataFrame, scaling_factor: int, cols_to_scale: List[str]
) -> pd.DataFrame:
    """Multiply selected columns of ``df`` by a constant factor.

    Parameters
    ----------
    df : pd.DataFrame
        Input frame; it is not modified.
    scaling_factor : int
        Multiplier applied to each column in ``cols_to_scale``.
    cols_to_scale : List[str]
        Names of the columns to scale; all other columns are copied unchanged.

    Returns
    -------
    pd.DataFrame
        A new frame with the listed columns scaled.
    """
    logger = get_run_logger()
    # Work on a copy so the caller's frame is not modified in place. That
    # frame is the upstream extract result, which Prefect may also be holding
    # as a (cached) task result.
    df = df.copy()
    df[cols_to_scale] = df[cols_to_scale] * scaling_factor
    logger.info(f"Transformed columns: {','.join(cols_to_scale)}")
    return df


@task(
    name="Data Loader",
    description="Load transformed data",
    tags=["load-data", "workflow"],
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=1),
)
def load(data: pd.DataFrame, load_str: str) -> None:
    """Stand-in "load" step for a transformed chunk.

    Nothing is persisted. The task only logs how many rows it received and
    the ``load_str`` it was given.

    Parameters
    ----------
    data : pd.DataFrame
        Transformed chunk to report on.
    load_str : str
        Arbitrary string echoed into the log message.
    """
    run_logger = get_run_logger()
    row_count = len(data)
    run_logger.info(f"Loaded data with {row_count:,} rows & used str {load_str}.")


@flow(
    name="ETL Workflow",
    description="Run end-to-end ETL flow.",
    version="02",
    task_runner=SequentialTaskRunner(),
)
def data_flow(
    nrows: List[int], load_str: str, scaling_factor: int, cols_to_scale: List[str]
) -> pd.DataFrame:
    """Run extract -> transform -> load once per entry in ``nrows``.

    Parameters
    ----------
    nrows : List[int]
        Row count of each chunk; one chunk is produced per entry, in order.
    load_str : str
        String forwarded to every ``load`` call.
    scaling_factor : int
        Multiplier forwarded to ``transform``.
    cols_to_scale : List[str]
        Columns forwarded to ``transform`` for scaling.

    Returns
    -------
    pd.DataFrame
        All transformed chunks stacked in input order with a fresh
        0..N-1 index. ``pd.concat`` raises ``ValueError`` when ``nrows`` is
        empty.
    """
    collected = []
    for chunk_size in nrows:
        # Task calls here return objects exposing .result() (future-like in
        # the Prefect version this notebook targets). The unresolved handle is
        # passed downstream; only the final frame is resolved for concatenation.
        raw = extract(chunk_size)
        scaled = transform(raw, scaling_factor, cols_to_scale)
        load(scaled, load_str)
        collected.append(scaled.result())
    return pd.concat(collected, ignore_index=True)

In [None]:
%%time
# Run the ETL flow on the user inputs; "dummy-string" is only echoed by the
# load task's log message. Per the validation cell, `state.result()` yields the
# DataFrame returned by data_flow.
state = data_flow(nrows, "dummy-string", scaling_factor, cols_to_scale)

## Validate Output of Workflow

In [None]:
# Resolve the flow's return value once rather than once per check
df_out = state.result()

# Verify output is of the expected datatype
assert isinstance(df_out, pd.DataFrame)

# Verify that total number of rows in output is sum of nrows input list
assert len(df_out) == sum(nrows)

# Verify scaled columns are in output
assert set(cols_to_scale).issubset(df_out.columns)

# Verify that every row's nrows value equals the size of the chunk it came
# from, with chunks in input order. Unlike comparing .unique() to nrows, this
# also checks per-chunk row counts and stays correct when nrows has duplicates.
expected_nrows = [n for n in nrows for _ in range(n)]
assert df_out["nrows"].tolist() == expected_nrows