

# Building Our Data Module


| URL Component | Description |
|:--- | :-------- |
| `https://www.alphavantage.co` | This is the **hostname** or **base URL**. It is the web address for the server where we can get our stock data. |
| `/query` | This is the **path**. Most APIs have lots of different operations they can do. The path is the name of the particular operation we want to access. |
| `?` |  This question mark denotes that everything that follows in the URL is a **parameter**. Each parameter is separated by a `&` character. These parameters provide additional information that will change the operation's behavior. This is similar to the way we pass **arguments** into functions in Python. |
| `function=TIME_SERIES_DAILY` | Our first parameter uses the `function` keyword. The value is `TIME_SERIES_DAILY`. In this case, we're asking for **daily** stock data. |
| `symbol=IBM` | Our second parameter uses the `symbol` keyword. So we're asking for data on a stock whose [**ticker symbol**](https://en.wikipedia.org/wiki/Ticker_symbol) is `IBM`. |
| `apikey=demo` | Much in the same way you need a password to access some websites, an **API key** or **API token** is the password that you'll use to access the API. |
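
To see these pieces programmatically, the sketch below joins the components from the table into one URL and splits it apart again with Python's standard `urllib.parse` module. The variable names are illustrative only.

```python
from urllib.parse import parse_qs, urlsplit

# Reassemble the URL from the components listed in the table
url = (
    "https://www.alphavantage.co"
    "/query"
    "?"
    "function=TIME_SERIES_DAILY&"
    "symbol=IBM&"
    "apikey=demo"
)

parts = urlsplit(url)

# `parse_qs` maps each parameter name to a list of values; keep the first
params = {key: values[0] for key, values in parse_qs(parts.query).items()}

print("hostname:", parts.netloc)
print("path:", parts.path)
print("parameters:", params)
```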

## AlphaVantage API Class
There is a lot to clarify about accessing an API through a request. The cell below builds a request URL from its individual components.

```python

# `settings` holds the API key used when building the URL
from config import settings

ticker = "AMBUJACEM.BSE"
output_size = "compact"
data_type = "json"

url = (
    "https://learn-api.wqu.edu/1/data-services/alpha-vantage/query?"
    "function=TIME_SERIES_DAILY&"
    f"symbol={ticker}&"
    f"outputsize={output_size}&"
    f"datatype={data_type}&"
    f"apikey={settings.alpha_api_key}"
)

print("url type:", type(url))
print(url)
```

In the `data` module, we create a class definition for `AlphaVantageAPI`. For now, make sure that it has an `__init__` method that attaches the API key as the attribute `__api_key`.

In [None]:
# Import `AlphaVantageAPI`

from data import AlphaVantageAPI
# Create instance of `AlphaVantageAPI` class
av = AlphaVantageAPI()

print("av type:", type(av))
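
For reference, here is a minimal sketch of what such a class definition could look like. It is not the module's actual code: the class name `AlphaVantageAPISketch` is hypothetical, and it assumes the key is passed in as an argument with the placeholder default `"demo"`, whereas the real class presumably reads it from `settings`.

```python
class AlphaVantageAPISketch:
    """Minimal sketch: stores the API key for later requests."""

    def __init__(self, api_key="demo"):
        # Assumption: the key is passed in; "demo" is only a placeholder.
        # The leading double underscore triggers Python's name mangling.
        self.__api_key = api_key


av_sketch = AlphaVantageAPISketch(api_key="demo")
print("av_sketch type:", type(av_sketch))

# The attribute is stored under its mangled name
print(av_sketch._AlphaVantageAPISketch__api_key)
```

Because of name mangling, `av_sketch.__api_key` is not reachable from outside the class, which offers light protection against accidentally exposing the key.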

Create a get_daily method for your AlphaVantageAPI class. Once you're done, use the cell below to fetch the stock data for the renewable energy company Suzlon and assign it to the DataFrame df_suzlon.

In [None]:
# Define Suzlon ticker symbol
ticker = "SUZLON.BSE"

# Use your `av` object to get daily data
df_suzlon = av.get_daily(ticker=ticker)

print("df_suzlon type:", type(df_suzlon))
print("df_suzlon shape:", df_suzlon.shape)
df_suzlon.head()
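
The transformation step inside `get_daily` might look roughly like the sketch below. It assumes the JSON response nests the records under a `"Time Series (Daily)"` key with numbered field names such as `"1. open"`; the helper name `daily_json_to_frame` is hypothetical and the sample values are made up, not real prices.

```python
import pandas as pd


def daily_json_to_frame(response_data):
    """Sketch: turn a daily time-series JSON payload into a DataFrame."""
    # Records are nested under this key, one entry per date
    stock_data = response_data["Time Series (Daily)"]

    # Dates become the index; values arrive as strings, so cast to float
    df = pd.DataFrame.from_dict(stock_data, orient="index").astype(float)

    # Convert index to a `DatetimeIndex` named "date"
    df.index = pd.to_datetime(df.index)
    df.index.name = "date"

    # Strip the numbering from column names: "1. open" -> "open"
    df.columns = [c.split(". ")[1] for c in df.columns]
    return df


# Made-up payload in the assumed layout
sample = {
    "Time Series (Daily)": {
        "2023-01-03": {
            "1. open": "10.0", "2. high": "11.0", "3. low": "9.5",
            "4. close": "10.5", "5. volume": "1000",
        },
        "2023-01-02": {
            "1. open": "9.8", "2. high": "10.2", "3. low": "9.6",
            "4. close": "10.0", "5. volume": "1200",
        },
    }
}

df_sample = daily_json_to_frame(sample)
print(df_sample)
```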

## Test-Driven Development

The assert statements below validate the `get_daily` method.

In [None]:
# Does `get_daily` return a DataFrame?
assert isinstance(df_suzlon, pd.DataFrame)

# Does DataFrame have 5 columns?
assert df_suzlon.shape[1]==5

# Does DataFrame have a DatetimeIndex?
assert isinstance(df_suzlon.index, pd.DatetimeIndex)

# Is the index name "date"?
assert df_suzlon.index.name== "date"

Further tests for the output of the `get_daily` method:

In [None]:
# Does DataFrame have correct column names?

assert df_suzlon.columns.to_list()==['open', 'high', 'low', 'close', 'volume']
# Are columns correct data type?
assert all(df_suzlon.dtypes==float)

# SQL Repository Class
It wouldn't be efficient if our application needed to get data from the AlphaVantage API every time we wanted to explore our data or build a model, so we'll need to store our data in a database. Because our data is highly structured (each DataFrame we extract from AlphaVantage is always going to have the same five columns), it makes sense to use a SQL database.



In [None]:
connection = sqlite3.connect(database=settings.db_name, check_same_thread=False)

print("connection type:", type(connection))

Tests for SQLRepository class

In [None]:
# Import class definition
from data import SQLRepository
# Create instance of class
repo = SQLRepository(connection=connection)

# Does `repo` have a "connection" attribute?
assert hasattr(repo, "connection")

# Is the "connection" attribute a SQLite `Connection`?
assert isinstance(repo.connection, sqlite3.Connection)

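Create a definition for your `SQLRepository` class. The commented lines below show the core of its `insert_table` method, which lives in the `data` module.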

In [None]:
# we are working on the data module
# n_inserted= records.to_sql(
#     name=table_name, con=self.connection, if_exists=if_exists
# )
# return {
#     "transaction_successful": True,
#     "records_inserted": n_inserted
# }
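
Put together, a stripped-down version of the class could look like the sketch below. The class name `SQLRepositorySketch`, the in-memory database, and the demo records are assumptions made for illustration; only the body of `insert_table` comes from the commented lines above.

```python
import sqlite3

import pandas as pd


class SQLRepositorySketch:
    """Sketch of a repository that wraps a SQLite connection."""

    def __init__(self, connection):
        self.connection = connection

    def insert_table(self, table_name, records, if_exists="fail"):
        # `to_sql` writes the DataFrame (index included) to the table
        n_inserted = records.to_sql(
            name=table_name, con=self.connection, if_exists=if_exists
        )
        return {"transaction_successful": True, "records_inserted": n_inserted}


# In-memory database so the example leaves no file behind
demo_connection = sqlite3.connect(":memory:")
demo_repo = SQLRepositorySketch(connection=demo_connection)

# Made-up records, not real prices
demo_records = pd.DataFrame(
    {"close": [10.5, 10.0]},
    index=pd.Index(["2023-01-03", "2023-01-02"], name="date"),
)

demo_response = demo_repo.insert_table(
    table_name="DEMO.BSE", records=demo_records, if_exists="replace"
)

# Confirm the rows actually landed in the table
n_rows = demo_connection.execute('SELECT COUNT(*) FROM "DEMO.BSE"').fetchone()[0]
print(demo_response, n_rows)
```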

# TDD

In [None]:
response = repo.insert_table(table_name=ticker, records=df_suzlon, if_exists="replace")

# Does your method return a dictionary?
assert isinstance(response, dict)

# Are the keys of that dictionary correct?
assert sorted(list(response.keys())) == ["records_inserted", "transaction_successful"]

If our method is passing the assert statements, we know it's returning a record of the database transaction, but we still need to check whether the data has actually been added to the database.

After inserting the data into the database, we can begin to explore it.

In [None]:

%sql sqlite://///home/path-to-the-SQL-database-fil/stocks.sqlite

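`%sql`: This is a Jupyter magic command that lets you run SQL directly in a cell. When a line begins with `%sql`, the rest of the line is handed to the SQL extension. In the cell above, that is a connection string telling the extension which database to use; in the cell below, it is a SQL query.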

sqlite:///: This is the connection string indicating the type of database to connect to. In this case, it specifies that the database is of type SQLite. SQLite is a lightweight, self-contained database engine that is often used for local development and testing.

/home/path-to-the-SQL-database-file/stocks.sqlite: This is the path to the SQLite database file that the code wants to connect to. In this example, the file is located at the given path. The database file name is stocks.sqlite

In [None]:
%%sql
SELECT *
FROM 'SUZLON.BSE'
LIMIT 5

First, write a SQL query to get all the Suzlon data. Then use pandas to extract the data from the database and read it into a DataFrame named `df_suzlon_test`.

In [None]:
sql = "SELECT * FROM 'SUZLON.BSE'"
df_suzlon_test = pd.read_sql(
    sql=sql, con=connection, parse_dates=["date"], index_col="date"
)

print("df_suzlon_test type:", type(df_suzlon_test))
print()
print(df_suzlon_test.info())
df_suzlon_test.head()

Now that we know how to read a table from our database, let's turn our code into a proper function. But since we're doing backwards design, we need to start with our tests.

## TDD for the `read_table` function

In [None]:
# Assign `read_table` output to `df_suzlon`
df_suzlon = read_table(table_name="SUZLON.BSE", limit=2500)  # noQA F821

# Is `df_suzlon` a DataFrame?
assert isinstance (df_suzlon, pd.DataFrame)

# Does it have a `DatetimeIndex`?
assert isinstance(df_suzlon.index, pd.DatetimeIndex)

# Is the index named "date"?
assert df_suzlon.index.name=="date"

# Does it have 2,500 rows and 5 columns?
assert df_suzlon.shape==(2500, 5)
# Are the column names correct?
assert df_suzlon.columns.to_list()==['open', 'high', 'low', 'close', 'volume']
# Are the column data types correct?

assert all(df_suzlon.dtypes==float)
# Print `df_suzlon` info
print("df_suzlon shape:", df_suzlon.shape)
print()
print(df_suzlon.info())
df_suzlon.head()

# Let's write the `read_table` function

In [None]:
def read_table(table_name, limit=None):

    """Read table from database.

    Parameters
    ----------
    table_name : str
        Name of table in SQLite database.
    limit : int, None, optional
        Number of most recent records to retrieve. If `None`, all
        records are retrieved. By default, `None`.

    Returns
    -------
    pd.DataFrame
        Index is DatetimeIndex "date". Columns are 'open', 'high',
        'low', 'close', and 'volume'. All columns are numeric.
    """
    # Create SQL query (with optional limit)
    if limit:
        sql = f"SELECT * FROM '{table_name}' LIMIT {limit}"
    else:
        sql = f"SELECT * FROM '{table_name}'"

    # Retrieve data, read into DataFrame
    df = pd.read_sql(
        sql=sql, con=connection, parse_dates=["date"], index_col="date"
    )

    # Return DataFrame
    return df
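
Note that SQL does not guarantee row order without an `ORDER BY` clause, so `LIMIT` alone returns the most recent records only if the rows happen to come back newest-first. The sketch below shows one way to make the ordering explicit. It assumes the `date` column holds ISO-formatted text; the table contents are invented for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE "DEMO.BSE" (date TEXT, close REAL)')

# Insert rows deliberately out of chronological order
conn.executemany(
    'INSERT INTO "DEMO.BSE" VALUES (?, ?)',
    [("2023-01-02", 10.0), ("2023-01-04", 12.0), ("2023-01-03", 11.0)],
)

# ISO dates sort chronologically as text, so DESC puts the newest first
demo_sql = 'SELECT date, close FROM "DEMO.BSE" ORDER BY date DESC LIMIT ?'
latest_two = conn.execute(demo_sql, (2,)).fetchall()
print(latest_two)
```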


## TDD for the `read_table` method in the `SQLRepository` class

In [None]:
# Assign `read_table` output to `df_suzlon`
df_suzlon = repo.read_table(table_name="SUZLON.BSE", limit=2500)  # noQA F821

# Is `df_suzlon` a DataFrame?
assert isinstance (df_suzlon, pd.DataFrame)

# Does it have a `DatetimeIndex`?
assert isinstance(df_suzlon.index, pd.DatetimeIndex)

# Is the index named "date"?
assert df_suzlon.index.name=="date"

# Does it have 2,500 rows and 5 columns?
assert df_suzlon.shape==(2500, 5)
# Are the column names correct?
assert df_suzlon.columns.to_list()==['open', 'high', 'low', 'close', 'volume']
# Are the column data types correct?

assert all(df_suzlon.dtypes==float)
# Print `df_suzlon` info
print("df_suzlon shape:", df_suzlon.shape)
print()
print(df_suzlon.info())
df_suzlon.head()

### Let's start using the instances of the `AlphaVantageAPI` and `SQLRepository` classes (`av` and `repo`, respectively) to get the stock data for Ambuja Cement and read it into the database.

In [None]:
ticker = "AMBUJACEM.BSE"

# Get Ambuja data using `av`
ambuja_records = av.get_daily(ticker=ticker)

# Insert `ambuja_records` database using `repo`
response = repo.insert_table(
    table_name=ticker, records=ambuja_records, if_exists="replace"
)

response

### Check the `read_table` method in the `SQLRepository` class: extract the most recent 2,500 rows of data for Ambuja Cement from the database and assign the result to `df_ambuja`.

In [None]:
ticker = "AMBUJACEM.BSE"
df_ambuja = repo.read_table(table_name=ticker, limit=2500)

print("df_ambuja type:", type(df_ambuja))
print("df_ambuja shape:", df_ambuja.shape)
df_ambuja.head()

# EDA

Here we explore the data to check that it is valid and fit for use. As a first step, I have plotted the closing price against the date.

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))
# Plot `df_ambuja` closing price
df_ambuja["close"].plot(ax=ax, label="AMBUJACEM", color="C1")

# Label axes
plt.xlabel("date")
plt.ylabel("Closing Price")

# Add legend
plt.legend();

Below is a plot that shows the closing prices of df_suzlon and df_ambuja

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))
# Plot `df_suzlon` and `df_ambuja`
df_suzlon["close"].plot(ax=ax, label="SUZLON")
df_ambuja["close"].plot(ax=ax, label="AMBUJACEM")
# Label axes
plt.xlabel("Date")
plt.ylabel("Closing Price")

# Add legend
plt.legend();


One way in which investors compare stocks is by looking at their returns instead. A return is the change in value in an investment, represented as a percentage. So let's look at the daily returns for our two stocks.

In [None]:
# Sort DataFrame ascending by date
df_ambuja.sort_index(ascending=True, inplace=True)
# Create "return" column

df_ambuja["return"]=df_ambuja["close"].pct_change()*100
print("df_ambuja shape:", df_ambuja.shape)
print(df_ambuja.info())
df_ambuja.head()
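
To make the calculation concrete, here is a small sketch on three made-up closing prices. It compares the manual definition of a percentage return with the `pct_change` shortcut used above.

```python
import pandas as pd

# Made-up closing prices, oldest first
closes = pd.Series([100.0, 110.0, 99.0], name="close")

# Manual definition: (today - yesterday) / yesterday * 100
manual = (closes - closes.shift(1)) / closes.shift(1) * 100

# Same calculation with `pct_change`
with_pct_change = closes.pct_change() * 100

print(pd.DataFrame({"manual": manual, "pct_change": with_pct_change}))
```

The first value is `NaN` in both columns because the first day has no previous close to compare against. This is also why the data must be sorted ascending by date before calculating returns.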

Let's do the same for Suzlon.

In [None]:
# Sort DataFrame ascending by date
df_suzlon.sort_index(ascending=True, inplace=True)
# Create "return" column

df_suzlon["return"]=df_suzlon["close"].pct_change()*100
print("df_suzlon shape:", df_suzlon.shape)
print(df_suzlon.info())
df_suzlon.head()

Plot the returns for df_suzlon and df_ambuja. Be sure to label your axes and use legend.

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))
# Plot `df_suzlon` and `df_ambuja`
df_suzlon["return"].plot(ax=ax, label="SUZLON")
df_ambuja["return"].plot(ax=ax, label="AMBUJACEM")
# Label axes
plt.xlabel("Date")
plt.ylabel("Daily Return")
# Add legend
plt.legend();

When we plot the two series together in one figure, we gain insight into how they compare, especially in terms of spread. This spread is what investors often refer to as volatility.

# Building the GARCH Model for Predicting Volatility

In [None]:
import sqlite3

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from arch import arch_model
from config import settings
from data import SQLRepository
from IPython.display import VimeoVideo
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

Let's connect to the database and then instantiate a `SQLRepository` named `repo` to interact with it.

In [None]:
connection = sqlite3.connect(settings.db_name, check_same_thread=False)
repo = SQLRepository(connection=connection)

print("repo type:", type(repo))
print("repo.connection type:", type(repo.connection))

Let's pull the most recent 2,500 rows of data for Ambuja Cement from the database and assign the result to the variable `df_ambuja`.

We can also inspect the DataFrame using `shape`, `info`, and `head` in pandas.

In [None]:
df_ambuja = repo.read_table(table_name="AMBUJACEM.BSE", limit=2500)

print("df_ambuja type:", type(df_ambuja))
print("df_ambuja shape:", df_ambuja.shape)
df_ambuja.head()

 Create a wrangle_data function whose output is the returns for a stock stored in your database. Use the docstring as a guide and the assert statements in the following code block to test your function.

In [None]:
def wrangle_data(ticker, n_observations):

    """Extract table data from database. Calculate returns.

    Parameters
    ----------
    ticker : str
        The ticker symbol of the stock (also table name in database).

    n_observations : int
        Number of observations to return.

    Returns
    -------
    pd.Series
        Name will be `"return"`. There will be no `NaN` values.
    """
    # Get table from database
    df=repo.read_table(table_name=ticker, limit=n_observations+1)


    # Sort DataFrame ascending by date
    df.sort_index(ascending=True, inplace=True)

    # Create "return" column
    df["return"]=df["close"].pct_change() *100


    # Return returns
    return df["return"].dropna()

When you run the cell below to test your function, you'll also create a Series y_ambuja that we'll use to train our model.

In [None]:
y_ambuja = wrangle_data(ticker="AMBUJACEM.BSE", n_observations=2500)

# Is `y_ambuja` a Series?
assert isinstance(y_ambuja, pd.Series)

# Are there 2500 observations in the Series?
assert len(y_ambuja) == 2500

# Is `y_ambuja` name "return"?
assert y_ambuja.name == "return"

# Does `y_ambuja` have a DatetimeIndex?
assert isinstance(y_ambuja.index, pd.DatetimeIndex)

# Is index sorted ascending?
assert all(y_ambuja.index == y_ambuja.sort_index(ascending=True).index)

# Are there no `NaN` values?
assert y_ambuja.isnull().sum() == 0

y_ambuja.head()

# Explore
Let's recreate the volatility time series plot we made in the last lesson so that we have a visual aid to talk about what volatility is.

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))

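# Get Suzlon returns with the same function used for `y_ambuja`
y_suzlon = wrangle_data(ticker="SUZLON.BSE", n_observations=2500)

# Plot returns for `y_suzlon` and `y_ambuja`
y_suzlon.plot(ax=ax, label="SUZLON")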
y_ambuja.plot(ax=ax, label="AMBUJACEM")

# Label axes
plt.xlabel("Date")
plt.ylabel("Return")

# Add legend
plt.legend();

The above plot shows how returns change over time. This may seem like a totally new concept, but if we visualize them without considering time, things will start to look familiar.

In [None]:
# Create histogram of `y_ambuja`, 25 bins
plt.hist(y_ambuja, bins=25)
# Add axis labels

plt.xlabel("Daily Returns")
plt.ylabel("Frequency [Count]")

# Add title
plt.title("Distribution of Ambuja Cement Daily Returns");

Volatility is the measure of the spread of these returns around the mean. In other words, volatility in finance is the same thing as standard deviation in statistics.

Let's start by measuring the daily volatility of our two stocks. Since our data frequency is also daily, this will be exactly the same as calculating the standard deviation.

In [None]:
suzlon_daily_volatility = y_suzlon.std()
ambuja_daily_volatility = y_ambuja.std()

print("Suzlon Daily Volatility:", suzlon_daily_volatility)
print("Ambuja Daily Volatility:", ambuja_daily_volatility)

Looks like Suzlon is more volatile than Ambuja. This reinforces what we saw in our time series plot, where Suzlon returns have a much wider spread.

While daily volatility is useful, investors are also interested in volatility over other time periods — like annual volatility. Keep in mind that a year isn't 365 days for a stock market, though. After excluding weekends and holidays, most markets have only 252 trading days.

Calculate the annual volatility for Suzlon and Ambuja, assigning the results to suzlon_annual_volatility and ambuja_annual_volatility, respectively.

In [None]:
suzlon_annual_volatility = suzlon_daily_volatility* np.sqrt(252)
ambuja_annual_volatility = ambuja_daily_volatility * np.sqrt(252)

print("Suzlon Annual Volatility:", suzlon_annual_volatility)
print("Ambuja Annual Volatility:", ambuja_annual_volatility)
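
The square root in the calculation above follows from how variances combine: if daily returns are assumed to be uncorrelated, variances add across days, so the annual variance is 252 times the daily variance. The sketch below checks this with the standard library on a few made-up returns.

```python
import math
import statistics

# Made-up daily returns in percent, for illustration only
daily_returns = [1.0, -1.0, 2.0, -2.0]

# Sample standard deviation of the daily returns
daily_volatility = statistics.stdev(daily_returns)

# Variances add over 252 trading days (assumes uncorrelated returns)
annual_variance = 252 * daily_volatility**2
annual_volatility = math.sqrt(annual_variance)

# Equivalent shortcut: daily volatility times sqrt(252)
shortcut = daily_volatility * math.sqrt(252)

print("daily:", daily_volatility)
print("annual:", annual_volatility)
```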

Calculate the rolling volatility for y_ambuja, using a 50-day window. Assign the result to ambuja_rolling_50d_volatility.


In [None]:
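# Calculate 50-day rolling volatility (standard deviation of returns)
ambuja_rolling_50d_volatility = y_ambuja.rolling(window=50).std().dropna()

fig, ax = plt.subplots(figsize=(15, 6))

# Plot `y_ambuja`
y_ambuja.plot(ax=ax, label="daily return")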
# Plot `ambuja_rolling_50d_volatility`
ambuja_rolling_50d_volatility.plot(ax=ax, label="50d rolling volatility", linewidth=3 )

# Add x-axis label
plt.xlabel("Date")

# Add legend
plt.legend();

Here we can see that volatility goes up when the returns change drastically — either up or down. For instance, we can see a big increase in volatility in May 2020, when there were several days of large negative returns. We can also see volatility go down in August 2022, when there are only small day-to-day changes in returns.

This plot reveals a problem. We want to use returns to see if high volatility on one day is associated with high volatility on the following day. But high volatility is caused by large changes in returns, which can be either positive or negative. How can we assess negative and positive numbers together without them canceling each other out? One solution is to take the absolute value of the numbers, which is what we do to calculate performance metrics like mean absolute error. The other solution, which is more common in this context, is to square all the values.

Create a time series plot of the squared returns in y_ambuja. Don't forget to label your axes.

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))

# Plot squared returns

(y_ambuja**2).plot(ax=ax)
# Add axis labels
plt.xlabel("Date")
plt.ylabel("Squared Returns");

Perfect! Now it's much easier to see that (1) we have periods of high and low volatility, and (2) high volatility days tend to cluster together. This is a perfect situation to use a GARCH model.

A GARCH model is sort of like the ARMA model we learned about in Lesson 3.4. It has a p parameter handling correlations at prior time steps and a q parameter for dealing with "shock" events. It also uses the notion of lag. To see how many lags we should have in our model, we should create an ACF and PACF plot — but using the squared returns.
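
For intuition, a GARCH(1,1) model updates the conditional variance as $\sigma_t^2 = \omega + \alpha\,\epsilon_{t-1}^2 + \beta\,\sigma_{t-1}^2$, where $\epsilon_{t-1}$ is the previous residual. The sketch below applies that recursion by hand. The function name, parameter values, and residuals are invented, and starting from the long-run variance $\omega / (1 - \alpha - \beta)$ is one common convention, not necessarily what any particular library does.

```python
def garch_1_1_variance(residuals, omega, alpha, beta):
    """Return conditional variances, ending with a one-step-ahead forecast."""
    # Start from the long-run (unconditional) variance
    variances = [omega / (1 - alpha - beta)]

    for resid in residuals:
        # New variance = constant + shock term + persistence term
        variances.append(omega + alpha * resid**2 + beta * variances[-1])

    return variances


# Made-up residuals: a large shock (-2.0) raises the next variance
sigma2 = garch_1_1_variance([1.0, -2.0, 0.5], omega=0.1, alpha=0.1, beta=0.8)
print([round(v, 3) for v in sigma2])
```

The large residual feeds into the next variance through the `alpha` term, and the `beta` term carries that elevated variance forward, which is how the model reproduces volatility clustering.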

Create an ACF plot of squared returns for Ambuja Cement. Be sure to label your x-axis "Lag [days]" and your y-axis "Correlation Coefficient".

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))

# Create ACF of squared returns
plot_acf(y_ambuja**2, ax=ax)

# Add axis labels
plt.xlabel("Lag [Days]")
plt.ylabel("Correlation Coefficient");

Create a PACF plot of squared returns for Ambuja Cement. Be sure to label your x-axis "Lag [days]" and your y-axis "Correlation Coefficient".

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))

# Create PACF of squared returns
plot_pacf(y_ambuja**2, ax=ax)

# Add axis labels
plt.xlabel("Lag [Days]")
plt.ylabel("Correlation Coefficient");

# Split
The last thing we need to do before building our model is to create a training set. Note that we won't create a test set here. Rather, we'll use all of y_ambuja to conduct walk-forward validation after we've built our model.

Create a training set y_ambuja_train that contains the first 80% of the observations in y_ambuja.

In [None]:
cutoff_test = int(len(y_ambuja)*0.8)
y_ambuja_train = y_ambuja.iloc[:cutoff_test]

print("y_ambuja_train type:", type(y_ambuja_train))
print("y_ambuja_train shape:", y_ambuja_train.shape)
y_ambuja_train.tail()

# Build Model


In [None]:
# Build and train model
model = arch_model(
    y_ambuja_train, 
    p=2,
    q=2, 
    rescale=False
    
).fit(disp=0)
print("model type:", type(model))

# Show model summary
model.summary()

Create a time series plot with the Ambuja returns and the conditional volatility for your model. Be sure to include axis labels and add a legend.

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))

# Plot `y_ambuja_train`

y_ambuja_train.plot(ax=ax, label="Ambuja Daily Returns")
# Plot conditional volatility * 2
(2* model.conditional_volatility).plot(
    ax=ax, color="C1", label="2 SD Conditional Volatility", linewidth=3
)

# Plot conditional volatility * -2

# A label starting with an underscore keeps this line out of the legend
(-2* model.conditional_volatility).plot(
    ax=ax, color="C1", label="_nolegend_", linewidth=3
)
# Add axis labels
plt.xlabel("Date")


# Add legend
plt.legend();

Create a time series plot of the standardized residuals for your model. Be sure to include axis labels and a legend.

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))

# Plot standardized residuals
model.std_resid.plot(ax=ax, label="Standardized Residuals")

# Add axis labels

plt.xlabel("Date")

# Add legend
plt.legend();

Create a histogram with 25 bins of the standardized residuals for your model. Be sure to label your axes and use a title.


In [None]:
# Create histogram of standardized residuals, 25 bins
plt.hist(model.std_resid, bins=25)

# Add axis labels
plt.xlabel("Standardized Residuals")
plt.ylabel("Frequency [count]")

# Add title
plt.title("Distribution of Standardized Residuals");

Create an ACF plot of the square of your standardized residuals. Don't forget axis labels!

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))

# Create ACF of squared, standardized residuals
plot_acf(model.std_resid**2, ax=ax)

# Add axis labels

plt.xlabel("Lag [days]")
plt.ylabel("Correlation Coefficient")

Create a one-day forecast from your model and assign the result to the variable one_day_forecast.

In [None]:
one_day_forecast = model.forecast(horizon=1, reindex=False).variance

print("one_day_forecast type:", type(one_day_forecast))
one_day_forecast

In [None]:
# Create empty list to hold predictions
predictions = []

# Calculate size of test data (20%)
test_size = int(len(y_ambuja) * 0.2)

# Walk forward
for i in range(test_size):
    # Create training data (all observations before the one being predicted)
    y_train = y_ambuja.iloc[: -(test_size - i)]

    # Train model
    model = arch_model(y_train, p=1, q=1, rescale=False).fit(disp=0)

    # Generate next prediction (volatility, not variance)
    next_pred = model.forecast(horizon=1, reindex=False).variance.iloc[0,0]**0.5

    # Append prediction to list
    predictions.append(next_pred)

# Create Series from predictions list
y_test_wfv = pd.Series(predictions, index=y_ambuja.tail(test_size).index)

print("y_test_wfv type:", type(y_test_wfv))
print("y_test_wfv shape:", y_test_wfv.shape)
y_test_wfv.head()
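
The slicing in the loop above can be hard to picture. Here is a small standalone sketch, using a made-up series of 10 observations and a 3-observation test window, that shows how the training window grows by one observation at each step.

```python
observations = list(range(10))  # stand-in for a return series
test_size = 3

windows = []
for i in range(test_size):
    # Same slice as in the loop above: everything before the point to predict
    train = observations[: -(test_size - i)]

    # The observation being forecast is the one right after the training data
    target = observations[len(train)]

    windows.append((len(train), target))

# Each tuple is (training set size, position of the predicted observation)
print(windows)
```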

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))

# Plot returns for test data
y_ambuja.tail(test_size).plot(ax=ax, label="Ambuja Return")

# Plot volatility predictions * 2
(2 * y_test_wfv).plot(ax=ax, c="C1", label="2 SD Predicted Volatility")

# Plot volatility predictions * -2
(-2 * y_test_wfv).plot(ax=ax, c="C1")

# Label axes
plt.xlabel("Date")
plt.ylabel("Return")

# Add legend
plt.legend();

In [None]:
# Generate 5-day volatility forecast
prediction = model.forecast(horizon=5, reindex=False).variance ** 0.5
print(prediction)

# Calculate forecast start date
start = prediction.index[0]+pd.DateOffset(days=1)

# Create date range
prediction_dates = pd.bdate_range(start=start, periods=prediction.shape[1])

# Create prediction index labels, ISO 8601 format
prediction_index = [d.isoformat() for d in prediction_dates]

print("prediction_index type:", type(prediction_index))
print("prediction_index len:", len(prediction_index))
prediction_index[:3]

In [None]:
def clean_prediction(prediction):

    """Reformat model prediction to JSON.

    Parameters
    ----------
    prediction : pd.DataFrame
        Variance from an `ARCHModelForecast`

    Returns
    -------
    dict
        Forecast of volatility. Each key is date in ISO 8601 format.
        Each value is predicted volatility.
    """
   
    # Calculate forecast start date
    start = prediction.index[0]+pd.DateOffset(days=1)

    # Create date range
    prediction_dates = pd.bdate_range(start=start, periods=prediction.shape[1])

    # Create prediction index labels, ISO 8601 format
    prediction_index = [d.isoformat() for d in prediction_dates]

    # Extract predictions from DataFrame, get square root
    data=prediction.values.flatten()**0.5
    # Combine `data` and `prediction_index` into Series
    prediction_formatted= pd.Series(data, index=prediction_index)


    # Return Series as dictionary
    return prediction_formatted.to_dict()
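
The date handling in `clean_prediction` is easiest to see with a concrete case. This sketch assumes a hypothetical last training date that falls on a Friday and shows that the business-day range skips the weekend.

```python
import pandas as pd

# Hypothetical last training date: Friday, 3 November 2023
last_date = pd.Timestamp("2023-11-03")

# One day later is a Saturday
start = last_date + pd.DateOffset(days=1)

# The business-day range rolls forward to Monday and skips weekends
labels = [d.isoformat() for d in pd.bdate_range(start=start, periods=3)]
print(labels)
```

Note that `pd.bdate_range` only skips weekends by default. Exchange holidays are not excluded, so a forecast label can still fall on a day when the market is closed.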

In [None]:
prediction = model.forecast(horizon=10, reindex=False).variance
prediction_formatted = clean_prediction(prediction)

# Is `prediction_formatted` a dictionary?
assert isinstance(prediction_formatted, dict)

# Are keys correct data type?
assert all(isinstance(k, str) for k in prediction_formatted.keys())

# Are values correct data type
assert all(isinstance(v, float) for v in prediction_formatted.values())

prediction_formatted

# Model Deployment

At this stage, we have a module for getting and storing our data, and we have the code to train a model and clean its predictions. Let's put them all together and deploy the model with an API that others can use to train their own models and predict volatility.

Open a connection to the SQLite database using `sqlite3`, then create a `SQLRepository` named `repo`.

In [None]:
connection = sqlite3.connect(settings.db_name, check_same_thread=False)
repo = SQLRepository(connection=connection)

print("repo type:", type(repo))
print("repo.connection type:", type(repo.connection))

In the `model` module, create a definition for a `GarchModel` model class. For now, it should only have an `__init__` method. Use the docstring as a guide. 

In [None]:
from model import GarchModel

# Instantiate a `GarchModel`
gm_ambuja = GarchModel(ticker="AMBUJACEM.BSE", repo=repo, use_new_data=False)

# Does `gm_ambuja` have the correct attributes?
assert gm_ambuja.ticker == "AMBUJACEM.BSE"
assert gm_ambuja.repo == repo
assert not gm_ambuja.use_new_data
assert gm_ambuja.model_directory == settings.model_directory
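
A constructor that would satisfy checks like these could be as simple as the sketch below. The class name `GarchModelSketch` is hypothetical, and it takes `model_directory` as an argument with a placeholder default, whereas the tests above expect the value from `settings.model_directory`.

```python
class GarchModelSketch:
    """Sketch of the constructor only; other methods come later."""

    def __init__(self, ticker, repo, use_new_data, model_directory="models"):
        self.ticker = ticker
        self.repo = repo
        self.use_new_data = use_new_data
        # Assumption: passed in here instead of read from `settings`
        self.model_directory = model_directory


# `repo=None` stands in for a real repository object
gm_sketch = GarchModelSketch(ticker="AMBUJACEM.BSE", repo=None, use_new_data=False)
print(vars(gm_sketch))
```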

In [None]:
# Instantiate `GarchModel`, use new data
model_shop = GarchModel(ticker="SHOPERSTOP.BSE", repo=repo, use_new_data=True)

# Check that model doesn't have `data` attribute yet
assert not hasattr(model_shop, "data")

# Wrangle data
model_shop.wrangle_data(n_observations=1000)

# Does model now have `data` attribute?
assert hasattr(model_shop, "data")

# Is the `data` a Series?
assert isinstance(model_shop.data, pd.Series)

# Is Series correct shape?
assert model_shop.data.shape == (1000,)

model_shop.data.head()

In [None]:
# def wrangle_data(self, n_observations):

#         """Extract data from database (or get from AlphaVantage), transform it
#         for training model, and attach it to `self.data`.

#         Parameters
#         ----------
#         n_observations : int
#             Number of observations to retrieve from database

#         Returns
#         -------
#         None
#         """
#         # Add new data to database if required
#         if self.use_new_data:
#             #instantiate an API class
#             api=AlphaVantageAPI()
#             #Get Data
#             new_data=api.get_daily(ticker=self.ticker)
#             #insert data into repo
#             self.repo.insert_table(
#                 table_name=self.ticker, records=new_data, if_exists="replace"
#             )

#         # Pull data from SQL database
#         df=self.repo.read_table(table_name=self.ticker, limit=n_observations+1)
#         # Clean data, attach to class as `data` attribute
#         df.sort_index(ascending=True, inplace=True)
#         df["return"]=df["close"].pct_change() *100
#         self.data = df["return"].dropna()

 Using your code from the previous lesson, create a `fit` method for your `GarchModel` class. 


In [None]:
# def fit(self,p, q):

#         """Create model, fit to `self.data`, and attach to `self.model` attribute.
#         For assignment, also adds metrics to `self.aic` and `self.bic`.

#         Parameters
#         ----------
#         p : int
#             Lag order of the symmetric innovation

#         q : int
#             Lag order of lagged volatility

#         Returns
#         -------
#         None
#         """
#         # Train Model, attach to `self.model`
#         self.model = arch_model(self.data, p=p, q=q, rescale=False).fit(disp=0)

In [None]:
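# Import the result class needed for the type check below
from arch.univariate.base import ARCHModelResult

# Instantiate `GarchModel`, use old data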
model_shop = GarchModel(ticker="SHOPERSTOP.BSE", repo=repo, use_new_data=False)

# Wrangle data
model_shop.wrangle_data(n_observations=1000)

# Fit GARCH(1,1) model to data
model_shop.fit(p=1, q=1)

# Does `model_shop` have a `model` attribute now?
assert hasattr(model_shop, "model")

# Is model correct data type?
assert isinstance(model_shop.model, ARCHModelResult)

# Does model have correct parameters?
assert model_shop.model.params.index.tolist() == ["mu", "omega", "alpha[1]", "beta[1]"]

# Check model parameters
model_shop.model.summary()

 Create a `predict_volatility` method for your `GarchModel` class. 

In [None]:
# def __clean_prediction(self, prediction):

#         """Reformat model prediction to JSON.

#         Parameters
#         ----------
#         prediction : pd.DataFrame
#             Variance from a `ARCHModelForecast`

#         Returns
#         -------
#         dict
#             Forecast of volatility. Each key is date in ISO 8601 format.
#             Each value is predicted volatility.
#         """
#         # Calculate forecast start date
#         start = prediction.index[0]+pd.DateOffset(days=1)

#         # Create date range
#         prediction_dates = pd.bdate_range(start=start, periods=prediction.shape[1])

#         # Create prediction index labels, ISO 8601 format
#         prediction_index = [d.isoformat() for d in prediction_dates]

#         # Extract predictions from DataFrame, get square root
#         data=prediction.values.flatten()**0.5
#         # Combine `data` and `prediction_index` into Series
#         prediction_formatted= pd.Series(data, index=prediction_index)

#         # Return Series as dictionary
#         return prediction_formatted.to_dict()

In [None]:
# def predict_volatility(self, horizon=5):

#         """Predict volatility using `self.model`

#         Parameters
#         ----------
#         horizon : int
#             Horizon of forecast, by default 5.

#         Returns
#         -------
#         dict
#             Forecast of volatility. Each key is date in ISO 8601 format.
#             Each value is predicted volatility.
#         """
#         # Generate variance forecast from `self.model`
#         prediction = self.model.forecast(horizon=horizon, reindex=False).variance

#         # Format prediction with `self.__clean_prediction`
#         prediction_formatted = self.__clean_prediction(prediction)

#         # Return `prediction_formatted`
#         return prediction_formatted

 Create a `dump` method for your `GarchModel` class.

In [None]:
#     def dump(self):

#         """Save model to `self.model_directory` with timestamp.

#         Returns
#         -------
#         str
#             filepath where model was saved.
#         """
#         # Create timestamp in ISO format
#         timestamp = pd.Timestamp.now().isoformat()
#         # Create filepath, including `self.model_directory`
#         filepath = os.path.join(self.model_directory, f"{timestamp}_{self.ticker}.pkl")
#         # Save `self.model`
#         joblib.dump(self.model, filepath)

#         # Return filepath
#         return filepath

In [None]:
# Save `model_shop` model, assign filename
filename = model_shop.dump()

# Is `filename` a string?
assert isinstance(filename, str)

# Does filename include ticker symbol?
assert model_shop.ticker in filename

# Does file exist?
assert os.path.exists(filename)

filename

 Create a `load` function below that will take a ticker symbol as input and return a model. 

In [None]:
import os
from glob import glob

import joblib

# `settings` is the project's settings object used earlier in this notebook


def load(ticker):
    """Load latest model from model directory.

    Parameters
    ----------
    ticker : str
        Ticker symbol for which model was trained.

    Returns
    -------
    `ARCHModelResult`
    """
    # Create pattern for glob search
    pattern = os.path.join(settings.model_directory, f"*{ticker}.pkl")

    # Try to find path of latest model
    try:
        # Filenames start with an ISO timestamp, so the last sorted path is the newest
        model_path = sorted(glob(pattern))[-1]
    # Handle possible `IndexError`
    except IndexError:
        raise Exception(f"No model trained for '{ticker}'.")

    # Load model
    model = joblib.load(model_path)

    # Return model
    return model
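
The `load` function depends on one detail of `dump`: because every filename begins with an ISO-style timestamp, sorting the matching paths alphabetically also sorts them chronologically. The sketch below illustrates that idea with empty placeholder files in a temporary directory. The helper name `latest_model_path` and the filenames are hypothetical, and the colons in the timestamps are replaced with hyphens here only so the names are valid on every operating system.

```python
import os
import tempfile
from glob import glob


def latest_model_path(model_directory, ticker):
    """Return the newest file matching `*{ticker}.pkl`, or raise if none exists."""
    pattern = os.path.join(model_directory, f"*{ticker}.pkl")
    matches = sorted(glob(pattern))
    if not matches:
        raise FileNotFoundError(f"No model trained for '{ticker}'.")
    return matches[-1]


with tempfile.TemporaryDirectory() as model_directory:
    for name in [
        "2023-01-05T09-30-00_IBM.pkl",
        "2023-02-10T14-00-00_IBM.pkl",
        "2023-03-01T08-15-00_MSFT.pkl",
    ]:
        # Create empty placeholder files; real files would hold saved models
        open(os.path.join(model_directory, name), "w").close()

    # Only the IBM files match the pattern; the later timestamp sorts last
    latest = os.path.basename(latest_model_path(model_directory, "IBM"))
    print(latest)
```

This version checks for an empty list instead of catching `IndexError`; both approaches report the same problem, a ticker with no saved model.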

# Main Module
Similar to the interactive applications we made in Projects 6 and 7, our first step here will be to create an `app` object. This time, instead of being a Plotly application, it'll be a FastAPI application.

In the `main` module, instantiate a FastAPI application named `app`.

- [Instantiate an application in FastAPI.](../%40textbook/22-apis.ipynb#Creating-a-Path)

From a terminal (or `cmd` on Windows) opened in the project directory, start the app server by entering the following command.

```bash
uvicorn main:app --reload --workers 1 --host localhost --port 8008
```

In [None]:
url = "http://localhost:8008/hello"
response = requests.get(url=url)

print("response code:", response.status_code)
response.json()

## `"/fit"` Path

Our first path will allow the user to fit a model to stock data when they make a `post` request to our server. They'll have the choice to use new data from AlphaVantage, or older data that's already in our database. When a user makes a request, they'll receive a response telling them if the operation was successful or whether there was an error. 

One thing that's very important when building an API is making sure the user passes the correct parameters into the app. Otherwise, our app could crash! FastAPI works well with the [pydantic library](https://pydantic-docs.helpmanual.io/), which checks that each request has the correct parameters and data types. It does this by using special data classes that we need to define. Our `"/fit"` path will take user input and then output a response, so we need two classes: one for input and one for output.

With our data classes defined, let's see how pydantic ensures that our users are supplying the correct input and that our application is returning the correct output.
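
The class definitions themselves are not shown in this section, so the following is only a sketch of what they might look like. The field names are inferred from how the `"/fit"` path reads its `request` and fills its `response`; the field types and the example values are assumptions.

```python
from pydantic import BaseModel, ValidationError


# Sketch of the input class; field names are inferred from the "/fit" path
class FitIn(BaseModel):
    ticker: str
    use_new_data: bool
    n_observations: int
    p: int
    q: int


# Sketch of the output class; it adds the two keys the path writes to `response`
class FitOut(FitIn):
    success: bool
    message: str


# A well-formed payload is parsed into typed attributes
good = FitIn(ticker="IBM", use_new_data=True, n_observations=2000, p=1, q=1)
print(good.ticker, good.n_observations)

# A payload with the wrong type is rejected before any model code runs
try:
    FitIn(ticker="IBM", use_new_data=True, n_observations="lots", p=1, q=1)
except ValidationError as error:
    rejected = True
    print(type(error).__name__)
else:
    rejected = False
```

Having `FitOut` inherit from `FitIn` means the response echoes the request fields back to the user alongside the `success` flag and the message.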

Create a `build_model` function in your `main` module.

**Task 8.4.16:** Create a `"/fit"` path for your `app`. It will take a `FitIn` object as input, and then build a `GarchModel` using the `build_model` function. The model will wrangle the needed data, fit to the data, and save the completed model. Finally, it will send a response in the form of a `FitOut` object. Be sure to handle any errors that may arise. 

- [Create an application path in FastAPI.](../%40textbook/22-apis.ipynb#Creating-a-Path)

In [None]:
# Task 8.4.16, `"/fit"` path, 200 status code
@app.post("/fit", status_code=200, response_model=FitOut)
def fit_model(request: FitIn):
    """Fit model, return confirmation message.

    Parameters
    ----------
    request : FitIn

    Returns
    -------
    dict
        Must conform to `FitOut` class
    """
    # Create `response` dictionary from `request`
    response = request.dict()
    # Create try block to handle exceptions
    try:
        # Build model with `build_model` function
        model = build_model(ticker=request.ticker, use_new_data=request.use_new_data)
        # Wrangle data
        model.wrangle_data(n_observations=request.n_observations)
        # Fit model
        model.fit(p=request.p, q=request.q)
        # Save model
        filename = model.dump()
        # Add `"success"` key to `response`
        response["success"] = True
        # Add `"message"` key to `response` with `filename`
        response["message"] = f"Trained and saved '{filename}'."
    # Create except block
    except Exception as e:
        # Add `"success"` key to `response`
        response["success"] = False
        # Add `"message"` key to `response` with error message
        response["message"] = str(e)

    # Return response
    return response

## `"/predict"` Path

For our `"/predict"` path, users will be able to make a `post` request with the ticker symbol they want a prediction for and the number of days they want to forecast into the future. Our app will return a forecast or, if there's an error, a message explaining the problem.

The setup will be very similar to our `"/fit"` path. We'll start with data classes for the in- and output.

Create definitions for the `PredictIn` and `PredictOut` data classes. The `PredictIn` class should inherit from the pydantic `BaseModel`, and the `PredictOut` class should inherit from the `PredictIn` class.

In [None]:

from pydantic import BaseModel


# Task 8.4.18, `PredictIn` class
class PredictIn(BaseModel):
    ticker: str
    n_days: int


# Task 8.4.18, `PredictOut` class
class PredictOut(PredictIn):
    success: bool
    forecast: dict
    message: str

 Create a `"/predict"` path for your `app`. It will take a `PredictIn` object as input, build a `GarchModel`, load the most recent trained model for the given ticker, and generate a dictionary of predictions. 

In [None]:
# Task 8.4.19, `"/predict"` path, 200 status code
@app.post("/predict", status_code=200, response_model=PredictOut)
def get_prediction(request: PredictIn):
    """Generate volatility forecast, return it with a status message."""
    # Create `response` dictionary from `request`
    response = request.dict()

    # Create try block to handle exceptions
    try:
        # Build model with `build_model` function; predicting needs no new data
        model = build_model(ticker=request.ticker, use_new_data=False)

        # Load stored model
        model.load()

        # Generate prediction
        prediction = model.predict_volatility(horizon=request.n_days)

        # Add `"success"` key to `response`
        response["success"] = True

        # Add `"forecast"` key to `response`
        response["forecast"] = prediction

        # Add `"message"` key to `response` (empty, since there is no error)
        response["message"] = ""

    # Create except block
    except Exception as e:
        # Add `"success"` key to `response`
        response["success"] = False

        # Add `"forecast"` key to `response`
        response["forecast"] = {}

        # Add `"message"` key to `response` with error message
        response["message"] = str(e)

    # Return response
    return response

 Create a `post` request to hit the `"/predict"` path running at `"http://localhost:8008"`. 

In [None]:
# URL of `/predict` path
url = "http://localhost:8008/predict"
# Data to send to path
json = {"ticker":"SHOPERSTOP.BSE", "n_days":3}
# Response of post request
response = requests.post(url=url, json=json)
# Response JSON to be submitted to grader
submission = response.json()
# Inspect JSON
submission