<a href="https://colab.research.google.com/github/IvaroEkel/AI-Spielplatz/blob/main/AlphaTransformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# !pip install torch numpy scipy statsmodels

In [1]:
import numpy as np
import pandas as pd
import random
import torch
import torch.nn as nn
import torch.optim as optim
from scipy.stats import norm
import matplotlib.pyplot as plt
# import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import acf

In [2]:
class TimeSeriesEmbeddingAlpha:
    """Delay-embed a scalar time series and reduce every delay vector to an "alpha" value.

    For each start index ``i`` the delay vector is
    ``(x[i], x[i + lag], ..., x[i + (dimension - 1) * lag])``.  Each vector is
    replaced by its ordinal ranks (1 = smallest) and ``alpha`` is the sum of the
    ranks in the second half minus the sum of the ranks in the first half (the
    middle rank is ignored when ``dimension`` is odd).

    Attributes:
        time_series: The input converted to a 1-D float array.
        dimension: Embedding dimension (length of each delay vector).
        lag: Index distance between consecutive delay-vector components.
        n_del_vec: Number of delay vectors, ``len(time_series) - (dimension - 1) * lag``.
        delay_vectors: Float array of shape ``(n_del_vec, dimension)``.
        alpha_sequence: Integer array of length ``n_del_vec``.

    Raises:
        ValueError: If the series is too short to form a single delay vector.
    """

    def __init__(self, time_series, dimension, lag):
        self.time_series = np.array(time_series, dtype=float)
        self.dimension = dimension
        self.lag = lag
        self.n_del_vec = len(self.time_series) - (dimension - 1) * lag
        # Fail early with a readable message instead of an obscure NumPy error
        # ("negative dimensions" / "iteration dimensions are 0") further down.
        if self.n_del_vec < 1:
            raise ValueError(
                f"time series of length {len(self.time_series)} is too short for "
                f"dimension={dimension} and lag={lag}: need at least "
                f"{(dimension - 1) * lag + 1} points"
            )
        self.delay_vectors = self.compute_delay_vectors()
        self.alpha_sequence = self.compute_alpha_sequence()

    def compute_delay_vectors(self):
        """Return the ``(n_del_vec, dimension)`` array of delay vectors."""
        # Broadcasting builds index[i, j] = i + j * lag in one shot.
        starts = np.arange(self.n_del_vec)[:, None]
        offsets = np.arange(self.dimension)[None, :] * self.lag
        return self.time_series[starts + offsets]

    def ordinal_rank_vectors(self):
        """Return the 1-based ordinal ranks of every delay vector (same shape as ``delay_vectors``).

        A stable sort is used so that equal values are ranked in order of
        appearance, which makes the result deterministic in the presence of ties.
        """
        order = np.argsort(self.delay_vectors, axis=1, kind="stable")
        return np.argsort(order, axis=1, kind="stable") + 1

    def alpha(self, ordinal_rank):
        """Return sum(second-half ranks) - sum(first-half ranks) for one rank vector.

        For an odd-length vector the middle element belongs to neither half.
        """
        n = len(ordinal_rank)
        lim = n // 2
        tau_n = ordinal_rank[:lim]
        tau_p = ordinal_rank[lim + 1:] if n % 2 else ordinal_rank[lim:]
        return np.sum(tau_p) - np.sum(tau_n)

    def compute_alpha_sequence(self):
        """Return the integer alpha value of every delay vector, in time order."""
        ordinal_ranks = self.ordinal_rank_vectors()
        return np.array([self.alpha(ranks) for ranks in ordinal_ranks], dtype=int)

In [9]:
# Mount Google Drive at /content/drive (requires the google.colab package);
# force_remount=True is passed so the mount call is repeated on re-run.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [10]:
import os

# Work from the project folder on the mounted Drive so that relative paths
# such as ./data/... used further down resolve inside it.
project_dir = '/content/drive/MyDrive/Colab Notebooks/AlphaTransformer/'
os.chdir(project_dir)

# Echo the resulting working directory as a sanity check.
print(os.getcwd())

/content/drive/MyDrive/Colab Notebooks/AlphaTransformer


commodity prices: corn, wheat, rice, soybean, soybean oil, lumber, copper...

In [12]:
# Load the `value` column from the corn price CSV in ./data (relative to the
# current working directory) and expose it as the NumPy array `seq_array`.

import pandas as pd
print(os.getcwd())

csv_path = './data/corn-prices-historical-chart-data_clean-1.csv'

# Encodings are tried in order until one decodes the file.
encodings_to_try = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']  # Add other encodings if needed

for encoding in encodings_to_try:
    try:
        df = pd.read_csv(csv_path, encoding=encoding)
    except UnicodeDecodeError:
        print(f"Failed to load data using encoding: {encoding}")
        continue
    # Drop rows containing NaN; rebinding (instead of inplace=True) keeps the cell re-runnable.
    df = df.dropna()
    print(df)
    seq = df['value']
    print(f"Successfully loaded data using encoding: {encoding}")
    break  # Stop if successful
else:
    # Without this, `seq` would be undefined and the next statement would fail with a NameError.
    raise RuntimeError(
        f"Could not decode {csv_path} with any of the encodings {encodings_to_try}"
    )

# NaN rows were already removed above; convert the price column to a NumPy array.
seq_array = seq.to_numpy()
seq_array

/content/drive/MyDrive/Colab Notebooks/AlphaTransformer
             date   value
0      1959-07-01  1.1770
1      1959-07-02  1.1760
2      1959-07-06  1.1710
3      1959-07-07  1.1710
4      1959-07-08  1.1700
...           ...     ...
16579  2025-03-12  4.4925
16580  2025-03-13  4.5500
16581  2025-03-14  4.4325
16582  2025-03-17  4.6050
16583  2025-03-18  4.6050

[16584 rows x 2 columns]
Successfully loaded data using encoding: utf-8


array([1.177 , 1.176 , 1.171 , ..., 4.4325, 4.605 , 4.605 ])

In [None]:
import numpy as np

def divide_into_subsequences(seq_array, subseq_length):
    """Split a sequence into consecutive, non-overlapping chunks of equal length.

    Trailing elements that do not fill a complete chunk are dropped.

    Args:
        seq_array: The sequence to split (anything supporting ``len`` and
            slicing, e.g. a NumPy array).
        subseq_length: Number of elements in every chunk.

    Returns:
        A list with ``len(seq_array) // subseq_length`` slices of ``seq_array``,
        in their original order.
    """
    n_chunks = len(seq_array) // subseq_length
    return [
        seq_array[k * subseq_length:(k + 1) * subseq_length]
        for k in range(n_chunks)
    ]
sequence_length = 128  # Number of prices in each disjoint subsequence
# Split the loaded price array (seq_array) into disjoint chunks of
# sequence_length values each.
subsequences = divide_into_subsequences(seq_array, subseq_length=sequence_length)  # chunk length = sequence_length
len(subsequences)
# subsequences is now a list of slices of seq_array, each of length sequence_length (128 here).
# They can be accessed by index:
# first_subsequence = subsequences[0]
# second_subsequence = subsequences[1]
# ... and so on

In [None]:
# Build one alpha sequence per price chunk from a 16-dimensional, lag-1 delay embedding.
dimension, lag = 16, 1
alpha_sequences = np.array([
    TimeSeriesEmbeddingAlpha(chunk, dimension, lag).alpha_sequence
    for chunk in subsequences
])
print(len(alpha_sequences))

Brownian paths

In [None]:
# Generate Brownian motion paths and compute their alpha sequences.
# NOTE: this cell reassigns sequence_length, dimension, lag and alpha_sequences.
np.random.seed(42)
num_samples = 20000
sequence_length = 64  # Number of points in each simulated path
dimension = 8
lag = 1

alpha_sequences = []

for _ in range(num_samples):
    # The cumulative sum of i.i.d. N(0, 1) increments is one discrete Brownian path.
    time_series_brownian = np.cumsum(np.random.normal(0, 1, sequence_length))
    # Embed the path generated on the line above (previously a different name,
    # `time_series`, was passed here, which this cell never defines).
    embedding = TimeSeriesEmbeddingAlpha(time_series_brownian, dimension, lag)
    alpha_sequences.append(embedding.alpha_sequence)

# Stack into a 2-D array, as the other data-source cells do.
alpha_sequences = np.array(alpha_sequences)


logistic map trajectories

In [None]:
# Generate logistic map trajectories (x -> 4 * x * (1 - x)) and compute their alpha sequences.
# NOTE: this cell reassigns sequence_length, dimension, lag and alpha_sequences.
np.random.seed(42)
# The initial values below come from the stdlib `random` module, which has its
# own generator state; seed it too so the trajectories are reproducible.
random.seed(42)
num_samples = 20000
sequence_length = 64  # Number of points in each trajectory
dimension = 8
lag = 1

alpha_sequences = []


for _ in range(num_samples):
    time_series = [random.uniform(0, 1)]  # Random initial value
    # Iterate the map until the trajectory has sequence_length points.
    for _step in range(sequence_length - 1):
        time_series.append(4 * time_series[-1] * (1 - time_series[-1]))
    embedding = TimeSeriesEmbeddingAlpha(time_series, dimension, lag)
    alpha_sequences.append(embedding.alpha_sequence)

alpha_sequences = np.array(alpha_sequences)

Get tokens

In [None]:

# Vocabulary: np.unique already returns the distinct alpha values in ascending order.
unique_values = np.unique(alpha_sequences)
token_map = dict(zip(unique_values, range(len(unique_values))))   # alpha value -> token index
inverse_token_map = dict(enumerate(unique_values))                # token index -> alpha value

# Replace every alpha value by its token index (element-wise dictionary lookup).
alpha_sequences_tokenized = np.vectorize(token_map.get)(alpha_sequences)
vocab_size = len(token_map)  # Number of distinct tokens

print(
    f"Unique Alpha Values: {unique_values}",
    f"Token Mapping: {token_map}",
    f"Vocabulary Size: {vocab_size}",
    f"Example of tokenized sequence: {alpha_sequences_tokenized[0]}",
    sep="\n",
)

In [None]:
# Display the sorted distinct alpha values that make up the vocabulary.
unique_values

In [None]:
# Display the alpha value -> token index mapping.
token_map

# Define transformer model

In [None]:
class TransformerForSequences(nn.Module):
    """Encoder-decoder transformer over token sequences for next-token prediction.

    Token indices are embedded, passed through ``nn.Transformer`` (batch-first)
    and projected back to vocabulary logits.

    Args:
        vocab_size: Number of distinct tokens.
        embed_dim: Embedding / model width (``d_model``).
        num_heads: Number of attention heads.
        ff_hidden_dim: Width of the feed-forward sub-layers.
        num_layers: Number of encoder layers and of decoder layers.

    NOTE(review): no positional encoding is added to the embeddings; adding one
    would change the parameter set, so it is left for a deliberate follow-up.
    """

    def __init__(self, vocab_size, embed_dim, num_heads, ff_hidden_dim, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)  # Convert token indices into embeddings
        self.transformer = nn.Transformer(
            d_model=embed_dim,
            nhead=num_heads,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dim_feedforward=ff_hidden_dim,
            batch_first=True
        )
        self.fc_out = nn.Linear(embed_dim, vocab_size)  # Output logits over token indices

    @staticmethod
    def _future_mask(n_query, n_key, device):
        """Boolean (n_query, n_key) mask that is True where key index > query index (blocked)."""
        return torch.triu(
            torch.ones(n_query, n_key, dtype=torch.bool, device=device), diagonal=1
        )

    def forward(self, src, tgt, causal=True):
        """Return logits of shape (batch, tgt_len, vocab_size).

        Args:
            src: Long tensor (batch, src_len) of token indices for the encoder.
            tgt: Long tensor (batch, tgt_len) of token indices for the decoder.
            causal: When True (default), position ``t`` may only attend to
                positions ``<= t`` in encoder self-attention, decoder
                self-attention and decoder-to-encoder attention. This matters
                when the same sequence is fed as ``src`` and ``tgt`` and the
                training target is that sequence shifted by one: without the
                masks every position can read the token it is asked to predict.
                Pass ``causal=False`` for the previous unmasked behaviour.
                The masks add no parameters, so existing state dicts still load.
        """
        src_mask = tgt_mask = memory_mask = None
        if causal:
            src_len, tgt_len = src.size(1), tgt.size(1)
            src_mask = self._future_mask(src_len, src_len, src.device)
            tgt_mask = self._future_mask(tgt_len, tgt_len, src.device)
            memory_mask = self._future_mask(tgt_len, src_len, src.device)

        src = self.embedding(src)
        tgt = self.embedding(tgt)
        output = self.transformer(
            src, tgt, src_mask=src_mask, tgt_mask=tgt_mask, memory_mask=memory_mask
        )
        output = self.fc_out(output)  # Shape: (batch, seq_len, vocab_size)
        return output

# Model hyperparameters (passed positionally to TransformerForSequences below)
embed_dim = 16       # embedding / model width
num_heads = 8        # attention heads
ff_hidden_dim = 16   # feed-forward width
num_layers = 8       # used for both the encoder and the decoder stack

# Create the model on the GPU when one is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerForSequences(vocab_size, embed_dim, num_heads, ff_hidden_dim, num_layers).to(device)
print("Model initialized and moved to:", device)

In [None]:
# Convert tokenized sequences to PyTorch tensors for next-token prediction:
# the input drops the last token of every sequence, the target drops the first,
# so y_train[:, t] is the token that follows X_train[:, t].
X_train = torch.tensor(alpha_sequences_tokenized[:, :-1], dtype=torch.long).to(device)  # Input sequences
y_train = torch.tensor(alpha_sequences_tokenized[:, 1:], dtype=torch.long).to(device)   # Target sequences

print(f"Training data shape: {X_train.shape}, {y_train.shape}")

In [None]:

import os

# Identifier used in every saved file name: series label followed by the
# transformer hyperparameters (use the same one when loading a saved model).
# Alternative label: "copper_prices-"
seriestype = "corn_prices-"
modelid = f"{seriestype}A-{embed_dim}-{num_heads}-{ff_hidden_dim}-{num_layers}"

# Project folder on the mounted Drive
base_folder = "/content/drive/MyDrive/Colab Notebooks/AlphaTransformer/"
model_name = "transformer_alpha_model_" + modelid
model_name

In [None]:

# Drive root and the project folder inside it
base_folder = "/content/drive/MyDrive/"
project_folder = os.path.join(base_folder, "Colab Notebooks/AlphaTransformer/")
model_name = "transformer_alpha_model_" + modelid

# Saved artefacts: model weights plus the parameter record in JSON and TXT form
model_path, params_json_path, params_txt_path = (
    os.path.join(project_folder, model_name + suffix)
    for suffix in (".pth", "_params.json", "_params.txt")
)

In [None]:
# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
learn_rate = 0.0001
optimizer = optim.Adam(model.parameters(), lr=learn_rate) # good lr = 0.0001

# Full-batch training: every epoch is a single optimisation step on all of X_train.
losses = []  # one loss value per epoch, plotted below
num_epochs = 10000
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    # Forward pass: the same input tensor is used as encoder source and decoder target input
    output = model(X_train, X_train)  # Output shape: (batch, seq_len, vocab_size)

    # Reshape for loss function: (batch*seq_len, vocab_size) vs. (batch*seq_len,)
    loss = criterion(output.view(-1, vocab_size), y_train.view(-1))

    # Backpropagation
    loss.backward()
    optimizer.step()
    losses.append(loss.item())

    # Every 50 epochs: report the loss and overwrite the checkpoint at model_path
    if (epoch + 1) % 50 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")
        # Save model weights
        torch.save(model.state_dict(), model_path)
        print(f"Model saved to {model_path}")

# Plot training loss (log-scaled y axis)
plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.yscale('log')

# Saving the loss plot is currently disabled:
#fig_path = os.path.join(base_folder, f"{model_name}_plot.png")
#plt.savefig(fig_path, dpi=200)
plt.show()

# Save the final model weights
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")

In [None]:
import json

# Record the alpha-embedding, model and training settings next to the saved weights.
model_params = {
    "alpha embedd dim": dimension,
    "alpha lag": lag,
    "embed_dim": embed_dim,
    "num_heads": num_heads,
    "ff_hidden_dim": ff_hidden_dim,
    "num_layers": num_layers,
    "vocab_size": vocab_size,
    "sequence_length": sequence_length,
    "num_training_examples": len(alpha_sequences),
    "learning_rate": learn_rate,
    "num_epochs": num_epochs,
    # Kept consistent with the "corn_prices-" series label and the description below
    # (this previously said copper).
    "training_data_type": "megatrends_corn_prices",
    "description": "Transformer trained to predict alpha sequences from corn price series."
}

# Save as JSON
with open(params_json_path, "w") as json_file:
    json.dump(model_params, json_file, indent=4)

# Save as TXT, one "key: value" pair per line
with open(params_txt_path, "w") as txt_file:
    for key, value in model_params.items():
        txt_file.write(f"{key}: {value}\n")

print(f"Model parameters saved as JSON: {params_json_path}")
print(f"Model parameters saved as TXT: {params_txt_path}")

In [None]:
## Generate a new Brownian motion path for testing
# test_series = np.cumsum(np.random.normal(0, 1, sequence_length*2))
## Generate a logistic map trajectory for testing
# test_series = [random.uniform(0, 1)]  # Random initial value
# for _ in range(sequence_length - 1): test_series.append(4 * test_series[-1] * (1 - test_series[-1]))
## get the last L values of seq_array for testing
# test_series = test_series[-(sequence_length//4):]
test_series = seq_array[-sequence_length:]
test_embedding = TimeSeriesEmbeddingAlpha(test_series, dimension, lag)

test_seq = test_embedding.alpha_sequence  # Extract alpha sequence

# token_map.get returns None for alpha values that are not in the vocabulary,
# which would only surface later as an obscure tensor-conversion error.
unseen_values = sorted(set(test_seq.tolist()) - {int(value) for value in token_map})
if unseen_values:
    raise ValueError(
        f"Test sequence contains alpha values missing from token_map: {unseen_values}"
    )

# Convert test sequence to token indices
test_seq_tokenized = np.vectorize(token_map.get)(test_seq)

# Convert to tensor (all tokens except the last), with a leading batch dimension of 1
test_seq_tensor = torch.tensor(test_seq_tokenized[:-1], dtype=torch.long).unsqueeze(0).to(device)

# Predict the next token at every position
model.eval()
with torch.no_grad():
    predicted_logits = model(test_seq_tensor, test_seq_tensor)
    predicted_tokens = torch.argmax(predicted_logits, dim=-1)  # Get most probable token indices

# Convert predicted indices back to original signed values
predicted_tokens_signed = np.vectorize(inverse_token_map.get)(predicted_tokens.squeeze().cpu().numpy())

print("Input Alpha Sequence (Original):", test_seq[:-1])
print("Predicted Next Tokens (Restored Signed Values):", predicted_tokens_signed)

In [None]:
import time

# Generate a timestamp so that each saved figure gets a distinct file name
timestamp = time.strftime("%Y%m%d-%H%M%S")  # Format: YYYYMMDD-HHMMSS

# Save path for the figure, inside project_folder, with the timestamp appended
fig_path = os.path.join(project_folder, f"{model_name}_plot_{timestamp}.png")

# Alternative without a timestamp (disabled):
# fig_path = os.path.join(base_folder, f"{model_name}_plot.png")

In [None]:


# Overlay the per-position predictions on the input alpha sequence.
fig, ax = plt.subplots(figsize=(10, 5))

# Input sequence (blue circles) and predictions (red triangles), both indexed from 0
observed = test_seq[:-1]
ax.plot(range(len(observed)), observed, 'bo-', label='Original Sequence', markersize=8)
ax.plot(range(len(predicted_tokens_signed)), predicted_tokens_signed, 'r^-', label='Predicted Sequence', markersize=8)

# Axis labels, title, legend and grid
ax.set_xlabel("Time Step")
ax.set_ylabel("Alpha Sequence Value")
ax.set_title("Original vs. Predicted Alpha Sequence")
ax.legend()
ax.grid(True)

# Write the figure to disk at 200 dpi, then display it
fig.savefig(fig_path, dpi=200)
print(f"Figure saved to {fig_path}")
plt.show()

predict last m tokens

In [None]:

m = 1  # Number of tokens to predict

# The tokenized test sequence from the earlier test cell is reused here; the
# lines below would recompute it and are kept disabled:
# test_embedding = TimeSeriesEmbeddingAlpha(test_series, dimension, lag)
# test_seq = test_embedding.alpha_sequence  # Extract alpha sequence

# # Convert test sequence to token indices
# test_seq_tokenized = np.vectorize(token_map.get)(test_seq)

# Split input and target sequences
test_seq_input = test_seq_tokenized[:-m]  # Input sequence (excluding last `m` tokens)
test_seq_target = test_seq_tokenized[-m:]  # Ground truth for last `m` tokens

# Convert to tensor
test_seq_tensor = torch.tensor(test_seq_input, dtype=torch.long).unsqueeze(0).to(device)  # Shape: (1, seq_len)

# Predict `m` tokens one at a time, feeding each prediction back into the input
model.eval()
predicted_tokens = []
with torch.no_grad():
    for _ in range(m):
        # Forward pass (the current sequence is used as both source and target input)
        predicted_logits = model(test_seq_tensor, test_seq_tensor)  # Output shape: (1, seq_len, vocab_size)

        # Take the most probable token at the last position as the next token
        next_token = torch.argmax(predicted_logits[:, -1, :], dim=-1)  # Shape: (1,)

        # Reshape `next_token` to match `test_seq_tensor`
        next_token = next_token.unsqueeze(1)  # Shape: (1, 1) -> (batch, seq_len)

        # Append predicted token to input sequence (autoregressive modeling)
        test_seq_tensor = torch.cat([test_seq_tensor, next_token], dim=1)  # Concatenates along sequence dimension

        # Store predicted token
        predicted_tokens.append(next_token.item())

# Convert predicted indices back to original signed values
predicted_tokens_signed = np.vectorize(inverse_token_map.get)(predicted_tokens)

# Convert ground truth back to signed values
test_seq_target_signed = np.vectorize(inverse_token_map.get)(test_seq_target)

print("Input Alpha Sequence (Original, Excluding Last m): \n", test_seq[:-m], "\n")
print("Ground Truth Last m Tokens: \n", test_seq_target_signed, "\n")
print("Predicted Last m Tokens:\n", predicted_tokens_signed, "\n")

In [None]:
import matplotlib.pyplot as plt

# x positions: the first len(test_seq) - m indices are history, the last m are
# shared by the ground truth and the predictions.
x_full = np.arange(len(test_seq))
x_original, x_ground_truth = x_full[:-m], x_full[-m:]
x_predicted = x_ground_truth

history = test_seq[:-m]

fig, ax = plt.subplots(figsize=(10, 5))

# History (blue circles), held-out ground truth (blue crosses), predictions (red triangles)
ax.plot(x_original, history, 'bo-', label='Original Sequence', markersize=8)
ax.plot(x_ground_truth, test_seq_target_signed, 'x--', color='blue', label='Ground Truth (Last m)', markersize=6)
ax.plot(x_predicted, predicted_tokens_signed, 'r^--', label='Predicted token', markersize=4)

# Dashed connectors from the last history point to the first ground-truth
# point (blue) and to the first predicted point (red)
ax.plot([x_original[-1], x_ground_truth[0]], [history[-1], test_seq_target_signed[0]], 'b--')
ax.plot([x_original[-1], x_predicted[0]], [history[-1], predicted_tokens_signed[0]], 'r--')

# Axis labels, title, legend and grid
ax.set_xlabel("Time Step")
ax.set_ylabel("Alpha Sequence Value")
ax.set_title(f"Original vs. Predicted Alpha Sequence (Predicting Last {m} Tokens)")
ax.legend()
ax.grid(True)

# Save at 200 dpi into the project folder, then display
fig_path = os.path.join(project_folder, f"{model_name}_predict_{m}_last_.png")
fig.savefig(fig_path, dpi=200)
print(f"Figure saved to {fig_path}")
plt.show()

In [None]:
# Display the last nine values of the price array.
seq_array[-9:]

In [None]:
# Plot seq_array[-8:-1]: the seven values that precede the final value of the series

plt.plot(seq_array[-8:-1], 'bo-')
plt.xlabel("Index")
plt.ylabel("Value")
plt.title("Last 7 Values of seq_array before predicting the alpha value consistent with the past")
plt.show()


In [None]:
# 0-based ordinal ranks of the same seven values (argsort applied twice).
np.argsort(np.argsort(seq_array[-8:-1]))

In [None]:
# Embed just enough trailing prices to obtain three delay vectors:
# (dimension - 1) * lag + 3 points. For dimension = 8, lag = 1 this is the 10
# points used originally; a fixed 10 is too short when dimension = 16.
n_tail = (dimension - 1) * lag + 3
embdd = TimeSeriesEmbeddingAlpha(seq_array[-n_tail:], dimension, lag)
embdd.alpha_sequence

In [None]:
import matplotlib.pyplot as plt

# Same comparison as the previous prediction plot, without the dashed
# connector lines and without saving the figure.

# Create x-axis positions
x_full = np.arange(len(test_seq))
x_original = x_full[:-m]  # Before the last m tokens
x_ground_truth = x_full[-m:]  # Last m tokens
x_predicted = x_ground_truth  # Predicted points align with last m tokens

plt.figure(figsize=(10, 5))

# Original sequence (blue circles, excluding last m points)
plt.plot(x_original, test_seq[:-m], 'bo-', label='Original Sequence', markersize=8)

# Last m points of original sequence (blue crosses)
plt.plot(x_ground_truth, test_seq_target_signed, 'x-', color='blue', label='Ground Truth (Last m)', markersize=10)

# Predicted sequence (red triangles)
plt.plot(x_predicted, predicted_tokens_signed, 'r^-', label='Predicted Sequence', markersize=8)

# Labels & Legend
plt.xlabel("Time Step")
plt.ylabel("Alpha Sequence Value")
plt.title(f"Original vs. Predicted Alpha Sequence (Predicting Last {m} Tokens)")
plt.legend()
plt.grid(True)

# Show plot
plt.show()