In [6]:
import os
import json

# Parent folder path (you will provide this)
parent_folder = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/dataset/Training-20250217T042013Z-004/Training/semi_control_environment"  # Replace with the actual parent folder path

# Output text file to save results
output_file = "semi_control_env_training.txt"
corrupted_files_log = "corrupted_files_log.txt"

def check_json_for_objects(json_file_path):
    """Check for all specified objects and their states in the JSON file."""
    try:
        with open(json_file_path, 'r') as file:
            data = json.load(file)
            object_info = data.get("ObjectInfo", {}).get("BoundingBox", {})

            # Initialize counts for each object and state
            results = {
                "Face": object_info.get("Face", {}).get("isVisible", False),
                "Cigar": object_info.get("Cigar", {}).get("isVisible", False),
                "Phone": object_info.get("Phone", {}).get("isVisible", False),
                "Leye": object_info.get("Leye", {}).get("isVisible", False),
                "Leye_Open": object_info.get("Leye", {}).get("Opened", False),
                "Leye_Closed": not object_info.get("Leye", {}).get("Opened", True),
                "Reye": object_info.get("Reye", {}).get("isVisible", False),
                "Reye_Open": object_info.get("Reye", {}).get("Opened", False),
                "Reye_Closed": not object_info.get("Reye", {}).get("Opened", True),
                "Mouth": object_info.get("Mouth", {}).get("isVisible", False),
                "Mouth_Open": object_info.get("Mouth", {}).get("Opened", False),
                "Mouth_Closed": not object_info.get("Mouth", {}).get("Opened", True),
            }
            return results
    except (json.JSONDecodeError, FileNotFoundError):
        # Skip the file if it's empty, invalid JSON, or not found
        return None

def process_parent_folder(parent_folder):
    """Traverse through the parent folder and its subfolders to find JSON files."""
    detected_files = []
    corrupted_files = []
    counts = {
        "Face": 0,
        "Cigar": 0,
        "Phone": 0,
        "Leye": 0,
        "Leye_Open": 0,
        "Leye_Closed": 0,
        "Reye": 0,
        "Reye_Open": 0,
        "Reye_Closed": 0,
        "Mouth": 0,
        "Mouth_Open": 0,
        "Mouth_Closed": 0,
    }

    for root, _, files in os.walk(parent_folder):
        for file in files:
            if file.endswith(".json"):
                json_file_path = os.path.join(root, file)
                result = check_json_for_objects(json_file_path)

                if result is None:
                    # File is corrupted or empty
                    corrupted_files.append(json_file_path)
                else:
                    # Update counts for each object and state
                    for key in counts:
                        if result.get(key, False):
                            counts[key] += 1

                    # If any object is detected, save the file path
                    if any(result.values()):
                        relative_path = os.path.relpath(json_file_path, parent_folder)
                        detected_files.append(relative_path)

    return detected_files, corrupted_files, counts

def save_results(detected_files, corrupted_files, counts, output_file, corrupted_files_log):
    """Save the detected files, counts, and corrupted files log to text files."""
    # Save detected files and counts
    with open(output_file, 'w') as file:
        file.write("Detection Counts:\n")
        for key, value in counts.items():
            file.write(f"{key}: {value}\n")
        file.write("\nDetected files:\n")
        for item in detected_files:
            file.write(f"{item}\n")

    # Save corrupted files log
    with open(corrupted_files_log, 'w') as file:
        file.write(f"Total corrupted/empty JSON files: {len(corrupted_files)}\n")
        for item in corrupted_files:
            file.write(f"{item}\n")

# Process the parent folder
detected_files, corrupted_files, counts = process_parent_folder(parent_folder)

# Save the results
save_results(detected_files, corrupted_files, counts, output_file, corrupted_files_log)

print(f"Detected files and counts saved to {output_file}")
print(f"Corrupted/empty files log saved to {corrupted_files_log}")
print("Detection Counts:")
for key, value in counts.items():
    print(f"{key}: {value}")
print(f"Total corrupted/empty JSON files: {len(corrupted_files)}")

Detected files and counts saved to semi_control_env_training.txt
Corrupted/empty files log saved to corrupted_files_log.txt
Detection Counts:
Face: 0
Cigar: 0
Phone: 0
Leye: 0
Leye_Open: 0
Leye_Closed: 41052
Reye: 0
Reye_Open: 0
Reye_Closed: 41052
Mouth: 0
Mouth_Open: 0
Mouth_Closed: 41052
Total corrupted/empty JSON files: 1


In [None]:
import os
import json

# Parent folder path (you will provide this)
parent_folder = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/dataset/Training-20250217T042013Z-004/Training/controlled_environment"  # Replace with the actual parent folder path

# Output text file to save results
output_file = "check_cigar_training.txt"
corrupted_files_log = "corrupted_files_log.txt"

def check_json_for_objects(json_file_path):
    """Check for all specified objects and their states in the JSON file."""
    try:
        with open(json_file_path, 'r') as file:
            data = json.load(file)
            object_info = data.get("ObjectInfo", {}).get("BoundingBox", {})

            # Initialize counts for each object and state
            results = {
                "Cigar": object_info.get("Cigar", {}).get("isVisible", False),
                "Phone": object_info.get("Phone", {}).get("isVisible", False),
            }
            return results
    except (json.JSONDecodeError, FileNotFoundError):
        # Skip the file if it's empty, invalid JSON, or not found
        return None

def process_parent_folder(parent_folder):
    """Traverse through the parent folder and its subfolders to find JSON files."""
    detected_files = []
    corrupted_files = []
    counts = {
        "Cigar": 0,
        "Phone": 0,
    }

    for root, _, files in os.walk(parent_folder):
        for file in files:
            if file.endswith(".json"):
                json_file_path = os.path.join(root, file)
                result = check_json_for_objects(json_file_path)

                if result is None:
                    # File is corrupted or empty
                    corrupted_files.append(json_file_path)
                else:
                    # Update counts for each object and state
                    for key in counts:
                        if result.get(key, False):
                            counts[key] += 1

                    # If any object is detected, save the file path
                    if any(result.values()):
                        relative_path = os.path.relpath(json_file_path, parent_folder)
                        detected_files.append(relative_path)

    return detected_files, corrupted_files, counts

def save_results(detected_files, corrupted_files, counts, output_file, corrupted_files_log):
    """Save the detected files, counts, and corrupted files log to text files."""
    # Save detected files and counts
    with open(output_file, 'w') as file:
        file.write("Detection Counts:\n")
        for key, value in counts.items():
            file.write(f"{key}: {value}\n")
        file.write("\nDetected files:\n")
        for item in detected_files:
            file.write(f"{item}\n")

    # Save corrupted files log
    with open(corrupted_files_log, 'w') as file:
        file.write(f"Total corrupted/empty JSON files: {len(corrupted_files)}\n")
        for item in corrupted_files:
            file.write(f"{item}\n")

# Process the parent folder
detected_files, corrupted_files, counts = process_parent_folder(parent_folder)

# Save the results
save_results(detected_files, corrupted_files, counts, output_file, corrupted_files_log)

print(f"Detected files and counts saved to {output_file}")
print("Detection Counts:")
for key, value in counts.items():
    print(f"{key}: {value}")

Detected files and counts saved to check_cigar_training.txt
Corrupted/empty files log saved to corrupted_files_log.txt
Detection Counts:
Cigar: 11281
Phone: 11173
Total corrupted/empty JSON files: 0


### Find eye-open/close, mouth open/close data

In [None]:
import os
import json
import random
import shutil

# Parent folder paths (you will provide these)
parent_folder_json = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/dataset/Training-20250217T042013Z-004/Training/controlled_environment"  
parent_folder_images = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/dataset/Training_images/[원천]bbox(통제환경)-003_(controlled environment)"

output_json_folder = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_dataset/output_g/json"
output_image_folder = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_dataset/output_g/images"

# Output text file to save results
output_file = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_dataset/output_g/json_list.txt"
corrupted_files_log = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_dataset/output_g/images/corrupted_files_log_selection.txt"

# Target counts for each class
target_counts = {
    "leye_open": 50,
    "leye_close": 50,
    "reye_open": 50,
    "reye_close": 50,
    "mouth_open": 50,
    "mouth_close": 50,
}

def check_json_for_objects(json_file_path):
    """Check for specified objects and their states in the JSON file."""
    try:
        with open(json_file_path, 'r') as file:
            data = json.load(file)
            object_info = data.get("ObjectInfo", {}).get("BoundingBox", {})

            # Initialize states for each object and state
            results = {
                "leye_open": False,
                "leye_close": False,
                "reye_open": False,
                "reye_close": False,
                "mouth_open": False,
                "mouth_close": False,
            }

            if object_info.get("Leye", {}).get("isVisible", False):
                if object_info.get("Leye", {}).get("Opened", False):
                    results["leye_open"] = True
                else:
                    results["leye_close"] = True

            if object_info.get("Reye", {}).get("isVisible", False):
                if object_info.get("Reye", {}).get("Opened", False):
                    results["reye_open"] = True
                else:
                    results["reye_close"] = True

            if object_info.get("Mouth", {}).get("isVisible", False):
                if object_info.get("Mouth", {}).get("Opened", False):
                    results["mouth_open"] = True
                else:
                    results["mouth_close"] = True

            return results
    except (json.JSONDecodeError, FileNotFoundError):
        # Skip the file if it's empty, invalid JSON, or not found
        return None

def select_json_files(parent_folder_json, target_counts):
    """Select JSON files randomly for each class based on object detection."""
    selected_files = {
        "leye_open": [],
        "leye_close": [],
        "reye_open": [],
        "reye_close": [],
        "mouth_open": [],
        "mouth_close": [],
    }
    corrupted_files = []
    available_files = {
        "leye_open": [],
        "leye_close": [],
        "reye_open": [],
        "reye_close": [],
        "mouth_open": [],
        "mouth_close": [],
    }

    for root, _, files in os.walk(parent_folder_json):
        for file in files:
            if file.endswith(".json"):
                json_file_path = os.path.join(root, file)
                result = check_json_for_objects(json_file_path)

                if result is None:
                    corrupted_files.append(json_file_path)
                else:
                    relative_path = os.path.relpath(json_file_path, parent_folder_json)
                    for class_name, detected in result.items():
                        if detected:
                            available_files[class_name].append(relative_path)

    # Randomly select files for each class
    for class_name, count in target_counts.items():
        files_for_class = available_files[class_name]
        if len(files_for_class) <= count:
            selected_files[class_name] = files_for_class
        else:
            selected_files[class_name] = random.sample(files_for_class, count)

    return selected_files, corrupted_files

def copy_json_files(selected_files, parent_folder_json, output_json_folder):
    """Copy selected JSON files to the output folder, WITHOUT maintaining directory structure."""
    os.makedirs(output_json_folder, exist_ok=True)
    copied_json_files = []

    for class_files in selected_files.values():
        for relative_path in class_files:
            source_path = os.path.join(parent_folder_json, relative_path)
            filename = os.path.basename(relative_path) # Get only the filename
            destination_path = os.path.join(output_json_folder, filename) # Place directly in output folder
            shutil.copy2(source_path, destination_path) # copy2 to preserve metadata
            copied_json_files.append(destination_path)
    return copied_json_files

def copy_image_files(selected_files, parent_folder_json, parent_folder_images, output_image_folder):
    """Copy corresponding image files to the output folder, WITHOUT maintaining directory structure."""
    os.makedirs(output_image_folder, exist_ok=True)
    copied_image_files = []

    for class_files in selected_files.values():
        for relative_json_path in class_files:
            image_name_base = os.path.splitext(relative_json_path)[0]
            image_extensions = ['.jpg', '.png', '.jpeg'] # Possible image extensions
            copied = False
            for ext in image_extensions:
                relative_image_path = image_name_base + ext
                source_image_path = os.path.join(parent_folder_images, relative_image_path)
                if os.path.exists(source_image_path):
                    filename = os.path.basename(relative_image_path) # Get only the filename
                    destination_image_path = os.path.join(output_image_folder, filename) # Place directly in output folder
                    shutil.copy2(source_image_path, destination_image_path)
                    copied_image_files.append(destination_image_path)
                    copied = True
                    break # Stop checking extensions if one is found and copied
            if not copied:
                print(f"Warning: No image found for JSON: {relative_json_path}") # Log if no image found

    return copied_image_files


"""
Please read the and write a python code which will hbave a directory of json files
it read each json files and do count the classes like following.
count reye+leye is visibile true then count
count mouth closed
count mouth open
and save in a text file

"""


def save_results(selected_files, corrupted_files, output_file, corrupted_files_log):
    """Save the selected files, and corrupted files log to text files."""
    total_selected_count = sum(len(files) for files in selected_files.values())

    # Save selected files and counts
    with open(output_file, 'w') as file:
        file.write("Selected JSON files counts:\n")
        for key, files in selected_files.items():
            file.write(f"{key}: {len(files)}\n")
        file.write(f"\nTotal selected JSON files: {total_selected_count}\n")
        file.write("\nSelected JSON files paths (in output folder):\n") # Updated description
        for key, files in selected_files.items():
            file.write(f"\n--- {key} ---\n")
            for item in files:
                file.write(f"{item}\n")

    # Save corrupted files log
    with open(corrupted_files_log, 'w') as file:
        file.write(f"Total corrupted/empty JSON files: {len(corrupted_files)}\n")
        for item in corrupted_files:
            file.write(f"{item}\n")


# Process the parent folder and select JSON files
selected_json_files_dict, corrupted_files = select_json_files(parent_folder_json, target_counts)

# Copy selected JSON files
copied_json_files_list = copy_json_files(selected_json_files_dict, parent_folder_json, output_json_folder)

# Copy corresponding image files
copied_image_files_list = copy_image_files(selected_json_files_dict, parent_folder_json, parent_folder_images, output_image_folder)


# Save the results
save_results(selected_json_files_dict, corrupted_files, output_file, corrupted_files_log)

print(f"Selected JSON files and counts saved to {output_file}")
print(f"Corrupted/empty files log saved to {corrupted_files_log}")
print("Selected JSON Files Counts:")
total_selected = 0
for key, files in selected_json_files_dict.items():
    count = len(files)
    total_selected += count
    print(f"{key}: {count}")
print(f"Total selected JSON files: {total_selected}")
print(f"Total corrupted/empty JSON files: {len(corrupted_files)}")
print(f"Output JSON files are in: {output_json_folder}")
print(f"Output Image files are in: {output_image_folder}")

Selected JSON files and counts saved to /home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_dataset/output_g/json_list.txt
Corrupted/empty files log saved to /home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_dataset/output_g/images/corrupted_files_log_selection.txt
Selected JSON Files Counts:
leye_open: 50
leye_close: 50
reye_open: 50
reye_close: 50
mouth_open: 50
mouth_close: 50
Total selected JSON files: 300
Total corrupted/empty JSON files: 0
Output JSON files are in: /home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_dataset/output_g/json
Output Image files are in: /home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_dataset/output_g/images


### Count no of classes from demo test dataset

In [None]:
import json
import os

def count_facial_features(json_dir, output_file):
    face_count = 0
    eye_open_count = 0
    eye_closed_count = 0
    both_eyes_closed = 0
    mouth_closed_count = 0
    mouth_open_count = 0
    total_files = 0

    with open(output_file, "w") as outfile:
        for filename in os.listdir(json_dir):
            if filename.endswith(".json"):
                total_files += 1
                filepath = os.path.join(json_dir, filename)
                try:
                    with open(filepath, 'r') as f:
                        data = json.load(f)

                    if data["ObjectInfo"]["BoundingBox"]["Face"]["isVisible"]:
                        face_count += 1

                        # Count if both eyes are open
                        if data["ObjectInfo"]["BoundingBox"]["Leye"]["Opened"] and data["ObjectInfo"]["BoundingBox"]["Reye"]["Opened"]:
                            eye_open_count += 1
                        #count if both eyes are closed
                        elif not data["ObjectInfo"]["BoundingBox"]["Leye"]["Opened"] and not data["ObjectInfo"]["BoundingBox"]["Reye"]["Opened"]:
                            both_eyes_closed +=1
                        else:
                            eye_closed_count +=1


                        # Count mouth opened/closed
                        if data["ObjectInfo"]["BoundingBox"]["Mouth"]["Opened"]:
                            mouth_open_count += 1
                        else:
                            mouth_closed_count += 1

                except (FileNotFoundError, json.JSONDecodeError, KeyError) as e:
                    print(f"Error processing file {filename}: {e}")

        outfile.write(f"Total Files Processed: {total_files}\n")
        outfile.write(f"Face Visible: {face_count}\n")
        outfile.write(f"Both Eyes Open: {eye_open_count}\n")
        outfile.write(f"Both Eyes Closed: {both_eyes_closed}\n")
        outfile.write(f"Single Eye Closed: {eye_closed_count}\n")
        outfile.write(f"Mouth Closed: {mouth_closed_count}\n")
        outfile.write(f"Mouth Open: {mouth_open_count}\n")


if __name__ == "__main__":
    json_directory = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_dataset/output_g/original_json" 
    output_filename = "facial_feature_counts.txt"
    count_facial_features(json_directory, output_filename)
    print(f"Results saved to {output_filename}")

Results saved to facial_feature_counts.txt


In [None]:
/home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_notebok/test_demo_face_behaviour/output_images


## Count no of detected claases

In [None]:
import os
import re

def process_files(directory, output_file):
    eye_open = eye_closed = mouth_open = mouth_closed = yawn_count = 0
    
    txt_cnt = 0
    for filename in os.listdir(directory):
        if filename.endswith(".txt"):
            txt_cnt += 1
            with open(os.path.join(directory, filename), "r") as file:
                for line in file:
                    eye_match = re.search(r"eye:\s*(open|closed)", line, re.IGNORECASE)
                    mouth_match = re.search(r"mouth:\s*(open|closed)", line, re.IGNORECASE)
                    yawn_match = re.search(r"Yawn count:\s*(\d+)", line, re.IGNORECASE)
                    
                    if eye_match:
                        if eye_match.group(1).lower() == "open":
                            eye_open += 1
                        else:
                            eye_closed += 1
                    
                    if mouth_match:
                        if mouth_match.group(1).lower() == "open":
                            mouth_open += 1
                        else:
                            mouth_closed += 1
                    
                    if yawn_match:
                        yawn_count += int(yawn_match.group(1))
    


    print("No o txt files: ", txt_cnt)
    # Save results to a text file
    with open(output_file, "w") as out:
        out.write(f"Total eye open: {eye_open}\n")
        out.write(f"Total eye closed: {eye_closed}\n")
        out.write(f"Total mouth open: {mouth_open}\n")
        out.write(f"Total mouth closed: {mouth_closed}\n")
        out.write(f"Total yawns: {yawn_count}\n")

if __name__ == "__main__":
    input_directory = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_notebok/test_demo_face_behaviour/output_images/" 
    output_file = "summary.txt"
    process_files(input_directory, output_file)
    print("Processing complete. Summary saved in", output_file)

No o txt files:  298
Processing complete. Summary saved in summary.txt


In [6]:
import cv2

def get_image_color_format(image_path):
 
    try:
        img = cv2.imread(image_path)
        print(img.shape)

        if img is None:
            return "Unknown (Image not found or could not be read)"

        shape = img.shape  # Get the shape of the image array

        if len(shape) == 2:
            return "Grayscale"  # Height and Width only
        elif len(shape) == 3:
            return "RGB"  # Height, Width, and Channels (usually 3 for RGB)
        else:
            return "Unknown (Unexpected number of channels)"

    except Exception as e:
        return f"Error: {e}"

# Example usage
image_path = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/dataset/Training_images/[원천]bbox(통제환경)-003_(controlled environment)/021_G1/021_G1_13_후면광원_좌사이드미러_졸음재현_20200922_203727_00644.jpg"  
color_format = get_image_color_format(image_path)
print(f"The image color format is: {color_format}")

(1280, 800, 3)
The image color format is: RGB


In [None]:
import os
import json

def analyze_folder(folder_path):
    face_detected_count = 0
    left_eye_open_count = 0
    left_eye_closed_count = 0
    right_eye_open_count = 0
    right_eye_closed_count = 0
    mouth_open_count = 0
    mouth_closed_count = 0
    total_files = 0

    for filename in os.listdir(folder_path):
        if filename.endswith('.json'):
            total_files += 1
            file_path = os.path.join(folder_path, filename)

            with open(file_path, 'r', encoding='utf-8') as file:
                data = json.load(file)

                # Face detection
                face_info = data.get('ObjectInfo', {}).get('BoundingBox', {}).get('Face', {})
                if face_info.get('isVisible', False):
                    face_detected_count += 1

                # Left Eye
                leye_info = data.get('ObjectInfo', {}).get('BoundingBox', {}).get('Leye', {})
                if leye_info.get('isVisible', False):
                    if leye_info.get('Opened', False):
                        left_eye_open_count += 1
                    else:
                        left_eye_closed_count += 1

                # Right Eye
                reye_info = data.get('ObjectInfo', {}).get('BoundingBox', {}).get('Reye', {})
                if reye_info.get('isVisible', False):
                    if reye_info.get('Opened', False):
                        right_eye_open_count += 1
                    else:
                        right_eye_closed_count += 1

                # Mouth
                mouth_info = data.get('ObjectInfo', {}).get('BoundingBox', {}).get('Mouth', {})
                if mouth_info.get('isVisible', False):
                    if mouth_info.get('Opened', False):
                        mouth_closed_count += 1
                    else:
                        mouth_open_count += 1

    # Summary
    print(f"Total JSON Files Processed: {total_files}")
    print(f"Total Faces Detected: {face_detected_count}")
    print(f"Left Eye - Open: {left_eye_open_count}, Closed: {left_eye_closed_count}")
    print(f"Right Eye - Open: {right_eye_open_count}, Closed: {right_eye_closed_count}")
    print(f"Mouth - Open: {mouth_open_count}, Closed: {mouth_closed_count}")
# Example usage
folder_path = "/home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/test_dataset/eval_face_mesh/all/" 
analyze_folder(folder_path)


Total JSON Files Processed: 230
Total Faces Detected: 230
Left Eye - Open: 144, Closed: 82
Right Eye - Open: 140, Closed: 80
Mouth - Open: 111, Closed: 57


In [None]:
"""
Faces = 200
Eyes_open = 120
Eyes_closed = 60
Mouth_open = 100
Mouth_closed = 50
Head_left = 11
Head_right = 6
Yawn = 16
"""

In [9]:
import cv2
import os

def extract_frames(video_path, output_folder, frame_interval=1):
    """
    Extract frames from a video and save them as images.

    Parameters:
    - video_path: Path to the input video.
    - output_folder: Folder to save the extracted frames.
    - frame_interval: Save every nth frame (e.g., 1 saves every frame, 5 saves every 5th frame).
    """

    # Create the output folder if it doesn't exist
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    # Open the video file
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Failed to open video: {video_path}")
        return

    frame_count = 0
    saved_count = 0

    while True:
        ret, frame = cap.read()

        if not ret:
            break

        # Save every nth frame
        if frame_count % frame_interval == 0:
            frame_filename = os.path.join(output_folder, f"frame_{saved_count:04d}.jpg")
            cv2.imwrite(frame_filename, frame)
            saved_count += 1

        frame_count += 1

    cap.release()
    print(f"Extracted {saved_count} frames from {video_path} to {output_folder}")

# Example usage
video_path = "test_video/video1.mp4"
output_folder = "extracted_frames_video1"
extract_frames(video_path, output_folder, frame_interval=5)  # Save every 5th frame


Extracted 376 frames from test_video/video1.mp4 to extracted_frames_video1


In [3]:
import numpy as np
import cv2
import mediapipe as mp
import time

mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(min_detection_confidence=0.5,min_tracking_confidence=0.5)

mp_drawing = mp.solutions.drawing_utils

drawing_spec = mp_drawing.DrawingSpec(color=(128,0,128),thickness=2,circle_radius=1)

cap = cv2.VideoCapture(0)

while cap.isOpened():
    success, image = cap.read()

    start = time.time()

    image = cv2.cvtColor(cv2.flip(image,1),cv2.COLOR_BGR2RGB) #flipped for selfie view

    image.flags.writeable = False

    results = face_mesh.process(image)

    image.flags.writeable = True

    image = cv2.cvtColor(image,cv2.COLOR_RGB2BGR)

    img_h , img_w, img_c = image.shape
    face_2d = []
    face_3d = []

    if results.multi_face_landmarks:
        for face_landmarks in results.multi_face_landmarks:
            for idx, lm in enumerate(face_landmarks.landmark):
                if idx == 33 or idx == 263 or idx ==1 or idx == 61 or idx == 291 or idx==199:
                    if idx ==1:
                        nose_2d = (lm.x * img_w,lm.y * img_h)
                        nose_3d = (lm.x * img_w,lm.y * img_h,lm.z * 3000)
                    x,y = int(lm.x * img_w),int(lm.y * img_h)

                    face_2d.append([x,y])
                    face_3d.append(([x,y,lm.z]))


            #Get 2d Coord
            face_2d = np.array(face_2d,dtype=np.float64)

            face_3d = np.array(face_3d,dtype=np.float64)

            focal_length = 1 * img_w

            cam_matrix = np.array([[focal_length,0,img_h/2],
                                  [0,focal_length,img_w/2],
                                  [0,0,1]])
            distortion_matrix = np.zeros((4,1),dtype=np.float64)

            success,rotation_vec,translation_vec = cv2.solvePnP(face_3d,face_2d,cam_matrix,distortion_matrix)


            #getting rotational of face
            rmat,jac = cv2.Rodrigues(rotation_vec)

            angles,mtxR,mtxQ,Qx,Qy,Qz = cv2.RQDecomp3x3(rmat)

            x = angles[0] * 360
            y = angles[1] * 360
            z = angles[2] * 360

            #here based on axis rot angle is calculated
            if y < -10:
                text="Looking Left"
            elif y > 10:
                text="Looking Right"
            elif x < -10:
                text="Looking Down"
            elif x > 10:
                text="Looking Up"
            else:
                text="Forward"

            nose_3d_projection,jacobian = cv2.projectPoints(nose_3d,rotation_vec,translation_vec,cam_matrix,distortion_matrix)

            p1 = (int(nose_2d[0]),int(nose_2d[1]))
            p2 = (int(nose_2d[0] + y*10), int(nose_2d[1] -x *10))

            cv2.line(image,p1,p2,(255,0,0),3)

            cv2.putText(image,text,(20,50),cv2.FONT_HERSHEY_SIMPLEX,2,(0,255,0),2)
            cv2.putText(image,"x: " + str(np.round(x,2)),(500,50),cv2.FONT_HERSHEY_SIMPLEX,1,(0,0,255),2)
            cv2.putText(image,"y: "+ str(np.round(y,2)),(500,100),cv2.FONT_HERSHEY_SIMPLEX,1,(0,0,255),2)
            cv2.putText(image,"z: "+ str(np.round(z, 2)), (500, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)


        end = time.time()
        totalTime = end-start

        fps = 1/totalTime
        print("FPS: ",fps)

        cv2.putText(image,f'FPS: {int(fps)}',(20,450),cv2.FONT_HERSHEY_SIMPLEX,1.5,(0,255,0),2)

        mp_drawing.draw_landmarks(image=image,
                                  landmark_list=face_landmarks,
                                  connections=mp_face_mesh.FACEMESH_CONTOURS,
                                  landmark_drawing_spec=drawing_spec,
                                  connection_drawing_spec=drawing_spec)
    cv2.imshow('Head Pose Detection',image)
    if cv2.waitKey(5) & 0xFF ==27:
        break
cap.release()










[ WARN:0@603.517] global cap_v4l.cpp:913 open VIDEOIO(V4L2:/dev/video0): can't open camera by index
W0000 00:00:1740998032.313236   92253 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1740998032.317898   92253 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
[ERROR:0@603.615] global obsensor_uvc_stream_channel.cpp:158 getStreamChannelGroup Camera index out of range
[ WARN:0@603.615] global cap_v4l.cpp:803 requestBuffers VIDEOIO(V4L2:/dev/video0): failed VIDIOC_REQBUFS: errno=19 (No such device)


In [2]:
import numpy as np
import cv2
import mediapipe as mp
import os
import time

# Initialize MediaPipe FaceMesh
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(min_detection_confidence=0.5, min_tracking_confidence=0.5)

mp_drawing = mp.solutions.drawing_utils
drawing_spec = mp_drawing.DrawingSpec(color=(128, 0, 128), thickness=2, circle_radius=1)

def process_image(image_path):
    image = cv2.imread(image_path)

    start = time.time()

    # Flip for selfie view (optional - you can remove if not needed)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Process the image with MediaPipe FaceMesh
    image.flags.writeable = False
    results = face_mesh.process(image)
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

    img_h, img_w, img_c = image.shape
    face_2d = []
    face_3d = []

    if results.multi_face_landmarks:
        for face_landmarks in results.multi_face_landmarks:
            for idx, lm in enumerate(face_landmarks.landmark):
                if idx in [33, 263, 1, 61, 291, 199]:
                    if idx == 1:
                        nose_2d = (lm.x * img_w, lm.y * img_h)
                        nose_3d = (lm.x * img_w, lm.y * img_h, lm.z * 3000)

                    x, y = int(lm.x * img_w), int(lm.y * img_h)
                    face_2d.append([x, y])
                    face_3d.append([x, y, lm.z])

            face_2d = np.array(face_2d, dtype=np.float64)
            face_3d = np.array(face_3d, dtype=np.float64)

            focal_length = 1 * img_w
            cam_matrix = np.array([[focal_length, 0, img_h / 2],
                                   [0, focal_length, img_w / 2],
                                   [0, 0, 1]])
            distortion_matrix = np.zeros((4, 1), dtype=np.float64)

            success, rotation_vec, translation_vec = cv2.solvePnP(face_3d, face_2d, cam_matrix, distortion_matrix)

            rmat, _ = cv2.Rodrigues(rotation_vec)
            angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)

            x_angle = angles[0] * 360
            y_angle = angles[1] * 360
            z_angle = angles[2] * 360

            if y_angle < -10:
                text = "Looking Left"
            elif y_angle > 10:
                text = "Looking Right"
            elif x_angle < -10:
                text = "Looking Down"
            elif x_angle > 10:
                text = "Looking Up"
            else:
                text = "Forward"

            nose_3d_projection, _ = cv2.projectPoints(nose_3d, rotation_vec, translation_vec, cam_matrix, distortion_matrix)

            p1 = (int(nose_2d[0]), int(nose_2d[1]))
            p2 = (int(nose_2d[0] + y_angle * 10), int(nose_2d[1] - x_angle * 10))

            cv2.line(image, p1, p2, (255, 0, 0), 3)

            cv2.putText(image, text, (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 255, 0), 2)
            cv2.putText(image, f"x: {x_angle:.2f}", (500, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            cv2.putText(image, f"y: {y_angle:.2f}", (500, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            cv2.putText(image, f"z: {z_angle:.2f}", (500, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

            mp_drawing.draw_landmarks(
                image=image,
                landmark_list=face_landmarks,
                connections=mp_face_mesh.FACEMESH_CONTOURS,
                landmark_drawing_spec=drawing_spec,
                connection_drawing_spec=drawing_spec
            )

    end = time.time()
    fps = 1 / (end - start + 1e-5)

    cv2.putText(image, f'FPS: {int(fps)}', (20, 450), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2)

    return image

def process_folder(folder_path, output_folder):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    image_files = [f for f in os.listdir(folder_path) if f.lower().endswith(('png', 'jpg', 'jpeg'))]

    for img_file in image_files:
        img_path = os.path.join(folder_path, img_file)
        processed_image = process_image(img_path)

        # Show the processed image (optional)
        cv2.imshow('Processed Image', processed_image)
        cv2.waitKey(1)  # 1 millisecond delay between images

        # Save processed image
        output_path = os.path.join(output_folder, img_file)
        cv2.imwrite(output_path, processed_image)

    cv2.destroyAllWindows()

# Example usage
folder_path = 'extracted_frames'  # Folder containing images
output_folder = 'output_images22'  # Folder where processed images will be saved
process_folder(folder_path, output_folder)











W0000 00:00:1740997548.608846   87713 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1740997548.616928   87713 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [17]:
import cv2
from ultralytics import YOLO

# Load the YOLOv8 model
model = YOLO("best_10.pt")  # Or your custom model path

# Set the confidence threshold and NMS threshold (during inference)
confidence_threshold = 0.5
iou_threshold = 0.7  # IOU (Intersection Over Union) threshold, also related to NMS

# Get class names from the model or define them manually
class_names = model.names  # Attempt to get class names from the model


# Open the webcam
cap = cv2.VideoCapture(0)

if not cap.isOpened():
    print("Error: Could not open webcam.")
    exit()

try:
    while True:
        # Read a frame from the webcam
        ret, frame = cap.read()

        if not ret:
            print("Error: Could not read frame from webcam.")
            break

        # Perform inference on the frame, setting conf and iou
        results = model(frame, conf=confidence_threshold, iou=iou_threshold)

        # Process the results
        for result in results:
            boxes = result.boxes.cpu().numpy()  # Get boxes on CPU in numpy
            for i, box in enumerate(boxes):
                xyxy = box.xyxy[0].astype(int)  # Bounding box coordinates (x1, y1, x2, y2)
                confidence = box.conf[0]        # Confidence score
                class_id = int(box.cls[0])       # Class ID (integer)

                # Get class name
                class_name = class_names[class_id] if class_id < len(class_names) else str(class_id)  # Handle potential index errors

                # Draw the bounding box and label on the frame
                label = f"{class_name} {confidence:.2f}"
                cv2.rectangle(frame, (xyxy[0], xyxy[1]), (xyxy[2], xyxy[3]), (0, 255, 0), 2)  # Green rectangle
                cv2.putText(frame, label, (xyxy[0], xyxy[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Display the frame with detections
        cv2.imshow("YOLOv8 Webcam Inference", frame)

        # Exit on pressing 'q'
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

except Exception as e:
    print(f"An error occurred: {e}")

finally:
    # Release the webcam and close all windows
    cap.release()
    cv2.destroyAllWindows()
    print("Webcam released and windows closed.")


0: 480x640 (no detections), 18.9ms
Speed: 1.7ms preprocess, 18.9ms inference, 0.2ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 27.5ms
Speed: 0.7ms preprocess, 27.5ms inference, 0.2ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 28.0ms
Speed: 0.8ms preprocess, 28.0ms inference, 0.2ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 53.6ms
Speed: 1.3ms preprocess, 53.6ms inference, 0.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 24.4ms
Speed: 0.9ms preprocess, 24.4ms inference, 0.2ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 64.0ms
Speed: 1.8ms preprocess, 64.0ms inference, 0.2ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 22.4ms
Speed: 0.9ms preprocess, 22.4ms inference, 0.2ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 46.9ms
Speed: 1.1ms preprocess, 46.9ms i

In [None]:
import cv2
from ultralytics import YOLO

# Load the YOLOv8 model
model = YOLO("best_10.pt")  # Or your custom model path

# Set the confidence threshold and NMS threshold
confidence_threshold = 0.5
iou_threshold = 0.7

# Get class names from the model or define them manually
class_names = model.names  # Attempt to get class names from the model

# Open the webcam
cap = cv2.VideoCapture(0)

if not cap.isOpened():
    print("Error: Could not open webcam.")
    exit()

try:
    while True:
        # Read a frame from the webcam
        ret, frame = cap.read()

        if not ret:
            print("Error: Could not read frame from webcam.")
            break

        # Convert the frame to grayscale
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Convert the grayscale frame back to BGR (3 channels) to match the model's input
        gray_bgr = cv2.cvtColor(gray_frame, cv2.COLOR_GRAY2BGR)

        # Perform inference on the BGR frame
        results = model(gray_bgr, conf=confidence_threshold, iou=iou_threshold)

        # Process the results
        for result in results:
            boxes = result.boxes.cpu().numpy()  # Get boxes on CPU in numpy
            for i, box in enumerate(boxes):
                xyxy = box.xyxy[0].astype(int)  # Bounding box coordinates (x1, y1, x2, y2)
                confidence = box.conf[0]        # Confidence score
                class_id = int(box.cls[0])       # Class ID (integer)

                # Get class name
                class_name = class_names[class_id] if class_id < len(class_names) else str(class_id)  # Handle potential index errors

                # Draw the bounding box and label on the original color frame
                label = f"{class_name} {confidence:.2f}"
                cv2.rectangle(frame, (xyxy[0], xyxy[1]), (xyxy[2], xyxy[3]), (0, 255, 0), 2)  # Green rectangle
                cv2.putText(frame, label, (xyxy[0], xyxy[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Display the frame with detections (displaying the original color frame)
        cv2.imshow("YOLOv8 Webcam Inference", frame)

        # Exit on pressing 'q'
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

except Exception as e:
    print(f"An error occurred: {e}")

finally:
    # Release the webcam and close all windows
    cap.release()
    cv2.destroyAllWindows()
    print("Webcam released and windows closed.")


An error occurred: axes don't match array
Webcam released and windows closed.


In [None]:
import cv2
import torch
import numpy as np
from ultralytics import YOLO

# Load the trained YOLOv8 model
model_path = "best.pt" 
try:
    model = YOLO(model_path)
    print(f"Model successfully loaded from {model_path}")
except Exception as e:
    print(f"Error loading the model: {e}")
    exit()

# Set up webcam
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("Error: Could not open webcam")
    exit()

confidence_threshold = 0.7
iou_threshold = 0.7

while True:
    # Capture frame
    ret, frame = cap.read()
    if not ret:
        print("Error: Could not read frame")
        break

    # Convert to grayscale
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # "Fake" RGB conversion (duplicating the channel) 
    rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)  # Convert grayscale to 3-channel RGB

    # Run inference
    # results = model(rgb)  # Pass the 3-channel grayscale image
    results = model(rgb, conf=confidence_threshold, iou=iou_threshold)

    # Display results (even if they are not meaningful)
    annotated_frame = results[0].plot()

    cv2.imshow("YOLOv8 on Grayscale (Webcam)", annotated_frame)

    # Exit on 'q' press
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
cap.release()
cv2.destroyAllWindows()

Model successfully loaded from best.pt

0: 320x416 (no detections), 19.7ms
Speed: 1.3ms preprocess, 19.7ms inference, 0.3ms postprocess per image at shape (1, 3, 320, 416)

0: 320x416 (no detections), 13.6ms
Speed: 0.6ms preprocess, 13.6ms inference, 0.2ms postprocess per image at shape (1, 3, 320, 416)

0: 320x416 (no detections), 44.5ms
Speed: 0.9ms preprocess, 44.5ms inference, 0.3ms postprocess per image at shape (1, 3, 320, 416)

0: 320x416 (no detections), 14.4ms
Speed: 0.6ms preprocess, 14.4ms inference, 0.2ms postprocess per image at shape (1, 3, 320, 416)

0: 320x416 (no detections), 53.2ms
Speed: 1.8ms preprocess, 53.2ms inference, 0.3ms postprocess per image at shape (1, 3, 320, 416)

0: 320x416 (no detections), 16.9ms
Speed: 1.0ms preprocess, 16.9ms inference, 0.2ms postprocess per image at shape (1, 3, 320, 416)

0: 320x416 (no detections), 48.7ms
Speed: 1.5ms preprocess, 48.7ms inference, 0.3ms postprocess per image at shape (1, 3, 320, 416)

0: 320x416 (no detections), 1

In [1]:
## Random test dataset for evaluation

import os
import random
import shutil

# Paths
base_folder = '/home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/ob/2_class_test_data'
images_folder = os.path.join(base_folder, 'images')
labels_folder = os.path.join(base_folder, 'labels')
output_folder = 'selected_data'

# Make output folders
output_images_folder = os.path.join(output_folder, 'images')
output_labels_folder = os.path.join(output_folder, 'labels')

os.makedirs(output_images_folder, exist_ok=True)
os.makedirs(output_labels_folder, exist_ok=True)

# Read all labels and check which contain cigars or phones
cigar_images = []
phone_images = []

for label_file in os.listdir(labels_folder):
    if label_file.endswith('.txt'):
        label_path = os.path.join(labels_folder, label_file)
        image_file = label_file.replace('.txt', '.jpg')  # Assuming jpg images
        
        image_path = os.path.join(images_folder, image_file)

        if not os.path.exists(image_path):
            continue  # Skip if corresponding image is missing

        with open(label_path, 'r') as f:
            lines = f.readlines()

        contains_cigar = any(line.startswith('0 ') for line in lines)
        contains_phone = any(line.startswith('1 ') for line in lines)

        if contains_cigar:
            cigar_images.append((image_path, label_path))

        if contains_phone:
            phone_images.append((image_path, label_path))

# Randomly select 50 of each
selected_cigars = random.sample(cigar_images, min(50, len(cigar_images)))
selected_phones = random.sample(phone_images, min(50, len(phone_images)))

# Copy to new folder
for image_path, label_path in selected_cigars + selected_phones:
    shutil.copy(image_path, os.path.join(output_images_folder, os.path.basename(image_path)))
    shutil.copy(label_path, os.path.join(output_labels_folder, os.path.basename(label_path)))

print(f"Selected {len(selected_cigars)} cigar images and {len(selected_phones)} phone images.")
print(f"Files saved to: {output_folder}")

Selected 50 cigar images and 50 phone images.
Files saved to: selected_data


In [3]:
import os
import random
import shutil

# Paths
base_folder = '/home/limonubuntu/Work/Limon/other_task/driver_anomaly/test/ob/2_class_test_data'
images_folder = os.path.join(base_folder, 'images')
labels_folder = os.path.join(base_folder, 'labels')
output_folder = 'selected_data'

output_images_folder = os.path.join(output_folder, 'images')
output_labels_folder = os.path.join(output_folder, 'labels')

os.makedirs(output_images_folder, exist_ok=True)
os.makedirs(output_labels_folder, exist_ok=True)

# Classes mapping
class_map = {'0': 'Cigar', '1': 'Phone'}

# Collect images by class
cigar_images = []
phone_images = []

# Function to convert YOLO format to integer bbox
def yolo_to_bbox(yolo_line, img_width, img_height):
    parts = yolo_line.strip().split()
    class_id = parts[0]
    x_center = float(parts[1]) * img_width
    y_center = float(parts[2]) * img_height
    width = float(parts[3]) * img_width
    height = float(parts[4]) * img_height

    xmin = int(x_center - width / 2)
    ymin = int(y_center - height / 2)
    xmax = int(x_center + width / 2)
    ymax = int(y_center + height / 2)

    return class_map[class_id], xmin, ymin, xmax, ymax

# Scan all label files
for label_file in os.listdir(labels_folder):
    if label_file.endswith('.txt'):
        label_path = os.path.join(labels_folder, label_file)
        image_file = label_file.replace('.txt', '.jpg')
        image_path = os.path.join(images_folder, image_file)

        if not os.path.exists(image_path):
            continue  # Skip if image file is missing

        with open(label_path, 'r') as f:
            lines = f.readlines()

        contains_cigar = any(line.startswith('0 ') for line in lines)
        contains_phone = any(line.startswith('1 ') for line in lines)

        if contains_cigar:
            cigar_images.append((image_path, label_path))

        if contains_phone:
            phone_images.append((image_path, label_path))

# Random selection of 50 each
selected_cigars = random.sample(cigar_images, min(50, len(cigar_images)))
selected_phones = random.sample(phone_images, min(50, len(phone_images)))

selected_images = selected_cigars + selected_phones

# Process selected files
for image_path, label_path in selected_images:
    # Copy images
    shutil.copy(image_path, os.path.join(output_images_folder, os.path.basename(image_path)))

    # Load image dimensions
    from PIL import Image
    with Image.open(image_path) as img:
        img_width, img_height = img.size

    # Convert and save new label format
    new_label_path = os.path.join(output_labels_folder, os.path.basename(label_path))

    with open(label_path, 'r') as f:
        lines = f.readlines()

    with open(new_label_path, 'w') as f_out:
        for line in lines:
            parts = line.strip().split()
            class_id = parts[0]

            if class_id not in class_map:
                continue

            class_name, xmin, ymin, xmax, ymax = yolo_to_bbox(line, img_width, img_height)

            # Write in the new format
            f_out.write(f"{class_name} {xmin} {ymin} {xmax} {ymax}\n")

print(f"Selected {len(selected_cigars)} cigar images and {len(selected_phones)} phone images.")
print(f"Files saved to: {output_folder}")


Selected 50 cigar images and 50 phone images.
Files saved to: selected_data
