<a href="https://colab.research.google.com/github/anissa762/goldfish/blob/main/goldfish4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [19]:
# Mount Google Drive at /content/drive (Colab only); the data paths configured
# later in this notebook all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [20]:
!pip install --upgrade optuna ta catboost
!pip install optuna-integration




In [21]:
import pandas as pd
import numpy as np
import glob
import os
from datetime import datetime
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
import optuna
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
import ta
from catboost import CatBoostRegressor, Pool

In [22]:
#---------------------------------------------
# User-Defined Parameters
#---------------------------------------------
# Directory searched for the per-ticker '*_data_cleaned.csv' price files. The
# Optuna study DB, pickled best params and saved model are written here too.
DATA_PATH = '/content/drive/MyDrive/data/'
# CSV of election-impact scores; its 'Ticker' and 'FinalScore' columns are read.
ELECTION_DATA_PATH = '/content/drive/MyDrive/final_stock_rankings2.csv'

# Prediction horizon. It is applied as a row shift on each ticker's
# date-sorted frame, so it counts rows of the price file, not calendar days.
TARGET_WINDOW_DAYS = 30

# Investment amount: total split equally across the selected stocks (printed in £)
TOTAL_INVESTMENT = 30.0

# Number of top stocks
# NOTE(review): the allocation step reassigns TOP_N = 250 before using it, so
# the value set here is not the one that takes effect.
TOP_N = 5

# Prioritize Dec/Jan weighting: sample weight given to rows whose Month is 12
# or 1 (all other rows keep weight 1.0)
DEC_JAN_WEIGHT = 2.0  # adjust this factor as needed

# Election Date (Assuming US Presidential Election 2024)
# Used to derive the DaysSinceElection and PostElection features.
ELECTION_DATE = datetime(2024, 11, 5)

In [23]:
#---------------------------------------------
# Load Election Data
#---------------------------------------------
# Fail fast and report the path that was actually checked: the file is read
# from ELECTION_DATA_PATH, which is configured independently of DATA_PATH.
if not os.path.exists(ELECTION_DATA_PATH):
    raise FileNotFoundError(
        f"Election impact data not found at {ELECTION_DATA_PATH!r}. "
        "Please ensure final_stock_rankings2.csv exists at that location."
    )

election_data = pd.read_csv(ELECTION_DATA_PATH)

# Both columns are needed below; name the missing ones instead of failing
# later with a bare KeyError on a single column.
_missing_election_cols = {'Ticker', 'FinalScore'} - set(election_data.columns)
if _missing_election_cols:
    raise KeyError(
        f"{ELECTION_DATA_PATH!r} is missing required column(s): "
        f"{sorted(_missing_election_cols)}"
    )

# 'ElectionImpact' is the feature name used by the model; it mirrors 'FinalScore'.
election_data['ElectionImpact'] = election_data['FinalScore']

# Keep track of tickers with election data
valid_tickers = set(election_data['Ticker'].unique())

In [24]:
#---------------------------------------------
# Load and Preprocess Stock Data
#---------------------------------------------
# glob returns matches in arbitrary, filesystem-dependent order. Sorting makes
# the ticker processing order -- and therefore the row order of the stacked
# training matrix and the train/test split derived from it -- reproducible.
csv_files = sorted(glob.glob(os.path.join(DATA_PATH, '*_data_cleaned.csv')))

def add_technical_indicators(df):
    """Return a copy of ``df`` extended with technical-indicator columns.

    Reads the ``Close`` and ``Volume`` columns and appends, in this order:
    MA10, MA50, RSI, MACD, MACD_signal, MACD_diff, Bollinger_High,
    Bollinger_Low, Volume_MA20, OBV, EMA10 and EMA50. Every row that contains
    a NaN in any column afterwards is dropped. The input frame is not modified.
    """
    out = df.copy()
    close = out['Close']
    volume = out['Volume']

    # Simple moving averages of the close
    for span in (10, 50):
        out[f'MA{span}'] = close.rolling(window=span).mean()

    # 14-period RSI
    out['RSI'] = ta.momentum.RSIIndicator(close, window=14).rsi()

    # MACD line, signal line and their difference
    macd_indicator = ta.trend.MACD(close)
    out['MACD'] = macd_indicator.macd()
    out['MACD_signal'] = macd_indicator.macd_signal()
    out['MACD_diff'] = macd_indicator.macd_diff()

    # Bollinger bands (20-period window, 2 standard deviations)
    bands = ta.volatility.BollingerBands(close, window=20, window_dev=2)
    out['Bollinger_High'] = bands.bollinger_hband()
    out['Bollinger_Low'] = bands.bollinger_lband()

    # Volume-based indicators: 20-period volume average and on-balance volume
    out['Volume_MA20'] = volume.rolling(window=20).mean()
    out['OBV'] = ta.volume.OnBalanceVolumeIndicator(close, volume).on_balance_volume()

    # Exponential moving averages of the close
    for span in (10, 50):
        out[f'EMA{span}'] = ta.trend.EMAIndicator(close, window=span).ema_indicator()

    return out.dropna()

# Look up each ticker's election score once, up front, instead of re-scanning
# election_data for every file. keep='first' matches taking the first listed
# score when a ticker appears more than once.
impact_by_ticker = (
    election_data.drop_duplicates(subset='Ticker', keep='first')
    .set_index('Ticker')['ElectionImpact']
)
FILE_SUFFIX = '_data_cleaned.csv'

stock_data = {}
for file in csv_files:
    # File names follow '<TICKER>_data_cleaned.csv'. Prefer the whole prefix so
    # tickers that contain '_' are recognised; otherwise fall back to the text
    # before the first '_'.
    stem = os.path.basename(file)[:-len(FILE_SUFFIX)]
    ticker = stem if stem in valid_tickers else stem.split('_')[0]

    # Only consider tickers that appear in election_data; decide this before
    # paying for the CSV read and the indicator computation.
    if ticker not in valid_tickers:
        continue
    election_impact = impact_by_ticker.get(ticker)
    if election_impact is None:
        # If ticker not found, skip
        continue

    df = pd.read_csv(file)
    df['Date'] = pd.to_datetime(df['Date'])
    df = df.sort_values('Date').dropna().reset_index(drop=True)
    df = add_technical_indicators(df)

    # Merge ElectionImpact (constant per ticker)
    df['ElectionImpact'] = election_impact

    # Additional Date Features
    df['Month'] = df['Date'].dt.month
    df['DayOfYear'] = df['Date'].dt.dayofyear

    # Post-Election Features
    df['DaysSinceElection'] = (df['Date'] - ELECTION_DATE).dt.days
    df['PostElection'] = (df['Date'] > ELECTION_DATE).astype(int)

    # Future Return as target: percentage change of Close over the next
    # TARGET_WINDOW_DAYS rows.
    df['Future_Close'] = df['Close'].shift(-TARGET_WINDOW_DAYS)
    df['Target_Return'] = ((df['Future_Close'] - df['Close']) / df['Close']) * 100
    # A zero Close yields +/-inf, which dropna() would keep; treat as missing.
    df['Target_Return'] = df['Target_Return'].replace([np.inf, -np.inf], np.nan)

    # NOTE(review): this removes the most recent TARGET_WINDOW_DAYS rows (they
    # have no future close yet), so the last row kept here is not the newest
    # row in the file. Code that treats df.iloc[-1] as "latest" is working
    # from older data.
    df = df.dropna()

    stock_data[ticker] = df


In [25]:
#---------------------------------------------
# Combine Data
#---------------------------------------------
feature_cols = ['Open', 'Close', 'High', 'Low', 'Volume',
                'MA10', 'MA50', 'RSI', 'MACD', 'MACD_signal', 'MACD_diff',
                'Bollinger_High', 'Bollinger_Low', 'Volume_MA20', 'OBV',
                'EMA10', 'EMA50', 'ElectionImpact', 'Month', 'DayOfYear',
                'DaysSinceElection', 'PostElection']

# Collect the usable per-ticker frames (tickers with too little history are skipped)
ticker_frames = [
    df[['Date'] + feature_cols + ['Target_Return']]
    for df in stock_data.values()
    if len(df) >= (TARGET_WINDOW_DAYS + 1)
]
if not ticker_frames:
    raise ValueError(
        "No ticker has enough rows to build a training set; "
        "check DATA_PATH, the election data and TARGET_WINDOW_DAYS."
    )

# Order the pooled rows chronologically. The later TimeSeriesSplit assumes row
# order is time order; stacking ticker after ticker would instead hold out the
# last tickers rather than the most recent dates. A stable sort keeps the
# ticker order for rows that share a date.
combined = pd.concat(ticker_frames, ignore_index=True).sort_values('Date', kind='mergesort')

X = combined[feature_cols].to_numpy(dtype=float)
y = combined['Target_Return'].to_numpy(dtype=float)
# Weight samples: December (12) and January (1) get higher weights
sample_weights = np.where(combined['Month'].isin([12, 1]).to_numpy(), DEC_JAN_WEIGHT, 1.0)

# Optional: Remove extreme outliers in y to improve stability
# (rows whose target lies 3 median-absolute-deviations or more from the median)
y_median = np.median(y)
y_mad = np.median(np.abs(y - y_median))
threshold = 3 * y_mad
if y_mad > 0:
    mask = (np.abs(y - y_median) < threshold)
else:
    # Degenerate case: at least half the targets equal the median, so the
    # strict '<' test would discard every row. Keep everything instead.
    mask = np.ones(len(y), dtype=bool)
X = X[mask]
y = y[mask]
sample_weights = sample_weights[mask]


In [26]:
#---------------------------------------------
# Train-Test Split
# Using TimeSeriesSplit to simulate historical scenario
#---------------------------------------------
# Only the final split produced by TimeSeriesSplit is used; earlier splits are
# discarded.
tscv = TimeSeriesSplit(n_splits=5)
*_, (train_index, test_index) = tscv.split(X)

X_train, y_train, w_train = X[train_index], y[train_index], sample_weights[train_index]
X_test, y_test, w_test = X[test_index], y[test_index], sample_weights[test_index]

# We do NOT scale features by default since CatBoost can handle raw features well.
# If needed, we could try scaling again, but let's skip it to see if performance improves.
# from sklearn.preprocessing import StandardScaler
# scaler = StandardScaler()
# X_train = scaler.fit_transform(X_train)
# X_test = scaler.transform(X_test)


In [None]:
#---------------------------------------------
# Hyperparameter Tuning with Optuna (CatBoost)
#---------------------------------------------
from catboost import cv as catboost_cv

# SQLite file (under DATA_PATH) used as the Optuna storage backend
STUDY_DB_PATH = os.path.join(DATA_PATH, 'catboost_study2.db')
# Name under which the study is created in, or reloaded from, that storage
STUDY_NAME = 'catboost_hyperparam_tuning2'

def objective_catboost(trial):
    """Optuna objective: best mean 3-fold CV RMSE for one sampled CatBoost config.

    Samples iterations, depth, learning_rate, l2_leaf_reg, random_strength and
    bagging_temperature from ``trial``, cross-validates on the module-level
    ``X_train`` / ``y_train`` / ``w_train`` arrays, and returns the minimum of
    the ``test-RMSE-mean`` curve (lower is better).
    """
    # Patience of the overfitting detector, supplied in ONE place only.
    # Previously params carried od_wait=50 while cv() was also called with
    # early_stopping_rounds=1; the latter is believed to take precedence, so
    # each trial was cut off after a single non-improving iteration and the
    # returned RMSE said little about the sampled configuration.
    early_stopping_rounds = 50

    params = {
        'iterations': trial.suggest_int('iterations', 500, 3000),
        'depth': trial.suggest_int('depth', 4, 12),
        'learning_rate': trial.suggest_float('learning_rate', 1e-4, 0.3, log=True),
        'l2_leaf_reg': trial.suggest_float('l2_leaf_reg', 1, 15),
        'random_strength': trial.suggest_float('random_strength', 1, 10),
        'bagging_temperature': trial.suggest_float('bagging_temperature', 0, 10),
        'random_seed': 42,
        'loss_function': 'RMSE',
        'verbose': False
    }

    cat_data = Pool(X_train, y_train, weight=w_train)
    cv_out = catboost_cv(
        cat_data,
        params,
        fold_count=3,
        plot=False,
        verbose=False,
        early_stopping_rounds=early_stopping_rounds,
    )
    best_rmse = cv_out['test-RMSE-mean'].min()
    return best_rmse

# Create the study, or reload the one already stored under STUDY_NAME in the
# SQLite file (load_if_exists=True).
study = optuna.create_study(
    direction='minimize',
    study_name=STUDY_NAME,
    storage=f"sqlite:///{STUDY_DB_PATH}",
    load_if_exists=True,
)

# Increase number of trials and/or time for better search
# NOTE(review): Optuna's timeout is given in seconds, so 1800000 is roughly
# 20.8 days -- confirm this is the intended budget.
study.optimize(objective_catboost, n_trials=70, timeout=1800000)

# Best sampled hyperparameters plus the fixed settings used during tuning
best_params = {
    **study.best_params,
    'loss_function': 'RMSE',
    'random_seed': 42,
    'verbose': False,
}

# Persist the chosen parameters next to the data
best_params_path = os.path.join(DATA_PATH, 'best_params_catboost2.pkl')
with open(best_params_path, 'wb') as f:
    pickle.dump(best_params, f)


[I 2024-12-12 14:58:03,132] Using an existing study with name 'catboost_hyperparam_tuning2' instead of creating a new one.


Training on fold [0/3]


In [None]:
#---------------------------------------------
# Train Final Model with Best Params
#---------------------------------------------
# Hold out the tail of the training split for best-iteration selection.
# Using X_test as eval_set with use_best_model=True and then scoring on X_test
# picks the iteration count on the very data the metrics are reported on,
# which makes R²/RMSE/MAE optimistic. X_test is now used for scoring only.
VALIDATION_FRACTION = 0.2
n_val = max(1, int(len(X_train) * VALIDATION_FRACTION))
X_fit, y_fit, w_fit = X_train[:-n_val], y_train[:-n_val], w_train[:-n_val]
X_val, y_val = X_train[-n_val:], y_train[-n_val:]

final_model = CatBoostRegressor(**best_params)

final_model.fit(X_fit, y_fit, sample_weight=w_fit, eval_set=(X_val, y_val), use_best_model=True)

# Evaluate on the untouched test split
y_pred = final_model.predict(X_test)

rmse = np.sqrt(mean_squared_error(y_test, y_pred))
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print(f"R² Score: {r2:.4f}")
print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")

final_model.save_model(os.path.join(DATA_PATH, 'final_catboost_model2.cbm'))

# Make Predictions for Investment Allocation
# NOTE(review): the frames in stock_data have had every row without a target
# dropped, i.e. the newest TARGET_WINDOW_DAYS rows are gone. df.iloc[-1] is
# therefore the last row with a known outcome, not the most recent market data.
candidate_tickers = []
candidate_rows = []
for ticker, df in stock_data.items():
    if len(df) < (TARGET_WINDOW_DAYS + 1):
        continue
    candidate_tickers.append(ticker)
    # Select the feature columns first so the row is purely numeric
    # (a full row also holds the datetime 'Date' and becomes object dtype).
    candidate_rows.append(df[feature_cols].iloc[-1].to_numpy(dtype=float))

if not candidate_rows:
    raise ValueError("No stocks have enough history to score; cannot build an allocation.")

# Score all candidates in one batched call instead of one predict() per ticker
predicted_returns = final_model.predict(np.vstack(candidate_rows))

allocation_candidates = [
    {'Stock': ticker, 'Predicted_Return': pred_return}
    for ticker, pred_return in zip(candidate_tickers, predicted_returns)
]

alloc_df = (
    pd.DataFrame(allocation_candidates)
    .sort_values(by='Predicted_Return', ascending=False)
    .reset_index(drop=True)
)

# NOTE(review): this overrides the TOP_N chosen in the parameters cell;
# consider setting the desired value there instead.
TOP_N = 250
top_stocks = alloc_df.head(TOP_N).copy()
# Split the budget across the stocks actually selected. head() returns fewer
# than TOP_N rows when there are fewer candidates, and dividing by TOP_N would
# then allocate less than TOTAL_INVESTMENT while still reporting the full total.
top_stocks['Investment'] = TOTAL_INVESTMENT / len(top_stocks)
top_stocks['Expected_Profit'] = top_stocks['Investment'] * (top_stocks['Predicted_Return'] / 100)

print("\n### Investment Allocation ###\n")
print(top_stocks[['Stock', 'Investment', 'Predicted_Return', 'Expected_Profit']])
print(f"\nTotal Investment: £{TOTAL_INVESTMENT}")
print(f"Total Expected Profit: £{top_stocks['Expected_Profit'].sum():.2f}")

# Plot Actual vs. Predicted: scatter with the y = x reference line in dashed red
fig_scatter, ax_scatter = plt.subplots(figsize=(10,6))
ax_scatter.scatter(y_test, y_pred, alpha=0.3)
ax_scatter.set_xlabel('Actual Returns (%)')
ax_scatter.set_ylabel('Predicted Returns (%)')
ax_scatter.set_title('Actual vs. Predicted Returns')
actual_range = [y_test.min(), y_test.max()]
ax_scatter.plot(actual_range, actual_range, 'r--')
plt.show()

# Residuals: histogram (50 bins) with a KDE overlay
residuals = y_test - y_pred
fig_resid, ax_resid = plt.subplots(figsize=(10,6))
sns.histplot(residuals, kde=True, bins=50, ax=ax_resid)
ax_resid.set_title('Distribution of Residuals')
ax_resid.set_xlabel('Residuals (%)')
plt.show()

def mean_absolute_percentage_error(y_true, y_pred):
    """Mean absolute percentage error, in percent.

    Entries whose true value is exactly zero are left out of the average,
    since the relative error is undefined for them.
    """
    actual = np.array(y_true)
    predicted = np.array(y_pred)
    keep = actual != 0
    relative_errors = np.abs((actual[keep] - predicted[keep]) / actual[keep])
    return np.mean(relative_errors) * 100

# Report MAPE on the test split; the helper averages only over rows whose
# actual return is non-zero.
mape = mean_absolute_percentage_error(y_test, y_pred)
print(f"MAPE: {mape:.2f}%")

print("Done.")
