# Wyznaczanie optymalnych p i q

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from arch import arch_model
from sklearn.metrics import mean_squared_error, mean_absolute_error
import pickle

In [None]:
def load_data(dataset, return_scaling=100):

    df = pd.read_csv(dataset, parse_dates=["timestamp"])
    df = df.sort_values("timestamp").set_index("timestamp")

    df['returns'] = return_scaling * np.log(df['close'] / df['close'].shift(1))
    df = df.dropna()

    return df

In [None]:
def evaluate_garch(p, q, train_returns, test_returns):

    print(f"Testing GARCH({p},{q})...")
    rolling_predictions = []
    history = train_returns.copy()

    try:
        for i in range(len(test_returns)):
            model = arch_model(history, vol='GARCH', p=p, q=q)
            model_fit = model.fit(disp='off', update_freq=0)
            forecast = model_fit.forecast(horizon=1)
            rolling_predictions.append(np.sqrt(forecast.variance.values[-1, 0]))
            history = pd.concat([history, test_returns.iloc[i:i+1]])

        actual = np.abs(test_returns)
        predicted = np.array(rolling_predictions)

        rmse = np.sqrt(mean_squared_error(actual, predicted))
        mae = mean_absolute_error(actual, predicted)
        mape = np.mean(np.abs((actual - predicted) / actual)) * 100

        final_model = arch_model(train_returns, vol='GARCH', p=p, q=q)
        final_fit = final_model.fit(disp='off')
        aic = final_fit.aic
        bic = final_fit.bic

        return {'p': p, 'q': q, 'AIC': aic, 'BIC': bic, 'RMSE': rmse, 'MAE': mae, 'MAPE': mape}

    except Exception as e:
        print(f"Error for ({p},{q}): {e}")
        return {'p': p, 'q': q, 'AIC': np.inf, 'BIC': np.inf, 'RMSE': np.inf, 'MAE': np.inf, 'MAPE': np.inf}

In [None]:
def find_best_garch(train_returns, test_returns, max_p, max_q, dataset_name):

    p_values = range(1, max_p+1)
    q_values = range(1, max_q+1)

    results = []
    for p in p_values:
        for q in q_values:
            result = evaluate_garch(p, q, train_returns, test_returns)
            results.append(result)

    results_df = pd.DataFrame(results).sort_values(by='RMSE')
    print("\n** Model comparison results:")
    print(results_df[['p', 'q', 'RMSE']])

    results_file=f'{dataset_name}_garch_results.csv'

    results_df.to_csv(results_file, index=False)
    print(f"Results saved: {results_file}")

    best = results_df.iloc[0]
    p_best, q_best = int(best['p']), int(best['q'])
    print(f"\nBest GARCH model for {dataset_name}: ({p_best},{q_best})\nRMSE = {best['RMSE']:.4f}\nMAE = {best['MAE']:.4f}\nMAPE = {best['MAPE']:.2f}%")

    return (p_best, q_best), results_df

In [None]:
def save_model(model, filename):
    with open(filename, 'wb') as f:
        pickle.dump(model, f)

def load_model(filename):
    with open(filename, 'rb') as f:
        return pickle.load(f)

In [None]:
def comparison_plot(test_data, predictions, model_info):
    plt.figure(figsize=(12, 6))
    plt.plot(test_data.index, np.abs(test_data['returns']),
             label="Actual", alpha=0.7)
    plt.plot(test_data.index, predictions, linestyle="--",
             label=f"GARCH({model_info['p']},{model_info['q']}) forecast",
             color='orange')
    plt.title(f"Volatility comparison")
    plt.legend()
    plt.grid()
    plt.tight_layout()
    plt.show()

In [None]:
def main(dataset, train_end_date, max_p, max_q):
    dataset_name = dataset.split('_')[0].lower()
    df = load_data(dataset)
    train = df[df.index < train_end_date]
    test = df[df.index >= train_end_date].copy()
    test_returns = test['returns']
    
    (p_best, q_best), results_df = find_best_garch(
        train['returns'], test_returns, max_p, max_q, dataset_name)
    
    final_model = arch_model(train['returns'], vol='GARCH', p=p_best, q=q_best)
    final_fit = final_model.fit(disp='off')
    
    predictions = []
    history = train['returns'].copy()

    for i in range(len(test_returns)):
        model = arch_model(history, vol='GARCH', p=p_best, q=q_best)
        model_fit = model.fit(disp='off', update_freq=0)
        forecast = model_fit.forecast(horizon=1)
        predictions.append(np.sqrt(forecast.variance.values[-1, 0]))
        history = pd.concat([history, test_returns.iloc[i:i+1]])
        
    test['predicted_volatility'] = predictions

    best_model_info = results_df.iloc[0].to_dict()
    comparison_plot(test, predictions, best_model_info)
    
    save_model(final_fit, f'{dataset_name}_garch_model_p{p_best}_q{q_best}.pkl')    

    return final_fit, test

In [None]:
datasets = ["ETH_data.csv", "SPY_data.csv", "AAPL_data.csv", "BTC_data.csv"]
train_end_date = "2025-01-01"
max_p=5
max_q=5

for dataset in datasets:
    final_model, test_results = main(dataset, train_end_date, max_p, max_q)

# Prognozowanie cen

In [None]:
import pandas as pd
import numpy as np
import os
from arch import arch_model
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import r2_score
from datetime import timedelta
import warnings
warnings.filterwarnings("ignore")

In [None]:
def load_data(filepath):
    df = pd.read_csv(filepath, parse_dates=['timestamp'])
    df = df.sort_values("timestamp").set_index("timestamp")
    df['close'] = pd.to_numeric(df['close'], errors='coerce')
    df['returns'] = 100 * np.log(df['close'] / df['close'].shift(1))
    return df.dropna()

In [None]:
def calculate_forecast(df, train_end_date, garch_p=1, garch_q=1, arima_p=1, arima_d=1, arima_q=1):
    train_df = df[df.index < train_end_date].copy()
    test_df = df[df.index >= train_end_date].copy()

    arima = ARIMA(train_df['close'], order=(arima_p, arima_d, arima_q)).fit(method_kwargs={"maxiter": 500})
    
    residuals = arima.resid
    garch = arch_model(residuals, vol='GARCH', p=garch_p, q=garch_q).fit(disp='off', update_freq=5)
    
    preds = []
    actuals = []
    dates = []

    current_series = train_df['close'].tolist()
    
    for date in test_df.index:
        try:
            arima_forecast = arima.forecast(steps=1).iloc[0]
            
            garch_vol = np.sqrt(garch.forecast(horizon=1).variance.iloc[-1,0])
            
            combined_forecast = arima_forecast + np.random.normal(0, garch_vol)
            
            preds.append(combined_forecast)
            actuals.append(df.loc[date, 'close'])
            dates.append(date)
            
            current_series.append(combined_forecast)
            
        except Exception as e:
            print(f"Error for date {date}: {str(e)}")
            continue
    
    return pd.DataFrame({
        'date': dates,
        'actual': actuals,
        'predicted': preds
    }).set_index('date')


def calculate_metrics(pred_df, horizon_days):
    if len(pred_df) < horizon_days:
        return None
    sub = pred_df.iloc[:horizon_days]
    actual = sub['actual'].values
    predicted = sub['predicted'].values
    mape = np.mean(np.abs((actual - predicted) / actual)) * 100
    r2 = r2_score(actual, predicted)
    return predicted, mape, r2

In [None]:
def save_results(pred_df, name):
    if name.lower() == 'aapl':
        range_map = {
            "1_tydzien": 16*7,
            "2_tygodnie": 16*14,
            "3_tygodnie": 16*21,
            "1_miesiac": 320,
            "2_miesiace": 640,
            "3_miesiace": len(pred_df)
        }
    elif name.lower() in ['btc', 'eth']:
        range_map = {
            "1_tydzien": 24*7,
            "2_tygodnie": 24*14,
            "3_tygodnie": 24*21,
            "1_miesiac": 744,
            "2_miesiace": 2*744,
            "3_miesiace": len(pred_df)
        }
    elif name.lower() == 'spy':
        range_map = {
            "1_tydzien": 17*7,
            "2_tygodnie": 17*14,
            "3_tygodnie": 17*21,
            "1_miesiac": 340,
            "2_miesiace": 2*340,
            "3_miesiace": len(pred_df)
        }

    range_name_map = {
        "1_tydzien": "tyg",
        "2_tygodnie": "2tyg",
        "3_tygodnie": "3tyg",
        "1_miesiac": "msc",
        "2_miesiace": "2msc",
        "3_miesiace": "3msc"
    }

    range_name_map = {
        "1_tydzien": "tyg",
        "2_tygodnie": "2tyg",
        "3_tygodnie": "3tyg",
        "1_miesiac": "msc",
        "2_miesiace": "2msc",
        "3_miesiace": "3msc"
    }

    output_folder = f"wyniki_predykcji/{name.lower()}"
    os.makedirs(output_folder, exist_ok=True)

    for range_name, days in range_map.items():
        result = calculate_metrics(pred_df, days)
        if result is None:
            continue

        preds, mape, r2 = result
        okres = range_name_map[range_name]
        podmiot = name.lower()

        preds_filename = f"{podmiot}_{okres}_GARCH.txt"
        preds_filepath = os.path.join(output_folder, preds_filename)
        np.savetxt(preds_filepath, preds, fmt="%.6f")

        metrics_filename = f"{podmiot}_{okres}_GARCH_metrics.txt"
        metrics_filepath = os.path.join(output_folder, metrics_filename)
        with open(metrics_filepath, "w") as f:
            f.write(f"{mape:.4f}\n")
            f.write(f"{r2:.4f}\n")

In [None]:
assets_params = {
    'AAPL': {'garch_p': 2, 'garch_q': 3},
    'SPY': {'garch_p': 4, 'garch_q': 5},
    'ETH': {'garch_p': 4, 'garch_q': 2},
    'BTC': {'garch_p': 4, 'garch_q': 4}
}

for asset, params in assets_params.items():
    fname = f"{asset}_data.csv"
    try:
        data = load_data(fname)
        pred_df = calculate_forecast(data, train_end_date="2025-01-01", 
                                    garch_p=params['garch_p'], 
                                    garch_q=params['garch_q'])
        save_results(pred_df, asset)
        print(f"Przetworzono {asset} pomyślnie")
    except Exception as e:
        print(f"Błąd podczas przetwarzania {asset}: {str(e)}")