In [1]:
from google.colab import drive

# Directory under which Google Drive is mounted; the CSV exports written by
# later cells use paths below this mount point.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [2]:
!pip install pywavelets
!pip install scipy

Collecting pywavelets
  Downloading pywavelets-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Downloading pywavelets-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.5 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 4.5/4.5 MB 44.1 MB/s eta 0:00:00
Installing collected packages: pywavelets
Successfully installed pywavelets-1.8.0


In [3]:
import pandas as pd
import numpy as np
import pywt
from scipy.signal import stft

In [4]:
def generate_synthetic_signal(signal_type="normal", length=1280, noise_db=20, fs=6400):
    """Synthesise one noisy 50 Hz waveform carrying a power-quality disturbance.

    Parameters
    ----------
    signal_type : str
        One of "normal", "sag", "swell", "interrupt", "harmonic",
        "harmonic_sag", "harmonic_swell", "flicker" or "transient".
        Any other value leaves the waveform as the undisturbed 50 Hz sine.
    length : int
        Number of samples to generate.
    noise_db : float
        Level of the additive white Gaussian noise, expressed in dB below unit
        amplitude: the noise standard deviation is ``10 ** (-noise_db / 20)``.
        It is NOT rescaled to the waveform power, which is why the realised
        SNR is measured and returned separately.
    fs : float
        Sampling rate in Hz. The record lasts ``length / fs`` seconds, so the
        defaults (1280 samples at 6400 Hz) give 0.2 s, i.e. ten 50 Hz cycles.

    Returns
    -------
    noisy_signal : numpy.ndarray
        1-D array of ``length`` samples: disturbed waveform plus noise.
    snr_db_actual : float
        SNR in dB computed from the mean power of the clean waveform and of
        the noise realisation that was actually drawn.

    Raises
    ------
    ValueError
        If ``fs`` is not strictly positive.
    """
    if fs <= 0:
        raise ValueError("fs must be a positive sampling rate in Hz")

    # Derive the time axis from the sampling rate so that every frequency used
    # below is in real hertz and agrees with analyses run with the same ``fs``.
    duration = length / fs
    t = np.linspace(0, duration, length, endpoint=False)
    clean_signal = np.sin(2 * np.pi * 50 * t)

    # Amplitude disturbances act on the fixed sample window 300..699; NumPy
    # slicing silently clips that window when ``length`` is shorter.
    if signal_type in ("sag", "harmonic_sag"):
        clean_signal[300:700] *= 0.5
    elif signal_type in ("swell", "harmonic_swell"):
        clean_signal[300:700] *= 1.5
    elif signal_type == "interrupt":
        clean_signal[300:700] = 0
    elif signal_type == "flicker":
        # 10 Hz amplitude modulation of depth 0.2 over the whole record.
        clean_signal *= (1 + 0.2 * np.sin(2 * np.pi * 10 * t))
    elif signal_type == "transient":
        # Gaussian-windowed 500 Hz burst centred in the middle of the record.
        burst_centre = duration / 2
        clean_signal += 0.5 * np.exp(-((t - burst_centre) ** 2) / 0.001) * np.sin(2 * np.pi * 500 * t)

    # The 150 Hz (third) harmonic is added after any sag/swell scaling, so the
    # harmonic itself keeps its full 0.2 amplitude inside the disturbed window.
    if signal_type in ("harmonic", "harmonic_sag", "harmonic_swell"):
        clean_signal += 0.2 * np.sin(2 * np.pi * 150 * t)

    # Noise comes from NumPy's global random state (seedable via np.random.seed).
    noise_std = 10 ** (-noise_db / 20)
    noise = noise_std * np.random.randn(length)
    noisy_signal = clean_signal + noise

    # Report the SNR that was actually realised for this waveform.
    signal_power = np.mean(clean_signal ** 2)
    noise_power = np.mean(noise ** 2)
    snr_db_actual = 10 * np.log10(signal_power / noise_power)

    return noisy_signal, snr_db_actual


# Seed NumPy's global random state so the class draws and the noise -- and
# therefore the CSV written below -- are identical on every fresh run.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

num_samples = 13500
fs = 6400
signal_types = ["normal", "sag", "swell", "interrupt", "harmonic", "harmonic_sag", "harmonic_swell", "flicker", "transient"]
labels = []
signals = []
snr_values = []

# Draw a random class for every sample and synthesise its waveform.
for _ in range(num_samples):
    signal_type = np.random.choice(signal_types)
    noisy_signal, actual_snr = generate_synthetic_signal(signal_type, noise_db=50, fs=fs)
    signals.append(noisy_signal)
    labels.append(signal_type)
    snr_values.append(actual_snr)

# One row per waveform, one column per sample.
signals = np.array(signals)


# Persist the waveforms with the measured SNR and the class label as the two
# leading columns.
signals_df = pd.DataFrame(signals)
signals_df.insert(0, "SNR_dB", snr_values)
signals_df.insert(1, "Label", labels)
signals_df.to_csv("/content/drive/MyDrive/pq_signals50_dataset.csv", index=False)

In [5]:
def extract_wavelet_features(signal, wavelet='db4', level=7):
    """Return the wavelet energy of the deepest decomposition layer.

    Parameters
    ----------
    signal : array-like
        1-D waveform to decompose.
    wavelet : str
        Wavelet name passed to ``pywt.wavedec``.
    level : int
        Decomposition depth passed to ``pywt.wavedec``.

    Returns
    -------
    numpy.ndarray
        One-element array ``[F1]`` where F1 is the sum of squared detail
        coefficients of layer ``level`` (the 7th layer by default).
    """
    coeffs = pywt.wavedec(signal, wavelet, level=level)
    # wavedec returns [cA_level, cD_level, cD_(level-1), ..., cD_1], so index 1
    # is the detail band of the deepest layer; index -1 would be the finest
    # (level-1) band instead. With no detail band at all (level 0) fall back
    # to the single array that is returned.
    deepest_band = coeffs[1] if len(coeffs) > 1 else coeffs[0]
    F1 = np.sum(np.square(deepest_band))  # Wavelet energy of the `level`-th layer
    return np.array([F1])

def extract_st_features(signal, fs=6400):
    """Compute time-frequency features F2..F8 from a short-time Fourier transform.

    The spectrogram is obtained with ``scipy.signal.stft`` using 64-sample
    segments. Every feature is a statistic of a "peak envelope": for each time
    frame, the largest magnitude across a set of frequency bins.

    Returns
    -------
    numpy.ndarray
        ``[F2, F3, F4, F5, F6, F7, F8]`` where
        F2/F3/F4 are the max/min/std of the full-band envelope,
        F5 is the std of the 100-600 Hz envelope,
        F6/F7 are the mean/std of the 700-1500 Hz envelope, and
        F8 counts the frames whose full-band envelope exceeds 90 % of its peak.
        Band features are 0 when no frequency bin falls inside the band.
    """
    freqs, _, spectrum = stft(signal, fs=fs, nperseg=64)
    magnitude = np.abs(spectrum)

    def band_envelope(low_hz, high_hz):
        """Per-frame peak magnitude inside [low_hz, high_hz]; None if the band has no bins."""
        in_band = (freqs >= low_hz) & (freqs <= high_hz)
        if not np.any(in_band):
            return None
        return np.max(magnitude[in_band, :], axis=0)

    # Full-band envelope and its summary statistics.
    envelope = np.max(magnitude, axis=0)
    peak = np.max(envelope)
    F2, F3, F4 = peak, np.min(envelope), np.std(envelope)

    # 100-600 Hz band.
    low_band = band_envelope(100, 600)
    F5 = 0 if low_band is None else np.std(low_band)

    # 700-1500 Hz band.
    high_band = band_envelope(700, 1500)
    if high_band is None:
        F6, F7 = 0, 0
    else:
        F6, F7 = np.mean(high_band), np.std(high_band)

    # Number of frames whose envelope lies within 10 % of the overall peak.
    F8 = np.sum(envelope > 0.9 * peak)

    return np.array([F2, F3, F4, F5, F6, F7, F8])

def extract_features(signal, fs=6400):
    """Build the full feature vector for one waveform.

    Concatenates the output of ``extract_wavelet_features`` (called with its
    default wavelet settings) with the output of ``extract_st_features``
    evaluated at sampling rate ``fs``, in that order.
    """
    parts = [
        extract_wavelet_features(signal),
        extract_st_features(signal, fs=fs),
    ]
    return np.concatenate(parts)

# Compute one feature vector per waveform and stack them row-wise
feature_rows = []
for sig in signals:
    feature_rows.append(extract_features(sig, fs=fs))
features = np.array(feature_rows)

# Write the feature table to CSV with the class label as the first column
feature_names = [
    "F1", "F2", "F3", "F4",
    "F5", "F6", "F7", "F8",
]
features_csv_path = "/content/drive/MyDrive/pq_features50_dataset.csv"
df_features = pd.DataFrame(features, columns=feature_names)
df_features.insert(loc=0, column="Label", value=labels)
df_features.to_csv(features_csv_path, index=False)