In [None]:
# Graphing function definitions
import numpy as np
import pandas as pd
import altair as alt
# Turn off Altair's row-count limit for chart data.
alt.data_transformers.disable_max_rows()

# Width given to the main chart panel in _make_plot and relation_plot.
GRAPH_WIDTH = 1024

def _make_plot(wide_df, title):
    """Build the shared three-panel interactive chart for a wide-form frame.

    The frame is melted to long form (``Tick``, ``Class``, ``Value``) and shown
    as a clickable class selector, a line chart over ``Tick`` and a box plot
    per class. The line chart and the box plot are filtered by the selector.

    The chart title embeds the module-level ``DERIVATION`` value, which is
    looked up when this function is called.

    Args:
        wide_df: frame with one column per class; its index becomes ``Tick``.
        title: text placed at the start of the chart title.

    Returns:
        The horizontally concatenated Altair chart.
    """
    # Name the index before resetting it so the tick column is always called
    # "Tick". reset_index() only yields a column called "index" when the index
    # is unnamed, so renaming "index" afterwards breaks for a named index.
    melted_df = wide_df.rename_axis("Tick").reset_index().melt(
        "Tick", var_name="Class", value_name="Value"
    )
    chart_base = alt.Chart(melted_df)
    selector = alt.selection_point(fields=['Class'])
    # Selector panel: one bar per class, coloured only while selected.
    chart_selector = chart_base.mark_bar().encode(
            y = "Class:N",
            color = alt.condition(selector, alt.Color("Class:N"), alt.value("lightgray"))
        ).add_params(selector)
    chart_plot = chart_base.mark_line().encode(
            x = alt.X("Tick"),
            y = alt.Y("Value").scale(zero=False),
            color = alt.Color("Class", legend = None),
            tooltip = ["Class", "Value", "Tick"]
        ).transform_filter(selector
        ).properties(title = f"{title} ({DERIVATION} Derivation)", width = GRAPH_WIDTH
        ).interactive()
    chart_box = chart_base.mark_boxplot().encode(
            x = alt.X("Class"),
            y = alt.Y("Value").scale(zero=False),
            color = alt.Color("Class", legend = None),
        ).transform_filter(selector)
    return (chart_selector | chart_plot | chart_box).configure(background='#BBBBBB')
    
def direct_plot(df):
    """Chart every column of ``df`` unchanged, titled "Direct price"."""
    chart_title = "Direct price"
    return _make_plot(df, chart_title)

def abs_delta_plot(df):
    """Chart the forward difference of every column, titled "Price delta"."""
    # diff() holds row[t] - row[t-1]; shifting up by one row makes row t hold
    # row[t+1] - row[t].
    forward_change = df.diff().shift(-1)
    return _make_plot(forward_change, "Price delta")

def delta_plot(df):
    """Chart the forward difference of every column as a percentage of the current row."""
    forward_change = df.diff().shift(-1)
    percent_change = 100 * forward_change / df
    return _make_plot(percent_change, "% Price delta")

def error_plot(df):
    """Chart the signed percentage error of every column against "Reality".

    Each column (including "Reality" itself) is mapped to
    ``100 * (column - Reality) / Reality``.
    """
    def _percent_error(name):
        return 100 * (df[name] - df["Reality"]) / df["Reality"]

    percent_error = pd.DataFrame({name: _percent_error(name) for name in df.columns})
    return _make_plot(percent_error, "% error of prediction")

def cum_error_plot(df):
    """Chart the running sum of signed errors, scaled by the current "Reality".

    Each column is mapped to ``100 * cumsum(column - Reality) / Reality``: the
    raw errors are accumulated first and only then divided by the "Reality"
    value of the same row.
    """
    def _cumulative_error(name):
        running_error = (df[name] - df["Reality"]).cumsum()
        return 100 * running_error / df["Reality"]

    cumulative = pd.DataFrame({name: _cumulative_error(name) for name in df.columns})
    return _make_plot(cumulative, "Cumulative % error of prediction")

def rolling_error_plot(df, window):
    """Chart a rolling sum of signed errors, scaled by the current "Reality".

    Each column is mapped to
    ``100 * rolling_sum(column - Reality, window) / Reality``. No
    ``min_periods`` is passed, so pandas' default for the window applies.

    Args:
        df: wide-form frame with a "Reality" column.
        window: rolling window passed to ``Series.rolling``.
    """
    def _rolling_error(name):
        windowed_sum = (df[name] - df["Reality"]).rolling(window = window).sum()
        return 100 * windowed_sum / df["Reality"]

    rolled = pd.DataFrame({name: _rolling_error(name) for name in df.columns})
    return _make_plot(rolled, f"Rolling [window {window}] of % error of prediction")

def abs_error_plot(df):
    """Chart the absolute percentage error of every column against "Reality".

    Each column is mapped to ``100 * |column - Reality| / |Reality|``.
    """
    def _abs_percent_error(name):
        return 100 * abs(df[name] - df["Reality"]) / abs(df["Reality"])

    abs_percent_error = pd.DataFrame({name: _abs_percent_error(name) for name in df.columns})
    return _make_plot(abs_percent_error, "Abs of % error of prediction")

def cum_abs_error_plot(df):
    """Chart the running sum of absolute errors, scaled by the current |Reality|.

    Each column is mapped to ``100 * cumsum(|column - Reality|) / |Reality|``:
    the absolute errors are accumulated first and only then divided by the
    absolute "Reality" value of the same row.

    Args:
        df: wide-form frame with a "Reality" column.

    Returns:
        The chart produced by ``_make_plot``.
    """
    abs_error_df = pd.DataFrame()
    for column in df.columns:
        abs_error_df[column] = 100 * abs(df[column] - df["Reality"]).cumsum() / abs(df["Reality"])
    # Title spelling fixed ("Cummulative" -> "Cumulative") to match cum_error_plot.
    return _make_plot(abs_error_df, "Cumulative abs of % error of prediction")

def rolling_abs_error_plot(df, window):
    """Chart a rolling sum of absolute errors, scaled by the current |Reality|.

    Each column is mapped to
    ``100 * rolling_sum(|column - Reality|, window) / |Reality|``.

    Args:
        df: wide-form frame with a "Reality" column.
        window: rolling window passed to ``Series.rolling``.
    """
    def _rolling_abs_error(name):
        windowed_sum = abs(df[name] - df["Reality"]).rolling(window = window).sum()
        return 100 * windowed_sum / abs(df["Reality"])

    rolled = pd.DataFrame({name: _rolling_abs_error(name) for name in df.columns})
    return _make_plot(rolled, f"Rolling [window {window}] of abs of % error of prediction")

def relation_plot(df):
    """Scatter every prediction column against the "Reality" column.

    The frame is melted with "Reality" as the identifier, so each point is
    (Reality, Value) for one class. A clickable class selector filters the
    scatter panel.

    Args:
        df: wide-form frame with a "Reality" column plus prediction columns.

    Returns:
        The horizontally concatenated Altair chart (selector | scatter).
    """
    # Spelling fixed: the title previously read "betwen".
    TITLE = "Relation between actual value vs predicted value"
    # melt() already returns a new frame, so no defensive copy is needed.
    melted_df = df.melt("Reality", var_name = "Class", value_name = "Value")
    chart_base = alt.Chart(melted_df)
    selector = alt.selection_point(fields=['Class'])
    chart_selector = chart_base.mark_bar().encode(
            y = "Class:N",
            color = alt.condition(selector, alt.Color("Class:N"), alt.value("lightgray"))
        ).add_params(selector)
    chart_plot = chart_base.mark_point().encode(
            x = alt.X("Reality").scale(zero=False),
            y = alt.Y("Value").scale(zero=False),
            color = alt.Color("Class", legend = None),
            tooltip = ["Class", "Value", "Reality"]
        ).transform_filter(selector
        ).properties(title = TITLE, width = GRAPH_WIDTH
        ).interactive()
    return (chart_selector | chart_plot).configure(background='#BBBBBB')

def rolling_correlation_plot(df, window):
    """Chart the rolling correlation of every column with "Reality".

    Uses ``Series.rolling(window, min_periods=0).corr`` on the "Reality"
    column against each column in turn (including "Reality" itself).

    Args:
        df: wide-form frame with a "Reality" column.
        window: rolling window passed to ``Series.rolling``.
    """
    def _corr_with_reality(name):
        return df["Reality"].rolling(window=window, min_periods=0).corr(df[name])

    correlations = pd.DataFrame({name: _corr_with_reality(name) for name in df.columns})
    return _make_plot(correlations, f"Correlation [window {window}] of prediction to reality")

def rolling_mse_plot(df, window):
    """Chart the rolling mean squared error of every column against "Reality".

    Args:
        df: wide-form frame with a "Reality" column.
        window: rolling window passed to ``Series.rolling`` (``min_periods=0``).
    """
    def _rolling_mse(name):
        squared_error = (df[name] - df["Reality"]) ** 2
        return squared_error.rolling(window=window, min_periods=0).mean()

    mse_frame = pd.DataFrame({name: _rolling_mse(name) for name in df.columns})
    return _make_plot(mse_frame, f"Rolling MSE [window {window}] of prediction to reality")

def rolling_rmse_plot(df, window):
    """Chart the rolling root mean squared error of every column against "Reality".

    Args:
        df: wide-form frame with a "Reality" column.
        window: rolling window passed to ``Series.rolling`` (``min_periods=0``).
    """
    def _rolling_rmse(name):
        squared_error = (df[name] - df["Reality"]) ** 2
        return squared_error.rolling(window=window, min_periods=0).mean() ** 0.5

    rmse_frame = pd.DataFrame({name: _rolling_rmse(name) for name in df.columns})
    return _make_plot(rmse_frame, f"Rolling RMSE [window {window}] of prediction to reality")

def rolling_rmsle_plot(df, window):
    """Chart the rolling root mean squared logarithmic error against "Reality".

    Each column is mapped to
    ``sqrt(rolling_mean((log(1 + column) - log(1 + Reality)) ** 2))``.

    The natural logarithm is used (``np.log1p``), which is the conventional
    RMSLE definition. The previous base-10 logarithm scaled every value by
    ``1 / ln(10)``, so the numbers were not comparable with RMSLE figures
    from other tools.

    Args:
        df: wide-form frame with a "Reality" column. Values at or below -1
            have no finite ``log(1 + x)`` and produce NaN/-inf.
        window: rolling window passed to ``Series.rolling`` (``min_periods=0``).

    Returns:
        The chart produced by ``_make_plot``.
    """
    df_rmsle = pd.DataFrame()
    for column in df.columns:
        log_error = np.log1p(df[column]) - np.log1p(df["Reality"])
        df_rmsle[column] = (log_error ** 2).rolling(window=window, min_periods=0).mean() ** 0.5
    return _make_plot(df_rmsle, f"Rolling RMSLE [window {window}] of prediction to reality")

def rolling_r_squared_plot(df, window, trim):
    """Chart a rolling R-squared of every column against "Reality".

    For position ``i`` the window is the up-to-``window`` rows strictly before
    ``i`` (``head(i).tail(window)``) and the value is
    ``1 - MSE(column, Reality) / MSE(mean(Reality), Reality)`` over that
    window. Position 0 has an empty window and yields NaN; windows in which
    "Reality" is constant divide by zero and yield inf/NaN.

    NOTE(review): the pandas ``rolling``-based plots in this file include the
    current row in the window, while this one does not. Confirm the one-row
    lag is intended.

    Args:
        df: wide-form frame with a "Reality" column plus prediction columns.
        window: maximum number of rows per window.
        trim: number of leading result rows dropped before plotting.

    Returns:
        The chart produced by ``_make_plot``.
    """
    actual = df["Reality"]
    # Walk the frame that was passed in. The previous code looped over the
    # notebook-level `reality` list, whose length differs from `df` once the
    # frame has been trimmed, and which does not exist outside this notebook.
    positions = range(len(df))

    # The baseline only depends on "Reality", so compute it once instead of
    # once per column.
    baseline_mse = pd.Series(
        [
            ((window_actual.mean() - window_actual) ** 2).mean()
            for window_actual in (actual.head(i).tail(window) for i in positions)
        ],
        index=df.index,
        dtype=float,
    )

    r_squared = {}
    for column in df.columns:
        squared_error = (df[column] - actual) ** 2
        model_mse = pd.Series(
            [squared_error.head(i).tail(window).mean() for i in positions],
            index=df.index,
            dtype=float,
        )
        r_squared[column] = 1 - model_mse / baseline_mse
    # Keep df's own index so the Tick axis lines up with the other plots.
    df_r_squared = pd.DataFrame(r_squared, index=df.index)

    # iloc[trim:] instead of tail(-trim): tail(-0) is tail(0), which would
    # drop every row when trim is 0.
    df_r_squared = df_r_squared.iloc[trim:]
    return _make_plot(df_r_squared, f"Rolling R-Squared [window {window} - trim {trim}] of prediction to reality")

In [None]:
# Definitions of the sample prediction models
import random as rnd

def predict_keep(reality, inflation):
    """Predict each value as the previous value grown by ``inflation``.

    The first prediction is based on a previous value of 0.

    Args:
        reality: iterable of observed values.
        inflation: fractional growth applied to the previous value.

    Returns:
        A list with one prediction per observed value.
    """
    observed = list(reality)
    if not observed:
        return []
    growth = 1 + inflation
    # Shift the series right by one, seeding the first slot with 0.
    lagged = [0] + observed[:-1]
    return [value * growth for value in lagged]

def predict_keep_momentum(reality, momentum):
    """Predict each value as the previous value plus the average recent change.

    The average is taken over the last ``momentum`` observed changes; the
    window starts filled with zeros and the first change is measured from an
    initial previous value of 0.

    Args:
        reality: iterable of observed values.
        momentum: number of recent changes averaged (must be non-zero, since
            it is used as the divisor).

    Returns:
        A list with one prediction per observed value.
    """
    prediction = []
    last = 0
    deltas = [0] * momentum
    for item in reality:
        prediction.append(last + sum(deltas) / momentum)
        # Store the signed change item - last. The previous code stored
        # last - item, so a rising series was predicted to fall by its average
        # rise, which is the opposite of carrying the momentum forward.
        deltas = deltas[1:]
        deltas.append(item - last)
        last = item
    return prediction

def predict_up_momentum(reality, momentum):
    """Predict each value as the previous value plus the magnitude of the average recent change.

    The absolute value of the averaged changes is always added, so the
    prediction is never below the previous value.

    Args:
        reality: iterable of observed values.
        momentum: number of recent changes averaged (used as the divisor).

    Returns:
        A list with one prediction per observed value.
    """
    recent = [0] * momentum
    previous = 0
    forecasts = []
    for actual in reality:
        drift = abs(sum(recent) / momentum)
        forecasts.append(previous + drift)
        # Slide the window: drop the oldest change, add the newest.
        recent = recent[1:] + [previous - actual]
        previous = actual
    return forecasts

def predict_rnd_momentum(reality, momentum):
    """Predict each value as the previous value plus a randomly scaled step.

    The step is the magnitude of the average recent change multiplied by
    ``1 - 2 * rnd.random()``; one random number is drawn per observed value.

    Args:
        reality: iterable of observed values.
        momentum: number of recent changes averaged (used as the divisor).

    Returns:
        A list with one prediction per observed value.
    """
    recent = [0] * momentum
    previous = 0
    forecasts = []
    for actual in reality:
        spread = abs(sum(recent) / momentum)
        direction = 1 - 2*rnd.random()
        forecasts.append(previous + spread * direction)
        # Slide the window: drop the oldest change, add the newest.
        recent = recent[1:] + [previous - actual]
        previous = actual
    return forecasts

def predict_random(items, min_val, max_val):
    """Return ``items`` values drawn as ``min_val + (max_val - min_val) * rnd.random()``."""
    samples = []
    for _ in range(items):
        samples.append(min_val + (max_val - min_val) * rnd.random())
    return samples

In [None]:
# Sample data
# DataFrame to be wide form
#    Reality: True value of the stock
#    <Prediction_Name>: Value of the given prediction, to be shown in the graphs
# Number of leading rows dropped from the frame. The predictors all start from
# a previous value of 0, so this presumably discards their warm-up rows.
TRIM = 20
# Fixed seed so the random baselines are the same on every Restart & Run All.
SEED = 42
# Window lengths tried for each momentum model.
MOMENTUM_WINDOWS = (1, 3, 5, 20)

rnd.seed(SEED)

reality = pd.read_csv("sample_prices.csv")["Adj Close"].tolist()
dataset = {
    "Reality": reality,
    "Keep": predict_keep(reality, 0),
    "Inflate 0.05%": predict_keep(reality, 0.0005),
    # One column per (model, window) pair, named "<Model>_<window>".
    **{
        f"{label}_{momentum}": model(reality, momentum)
        for label, model in (
            ("Keep_Momentum", predict_keep_momentum),
            ("Up_Momentum", predict_up_momentum),
            ("Rnd_Momentum", predict_rnd_momentum),
        )
        for momentum in MOMENTUM_WINDOWS
    },
    "Random": predict_random(len(reality), min(reality), max(reality)),
}
df = pd.DataFrame(dataset).tail(-TRIM)

In [None]:
# Apply the N-th discrete derivative (repeated forward difference)
DERIVATION = 0

# Rebuild the frame from `dataset` first, so re-running this cell (for example
# after changing DERIVATION) does not stack more differences on top of an
# already-differenced `df`.
df = pd.DataFrame(dataset).tail(-TRIM)
for i in range(DERIVATION):
    df = df.diff().shift(-1)

In [None]:
# Reality and every prediction column, plotted as they are.
direct_plot(df)

In [None]:
# Forward difference (next row minus current row) of every column.
abs_delta_plot(df)

In [None]:
# Forward difference of every column as a percentage of the current row.
delta_plot(df)

In [None]:
# Signed percentage error of every column relative to Reality.
error_plot(df)

In [None]:
# Running sum of (column - Reality), times 100, divided by the current Reality value.
cum_error_plot(df)

In [None]:
# 50-row rolling sum of (column - Reality), times 100, divided by the current Reality value.
rolling_error_plot(df, 50)

In [None]:
# Absolute percentage error of every column relative to Reality.
abs_error_plot(df)

In [None]:
# Running sum of |column - Reality|, times 100, divided by the current |Reality|.
cum_abs_error_plot(df)

In [None]:
# 50-row rolling sum of |column - Reality|, times 100, divided by the current |Reality|.
rolling_abs_error_plot(df, 50)

In [None]:
# Scatter of every prediction column (y) against Reality (x).
relation_plot(df)

In [None]:
# Rolling correlation of every column with Reality over 50-row windows.
rolling_correlation_plot(df, 50)

In [None]:
# Rolling correlation with Reality over 5000-row windows. The helper uses
# min_periods=0, so early rows use whatever rows are available. Presumably
# meant as a near-expanding window - confirm against the sample length.
rolling_correlation_plot(df, 5000)

In [None]:
# Rolling mean squared error against Reality over 50-row windows.
rolling_mse_plot(df, 50)

In [None]:
# Rolling mean squared error against Reality over 5000-row windows.
# Presumably a near-expanding window - confirm against the sample length.
rolling_mse_plot(df, 5000)

In [None]:
# Rolling root mean squared error against Reality over 50-row windows.
rolling_rmse_plot(df, 50)

In [None]:
# Rolling root mean squared error against Reality over 5000-row windows.
# Presumably a near-expanding window - confirm against the sample length.
rolling_rmse_plot(df, 5000)

In [None]:
# Rolling root mean squared logarithmic error against Reality over 50-row windows.
rolling_rmsle_plot(df, 50)

In [None]:
# Rolling root mean squared logarithmic error against Reality over 5000-row windows.
# Presumably a near-expanding window - confirm against the sample length.
rolling_rmsle_plot(df, 5000)

In [None]:
# Rolling R-squared against Reality over 50-row windows, dropping the first 50 result rows.
rolling_r_squared_plot(df, 50, 50)

In [None]:
# Rolling R-squared against Reality over 50000-row windows, dropping the first
# 50 result rows. Presumably the window exceeds the data length, so each value
# covers all earlier rows - confirm against the sample length.
rolling_r_squared_plot(df, 50000, 50)