In [None]:
from datetime import timedelta
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn

In [None]:
# Read the CSV into `df`, list its column names and preview the first rows.
csv_path = "GC_in_daily_new.csv"
df = pd.read_csv(csv_path)
print(df.columns)
df.head()

In [None]:
# Keep only the columns used further down, parse `date` into datetimes and put
# the rows in chronological order.  Everything is done in one chain so that the
# `date` column is never assigned onto a column-subset of the original frame,
# which is what raises pandas' SettingWithCopyWarning.
price_columns = ['date', 'symbol', 'open', 'high', 'low', 'close', 'volume']
df = (
    df[price_columns]
    .assign(date=lambda frame: pd.to_datetime(frame['date']))
    .sort_values('date')
)
df.head()

In [None]:
# Events to mark on the price charts, written as (date string, label, colour).
_event_specs = [
    ('2008-09-15', 'Lehman Brothers', 'brown'),
    ('2011-08-05', 'US Debt Downgrade\nGold $1,900', 'darkorange'),
    ('2013-04-15', 'Gold Crash 2013', 'darkblue'),
    ('2020-03-23', 'COVID-19 Crash\n& Stimulus', 'red'),
    ('2022-02-24', 'Russia-Ukraine War', 'darkred'),
]

# One dict per event; `date` is converted with pd.to_datetime so it can be
# compared with, and subtracted from, the frame's datetime column.
events = [
    {'date': pd.to_datetime(day), 'name': label, 'color': colour}
    for day, label, colour in _event_specs
]

In [None]:
def is_position_available(new_y, existing_y_positions, threshold=0.06, value_range=None):
    """Return True when ``new_y`` is far enough from every existing y position.

    Two positions collide when their distance, expressed as a fraction of
    ``value_range``, is below ``threshold``.

    Args:
        new_y: Candidate y coordinate.
        existing_y_positions: Iterable of y coordinates that are already taken.
        threshold: Minimum allowed separation as a fraction of ``value_range``.
        value_range: Height of the span the positions live in (expected to be
            non-negative).  When omitted, the notebook-level ``y_range``
            variable is used, so that variable must already be defined in
            that case.

    Returns:
        bool: False if any existing position is closer than the threshold,
        True otherwise (including when there are no existing positions).
    """
    if value_range is None:
        # Backward-compatible fallback: the notebook-level global.
        value_range = y_range
    # Compare against the scaled threshold instead of dividing by the range,
    # so a zero range (completely flat series) cannot divide by zero.
    min_gap = threshold * value_range
    return not any(abs(new_y - taken) < min_gap for taken in existing_y_positions)

In [None]:
# Two stacked panels; this cell draws the first one and prepares the event data.
fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, figsize=(20, 14))

# Plot 1: Normal scale with events
ax1.plot(df['date'], df['close'], color='gold', linewidth=2, zorder=1)
ax1.fill_between(df['date'], df['close'], alpha=0.2, color='gold')

# Extremes of the close series and their spread (read again by later cells).
y_min = df['close'].min()
y_max = df['close'].max()
y_range = y_max - y_min

# Pair every event that falls inside the frame's date span with the close of
# the row whose date is nearest to it, ordered by event date.
first_date, last_date = df['date'].min(), df['date'].max()
event_positions = sorted(
    (
        (event, df.loc[(df['date'] - event['date']).abs().idxmin(), 'close'])
        for event in events
        if first_date <= event['date'] <= last_date
    ),
    key=lambda pair: pair[0]['date'],
)

In [None]:
used_positions = []
annotation_data = []

# Every candidate label height is clamped into this band before it is tested.
lowest_allowed = y_min * 1.1
highest_allowed = y_max * 0.95

for i, (event, event_price) in enumerate(event_positions):
    # Candidate heights, tried in order: three relative to the event's price,
    # then three relative to the overall price extremes.
    candidates = (
        event_price * 0.85,
        event_price * 0.75,
        event_price * 1.15,
        y_max * 0.3,
        y_max * 0.7,
        y_min * 1.5,
    )

    for candidate in candidates:
        pos = max(lowest_allowed, min(highest_allowed, candidate))
        if is_position_available(pos, used_positions, threshold=0.06):
            break
    else:
        # No candidate was free: fall back to one of five evenly spaced
        # levels inside the price range, chosen by the event's index.
        pos = y_min + (y_range * (i % 5 + 1) / 6)

    used_positions.append(pos)
    annotation_data.append((event, event_price, pos))

# Horizontal shift applied to label text in the annotation cell.
days_offset = timedelta(days=150)

In [None]:
for event, event_price, text_y_pos in annotation_data:
    event_color = event['color']
    event_date = event['date']

    # Dashed vertical marker at the event date.
    ax1.axvline(x=event_date, color=event_color, linestyle='--',
                alpha=0.7, linewidth=1.2, zorder=2)

    # Anchor the text on the side facing away from the price point.
    va = (
        'bottom' if text_y_pos > event_price
        else 'top' if text_y_pos < event_price
        else 'center'
    )

    label_box = dict(boxstyle='round,pad=0.4', facecolor='white', alpha=0.95,
                     edgecolor=event_color, linewidth=1.2)
    arrow_style = dict(arrowstyle='->', color=event_color, alpha=0.8,
                       connectionstyle="arc3,rad=-0.2",
                       linewidth=1.2)

    # The label sits `days_offset` to the left of the event and points at
    # the (date, price) location with a curved arrow.
    ax1.annotate(
        event['name'],
        xy=(event_date, event_price),
        xytext=(event_date - days_offset, text_y_pos),
        fontsize=9,
        ha='right',
        va=va,
        color=event_color,
        bbox=label_box,
        arrowprops=arrow_style,
        zorder=3,
    )

In [None]:
# Title, axis labels and a faint grid for the first panel.
axis_label_size = 13
ax1.set_title(
    'Gold Price History with Major Events (Normal Scale)',
    fontsize=16,
    fontweight='bold',
    pad=20,
)
ax1.set_xlabel('Year', fontsize=axis_label_size)
ax1.set_ylabel('Price in USD', fontsize=axis_label_size)
ax1.grid(True, alpha=0.3, zorder=0)

In [None]:
# Plot 2: log-scale panel, with event lines styled like those on the first panel.
ax2.plot(df['date'], df['close'], color='goldenrod', linewidth=2, zorder=1)

span_start, span_end = df['date'].min(), df['date'].max()

# Dotted horizontal reference lines, labelled at the right edge of the data,
# for every listed level that lies below the highest close.
reference_levels = [100, 500, 1000, 1500, 2000, 2500]
for price_level in (level for level in reference_levels if level < y_max):
    ax2.axhline(y=price_level, color='gray', linestyle=':', alpha=0.3, linewidth=0.8)
    ax2.text(span_end, price_level, f' ${price_level:,}',
             va='center', fontsize=9, color='gray', alpha=0.7)

for event in events:
    # Skip events that fall outside the frame's date span.
    if not (span_start <= event['date'] <= span_end):
        continue

    # Close of the row whose date is nearest to the event.
    event_idx = (df['date'] - event['date']).abs().idxmin()
    event_price = df.loc[event_idx, 'close']

    ax2.axvline(x=event['date'], color=event['color'], linestyle='--',
                alpha=0.7, linewidth=1.2, zorder=2)

    # White-edged dot on the price curve at the event.
    ax2.plot(event['date'], event_price, 'o',
             color=event['color'], markersize=8, zorder=3,
             markeredgecolor='white', markeredgewidth=1)

ax2.set_title(
    'Gold Price History with Major Events (Log Scale)',
    fontsize=16,
    fontweight='bold',
    pad=20,
)
ax2.set_xlabel('Year', fontsize=13)
ax2.set_ylabel('Price in USD (Log Scale)', fontsize=13)
ax2.set_yscale('log')
ax2.grid(True, alpha=0.3, which='both', zorder=0)

In [None]:
# Shared x-axis formatting for both panels: a four-digit year label every
# five years, rotated for readability, on a white background.
date_tools = plt.matplotlib.dates
for ax in (ax1, ax2):
    ax.xaxis.set_major_formatter(date_tools.DateFormatter('%Y'))
    ax.xaxis.set_major_locator(date_tools.YearLocator(5))
    plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
    ax.set_facecolor('white')

fig.suptitle(
    'Gold Price: Impact of Major Historical Events (1975-Present)',
    fontsize=18,
    fontweight='bold',
    y=0.98,
)
# Keep a margin at the top and bottom of the figure when tightening the layout.
plt.tight_layout(rect=[0, 0.03, 1, 0.98])
plt.show()

In [None]:
# Drop rows with missing values and rows with zero volume, and only then
# renumber the index.  Resetting before the volume filter (as was done
# previously) leaves gaps in the index, which breaks any later assignment that
# aligns on index labels against a freshly built 0..n-1 frame.
df = (
    df.dropna()
    .loc[lambda frame: frame['volume'] != 0]
    .reset_index(drop=True)
)
print(len(df))
df.head()

In [None]:
feature_cols = ["open", "high", "low", "close", "volume"]
scaler = MinMaxScaler()

# Build the scaled frame on df's own index.  The `date` assignment below aligns
# on index labels, so a default 0..n-1 index would pair rows with the wrong
# dates (or NaT) whenever df's index is not exactly 0..n-1.
df_scaled = pd.DataFrame(
    scaler.fit_transform(df[feature_cols]),
    columns=feature_cols,
    index=df.index,
)
df_scaled["date"] = df["date"]

In [None]:
# Fit the scaler on the training rows only and then apply it to every row.
# Fitting on the whole frame would leak the min/max of the validation and test
# periods into the features the model is trained on.
#
# The sequences built later use a 30-row look-back and the first 70% of the
# windows for training; the last training window's target is therefore row
# int(0.7 * (len(df) - 30)) + 30 - 1.  Keep these two numbers in sync with the
# windowing and split cells.
fit_window = 30
fit_train_frac = 0.7
n_fit_rows = int(fit_train_frac * (len(df) - fit_window)) + fit_window

scaler = MinMaxScaler()
scaler.fit(df.iloc[:n_fit_rows][feature_cols])

# Rows after the training period can scale outside [0, 1]; the inverse
# transform via scaler.data_min_ / scaler.data_max_ remains valid.
scaled_values = scaler.transform(df[feature_cols])
df_scaled = df.copy()
df_scaled[feature_cols] = scaled_values

In [None]:
def create_window_data(df, window=30, feature_columns=None):
    """Turn a frame into sliding-window samples for next-row prediction.

    Sample ``i`` uses rows ``i .. i+window-1`` of the feature columns as input
    and the ``close`` value of row ``i+window`` as target.

    Args:
        df: DataFrame containing the feature columns plus ``close`` and
            ``date`` columns.  Rows are used in their positional order.
        window: Number of consecutive rows per input sample (must be >= 1).
        feature_columns: Columns to use as inputs.  When omitted, the
            notebook-level ``feature_cols`` list is used.

    Returns:
        Tuple ``(X, y, dates)`` of numpy arrays: the stacked input windows,
        the targets, and the ``date`` value of each target row.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError(f"window must be a positive integer, got {window}")

    columns = feature_cols if feature_columns is None else list(feature_columns)

    # Convert to numpy once; slicing the frame and selecting columns on every
    # iteration is loop-invariant work that dominates the run time.
    features = df[columns].to_numpy()
    n_samples = max(len(df) - window, 0)

    X = [features[start:start + window] for start in range(n_samples)]
    # Targets and their dates are simply the rows from position `window` on.
    y = df["close"].to_numpy()[window:]
    dates = df["date"].tolist()[window:]
    return np.array(X), np.array(y), np.array(dates)

In [None]:
# Look-back length, passed explicitly so that changing it here actually changes
# the windows (previously the function's default was used regardless).
window = 30
X, y, dates = create_window_data(df_scaled, window=window)

In [None]:
# Split by position, without shuffling: first 70% of the samples for training,
# the next 15% for validation, the remainder for testing.
N = len(X)
train_end, val_end = int(0.7 * N), int(0.85 * N)

train_part = slice(None, train_end)
val_part = slice(train_end, val_end)
test_part = slice(val_end, None)

X_train, y_train = X[train_part], y[train_part]
X_val, y_val = X[val_part], y[val_part]
X_test, y_test = X[test_part], y[test_part]
# Dates of the test targets, kept for plotting.
dates_test = dates[test_part]

In [None]:
class PriceDataset(Dataset):
    """Dataset of (input, target) pairs stored as float32 tensors.

    Args:
        X: Array-like of inputs; the first axis indexes samples.
        y: Array-like of targets with one entry per sample.

    Raises:
        ValueError: If ``X`` and ``y`` do not contain the same number of
            samples.
    """

    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)
        # Fail at construction instead of with an IndexError in the middle
        # of an epoch.
        if len(self.X) != len(self.y):
            raise ValueError(
                f"X and y must have the same number of samples, "
                f"got {len(self.X)} and {len(self.y)}"
            )

    def __len__(self):
        """Number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` pair at position ``idx``."""
        return self.X[idx], self.y[idx]

def _make_loader(features, targets, shuffle):
    """Wrap one split in a PriceDataset and batch it 32 samples at a time."""
    return DataLoader(PriceDataset(features, targets), batch_size=32, shuffle=shuffle)


# Only the training batches are shuffled; validation and test keep their order.
train_loader = _make_loader(X_train, y_train, shuffle=True)
val_loader = _make_loader(X_val, y_val, shuffle=False)
test_loader = _make_loader(X_test, y_test, shuffle=False)

In [None]:
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer producing one value per sample.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size=5, hidden_size=64, num_layers=2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, input_size) to shape (batch,)."""
        out, _ = self.lstm(x)
        # Only the hidden state of the last time step feeds the output layer.
        out = out[:, -1, :]
        # Squeeze the feature axis only.  A bare squeeze() would also drop the
        # batch axis for a batch of one, yielding a 0-d tensor that cannot be
        # concatenated and that broadcasts against the targets in the loss.
        return self.fc(out).squeeze(-1)
    
# Seed torch's global random generator before the weights are created so that
# the initial weights (and anything else that draws from that generator
# afterwards) are the same on every Restart & Run All.
torch.manual_seed(42)
model = LSTMModel()

# Mean squared error on the scaled close value, optimised with Adam.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
train_losses = []
val_losses = []
epochs = 30


def _run_epoch(loader, training):
    """Make one pass over ``loader`` and return the mean of the batch losses.

    With ``training`` true the model is put in train mode and the optimizer
    takes a step after every batch; otherwise the model is put in eval mode
    and gradients are disabled.
    """
    model.train(training)
    running_loss = 0
    with torch.set_grad_enabled(training):
        for Xb, yb in loader:
            if training:
                optimizer.zero_grad()
            batch_loss = criterion(model(Xb), yb)
            if training:
                batch_loss.backward()
                optimizer.step()
            running_loss += batch_loss.item()
    return running_loss / len(loader)


for epoch in range(epochs):
    train_loss = _run_epoch(train_loader, training=True)
    # validation
    val_loss = _run_epoch(val_loader, training=False)

    train_losses.append(train_loss)
    val_losses.append(val_loss)

    print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.6f} | Val Loss: {val_loss:.6f}")

In [None]:
# Per-epoch training and validation loss on one set of axes.
loss_fig, loss_ax = plt.subplots(figsize=(10, 5))
loss_ax.plot(train_losses, label="Train Loss")
loss_ax.plot(val_losses, label="Validation Loss")
loss_ax.set_title("Training vs Validation Loss")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("MSE Loss")
loss_ax.legend()
loss_ax.grid()
plt.show()

In [None]:
model.eval()
pred_scaled = []

with torch.no_grad():
    for Xb, _ in test_loader:
        # atleast_1d guards against a final batch of one sample coming back
        # as a 0-d array, which np.concatenate refuses to join.
        pred_scaled.append(np.atleast_1d(model(Xb).numpy()))

pred_scaled = np.concatenate(pred_scaled)

# Undo the min-max scaling of the close column.  Its position is looked up in
# feature_cols (the column order the scaler was fitted with) instead of being
# hard-coded.
close_idx = feature_cols.index("close")
close_min = scaler.data_min_[close_idx]
close_max = scaler.data_max_[close_idx]
true_price = y_test * (close_max - close_min) + close_min
pred_price = pred_scaled * (close_max - close_min) + close_min


# True vs predicted close over the test period.
plt.figure(figsize=(12,6))
plt.plot(dates_test, true_price, label="True Price")
plt.plot(dates_test, pred_price, label="Predicted Price")
plt.title("Gold Price Prediction (Test Set)")
plt.xlabel("Date")
plt.ylabel("Gold Price (USD)")
plt.legend()
plt.grid()
plt.show()

In [None]:
def reverse_scale_close(scaled_values, scaler, col_idx=3):
    """Map min-max scaled values of one column back to the original units.

    Args:
        scaled_values: Scalar or array of scaled values.
        scaler: Object exposing ``data_min_`` and ``data_max_`` sequences
            (e.g. a fitted MinMaxScaler).
        col_idx: Position of the column in the data the scaler was fitted on.

    Returns:
        ``scaled_values`` multiplied by the column's (max - min) and shifted
        by its min.
    """
    lower = scaler.data_min_[col_idx]
    span = scaler.data_max_[col_idx] - lower
    return scaled_values * span + lower

In [None]:
def compute_metrics(y_true, y_pred):
    """Return ``(mse, rmse, mae, mape)`` for a set of predictions.

    ``mape`` is the mean of ``|y_true - y_pred| / |y_true|`` expressed in
    percent; ``y_true`` and ``y_pred`` must support elementwise arithmetic.
    """
    mse = mean_squared_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    relative_error = np.abs((y_true - y_pred) / y_true)
    mape = np.mean(relative_error) * 100
    return mse, np.sqrt(mse), mae, mape

In [None]:
def predict(model, dataloader):
    """Run ``model`` over every batch of ``dataloader`` without gradients.

    Args:
        model: Module called as ``model(Xb)``; it is switched to eval mode.
        dataloader: Iterable yielding ``(Xb, yb)`` tensor pairs.

    Returns:
        Tuple ``(all_true, all_preds)`` of numpy arrays concatenated over the
        batches in iteration order.  Both are empty when the loader yields no
        batches.
    """
    model.eval()
    all_preds, all_true = [], []
    with torch.no_grad():
        for Xb, yb in dataloader:
            # atleast_1d: a model that squeezes its output returns a 0-d
            # tensor for a batch of one, which np.concatenate cannot join.
            all_preds.append(np.atleast_1d(model(Xb).cpu().numpy()))
            all_true.append(np.atleast_1d(yb.cpu().numpy()))

    if not all_preds:
        # np.concatenate raises on an empty list; return empty results instead.
        empty = np.empty(0, dtype=np.float32)
        return empty, empty.copy()

    return np.concatenate(all_true), np.concatenate(all_preds)


In [None]:
def _to_usd_with_metrics(true_scaled, predicted_scaled):
    """Undo the close scaling for both arrays and score them in price units."""
    true_usd = reverse_scale_close(true_scaled, scaler)
    predicted_usd = reverse_scale_close(predicted_scaled, scaler)
    return true_usd, predicted_usd, compute_metrics(true_usd, predicted_usd)


# Train
y_train_true, y_train_pred = predict(model, train_loader)
(y_train_true_usd, y_train_pred_usd,
 (train_mse, train_rmse, train_mae, train_mape)) = _to_usd_with_metrics(y_train_true, y_train_pred)

# Validation
y_val_true, y_val_pred = predict(model, val_loader)
(y_val_true_usd, y_val_pred_usd,
 (val_mse, val_rmse, val_mae, val_mape)) = _to_usd_with_metrics(y_val_true, y_val_pred)

# Test: uses the targets and predictions already computed for the test split
(y_test_true_usd, y_test_pred_usd,
 (test_mse, test_rmse, test_mae, test_mape)) = _to_usd_with_metrics(y_test, pred_scaled)

# Print results
separator = "="*50
print(separator)
print("TRAINING METRICS")
print(f"MSE: {train_mse:.2f}, RMSE: {train_rmse:.2f}, MAE: {train_mae:.2f}, MAPE: {train_mape:.2f}%")
print("VALIDATION METRICS")
print(f"MSE: {val_mse:.2f}, RMSE: {val_rmse:.2f}, MAE: {val_mae:.2f}, MAPE: {val_mape:.2f}%")
print("TEST METRICS")
print(f"MSE: {test_mse:.2f}, RMSE: {test_rmse:.2f}, MAE: {test_mae:.2f}, MAPE: {test_mape:.2f}%")
print(separator)
