# Tasks 2 & 3: Time Series Forecasting and Future Trend Analysis

**Time Series Forecasting for Portfolio Management Optimization**  
**Guide Me in Finance (GMF) Investments**

This notebook implements:
1. ARIMA, SARIMA, and LSTM forecasting models
2. Model comparison and evaluation
3. 12-month future trend forecasts
4. Risk and volatility analysis

In [None]:
# Setup and Imports
import os
import sys
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

warnings.filterwarnings('ignore')
sys.path.append('../src')

from data_loader import FinancialDataLoader
from preprocessing import FinancialDataPreprocessor
from forecasting_models import TimeSeriesForecaster

print("✓ All imports successful!")

In [None]:
# Configuration
os.makedirs('../results/forecasting', exist_ok=True)

start_date = "2015-07-01"
end_date = "2024-12-31"
target_asset = "TSLA"
forecast_horizon_days = 252  # 12 months

print(f"Target Asset: {target_asset}")
print(f"Forecast Horizon: 12 months ({forecast_horizon_days} days)")

In [None]:
# Step 1: Load and Prepare Data
print("Loading TSLA data...")
loader = FinancialDataLoader(start_date=start_date, end_date=end_date)
tsla_data = loader.fetch_asset_data(target_asset)

if tsla_data is not None:
    print(f"✓ {len(tsla_data)} records loaded")
    print(f"Current Price: ${tsla_data['Close'].iloc[-1]:.2f}")
    
    # Preprocess
    preprocessor = FinancialDataPreprocessor()
    processed_data = preprocessor.preprocess_asset_data({target_asset: tsla_data})
    tsla_processed = processed_data[target_asset]
    print(f"✓ Data preprocessed: {len(tsla_processed)} records")
else:
    raise RuntimeError(f"Failed to load data for {target_asset}")

# Handle missing values (NaNs) in the processed data
if tsla_processed.isnull().values.any():
    print("Warning: Missing values detected. Filling with forward fill and back fill.")
    tsla_processed = tsla_processed.ffill().bfill()
else:
    print("✓ No missing values detected in processed data.")

## Exploratory Data Analysis (EDA)
Visualize missing values, outliers, and basic statistics for TSLA.


In [None]:
import seaborn as sns

# Visualize missing values
plt.figure(figsize=(10, 1))
sns.heatmap(tsla_processed.isnull().T, cbar=False)
plt.title("Missing Values in TSLA Processed Data")
plt.show()

# Summary statistics
display(tsla_processed.describe())

# Outlier detection (boxplot)
plt.figure(figsize=(8, 4))
sns.boxplot(x=tsla_processed['Close'])
plt.title("TSLA Close Price Outlier Detection")
plt.show()

## Feature Engineering: Technical Indicators


In [None]:
def add_technical_indicators(df):
    df = df.copy()
    df['MA20'] = df['Close'].rolling(window=20).mean()
    df['MA50'] = df['Close'].rolling(window=50).mean()
    # 14-day RSI from average gains and losses of daily price changes
    # (simple-moving-average variant)
    delta = df['Close'].diff()
    avg_gain = delta.clip(lower=0).rolling(window=14).mean()
    avg_loss = (-delta).clip(lower=0).rolling(window=14).mean()
    df['RSI'] = 100 - (100 / (1 + avg_gain / avg_loss))
    return df

tsla_processed = add_technical_indicators(tsla_processed)
display(tsla_processed.tail())

In [None]:
# Step 2: Prepare for Modeling
forecaster = TimeSeriesForecaster()

# Split data chronologically
train_data, test_data, train_dates, test_dates = forecaster.prepare_data_for_modeling(
    tsla_processed, target_column='Close', test_size=0.2
)

print(f"Training: {len(train_data)} samples ({train_dates[0].strftime('%Y-%m-%d')} to {train_dates[-1].strftime('%Y-%m-%d')})")
print(f"Testing: {len(test_data)} samples ({test_dates[0].strftime('%Y-%m-%d')} to {test_dates[-1].strftime('%Y-%m-%d')})")

# Visualize split
plt.figure(figsize=(15, 6))
plt.plot(train_data.index, train_data.values, label='Training Data', color='blue')
plt.plot(test_data.index, test_data.values, label='Test Data', color='red')
plt.title(f'{target_asset} Price Data - Train/Test Split')
plt.xlabel('Date')
plt.ylabel('Price ($)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
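
The internals of `forecaster.prepare_data_for_modeling` are not shown in this notebook. As a rough sketch of what a chronological split with `test_size=0.2` amounts to, the hypothetical helper `chronological_split` below (not part of the project's `src` modules) keeps the earliest 80% of observations for training and the most recent 20% for testing, without shuffling. The prices are synthetic.

```python
import numpy as np
import pandas as pd


def chronological_split(series: pd.Series, test_size: float = 0.2):
    """Split a time-ordered series into train/test without shuffling.

    Hypothetical helper for illustration; the project's own
    prepare_data_for_modeling may differ in its details.
    """
    if not 0.0 < test_size < 1.0:
        raise ValueError("test_size must be strictly between 0 and 1")
    series = series.sort_index()
    split_at = int(len(series) * (1 - test_size))
    return series.iloc[:split_at], series.iloc[split_at:]


# Synthetic example: 10 business days of made-up prices
idx = pd.date_range("2024-01-01", periods=10, freq="B")
prices = pd.Series(np.arange(100.0, 110.0), index=idx, name="Close")
train, test = chronological_split(prices, test_size=0.2)
```

Because every training date precedes every test date, the test-set metrics are not contaminated by future information.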

In [None]:
# Step 3: ARIMA Model
print("Training ARIMA Model...")
model_results = {}

try:
    # Check stationarity
    stationarity = forecaster.check_stationarity(train_data)
    print(f"Stationarity: {stationarity['conclusion']} (p-value: {stationarity['p_value']:.6f})")
    
    # Fit ARIMA
    arima_model = forecaster.fit_arima_model(train_data)
    arima_forecasts = forecaster.generate_forecasts(
        'ARIMA', test_data, forecast_horizon_days, confidence_level=0.95
    )
    
    # Evaluate
    if len(arima_forecasts['out_sample_predictions']) > 0:
        arima_performance = forecaster.evaluate_model_performance(
            'ARIMA', test_data, arima_forecasts['out_sample_predictions']
        )
        
        model_results['ARIMA'] = {
            'model': arima_model,
            'forecasts': arima_forecasts,
            'performance': arima_performance
        }
        
        print("✓ ARIMA model successful")
        print(f"  RMSE: {arima_performance['RMSE']:.4f}")
        print(f"  MAPE: {arima_performance['MAPE']:.2f}%")
        
except Exception as e:
    print(f"✗ ARIMA failed: {str(e)}")

In [None]:
# Step 4: SARIMA Model (Simplified)
print("Training SARIMA Model (simplified parameters)...")

try:
    sarima_model = forecaster.fit_sarima_model(train_data)
    sarima_forecasts = forecaster.generate_forecasts(
        'SARIMA', test_data, forecast_horizon_days, confidence_level=0.95
    )
    
    if len(sarima_forecasts['out_sample_predictions']) > 0:
        sarima_performance = forecaster.evaluate_model_performance(
            'SARIMA', test_data, sarima_forecasts['out_sample_predictions']
        )
        
        model_results['SARIMA'] = {
            'model': sarima_model,
            'forecasts': sarima_forecasts,
            'performance': sarima_performance
        }
        
        print("✓ SARIMA model successful")
        print(f"  RMSE: {sarima_performance['RMSE']:.4f}")
        print(f"  MAPE: {sarima_performance['MAPE']:.2f}%")
        
except Exception as e:
    print(f"✗ SARIMA failed: {str(e)}")

## ARIMA Model with Grid Search

In [None]:
import itertools
import statsmodels.api as sm

p = d = q = range(0, 2)
pdq = list(itertools.product(p, d, q))
best_aic = float('inf')
best_order = None
for order in pdq:
    try:
        model = sm.tsa.ARIMA(train_data, order=order)
        results = model.fit()
        if results.aic < best_aic:
            best_aic = results.aic
            best_order = order
    except Exception:
        # Skip orders that fail to fit or converge
        continue

print(f"Best ARIMA order: {best_order} (AIC={best_aic:.2f})")
arima_model = sm.tsa.ARIMA(train_data, order=best_order).fit()

## Walk-Forward Validation for ARIMA


In [None]:
from sklearn.metrics import mean_squared_error

history = list(train_data.values)
predictions = []
for t in range(len(test_data)):
    model = sm.tsa.ARIMA(history, order=best_order)
    model_fit = model.fit()
    yhat = model_fit.forecast()[0]
    predictions.append(yhat)
    history.append(test_data.values[t])

rmse = np.sqrt(mean_squared_error(test_data, predictions))
print(f"Walk-forward RMSE: {rmse:.4f}")
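
A walk-forward RMSE is easier to interpret next to a baseline. The sketch below computes the RMSE of a naive persistence forecast (the next price equals the last observed price) under the same one-step-ahead protocol. `persistence_rmse` is an illustrative name and the arrays are synthetic; in this notebook one would pass `train_data.values` and `test_data.values`.

```python
import numpy as np


def persistence_rmse(train, test) -> float:
    """One-step-ahead RMSE of the 'last observed value' forecast."""
    train = np.asarray(train, dtype=float)
    test = np.asarray(test, dtype=float)
    # The prediction for test[t] is the value observed just before it
    previous = np.concatenate([train[-1:], test[:-1]])
    return float(np.sqrt(np.mean((test - previous) ** 2)))


# Synthetic example values, not TSLA data
train_example = np.array([10.0, 11.0, 12.0])
test_example = np.array([13.0, 15.0, 14.0])
baseline = persistence_rmse(train_example, test_example)
```

If the ARIMA walk-forward RMSE is not clearly below this baseline, the model adds little over simply carrying the last price forward.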

## Step 4b: LSTM Model (Deep Learning)
We implement an LSTM neural network to capture non-linear temporal dependencies in TSLA price data. The model is trained on scaled, windowed sequences and evaluated on out-of-sample data.

In [None]:
# Step 4b: LSTM Model (Deep Learning)
print("Training LSTM Model...")

from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import mean_squared_error, mean_absolute_error

# Prepare data for LSTM (scale and create sequences)
scaler = MinMaxScaler()
scaled_train = scaler.fit_transform(train_data.values.reshape(-1, 1))
scaled_test = scaler.transform(test_data.values.reshape(-1, 1))

def create_sequences(data, seq_length=30):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i+seq_length])
        y.append(data[i+seq_length])
    return np.array(X), np.array(y)

seq_length = 30
X_train, y_train = create_sequences(scaled_train, seq_length)
# For test, concatenate last part of train with test for continuity
X_test, y_test = create_sequences(np.concatenate([scaled_train[-seq_length:], scaled_test]), seq_length)

# Build LSTM model
lstm_model = Sequential([
    LSTM(50, return_sequences=True, input_shape=(seq_length, 1)),
    Dropout(0.2),
    LSTM(50),
    Dropout(0.2),
    Dense(1)
])
lstm_model.compile(optimizer='adam', loss='mse')

# Train LSTM
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
lstm_model.fit(X_train, y_train, epochs=50, batch_size=32, 
               validation_split=0.1, callbacks=[early_stop], verbose=0)

# Predict on test set
lstm_preds_scaled = lstm_model.predict(X_test)
lstm_preds = scaler.inverse_transform(lstm_preds_scaled)

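# Align test set for comparison: X_test was seeded with the last seq_length
# training points, so there is exactly one prediction per test observation
lstm_test_actual = test_data.values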

# Evaluate LSTM
rmse = np.sqrt(mean_squared_error(lstm_test_actual, lstm_preds))
mae = mean_absolute_error(lstm_test_actual, lstm_preds)
mape = np.mean(np.abs((lstm_test_actual - lstm_preds.flatten()) / lstm_test_actual)) * 100
direction_acc = np.mean(
    np.sign(np.diff(lstm_test_actual)) == np.sign(np.diff(lstm_preds.flatten()))
)

lstm_performance = {
    'RMSE': rmse,
    'MAE': mae,
    'MAPE': mape,
    'Direction_Accuracy': direction_acc
}

# Generate 12-month (252 trading days) future forecast using LSTM
def forecast_lstm_future(model, last_sequence, n_steps, scaler):
    preds = []
    current_seq = last_sequence.copy()
    for _ in range(n_steps):
        pred = model.predict(current_seq.reshape(1, -1, 1), verbose=0)
        preds.append(pred[0, 0])
        # Shift the window one step along the time axis and append the new prediction
        current_seq = np.roll(current_seq, -1, axis=0)
        current_seq[-1] = pred[0, 0]
    preds = np.array(preds).reshape(-1, 1)
    return scaler.inverse_transform(preds).flatten()

# Prepare last sequence from all data for future forecasting
full_scaled = scaler.transform(tsla_processed['Close'].values.reshape(-1, 1))
last_seq = full_scaled[-seq_length:]
future_lstm_forecast = forecast_lstm_future(lstm_model, last_seq, forecast_horizon_days, scaler)

# Create future forecast index
future_dates = pd.date_range(tsla_processed.index[-1] + pd.Timedelta(days=1), periods=forecast_horizon_days, freq='B')
future_lstm_forecast_series = pd.Series(future_lstm_forecast, index=future_dates, name='LSTM_Forecast')

model_results['LSTM'] = {
    'model': lstm_model,
    'forecasts': {
        'out_sample_predictions': lstm_preds.flatten(),
        'future_forecast': future_lstm_forecast_series
    },
    'performance': lstm_performance
}

print("✓ LSTM model successful")
print(f"  RMSE: {rmse:.4f}")
print(f"  MAPE: {mape:.2f}%")
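
The four evaluation metrics used above can be collected in one function so that every model is scored the same way. This is a minimal sketch with a hypothetical name, `forecast_metrics`; it flattens both inputs and checks that their lengths match before computing anything. The numbers in the example are made up.

```python
import numpy as np


def forecast_metrics(actual, predicted) -> dict:
    """RMSE, MAE, MAPE (in percent), and direction accuracy for two aligned series."""
    actual = np.asarray(actual, dtype=float).ravel()
    predicted = np.asarray(predicted, dtype=float).ravel()
    if actual.shape != predicted.shape:
        raise ValueError(
            f"length mismatch: {actual.shape[0]} actual vs {predicted.shape[0]} predicted"
        )
    errors = actual - predicted
    # Direction accuracy: share of steps where both series move the same way
    same_direction = np.sign(np.diff(actual)) == np.sign(np.diff(predicted))
    return {
        "RMSE": float(np.sqrt(np.mean(errors ** 2))),
        "MAE": float(np.mean(np.abs(errors))),
        "MAPE": float(np.mean(np.abs(errors / actual)) * 100),
        "Direction_Accuracy": float(np.mean(same_direction)),
    }


# Made-up values for illustration
metrics = forecast_metrics([100.0, 110.0, 105.0], [98.0, 112.0, 107.0])
```

MAPE divides by the actual values, so it is only meaningful when they are strictly positive, as prices are.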

## Improved LSTM Model with Hyperparameter Tuning


In [None]:
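# tensorflow.keras.wrappers.scikit_learn was removed in TensorFlow 2.12;
# this cell assumes SciKeras is installed (pip install scikeras) as its replacement
from scikeras.wrappers import KerasRegressor
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit

def build_lstm(units=50, dropout=0.2):
    model = Sequential([
        LSTM(units, return_sequences=True, input_shape=(seq_length, 1)),
        Dropout(dropout),
        LSTM(units),
        Dropout(dropout),
        Dense(1)
    ])
    model.compile(optimizer='adam', loss='mse')
    return model

lstm_reg = KerasRegressor(model=build_lstm, epochs=30, batch_size=32, verbose=0)
# The model__ prefix routes each parameter to build_lstm
param_grid = {'model__units': [32, 50], 'model__dropout': [0.2, 0.3]}
# TimeSeriesSplit keeps every validation fold later in time than its training fold
grid = GridSearchCV(lstm_reg, param_grid, cv=TimeSeriesSplit(n_splits=2))
grid.fit(X_train, y_train.ravel())
print("Best LSTM params:", grid.best_params_)

# Use best params for final model (strip the model__ routing prefix)
best_params = {name.replace('model__', ''): value for name, value in grid.best_params_.items()}
best_lstm = build_lstm(**best_params)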
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
best_lstm.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.1, callbacks=[early_stop], verbose=0)
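
Default k-fold cross-validation can place validation samples earlier in time than the samples used for training. One alternative for ordered data is an expanding-window scheme, in which each fold trains on everything that precedes its validation block. The generator below is a NumPy-only sketch with a hypothetical name, `expanding_window_splits`; scikit-learn's `TimeSeriesSplit` offers a similar, more complete splitter.

```python
import numpy as np


def expanding_window_splits(n_samples: int, n_splits: int):
    """Yield (train_idx, val_idx) pairs where validation always follows training."""
    if n_splits < 1 or n_samples < n_splits + 1:
        raise ValueError("need n_splits >= 1 and at least n_splits + 1 samples")
    fold_size = n_samples // (n_splits + 1)
    for k in range(1, n_splits + 1):
        train_end = k * fold_size
        # The last fold absorbs any remainder so that no sample is dropped
        val_end = n_samples if k == n_splits else train_end + fold_size
        yield np.arange(0, train_end), np.arange(train_end, val_end)


splits = list(expanding_window_splits(n_samples=10, n_splits=2))
```

With 10 samples and 2 splits, the first fold trains on indices 0 to 2 and validates on 3 to 5; the second trains on 0 to 5 and validates on 6 to 9.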

In [None]:
# Step 5: Model Comparison
if model_results:
    comparison_data = []
    for model_name, results in model_results.items():
        perf = results['performance']
        comparison_data.append({
            'Model': model_name,
            'RMSE': perf['RMSE'],
            'MAPE': perf['MAPE'],
            'Direction_Accuracy': perf['Direction_Accuracy']
        })
    
    comparison_df = pd.DataFrame(comparison_data).round(4)
    comparison_df = comparison_df.sort_values('RMSE')
    
    print("\nModel Performance Comparison:")
    display(comparison_df)
    
    best_model = comparison_df.iloc[0]['Model']
    print(f"\n🏆 Best Model: {best_model}")
else:
    print("No successful models")
    best_model = None

In [None]:
# Step 6: Future Trend Analysis
if best_model and best_model in model_results:
    forecasts = model_results[best_model]['forecasts']
    future_forecast = forecasts['future_forecast']
    
    current_price = tsla_processed['Close'].iloc[-1]
    forecast_end = future_forecast.iloc[-1]
    total_return = ((forecast_end - current_price) / current_price) * 100
    
    print(f"\n📈 12-Month Forecast Summary ({best_model}):")
    print(f"  Current Price: ${current_price:.2f}")
    print(f"  Forecasted End Price: ${forecast_end:.2f}")
    print(f"  Expected Return: {total_return:.2f}%")
    print(f"  Trend: {'Bullish' if total_return > 0 else 'Bearish'}")
    print(f"  Max Forecast: ${future_forecast.max():.2f}")
    print(f"  Min Forecast: ${future_forecast.min():.2f}")
    
    # Risk Analysis
    returns = future_forecast.pct_change().dropna()
    var_95 = np.percentile(returns, 5) * 100
    max_dd = ((1 + returns).cumprod() / (1 + returns).cumprod().expanding().max() - 1).min() * 100
    
    print(f"\n⚠️ Risk Metrics:")
    print(f"  VaR (95%): {var_95:.2f}%")
    print(f"  Max Drawdown: {max_dd:.2f}%")
    print(f"  Volatility: {returns.std() * np.sqrt(252) * 100:.2f}%")
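
The same three risk measures can be wrapped in a small function and applied to any price series, for example the historical `tsla_processed['Close']`, whose returns reflect realized day-to-day variation rather than a model's point-forecast path. `risk_summary` is a hypothetical helper and the prices below are made up. Unlike the cell above, this sketch measures drawdown on the price path itself, so the first observation can count as a peak.

```python
import numpy as np
import pandas as pd


def risk_summary(prices: pd.Series, alpha: float = 0.05,
                 periods_per_year: int = 252) -> dict:
    """Historical VaR, maximum drawdown, and annualized volatility, all in percent."""
    returns = prices.pct_change().dropna()
    # Drawdown relative to the running peak of the price path
    drawdown = prices / prices.cummax() - 1
    return {
        "VaR": float(np.percentile(returns, alpha * 100) * 100),
        "Max_Drawdown": float(drawdown.min() * 100),
        "Volatility": float(returns.std() * np.sqrt(periods_per_year) * 100),
    }


# Made-up prices: +10%, -10%, +10%
example_prices = pd.Series([100.0, 110.0, 99.0, 108.9])
summary = risk_summary(example_prices)
```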

In [None]:
# Step 7: Visualization
if best_model and best_model in model_results:
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))
    
    # Plot 1: Forecast
    ax1 = axes[0, 0]
    recent = tsla_processed['Close'].iloc[-252:]
    ax1.plot(recent.index, recent.values, label='Historical', color='black')
    ax1.plot(future_forecast.index, future_forecast.values, 
             label=f'{best_model} Forecast', color='red', linewidth=2)
    
    if 'confidence_intervals' in forecasts:
        ci = forecasts['confidence_intervals']
        ax1.fill_between(future_forecast.index, ci.iloc[:, 0], ci.iloc[:, 1],
                        alpha=0.3, color='red', label='95% CI')
    
    ax1.set_title('12-Month Price Forecast')
    ax1.set_ylabel('Price ($)')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Plot 2: Returns Distribution
    ax2 = axes[0, 1]
    ax2.hist(returns, bins=30, alpha=0.7, color='skyblue', edgecolor='black')
    ax2.axvline(returns.mean(), color='red', linestyle='--', label=f'Mean: {returns.mean():.4f}')
    ax2.set_title('Forecasted Returns Distribution')
    ax2.set_xlabel('Daily Returns')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    # Plot 3: Cumulative Returns
    ax3 = axes[1, 0]
    cum_returns = (1 + returns).cumprod()
    ax3.plot(future_forecast.index[1:], cum_returns, color='green', linewidth=2)
    ax3.set_title('Forecasted Cumulative Returns')
    ax3.set_ylabel('Cumulative Return')
    ax3.grid(True, alpha=0.3)
    
    # Plot 4: Rolling Volatility
    ax4 = axes[1, 1]
    rolling_vol = returns.rolling(30).std() * np.sqrt(252) * 100
    ax4.plot(future_forecast.index[1:], rolling_vol, color='orange', linewidth=2)
    ax4.set_title('30-Day Rolling Volatility')
    ax4.set_ylabel('Volatility (%)')
    ax4.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

In [None]:
# Plot all model future forecasts for comparison
plt.figure(figsize=(15, 6))
recent = tsla_processed['Close'].iloc[-252:]
plt.plot(recent.index, recent.values, label='Historical', color='black')
for model_name, results in model_results.items():
    if 'future_forecast' in results['forecasts']:
        plt.plot(results['forecasts']['future_forecast'].index, 
                 results['forecasts']['future_forecast'].values, 
                 label=f"{model_name} Forecast")
plt.title('12-Month Price Forecasts: All Models')
plt.xlabel('Date')
plt.ylabel('Price ($)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
# Step 8: Save Results
if best_model and best_model in model_results:
    # Save forecast data
    future_forecast.to_csv('../results/forecasting/future_forecasts.csv')
    comparison_df.to_csv('../results/forecasting/model_comparison.csv', index=False)
    
    print("\n✓ Results saved to:")
    print("  - ../results/forecasting/future_forecasts.csv")
    print("  - ../results/forecasting/model_comparison.csv")
    
    print("\n" + "="*60)
    print("TASKS 2 & 3 COMPLETED SUCCESSFULLY!")
    print("="*60)
    print(f"✓ Best Model: {best_model}")
    print(f"✓ 12-month forecast generated")
    print(f"✓ Risk analysis completed")
    print(f"✓ Visualizations created")
else:
    print("⚠️ Analysis incomplete - no successful models")

## 📊 Model Selection and Results Summary

In this notebook, we implemented and compared ARIMA, SARIMA, and LSTM models for forecasting TSLA closing prices. Evaluation used RMSE, MAE, MAPE, and Direction Accuracy; the comparison table in Step 5 reports RMSE, MAPE, and Direction Accuracy. We generated 12-month (252 trading day) forecasts and computed Value at Risk (VaR), maximum drawdown, and annualized volatility for the forecast path.

**Key Findings:**
- The best-performing model was selected based on the lowest RMSE on the test set.
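- The LSTM can model non-linear patterns that ARIMA and SARIMA cannot represent; whether this yields better accuracy should be read from the comparison table in Step 5.
- Risk metrics (VaR, maximum drawdown, volatility) were computed from the daily returns of the point-forecast path, so they describe the shape of the forecast and tend to understate the variability of realized prices. They remain useful as inputs to portfolio management when read with that caveat.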

---

## 📝 Conclusion and Recommendations

- **Model Choice:** The selected best model (see above) is recommended for short-term TSLA price forecasting, but all models should be monitored for performance drift.
- **Forecast Reliability:** Confidence intervals (where available) provide a measure of uncertainty. Forecasts should be interpreted as probabilistic, not deterministic.
- **Limitations:** Forecasts are based solely on historical price data and do not account for external factors (e.g., macroeconomic events, news).
- **Next Steps:** Integrate forecasts with portfolio optimization and backtesting for robust investment decision-making. Regularly retrain models with new data for continued accuracy.

---