In [None]:
import numpy as np
import pandas as pd
from scipy import signal, stats
from scipy.signal import welch
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')


# ============ FEATURE EXTRACTION ============

def extract_comprehensive_features(horizontal_acc, axial_acc, vertical_acc, tachometer, fs=20480):
    """
    Extract scalar features from a triaxial vibration record plus tachometer.

    For each axis ('horizontal', 'axial', 'vertical') the following keys are
    produced, prefixed with the axis name: rms, std, kurtosis, skewness, peak,
    crest_factor, shape_factor, impulse_factor, energy_low, energy_mid,
    energy_high. The band energies are sums of Welch PSD bins (nperseg=1024)
    over [10, 100), [100, 1000) and [1000, 5000] Hz; the bands are disjoint so
    that no bin is counted twice. They are 0.0 when the signal has fewer than
    1024 samples.

    Additional keys: rpm_mean, rpm_std (from the tachometer), the three
    pairwise axis correlations, total_rms / total_kurtosis of the vector
    magnitude, and dominant_axis_ratio.

    Statistics that cannot be computed (empty input, constant signal, ...)
    are reported as 0.0 rather than NaN so the feature vector stays finite.

    Args:
        horizontal_acc, axial_acc, vertical_acc: 1-D array-likes of samples.
        tachometer: 1-D array-like tachometer signal.
        fs: sampling frequency in Hz.

    Returns:
        dict mapping feature name to a Python float.
    """
    def _finite_or_zero(value):
        # A constant signal makes kurtosis/skewness/correlation NaN; map any
        # non-finite result to the same 0.0 used for other degenerate cases.
        value = float(value)
        return value if np.isfinite(value) else 0.0

    def _safe_corr(a, b):
        # Pearson correlation, or 0.0 when it is undefined.
        if len(a) < 2 or len(a) != len(b):
            return 0.0
        try:
            with np.errstate(divide='ignore', invalid='ignore'):
                return _finite_or_zero(np.corrcoef(a, b)[0, 1])
        except ValueError:
            # Shapes that corrcoef cannot combine.
            return 0.0

    features = {}

    ha = np.asarray(horizontal_acc).astype(float)
    aa = np.asarray(axial_acc).astype(float)
    va = np.asarray(vertical_acc).astype(float)

    # --- TIME & FREQUENCY FEATURES per axis ---
    for axis_name, signal_data in [('horizontal', ha), ('axial', aa), ('vertical', va)]:
        has_data = len(signal_data) > 0

        # Basic statistics
        rms = float(np.sqrt(np.mean(signal_data**2))) if has_data else 0.0
        std = float(np.std(signal_data)) if has_data else 0.0
        kurt = _finite_or_zero(stats.kurtosis(signal_data)) if has_data else 0.0
        skew = _finite_or_zero(stats.skew(signal_data)) if has_data else 0.0
        peak = float(np.max(np.abs(signal_data))) if has_data else 0.0
        mean_abs = float(np.mean(np.abs(signal_data))) if has_data else 0.0

        features[f'{axis_name}_rms'] = rms
        features[f'{axis_name}_std'] = std
        features[f'{axis_name}_kurtosis'] = kurt
        features[f'{axis_name}_skewness'] = skew
        features[f'{axis_name}_peak'] = peak
        # Ratios fall back to 0.0 when the denominator is (numerically) zero.
        features[f'{axis_name}_crest_factor'] = (peak / rms) if rms > 1e-12 else 0.0
        features[f'{axis_name}_shape_factor'] = (rms / mean_abs) if mean_abs > 1e-12 else 0.0
        features[f'{axis_name}_impulse_factor'] = (peak / mean_abs) if mean_abs > 1e-12 else 0.0

        # Band energies
        if len(signal_data) >= 1024:
            freqs, psd = welch(signal_data, fs=fs, nperseg=1024)
            # Half-open lower bands: the 100 Hz and 1000 Hz bins belong to
            # exactly one band each instead of being summed twice.
            low_band = (freqs >= 10) & (freqs < 100)
            mid_band = (freqs >= 100) & (freqs < 1000)
            high_band = (freqs >= 1000) & (freqs <= 5000)
            features[f'{axis_name}_energy_low'] = float(np.sum(psd[low_band]))
            features[f'{axis_name}_energy_mid'] = float(np.sum(psd[mid_band]))
            features[f'{axis_name}_energy_high'] = float(np.sum(psd[high_band]))
        else:
            features[f'{axis_name}_energy_low'] = 0.0
            features[f'{axis_name}_energy_mid'] = 0.0
            features[f'{axis_name}_energy_high'] = 0.0

    # --- TACHOMETER FEATURES ---
    rpm_estimation = extract_rpm_from_tachometer(np.asarray(tachometer).astype(float), fs)
    features['rpm_mean'] = float(np.mean(rpm_estimation)) if len(rpm_estimation) else 0.0
    features['rpm_std'] = float(np.std(rpm_estimation)) if len(rpm_estimation) else 0.0

    # --- CROSS-AXIS CORRELATIONS ---
    features['corr_horiz_axial'] = _safe_corr(ha, aa)
    features['corr_horiz_vertical'] = _safe_corr(ha, va)
    features['corr_axial_vertical'] = _safe_corr(aa, va)

    # --- COMPOSITE ---
    if len(ha) and len(aa) and len(va):
        # Per-sample magnitude of the acceleration vector.
        total_vibration = np.sqrt(ha**2 + aa**2 + va**2)
        features['total_rms'] = float(np.sqrt(np.mean(total_vibration**2)))
        features['total_kurtosis'] = _finite_or_zero(stats.kurtosis(total_vibration))
    else:
        features['total_rms'] = 0.0
        features['total_kurtosis'] = 0.0

    # Share of the summed per-axis RMS carried by the strongest axis; the
    # small epsilon keeps the ratio defined when all axes are silent.
    axis_energies = [features['horizontal_rms'], features['axial_rms'], features['vertical_rms']]
    denom = sum(axis_energies) + 1e-8
    features['dominant_axis_ratio'] = float(max(axis_energies) / denom)

    return features


def extract_rpm_from_tachometer(tachometer_signal, fs):
    """
    Estimate rotational speed (RPM) from a tachometer signal.

    Peaks at least half as high as the signal maximum are detected; the time
    between consecutive peaks is converted to RPM (60 / interval), and values
    more than 20% away from the median are discarded.

    Returns a 1-D array of RPM values. When the signal has fewer than 4
    samples, is all zero, has fewer than two detected peaks, or yields no
    finite RPM value, the array is the single placeholder value 1000.0.
    """
    default_rpm = np.array([1000.0])

    tach = np.asarray(tachometer_signal).astype(float)
    # The length test comes first so np.max is never called on an empty array.
    if len(tach) < 4 or np.allclose(np.max(np.abs(tach)), 0.0):
        return default_rpm

    pulse_idx, _ = signal.find_peaks(tach, height=0.5 * np.max(tach))
    if len(pulse_idx) < 2:
        return default_rpm

    # Sample distance between peaks -> seconds -> revolutions per minute.
    periods_s = np.diff(pulse_idx) / float(fs)
    with np.errstate(divide='ignore', invalid='ignore'):
        rpm = 60.0 / periods_s
    rpm = rpm[np.isfinite(rpm)]

    if len(rpm) == 0:
        return default_rpm

    # Keep only values within 20% of the median to reject spurious peaks.
    center = np.median(rpm)
    within_band = np.abs(rpm - center) < 0.2 * center
    if within_band.any():
        return rpm[within_band]
    return np.array([center])


# ============ TREND & SYNTHETIC DATA ============

def compute_degradation_trends(features_dict):
    """
    Compute per-feature trends between each pair of adjacent classes.

    features_dict: {class_id: list of feature vectors (consistent order)}

    Returns a dict keyed '<low>_to_<up>' for every pair of consecutive class
    ids (in sorted order) whose data are both non-empty. Each value maps a
    feature index to {'rate', 'lower_mean', 'upper_mean'}, where rate is the
    difference of the class means divided by the class-id gap (at least 1).
    """
    ordered_classes = sorted(features_dict.keys())
    trends = {}

    # Walk consecutive (lower, upper) class pairs.
    for c_low, c_up in zip(ordered_classes, ordered_classes[1:]):
        lower = np.asarray(features_dict[c_low])
        upper = np.asarray(features_dict[c_up])
        if lower.size == 0 or upper.size == 0:
            continue

        class_gap = max(1, (c_up - c_low))
        shared_width = min(lower.shape[1], upper.shape[1])

        per_feature = {}
        for idx in range(shared_width):
            lo_mean = float(np.mean(lower[:, idx]))
            hi_mean = float(np.mean(upper[:, idx]))
            per_feature[idx] = {
                'rate': (hi_mean - lo_mean) / class_gap,
                'lower_mean': lo_mean,
                'upper_mean': hi_mean,
            }

        trends[f'{c_low}_to_{c_up}'] = per_feature

    return trends


def intelligent_interpolation(features_dict, target_class, degradation_trends):
    """
    Synthesize feature vectors for target_class from its two neighbouring classes.

    Samples of the nearest class below and the nearest class above are paired
    by row index and linearly interpolated according to where target_class
    lies between them. For features that have an entry in degradation_trends
    under '<lower>_to_<upper>', the result is blended 70/30 with the value the
    trend predicts at target_class.

    If there is no class below or no class above, the work is delegated to
    extrapolate_missing_class. If either neighbour has no data, an empty
    (0, n_features) array is returned.
    """
    known = sorted(features_dict.keys())
    below = [c for c in known if c < target_class]
    above = [c for c in known if c > target_class]

    if not (below and above):
        return extrapolate_missing_class(features_dict, target_class, degradation_trends)

    # `known` is sorted, so these are the closest neighbours on each side.
    cl, cu = below[-1], above[0]
    lower = np.asarray(features_dict[cl])
    upper = np.asarray(features_dict[cu])

    if lower.size == 0 or upper.size == 0:
        if lower.size:
            width = lower.shape[1]
        elif upper.size:
            width = upper.shape[1]
        else:
            width = 0
        return np.empty((0, width))

    alpha = (target_class - cl) / max(1, (cu - cl))
    n = min(len(lower), len(upper))
    n_feat = min(lower.shape[1], upper.shape[1])

    # Row-by-row linear interpolation over the shared rows and columns.
    blended = (1 - alpha) * lower[:n, :n_feat] + alpha * upper[:n, :n_feat]

    trend_key = f'{cl}_to_{cu}'
    trend = degradation_trends[trend_key] if trend_key in degradation_trends else {}
    for f in range(n_feat):
        if f not in trend:
            continue
        tinfo = trend[f]
        # Value predicted by the class-mean trend at the target class.
        expected = tinfo['lower_mean'] + tinfo['rate'] * (target_class - cl)
        blended[:, f] = 0.7 * blended[:, f] + 0.3 * expected

    return blended


def extrapolate_missing_class(features_dict, target_class, degradation_trends):
    """
    Conservative extrapolation for classes outside the observed range.

    When target_class lies above the highest available class, at least two
    classes exist, and a trend between the two highest classes is available,
    every sample of the highest class is shifted by
    rate * distance * exp(-0.1 * distance) per feature (rate 0.0 for features
    without a trend entry).

    Otherwise the samples of the nearest class are copied and perturbed with
    multiplicative Gaussian noise whose standard deviation is 0.1 times the
    class distance.

    Returns a 2-D array, or an empty (0, 0) array when no data are available.
    """
    known = sorted(features_dict.keys())
    if len(known) == 0:
        return np.empty((0, 0))

    if len(known) >= 2 and target_class > known[-1]:
        second_last, last = known[-2:]
        trend_key = f'{second_last}_to_{last}'
        base = np.asarray(features_dict[last])
        if base.size and trend_key in degradation_trends:
            trend = degradation_trends[trend_key]
            distance = target_class - last
            # Exponential damping keeps far extrapolations from running away.
            damping = np.exp(-0.1 * distance)
            rates = np.array([trend.get(f, {'rate': 0.0})['rate']
                              for f in range(base.shape[1])])
            return base + rates * distance * damping

    # Fallback: duplicate the nearest class and add relative noise.
    nearest = min(known, key=lambda c: abs(c - target_class))
    base = np.asarray(features_dict[nearest])
    if base.size == 0:
        return np.empty((0, 0))
    relative_noise = np.random.normal(0, 0.1 * abs(target_class - nearest), base.shape)
    return base + base * relative_noise


def add_physics_informed_noise(synthetic_data, reference_features, target_class, feature_names):
    """
    Add Gaussian noise to synthetic samples and clamp non-negative features.

    - synthetic_data: array of shape (n_synth, n_feat); returned unchanged if empty
    - reference_features: list/array of real feature vectors; their per-feature
      standard deviation sets the noise scale (a flat 0.1 is used when empty)
    - target_class: the noise amplitude is 0.05 + 0.15 * target_class / 10
      times the per-feature scale
    - feature_names: names in column order; columns whose name contains
      'rms', 'energy', 'std' or 'peak' are floored at 1e-6

    Returns an array of the noisy samples.
    """
    if synthetic_data.size == 0:
        return synthetic_data

    reference = np.asarray(reference_features)
    if reference.size:
        spread = np.std(reference, axis=0)
        # Non-finite spreads contribute no noise.
        spread = np.where(np.isfinite(spread), spread, 0.0)
    else:
        # Fallback: fixed noise profile.
        spread = np.ones(synthetic_data.shape[1]) * 0.1

    degradation_factor = target_class / 10.0
    sigma = (0.05 + 0.15 * degradation_factor) * spread

    # Column indices subject to the non-negativity constraint (found once).
    nonneg_tokens = ('rms', 'energy', 'std', 'peak')
    clamp_cols = [col for col, fname in enumerate(feature_names)
                  if any(tok in fname for tok in nonneg_tokens)]

    perturbed_rows = []
    for sample in synthetic_data:
        perturbed = sample + np.random.normal(0, sigma)
        for col in clamp_cols:
            perturbed[col] = max(1e-6, perturbed[col])
        perturbed_rows.append(perturbed)

    return np.asarray(perturbed_rows)


def validate_synthetic_realism(real_features_dict, synthetic_features_dict):
    """
    Score how 'realistic' synthetic classes are against their nearest real classes.

    For every synthetic class, each feature column is compared to the same
    column of up to two real classes with the closest class id using the
    two-sample Kolmogorov-Smirnov statistic; similarity is 1 - KS statistic.

    Args:
        real_features_dict: {class_id: 2-D list/array of real feature vectors}
        synthetic_features_dict: {class_id: 2-D list/array of synthetic vectors}

    Returns:
        {class_id: {'realism_score', 'n_samples', 'feature_consistency'}} where
        realism_score is the mean similarity averaged over the neighbouring
        classes that actually contained data, and feature_consistency is True
        when the standard deviation of all similarities is below 0.3.
        Classes without synthetic samples or without any real class to compare
        against get a score of 0.0.

    Side effects:
        Prints one summary line per scored class.
    """
    validation_results = {}
    real_classes = list(real_features_dict.keys())

    for missing_class, synth in synthetic_features_dict.items():
        synth = np.asarray(synth)
        if synth.size == 0:
            validation_results[missing_class] = {'realism_score': 0.0, 'n_samples': 0, 'feature_consistency': False}
            continue

        if not real_classes:
            validation_results[missing_class] = {'realism_score': 0.0, 'n_samples': len(synth), 'feature_consistency': False}
            continue

        closest = sorted(real_classes, key=lambda c: abs(c - missing_class))[:2]
        class_scores = []   # one mean similarity per neighbour that had data
        similarities = []   # every per-feature similarity, across neighbours

        for cc in closest:
            real = np.asarray(real_features_dict[cc])
            if real.size == 0:
                continue
            n_feat = min(synth.shape[1], real.shape[1])
            sims = []
            for f in range(n_feat):
                ks_stat, _ = stats.ks_2samp(synth[:, f], real[:, f])
                sims.append(1.0 - float(ks_stat))
            if sims:
                class_scores.append(float(np.mean(sims)))
                similarities.extend(sims)

        if similarities:
            # Average only over neighbours that were actually compared; an
            # empty neighbour must not drag the score towards zero.
            realism = float(np.mean(class_scores))
            consistency = (np.std(similarities) < 0.3)
        else:
            realism = 0.0
            consistency = False

        validation_results[missing_class] = {
            'realism_score': float(realism),
            'n_samples': int(len(synth)),
            'feature_consistency': bool(consistency)
        }
        print(f"Classe {missing_class}: Realismo={validation_results[missing_class]['realism_score']:.3f}")

    return validation_results


# ============ STRATEGIA COMPLETA ============

def enhanced_missing_class_strategy_from_dataframe(df, label_col='label',
                                                   h_col='horizontal_acceleration',
                                                   a_col='axial_acceleration',
                                                   v_col='vertical_acceleration',
                                                   t_col='tachometer_signal',
                                                   fs=20480,
                                                   missing_classes=(5, 7, 9, 10)):
    """
    Full pipeline starting from a DataFrame.

    Phases: (1) extract features per row, grouped by label_col; (2) compute
    degradation trends between adjacent classes; (3) generate noisy synthetic
    feature vectors for every class in missing_classes; (4) validate them
    against the real classes; (5) merge real and synthetic data and
    standardize the result with a StandardScaler fitted on the merged set.

    Returns a dict with keys: 'features' (scaled), 'labels', 'scaler',
    'feature_names', 'synthetic_validation', 'degradation_trends'.

    Raises ValueError when no feature is available after merging.
    Progress is reported with print().
    """
    # 1) Fix the feature order ONCE by probing the extractor with silent signals.
    silence = np.zeros(1024)
    probe = extract_comprehensive_features(silence, silence, silence, silence, fs=fs)
    feature_names = sorted(probe)

    # 2) Real features, grouped by class
    print("=== PHASE 1: Feature Extraction ===")
    features_by_class = {}

    for class_id, group in df.groupby(label_col):
        vectors = []
        for _, record in group.iterrows():
            extracted = extract_comprehensive_features(
                horizontal_acc=record[h_col],
                axial_acc=record[a_col],
                vertical_acc=record[v_col],
                tachometer=record[t_col],
                fs=fs
            )
            # Flatten the dict into a vector with the fixed ordering.
            vectors.append([extracted[name] for name in feature_names])

        features_by_class[int(class_id)] = vectors
        width = len(feature_names) if vectors else 0
        print(f"Classe {class_id}: {len(vectors)} campioni, {width} features")

    # 3) Trends
    print("\n=== PHASE 2: Degradation Trend Analysis ===")
    degradation_trends = compute_degradation_trends(features_by_class)

    # 4) Synthetic data for the missing classes
    print("\n=== PHASE 3: Synthetic Data Generation ===")
    synthetic_features = {}

    for mc in missing_classes:
        print(f"\nGenerando dati per classe {mc}...")
        base = intelligent_interpolation(features_by_class, mc, degradation_trends)
        # The nearest real class provides the reference spread for the noise.
        ref_feats = []
        if features_by_class:
            nearest_real = min(features_by_class, key=lambda c: abs(c - mc))
            ref_feats = features_by_class[nearest_real]
        noisy = add_physics_informed_noise(base, ref_feats, mc, feature_names)
        synthetic_features[int(mc)] = noisy
        print(f"  → Generati {len(noisy)} campioni sintetici")

    # 5) Validation
    print("\n=== PHASE 4: Validation ===")
    validation_results = validate_synthetic_realism(features_by_class, synthetic_features)

    # 6) Integration: real classes first, then non-empty synthetic classes
    print("\n=== PHASE 5: Data Integration ===")
    labelled_blocks = list(features_by_class.items())
    labelled_blocks += [(cid, arr.tolist())
                        for cid, arr in synthetic_features.items() if arr.size]

    all_features = []
    all_labels = []
    for cid, rows in labelled_blocks:
        all_features += rows
        all_labels += [cid] * len(rows)

    all_features = np.asarray(all_features, dtype=float)
    all_labels = np.asarray(all_labels, dtype=int)

    if all_features.size == 0:
        raise ValueError("Nessuna feature disponibile dopo integrazione (reali + sintetiche).")

    scaler = StandardScaler()
    all_features_scaled = scaler.fit_transform(all_features)

    print(f"\nDataset finale: {all_features.shape[0]} campioni, {all_features.shape[1]} features")
    unique, counts = np.unique(all_labels, return_counts=True)
    print(f"Distribuzione classi: {dict(zip(unique.tolist(), counts.tolist()))}")

    return {
        'features': all_features_scaled,
        'labels': all_labels,
        'scaler': scaler,
        'feature_names': feature_names,
        'synthetic_validation': validation_results,
        'degradation_trends': degradation_trends
    }


# ============ ESEMPIO D'USO ============

# Load the training DataFrame from the pickle.
# NOTE: unpickling can execute arbitrary code; only load files you trust.
train_data = pd.read_pickle('../data/processed/train_data_dowsampled_walt.pkl')
print(train_data.dtypes)

# Column names and settings for the pipeline (adjust missing_classes if needed).
pipeline_kwargs = dict(
    label_col='health_level',
    h_col='horizontal_acceleration',
    a_col='axial_acceleration',
    v_col='vertical_acceleration',
    t_col='tachometer_signal',
    fs=20480,
    missing_classes=(5, 7, 9, 10),
)
result = enhanced_missing_class_strategy_from_dataframe(train_data, **pipeline_kwargs)

features, labels = result['features'], result['labels']

print("features shape:", features.shape)
print("labels shape:", labels.shape)


file_name                   object
etichetta                   object
health_level                 int64
velocita                     int64
torque                       int64
rep                          int64
sampling_rate                int64
descrizione                 object
duration                   float64
num_samples                  int64
horizontal_acceleration     object
axial_acceleration          object
vertical_acceleration       object
tachometer_signal           object
dtype: object
=== PHASE 1: Feature Extraction ===
Classe 0: 287 campioni, 41 features
Classe 1: 295 campioni, 41 features
Classe 2: 291 campioni, 41 features
Classe 3: 267 campioni, 41 features
Classe 4: 304 campioni, 41 features
Classe 6: 276 campioni, 41 features
Classe 8: 296 campioni, 41 features

=== PHASE 2: Degradation Trend Analysis ===

=== PHASE 3: Synthetic Data Generation ===

Generando dati per classe 5...
  → Generati 276 campioni sintetici

Generando dati per classe 7...
  → Generati 276 ca

In [21]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report

# ================= DATA PREPARATION =================
X = result["features"]                      # (N, F) float
y = np.asarray(result["labels"])            # (N,) class ids

# Always re-encode the labels to contiguous indices 0..K-1: the softmax layer
# has exactly n_classes units and sparse_categorical_crossentropy requires
# every label to be a valid unit index. Integer labels with gaps (for example
# when a class produced no samples) would otherwise exceed n_classes - 1.
# label_encoder.classes_[i] gives the original class id for index i.
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(y)

n_features = X.shape[1]
n_classes = len(label_encoder.classes_)

# Add a channel dimension for Conv1D: (N, F) -> (N, F, 1)
X = X[..., np.newaxis]

# NOTE(review): result["features"] was standardized on the full dataset before
# this split, so test-set statistics have influenced the scaling.
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# ================= MODEL =================
# An explicit Input object declares the input shape instead of passing
# input_shape to the first layer.
model = tf.keras.Sequential([
    tf.keras.Input(shape=(n_features, 1)),
    tf.keras.layers.Conv1D(32, 7, padding='same', activation='relu'),
    tf.keras.layers.Conv1D(64, 5, padding='same', activation='relu'),
    tf.keras.layers.GlobalAveragePooling1D(),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(n_classes, activation='softmax')
])

model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# ================= TRAINING =================
history = model.fit(
    X_train, y_train,
    validation_split=0.2,
    epochs=50,
    batch_size=32,
    verbose=1
)

# ================= EVALUATION =================
# Predictions and the report are expressed in encoded class indices.
y_pred = model.predict(X_test).argmax(axis=1)
print(classification_report(y_test, y_pred))


Epoch 1/50
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 26ms/step - accuracy: 0.1236 - loss: 2.3550 - val_accuracy: 0.1818 - val_loss: 2.2950
Epoch 2/50
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 14ms/step - accuracy: 0.1815 - loss: 2.2669 - val_accuracy: 0.2075 - val_loss: 2.2163
Epoch 3/50
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 14ms/step - accuracy: 0.2028 - loss: 2.2032 - val_accuracy: 0.2154 - val_loss: 2.1704
Epoch 4/50
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 14ms/step - accuracy: 0.2166 - loss: 2.1313 - val_accuracy: 0.2352 - val_loss: 2.0697
Epoch 5/50
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 14ms/step - accuracy: 0.2537 - loss: 2.0532 - val_accuracy: 0.2510 - val_loss: 2.0279
Epoch 6/50
[1m64/64[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 20ms/step - accuracy: 0.2671 - loss: 1.9928 - val_accuracy: 0.3063 - val_loss: 1.9454
Epoch 7/50
[1m64/64[0m [32m━━━━