In [None]:
# Mount Google Drive at /content/drive so later cells can read the dataset
# archive from it and write results back to it.
from google.colab import drive
drive.mount('/content/drive')

# Disabled alternative: fetch a file with gdown instead of reading it from Drive.
# !pip install gdown
# !gdown --id 1RggdfEs4RyBQKNELokmhCvJ2dUY9KsDl

# File id used by the gdown command above:
# 1RggdfEs4RyBQKNELokmhCvJ2dUY9KsDl

Mounted at /content/drive


In [None]:
import zipfile
import os

zip_path = "/content/drive/MyDrive/Dataset.zip"
extract_path = "/content/extracted_dataset"

# Only preview this many archive entries. Printing every member of a large
# archive floods the cell output (Colab truncates it) without adding anything.
MAX_MEMBERS_SHOWN = 20

# Fail with an ordinary exception when the archive is missing. exit() inside a
# notebook cell tears down the kernel/runtime instead of just stopping the cell.
if not os.path.exists(zip_path):
    raise FileNotFoundError(f"Zip file '{zip_path}' does not exist.")

# Create extraction directory if it doesn't exist
os.makedirs(extract_path, exist_ok=True)

# Open the archive once and use it for both the listing and the extraction.
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    members = zip_ref.namelist()
    shown = min(len(members), MAX_MEMBERS_SHOWN)
    print(f"Contents of the zip file ({len(members)} entries, showing first {shown}):")
    for member in members[:MAX_MEMBERS_SHOWN]:
        print(member)

    print(f"\nExtracting zip file to: {extract_path}")
    zip_ref.extractall(extract_path)
print("Extraction completed!")

# Summarise the extracted tree: one indented line per directory with its file
# count, rather than one line per frame image.
print(f"\nContents of the extracted directory '{extract_path}':")
for root, dirs, files in os.walk(extract_path):
    dirs.sort()  # walk sub-directories in a stable, alphabetical order
    rel = os.path.relpath(root, extract_path)
    level = 0 if rel == os.curdir else rel.count(os.sep) + 1
    indent = ' ' * 4 * level
    print(f"{indent}{os.path.basename(root)}/ ({len(files)} files)")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Dataset/250 RPM/Fault/Outer Ring/Distance 91.5 from machine _ Height 120 cm /frame_0723.png
Dataset/250 RPM/Fault/Outer Ring/Distance 91.5 from machine _ Height 120 cm /frame_0728.png
Dataset/250 RPM/Fault/Outer Ring/Distance 91.5 from machine _ Height 120 cm /frame_0731.png
Dataset/250 RPM/Fault/Outer Ring/Distance 91.5 from machine _ Height 120 cm /frame_0732.png
Dataset/250 RPM/Fault/Outer Ring/Distance 91.5 from machine _ Height 120 cm /frame_0733.png
Dataset/250 RPM/Fault/Outer Ring/Distance 91.5 from machine _ Height 120 cm /frame_0734.png
Dataset/250 RPM/Fault/Outer Ring/Distance 91.5 from machine _ Height 120 cm /frame_0737.png
Dataset/250 RPM/Fault/Outer Ring/Distance 91.5 from machine _ Height 120 cm /frame_0738.png
Dataset/250 RPM/Fault/Outer Ring/Distance 91.5 from machine _ Height 120 cm /frame_0739.png
Dataset/250 RPM/Fault/Outer Ring/Distance 91.5 from machine _ Height 120 cm /frame_0742.png
Dataset/250 RPM

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.ndimage import gaussian_filter1d
from scipy.fft import fft, fftfreq
from scipy.signal import spectrogram
import os
from scipy.stats import skew, kurtosis, entropy
import re

# -------------------------------------------------------------------
# 1. CONFIGURATION - MODIFY THESE VALUES
# -------------------------------------------------------------------
# Dataset root; it is scanned recursively for folders that contain PNG frames.
DATA_ROOT = "/content/extracted_dataset/Dataset/250 RPM"

# Parent folder under which all output sub-folders are created.
BASE_OUTPUT_DIR = "/content/drive/MyDrive/Video_Analysis_Outputs"

# Frame rate (frames per second) of the original recordings. It is used both
# when writing the output videos and as the sampling rate of the signal analysis.
FPS = 30

# Region of Interest as (x_coordinate, y_coordinate, width, height) in pixels.
# Open a sample frame in an image editor to read off the top-left corner and
# the box size.
# NOTE: these are EXAMPLE values - change them. The cropping helper is
# commented out in this notebook, so ROI is currently not applied.
ROI = (100, 150, 200, 200)

# Eulerian Magnification Parameters
ALPHA = 30        # gain applied to the detail layer
FILTER_SIZE = 5   # passed to the Gaussian filter as its sigma
# -------------------------------------------------------------------

# --- One output sub-folder per artefact type, all below BASE_OUTPUT_DIR ---
(
    EULERIAN_VIDEO_OUTPUT_DIR,
    OPTICAL_FLOW_VIDEO_OUTPUT_DIR,
    ANALYSIS_PLOT_OUTPUT_DIR,
    CSV_OUTPUT_DIR,
) = (
    os.path.join(BASE_OUTPUT_DIR, subfolder)
    for subfolder in ("Eulerian_Videos", "Optical_Flow_Videos", "Analysis_Plots", "CSV_Features")
)

# --- Make sure every output folder exists before anything is written to it ---
for path in (
    EULERIAN_VIDEO_OUTPUT_DIR,
    OPTICAL_FLOW_VIDEO_OUTPUT_DIR,
    ANALYSIS_PLOT_OUTPUT_DIR,
    CSV_OUTPUT_DIR,
):
    os.makedirs(path, exist_ok=True)


# Function to load and sort frames from a directory
def load_frames_from_directory(directory_path):
    """Load every PNG frame found directly in a directory, in frame order.

    Files are ordered by the first run of digits in their name, so that
    ``frame_2`` comes before ``frame_10``. PNG files whose name contains no
    digits are placed after the numbered ones, ordered by name.

    Args:
        directory_path: Folder that holds the frame images.

    Returns:
        A list with the result of ``cv2.imread`` for each readable PNG, or
        ``None`` when the directory contains no PNG file at all.
    """
    def _natural_key(file_name):
        # A name without digits must not crash the sort (re.search returns
        # None for it); push such files to the end instead.
        match = re.search(r'(\d+)', file_name)
        if match is None:
            return (1, 0, file_name)
        return (0, int(match.group(1)), file_name)

    # Case-insensitive extension test, matching the check that
    # process_all_directories uses to decide whether a folder holds frames.
    png_names = sorted(
        (name for name in os.listdir(directory_path) if name.lower().endswith('.png')),
        key=_natural_key,
    )
    if not png_names:
        return None

    frames = []
    unreadable = []
    for name in png_names:
        frame = cv2.imread(os.path.join(directory_path, name))
        if frame is None:
            # cv2.imread signals a failed read by returning None, not by raising.
            unreadable.append(name)
        else:
            frames.append(frame)

    if unreadable:
        print(f"Warning: skipped {len(unreadable)} unreadable file(s), e.g. {unreadable[0]}")
    print(f"Loaded {len(frames)} frames from {os.path.basename(directory_path)}.")
    return frames

# Function to crop frames to the predefined ROI
# def crop_frames_to_roi(frames, roi):
#     x, y, w, h = roi
#     cropped_frames = [frame[y:y+h, x:x+w] for frame in frames]
#     return cropped_frames

# Eulerian magnification
def eulerian_magnification(frames, alpha=30, filter_size=5):
    """Amplify the fine spatial detail of each frame.

    Each frame is converted to float32 and blurred with ``gaussian_filter1d``
    along axis 0 and then axis 1, using ``filter_size`` as the sigma. The
    difference between the frame and its blurred version is multiplied by
    ``alpha`` and added back onto the blurred version. The result is clipped
    to [0, 255] and cast to uint8.

    Frames are processed one at a time; nothing here filters along the time
    axis.

    Args:
        frames: Iterable of image arrays.
        alpha: Gain applied to the (frame - blurred) detail layer.
        filter_size: Sigma handed to ``gaussian_filter1d``.

    Returns:
        A list of uint8 arrays, one per input frame, in the same order.
    """
    def _magnify(frame):
        frame_f32 = np.float32(frame)
        blurred = gaussian_filter1d(
            gaussian_filter1d(frame_f32, filter_size, axis=0),
            filter_size,
            axis=1,
        )
        detail = (frame_f32 - blurred) * alpha
        return np.clip(blurred + detail, 0, 255).astype(np.uint8)

    return [_magnify(frame) for frame in frames]

# Optical Flow-based vertical displacement
def extract_vertical_displacement(frames):
    """Build a 1-D signal of the mean vertical optical flow between frames.

    For every pair of consecutive frames, dense Farneback optical flow is
    computed on the float32 grayscale images, and the mean of the flow's
    second channel (``flow[..., 1]``) is recorded.

    Args:
        frames: List of BGR images.

    Returns:
        A NumPy array with ``len(frames) - 1`` values, or an empty array when
        ``frames`` is empty.
    """
    if not frames:
        return np.array([])

    def _to_gray(frame):
        return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32)

    displacements = []
    previous = _to_gray(frames[0])
    for frame in frames[1:]:
        current = _to_gray(frame)
        # Positional arguments after the images: flow=None, pyr_scale=0.5,
        # levels=3, winsize=15, iterations=3, poly_n=5, poly_sigma=1.2, flags=0
        flow = cv2.calcOpticalFlowFarneback(previous, current, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        displacements.append(flow[..., 1].mean())
        previous = current
    return np.array(displacements)

# Save video (magnified or optical flow)
def save_video(frames, output_path, fps, is_color=True):
    """Write a list of frames to a video file using the 'mp4v' codec.

    Args:
        frames: List of image arrays; the first frame's height and width
            define the video size. Nothing is written for an empty list.
        output_path: Destination file path.
        fps: Frame rate passed to the video writer.
        is_color: Forwarded to ``cv2.VideoWriter`` as ``isColor``.

    Returns:
        None. If the writer cannot be opened, a warning is printed and no
        "Saved" message is emitted.
    """
    if not frames:
        return

    h, w = frames[0].shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (w, h), isColor=is_color)
    try:
        # A VideoWriter that failed to open (bad path, missing codec, ...)
        # ignores write() calls, so check before claiming success.
        if not out.isOpened():
            print(f"!!! Could not open video writer for {output_path}")
            return
        for frame in frames:
            out.write(frame)
    finally:
        # Release the writer even if a write raises.
        out.release()
    print(f"-> Saved video to {output_path}")

def save_optical_flow_video(frames, output_path, fps):
    """Render the dense optical flow between consecutive frames as a video.

    For each pair of consecutive frames, Farneback optical flow is computed on
    the grayscale images. It is drawn as an HSV image: hue comes from the flow
    angle, saturation is fixed at 255, and value is the flow magnitude
    min-max normalised to 0..255. The image is converted to BGR and written.
    The output therefore has ``len(frames) - 1`` frames.

    Args:
        frames: List of BGR images; the first frame defines the video size.
            Nothing is written for an empty list.
        output_path: Destination file path ('mp4v' codec).
        fps: Frame rate passed to the video writer.

    Returns:
        None. If the writer cannot be opened, a warning is printed and no
        "Saved" message is emitted.
    """
    if not frames:
        return

    h, w = frames[0].shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (w, h), isColor=True)
    try:
        # A writer that failed to open ignores write() calls, so check first.
        if not out.isOpened():
            print(f"!!! Could not open video writer for {output_path}")
            return

        # One HSV buffer is reused for all frames: saturation never changes,
        # and hue/value are fully overwritten on every iteration.
        hsv = np.zeros_like(frames[0])
        hsv[..., 1] = 255

        prev_gray = cv2.cvtColor(frames[0], cv2.COLOR_BGR2GRAY)
        for next_frame_color in frames[1:]:
            next_gray = cv2.cvtColor(next_frame_color, cv2.COLOR_BGR2GRAY)
            flow = cv2.calcOpticalFlowFarneback(prev_gray, next_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
            # Angle in radians -> degrees, halved to fit OpenCV's 0..180 hue range.
            hsv[..., 0] = ang * 180 / np.pi / 2
            hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
            out.write(cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR))
            prev_gray = next_gray
    finally:
        # Release the writer even if flow computation or a write raises.
        out.release()
    print(f"-> Saved optical flow video to {output_path}")

# Signal analysis: time-domain statistics, FFT features and spectrogram
def analyze_signal(signal, fps):
    """Compute time-domain and frequency-domain features of a 1-D signal.

    Args:
        signal: Sequence of samples (one value per frame transition).
        fps: Sampling rate of ``signal`` in samples per second.

    Returns:
        A 5-tuple ``(time_features, freq_features, xf, amplitude, spec)``:
          * ``time_features`` - dict with Mean, Std Dev, Variance, Skewness,
            Kurtosis and Peak-to-Peak.
          * ``freq_features`` - dict with Total Energy, Max Amplitude,
            Frequency of Max Amp, Spectral Centroid and Spectral Entropy,
            all taken from the first ``N // 2`` FFT bins.
          * ``xf`` - frequencies of those bins.
          * ``amplitude`` - ``2 / N * |FFT|`` for those bins.
          * ``spec`` - the ``(f, t, Sxx)`` tuple returned by
            ``scipy.signal.spectrogram`` with ``nperseg = min(N, 64)``.
        For an empty signal the result is ``({}, {}, None, None, None)``.
    """
    N = len(signal)
    if N == 0:
        return {}, {}, None, None, None

    # --- one-sided amplitude spectrum ---
    half = N // 2
    sample_period = 1.0 / fps
    spectrum = fft(signal)
    xf = fftfreq(N, sample_period)[:half]
    amplitude = 2.0 / N * np.abs(spectrum[:half])

    amp_total = np.sum(amplitude)
    has_bins = amplitude.size > 0
    if amp_total != 0:
        norm_amp = amplitude / amp_total
    else:
        norm_amp = amplitude

    # --- frequency-domain features (key order defines the CSV column order) ---
    freq_features = {}
    freq_features["Total Energy"] = np.sum(amplitude ** 2)
    freq_features["Max Amplitude"] = np.max(amplitude) if has_bins else 0
    freq_features["Frequency of Max Amp"] = xf[np.argmax(amplitude)] if has_bins else 0
    freq_features["Spectral Centroid"] = np.sum(xf * amplitude) / amp_total if amp_total != 0 else 0
    freq_features["Spectral Entropy"] = entropy(norm_amp) if norm_amp.size > 0 else 0

    # --- time-domain features ---
    time_features = {
        "Mean": np.mean(signal),
        "Std Dev": np.std(signal),
        "Variance": np.var(signal),
        "Skewness": skew(signal),
        "Kurtosis": kurtosis(signal),
        "Peak-to-Peak": np.ptp(signal),
    }

    spec = spectrogram(signal, fs=fps, nperseg=min(N, 64))
    f, t, Sxx = spec
    return time_features, freq_features, xf, amplitude, (f, t, Sxx)

def plot_and_save_results(signal, xf, amplitude, spectrogram_data, features, output_path):
    """Save a three-panel figure: time series, FFT spectrum and spectrogram.

    A text box listing ``features`` (formatted to four decimals) is placed
    below the panels.

    Args:
        signal: 1-D signal plotted against its sample index.
        xf: Frequencies for the spectrum panel.
        amplitude: Amplitudes matching ``xf``.
        spectrogram_data: ``(f, t, Sxx)`` tuple for the spectrogram panel.
        features: Mapping of feature name to numeric value.
        output_path: Destination image path.

    Returns:
        None. When any spectral input is ``None`` (what ``analyze_signal``
        returns for an empty signal), a notice is printed and nothing is
        written.
    """
    # Without spectral data there is nothing meaningful to draw, and unpacking
    # a None spectrogram would raise.
    if xf is None or amplitude is None or spectrogram_data is None:
        print(f"!!! No spectral data to plot; skipping {output_path}")
        return

    plt.style.use('seaborn-v0_8-whitegrid')
    fig, axs = plt.subplots(3, 1, figsize=(12, 12))
    try:
        # Panel 1: raw signal
        axs[0].plot(signal, color='royalblue')
        axs[0].set_title("Vibration Time Series (Vertical Displacement)", fontsize=14)
        axs[0].set_xlabel("Frame Number", fontsize=10)
        axs[0].set_ylabel("Mean Vertical Flow", fontsize=10)

        # Panel 2: amplitude spectrum
        axs[1].plot(xf, amplitude, color='green')
        axs[1].set_title("Frequency Spectrum (FFT)", fontsize=14)
        axs[1].set_xlabel("Frequency (Hz)", fontsize=10)
        axs[1].set_ylabel("Amplitude", fontsize=10)

        # Panel 3: spectrogram in dB; the small offset avoids log10(0)
        f, t, Sxx = spectrogram_data
        spec = axs[2].pcolormesh(t, f, 10 * np.log10(Sxx + 1e-9), shading='gouraud', cmap='viridis')
        axs[2].set_title("Spectrogram", fontsize=14)
        axs[2].set_ylabel("Frequency [Hz]", fontsize=10)
        axs[2].set_xlabel("Time [sec]", fontsize=10)
        fig.colorbar(spec, ax=axs[2], label='Intensity (dB)')

        # Feature summary box just below the axes; bbox_inches='tight' keeps
        # it inside the saved image.
        features_text = '\n'.join([f'{k}: {v:.4f}' for k, v in features.items()])
        props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
        fig.text(0.5, -0.01, features_text, transform=fig.transFigure, fontsize=10,
                 verticalalignment='top', bbox=props, ha='center')
        fig.tight_layout(rect=[0, 0.05, 1, 1])
        fig.savefig(output_path, bbox_inches='tight')
    finally:
        # Always close the figure: this runs once per directory, and figures
        # left open after an exception would pile up in memory.
        plt.close(fig)
    print(f"-> Saved plot to {output_path}")

# -------------------------------------------------------------------
# MAIN PROCESSING LOOP
# -------------------------------------------------------------------
def process_all_directories(root_dir):
    """Run the full analysis pipeline on every frame folder below ``root_dir``.

    Every directory that directly contains at least one ``.png`` file (the
    check is case-insensitive) is processed: its frames are loaded, magnified,
    turned into a vertical-displacement signal and analysed. Four outputs are
    then written to the centralized output folders: the magnified video, the
    optical-flow video, a one-row features CSV and an analysis plot. Output
    file names are derived from the directory's path relative to ``root_dir``.

    An error in one directory is printed and does not stop the remaining
    directories from being processed.

    Args:
        root_dir: Root of the directory tree to scan.
    """
    for dirpath, dirnames, filenames in os.walk(root_dir):
        # Sorting in place makes os.walk visit sub-directories in a stable order.
        dirnames.sort()
        if not any(f.lower().endswith('.png') for f in filenames):
            continue

        print(f"\n{'='*50}\nProcessing Directory: {dirpath}\n{'='*50}")

        # Create a unique base name from the folder structure
        relative_path = os.path.relpath(dirpath, root_dir)
        if relative_path == os.curdir:
            # Frames sit directly in root_dir: use its own name rather than ".".
            relative_path = os.path.basename(os.path.normpath(root_dir))
        base_filename = relative_path.replace(os.sep, '_')

        try:
            frames = load_frames_from_directory(dirpath)
            if not frames:
                continue

            # ROI cropping is currently disabled (crop_frames_to_roi is
            # commented out), so the full frames are analysed.
            # roi_frames = crop_frames_to_roi(frames, ROI)

            print("Applying Eulerian Magnification...")
            enhanced_frames = eulerian_magnification(frames, alpha=ALPHA, filter_size=FILTER_SIZE)

            print("Calculating vertical displacement signal...")
            vibration_signal = extract_vertical_displacement(enhanced_frames)

            print("Analyzing signal...")
            time_f, freq_f, xf, amp, spec_data = analyze_signal(vibration_signal, FPS)
            # analyze_signal returns empty dicts (not None) for an empty
            # signal, e.g. when only a single frame was loaded.
            if not time_f:
                print("!!! Signal is empty (too few frames); skipping this directory.")
                continue

            # Save all outputs to their respective centralized folders
            print("Saving results...")
            # 1. Eulerian Video
            euler_path = os.path.join(EULERIAN_VIDEO_OUTPUT_DIR, f"{base_filename}_eulerian.mp4")
            save_video(enhanced_frames, euler_path, FPS)

            # 2. Optical Flow Video
            flow_path = os.path.join(OPTICAL_FLOW_VIDEO_OUTPUT_DIR, f"{base_filename}_flow.mp4")
            save_optical_flow_video(enhanced_frames, flow_path, FPS)

            # 3. Features CSV
            all_features = {**time_f, **freq_f}
            df_features = pd.DataFrame([all_features])
            csv_path = os.path.join(CSV_OUTPUT_DIR, f"{base_filename}_features.csv")
            df_features.to_csv(csv_path, index=False)
            print(f"-> Saved features to {csv_path}")

            # 4. Analysis Plot
            plot_path = os.path.join(ANALYSIS_PLOT_OUTPUT_DIR, f"{base_filename}_analysis.png")
            plot_and_save_results(vibration_signal, xf, amp, spec_data, all_features, plot_path)

        except Exception as e:
            # Best effort: report the failure and move on to the next directory.
            print(f"!!! An error occurred while processing {dirpath}: {e}")
            continue
    print("\n\nProcessing complete for all directories.")

# --- Run the main process ---
# Processes every PNG-frame directory found under DATA_ROOT and writes the
# videos, CSV features and plots to the output folders configured above.
process_all_directories(DATA_ROOT)


Processing Directory: /content/extracted_dataset/Dataset/250 RPM/Fault/10 g/Angel from machine 49.7 _ Angel from glass 61.5 _ Distance 91.5 _ Height 120 
Loaded 3600 frames from Angel from machine 49.7 _ Angel from glass 61.5 _ Distance 91.5 _ Height 120 .
Cropping frames to ROI...
Applying Eulerian Magnification...
Calculating vertical displacement signal...
Analyzing signal...
Saving results...
-> Saved video to /content/drive/MyDrive/Video_Analysis_Outputs/Eulerian_Videos/Fault_10 g_Angel from machine 49.7 _ Angel from glass 61.5 _ Distance 91.5 _ Height 120 _eulerian.mp4
-> Saved optical flow video to /content/drive/MyDrive/Video_Analysis_Outputs/Optical_Flow_Videos/Fault_10 g_Angel from machine 49.7 _ Angel from glass 61.5 _ Distance 91.5 _ Height 120 _flow.mp4
-> Saved features to /content/drive/MyDrive/Video_Analysis_Outputs/CSV_Features/Fault_10 g_Angel from machine 49.7 _ Angel from glass 61.5 _ Distance 91.5 _ Height 120 _features.csv
-> Saved plot to /content/drive/MyDrive/