In [40]:
%load_ext autoreload

In [41]:
%autoreload 2

# Introduction

## A guide to develop and deploy a modern data pipeline to production infrastructure
Using three tasks, this guide walks through the three principal stages necessary to develop a modern data pipeline in Python.  

#### Pipeline tasks
1. Extract MOC imbalance data from the TSX website
2. Transform the imbalance data
3. Load the data into a `postgres` database

In reality, a pipeline will include other tasks that might: (1) extract data from many sources in different time zones; (2) prepare the data for analytics or prediction algorithms; (3) run multiple statistical experiments on decentralized infrastructure.

Our motivating example is a toy ETL pipeline that extracts data from the Toronto Stock Exchange's Market on Close (MOC) facility website and loads it into a database. The data is published every trading day at 15:40 Toronto time, and at midnight the table is flushed.
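Because the table only exists between 15:40 and midnight Toronto time on trading days, a scheduled run has to land inside that window. The helper below is a hypothetical sketch (`moc_window_open` is not part of the guide's code). It assumes the caller passes a naive `datetime` already expressed in Toronto local time, treats every weekday as a trading day, and ignores exchange holidays.

```python
import datetime

# Publication time of the MOC imbalance table, Toronto local time.
MOC_PUBLISH_TIME = datetime.time(15, 40)


def moc_window_open(now_toronto: datetime.datetime) -> bool:
    """Return True if MOC data should be on the TSX page at `now_toronto`.

    Hypothetical helper. Assumes `now_toronto` is Toronto local time and
    that every weekday is a trading day (exchange holidays are NOT handled).
    The table is flushed at midnight, so the window is 15:40-23:59 of the
    same day.
    """
    is_weekday = now_toronto.weekday() < 5  # Monday=0 ... Friday=4
    after_publish = now_toronto.time() >= MOC_PUBLISH_TIME
    return is_weekday and after_publish
```

For example, `moc_window_open(datetime.datetime(2020, 4, 14, 16, 0))` is `True` (a Tuesday after 15:40), while the same time on Saturday 2020-04-18 gives `False`.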
 

### Principal stages and tools
1.  Write the base code
    - `requests` - extracting data.
    - `pandas` - transforming and passing data.
    - `psycopg2` (via `SQLAlchemy`) - loading data to the db.
2.  Implement task configuration and orchestration logic for the individual tasks and for the pipeline, e.g. error handling and execution logic.
    - `prefect` - orchestration, configuration, execution method (e.g. dask or local), and scheduling.
3.  Deploy the flow to a compute environment: local, self-hosted, or an IaaS provider. In our case, AWS.
    - `prefect`
    - AWS `S3`, `ECR`, `Batch`

### Requirements
- An AWS account
    - AWS CLI configured on the local machine. [Instructions](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html)
- A Postgres database. The lazy remote option: [Lightsail](https://aws.amazon.com/lightsail/) is free for a month and easy to set up and tear down for experiments. It offers Postgres or MySQL.
- Docker - [docker install](https://docs.docker.com/engine/install/#supported-platforms).  
- graphviz - [download](https://www.graphviz.org/download/). Choose your OS.
- Python 3.7
    - prefect - `pip install -U "prefect[viz, aws]"`. See [prefect install](https://docs.prefect.io/core/getting_started/installation.html) for other options.
    - The rest - `pip install requests pandas psycopg2 SQLAlchemy docker boto3 lxml html5lib` (`datetime` is part of the standard library and needs no install).
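Before going further, it can save time to confirm that every package above is importable from the environment the notebook runs in. This is a minimal sketch using only the standard library; `missing_modules` is a hypothetical helper. Note that it checks *import* names, which differ from pip names in some cases (e.g. `SQLAlchemy` imports as `sqlalchemy`).

```python
import importlib.util

# Import names (not pip names) of the packages listed in the requirements.
REQUIRED_MODULES = [
    "prefect", "requests", "pandas", "psycopg2", "sqlalchemy",
    "docker", "boto3", "lxml", "html5lib",
]


def missing_modules(names):
    """Return the top-level module names that cannot be found on sys.path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    print("Missing:", missing if missing else "none")
```

`find_spec` only locates the modules without importing them, so the check is fast even for heavy packages such as `pandas`.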
    

## Base pipeline tasks
Context: the base code lives in a directory called `prefect_guide`, in a single file called `get_moc.py`. Both can be given any valid Unix name.

### a. Extract MOC imbalance data from the TSX website

The first task retrieves the data table from the [TSX website](https://api.tmxmoney.com/mocimbalance/en/TSX/moc.html) using `requests` and returns a `dataframe` with the day's MOC imbalances. 

In [42]:
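import io
import logging

import pandas as pd
import requests

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def get_tsx_moc_imb(url: str) -> pd.DataFrame:
    """
    Scrape the TSX Market on Close website. Data is only available on weekdays
    from 15:40 Toronto time until midnight.

    Use the archived url for testing:
    "https://web.archive.org/web/20200414202757/https://api.tmxmoney.com/mocimbalance/en/TSX/moc.html"
    """

    # 1. Get the html content; the timeout (an arbitrary 30 s) stops a hung
    #    connection from blocking forever, and HTTP errors raise immediately
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    # 2. Read all the tables on the page (StringIO avoids the deprecation
    #    warning newer pandas versions emit for literal html strings)
    df_list = pd.read_html(io.StringIO(response.text), header=[0], na_values=[''], keep_default_na=False)

    # 3. The imbalance table is the last table on the page
    tsx_imb_df = df_list[-1]

    logger.info(f"MOC download shape {tsx_imb_df.shape}")

    return tsx_imb_df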

#### Run the function

Running the function below gives one of three results: 

   1. A dataframe with shape `(n, 4)`, where `n > 0`
   2. An empty dataframe with shape `(0, 4)`
   3. A connection error, e.g. `RemoteDisconnected`

Whatever the result, it will be handled later in the configuration and orchestration stage. For now, to validate the next steps, use the archived `backup_url` given below.
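Result 3 is often transient, so an orchestrator will typically retry the extract (`prefect` has its own retry settings for this). To make the idea concrete, here is a plain-Python sketch; `call_with_retries` is a hypothetical helper, not a `prefect` API, and the default attempt count and delay are arbitrary assumptions.

```python
import time


def call_with_retries(func, *args, max_attempts=3, delay_seconds=10.0,
                      retry_on=(Exception,), **kwargs):
    """Call func(*args, **kwargs), retrying when it raises one of `retry_on`.

    Hypothetical helper: sleeps `delay_seconds` between attempts and
    re-raises the last exception once `max_attempts` is exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except retry_on:
            if attempt == max_attempts:
                raise  # give up and surface the original error
            time.sleep(delay_seconds)
```

For example, `call_with_retries(get_tsx_moc_imb, tsx_url, retry_on=(requests.exceptions.ConnectionError,))` retries only connection failures; `requests` wraps low-level errors such as `RemoteDisconnected` in `requests.exceptions.ConnectionError`. An empty dataframe (result 2) is not an exception, so it still has to be checked separately, e.g. with `tsx_imb_df.empty`.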

In [43]:
tsx_url = 'https://api.tmxmoney.com/mocimbalance/en/TSX/moc.html'
backup_url = "https://web.archive.org/web/20200414202757/https://api.tmxmoney.com/mocimbalance/en/TSX/moc.html"

tsx_imb_df = get_tsx_moc_imb(backup_url)
print(tsx_imb_df.shape)
tsx_imb_df.head(3)  

(389, 4)


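|   | Symbol | Imbalance Side | Imbalance Size | Imbalance Reference Price |
|---|--------|----------------|----------------|---------------------------|
| 0 | AAV    | BUY            | 34003          | 1.725                     |
| 1 | ABX    | BUY            | 460592         | 34.005                    |
| 2 | ACB    | BUY            | 211790         | 1.035                     |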


### b. Transform the imbalance data

After extracting the data, it's time to clean it and add features to `tsx_imb_df`. The only new library is `datetime`.
1.  Clean data
    - Convert column names to snake case.
    - Handle NaNs.
        - When the data was read into the `df`, the symbol "NA" was converted to `NaN`.

2.  Add features
    - Date
        - The TSX does not provide a date.
    - Dollar delta
        - Multiply `imbalance_size` by `imbalance_reference_price`.
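The two column-level rules above can be checked by hand before they are wired into a class. This sketch applies the same snake-case expression used in `PrepareLoad.clean_data` and the same dollar-delta formula used in `add_features` to the first rows of the archived data; `to_snake_case` and `dollar_delta` are illustrative names, not part of the guide's code.

```python
def to_snake_case(name: str) -> str:
    # Split on whitespace, join with "_", lowercase (same rule as clean_data).
    return "_".join(name.split()).lower()


def dollar_delta(imbalance_size: int, reference_price: float) -> int:
    # Notional value of the imbalance, truncated toward zero like .astype(int).
    return int(imbalance_size * reference_price)


print(to_snake_case("Imbalance Reference Price"))  # imbalance_reference_price
print(dollar_delta(34003, 1.725))                   # AAV: 58655
print(dollar_delta(460592, 34.005))                 # ABX: 15662430
```

Truncating rather than rounding means the fractional dollars are dropped (58655.175 becomes 58655), which is negligible for this feature.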

In [44]:
import datetime 

In [50]:
class PrepareLoad:
    """
    A class to prepare tsx imbalances for loading to the database
    """

    def __init__(
        self,
        symbol_clmn="symbol",
        date_clmn="moc_date",
        dollar_delta_clmn="dlr_delta",
        idx_clmn_lst=None,
    ):
        self.symbol_clmn = symbol_clmn
        self.date_clmn = date_clmn
        self.dollar_delta_clmn = dollar_delta_clmn
        # Avoid a mutable default argument; the default index is (moc_date, symbol)
        self.idx_clmn_lst = idx_clmn_lst if idx_clmn_lst is not None else ["moc_date", "symbol"]

    def clean_data(self, df):
        # Snake case the column names, e.g. "Imbalance Size" -> "imbalance_size"
        df.columns = ["_".join(nm.split()).lower() for nm in df.columns]

        # Restore the symbol "NA", which was parsed as NaN
        # (there are more robust ways to do this, but this one is clear)
        df.loc[df[self.symbol_clmn].isna(), self.symbol_clmn] = "NA"

        return df

    def add_features(self, df):
        # The TSX does not supply a date with its MOC data, so use today's date
        df[self.date_clmn] = datetime.date.today()

        # Dollar delta feature: imbalance size x reference price, truncated to int
        df[self.dollar_delta_clmn] = df["imbalance_size"] * df["imbalance_reference_price"]
        df[self.dollar_delta_clmn] = df[self.dollar_delta_clmn].astype(int)
        return df

    def run(self, df):
        # Work on a copy so the caller's raw dataframe is not modified in place
        cleaned_df = self.clean_data(df.copy())
        imb_df = self.add_features(cleaned_df)

        # Index by (moc_date, symbol); verify_integrity raises on duplicate keys
        imb_df = imb_df.set_index(self.idx_clmn_lst, drop=True, verify_integrity=True)

        return imb_df

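Execute the `run` method of `PrepareLoad()`. The output is a dataframe with two additional fields: `dlr_delta`, and `moc_date`, which together with `symbol` forms the index. This is the data that is loaded to the db. Note that `moc_date` is the date on the machine running the code, not a date published by the TSX, which is why the archived April 14 data below is stamped 2020-04-29.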

In [51]:
tranData = PrepareLoad()
imb_df = tranData.run(tsx_imb_df)
imb_df.head(2)

| moc_date   | symbol | imbalance_side | imbalance_size | imbalance_reference_price | dlr_delta |
|------------|--------|----------------|----------------|---------------------------|-----------|
| 2020-04-29 | AAV    | BUY            | 34003          | 1.725                     | 58655     |
| 2020-04-29 | ABX    | BUY            | 460592         | 34.005                    | 15662430  |


### c. Load to database

This task takes the transformed data as input and loads `imb_df` into a database using `SQLAlchemy`. There are many ways to accomplish this, including some built into `prefect`, but for a small `df` this is the simplest. 
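One consequence of appending (`if_exists="append"` in `df_to_db`) is that running the load twice on the same day stores every `(moc_date, symbol)` row twice. A common alternative is to give the table a primary key and skip conflicting rows. The sketch below illustrates that idea with the standard-library `sqlite3` module as a stand-in for Postgres, which supports the same `ON CONFLICT ... DO NOTHING` clause; the table layout and the `load_rows` helper are assumptions, not part of the guide.

```python
import sqlite3

DDL = """
CREATE TABLE IF NOT EXISTS moc_tst (
    moc_date TEXT NOT NULL,
    symbol TEXT NOT NULL,
    imbalance_side TEXT,
    imbalance_size INTEGER,
    imbalance_reference_price REAL,
    dlr_delta INTEGER,
    PRIMARY KEY (moc_date, symbol)
)
"""

INSERT = """
INSERT INTO moc_tst VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (moc_date, symbol) DO NOTHING
"""


def load_rows(conn, rows):
    """Insert rows, skipping any (moc_date, symbol) already stored; return the row count."""
    with conn:  # commits on success, rolls back on error
        conn.execute(DDL)
        conn.executemany(INSERT, rows)
    return conn.execute("SELECT COUNT(*) FROM moc_tst").fetchone()[0]


rows = [
    ("2020-04-29", "AAV", "BUY", 34003, 1.725, 58655),
    ("2020-04-29", "ABX", "BUY", 460592, 34.005, 15662430),
]
conn = sqlite3.connect(":memory:")
print(load_rows(conn, rows))  # 2
print(load_rows(conn, rows))  # still 2: the rerun adds nothing
```

The trade-off is that the table must be created up front with the key, rather than letting `to_sql` create it on the first run.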

In [55]:
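import sqlalchemy as sa


def df_to_db(df, tbl_name, conn_str):
    """Append `df` (including its index) to `tbl_name`, creating the table if needed."""
    engine = sa.create_engine(conn_str)

    try:
        df.to_sql(
            name=tbl_name,
            con=engine,
            if_exists="append",  # create the table on the first run, append afterwards
            index=True,          # write the (moc_date, symbol) index as columns
            method="multi",      # multi-row INSERT statements
            chunksize=5000,
        )
    finally:
        # Release pooled connections even if the insert fails
        engine.dispose()

    return df.shape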

Build a connection string using your own database credentials.

In [56]:
usr_nm = 'something'
pwd = "verysecret"
host = "some_endpoint or ip"
db_nm = "tst"

# SQLAlchemy 1.4+ no longer accepts the "postgres://" scheme; use "postgresql"
db_string = f"postgresql+psycopg2://{usr_nm}:{pwd}@{host}/{db_nm}"
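Passwords often contain characters such as `@`, `:` or `/` that break a URL, and SQLAlchemy's documentation suggests escaping them with `urllib.parse.quote_plus`. The sketch below does that and reads the credentials from environment variables rather than hard-coding them. It uses the `postgresql` scheme because SQLAlchemy 1.4+ rejects `postgres://`. The variable names (`MOC_DB_USER` and so on), the default port, and the `build_pg_url` helper are hypothetical.

```python
import os
from urllib.parse import quote_plus


def build_pg_url(user, password, host, db_name, port=5432):
    """Build a SQLAlchemy URL for Postgres via psycopg2, escaping user and password."""
    return (
        f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{db_name}"
    )


def pg_url_from_env():
    # Hypothetical environment variable names; raises KeyError if one is unset.
    return build_pg_url(
        os.environ["MOC_DB_USER"],
        os.environ["MOC_DB_PASSWORD"],
        os.environ["MOC_DB_HOST"],
        os.environ["MOC_DB_NAME"],
    )


print(build_pg_url("something", "p@ss:word", "db.example.com", "tst"))
# postgresql+psycopg2://something:p%40ss%3Aword@db.example.com:5432/tst
```

Keeping secrets out of the notebook also makes the later move to a container easier, since the same variables can be injected at run time.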

Run the function. The `moc_tst` table is created on the first run, and the imbalance rows are appended to it.

In [57]:
df_shape = df_to_db(imb_df, tbl_name="moc_tst", conn_str=db_string)
df_shape

(389, 4)

## Stage 2 - Configuration and Orchestration
Configure the tasks
