In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import os
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import math
# Path of the recorded accelerometer log.
# Raw string: the backslash must stay literal instead of starting an escape.
yh_path = r'D:\前后摇晃.csv'

# Load the CSV into a DataFrame.
data_yh = pd.read_csv(yh_path)
list_data = [data_yh]

# Spacing of the rebuilt time grid, in milliseconds.
SAMPLE_PERIOD_MS = 5

# Rebuild the time column: keep the earliest parsed timestamp as the origin
# and place every row on a fixed SAMPLE_PERIOD_MS grid after it.
for data in list_data:
    data['时间'] = pd.to_datetime(data['时间'], errors='coerce')
    start_time = data['时间'].min()
    # Vectorised equivalent of adding i * 5 ms to start_time row by row.
    data['时间'] = start_time + pd.to_timedelta(
        np.arange(len(data)) * SAMPLE_PERIOD_MS, unit='ms'
    )

x_data = data_yh['时间']
y_data = data_yh['加速度Z(g)']
y1_data = data_yh['加速度X(g)']
y2_data = data_yh['加速度Y(g)']

# One line trace per axis. go.Scatter(mode='lines') replaces the deprecated
# go.Line alias; the X and Y traces are bound to their own y axes.
fig = go.Figure()
fig.add_trace(
    go.Scatter(x=x_data, y=y_data, mode='lines', name='加速度Z(g)'),
)
fig.add_trace(
    # yaxis="y2" attaches this trace to layout.yaxis2.
    go.Scatter(x=x_data, y=y1_data, mode='lines', name='加速度X(g)', yaxis="y2"),
)
fig.add_trace(
    go.Scatter(x=x_data, y=y2_data, mode='lines', name='加速度Y(g)', yaxis="y3"),
)

# Axis title colours go through title.font; the older `titlefont` attribute
# is deprecated and no longer accepted by recent plotly releases.
fig.update_layout(
    title_text="加速度变化折线图",
    xaxis_title="时间",
    yaxis=dict(title=dict(text="加速度Z(g)")),
    yaxis2=dict(
        title=dict(text="加速度X(g)", font=dict(color="#d62728")),
        overlaying="y",
        side="left",
        tickfont=dict(color="#d62728"),
    ),
    yaxis3=dict(
        title=dict(text="加速度Y(g)", font=dict(color="#9467bd")),
        overlaying="y",
        side="right",
        tickfont=dict(color="#9467bd"),
    ),
    font_family="SimHei",
)
fig.write_html("acceleration_plot.html")
fig.show()

In [None]:
def find_peaks(data):
    """Return the indices of all strict local maxima of a 1-D sequence.

    An interior index counts as a peak when its value is strictly greater
    than both neighbours; the first and last elements are never reported.

    Args:
        data: Sequence of comparable numbers (converted with ``np.array``).

    Returns:
        list[int]: Peak positions in ascending order.
    """
    values = np.array(data)
    return [
        idx
        for idx in range(1, len(values) - 1)
        if values[idx - 1] < values[idx] > values[idx + 1]
    ]
def find_troughs(data):
    """Return the indices of all strict local minima of a 1-D sequence.

    An interior index counts as a trough when its value is strictly smaller
    than both neighbours; the first and last elements are never reported.

    Args:
        data: Sequence of comparable numbers (converted with ``np.array``).

    Returns:
        list[int]: Trough positions in ascending order.
    """
    values = np.array(data)
    return [
        idx
        for idx in range(1, len(values) - 1)
        if values[idx - 1] > values[idx] < values[idx + 1]
    ]
# Locate every local extremum of the X-axis acceleration and merge the peak
# and trough positions into one ascending list.
zx_peaks_indices = find_peaks(y1_data)
zx_troughs_indices = find_troughs(y1_data)
all_zx_indices = zx_peaks_indices + zx_troughs_indices
all_zx_indices.sort()

# Cut the signal into segments that run from one extremum through the next
# (the `+ 1` keeps the closing extremum inside the slice).
x_all_waves = [
    y1_data[left:right + 1]
    for left, right in zip(all_zx_indices, all_zx_indices[1:])
]



In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from sklearn.metrics import f1_score
from sklearn.model_selection import StratifiedKFold
# ---------------------------------------------------------------------------
# Threshold sweep.
# For every candidate peak-to-trough threshold: label each wave segment
# (1 if its range exceeds the threshold, else 0), train a small 1-D CNN on
# the padded segments, and measure accuracy on a fixed test split.
#
# NOTE(review): the threshold defines the labels, so accuracies obtained for
# different thresholds are measured against different targets. A threshold
# that puts every wave in one class is trivially easy to score well on.
# Confirm that accuracy is the intended selection criterion.
# ---------------------------------------------------------------------------
SEED = 42               # fixes the split, weight init and batch order
TEST_FRACTION = 0.2     # share of the waves held out for testing
BATCH_SIZE = 32
NUM_EPOCHS = 10
model_path = 'saved_model(qhyh).pth'

thresholds = np.linspace(0.003, 0.04, 38)
best_threshold = 0
best_ac = -1.0          # below any real accuracy, so the first run always registers

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if len(x_all_waves) < 2:
    raise ValueError("Need at least two wave segments to build a train/test split.")

# --- Threshold-independent preprocessing (done once, not per threshold) ----
wave_arrays = [np.asarray(wave, dtype=np.float64) for wave in x_all_waves]
max_length = max(len(wave) for wave in wave_arrays)
if max_length < 4:
    # Conv1d(kernel_size=3) followed by MaxPool1d(2) needs at least 4 samples.
    raise ValueError("Wave segments are too short for the Conv1d/MaxPool1d stack.")

# Zero-pad every wave on the right to the longest wave: one row per wave.
X = np.array([
    np.pad(wave, (0, max_length - len(wave)), mode='constant')
    for wave in wave_arrays
])
X = np.expand_dims(X, axis=1)  # add the channel axis -> (n_waves, 1, max_length)

# Peak-to-trough range of each wave; labels are derived from it per threshold.
wave_ranges = np.array([wave.max() - wave.min() for wave in wave_arrays])

# One seeded split shared by all thresholds, so that each threshold is
# trained and tested on the same waves.
rng = np.random.default_rng(SEED)
num_samples = len(X)
num_test = max(1, int(num_samples * TEST_FRACTION))
indices = rng.permutation(num_samples)
test_indices = indices[:num_test]
train_indices = indices[num_test:]


class CustomDataset(Dataset):
    """Pairs a float32 data tensor with a float32 label tensor, row by row."""

    def __init__(self, data, labels):
        self.data = torch.tensor(data, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.float32)
        if len(self.data) != len(self.labels):
            raise ValueError(
                f"data has {len(self.data)} rows but labels has {len(self.labels)}"
            )

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]


class OneDCNN(nn.Module):
    """Conv1d -> ReLU -> MaxPool1d -> Dropout -> Linear -> Sigmoid classifier.

    Args:
        input_length: Length of each padded input wave. Defaults to the
            module-level ``max_length`` so ``OneDCNN()`` keeps working.
    """

    def __init__(self, input_length=None):
        super().__init__()
        if input_length is None:
            input_length = max_length
        # Conv1d(k=3) shortens the input by 2, MaxPool1d(2) then halves it.
        self.flat_features = 32 * ((input_length - 2) // 2)
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3)
        self.dp = nn.Dropout(p=0.2)
        self.pool = nn.MaxPool1d(kernel_size=2)
        self.fc1 = nn.Linear(self.flat_features, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.dp(x)
        x = x.view(-1, self.flat_features)
        # No dropout after fc1: zeroing the single logit would force the
        # prediction to sigmoid(0) = 0.5 for a random share of the samples.
        x = self.fc1(x)
        return self.sigmoid(x)


for x_threshold in thresholds:
    # One label per wave, aligned with the rows of X.
    y = (wave_ranges > x_threshold).astype(np.float32)

    X_train, y_train = X[train_indices], y[train_indices]
    X_test, y_test = X[test_indices], y[test_indices]

    # Same initial weights and batch order for every threshold.
    torch.manual_seed(SEED)

    train_dataset = CustomDataset(X_train, y_train)
    test_dataset = CustomDataset(X_test, y_test)
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

    model = OneDCNN(max_length).to(device)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=0.00001)

    # --- Training ---------------------------------------------------------
    for epoch in range(NUM_EPOCHS):
        model.train()
        running_loss = 0.0
        train_loader_tqdm = tqdm(train_loader, desc=f'Epoch {epoch + 1}/{NUM_EPOCHS}')
        for step, (inputs, targets) in enumerate(train_loader_tqdm, start=1):
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            # squeeze(1) drops only the output axis, so a final batch of
            # size 1 keeps shape (1,) and still matches `targets`.
            outputs = model(inputs).squeeze(1)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            train_loader_tqdm.set_postfix(loss=running_loss / step)

    # --- Evaluation -------------------------------------------------------
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            predicted = (model(inputs).squeeze(1) > 0.5).float()
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
    ac = correct / total
    print(f'Test Accuracy: {100 * correct / total}%')

    # Keep the best run; the weights on disk always belong to best_threshold.
    if ac > best_ac:
        best_ac = ac
        best_threshold = x_threshold
        torch.save(model.state_dict(), model_path)
print(f'Best threshold: {best_threshold}, Best ac: {best_ac}')

In [None]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Second recording that the saved model is evaluated on.
# Raw string: the backslash must stay literal instead of starting an escape.
yh1_path = r"D:\COM13_0.csv"

# Load the CSV into a DataFrame.
data_yh1 = pd.read_csv(yh1_path)
list_data1 = [data_yh1]

# Spacing of the rebuilt time grid, in milliseconds.
EVAL_SAMPLE_PERIOD_MS = 5

# Rebuild the time column on a fixed grid starting at the earliest timestamp.
for data in list_data1:
    data['时间'] = pd.to_datetime(data['时间'], errors='coerce')
    start_time = data['时间'].min()
    data['时间'] = start_time + pd.to_timedelta(
        np.arange(len(data)) * EVAL_SAMPLE_PERIOD_MS, unit='ms'
    )

# X-axis acceleration of the NEW recording. Reading from `data_yh` here would
# silently re-evaluate on the training recording. Separate names keep the
# training variables (y1_data, x_all_waves, ...) intact.
eval_x_series = data_yh1['加速度X(g)']

# Split the signal into segments between consecutive extrema.
eval_peak_indices = find_peaks(eval_x_series)
eval_trough_indices = find_troughs(eval_x_series)
eval_extrema = sorted(eval_peak_indices + eval_trough_indices)
eval_waves = [
    np.asarray(eval_x_series[left:right + 1], dtype=np.float64)
    for left, right in zip(eval_extrema, eval_extrema[1:])
]
if not eval_waves:
    raise ValueError("No wave segments found in the evaluation recording.")

# One label per wave: 1 when the peak-to-trough range exceeds the selected
# threshold, otherwise 0. The range uses the full, untruncated wave.
y = np.array(
    [float(wave.max() - wave.min() > best_threshold) for wave in eval_waves],
    dtype=np.float32,
)

# The network input width is fixed to the training `max_length`: longer waves
# are truncated (np.pad rejects a negative pad width), shorter ones are
# zero-padded on the right.
padded_waves = []
for wave in eval_waves:
    clipped = wave[:max_length]
    padded_waves.append(
        np.pad(clipped, (0, max_length - len(clipped)), mode='constant')
    )

# Stack to (n_waves, max_length) and add the channel axis.
X = np.array(padded_waves)
X = np.expand_dims(X, axis=1)

# Dataset and loader; CustomDataset is the class defined in the training cell.
test_dataset = CustomDataset(X, y)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Rebuild the network and load the trained weights. map_location lets a
# checkpoint saved on GPU be loaded on a CPU-only machine.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = OneDCNN().to(device)
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()

# Evaluate, showing the running accuracy on the progress bar.
correct = 0
total = 0
test_loader_tqdm = tqdm(test_loader, desc='Evaluating')
with torch.no_grad():
    for inputs, targets in test_loader_tqdm:
        inputs, targets = inputs.to(device), targets.to(device)
        # squeeze(1) keeps the batch axis even when the last batch has size 1.
        outputs = model(inputs).squeeze(1)
        predicted = (outputs > 0.5).float()
        total += targets.size(0)
        correct += (predicted == targets).sum().item()
        test_loader_tqdm.set_postfix(accuracy=correct / total)

print(f'Test Accuracy: {100 * correct / total}%')