<a href="https://colab.research.google.com/github/dadashkarimi/CPSC/blob/master/Transformer_Indicator_Stocks_ipynb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import yfinance as yf
from datetime import datetime
import os

# Define indicator functions

def compute_RSI(data, window=14):
    delta = data.diff()
    gain = (delta.where(delta > 0, 0)).fillna(0)
    loss = (-delta.where(delta < 0, 0)).fillna(0)
    avg_gain = gain.rolling(window=window).mean()
    avg_loss = loss.rolling(window=window).mean()
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

def EMA(data, period=14):
    return data.ewm(span=period, adjust=False).mean()

def SMA(data, period=14):
    return data.rolling(window=period).mean()

def BollingerBands(data, period=20, std_dev=2):
    sma = SMA(data, period)
    std = data.rolling(window=period).std()
    upper_band = sma + (std_dev * std)
    lower_band = sma - (std_dev * std)
    return upper_band, lower_band

def MACD(data, short_period=12, long_period=26, signal_period=9):
    short_ema = EMA(data, short_period)
    long_ema = EMA(data, long_period)
    macd = short_ema - long_ema
    signal = EMA(macd, signal_period)
    histogram = macd - signal
    return macd, signal, histogram

def ATR(high, low, close, period=14):
    tr1 = high - low
    tr2 = abs(high - close.shift(1))
    tr3 = abs(low - close.shift(1))
    tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
    return tr.rolling(window=period).mean()

def StochasticOscillator(close, high, low, period=14):
    L14 = low.rolling(window=period).min()
    H14 = high.rolling(window=period).max()
    K = 100 * ((close - L14) / (H14 - L14))
    D = K.rolling(window=3).mean()
    return K, D

def CCI(high, low, close, period=20):
    TP = (high + low + close) / 3
    sma = SMA(TP, period)
    mad = (TP - sma).abs().rolling(window=period).mean()
    return (TP - sma) / (0.015 * mad)

def save_checkpoint(model, optimizer, epoch, loss, file_path):
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'epoch': epoch,
        'loss': loss
    }, file_path)

def load_checkpoint(file_path, model, optimizer):
    checkpoint = torch.load(file_path)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']
    return model, optimizer, epoch, loss

def ParabolicSAR(high, low, acceleration=0.02, max_acceleration=0.2):
    # Ensure high and low are Pandas Series
    if not isinstance(high, pd.Series) or not isinstance(low, pd.Series):
        raise TypeError("High and Low inputs must be Pandas Series.")

    # Check if columns are empty or contain all null values
    if high.isnull().all():
        raise ValueError("High column is empty or contains all null values.")
    if low.isnull().all():
        raise ValueError("Low column is empty or contains all null values.")

    # Handle NaN values
    high = high.fillna(method='ffill').fillna(method='bfill')
    low = low.fillna(method='ffill').fillna(method='bfill')

    sar = low.iloc[0]
    ep = high.iloc[0]  # Extreme point
    af = acceleration
    psar = [sar]

    for i in range(1, len(high)):
        new_sar = sar + af * (ep - sar)

        # Check for trend reversal
        if new_sar > high.iloc[i]:
            new_sar = high.iloc[i - 1]
            ep = low.iloc[i]
            af = acceleration
        elif new_sar < low.iloc[i]:
            new_sar = low.iloc[i - 1]
            ep = high.iloc[i]
            af = acceleration
        else:
            # Update extreme point and acceleration factor
            if high.iloc[i] > ep:
                ep = high.iloc[i]
                af = min(af + acceleration, max_acceleration)
            elif low.iloc[i] < ep:
                ep = low.iloc[i]
                af = min(af + acceleration, max_acceleration)

        psar.append(new_sar)
        sar = new_sar

    return pd.Series(psar, index=high.index)



def Ichimoku(high, low, period1=9, period2=26, period3=52):
    conversion = (high.rolling(period1).max() + low.rolling(period1).min()) / 2
    base = (high.rolling(period2).max() + low.rolling(period2).min()) / 2
    span_a = ((conversion + base) / 2).shift(period2)
    span_b = ((high.rolling(period3).max() + low.rolling(period3).min()) / 2).shift(period2)
    return conversion, base, span_a, span_b

def VWAP(close, volume):
    return (close * volume).cumsum() / volume.cumsum()

def Aroon(high, low, period=25):
    aroon_up = 100 * (period - high.rolling(period).apply(lambda x: x.argmax())) / period
    aroon_down = 100 * (period - low.rolling(period).apply(lambda x: x.argmin())) / period
    return aroon_up, aroon_down

def CMF(high, low, close, volume, period=20):
    money_flow_multiplier = ((close - low) - (high - close)) / (high - low)
    money_flow_volume = money_flow_multiplier * volume
    return money_flow_volume.rolling(window=period).sum() / volume.rolling(window=period).sum()

def WilliamsR(high, low, close, period=14):
    return ((high.rolling(period).max() - close) / (high.rolling(period).max() - low.rolling(period).min())) * -100

def ROC(data, period=12):
    return ((data - data.shift(period)) / data.shift(period)) * 100

def Momentum(data, period=14):
    return data - data.shift(period)

def FibonacciRetracement(high, low):
    diff = high - low
    return [high - diff * level for level in [0.236, 0.382, 0.5, 0.618, 0.786]]

def compute_indicators(data):
    required_columns = ['High', 'Low', 'Close', 'Volume']
    for col in required_columns:
        if col not in data:
            raise KeyError(f"Column '{col}' is missing from data.")

    indicators = {}
    indicators['RSI'] = compute_RSI(data['Close'])
    indicators['EMA_20'] = EMA(data['Close'], period=20)
    indicators['Bollinger_Upper'], indicators['Bollinger_Lower'] = BollingerBands(data['Close'], period=20, std_dev=2)
    macd, signal, histogram = MACD(data['Close'])
    indicators['MACD'] = macd
    indicators['Signal'] = signal
    indicators['Histogram'] = histogram
    indicators['Stochastic_K'], indicators['Stochastic_D'] = StochasticOscillator(data['Close'], data['High'], data['Low'])
    indicators['ATR'] = ATR(data['High'], data['Low'], data['Close'])
    indicators['CCI'] = CCI(data['High'], data['Low'], data['Close'])
    # indicators['Parabolic_SAR'] = ParabolicSAR(data['High'], data['Low'])
    indicators['VWAP'] = VWAP(data['Close'], data['Volume'])
    indicators['Aroon_Up'], indicators['Aroon_Down'] = Aroon(data['High'], data['Low'])
    indicators['CMF'] = CMF(data['High'], data['Low'], data['Close'], data['Volume'])
    indicators['Williams_R'] = WilliamsR(data['High'], data['Low'], data['Close'])
    indicators['ROC'] = ROC(data['Close'])
    indicators['Momentum'] = Momentum(data['Close'])
    return indicators

class BitcoinDataset(Dataset):
    def __init__(self, data, seq_len, n_days):
        self.data = data
        self.seq_len = seq_len
        self.n_days = n_days

    def __len__(self):
        return len(self.data) - self.seq_len - self.n_days + 1

    def __getitem__(self, idx):
        x = self.data[idx:idx + self.seq_len, :-1].astype(np.float32)
        y = self.data[idx + self.seq_len: idx + self.seq_len + self.n_days, -1].astype(np.int64)
        return x, y

class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_classes, n_days, d_model=128, nhead=4, num_layers=3, dim_feedforward=256, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.input_projection = nn.Linear(input_dim, d_model)
        self.positional_encoding = nn.Parameter(torch.randn(1, 1000, d_model))
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        self.fc = nn.Linear(d_model, num_classes * n_days)
        self.n_days = n_days
        self.num_classes = num_classes

    def forward(self, x):
        seq_len = x.size(1)
        x = self.input_projection(x) + self.positional_encoding[:, :seq_len, :]
        transformer_out = self.transformer(x, x)
        out = self.fc(transformer_out[:, -1, :])
        return out.view(-1, self.n_days, self.num_classes)

def preprocess_data(data_dict, target_symbol, bins):
    for symbol in data_dict.keys():
        stock_data = data_dict[symbol]
        indicators = compute_indicators(stock_data)
        for key, value in indicators.items():
            stock_data[key] = value

    combined_df = pd.concat(
        [data[['Close', 'RSI', 'EMA_20', 'Bollinger_Upper', 'Bollinger_Lower', 'MACD', 'Signal', 'Histogram', 'Stochastic_K', 'Stochastic_D', 'ATR', 'CCI', 'VWAP', 'Aroon_Up', 'Aroon_Down', 'CMF', 'Williams_R', 'ROC', 'Momentum']]
         for data in data_dict.values()], axis=1, keys=data_dict.keys())

    combined_df.columns = ['{}_{}'.format(symbol, col) for symbol in data_dict.keys()
                           for col in ['Close', 'RSI', 'EMA_20', 'Bollinger_Upper', 'Bollinger_Lower', 'MACD', 'Signal', 'Histogram', 'Stochastic_K', 'Stochastic_D', 'ATR', 'CCI', 'VWAP', 'Aroon_Up', 'Aroon_Down', 'CMF', 'Williams_R', 'ROC', 'Momentum']]

    combined_df = assign_classes(combined_df, f'{target_symbol}_Close', bins)

    scaler = MinMaxScaler()
    scaled_features = scaler.fit_transform(combined_df.iloc[:, :-1])

    combined_data = np.hstack((scaled_features, combined_df['Class'].values.reshape(-1, 1)))
    return combined_data, list(combined_df.columns), scaler

def assign_classes(data, price_column, bins):
    pct_change = (data[price_column].diff() / data[price_column].shift(1)) * 100
    classes = np.digitize(pct_change, bins) - 3
    classes = classes + 2
    data['Class'] = classes
    data.dropna(inplace=True)
    return data

if __name__ == "__main__":
    symbols = [
        "BTC-USD", "AAPL", "MSFT", "NVDA", "TSLA", "GOOGL", "AMZN", "META", "BRK-B", "JPM",
        "V", "PG", "UNH", "HD", "MA", "DIS", "ADBE", "CRM", "PYPL", "NFLX",
        "INTC", "PEP", "KO", "NKE", "MCD", "WMT", "CSCO", "QCOM", "ORCL", "ABBV",
        "T", "XOM", "CVX", "VZ", "PFE", "MRK", "LLY", "BMY", "COST", "DHR",
        "TMO", "HON", "IBM", "AMD", "BA", "CAT", "SPGI", "GS", "MS", "GOLD"
    ]
    target_symbol = "BTC-USD"
    model_file = "drive/MyDrive/Transformer/bitcoin_transformer_indicator_model.pth"
    seq_len = 30
    hidden_dim = 128
    num_layers = 5
    num_classes = 5
    n_days = 7
    batch_size = 32
    learning_rate = 1e-4
    num_epochs = 3000

    print("Fetching data for cryptocurrencies and stocks...")
    stock_data = {symbol: yf.download(symbol, start="2020-01-01", end=datetime.today().strftime('%Y-%m-%d')) for symbol in symbols}

    bins = [-np.inf, -2, -0.5, 0.5, 2, np.inf]

    print("Preprocessing data...")
    data, features, scaler = preprocess_data(stock_data, target_symbol, bins)

    train_data, test_data = train_test_split(data, test_size=0.2, shuffle=False)

    train_dataset = BitcoinDataset(train_data, seq_len=seq_len, n_days=n_days)
    test_dataset = BitcoinDataset(test_data, seq_len=seq_len, n_days=n_days)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    model = TransformerModel(
        input_dim=len(features) - 1,  # Updated for additional indicators
        num_classes=num_classes,
        n_days=n_days,
        d_model=128,
        nhead=4,
        num_layers=num_layers,
        dim_feedforward=256
    ).to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()

    if os.path.exists(model_file):
        print(f"Loading model from {model_file}...")
        model, optimizer, start_epoch, best_loss = load_checkpoint(model_file, model, optimizer)
    else:
        print("No saved model found. Starting fresh.")
        start_epoch = 0
        best_loss = float('inf')

    for epoch in range(start_epoch, num_epochs):
        model.train()
        total_loss = 0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            outputs = model(x)
            loss = 0
            for day in range(n_days):
                loss += criterion(outputs[:, day, :], y[:, day])
            loss /= n_days
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {avg_loss:.4f}")

        if avg_loss < best_loss:
            best_loss = avg_loss
            save_checkpoint(model, optimizer, epoch, best_loss, model_file)

    print("Training complete.")

Fetching data for cryptocurrencies and stocks...


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%********

Preprocessing data...


  data['Class'] = classes


Loading model from drive/MyDrive/Transformer/bitcoin_transformer_indicator_model.pth...


  checkpoint = torch.load(file_path)


RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

In [2]:
import torch

if torch.cuda.is_available():
    print(f"CUDA is available! GPU count: {torch.cuda.device_count()}")
    print(f"Current GPU: {torch.cuda.get_device_name(0)}")
else:
    print("CUDA is not available.")


CUDA is not available.


In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta
import torch
import numpy as np

# Predict future trends using the trained model
print("Predicting future trends...")
model.eval()
with torch.no_grad():
    # Use the last sequence from the dataset for future predictions
    recent_data = torch.tensor(data[-seq_len:, :-1].astype(np.float32)).unsqueeze(0).to(device)
    outputs = model(recent_data)  # Shape: [1, n_days, num_classes]
    predicted_classes = torch.argmax(outputs, dim=2).squeeze(0).cpu().numpy()

    # Map predictions back to percentage changes
    class_to_pct_change = {0: -5, 1: -2, 2: 0, 3: 2, 4: 5}
    predicted_pct_changes = [class_to_pct_change[p] for p in predicted_classes]

    # Simulate future prices based on recent price and predicted percentage changes
    recent_prices = scaler.inverse_transform(data[-seq_len:, :-1])[:, 0]  # Reverse scaling for the target feature
    last_price = recent_prices[-1]
    future_prices = [last_price]

    for pct_change in predicted_pct_changes:
        next_price = future_prices[-1] * (1 + pct_change / 100)
        future_prices.append(next_price)

    future_prices = np.array(future_prices[1:])  # Remove the first value (duplicate of last_price)

# Generate dates for recent and future prices
seq_len = len(recent_prices)
n_days = len(predicted_pct_changes)
today = datetime.today()
recent_dates = [today - timedelta(days=seq_len - i) for i in range(seq_len)]
future_dates = [recent_dates[-1] + timedelta(days=i + 1) for i in range(n_days)]

# Enhanced visualization
plt.figure(figsize=(14, 8))
plt.plot(recent_dates, recent_prices, label="Recent Prices", color="#1f77b4", linewidth=2)
plt.plot(future_dates, future_prices, label="Predicted Prices", color="#ff7f0e", linestyle="--", linewidth=2)

# Highlight today's date
today_price = recent_prices[-1]
plt.scatter([today], [today_price], color="red", label="Today", zorder=5)
plt.text(today, today_price, f" {today_price:.2f} USD", fontsize=10, color="red", weight="bold")

# Add grid and format
plt.grid(which='major', linestyle='--', linewidth=0.5, alpha=0.7)
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%b %d'))  # Format x-axis as "Month Day"
plt.gca().xaxis.set_major_locator(mdates.DayLocator(interval=1))    # Show ticks for every day
plt.xticks(rotation=45, fontsize=10)
plt.yticks(fontsize=10)

# Add title and labels
plt.title("Bitcoin Price Prediction", fontsize=16, weight='bold', pad=20)
plt.xlabel("Date", fontsize=12, labelpad=10)
plt.ylabel("Price (USD)", fontsize=12, labelpad=10)

# Add legend
plt.legend(fontsize=10, loc="upper left")

# Add shaded region for the prediction
plt.axvspan(future_dates[0], future_dates[-1], color="orange", alpha=0.1, label="Prediction Period")

# Final layout adjustments and display
plt.tight_layout()
plt.show()

Predicting future trends...


NameError: name 'model' is not defined