In [1]:
import h5py
import numpy as np
import torch


def getData(path="data2.h5", x_key="x_data", y_key="y_data"):
    """Load features and labels from an HDF5 file as PyTorch tensors.

    Args:
        path: Location of the ``.h5`` file. Defaults to ``"data2.h5"``.
        x_key: Name of the dataset holding the features.
        y_key: Name of the dataset holding the labels.

    Returns:
        A ``(x_tensor, y_tensor)`` tuple. ``x_tensor`` is ``float32`` with
        its last two axes swapped relative to the stored layout;
        ``y_tensor`` is ``long``.
    """
    with h5py.File(path, "r") as h5_file:
        # Slicing with [:] reads the full dataset into memory, so nothing
        # below depends on the file still being open.
        x_np = np.asarray(h5_file[x_key][:])
        y_np = np.asarray(h5_file[y_key][:])

    x_tensor = torch.tensor(x_np, dtype=torch.float32)
    y_tensor = torch.tensor(y_np, dtype=torch.long)

    # Swap axes 1 and 2 (presumably (N, channels, seq_len) ->
    # (N, seq_len, channels) -- TODO confirm against the file layout).
    x_tensor = x_tensor.permute(0, 2, 1).contiguous()
    return x_tensor, y_tensor


In [2]:
from torch.utils.data import Dataset


class MyDataSet(Dataset):
    """Dataset of complex-valued sequences augmented with magnitude and phase.

    The last axis of ``features`` is expected to hold the real part at index
    0 and the imaginary part at index 1. Two derived channels (magnitude and
    phase) are appended along that axis.

    Args:
        features: Array or CPU tensor of shape ``(N, seq_len, C)`` with
            ``C >= 2``.
        labels: Indexable collection of per-sample labels.
        transform: Optional callable applied to each sample, which it
            receives with a leading singleton axis ``(1, seq_len, C + 2)``.
        target_transform: Optional callable applied to each label.
    """

    def __init__(self, features, labels, transform=None,
                 target_transform=None):
        super().__init__()
        # Normalise to a NumPy array so arrays and CPU tensors follow the
        # same code path below.
        features = np.asarray(features)

        # calculate magnitude and phase
        real = features[:, :, 0]
        imaginary = features[:, :, 1]
        magnitude = np.sqrt(real**2 + imaginary**2)
        phase = np.arctan2(imaginary, real)

        # append magnitude and phase to features
        features = np.concatenate((features, magnitude[:, :, None],
                                   phase[:, :, None]), axis=2)

        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = labels
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.features)

    def __getitem__(self, index):
        """Return the ``(sample, label)`` pair stored at ``index``."""
        sample = self.features[index]
        annotation = self.labels[index]
        # Add the leading singleton axis without hard-coding the sequence
        # length or feature count, so any (seq_len, n_features) works.
        sample = sample.unsqueeze(0)
        if self.transform:
            sample = self.transform(sample)
        if self.target_transform:
            annotation = self.target_transform(annotation)
        # Drop singleton axes again (kept as a bare squeeze for
        # compatibility with existing transforms).
        sample = sample.squeeze()

        return sample, annotation


In [3]:
import torch.nn as nn


# 5 classes to create


# Patchify
class Embedding(nn.Module):
    """Map each sequence element from ``in_features`` to ``d_model`` dims.

    The projection is a linear layer followed by ReLU and LayerNorm.
    """

    def __init__(self, in_features, d_model):
        super().__init__()
        self.linear = nn.Linear(in_features, d_model)
        self.relu = nn.ReLU()
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x):
        """Embed ``x`` of shape (B, seq_len, in_features) -> (B, seq_len, d_model)."""
        projected = self.linear(x)
        activated = self.relu(projected)
        return self.norm(activated)


# PositionalEncoding (w/ cls token)
class PositionalEncoding(nn.Module):
    """Prepend a learnable class token and add learnable position embeddings."""

    def __init__(self, seq_len, d_model):
        super().__init__()

        # Both tensors are trainable parameters (requires_grad defaults to
        # True for nn.Parameter).
        cls_init = torch.randn(1, 1, d_model)
        self.cls_token = nn.Parameter(cls_init)
        pe_init = torch.randn(1, seq_len + 1, d_model)
        self.pe = nn.Parameter(pe_init)

    def forward(self, x):
        """Return ``x`` with the class token prepended and positions added.

        ``x`` has shape (B, seq_len, d_model); the result has shape
        (B, seq_len + 1, d_model).
        """
        batch_size, n_tokens, _ = x.shape
        # One copy of the class token per batch element.
        cls = self.cls_token.expand(batch_size, -1, -1)
        with_cls = torch.cat([cls, x], dim=1)
        # Use only as many position vectors as there are tokens.
        positions = self.pe[:, : n_tokens + 1]
        return with_cls + positions


# AttentionHead ( I am not going to be implementing this from scratch)
# MultiHeadAttention (Because of performance issues)
# Encoder Layer
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then a feed-forward network.

    Each sub-layer is wrapped in a residual connection with dropout and
    followed by LayerNorm (post-norm arrangement).
    """

    def __init__(self, d_model, num_heads, ff_d, dropout):
        super().__init__()
        self.attn = nn.MultiheadAttention(
            d_model, num_heads, dropout=dropout, batch_first=True
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        # The Sequential indices are part of the checkpoint keys, so the
        # order of these layers must not change.
        feed_forward = [
            nn.Linear(d_model, ff_d),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.LayerNorm(ff_d),
            nn.Linear(ff_d, d_model),
            nn.ReLU(),
        ]
        self.ffn = nn.Sequential(*feed_forward)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Transform ``x`` of shape (B, seq_len + 1, d_model), keeping its shape."""
        # Self-attention: query, key and value are all the input itself.
        attended = self.attn(x, x, x)[0]
        x = self.norm1(x + self.dropout(attended))
        x = self.norm2(x + self.dropout(self.ffn(x)))
        return x


# TransformerEncoder
class TransformerEncoder(nn.Module):
    """A stack of ``num_layers`` encoder layers applied one after another."""

    def __init__(self, d_model, num_heads, ff_d, num_layers, dropout):
        super().__init__()
        layers = []
        for _ in range(num_layers):
            layers.append(EncoderLayer(d_model, num_heads, ff_d, dropout))
        self.encoders = nn.ModuleList(layers)

    def forward(self, x):
        """Pass ``x`` of shape (B, seq_len + 1, d_model) through every layer."""
        hidden = x
        for layer in self.encoders:
            hidden = layer(hidden)
        return hidden


# ViT
class ViT(nn.Module):
    """Transformer classifier over sequences of feature vectors.

    Pipeline: per-token embedding -> class token + positional encoding ->
    encoder stack -> classification head applied to the first token of the
    encoder output.

    Args:
        in_features: Number of features per sequence element.
        d_model: Embedding / model width.
        seq_len: Maximum sequence length (without the class token).
        num_heads: Number of attention heads per encoder layer.
        ff_d: Hidden width of the feed-forward blocks and of the head.
        num_layers: Number of encoder layers.
        num_classes: Number of output classes.
        dropout: Dropout probability used throughout.
    """

    def __init__(
        self,
        in_features,
        d_model,
        seq_len,
        num_heads,
        ff_d,
        num_layers,
        num_classes,
        dropout,
    ):
        super().__init__()

        self.embedding = Embedding(in_features, d_model)
        self.pe = PositionalEncoding(seq_len, d_model)

        self.transformer = TransformerEncoder(
            d_model, num_heads, ff_d, num_layers, dropout
        )
        self.head = nn.Sequential(
            nn.Linear(d_model, ff_d),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.LayerNorm(ff_d),
            nn.Linear(ff_d, num_classes),
        )

    def forward(self, x):
        """Return class logits of shape (B, num_classes).

        ``x`` has shape (B, seq_len, in_features).
        """
        embed = self.embedding(x)
        out = self.pe(embed)
        out = self.transformer(out)
        # out shape -> (B, seq_len + 1, d_model); index 0 is the class token.
        # No squeeze here: a bare squeeze would drop the batch axis for a
        # batch of one and break the loss and accuracy computation.
        out = self.head(out[:, 0, :])
        # out shape -> (B, num_classes)
        return out


In [4]:
from torch.utils.data import DataLoader
from torch.optim import Optimizer
from tqdm import tqdm


def train_one_epoch(model, data_loader: DataLoader, criterion,
                    optimizer: Optimizer, epoch: int, device: str="cpu"):
    """Run one optimisation pass over ``data_loader``.

    Args:
        model: Module to train; switched to training mode.
        data_loader: Iterable of batches whose first two items are the
            inputs and the targets.
        criterion: Loss callable taking ``(predictions, targets)``.
        optimizer: Optimizer that updates the model parameters.
        epoch: Zero-based epoch index, used only for the progress-bar title.
        device: Device the batch tensors are moved to.

    Returns:
        ``(mean_loss, mean_accuracy)``, both averaged over the batches.
    """
    model.train()
    running_loss = 0.0
    running_acc = 0.0
    progress = tqdm(data_loader, desc=f"Epoch {epoch + 1}", unit="batch")
    for step, batch in enumerate(progress, start=1):
        inputs = batch[0].to(device)
        targets = batch[1].to(device)

        # Forward pass and loss.
        logits = model(inputs)
        loss = criterion(logits, targets)
        running_loss += loss.item()

        # Clear stale gradients, back-propagate, update the weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Fraction of this batch classified correctly.
        predictions = torch.argmax(logits, dim=-1)
        n_correct = (predictions == targets).sum().item()
        running_acc += n_correct / len(targets)

        progress.set_postfix(loss=running_loss / step,
                             accuracy=100. * running_acc / step)

    n_batches = len(data_loader)
    return running_loss / n_batches, running_acc / n_batches


def test_one_epoch(model, data_loader: DataLoader, criterion,
                   epoch: int, device: str="cpu"):
    model.eval()
    total_loss = 0.0
    avg_acc = 0.0
    loop = tqdm(data_loader, desc=f"Epoch {epoch + 1}", unit="batch")
    for i, batch in enumerate(loop):

        x, y = batch[0].to(device), batch[1].to(device)
        # forward pass
        y_hat = model(x)

        # calc loss
        loss = criterion(y_hat, y)
        total_loss += loss.item()

        preds = torch.argmax(y_hat, dim=-1)
        acc = (preds == y).sum().item() / len(y)
        avg_acc += acc

        loop.set_postfix(loss=total_loss / (i + 1),
                         accuracy=100. * avg_acc / (i + 1))

    total_loss /= len(data_loader)
    avg_acc /= len(data_loader)
    return total_loss, avg_acc


In [5]:
# Report whether a CUDA device is visible to PyTorch and, if so, its name.
print("CUDA disponível:", torch.cuda.is_available())
print("Número de GPUs disponíveis:", torch.cuda.device_count())
if torch.cuda.is_available():
    print("Nome da GPU:", torch.cuda.get_device_name(0))
else:
    print("Nome da GPU:", "Nenhuma GPU encontrada")

CUDA disponível: True
Número de GPUs disponíveis: 1
Nome da GPU: Tesla T4


In [11]:
import yaml
import torch

from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the formatted data
X, y = getData()
# Split into train and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=24
)

# Build the datasets
train_dataset = MyDataSet(X_train, y_train)
test_dataset = MyDataSet(X_test, y_test)

# Build the data loaders
train_loader = DataLoader(dataset=train_dataset, batch_size=1024, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)
print(f"Train_loader size: {len(train_loader)}")
print(f"Size of each loader: {len(next(iter(train_loader)))}")
print(f"Test_loader size: {len(test_loader)}")

# Load the model hyperparameters from the YAML config file
model_name = "vit_r13"
with open(f"{model_name}.yml", "r") as file:
    config = yaml.safe_load(file)

hyperparams = config["hyperparams"]
print(hyperparams)

# Validate the labels on the CPU before anything touches the GPU. A class
# index outside [0, num_classes) makes the loss fail with an opaque
# "CUDA error: device-side assert triggered", which also leaves the CUDA
# context unusable until the kernel is restarted.
num_classes = hyperparams["num_classes"]
if y.numel() > 0:
    label_min, label_max = int(y.min()), int(y.max())
    if label_min < 0 or label_max >= num_classes:
        raise ValueError(
            f"Labels must lie in [0, {num_classes - 1}] but span "
            f"[{label_min}, {label_max}]; remap them or fix num_classes."
        )

torch.cuda.empty_cache()
model = ViT(**hyperparams).to(device)
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total params: {total_params}")


Train_loader size: 120
Size of each loader: 2
Test_loader size: 1280
{'in_features': 4, 'd_model': 25, 'seq_len': 100, 'num_heads': 5, 'ff_d': 1024, 'num_layers': 7, 'num_classes': 5, 'dropout': 0.1}


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [10]:
!pip show torch

Name: torch
Version: 2.5.1+cu121
Summary: Tensors and Dynamic neural networks in Python with strong GPU acceleration
Home-page: https://pytorch.org/
Author: PyTorch Team
Author-email: packages@pytorch.org
License: BSD-3-Clause
Location: /usr/local/lib/python3.10/dist-packages
Requires: filelock, fsspec, jinja2, networkx, sympy, typing-extensions
Required-by: accelerate, fastai, peft, sentence-transformers, timm, torchaudio, torchvision


In [7]:
# Resume from the saved checkpoint. map_location lets a checkpoint written
# on a GPU machine load when only the CPU is available.
model.load_state_dict(
    torch.load(f"{model_name}.pth", map_location=device, weights_only=True)
)
print("loaded model")
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Halve the learning rate when the validation loss stops improving.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min",
                                                       factor=0.5, patience=20,
                                                       min_lr=1e-6)

num_epochs = 34

train_losses = []
test_losses = []
for epoch in range(num_epochs):
    train_loss, train_acc = train_one_epoch(
        model, train_loader, criterion, optimizer, epoch, device
    )

    val_loss, val_acc = test_one_epoch(model, test_loader, criterion,
                                       epoch, device)
    # The scheduler monitors the validation loss.
    scheduler.step(val_loss)

    train_losses.append(train_loss)
    test_losses.append(val_loss)

# Summary of the final epoch.
print(f"Learning rate {scheduler.get_last_lr()}")
print(f"Train loss: {train_losses[-1]}, acc: {train_acc}")
print(f"Val loss: {test_losses[-1]}, acc: {val_acc}")

# plot the loss and val loss
plt.plot(range(num_epochs), train_losses, color="red", label="train_loss")
plt.plot(range(num_epochs), test_losses, label="val_loss")
plt.xlabel("epoch")
plt.ylabel("loss")
# Without legend() the label= arguments above are never displayed.
plt.legend()
plt.show()

torch.save(model.state_dict(), f"{model_name}.pth")
print("Model saved successfully.")

loaded model


Epoch 1:   0%|          | 0/120 [00:00<?, ?batch/s]


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
