In [1]:
# !pip install -q colorednoise

import pandas as pd

sub = pd.read_parquet('/kaggle/input/test-sub/submission (2).parquet')
sub

Unnamed: 0,sequence_id,gesture
0,SEQ_000001,Cheek - pinch skin
1,SEQ_000011,Eyelash - pull hair


In [2]:
# import colorednoise as cn
import gc
import librosa
import math
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
import random
import time
import torch
import torch.nn as nn
import torch.nn.functional as F


from copy import deepcopy
from glob import glob
from plotly.subplots import make_subplots
from scipy.signal import butter, lfilter
from torch.utils.data import DataLoader, Dataset, Subset
from tqdm import tqdm

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
pio.renderers.default = 'iframe'

In [3]:
class config:
    MIXUP_PROB = 0.5

class paths:
    TEST_CSV = "/kaggle/input/cmi-detect-behavior-with-sensor-data/test.csv"
    TRAIN_CSV = "/kaggle/input/cmi-detect-behavior-with-sensor-data/train.csv"

In [4]:
def seed_everything(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed) 
    

def sep():
    print("—"*100)

label_to_num = {
    'Above ear - pull hair': 0,  # < ------- TARGETS START
    'Cheek - pinch skin': 1,
    'Eyebrow - pull hair': 2,
    'Eyelash - pull hair': 3,
    'Forehead - pull hairline': 4,
    'Forehead - scratch': 5,
    'Neck - pinch skin': 6,
    'Neck - scratch': 7,  # < ------- TARGETS END
    'Drink from bottle/cup': 8,  # < ------- NON-TARGETS START
    'Feel around in tray and pull out an object': 9,
    'Glasses on/off': 10,
    'Pinch knee/leg skin': 11,
    'Pull air toward your face': 12,
    'Scratch knee/leg skin': 13,
    'Text on phone': 14,
    'Wave hello': 15,
    'Write name in air': 16,
    'Write name on leg': 17  # < ------- NON-TARGETS END
}

num_to_label = {v: k for k, v in label_to_num.items()}

In [5]:
df_train = pd.read_csv(paths.TRAIN_CSV)
df_test = pd.read_csv(paths.TEST_CSV)

print(f"Train dataframe shape: {df_train.shape}"), sep()
print(f"Tesat dataframe shape: {df_test.shape}"), sep()

df_train["target"] = df_train["gesture"].map(label_to_num)

Train dataframe shape: (574945, 341)
————————————————————————————————————————————————————————————————————————————————————————————————————
Tesat dataframe shape: (107, 336)
————————————————————————————————————————————————————————————————————————————————————————————————————


In [6]:
from sklearn.model_selection import train_test_split

demo = df_train[['sequence_id','target']].drop_duplicates()
from sklearn.model_selection import train_test_split

train_seq, val_seq, train_y, val_y = train_test_split(
    demo['sequence_id'],               
    demo['target'],              
    test_size=0.20,   
    train_size=None,  
    random_state=42,  
    shuffle=True,     
    stratify=demo['target']      
)

# print(train_y.value_counts())
# print(val_y.value_counts())

train = df_train[df_train.sequence_id.isin(train_seq)]
val = df_train[df_train.sequence_id.isin(val_seq)]

In [7]:
def standard_scale(arr: np.ndarray) -> np.ndarray:
    means = np.nanmean(arr, axis=0)
    stds = np.nanstd(arr, axis=0)
    stds = np.where(stds == 0, 1, stds)  # Prevent division by zero for constant columns
    scaled = (arr - means) / stds
    return scaled


def random_padding(x,max_time_steps= 100):
    ##assuming seq shape is time_steps x 7 (imu only)
    x= np.array(x)
    if x.shape[0] < max_time_steps:
        r = torch.randint(0,max_time_steps - x.shape[0], size = (1,))
        final_x = np.vstack((torch.zeros(r,x.shape[1]),x,torch.zeros(max_time_steps-r-x.shape[0],x.shape[1])))
        assert final_x.shape[0] == max_time_steps, "Error: Shape issue in padding!!"
        # final_x = pd.DataFrame(final_x)
        # final_x.columns = x.columns
        return final_x
    else:
        return  x[:max_time_steps]


imu_cols = ["acc_x", "acc_y", "acc_z", "rot_w", "rot_x", "rot_y", "rot_z"]
# train_X, train_y = [], []

# for sequence_id in tqdm(train_seq):
#     ds = train[train["sequence_id"] == sequence_id]
#     X = ds[imu_cols].values
#     y = ds.target.values[0]
#     acc = standard_scale(X[:, 0:3])
#     rot = X[:, 3:]
#     X = np.concatenate([acc, rot], axis=1)
#     X = np.where(np.isnan(X), 0, X)  # fill NaNs
#     train_X.append(X)
#     train_y.append(y)

# train_y = np.array(train_y)

# val_X, val_y = [], []

# for sequence_id in tqdm(val_seq):
#     ds = val[val["sequence_id"] == sequence_id]
#     X = ds[imu_cols].values
#     y = ds.target.values[0]
#     acc = standard_scale(X[:, 0:3])
#     rot = X[:, 3:]
#     X = np.concatenate([acc, rot], axis=1)
#     X = np.where(np.isnan(X), 0, X)  # fill NaNs
#     val_X.append(X)
#     val_y.append(y)

# val_y = np.array(val_y)


In [8]:
test_X = []
sequence_ids_test = df_test.sequence_id.unique()

for sequence_id in tqdm(sequence_ids_test):
    ds = df_test[df_test["sequence_id"] == sequence_id]
    X = ds[imu_cols].values
    acc = standard_scale(X[:, 0:3])
    rot = X[:, 3:]
    X = np.concatenate([acc, rot], axis=1)
    X = np.where(np.isnan(X), 0, X)  # fill NaNs
    test_X.append(X)

100%|██████████| 2/2 [00:00<00:00, 574.52it/s]


In [9]:
class SignalTransform:
    def __init__(self, always_apply: bool = False, p: float = 0.5):
        self.always_apply = always_apply
        self.p = p

    def __call__(self, y: np.ndarray):
        if self.always_apply:
            return self.apply(y)
        else:
            if np.random.rand() < self.p:
                return self.apply(y)
            else:
                return y

    def apply(self, y: np.ndarray):
        raise NotImplementedError


class Compose:
    def __init__(self, transforms: list):
        self.transforms = transforms

    def __call__(self, y: np.ndarray):
        for trns in self.transforms:
            y = trns(y)
        return y


class OneOf:
    def __init__(self, transforms: list):
        self.transforms = transforms

    def __call__(self, y: np.ndarray):
        n_trns = len(self.transforms)
        trns_idx = np.random.choice(n_trns)
        trns = self.transforms[trns_idx]
        return trns(y)

In [10]:
class GaussianNoise(SignalTransform):
    def __init__(
        self, always_apply: bool = False, 
        p: float = 0.5, max_noise_amplitude: float = 0.20, **kwargs
    ):
        super().__init__(always_apply, p)
        self.noise_amplitude = (0.0, max_noise_amplitude)

    def apply(self, x: np.ndarray, **params):
        noise_amplitude = np.random.uniform(*self.noise_amplitude)
        noise = np.random.randn(*x.shape)  # shape (L, N)
        augmented = (x + noise * noise_amplitude).astype(x.dtype)
        return augmented

In [11]:
class PinkNoiseSNR(SignalTransform):
    def __init__(self, always_apply=False, p=0.5, min_snr=5.0, max_snr=20.0, **kwargs):
        super().__init__(always_apply, p)
        self.min_snr = min_snr
        self.max_snr = max_snr

    def apply(self, y: np.ndarray, **params):
        snr = np.random.uniform(self.min_snr, self.max_snr)
        a_signal = np.sqrt((y ** 2).max(axis=0))  # shape: (N,)
        a_noise = a_signal / (10 ** (snr / 20))   # shape: (N,)
        pink_noise = np.stack([cn.powerlaw_psd_gaussian(1, len(y)) for _ in range(y.shape[1])], axis=1)
        a_pink = np.sqrt((pink_noise ** 2).max(axis=0))  # shape: (N,)
        pink_noise_normalized = pink_noise * (a_noise / a_pink)
        augmented = (y + pink_noise_normalized).astype(y.dtype)
        return augmented

In [12]:
class TimeStretch(SignalTransform):
    def __init__(self, max_rate=1.5, min_rate=0.5, always_apply=False, p=0.5):
        super().__init__(always_apply, p)
        self.max_rate = max_rate
        self.min_rate = min_rate
        self.always_apply = always_apply
        self.p = p

    def apply(self, x: np.ndarray):
        """
        Stretch a 1D or 2D array in time using linear interpolation.
        - x: np.ndarray of shape (L,) or (L, N)
        - rate: float, e.g., 1.2 for 20% longer, 0.8 for 20% shorter
        """
        rate = np.random.uniform(self.min_rate, self.max_rate)
        L = x.shape[0]
        L_new = int(L / rate)
        orig_idx = np.linspace(0, L - 1, num=L)
        new_idx = np.linspace(0, L - 1, num=L_new)

        if x.ndim == 1:
            return np.interp(new_idx, orig_idx, x)
        elif x.ndim == 2:
            return np.stack([
                np.interp(new_idx, orig_idx, x[:, i]) for i in range(x.shape[1])
            ], axis=1)
        else:
            raise ValueError("Only 1D or 2D arrays are supported.")


In [13]:
class TimeShift(SignalTransform):
    def __init__(self, always_apply=False, p=0.5, max_shift_pct=0.25, padding_mode="replace"):
        super().__init__(always_apply, p)
        
        assert 0 <= max_shift_pct <= 1.0, "`max_shift_pct` must be between 0 and 1"
        assert padding_mode in ["replace", "zero"], "`padding_mode` must be either 'replace' or 'zero'"
        
        self.max_shift_pct = max_shift_pct
        self.padding_mode = padding_mode

    def apply(self, x: np.ndarray, **params):
        assert x.ndim == 2, "`x` must be a 2D array with shape (L, N)"
        
        L = x.shape[0]
        max_shift = int(L * self.max_shift_pct)
        shift = np.random.randint(-max_shift, max_shift + 1)

        # Roll along time axis (axis=0)
        augmented = np.roll(x, shift, axis=0)

        if self.padding_mode == "zero":
            if shift > 0:
                augmented[:shift, :] = 0
            elif shift < 0:
                augmented[shift:, :] = 0

        return augmented

In [14]:
class ButterFilter(SignalTransform):
    def __init__(self, always_apply=False, p=0.5, cutoff_freq=20, sampling_rate=200, order=4):
        super().__init__(always_apply, p)
        
        self.cutoff_freq = cutoff_freq
        self.sampling_rate = sampling_rate
        self.order = order

    def apply(self, x: np.ndarray, **params):
        assert x.ndim == 2, "`x` must be a 2D array with shape (L, N)"
        return self.butter_lowpass_filter(x)

    def butter_lowpass_filter(self, data: np.ndarray):
        nyquist = 0.5 * self.sampling_rate
        normal_cutoff = self.cutoff_freq / nyquist
        b, a = butter(self.order, normal_cutoff, btype='low', analog=False)
        filtered_data = lfilter(b, a, data, axis=0)  # filter each channel independently
        return filtered_data


In [15]:
class CustomDataset(Dataset):
    def __init__(
        self, config,df: pd.DataFrame, X: list[np.ndarray], y = None,
        transforms = None, mode: str = "train"
    ): 
        self.config = config
        self.df = df
        self.X = X
        self.y = y
        self.indexes = self.df.sequence_id.unique()
        self.alpha = 0.3
        self.mode = mode
        self.transforms = transforms
        self.num_classes = 18
        
    def __len__(self):
        """
        Length of dataset.
        """
        return len(self.indexes)
        
    def __getitem__(self, i):
        """
        Get one item.
        """
        sequence_id = self.indexes[i]
        p = np.random.rand()
        if p <= 0.0:
            if self.mode == "train":
                X, y = self.get_data(i)
                X = self.transforms(X)
                X = random_padding(X)
                output = {
                    "X": torch.tensor(X, dtype=torch.float32),
                    "y": torch.tensor(y, dtype=torch.float32),
                }
            elif self.mode == "test":
                X = self.get_data(i)
                X = random_padding(X)
                output = {
                    "X": torch.tensor(X, dtype=torch.float32)
                }
            elif self.mode == "val":
                X ,y= self.get_data(i)
                X = random_padding(X)
                output = {
                    "X": torch.tensor(X, dtype=torch.float32),
                    "y": torch.tensor(y, dtype=torch.float32)
                }
        else:
            if self.mode == "train":
                lam = np.random.beta(self.alpha, self.alpha)
                j = np.random.randint(0, len(self.indexes))
                X1, y1 = self.get_data(i)
                X2, y2 = self.get_data(j)
                X1, X2 = random_padding(X1), random_padding(X2) 
                X = lam * X1 + (1 - lam) * X2
                y = lam * y1 + (1 - lam) * y2
                output = {
                    "X": torch.tensor(X, dtype=torch.float32),
                    "y": torch.tensor(y, dtype=torch.float32),
                }
            elif self.mode == "test":
                X = self.get_data(i)
                X = random_padding(X)
                output = {
                    "X": torch.tensor(X, dtype=torch.float32)
                }
            elif self.mode == "val":
                X ,y= self.get_data(i)
                X = random_padding(X)
                output = {
                    "X": torch.tensor(X, dtype=torch.float32),
                    "y": torch.tensor(y, dtype=torch.float32)
                }
                  
        return output

    def get_data(self, index):
        X = self.X[index]
        if self.mode in ["train","val"]:
            y = self.y[index]
            y = np.eye(self.num_classes, dtype=int)[y]
            return X, y
        elif self.mode == "test":
            return X
        
        

In [16]:
transforms = Compose([
    OneOf([
        GaussianNoise(p=0.5, max_noise_amplitude=0.05),
        PinkNoiseSNR(p=0.5, min_snr=4.0, max_snr=20.0),
        ButterFilter(p=0.5)
    ]),
    TimeStretch(p=0.25),
    TimeShift(p=0.25)

])

In [17]:
# train_dataset = CustomDataset(config,train, train_X,  train_y, transforms, mode="train")
# val_dataset = CustomDataset(config,val, val_X,  val_y, mode="val")
test_dataset =  CustomDataset(config,df = df_test,X = test_X,mode="test")


In [18]:
# x = train_dataset.__getitem__(0)
# x['X'].sum(axis = 0)

In [19]:
#(batch_size, seq_len, 7)
class lstm(nn.Module):
    def __init__(self,input_dim,num_layers):
        super().__init__()
        self.lstm = nn.LSTM(input_size=input_dim, hidden_size=128, num_layers=num_layers, batch_first=True,bidirectional=True)

    def forward(self,x):
        out, _ = self.lstm(x)
        return out

class conv(nn.Module):
    def __init__(self,input_channels,output_channels, kernel_size):
        super().__init__()
        self.convs= nn.Sequential(
            nn.Conv2d(input_channels, output_channels, kernel_size, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(output_channels)
        )
    def forward(self,x):
        out = self.convs(x)
        return out      
        
class lstm_res(nn.Module):
    def __init__(self):
        super().__init__()
        self.lstm1 = lstm(7,2)
        self.conv1 = conv(1,128,5)
        self.conv2 = conv(128,64,5)
        self.conv3 = conv(1,64,1)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.Dense = nn.Sequential(nn.Linear(64, 32),nn.ReLU(),nn.Linear(32, 18))


    def forward(self,x):
        lstm_out1 = self.lstm1(x)
        conv_out1 = self.conv1(lstm_out1.unsqueeze(1))    ##dimension add for cnn layers
        conv_out2 = self.conv2(conv_out1)
        conv_out3 = self.conv3(lstm_out1.unsqueeze(1))

        upsampled = F.interpolate(conv_out3, size=conv_out2.shape[2:], mode= 'nearest')

        out = (conv_out2 + upsampled)
        # out = conv_out2
        out = self.avg_pool(out).view(out.shape[0],-1)

        out = self.Dense(out)

        return out

In [20]:
def train_model(model, train_loader,criterion, optimizer, num_epochs=10):
    model.train()

    for epoch in range(num_epochs):
        total_loss = 0
        correct = 0
        total = 0

        for p in train_loader:
            
            inputs = p['X'].to(device)            
            targets = p['y'].to(device)      

            optimizer.zero_grad()
            outputs = model(inputs)              
            loss = loss = criterion(F.log_softmax(outputs, dim=1), targets)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # Accuracy
            _, predicted = torch.max(outputs.data, 1)
            total += targets.size(0)
            # print(targets)
            # print(predicted)
            _,targets = torch.max(targets,1)
            correct += (predicted == targets).sum().item()

        acc = 100 * correct / total
        print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {total_loss:.4f}, Accuracy: {acc:.2f}%")



In [21]:
import torch.optim as optim
from torch.utils.data import DataLoader

# model = lstm_res()
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model = model.to(device)


# optimizer = optim.Adam(model.parameters(), lr=0.005)   
# criterion = nn.KLDivLoss(reduction="batchmean")

# train_loader = DataLoader(train_dataset, batch_size=64, shuffle=False)
# val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False) 

# gc.collect()
# torch.cuda.empty_cache()

# train_model(model, train_loader, criterion, optimizer, num_epochs=20)

In [22]:
# import torch, os

# save_dir = "/kaggle/working"
# os.makedirs(save_dir, exist_ok=True)
# model_path = os.path.join(save_dir, "lstm_res_005_64.pth")
# torch.save(model.state_dict(), model_path)

# # optionally save optimizer + epoch as checkpoint
# checkpoint = {
#     "model_state": model.state_dict(),
#     "optimizer_state": optimizer.state_dict()
# }
# torch.save(checkpoint, os.path.join(save_dir, model_path))

In [23]:
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset


model = lstm_res().to(device)
ckpt = torch.load("/kaggle/input/lstm_latest/pytorch/default/1/lstm_res_005_64.pth", map_location=device)['model_state']
model.load_state_dict(ckpt)
model.eval()

# --- Batch prediction function (for a DataLoader) ---
def predict_imu(model, loader, device=device):
    model.eval()
    all_preds = []
    all_probs = []

    with torch.no_grad():
        for batch in loader:
            if isinstance(batch, dict):
                X = batch['X'].to(device)
            elif isinstance(batch, (list, tuple)):
                X = batch[0].to(device)
            else:
                X = batch.to(device)

            logits = model(X)
            preds = torch.argmax(logits, dim=1)  
            all_preds.append(preds.cpu())

    all_preds = torch.cat(all_preds, dim=0).numpy()
    return all_preds


In [24]:
# predict_val = predict_imu(model, val_loader, device=device)

In [25]:
from sklearn.metrics import accuracy_score

# acc = accuracy_score(val_y, predict_val)
# print("Accuracy:", acc)

In [26]:
# pd.crosstab(val_y,predict_val)
# print(np.unique(val_y))
# print(np.unique(predict_val))
from sklearn.metrics import f1_score

def ScoreMetric(ytrue, ypreds)-> tuple:
    bscore = f1_score(
        np.where(ytrue  <= 7, 1, 0),
        np.where(ypreds <= 7, 1, 0),
        zero_division = 0.0,
    )

    mscore = f1_score(
        np.where(ytrue   <= 7, ytrue, 99),
        np.where(ypreds  <= 7, ypreds, 99),
        average = "macro", 
        zero_division = 0.0,
    )

    return (0.5 * (bscore + mscore))

# ScoreMetric(val_y,predict_val)

In [27]:
# torch.save(model, "/kaggle/working/lstm_res_full.pth")

In [28]:
import pandas as pd
import numpy as np
import os
import polars as pl
import kaggle_evaluation.cmi_inference_server
from collections import Counter
cols = df_test.columns
def predict(sequence: pl.DataFrame, demographics: pl.DataFrame) -> str:
    test_X = []
    sequence = pd.DataFrame(sequence)
    sequence.columns = cols
    # print(sequence)
    sequence_ids_test = sequence.sequence_id.unique()
    
    for sequence_id in tqdm(sequence_ids_test):
        ds = sequence[sequence["sequence_id"] == sequence_id]
        X = ds[imu_cols].to_numpy(dtype=np.float32)
        # print(X.dtype)
        acc = standard_scale(X[:, 0:3])
        rot = X[:, 3:]
        X = np.concatenate([acc, rot], axis=1)
        X = np.where(np.isnan(X),0, X)  # fill NaNs
        test_X.append(X)

    test_seq = CustomDataset(config,df = sequence,X = test_X,mode="test")
    test_loader = DataLoader(test_seq, batch_size=64, shuffle=True)
    predictions = predict_imu(model,test_loader,device)
    pred = num_to_label[predictions[0]]
    return str(pred)


inference_server = kaggle_evaluation.cmi_inference_server.CMIInferenceServer(predict)
# inference_server.serve()
if os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    inference_server.serve()
else:
    inference_server.run_local_gateway(
        data_paths=(
            '/kaggle/input/cmi-detect-behavior-with-sensor-data/test.csv',
            '/kaggle/input/cmi-detect-behavior-with-sensor-data/test_demographics.csv',
        )
    )

100%|██████████| 1/1 [00:00<00:00, 582.54it/s]
100%|██████████| 1/1 [00:00<00:00, 606.03it/s]


In [29]:
if not os.getenv('KAGGLE_IS_COMPETITION_RERUN'):
    print("\nRunning manual test...")
    test_df = pd.read_csv('/kaggle/input/cmi-detect-behavior-with-sensor-data/test.csv')
    demographics = pd.read_csv('/kaggle/input/cmi-detect-behavior-with-sensor-data/test_demographics.csv')
    sample_seq_id = "SEQ_000001"
    test_seq = test_df[test_df['sequence_id'] == sample_seq_id]
    prediction = predict(pl.DataFrame(test_seq), pl.DataFrame(demographics))
    print(f"Manual prediction result for sequence_id {sample_seq_id}: {prediction}")


Running manual test...


100%|██████████| 1/1 [00:00<00:00, 571.98it/s]

Manual prediction result for sequence_id SEQ_000001: Cheek - pinch skin



