In [9]:
import os
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import re
import cv2
import glob

# Set the root directory
root_dir = "merged_data"
# Helper function to get the last frame of the last video in the task/Video directory
def get_last_frame(video_dir):
    video_files = glob.glob(os.path.join(video_dir, "video_*.mp4"))
    if not video_files:
        return None
    # Sort videos by numeric x value
    video_files.sort(key=lambda x: int(os.path.basename(x).split("_")[-1].split(".")[0]))
    last_video = video_files[-1]

    # Open the last video and grab the last frame
    cap = cv2.VideoCapture(last_video)
    if not cap.isOpened():
        print(f"Failed to open video: {last_video}")
        return None

    frame = None
    while True:
        ret, current_frame = cap.read()
        if not ret:
            break
        frame = current_frame  # Keep updating until the last frame

    cap.release()

    if frame is not None:
        frame_path = os.path.join(video_dir, "last_frame.jpg")
        cv2.imwrite(frame_path, frame)  # Save the frame to a temporary file
        return frame_path
    return None

# Collect all successful tasks
tasks = []
for x_dir in os.listdir(root_dir):
    x_path = os.path.join(root_dir, x_dir)
    status_file = os.path.join(x_path, "status.txt")
    video_dir = os.path.join(x_path, "task/Video")

    if os.path.isdir(x_path) and os.path.isfile(status_file) and os.path.isdir(video_dir):
        with open(status_file, "r") as file:
            if file.read().strip() == "success":
                last_frame_path = get_last_frame(video_dir)
                if last_frame_path:
                    tasks.append((x_dir, last_frame_path, status_file))

def show_images_inline(tasks):
    for task in tasks:
        img = mpimg.imread(task[1])
        plt.imshow(img)
        plt.title(f"Task {task[0]}")
        plt.axis('off')
        plt.show()

print(len(tasks))
show_images_inline(tasks)



# Merge folders

In [4]:
import os
import shutil

def merge_folders(folder1, folder2, target_folder):
    # Ensure the target folder exists
    os.makedirs(target_folder, exist_ok=True)

    # Helper function to get the next available integer name
    def get_next_name(existing_names):
        i = 1
        while str(i) in existing_names:
            i += 1
        return str(i)

    # Collect all subfolder names already in the target folder
    existing_names = set(os.listdir(target_folder))

    # Process each folder
    for folder in [folder1, folder2]:
        for subfolder in os.listdir(folder):
            source_path = os.path.join(folder, subfolder)
            if os.path.isdir(source_path):  # Ensure it's a subfolder
                if subfolder in existing_names:
                    # Resolve conflict by renaming
                    new_name = get_next_name(existing_names)
                    existing_names.add(new_name)
                    target_path = os.path.join(target_folder, new_name)
                else:
                    existing_names.add(subfolder)
                    target_path = os.path.join(target_folder, subfolder)

                # Move the subfolder
                shutil.move(source_path, target_path)
                print(f"Moved '{source_path}' to '{target_path}'")

# Example usage
folder1 = "./successful"
folder2 = "./successful2"
target_folder = "./merged_data"

merge_folders(folder1, folder2, target_folder)

# Delete failed exeriments from directory

In [None]:
import os
import shutil

# Set the root directory
root_dir = "merged_data"

# Function to remove failed sessions or directories without a status.txt file
def clean_directories(root_dir):
    removed_sessions = []
    removed_empty_dirs = []
    for x_dir in os.listdir(root_dir):
        x_path = os.path.join(root_dir, x_dir)
        status_file = os.path.join(x_path, "status.txt")

        if os.path.isdir(x_path):
            if os.path.isfile(status_file):
                with open(status_file, "r") as file:
                    status = file.read().strip()
                    if status == "fail":
                        shutil.rmtree(x_path)  # Remove the entire directory
                        removed_sessions.append(x_dir)
                        print(f"Removed failed session directory: {x_path}")
            else:
                shutil.rmtree(x_path)  # Remove the directory without status.txt
                removed_empty_dirs.append(x_dir)
                print(f"Removed directory without status.txt: {x_path}")

    if not removed_sessions and not removed_empty_dirs:
        print("No failed session directories or directories without status.txt found.")
    else:
        print(f"Removed {len(removed_sessions)} failed session(s) and {len(removed_empty_dirs)} directory(ies) without status.txt.")

# Execute the function
clean_directories(root_dir)


# Show all keyframes

In [None]:
import os
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

# Set the root directory
root_dir = "candidate"

# Collect all tasks with extracted frames
tasks_with_frames = []
for x_dir in os.listdir(root_dir):
    x_path = os.path.join(root_dir, x_dir)
    frame_dir = os.path.join(x_path, "task/extracted_frames_Video")

    if os.path.isdir(frame_dir):
        frame_files = sorted([os.path.join(frame_dir, f) for f in os.listdir(frame_dir) if f.startswith("frame_") and f.endswith(".jpg")])
        if frame_files:
            tasks_with_frames.append((x_dir, frame_files))

def display_frames(tasks_with_frames):
    """Displays frames for all tasks."""
    for task_name, frames in tasks_with_frames:
        print(f"Displaying frames for task: {task_name}")
        fig, axes = plt.subplots(1, len(frames), figsize=(15, 5))
        for i, frame in enumerate(frames):
            img = mpimg.imread(frame)
            if len(frames) > 1:
                axes[i].imshow(img)
                axes[i].axis('off')
                axes[i].set_title(f"Frame {i}")
            else:
                # Handle case with single frame
                axes.imshow(img)
                axes.axis('off')
                axes.set_title(f"Frame {i}")
        plt.suptitle(f"Task: {task_name}")
        plt.show()

# Display frames
display_frames(tasks_with_frames)
