# OilDesk-Python-Assessment: Question 4

I am using aiosqlite for this question.

In [1]:
import asyncio

import pandas as pd
import aiosqlite

In [2]:
# Relative path to the raw market data export
csv_file_path = "../data/MarketData.csv"

# The relevant price data begins in the seventh row, so the six rows before it are skipped
df = pd.read_csv(filepath_or_buffer=csv_file_path, skiprows=6)

In [3]:
# Gives every column a short, intuitive name (easier to filter on),
# then keeps only 'Date', 'Copper' and 'Zinc'
df = (
    df
    .set_axis(['Date', 'Copper', 'Aluminum', 'Zinc', 'Lead', 'Tin', 'Oil'], axis='columns')
    .loc[:, ['Date', 'Copper', 'Zinc']]
)

In [4]:
# Parses the day-first date strings (dd/mm/YYYY) into datetime objects
df = df.assign(Date=pd.to_datetime(df['Date'], format='%d/%m/%Y'))

In [5]:
# Keeps only the rows dated 2020 or 2021 and renumbers them from zero
# so the filtered data is more readable
filtered_data = (
    df
    .loc[df['Date'].dt.year.isin([2020, 2021])]
    .reset_index(drop=True)
)

In [6]:
# Returns the n-day EMA where n = span
def calculate_ema(prices, span):
    """Exponentially weighted moving average of ``prices``.

    ``span`` is passed straight to ``ewm`` and the average is computed with
    ``adjust=False``.
    """
    smoother = prices.ewm(span=span, adjust=False)
    return smoother.mean()

In [7]:
# Returns the MACD and the signal line
def calculate_macd(prices, fast_period, slow_period, signal_period):
    """Compute the MACD line and its signal line for ``prices``.

    The MACD line is the fast EMA minus the slow EMA; the signal line is the
    EMA of the MACD line over ``signal_period``. Returns ``(macd, signal)``.
    Raises AssertionError if either result is not a numeric pandas Series.
    """
    # MACD line: fast-line minus slow-line
    macd = calculate_ema(prices, fast_period) - calculate_ema(prices, slow_period)

    # Signal line: EMA of the MACD line itself
    signal = calculate_ema(macd, signal_period)

    # Sanity checks: both outputs must be pandas Series, and both must be numeric
    outputs = (("MACD", macd), ("Signal", signal))
    for label, series in outputs:
        assert isinstance(series, pd.Series), f"{label} is not a pandas Series"
    for label, series in outputs:
        assert pd.api.types.is_numeric_dtype(series), f"{label} Series is not numeric"

    return macd, signal

In [8]:
# Returns the RSI
def calculate_rsi(prices, periods=14):
    """Compute the RSI of ``prices`` from simple rolling averages.

    Gains and losses are averaged over a rolling window of ``periods``
    observations, so positions without a full window are NaN. Raises
    AssertionError if the result is not a numeric pandas Series.
    """
    # Change between each data point and the one before it
    change = prices.diff()

    # Upward moves keep their size; everything else (including the leading NaN) becomes 0
    ups = change.where(change > 0, 0).fillna(0)
    # Downward moves are sign-flipped so losses are positive magnitudes
    downs = (-change.where(change < 0, 0)).fillna(0)

    def window_mean(series):
        # Mean over the full look-back window only
        return series.rolling(window=periods, min_periods=periods).mean()

    mean_up = window_mean(ups)
    mean_down = window_mean(downs)

    # Relative strength = average gain / average loss; the small constant
    # keeps the division defined when the window has no losses
    relative_strength = mean_up / (mean_down + 1e-10)

    # Maps relative strength onto the RSI scale
    rsi = 100 - (100 / (1 + relative_strength))

    # Sanity checks on the returned value
    assert isinstance(rsi, pd.Series), "RSI is not a pandas Series"
    assert pd.api.types.is_numeric_dtype(rsi), "RSI Series is not numeric"

    return rsi

In [9]:
# Metals to iterate over, in processing order
metals = ['Copper', 'Zinc']

# Starts empty; stores the generated data
results = dict()

In [10]:
# Builds one indicator DataFrame per metal and stores it in results
for metal in metals:

    # Price series for the current metal, taken from filtered_data
    prices = filtered_data[metal]

    # Leading columns identify each row
    columns = {
        'Date': filtered_data['Date'],
        'Metal': metal,
        'Price': prices,
    }

    # Adds a rounded MACD/signal column pair for each (fast, slow) EMA pairing;
    # every pairing uses a 9-period signal line
    for speed, fast_period, slow_period in (('Fast', 5, 13), ('Medium', 12, 26), ('Slow', 19, 39)):
        macd, signal = calculate_macd(prices, fast_period, slow_period, 9)
        columns[f'MACD_{speed}'] = macd.round(2)
        columns[f'Signal_{speed}'] = signal.round(2)

    # RSI with the function's default look-back, rounded like the other indicators
    columns['RSI'] = calculate_rsi(prices).round(2)

    # Stores the assembled DataFrame in the results dictionary with metal as the key
    results[metal] = pd.DataFrame(columns)

In [11]:
# Prints out the first and last records in the corresponding DataFrame for each metal.
# The loop variable is deliberately not called `df`: that name holds the cleaned
# price data, and rebinding it here would break re-running the earlier cells.
for metal, metal_df in results.items():
    print(f"Data for {metal}:\n")
    print(f"{metal_df.head(4)}\n")
    print(f"{metal_df.tail(4)}\n")

Data for Copper:

        Date   Metal   Price  MACD_Fast  Signal_Fast  MACD_Medium  \
0 2020-01-01  Copper  6174.0       0.00         0.00         0.00   
1 2020-01-02  Copper  6188.0       2.67         0.53         1.12   
2 2020-01-03  Copper  6129.5      -7.08        -0.99        -2.69   
3 2020-01-06  Copper  6138.5     -10.60        -2.91        -4.92   

   Signal_Medium  MACD_Slow  Signal_Slow  RSI  
0           0.00       0.00         0.00  NaN  
1           0.22       0.70         0.14  NaN  
2          -0.36      -1.63        -0.21  NaN  
3          -1.27      -3.16        -0.80  NaN  

          Date   Metal   Price  MACD_Fast  Signal_Fast  MACD_Medium  \
519 2021-12-28  Copper  9568.0      28.48         9.00         0.82   
520 2021-12-29  Copper  9680.5      47.03        16.61        12.38   
521 2021-12-30  Copper  9691.5      57.48        24.78        22.17   
522 2021-12-31  Copper  9720.5      66.25        33.08        31.91   

     Signal_Medium  MACD_Slow  Signal_S

In [12]:
# Builds SQL-ready copies of the per-metal frames, with 'Date' converted to
# 'YYYY-MM-DD' strings. `assign` returns a new DataFrame, so the frames in
# `results` keep their datetime 'Date' column and this cell can be re-run safely
# (mutating them in place would make a second `.dt.strftime` call fail).
formatted_results = {
    metal_name: metal_df.assign(Date=metal_df['Date'].dt.strftime('%Y-%m-%d'))
    for metal_name, metal_df in results.items()
}

In [13]:
# Creates an empty list to store the tuples from the flattened DataFrames
data_to_insert = []
loop_count = 0

for metal, df in formatted_results.items():
    print(f"{metal} DataFrame size: {df.shape}")
    # Append flattened data as tuples to the list
    data_to_insert.extend(df[['Date', 
                              'Metal', 
                              'Price', 
                              'MACD_Fast', 
                              'Signal_Fast', 
                              'MACD_Medium', 
                              'Signal_Medium', 
                              'MACD_Slow', 
                              'Signal_Slow', 
                              'RSI']].itertuples(index=False, name=None))
    
print(f"Total records: {len(data_to_insert)}\n")    


Copper DataFrame size: (523, 10)
Zinc DataFrame size: (523, 10)
Total records: 1046



## 1. Modify Question 3 to write data to the database asynchronously

In [14]:
# Checks whether a table (e.g. metal_analysis from Question 2) exists
async def check_table_exists(db_name, table_name):
    """Return True if a table called ``table_name`` exists in the SQLite file ``db_name``.

    Looks the name up in ``sqlite_master`` using a bound parameter, so the
    table name is never interpolated into the SQL text.
    """
    lookup_sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;"
    async with aiosqlite.connect(db_name) as db:
        # The cursor context manager closes the cursor even if the fetch raises
        async with db.execute(lookup_sql, (table_name,)) as cursor:
            match = await cursor.fetchone()
    return match is not None

table_exists = await check_table_exists('metal_prices.db', 'metal_analysis')
if table_exists:
    print("Table exists.")
else:
    print("Table does not exist.")

Table exists.


In [15]:
# Deletes all records from a table (used here for metal_analysis)
async def clear_table_data(db_name, table_name):
    """Delete every row of ``table_name`` in the SQLite file ``db_name`` and commit.

    NOTE: ``table_name`` is interpolated directly into the SQL text, so it
    must come from trusted code, never from user input.
    """
    delete_sql = f'DELETE FROM {table_name}'
    async with aiosqlite.connect(db_name) as conn:
        await conn.execute(delete_sql)
        await conn.commit()
        
await clear_table_data('metal_prices.db', 'metal_analysis')

In [16]:
# Counts the records in a table (used here for metal_analysis)
async def read_from_table(db_name, table_name):
    """Return the number of rows in ``table_name`` of the SQLite file ``db_name``.

    NOTE: ``table_name`` is interpolated directly into the SQL text, so it
    must come from trusted code, never from user input.
    """
    async with aiosqlite.connect(db_name) as db:
        # The cursor context manager closes the cursor even if the fetch raises
        async with db.execute(f'SELECT COUNT(*) FROM {table_name}') as cursor:
            count_row = await cursor.fetchone()
    # COUNT(*) yields a single row whose first field is the count
    return count_row[0]

rows = await read_from_table('metal_prices.db', 'metal_analysis')
if not rows:
    print("The table is empty.")
else:
    print(f"The table is not empty. It contains {rows} records.")

The table is empty.


In [17]:
# Inserts each record in a list of tuples into metal_analysis
async def write_to_db(db_name, data):
    """Bulk-insert ``data`` into the ``metal_analysis`` table of ``db_name`` and commit.

    ``data`` supplies one parameter tuple per row; each tuple provides the ten
    values in the column order listed in the INSERT statement below.
    """
    async with aiosqlite.connect(db_name) as conn:
        # Issued on this connection before the inserts run
        await conn.execute('PRAGMA foreign_keys = ON')
        insert_sql = '''
        INSERT INTO metal_analysis (Date, Metal, Price, MACD_Fast, Signal_Fast, MACD_Medium, Signal_Medium, MACD_Slow, Signal_Slow, RSI) 
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        '''
        # All rows are inserted in one call and committed together
        await conn.executemany(insert_sql, data)
        await conn.commit()

await write_to_db('metal_prices.db', data_to_insert)

## 2. Read from the database 5 times concurrently using async (hint: asyncio.gather())

In [18]:
# Fetches all records from a table
async def fetch_data(db_name, table_name):
    """Return every row of ``table_name`` from the SQLite file ``db_name``.

    Each call opens its own connection, so several calls can run concurrently.

    NOTE: ``table_name`` is interpolated directly into the SQL text, so it
    must come from trusted code, never from user input.
    """
    async with aiosqlite.connect(db_name) as db:
        # The cursor context manager closes the cursor even if the fetch raises
        async with db.execute(f'SELECT * FROM {table_name}') as cursor:
            return await cursor.fetchall()

In [19]:
# Runs several full-table reads concurrently (five by default)
async def perform_concurrent_reads(db_name, table_name, num_reads=5):
    """Read ``table_name`` from ``db_name`` ``num_reads`` times concurrently.

    Schedules ``num_reads`` independent ``fetch_data`` calls with
    ``asyncio.gather`` and returns their results as a list, one entry per read,
    in the order the reads were created. ``num_reads`` defaults to 5, which
    matches the original behaviour.
    """
    reads = (fetch_data(db_name, table_name) for _ in range(num_reads))
    return await asyncio.gather(*reads)

In [20]:
concurrent_results = await perform_concurrent_reads('metal_prices.db', 'metal_analysis')

headers = ['Date', 'Metal', 'Price', 'MACD_Fast', 'Signal_Fast', 'MACD_Medium', 'Signal_Medium', 'MACD_Slow', 'Signal_Slow', 'RSI']
print(" | ".join(headers))
print("\n")

for result_set in concurrent_results:
    print("Concurrent Read Output:\n")
    for row in result_set:
        formatted_row = " ".join(f"{value}" for value in row)
        print(formatted_row)
    print()  

Date | Metal | Price | MACD_Fast | Signal_Fast | MACD_Medium | Signal_Medium | MACD_Slow | Signal_Slow | RSI


Concurrent Read Output:

2020-01-01 Copper 6174.0 0.0 0.0 0.0 0.0 0.0 0.0 None
2020-01-02 Copper 6188.0 2.67 0.53 1.12 0.22 0.7 0.14 None
2020-01-03 Copper 6129.5 -7.08 -0.99 -2.69 -0.36 -1.63 -0.21 None
2020-01-06 Copper 6138.5 -10.6 -2.91 -4.92 -1.27 -3.16 -0.8 None
2020-01-07 Copper 6149.0 -10.1 -4.35 -5.77 -2.17 -3.93 -1.43 None
2020-01-08 Copper 6178.0 -3.82 -4.24 -4.07 -2.55 -3.12 -1.77 None
2020-01-09 Copper 6180.0 0.34 -3.33 -2.52 -2.54 -2.32 -1.88 None
2020-01-10 Copper 6198.0 6.13 -1.44 0.15 -2.0 -0.71 -1.65 None
2020-01-13 Copper 6290.0 26.67 4.19 9.59 0.31 5.26 -0.26 None
2020-01-14 Copper 6302.0 39.42 11.23 17.83 3.82 10.94 1.98 None
2020-01-15 Copper 6287.0 41.97 17.38 22.88 7.63 15.0 4.58 None
2020-01-16 Copper 6277.5 39.62 21.83 25.82 11.27 17.91 7.25 None
2020-01-17 Copper 6273.0 35.54 24.57 27.48 14.51 20.09 9.82 None
2020-01-20 Copper 6259.0 28.84 25.42 27.3