In [1]:
# -*- coding: utf-8 -*-
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from torch.utils.data.dataset import random_split
import numpy as np
import os,copy,csv,importlib,time,math
import math,random,shutil
import matplotlib.pyplot as plt
from PIL import Image
from scipy import signal
import pickle as pickle
from sklearn import preprocessing
from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_curve, auc
from sklearn.cluster import KMeans, MeanShift, AgglomerativeClustering
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
from sklearn import svm
from tqdm import tqdm
from sklearn.model_selection import GridSearchCV
from itertools import cycle
from matplotlib import cm
from scipy.stats import gaussian_kde
colors = cycle("bgrcmykbgrcmykbgrcmykbgrcmyk")

In [2]:
#########################################################################################
#set the seed for random environment
#here we set the seed to 42
#########################################################################################
def seed_everything(seed=42):
    np.random.seed(seed)
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    
#when import the module the seed is set
seed_everything()

In [3]:
batch_size = 64

## Dataset Preparation ##

In [4]:

class WaveFormDataset(Dataset):
    def __init__(self, root_dir):
        """
        root_dir: 数据集的根目录
        """
        self.samples = []
        self.labels = []

        # 自动发现数据和标签
        for label in ['noise', 'internal', 'corona']:
            data_dir = os.path.join(root_dir, label, 'WaveForm')
            for file in os.listdir(data_dir):
                file_path = os.path.join(data_dir, file)
                if file.endswith('.csv'):
                    # 读取CSV文件，跳过表头
                    data = pd.read_csv(file_path, header=None, skiprows=1, sep='\t').values
                    # 对于文件中的每一行，保存为一个独立的样本
                    for row in data:
                        self.samples.append(row.astype(np.float32))
                        self.labels.append(label)
        
        # 将文本标签转换为整数
        self.label_encoder = LabelEncoder()
        self.labels = self.label_encoder.fit_transform(self.labels)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        # 返回单个样本和标签
        sample = self.samples[idx]
        label = self.labels[idx]
        return sample, label

# 使用示例
root_dir = '/media/mldadmin/home/s123mdg34_04/WangShengyuan/FYP/data' # 根据实际路径调整
WaveForm = WaveFormDataset(root_dir=root_dir)


In [5]:
# 假设 dataset 是你的完整数据集
dataset_size = len(WaveForm)
train_size = int(dataset_size * 0.7) # 70% 数据用于训练
val_size = int(dataset_size * 0.15) # 15% 数据用于验证
test_size = dataset_size - train_size - val_size # 剩余15%数据用于测试

train_dataset, val_dataset, test_dataset = random_split(WaveForm, [train_size, val_size, test_size])


## DataLoader 

In [10]:
train_loader = DataLoader(train_dataset, batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size, shuffle=False)

## Model

In [16]:
def relu(y_hat):
    return torch.max(torch.zeros_like(y_hat), y_hat)

def optimize_weights(param, learning_rate):
    param_new = param - learning_rate * param.grad
    param.data = param_new

def train(model, epochs, input_features, labels, loss_function, learning_rate):
    losses = []
    for epoch in range(epochs):

        # stack the input features and labels along the first dim
        x = torch.stack((input_features,), 0)
        y = torch.stack((labels))
        # apply model
        y_hat = model.forward(x)
        
        # print(x.shape)
        loss = loss_function(y_hat, y)
        losses.append(loss.detach().item())
        # zero_grad
        model.zero_grad()
        # perform backward
        loss.backward()

        # optimize weights
        for param in model.parameters():
            optimize_weights(param, learning_rate)

    return losses

In [12]:

class Linear(torch.nn.Module):
    def __init__(self,num_in_features, 
                    num_out_features):
        super().__init__()
        # use num_in_features and num_out_features 
        w = torch.FloatTensor(num_in_features, num_out_features)
        w = torch.nn.init.uniform_(w, -0.2, 0.2)
        b = torch.FloatTensor(1)
        b = torch.nn.init.uniform_(b, -0.2, 0.2)
        self.w = torch.nn.Parameter(w, requires_grad=True)
        self.b = torch.nn.Parameter(b, requires_grad=True)
        
    def forward(self, x):        
        # return self.w.T @ x + self.b
        # x: *, 2, 1
        # w: 2, 1
        return self.w.T @ x  + self.b 


In [13]:
class ThreeLayerNN(torch.nn.Module):

    def __init__(self):
        super().__init__()
        self.layer0_0 = Linear(256, 1)
        self.layer0_1 = Linear(256, 1)
        self.layer0_2 = Linear(256, 1)
        self.layer1_0 = Linear(3, 1)
        self.layer1_1 = Linear(3, 1)
        self.layer2 = Linear(2, 1)

    def forward(self, x):

        # x: *, 2, 1
        y_hat0_l0 = self.layer0_0.forward(x) 
        y_hat1_l0 = self.layer0_1.forward(x) 
        y_hat2_l0 = self.layer0_2.forward(x)

        
        # y_hat0_0: *, 1, 1
        # y_hat0_1: *, 1, 1
        z_0 = torch.relu(y_hat0_l0) 
        z_1 = torch.relu(y_hat1_l0)
        z_2 = torch.relu(y_hat2_l0)
        
        
        # z_0: *, 1, 1
        # z_1: *, 1, 1
        
        z = torch.cat([z_0, z_1, z_2], dim=-2)

        y_hat0_l1 = self.layer1_0.forward(z)
        y_hat1_l1 = self.layer1_1.forward(z)
        z_3 = torch.relu(y_hat0_l1)
        z_4 = torch.relu(y_hat1_l1)
        z1= torch.cat([z_3, z_4], dim=-2)
        
        # z: *, 2, 1
        # print(z.shape)
        y_hat = self.layer2.forward(z1)



        return y_hat

def loss_function(y_hat, y):
    return torch.mean((y_hat - y)**2)

# apply seed for random number generator before the model
torch.manual_seed(100)

model = ThreeLayerNN()
epochs = 1000
learning_rate = 0.1


In [17]:
for data, labels in train_loader:
    losses = train(model, epochs, data, labels, loss_function, learning_rate)
    print(data.shape, labels)
    break


TypeError: stack(): argument 'tensors' (position 1) must be tuple of Tensors, not Tensor