In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# 生成示例数据
np.random.seed(0)
torch.manual_seed(0)

# 生成正弦波数据加上一些噪声
time = np.arange(0, 400, 0.1)
data = np.sin(time) + 0.1 * np.random.randn(len(time))

# 制作数据集
def create_dataset(data, look_back):
    X, Y = [], []
    for i in range(len(data) - look_back):
        x = data[i:(i + look_back)]
        y = data[i + look_back]
        X.append(x)
        Y.append(y)
    return np.array(X), np.array(Y)

look_back = 20
X, Y = create_dataset(data, look_back)
X_train, Y_train = torch.FloatTensor(X[:-100]), torch.FloatTensor(Y[:-100])
X_test, Y_test = torch.FloatTensor(X[-100:]), torch.FloatTensor(Y[-100:])

# DataLoader for batch processing
train_loader = DataLoader(TensorDataset(X_train, Y_train), batch_size=32, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test, Y_test), batch_size=32)


In [2]:

import torch
import torch.nn as nn
from torch.nn.utils import weight_norm


class Chomp1d(nn.Module):
    def __init__(self, chomp_size):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        return x[:, :, :-self.chomp_size].contiguous()


class TemporalBlock(nn.Module):
    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super(TemporalBlock, self).__init__()
        self.conv1 = weight_norm(nn.Conv1d(n_inputs, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp1 = Chomp1d(padding)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)

        self.conv2 = weight_norm(nn.Conv1d(n_outputs, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp2 = Chomp1d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        self.net = nn.Sequential(self.conv1, self.chomp1, self.relu1, self.dropout1,
                                 self.conv2, self.chomp2, self.relu2, self.dropout2)
        self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
        self.relu = nn.ReLU()
        self.init_weights()

    def init_weights(self):
        self.conv1.weight.data.normal_(0, 0.01)
        self.conv2.weight.data.normal_(0, 0.01)
        if self.downsample is not None:
            self.downsample.weight.data.normal_(0, 0.01)

    def forward(self, x):
        out = self.net(x)
        res = x if self.downsample is None else self.downsample(x)
        return self.relu(out + res)
class TemporalConvNet(nn.Module):
    def __init__(self, num_inputs, num_channels, kernel_size=2, dropout=0.2):
        super(TemporalConvNet, self).__init__()
        layers = []
        num_levels = len(num_channels)
        for i in range(num_levels):
            dilation_size = 2 ** i
            in_channels = num_inputs if i == 0 else num_channels[i-1]
            out_channels = num_channels[i]
            layers.append(
                TemporalBlock(in_channels, out_channels, kernel_size, stride=1,
                              dilation=dilation_size,
                              padding=(kernel_size-1) * dilation_size, dropout=dropout)
            )
        self.network = nn.Sequential(*layers)
        self.final_layer = nn.Linear(num_channels[-1], 1)  # 添加全连接层以确保输出是一个标量

    def forward(self, x):
        x = x.unsqueeze(1)  # 增加一个维度以适配网络
        x = self.network(x)
        x = x.squeeze(1)  # 去掉多余的维度
        x = self.final_layer(x[:, :, -1])  # 选择最后一个时间点的输出，并通过全连接层
        return x

model = TemporalConvNet(num_inputs=1, num_channels=[16, 32], kernel_size=2)




In [9]:
# 设置优化器和损失函数
optimizer = optim.Adam(model.parameters(), lr=0.01)
criterion = nn.MSELoss()

# 训练过程
def train_model(model, train_loader, criterion, optimizer, epochs):
    model.train()
    for epoch in range(epochs):
        for x_batch, y_batch in train_loader:
            optimizer.zero_grad()
            output = model(x_batch)
            loss = criterion(output, y_batch)
            loss.backward()
            optimizer.step()
        print(f"Epoch {epoch+1}, Loss: {loss.item()}")

train_model(model, train_loader, criterion, optimizer, epochs=20)


Epoch 1, Loss: 0.6239224076271057
Epoch 2, Loss: 0.44543954730033875
Epoch 3, Loss: 0.8427554368972778
Epoch 4, Loss: 0.56889808177948
Epoch 5, Loss: 0.371062308549881
Epoch 6, Loss: 0.5301308631896973
Epoch 7, Loss: 0.356380432844162
Epoch 8, Loss: 0.5729440450668335
Epoch 9, Loss: 0.5049682259559631
Epoch 10, Loss: 0.5576505661010742
Epoch 11, Loss: 0.7468507289886475
Epoch 12, Loss: 0.5814102292060852
Epoch 13, Loss: 0.6765184998512268
Epoch 14, Loss: 0.6801705360412598
Epoch 15, Loss: 0.505359411239624
Epoch 16, Loss: 0.37233367562294006
Epoch 17, Loss: 0.5229997634887695
Epoch 18, Loss: 0.618018627166748
Epoch 19, Loss: 0.6751857995986938
Epoch 20, Loss: 0.6104845404624939


In [10]:
# 测试模型
model.eval()
with torch.no_grad():
    total_loss = 0
    for x_batch, y_batch in test_loader:
        output = model(x_batch)
        loss = criterion(output, y_batch)
        total_loss += loss.item()
    print(f"Test Loss: {total_loss / len(test_loader)}")


Test Loss: 0.47548918426036835


# classify


In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

import torch

# 生成示例数据
def generate_data(sequence_length=1000, num_classes=3):
    X = torch.randn(sequence_length, 1)  # 随机特征，每个特征维度为1
    Y = torch.randint(0, num_classes, (sequence_length,))  # 随机标签，标签范围[0, num_classes)
    return X, Y

# 创建数据集
def create_dataset(X, Y, look_back=20):
    dataX, dataY = [], []
    for i in range(len(X) - look_back):
        a = X[i:(i + look_back)].reshape(-1)  # 重塑每个窗口为一个连续的数组
        dataX.append(a)
        dataY.append(Y[i + look_back])
    return torch.stack(dataX), torch.stack(dataY)

# 数据参数
sequence_length = 1000
look_back = 20
num_classes = 3

# 生成并处理数据
X, Y = generate_data(sequence_length, num_classes)
X, Y = create_dataset(X, Y, look_back)

# 创建 DataLoader
batch_size = 32
dataset = TensorDataset(X.unsqueeze(1), Y)  # 增加一个维度用于表示单通道输入
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# 定义模型
class TemporalBlock(nn.Module):
    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super(TemporalBlock, self).__init__()
        self.conv1 = nn.Conv1d(n_inputs, n_outputs, kernel_size,
                               stride=stride, padding=padding, dilation=dilation)
        self.chomp1 = nn.Sequential(
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Conv1d(n_outputs, n_outputs, kernel_size,
                      stride=stride, padding=padding, dilation=dilation),
            nn.ReLU(),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        x = self.conv1(x)
        return self.chomp1(x)
# Let's assume:
num_channels_last_conv_layer = 32  # Or whatever your last conv layer's output channels are
look_back = 20  # Assuming the output sequence length remains the same as input
num_classes = 3

# Define your model:
class TemporalConvNet(nn.Module):
    def __init__(self, num_inputs, num_channels, num_classes, kernel_size=2, dropout=0.2):
        super(TemporalConvNet, self).__init__()
        self.layers = []
        num_levels = len(num_channels)
        for i in range(num_levels):
            dilation_size = 2 ** i
            in_channels = num_inputs if i == 0 else num_channels[i-1]
            out_channels = num_channels[i]
            self.layers.append(
                TemporalBlock(in_channels, out_channels, kernel_size, stride=1,
                              dilation=dilation_size,
                              padding=(kernel_size-1) * dilation_size, dropout=dropout)
            )
        self.network = nn.Sequential(*self.layers)
        self.final_layer = nn.Linear(832, num_classes)

    def forward(self, x):
        # x = x.unsqueeze(1)  # Ensure the channel dimension is correct
        x = self.network(x)
        x = x.view(x.size(0), -1)  # Flatten all dimensions except the batch
        # print("Dimension before Linear layer:", x.shape)  # This will tell you the actual dimension

        return self.final_layer(x)

model = TemporalConvNet(num_inputs=1, num_channels=[16, 32], num_classes=3, kernel_size=2)

optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# 训练模型
def train_model(model, train_loader, criterion, optimizer, epochs=100):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for x_batch, y_batch in train_loader:
            optimizer.zero_grad()
            output = model(x_batch)
            loss = criterion(output.squeeze(), y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader)}")

train_model(model, train_loader, criterion, optimizer)


Epoch 1, Loss: 1.098836971867469
Epoch 2, Loss: 1.0955107865795013
Epoch 3, Loss: 1.0936923180857012
Epoch 4, Loss: 1.0906382376147854
Epoch 5, Loss: 1.0897552467161609
Epoch 6, Loss: 1.0851940493429861
Epoch 7, Loss: 1.0890301196805892
Epoch 8, Loss: 1.0819648888803297
Epoch 9, Loss: 1.0796236722700057
Epoch 10, Loss: 1.0767486056973856
Epoch 11, Loss: 1.0711935835499917
Epoch 12, Loss: 1.0758843768027522
Epoch 13, Loss: 1.071782269785481
Epoch 14, Loss: 1.068077189307059
Epoch 15, Loss: 1.0706683454975006
Epoch 16, Loss: 1.0618713075114834
Epoch 17, Loss: 1.0632517183980634
Epoch 18, Loss: 1.058184479513476
Epoch 19, Loss: 1.0603989247352845
Epoch 20, Loss: 1.0618263598411315
Epoch 21, Loss: 1.0455755745210955
Epoch 22, Loss: 1.0392987785800811
Epoch 23, Loss: 1.0483193359067362
Epoch 24, Loss: 1.0405565557941314
Epoch 25, Loss: 1.0351342693451913
Epoch 26, Loss: 1.0455350068307692
Epoch 27, Loss: 1.0356935897181112
Epoch 28, Loss: 1.0236457470924623
Epoch 29, Loss: 1.035339703482966

In [12]:
# 假设已经有了X_test和Y_test
# 这里用同样的方法创建测试数据集
X_test, Y_test = generate_data(sequence_length=200, num_classes=num_classes)  # 生成更少的数据用于测试
X_test, Y_test = create_dataset(X_test, Y_test, look_back)

test_dataset = TensorDataset(X_test.unsqueeze(1), Y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
def evaluate_model(model, test_loader, criterion):
    model.eval()  # 将模型设置为评估模式
    total_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():  # 在评估过程中不计算梯度
        for x_batch, y_batch in test_loader:
            outputs = model(x_batch)
            loss = criterion(outputs, y_batch)
            total_loss += loss.item()

            _, predicted = torch.max(outputs.data, 1)
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()

    avg_loss = total_loss / len(test_loader)
    accuracy = 100 * correct / total
    print(f'Test Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')

    return avg_loss, accuracy
evaluate_model(model, test_loader, criterion)


Test Loss: 1.2410, Accuracy: 32.78%


(1.2409824927647908, 32.77777777777778)