In [None]:
import torch
import h5py
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import scipy.io
import scipy.signal
import pywt
from scipy.stats import kurtosis, skew
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif
import warnings
warnings.filterwarnings('ignore')

class AESignalProcessor:
    """Complete AE Signal Processing and Classification System"""
    
    def __init__(self, mat_file_path):
        self.mat_file_path = mat_file_path
        self.data = None
        self.processed_data = {}
        self.features = None
        self.labels = None
        self.scaler = StandardScaler()
        self.label_encoder = LabelEncoder()
        self.models = {}
        self.results = {}
        
    
    def load_data(self):
        """Step 1: Load MATLAB v7.3 HDF5-based AE data"""
        print("Loading MAT v7.3 file using h5py...")
        self.data = h5py.File(self.mat_file_path, 'r')

        ae_all = self.data['AE_ALL']
        print("Top-level classes in AE_ALL:", list(ae_all.keys()))

        classes = ['BF', 'GF', 'TF', 'NI']
        self.processed_data = {}

        for class_name in classes:
            if class_name in ae_all:
                ref = ae_all[class_name][0][0]  # Get reference
                class_data = self.data[ref]    # Dereference
                print(f"{class_name}: shape {class_data.shape}")  # Should be (4, 40)

            # Channel 4 (index 3), shape (40,)
                signal_refs = class_data[3, :]
                signal_list = []

                for i in range(signal_refs.shape[0]):
                    signal_ref = signal_refs[i]
                    signal = np.array(self.data[signal_ref])[:, 0]  # shape (1000000,)
                    signal_list.append(signal)

                self.processed_data[class_name] = signal_list
                print(f"  -> Extracted {len(signal_list)} signals of shape {signal_list[0].shape}")
            else:
                print(f"Class {class_name} not found in AE_ALL.")

        return self.processed_data

    
    def burst_informed_signal_processing(self, signal, fs=1000000):
        """Step 2: Burst-Informed Signal Processing"""
        print("Applying burst-informed signal processing...")
        
        # Frame-based segmentation
        frame_length = 1024
        hop_length = 512
        frames = []
        
        for i in range(0, len(signal) - frame_length, hop_length):
            frame = signal[i:i + frame_length]
            frames.append(frame)
        
        # Adaptive wavelet decomposition with soft thresholding
        processed_frames = []
        for frame in frames:
            # Wavelet decomposition
            coeffs = pywt.wavedec(frame, 'db4', level=4)
            
            # Soft thresholding
            threshold = 0.1 * np.max(np.abs(coeffs[0]))
            coeffs_thresh = list(coeffs)
            for i in range(1, len(coeffs)):
                coeffs_thresh[i] = pywt.threshold(coeffs[i], threshold, 'soft')
            
            # Reconstruction
            reconstructed = pywt.waverec(coeffs_thresh, 'db4')
            processed_frames.append(reconstructed)
        
        # Burst-informed frame selection (select frames with high energy)
        energies = [np.sum(frame**2) for frame in processed_frames]
        energy_threshold = np.percentile(energies, 75)  # Top 25% energy frames
        
        selected_frames = [frame for frame, energy in zip(processed_frames, energies) 
                          if energy > energy_threshold]
        
        if len(selected_frames) == 0:
            selected_frames = processed_frames[:10]  # Fallback
        
        # Denoising and data augmentation
        denoised_signal = np.concatenate(selected_frames)
        
        return denoised_signal
    
    def extract_features(self, signal):
        """Step 3: Feature Extraction"""
        features = {}
        
        # Time Domain Features
        features['td_mean'] = np.mean(signal)
        features['td_std'] = np.std(signal)
        features['td_var'] = np.var(signal)
        features['td_rms'] = np.sqrt(np.mean(signal**2))
        features['td_peak'] = np.max(np.abs(signal))
        features['td_crest_factor'] = features['td_peak'] / features['td_rms']
        features['td_kurtosis'] = kurtosis(signal)
        features['td_skewness'] = skew(signal)
        features['td_energy'] = np.sum(signal**2)
        features['td_zero_crossings'] = np.sum(np.diff(np.sign(signal)) != 0)
        
        # Frequency Domain Features
        fft_signal = np.fft.fft(signal)
        freqs = np.fft.fftfreq(len(signal))
        magnitude = np.abs(fft_signal)
        
        features['fd_mean_freq'] = np.mean(freqs[:len(freqs)//2])
        features['fd_peak_freq'] = freqs[np.argmax(magnitude[:len(freqs)//2])]
        features['fd_spectral_centroid'] = np.sum(freqs[:len(freqs)//2] * magnitude[:len(freqs)//2]) / np.sum(magnitude[:len(freqs)//2])
        features['fd_spectral_rolloff'] = np.percentile(magnitude[:len(freqs)//2], 85)
        features['fd_spectral_flux'] = np.sum(np.diff(magnitude[:len(freqs)//2])**2)
        
        # Time-Frequency Domain Features
        f, t, Zxx = scipy.signal.stft(signal, nperseg=256)
        spectrogram = np.abs(Zxx)
        
        features['tfd_spectral_energy'] = np.sum(spectrogram**2)
        features['tfd_spectral_entropy'] = -np.sum(spectrogram * np.log(spectrogram + 1e-10))
        features['tfd_peak_time'] = t[np.argmax(np.sum(spectrogram, axis=0))]
        features['tfd_peak_freq'] = f[np.argmax(np.sum(spectrogram, axis=1))]
        
        # Higher Order Statistics
        features['hos_moment_3'] = np.mean(signal**3)
        features['hos_moment_4'] = np.mean(signal**4)
        features['hos_cumulant_3'] = np.mean((signal - np.mean(signal))**3)
        features['hos_cumulant_4'] = np.mean((signal - np.mean(signal))**4) - 3 * np.var(signal)**2
        
        # Burst Features
        burst_threshold = 3 * np.std(signal)
        burst_indices = np.where(np.abs(signal) > burst_threshold)[0]
        
        if len(burst_indices) > 0:
            features['burst_count'] = len(burst_indices)
            features['burst_duration'] = len(burst_indices) / len(signal)
            features['burst_energy'] = np.sum(signal[burst_indices]**2)
            features['burst_peak'] = np.max(np.abs(signal[burst_indices]))
        else:
            features['burst_count'] = 0
            features['burst_duration'] = 0
            features['burst_energy'] = 0
            features['burst_peak'] = 0
        
        return features
    
    def attention_based_fusion(self, features_dict):
        """Multi-domain feature fusion with attention mechanism"""
        # Group features by domain
        domains = {
            'td': [k for k in features_dict.keys() if k.startswith('td_')],
            'fd': [k for k in features_dict.keys() if k.startswith('fd_')],
            'tfd': [k for k in features_dict.keys() if k.startswith('tfd_')],
            'hos': [k for k in features_dict.keys() if k.startswith('hos_')],
            'burst': [k for k in features_dict.keys() if k.startswith('burst_')]
        }
        
        # Calculate attention weights based on feature variance
        domain_features = {}
        attention_weights = {}
        
        for domain, feature_keys in domains.items():
            domain_vals = [features_dict[k] for k in feature_keys if k in features_dict]
            if domain_vals:
                domain_features[domain] = np.array(domain_vals)
                attention_weights[domain] = np.var(domain_vals) + 1e-10
        
        # Normalize attention weights
        total_weight = sum(attention_weights.values())
        for domain in attention_weights:
            attention_weights[domain] /= total_weight
        
        # Apply attention weights
        weighted_features = []
        for domain, features_arr in domain_features.items():
            weighted_features.extend(features_arr * attention_weights[domain])
        
        return np.array(weighted_features)
    
    def feature_engineering(self, all_features, all_labels):
        """Step 4: Feature Engineering with Dimensionality Reduction"""
        print("Performing feature engineering...")
        
        # Convert to numpy arrays
        feature_matrix = np.array(all_features)
        label_array = np.array(all_labels)
        
        # Variational Autoencoder for dimensionality reduction (simplified PCA)
        pca = PCA(n_components=min(20, feature_matrix.shape[1]))
        compressed_features = pca.fit_transform(feature_matrix)
        
        # Burst-guided feature selection
        selector = SelectKBest(f_classif, k=min(15, compressed_features.shape[1]))
        selected_features = selector.fit_transform(compressed_features, label_array)
        
        # Feature scaling
        scaled_features = self.scaler.fit_transform(selected_features)
        
        return scaled_features, pca, selector
    
    class TabNet(nn.Module):
        """Small fully connected classifier used in place of a real TabNet.

        Two hidden blocks (Linear -> BatchNorm1d -> ReLU -> Dropout(0.2)) are
        followed by a linear layer that emits ``output_dim`` logits.
        """
        def __init__(self, input_dim, output_dim, hidden_dim=64):
            super().__init__()
            layers = []
            width_in = input_dim
            for _ in range(2):
                layers += [
                    nn.Linear(width_in, hidden_dim),
                    nn.BatchNorm1d(hidden_dim),
                    nn.ReLU(),
                    nn.Dropout(0.2),
                ]
                width_in = hidden_dim
            layers.append(nn.Linear(hidden_dim, output_dim))
            self.feature_transformer = nn.Sequential(*layers)

        def forward(self, x):
            """Return the raw class logits for a batch ``x``."""
            logits = self.feature_transformer(x)
            return logits
    
    class XGBoostPyTorch(nn.Module):
        """Ensemble of small MLPs whose logits are averaged.

        Despite the name, the members are not fitted stage-wise on residuals:
        ``forward`` simply returns the mean of every member's output, and all
        members are trained jointly through that mean.
        """
        def __init__(self, input_dim, output_dim, n_estimators=100):
            super().__init__()
            members = []
            for _ in range(n_estimators):
                members.append(
                    nn.Sequential(
                        nn.Linear(input_dim, 32),
                        nn.ReLU(),
                        nn.Linear(32, output_dim),
                    )
                )
            self.trees = nn.ModuleList(members)
            self.n_estimators = n_estimators

        def forward(self, x):
            """Average the logits of all ensemble members for batch ``x``."""
            stacked = torch.stack([member(x) for member in self.trees])
            return torch.mean(stacked, dim=0)
    
    def train_intelligent_ensemble(self, X_train, y_train, X_test, y_test):
        """Step 5: Intelligent Ensemble Classifier"""
        print("Training intelligent ensemble classifier...")
        
        # Convert to PyTorch tensors
        X_train_tensor = torch.FloatTensor(X_train)
        y_train_tensor = torch.LongTensor(y_train)
        X_test_tensor = torch.FloatTensor(X_test)
        y_test_tensor = torch.LongTensor(y_test)
        
        # Create data loaders
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        
        input_dim = X_train.shape[1]
        output_dim = len(np.unique(y_train))
        
        # Initialize models
        tabnet = self.TabNet(input_dim, output_dim)
        xgboost_model = self.XGBoostPyTorch(input_dim, output_dim)
        
        # SVM using sklearn
        svm_model = SVC(probability=True, random_state=42)
        svm_model.fit(X_train, y_train)
        
        # Random Forest Meta Classifier
        rf_meta = RandomForestClassifier(n_estimators=100, random_state=42)
        
        # Train TabNet
        tabnet_optimizer = optim.Adam(tabnet.parameters(), lr=0.001)
        tabnet_criterion = nn.CrossEntropyLoss()
        
        tabnet.train()
        for epoch in range(50):
            for batch_x, batch_y in train_loader:
                tabnet_optimizer.zero_grad()
                outputs = tabnet(batch_x)
                loss = tabnet_criterion(outputs, batch_y)
                loss.backward()
                tabnet_optimizer.step()
        
        # Train XGBoost model
        xgboost_optimizer = optim.Adam(xgboost_model.parameters(), lr=0.001)
        xgboost_criterion = nn.CrossEntropyLoss()
        
        xgboost_model.train()
        for epoch in range(50):
            for batch_x, batch_y in train_loader:
                xgboost_optimizer.zero_grad()
                outputs = xgboost_model(batch_x)
                loss = xgboost_criterion(outputs, batch_y)
                loss.backward()
                xgboost_optimizer.step()
        
        # Get predictions from base classifiers
        tabnet.eval()
        xgboost_model.eval()
        
        with torch.no_grad():
            tabnet_pred = torch.softmax(tabnet(X_train_tensor), dim=1).numpy()
            xgboost_pred = torch.softmax(xgboost_model(X_train_tensor), dim=1).numpy()
        
        svm_pred = svm_model.predict_proba(X_train)
        
        # Stack predictions for meta-learning
        meta_features = np.hstack([tabnet_pred, xgboost_pred, svm_pred])
        rf_meta.fit(meta_features, y_train)
        
        # Test predictions
        with torch.no_grad():
            tabnet_test_pred = torch.softmax(tabnet(X_test_tensor), dim=1).numpy()
            xgboost_test_pred = torch.softmax(xgboost_model(X_test_tensor), dim=1).numpy()
        
        svm_test_pred = svm_model.predict_proba(X_test)
        meta_test_features = np.hstack([tabnet_test_pred, xgboost_test_pred, svm_test_pred])
        
        final_predictions = rf_meta.predict(meta_test_features)
        
        # Store models
        self.models = {
            'tabnet': tabnet,
            'xgboost': xgboost_model,
            'svm': svm_model,
            'meta_classifier': rf_meta
        }
        
        return final_predictions
    
    def evaluate_output(self, y_test, predictions):
        """Step 6: Output Evaluation"""
        print("Evaluating results...")
        
        # Calculate metrics
        accuracy = accuracy_score(y_test, predictions)
        report = classification_report(y_test, predictions, output_dict=True)
        cm = confusion_matrix(y_test, predictions)
        
        # Store results
        self.results = {
            'accuracy': accuracy,
            'classification_report': report,
            'confusion_matrix': cm,
            'predictions': predictions,
            'true_labels': y_test
        }
        
        # Print results
        print(f"\n=== EVALUATION RESULTS ===")
        print(f"Accuracy: {accuracy:.4f}")
        print(f"Classification Report:")
        print(classification_report(y_test, predictions))
        
        return self.results
    
    def explain_results(self, X_test):
        """Generate explainable results using LIME-like feature importance"""
        print("Generating explainable results...")
        
        # Feature importance from Random Forest meta-classifier
        if 'meta_classifier' in self.models:
            feature_importance = self.models['meta_classifier'].feature_importances_
            
            # Create feature importance plot
            plt.figure(figsize=(10, 6))
            plt.bar(range(len(feature_importance)), feature_importance)
            plt.title('Feature Importance for AE Signal Classification')
            plt.xlabel('Meta-Feature Index')
            plt.ylabel('Importance')
            plt.show()
            
            return feature_importance
        
        return None
    
    def run_complete_pipeline(self, num_samples_per_class=10):
        """Run the complete pipeline from start to finish"""
        print("=== STARTING COMPLETE AE SIGNAL CLASSIFICATION PIPELINE ===\n")
        
        # Step 1: Load data
        self.load_data()
        
        # Step 2-3: Process signals and extract features
        all_features = []
        all_labels = []
        
        for class_name, class_data in self.processed_data.items():
            print(f"\nProcessing class: {class_name}")
            
            # Sample signals from the class
            if len(class_data) > num_samples_per_class:
                indices = np.random.choice(len(class_data), num_samples_per_class, replace=False)
                sampled_signals = class_data[indices]
            else:
                sampled_signals = class_data
            
            for i, signal in enumerate(sampled_signals):
                if len(signal) > 1000:  # Ensure signal is long enough
                    # Step 2: Burst-informed signal processing
                    processed_signal = self.burst_informed_signal_processing(signal)
                    
                    # Step 3: Feature extraction
                    features = self.extract_features(processed_signal)
                    
                    # Attention-based fusion
                    fused_features = self.attention_based_fusion(features)
                    
                    all_features.append(fused_features)
                    all_labels.append(class_name)
                    
                    if (i + 1) % 5 == 0:
                        print(f"  Processed {i + 1} signals...")
        
        # Step 4: Feature engineering
        feature_matrix, pca, selector = self.feature_engineering(all_features, all_labels)
        
        # Encode labels
        encoded_labels = self.label_encoder.fit_transform(all_labels)
        
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            feature_matrix, encoded_labels, test_size=0.2, random_state=42, stratify=encoded_labels
        )
        
        print(f"\nTraining set size: {X_train.shape}")
        print(f"Test set size: {X_test.shape}")
        
        # Step 5: Train intelligent ensemble
        predictions = self.train_intelligent_ensemble(X_train, y_train, X_test, y_test)
        
        # Step 6: Evaluate results
        results = self.evaluate_output(y_test, predictions)
        
        # Generate explainable results
        feature_importance = self.explain_results(X_test)
        
        print("\n=== PIPELINE COMPLETED SUCCESSFULLY ===")
        print(f"Final Accuracy: {results['accuracy']:.4f}")
        
        return results

# Usage Example (uncomment and modify path when you have the actual .mat file)
"""
# Initialize the processor
processor = AESignalProcessor('path_to_your_mat_file.mat')

# Run the complete pipeline
results = processor.run_complete_pipeline(num_samples_per_class=10)

# Access results
print("Final Results:")
print(f"Accuracy: {results['accuracy']:.4f}")
print("Classes:", processor.label_encoder.classes_)
"""

# Confirm that the class is defined and recap how to drive it; one print call
# emits the same four lines.
print(
    "Complete AE Signal Classification System is ready!",
    "To use: Initialize with your .mat file path and call run_complete_pipeline()",
    "Example: processor = AESignalProcessor('your_file.mat')",
    "         results = processor.run_complete_pipeline()",
    sep="\n",
)

Complete AE Signal Classification System is ready!
To use: Initialize with your .mat file path and call run_complete_pipeline()
Example: processor = AESignalProcessor('your_file.mat')
         results = processor.run_complete_pipeline()


In [2]:
# Location of the MAT file; the raw string keeps the Windows backslashes literal.
MAT_FILE_PATH = r'E:\1 Paper MCT\Cutting Tool Paper\Dataset\cutting tool data\mat files data\AE_ALL.mat'

# Build the pipeline object and load the per-class signals.
processor = AESignalProcessor(MAT_FILE_PATH)
processor.load_data()


Loading MAT v7.3 file using h5py...
Top-level classes in AE_ALL: ['BF', 'BFI', 'GF', 'GFI', 'N', 'NI', 'TF']
BF: shape (1000000, 40)


TypeError: Accessing a group is done with bytes or str, not <class 'numpy.float64'>

In [None]:
# Number of signals drawn from each class for this run.
NUM_SAMPLES_PER_CLASS = 10

# End-to-end run: preprocessing, features, training and evaluation.
results = processor.run_complete_pipeline(num_samples_per_class=NUM_SAMPLES_PER_CLASS)
