In [None]:
import pandas as pd
import numpy as np
from statsmodels.tsa.arima.model import ARIMA
from pmdarima import auto_arima
from prophet import Prophet
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt
from datetime import timedelta
import warnings
warnings.filterwarnings('ignore')

In [None]:
def load_and_prepare_data(stock_csv, balance_csv, cashflow_csv, income_csv, ticker,
                          report_period='2024-09-30 00:00:00'):
    """
    Load and preprocess stock and financial data, aligning them for modeling.

    Args:
        stock_csv (str): CSV with at least 'Date', 'Ticker', 'Close', 'Volume' columns.
        balance_csv (str): Balance-sheet CSV; rows are labelled by an 'index' column
            and each reporting period is a separate column.
        cashflow_csv (str): Cash-flow CSV with the same layout.
        income_csv (str): Income-statement CSV with the same layout.
        ticker (str): Ticker symbol used to filter the stock data.
        report_period (str): Name of the statement column to read the financial
            metrics from. Defaults to the period the notebook was written for.

    Returns:
        pd.DataFrame | None: Daily frame indexed by 'Date' with 'Close', 'Volume',
        'Ticker' and one constant column per financial metric, or None if loading
        failed (the reason is printed).
    """
    def _metric(statement_df, label):
        # Pull a single scalar out of a statement and fail with a readable message.
        rows = statement_df.loc[statement_df['index'] == label, report_period]
        if rows.empty:
            raise ValueError(f"Metric '{label}' not found in financial statement")
        return rows.iloc[0]

    try:
        # Load stock data
        stock_df = pd.read_csv(stock_csv)
        stock_df['Date'] = pd.to_datetime(stock_df['Date'])
        stock_df = (
            stock_df.loc[stock_df['Ticker'] == ticker, ['Date', 'Close', 'Volume']]
            .sort_values('Date', kind='stable')
        )
        if stock_df.empty:
            raise ValueError(f"No stock rows found for ticker '{ticker}'")

        # reindex() refuses duplicate labels, so keep one row per date.
        stock_df = stock_df.drop_duplicates(subset='Date', keep='last')

        # Create daily date range and forward-fill non-trading days
        date_range = pd.date_range(start=stock_df['Date'].min(), end=stock_df['Date'].max(), freq='D')
        stock_df = stock_df.set_index('Date').reindex(date_range, method='ffill').rename_axis('Date')
        stock_df['Ticker'] = ticker

        print(f"Loaded {len(stock_df)} daily stock data points for {ticker}")

        # Load financial data
        balance_df = pd.read_csv(balance_csv)
        cashflow_df = pd.read_csv(cashflow_csv)
        income_df = pd.read_csv(income_csv)

        # Select key financial metrics for the requested reporting period
        financial_metrics = {
            'Diluted EPS': _metric(income_df, 'Diluted EPS'),
            'Free Cash Flow': _metric(cashflow_df, 'Free Cash Flow'),
            'Net Debt': _metric(balance_df, 'Net Debt'),
            'EBITDA': _metric(income_df, 'EBITDA'),
        }

        # Each metric is a single value broadcast over every date, so a merge is
        # unnecessary: assigning the scalar produces the same columns.
        merged_df = stock_df.copy()
        for metric, value in financial_metrics.items():
            merged_df[metric] = value

        print(f"Prepared data with {len(merged_df)} rows, including financial features: {list(financial_metrics.keys())}")
        return merged_df

    except FileNotFoundError as e:
        print(f"Error: File not found - {e}")
        return None
    except Exception as e:
        print(f"Error preparing data: {e}")
        return None

In [None]:
def calculate_metrics(actual, predicted):
    """
    Calculate RMSE, MAE, and MAPE for model evaluation.

    Inputs are compared position by position. They are converted to plain
    arrays first because arithmetic between two pandas Series aligns on index
    labels, so a date-indexed `actual` and a RangeIndex `predicted` would
    otherwise yield an all-NaN error vector.

    Args:
        actual: Array-like of observed values.
        predicted: Array-like of predicted values, same length as `actual`.

    Returns:
        dict: {'RMSE': float, 'MAE': float, 'MAPE': float}. MAPE is in percent
        and is computed only over entries where `actual` is non-zero; it is NaN
        when every actual value is zero.

    Raises:
        ValueError: If the inputs are empty, differ in length, or contain
            NaN/inf values.
    """
    actual_arr = np.asarray(actual, dtype=float).ravel()
    predicted_arr = np.asarray(predicted, dtype=float).ravel()

    if actual_arr.shape != predicted_arr.shape:
        raise ValueError(
            f"Length mismatch: actual has {actual_arr.size} values, "
            f"predicted has {predicted_arr.size}"
        )
    if actual_arr.size == 0:
        raise ValueError("Cannot compute metrics on empty input")
    if not (np.isfinite(actual_arr).all() and np.isfinite(predicted_arr).all()):
        raise ValueError("Inputs contain NaN or infinite values")

    errors = actual_arr - predicted_arr
    rmse = float(np.sqrt(np.mean(errors ** 2)))
    mae = float(np.mean(np.abs(errors)))

    # Percentage error is undefined where the actual value is zero.
    nonzero = actual_arr != 0
    if nonzero.any():
        mape = float(np.mean(np.abs(errors[nonzero] / actual_arr[nonzero])) * 100)
    else:
        mape = float('nan')

    return {'RMSE': rmse, 'MAE': mae, 'MAPE': mape}

In [None]:
def arima_forecast(data, forecast_horizon=7):
    """
    Fit ARIMA model using auto_arima and forecast future prices.

    The (p, d, q) order is selected by auto_arima on the full series, then a
    statsmodels ARIMA with that order is fitted and used to forecast
    `forecast_horizon` steps ahead.

    Accuracy metrics come from a hold-out check: the model is refitted on
    everything except the last `forecast_horizon` observations and its forecast
    is compared with those held-out observations. The order itself was chosen
    with the hold-out included, so the metrics are mildly optimistic.

    Args:
        data: Univariate price series (e.g. data['Close']).
        forecast_horizon (int): Number of steps to forecast.

    Returns:
        tuple: (forecast, metrics, best_order), or (None, None, None) if the
        model could not be fitted. `metrics` holds NaN values when a hold-out
        evaluation was not possible.
    """
    try:
        # Tune ARIMA with auto_arima
        model = auto_arima(data, seasonal=False, max_p=5, max_q=5, max_d=2,
                           stepwise=True, trace=True, error_action='ignore')
        best_order = model.order
        print(f"Best ARIMA order: {best_order}")

        # Fit ARIMA with best order on the full series and forecast forward
        model_fit = ARIMA(data, order=best_order).fit()
        forecast = model_fit.forecast(steps=forecast_horizon)

        # Hold-out evaluation: train on all but the last `forecast_horizon` points.
        metrics = {'RMSE': np.nan, 'MAE': np.nan, 'MAPE': np.nan}
        if 0 < forecast_horizon < len(data):
            train_part = data[:-forecast_horizon]
            holdout = data[-forecast_horizon:]
            try:
                holdout_fit = ARIMA(train_part, order=best_order).fit()
                holdout_pred = holdout_fit.forecast(steps=forecast_horizon)
                # Plain arrays: the two series may carry different index labels.
                metrics = calculate_metrics(np.asarray(holdout, dtype=float),
                                            np.asarray(holdout_pred, dtype=float))
            except Exception as e:
                # Keep the forecast usable even if the evaluation refit fails.
                print(f"ARIMA hold-out evaluation failed: {e}")

        print("\nARIMA Model Summary:")
        print(model_fit.summary())
        print(f"ARIMA Metrics: {metrics}")

        return forecast, metrics, best_order
    except Exception as e:
        print(f"Error in ARIMA forecasting: {e}")
        return None, None, None

In [None]:
def prophet_forecast(data, forecast_horizon=7, changepoint_prior_scale=0.05):
    """
    Fit Prophet model with financial regressors and forecast future prices.

    A model fitted on the full history produces the forward forecast. Accuracy
    metrics come from a second model fitted on everything except the last
    `forecast_horizon` rows and scored on those held-out rows, so they reflect
    out-of-sample error rather than in-sample fit.

    Args:
        data (pd.DataFrame): Frame indexed by 'Date' containing 'Close' and the
            regressor columns 'Diluted EPS', 'Free Cash Flow', 'Net Debt', 'EBITDA'.
        forecast_horizon (int): Number of days to forecast.
        changepoint_prior_scale (float): Prophet trend-flexibility setting.

    Returns:
        tuple: (forecast, metrics, forecast_df) where `forecast` is the 'yhat'
        series for the future dates indexed by 'ds' and `forecast_df` is the
        full Prophet prediction frame; (None, None, None) on failure.
    """
    regressors = ['Diluted EPS', 'Free Cash Flow', 'Net Debt', 'EBITDA']

    def _fit(train_df):
        # A Prophet object can only be fitted once, so build a fresh one per fit.
        # NOTE(review): daily_seasonality on once-a-day data is probably redundant
        # with the intercept; left enabled to keep forecasts unchanged - confirm.
        fitted = Prophet(daily_seasonality=True, yearly_seasonality=True, weekly_seasonality=True,
                         changepoint_prior_scale=changepoint_prior_scale)
        for name in regressors:
            fitted.add_regressor(name)
        fitted.fit(train_df)
        return fitted

    try:
        # Prepare data for Prophet (it expects 'ds' and 'y' columns)
        prophet_df = data.reset_index()[['Date', 'Close'] + regressors]
        prophet_df = prophet_df.rename(columns={'Date': 'ds', 'Close': 'y'})

        # Fit on the full history and forecast forward
        model = _fit(prophet_df)
        future = model.make_future_dataframe(periods=forecast_horizon)
        for name in regressors:
            # Future regressor values are unknown; carry the last observed value.
            future[name] = prophet_df[name].iloc[-1]
        forecast_df = model.predict(future)

        # Extract forecast
        forecast = forecast_df[['ds', 'yhat']].tail(forecast_horizon).set_index('ds')['yhat']

        # Hold-out evaluation on the last `forecast_horizon` rows
        metrics = {'RMSE': np.nan, 'MAE': np.nan, 'MAPE': np.nan}
        if 0 < forecast_horizon < len(prophet_df):
            train_part = prophet_df.iloc[:-forecast_horizon]
            holdout = prophet_df.iloc[-forecast_horizon:]
            try:
                holdout_model = _fit(train_part)
                holdout_pred = holdout_model.predict(holdout.drop(columns='y'))['yhat']
                # predict() returns a fresh RangeIndex, so compare positionally.
                metrics = calculate_metrics(np.asarray(holdout['y'], dtype=float),
                                            np.asarray(holdout_pred, dtype=float))
            except Exception as e:
                # Keep the forecast usable even if the evaluation refit fails.
                print(f"Prophet hold-out evaluation failed: {e}")

        print(f"\nProphet Metrics (changepoint_prior_scale={changepoint_prior_scale}):")
        print(f"Prophet Metrics: {metrics}")

        return forecast, metrics, forecast_df
    except Exception as e:
        print(f"Error in Prophet forecasting: {e}")
        return None, None, None

In [None]:
def tune_prophet(data, forecast_horizon=7, scales=None):
    """
    Tune Prophet model by testing multiple changepoint_prior_scale values.

    The candidate with the lowest MAPE wins. If a candidate fits but its MAPE
    is NaN, it is still kept as a fallback until a candidate with a usable MAPE
    appears, so a successful forecast is never thrown away.

    Args:
        data (pd.DataFrame): Prepared frame passed through to prophet_forecast.
        forecast_horizon (int): Number of days to forecast.
        scales (iterable of float, optional): changepoint_prior_scale values to
            try. Defaults to [0.05, 0.1, 0.5].

    Returns:
        tuple: (best_forecast, best_metrics, best_forecast_df, best_scale).
        When every candidate fails, the forecast, frame and scale are None and
        best_metrics is {'MAPE': inf}.
    """
    if scales is None:
        scales = [0.05, 0.1, 0.5]

    best_metrics = {'MAPE': float('inf')}
    best_forecast = None
    best_forecast_df = None
    best_scale = None

    for scale in scales:
        forecast, metrics, forecast_df = prophet_forecast(data, forecast_horizon, changepoint_prior_scale=scale)
        if not metrics:
            # prophet_forecast returns None metrics when the fit failed.
            continue

        candidate_mape = metrics.get('MAPE', np.nan)
        incumbent_mape = best_metrics.get('MAPE', np.nan)
        first_success = best_scale is None
        improves = not np.isnan(candidate_mape) and (
            np.isnan(incumbent_mape) or candidate_mape < incumbent_mape
        )
        if first_success or improves:
            best_metrics = metrics
            best_forecast = forecast
            best_forecast_df = forecast_df
            best_scale = scale

    print(f"\nBest Prophet changepoint_prior_scale: {best_scale}")
    print(f"Best Prophet Metrics: {best_metrics}")

    return best_forecast, best_metrics, best_forecast_df, best_scale

In [None]:
def backtest_strategy(data, predictions, model_name):
    """
    Backtest a simple trading strategy based on predictions.

    Signal on day t is +1 when the prediction for day t+1 is above the
    prediction for day t, otherwise -1. The first row is left flat (0), and so
    is the last row because it has no next-day prediction. The position taken
    on day t earns the close-to-close return of day t+1.

    Args:
        data (pd.DataFrame): Historical frame with a 'Close' column; its index
            is used as the 'Date' column of the result.
        predictions: One prediction per row of `data`. A Series with a
            DatetimeIndex is aligned to `data.index` by label; anything else
            (array, list, Series with another index) is matched by position and
            must have the same length as `data`.
        model_name (str): Label used in the printed report.

    Returns:
        tuple: (results, pred_df). `results` has 'Cumulative Return (%)' and
        'Number of Trades' (the number of times the signal changes). `pred_df`
        holds 'Date', 'Close', 'Prediction', 'Signal', 'Return' and
        'Strategy_Return'. Returns (None, None) if the backtest fails.
    """
    try:
        # Align predictions with data. Building the frame from raw arrays avoids
        # pandas' label alignment, which breaks on mismatched index types.
        if isinstance(predictions, pd.Series) and isinstance(predictions.index, pd.DatetimeIndex):
            pred_values = predictions.reindex(data.index).to_numpy(dtype=float)
        else:
            pred_values = np.asarray(predictions, dtype=float).ravel()
            if len(pred_values) != len(data):
                raise ValueError(
                    f"predictions has {len(pred_values)} values but data has {len(data)} rows"
                )

        pred_df = pd.DataFrame(
            {
                'Date': data.index,
                'Close': data['Close'].to_numpy(),
                'Prediction': pred_values,
            },
            index=data.index,
        ).dropna()
        if pred_df.empty:
            raise ValueError("no rows with both a close price and a prediction")

        # Generate trading signals: Buy if next day's prediction > current, Sell otherwise
        current_pred = pred_df['Prediction']
        signal = np.where(current_pred.shift(-1) > current_pred, 1, -1)
        signal[0] = 0
        signal[-1] = 0  # no next-day prediction to act on
        # Whole-column assignment; chained `df[col][1:] = ...` is unreliable.
        pred_df['Signal'] = signal

        # Calculate daily returns
        pred_df['Return'] = pred_df['Close'].pct_change()

        # Strategy returns: yesterday's signal applied to today's return
        pred_df['Strategy_Return'] = pred_df['Signal'].shift(1) * pred_df['Return']

        # Cumulative return (cumprod skips the leading NaN)
        cumulative_return = (1 + pred_df['Strategy_Return']).cumprod().iloc[-1] - 1
        # A trade happens when the signal changes, not on every day a position is held.
        num_trades = int((np.diff(signal) != 0).sum())

        results = {
            'Cumulative Return (%)': cumulative_return * 100,
            'Number of Trades': num_trades
        }

        print(f"\nBacktest Results for {model_name}:")
        print(f"Cumulative Return: {results['Cumulative Return (%)']:.2f}%")
        print(f"Number of Trades: {results['Number of Trades']}")

        return results, pred_df
    except Exception as e:
        print(f"Error in backtesting: {e}")
        return None, None

In [None]:
def walk_forward_validation(data, forecast_horizon=7, n_folds=5):
    """
    Perform walk-forward validation for ARIMA and Prophet models.

    The last `n_folds * forecast_horizon` rows are split into consecutive test
    windows of `forecast_horizon` rows. For each window both models are fitted
    on all rows before it and scored on the window. Folds whose training set
    would have `forecast_horizon` rows or fewer are skipped.

    Args:
        data (pd.DataFrame): Frame indexed by 'Date' with 'Close' and the
            regressor columns 'Diluted EPS', 'Free Cash Flow', 'Net Debt', 'EBITDA'.
        forecast_horizon (int): Size of each test window.
        n_folds (int): Number of test windows.

    Returns:
        dict: {'ARIMA': {...}, 'Prophet': {...}} with the mean RMSE, MAE and
        MAPE over the evaluated folds (NaN when no fold was evaluated).
    """
    regressors = ['Diluted EPS', 'Free Cash Flow', 'Net Debt', 'EBITDA']
    metric_names = ('RMSE', 'MAE', 'MAPE')
    arima_metrics_list = []
    prophet_metrics_list = []

    for i in range(n_folds):
        train_end = len(data) - (n_folds - i) * forecast_horizon
        if train_end <= forecast_horizon:
            continue

        train_data = data.iloc[:train_end]
        # Plain array: the model outputs below do not share the date index.
        test_values = data['Close'].iloc[train_end:train_end + forecast_horizon].to_numpy(dtype=float)

        if len(test_values) != forecast_horizon:
            continue

        # ARIMA
        arima_model = auto_arima(train_data['Close'], seasonal=False, max_p=5, max_q=5, max_d=2,
                                 stepwise=True, error_action='ignore')
        arima_fit = ARIMA(train_data['Close'], order=arima_model.order).fit()
        arima_pred = arima_fit.forecast(steps=forecast_horizon)
        arima_metrics_list.append(
            calculate_metrics(test_values, np.asarray(arima_pred, dtype=float))
        )

        # Prophet
        prophet_df = train_data.reset_index()[['Date', 'Close'] + regressors]
        prophet_df = prophet_df.rename(columns={'Date': 'ds', 'Close': 'y'})
        prophet_model = Prophet(daily_seasonality=True, yearly_seasonality=True, weekly_seasonality=True,
                                changepoint_prior_scale=0.05)
        for regressor in regressors:
            prophet_model.add_regressor(regressor)
        prophet_model.fit(prophet_df)
        future = prophet_model.make_future_dataframe(periods=forecast_horizon)
        for regressor in regressors:
            # Carry the last known value forward for the future rows.
            future[regressor] = prophet_df[regressor].iloc[-1]
        prophet_pred_df = prophet_model.predict(future)
        # predict() returns a RangeIndex frame; take the values positionally.
        prophet_pred = prophet_pred_df['yhat'].tail(forecast_horizon).to_numpy(dtype=float)
        prophet_metrics_list.append(calculate_metrics(test_values, prophet_pred))

    def _average(fold_metrics):
        # Mean of each metric across folds, NaN if no fold was evaluated.
        return {
            name: np.mean([m[name] for m in fold_metrics]) if fold_metrics else np.nan
            for name in metric_names
        }

    # Average metrics
    avg_metrics = {
        'ARIMA': _average(arima_metrics_list),
        'Prophet': _average(prophet_metrics_list),
    }

    print("\nWalk-Forward Validation Results:")
    print(f"ARIMA Average Metrics: {avg_metrics['ARIMA']}")
    print(f"Prophet Average Metrics: {avg_metrics['Prophet']}")

    return avg_metrics

In [None]:
def plot_predictions(data, arima_forecast, prophet_forecast, prophet_forecast_df, backtest_arima_df, backtest_prophet_df, forecast_horizon=7, ticker=None):
    """
    Plot actual vs predicted prices and backtest signals for ARIMA and Prophet models.

    The figure is saved to 'stock_price_predictions_with_backtest.png' in the
    working directory and then shown.

    Args:
        data (pd.DataFrame): Historical data indexed by date with a 'Close' column.
        arima_forecast (pd.Series): ARIMA forecast, or None to skip.
        prophet_forecast (pd.Series): Prophet forecast, or None to skip.
        prophet_forecast_df (pd.DataFrame): Full Prophet forecast with
            'yhat_lower'/'yhat_upper' columns; the interval band is skipped if None.
        backtest_arima_df (pd.DataFrame): Backtest results for ARIMA, or None.
        backtest_prophet_df (pd.DataFrame): Backtest results for Prophet, or None.
        forecast_horizon (int): Number of days forecasted.
        ticker (str, optional): Symbol shown in the title. Defaults to the first
            value of data['Ticker'] when that column exists, otherwise 'AAPL'.
    """
    if ticker is None:
        ticker = data['Ticker'].iloc[0] if 'Ticker' in data.columns else 'AAPL'

    last_date = data.index[-1]
    future_dates = [last_date + timedelta(days=offset + 1) for offset in range(forecast_horizon)]

    fig, ax = plt.subplots(figsize=(14, 8))

    # Plot historical and forecasted prices
    ax.plot(data.index, data['Close'], label='Historical Close', color='blue')
    if arima_forecast is not None:
        ax.plot(future_dates, arima_forecast, label='ARIMA Forecast', color='red', linestyle='--')
    if prophet_forecast is not None:
        ax.plot(future_dates, prophet_forecast, label='Prophet Forecast', color='green', linestyle='--')
        if prophet_forecast_df is not None:
            ax.fill_between(future_dates,
                            prophet_forecast_df['yhat_lower'].tail(forecast_horizon),
                            prophet_forecast_df['yhat_upper'].tail(forecast_horizon),
                            color='green', alpha=0.1, label='Prophet Confidence Interval')

    # Plot backtest signals (buy/sell); each entry is
    # (backtest frame, model label, buy colour, sell colour).
    signal_sets = [
        (backtest_arima_df, 'ARIMA', 'green', 'red'),
        (backtest_prophet_df, 'Prophet', 'lime', 'darkred'),
    ]
    for backtest_df, label, buy_color, sell_color in signal_sets:
        if backtest_df is None:
            continue
        buy_signals = backtest_df[backtest_df['Signal'] == 1]
        sell_signals = backtest_df[backtest_df['Signal'] == -1]
        ax.scatter(buy_signals['Date'], buy_signals['Close'], color=buy_color, marker='^',
                   label=f'{label} Buy Signal', alpha=0.7)
        ax.scatter(sell_signals['Date'], sell_signals['Close'], color=sell_color, marker='v',
                   label=f'{label} Sell Signal', alpha=0.7)

    # The title reflects the actual ticker and horizon instead of fixed text.
    ax.set_title(f'{ticker} Stock Price Prediction and Backtest Signals ({forecast_horizon}-Day Forecast)')
    ax.set_xlabel('Date')
    ax.set_ylabel('Close Price (USD)')
    ax.legend()
    ax.grid(True)
    fig.tight_layout()
    fig.savefig('stock_price_predictions_with_backtest.png')
    plt.show()

In [None]:
# Parameters
forecast_horizon = 7
stock_csv = 'cleaned_stock_data.csv'
balance_csv = 'balance_sheet_AAPL.csv'
cashflow_csv = 'cash_flow_AAPL.csv'
income_csv = 'income_statement_AAPL.csv'
# Use the first ticker found in the stock file; only that column is needed here.
ticker = pd.read_csv(stock_csv, usecols=['Ticker'])['Ticker'].unique()[0]

# Load and prepare data
data = load_and_prepare_data(stock_csv, balance_csv, cashflow_csv, income_csv, ticker)
if data is None:
    # load_and_prepare_data already printed the cause; stop with a clear error
    # instead of failing later on `None['Close']`.
    raise RuntimeError("Data preparation failed; see the message printed above.")

# ARIMA forecast with tuning.
# Results get their own names so the functions `arima_forecast` and
# `prophet_forecast` are not overwritten and this cell can be re-run.
arima_future_forecast, arima_metrics, arima_order = arima_forecast(data['Close'], forecast_horizon)

# Prophet forecast with tuning
prophet_future_forecast, prophet_metrics, prophet_forecast_df, prophet_scale = tune_prophet(data, forecast_horizon)

# Walk-forward validation
avg_metrics = walk_forward_validation(data, forecast_horizon)

# Backtesting
# For backtesting, generate in-sample predictions on the history that excludes
# the final `forecast_horizon` rows.
regressors = ['Diluted EPS', 'Free Cash Flow', 'Net Debt', 'EBITDA']
if len(data) > forecast_horizon:
    history = data.iloc[:-forecast_horizon]

    # ARIMA historical predictions
    arima_model = auto_arima(history['Close'], seasonal=False, max_p=5, max_q=5, max_d=2)
    arima_fit = ARIMA(history['Close'], order=arima_model.order).fit()
    arima_hist_pred = arima_fit.predict(start=0, end=len(history) - 1)
    # Pass plain arrays so predictions are matched to rows by position.
    arima_backtest_results, arima_backtest_df = backtest_strategy(
        history, np.asarray(arima_hist_pred, dtype=float), 'ARIMA')

    # Prophet historical predictions
    prophet_df = history.reset_index()[['Date', 'Close'] + regressors]
    prophet_df = prophet_df.rename(columns={'Date': 'ds', 'Close': 'y'})
    prophet_model = Prophet(daily_seasonality=True, yearly_seasonality=True, weekly_seasonality=True,
                            # Fall back to the first tuning candidate if tuning found nothing.
                            changepoint_prior_scale=prophet_scale if prophet_scale is not None else 0.05)
    for regressor in regressors:
        prophet_model.add_regressor(regressor)
    prophet_model.fit(prophet_df)
    prophet_hist_pred = prophet_model.predict(prophet_df)['yhat']
    prophet_backtest_results, prophet_backtest_df = backtest_strategy(
        history, np.asarray(prophet_hist_pred, dtype=float), 'Prophet')
else:
    arima_backtest_results, arima_backtest_df = None, None
    prophet_backtest_results, prophet_backtest_df = None, None
    print("Insufficient data for backtesting.")

# Plot predictions and backtest signals
plot_predictions(data, arima_future_forecast, prophet_future_forecast, prophet_forecast_df,
                 arima_backtest_df, prophet_backtest_df, forecast_horizon)

# Save predictions
if arima_future_forecast is not None and prophet_future_forecast is not None:
    future_dates = [data.index[-1] + timedelta(days=i+1) for i in range(forecast_horizon)]
    # Use raw values: the two forecasts may carry different index labels.
    pred_df = pd.DataFrame({
        'Date': future_dates,
        'ARIMA_Prediction': np.asarray(arima_future_forecast, dtype=float),
        'Prophet_Prediction': np.asarray(prophet_future_forecast, dtype=float)
    })
    pred_df.to_csv('stock_price_predictions.csv', index=False)
    print("\nSaved predictions to stock_price_predictions.csv")

# Save metrics (either model may have failed and returned None metrics)
arima_metrics = arima_metrics or {}
prophet_metrics = prophet_metrics or {}
metrics_df = pd.DataFrame({
    'Model': ['ARIMA', 'Prophet'],
    'RMSE': [arima_metrics.get('RMSE', np.nan), prophet_metrics.get('RMSE', np.nan)],
    'MAE': [arima_metrics.get('MAE', np.nan), prophet_metrics.get('MAE', np.nan)],
    'MAPE': [arima_metrics.get('MAPE', np.nan), prophet_metrics.get('MAPE', np.nan)],
    'Best Parameters': [f"Order: {arima_order}", f"changepoint_prior_scale: {prophet_scale}"]
})
metrics_df.to_csv('model_metrics.csv', index=False)
print("Saved metrics to model_metrics.csv")

# Save backtest results
if arima_backtest_results and prophet_backtest_results:
    backtest_df = pd.DataFrame({
        'Model': ['ARIMA', 'Prophet'],
        'Cumulative Return (%)': [arima_backtest_results['Cumulative Return (%)'],
                                  prophet_backtest_results['Cumulative Return (%)']],
        'Number of Trades': [arima_backtest_results['Number of Trades'],
                             prophet_backtest_results['Number of Trades']]
    })
    backtest_df.to_csv('backtest_results.csv', index=False)
    print("Saved backtest results to backtest_results.csv")