In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import os

In [None]:
# Cell 1: Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings

# For time series analysis
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# For modeling
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.preprocessing import StandardScaler

In [None]:
# Set up visualization
# Statement order matters: the seaborn call runs after the matplotlib style
# sheet has been selected, so its settings are applied on top of it.
plt.style.use('fivethirtyeight')
sns.set(style='whitegrid')
# Blanket filter: every warning category is ignored from this point on.
# NOTE(review): this also hides pandas / statsmodels warnings — consider
# narrowing the filter to specific categories.
warnings.filterwarnings('ignore')

In [None]:
# Cell 2: Load and prepare the data
# Read the CSV located at backend/data/tqbr/SBER.csv (relative to the working directory)
csv_path = os.path.join('backend', 'data', 'tqbr', 'SBER.csv')
df = pd.read_csv(csv_path)

# Parse the Date column into datetimes and promote it to the index
df['Date'] = pd.to_datetime(df['Date'])
df = df.set_index('Date')

# Display basic information: shape first, then three titled summaries
print("Data shape:", df.shape)
for heading, summary in (
    ("\nFirst 5 rows:", df.head()),
    ("\nData types:", df.dtypes),
    ("\nBasic statistics:", df.describe()),
):
    print(heading)
    print(summary)

In [None]:
# Cell 3: Visualize the time series
# Single-panel line chart of the closing price against the date index
fig, ax = plt.subplots(figsize=(14, 8))
ax.plot(df.index, df['Close'], label='Close Price')
ax.set_title('Stock Close Price Over Time')
ax.set_xlabel('Date')
ax.set_ylabel('Price')
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# Cell 4: Check for stationarity
def test_stationarity(timeseries):
    # Perform Dickey-Fuller test
    result = adfuller(timeseries, autolag='AIC')
    print('ADF Statistic:', result[0])
    print('p-value:', result[1])
    print('Critical Values:')
    for key, value in result[4].items():
        print(f'\t{key}: {value}')
    
    if result[1] <= 0.05:
        print("The series is stationary")
    else:
        print("The series is non-stationary")

# Apply the ADF check to the untransformed closing prices
close_prices = df['Close']
print("Stationarity test for Close price:")
test_stationarity(close_prices)

In [None]:
# Cell 5: Create lagged features for regression models
def create_lagged_features(data, target_col, lags):
    """Return a copy of ``data`` extended with lagged versions of one column.

    For every ``k`` from 1 to ``lags`` a column named
    ``f'{target_col}_lag_{k}'`` is added, holding ``target_col`` shifted down
    by ``k`` rows (so the first ``k`` entries are NaN). The input frame is not
    modified.

    Args:
        data: source DataFrame containing ``target_col``.
        target_col: name of the column to lag.
        lags: number of lag columns to create.

    Returns:
        A new DataFrame with the original columns followed by the lag columns.
    """
    source = data[target_col]
    lag_columns = {
        f'{target_col}_lag_{k}': source.shift(k)
        for k in range(1, lags + 1)
    }
    return data.assign(**lag_columns)

# Create lagged features (using last 5 days)
lags = 5
df_lagged = create_lagged_features(df, 'Close', lags)

# Additional features, all derived from the close shifted by one row so a
# row's own Close value is never part of its own features
prev_close = df_lagged['Close'].shift(1)
df_lagged['MA_5'] = prev_close.rolling(window=5).mean()
df_lagged['MA_10'] = prev_close.rolling(window=10).mean()
df_lagged['Daily_Return'] = prev_close.pct_change() * 100

# Keep only rows where every column has a value
df_lagged = df_lagged.dropna()

print("Data with lagged features:")
print(df_lagged.head())

In [None]:
# Cell 6: Split data for forecasting
# Positional 80/20 split with no shuffling: the leading rows train the
# models, the trailing rows are held out for testing
train_size = int(len(df_lagged) * 0.8)
train_data, test_data = df_lagged.iloc[:train_size], df_lagged.iloc[train_size:]

print(f"Training data size: {len(train_data)}")
print(f"Test data size: {len(test_data)}")

In [None]:
# Cell 7: Linear Regression with lagged features
# Feature columns: the lagged closes only. The engineered columns
# ('MA_5', 'MA_10', 'Daily_Return') are currently left out — the list of
# additional features is empty.
lag_features = [f'Close_lag_{i}' for i in range(1, lags + 1)]
additional_features = []
feature_cols = lag_features + additional_features

X_train_lr, y_train_lr = train_data[feature_cols], train_data['Close']
X_test_lr, y_test_lr = test_data[feature_cols], test_data['Close']

# Standardize with statistics learned from the training rows only
scaler = StandardScaler().fit(X_train_lr)
X_train_lr_scaled = scaler.transform(X_train_lr)
X_test_lr_scaled = scaler.transform(X_test_lr)

# Fit on the scaled training features, then predict the held-out rows
lr_model = LinearRegression().fit(X_train_lr_scaled, y_train_lr)
lr_pred = lr_model.predict(X_test_lr_scaled)

# Error metrics on the test rows
lr_mse = mean_squared_error(y_test_lr, lr_pred)
lr_mae = mean_absolute_error(y_test_lr, lr_pred)
lr_rmse = np.sqrt(lr_mse)

print("Linear Regression Model:")
print(f"Mean Squared Error: {lr_mse:.2f}")
print(f"Mean Absolute Error: {lr_mae:.2f}")
print(f"Root Mean Squared Error: {lr_rmse:.2f}")

In [None]:
# Cell 8: Random Forest with lagged features
# Uses the unscaled feature frames; the seed is fixed so reruns match
rf_model = RandomForestRegressor(n_estimators=100, random_state=42).fit(
    X_train_lr, y_train_lr
)

# Predict the held-out rows
rf_pred = rf_model.predict(X_test_lr)

# Error metrics on the test rows
rf_mse = mean_squared_error(y_test_lr, rf_pred)
rf_mae = mean_absolute_error(y_test_lr, rf_pred)
rf_rmse = np.sqrt(rf_mse)

print("\nRandom Forest Model:")
print(f"Mean Squared Error: {rf_mse:.2f}")
print(f"Mean Absolute Error: {rf_mae:.2f}")
print(f"Root Mean Squared Error: {rf_rmse:.2f}")

In [None]:
# Cell 9: ARIMA Model
# ARIMA is fitted on the close price alone
arima_train, arima_test = train_data['Close'], test_data['Close']

# Order (5,1,0) is a starting point - can be optimized with grid search
arima_model = ARIMA(arima_train, order=(5, 1, 0))
arima_model_fit = arima_model.fit()

# Forecast as many steps as there are test observations, in a single call
n_forecast_steps = len(arima_test)
arima_pred = arima_model_fit.forecast(steps=n_forecast_steps)

# Error metrics against the held-out closes
arima_mse = mean_squared_error(arima_test, arima_pred)
arima_mae = mean_absolute_error(arima_test, arima_pred)
arima_rmse = np.sqrt(arima_mse)

print("\nARIMA Model:")
print(f"Mean Squared Error: {arima_mse:.2f}")
print(f"Mean Absolute Error: {arima_mae:.2f}")
print(f"Root Mean Squared Error: {arima_rmse:.2f}")

In [None]:
# Cell 10: Compare model predictions
# Actual closes drawn first with a heavier line, then one translucent line
# per model on the same test dates
fig, ax = plt.subplots(figsize=(14, 8))
ax.plot(test_data.index, y_test_lr, label='Actual', linewidth=2)
model_predictions = (
    ('Linear Regression', lr_pred),
    ('Random Forest', rf_pred),
    ('ARIMA', arima_pred),
)
for model_label, model_pred in model_predictions:
    ax.plot(test_data.index, model_pred, label=model_label, alpha=0.7)
ax.set_title('Stock Price Forecasting: Model Comparison')
ax.set_xlabel('Date')
ax.set_ylabel('Price')
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# Cell 11: Model performance comparison
# Metric lists share the ordering of `models`
models = ['Linear Regression', 'Random Forest', 'ARIMA']
mse_values = [lr_mse, rf_mse, arima_mse]
mae_values = [lr_mae, rf_mae, arima_mae]
rmse_values = [lr_rmse, rf_rmse, arima_rmse]

# One row per model, one column per metric
performance_df = pd.DataFrame(
    {'Model': models, 'MSE': mse_values, 'MAE': mae_values, 'RMSE': rmse_values}
)

print("Model Performance Comparison:")
print(performance_df)

# One bar chart per metric, side by side
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
metric_panels = (
    ('MSE', 'Mean Squared Error'),
    ('MAE', 'Mean Absolute Error'),
    ('RMSE', 'Root Mean Squared Error'),
)
for panel_ax, (metric, panel_title) in zip(axes, metric_panels):
    sns.barplot(x='Model', y=metric, data=performance_df, ax=panel_ax)
    panel_ax.set_title(panel_title)
plt.tight_layout()
plt.show()

In [None]:
# Cell 12: Feature importance for Random Forest
# Pair each feature name with the fitted forest's importance score and
# rank them from most to least important
importance_table = pd.DataFrame({
    'Feature': lag_features + additional_features,
    'Importance': rf_model.feature_importances_
})
feature_importance = importance_table.sort_values('Importance', ascending=False)

fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(x='Importance', y='Feature', data=feature_importance, ax=ax)
ax.set_title('Random Forest Feature Importance')
plt.tight_layout()
plt.show()

print("Feature Importance:")
print(feature_importance)

In [None]:
# Cell 13: Time series decomposition
# Additive decomposition of the close price with a 30-observation period
# (assuming monthly seasonality)
close_series = df['Close']
decomposition = seasonal_decompose(close_series, model='additive', period=30)

# The decomposition draws its own figure; enlarge it before showing
fig = decomposition.plot()
fig.set_size_inches(14, 10)
plt.show()

In [None]:
# Cell 14: ACF and PACF plots
# Autocorrelation on the left panel, partial autocorrelation on the right,
# both up to 30 lags of the close price
fig, axes = plt.subplots(1, 2, figsize=(16, 4))
for correlogram, panel_ax in zip((plot_acf, plot_pacf), axes):
    correlogram(df['Close'], lags=30, ax=panel_ax)
plt.show()

In [None]:
# Cell 15: Understanding and Handling Non-Stationarity

# 30-observation rolling mean and standard deviation of the close price
close_rolling = df['Close'].rolling(window=30)
rolling_mean = close_rolling.mean()
rolling_std = close_rolling.std()

fig, (ax_raw, ax_rolling) = plt.subplots(1, 2, figsize=(14, 6))

# Left panel: the raw series, to show why it is non-stationary
ax_raw.plot(df.index, df['Close'])
ax_raw.set_title('Original Non-Stationary Series')
ax_raw.set_xlabel('Date')
ax_raw.set_ylabel('Price')

# Right panel: the series overlaid with its rolling statistics
ax_rolling.plot(df.index, df['Close'], label='Original')
ax_rolling.plot(df.index, rolling_mean, label='Rolling Mean')
ax_rolling.plot(df.index, rolling_std, label='Rolling Std')
ax_rolling.set_title('Rolling Statistics')
ax_rolling.set_xlabel('Date')
ax_rolling.legend()

plt.tight_layout()
plt.show()

In [None]:
# Cell 16: Transform to Stationary Series

# Method 1: First differencing (most common for stock prices)
# The leading NaN produced by diff() is intentionally kept in the column so
# it stays aligned with df.index for plotting; it is dropped only at the
# point where the values are passed to the stationarity test.
# (Calling dropna(inplace=True) on df['col'] would act on a temporary
# selection and leave df unchanged, so no such call is made here.)
df['Close_diff'] = df['Close'].diff()

# Test stationarity of the differenced series
print("Stationarity test for first difference:")
test_stationarity(df['Close_diff'].dropna())

# Method 2: Log transformation + differencing
df['Close_log'] = np.log(df['Close'])
df['Close_log_diff'] = df['Close_log'].diff()

print("\nStationarity test for log difference:")
test_stationarity(df['Close_log_diff'].dropna())

# Visualize the transformed series
plt.figure(figsize=(14, 6))
plt.subplot(1, 2, 1)
plt.plot(df.index, df['Close_diff'])
plt.title('First Difference')
plt.xlabel('Date')
plt.ylabel('Price Difference')

plt.subplot(1, 2, 2)
plt.plot(df.index, df['Close_log_diff'])
plt.title('Log Difference')
plt.xlabel('Date')
plt.ylabel('Log Difference')
plt.tight_layout()
plt.show()

In [None]:
# Cell 17: Update ARIMA Model with Differencing

# The middle term of the ARIMA order (p, d, q) is the differencing degree.
# The close price tested as non-stationary, so d must be at least 1; the
# model below uses order (5, 1, 0) on the undifferenced closes and lets
# ARIMA apply the first difference itself.
arima_train, arima_test = train_data['Close'], test_data['Close']

# Fit ARIMA model with differencing
arima_order = (5, 1, 0)
arima_model = ARIMA(arima_train, order=arima_order)
arima_model_fit = arima_model.fit()

# Forecast one value per test observation
arima_pred = arima_model_fit.forecast(steps=len(arima_test))

# Error metrics against the held-out closes
arima_mse = mean_squared_error(arima_test, arima_pred)
arima_mae = mean_absolute_error(arima_test, arima_pred)
arima_rmse = np.sqrt(arima_mse)

print("ARIMA Model with Differencing:")
print(f"Mean Squared Error: {arima_mse:.2f}")
print(f"Mean Absolute Error: {arima_mae:.2f}")
print(f"Root Mean Squared Error: {arima_rmse:.2f}")

In [None]:
# Cell 18: Work with Returns Instead of Prices

# Calculate daily returns (percentage change)
# The first value is NaN (no previous close). It stays in the column so the
# column keeps the same length as df.index; consumers that cannot handle NaN
# drop it explicitly at the call site. (An in-place dropna on df['col'] would
# only touch a temporary selection and never change df.)
df['Daily_Return'] = df['Close'].pct_change() * 100

# Test stationarity of returns
print("\nStationarity test for daily returns:")
test_stationarity(df['Daily_Return'].dropna())

# Visualize returns
plt.figure(figsize=(14, 6))
plt.subplot(1, 2, 1)
plt.plot(df.index, df['Daily_Return'])
plt.title('Daily Returns')
plt.xlabel('Date')
plt.ylabel('Return (%)')

# Histogram of returns
plt.subplot(1, 2, 2)
plt.hist(df['Daily_Return'].dropna(), bins=50, alpha=0.7)
plt.title('Distribution of Daily Returns')
plt.xlabel('Return (%)')
plt.ylabel('Frequency')
plt.tight_layout()
plt.show()

In [None]:
# Cell 19: Update Models to Work with Returns

# Start from the return column alone and add its first five lags
df_returns = df[['Daily_Return']].copy()
for lag in range(1, 6):
    df_returns[f'Return_lag_{lag}'] = df_returns['Daily_Return'].shift(lag)

# Moving averages over the previous rows' returns (shifted by one row so the
# current return is excluded)
prev_return = df_returns['Daily_Return'].shift(1)
df_returns['MA_5'] = prev_return.rolling(window=5).mean()
df_returns['MA_10'] = prev_return.rolling(window=10).mean()

# Keep only rows where every feature is available
df_returns = df_returns.dropna()

# Positional 80/20 split, no shuffling
train_size = int(len(df_returns) * 0.8)
train_returns, test_returns = df_returns.iloc[:train_size], df_returns.iloc[train_size:]

# Feature matrix / target vector for each side of the split
return_features = [f'Return_lag_{i}' for i in range(1, 6)] + ['MA_5', 'MA_10']
X_train_ret, y_train_ret = train_returns[return_features], train_returns['Daily_Return']
X_test_ret, y_test_ret = test_returns[return_features], test_returns['Daily_Return']

# Linear regression on returns
lr_model_ret = LinearRegression()
lr_model_ret.fit(X_train_ret, y_train_ret)
lr_pred_ret = lr_model_ret.predict(X_test_ret)

# Random forest on returns (fixed seed)
rf_model_ret = RandomForestRegressor(n_estimators=100, random_state=42)
rf_model_ret.fit(X_train_ret, y_train_ret)
rf_pred_ret = rf_model_ret.predict(X_test_ret)

# Mean squared error in return units (percentage points squared)
lr_mse_ret = mean_squared_error(y_test_ret, lr_pred_ret)
rf_mse_ret = mean_squared_error(y_test_ret, rf_pred_ret)

print(f"Linear Regression on Returns - MSE: {lr_mse_ret:.4f}")
print(f"Random Forest on Returns - MSE: {rf_mse_ret:.4f}")

In [None]:
# Cell 20: Convert Return Predictions Back to Price Predictions

def _compound_returns(start_price, returns_pct):
    """Turn a sequence of percentage returns into a price path.

    Element 0 of the returned list is ``start_price`` itself; every later
    element is the previous price grown by the corresponding return, where
    returns are expressed in percent (2.0 means +2%).
    """
    prices = [start_price]
    for ret in returns_pct:
        prices.append(prices[-1] * (1 + ret / 100))
    return prices


# Anchor the price path at the close of the last training date.
# Rows are looked up by date label rather than by position: df_returns lost
# its leading rows to dropna(), so train_size and other positions computed on
# df_returns do not line up with positions in df.
# Assumes the Date index contains no duplicate labels.
last_price = df.loc[train_returns.index[-1], 'Close']

# Convert return predictions to price predictions (index 0 is the anchor).
# NOTE(review): predicted returns are compounded over the whole test window,
# so errors accumulate along the path.
lr_price_pred = _compound_returns(last_price, lr_pred_ret)
rf_price_pred = _compound_returns(last_price, rf_pred_ret)

# Actual closes on exactly the dates the return models were tested on,
# prefixed with the anchor so the layout mirrors the prediction lists
test_dates = test_returns.index
actual_prices = np.concatenate(([last_price], df.loc[test_dates, 'Close'].values))

# Evaluate price predictions (index 0 is the shared anchor, so skip it)
lr_price_mse = mean_squared_error(actual_prices[1:], lr_price_pred[1:])
rf_price_mse = mean_squared_error(actual_prices[1:], rf_price_pred[1:])

print(f"Linear Regression (from returns) - Price MSE: {lr_price_mse:.2f}")
print(f"Random Forest (from returns) - Price MSE: {rf_price_mse:.2f}")

# Visualize results on the return models' own test dates
plt.figure(figsize=(14, 8))
plt.plot(test_dates, actual_prices[1:], label='Actual', linewidth=2)
plt.plot(test_dates, lr_price_pred[1:], label='Linear Regression (from returns)', alpha=0.7)
plt.plot(test_dates, rf_price_pred[1:], label='Random Forest (from returns)', alpha=0.7)
plt.title('Stock Price Forecasting Using Returns')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
plt.show()

In [None]:












# Cell 21: Compare All Approaches

# One (model, price-level MSE, approach) entry per evaluated model
comparison_entries = [
    ('ARIMA (Original)', arima_mse, 'Differencing'),
    ('Linear Regression (Original)', lr_mse, 'Lagged Features'),
    ('Random Forest (Original)', rf_mse, 'Lagged Features'),
    ('Linear Regression (Returns)', lr_price_mse, 'Returns → Prices'),
    ('Random Forest (Returns)', rf_price_mse, 'Returns → Prices'),
]
model_names, mse_scores, approaches = zip(*comparison_entries)

# Column-oriented form used to build the comparison dataframe
comparison_data = {
    'Model': list(model_names),
    'MSE': list(mse_scores),
    'Approach': list(approaches),
}

comparison_df = pd.DataFrame(comparison_data)
print("Model Performance Comparison:")
print(comparison_df.sort_values('MSE'))

# Bar chart of MSE per model, in the order the entries were listed
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(x='Model', y='MSE', data=comparison_df, ax=ax)
ax.set_title('Model Performance Comparison')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()