In [1]:
import os
import numpy as np
import pandas as pd
from scipy.stats import entropy
from scipy.fft import rfft, rfftfreq
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from imblearn.over_sampling import SMOTE
from xgboost import XGBClassifier


# 1. Sliding Window Segmentation

def segment_signal(data, window_size=1024, step=256):
    segments = []
    for start in range(0, len(data) - window_size + 1, step):
        segments.append(data[start:start+window_size])
    return segments


# 2. Hjorth Parameters

def hjorth_params(data):
    diff1 = np.diff(data)
    diff2 = np.diff(diff1)
    var0 = np.var(data)
    var1 = np.var(diff1)
    var2 = np.var(diff2)
    activity = var0
    mobility = np.sqrt(var1 / (var0 + 1e-9))
    complexity = np.sqrt(var2 / (var1 + 1e-9)) / (mobility + 1e-9)
    return activity, mobility, complexity


# 3. Advanced Feature Extraction

def extract_features(data):
    data = np.array(data)
    if len(data) == 0:
        return [0]*22

    # Time-domain
    mean = np.mean(data)
    std = np.std(data)
    rms = np.sqrt(np.mean(data**2))
    peak = np.max(np.abs(data))
    skew = pd.Series(data).skew()
    kurt = pd.Series(data).kurt()
    crest_factor = peak / (rms + 1e-9)
    shape_factor = rms / (np.mean(np.abs(data)) + 1e-9)
    impulse_factor = peak / (np.mean(np.abs(data)) + 1e-9)
    clearance_factor = peak / (np.mean(np.sqrt(np.abs(data))) + 1e-9)
    peak_to_peak = np.ptp(data)
    variance = np.var(data)
    activity, mobility, complexity = hjorth_params(data)

    # Frequency-domain
    fft_vals = np.abs(rfft(data))
    fft_freqs = rfftfreq(len(data), d=1)
    dominant_freq = fft_freqs[np.argmax(fft_vals)] if len(fft_vals) > 0 else 0.0

    sum_fft_vals = np.sum(fft_vals)
    spectral_centroid = np.sum(fft_freqs * fft_vals) / (sum_fft_vals + 1e-9) if sum_fft_vals > 0 else 0.0
    spectral_entropy = entropy(fft_vals / (sum_fft_vals + 1e-9)) if sum_fft_vals > 0 else 0.0
    spectral_kurtosis = pd.Series(fft_vals).kurt() if len(fft_vals) > 1 else 0.0

    band_energy_0_500 = np.sum(fft_vals[(fft_freqs >=0) & (fft_freqs <=500)])
    band_energy_500_1000 = np.sum(fft_vals[(fft_freqs >500) & (fft_freqs <=1000)])
    band_energy_1000_2000 = np.sum(fft_vals[(fft_freqs >1000) & (fft_freqs <=2000)])

    return [
        mean, std, rms, peak, skew, kurt,
        crest_factor, shape_factor, impulse_factor, clearance_factor,
        peak_to_peak, variance, activity, mobility, complexity,
        dominant_freq, spectral_centroid, spectral_entropy, spectral_kurtosis,
        band_energy_0_500, band_energy_500_1000, band_energy_1000_2000
    ]


# 4. Load Dataset + Segment + Extract Features

BASE_PATH = "/content/drive/MyDrive/IoTeligen/Data"
features = []
labels = []

for fault_type in os.listdir(BASE_PATH):
    folder = os.path.join(BASE_PATH, fault_type)
    if not os.path.isdir(folder):
        continue
    for sub in os.listdir(folder):
        sub_path = os.path.join(folder, sub)
        for file in os.listdir(sub_path):
            if file.endswith(".csv"):
                filepath = os.path.join(sub_path, file)
                temp_data = []
                with open(filepath, 'rb') as f:
                    for line in f:
                        decoded_line = ""
                        try:
                            decoded_line = line.decode('utf-8')
                        except UnicodeDecodeError:
                            try:
                                decoded_line = line.decode('latin-1')
                            except UnicodeDecodeError:
                                try:
                                    decoded_line = line.decode('cp1252')
                                except UnicodeDecodeError:
                                    continue

                        cleaned_line = decoded_line.strip()

                        numerical_part = cleaned_line
                        if ',' in cleaned_line:
                            parts = cleaned_line.split(',')

                            if len(parts) > 1 and parts[1].strip():
                                numerical_part = parts[1].strip()
                            elif len(parts) > 0 and parts[0].strip():
                                numerical_part = parts[0].strip()
                            else:
                                continue


                        numerical_part = numerical_part.replace(',', '.')

                        try:
                            value = float(numerical_part)
                            temp_data.append(value)
                        except ValueError:
                            continue

                data = np.array(temp_data)
                if len(data) == 0:
                    continue

                segments = segment_signal(data)
                for seg in segments:
                    feats = extract_features(seg)
                    if not all(x == 0 for x in feats):
                        features.append(feats)
                        labels.append(fault_type)

# 5. DataFrame + Encode + Scale + SMOTE

columns = [
    "mean","std","rms","peak","skew","kurt",
    "crest_factor","shape_factor","impulse_factor","clearance_factor",
    "peak_to_peak","variance","activity","mobility","complexity",
    "dominant_freq","spectral_centroid","spectral_entropy","spectral_kurtosis",
    "band_energy_0_500","band_energy_500_1000","band_energy_1000_2000"
]

df = pd.DataFrame(features, columns=columns)
df["label"] = labels

le = LabelEncoder()
df["label_encoded"] = le.fit_transform(df["label"])

X = df.drop(["label","label_encoded"], axis=1)
y = df["label_encoded"]

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

sm = SMOTE()
X_res, y_res = sm.fit_resample(X_scaled, y)


# 6. Train/Test Split

X_train, X_test, y_train, y_test = train_test_split(
    X_res, y_res, test_size=0.25, random_state=42, stratify=y_res
)


# 7. Tuned XGBoost

model = XGBClassifier(
    n_estimators=1000,
    learning_rate=0.02,
    max_depth=9,
    subsample=0.8,
    colsample_bytree=0.8,
    gamma=1,
    reg_lambda=2,
    eval_metric='mlogloss'
)

model.fit(X_train, y_train)
y_pred = model.predict(X_test)


# 8. Evaluation

print("Accuracy:", accuracy_score(y_test, y_pred))
print("\nClassification Report:\n", classification_report(y_test, y_pred, target_names=le.classes_))

Accuracy: 0.9339622641509434

Classification Report:
                precision    recall  f1-score   support

     Cracking       0.91      0.96      0.94       159
        Ideal       0.95      0.92      0.93       159
Offset_Pulley       0.96      0.96      0.96       159
         Wear       0.92      0.90      0.91       159

     accuracy                           0.93       636
    macro avg       0.93      0.93      0.93       636
 weighted avg       0.93      0.93      0.93       636

