In [None]:
import torch
from torch import nn
from torch.utils.data import Dataset,DataLoader

In [None]:
class Time2Vec(nn.Module):
    """Learnable time embedding built from linear and periodic components.

    Every scalar in the input is expanded into ``4 * k`` features laid out as
    ``[linear | sin | cos | sin + cos + linear]``, each group being ``k`` wide.

    Args:
        k: Width of each of the four feature groups.
    """

    def __init__(self, k):
        super().__init__()
        self.k = k  # width of one feature group
        # Scale ("w*") and offset ("b*") vectors for the linear, sine and cosine
        # branches, each drawn from a standard normal. The creation order is
        # kept fixed so seeded initialisation and the parameter ordering seen
        # by optimizers / state_dict do not change.
        for param_name in ("w", "b", "w_sin", "b_sin", "w_cos", "b_cos"):
            setattr(self, param_name, nn.Parameter(torch.randn(k)))

    def forward(self, x):
        """Embed ``x`` (shape ``(...)``) into a tensor of shape ``(..., 4 * k)``."""
        t = x[..., None]  # trailing axis of size 1 so the (k,) parameters broadcast
        linear_part = self.w * t + self.b
        sine_part = torch.sin(self.w_sin * t + self.b_sin)
        cosine_part = torch.cos(self.w_cos * t + self.b_cos)
        combined = sine_part + cosine_part + linear_part
        return torch.cat((linear_part, sine_part, cosine_part, combined), dim=-1)


# Stand-alone embedding instance with k = 20.
t2v = Time2Vec(20)

In [None]:
# import numpy as np

# # Define hyperparameters
# batch_size = 32 # Number of samples per batch
# seq_len = 100 # Length of each time series
# in_features = 1
# n_classes = 20 # Number of classes to predict

# # Create synthetic dataset (X: input features, y: labels)
# # X = np.random.randn(100,batch_size * seq_len * in_features).reshape(100, batch_size , seq_len)
# # y = np.random.randint(0, n_classes , size=(100,batch_size))
# def gen_ds(samples = 32_000,name = "dl",batch_size = 32):
#     assert samples%n_classes == 0
#     xcont = []
#     ycont = []
#     for i in range(n_classes):
#         cnumb = int(samples/n_classes)
#         ox = np.random.randn(samples,seq_len*in_features).reshape(samples,seq_len)
#         lx = ox + i
#         ly = np.full((samples),i)
#         print()
#         xcont.append(lx)
#         ycont.append(ly)
#         print(np.mean(ox),np.mean(lx),np.mean(ly),i)


#     X = np.concatenate(xcont)
#     Y = np.concatenate(ycont)

#     # Convert numpy arrays to tensors 
#     X = torch.from_numpy(X).float()
#     y = torch.from_numpy(Y).long()

#     class tds(Dataset):
#         def __init__(self,x,y,name) -> None:
#             super().__init__()
#             self.name = name
#             self.X = x
#             self.Y = y
#             self.pspace = len(self.X)
#         def __len__(self):
#             return self.pspace
#         def __getitem__(self, index):
#             return X[index],Y[index]
#     ds = tds(X,y,name)
#     dl = DataLoader(ds,batch_size=batch_size,shuffle=True)
#     return dl
# # train_dl = gen_ds(samples = 800*20,name = "train_dl")
# # test_dl = gen_ds(1000,name = "test_dl",batch_size=250)

In [None]:
from CoRe_Dataloader import dataloader   
# NOTE(review): both names are bound to the SAME DataLoader object, so every
# "test" metric computed later in this notebook is measured on the very data
# the model was trained on. Bind `test_dl` to a held-out loader to get a
# meaningful generalisation estimate.
train_dl, test_dl = dataloader,dataloader

In [None]:
# Import modules
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# --- Model hyperparameters (passed to the Classifier constructor) ---

# Time2Vec group width k; the embedding (and therefore the transformer's
# model dimension) is 4 * n_features wide.
n_features = 16
# Number of stacked transformer encoder layers.
n_layers = 3
# Attention heads per encoder layer.
n_heads = 8
# Feed-forward width inside each encoder layer.
hidden_size = 1024
# Dropout probability used inside the encoder layers.
dropout_rate = 0.2
# Number of output classes of the final linear layer.
n_classes = 19

# Define classifier model 
class Classifier(nn.Module):
    """Time2Vec embedding -> transformer encoder -> mean pool -> linear head.

    Args:
        n_layers: Number of stacked transformer encoder layers.
        n_features: Time2Vec group width ``k``; the encoder's model dimension
            is ``4 * n_features`` and must be divisible by ``n_heads``.
        n_heads: Attention heads per encoder layer.
        hidden_size: Feed-forward width inside each encoder layer.
        dropout_rate: Dropout probability inside the encoder layers.
        n_classes: Number of output classes.
        length: Accepted for backward compatibility; not used by the model.
    """

    def __init__(self , n_layers , n_features , n_heads , hidden_size , dropout_rate , n_classes,length = 100):
        super(Classifier , self).__init__()
        d_model = n_features * 4  # Time2Vec emits four groups of n_features
        self.t2v = Time2Vec(n_features)
        # batch_first=True is required: forward() feeds (batch, seq, d_model)
        # tensors. With the default (False) the encoder would read axis 0 as
        # the sequence axis and attend across different samples of a batch.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=hidden_size,
            dropout=dropout_rate,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, n_layers)
        self.pooling = nn.AdaptiveAvgPool1d(1)  # mean over the sequence axis
        self.linear = nn.Linear(d_model, n_classes)  # classification head (logits)

    def forward(self,x):
        """Return unnormalised class scores (logits) of shape ``(batch, n_classes)``.

        ``x`` is expected to be ``(batch_size, seq_len)``: Time2Vec appends the
        feature axis itself, producing ``(batch_size, seq_len, 4 * n_features)``.

        Logits (not probabilities) are returned because ``nn.CrossEntropyLoss``
        applies ``log_softmax`` internally; feeding it softmax output squashes
        the gradients. ``argmax`` over logits gives the same predictions as
        over probabilities. Use :meth:`predict_proba` when probabilities are
        needed.
        """
        x = self.t2v(x)                    # (batch, seq, d_model)
        x = self.transformer_encoder(x)    # (batch, seq, d_model)
        x = x.permute(0, 2, 1)             # (batch, d_model, seq) for 1-D pooling
        x = self.pooling(x).squeeze(-1)    # (batch, d_model)
        return self.linear(x)              # (batch, n_classes)

    def predict_proba(self, x):
        """Return class probabilities ``softmax(forward(x))`` of shape ``(batch, n_classes)``."""
        return nn.functional.softmax(self.forward(x), dim=-1)

In [None]:
# Prefer the first GPU but fall back to the CPU so this cell also runs on
# machines without CUDA.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model = Classifier(
    n_layers=n_layers,
    n_features=n_features,
    n_heads=n_heads,
    hidden_size=hidden_size,
    dropout_rate=dropout_rate,
    n_classes=n_classes,
    length=4680,  # accepted by Classifier's signature
).to(device)

# Per-class loss weights, indexed by class id.
# NOTE(review): presumably derived from class frequencies (rarer class ->
# weight closer to 1) -- confirm against the dataset statistics.
class_weights = torch.tensor(
    [
        0.9995,
        1.0000,
        0.4342,
        1.0000,
        0.9999,
        0.9582,
        0.9980,
        1.0000,
        0.9582,
        1.0000,
        0.3286,
        0.9984,
        0.9999,
        0.9882,
        0.2075,
        0.9975,
        0.1007,
        1.0000,
        0.9923,
    ]
)
# Fail early with a readable message instead of a shape error deep inside
# the first loss computation.
if class_weights.numel() != n_classes:
    raise ValueError(
        f"expected {n_classes} class weights, got {class_weights.numel()}"
    )

# Weighted cross-entropy; it expects raw scores and applies log-softmax itself.
loss_fn = nn.CrossEntropyLoss(weight=class_weights).to(device)
# AdamW: Adam with decoupled weight decay.
optimizer = optim.AdamW(model.parameters(), lr=5e-3)
# Multiply the learning rate by 0.97 on every 3rd call to scheduler.step().
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 3, gamma=0.97)

In [None]:
import gc
# Collect unreachable Python objects first so that any tensors kept alive only
# by reference cycles are actually freed; only then can empty_cache() hand
# their (now unused) cached blocks back to the GPU driver.
gc.collect()
torch.cuda.empty_cache()

In [None]:
n_epochs = 1000

# Reuse whatever device the model already lives on instead of repeating a
# hard-coded device string.
device = next(model.parameters()).device

for epoch in range(n_epochs):
    # ---- Training pass ----
    model.train()  # (re-)enable dropout; the evaluation pass below turns it off
    epochloss = 0
    ldl = len(train_dl)
    for bnum, (batch_x, batch_y) in enumerate(train_dl):
        batch_x = batch_x.to(device).to(torch.float)
        # Assumes labels arrive as a 2-D tensor with the class index in column 0.
        batch_y = batch_y.to(device).to(torch.long)[:, 0]

        optimizer.zero_grad()  # clear gradients from the previous step
        output = model(batch_x).to(torch.float)  # (batch, n_classes)
        loss = loss_fn(output, batch_y)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()  # read once: each .item() forces a host sync
        print(f"Epoch {epoch} Batch {bnum}/{ldl} : Loss {batch_loss}",end = "\r",flush=True)
        epochloss += batch_loss

    # Learning-rate decay is only applied for epochs 21..299.
    if epoch > 20 and epoch < 300: scheduler.step()
    epochloss /= ldl

    print(f"\nAverage epoch loss: {epochloss} with Learning rate {scheduler.get_last_lr()}")
    print("Evaluating with",end = " ")

    # ---- Evaluation pass ----
    # NOTE: accuracy is measured on `train_dl`, i.e. on the training data.
    model.eval()  # disable dropout so the reported accuracy is deterministic
    with torch.no_grad():
        accsum = []
        for bnum, (batch_x, batch_y) in enumerate(train_dl):
            # Same preprocessing as in the training pass (float inputs, column-0 labels).
            batch_x = batch_x.to(device).to(torch.float)
            batch_y = batch_y.to(device).to(torch.long)[:, 0]
            output = model(batch_x)              # (batch, n_classes)
            pred = torch.argmax(output, dim=-1)  # predicted class per sample
            accsum.append((pred == batch_y).float())  # 1.0 where correct, else 0.0
        equals = torch.cat(accsum)
        means = torch.mean(equals)
        print("mean total accuracy:", means.item()*100,"%\n","-=<{|}>=-"*8,'\n')

In [None]:
# Evaluate on `test_dl` with the same input/label preprocessing the training
# loop uses; without the label slice `pred == batch_y` would compare a (B,)
# tensor against a 2-D label tensor and broadcast (or fail) instead of
# comparing element-wise.
device = next(model.parameters()).device  # wherever the model currently lives
model.eval()  # disable dropout for deterministic predictions
with torch.no_grad():  # no gradients needed for evaluation
    accsum = []
    for bnum, (batch_x, batch_y) in enumerate(test_dl):
        batch_x = batch_x.to(device).to(torch.float)
        # Assumes labels arrive as a 2-D tensor with the class index in column 0.
        batch_y = batch_y.to(device).to(torch.long)[:, 0]
        print(bnum)
        output = model(batch_x)              # (batch, n_classes)
        pred = torch.argmax(output, dim=-1)  # predicted class per sample
        accsum.append((pred == batch_y).float())  # 1.0 where correct, else 0.0
    equals = torch.cat(accsum)   # per-sample correctness over the whole loader
    means = torch.mean(equals)   # overall accuracy in [0, 1]
    print(means)

In [None]:
# Temporarily raise the summarisation threshold so the whole per-sample
# correctness vector is printed, and always restore the default print options
# afterwards -- even if printing fails.
torch.set_printoptions(threshold=500_000)
try:
    print(equals)
finally:
    torch.set_printoptions(profile='default')