# Scheduled data pipeline to load VAT rates

**WORKFLOW MANAGEMENT TOOL SELECTION**

<img src="resources/images/etl-comparison.png" width="850"/>

**ARCHITECTURE DEFINITION**

- **Harbor**: An open-source, cloud-native registry that stores, signs, and scans container images for vulnerabilities.
- **AWS Batch**: Dynamically allocates the resources needed to run the container, so no machine has to run all the time.
- **CloudWatch**: Monitoring and observability service. It also triggers the pipeline every day at 00:00 UTC.
- **Snowflake**: Snowflake provides a Python connector, which lets the pipeline run SQL queries from code to transform and aggregate data.

**DATA MERGE AND VERSIONING ON TARGET TABLES**

The first idea that comes to mind is to overwrite the existing data and discard the previous version. However, doing so deletes important historical information.

Ideas:
1. Create a daily version of the VATRates table. This is probably inefficient, since the data should not change that frequently.
2. Use the <code>last_updated</code> value of the JSON to check whether data was changed, added (countries), or deleted (countries).
    - First option: If the data has changed (<code>last_updated</code> is later than the previous <code>last_updated</code> date), insert the new values into the target table (VATRates) with the new date. This is easier to implement, but querying gets tricky because there will be duplicate values. Also, since two or more IDs would represent a single record, it would become hard to link other tables we might add in the future.
    - **Second option**: If the data has changed (<code>last_updated</code> is later than the previous <code>last_updated</code> date), move the records in the VATRates table to the VATRatesHistory table and update the values in the target table. This process is more complex and requires an additional table (which might also make the data easier to visualize), but querying is easier.
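The second option can be illustrated in memory with pandas. This is a minimal sketch, not the pipeline's actual load step: the `apply_update` function and the table layout (every row carrying the `last_updated` value of its load) are assumptions made for the example.

```python
from datetime import datetime

import pandas as pd


def apply_update(current: pd.DataFrame, history: pd.DataFrame,
                 incoming: pd.DataFrame, incoming_updated: datetime):
    """Return (new_current, new_history) following the "second option".

    Hypothetical layout: every row of `current` carries its load's
    `last_updated` value, and `incoming` has the same columns without it.
    """
    if not current.empty:
        if incoming_updated <= current["last_updated"].max():
            # Source unchanged since the last load: keep both tables as they are.
            return current, history
        # Source changed: archive the current rows before replacing them.
        history = current.copy() if history.empty else pd.concat(
            [history, current], ignore_index=True)
    new_current = incoming.assign(last_updated=incoming_updated)
    return new_current.reset_index(drop=True), history
```

Because old rows only ever move to the history table, VATRates keeps exactly one row per country, which is what makes querying and joining easier than with the first option.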


**ARCHITECTURE DIAGRAM**

![title](resources/images/pipeline-architecture.png)

**PREFECT WORKFLOW STRUCTURE**

In [1]:
import json
import requests
import pandas as pd
import pytz
from datetime import datetime, timedelta
from prefect import task, Flow, Parameter
import snowflake.connector

In [2]:
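@task
def extract(url: str) -> dict:
    """Download the VAT rates JSON and fail the task on any HTTP error."""
    res = requests.get(url, timeout=30)  # avoid hanging forever on a stalled request
    res.raise_for_status()  # raises requests.HTTPError for 4xx/5xx responses
    return json.loads(res.content)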

In [3]:
@task
def connect_db():
    """
    Connect to the Snowflake database and return a cursor
    """
    # con_eb = snowflake.connector.connect(...)
    # return con_eb.cursor()
    return ''  # placeholder until the Snowflake connection is configured
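AWS Batch job definitions can pass settings to the container as environment variables, so one way to fill in the `snowflake.connector.connect(...)` call is to read the credentials from the environment. A minimal sketch; the `SNOWFLAKE_*` variable names and the `snowflake_conn_params` helper are hypothetical.

```python
import os


def snowflake_conn_params(env=os.environ) -> dict:
    """Collect Snowflake connection settings from (hypothetical) SNOWFLAKE_* variables."""
    required = ["USER", "PASSWORD", "ACCOUNT", "WAREHOUSE", "DATABASE", "SCHEMA"]
    missing = [name for name in required if f"SNOWFLAKE_{name}" not in env]
    if missing:
        # Fail early with a clear message instead of a vague connection error.
        raise KeyError(f"Missing Snowflake settings: {', '.join(missing)}")
    # Lower-cased keys match keyword arguments of snowflake.connector.connect().
    return {name.lower(): env[f"SNOWFLAKE_{name}"] for name in required}
```

Inside `connect_db`, the connection could then be opened with `snowflake.connector.connect(**snowflake_conn_params())`.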

In [4]:
@task
def query_db(cursor) -> bool:
    """
        Check whether the VATRates table already has data loaded
    """
    query = 'SELECT 1 FROM VATRates LIMIT 1;'
    # cursor.execute(query)
    # exists = cursor.fetchone() is not None
    # return exists
    return True
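The existence check in `query_db` only needs to know whether at least one row exists. The sketch below uses Python's built-in `sqlite3` purely as a local stand-in for the Snowflake cursor (both follow the DB-API 2.0 cursor interface); `table_has_rows` is a hypothetical helper name.

```python
import sqlite3


def table_has_rows(cursor, table: str) -> bool:
    """Return True if `table` contains at least one row.

    `table` must be a trusted identifier: it is interpolated, not bound.
    """
    cursor.execute(f"SELECT 1 FROM {table} LIMIT 1")
    return cursor.fetchone() is not None


# In-memory SQLite database standing in for Snowflake.
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE VATRates (country_code TEXT, standard_rate REAL)")
print(table_has_rows(cur, "VATRates"))  # False: first load
cur.execute("INSERT INTO VATRates VALUES ('AT', 20.0)")
print(table_has_rows(cur, "VATRates"))  # True: data already loaded
```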

In [5]:
def transform_df(data: dict) -> pd.DataFrame:
    """
        Transform data:
        Convert last_updated string to datetime type
        Create dataframe with rates information and last_updated date value
        Replace missing values with 0's
        Transform numeric column types to float
    """
    return pd.DataFrame([])  # placeholder: transformation not implemented yet
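One possible implementation of the steps listed in the `transform_df` docstring is sketched below. It assumes `rates.json` has a `last_updated` string and a `rates` object keyed by country code, where a rate field is `false` when that rate does not exist; this layout and the `RATE_COLUMNS` names are assumptions to check against the actual file, and `transform_df_sketch` is a hypothetical name.

```python
from datetime import datetime

import numpy as np
import pandas as pd

# Assumed rate field names in rates.json; adjust to the real file.
RATE_COLUMNS = ["standard_rate", "reduced_rate", "reduced_rate_alt",
                "super_reduced_rate", "parking_rate"]


def _to_rate(value) -> float:
    """Map a raw rate to float; booleans (false = no rate), null and NaN become 0.0."""
    if value is None or isinstance(value, (bool, np.bool_)) or pd.isna(value):
        return 0.0
    return float(value)


def transform_df_sketch(data: dict) -> pd.DataFrame:
    # Convert the last_updated string to a timezone-aware datetime.
    last_updated = datetime.strptime(data["last_updated"], "%Y-%m-%dT%H:%M%z")
    # One row per country, with the country code as a regular column.
    df = pd.DataFrame.from_dict(data["rates"], orient="index")
    df = df.rename_axis("country_code").reset_index()
    # Attach the same last_updated value to every row.
    df["last_updated"] = last_updated
    # Replace missing rates with 0 and store every rate column as float.
    for col in RATE_COLUMNS:
        if col in df.columns:
            df[col] = [_to_rate(v) for v in df[col].tolist()]
        else:
            df[col] = 0.0
    return df
```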

In [6]:
@task
def transform(data: dict, data_loaded: bool) -> dict:
    """
        The transformation runs only on the first load or when the data needs to be updated.
        Returns the transformed dataframe and whether this is an update (rather than a first load).
    """
    df = pd.DataFrame([])
    last_updated = datetime.strptime(data['last_updated'], "%Y-%m-%dT%H:%M%z")
    update_needed = False

    if data_loaded:
        # Current time in UTC; datetime.now().replace(tzinfo=...) would mislabel local time as UTC
        now_date = datetime.now(pytz.UTC)
        # The pipeline runs daily, so a change within the last 24 hours means new data
        if now_date - timedelta(hours=24) <= last_updated <= now_date:
            df = transform_df(data)
            update_needed = True
    else:
        df = transform_df(data)

    return {'vat_rates': df, 'update_needed': update_needed}
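The 24-hour freshness check inside `transform` can be isolated into a pure function, which makes it easy to test with a fixed clock. A minimal sketch; `is_recent_update` and its `now` parameter are hypothetical names introduced for illustration.

```python
from datetime import datetime, timedelta, timezone

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M%z"  # same format the transform task parses


def is_recent_update(last_updated: str, now: datetime,
                     window: timedelta = timedelta(hours=24)) -> bool:
    """True if `last_updated` falls within `window` before `now` (both timezone-aware)."""
    updated = datetime.strptime(last_updated, TIMESTAMP_FORMAT)
    return now - window <= updated <= now


# In production, pass the real current time as an aware UTC datetime.
now_utc = datetime.now(timezone.utc)
```

Note the design trade-off: a fixed 24-hour window relies on the pipeline running every day. If a run fails or is skipped, a change could fall outside the window and never be loaded; comparing against the `last_updated` value already stored in VATRates avoids that.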

In [7]:
@task
def load(data: dict, db_cursor) -> None:
    """
        Load data into Snowflake database only if it's the first load or data needs to be updated
    """
    if len(data['vat_rates']) > 0:
        if data['update_needed']:
            print('Executing transaction...')
            # Create query for transaction:
            # 1. Insert data from VATRates to VATRatesHistory
            # 2. Delete data from VATRates
            # 3. Create insert query from dataframe
            # 4. Execute transaction: db_cursor.execute(query)
        else:
            print('Inserting data to table for first time...')
            # 1. Create insert query from dataframe
            # 2. Execute query: db_cursor.execute(query)
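The update branch of `load` can be sketched as one atomic transaction. The example below again uses `sqlite3` only as a local stand-in and assumes both tables already exist with the same columns as the dataframe; `replace_with_history` is a hypothetical name. With the Snowflake connector, the placeholders would differ (it defaults to the `pyformat`, i.e. `%s`, style) and, since Snowflake autocommits each statement by default, the statements would need an explicit `BEGIN` ... `COMMIT`.

```python
import sqlite3

import pandas as pd


def replace_with_history(con: sqlite3.Connection, df: pd.DataFrame) -> None:
    """Archive VATRates into VATRatesHistory, then load `df`, all or nothing.

    Assumes VATRates and VATRatesHistory have identical columns, matching `df`.
    """
    columns = ", ".join(df.columns)
    placeholders = ", ".join(["?"] * len(df.columns))  # sqlite3 uses the qmark style
    rows = list(df.itertuples(index=False, name=None))
    with con:  # commits if every statement succeeds, rolls back otherwise
        con.execute("INSERT INTO VATRatesHistory SELECT * FROM VATRates")
        con.execute("DELETE FROM VATRates")
        con.executemany(
            f"INSERT INTO VATRates ({columns}) VALUES ({placeholders})", rows)
```

Running all three statements in one transaction means a failure halfway through never leaves VATRates empty or half-loaded.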

In [8]:
def prefect_flow():
    with Flow(name='vat_rates_etl_pipeline') as flow:
        param_url = Parameter(name='rates_url', required=True)
        # Database connection
        db_cursor = connect_db()
        data_loaded = query_db(db_cursor)
        
        # Extract data from GitHub
        vat_info = extract(url=param_url)
        
        # Transform data
        data = transform(vat_info, data_loaded)
        
        # Load data to database
        load(data, db_cursor)
    return flow

In [9]:
if __name__ == '__main__':
    flow = prefect_flow()
    flow.run(parameters={
        'rates_url': 'https://raw.githubusercontent.com/benbucksch/eu-vat-rates/ce7a777b7bbdcc94e352d816282647271f6baebf/rates.json'
    })

[2022-03-04 10:19:25-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'vat_rates_etl_pipeline'
[2022-03-04 10:19:25-0500] INFO - prefect.TaskRunner | Task 'connect_db': Starting task run...
[2022-03-04 10:19:26-0500] INFO - prefect.TaskRunner | Task 'connect_db': Finished task run for task with final state: 'Success'
[2022-03-04 10:19:26-0500] INFO - prefect.TaskRunner | Task 'rates_url': Starting task run...
[2022-03-04 10:19:26-0500] INFO - prefect.TaskRunner | Task 'rates_url': Finished task run for task with final state: 'Success'
[2022-03-04 10:19:26-0500] INFO - prefect.TaskRunner | Task 'query_db': Starting task run...
[2022-03-04 10:19:26-0500] INFO - prefect.TaskRunner | Task 'query_db': Finished task run for task with final state: 'Success'
[2022-03-04 10:19:26-0500] INFO - prefect.TaskRunner | Task 'extract': Starting task run...
[2022-03-04 10:19:26-0500] INFO - prefect.TaskRunner | Task 'extract': Finished task run for task with final state: 'Success'
[2022-03-04 1