# Question 5: Async Data Pipeline
Modify Question 3 to write data to the database asynchronously. \
Read from the database 5 times concurrently using async (hint: asyncio.gather())

## Solution

For this question I adapted the code from Q3 by connecting to the database with aiosqlite. I wrote two functions that interact with the database asynchronously:
- 'update_metal_indicators' to update the MACD and RSI indicators.
- 'read_data' to read all the data for one metal, given a time range, and return it as a pd.DataFrame.

For the sake of this exercise I read the same data 5 times, which returns five identical dataframes.

In [72]:
import asyncio
import functools
import inspect
import logging
import os
import time
from datetime import timedelta

import aiosqlite
import numpy as np
import pandas as pd

logging.basicConfig(level=logging.INFO)

### Functions to calculate the indicators ###


def calculate_MACD(
    time_data: pd.Series, short_window: int = 12, long_window: int = 26
) -> pd.Series:
    """Compute the MACD line (short EMA minus long EMA) of a time series.
    Both exponential moving averages use the recursive form (adjust=False).
    INPUTS:
    - time_data: pd.Series, the time series data
    - short_window, long_window: int, the spans of the short and long EMA
    OUTPUTS:
    - macd: pd.Series, EMA(short_window) - EMA(long_window), aligned with time_data
    """

    def ema(span: int) -> pd.Series:
        # Exponential moving average of the input for the given span
        return time_data.ewm(span=span, adjust=False).mean()

    return ema(short_window) - ema(long_window)


def calculate_RSI(time_data: pd.Series, window: int = 14) -> pd.Series:
    """Compute the Relative Strength Index (RSI) of a time series.
    The Relative Strength (RS) is the ratio between the exponentially weighted
    average gain and the exponentially weighted average loss, both smoothed
    with com=window (i.e. a smoothing factor alpha = 1 / (1 + window)).
    INPUTS:
    - time_data: pd.Series, the time series data
    - window: int, the window period for RSI
    OUTPUTS:
    - rsi: pd.Series, 100 - 100 / (1 + RS); the first value is NaN
    """
    changes = time_data.diff()
    # Split the step-to-step changes into upward and (sign-flipped) downward moves;
    # anything that is not a gain/loss, including the leading NaN, counts as 0
    upward = changes.where(changes > 0, other=0)
    downward = -changes.where(changes < 0, other=0)
    smoothed_up, smoothed_down = (
        moves.ewm(com=window).mean() for moves in (upward, downward)
    )
    relative_strength = smoothed_up / smoothed_down
    return 100 - 100 / (1 + relative_strength)


### Functions to handle the database ###


# Log templates for the functions that get a tailored message. The placeholders
# are filled from the wrapped call's arguments, looked up by parameter name.
_START_MESSAGES = {
    "update_metal_indicators": (
        "Updating metal indicators for {metal} from {start_date} to {end_date}."
    ),
    "read_data": "Reading data from {start_date} to {end_date} for {metal}.",
}
_END_MESSAGES = {
    "update_metal_indicators": (
        "Finished updating metal indicators for {metal} in {elapsed:.2f} seconds."
    ),
    "read_data": (
        "Finished reading data for {metal} between {start_date} and {end_date} "
        "in {elapsed:.2f} seconds."
    ),
}


def _render_message(template, params, **extra):
    """Fill `template` from `params` (+ `extra`); return None if that is not possible."""
    if template is None or params is None:
        return None
    try:
        return template.format(**{**params, **extra})
    except (KeyError, IndexError):
        # The wrapped function does not expose the parameter names the template needs
        return None


def log_execution(func):
    """Decorator logging the start, the end and the duration of an async function.
    Functions named 'update_metal_indicators' and 'read_data' get a tailored
    message built from their 'metal', 'start_date' and 'end_date' arguments,
    whether these are passed positionally or by keyword. Any other function
    (or one whose arguments cannot be resolved) gets a generic message.
    INPUTs:
    - func: coroutine function to wrap
    OUTPUT:
    - wrapper: coroutine function with the same signature and return value as func.
      Exceptions raised by func propagate unchanged (no 'Finished' message is logged).
    """
    name = func.__name__
    try:
        signature = inspect.signature(func)
    except (TypeError, ValueError):
        # Some callables expose no signature; fall back to the generic messages
        signature = None

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Resolve the arguments by name so positional and keyword calls log alike
        params = None
        if signature is not None and name in _START_MESSAGES:
            try:
                bound = signature.bind(*args, **kwargs)
            except TypeError:
                # Invalid call: let func itself raise the real error below
                pass
            else:
                bound.apply_defaults()
                params = bound.arguments

        logging.info(
            _render_message(_START_MESSAGES.get(name), params)
            or f"Running function {name} with arguments: {args} and keyword arguments: {kwargs}."
        )
        # perf_counter is monotonic, so the duration is immune to system clock changes
        start_execution = time.perf_counter()
        # Await the function call, otherwise it will return a coroutine and not run the function
        result = await func(*args, **kwargs)
        elapsed = time.perf_counter() - start_execution
        logging.info(
            _render_message(_END_MESSAGES.get(name), params, elapsed=elapsed)
            or f"Finished execution of {name} in {elapsed:.2f} seconds."
        )
        return result

    return wrapper


@log_execution
async def update_metal_indicators(
    df: pd.DataFrame, metal: str, start_date: pd.Timestamp, end_date: pd.Timestamp
) -> None:
    """Update the MACD and RSI indicators for a given metal in the database asynchronously.
    Any error (connection failure, missing columns, ...) is logged and swallowed,
    so the function never raises; nothing is committed in that case.
    INPUTs:
    - df: pd.DataFrame, with columns 'Dates', f'MACD_{metal}' and f'RSI_{metal}'
    - metal: str, the metal to update
    - start_date, end_date: pd.Timestamp, the (inclusive) start and end dates of the period to update
    """
    try:
        # The context managers close the cursor and the connection on every path,
        # including when connect() itself fails, so no explicit cleanup is needed
        async with aiosqlite.connect(f"{os.pardir}/market_data.db") as conn:
            async with conn.cursor() as cur:
                # Check the columns exist in the database
                await cur.execute("PRAGMA table_info(MetalPrices);")
                column_names = {column[1] for column in await cur.fetchall()}
                if not {"MACD", "RSI"} <= column_names:
                    raise ValueError(
                        "The database table MetalPrices does not have MACD or RSI columns."
                    )

                # Select only dates of interest
                df_filtered = df[
                    (df["Dates"] >= start_date) & (df["Dates"] <= end_date)
                ]

                # One parameter tuple per row; dates are stored in ISO format (YYYY-MM-DD)
                updates = [
                    (macd, rsi, date.strftime("%Y-%m-%d"), metal)
                    for date, macd, rsi in zip(
                        df_filtered["Dates"],
                        df_filtered[f"MACD_{metal}"],
                        df_filtered[f"RSI_{metal}"],
                    )
                ]

                # Send all the updates in a single call instead of one await per row
                await cur.executemany(
                    """
                    UPDATE MetalPrices
                    SET MACD = ?, RSI = ?
                    WHERE Date = ? AND Metal = ?;
                    """,
                    updates,
                )

                await conn.commit()
    except Exception as e:
        logging.error(f"Error updating metal indicators: {e}")


@log_execution
async def read_data(
    start_date: str, end_date: str, metal: str
) -> "pd.DataFrame | None":
    """From start_date to end_date, returns the data from the database for specified metal.
    INPUTs:
    - start_date, end_date: str, YYYY-MM-DD (both bounds are inclusive)
    - metal: str, name of the metal (e.g. 'COPPER')
    OUTPUT:
    - df: DataFrame with every column of the MetalPrices table (empty if no row
      matches, in which case a warning is logged), or None if an error occurred
      (the error is logged, not raised)
    """
    query = """
        SELECT * FROM MetalPrices
        WHERE Date BETWEEN ? AND ? AND Metal = ?;
    """
    try:
        # The context managers close the cursor and the connection on every path,
        # including when connect() itself fails, so no explicit cleanup is needed
        async with aiosqlite.connect(f"{os.pardir}/market_data.db") as conn:
            async with conn.cursor() as cur:
                # Get the data matching the date range and metal
                await cur.execute(query, (start_date, end_date, metal))
                rows = await cur.fetchall()
                # Column names of the result set itself, so labels always match the rows
                column_names = [column[0] for column in cur.description]

        if not rows:
            logging.warning(
                f"No data found for {metal} between {start_date} and {end_date}."
            )

        # Convert to DataFrame
        return pd.DataFrame(rows, columns=column_names)
    except Exception as e:
        logging.error(f"Error {e} occurred while reading from the database")
        return None


### Function to get the data from the CSV file ###


def get_metal_data(
    data_path: str,
    metals: list[str],
    start_date: pd.Timestamp,
    end_date: pd.Timestamp,
    padding: int,
) -> pd.DataFrame:
    """Get metals prices from the CSV file for a given period + initial padding.
    INPUTs:
    - data_path: str, path to the CSV file
    - metals: list of str, metals to select (in any order)
    - start_date: pd.Timestamp, start date of the period
    - end_date: pd.Timestamp, end date of the period (inclusive)
    - padding: int, number of days to pad the start date for later calculations
    OUTPUTs:
    - df: pd.DataFrame, with a 'Dates' column followed by one price column per
      requested metal, in the order given in `metals`
    RAISES:
    - ValueError: if a requested metal is missing from (or ambiguous in) the file,
      or if the selected data contains NaN values
    """
    # Load the data
    raw_df = pd.read_csv(data_path)
    # Metal of each price column: second word of the labels found in row 2
    # (first and last columns excluded). NOTE(review): layout taken from the
    # existing parsing logic - confirm against the CSV if its format changes.
    csv_metals = [label.split(" ")[1] for label in raw_df.iloc[2, 1:-1].values]

    # Drop repeated names while keeping the caller's order
    requested = list(dict.fromkeys(metals))
    missing = [metal for metal in requested if metal not in csv_metals]
    if missing:
        raise ValueError(f"Metals {missing} were not found in {data_path}.")
    # Names of the selected columns in the order they appear in the file
    selected = [name for name in csv_metals if name in requested]
    if len(selected) != len(requested):
        raise ValueError(
            f"Some of the metals {requested} match more than one column in {data_path}."
        )

    # Get the mask of the columns to select, the first column is the date
    mask_columns = np.concatenate(([True], np.isin(csv_metals, requested), [False]))
    # Select only columns in the mask, also the first 6 rows are headers
    df = raw_df.iloc[6:, mask_columns].copy()
    # Label the columns in FILE order (labelling them in the caller's order would
    # swap the price series whenever the two orders differ), then reorder
    df.columns = ["Dates", *selected]
    df = df[["Dates", *requested]].copy()

    # Ensure the data types are correct. the format dd/mm/yyyy works better for pandas, but needs to be converted to yyyy-mm-dd in the database
    df["Dates"] = pd.to_datetime(df["Dates"], format="%d/%m/%Y", errors="coerce")
    for metal in requested:
        df[metal] = pd.to_numeric(df[metal], errors="coerce")

    # Select only the indicated period with a padding at the beginning
    cutoff_date = start_date - timedelta(days=padding)
    df = df[(df["Dates"] > cutoff_date) & (df["Dates"] <= end_date)]

    # Check the data doesn't have any missing values, otherwise it needs addressing
    if df.isna().any().any():
        raise ValueError(
            "There are NaN values in the data. Please check the data and try again."
        )
    return df

In [73]:
### Parameters ###

data_path = f"{os.pardir}/data/MarketData.csv"
# Metals to select
metals = ["COPPER", "ZINC"]
# Use a padding to calculate the EMA and RSI without a starting bias
padding = 50
# Select only 2020 and 2021
start_date = pd.to_datetime("01/01/2020", format="%d/%m/%Y")
end_date = pd.to_datetime("31/12/2021", format="%d/%m/%Y")


# Get the data
df = get_metal_data(data_path, metals, start_date, end_date, padding)

# Calculate the indicators
for metal in metals:
    df[f"MACD_{metal}"] = calculate_MACD(df[metal])
    df[f"RSI_{metal}"] = calculate_RSI(df[metal])

# Update the database concurrently
tasks = [update_metal_indicators(df, metal, start_date, end_date) for metal in metals]
await asyncio.gather(*tasks)

# Read the data from the database concurrently
tasks = [
    read_data(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"), metals[0])
    for _ in range(5)
]
all_data = await asyncio.gather(*tasks)

INFO:root:Updating metal indicators for COPPER from 2020-01-01 00:00:00 to 2021-12-31 00:00:00.
INFO:root:Updating metal indicators for ZINC from 2020-01-01 00:00:00 to 2021-12-31 00:00:00.
INFO:root:Finished updating metal indicators for COPPER in 0.03 seconds.
INFO:root:Finished updating metal indicators for ZINC in 0.07 seconds.
INFO:root:Reading data from 2020-01-01 to 2021-12-31 for COPPER.
INFO:root:Reading data from 2020-01-01 to 2021-12-31 for COPPER.
INFO:root:Reading data from 2020-01-01 to 2021-12-31 for COPPER.
INFO:root:Reading data from 2020-01-01 to 2021-12-31 for COPPER.
INFO:root:Reading data from 2020-01-01 to 2021-12-31 for COPPER.
INFO:root:Finished reading data for COPPER between 2020-01-01 and 2021-12-31 in 0.03 seconds.
INFO:root:Finished reading data for COPPER between 2020-01-01 and 2021-12-31 in 0.03 seconds.
INFO:root:Finished reading data for COPPER between 2020-01-01 and 2021-12-31 in 0.03 seconds.
INFO:root:Finished reading data for COPPER between 2020-01-