In [None]:
import yfinance as yf
import pandas as pd
import numpy as np

In [None]:
# pull data from api and store as csv to reduce num of api calls
def get_data_to_csv(tickers, start, end, overwrite=True):
  """Download price history per ticker and store it as `<ticker>_data_set.csv`.

  Each ticker is handled independently: a failed or empty download emits a
  warning and the loop moves on to the next ticker, instead of aborting the
  whole run and handing the exception object back to the caller.

  Args:
    tickers: iterable of ticker symbols (strings).
    start: start date, passed straight through to `yf.download`.
    end: end date, passed straight through to `yf.download`.
    overwrite: when True (default, the previous behaviour) every ticker is
      downloaded again. When False, a ticker whose CSV already exists in the
      working directory is not requested from the API.

  Returns:
    list of CSV file names that are available after the call, in ticker
    order. Tickers that failed are absent from the list.
  """
  import os
  import warnings

  result = []
  for ticker in tickers:
    path = ticker + "_data_set.csv"

    # Reuse the cached file when the caller opted in to caching
    if not overwrite and os.path.exists(path):
      result.append(path)
      continue

    try:
      data = yf.download(ticker, start=start, end=end)
      if data is None or data.empty:
        # Do not write a file without rows; it cannot be loaded later
        raise ValueError("no rows returned")
      data.to_csv(path)
    except Exception as e:
      warnings.warn(f"Could not save data for {ticker}: {e!r}")
      continue

    result.append(path)

  return result

In [None]:
# load data from saved csvs
def load_train_set(tickers):
  """Read every `<ticker>_data_set.csv` and return them as one long frame.

  Each file is read with a two-row column header and the first column as
  index. The second header level is stacked into the rows and the index is
  reset, so every file contributes one row per (index value, level-1 value).

  The per-ticker frames are concatenated as they are, so row labels restart
  at 0 for each ticker and are therefore duplicated in the result.
  """
  def read_flat(symbol):
    wide = pd.read_csv(symbol + "_data_set.csv", header=[0, 1], index_col=0)
    # Move the second header level into the rows so the frames can be concatenated
    return wide.stack(level=1).reset_index()

  # One long table: the per-ticker frames appended in the order given
  return pd.concat([read_flat(symbol) for symbol in tickers])

In [None]:
# ////// Feature engineering //////
# Relative Strength Index (RSI)
def calculate_rsi(prices, period=14):
  """Relative Strength Index of `prices`, based on simple rolling means.

  Args:
    prices: pandas Series of prices.
    period: rolling window length; shorter windows at the start are still
      averaged because `min_periods=1`.

  Returns:
    Series with the same index as `prices`. A window without any loss gives
    a division by zero in the ratio; a window with neither gain nor loss
    (such as the first row, whose difference is NaN) gives NaN.
  """
  change = prices.diff()

  def rolling_mean(values):
    # np.where returns a bare array; re-attach the index of `prices`
    as_series = pd.Series(values, index=prices.index)
    return as_series.rolling(window=period, min_periods=1).mean()

  # Upward moves and (sign-flipped) downward moves, zero everywhere else
  mean_gain = rolling_mean(np.where(change > 0, change, 0))
  mean_loss = rolling_mean(np.where(change < 0, -change, 0))

  relative_strength = mean_gain / mean_loss
  return 100 - (100 / (1 + relative_strength))

# x Day Moving Average
def calc_moving_avg(prices, period=30):
  """Rolling mean of `prices` over `period` rows.

  Rows that do not yet have a full window are NaN (pandas' default
  `min_periods` for a fixed window).
  """
  window = prices.rolling(window=period)
  return window.mean()

def create_features(df):
  """Add the engineered feature columns to `df` in place and return their names.

  `df` is the long frame of all tickers (one row per date and ticker) with
  at least the columns 'Ticker' and 'Close'. Rolling indicators and the lag
  are computed per ticker, so the first rows of one ticker never see the
  last prices of the ticker stored above it.

  'VIX_Close' is the '^VIX' close of the same 'Date'. It is NaN for dates
  (or whole frames) without a '^VIX' row. If `df` has no 'Date' column the
  previous row-label alignment is used instead.

  NOTE(review): the RSI and moving averages include the current row's Close;
  confirm that this is acceptable for the target being predicted.

  Returns:
    list of the feature column names that were written.
  """
  close_by_ticker = df.groupby('Ticker', sort=False)['Close']

  def per_ticker(func):
    # transform keeps the original row order. Assign positionally because
    # the row labels of a concatenated frame are not unique.
    return close_by_ticker.transform(func).to_numpy()

  # RSI indicator
  df['RSI30'] = per_ticker(lambda close: calculate_rsi(close, 30))

  # Moving average indicators
  df['30_Day_Moving_Avg'] = per_ticker(lambda close: calc_moving_avg(close, 30))
  df['100_Day_Moving_Avg'] = per_ticker(lambda close: calc_moving_avg(close, 100))
  df['365_Day_Moving_Avg'] = per_ticker(lambda close: calc_moving_avg(close, 365))

  # Lag features
  df['Close_Lag1'] = close_by_ticker.shift(1).to_numpy()

  # VIX close can indicate market volatility
  vix_rows = df.loc[df['Ticker'] == '^VIX']
  if 'Date' in df.columns:
    # Look the value up by date so every ticker gets the VIX close of that day
    vix_close_by_date = vix_rows.drop_duplicates('Date').set_index('Date')['Close']
    df['VIX_Close'] = df['Date'].map(vix_close_by_date).to_numpy()
  else:
    df['VIX_Close'] = vix_rows['Close']

  return ['RSI30', '30_Day_Moving_Avg', '100_Day_Moving_Avg', '365_Day_Moving_Avg', 'Close_Lag1', 'VIX_Close']

In [None]:
# model choices
# linear regression or lgb
from sklearn.linear_model import LinearRegression
import lightgbm as lgb
from sklearn.model_selection import train_test_split

def train(df_train, features, target, model):
  """Fit `model` on the first 70% of the usable rows; return it with the held-out 30%.

  Args:
    df_train: frame holding the feature columns and the target column. It is
      not modified.
    features: names of the feature columns.
    target: name of the column to predict.
    model: any estimator object exposing `fit(X, y)`.

  Returns:
    tuple `(model, X_test, y_test)`: the fitted model and the unshuffled
    trailing 30% of the rows, for evaluation.

  Raises:
    TypeError: if `model` has no `fit` method.
  """
  # Fail fast, before any data work, if the model cannot be fitted
  if not hasattr(model, 'fit'):
    raise TypeError("Model must have a 'fit' method.")

  # Rows with a NaN in a feature or the target are dropped for every model
  # (sklearn's linear regression does not accept NaN), so all models are
  # trained and scored on the same rows. dropna returns a new frame.
  required = list(features) + [target]
  df = df_train.dropna(subset=required)

  X = df[list(features)]
  y = df[target]

  # NOTE(review): shuffle=False holds out the last 30% of rows in frame
  # order. For a frame built by concatenating tickers, that is the trailing
  # tickers rather than the most recent dates.
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, shuffle=False)

  model.fit(X_train, y_train)

  return (model, X_test, y_test)


In [None]:
# Tickers to use for training (^VIX is included as a volatility reference)
tickers = [
  "AAPL",
  "META",
  "^VIX",
  "TSLA",
  "COST"
]

start = "2020-01-01"
end = "2025-08-01"

# Save ticker data from the Yahoo API as CSV files.
# get_data_to_csv may hand back the exception it caught instead of raising
# it; surface that here rather than continuing with missing or stale files.
download_result = get_data_to_csv(tickers, start, end)
if isinstance(download_result, Exception):
  raise download_result

df_train = load_train_set(tickers)
df_train.head()

# Other quick checks: df_train.info(), df_train.describe(), df_train.shape

  yf.download(ticker, start=start, end=end).to_csv(ticker + "_data_set.csv")
[*********************100%***********************]  1 of 1 completed
  yf.download(ticker, start=start, end=end).to_csv(ticker + "_data_set.csv")
[*********************100%***********************]  1 of 1 completed
  yf.download(ticker, start=start, end=end).to_csv(ticker + "_data_set.csv")
[*********************100%***********************]  1 of 1 completed
  yf.download(ticker, start=start, end=end).to_csv(ticker + "_data_set.csv")
[*********************100%***********************]  1 of 1 completed
  yf.download(ticker, start=start, end=end).to_csv(ticker + "_data_set.csv")
[*********************100%***********************]  1 of 1 completed
  df = df.stack(level=1).reset_index()
  df = df.stack(level=1).reset_index()
  df = df.stack(level=1).reset_index()
  df = df.stack(level=1).reset_index()
  df = df.stack(level=1).reset_index()


Price,Date,Ticker,Close,High,Low,Open,Volume
0,2020-01-02,AAPL,72.538521,72.598899,71.292311,71.545897,135480400
1,2020-01-03,AAPL,71.83329,72.594055,71.608685,71.765667,146322800
2,2020-01-06,AAPL,72.405678,72.444321,70.703012,70.954188,118387200
3,2020-01-07,AAPL,72.065125,72.671318,71.845347,72.415314,108872000
4,2020-01-08,AAPL,73.224396,73.526287,71.768071,71.768071,132079200


In [None]:
# Baseline model: linear regression on the engineered features.
# create_features adds the feature columns to df_train and returns their names.
features = create_features(df_train)

base_model, X_test, y_test = train(
  df_train,
  features,
  target='High',
  model=LinearRegression(),
)
pred = base_model.predict(X_test)
pred

array([199.30943598, 192.95044801, 193.47188215, ..., 939.09840449,
       939.48952771, 933.7395978 ])

In [None]:
pip install skopt

[31mERROR: Could not find a version that satisfies the requirement skopt (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for skopt[0m[31m
[0m

In [None]:
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from skopt.space import Real, Integer
from skopt import gp_minimize

# Search space for the LightGBM hyper-parameters
space = [
  Integer(50, 500, name='n_estimators'),
  Real(0.01, 0.3, name='learning_rate'),
  Integer(3, 15, name='max_depth'),
  Integer(20, 100, name='num_leaves'),
  Real(0.5, 1.0, name='subsample'),
  Real(0.5, 1.0, name='colsample_bytree'),
  Real(0.0, 1.0, name='reg_alpha'),
  Real(0.0, 1.0, name='reg_lambda'),
  Integer(5, 100, name='min_child_samples'),
  Real(0.0, 1.0, name='min_split_gain'),
]

# Parameter names, taken from the search space so the two cannot drift apart
param_names = [dimension.name for dimension in space]

# The feature columns do not depend on the hyper-parameters: build them once
# instead of once per objective call.
features = create_features(df_train)


def build_lgbm(params):
  """Create an LGBMRegressor for one point of `space`.

  `params` holds the values in the order of `param_names`. The same fixed
  settings are used for every candidate and for the final model, so the
  champion is the configuration that was actually scored during the search.
  """
  return lgb.LGBMRegressor(
    **dict(zip(param_names, params)),
    # LightGBM only applies `subsample` when bagging is switched on via a
    # non-zero frequency; with the default of 0 the tuned value is ignored.
    subsample_freq=1,
    random_state=42,
  )


def objective(params):
  """Return the hold-out MSE for one hyper-parameter candidate (minimised by gp_minimize)."""
  model, X_holdout, y_holdout = train(df_train, features, 'High', build_lgbm(params))
  return mean_squared_error(y_holdout, model.predict(X_holdout))


# NOTE(review): candidates are scored on the same hold-out split that is used
# afterwards to report the champion's metrics, so those metrics are optimistic.
result = gp_minimize(objective, space, n_calls=30, random_state=42)
best_params = result.x

# Dictionary of the best parameters, keyed by name
best_params_dict = dict(zip(param_names, best_params))

# Instantiate and train the model with the optimized parameters
champion_model = build_lgbm(best_params)
champion_model, X_test, y_test = train(df_train, features, 'High', champion_model)

champ_pred = champion_model.predict(X_test)
champ_pred

ModuleNotFoundError: No module named 'skopt'

In [None]:
# Model Evaluation
def evaluate(pred, y_test, modelName):
  """Plot predicted against actual values and print MSE, R², MAE and RMSE.

  Args:
    pred: predicted values (array-like).
    y_test: actual values (array-like or pandas Series).
    modelName: label used in the plot title.

  If the two inputs differ in length, a warning is issued and both are cut
  to the shorter length. The plot and all metrics use the same cut arrays.

  Raises:
    ValueError: if there is nothing to evaluate after alignment.
  """
  import warnings

  import matplotlib.pyplot as plt
  from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

  actual = np.asarray(y_test)
  predicted = np.asarray(pred)

  # Align once, and use the aligned arrays for the plot AND the metrics
  n_points = min(len(actual), len(predicted))
  if len(actual) != len(predicted):
    warnings.warn(
      f"evaluate: y_test has {len(actual)} values but pred has {len(predicted)}; "
      f"using the first {n_points} of each"
    )
    actual = actual[:n_points]
    predicted = predicted[:n_points]
  if n_points == 0:
    raise ValueError("evaluate: no values to compare")

  # Scatter plot of predictions against the true values
  plt.figure(figsize=(8, 6))
  plt.scatter(actual, predicted, color='blue', alpha=0.6, label='Predicted vs Actual')

  # Reference line: a perfect model would put every point on it
  min_val = min(actual.min(), predicted.min())
  max_val = max(actual.max(), predicted.max())
  plt.plot([min_val, max_val], [min_val, max_val], 'r--', label='Ideal Line')

  plt.xlabel('Actual Values (y_test)')
  plt.ylabel('Predicted Values (pred)')
  plt.title(modelName + ': Predicted vs Actual')
  plt.legend()
  plt.grid(True)
  plt.tight_layout()
  plt.show()

  # MSE - the lower, the better the model predicts the target
  mse = mean_squared_error(actual, predicted)
  print(f"Mean Squared Error: {mse:.2f}")

  r2 = r2_score(actual, predicted)
  print(f"R² Score: {r2:.2f}")

  mae = mean_absolute_error(actual, predicted)
  print(f"Mean Absolute Error: {mae:.2f}")

  rmse = mse ** 0.5
  print(f"Root Mean Squared Error: {rmse:.2f}")

In [None]:
# Baseline metrics on the hold-out split
evaluate(pred, y_test, modelName='Linear Regression')

In [None]:
# Tuned LightGBM metrics on the hold-out split (label was misspelled 'LBGM')
evaluate(champ_pred, y_test, 'LGBM')

In [None]:
# Out-of-sample check on a ticker that was not part of the training set
tickers = ['NVDA']

# No earlier cell writes the NVDA CSV, so fetch it here (same date range as
# the training data). Surface an exception that get_data_to_csv handed back.
download_result = get_data_to_csv(tickers, start, end)
if isinstance(download_result, Exception):
  raise download_result

# Load ^VIX (already saved by the training download) together with NVDA:
# create_features can only fill VIX_Close when ^VIX rows are in the frame,
# otherwise the column is all NaN and dropna below removes every row.
holdout = load_train_set(tickers + ['^VIX'])
features = create_features(holdout)
holdout = holdout[holdout['Ticker'].isin(tickers)].dropna(subset=features + ['High'])

y_test = holdout['High']
X_test = holdout[features]

# base_model is a LinearRegression, not a logistic regression
pred = base_model.predict(X_test)
evaluate(pred, y_test, 'Linear Regression (NVDA)')

champ_pred = champion_model.predict(X_test)
evaluate(champ_pred, y_test, 'LGBM (NVDA)')