![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

In [15]:
# QuantBook research session.
# Docs: https://www.quantconnect.com/docs/v2/our-platform/research/getting-started
qb = QuantBook()

# Register SPY; `spy.Symbol` is reused for the indicator request below.
spy = qb.AddEquity("SPY")
#btx = qb.AddCrypto("BTCUSD")

# 360 daily-resolution periods for every security currently registered on qb.
history = qb.History(qb.Securities.Keys, 360, Resolution.Daily)

# Indicator analysis: Bollinger Bands built with arguments (30, 2) over the same
# 360 daily periods, plotted without the 'standarddeviation' column.
bb_indicator = BollingerBands(30, 2)
bbdf = qb.Indicator(bb_indicator, spy.Symbol, 360, Resolution.Daily)
bbdf.drop(columns='standarddeviation').plot()

In [2]:
import numpy as np
import pandas as pd
from matplotlib.pyplot import subplots
import statsmodels.api as sm

from sklearn.discriminant_analysis import \
     (LinearDiscriminantAnalysis as LDA,
      QuadraticDiscriminantAnalysis as QDA)
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix

In [3]:
def confusion_table(predicted, actual, labels=None):
    """
    Build a labeled 2x2 confusion table for a Down/Up classifier.

    Rows are the predicted class and columns are the actual class, i.e. the
    transpose of scikit-learn's ``confusion_matrix(actual, predicted)``.

    Parameters:
    predicted (array-like): Predicted labels (e.g., model predictions).
    actual (array-like): True labels (ground truth).
    labels (array-like, optional): The two class labels in the order
        ``[down_label, up_label]``, forwarded to ``confusion_matrix``.
        When omitted, scikit-learn uses the sorted labels that occur in
        ``actual`` or ``predicted`` (e.g. ``False`` before ``True``,
        ``'Down'`` before ``'Up'``). Pass it explicitly when one class may
        be absent from the data, so the table is still 2x2.

    Returns:
    pd.DataFrame: 2x2 table with rows "Predicted Down"/"Predicted Up" and
        columns "Actual Down"/"Actual Up".

    Raises:
    ValueError: If the confusion matrix is not 2x2 (for example only one
        class is present and ``labels`` was not given).
    """
    cm = confusion_matrix(actual, predicted, labels=labels)

    # The row/column names below are fixed, so anything other than two classes
    # cannot be labeled; fail with an explicit message instead of the pandas
    # shape-mismatch error the DataFrame constructor would raise.
    if cm.shape != (2, 2):
        raise ValueError(
            "confusion_table expects exactly two classes (Down, Up) but the "
            f"confusion matrix has shape {cm.shape}; pass labels=[down, up] "
            "to fix the class set and order."
        )

    # confusion_matrix puts actual classes on rows; transpose so that
    # predictions are on rows and actuals on columns.
    return pd.DataFrame(
        cm.T,
        index=["Predicted Down", "Predicted Up"],
        columns=["Actual Down", "Actual Up"],
    )

In [4]:
# Request 1800 periods at daily resolution for every security registered on qb.
# NOTE(review): the following cell reassigns `df`, so this frame is only
# available under this name until that cell runs.
df = qb.History(qb.Securities.Keys, 1800, Resolution.Daily)
# Bare last expression: the notebook renders the frame as the cell output.
df

In [6]:
# Build the test feature matrix (TestX) and target (Testy) from a short window
# of recent daily bars.
# NOTE(review): pct_change/shift/rolling below run over the whole frame without
# grouping; if more than one security is registered on qb, rows of different
# symbols would be mixed — confirm only one symbol is present.
test_history = qb.History(qb.Securities.Keys, 35, Resolution.Daily)

# Move the index levels into ordinary columns; 'time' is read as a column below.
df = test_history.reset_index()

# Daily return in percent, from consecutive closes.
df['Return'] = df['close'].pct_change() * 100

# Returns of the previous 1..5 rows.
for i in range(1, 6):
    df[f'Lag{i}'] = df['Return'].shift(i)

# Calendar year of each bar.
df['Year'] = pd.to_datetime(df['time']).dt.year

# 'Today' is a copy of 'Return' (the 'Return' column is kept as well).
df['Today'] = df['Return']

# 'Up' for a strictly positive return, otherwise 'Down'. A NaN return compares
# False and is therefore labeled 'Down'; such rows are removed by dropna() below
# because 'Today' itself is NaN.
df['Direction'] = df['Today'].gt(0).map({True: 'Up', False: 'Down'})

# Simple moving averages of the close. The 20-row rolling window is built once
# and shared with the Bollinger Bands below.
close = df['close']
rolling_20 = close.rolling(window=20)
df['SMA_10'] = close.rolling(window=10).mean()
df['SMA_20'] = rolling_20.mean()

# Relative Strength Index from 14-row simple rolling means of gains and losses.
delta = close.diff()
gain = delta.where(delta > 0, 0)    # positive moves, everything else 0
loss = -delta.where(delta < 0, 0)   # magnitude of negative moves, everything else 0
avg_gain = gain.rolling(window=14).mean()
avg_loss = loss.rolling(window=14).mean()
rs = avg_gain / avg_loss
df['RSI'] = 100 - (100 / (1 + rs))

# Bollinger Bands: 20-row mean plus/minus two rolling standard deviations.
band_half_width = 2 * rolling_20.std()
df['Bollinger_Mid'] = rolling_20.mean()
df['Bollinger_Upper'] = df['Bollinger_Mid'] + band_half_width
df['Bollinger_Lower'] = df['Bollinger_Mid'] - band_half_width

# Keep only the modelling columns and drop incomplete rows. The lag columns and
# the rolling windows leave NaN in the leading rows (the 20-row windows in their
# first 19 rows), so most of a 35-bar request is removed here.
# dropna() returns a new frame, which avoids mutating a slice of df in place.
feature_columns = ['Year', 'Lag1', 'Lag2', 'Lag3', 'Lag4', 'Lag5', 'volume', 'Today', 'Direction', 'SMA_10', 'SMA_20', 'RSI', 'Bollinger_Upper', 'Bollinger_Lower']
TestSet = df[feature_columns].dropna()

# Predictors: everything except the same-day return, its label and the year.
allvars = TestSet.columns.drop(['Today', 'Direction', 'Year'])
TestX = TestSet[allvars]
TestX = sm.add_constant(TestX)

# Boolean target: True where the day closed 'Up'.
Testy = TestSet.Direction == 'Up'

In [7]:
# Intraday gradient: relative move from the open to the close of the same row.
# Interday gradient: relative move from the previous row's close to this row's
# open (NaN on the first row, which has no previous close).
df['intraday_grads'] = df['close'] / df['open'] - 1
df['interday_grads'] = df['open'] / df['close'].shift(1) - 1

# Min-max scale each gradient to [0, 1] using the minimum and maximum of the
# whole column.
# NOTE(review): if max == min the denominator is 0 and the column becomes NaN.
df['intraday_grads_norm'] = (
    df['intraday_grads'] - df['intraday_grads'].min()
) / (df['intraday_grads'].max() - df['intraday_grads'].min())

df['interday_grads_norm'] = (
    df['interday_grads'] - df['interday_grads'].min()
) / (df['interday_grads'].max() - df['interday_grads'].min())

# Start from an explicit copy so that relabeling and adding columns does not
# operate on a slice of df.
grads = df[['interday_grads_norm', 'intraday_grads_norm']].copy()
grads.columns = ['0_inter', '0_intra']

# Add the normalized gradients of the next 1 and 2 rows (shift(-k) pulls the
# value from k rows further down).
for step in (1, 2):
    grads[f'{step}_inter'] = df['interday_grads_norm'].shift(-step)
    grads[f'{step}_intra'] = df['intraday_grads_norm'].shift(-step)

# Keep only rows where all six values are present.
grads = grads.dropna()

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score

# Features: normalized gradients of each row and of the following two rows.
X = grads[['0_inter', '0_intra', '1_inter', '1_intra', '2_inter', '2_intra']]

# Targets: normalized gradients of the row immediately before each feature row,
# selected by label for the rows that survived in grads.
# NOTE(review): the features come from rows t..t+2 while the target is row t-1,
# so each model explains an earlier bar from later ones — confirm this direction
# is intended.
# NOTE(review): the *_norm columns were min-max scaled over the whole frame
# before the split below, so the scaling uses values from the test portion.
y_inter = df['interday_grads_norm'].shift(1).loc[grads.index]
y_intra = df['intraday_grads_norm'].shift(1).loc[grads.index]

# Drop the first row by position — presumably because its interday target is
# NaN (the first row of df has no previous close); verify against the data.
X = X.iloc[1:]
y_inter = y_inter.iloc[1:]
y_intra = y_intra.iloc[1:]

# Chronological 80/20 split (no shuffling). Splitting all three arrays in one
# call guarantees both targets share exactly the same train/test rows as X.
(X_train, X_test,
 y_inter_train, y_inter_test,
 y_intra_train, y_intra_test) = train_test_split(
    X, y_inter, y_intra, test_size=0.2, shuffle=False
)

# One ordinary least-squares model per target, fitted on the same features.
model_inter = LinearRegression()
model_intra = LinearRegression()
model_inter.fit(X_train, y_inter_train)
model_intra.fit(X_train, y_intra_train)

# Predictions on the held-out rows.
inter_pred = model_inter.predict(X_test)
intra_pred = model_intra.predict(X_test)

# R² and mean squared error for both targets.
inter_r2 = r2_score(y_inter_test, inter_pred)
intra_r2 = r2_score(y_intra_test, intra_pred)
inter_mse = mean_squared_error(y_inter_test, inter_pred)
intra_mse = mean_squared_error(y_intra_test, intra_pred)

print("Interday predictions:")
print(f"R² score: {inter_r2:.4f}")
print(f"L2 loss (MSE): {inter_mse:.4f}")
print("\nIntraday predictions:")
print(f"R² score: {intra_r2:.4f}")
print(f"L2 loss (MSE): {intra_mse:.4f}")