<a href="https://colab.research.google.com/github/arushi-lu/deep_learning/blob/main/DefCNN_Wisdm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from scipy import stats

In [None]:
# Read the raw data file. Each record is parsed as six comma-separated fields
# (user, activity, time, x, y, z) where the z field is terminated by ';'.
# The context manager guarantees the file handle is closed once it has been read.
with open('WISDM_ar_v1.1_raw.txt') as file:
    lines = file.readlines()

processedList = []

# Process each line in the raw data file
for i, line in enumerate(lines):
    fields = line.split(',')
    try:
        # Extract and clean the z-axis value (everything before the ';')
        last = fields[5].split(';')[0].strip()
    except IndexError:
        # Fewer than six fields on this line: report it and move on.
        print('Error at line number: ', i)
        continue
    if last == '':
        # Parsing stops at the first record with an empty z value;
        # none of the following lines are read.
        break
    # Append the cleaned record to the list
    processedList.append([fields[0], fields[1], fields[2], fields[3], fields[4], last])

# Define column names and create a DataFrame
columns = ['user', 'activity', 'time', 'x', 'y', 'z']
data = pd.DataFrame(data=processedList, columns=columns)

Error at line number:  281873
Error at line number:  281874
Error at line number:  281875


In [None]:
# Cast the three accelerometer axes from parsed strings to floats
for axis in ('x', 'y', 'z'):
    data[axis] = data[axis].astype('float')

# Sampling frequency (not used here but could be used later)
Fs = 20

# Activity names, ordered by how many records each one has
activity_counts = data['activity'].value_counts()
activities = activity_counts.index
print(activities)

Index(['Walking', 'Jogging', 'Upstairs', 'Downstairs', 'Sitting', 'Standing'], dtype='object', name='activity')


In [None]:
# Keep only the activity name and the three axis readings for further processing
df = data.drop(columns=['user', 'time']).copy()
print(df.head())

  activity         x          y         z
0  Jogging -0.694638  12.680544  0.503953
1  Jogging  5.012288  11.264028  0.953424
2  Jogging  4.903325  10.882658 -0.081722
3  Jogging -0.612916  18.496431  3.023717
4  Jogging -1.184970  12.108489  7.205164


In [None]:
# Balance the dataset by keeping the same number of leading records for each activity
SAMPLES_PER_ACTIVITY = 3555


def _first_records(frame, activity, limit=SAMPLES_PER_ACTIVITY):
    """Return a copy of the first `limit` rows of `frame` whose 'activity' equals `activity`."""
    return frame[frame['activity'] == activity].head(limit).copy()


Walking = _first_records(df, 'Walking')
Jogging = _first_records(df, 'Jogging')
Upstairs = _first_records(df, 'Upstairs')
Downstairs = _first_records(df, 'Downstairs')
Sitting = _first_records(df, 'Sitting')
# Standing is capped like every other class so the classes stay balanced
# no matter how many Standing rows were parsed.
Standing = _first_records(df, 'Standing')

# Concatenate in a fixed class order; the original row index is preserved.
balanced_data = pd.concat([Walking, Jogging, Upstairs, Downstairs, Sitting, Standing])

# Display the shape of the balanced dataset
print(balanced_data.shape)

(21330, 4)


In [None]:
# Encode activity names as integer class ids, stored in a new 'label' column
label = LabelEncoder()
label.fit(balanced_data['activity'])
balanced_data['label'] = label.transform(balanced_data['activity'])
print(balanced_data.head())

    activity         x          y         z  label
597  Walking  0.844462   8.008764  2.792171      5
598  Walking  1.116869   8.621680  3.786457      5
599  Walking -0.503953  16.657684  1.307553      5
600  Walking  4.794363  10.760075 -1.184970      5
601  Walking -0.040861   9.234595 -0.694638      5


In [None]:
# Separate the axis readings (features) from the integer targets
FEATURE_COLUMNS = ['x', 'y', 'z']
y = balanced_data['label']

# Standardize the features; the scaler is fitted on every balanced row
scaler = StandardScaler()
X = scaler.fit_transform(balanced_data[FEATURE_COLUMNS])

# Put the scaled features and their labels back into one DataFrame
scaled_X = pd.DataFrame(data=X, columns=FEATURE_COLUMNS)
scaled_X['label'] = y.values
print(scaled_X.head())

          x         y         z  label
0  0.000503 -0.099190  0.337933      5
1  0.073590  0.020386  0.633446      5
2 -0.361275  1.588160 -0.103312      5
3  1.060258  0.437573 -0.844119      5
4 -0.237028  0.139962 -0.698386      5


In [None]:
# Frame and hop sizes, expressed as multiples of the sampling frequency
Fs = 20
frame_size, hop_size = 4 * Fs, 2 * Fs  # 80-sample frames, advanced 40 samples at a time

def get_frames(df, frame_size, hop_size):
    """Cut a labelled recording into fixed-length, possibly overlapping frames.

    Args:
        df: DataFrame with numeric columns 'x', 'y', 'z' and a 'label' column.
        frame_size: number of consecutive rows per frame.
        hop_size: number of rows between the starts of consecutive frames.

    Returns:
        frames: array of shape (n_frames, frame_size, 3) where
            frames[k, t] holds the (x, y, z) values of row k * hop_size + t.
        labels: array of shape (n_frames,) holding the most frequent label of
            each frame (the smallest label wins a tie).
    """
    N_FEATURES = 3

    # Row-major (time, axis) view of the signal, so slicing rows keeps every
    # sample's x, y and z together. Stacking [x, y, z] and reshaping
    # (3, frame_size) -> (frame_size, 3) would interleave time and axis.
    signal = df[['x', 'y', 'z']].to_numpy()
    targets = df['label'].to_numpy()

    frames = []
    labels = []
    # "+ 1" keeps the last window when it ends exactly on the final row.
    for start in range(0, len(df) - frame_size + 1, hop_size):
        stop = start + frame_size
        frames.append(signal[start:stop])

        # Most frequent label in the frame. np.unique returns sorted values,
        # so argmax picks the smallest label among ties and always yields a
        # scalar, independent of the installed SciPy version.
        values, counts = np.unique(targets[start:stop], return_counts=True)
        labels.append(values[np.argmax(counts)])

    # Fix the output rank even when no frame was produced
    frames = np.asarray(frames).reshape(-1, frame_size, N_FEATURES)
    labels = np.asarray(labels)

    return frames, labels

# Cut the scaled recording into overlapping frames, one label per frame
X, y = get_frames(df=scaled_X, frame_size=frame_size, hop_size=hop_size)
print(X.shape, y.shape)

(532, 80, 3) (532,)


In [None]:
import torch
from torch.utils.data import DataLoader, Dataset, random_split

# Custom Dataset class for PyTorch
class WISDMDataset(Dataset):
    """Index-aligned pairing of samples and labels with an optional per-sample transform."""

    def __init__(self, data, labels, transform=None):
        """Store the samples, their labels and the optional callable applied to each sample."""
        self.data, self.labels, self.transform = data, labels, transform

    def __len__(self):
        """Number of samples, taken from `data`."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return `(sample, label)` at `idx`; the sample is transformed when a transform is set."""
        sample, label = self.data[idx], self.labels[idx]
        if not self.transform:
            return sample, label
        return self.transform(sample), label

# Convert to PyTorch Datasets
X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.long)
dataset = WISDMDataset(X_tensor, y_tensor)

# Split into training and testing datasets (80% / 20%).
# A seeded generator makes the split identical on every run, so the reported
# test accuracy refers to the same held-out frames each time.
# NOTE(review): frames are split at random, so frames that overlap in time
# (hop size smaller than frame size) can land on both sides of the split.
SPLIT_SEED = 42
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Create DataLoaders (only the training batches are reshuffled each epoch)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
# Report how many frames ended up in each split
train_count, test_count = len(train_dataset), len(test_dataset)
print("Training dataset size:", train_count)
print("Testing dataset size:", test_count)

Training dataset size: 425
Testing dataset size: 107


In [None]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [None]:
'''Deformable Convolutional Network (DCN): deformable network'''
def get_min_value(ks, padsize):
    """Shrink a 2-D kernel size and its padding together, one axis at a time.

    For each axis independently, while the kernel size is still larger than 3
    and padding remains, the kernel size is reduced by 2 and the padding by 1.

    Args:
        ks: pair of kernel sizes.
        padsize: pair of paddings, matching `ks` axis by axis.

    Returns:
        Tuple `((ks0, ks1), (pd0, pd1))` with the reduced values.
    """
    def _shrink(size, pad):
        # Stop as soon as the kernel cannot lose 2 more or the padding runs out.
        while size - 2 > 1 and pad > 0:
            size -= 2
            pad -= 1
        return size, pad

    size0, pad0 = _shrink(ks[0], padsize[0])
    size1, pad1 = _shrink(ks[1], padsize[1])
    return (size0, size1), (pad0, pad1)

class DeformConv2d(nn.Module):
    """Deformable 2-D convolution with optional modulation.

    `p_conv` predicts, for every output location, one (row, col) offset per
    kernel sampling point. The zero-padded input is sampled at the shifted
    positions with bilinear interpolation, the samples of each location are
    laid out as one kernel-sized patch, and `conv` (whose stride equals its
    kernel size) reduces every patch to `outc` channels. With
    `modulation=True`, `m_conv` additionally predicts a sigmoid weight per
    sampling point that scales the samples before `conv`.

    Args:
        inc: number of input channels.
        outc: number of output channels.
        kernel_size: int or (rows, cols) pair; N = rows * cols sampling points.
        stride: int or pair; stride of the offset (and modulation) convolution.
        padding: int or pair; zero padding applied to the input before sampling.
        bias: forwarded unchanged to `conv`.
        modulation: when true, create and apply `m_conv`.
        minit: constant used to initialise the `m_conv` weights.
        pinit: constant used to initialise the `p_conv` weights (0 when falsy).

    NOTE(review): `_set_lr` is registered as a backward hook on `p_conv` and
    `m_conv`, but it only rebinds its local arguments to generator expressions
    and returns None, so it does not modify any gradient.
    """

    def __init__(self, inc, outc, kernel_size=3, stride=1, padding=1, bias=None, modulation=False, minit=0, pinit=0):

        super(DeformConv2d, self).__init__()
        # Normalise scalar hyper-parameters to (rows, cols) pairs.
        self.kernel_size = (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        self.padding = (padding, padding) if isinstance(padding, int) else padding
        self.stride = (stride, stride) if isinstance(stride, int) else stride

        # ZeroPad2d takes (left, right, top, bottom).
        self.zero_padding = nn.ZeroPad2d((self.padding[1], self.padding[1], self.padding[0], self.padding[0]))
        # Stride equals the kernel size because forward() hands this layer one
        # non-overlapping kernel-sized patch of samples per output location.
        self.conv = nn.Conv2d(inc, outc, kernel_size=kernel_size, stride=kernel_size, bias=bias)

        # Offset predictor: 2 values (row, col) per sampling point.
        ks, pd = get_min_value(self.kernel_size, self.padding)
        self.p_conv = nn.Conv2d(inc, 2*self.kernel_size[0]*self.kernel_size[1], kernel_size=ks, padding=pd, stride=self.stride)
        if pinit:
            nn.init.constant_(self.p_conv.weight, pinit)
        else:
            nn.init.constant_(self.p_conv.weight, 0)

        self.p_conv.register_backward_hook(self._set_lr)

        self.modulation = modulation
        if modulation:
            # Modulation predictor: one scalar per sampling point.
            self.m_conv = nn.Conv2d(inc, self.kernel_size[0]*self.kernel_size[1], kernel_size=ks, padding=pd, stride=self.stride)
            nn.init.constant_(self.m_conv.weight, minit)
            self.m_conv.register_backward_hook(self._set_lr)

    @staticmethod
    def _set_lr(module, grad_input, grad_output):
        # NOTE(review): these assignments only rebind local names to lazy
        # generators and nothing is returned, so the hook leaves all
        # gradients untouched.
        grad_input = (grad_input[i] * 0.1 for i in range(len(grad_input)))
        grad_output = (grad_output[i] * 0.1 for i in range(len(grad_output)))

    def forward(self, x):
        """Apply the deformable convolution to `x` of shape (b, inc, H, W).

        Returns a tensor with `outc` channels and the spatial size of the
        offset map produced by `p_conv`.
        """
        offset = self.p_conv(x)   # (b, 2N, h, w)
        if self.modulation:
            m = torch.sigmoid(self.m_conv(x))   # (b, N, h, w)

        dtype = offset.data.type()   # tensor type string reused for the sampling grids
        ks = self.kernel_size
        N = offset.size(1) // 2   # number of sampling points per location

        if self.padding:
            x = self.zero_padding(x)

        # Absolute sampling positions in the padded input: (b, 2N, h, w)
        p = self._get_p(offset, dtype)
        # (b, h, w, 2N); the first N entries are row coordinates, the last N column coordinates
        p = p.contiguous().permute(0, 2, 3, 1)

        # Integer neighbours: round down for the "left-top" corner
        q_lt = p.detach().floor()
        q_rb = q_lt + 1

        # Clamp the neighbours to the padded input and derive the two remaining corners
        q_lt = torch.cat([torch.clamp(q_lt[..., :N], 0, x.size(2)-1), torch.clamp(q_lt[..., N:], 0, x.size(3)-1)], dim=-1).long()
        q_rb = torch.cat([torch.clamp(q_rb[..., :N], 0, x.size(2)-1), torch.clamp(q_rb[..., N:], 0, x.size(3)-1)], dim=-1).long()
        q_lb = torch.cat([q_lt[..., :N], q_rb[..., N:]], dim=-1)
        q_rt = torch.cat([q_rb[..., :N], q_lt[..., N:]], dim=-1)

        # clip p
        p = torch.cat([torch.clamp(p[..., :N], 0, x.size(2)-1), torch.clamp(p[..., N:], 0, x.size(3)-1)], dim=-1)

        # bilinear kernel (b, h, w, N)
        g_lt = (1 + (q_lt[..., :N].type_as(p) - p[..., :N])) * (1 + (q_lt[..., N:].type_as(p) - p[..., N:]))
        g_rb = (1 - (q_rb[..., :N].type_as(p) - p[..., :N])) * (1 - (q_rb[..., N:].type_as(p) - p[..., N:]))
        g_lb = (1 + (q_lb[..., :N].type_as(p) - p[..., :N])) * (1 - (q_lb[..., N:].type_as(p) - p[..., N:]))
        g_rt = (1 - (q_rt[..., :N].type_as(p) - p[..., :N])) * (1 + (q_rt[..., N:].type_as(p) - p[..., N:]))

        # Input values at the four neighbours: (b, c, h, w, N)
        x_q_lt = self._get_x_q(x, q_lt, N)
        x_q_rb = self._get_x_q(x, q_rb, N)
        x_q_lb = self._get_x_q(x, q_lb, N)
        x_q_rt = self._get_x_q(x, q_rt, N)

        # Bilinear interpolation: (b, c, h, w, N)
        x_offset = g_lt.unsqueeze(dim=1) * x_q_lt + \
                   g_rb.unsqueeze(dim=1) * x_q_rb + \
                   g_lb.unsqueeze(dim=1) * x_q_lb + \
                   g_rt.unsqueeze(dim=1) * x_q_rt

        # modulation
        if self.modulation:
            m = m.contiguous().permute(0, 2, 3, 1)   # (b, h, w, N)
            m = m.unsqueeze(dim=1)                   # (b, 1, h, w, N)
            # Broadcasting over the channel axis applies the same mask to every
            # channel without materialising one copy of it per channel.
            x_offset = x_offset * m

        # (b, c, h*ks[0], w*ks[1]): one kernel-sized patch per output location
        x_offset = self._reshape_x_offset(x_offset, ks)
        out = self.conv(x_offset)

        return out

    def _get_p_n(self, N, dtype):
        """Return the regular kernel grid offsets around the centre, shape (1, 2N, 1, 1)."""
        # indexing='ij' is the layout this code relies on (first output varies
        # along rows); passing it explicitly avoids torch's deprecation warning.
        # NOTE(review): the first grid is built from kernel_size[1] and the
        # second from kernel_size[0]; identical for square kernels — confirm
        # the intent for non-square kernels.
        p_n_x, p_n_y = torch.meshgrid(
            torch.arange(-(self.kernel_size[1]-1)//2, (self.kernel_size[1]-1)//2+1),
            torch.arange(-(self.kernel_size[0]-1)//2, (self.kernel_size[0]-1)//2+1),
            indexing='ij')
        # (2N,) -> (1, 2N, 1, 1)
        p_n = torch.cat([torch.flatten(p_n_x), torch.flatten(p_n_y)], 0)
        p_n = p_n.view(1, 2*N, 1, 1).type(dtype)

        return p_n

    def _get_p_0(self, h, w, N, dtype):
        """Return the kernel-centre coordinates of every output location, shape (1, 2N, h, w)."""
        # Centres start at 1 and advance by the stride along each axis.
        p_0_x, p_0_y = torch.meshgrid(
            torch.arange(1, h*self.stride[0]+1, self.stride[0]),
            torch.arange(1, w*self.stride[1]+1, self.stride[1]),
            indexing='ij')
        p_0_x = torch.flatten(p_0_x).view(1, 1, h, w).repeat(1, N, 1, 1)
        p_0_y = torch.flatten(p_0_y).view(1, 1, h, w).repeat(1, N, 1, 1)
        p_0 = torch.cat([p_0_x, p_0_y], 1).type(dtype)

        return p_0

    def _get_p(self, offset, dtype):
        """Return absolute sampling positions: centre + kernel grid + learned offset."""
        N, h, w = offset.size(1)//2, offset.size(2), offset.size(3)

        # (1, 2N, 1, 1)
        p_n = self._get_p_n(N, dtype)
        # (1, 2N, h, w)
        p_0 = self._get_p_0(h, w, N, dtype)
        p = p_0 + p_n + offset
        return p

    def _get_x_q(self, x, q, N):
        """Gather the values of `x` at integer positions `q`; returns (b, c, h, w, N)."""
        b, h, w, _ = q.size()
        padded_w = x.size(3)
        c = x.size(1)
        # Flatten the spatial axes of x: (b, c, H*W)
        x = x.contiguous().view(b, c, -1)

        # Flat index into the flattened input, row * width + col: (b, h, w, N)
        index = q[..., :N]*padded_w + q[..., N:]
        # Repeat the index for every channel: (b, c, h*w*N)
        index = index.contiguous().unsqueeze(dim=1).expand(-1, c, -1, -1, -1).contiguous().view(b, c, -1)

        x_offset = x.gather(dim=-1, index=index).contiguous().view(b, c, h, w, N)

        return x_offset

    @staticmethod
    def _reshape_x_offset(x_offset, ks):
        """Rearrange (b, c, h, w, N) samples into a (b, c, h*ks[0], w*ks[1]) map of patches."""
        b, c, h, w, N = x_offset.size()
        # Each group of ks[1] consecutive sampling points becomes one patch row.
        x_offset = torch.cat([x_offset[..., s:s+ks[1]].contiguous().view(b, c, h, w*ks[1]) for s in range(0, N, ks[1])], dim=-1)
        x_offset = x_offset.contiguous().view(b, c, h*ks[0], w*ks[1])

        return x_offset

class DeformableConvolutionalNetwork(nn.Module):
    """Classifier made of four modulated deformable-convolution stages, pooling and a linear head."""

    def __init__(self, train_shape, category):
        '''
            train_shape: shape of the whole training set. Unused here: the
                deformable convolution adapts its sampling shape itself, so no
                strip convolution along the modality axis has to be configured
                and neither the adaptive pool nor the linear layer is sized from it.
            category: number of classes
        '''
        super(DeformableConvolutionalNetwork, self).__init__()

        # Channel widths of consecutive stages: 1 -> 64 -> 128 -> 256 -> 512
        widths = (1, 64, 128, 256, 512)
        stages = []
        for in_ch, out_ch in zip(widths, widths[1:]):
            # Each stage: 3x3 deformable conv (stride 2, padding 1) + batch norm + ReLU
            stages.extend([
                DeformConv2d(in_ch, out_ch, 3, 2, 1, modulation=True),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
            ])
        self.layer = nn.Sequential(*stages)

        # Pool every feature map to 1 x 4 so the linear head sees 512 * 4 values
        self.ada_pool = nn.AdaptiveAvgPool2d((1, 4))
        self.fc = nn.Linear(widths[-1] * 4, category)

    def forward(self, x):
        '''
            x.shape: [b, c, series, modal]
        '''
        swapped = x.permute(0, 1, 3, 2)  # [b, c, modal, series]
        pooled = self.ada_pool(self.layer(swapped))  # [b, c, 1, 4]
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

In [None]:
# Build the classifier with one output per activity
n_classes = len(activities)
model = DeformableConvolutionalNetwork(train_shape=X.shape, category=n_classes)

print(model)

DeformableConvolutionalNetwork(
  (layer): Sequential(
    (0): DeformConv2d(
      (zero_padding): ZeroPad2d((1, 1, 1, 1))
      (conv): Conv2d(1, 64, kernel_size=(3, 3), stride=(3, 3), bias=False)
      (p_conv): Conv2d(1, 18, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
      (m_conv): Conv2d(1, 9, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    )
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): DeformConv2d(
      (zero_padding): ZeroPad2d((1, 1, 1, 1))
      (conv): Conv2d(64, 128, kernel_size=(3, 3), stride=(3, 3), bias=False)
      (p_conv): Conv2d(64, 18, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
      (m_conv): Conv2d(64, 9, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    )
    (4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): DeformConv2d(
      (zero_padding): ZeroPad2d((1, 1, 1, 1))
      (conv): Conv2d(128, 256, kernel_s

In [None]:
# Cross-entropy objective, optimised with Adam
LEARNING_RATE = 1e-4
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

# Function to calculate accuracy
def accuracy(outputs, labels):
    """Return the fraction of rows of `outputs` whose highest-scoring column equals `labels`."""
    hits = outputs.argmax(dim=1).eq(labels).sum().item()
    return hits / labels.size(0)

In [None]:
num_epochs = 50
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    for inputs, labels in train_loader:
        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward + backward + optimize.
        # (batch, frame, axis) -> (batch, 1, axis, frame): swap the last two
        # axes and add a single channel dimension for the 2-D convolutions.
        outputs = model(inputs.permute(0, 2, 1).unsqueeze(1))
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Track the accuracy and loss. The criterion returns the batch mean, so
        # it is weighted by the batch size; otherwise the smaller final batch
        # would count as much as a full one in the epoch average.
        n_batch = labels.size(0)
        running_loss += loss.item() * n_batch
        correct_predictions += (torch.argmax(outputs, dim=1) == labels).sum().item()
        total_predictions += n_batch

    # Calculate train accuracy and the per-sample mean loss of the epoch
    train_accuracy = correct_predictions / total_predictions
    avg_loss = running_loss / total_predictions

    # Print statistics every epoch
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')

Epoch [1/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [2/50], Train Loss: 0.0002, Train Accuracy: 1.0000
Epoch [3/50], Train Loss: 0.0001, Train Accuracy: 1.0000
Epoch [4/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [5/50], Train Loss: 0.0001, Train Accuracy: 1.0000
Epoch [6/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [7/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [8/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [9/50], Train Loss: 0.0002, Train Accuracy: 1.0000
Epoch [10/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [11/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [12/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [13/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [14/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [15/50], Train Loss: 0.0008, Train Accuracy: 1.0000
Epoch [16/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [17/50], Train Loss: 0.0000, Train Accuracy: 1.0000
Epoch [18/50], Train Lo

In [None]:
# Evaluation on the test set
model.eval()  # Set the model to evaluation mode
test_correct, test_total = 0, 0

# No gradients are needed while scoring held-out frames
with torch.no_grad():
    for inputs, labels in test_loader:
        batch = inputs.permute(0, 2, 1).unsqueeze(1)
        outputs = model(batch)
        predicted = outputs.max(dim=1).indices
        matches = predicted == labels
        test_correct += matches.sum().item()
        test_total += labels.size(0)

# Calculate test accuracy
test_accuracy = test_correct / test_total
print(f'Test Accuracy: {test_accuracy:.4f}')

Test Accuracy: 0.9533
