
### Question 5: Async Data Pipeline
- Modify Question 3 to write data to the database **asynchronously** .
- Read from the database 5 times *concurrantly* using **async** (hint: `asyncio.gather()`)

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sqlite3
import asyncio
import aiosqlite
from functools import wraps
from datetime import datetime

In [2]:
connection = sqlite3.connect('metal_prices.db')
cursor = connection.cursor()

In [3]:
cursor.execute("SELECT name FROM sqlite_master WHERE type = 'table';").fetchall()

[('LME_Copper_3MO',),
 ('LME_Aluminum_3MO',),
 ('LME_Zinc_3MO',),
 ('LME_Lead_3MO',),
 ('LME_Tin_3MO',),
 ('Generic_CL_Future',)]

In [8]:
data = pd.read_csv(r"C:\Users\lagan\Desktop\OilDesk-Intern-Assessment\data\MarketData.csv")
data.columns = ['Date','LME_Copper_3MO', 'LME_Aluminum_3MO', 'LME_Zinc_3MO','LME_Lead_3MO', 'LME_Tin_3MO', 'Generic_CL_Future']
data = data.drop(index = data.index[:6])
data['Date'] = pd.to_datetime(data['Date'],dayfirst = True)
## calculating over a two year period, 2020-2021 for copper and zinc
filtered_data = data[(data['Date'].dt.year >= 2020) & (data['Date'].dt.year <= 2021)][['Date', 'LME_Copper_3MO', 'LME_Zinc_3MO']]
filtered_data['LME_Copper_3MO'] = pd.to_numeric(filtered_data['LME_Copper_3MO'])
filtered_data['LME_Zinc_3MO'] = pd.to_numeric(filtered_data['LME_Zinc_3MO'])
filtered_data.head(20)

Unnamed: 0,Date,LME_Copper_3MO,LME_Zinc_3MO
2614,2020-01-01,6174.0,2272.0
2615,2020-01-02,6188.0,2310.0
2616,2020-01-03,6129.5,2306.0
2617,2020-01-06,6138.5,2324.5
2618,2020-01-07,6149.0,2346.0
2619,2020-01-08,6178.0,2403.0
2620,2020-01-09,6180.0,2377.0
2621,2020-01-10,6198.0,2378.0
2622,2020-01-13,6290.0,2378.0
2623,2020-01-14,6302.0,2373.5


In [9]:
copper_df = pd.DataFrame({
    'Date': filtered_data['Date'],
    'LME_Copper_3MO': filtered_data['LME_Copper_3MO']})
zinc_df = pd.DataFrame({
    'Date': filtered_data['Date'],
    'LME_Zinc_3MO': filtered_data['LME_Zinc_3MO']})

In [10]:
def apply_macd_strategy(df, price_column, fast = 9, medium = 12, slow = 26 , initial_capital=1000):
    ''' 
    Function that applies an MACD strategy to a df which contains time series data on last settlment prices
    By default uses the standard (12,9,26)
    
    Inputs = dataframe withh time series data, the name of the column which contains last settlment prices
    Outputs = a df with MACD metrics and a plot showing the MACD line, Signal Line and a histogram to highlight 
    crossover buy/sell points
    
    '''
    
    df['MACD'] = df[price_column].ewm(span=medium).mean() - df[price_column].ewm(span=slow).mean()
    df['Signal_Line'] = df['MACD'].ewm(span=fast).mean()
    df['Histogram'] = df['MACD'] - df['Signal_Line']

    return df[['Date', price_column, 'MACD', 'Signal_Line']]

In [11]:
def calculate_rsi(df, price_column, period=14, over_bought = 70, over_sold = 30):
    """
    Calculates the Relative Strength Index (RSI) for an asset.
    By default uses the standard period of 14 days
    By default over bought level is 70 and over sold is 30 as standard
    
    Inputs: dataframe, price column which contains last settlment price data
    Outputs: a dataframe with RSI metrics, an RSI plot with overbought and oversold lines to indicate buy and sell
    
    """
    df['Change'] = df[price_column].diff()
    df['Gain'] = np.where(df['Change']>0, df['Change'],0)
    df['Loss'] = np.where(df['Change']<0, -df['Change'],0)
    
    df['Avg Gain'] = df['Gain'].rolling(window = 14).mean()
    df['Avg Loss'] = df['Loss'].rolling(window = 14).mean()
    
    df['RS']= df['Avg Gain']/df['Avg Loss']
    df['RSI'] = 100 - (100/(1+df['RS']))
    


    return df[['Date', price_column, 'RSI']]

In [12]:
apply_macd_strategy(copper_df, 'LME_Copper_3MO')
apply_macd_strategy(zinc_df, 'LME_Zinc_3MO')

Unnamed: 0,Date,LME_Zinc_3MO,MACD,Signal_Line
2614,2020-01-01,2272.0,0.000000,0.000000
2615,2020-01-02,2310.0,0.852564,0.473647
2616,2020-01-03,2306.0,0.962296,0.673913
2617,2020-01-06,2324.5,1.656556,1.006787
2618,2020-01-07,2346.0,2.854467,1.556430
...,...,...,...,...
3132,2021-12-27,3519.0,66.849430,41.187284
3133,2021-12-28,3519.0,69.361656,46.822159
3134,2021-12-29,3513.0,70.060845,51.469896
3135,2021-12-30,3532.5,71.365785,55.449074


In [14]:
calculate_rsi(copper_df, 'LME_Copper_3MO')
calculate_rsi(zinc_df, 'LME_Zinc_3MO')

Unnamed: 0,Date,LME_Zinc_3MO,RSI
2614,2020-01-01,2272.0,
2615,2020-01-02,2310.0,
2616,2020-01-03,2306.0,
2617,2020-01-06,2324.5,
2618,2020-01-07,2346.0,
...,...,...,...
3132,2021-12-27,3519.0,75.816417
3133,2021-12-28,3519.0,71.635611
3134,2021-12-29,3513.0,70.756646
3135,2021-12-30,3532.5,70.816327


In [15]:
print(copper_df.tail())
print(zinc_df.tail())

           Date  LME_Copper_3MO       MACD  Signal_Line  Histogram  Change  \
3132 2021-12-27          9568.0  -2.500190   -20.259043  17.758853     0.0   
3133 2021-12-28          9568.0   0.815513   -16.044131  16.859645     0.0   
3134 2021-12-29          9680.5  12.378352   -10.359635  22.737987   112.5   
3135 2021-12-30          9691.5  22.173985    -3.852911  26.026896    11.0   
3136 2021-12-31          9720.5  31.909321     3.299536  28.609786    29.0   

       Gain  Loss   Avg Gain   Avg Loss        RS        RSI  
3132    0.0   0.0  38.571429  40.500000  0.952381  48.780488  
3133    0.0   0.0  34.464286  40.500000  0.850970  45.974273  
3134  112.5   0.0  42.500000  32.071429  1.325167  56.992337  
3135   11.0   0.0  43.285714  30.071429  1.439430  59.006816  
3136   29.0   0.0  45.357143  25.857143  1.754144  63.691073  
           Date  LME_Zinc_3MO       MACD  Signal_Line  Histogram  Change  \
3132 2021-12-27        3519.0  66.849430    41.187284  25.662145     0.0   
3

In [16]:
## defining the decorator to log sql insert executions
def log_execution(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print(f"Executing {func.__name__} at {datetime.now()}")
        result = func(*args, **kwargs)
        print(f"Execution completed at {datetime.now()}")
        return result
    return wrapper

In [17]:
## function that adds the desired columns to the desired tables

def alter_metal(connection, tables, cols):
    cursor = connection.cursor()

    for table in tables:
        cursor.execute(f"PRAGMA table_info({table})")
        existing_columns = [row[1] for row in cursor.fetchall()]  
        for col in cols:
            if col not in existing_columns:
                cursor.execute(f"ALTER TABLE {table} ADD COLUMN {col} REAL DEFAULT NULL")
                print(f"Added column {col} to table {table}.")
            else:
                print(f"Column {col} already exists in table {table}.")

    connection.commit()
    cursor.close()


def main():
    connection = sqlite3.connect('metal_prices.db')
    
    tables = ['LME_Zinc_3MO', 'LME_Copper_3MO']
    cols = ['MACD', 'Signal_Line', 'RSI']

    alter_metal(connection, tables, cols)

    connection.close()

main()


Column MACD already exists in table LME_Zinc_3MO.
Column Signal_Line already exists in table LME_Zinc_3MO.
Column RSI already exists in table LME_Zinc_3MO.
Column MACD already exists in table LME_Copper_3MO.
Column Signal_Line already exists in table LME_Copper_3MO.
Column RSI already exists in table LME_Copper_3MO.


In [18]:
## chekcing that the columns have been added correctly
print(cursor.execute("PRAGMA table_info('LME_Copper_3MO')").fetchall())
print(cursor.execute("PRAGMA table_info('LME_Zinc_3MO')").fetchall())

[(0, 'date', 'DATE', 0, None, 1), (1, 'price', 'REAL', 0, None, 0), (2, 'MACD', 'REAL', 0, 'NULL', 0), (3, 'Signal_Line', 'REAL', 0, 'NULL', 0), (4, 'RSI', 'REAL', 0, 'NULL', 0)]
[(0, 'date', 'DATE', 0, None, 1), (1, 'price', 'REAL', 0, None, 0), (2, 'MACD', 'REAL', 0, 'NULL', 0), (3, 'Signal_Line', 'REAL', 0, 'NULL', 0), (4, 'RSI', 'REAL', 0, 'NULL', 0)]


In [23]:
(copper_df['Date']).dtypes

dtype('<M8[ns]')

In [24]:
## executes the SQL update statement and commits the changes asynchronously
@log_execution
async def update_metal_data(connection, metal, Date, MACD=None, Signal_Line=None, RSI=None):
    if isinstance(Date, int):
        Date = datetime.utcfromtimestamp(Date)
    Date = Date.strftime('%Y-%m-%d')
    async with connection.cursor() as cursor:
        await cursor.execute(f"""
            UPDATE {metal}
            SET MACD = ?, Signal_Line = ?, RSI = ?
            WHERE Date = ?
        """, (MACD, Signal_Line, RSI, Date))
        await connection.commit()
        
## asynchronously adds to the update_tasks list, once all the rows are processed, gathers and runs concurrently
@log_execution
async def update_data_from_dataframe(connection, dataframe, metals):
    update_tasks = []

    for index, row in dataframe.iterrows():
        Date = row.name
        MACD = row['MACD']
        Signal_Line = row['Signal_Line']
        RSI = row['RSI']
        for metal in metals:
            update_tasks.append(
                update_metal_data(connection, metal, Date, MACD, Signal_Line, RSI)
            )

    await asyncio.gather(*update_tasks)

## defines the list of metals and df for each metal
## establishes an asynchronous connection to the db and creates tasks to update
## data for each metal - waits for all uodate tasks to be complete

async def main():
    metals = ['LME_Copper_3MO', 'LME_Zinc_3MO']
    dataframes = {'LME_Copper_3MO': copper_df, 'LME_Zinc_3MO': zinc_df}

    
    async with aiosqlite.connect('metal_prices.db') as connection:
        update_tasks = [
            update_data_from_dataframe(connection, df, [metal])
            for metal, df in dataframes.items()
        ]

        
        await asyncio.gather(*update_tasks)

## runs the main() function in an asychronous context
await main()


Executing update_data_from_dataframe at 2024-11-17 23:25:20.715510
Execution completed at 2024-11-17 23:25:20.715510
Executing update_data_from_dataframe at 2024-11-17 23:25:20.715510
Execution completed at 2024-11-17 23:25:20.715510
Executing update_metal_data at 2024-11-17 23:25:20.719837
Execution completed at 2024-11-17 23:25:20.719837
Executing update_metal_data at 2024-11-17 23:25:20.719837
Execution completed at 2024-11-17 23:25:20.719837
Executing update_metal_data at 2024-11-17 23:25:20.719837
Execution completed at 2024-11-17 23:25:20.719837
Executing update_metal_data at 2024-11-17 23:25:20.719837
Execution completed at 2024-11-17 23:25:20.719837
Executing update_metal_data at 2024-11-17 23:25:20.721190
Execution completed at 2024-11-17 23:25:20.721190
Executing update_metal_data at 2024-11-17 23:25:20.721190
Execution completed at 2024-11-17 23:25:20.721190
Executing update_metal_data at 2024-11-17 23:25:20.721190
Execution completed at 2024-11-17 23:25:20.721190
Executing 

Executing update_metal_data at 2024-11-17 23:25:20.973802
Execution completed at 2024-11-17 23:25:20.973802
Executing update_metal_data at 2024-11-17 23:25:20.973802
Execution completed at 2024-11-17 23:25:20.973802
Executing update_metal_data at 2024-11-17 23:25:20.975275
Execution completed at 2024-11-17 23:25:20.975275
Executing update_metal_data at 2024-11-17 23:25:20.975275
Execution completed at 2024-11-17 23:25:20.975275
Executing update_metal_data at 2024-11-17 23:25:20.975275
Execution completed at 2024-11-17 23:25:20.975275
Executing update_metal_data at 2024-11-17 23:25:20.975275
Execution completed at 2024-11-17 23:25:20.976294
Executing update_metal_data at 2024-11-17 23:25:20.976294
Execution completed at 2024-11-17 23:25:20.976294
Executing update_metal_data at 2024-11-17 23:25:20.976294
Execution completed at 2024-11-17 23:25:20.976294
Executing update_metal_data at 2024-11-17 23:25:20.976294
Execution completed at 2024-11-17 23:25:20.976294
Executing update_metal_data 

In [21]:
## reads and returns the data from the database for a specified metal between a start and end date.
@log_execution
async def read_data_from_db(connection, metal, start_date, end_date):
    async with connection.cursor() as cursor:
        result = await cursor.execute(f"""
            SELECT Date, MACD, Signal_Line, RSI
            FROM {metal}
            WHERE Date BETWEEN ? AND ?
        """, (start_date, end_date))
        rows = await result.fetchall()
        return rows
## main function the reads data for multiple metals from the database asynchronously
async def main():
    metals = ['LME_Copper_3MO', 'LME_Zinc_3MO']
    start_date = '2020-01-01'
    end_date = '2021-12-31'

    async with aiosqlite.connect('metal_prices.db') as connection:
        read_tasks = [
            read_data_from_db(connection, metals[0], start_date, end_date) for _ in range(5)
        ]
        results = await asyncio.gather(*read_tasks)

        for i, result in enumerate(results, 1):
            print(f"Read {i}: {result}")

await main()

Executing read_data_from_db at 2024-11-17 23:13:39.278813
Execution completed at 2024-11-17 23:13:39.278813
Executing read_data_from_db at 2024-11-17 23:13:39.278813
Execution completed at 2024-11-17 23:13:39.278813
Executing read_data_from_db at 2024-11-17 23:13:39.278813
Execution completed at 2024-11-17 23:13:39.278813
Executing read_data_from_db at 2024-11-17 23:13:39.278813
Execution completed at 2024-11-17 23:13:39.278813
Executing read_data_from_db at 2024-11-17 23:13:39.278813
Execution completed at 2024-11-17 23:13:39.278813
Read 1: [('2020-01-01', 0.0, 0.0, None), ('2020-01-02', 0.3141025641016313, 0.17450142450090628, None), ('2020-01-03', -1.413852487659824, -0.4764632935977536, None), ('2020-01-06', -1.8401688124531574, -0.9384231577086627, None), ('2020-01-07', -1.615263376207622, -1.1397678205177295, None), ('2020-01-08', -0.20964614667809656, -0.8876531331425146, None), ('2020-01-09', 0.7860043281398248, -0.46409507881011725, None), ('2020-01-10', 2.2970571863688747, 0.