<a href="https://colab.research.google.com/github/CoreTheGreat/HBPU-Machine-Learning-Course/blob/main/ML_Chapter3_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 第五章：深度学习
湖北理工学院《机器学习》课程资料

作者：李辉楚吴

笔记内容概述: 前馈神经网络、全连接网络、Wi-Fi动作感知

## 处理原始Mat文件，与本实验无关

In [None]:
# Read U1_G1_N10_L_L1_D0_20200408_1_Labeled.mat file
import scipy.io as sio
import numpy as np
import pandas as pd
import os

def mat2csi(matfile):
    '''
    Convert a labeled CSI .mat recording into a CSV file.

    The .mat file must contain a 'csi' matrix and a 'csiLabel' array.
    Column 0 of 'csi' is used as the timestamp and columns 2..31 as the
    30 channel values (per the original note, these belong to the first
    T-R link). Samples with a negative label are discarded.

    The CSV is written to the current working directory, named after the
    .mat file with its extension replaced by '.csv'. Its columns are
    'timestamp', 'label', 'Channel 1' ... 'Channel 30'.

    Input:
    matfile: path of the .mat file to convert

    return:
    csi: magnitudes of the kept CSI samples (N*30)
    csilabel: integer labels of the kept samples (N)
    timestamp: timestamps divided by 1e6 and shifted to start at 0 (N)
    '''

    # Load the .mat file
    mat_data = sio.loadmat(matfile)

    raw_timestamp = mat_data['csi'][:, 0]
    raw_csi = mat_data['csi'][:, 2:32]
    raw_csilabel = mat_data['csiLabel'].reshape(-1)

    # Keep samples whose label is non-negative (label 0 is retained)
    valid_indices = raw_csilabel >= 0
    csi = np.abs(raw_csi[valid_indices])  # Magnitude of the complex CSI values
    csilabel = raw_csilabel[valid_indices].astype(int)
    # Only the real part of the timestamp column is meaningful; the original
    # comment describes the division by 1e6 as a conversion to seconds.
    timestamp = raw_timestamp[valid_indices].real / 10 ** 6
    if timestamp.size:  # Nothing to shift when no sample survived the filter
        timestamp = timestamp - timestamp[0]

    # Change to DataFrame
    df_combined = pd.DataFrame({
        'timestamp': timestamp,
        'label': csilabel,
        **{f'Channel {i}': csi[:, i-1] for i in range(1, 31)}
    })

    # splitext removes only the final extension, so dots inside the base
    # name (e.g. 'rec.v1.mat') are preserved.
    filename = os.path.splitext(os.path.basename(matfile))[0] + '.csv'

    # Save combined DataFrame to a single CSV file
    df_combined.to_csv(filename, index=False)

    print(f'{filename} saved successfully.')

    return csi, csilabel, timestamp

# Convert each labeled recording to CSV; the returned arrays are not needed here.
for mat_path in [
    './Data/U1_G1_N10_L_L1_D0_20200408_1_Labeled.mat',
    './Data/U1_G1_N30_L_L1_D0_20200408_2_Labeled.mat',
    './Data/U1_G2_N10_L_L1_D0_20200408_1_Labeled.mat',
    './Data/U1_G2_N30_L_L1_D0_20200408_2_Labeled.mat',
    './Data/U1_G3_N10_L_L1_D0_20200408_1_Labeled.mat',
    './Data/U1_G3_N30_L_L1_D0_20200408_2_Labeled.mat',
]:
    mat2csi(mat_path)

## 数据准备


载入csv数据

In [None]:
import pandas as pd

# Define training and testing files
# The three "N30" recordings (one each for G1, G2, G3) form the training set
# and the three "N10" recordings form the testing set.
# NOTE(review): mat2csi writes its CSV into the current working directory,
# while these paths expect the files under ./Data — confirm they were moved.
training_files = [
    './Data/U1_G1_N30_L_L1_D0_20200408_2_Labeled.csv',
    './Data/U1_G2_N30_L_L1_D0_20200408_2_Labeled.csv',
    './Data/U1_G3_N30_L_L1_D0_20200408_2_Labeled.csv']

testing_files = [
    './Data/U1_G1_N10_L_L1_D0_20200408_1_Labeled.csv',
    './Data/U1_G2_N10_L_L1_D0_20200408_1_Labeled.csv',
    './Data/U1_G3_N10_L_L1_D0_20200408_1_Labeled.csv'
]

# Function to read and process CSV files
def read_csv_file(file_path):
    '''
    Load one labeled CSI CSV file and split it into arrays.

    The path and the distinct label values found in the file are printed.

    Input:
    file_path: path of a CSV with 'timestamp' and 'label' columns; every
               column from the third one onward is treated as CSI data

    Output:
    csi: 2-D array of the CSI columns
    label: 1-D array of the 'label' column (the original note lists
           0: static, 1: up, 2: down, 3: left, 4: right — not verified here)
    timestamp: 1-D array of the 'timestamp' column
    '''
    print(file_path)
    frame = pd.read_csv(file_path)

    timestamp = frame['timestamp'].to_numpy()
    label = frame['label'].to_numpy()
    csi = frame.iloc[:, 2:].to_numpy()  # Skip the first two columns

    print(np.unique(label))
    return csi, label, timestamp

def segment_signals(csi, label, timestamp):
    '''
    Split a recording into runs of consecutive samples sharing one label.

    Input:
    csi: sequence of samples, indexable by slice along its first axis
    label: sequence of per-sample labels, same length as csi
    timestamp: sequence of per-sample timestamps, same length as csi

    Output:
    segments: list of (csi_slice, segment_label, timestamp_slice) tuples in
              recording order; an empty list when label is empty
    '''
    if len(label) == 0:  # Nothing to segment
        return []

    segments = []  # Store segments
    segment_start = 0  # Index of the first sample of the current run

    for i in range(1, len(label)):
        if label[i] != label[segment_start]:
            # The run covers [segment_start, i): slicing up to i (exclusive)
            # keeps its last sample, which a bound of i-1 would drop.
            segments.append((csi[segment_start:i], label[segment_start], timestamp[segment_start:i]))
            segment_start = i

    # Close the final run, which extends to the end of the recording
    segments.append((csi[segment_start:], label[segment_start], timestamp[segment_start:]))
    return segments

# Collect the labeled segments of every training recording
training_segments = []
for path in training_files:
    training_segments += segment_signals(*read_csv_file(path))

# Collect the labeled segments of every testing recording
testing_segments = []
for path in testing_files:
    testing_segments += segment_signals(*read_csv_file(path))

# Report how many training segments were found and how long each one is
print(f"Training segments: {len(training_segments)}")
for number, segment in enumerate(training_segments, start=1):
    print(f"Training Segment {number}: {len(segment[0])}")

# Report how many testing segments were found
print(f"Testing segments: {len(testing_segments)}")

数据对齐：通过特征提取使得每一个训练集和测试集的样本长度相同

In [43]:
import numpy as np
from scipy.stats import kurtosis
from scipy.stats import skew

# Summarise a variable-length segment as a fixed-length feature vector
def extract_features(s):
    '''
    Compute per-column summary statistics of one segment.

    The statistics are concatenated in this order, each taken along axis 0:
    mean, std, max, min, median, kurtosis, skew.

    Input:
    s: segment (N*30) in training_segments or testing_segments

    Output:
    x: 1-D vector (7*30), i.e. seven statistics per column
    '''
    reducers = (np.mean, np.std, np.max, np.min, np.median, kurtosis, skew)
    return np.array([value for reduce in reducers for value in reduce(s, axis=0)])

使用extract_features创建训练集和测试集


In [60]:
import torch
from torch.utils.data import DataLoader, TensorDataset

def one_hot_collate(batch, num_classes=4):
    '''
    Collate (features, label) samples into a batch with one-hot labels.

    Input:
    batch: list of (feature_tensor, label) pairs; each label is an integer
           class index in [0, num_classes)
    num_classes: width of the one-hot encoding (defaults to 4, so the
                 function can still be passed directly as a collate_fn)

    Output:
    data: feature tensors stacked along a new first dimension
    one_hot_labels: float tensor of shape (len(batch), num_classes) with a
                    1 in the column of each sample's class
    '''
    data = torch.stack([item[0] for item in batch])
    # scatter_ requires int64 indices, so cast explicitly
    labels = torch.tensor([item[1] for item in batch]).long()

    one_hot_labels = torch.zeros(labels.size(0), num_classes)
    one_hot_labels.scatter_(1, labels.unsqueeze(1), 1)
    return data, one_hot_labels

batch_size = 4

# Build training dataset
# Stack the per-segment feature vectors into one ndarray first; building a
# tensor directly from a Python list of ndarrays is very slow.
trX = np.array([extract_features(s) for s, _, _ in training_segments])
trX = torch.tensor(trX, dtype=torch.float32) # Convert trX to tensor
trY = [y for _, y, _ in training_segments] # Extract labels of training segments
trY = torch.tensor(trY) # Convert trY to tensor

# Build testing dataset
teX = np.array([extract_features(s) for s, _, _ in testing_segments])
teX = torch.tensor(teX, dtype=torch.float32) # Convert teX to tensor
teY = [y for _, y, _ in testing_segments] # Extract labels of testing segments
teY = torch.tensor(teY) # Convert teY to tensor

# Normalize trX and teX
# Statistics come from the training data only, so no information from the
# testing data leaks into the preprocessing.
mean = trX.mean(dim=0)
std = trX.std(dim=0)
# A feature that is constant over the training set has zero std; dividing by
# it would produce NaN/inf, so such features are only centred, not scaled.
std[std < 1e-8] = 1.0

# Normalize training data
trX = (trX - mean) / std

# Normalize testing data using training mean and std
teX = (teX - mean) / std

# Build Dataset
trDataset = TensorDataset(trX, trY) # Create training dataset
teDataset = TensorDataset(teX, teY) # Create testing dataset

# Build loader
# Only the training loader shuffles; both convert labels to one-hot vectors.
trLoader = DataLoader(trDataset, batch_size=batch_size, shuffle=True, num_workers=0, collate_fn=one_hot_collate) # Create training dataloader
teLoader = DataLoader(teDataset, batch_size=batch_size, shuffle=False, num_workers=0, collate_fn=one_hot_collate) # Create testing dataloader

定义模型

In [65]:
import torch.nn as nn

class FNN(nn.Module):
    '''
    Fully connected feed-forward classifier with two hidden ReLU layers.

    forward() returns raw class scores (logits). nn.CrossEntropyLoss applies
    log-softmax internally, so feeding it already-softmaxed outputs would
    squash the gradients; use predict_proba() when probabilities are needed.
    '''

    def __init__(self, input_size, hidden_size, num_classes):
        '''
        Input:
        input_size: number of input features per sample
        hidden_size: width of both hidden layers
        num_classes: number of output classes
        '''
        super(FNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(hidden_size, num_classes)
        self.softmax = nn.Softmax(dim=1)  # Used by predict_proba only

    def forward(self, x):
        '''Return the logits (batch * num_classes) for input x (batch * input_size).'''
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.fc2(x)
        x = self.relu2(x)
        # No softmax here: the arg-max of the logits equals the arg-max of
        # the probabilities, so class predictions are unaffected.
        return self.fc3(x)

    def predict_proba(self, x):
        '''Return class probabilities (each row sums to 1) for input x.'''
        return self.softmax(self.forward(x))

使用Adam作为优化器（Optimizer）训练模型

In [None]:
# Define the model parameters
hidden_size = 10

# Instantiate the model
input_size = trX.shape[1]
num_classes = 4 # 3 movements and static
model = FNN(input_size, hidden_size, num_classes)
print(model)

# Define loss function and optimizer
# NOTE(review): nn.CrossEntropyLoss applies log-softmax internally and expects
# raw logits — confirm that FNN.forward does not apply softmax itself.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

# Per-epoch average losses, used later to draw the learning curves
train_losses = []
te_losses = []

# Number of epochs
num_epochs = 200

for epoch in range(num_epochs):
    model.train()
    train_loss_sum = 0.0
    train_count = 0

    for batch_x, batch_y in trLoader:
        # Forward pass
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)

        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight each batch loss by its size so a smaller final batch does
        # not skew the epoch average.
        train_loss_sum += loss.item() * batch_x.size(0)
        train_count += batch_x.size(0)

    # Calculate average training loss for this epoch
    avg_train_loss = train_loss_sum / train_count
    train_losses.append(avg_train_loss)

    # Evaluate on the testing loader (no gradient tracking, no updates)
    model.eval()
    te_loss_sum = 0.0
    te_count = 0
    with torch.no_grad():
        for te_x, te_y in teLoader:
            te_outputs = model(te_x)
            te_loss = criterion(te_outputs, te_y)
            te_loss_sum += te_loss.item() * te_x.size(0)
            te_count += te_x.size(0)

    avg_te_loss = te_loss_sum / te_count
    te_losses.append(avg_te_loss)

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Test Loss: {avg_te_loss:.4f}')

计算准确率并绘制学习曲线

In [None]:
import matplotlib.pyplot as plt

def _accuracy(net, loader):
    """Return the percentage of samples in loader that net classifies correctly.

    The loader must yield (features, one_hot_labels) batches; a prediction is
    correct when the arg-max of the model output equals the arg-max of the
    one-hot label.
    """
    correct = 0
    total = 0
    with torch.no_grad():
        for features, one_hot in loader:
            _, predicted = torch.max(net(features), 1)
            _, true_labels = torch.max(one_hot, 1)
            total += one_hot.size(0)
            correct += (predicted == true_labels).sum().item()
    return 100 * correct / total


# Calculate and print accuracies for the training and testing sets
model.eval()
tr_accuracy = _accuracy(model, trLoader)
te_accuracy = _accuracy(model, teLoader)

print(f'Accuracy on training set: {tr_accuracy:.2f}%')
print(f'Accuracy on test set: {te_accuracy:.2f}%')

# Plot training and testing losses
# The x-axis is derived from the recorded losses, so the plot still works
# when training stopped before num_epochs epochs were completed.
plt.figure(figsize=(10, 5))
plt.plot(range(1, len(train_losses) + 1), train_losses, label='Training Loss')
plt.plot(range(1, len(te_losses) + 1), te_losses, label='Testing Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()