In [23]:
import numpy as np
import pandas as pd
from scipy.stats import kurtosis, skew, multivariate_normal
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from scipy.ndimage import uniform_filter1d

from scipy import stats
%matplotlib tk

In [24]:
V_SF = pd.read_csv('MotorVRB_2025.csv')
V_2F = pd.read_csv('MotorV2F_2025.csv')

In [3]:
assimetria = V_SF[['Sinal-C0', 'Sinal-C20', 'Sinal-C40', 'Sinal-C60', 'Sinal-C80', 'Sinal-C100']].skew()
assimetria2 = V_2F[['Sinal-C0', 'Sinal-C20', 'Sinal-C40', 'Sinal-C60', 'Sinal-C80', 'Sinal-C100']].skew()

# Combinamos em um novo DataFrame
df_skew = pd.DataFrame({
    'Healthy': assimetria,
    'Fault': assimetria2,
})

# Plot agrupado (3 barras por variável)
fig = plt.figure(figsize=(12, 6), facecolor='white')
ax = fig.add_subplot(111)

df_skew.plot(kind='bar', ax=ax, width=0.8)

ax.set_facecolor('white')
ax.set_ylabel('Assimetria', fontsize=25)  # Atualizado para "Assimetria"
ax.tick_params(axis='both', labelsize=25)
ax.axhline(y=0, color='black', linewidth=1)
ax.grid(False)
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.spines['bottom'].set_visible(False)

# Criar borda preta ao redor do gráfico
xlim = ax.get_xlim()
ylim = ax.get_ylim()
rect = patches.Rectangle(
    (xlim[0], ylim[0]),
    xlim[1] - xlim[0],
    ylim[1] - ylim[0],
    linewidth=2,
    edgecolor='black',
    facecolor='none',
    transform=ax.transData,
    zorder=10
)
ax.add_patch(rect)

plt.tight_layout()
plt.legend(fontsize=20)
plt.show()

In [4]:
def compute_features(df):
    return {
        'RMS': df.apply(lambda x: np.sqrt(np.mean(x**2))),
        'Mean': df.mean(),
        'STD': df.std(),
        'Variance': df.var(),
        'Peak': df.max(),
        'ShapeFactor': df.apply(lambda x: np.sqrt(np.mean(x**2)) / np.mean(np.abs(x))),
        'ImpulseFactor': df.apply(lambda x: np.max(np.abs(x)) / np.mean(np.abs(x))),
    }

# Lista de columnas por carga
cols = ['Sinal-C0', 'Sinal-C20', 'Sinal-C40', 'Sinal-C60', 'Sinal-C80', 'Sinal-C100']

# Extraer características para cada señal
features_healthy = compute_features(V_SF[cols])
features_fault = compute_features(V_2F[cols])

# Para cada característica, graficar
for feature_name in features_healthy.keys():
    healthy_vals = features_healthy[feature_name]
    fault_vals = features_fault[feature_name]

    df_feat = pd.DataFrame({
        'Healthy': healthy_vals,
        'Fault': fault_vals
    })

    # Plot estilo barras agrupadas
    fig = plt.figure(figsize=(12, 6), facecolor='white')
    ax = fig.add_subplot(111)

    df_feat.plot(kind='bar', ax=ax, width=0.8)

    ax.set_facecolor('white')
    ax.set_ylabel(feature_name, fontsize=25)
    ax.tick_params(axis='both', labelsize=20)
    ax.axhline(y=0, color='black', linewidth=1)
    ax.grid(False)
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.spines['left'].set_visible(False)
    ax.spines['bottom'].set_visible(False)

    # Borda negra ao redor do gráfico
    xlim = ax.get_xlim()
    ylim = ax.get_ylim()
    rect = patches.Rectangle(
        (xlim[0], ylim[0]),
        xlim[1] - xlim[0],
        ylim[1] - ylim[0],
        linewidth=2,
        edgecolor='black',
        facecolor='none',
        transform=ax.transData,
        zorder=10
    )
    ax.add_patch(rect)

    plt.tight_layout()
    plt.legend(fontsize=20)
    plt.title(f'Comparación: {feature_name}', fontsize=22)
    plt.show()

<center><h1>Time-Domain Features</center></h1>

In [25]:
def calculate_rms(signal):
    return np.sqrt(np.mean(signal**2))

def extract_signal_stats(signal, window_size=1680):
    start = 0
    stats_list = []
    epsilon = 1e-8  # Small value to avoid division by zero

    while start < len(signal):
        end = min(start + window_size, len(signal))
        cycle = signal[start:end]

        mean_val = np.mean(cycle)
        rms_val = calculate_rms(cycle)
        peak_val = np.max(np.abs(cycle))

        stats_list.append({
            'Skewness': skew(cycle, bias=False),
            'Variance': np.var(cycle),
            'StandardDeviation': np.std(cycle),
            'RMS': rms_val,
            'ImpulseFactor': peak_val / (np.abs(mean_val) + epsilon),
        })

        start += window_size  # Move to next window

    df_stats = pd.DataFrame(stats_list)
    return df_stats

In [26]:
RMS_0 = extract_signal_stats(V_SF['Sinal-C0'])
RMS_20 = extract_signal_stats(V_SF['Sinal-C20'])
RMS_40 = extract_signal_stats(V_SF['Sinal-C40'])
RMS_60 = extract_signal_stats(V_SF['Sinal-C60'])
RMS_80 = extract_signal_stats(V_SF['Sinal-C80'])
RMS_100 = extract_signal_stats(V_SF['Sinal-C100'])

In [27]:
RMS_F0 = extract_signal_stats(V_2F['Sinal-C0'])
RMS_F20 = extract_signal_stats(V_2F['Sinal-C20'])
RMS_F40 = extract_signal_stats(V_2F['Sinal-C40'])
RMS_F60 = extract_signal_stats(V_2F['Sinal-C60'])
RMS_F80 = extract_signal_stats(V_2F['Sinal-C80'])
RMS_F100 = extract_signal_stats(V_2F['Sinal-C100'])

In [28]:
SF = pd.concat([RMS_0, RMS_20, RMS_40,
                         RMS_60, RMS_80, RMS_100], ignore_index=True)

In [29]:
F2 = pd.concat([RMS_F0, RMS_F20, RMS_F40,
                         RMS_F60, RMS_F80, RMS_F100], ignore_index=True)

In [63]:
def graph(variable):
    plt.figure()
    plt.plot(RMS_SF[f'{variable}'], linewidth=0.5, label=f'Healthy {variable}')
    plt.plot(RMS_2F[f'{variable}'], linewidth=0.5, label=f'Faulty {variable}')
    plt.xlabel('Sample')
    plt.ylabel(variable)
    plt.legend(loc='best')
    plt.show()

var = ['RMS', 'Skewness', 'Variance', 'StandardDeviation', 'ImpulseFactor']

for v in var:
    graph(v)

In [262]:
time_domain = extract_signal_stats(V_SF['Sinal-C100'])

In [289]:
time_domain2 = extract_signal_stats(V_2F['Sinal-C100'])

<h2>Feature Statistics at 80% Load DataFrame

In [34]:
time_domain.to_csv('TD_VSF80.csv', index=False)

<h2> Step 2: Expand PF State Vector for Feature Tracking

In [30]:
def state_transition(x_prev, stds, eta=0.001):
    """
    :param x_prev: 6 elements array (5 features + Health)
    :param stds: Pandas series with standard deviations of each feature
    :param eta: Health decay
    :return: New state array
    """
    x_prev = np.array(x_prev, dtype=float)
    x_next = np.array(x_prev, dtype=float)

    for i in range(5):
        noise = np.random.normal(loc=0.0, scale=stds.iloc[i])
        x_next[i] += noise

    x_next[5] = max(x_prev[5] - eta, 0.0)

    return x_next

In [31]:
def calculate_likelihood(z, x_mean, stds):
    z = np.array(z, dtype=float)
    x_mean = np.array(x_mean, dtype=float)
    var = stds.values ** 2
    diff = z - x_mean
    exponent = -0.5 * np.sum((diff ** 2) / var)
    norm_const = np.sqrt((2 * np.pi) ** len(z) * np.prod(var))
    return np.exp(exponent) / norm_const

In [32]:
def update_weights(particles, weights, observation, stds):
    """
    Actualiza los pesos de las partículas según la observación real.

    :param particles: np.ndarray de shape (N, 6) → estados de las partículas
    :param weights: np.ndarray de shape (N,) → pesos actuales
    :param observation: np.ndarray de shape (5,) → observación real
    :param stds: pd.Series con std de cada característica
    :return: np.ndarray de pesos normalizados
    """
    N = particles.shape[0]
    new_weights = np.zeros(N)

    for i in range(N):
        x_i = particles[i, :5]  # estado predicho
        likelihood = calculate_likelihood(observation, x_i, stds)
        new_weights[i] = weights[i] * likelihood

    # Evitar división por cero
    weight_sum = np.sum(new_weights)
    if weight_sum == 0:
        new_weights[:] = 1.0 / N
    else:
        new_weights /= weight_sum

    return new_weights

In [33]:
# Configuraciones iniciales
feature_names = ['RMS', 'Skewness', 'Variance', 'StandardDeviation', 'ImpulseFactor']
stds = SF[feature_names].std()

# Estado inicial con Health = 1.0 desde la primera fila
x0 = SF.iloc[0][feature_names].tolist() + [1.0]

# Inicializar partículas y pesos
N = 900
particles = np.tile(np.array(x0), (N, 1))  # shape (N, 6)
weights = np.ones(N) / N

# Lista para guardar estimaciones de Health
health_estimates = []

# Recorrer todas las observaciones reales de SF
for idx, row in SF.iterrows():
    # 1. Predecir estado de cada partícula
    for i in range(N):
        particles[i] = state_transition(particles[i], stds)

    # 2. Obtener observación real
    z_obs = row[feature_names].values

    # 3. Actualizar pesos
    weights = update_weights(particles, weights, z_obs, stds)

    # 4. Estimar estado promedio (x̂)
    x_hat = np.average(particles, axis=0, weights=weights)

    # 5. Guardar solo el valor de Health estimado
    health_estimates.append(x_hat[5])

# Convertir resultados a DataFrame
health_df = pd.DataFrame({
    'Step': np.arange(len(health_estimates)),
    'Estimated_Health': health_estimates
})

# Mostrar los primeros resultados
print(health_df.head())

KeyboardInterrupt: 

In [16]:
plt.plot(health_df['Step'], health_df['Estimated_Health'], label='Health estimado')
plt.axhline(0.95, color='r', linestyle='--', label='Umbral de alerta')
plt.xlabel('Paso')
plt.ylabel('Estimated Health')
plt.title('Evolución de Health con datos saludables')
plt.legend()
plt.grid(True)
plt.show()


In [34]:
health_estimates_falla = []

for idx, row in SF['Sinal-C0'].iterrows():
    for i in range(N):
        particles[i] = state_transition(particles[i], stds)

    z_obs = row[feature_names].values
    weights = update_weights(particles, weights, z_obs, stds)
    x_hat = np.average(particles, axis=0, weights=weights)

    health_estimates_falla.append(x_hat[5])  # Health estimado


KeyError: 'Sinal-C0'

In [35]:

# Señal original con falla (ejemplo, debes reemplazarla con la tuya)
# signal_falla = ...  # array 1D de datos crudos de vibración

# Número de muestras por ventana usada para las estadísticas
window_size = 1024

# Lista de Estimated_Health generada al pasar el filtro sobre DF_falla
# health_estimates_falla = [...]

# Crear eje de tiempo para cada ventana (posición central de cada ventana)
health_times = np.arange(len(health_estimates_falla)) * window_size + window_size // 2

# Crear figura
plt.figure(figsize=(14, 6))

# Señal original
plt.plot(SF['Sinal-C0'], label='Señal de vibración', alpha=0.5)

# Overlay de Health (reescalado para visualización, opcional)
scaled_health = np.interp(health_times, np.arange(len(SF['Sinal-C0'])), np.array(health_estimates_falla))
plt.plot(health_times, scaled_health, color='red', label='Estimated Health', linewidth=2)

# Umbral visual
plt.axhline(y=0.95, color='r', linestyle='--', label='Umbral de alerta')

plt.xlabel('Muestra')
plt.ylabel('Amplitud / Health')
plt.title('Señal original y estimación de Health sobre la vibración')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()


KeyError: 'Sinal-C0'