# Question 5 Solution

## Setting Up Environment 

In [1]:
#import libraries
import asyncio
import functools
import logging
import os
from urllib.parse import quote

import pandas as pd
import pandas_ta as ta
from joblib import Parallel, delayed
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession #library to write data to the database asynchronously
from sqlalchemy.orm import sessionmaker

# Setup logging format
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Connection settings are read from the environment so that no credentials are
# stored in the notebook. Only the password is mandatory; the remaining values
# default to the local SQL Server instance and 'master' database.
DB_USER = os.environ.get("MSSQL_USER", "sa")
DB_PASSWORD = os.environ.get("MSSQL_PASSWORD")
DB_HOST = os.environ.get("MSSQL_HOST", "localhost")
DB_PORT = os.environ.get("MSSQL_PORT", "1433")
DB_NAME = os.environ.get("MSSQL_DATABASE", "master")

if not DB_PASSWORD:
    raise RuntimeError(
        "Set the MSSQL_PASSWORD environment variable before running this notebook."
    )

# Create asynchronous connection to the database.
# User and password are percent-encoded so characters such as '!', '@' or '/'
# cannot break the URL structure.
engine = create_async_engine(
    f"mssql+aioodbc://{quote(DB_USER, safe='')}:{quote(DB_PASSWORD, safe='')}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}?"
    "driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes",
    echo=False
)

# Create an asynchronous session factory for executing SQL commands
AsyncSessionLocal = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

## Asynchronous Database Insert with Logging & Data Reading

I use a `log_insert` decorator to wrap asynchronous functions, specifically the bulk insert function. The bulk insert function uses asynchronous methods so that the operation does not block the event loop, which improves performance for I/O-bound tasks.

In [2]:
# Async logging decorator: log start, completion, and failure of an async function
def log_insert(func):
    """Wrap an async callable with INFO logs around its execution.

    Logs "Starting SQL insert operation." before awaiting ``func`` and
    "Completed SQL insert operation." after it returns. If ``func`` raises,
    the failure is logged at ERROR level with its traceback and the exception
    is re-raised unchanged, so the log never ends on a dangling "Starting" line.

    Args:
        func: The coroutine function to wrap.

    Returns:
        A coroutine function with the same name/docstring as ``func`` that
        returns whatever ``func`` returns.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        logging.info("Starting SQL insert operation.")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            # Record the failure, then let the caller decide how to handle it.
            logging.exception("SQL insert operation failed.")
            raise
        logging.info("Completed SQL insert operation.")
        return result
    return wrapper

# Async bulk insert - inserts records into the metal_prices table
@log_insert
async def async_bulk_insert(session, records):
    """Insert all ``records`` into ``metal_prices`` and commit.

    Args:
        session: An open async session used to execute and commit the insert.
        records: A list of dicts, one per row, keyed by the bind-parameter
            names of the INSERT statement below (date, metal, price, the nine
            MACD columns, and rsi).

    Returns:
        None. When ``records`` is empty nothing is executed or committed.

    Raises:
        Any exception raised while executing or committing; the session is
        rolled back before the exception is re-raised.
    """
    # Executing the statement with no parameter sets would run it without any
    # bind values, so an empty batch is skipped instead.
    if not records:
        logging.warning("No records supplied; skipping SQL insert.")
        return

    insert_stmt = text("""
        INSERT INTO metal_prices (
            date, metal, price,
            macd_fast, macds_fast, macdh_fast,
            macd_med, macds_med, macdh_med,
            macd_slow, macds_slow, macdh_slow,
            rsi
        ) VALUES (
            :date, :metal, :price,
            :macd_fast, :macds_fast, :macdh_fast,
            :macd_med, :macds_med, :macdh_med,
            :macd_slow, :macds_slow, :macdh_slow,
            :rsi
        )
    """)
    try:
        # Passing a list of dicts executes the statement once per record.
        await session.execute(insert_stmt, records)
        await session.commit()
    except Exception:
        # Leave the caller's session in a clean state before propagating.
        await session.rollback()
        raise

# Async data read - retrieve and print the record count
async def async_read_data():
    """Count the rows in metal_prices, print the count, and return it."""
    count_query = text("SELECT COUNT(*) FROM metal_prices")
    async with AsyncSessionLocal() as session:
        # scalar() yields the first column of the first row, i.e. the COUNT(*) value.
        count = (await session.execute(count_query)).scalar()
        print(f"Record count: {count}")
        return count

## Processing Metal Data with Technical Indicators

I refactored the MACD calculation from my solution to Question 3 to eliminate redundancy. Previously, I computed each set of MACD values manually, which was repetitive. Now, by looping over a dictionary of MACD parameters and using the pandas_ta library, the code is more concise and easier to maintain.

In [3]:
def process_metal(metal, df, years=(2020, 2021)):
    """Build the price/indicator table for one metal.

    Args:
        metal: Name of the price column in ``df`` to process (e.g. 'COPPER').
        df: DataFrame with a datetime 'Dates' column and one column per metal.
        years: Calendar years to keep. Defaults to 2020 and 2021.

    Returns:
        A new DataFrame with columns 'Dates', 'price', 'rsi',
        'macd_<key>', 'macds_<key>', 'macdh_<key>' for each of the
        'fast', 'med' and 'slow' parameter sets, and 'metal'.
        ``df`` itself is not modified.
    """
    # Select the relevant columns and rename the metal column to 'price'
    df_temp = df[['Dates', metal]].rename(columns={metal: 'price'})

    # Filter for the requested years. The explicit .copy() makes the result an
    # independent frame so the column assignments below cannot trigger
    # SettingWithCopyWarning.
    df_temp = df_temp[df_temp['Dates'].dt.year.isin(list(years))].copy()
    df_temp['price'] = pd.to_numeric(df_temp['price'], errors='coerce')

    # NOTE(review): indicators are computed on the already-filtered window, so
    # the earliest rows have no prior history to draw on - confirm this is the
    # intended behaviour rather than computing on the full series first.
    df_temp['rsi'] = ta.rsi(df_temp['price'], length=14)

    # MACD parameter sets, keyed by the suffix used in the output column names
    macd_params = {
        'fast': {'fast': 12, 'slow': 26, 'signal': 9},
        'med':  {'fast': 19, 'slow': 39, 'signal': 9},
        'slow': {'fast': 26, 'slow': 52, 'signal': 9}
    }

    for key, params in macd_params.items():
        macd_results = ta.macd(df_temp['price'],
                               fast=params['fast'],
                               slow=params['slow'],
                               signal=params['signal'])
        # Result columns are named '<PREFIX>_<fast>_<slow>_<signal>'
        suffix = f'{params["fast"]}_{params["slow"]}_{params["signal"]}'
        for prefix in ('MACD', 'MACDs', 'MACDh'):
            # Guard against a None result (presumably returned by pandas_ta when
            # the series is too short - TODO confirm) by filling with NaN.
            if macd_results is None:
                df_temp[f'{prefix.lower()}_{key}'] = float('nan')
            else:
                df_temp[f'{prefix.lower()}_{key}'] = macd_results[f'{prefix}_{suffix}']

    # Add metal identifier column
    df_temp['metal'] = metal
    return df_temp

## Async Data Loading, Processing, and Bulk DB Operations

The main function converts the processed data to a list of dictionaries and inserts it into the database asynchronously. The code then concurrently reads the database record count 5 times using asyncio.gather.

In [4]:
def _load_market_data(csv_path):
    """Read the market-data CSV and return a frame with named columns and parsed dates."""
    raw = pd.read_csv(csv_path, header=None)
    # The first 7 rows are discarded before column names are assigned
    # (presumably file metadata/header rows - confirm against the CSV).
    market = raw.iloc[7:].reset_index(drop=True)
    market.columns = ['Dates', 'COPPER', 'ALUMINIUM', 'ZINC', 'LEAD', 'TIN', 'FUTURE']
    market['Dates'] = pd.to_datetime(market['Dates'], dayfirst=True)
    return market


async def _ensure_indicator_columns(columns):
    """Add each of ``columns`` to metal_prices as FLOAT if it does not already exist."""
    async with engine.connect() as conn:
        for col in columns:
            result = await conn.execute(
                text("""
                    SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS
                    WHERE TABLE_NAME = 'metal_prices' AND COLUMN_NAME = :col
                """), {"col": col}
            )
            if result.scalar() == 0:
                # The column name is interpolated into the DDL text, so `columns`
                # must only ever contain trusted, hard-coded identifiers.
                await conn.execute(text(f"ALTER TABLE metal_prices ADD {col} FLOAT"))
                print(f"{col} successfully added")
            else:
                print(f"{col} already exists")
        await conn.commit()


async def main(csv_path='/Users/giacomofiorani/Desktop/Hartree/OilDesk-Intern-Assessment/data/MarketData.csv'):
    """Load market data, compute indicators, and reload the metal_prices table.

    Steps: read the CSV, make sure the indicator columns exist, process COPPER
    and ZINC in parallel, replace the table contents with the new records, and
    finally read the record count five times concurrently.

    Args:
        csv_path: Location of MarketData.csv. Defaults to the original local path.
    """
    df = _load_market_data(csv_path)

    # Columns required for the technical indicators
    additional_cols = [
        'macd_fast', 'macds_fast', 'macdh_fast',
        'macd_med', 'macds_med', 'macdh_med',
        'macd_slow', 'macds_slow', 'macdh_slow',
        'rsi'
    ]
    await _ensure_indicator_columns(additional_cols)
    print("Database and table setup complete.")

    # Process both metals in parallel using Joblib's Parallel with delayed
    processed_dfs = Parallel(n_jobs=2)(delayed(process_metal)(metal, df) for metal in ['COPPER', 'ZINC'])
    final_df = pd.concat(processed_dfs, ignore_index=True).rename(columns={'Dates': 'date'})

    # Convert to a list of dicts for the bulk insert, mapping every missing
    # value (NaN/NaT) to None so it is stored as NULL.
    records = [
        {k: (None if pd.isna(v) else v) for k, v in rec.items()}
        for rec in final_df.to_dict(orient='records')
    ]

    # Delete the old rows in the same transaction as the insert: the commit in
    # async_bulk_insert makes both permanent together, and if anything fails
    # before that commit the existing data is not lost.
    async with AsyncSessionLocal() as session:
        await session.execute(text("DELETE FROM metal_prices"))
        await async_bulk_insert(session, records)
    print("All 2020 and 2021 Copper and Zinc records successfully inserted.")

    # Concurrently read from the database 5 times using asyncio.gather()
    results = await asyncio.gather(*(async_read_data() for _ in range(5)))
    print("Concurrent read results:", results)

# Entry point: run the pipeline by awaiting main(). Top-level `await` is valid here
# because Jupyter/IPython executes cells inside a running event loop; a plain .py
# script would need asyncio.run(main()) instead.
if __name__ == "__main__":
    await main()

macd_fast already exists
macds_fast already exists
macdh_fast already exists
macd_med already exists
macds_med already exists
macdh_med already exists
macd_slow already exists
macds_slow already exists
macdh_slow already exists
rsi already exists
Database and table setup complete.


2025-03-22 16:34:15,395 - INFO - Starting SQL insert operation.
2025-03-22 16:34:15,878 - INFO - Completed SQL insert operation.


All 2020 and 2021 Copper and Zinc records successfully inserted.
Record count: 1046
Record count: 1046
Record count: 1046
Record count: 1046
Record count: 1046
Concurrent read results: [1046, 1046, 1046, 1046, 1046]
