In [1]:
import numpy as np
from sklearn.preprocessing import StandardScaler

from momentfm.utils.data import load_from_tsfile


class StockDataset:
    def __init__(self, data_split="train"):
        """
        Parameters
        ----------
        data_split : str
            Split of the dataset, 'train', 'val' or 'test'.
        """

        self.seq_len = 512
        self.train_file_path_and_name = "./label_data/w2_train.ts"
        self.test_file_path_and_name = "./label_data/w2_test.ts"
        self.data_split = data_split  # 'train' or 'test'

        # Read data
        self._read_data()

    def _transform_labels(self, train_labels: np.ndarray, test_labels: np.ndarray):
        labels = np.unique(train_labels)  # Move the labels to {0, ..., L-1}
        transform = {}
        for i, l in enumerate(labels):
            transform[l] = i

        train_labels = np.vectorize(transform.get)(train_labels)
        test_labels = np.vectorize(transform.get)(test_labels)

        return train_labels, test_labels

    def __len__(self):
        return self.num_timeseries

    def _read_data(self):
        self.scaler = StandardScaler()

        self.train_data, self.train_labels = load_from_tsfile(
            self.train_file_path_and_name
        )
        self.test_data, self.test_labels = load_from_tsfile(
            self.test_file_path_and_name
        )

        self.train_labels, self.test_labels = self._transform_labels(
            self.train_labels, self.test_labels
        )

        if self.data_split == "train":
            self.data = self.train_data
            self.labels = self.train_labels
        else:
            self.data = self.test_data
            self.labels = self.test_labels

        self.num_timeseries = self.data.shape[0]
        self.len_timeseries = self.data.shape[2]

        self.data = self.data.reshape(-1, self.len_timeseries)
        self.scaler.fit(self.data)
        self.data = self.scaler.transform(self.data)
        self.data = self.data.reshape(self.num_timeseries, self.len_timeseries)

        self.data = self.data.T

    def __getitem__(self, index):
        assert index < self.__len__()

        timeseries = self.data[:, index]
        timeseries_len = len(timeseries)
        labels = self.labels[index,].astype(int)
        input_mask = np.ones(self.seq_len)
        input_mask[: self.seq_len - timeseries_len] = 0

        timeseries = np.pad(timeseries, (self.seq_len - timeseries_len, 0))

        return np.expand_dims(timeseries, axis=0), input_mask, labels


  from .autonotebook import tqdm as notebook_tqdm


In [None]:
import torch
from torch.utils.data import DataLoader
import torch.nn as nn
from momentfm import MOMENTPipeline
from momentfm.data.base import TimeseriesOutputs
from momentfm.data.classification_dataset import ClassificationDataset

dataset = StockDataset(data_split="train")  # Use 'test' or 'val' as needed

train_dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

print(f"Num Train Set: {len(train_dataloader)}")

# Initialize the model and move it to GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
model = MOMENTPipeline.from_pretrained(
    "AutonLab/MOMENT-1-large", 
    model_kwargs={
        'task_name': 'classification',
        'n_channels': 1,
        'num_class': 3
    },
).to(device)

model.init()

# Convert model weights to float32
model = model.float().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# Training loop
for epoch in range(200):  # Specify number of epochs
    total_loss = 0
    correct = 0
    total = 0
    
    for data, input_mask, labels in train_dataloader:  # Adjust based on dataset output format
        data = data.to(device, dtype=torch.float32)
        labels = labels.to(device)
        input_mask = input_mask.to(device) if input_mask is not None else None
        
        # Forward pass
        output = model(x_enc=data)

        if output is None or output.logits is None:
            raise ValueError("The model's output is None. Check the model's forward implementation.")

        logits: TimeseriesOutputs = output.logits

        # Compute loss
        loss = criterion(logits, labels)
        total_loss += loss.item()
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Calculate accuracy
        _, predicted = torch.max(logits, dim=1)  # Get predictions
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    avg_loss = total_loss / len(train_dataloader)

    print(f"Epoch {epoch+1}, Loss: {avg_loss:.3f}, Accuracy: {accuracy:.2f}%")


Num Train Set: 2576
Using device: cuda




Epoch 1, Loss: 1.094, Accuracy: 37.25%


In [3]:
from sklearn.metrics import precision_score, recall_score, f1_score
import time
# Load the test dataset
test_dataset = StockDataset(data_split="test")
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Evaluate the model on the test dataset
model.eval()  # Set the model to evaluation mode
test_loss = 0
correct = 0
total = 0

all_labels = []
all_predictions = []
t1 = time.time()

with torch.no_grad():  # Disable gradient computation for evaluation
    for data, input_mask, labels in test_dataloader:
        # Move data to the appropriate device
        data = data.to(device, dtype=torch.float32)
        labels = labels.to(device)
        input_mask = input_mask.to(device) if input_mask is not None else None
        
        # Forward pass
        output = model(x_enc=data)

        if output is None or output.logits is None:
            raise ValueError("The model's output is None. Check the model's forward implementation.")

        logits: TimeseriesOutputs = output.logits

        # Compute loss
        loss = criterion(logits, labels)
        test_loss += loss.item()

        # Get predictions
        _, predicted = torch.max(logits, dim=1)  # Get the predicted class indices
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Save all labels and predictions for metric computation
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

# Calculate metrics
test_loss /= len(test_dataloader)
accuracy = 100 * correct / total
precision = precision_score(all_labels, all_predictions, average="weighted")
recall = recall_score(all_labels, all_predictions, average="weighted")
f1 = f1_score(all_labels, all_predictions, average="weighted")

# Print metrics

print(f"Num Test Set: {len(test_dataloader)}")
print(f"Test Loss: {test_loss:.3f}")
print(f"Test Accuracy: {accuracy:.2f}%")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1:.2f}")
print(f"Time taken: {time.time() - t1:.2f} seconds")

Num Test Set: 5
Test Loss: 1.210
Test Accuracy: 20.00%
Precision: 0.04
Recall: 0.20
F1 Score: 0.07
Time taken: 0.06 seconds


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
