In [105]:
import pandas as pd
import numpy as np
from nptdms import TdmsFile
from pathlib import Path
import os
import matplotlib.pyplot as plt

# Path to the directory containing TDMS files
tdms_dir = r"C:\Users\patry\OneDrive\Pulpit\kopalnia"

# List to store paths of TDMS files
tdms_list = []

# Walk through the directory and its subdirectories
for root, dirs, files in os.walk(tdms_dir):
    print(f"There are {len(dirs)} directories and {len(files)} file in {root}")
    
    for file in files:
        if file.endswith(".tdms"):
            # If the file ends with '.tdms', add its full path to the list
            tdms_list.append(os.path.join(root, file))

There are 3 directories and 0 file in C:\Users\patry\OneDrive\Pulpit\kopalnia
There are 0 directories and 112 file in C:\Users\patry\OneDrive\Pulpit\kopalnia\020224wieliczka
There are 0 directories and 76 file in C:\Users\patry\OneDrive\Pulpit\kopalnia\210224wieliczka
There are 0 directories and 86 file in C:\Users\patry\OneDrive\Pulpit\kopalnia\220424wieliczka


In [106]:
# Create an empty list to store data
data_list = []

# Open tdms files and create a list with labels
for tdms_file_path in tdms_list:
    tdms_file = TdmsFile.read(tdms_file_path)
    first_letter = os.path.basename(tdms_file_path)[0]
    
    for group in tdms_file.groups():
        for channel in group.channels():
            data_len = np.shape(channel.data)[0]
            # Check conditions and append data to list
            if (data_len <= 1000000 and data_len >= 10000 and first_letter != 'z'):
                data_list.append({
                    #'anchor_ids': group.name,
                    'class': first_letter,
                    #'driveway': channel.name,
                    'excitation': channel.data,
                    'type_id': 1 if first_letter == 'd' else 0
                })

# Create DataFrame from the list of dictionaries
df = pd.DataFrame(data_list)

In [107]:
df_date = df.loc[df.index % 2 == 0]
df_magitude = df.loc[df.index % 2 != 0]

In [108]:
df_magitude.head()

Unnamed: 0,class,excitation,type_id
1,d,"[10.325520382999999, 10.325520382999999, 10.32...",1
3,d,"[-10.380197332, -8.734103042, -5.822633007, 10...",1
5,d,"[10.325520382999999, 10.325520382999999, -10.3...",1
7,d,"[10.325520382999999, -10.380197332, -10.380197...",1
9,d,"[-0.02696687200000001, 0.002660044999999999, 0...",1


In [109]:
from scipy.signal import welch

def signal_to_psd(signal: np.array, sampling_rate: float) -> tuple:
    """
    Computes the Power Spectral Density (PSD) using Welch's method for the given signal.

    Parameters:
    signal (np.array): Input signal.
    sampling_rate (float): Sampling frequency of the signal (samples per second).

    Returns:
    tuple: A tuple containing the PSD result and corresponding frequencies.
    """
    frequencies, psd = welch(signal, fs=sampling_rate)
    return frequencies, psd

In [110]:
# Determine the maximum length of the signals
max_length = max(df_magitude['excitation'].apply(len))

# Zero-pad each signal to the maximum length
padded_signals = df_magitude['excitation'].apply(lambda x: np.pad(x, (0, max_length - len(x)), 'constant'))

# Initialize the new DataFrame
df_fft = pd.DataFrame()

# Copy the 'type_id' column from df_magitude
df_fft['type_id'] = df_magitude['type_id']

# Initialize empty lists to store the frequencies and magnitudes
freq_list = []
magni_list = []

# Compute FFT for each zero-padded signal
for signal in padded_signals:
    freq, magni = signal_to_psd(np.array(signal), 40000)  # Assuming a sampling rate of 1 Hz
    freq_list.append(freq)
    magni_list.append(magni)

# Add the computed frequencies and magnitudes to the new DataFrame
df_fft['frequencies'] = freq_list
df_fft['magnitudes'] = magni_list

In [111]:
from torch.utils.data import Dataset
import torch

# Custom dataset class
class Magintude_dataset(Dataset):
    def __init__(self, df: pd.DataFrame):
        self.df = df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        psd_magnitude = self.df['magnitudes'].iloc[idx]
        type_id = self.df['type_id'].iloc[idx]

        psd_magnitude = torch.tensor(psd_magnitude, dtype = torch.float32).unsqueeze(dim=0)
        type_id = torch.tensor(type_id, dtype = torch.long)

        return psd_magnitude, type_id

In [112]:
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

# Split data to train and test
train_df, test_df = train_test_split(df_fft, test_size=0.1, random_state = 42)

# Make train and test instance of class
train_dataset = Magintude_dataset(train_df)
test_dataset = Magintude_dataset(test_df)

# Make DataLoader
BATCH_SIZE  = 8
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [113]:
next(iter(train_dataloader))[0].size(), next(iter(train_dataloader))[1].size()

(torch.Size([8, 1, 129]), torch.Size([8]))

In [114]:
train_dataset[0][0].size()

torch.Size([1, 129])

In [115]:
import torch
import torch.nn as nn
from torchsummary import summary

class SimpleVGG(nn.Module):
    def __init__(self, num_classes=1):
        super(SimpleVGG, self).__init__()
        self.features = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            
            nn.Conv1d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),

            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool1d(7)
        self.classifier = nn.Sequential(
            nn.Linear(128 * 7, 128*10),
            nn.ReLU(),
            nn.Linear(128*10, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes),
            nn.ReLU()        
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x
"""
class conv_net(nn.Module):
    def __init__(self):
        super(conv_net, self).__init__()
        self.conv1d = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3, padding='same')

    def forward(self, x):
        return self.conv1d(x)
"""        
# Create an instance of the model
model = SimpleVGG()

# Pass the instance to summary
#summary(model, (1, 129))


In [116]:
# Test
#pred = model(train_dataset[0][0])
#pred

In [117]:
device = 'cpu' if torch.cuda.is_available else 'cpu'
device

'cpu'

In [120]:
import torch.optim as optim

# Definicja funkcji straty i optymalizatora
criterion = nn.BCELoss()  # Używamy Binary Cross Entropy Loss dla binarnej klasyfikacji
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Przygotowanie danych
# Załóżmy, że masz dane treningowe i testowe w formie DataLoaderów o nazwach train_loader i test_loader.

# Pętla ucząca
def train(model, train_dataloader, test_dataloader, criterion, optimizer, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()  # Ustawienie modelu w tryb treningu
        running_loss = 0.0
        for inputs, labels in train_dataloader:
            optimizer.zero_grad()  # Wyzerowanie gradientów
            outputs = model(inputs)  # Przekazanie danych przez model
            loss = criterion(outputs.squeeze(), labels.float())  # Obliczenie funkcji straty
            loss.backward()  # Propagacja wsteczna
            optimizer.step()  # Aktualizacja wag
            running_loss += loss.item() * inputs.size(0)

        epoch_loss = running_loss / len(train_dataloader.dataset)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss}")

    # Ocenianie modelu na danych testowych
    model.eval()  # Ustawienie modelu w tryb ewaluacji
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_dataloader:
            outputs = model(inputs)
            predicted = (outputs > 0.5).squeeze().long()  # Zaokrąglenie do najbliższej całkowitej wartości i usunięcie wymiaru zbędnego
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = correct / total
    print(f"Accuracy on test data: {accuracy}")
# Użycie funkcji uczącej
train(model, train_dataloader, test_dataloader, criterion, optimizer, num_epochs=10)


Epoch [1/10], Loss: 62.96296296296296
Epoch [2/10], Loss: 62.96296296296296
Epoch [3/10], Loss: 62.96296296296296
Epoch [4/10], Loss: 62.96296296296296
Epoch [5/10], Loss: 62.96296296296296
Epoch [6/10], Loss: 62.96296296296296
Epoch [7/10], Loss: 62.96296296296296
Epoch [8/10], Loss: 62.96296296296296
Epoch [9/10], Loss: 62.96296296296296
Epoch [10/10], Loss: 62.96296296296296
Accuracy on test data: 0.25
