In [2]:
import os
import re
from scipy.io import loadmat
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, MaxAbsScaler
from sklearn.model_selection import RandomizedSearchCV
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics import make_scorer, accuracy_score, confusion_matrix, classification_report
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt

In [2]:
### Helper Functions ###

def extract_s11_from_s1p(file_path):
    data = []
    with open(file_path, 'r') as file:
        for line in file:
            # Skip header lines that start with ! or #
            if not line.startswith(('!', '#')):
                columns = line.split()
                # Collect only S11_Real (second column)
                data.append(float(columns[1]))
    return data

# Function to parse Lg, Vds, and Vgs from filenames
def parse_filename_parameters(filename):
    # Initialize default values
    Lg = Vds = Vgs = 'NA'
    
    # Use regex to search for Lg, Vds, and Vgs patterns in the filename
    lg_match = re.search(r'Lg(\d+)p(\d+)', filename)
    vds_match = re.search(r'Vds(\d+)', filename)
    vgs_match = re.search(r'Vgs(\d+)', filename)

    # Process the Lg match with "p" as decimal point
    if lg_match:
        Lg = float(f"{lg_match.group(1)}.{lg_match.group(2)}")
    if vds_match:
        Vds = int(vds_match.group(1))
    if vgs_match:
        Vgs = int(vgs_match.group(1))
    
    # If 'Opend' is in the filename, set Vds to 10000
    if 'opend' in filename:
        Vds = 10000
    
    return Lg, Vds, Vgs

# Function to find repeating pattern
def find_repeating_pattern(waveform, min_period=1000, num_periods=5):
    # Calculate autocorrelation
    autocorr = np.correlate(waveform, waveform, mode='full')
    autocorr = autocorr[autocorr.size // 2:]  # Keep only the second half

    # Find the first peak after lag=0
    differences = np.diff(autocorr)  # Differences between consecutive points
    peaks = np.where((differences[:-1] > 0) & (differences[1:] < 0))[0] + 1  # Peak detection
    
    # Find the period
    if len(peaks) > 0:
        period = peaks[0]  # The first peak indicates the repeating period
        if period < min_period:
            period = min_period  # Enforce minimum period
    else:
        period = min_period  # Default to minimum period if no peaks are found

    # Extract multiple periods of the waveform
    end_index = period * num_periods
    repeating_pattern = waveform[:end_index]
    
    return repeating_pattern, period

def filter_dataset_by_columns(main_dataset, subset_dataset, matching_columns):
    """
    Filters rows in the main dataset where the values in matching columns 
    match those in the subset dataset.

    Parameters:
        main_dataset (pd.DataFrame): The primary dataset to filter.
        subset_dataset (pd.DataFrame): The subset with matching criteria.
        matching_columns (list): List of column names to match on.

    Returns:
        pd.DataFrame: A filtered dataset with matching rows.
    """
    # Filter rows in the main dataset where matching column values are in the subset dataset
    filtered_dataset = main_dataset[
        main_dataset[matching_columns].apply(tuple, axis=1).isin(
            subset_dataset[matching_columns].apply(tuple, axis=1)
        )
    ]
    return filtered_dataset

def smooth_dataframe_columns(df, group_size, fixed_start_cols, fixed_end_cols):
    
    if group_size <= 0:
        raise ValueError("Group size must be greater than 0.")
    
    # Separate fixed columns
    fixed_start = df.iloc[:, :fixed_start_cols]
    fixed_end = df.iloc[:, -fixed_end_cols:]
    
    # Columns to smooth (excluding fixed columns)
    smooth_cols = df.iloc[:, fixed_start_cols:-fixed_end_cols]

    # Smooth by averaging every `group_size` columns
    smoothed_data = []
    for i in range(0, smooth_cols.shape[1], group_size):
        chunk = smooth_cols.iloc[:, i:i+group_size]
        smoothed_data.append(chunk.mean(axis=1))

    # Handle remaining columns if not a perfect multiple of group_size
    if smooth_cols.shape[1] % group_size != 0:
        remaining_cols = smooth_cols.iloc[:, -(smooth_cols.shape[1] % group_size):]
        smoothed_data.append(remaining_cols.mean(axis=1))

    # Combine fixed columns with smoothed data
    smoothed_df = pd.concat([fixed_start] + smoothed_data + [fixed_end], axis=1)
    return smoothed_df

In [3]:
def creating_sync_dataset(S1P_file_path, TDR_file_path):
    
    s11_data = {}
    frequency_data = None
    
    # Loop over all .s1p files in the directory
    for filename in os.listdir(S1P_file_path):
        if filename.endswith('.s1p'):
            file_path = os.path.join(S1P_file_path, filename)
            s11_values = extract_s11_from_s1p(file_path)
            
            # Use the first file's frequency values as a reference
            if frequency_data is None:
                with open(file_path, 'r') as file:
                    frequency_data = [float(line.split()[0]) for line in file if not line.startswith(('!', '#'))]
            
            s11_data[filename] = s11_values
            
    s11_dataset = pd.DataFrame(s11_data).T
    s11_dataset.columns = [f'Frequency_{i}' for i in range(len(s11_dataset.columns))]  # Set column names dynamically
    s11_dataset.insert(0, 'File', s11_dataset.index) 
    
    Duty_list = []

    #for index, row in s11_dataframe.iterrows():
    for i in s11_dataset['File']:
        if 'dut1' in i:
            Duty_list.append(1)
        elif 'dut5' in i:
            Duty_list.append(0)
        else:
            Duty_list.append('NA')
    
    s11_dataset = s11_dataset.drop(columns=['Frequency_0'], errors='ignore')  # Ignore error if column does not exist
    
    s11_dataset[['Lg', 'Vds', 'Vgs']] = s11_dataset['File'].apply(lambda x: pd.Series(parse_filename_parameters(x)))
    s11_dataset = s11_dataset[['Lg', 'Vds', 'Vgs'] + [col for col in s11_dataset.columns if col.startswith('Frequency')]]
    s11_dataset['Duty'] = Duty_list
    
    # Initialize a dictionary to store the loaded data
    mat_data = {}

    # Loop through the files in the folder
    for file_name in os.listdir(TDR_file_path):
        if file_name.endswith('.mat'):  # Check if the file is a .mat file
            # Construct the full file path
            file_path = os.path.join(TDR_file_path, file_name)
            # Load the .mat file and store it in the dictionary
            mat_data[file_name] = loadmat(file_path)

    # Access the loaded data as needed
    tdr_train = mat_data.get('TDR_train.mat')
    tdr_test = mat_data.get('TDR_test.mat')
    tdr_val = mat_data.get('TDR_val.mat')
    
    # Extract and convert the datasets to DataFrames
    train_data = pd.DataFrame(tdr_train["dataTDRtrain"])
    test_data = pd.DataFrame(tdr_test["dataTDRtest"])
    val_data = pd.DataFrame(tdr_val["dataTDRval"])
    
    num_columns = train_data.shape[1]
    column_names = [f"t_{i+1}" for i in range(num_columns)]

    # Assign these column names to the DataFrame
    train_data.columns = column_names
    test_data.columns = column_names
    val_data.columns = column_names
    
    # Specify the path to your Excel file
    excel_file_path = r'D:\Master_thesis\creating_dataset\TDR\key_identifiers.xlsx'

    # Load each sheet into a separate DataFrame
    identifiers_train = pd.read_excel(excel_file_path, sheet_name='train', header=1)
    identifiers_test = pd.read_excel(excel_file_path, sheet_name='test', header=1)
    identifiers_val = pd.read_excel(excel_file_path, sheet_name='val', header=1)

    train_dataset = pd.concat([identifiers_train, train_data], axis=1)
    test_dataset = pd.concat([identifiers_test, test_data], axis=1)
    val_dataset = pd.concat([identifiers_val, val_data], axis=1)

    # The column to move to the last position (for example, column 'B')
    col_to_move = 'Duty'

    # Function to move a column to the last position in a DataFrame
    def move_column_to_last(df, col_to_move):
        cols = [col for col in df.columns if col != col_to_move]
        df = df[cols + [col_to_move]]
        return df

    # Apply the function to each dataset
    train_dataset = move_column_to_last(train_dataset, col_to_move)
    test_dataset = move_column_to_last(test_dataset, col_to_move)
    val_dataset = move_column_to_last(val_dataset, col_to_move)

    TDR_dataset = pd.concat([train_dataset, test_dataset, val_dataset], axis=0, ignore_index=True)

    TDR_dataset['Duty'] = TDR_dataset['Duty'].replace({1: 1, 5: 0})
    print()
    # Columns for comparison
    common_columns = ['Lg', 'Vds', 'Vgs', 'Duty']

    # Identify common samples
    common_samples = pd.merge(s11_dataset[common_columns], TDR_dataset[common_columns], on=common_columns)

    # Subset 1: Rows in df1 not in common samples
    unsync_s11_dataset = s11_dataset[~s11_dataset[common_columns].apply(tuple, axis=1).isin(common_samples.apply(tuple, axis=1))]

    # Subset 2: Common samples from df1 with all df1 columns
    sync_s11_dataset = s11_dataset[s11_dataset[common_columns].apply(tuple, axis=1).isin(common_samples.apply(tuple, axis=1))]

    # Subset 3: Common samples from df2 with all df2 columns
    sync_TDR_dataset = TDR_dataset[TDR_dataset[common_columns].apply(tuple, axis=1).isin(common_samples.apply(tuple, axis=1))]

    # Subset 4: Rows in df2 not in common samples
    unsync_TDR_dataset = TDR_dataset[~TDR_dataset[common_columns].apply(tuple, axis=1).isin(common_samples.apply(tuple, axis=1))]

    
    return unsync_s11_dataset, sync_s11_dataset, sync_TDR_dataset, unsync_TDR_dataset, TDR_dataset

In [4]:
S1P_file_path = 'D:\Master_thesis\creating_dataset\Dataset\S11'
TDR_file_path = r'D:\Master_thesis\creating_dataset\TDR'

In [5]:
_, _, sync_TDR_dataset, unsync_TDR_dataset, TDR_dataset= creating_sync_dataset(S1P_file_path, TDR_file_path)




In [6]:
sync_TDR_dataset.shape, TDR_dataset.shape

((30, 63005), (75, 63005))

In [10]:
class SingleModalityClassifier(nn.Module):
    def __init__(self, seq_len, input_dim, num_classes, d_model=128, nhead=8, num_layers=1):
        super(SingleModalityClassifier, self).__init__()
      
        self.embedding = nn.Linear(input_dim, d_model)
        
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead)
        
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.classifier = nn.Linear(d_model*seq_len, num_classes)

    def forward(self, x):

        x = self.embedding2(x)  # Shape: [batch_size, seq_len2, d_model]

        x = x.permute(1, 0, 2)  # Shape: [seq_len2, batch_size, d_model]
        x = self.transformer_encoder(x)
        x = x.permute(1, 0, 2)  # Back to [batch_size, seq_len2, d_model]

        # Flatten for classification
        x = x.flatten(start_dim=1)  # Shape: [batch_size, (seq_len1 + seq_len2) * d_model]
        out = self.classifier(x)  # Shape: [batch_size, num_classes]
    
        return out

In [11]:
TDR_Classifier = SingleModalityClassifier(seq_len=1005, input_dim=1, num_classes=2)

In [12]:
TDR_Classifier

SingleModalityClassifier(
  (embedding): Linear(in_features=1, out_features=128, bias=True)
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=128, out_features=128, bias=True)
        )
        (linear1): Linear(in_features=128, out_features=2048, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=2048, out_features=128, bias=True)
        (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (classifier): Linear(in_features=128640, out_features=2, bias=True)
)

In [8]:
def data_Scaling(Dataset, scaling_type='standard'):
    
    train_shape = int((Dataset.shape[0])*0.6)
    test_shape = int((Dataset.shape[0])*0.2)
    val_shape = int((Dataset.shape[0])*0.2)
    print(train_shape, test_shape, val_shape)
    
    Dataset = Dataset.sample(frac=1, random_state=42).reset_index(drop=True)

    Dataset_train, temp_data = train_test_split(Dataset, train_size=train_shape, random_state=None, shuffle=True)

    # Step 2: Split temp_data into test and val
    Dataset_test, Dataset_val = train_test_split(temp_data, train_size=test_shape, random_state=None, shuffle=True)
    
    Dataset_train = smooth_dataframe_columns(Dataset_train, 63, 3, 1)
    Dataset_test = smooth_dataframe_columns(Dataset_test, 63, 3, 1)
    Dataset_val = smooth_dataframe_columns(Dataset_val, 63, 3, 1)

    X_Dataset_train = Dataset_train.iloc[:, 0:-1].values
    y_Dataset_train = Dataset_train['Duty'].values

    X_Dataset_test = Dataset_test.iloc[:, 0:-1].values
    y_Dataset_test = Dataset_test['Duty'].values
    print(y_Dataset_test)
    X_Dataset_val = Dataset_val.iloc[:, 0:-1].values
    y_Dataset_val = Dataset_val['Duty'].values
    
    # Mapping of scaling types to scaler objects
    scalers = {
        'standard': StandardScaler(),
        'minmax': MinMaxScaler(),
        'robust': RobustScaler(),
        'maxabs': MaxAbsScaler()
    }

    # Check if the scaling_type is valid
    if scaling_type not in scalers:
        raise ValueError(f"Invalid scaling_type. Choose from {list(scalers.keys())}.")
    
    # Select the appropriate scaler
    scaler = scalers[scaling_type]
    
    X_Dataset_train_scaled = scaler.fit_transform(X_Dataset_train)
    X_Dataset_test_scaled = scaler.transform(X_Dataset_test)
    X_Dataset_val_scaled = scaler.transform(X_Dataset_val)
    
    # Convert data to PyTorch tensors
    X_Dataset_train_scaled = torch.tensor(X_Dataset_train_scaled, dtype=torch.float32)
    y_Dataset_train = torch.tensor(y_Dataset_train, dtype=torch.long)
    X_Dataset_test_scaled = torch.tensor(X_Dataset_test_scaled, dtype=torch.float32)
    y_Dataset_test = torch.tensor(y_Dataset_test, dtype=torch.long)
    X_Dataset_val_scaled = torch.tensor(X_Dataset_val_scaled, dtype=torch.float32)
    y_Dataset_val = torch.tensor(y_Dataset_val, dtype=torch.long)

    # Convert data and labels to TensorDatasets and create DataLoaders
    Dataset_train_dataset = TensorDataset(X_Dataset_train_scaled, y_Dataset_train.long())
    Dataset_val_dataset = TensorDataset(X_Dataset_val_scaled, y_Dataset_val.long())
    Dataset_test_dataset = TensorDataset(X_Dataset_test_scaled, y_Dataset_test.long())

    Dataset_train_loader = torch.utils.data.DataLoader(Dataset_train_dataset, batch_size=4, shuffle=True)
    Dataset_test_loader = torch.utils.data.DataLoader(Dataset_test_dataset, batch_size=4)
    Dataset_val_loader = torch.utils.data.DataLoader(Dataset_val_dataset, batch_size=4)
    
    return Dataset_train_loader, Dataset_test_loader, Dataset_val_loader

In [9]:
syncTDR_train_loader, syncTDR_test_loader, syncTDR_val_loader = data_Scaling(sync_TDR_dataset, scaling_type='standard')

18 6 6
[1 1 0 0 0 1]


In [10]:
# Training function
def train_TDR_model(model, train_loader, val_loader):
    num_epochs=50 
    patience=10
    best_val_loss = float('inf')
    epochs_no_improve = 0
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(TDR_Classifier.parameters(), lr=0.001)
    
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        train_targets, train_preds = [], []
        
        # Iterate over both loaders simultaneously (synchronized)
        for train_data, train_labels in train_loader:

            optimizer.zero_grad()
            outputs = model(train_data.unsqueeze(-1))
            loss = criterion(outputs, train_labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            
            train_preds.extend(torch.argmax(outputs, dim=1).cpu().numpy())
            train_targets.extend(train_labels.cpu().numpy())
                
        train_accuracy = accuracy_score(train_targets, train_preds)
        
        # Validation
        model.eval()
        val_loss = 0
        val_preds, val_targets = [], []
        with torch.no_grad():
            
            for val_data, val_labels in val_loader:
                
                outputs = model(val_data.unsqueeze(-1))
                loss = criterion(outputs, val_labels)
                val_loss += loss.item()

                val_preds.extend(torch.argmax(outputs, dim=1).cpu().numpy())
                val_targets.extend(val_labels.cpu().numpy())

        val_accuracy = accuracy_score(val_targets, val_preds)
        print(f"Epoch {epoch + 1}/{num_epochs}, Train Accuracy: {train_accuracy:.4f}, Val Accuracy: {val_accuracy:.4f}")

        # Early stopping logic
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs. Best Val Loss: {best_val_loss:.4f}")
            
            break
            
def test_model(model, test_loader):
    model.eval()
    test_preds, test_targets = [], []
    with torch.no_grad():

        for test_data, test_labels in test_loader:

            outputs = model(test_data.unsqueeze(-1))
            test_preds.extend(torch.argmax(outputs, dim=1).cpu().numpy())
            test_targets.extend(test_labels.cpu().numpy())

    test_accuracy = accuracy_score(test_targets, test_preds)
    test_conf_matrix = confusion_matrix(test_targets, test_preds)
    classi_report = classification_report(test_targets, test_preds)
    print('Test accuracy:-', test_accuracy)
    print(test_conf_matrix)
    print('Confusion Matrix')
    print(confusion_matrix)
    print('Accuracy - ',np.round(test_accuracy,3))
    print('Classification Report')
    print(classi_report)

In [None]:
TDR_train_loader, TDR_test_loader, TDR_val_loader = data_Scaling(TDR_dataset, scaling_type='standard')

In [11]:
TDR_Classifier = SingleModalityClassifier(seq_len2=1005, input_dim=1, num_classes=2)

train_TDR_model(TDR_Classifier, syncTDR_train_loader, syncTDR_val_loader)
test_model(TDR_Classifier, syncTDR_test_loader)



Epoch 1/50, Train Accuracy: 0.5556, Val Accuracy: 1.0000
Epoch 2/50, Train Accuracy: 0.2778, Val Accuracy: 1.0000
Epoch 3/50, Train Accuracy: 0.5000, Val Accuracy: 0.6667
Epoch 4/50, Train Accuracy: 0.7222, Val Accuracy: 0.6667
Epoch 5/50, Train Accuracy: 0.8333, Val Accuracy: 0.8333
Epoch 6/50, Train Accuracy: 0.9444, Val Accuracy: 0.3333
Epoch 7/50, Train Accuracy: 0.8889, Val Accuracy: 0.3333
Epoch 8/50, Train Accuracy: 0.9444, Val Accuracy: 0.6667
Epoch 9/50, Train Accuracy: 0.8333, Val Accuracy: 0.5000
Epoch 10/50, Train Accuracy: 1.0000, Val Accuracy: 0.3333
Epoch 11/50, Train Accuracy: 1.0000, Val Accuracy: 0.3333
Early stopping triggered after 11 epochs. Best Val Loss: 0.0000
Test accuracy:- 0.5
[[3 0]
 [3 0]]
Confusion Matrix
<function confusion_matrix at 0x000002B37FCA1D30>
Accuracy -  0.5
Classification Report
              precision    recall  f1-score   support

           0       0.50      1.00      0.67         3
           1       0.00      0.00      0.00         3

   

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
