<a href="https://colab.research.google.com/github/Poorvi4-art/Time_series_anamoly_detection/blob/main/anamoly_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#  Import Libraries

import os
import numpy as np
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
# import warnings
# warnings.filterwarnings('ignore')

In [None]:
# Load and Process All Snapshot Files
#
# Every entry found in `folder_path` is treated as a plain-text snapshot whose
# file name encodes its capture time as YYYY.MM.DD.HH.MM.SS. Each snapshot is
# reduced to four summary statistics (mean, std, min, max) computed over ALL
# whitespace-separated values in the file, pooled into one flat array.
#
# Produces:
#   summary_records - list of per-file dicts, in directory-listing order
#   df_features     - DataFrame of those records, sorted by timestamp
#
# Raises:
#   FileNotFoundError - if `folder_path` does not exist (from os.listdir)
#   ValueError        - if not a single entry could be parsed

folder_path = 'archive/1st_test'  # Change this path if your files are elsewhere

summary_records = []
file_list = sorted(os.listdir(folder_path))

print(f"Found {len(file_list)} snapshot files in the folder.")

for index, fname in enumerate(file_list):
    if index % 100 == 0:  # Print progress every 100 files
        print(f"  Processed {index}/{len(file_list)} files...")

    full_path = os.path.join(folder_path, fname)

    try:
        # Parse the name first so entries that are not snapshots are rejected
        # before their contents are read.
        timestamp = datetime.strptime(fname, '%Y.%m.%d.%H.%M.%S')

        with open(full_path, 'r') as f:
            readings = np.array(f.read().split(), dtype=float)

        # An empty array would make mean()/std() emit warnings and min()/max()
        # fail with a less helpful message, so reject it explicitly.
        if readings.size == 0:
            raise ValueError("file contains no readings")

        summary_records.append({
            'filename': fname,
            'timestamp': timestamp,
            'mean': readings.mean(),
            'std_dev': readings.std(),
            'min': readings.min(),
            'max': readings.max()
        })
    except (OSError, ValueError) as e:
        # OSError: unreadable entry (directory, permissions, ...).
        # ValueError: bad file name, non-numeric token, undecodable or empty file.
        print(f"Error processing {fname}: {e}")

# Without this guard an empty record list yields a column-less frame and the
# sort below fails with an opaque KeyError on 'timestamp'.
if not summary_records:
    raise ValueError(f"No snapshot files could be parsed from {folder_path!r}")

df_features = pd.DataFrame(summary_records)
df_features = df_features.sort_values('timestamp').reset_index(drop=True)

print(f"\nSuccessfully processed {len(df_features)} files!")
print(f"Data spans from {df_features['timestamp'].min()} to {df_features['timestamp'].max()}")


FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\Admin\\Desktop\\Assignment\\archive\\1st_test'

In [None]:
# Display Feature Summary
# Prints descriptive statistics for the four extracted features, followed by
# the first and last ten rows of the feature table.

stat_columns = ['mean', 'std_dev', 'min', 'max']
stats_table = df_features[stat_columns].describe()
print("\nFeature Summary Statistics:", "=" * 60, stats_table, sep="\n")

row_previews = (
    ("\nFirst few rows of extracted features:", df_features.head(10)),
    ("\nLast few rows of extracted features:", df_features.tail(10)),
)
for caption, preview in row_previews:
    print(caption)
    print(preview)

In [None]:
# Visualize Mean Vibration Over Time
# Line plot of each snapshot's mean value against its timestamp.

plt.figure(figsize=(16, 6))
plt.plot(df_features['timestamp'], df_features['mean'], linewidth=2, color='steelblue', label='Mean Vibration')
# fill_between with a single y series shades the band between the curve and zero.
plt.fill_between(df_features['timestamp'], df_features['mean'], alpha=0.3, color='steelblue')
plt.xlabel('Time', fontsize=12, fontweight='bold')
plt.ylabel('Mean Vibration Amplitude', fontsize=12, fontweight='bold')
plt.title('Mean Vibration Over Time - Is the Bearing Getting Worse?', fontsize=14, fontweight='bold')
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# The dash in this message was previously mis-encoded and printed as garbage.
print("Observation: Look for gradual increases or sudden spikes — these indicate bearing degradation.")

In [None]:
# Visualize Standard Deviation Over Time
# Line plot of each snapshot's standard deviation against its timestamp; the
# band between the curve and zero is shaded.

std_fig, std_ax = plt.subplots(figsize=(16, 6))
time_axis = df_features['timestamp']
spread = df_features['std_dev']

std_ax.plot(time_axis, spread, linewidth=2, color='coral', label='Std Deviation')
std_ax.fill_between(time_axis, spread, alpha=0.3, color='coral')

std_ax.set_xlabel('Time', fontsize=12, fontweight='bold')
std_ax.set_ylabel('Standard Deviation', fontsize=12, fontweight='bold')
std_ax.set_title('Vibration Variability Over Time - How Erratic is the Motion?', fontsize=14, fontweight='bold')
std_ax.legend(fontsize=11)
std_ax.grid(True, alpha=0.3)

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

print("Observation: Increasing std dev means the bearing is moving more unpredictably.")

In [None]:
# Visualize Feature Distributions
# One 50-bin histogram per extracted feature, laid out on a 2x2 grid.

fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('Distribution of Features Across All Snapshots', fontsize=16, fontweight='bold')

# (column, bar colour, panel title, x-axis label), listed in row-major grid order.
panel_specs = [
    ('mean', 'steelblue', 'Mean Amplitude Distribution', 'Mean Value'),
    ('std_dev', 'coral', 'Standard Deviation Distribution', 'Std Dev Value'),
    ('min', 'lightgreen', 'Minimum Value Distribution', 'Min Value'),
    ('max', 'salmon', 'Maximum Value Distribution', 'Max Value'),
]

for panel, (column, shade, heading, x_caption) in zip(axes.flat, panel_specs):
    panel.hist(df_features[column], bins=50, color=shade, edgecolor='black', alpha=0.7)
    panel.set_title(heading)
    panel.set_xlabel(x_caption)
    panel.set_ylabel('Frequency')
    panel.grid(alpha=0.3)

plt.tight_layout()
plt.show()

print("Observation: These distributions help us understand what's 'normal' for this bearing.")

In [None]:
# Scale Features for Anomaly Detection
# Standardises the four summary features with StandardScaler and keeps the
# fitted scaler plus the transformed matrix for the detector cell.

features_to_use = ['mean', 'std_dev', 'min', 'max']

scaler = StandardScaler()
feature_matrix = df_features[features_to_use]
scaled_features = scaler.fit(feature_matrix).transform(feature_matrix)

scaling_messages = (
    "Features have been scaled to a standard distribution.",
    f"Scaled features shape: {scaled_features.shape}",
    "\nWhat's scaling? Converting all features to a common scale (mean=0, std=1)",
    "This helps the anomaly detection algorithm work fairly across all features.",
)
for message in scaling_messages:
    print(message)


In [None]:
# Train Isolation Forest Anomaly Detector
# Fits the forest on the scaled features and records the outcome on
# df_features as two new columns.

iso_forest = IsolationForest(contamination=0.05, random_state=42)
predicted_labels = iso_forest.fit_predict(scaled_features)

# NOTE: despite its name, 'anomaly_score' stores the fit_predict output
# (compared against -1 below), not a continuous score.
df_features['anomaly_score'] = predicted_labels
df_features['anomaly_flag'] = df_features['anomaly_score'].eq(-1)

total_records = len(df_features)
num_anomalies = df_features['anomaly_flag'].sum()
anomaly_percentage = (num_anomalies / total_records) * 100
normal_count = total_records - num_anomalies
normal_percentage = 100 - anomaly_percentage

print("Isolation Forest Training Complete!")
print("=" * 60)
print(f"Total snapshots analyzed: {total_records}")
print(f"Anomalies detected: {num_anomalies} ({anomaly_percentage:.2f}%)")
print(f"Normal snapshots: {normal_count} ({normal_percentage:.2f}%)")

In [None]:
# Display Detected Anomalies
# Lists every flagged snapshot (file name, timestamp, mean, std dev) and then
# reports the timestamps of the first and last flagged rows.

report_columns = ['filename', 'timestamp', 'mean', 'std_dev']
anomaly_df = df_features.loc[df_features['anomaly_flag'], report_columns].copy()

print("\nAnomalous Snapshots Detected:")
print("=" * 60)
print(anomaly_df.to_string(index=False))

if not anomaly_df.empty:
    flagged_times = anomaly_df['timestamp']
    print(f"\nFirst anomaly detected at: {flagged_times.iloc[0]}")
    print(f"Last anomaly detected at: {flagged_times.iloc[-1]}")

In [None]:
# Visualize Anomalies on Mean Vibration Timeline
# Non-flagged snapshots are drawn as a connected green series; flagged
# snapshots are overlaid as large red X markers.

is_flagged = df_features['anomaly_flag']
normal_df = df_features[~is_flagged]
anomaly_df = df_features[is_flagged]

mean_fig, mean_ax = plt.subplots(figsize=(16, 7))

mean_ax.plot(
    normal_df['timestamp'], normal_df['mean'], 'o-',
    color='green', linewidth=2, markersize=4, label='Normal', alpha=0.7,
)

if not anomaly_df.empty:
    mean_ax.scatter(
        anomaly_df['timestamp'], anomaly_df['mean'],
        color='red', s=200, marker='X', label='Anomaly',
        zorder=5, edgecolors='darkred', linewidth=2,
    )

mean_ax.set_xlabel('Time', fontsize=12, fontweight='bold')
mean_ax.set_ylabel('Mean Vibration Amplitude', fontsize=12, fontweight='bold')
mean_ax.set_title('Anomaly Detection Results - Red X marks unusual behavior', fontsize=14, fontweight='bold')
mean_ax.legend(fontsize=11, loc='upper left')
mean_ax.grid(True, alpha=0.3)

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [None]:
# Visualize Anomalies on Std Deviation Timeline
# Same layout as the mean-vibration anomaly chart, but plotting the
# per-snapshot standard deviation: blue connected series for non-flagged
# snapshots, red X markers for flagged ones.

flag_mask = df_features['anomaly_flag']
normal_df = df_features[~flag_mask]
anomaly_df = df_features[flag_mask]

spread_fig, spread_ax = plt.subplots(figsize=(16, 7))

spread_ax.plot(
    normal_df['timestamp'], normal_df['std_dev'], 'o-',
    color='blue', linewidth=2, markersize=4, label='Normal', alpha=0.7,
)

if not anomaly_df.empty:
    spread_ax.scatter(
        anomaly_df['timestamp'], anomaly_df['std_dev'],
        color='red', s=200, marker='X', label='Anomaly',
        zorder=5, edgecolors='darkred', linewidth=2,
    )

spread_ax.set_xlabel('Time', fontsize=12, fontweight='bold')
spread_ax.set_ylabel('Standard Deviation', fontsize=12, fontweight='bold')
spread_ax.set_title('Anomaly Detection on Variability - Are we detecting erratic behavior?', fontsize=14, fontweight='bold')
spread_ax.legend(fontsize=11, loc='upper left')
spread_ax.grid(True, alpha=0.3)

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:
# Time to Anomaly (Days Until First Anomaly)
# Reports how long after the earliest snapshot the first flagged snapshot
# occurred, as whole days plus leftover hours and as total hours.

# Read the flagged timestamps straight from df_features so this cell does not
# depend on whichever earlier cell last rebound `anomaly_df`.
anomaly_times = df_features.loc[df_features['anomaly_flag'], 'timestamp']

if len(anomaly_times) > 0:
    start_time = df_features['timestamp'].min()
    first_anomaly_time = anomaly_times.min()

    elapsed = first_anomaly_time - start_time
    days_to_anomaly = elapsed.days
    hours_to_anomaly = elapsed.total_seconds() / 3600

    print("Anomaly Timeline Analysis:")
    print("=" * 60)
    print(f"Test started at: {start_time}")
    print(f"First anomaly at: {first_anomaly_time}")
    # `% 24` keeps only the hours left over after the whole days.
    print(f"Time elapsed: {days_to_anomaly} days, {hours_to_anomaly % 24:.1f} hours")
    print(f"Total hours: {hours_to_anomaly:.1f} hours")
else:
    print("No anomalies detected in this bearing test.")


In [None]:
# Summary and Interpretation Guide
#
# This cell contains no computation: its only statement is the string literal
# below, which holds reader guidance for interpreting the charts and anomaly
# flags produced by the earlier cells. As the final expression of a notebook
# cell, its value is what gets displayed as the cell's output.

"""
What to look for in the results:

1. MEAN VIBRATION TREND:
   - Flat line = healthy bearing
   - Gradual increase = bearing wearing out slowly
   - Sharp spike = sudden change, could indicate failure

2. STANDARD DEVIATION TREND:
   - Increasing std dev = bearing moving unpredictably
   - High variability + high mean = likely failure imminent

3. RED X MARKERS (ANOMALIES):
   - These are snapshots where vibration pattern differs from normal
   - Clustered anomalies near the end = bearing approaching failure
   - Scattered anomalies = occasional disturbances (could be normal)

4. BUSINESS DECISION:
   - First anomaly = time to schedule maintenance inspection
   - Cluster of anomalies = replace part before failure
   - No anomalies = bearing still operating normally

KEY SUCCESS METRIC:
If anomalies cluster near the end of the test, our model is working!
It means we detected degradation before failure occurred.
"""
