# Highly Scalable Data Pipelines with Prefect

## Our Journey in Leveraging Prefect & k8s to enable Resilient, Orchestrated Workflows

### David Zucker
### 2021-09-16

# What is our team about?

- Provide advanced recommendations to the business
- Ensure quality of data
- Data provider/aggregator for our platform

# What was our evolution?


## November 2019: First release

**Pro**
- Running on Kubernetes

**Con**
- Single big Pod, hard for Kubernetes to find space to start it
- Took 5h to process the full model
- Use CronJob to schedule the run


## June 2020: First use of Prefect Core

**Pro**
- Running on Kubernetes
- Use Dask for scale out
- Took <1h to process the full model

**Con**
- Use CronJob to schedule the run
- Hard to get insight into the platform

## February 2021: Move to Prefect Server

**Pro**
- Running on Kubernetes
- Use Dask for scale out
- Took <1h to process the full model
- Use Prefect Server to schedule and get insight into our runs

**Con**
- Platform is still evolving and maturing, with changes landing from the master version


# Let's see how to evolve your Python code using Prefect Core



## Our use case

- Extract daily market data at the close of the exchange
- Need to run daily for each market index
- Need to load the data as fast as we can after the close


# Basic Python implementation

## Solution explanation 

- Extract index composition from Stooq
- Download daily market data for each symbol
- Resample daily candles to weekly and monthly
- Save all results to Parquet files

In [None]:
import pandas as pd
from typing import List
import os
import time
import pathlib


def index_to_stooq_id(index_name:str)-> str:
    i_2_s = {"DJI": "578",
            "NDX":"580",
            "HSI":"616"}
    return i_2_s[index_name]

def get_index_composition(index_id: str) -> pd.DataFrame:
    return pd.read_html(f"https://stooq.com/t/?i={index_id}", attrs = {"id": "fth1"})[0]

def get_symbols(df_index_composition: pd.DataFrame) -> List[str]:
    return [symbol for symbol in df_index_composition["Symbol"]]

def get_historical_data(symbol: str, time_frame: str="d") -> pd.DataFrame:
    df = pd.read_csv(f"https://stooq.com/q/d/l/?s={symbol}&i={time_frame}", parse_dates=True)
    df['Date'] =  pd.to_datetime(df['Date'], format='%Y-%m-%d')
    return df

In [None]:
def resample_candle(df_daily:pd.DataFrame, resampling: str = "W") -> pd.DataFrame:
    agg_dict = {'Open': 'first',
          'High': 'max',
          'Low': 'min',
          'Close': 'last',
          'Volume': 'mean'}
    df_with_index = df_daily.set_index("Date")
    return df_with_index.resample(resampling).agg(agg_dict).reset_index()

def save_to_parquet(df: pd.DataFrame, path: str):
    pathlib.Path(path).expanduser().parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(path)
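
To make the aggregation rule in `resample_candle` concrete, here is a minimal pure-Python sketch of the same OHLCV logic (first open, highest high, lowest low, last close, mean volume). It is only an illustration: the sample candles are hypothetical, `resample_weekly` is a made-up helper, and it groups by ISO week instead of using the pandas `"W"` offset, so the bucket labels differ from what pandas returns.

```python
from datetime import date
from statistics import mean
from typing import Dict, List

# Hypothetical daily candles: (date, open, high, low, close, volume).
daily = [
    (date(2021, 9, 6), 10.0, 12.0, 9.0, 11.0, 100),
    (date(2021, 9, 7), 11.0, 13.0, 10.0, 12.0, 200),
    (date(2021, 9, 8), 12.0, 12.5, 11.0, 11.5, 300),
    (date(2021, 9, 13), 11.5, 14.0, 11.0, 13.0, 400),
    (date(2021, 9, 14), 13.0, 13.5, 12.0, 12.5, 600),
]


def resample_weekly(candles: List[tuple]) -> Dict[tuple, dict]:
    """Group candles by ISO (year, week) and apply the OHLCV rules."""
    groups: Dict[tuple, list] = {}
    for candle in sorted(candles):  # sort by date so first/last are meaningful
        year, week, _ = candle[0].isocalendar()
        groups.setdefault((year, week), []).append(candle)
    return {
        key: {
            "Open": rows[0][1],                   # first open of the week
            "High": max(r[2] for r in rows),      # highest high
            "Low": min(r[3] for r in rows),       # lowest low
            "Close": rows[-1][4],                 # last close of the week
            "Volume": mean(r[5] for r in rows),   # mean daily volume
        }
        for key, rows in groups.items()
    }


weekly = resample_weekly(daily)
for week_key, candle in weekly.items():
    print(week_key, candle)
```

The five daily rows collapse into two weekly candles, one per ISO week.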

In [None]:
%%time

index_name = "DJI"
root_market_data = "~/git/prefect_presentation/market_data"


print(f"=> Extract composition for {index_name}")
index_id = index_to_stooq_id(index_name)
df_composition = get_index_composition(index_id)


for symbol in get_symbols(df_composition)[:6]:
    start = time.time()
    print(f"==> Extract market data for {symbol}", end =" ")
    df_OHLC_daily = get_historical_data(symbol)
    save_to_parquet(df_OHLC_daily, f"{root_market_data}/{index_name}/{symbol}_d.parquet")
    print("Resample W.", end =" ")
    df_OHLC_weekly = resample_candle(df_OHLC_daily, "W")
    save_to_parquet(df_OHLC_weekly, f"{root_market_data}/{index_name}/{symbol}_W.parquet")
    print("M", end =" ")
    df_OHLC_monthly = resample_candle(df_OHLC_daily, "M")
    save_to_parquet(df_OHLC_monthly, f"{root_market_data}/{index_name}/{symbol}_M.parquet")
    end = time.time()
    print(f"took: {end - start:2.2f}s")

## Limitations

- Linear execution
- Hard to scale out without adding code complexity (threading or asyncio)
- No failover/retry capability
- Lots of edge cases to handle to be resilient
- Depends on an external solution for scheduling
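
To give a feel for the retry boilerplate that the basic implementation would need, here is a minimal hand-rolled sketch. Everything in it is hypothetical (`call_with_retries`, `flaky_download`, the failure pattern); it only shows the kind of code you end up writing and testing yourself when the pipeline has no built-in failover.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(func: Callable[[], T], max_retries: int = 4, retry_delay: float = 0.0) -> T:
    """Call func, retrying up to max_retries times after the first failure."""
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            if attempt >= max_retries:
                raise  # out of retries: surface the last error
            attempt += 1
            time.sleep(retry_delay)


# Hypothetical flaky download that fails twice before succeeding.
retry_calls = {"count": 0}


def flaky_download() -> str:
    retry_calls["count"] += 1
    if retry_calls["count"] < 3:
        raise ConnectionError("temporary network error")
    return "data"


retry_result = call_with_retries(flaky_download, max_retries=4, retry_delay=0.01)
print(retry_result, retry_calls["count"])
```

Even this small helper leaves out timeouts, logging, and per-error handling, which is where the edge cases start to pile up.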

# Upgrading the code with Prefect Core



## A few components you need to know

1. **Task**: Low-level execution unit. A simple function decorated with `@task`
2. **Flow**: Container of tasks with their dependencies
3. **Parameters**: Special tasks that let you change the input values of the flow at run time

## Task

``` python
from prefect import task

@task
def plus_one(x):
    return x + 1

```

## Task: Failover support

- max_retries
- retry_delay
- timeout

``` python
from prefect import task
from datetime import timedelta

@task(max_retries=4, retry_delay=timedelta(seconds=10), timeout=timedelta(seconds=2))
def plus_one(x):
    return x + 1
```



## Task: Functional map

- Map runs a task once for each element of a list.
- A powerful way to distribute work

In [None]:
from prefect import task, Flow, unmapped
from datetime import timedelta
from typing import List

@task
def add(x, y):
    return x + y

@task
def elements() -> List[int]:
    return list(range(1, 11))

with Flow("sample") as f:
    ints = elements()
    # Iterate over every element of ints and add the constant 10 to each one
    result_add = add.map(unmapped(10), ints)

flow_state = f.run()
flow_state.result[result_add].result
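
As a rough mental model, the mapped call above behaves like a list comprehension in which the `unmapped` value is reused for every call while the list is consumed element by element. The sketch below is plain Python, not Prefect, so it ignores states, retries, and parallelism.

```python
from typing import List


def add(x: int, y: int) -> int:
    return x + y


def elements() -> List[int]:
    return list(range(1, 11))


# Plain-Python equivalent of add.map(unmapped(10), ints):
# 10 is passed unchanged to every call, ints is iterated.
ints = elements()
result_add = [add(10, value) for value in ints]
print(result_add)
```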

## Task Library

- A set of shared tasks supported by Prefect
- A wide variety that allows simple integration

### Some examples

- Shell scripts
- Kubernetes Jobs
- Lambda functions
- Multiple databases

## Task Library

![media/prefect_task_1.png](media/prefect_tasks_1.png)

## Task Library

![media/prefect_task_2.png](media/prefect_tasks_2.png)

## Task Library

![media/prefect_task_3.png](media/prefect_tasks_3.png)

## Flow

- Container of tasks
- Contains the DAG that will run
- Separates what to run from how to run it
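
The DAG idea can be sketched with the standard library alone. The dependency dictionary below is a simplified, assumed reading of the market data pipeline (task names are borrowed from the basic implementation), and `graphlib` is used purely for illustration; it is not how Prefect stores or schedules a flow.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key lists the tasks it depends on (its upstream tasks).
dag = {
    "index_to_stooq_id": set(),
    "get_index_composition": {"index_to_stooq_id"},
    "get_symbols": {"get_index_composition"},
    "get_historical_data": {"get_symbols"},
    "resample_candle": {"get_historical_data"},
    "save_to_parquet": {"get_historical_data", "resample_candle"},
}

# A valid execution order: every task appears after its upstream tasks.
order = list(TopologicalSorter(dag).static_order())
print(order)
```

The dictionary only describes what depends on what; whether the tasks then run one by one, in threads, or on a cluster is a separate decision.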

In [None]:
from prefect import task, Flow, Parameter, unmapped
import pandas as pd
from typing import List
from datetime import timedelta
import os
import time
import pathlib

@task
def index_to_stooq_id(index_name:str)-> str:
    i_2_s = {"DJI": "578",
            "NDX":"580",
            "HSI":"616"}
    return i_2_s[index_name]

@task(max_retries=4, retry_delay=timedelta(seconds=2), timeout=timedelta(seconds=4))
def get_index_composition(index_id: str) -> pd.DataFrame:
    return pd.read_html(f"https://stooq.com/t/?i={index_id}", attrs = {"id": "fth1"})[0]

@task
def get_symbols(df_index_composition: pd.DataFrame) -> List[str]:
    return [symbol for symbol in df_index_composition["Symbol"][:6]]

@task(max_retries=4, retry_delay=timedelta(seconds=2), timeout=timedelta(seconds=6))
def get_historical_data(symbol: str, time_frame: str="d") -> pd.DataFrame:
    df = pd.read_csv(f"https://stooq.com/q/d/l/?s={symbol}&i={time_frame}", parse_dates=True)
    df['Date'] =  pd.to_datetime(df['Date'], format='%Y-%m-%d')
    return df


In [None]:

@task
def resample_candle(df_daily:pd.DataFrame, time_frame: str = "W") -> pd.DataFrame:
    agg_dict = {'Open': 'first',
          'High': 'max',
          'Low': 'min',
          'Close': 'last',
          'Volume': 'mean'}
    df_with_index = df_daily.set_index("Date")
    return df_with_index.resample(time_frame).agg(agg_dict).reset_index()

@task
def get_save_path(root:str, index_name: str, symbol:str, timeframe:str)-> str:
    return f"{root}/{index_name}/{symbol}_{timeframe}.parquet"

@task
def save_to_parquet(df: pd.DataFrame, path: str):
    pathlib.Path(path).expanduser().parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(path)


In [None]:
with Flow("extract_market_data") as f:
    # Define the flow input parameters
    index_name = Parameter("index", default="DJI")
    root_market_data = Parameter("root_folder", default="~/git/prefect_presentation/market_data")
    
    # Retrieve the symbols for the index
    index_id = index_to_stooq_id(index_name)
    df_composition = get_index_composition(index_id)
    symbols = get_symbols(df_composition)
    
    # Download the market data for the index
    df_OHLC_daily = get_historical_data.map(symbols)
    daily_path = get_save_path.map(unmapped(root_market_data),unmapped(index_name), symbols, unmapped("d") )
    save_to_parquet.map(df_OHLC_daily, daily_path)
    
    # Resample the market data to Weekly
    df_weekly = resample_candle.map(df_OHLC_daily, unmapped("W"))
    weekly_path = get_save_path.map(unmapped(root_market_data),unmapped(index_name), symbols, unmapped("W") )
    save_to_parquet.map(df_weekly, weekly_path)
    
    # Resample the market data to Monthly
    df_monthly = resample_candle.map(df_OHLC_daily, unmapped("M"))
    monthly_path = get_save_path.map(unmapped(root_market_data),unmapped(index_name), symbols, unmapped("M") )
    save_to_parquet.map(df_monthly, monthly_path)
    


## Let's have a look at the workflow

- Prefect view of the DAG
- Shows dependencies
- Distinguishes mapped and non-mapped tasks

In [None]:
f.visualize()

## Time for a run


In [None]:
%%time
flow_state = f.run(parameters={"index": "DJI"})

## DAG that prefect ran

In [None]:
f.visualize(flow_state)


## This looks sequential to me!
```
[2021-09-13 17:25:56+0800] INFO - prefect.TaskRunner | Task 'get_historical_data': Starting task run...
[2021-09-13 17:25:56+0800] INFO - prefect.TaskRunner | Task 'get_historical_data': Finished task run for task with final state: 'Mapped'
[2021-09-13 17:25:56+0800] INFO - prefect.TaskRunner | Task 'get_historical_data[0]': Starting task run...
[2021-09-13 17:26:00+0800] INFO - prefect.TaskRunner | Task 'get_historical_data[0]': Finished task run for task with final state: 'Success'
[2021-09-13 17:26:00+0800] INFO - prefect.TaskRunner | Task 'get_historical_data[1]': Starting task run...
[2021-09-13 17:26:04+0800] INFO - prefect.TaskRunner | Task 'get_historical_data[1]': Finished task run for task with final state: 'Success'
[2021-09-13 17:26:05+0800] INFO - prefect.TaskRunner | Task 'get_historical_data[2]': Starting task run...
[2021-09-13 17:26:10+0800] INFO - prefect.TaskRunner | Task 'get_historical_data[2]': Finished task run for task with final state: 'Success'
[2021-09-13 17:26:10+0800] INFO - prefect.TaskRunner | Task 'get_historical_data[3]': Starting task run...
[2021-09-13 17:26:16+0800] INFO - prefect.TaskRunner | Task 'get_historical_data[3]': Finished task run for task with final state: 'Retrying'
```


# Prefect Modularity

- **Executors**: Define how you want to run your workflow
- **Storage**: Define where the flow definitions are stored
- **Result**: Define where task results are persisted
- **Serialization**: Define how you want to serialize your data
- **State Handlers**: Define custom reactions to task and flow state changes

## Result

- Abstracts where results are stored (local disk, S3, Azure Blob Storage...)
- Lets you define how results are serialized
- Dynamic path generation




In [None]:
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer

MARKET_RESULT = LocalResult(dir="~/market_data/", serializer=PandasSerializer(file_type="parquet"))
TARGET_STOCK = "stock/{today_nodash}/{symbol}.{time_frame}.parquet"

@task(checkpoint=True,
      target="index/{today_nodash}/{index_name}.parquet",
      result=MARKET_RESULT)
def get_index_composition(index_id: str, index_name: str) -> pd.DataFrame:
    return pd.read_html(f"https://stooq.com/t/?i={index_id}", attrs = {"id": "fth1"})[0]

@task(checkpoint=True,
      target=TARGET_STOCK,
      result=MARKET_RESULT)
def get_historical_data(symbol: str, time_frame: str="d") -> pd.DataFrame:
    df = pd.read_csv(f"https://stooq.com/q/d/l/?s={symbol}&i={time_frame}", parse_dates=True)
    df['Date'] =  pd.to_datetime(df['Date'], format='%Y-%m-%d')
    return df

In [None]:
@task(checkpoint=True,
      target=TARGET_STOCK,
      result=MARKET_RESULT)
def resample_candle(df_daily:pd.DataFrame, symbol:str, time_frame: str = "W") -> pd.DataFrame:
    agg_dict = {'Open': 'first',
          'High': 'max',
          'Low': 'min',
          'Close': 'last',
          'Volume': 'mean'}
    df_with_index = df_daily.set_index("Date")
    return df_with_index.resample(time_frame).agg(agg_dict).reset_index()
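
The `target` templates used above are ordinary format strings that get filled with values such as the run date and the task's own inputs, which is presumably why `index_name` and `symbol` were added as task arguments. The sketch below imitates that rendering with `str.format`; `render_target`, the fixed date, and the `AAPL.US` symbol are hypothetical, and the real lookup is done by Prefect from its context.

```python
from datetime import date

TARGET_STOCK = "stock/{today_nodash}/{symbol}.{time_frame}.parquet"


def render_target(template: str, run_date: date, **task_inputs: str) -> str:
    """Fill a target template from a run date and the task's keyword inputs."""
    context = {"today_nodash": run_date.strftime("%Y%m%d")}
    return template.format(**context, **task_inputs)


rendered_path = render_target(
    TARGET_STOCK, date(2021, 9, 16), symbol="AAPL.US", time_frame="W"
)
print(rendered_path)
```

Because the time frame is part of the template, the daily, weekly, and monthly results for one symbol land in separate files.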


In [None]:
with Flow("extract_market_data_with_result") as f_with_result:
    # Define the flow input parameters
    index_name = Parameter("index", default="DJI")
    root_market_data = Parameter("root_folder", default="~/git/prefect_presentation/market_data")
    
    # Retrieve the symbols for the index
    index_id = index_to_stooq_id(index_name)
    df_composition = get_index_composition(index_id, index_name)
    symbols = get_symbols(df_composition)

    df_OHLC_daily = get_historical_data.map(symbols, unmapped("d"))
    df_weekly = resample_candle.map(df_OHLC_daily, symbols, unmapped("W"))
    df_monthly = resample_candle.map(df_OHLC_daily, symbols, unmapped("M"))
    

In [None]:
import prefect
prefect.context.config.flows.checkpointing = True

In [None]:
f_with_result.run()
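
Running the flow a second time on the same day should be much faster, because a task whose target already exists is not executed again and its stored result is reused. The sketch below imitates that behaviour with the standard library only; `run_with_target`, the JSON storage, and the file name are assumptions made for illustration, not Prefect internals.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def run_with_target(func: Callable[[], dict], target: Path) -> dict:
    """Return the stored result if target exists, else run func and store it."""
    if target.exists():
        return json.loads(target.read_text())
    result = func()
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(json.dumps(result))
    return result


download_calls = {"count": 0}


def expensive_download() -> dict:
    """Hypothetical stand-in for a slow download."""
    download_calls["count"] += 1
    return {"symbol": "AAPL.US", "rows": 3}


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "stock" / "20210916" / "AAPL.US.d.json"
    first = run_with_target(expensive_download, target)   # runs the function
    second = run_with_target(expensive_download, target)  # reads the stored file

print(first == second, download_calls["count"])
```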

In [None]:
prefect.context.config.flows.checkpointing

### Executors

Responsible for actually executing the tasks

- **LocalExecutor**: Single-threaded, in-process executor
- **LocalDaskExecutor**: Uses a local Dask scheduler to run tasks in multiple threads
- **DaskExecutor**: Uses Dask Distributed to send tasks to remote workers

#### Dask: Dask is an open source library for parallel computing written in Python.


### Running with LocalDaskExecutor

- Initialize a LocalDaskExecutor
- Set the executor in the run function


In [None]:
%%time
from prefect.executors import LocalDaskExecutor 
from flows.extract_market_data import flow_market
executor = LocalDaskExecutor(scheduler="threads", num_workers=8)  # num_workers sets the number of threads
flow_market.run(executor=executor)
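
Threads pay off here because the mapped tasks spend most of their time waiting on the network. The sketch below shows the effect with the standard library's `ThreadPoolExecutor` rather than Dask; `fake_download` is a hypothetical stand-in that sleeps instead of calling Stooq, and the symbol names are placeholders.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def fake_download(symbol: str) -> str:
    """Stand-in for an I/O-bound download: sleeps instead of hitting the network."""
    time.sleep(0.2)
    return f"{symbol}: done"


symbols: List[str] = ["A", "B", "C", "D", "E", "F"]

# One download after the other, as in the sequential run.
start = time.perf_counter()
sequential = [fake_download(s) for s in symbols]
sequential_time = time.perf_counter() - start

# Same downloads, overlapped on a pool of 8 threads.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    threaded = list(pool.map(fake_download, symbols))
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

The results are identical in both cases; only the waiting overlaps, which is the same reason the mapped downloads benefit from a threaded executor.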

### Running with DaskExecutor

- More advanced, Dask-based executor
- Requires a running external Dask cluster
- Allows scaling out

In [None]:
from prefect.executors import DaskExecutor 
from flows.dask_cluster import DaskCluster
from flows.extract_market_data import flow_market
from prefect.utilities.logging import get_logger

logger = get_logger()

with DaskCluster(logger) as cluster:
    executor = DaskExecutor(address=cluster.scheduler_address)
    flow_market.run(executor=executor)

In [None]:
cluster