Question 3: Data Pipeline and Transformation

Objective: Show understanding of creating data pipelines and transformations.

Task:

Using the CSV file from Question 1, filter the data to include only 'Copper' and 'Zinc' for the years 2020 and 2021.
Demonstrate the use of asynchronous programming to calculate MACD (slow/medium/fast) and RSI for each metal historically.
Use SQL inserts to populate the SQL table created in Question 2 with this generated data.
Demonstrate the use of a context manager to handle the connection to the database.
Demonstrate the use of a decorator to log the execution of the asynchronous SQL inserts.

In [1]:
import pandas as pd
import numpy as np
import asyncio
from time import perf_counter
import talib as ta  # TA-Lib for technical indicators
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float
from sqlalchemy.ext.asyncio import create_async_engine
import functools

In [2]:
# Load the CSV file, skipping the first 6 lines of the file before the header row
df = pd.read_csv("../data/MarketData.csv", skiprows=6)

# Rename the columns holding the copper and zinc prices (returns a new frame, no inplace mutation)
df = df.rename(columns={'PX_SETTLE': 'Copper', 'PX_SETTLE.2': 'Zinc'})
# Keep only these columns; .copy() makes df an independent frame rather than a slice,
# so the column assignments below and in later cells do not hit chained-assignment warnings
df = df[['Dates', 'Copper', 'Zinc']].copy()
# Convert the dates column to datetime type (source dates are day-first)
df['Dates'] = pd.to_datetime(df['Dates'], dayfirst=True)
# Keep only rows from 2020 and 2021; copy again because indicator columns are added to df later
df = df[df['Dates'].dt.year.isin([2020, 2021])].copy()


In [3]:
# Define a function to calculate MACD and RSI for a given metal's data
def calculate_technical_indicators(metal_data):
    """Compute the MACD line and the RSI for one metal's price data.

    Both indicators are computed by TA-Lib with no extra arguments, so the
    library's default parameters apply.

    Args:
        metal_data: Price data for a single metal, passed straight to TA-Lib.

    Returns:
        A ``(macd, rsi)`` tuple. ``ta.MACD`` yields three outputs (MACD line,
        signal line, histogram); only the MACD line is returned.
    """
    # The signal line and histogram are unpacked but intentionally unused
    macd_line, _signal_line, _histogram = ta.MACD(metal_data)
    return macd_line, ta.RSI(metal_data)

def add_technical_indicators_col(df, col_name):
    """Add ``<col_name>_MACD`` and ``<col_name>_RSI`` columns to ``df`` in place.

    Args:
        df: Frame containing a ``col_name`` price column; it is mutated.
        col_name: Name of the price column to derive the indicators from.
    """
    macd_values, rsi_values = calculate_technical_indicators(df[col_name])
    df[col_name+"_MACD"] = macd_values
    df[col_name+"_RSI"] = rsi_values

In [4]:
def sync_main():
    """Synchronously add the MACD and RSI columns for Copper, then Zinc, to the global ``df``."""
    for metal in ('Copper', 'Zinc'):
        add_technical_indicators_col(df, metal)

In [5]:
# Define a function to calculate MACD and RSI for a given metal's data asynchronously
async def calculate_technical_indicators_async(metal_data):
    """Compute the MACD line and the RSI for one metal without blocking the event loop.

    Each TA-Lib call is pushed to a worker thread with ``asyncio.to_thread``.
    The two calculations are independent, so they are awaited together with
    ``asyncio.gather`` instead of one after the other.

    Args:
        metal_data: Price data for a single metal, passed straight to TA-Lib.

    Returns:
        A ``(macd, rsi)`` tuple. ``ta.MACD`` yields three outputs (MACD line,
        signal line, histogram); only the MACD line is returned.
    """
    macd_outputs, rsi = await asyncio.gather(
        asyncio.to_thread(ta.MACD, metal_data),
        asyncio.to_thread(ta.RSI, metal_data),
    )
    # Unpack to keep only the MACD line; the signal line and histogram are unused
    macd, _macd_signal, _macd_hist = macd_outputs
    return macd, rsi

# Modify the add_technical_indicators_col function to be asynchronous
async def add_technical_indicators_col_async(df, col_name):
    """Asynchronously add ``<col_name>_MACD`` and ``<col_name>_RSI`` columns to ``df`` in place.

    Args:
        df: Frame containing a ``col_name`` price column; it is mutated.
        col_name: Name of the price column to derive the indicators from.
    """
    indicators = await calculate_technical_indicators_async(df[col_name])
    # indicators is the (macd, rsi) pair; assign MACD first, then RSI
    df[col_name+"_MACD"], df[col_name+"_RSI"] = indicators

In [6]:
# Define the async entry point that computes the indicators for both metals concurrently
async def async_main():
    """Add the MACD and RSI columns for Copper and Zinc to the global ``df`` concurrently."""
    # One coroutine per metal, awaited together so neither waits for the other to finish
    await asyncio.gather(
        *[add_technical_indicators_col_async(df, metal) for metal in ('Copper', 'Zinc')]
    )

# Top-level await (supported in IPython/Jupyter cells): run the concurrent indicator calculation
await async_main()
# Display the frame, which now carries the *_MACD and *_RSI columns
df

Unnamed: 0,Dates,Copper,Zinc,Copper_MACD,Copper_RSI,Zinc_MACD,Zinc_RSI
2608,2020-01-01,6174.0,2272.0,,,,
2609,2020-01-02,6188.0,2310.0,,,,
2610,2020-01-03,6129.5,2306.0,,,,
2611,2020-01-06,6138.5,2324.5,,,,
2612,2020-01-07,6149.0,2346.0,,,,
...,...,...,...,...,...,...,...
3126,2021-12-27,9568.0,3519.0,-2.500190,51.692406,66.849430,66.260897
3127,2021-12-28,9568.0,3519.0,0.815513,51.692406,69.361656,66.260897
3128,2021-12-29,9680.5,3513.0,12.378352,56.618811,70.060845,65.418907
3129,2021-12-30,9691.5,3532.5,22.173985,57.079706,71.365785,66.891416


In [7]:
# Define the SQLAlchemy async database connection (SQLite through the aiosqlite driver)
database_uri = "sqlite+aiosqlite:///../data/my_database.db"
engine = create_async_engine(database_uri, echo=False)

metadata = MetaData()

# A separate table holds the indicator data, because it is unclear how it should be
# stored in the MetalPrices table from Question 2.
technical_indicators = Table(
    'TechnicalIndicators',
    metadata,
    Column('ID', Integer, primary_key=True),
    Column('Date', String),  # dates are stored as text; adjust the type to match your date format
    # The two price columns and the four indicator columns are all plain floats
    *[
        Column(float_column, Float)
        for float_column in ('Copper', 'Zinc', 'Copper_MACD', 'Copper_RSI', 'Zinc_MACD', 'Zinc_RSI')
    ],
)

# Reflect the existing 'MetalPrices' table from the database
#metal_prices = Table('MetalPrices', metadata, autoload=True, autoload_with=engine)

# Create the table in the database
async def create_table():
    """Run ``metadata.create_all`` against the database inside a transaction.

    ``create_all`` is a synchronous SQLAlchemy API, so it is executed through
    ``run_sync`` on a connection obtained from the async ``engine``.
    """
    create_all_tables = metadata.create_all
    async with engine.begin() as connection:
        await connection.run_sync(create_all_tables)
        
# reflect the metalPrices table that was created in question 2 from the database
async def reflect_table():
    """Reflect the existing ``MetalPrices`` table into ``metadata`` and return it.

    Reflection is a synchronous SQLAlchemy API, so it is executed through
    ``run_sync`` on a connection obtained from the async ``engine``.

    Returns:
        The reflected ``MetalPrices`` table object registered on ``metadata``.
    """
    async with engine.begin() as conn:
        await conn.run_sync(metadata.reflect, only=["MetalPrices"])
    # After reflect() the table is registered on metadata, so look it up directly.
    # Building Table(..., autoload_with=engine) is avoided because `engine` is an
    # async engine and cannot be used for synchronous reflection.
    return metadata.tables["MetalPrices"]

# Run create_table() first, then bind the reflected MetalPrices table
# to `metal_prices` for the insert helpers defined in later cells
await create_table()
metal_prices = await reflect_table()

In [8]:
def insert_data(sync_engine=None):
    """Synchronously insert every row of the global ``df`` into ``technical_indicators``.

    Rows are inserted one statement at a time and the elapsed wall-clock time
    is printed, giving a blocking baseline for the async insert path.

    Args:
        sync_engine: Optional synchronous SQLAlchemy engine to use. When
            omitted, one is created from ``database_uri`` with the
            ``+aiosqlite`` driver suffix removed, because the module-level
            ``engine`` is an async engine and cannot be used in a plain
            ``with`` block. An engine created here is disposed before returning.
    """
    owns_engine = sync_engine is None
    if owns_engine:
        sync_engine = create_engine(database_uri.replace("+aiosqlite", ""), echo=False)

    time_before = perf_counter()
    try:
        # begin() commits when the block exits cleanly and rolls back on error,
        # so the inserted rows are actually persisted
        with sync_engine.begin() as conn:
            for _, row in df.iterrows():
                insert_query = technical_indicators.insert().values(
                    Date=str(row['Dates']), Copper=row['Copper'], Zinc=row['Zinc'],
                    Copper_MACD=row['Copper_MACD'], Copper_RSI=row['Copper_RSI'],
                    Zinc_MACD=row['Zinc_MACD'], Zinc_RSI=row['Zinc_RSI'])
                conn.execute(insert_query)
    finally:
        if owns_engine:
            sync_engine.dispose()
    print("sync time taken: ",perf_counter()-time_before)

In [9]:
# Define the decorator to help log sql inserts
def log_insert(func):
    """Decorate an async insert coroutine so every call is logged to stdout.

    The wrapped coroutine must receive the row being inserted either as its
    first positional argument or as the ``row`` keyword argument. On success a
    "Row inserted" line with the row's values is printed; on failure an
    "Error inserting row" line is printed and the original exception is
    re-raised unchanged.

    Args:
        func: The async insert function to wrap.

    Returns:
        An async wrapper with the same signature and return value as ``func``.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Accept the row positionally or by keyword
        row = args[0] if args else kwargs["row"]
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            print(f"Error inserting row: Date={row['Dates']}, Error: {str(e)}")
            # Bare raise keeps the original traceback intact
            raise
        # Logged outside the try block so a logging failure is never reported
        # as a failed insert
        print(f"Row inserted: Date={row['Dates']}, Copper={row['Copper']}, Zinc={row['Zinc']}, "
              f"Copper_MACD={row['Copper_MACD']}, Copper_RSI={row['Copper_RSI']}, "
              f"Zinc_MACD={row['Zinc_MACD']}, Zinc_RSI={row['Zinc_RSI']}")
        return result
    return wrapper


async def insert_data_async():
    """Insert every row of the global ``df`` into the indicator table asynchronously.

    One ``insert_row_async`` coroutine is created per row and all of them are
    awaited together on a single connection. ``engine.begin()`` is used as the
    context manager so the transaction is committed when the block exits
    cleanly and rolled back if any insert raises. The elapsed wall-clock time
    is printed afterwards.
    """
    time_before = perf_counter()
    async with engine.begin() as conn:
        # NOTE(review): every coroutine shares this one connection; confirm the
        # database driver tolerates overlapping execute() calls.
        await asyncio.gather(*(insert_row_async(row, conn) for _, row in df.iterrows()))
    # Labelled "async" so it is not confused with the synchronous baseline timing
    print("async time taken: ",perf_counter()-time_before)

@log_insert
async def insert_row_async(row, conn):
    """Insert one ``df`` row into the ``technical_indicators`` table.

    Args:
        row: Mapping-like row providing 'Dates', both prices and the four indicator values.
        conn: Async connection on which the INSERT is executed.
    """
    value_columns = ('Copper', 'Zinc', 'Copper_MACD', 'Copper_RSI', 'Zinc_MACD', 'Zinc_RSI')
    # The date is stored as its string form; every other column is copied across as-is
    statement = technical_indicators.insert().values(
        Date=str(row['Dates']),
        **{column: row[column] for column in value_columns},
    )
    await conn.execute(statement)


async def insert_data_async_metal_prices():
    """Insert the prices and indicators of the global ``df`` into ``MetalPrices`` asynchronously.

    For each of the six value columns, one ``insert_row_async_metal_prices``
    coroutine is created per row (all Copper rows first, then Zinc, then the
    indicator columns) and all of them are awaited together on a single
    connection. ``engine.begin()`` commits when the block exits cleanly and
    rolls back if any insert raises. The elapsed wall-clock time is printed
    afterwards.
    """
    time_before = perf_counter()
    price_columns = ('Copper', 'Zinc', 'Copper_MACD', 'Copper_RSI', 'Zinc_MACD', 'Zinc_RSI')
    async with engine.begin() as conn:
        # NOTE(review): every coroutine shares this one connection; confirm the
        # database driver tolerates overlapping execute() calls.
        await asyncio.gather(*(
            insert_row_async_metal_prices(row, price_col, conn)
            for price_col in price_columns
            for _, row in df.iterrows()
        ))
    # Labelled "async" so it is not confused with the synchronous baseline timing
    print("async time taken: ",perf_counter()-time_before)
    

@log_insert
async def insert_row_async_metal_prices(row, price_col, conn):
    """Insert a single value of one ``df`` row into the ``metal_prices`` table.

    Args:
        row: Mapping-like row providing 'Dates' and the ``price_col`` value.
        price_col: Column of ``row`` to store; also written as the 'Metal' label.
        conn: Async connection on which the INSERT is executed.
    """
    statement = metal_prices.insert().values(
        **{"Date": str(row['Dates']), "Metal": price_col, "Price": row[price_col]}
    )
    await conn.execute(statement)
    

In [10]:
# Run both async insert pipelines; the log_insert decorator prints one line per inserted row
await insert_data_async()  # insert the data into the TechnicalIndicators table
await insert_data_async_metal_prices()  # insert the data into the MetalPrices table from Question 2

Row inserted: Date=2020-01-01 00:00:00, Copper=6174.0, Zinc=2272.0, Copper_MACD=nan, Copper_RSI=nan, Zinc_MACD=nan, Zinc_RSI=nan
Row inserted: Date=2020-01-02 00:00:00, Copper=6188.0, Zinc=2310.0, Copper_MACD=nan, Copper_RSI=nan, Zinc_MACD=nan, Zinc_RSI=nan
Row inserted: Date=2020-01-03 00:00:00, Copper=6129.5, Zinc=2306.0, Copper_MACD=nan, Copper_RSI=nan, Zinc_MACD=nan, Zinc_RSI=nan
Row inserted: Date=2020-01-06 00:00:00, Copper=6138.5, Zinc=2324.5, Copper_MACD=nan, Copper_RSI=nan, Zinc_MACD=nan, Zinc_RSI=nan
Row inserted: Date=2020-01-07 00:00:00, Copper=6149.0, Zinc=2346.0, Copper_MACD=nan, Copper_RSI=nan, Zinc_MACD=nan, Zinc_RSI=nan
Row inserted: Date=2020-01-08 00:00:00, Copper=6178.0, Zinc=2403.0, Copper_MACD=nan, Copper_RSI=nan, Zinc_MACD=nan, Zinc_RSI=nan
Row inserted: Date=2020-01-09 00:00:00, Copper=6180.0, Zinc=2377.0, Copper_MACD=nan, Copper_RSI=nan, Zinc_MACD=nan, Zinc_RSI=nan
Row inserted: Date=2020-01-10 00:00:00, Copper=6198.0, Zinc=2378.0, Copper_MACD=nan, Copper_RSI=n