<a href="https://colab.research.google.com/github/eisbetterthanpi/transformer/blob/main/Transformer_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title papmap2 dataset
!wget https://archive.ics.uci.edu/static/public/231/pamap2+physical+activity+monitoring.zip -O pamap2.zip
!unzip pamap2.zip
!unzip PAMAP2_Dataset.zip

import os
import numpy as np
import pandas as pd
# https://github.com/EdnaEze/Physical-Activity-Monitoring/blob/main/DSRM-Edna.ipynb

# Activity id -> human-readable name.
# NOTE(review): not referenced by any code visible in this notebook; presumably kept for decoding labels/predictions — confirm.
activities = {0:'transient', 1:'lying', 2:'sitting', 3:'standing', 4:'walking', 5:'running', 6:'cycling', 7:'Nordic_walking', 9:'watching_TV', 10:'computer_work', 11:'car driving', 12:'ascending_stairs', 13:'descending_stairs', 16:'vacuum_cleaning', 17:'ironing', 18:'folding_laundry', 19:'house_cleaning', 20:'playing_soccer', 24:'rope_jumping'}
# Column names assigned to every loaded recording file: 3 leading columns (time, activity, heartrate)
# followed by 17 columns for each of the hand, chest and ankle sensors (54 names in total).
all_columns = ["time", "activity", "heartrate", 'handTemperature', 'handAcc16_1', 'handAcc16_2', 'handAcc16_3', 'handAcc6_1', 'handAcc6_2', 'handAcc6_3', 'handGyro1', 'handGyro2', 'handGyro3', 'handMagne1', 'handMagne2', 'handMagne3', 'handOrientation1', 'handOrientation2', 'handOrientation3', 'handOrientation4', 'chestTemperature', 'chestAcc16_1', 'chestAcc16_2', 'chestAcc16_3', 'chestAcc6_1', 'chestAcc6_2', 'chestAcc6_3', 'chestGyro1', 'chestGyro2', 'chestGyro3', 'chestMagne1', 'chestMagne2', 'chestMagne3', 'chestOrientation1', 'chestOrientation2', 'chestOrientation3', 'chestOrientation4', 'ankleTemperature', 'ankleAcc16_1', 'ankleAcc16_2', 'ankleAcc16_3', 'ankleAcc6_1', 'ankleAcc6_2', 'ankleAcc6_3', 'ankleGyro1', 'ankleGyro2', 'ankleGyro3', 'ankleMagne1', 'ankleMagne2', 'ankleMagne3', 'ankleOrientation1', 'ankleOrientation2', 'ankleOrientation3', 'ankleOrientation4']

# Load every recording file under `path` into one DataFrame, tag each row with
# the file it came from, then split the recordings into train and test by subject.
path = '/content/PAMAP2_Dataset/Protocol/'

# Sorted so the row order of `dataset` does not depend on the arbitrary order
# in which os.listdir returns the files.
usr_lst = sorted(os.listdir(path))
frames = []
for file in usr_lst:
    # Raw string: '\s' inside a normal string literal is an invalid escape sequence.
    df = pd.read_table(path + file, header=None, sep=r'\s+')
    df.columns = all_columns
    df['subject'] = file  # the file name doubles as the subject identifier
    frames.append(df)
if not frames:
    raise FileNotFoundError(f'no recording files found in {path}')
# Concatenate once: growing a DataFrame inside the loop re-copies all earlier rows on every iteration.
dataset = pd.concat(frames, ignore_index=True)

y = dataset['subject'].unique()
y.sort()

# Split by subject so that no subject contributes rows to both sets.
# The two slices share one boundary: the previous `y[-int(.3*len(y)):]` test slice
# left one subject in neither set (9 subjects -> 6 train + 2 test) and turned into
# the *whole* array whenever int(.3*len(y)) was 0.
n_train = int(.7 * len(y))
df_train = dataset[dataset['subject'].isin(y[:n_train])]
df_test = dataset[dataset['subject'].isin(y[n_train:])]

def make_Xy(dataset, min_len=1000):
    """Cut a recording table into one feature frame per (subject, activity) pair.

    Args:
        dataset: DataFrame containing 'subject', 'activity' and 'time' columns
            in addition to the feature columns.
        min_len: a group is kept only when it has strictly more rows than
            this. Defaults to 1000, the value that used to be hard-coded.

    Returns:
        (X, y) where X is a list of feature DataFrames (the 'subject',
        'activity' and 'time' columns removed, values coerced to numeric and
        NaNs filled by interpolation) and y is the list of the corresponding
        'activity' values, in the same order.
    """
    X, y = [], []
    for _, group in dataset.groupby(['subject', 'activity']):
        if len(group) <= min_len:  # too short to be used as a sequence
            continue
        y.append(group['activity'].iloc[0])
        features = group.drop(['subject', 'activity', 'time'], axis=1)
        # Anything that cannot be parsed as a number becomes NaN ...
        features = features.apply(pd.to_numeric, errors='coerce')
        # ... and NaNs are then filled by interpolating along the rows, using the
        # row index as the x coordinate; limit_direction='both' also fills
        # leading and trailing gaps.
        # NOTE(review): a column with no valid value at all inside a group has
        # nothing to interpolate from — confirm such columns cannot occur.
        features = features.interpolate(method='index', axis=0, limit_direction='both')
        X.append(features)
    return X, y

# Build the per-(subject, activity) sequences and their activity labels for each split.
X_train, y_train = make_Xy(df_train)
X_test, y_test = make_Xy(df_test)



In [None]:
# @title pandasDataset
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader

class pandasDataset(Dataset):
    """Dataset of variable-length sequences stored as DataFrames.

    Every item is a random window of `seq_len` consecutive rows taken from one
    sequence, together with the integer class index of that sequence.
    `seq_len` is the length of the shortest sequence, so every window has the
    same shape.

    Args:
        X: non-empty list of DataFrames, one per sequence.
        y: one label per sequence, in the same order as X.
        classes: optional iterable of all labels to index. When omitted the
            vocabulary is built from `y` alone (the original behaviour). Pass
            the same collection to several datasets to give them identical
            label -> index tables.

    Raises:
        ValueError: if X is empty.
        KeyError: if a label is missing from the class vocabulary.
    """
    def __init__(self, X, y, classes=None):
        if len(X) == 0:
            raise ValueError('pandasDataset needs at least one sequence')
        self.X = X
        chars = sorted(set(y) if classes is None else set(classes))
        self.vocab_size = len(chars) # number of distinct classes
        self.stoi = {ch:i for i,ch in enumerate(chars)} # label -> class index
        self.itos = {i:ch for i,ch in enumerate(chars)} # class index -> label
        self.y = self.data_process(y) # int64 array of class indices
        self.seq_len = min(len(a) for a in X)
        print('seq_len',self.seq_len)

    def data_process(self, data):
        """Map an iterable of labels to an int64 array of class indices."""
        try:
            return np.array([self.stoi[c] for c in data], dtype=np.int64)
        except KeyError as err:
            # A .get() lookup would silently put None into the label array instead.
            raise KeyError(f'label {err.args[0]!r} is not in the class vocabulary') from None

    def __len__(self): return len(self.X)

    def __getitem__(self, idx):
        """Return (window, class_index); window is a float array [seq_len, n_features]."""
        seq = self.X[idx]
        start = np.random.randint(0, len(seq) - self.seq_len + 1)
        # Slice first and convert only the window: converting the whole frame
        # for every sample is wasted work on long sequences.
        window = seq.iloc[start:start + self.seq_len]
        return window.to_numpy(dtype=float, copy=True), self.y[idx]

train_data = pandasDataset(X_train, y_train)
test_data = pandasDataset(X_test, y_test)

# Each pandasDataset derives its label -> index table from its own labels, so the
# two splits would disagree on what class index i means whenever they do not
# contain exactly the same set of activities. Re-encode both with one shared
# vocabulary so that indices match between training and evaluation.
label_vocab = sorted(set(y_train) | set(y_test))
for split, labels in ((train_data, y_train), (test_data, y_test)):
    split.stoi = {label: i for i, label in enumerate(label_vocab)}
    split.itos = {i: label for i, label in enumerate(label_vocab)}
    split.vocab_size = len(label_vocab)
    split.y = split.data_process(labels)

batch_size = 16 # 64
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=2)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=2)

# Sanity check: print the shape of one batch. X and y stay defined afterwards.
for X, y in train_loader:
    print(X.shape, y.shape)
    break



In [None]:
# @title RoPE
import torch
from torch import nn
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

class RoPE(nn.Module): # Rotary Positional Embeddings
    """Multiply the input element-wise by a table of interleaved sin/cos values.

    For position p and frequency index k the table holds
    sin(p * theta_k) at feature 2k and cos(p * theta_k) at feature 2k+1, with
    theta_k = base ** (-2k / dim).

    NOTE(review): this scales each feature by sin or cos; it does not rotate
    feature pairs as rotary embeddings usually do, and it zeroes the
    even-indexed features at position 0 (sin(0) == 0). The numerics are left
    exactly as they were — confirm whether a true rotation was intended.

    Args:
        dim: size of the last dimension of the input.
        seq_len: number of positions to precompute; the table grows on demand.
        base: base of the geometric frequency progression.
    """
    def __init__(self, dim, seq_len=512, base=10000):
        super().__init__()
        self.dim, self.base = dim, base
        # Non-persistent buffer: follows the module through .to()/.cuda() but is
        # kept out of state_dict, so checkpoints look the same as before.
        self.register_buffer('rot_emb', self._build_table(seq_len), persistent=False)

    def _build_table(self, seq_len, device=None):
        """Return the [1, seq_len, dim] sin/cos table."""
        theta = 1.0 / (self.base ** (torch.arange(0, self.dim, step=2, device=device) / self.dim))
        pos = torch.arange(seq_len, device=device).unsqueeze(-1)
        angles = (pos * theta)[None,...,None] # [seq_len, 1] * [dim // 2] -> [1, seq_len, dim // 2, 1]
        table = torch.cat([torch.sin(angles), torch.cos(angles)], dim=-1).flatten(-2) # -> [1, seq_len, dim]
        return table[..., :self.dim] # an odd dim would otherwise give dim + 1 columns

    def forward(self, x):
        """x: [batch, seq_len, dim] -> tensor of the same shape."""
        seq_len = x.size(1)
        table = self.rot_emb
        # Positions are on axis 1 (axis 0 is the broadcast batch axis of size 1).
        # Testing axis 0 made the old code re-run __init__ on every call.
        if table.size(1) < seq_len or table.device != x.device:
            table = self._build_table(max(seq_len, table.size(1)), x.device)
            self.rot_emb = table
        return x * table[:, :seq_len]


In [None]:
# @title Transformer classifier
import torch
from torch import nn
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

class TransformerModel(nn.Module):
    """Sequence classifier: linear embedding -> positional encoding ->
    Transformer encoder -> attention pooling over time -> linear head.

    Args:
        in_dim: number of input features per time step.
        d_model: width of the encoder.
        out_dim: number of output classes.
        nhead: attention heads per encoder layer.
        nlayers: number of stacked encoder layers (previously accepted but
            ignored; the parameters now live under `transformer_encoder.layers.N`).
        dropout: dropout probability inside the encoder layers.
    """
    def __init__(self, in_dim, d_model, out_dim, nhead=8, nlayers=1, dropout=0.):
        super().__init__()
        self.lin = nn.Linear(in_dim, d_model)
        self.pos_encoder = RoPE(d_model, seq_len=15, base=10000)
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, d_model*4, dropout, batch_first=True) # https://docs.pytorch.org/docs/stable/generated/torch.nn.TransformerEncoderLayer.html
        # enable_nested_tensor=False keeps the output padded to [batch, seq_len, d_model]
        # when a padding mask is supplied, which the pooling below relies on.
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, nlayers, enable_nested_tensor=False) # https://docs.pytorch.org/docs/stable/generated/torch.nn.TransformerEncoder.html
        self.out = nn.Linear(d_model, out_dim)
        # Not used by forward(); kept so the set of parameters is unchanged.
        self.cls = nn.Parameter(torch.randn(1,1,d_model))
        self.attn_pool = nn.Linear(d_model, 1, bias=False)

    def forward(self, x, src_key_padding_mask = None):
        """Classify a batch of sequences.

        Args:
            x: [batch, seq_len, in_dim].
            src_key_padding_mask: optional bool tensor [batch, seq_len];
                positions set to True are ignored by the attention and by the
                pooling. https://pytorch.org/docs/stable/generated/torch.nn.Transformer.html

        Returns:
            Class logits, [batch, out_dim].
        """
        x = self.lin(x)
        x = self.pos_encoder(x)
        out = self.transformer_encoder(x, src_key_padding_mask=src_key_padding_mask)

        # Attention pooling over the *encoder output*. Pooling the pre-encoder
        # `x` (as before) discarded the encoder result, so the encoder had no
        # influence on the logits and received no gradient.
        attn = self.attn_pool(out).squeeze(-1) # [batch, seq]
        if src_key_padding_mask is not None:
            # Padded steps must get zero weight in the softmax.
            attn = attn.masked_fill(src_key_padding_mask, float('-inf'))
        weights = torch.softmax(attn, dim=-1).unsqueeze(1) # [batch, 1, seq]
        pooled = (weights @ out).squeeze(1) # [batch, 1, seq] @ [batch, seq, dim] -> [batch, dim]
        return self.out(pooled) # [batch, out_dim]


# Smoke test: build the model, push one random batch through it, create the optimizer.
batch = 4
seq_len = 7
d_model = 16

# Take the feature and class counts from the data cells when they have been run;
# otherwise fall back to placeholder sizes so this cell still works on its own.
try:
    in_dim, out_dim = X[0].shape[-1], train_data.vocab_size
except NameError:
    in_dim, out_dim = 3, 16
print('in_dim, out_dim', in_dim, out_dim)

model = TransformerModel(in_dim, d_model, out_dim, nhead=4, nlayers=1, dropout=0.)
model = model.to(device)
x = torch.rand(batch, seq_len, in_dim, device=device)

# Example padding mask (True will be ignored by the attention), built with
# broadcasting instead of a per-row loop. It is not passed to the model below.
# https://pytorch.org/docs/stable/generated/torch.nn.Transformer.html
mask_offsets = torch.randint(seq_len, (batch,))
src_key_padding_mask = torch.arange(seq_len).unsqueeze(0) < (seq_len - mask_offsets).unsqueeze(1)
out = model(x)
# out = model(x, src_key_padding_mask)
print(out.shape)

optim = torch.optim.AdamW(model.parameters(), lr=1e-3)
# Number of trainable parameters.
trainable = [p.numel() for p in model.parameters() if p.requires_grad]
print(sum(trainable))


In [None]:
# @title train test
import torch
import torch.nn.functional as F
# Allow TF32 in CUDA matmuls and in cuDNN.
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
# Gradient scaler used by train() for the backward pass and the optimizer step.
# NOTE(review): the autocast context in train() is commented out, so the scaler
# currently wraps a full-precision loss — confirm it is still wanted.
scaler = torch.GradScaler()

def train(model, dataloader, optim, scheduler=None, log_every=100):
    """Run one pass over `dataloader`, updating `model` with cross-entropy loss.

    Args:
        model: maps a float batch x to class logits [batch, n_classes].
        dataloader: yields (x, y) batches; y holds integer class indices.
        optim: optimizer over the model's parameters.
        scheduler: optional LR scheduler, stepped once per batch.
        log_every: print the batch loss every `log_every` batches, starting
            with the first one; 0 or None disables the printing.

    Uses the module-level `device` and `scaler`.
    """
    model.train()
    for i, (x, y) in enumerate(dataloader):
        x, y = x.to(device, dtype=torch.float), y.to(device)
        # with torch.autocast(device_type=device, dtype=torch.bfloat16): # bfloat16 float16
        y_ = model(x)
        loss = F.cross_entropy(y_, y) # classification loss
        optim.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optim)
        scaler.update()
        if scheduler is not None: scheduler.step()
        # Periodic logging. The previous `i >= 100` test printed on every batch
        # after the 100th and never for epochs shorter than that.
        if log_every and i % log_every == 0:
            print("train",loss.item())

def test(model, dataloader):
    """Evaluate `model` on `dataloader` and print accuracy and mean loss.

    Args:
        model: maps a float batch x to class logits [batch, n_classes].
        dataloader: yields (x, y) batches; y holds integer class indices.

    Returns:
        (accuracy, mean cross-entropy per sample) over all samples seen;
        both are NaN when the dataloader yields nothing.

    Uses the module-level `device`.
    """
    model.eval()
    correct, seen, loss_sum = 0, 0, 0.
    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(device, dtype=torch.float), y.to(device) # [batch, ] # (id, activity)
            y_ = model(x)
            # Sum per-sample losses so the final figure is the average over the
            # whole set; the old code reported only the last batch's mean and
            # divided it by the batch size a second time.
            loss_sum += F.cross_entropy(y_, y, reduction='sum').item()
            correct += (y==y_.argmax(dim=1)).sum().item()
            seen += len(y)
    if seen == 0:
        acc = test_loss = float('nan')
    else:
        acc, test_loss = correct/seen, loss_sum/seen
    print('acc', round(acc, 3), 'test_loss', round(test_loss, 3))
    return acc, test_loss

# Main loop: one training pass followed by one evaluation pass per epoch.
# No LR scheduler is in use. To add warmup + cosine decay, create one here and pass it to train():
# scheduler = get_cosine_schedule_with_warmup(optim, num_warmup_steps=400, num_training_steps=2000) # https://docs.pytorch.org/torchtune/0.2/generated/torchtune.modules.get_cosine_schedule_with_warmup.html
n_epochs = 2000
for i in range(n_epochs):
    train(model, train_loader, optim)
    test(model, test_loader)


In [None]:
# Release cached, currently unused GPU memory held by PyTorch's allocator.
torch.cuda.empty_cache()