In [1]:
"""The unit test will validate each of the methods in the class repositories
    AlphaVantageApi:
    ----------------
        get_historical_data

    SQLRepository:
    --------------
        insert_table
        read_table

    GarchModel:
    -----------
        wrangle_data
        fit
        predict_volatility

"""

'The unit test will validate each of the methods in the class repositories\n    AlphaVantageApi:\n    ----------------\n        get_historical_data\n\n    SQLRepository:\n    --------------\n        insert_table\n        read_table\n\n    GarchModel:\n    -----------\n        wrangle_data\n        fit\n        predict_volatility\n\n'

In [4]:
# Enable IPython's autoreload extension so that edits to imported project
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [12]:
import sqlite3
import pandas as pd
from config import settings
from data import AlphaVantageApi
from data import SQLRepository

In [13]:
# Ticker symbol shared by the API request, the SQL table name and the GARCH model.
ticker = "IBM"

In [14]:
# Request the historical price data for `ticker` through the project's API wrapper.
# NOTE(review): presumably this performs a network request and reads an API key
# from the project settings — confirm against `data.AlphaVantageApi`.
api = AlphaVantageApi()
ibm_data = api.get_historical_data(ticker=ticker, output_size="full")

In [15]:
# Validate the frame returned by `AlphaVantageApi.get_historical_data`.
# Every assertion carries a message so that a failure reports what was received.

# 1. Is the returned object a DataFrame?
assert isinstance(ibm_data, pd.DataFrame), (
    f"Expected a pandas DataFrame, got {type(ibm_data).__name__}"
)

# 2. Does the DataFrame contain any rows? Without this check an empty frame
#    with the right schema could still satisfy the structural checks below.
assert not ibm_data.empty, "get_historical_data returned an empty DataFrame"

# 3. Does the DataFrame have 4 columns?
assert ibm_data.shape[1] == 4, f"Expected 4 columns, got {ibm_data.shape[1]}"

# 4. Is the index a DatetimeIndex?
assert isinstance(ibm_data.index, pd.DatetimeIndex), (
    f"Expected a DatetimeIndex, got {type(ibm_data.index).__name__}"
)

# 5. Is the index named "date"?
assert ibm_data.index.name == "date", (
    f"Expected index name 'date', got {ibm_data.index.name!r}"
)

# 6. Do the columns have the correct names, in the correct order?
assert ibm_data.columns.to_list() == ["open", "high", "low", "close"], (
    f"Unexpected columns: {ibm_data.columns.to_list()}"
)

# 7. Are all columns of float dtype?
assert all(ibm_data.dtypes == float), (
    f"Expected float columns, got {ibm_data.dtypes.to_dict()}"
)

In [16]:
# Open the SQLite database named in the project settings.
# `check_same_thread=False` lets the connection be used from threads other
# than the one that created it.
# NOTE(review): the connection is not closed in the cells visible here —
# confirm it is closed elsewhere.
connection = sqlite3.connect(settings.db_name, check_same_thread = False)

In [17]:
# Build the repository around the open SQLite connection.
repo = SQLRepository(connection=connection)

# 1. The repository must expose a `connection` attribute ...
assert hasattr(repo, "connection")

# 2. ... and that attribute must be a sqlite3 connection object.
assert isinstance(repo.connection, sqlite3.Connection)


In [18]:
# Store the downloaded prices in a table named after the ticker.
# NOTE(review): `if_exists="replace"` presumably overwrites an existing table —
# confirm against `SQLRepository.insert_table`.
response = repo.insert_table(
    table_name=ticker,
    records=ibm_data,
    if_exists="replace",
)

# The method must report its outcome as a dictionary ...
assert isinstance(response, dict)

# ... whose keys are exactly the two expected ones (iterating a dict yields its keys).
assert sorted(response) == ["Number of records inserted", "Transaction successful"]

In [19]:
# Read rows back from the ticker's table, capped at `row_limit`.
# One variable drives both the query and the shape check so they cannot drift apart.
row_limit = 50
df_ibm = repo.read_table(table_name=ticker, limit=row_limit)

# Is the returned object a DataFrame?
assert isinstance(df_ibm, pd.DataFrame), (
    f"Expected a pandas DataFrame, got {type(df_ibm).__name__}"
)

# Does the DataFrame have `row_limit` rows and 4 columns?
assert df_ibm.shape == (row_limit, 4), (
    f"Expected shape {(row_limit, 4)}, got {df_ibm.shape}"
)

# Do the columns have the correct names, in the correct order?
assert df_ibm.columns.to_list() == ["open", "high", "low", "close"], (
    f"Unexpected columns: {df_ibm.columns.to_list()}"
)

# Is the index a DatetimeIndex?
assert isinstance(df_ibm.index, pd.DatetimeIndex), (
    f"Expected a DatetimeIndex, got {type(df_ibm.index).__name__}"
)

# Is the index named "date"?
assert df_ibm.index.name == "date", (
    f"Expected index name 'date', got {df_ibm.index.name!r}"
)

# Are all columns of float dtype?
assert all(df_ibm.dtypes == float), (
    f"Expected float columns, got {df_ibm.dtypes.to_dict()}"
)

In [20]:
# NOTE(review): this import would conventionally sit in the notebook's import
# cell; it is kept here so the cell remains runnable on its own.
from model import GarchModel

# Number of forecast steps requested; also the number of entries expected back.
horizon = 5

garch_model = GarchModel(
    ticker=ticker,
    repo=repo,
    model_directory=settings.model_directory,
    use_new_data=False,
)

# `data` and `model` are not inspected in this cell; the calls keep their original order.
# NOTE(review): presumably `wrangle_data` and `fit` must run before
# `predict_volatility` — confirm against `model.GarchModel`.
data = garch_model.wrangle_data(n_observations=2500)
model = garch_model.fit(p=1, q=1)
prediction_formatted = garch_model.predict_volatility(horizon=horizon)

# Is `prediction_formatted` a dictionary?
assert isinstance(prediction_formatted, dict), (
    f"Expected a dict, got {type(prediction_formatted).__name__}"
)

# Does it hold one entry per requested forecast step?
assert len(prediction_formatted) == horizon, (
    f"Expected {horizon} predictions, got {len(prediction_formatted)}"
)

# Are all keys strings?
assert all(isinstance(k, str) for k in prediction_formatted.keys()), (
    f"Non-string keys found: {list(prediction_formatted.keys())}"
)

# Are all values floats?
assert all(isinstance(v, float) for v in prediction_formatted.values()), (
    f"Non-float values found: {list(prediction_formatted.values())}"
)

# Display the forecast as the cell output.
prediction_formatted

{'2023-01-09T00:00:00': 1.3529702898849838,
 '2023-01-10T00:00:00': 1.3644020303791737,
 '2023-01-11T00:00:00': 1.3746162560683937,
 '2023-01-12T00:00:00': 1.3837508069621893,
 '2023-01-13T00:00:00': 1.3919262106309414}