数据加载

In [None]:
import numpy as np
import os
def load_data(root):
    """Load every ``.npy`` sample under *root* together with a binary label.

    The label comes from the second character of each file name
    (1: hard fall, 2: soft fall, 3: non-fall): non-fall files get label 0,
    every other file gets label 1.

    Args:
        root: Directory that directly contains the ``.npy`` sample files.

    Returns:
        A ``(data, label)`` tuple of NumPy arrays, ordered by file name.
    """
    # Sort for a reproducible sample order (os.listdir order is arbitrary) and
    # skip entries that are not .npy files, which np.load cannot read.
    file_names = sorted(name for name in os.listdir(root) if name.endswith('.npy'))
    data = [np.load(os.path.join(root, name)) for name in file_names]
    label = [0 if name[1] == '3' else 1 for name in file_names]
    return np.array(data), np.array(label)

if __name__=='__main__':
    # Switch the working directory to the dataset folder.
    os.chdir('/tmp/FYP/12_DTweighted')

    ROOT = '/tmp/FYP/12_DTweighted/'
    train_path = ROOT + 'train/'
    test_path = ROOT + 'test/'
    trainX, trainy = load_data(train_path)
    testX, testy = load_data(test_path)

    # Print the arrays that were just loaded instead of reading both splits
    # from disk a second time; the printed (data, label) tuples are the same.
    print((trainX, trainy))
    print((testX, testy))

    print("data loaded")

分析数据维度：二维数组（128,64），8192个数据点


数据文件命名:


1. data_path[1]（文件名的第 2 个字符）表示类别：1 = hard fall，2 = soft fall，3 = non-fall


2. data_path[2] 和 data_path[3] 这两位字符组成一个两位编号，对应 28 个属性之一


3.数据集里面Hard:1-9, soft:10-14, non-fall:15-28

In [None]:
import numpy as np
import os
# Load one sample and dump the complete array to a text file for inspection.
data_example = np.load('train_new/4101010001210420_1_061_DTweighted-01.npy')

out_file_path = 'data.txt'
with open(out_file_path, 'w', encoding='utf-8') as f:
    # threshold=np.inf turns off NumPy's "..." summarisation so every element is written.
    f.write(np.array2string(data_example, threshold=np.inf))

print("Printed!")
print("Array Shape:", data_example.shape)
# This line reports the dtype, so the label says so (it used to read "Data Shape").
print("Data Type:", data_example.dtype)

# Read the dump back and echo it.
with open(out_file_path, 'r', encoding='utf-8') as file:
    content = file.read()
    print(content)


取一个示例数据，做出Doppler-Time map

In [None]:
import matplotlib.pyplot as plt

# Load the .npy file (replace the path with the actual one as needed).
data_train_example = np.load('train_new/4101010001210420_1_061_DTweighted-01.npy')

# Time axis: 64 points from 0 to 6.4 s (assuming ten points per second).
time_ticks = np.linspace(0, 6.4, 64)

# Frequency axis: 128 points from -640 Hz to +640 Hz.
frequency_ticks = np.linspace(-640, 640, 128)

# Axis limits reused for the image extent and the tick positions.
t_lo, t_hi = time_ticks.min(), time_ticks.max()
f_lo, f_hi = frequency_ticks.min(), frequency_ticks.max()

# Create the figure and set its size.
plt.figure(figsize=(12, 10))

# Draw the Doppler-time map.
plt.imshow(
    data_train_example,
    aspect='auto',
    cmap='viridis',
    interpolation='none',
    extent=[t_lo, t_hi, f_lo, f_hi],
)

# Colour bar, title and axis labels.
plt.colorbar(label='Intensity')
plt.title('Doppler-Time Map')
plt.xlabel('Time (seconds)')
plt.ylabel('Frequency (Hz)')

# Tick density: every 0.5 s on the time axis, every 128 Hz on the frequency axis.
plt.xticks(np.arange(0, t_hi, 0.5))
plt.yticks(np.arange(f_lo, f_hi + 1, 128))

# Save at a high dpi before showing the figure.
plt.savefig('doppler_time_map_high_res.png', dpi=300)

plt.show()

实验目的：对（128,64）数据进行数据增强

实验方法：Stable Diffusion（下文代码实现的是直接在像素空间上训练的、基于 U-Net 的扩散模型）

之后跑分类模型

Stable Diffusion架构

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset
from torch import nn, optim
from torch.nn.functional import relu
import time
import matplotlib.pyplot as plt


# Check whether CUDA is available and pick the device accordingly:
# the first GPU ("cuda:0") when it is, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# U-Net model definition
class UNet(nn.Module):
    """Encoder-decoder CNN conditioned on a per-sample scalar ``t``.

    ``t`` is embedded to 16 features, broadcast over the spatial grid and
    concatenated to the single-channel input. Two stride-2 convolutions
    downsample, two stride-2 transposed convolutions upsample, and the
    encoder activations are added back in before each later stage. The
    output has one channel.
    """

    def __init__(self):
        super().__init__()
        self.time_embedding = nn.Linear(1, 16)
        self.down1 = self._stage(nn.Conv2d(1 + 16, 64, 3, padding=1))
        self.down2 = self._stage(nn.Conv2d(64, 128, 3, stride=2, padding=1))
        self.down3 = self._stage(nn.Conv2d(128, 256, 3, stride=2, padding=1))
        self.up2 = self._stage(nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1))
        self.up3 = self._stage(nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1))
        self.final = nn.Conv2d(64, 1, 3, padding=1)

    @staticmethod
    def _stage(conv):
        """Wrap *conv* as conv -> ReLU -> BatchNorm over its output channels."""
        return nn.Sequential(conv, nn.ReLU(), nn.BatchNorm2d(conv.out_channels))

    def forward(self, x, t):
        """Run the network on image batch *x* with conditioning values *t*."""
        height, width = x.shape[2], x.shape[3]
        # Embed t, then tile the embedding over every pixel as extra channels.
        emb = relu(self.time_embedding(t))
        emb_map = emb[..., None, None].expand(-1, -1, height, width)
        enc1 = self.down1(torch.cat([x, emb_map], dim=1))
        enc2 = self.down2(enc1)
        bottleneck = self.down3(enc2)
        dec = self.up2(bottleneck)
        # Additive skip connections from the matching encoder stages.
        dec = self.up3(dec + enc2)
        return self.final(dec + enc1)

# Custom dataset
class ImageDataset(Dataset):
    """Dataset of ``.npy`` arrays from one directory, min-max scaled to [0, 1].

    Args:
        root_dir: Directory scanned (non-recursively) for ``.npy`` files.

    Raises:
        RuntimeError: If the directory contains no ``.npy`` files.
    """

    def __init__(self, root_dir):
        self.root_dir = root_dir
        # Sorted so an index always maps to the same file; os.listdir returns
        # entries in arbitrary order.
        self.files = sorted(
            os.path.join(root_dir, f) for f in os.listdir(root_dir) if f.endswith('.npy')
        )
        if not self.files:
            raise RuntimeError("No data files found in specified directory.")

    def __len__(self):
        """Return the number of ``.npy`` files found."""
        return len(self.files)

    def __getitem__(self, idx):
        """Return ``(image, file_name)`` for the file at position *idx*.

        The image is a float32 tensor with a leading channel dimension,
        rescaled to [0, 1]; a constant array becomes all zeros.
        """
        path = self.files[idx]
        img = torch.tensor(np.load(path), dtype=torch.float32).unsqueeze(0)  # Add channel dimension
        # Compute the extrema once and reuse them for the check and the scaling.
        lo, hi = img.min(), img.max()
        if hi == lo:
            # A constant image would divide by zero; use a zero tensor instead.
            img = torch.zeros_like(img)
        else:
            img = (img - lo) / (hi - lo)  # Normalize to [0, 1]
        return img, os.path.basename(path)

# Forward and reverse diffusion steps
# Forward (noising) step
def diffusion_step(x, beta):
    """Return a noised copy of *x*: ``sqrt(1 - beta) * x + sqrt(beta) * noise``.

    Args:
        x: Tensor to add noise to.
        beta: Tensor holding the noise level; both ``beta`` and ``1 - beta``
            are clamped to at least 1e-8 before the square root.

    Returns:
        Tensor with the same shape as *x*, mixed with fresh Gaussian noise.
    """
    # randn_like already allocates on x's own device, so no extra transfer is
    # needed; moving the noise to a global device could mismatch x.
    noise = torch.randn_like(x)
    return torch.sqrt(torch.clamp(1 - beta, min=1e-8)) * x + torch.sqrt(torch.clamp(beta, min=1e-8)) * noise

def reverse_diffusion_step(x_t, t, model, beta):
    """Undo one noising step using the model's noise estimate.

    Evaluates ``model(x_t, t)`` and returns
    ``(x_t - sqrt(beta) * estimate) / sqrt(1 - beta)``.
    """
    noise_estimate = model(x_t, t)
    noise_scale = torch.sqrt(beta)
    signal_scale = torch.sqrt(1 - beta)
    return (x_t - noise_scale * noise_estimate) / signal_scale

# Training function
def train(model, data_loader, epochs=10, beta_schedule=np.linspace(0.01, 0.1, 1000)):
    """Train *model* to predict the noise added by one forward diffusion step.

    Args:
        model: Module called as ``model(x_t, t)`` that returns a tensor shaped
            like ``x_t``.
        data_loader: Iterable of ``(batch, _)`` pairs that supports ``len()``.
        epochs: Number of passes over ``data_loader``.
        beta_schedule: 1-D sequence of noise levels; one entry is drawn at
            random for every batch.

    Returns:
        List with the mean batch loss of each epoch.
    """
    model.train()
    # Run on the device the model's parameters live on.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')
    # torch.cuda.get_device_name() raises on CPU-only machines, so only query
    # it when training actually runs on CUDA.
    name = torch.cuda.get_device_name(run_device) if run_device.type == 'cuda' else 'cpu'
    print('Using device '+ name + ' to train the model.')
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = nn.MSELoss()
    num_steps = len(beta_schedule)
    losses = []
    for epoch in range(epochs):
        start = time.time()
        epoch_loss = 0
        for batch, _ in data_loader:
            x = batch.to(run_device)
            step = np.random.randint(0, num_steps)
            beta = torch.tensor(beta_schedule[step], device=run_device)
            # Condition on the normalised step index so the time input matches
            # the noise level, the same pairing sampling uses
            # (t / timesteps together with beta_schedule[t]).
            t = torch.full((x.size(0), 1), step / num_steps, device=run_device)
            # Keep the exact noise that produced x_t: it is the regression
            # target. Drawing a second, independent sample for the target
            # would leave the model nothing to learn from x_t.
            noise = torch.randn_like(x)
            x_t = torch.sqrt(torch.clamp(1 - beta, min=1e-8)) * x + torch.sqrt(torch.clamp(beta, min=1e-8)) * noise
            pred_noise = model(x_t, t)
            loss = loss_fn(pred_noise, noise)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        # Guard against an empty loader so the average does not divide by zero.
        avg_loss = epoch_loss / max(len(data_loader), 1)
        losses.append(avg_loss)
        elapsed = (time.time()-start) / 60
        print('Training epoch={} \t cost_time={:.3f} min \t loss={:.6f} '.format(epoch+1, elapsed, avg_loss))
    return losses

# Sampling function and saving
def generate_and_save_samples(model, data_loader, output_dir='train_new', beta_schedule=np.linspace(0.01, 0.1, 1000)):
    """Generate one sample per input file and save it as ``<name>-01.npy``.

    Each input item is used only for its shape and file name: sampling starts
    from Gaussian noise of the same shape and applies
    ``reverse_diffusion_step`` once per schedule entry, from the last entry
    back to the first.

    Args:
        model: Noise-prediction module called as ``model(x_t, t)``.
        data_loader: Iterable of ``(batch, filenames)`` pairs.
        output_dir: Directory the generated arrays are written to; created
            if it does not exist.
        beta_schedule: 1-D sequence of noise levels, one per reverse step.
    """
    model.eval()
    os.makedirs(output_dir, exist_ok=True)

    # Take the step count from the schedule so a schedule of any length works;
    # the default schedule has 1000 entries.
    timesteps = len(beta_schedule)
    with torch.no_grad():
        for batch, filenames in data_loader:
            for x, filename in zip(batch, filenames):
                x = x.unsqueeze(0).to(device)
                x_t = torch.randn_like(x)
                for t in reversed(range(timesteps)):
                    # The model is conditioned on the step index scaled to [0, 1).
                    time_step = torch.tensor([[t / timesteps]], device=device)
                    beta = torch.tensor(beta_schedule[t], device=device)
                    x_t = reverse_diffusion_step(x_t, time_step, model, beta)
                sample_np = x_t.cpu().numpy().squeeze()
                variant_filename = os.path.basename(filename).replace('.npy', '-01.npy')
                np.save(os.path.join(output_dir, variant_filename), sample_np.astype(np.float64))

# Instantiate the model and set up data loading
# NOTE(review): this path is under /tmp/FYP_Projects, while the first cell
# reads from /tmp/FYP — confirm which root is the intended one.
model = UNet().to(device)
dataset = ImageDataset('/tmp/FYP_Projects/12_DTweighted/train')
dataloader = DataLoader(dataset, batch_size=50, shuffle=True,num_workers=6)

print("Dataset loaded!")
# Train the model, then persist its weights
losses = train(model, dataloader)
torch.save(model.state_dict(),'/tmp/FYP_Projects/diffusion_model.pt')
print("Model saved!")

# Plot the per-epoch training loss returned by train()
plt.figure(figsize=(12,6))
plt.plot(losses, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss per Epoch')
plt.legend()
plt.show()

# # Generate samples and save them
# generate_and_save_samples(model, dataloader)
# print("Task finished!")


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class LKCNN(nn.Module):
    """CNN classifier with one tall (7, 1) and one wide (1, 9) convolution.

    Each convolution is followed by ReLU and a max-pool along the same axis
    as its kernel; the flattened features pass through two linear layers
    that produce three output scores.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(7, 1), stride=1)
        self.pool1 = nn.MaxPool2d(kernel_size=(4, 1), stride=(4, 1))
        self.conv2 = nn.Conv2d(32, 64, kernel_size=(1, 9), stride=1)
        self.pool2 = nn.MaxPool2d(kernel_size=(1, 4), stride=(1, 4))

        # Flattened size after pool2 for a (1, 128, 64) input:
        # 64 channels x 30 rows x 14 columns. It must be recomputed if the
        # input size changes.
        flat_features = 64 * 30 * 14
        self.fc1 = nn.Linear(flat_features, 1024)

        self.fc2 = nn.Linear(1024, 3)

    def forward(self, x):
        """Return the three class scores for each image in batch *x*."""
        feats = F.relu(self.conv1(x))
        feats = self.pool1(feats)
        feats = F.relu(self.conv2(feats))
        feats = self.pool2(feats)
        # Flatten everything except the batch dimension.
        flat = feats.view(feats.size(0), -1)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

# Build the classifier and print its layer summary.
# Note: this rebinds the notebook-level name `model`.
model = LKCNN()
print(model)

# Assume one random data batch with labels
inputs = torch.randn(1, 1, 128, 64)  # batch_size=1, channels=1, height=128, width=64
labels = torch.randint(0, 3, (1,))  # batch_size=1, class index drawn from 3 classes (0-2)

# Forward pass
outputs = model(inputs)
loss_function = nn.CrossEntropyLoss()
loss = loss_function(outputs, labels)

print(loss)
