In [None]:
import cv2
import os
import json
import matplotlib.pyplot as plt
import mediapipe as mp
import pandas as pd
import numpy as np
from pprint import pprint
from tqdm import tqdm

Saving images with mask

In [None]:
# Paths
image_folder = r"D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\batch"
output_folder = r"D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\labeled_regions"
mask_folder = r"D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\batch\output_grayscale_gaussian"

# Masks as file paths (region name -> mask image)
masks = {
    "filled_mask_below": os.path.join(mask_folder, "filled_mask_below.png"),
    "filled_mask_above": os.path.join(mask_folder, "filled_mask_above.png"),
    "deducted_mask_outside": os.path.join(mask_folder, "deducted_mask_outside.png"),
    "mask_inside": os.path.join(mask_folder, "mask_inside.png"),
}

# Ensure the output folder exists
os.makedirs(output_folder, exist_ok=True)

# Load and validate masks once, up front, so a missing mask fails before any image is processed
loaded_masks = {}
for mask_name, mask_path in masks.items():
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    if mask is None:
        raise FileNotFoundError(f"Mask '{mask_name}' not found at {mask_path}.")
    loaded_masks[mask_name] = mask

# Process images (only names ending in ".jpg", in sorted order)
images = sorted([img for img in os.listdir(image_folder) if img.endswith(".jpg")])

for img_name in tqdm(images, desc="Processing Images"):
    img_path = os.path.join(image_folder, img_name)
    image = cv2.imread(img_path)
    if image is None:
        # cv2.imread signals an unreadable file with None; skip it instead of
        # crashing on image.shape below
        print(f"Error loading image: {img_name}")
        continue

    for region_name, mask in loaded_masks.items():
        # Ensure the mask matches the image dimensions; the resized copy is local to
        # this iteration, so the mask stored in loaded_masks is left untouched
        if mask.shape[:2] != image.shape[:2]:
            mask = cv2.resize(mask, (image.shape[1], image.shape[0]), interpolation=cv2.INTER_NEAREST)

        # Apply the mask to the image (pixels where the mask is zero become black)
        region = cv2.bitwise_and(image, image, mask=mask)

        # Save the region as "<region_name>_<original file name>"
        region_path = os.path.join(output_folder, f"{region_name}_{img_name}")
        if not cv2.imwrite(region_path, region):
            # imwrite reports failure through its return value rather than an exception
            print(f"Error saving region image: {region_path}")

print(f"Region-specific images saved to {output_folder}")


In [3]:
# Redefine `masks` with three entries only (no "deducted_mask_outside" entry);
# each region name maps to "<name>.png" inside mask_folder.
masks = {
    region: os.path.join(mask_folder, f"{region}.png")
    for region in ("filled_mask_below", "filled_mask_above", "mask_inside")
}

Analysis

In [None]:
# Paths
image_folder = r"D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\labeled_regions\inside"
output_analysis_folder = r"D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\labeled_regions\inside\hand_detection_mediapipe"
os.makedirs(output_analysis_folder, exist_ok=True)

# Import only required components from MediaPipe
from mediapipe.python.solutions.hands import Hands

# Initialize MediaPipe Hands
hands = Hands(static_image_mode=True, max_num_hands=2, min_detection_confidence=0.5)

# Process the images
results = []
image_filenames = sorted([img for img in os.listdir(image_folder) if img.endswith(".jpg")])

try:
    for img_name in tqdm(image_filenames, desc="Processing Images for Hand Detection"):
        img_path = os.path.join(image_folder, img_name)
        image = cv2.imread(img_path)
        if image is None:
            # Unreadable file: report it and leave it out of the results
            print(f"Error loading image: {img_name}")
            continue

        # Convert image to RGB (cv2.imread output is converted from BGR here)
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Process the image with MediaPipe Hands
        result = hands.process(image_rgb)

        # Analyze results
        hand_detected = False
        hand_bounding_boxes = []

        if result.multi_hand_landmarks:
            hand_detected = True
            img_height, img_width = image.shape[:2]
            for hand_landmarks in result.multi_hand_landmarks:
                # Collect the landmark coordinates once per hand
                xs = [lm.x for lm in hand_landmarks.landmark]
                ys = [lm.y for lm in hand_landmarks.landmark]

                # Bounding box of all landmarks, scaled by the image width/height
                # (assumes landmark x/y are fractions of the image size — TODO confirm)
                x_min = min(xs) * img_width
                y_min = min(ys) * img_height
                x_max = max(xs) * img_width
                y_max = max(ys) * img_height

                # Stored as [x, y, width, height], truncated to integers
                hand_bounding_boxes.append([int(x_min), int(y_min), int(x_max - x_min), int(y_max - y_min)])

        # Save the result
        result_entry = {
            "image": img_name,
            "result": "Hand detected" if hand_detected else "No hand detected",
            "bounding_boxes": hand_bounding_boxes
        }
        results.append(result_entry)
finally:
    # Release MediaPipe resources even when processing stops with an exception
    hands.close()

# Save results to a JSON file
output_json_path = os.path.join(output_analysis_folder, "hand_detection_mediapipe.json")
with open(output_json_path, "w") as json_file:
    json.dump(results, json_file, indent=4)

print(f"Hand detection analysis saved to: {output_json_path}")


In [None]:
# Paths
# Input: JSON file with the hand detection results that is loaded further down in this cell.
hand_detection_results_path = r"D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\labeled_regions\inside\hand_detection_mediapipe\hand_detection_mediapipe.json"
# Output: PNG file the time series plot is saved to.
output_plot_path = r"D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\labeled_regions\inside\hand_detection_timeseries.png"

# Function to extract timestamp from image name
def extract_timestamp_from_image_name(image_name):
    """
    Extract the timestamp encoded in an image file name.

    The name is split on "_". The third field is read as the date (YYYYMMDD)
    and the first nine characters of the fourth field as the time (HHMMSSfff),
    e.g. "mask_inside_20240527_123456789.jpg".

    Args:
        image_name: File name of the image.

    Returns:
        pd.Timestamp built from both the date and the time fields, or None when
        the name cannot be parsed (a message is printed in that case).
    """
    try:
        fields = image_name.split('_')
        date_str = fields[2]
        time_str = fields[3][:9]  # Extract HHMMSSfff
        # Include the date: a time-only string would be anchored to the day the
        # notebook happens to run, not the day the image was taken.
        return pd.Timestamp(
            f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:]} "
            f"{time_str[:2]}:{time_str[2:4]}:{time_str[4:6]}.{time_str[6:]}"
        )
    except Exception as e:
        # Best effort: report the unparsable name and let the caller decide what to do with None
        print(f"Error extracting timestamp from {image_name}: {e}")
        return None

# Read the stored hand detection results
with open(hand_detection_results_path, "r") as f:
    hand_detection_results = json.load(f)

# One row per analysed image: parsed timestamp plus a boolean detection flag
data = [
    {
        "Timestamp": extract_timestamp_from_image_name(entry["image"]),
        "Hand Detected": entry["result"] == "Hand detected",
    }
    for entry in hand_detection_results
]

# Build the frame and order it chronologically
df = pd.DataFrame(data).sort_values("Timestamp").reset_index(drop=True)

# Time series plot: detection flag (0/1) against timestamp
fig, ax = plt.subplots(figsize=(15, 6))
ax.plot(
    df["Timestamp"],
    df["Hand Detected"].astype(int),
    label="Hand Detected (1=True, 0=False)",
    marker='o',
)
ax.set_xlabel("Timestamp")
ax.set_ylabel("Hand Detection")
ax.set_title("Hand Detection Over Time")
ax.grid()
ax.legend()
fig.tight_layout()

# Write the figure to disk, then display it
fig.savefig(output_plot_path)
plt.show()

print(f"Time series plot saved to {output_plot_path}")


In [None]:
# Destination of the optional CSV export
output_csv_path = r"D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\labeled_regions\inside\hands_detected.csv"

# Keep only the rows flagged as containing a hand
detected_rows = df["Hand Detected"]
hands_detected_df = df.loc[detected_rows]

# Show the selection
print("Rows where hands were detected:")
print(hands_detected_df)

# Persist the selection for later analysis
hands_detected_df.to_csv(output_csv_path, index=False)
print(f"Filtered rows saved to {output_csv_path}")


Note: for MediaPipe (used above), the package's `__init__.py` files had to be adapted (audio imports and similar removed) for the module to load.
A venv with the adapted modules is provided in the zip file.

In [None]:
# Analysis results go into an "analysis_results" sub-folder of output_folder
output_analysis_folder = os.path.join(output_folder, "analysis_results")
os.makedirs(output_analysis_folder, exist_ok=True)

# State and object detection methods; each one gets its own (initially empty) result list
methods = [
    "spinning_stationary",
    "alignment_detection",
    "foreign_object_detection",
]
results = dict((method_name, []) for method_name in methods)

# Define detection functions
def detect_spinning_or_stationary(image, prev_image, threshold=30, min_motion_pixels=500):
    """
    Detect whether the reel is spinning or stationary by comparing the current frame
    with a previous frame using frame differencing.

    Args:
        image: Current frame, converted here from BGR to grayscale.
        prev_image: Previous frame, converted the same way; it is passed to
            cv2.absdiff together with `image`.
        threshold: Per-pixel absolute gray-level difference above which a pixel
            counts as changed.
        min_motion_pixels: Number of changed pixels that must be exceeded for the
            frame pair to be classified as "Spinning" (default 500, the value that
            was previously hard-coded).

    Returns:
        "Spinning" if more than `min_motion_pixels` pixels changed, otherwise "Stationary".
    """
    gray_current = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    gray_prev = cv2.cvtColor(prev_image, cv2.COLOR_BGR2GRAY)
    diff = cv2.absdiff(gray_current, gray_prev)
    # Binarise the difference image so that changed pixels can simply be counted
    _, motion_mask = cv2.threshold(diff, threshold, 255, cv2.THRESH_BINARY)
    motion_pixels = cv2.countNonZero(motion_mask)
    return "Spinning" if motion_pixels > min_motion_pixels else "Stationary"

def detect_cable_alignment(image):
    """
    Classify the cable state from the bounding box of the largest bright contour.

    The image is converted to grayscale and binarised at 127; the external contour
    with the largest area is then reduced to its bounding rectangle, whose
    width/height ratio decides the label.

    Returns:
        "No cable detected" when no contour is found, "Misaligned" for a ratio
        above 3, "Trapped" for a ratio below 0.5, otherwise "Aligned".
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(grayscale, 127, 255, cv2.THRESH_BINARY)
    contours, _hierarchy = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    if not contours:
        return "No cable detected"

    biggest = max(contours, key=cv2.contourArea)
    _x, _y, box_width, box_height = cv2.boundingRect(biggest)
    ratio = box_width / box_height

    if ratio > 3:
        return "Misaligned"
    if ratio < 0.5:
        return "Trapped"
    return "Aligned"

def detect_foreign_objects(image, min_pixels=500):
    """
    Detect foreign objects such as hands, tools, or debris using color and brightness thresholds.

    Three independent pixel-count checks are run on the image:
      * HSV pixels within [0, 20, 70]..[20, 255, 255] -> "Hand detected"
      * HSV pixels within [0, 120, 70]..[10, 255, 255] or
        [170, 120, 70]..[180, 255, 255] -> "Tools detected"
      * grayscale pixels brighter than 200 -> "Debris above reel"

    Args:
        image: BGR image to analyse.
        min_pixels: Pixel count a check has to exceed to report a detection
            (default 500, the value that was previously hard-coded three times).

    Returns:
        List of detection labels in the order above, or
        ["No foreign objects detected"] when no check fires.
    """
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    detections = []

    # Skin-tone range
    lower_skin = np.array([0, 20, 70], dtype=np.uint8)
    upper_skin = np.array([20, 255, 255], dtype=np.uint8)
    mask_skin = cv2.inRange(hsv, lower_skin, upper_skin)
    if cv2.countNonZero(mask_skin) > min_pixels:
        detections.append("Hand detected")

    # Red is covered by two hue ranges, one at each end of the hue scale used here
    lower_red1 = np.array([0, 120, 70], dtype=np.uint8)
    upper_red1 = np.array([10, 255, 255], dtype=np.uint8)
    lower_red2 = np.array([170, 120, 70], dtype=np.uint8)
    upper_red2 = np.array([180, 255, 255], dtype=np.uint8)
    # Combine with a bitwise OR: adding uint8 masks with "+" wraps around on overflow
    mask_red = cv2.bitwise_or(
        cv2.inRange(hsv, lower_red1, upper_red1),
        cv2.inRange(hsv, lower_red2, upper_red2),
    )
    if cv2.countNonZero(mask_red) > min_pixels:
        detections.append("Tools detected")

    # Very bright pixels
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _, bright_spots = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY)
    if cv2.countNonZero(bright_spots) > min_pixels:
        detections.append("Debris above reel")

    return detections if detections else ["No foreign objects detected"]

# Run every detector on every image, once per mask, walking the images in batches.
# NOTE(review): image_folder, masks and output_analysis_folder are whatever earlier
# cells assigned last — confirm they point at the intended inputs before running.
batch_size = 50
image_filenames = sorted(name for name in os.listdir(image_folder) if name.endswith(".jpg"))
num_images = len(image_filenames)

for mask_name, mask_path in tqdm(masks.items(), desc="Processing Masks"):
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    if mask is None:
        raise FileNotFoundError(f"Mask not found: {mask_path}")

    # Last successfully masked frame for this mask; None until the first image is processed
    prev_image = None

    for batch_start in tqdm(range(0, num_images, batch_size), desc=f"Processing {mask_name}"):
        for img_name in image_filenames[batch_start:batch_start + batch_size]:
            image = cv2.imread(os.path.join(image_folder, img_name))
            if image is None:
                print(f"Error loading image: {img_name}")
                continue

            # Bring the mask to the image size when they differ (the resized mask is kept)
            height, width = image.shape[:2]
            if mask.shape[:2] != (height, width):
                mask = cv2.resize(mask, (width, height), interpolation=cv2.INTER_NEAREST)
            masked_image = cv2.bitwise_and(image, image, mask=mask)

            # Motion needs a previous frame, so the first frame of each mask yields None
            spinning_stationary = (
                detect_spinning_or_stationary(masked_image, prev_image)
                if prev_image is not None
                else None
            )

            # Cable alignment is evaluated for the "filled_mask_below" mask only
            cable_state = (
                detect_cable_alignment(masked_image)
                if mask_name == "filled_mask_below"
                else None
            )

            foreign_objects = detect_foreign_objects(masked_image)

            results["spinning_stationary"].append(
                {"image": img_name, "result": spinning_stationary, "mask": mask_name}
            )
            if cable_state is not None:
                results["alignment_detection"].append(
                    {"image": img_name, "result": cable_state, "mask": mask_name}
                )
            results["foreign_object_detection"].append(
                {"image": img_name, "result": foreign_objects, "mask": mask_name}
            )

            prev_image = masked_image

# Write one "<method>_analysis.json" file per analysis method
for method, method_results in results.items():
    output_json = os.path.join(output_analysis_folder, f"{method}_analysis.json")
    with open(output_json, "w") as f:
        json.dump(method_results, f, indent=4)

print(f"Analysis results saved to {output_analysis_folder}")


In [None]:
# Result files of the three analysis methods
path1 = r'D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\labeled_regions\analysis_results\spinning_stationary_analysis.json'
path2 = r'D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\labeled_regions\analysis_results\foreign_object_detection_analysis.json'
path3 = r'D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\labeled_regions\analysis_results\alignment_detection_analysis.json'

# Load all three files in order
loaded_results = []
for results_path in (path1, path2, path3):
    with open(results_path, 'r') as f:
        loaded_results.append(json.load(f))
results1, results2, results3 = loaded_results

# Preview the first entry of each result list
for preview in (results1[:1], results2[:1], results3[:1]):
    print("\nresults:")
    pprint(preview)


In [None]:
# Paths
# Folder that is scanned for "*_analysis.json" result files by the function below.
analysis_results_folder = r"D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\labeled_regions\analysis_results"
# Folder that receives one PNG time series plot per feature; created here if missing.
output_visualization_folder = r"D:\PAIND\DATA\20240527_Data_Prozess_01\webcam\labeled_regions\visualizations"
os.makedirs(output_visualization_folder, exist_ok=True)

# Helpers to turn the stored analysis results into per-feature time series plots
def _collect_feature_records(results_folder):
    """Return one {"Timestamp", "Feature"} record per feature found in the "*_analysis.json" files."""
    records = []
    for file_name in os.listdir(results_folder):
        if not file_name.endswith("_analysis.json"):
            continue

        with open(os.path.join(results_folder, file_name), "r") as f:
            entries = json.load(f)

        for entry in entries:
            # The second "_"-separated field of the image name serves as the timestamp
            # label (presumably HHMMSS — verify against the actual file names)
            stamp = entry["image"].split("_")[1]
            payload = entry.get("result")
            if not payload:
                continue

            # Normalise the `result` field: a string is one feature, a list is many,
            # any other type contributes nothing
            if isinstance(payload, str):
                found = [payload]
            elif isinstance(payload, list):
                found = payload
            else:
                found = []
            records.extend({"Timestamp": stamp, "Feature": item} for item in found)
    return records


def process_and_plot_results_with_varied_formats():
    """
    Plot how often each detected feature occurs per timestamp.

    Reads every "*_analysis.json" file in `analysis_results_folder`, counts the
    features per (timestamp, feature) pair and saves one line plot per feature
    to `output_visualization_folder`. The `result` field of an entry may be a
    single string or a list of strings. Prints a message and returns early when
    no feature is found.
    """
    time_series_data = _collect_feature_records(analysis_results_folder)
    if not time_series_data:
        print("No features detected in the results.")
        return

    # Number of occurrences of each feature at each timestamp
    feature_counts = (
        pd.DataFrame(time_series_data)
        .groupby(["Timestamp", "Feature"])
        .size()
        .reset_index(name="Count")
    )

    # One figure per feature, closed right after saving
    for feature, group in feature_counts.groupby("Feature"):
        output_path = os.path.join(output_visualization_folder, f"time_series_{feature}.png")

        plt.figure(figsize=(10, 6))
        plt.plot(group["Timestamp"], group["Count"], marker="o", label=feature)
        plt.title(f"Time Series for Feature: {feature}")
        plt.xlabel("Timestamp")
        plt.ylabel("Frequency")
        plt.xticks(rotation=45)
        plt.grid()
        plt.legend()
        plt.savefig(output_path)
        plt.close()

        print(f"Time series plot saved for feature '{feature}' at {output_path}")

# Execute the function: reads the analysis JSON files and saves one plot per feature
process_and_plot_results_with_varied_formats()
