In [14]:
import tarfile
import os

def extract_tar(tar_path, output_dir, delete_tar=False):
    """
    Extracts a tar file to the specified output directory.

    Extraction is restricted to the output directory: when the running
    Python provides tarfile extraction filters, the 'data' filter is used;
    otherwise every member path is checked by hand before extracting.

    Parameters:
    - tar_path: str, path to the tar file (compression is detected by tarfile).
    - output_dir: str, path to the directory where files should be extracted.
      It is created if it does not exist.
    - delete_tar: bool, if True, deletes the tar file after extraction.

    Returns:
    - Output directory (the value passed as output_dir).

    Raises:
    - tarfile.TarError (or a subclass) when the archive cannot be read or a
      member is rejected by the extraction filter.
    - ValueError on interpreters without extraction filters when a member
      would be written outside output_dir.
    """
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Open the tar file and extract its contents
    with tarfile.open(tar_path, 'r') as tar:
        if hasattr(tarfile, 'data_filter'):
            # Rejects absolute paths, '..' escapes and links leaving output_dir
            tar.extractall(path=output_dir, filter='data')
        else:
            # Older Python: emulate the path check before touching the disk
            dest_root = os.path.realpath(output_dir)
            members = tar.getmembers()
            for member in members:
                target = os.path.realpath(os.path.join(dest_root, member.name))
                if os.path.commonpath([dest_root, target]) != dest_root:
                    raise ValueError(
                        f"Refusing to extract '{member.name}' outside '{output_dir}'."
                    )
            tar.extractall(path=output_dir, members=members)
        print(f"Extracted '{tar_path}' to '{output_dir}'.")

    # Delete the tar file if specified
    if delete_tar:
        os.remove(tar_path)
        print(f"Deleted tar file: '{tar_path}'.")

    return output_dir

# Unpack the downloaded dataset archive into ./extracted; the archive itself is kept.
directory_path = extract_tar(
    tar_path='../../../Downloads/833512_321721_10_new_fps.tar.gz',
    output_dir='./extracted',
    delete_tar=False,
)


Extracted '../../../Downloads/833512_321721_10_new_fps.tar.gz' to './extracted'.


In [19]:
import os

# Locate the archive's top-level folder inside the extraction directory.
# os.listdir returns entries in arbitrary order and may include stray files,
# so only directories are considered and the first one by name is used.
top_level_dirs = sorted(
    entry for entry in os.listdir(directory_path)
    if os.path.isdir(os.path.join(directory_path, entry))
)
if not top_level_dirs:
    raise FileNotFoundError(f"No folder found inside '{directory_path}'.")
croot = os.path.join(directory_path, top_level_dirs[0])

print(croot)


./extracted/10_new_fps


In [24]:
# Collect the full path of every sub-directory directly under croot;
# plain files are ignored.
folder_paths = [
    os.path.join(croot, entry)
    for entry in os.listdir(croot)
    if os.path.isdir(os.path.join(croot, entry))
]

# Show what was found
print(folder_paths)

['./extracted/10_new_fps/dataset 2024-10-14 14-08-46', './extracted/10_new_fps/dataset 2024-10-14 15-46-09']


In [26]:
for folder in folder_paths:
    # Each dataset folder is probed for a 'video' and an 'ann' sub-folder
    video_path = os.path.join(folder, 'video')
    annotation_path = os.path.join(folder, 'ann')

    # Both stay None unless the sub-folder exists and has at least one entry
    video_file_path = None
    annotation_file_path = None

    # First entry listed in the video folder, if any
    if os.path.exists(video_path):
        video_files = os.listdir(video_path)
        if len(video_files) > 0:
            video_file_path = os.path.join(video_path, video_files[0])

    # First entry listed in the annotation folder, if any
    if os.path.exists(annotation_path):
        annotation_files = os.listdir(annotation_path)
        if len(annotation_files) > 0:
            annotation_file_path = os.path.join(annotation_path, annotation_files[0])

    # Report whatever was found, followed by a blank separator line
    report = []
    if video_file_path:
        report.append(f"Updated Video path: {video_file_path}")
    if annotation_file_path:
        report.append(f"Updated Annotation path: {annotation_file_path}")
    report.append("")
    print("\n".join(report))

Updated Video path: ./extracted/10_new_fps/dataset 2024-10-14 14-08-46/video/001.mp4
Updated Annotation path: ./extracted/10_new_fps/dataset 2024-10-14 14-08-46/ann/001.mp4.json

Updated Video path: ./extracted/10_new_fps/dataset 2024-10-14 15-46-09/video/006.mp4
Updated Annotation path: ./extracted/10_new_fps/dataset 2024-10-14 15-46-09/ann/006.mp4.json



In [28]:
import tarfile
import os
import cv2
import json

def extract_tar(tar_path, output_dir, delete_tar=False):
    """Extract a tar archive into ``output_dir`` and return ``output_dir``.

    Extraction is confined to ``output_dir``: tarfile's 'data' filter is used
    when the running Python offers it, otherwise member paths are validated
    manually before anything is written.

    Parameters:
    - tar_path: str, path to the tar file.
    - output_dir: str, destination directory (created if missing).
    - delete_tar: bool, if True the tar file is removed after extraction.

    Raises:
    - tarfile.TarError (or a subclass) for unreadable archives or members
      rejected by the filter.
    - ValueError on interpreters without extraction filters when a member
      would land outside ``output_dir``.
    """
    os.makedirs(output_dir, exist_ok=True)
    with tarfile.open(tar_path, 'r') as tar:
        if hasattr(tarfile, 'data_filter'):
            # Blocks absolute paths, '..' escapes and links leaving output_dir
            tar.extractall(path=output_dir, filter='data')
        else:
            # Older Python: check every member path before extracting
            dest_root = os.path.realpath(output_dir)
            members = tar.getmembers()
            for member in members:
                target = os.path.realpath(os.path.join(dest_root, member.name))
                if os.path.commonpath([dest_root, target]) != dest_root:
                    raise ValueError(
                        f"Refusing to extract '{member.name}' outside '{output_dir}'."
                    )
            tar.extractall(path=output_dir, members=members)
        print(f"Extracted '{tar_path}' to '{output_dir}'.")
    if delete_tar:
        os.remove(tar_path)
        print(f"Deleted tar file: '{tar_path}'.")
    return output_dir

def list_folders(directory):
    """Return the paths of the sub-directories directly inside ``directory``.

    Entries are sorted by name because os.listdir yields them in arbitrary
    order; sorting makes the processing order reproducible between runs.
    Plain files are skipped.
    """
    candidates = (os.path.join(directory, name) for name in sorted(os.listdir(directory)))
    return [path for path in candidates if os.path.isdir(path)]

def find_video_and_annotation_paths(folder):
    """Locate one video file and its annotation file inside a dataset folder.

    The video is looked up in ``<folder>/video`` and the annotation in
    ``<folder>/ann``. Hidden entries (names starting with '.') and
    sub-directories are ignored, and candidates are taken in name order so
    the choice is reproducible. When an annotation named
    ``<video file name>.json`` exists it is preferred, so the pair matches.

    Only the first video of a folder is returned.

    Parameters:
    - folder: str, path to the dataset folder.

    Returns:
    - (video_file_path, annotation_file_path); either element is None when
      the corresponding sub-folder is missing or holds no usable file.
    """
    def visible_files(directory):
        # A missing sub-folder simply means "nothing found"
        if not os.path.isdir(directory):
            return []
        return [
            name for name in sorted(os.listdir(directory))
            if not name.startswith('.') and os.path.isfile(os.path.join(directory, name))
        ]

    video_path = os.path.join(folder, 'video')
    annotation_path = os.path.join(folder, 'ann')
    video_names = visible_files(video_path)
    annotation_names = visible_files(annotation_path)

    video_file_path = os.path.join(video_path, video_names[0]) if video_names else None

    annotation_file_path = None
    if annotation_names:
        chosen = annotation_names[0]
        # Prefer the annotation that belongs to the selected video
        if video_names and video_names[0] + '.json' in annotation_names:
            chosen = video_names[0] + '.json'
        annotation_file_path = os.path.join(annotation_path, chosen)

    return video_file_path, annotation_file_path

def convert_to_yolo(exterior, img_width, img_height):
    """Convert box corner points to a normalized (cx, cy, w, h) tuple.

    The box is the axis-aligned bounding box of all points in ``exterior``,
    so the corners may be given in any order and the width and height are
    never negative.

    Parameters:
    - exterior: sequence of (x, y) points in pixels (two opposite corners
      for a rectangle).
    - img_width: image width in pixels, used to normalize x values.
    - img_height: image height in pixels, used to normalize y values.

    Returns:
    - (center_x, center_y, bbox_width, bbox_height), each divided by the
      corresponding image dimension.
    """
    xs = [point[0] for point in exterior]
    ys = [point[1] for point in exterior]
    x_min, x_max = min(xs), max(xs)
    y_min, y_max = min(ys), max(ys)
    center_x = (x_min + x_max) / 2 / img_width
    center_y = (y_min + y_max) / 2 / img_height
    bbox_width = (x_max - x_min) / img_width
    bbox_height = (y_max - y_min) / img_height
    return center_x, center_y, bbox_width, bbox_height

def process_video_annotations(video_path, annotation_path):
    """Export the annotated frames of one video as images plus YOLO labels.

    For every entry of the annotation's ``frames`` list whose index is below
    the frame count reported by OpenCV, the frame is written to
    ``frames/<video>_frame_<index>.jpg`` and one line
    ``class_id center_x center_y width height`` per figure is written to
    ``labels/<video>_frame_<index>.txt``.

    Class ids are the positions of the class titles in sorted order, so they
    are contiguous (0..n-1) and independent of how many objects share a
    class or of their order in the file. Figures whose ``objectKey`` matches
    no object are skipped instead of reusing a previous class id.

    File names carry the video's base name so that processing several videos
    does not overwrite earlier frames (videos sharing a base name still
    collide).

    Parameters:
    - video_path: str, path to the video file opened with OpenCV.
    - annotation_path: str, path to the JSON annotation file; it must provide
      size.height, size.width, objects[].key, objects[].classTitle,
      frames[].index and frames[].figures[].

    Returns:
    - None
    """
    with open(annotation_path) as f:
        annotations = json.load(f)

    os.makedirs('frames', exist_ok=True)
    os.makedirs('labels', exist_ok=True)

    height = annotations['size']['height']
    width = annotations['size']['width']

    # Contiguous, order-independent class ids
    class_titles = sorted({obj['classTitle'] for obj in annotations['objects']})
    class_mapping = {title: class_id for class_id, title in enumerate(class_titles)}
    # Direct objectKey -> class id lookup instead of rescanning per figure
    class_by_object = {obj['key']: class_mapping[obj['classTitle']] for obj in annotations['objects']}

    video_stem = os.path.splitext(os.path.basename(video_path))[0]

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Could not open video: '{video_path}'.")
            return
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        for frame in annotations['frames']:
            frame_index = frame['index']
            if frame_index >= total_frames:
                continue
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
            ret, image = cap.read()
            if not ret:
                continue

            frame_name = f"{video_stem}_frame_{frame_index:04d}"
            cv2.imwrite(f"frames/{frame_name}.jpg", image)

            with open(f"labels/{frame_name}.txt", 'w') as label_file:
                for figure in frame['figures']:
                    class_id = class_by_object.get(figure['objectKey'])
                    if class_id is None:
                        continue  # figure refers to an unknown object
                    geometry = figure['geometry']['points']['exterior']
                    center_x, center_y, bbox_width, bbox_height = convert_to_yolo(geometry, width, height)
                    label_file.write(f"{class_id} {center_x} {center_y} {bbox_width} {bbox_height}\n")
    finally:
        # Release the capture even if reading or writing fails
        cap.release()

def main(tar_path, output_dir):
    """Extract a dataset archive and export YOLO frames/labels for each video.

    The archive is extracted into ``output_dir``; the first directory (by
    name) found there is treated as the dataset root, and every sub-folder of
    the root that yields both a video and an annotation file is processed.

    Parameters:
    - tar_path: str, path to the dataset tar archive.
    - output_dir: str, directory the archive is extracted into.

    Raises:
    - FileNotFoundError if ``output_dir`` contains no directory after
      extraction.
    """
    extracted_dir = extract_tar(tar_path, output_dir)

    # os.listdir order is arbitrary and may include stray files, so pick the
    # root deterministically among directories only.
    top_level_dirs = sorted(
        name for name in os.listdir(extracted_dir)
        if os.path.isdir(os.path.join(extracted_dir, name))
    )
    if not top_level_dirs:
        raise FileNotFoundError(f"No folder found inside extracted directory '{extracted_dir}'.")
    croot = os.path.join(extracted_dir, top_level_dirs[0])
    print(f"Root folder: {croot}")

    folder_paths = list_folders(croot)
    print("Folder paths:", folder_paths)

    for folder in folder_paths:
        video_file_path, annotation_file_path = find_video_and_annotation_paths(folder)

        if video_file_path and annotation_file_path:
            print(f"Processing Video: {video_file_path}")
            print(f"Processing Annotation: {annotation_file_path}")
            process_video_annotations(video_file_path, annotation_file_path)

        print()  # Print a newline for better readability

# Example usage
if __name__ == "__main__":
    # Run the full pipeline on the downloaded archive
    main(
        tar_path='../../../Downloads/833512_321721_10_new_fps.tar.gz',
        output_dir='./extracted',
    )


Extracted '../../../Downloads/833512_321721_10_new_fps.tar.gz' to './extracted'.
Root folder: ./extracted/10_new_fps
Folder paths: ['./extracted/10_new_fps/dataset 2024-10-14 14-08-46', './extracted/10_new_fps/dataset 2024-10-14 15-46-09']
Processing Video: ./extracted/10_new_fps/dataset 2024-10-14 14-08-46/video/001.mp4
Processing Annotation: ./extracted/10_new_fps/dataset 2024-10-14 14-08-46/ann/001.mp4.json

Processing Video: ./extracted/10_new_fps/dataset 2024-10-14 15-46-09/video/006.mp4
Processing Annotation: ./extracted/10_new_fps/dataset 2024-10-14 15-46-09/ann/006.mp4.json



# Final YOLO pipeline — notebook (.ipynb) version

In [34]:
import tarfile
import os
import cv2
import json
import shutil

def extract_tar(tar_path, output_dir, delete_tar=False):
    """Extract a tar archive into ``output_dir`` and return ``output_dir``.

    Extraction is confined to ``output_dir``: tarfile's 'data' filter is used
    when the running Python offers it, otherwise member paths are validated
    manually before anything is written.

    Parameters:
    - tar_path: str, path to the tar file.
    - output_dir: str, destination directory (created if missing).
    - delete_tar: bool, if True the tar file is removed after extraction.

    Raises:
    - tarfile.TarError (or a subclass) for unreadable archives or members
      rejected by the filter.
    - ValueError on interpreters without extraction filters when a member
      would land outside ``output_dir``.
    """
    os.makedirs(output_dir, exist_ok=True)
    with tarfile.open(tar_path, 'r') as tar:
        print(f"Extracting '{tar_path}' to '{output_dir}'.")
        if hasattr(tarfile, 'data_filter'):
            # Blocks absolute paths, '..' escapes and links leaving output_dir
            tar.extractall(path=output_dir, filter='data')
        else:
            # Older Python: check every member path before extracting
            dest_root = os.path.realpath(output_dir)
            members = tar.getmembers()
            for member in members:
                target = os.path.realpath(os.path.join(dest_root, member.name))
                if os.path.commonpath([dest_root, target]) != dest_root:
                    raise ValueError(
                        f"Refusing to extract '{member.name}' outside '{output_dir}'."
                    )
            tar.extractall(path=output_dir, members=members)
    if delete_tar:
        print(f"Deleting tar file: '{tar_path}'.")
        os.remove(tar_path)
    return output_dir

def list_folders(directory):
    """Return the paths of the sub-directories directly inside ``directory``.

    Entries are sorted by name because os.listdir yields them in arbitrary
    order; sorting makes the processing order reproducible between runs.
    Plain files are skipped.
    """
    candidates = (os.path.join(directory, name) for name in sorted(os.listdir(directory)))
    return [path for path in candidates if os.path.isdir(path)]

def find_video_and_annotation_paths(folder):
    """Locate one video file and its annotation file inside a dataset folder.

    The video is looked up in ``<folder>/video`` and the annotation in
    ``<folder>/ann``. Hidden entries (names starting with '.') and
    sub-directories are ignored, and candidates are taken in name order so
    the choice is reproducible. When an annotation named
    ``<video file name>.json`` exists it is preferred, so the pair matches.

    Only the first video of a folder is returned.

    Parameters:
    - folder: str, path to the dataset folder.

    Returns:
    - (video_file_path, annotation_file_path); either element is None when
      the corresponding sub-folder is missing or holds no usable file.
    """
    def visible_files(directory):
        # A missing sub-folder simply means "nothing found"
        if not os.path.isdir(directory):
            return []
        return [
            name for name in sorted(os.listdir(directory))
            if not name.startswith('.') and os.path.isfile(os.path.join(directory, name))
        ]

    video_path = os.path.join(folder, 'video')
    annotation_path = os.path.join(folder, 'ann')
    video_names = visible_files(video_path)
    annotation_names = visible_files(annotation_path)

    video_file_path = os.path.join(video_path, video_names[0]) if video_names else None

    annotation_file_path = None
    if annotation_names:
        chosen = annotation_names[0]
        # Prefer the annotation that belongs to the selected video
        if video_names and video_names[0] + '.json' in annotation_names:
            chosen = video_names[0] + '.json'
        annotation_file_path = os.path.join(annotation_path, chosen)

    return video_file_path, annotation_file_path

def convert_to_yolo(exterior, img_width, img_height):
    """Convert box corner points to a normalized (cx, cy, w, h) tuple.

    The box is the axis-aligned bounding box of all points in ``exterior``,
    so the corners may be given in any order and the width and height are
    never negative.

    Parameters:
    - exterior: sequence of (x, y) points in pixels (two opposite corners
      for a rectangle).
    - img_width: image width in pixels, used to normalize x values.
    - img_height: image height in pixels, used to normalize y values.

    Returns:
    - (center_x, center_y, bbox_width, bbox_height), each divided by the
      corresponding image dimension.
    """
    xs = [point[0] for point in exterior]
    ys = [point[1] for point in exterior]
    x_min, x_max = min(xs), max(xs)
    y_min, y_max = min(ys), max(ys)
    center_x = (x_min + x_max) / 2 / img_width
    center_y = (y_min + y_max) / 2 / img_height
    bbox_width = (x_max - x_min) / img_width
    bbox_height = (y_max - y_min) / img_height
    return center_x, center_y, bbox_width, bbox_height

def create_yolo_labels(video_path, annotation_path, output_base_path):
    """Export the annotated frames of one video as YOLO images and labels.

    For every entry of the annotation's ``frames`` list whose index is below
    the frame count reported by OpenCV, the frame is written to
    ``<output_base_path>/images/<video>_frame_<index>.jpg`` and one line
    ``class_id center_x center_y width height`` per figure is written to the
    matching ``.txt`` file under ``<output_base_path>/labels``.

    Class ids are the positions of the class titles in sorted order, so they
    are contiguous (0..n-1) and independent of how many objects share a
    class or of their order in the file. Figures whose ``objectKey`` matches
    no object are skipped.

    File names carry the video's base name so that several videos can share
    one ``output_base_path`` without overwriting each other's frames (videos
    sharing a base name still collide).

    Parameters:
    - video_path: str, path to the video file opened with OpenCV.
    - annotation_path: str, path to the JSON annotation file; it must provide
      size.height, size.width, objects[].key, objects[].classTitle,
      frames[].index and frames[].figures[].
    - output_base_path: str, directory receiving the ``images`` and
      ``labels`` sub-folders (created if missing).

    Returns:
    - None
    """
    images_dir = os.path.join(output_base_path, 'images')
    labels_dir = os.path.join(output_base_path, 'labels')
    os.makedirs(images_dir, exist_ok=True)
    os.makedirs(labels_dir, exist_ok=True)

    with open(annotation_path) as f:
        annotations = json.load(f)

    height = annotations['size']['height']
    width = annotations['size']['width']

    # Contiguous, order-independent class ids
    class_titles = sorted({obj['classTitle'] for obj in annotations['objects']})
    class_mapping = {title: class_id for class_id, title in enumerate(class_titles)}
    # Direct objectKey -> class id lookup; a missing key no longer raises StopIteration
    class_by_object = {obj['key']: class_mapping[obj['classTitle']] for obj in annotations['objects']}

    video_stem = os.path.splitext(os.path.basename(video_path))[0]

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Could not open video: '{video_path}'.")
            return
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        for frame in annotations['frames']:
            frame_index = frame['index']
            if frame_index >= total_frames:
                continue
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
            ret, image = cap.read()
            if not ret:
                continue

            frame_name = f"{video_stem}_frame_{frame_index:04d}"
            cv2.imwrite(os.path.join(images_dir, f"{frame_name}.jpg"), image)

            with open(os.path.join(labels_dir, f"{frame_name}.txt"), 'w') as label_file:
                for figure in frame['figures']:
                    class_id = class_by_object.get(figure['objectKey'])
                    if class_id is None:
                        continue  # figure refers to an unknown object
                    geometry = figure['geometry']['points']['exterior']
                    center_x, center_y, bbox_width, bbox_height = convert_to_yolo(geometry, width, height)
                    label_file.write(f"{class_id} {center_x} {center_y} {bbox_width} {bbox_height}\n")
    finally:
        # Release the capture even if reading or writing fails
        cap.release()

def compress_folder(folder_path):
    """Zip the contents of ``folder_path`` into ``<folder_path>.zip``.

    The archive is created next to the folder and holds the folder's
    contents relative to the folder itself. Returns None.
    """
    print(f"Compressing folder '{folder_path}' to '{folder_path}.zip'.")
    shutil.make_archive(base_name=folder_path, format='zip', root_dir=folder_path)

def main(tar_path, output_dir, separate_folders=False, delete_extracted=False, delete_yolo_annotation=False):
    """Extract a dataset archive, export YOLO data and zip the result.

    The archive is extracted into ``output_dir``; the first directory (by
    name) found there is treated as the dataset root. Every sub-folder of
    the root that yields both a video and an annotation file is exported
    into ``yolo_annotation``, which is then compressed to
    ``yolo_annotation.zip``.

    Parameters:
    - tar_path: str, path to the dataset tar archive.
    - output_dir: str, directory the archive is extracted into.
    - separate_folders: bool, if True each dataset folder is exported to
      ``yolo_annotation/<folder name>``; otherwise all share
      ``yolo_annotation``.
    - delete_extracted: bool, if True the extraction directory is removed
      at the end.
    - delete_yolo_annotation: bool, if True the ``yolo_annotation`` folder
      is removed after compression.

    Raises:
    - FileNotFoundError if ``output_dir`` contains no directory after
      extraction.
    """
    yolo_root = 'yolo_annotation'
    extracted_dir = extract_tar(tar_path, output_dir)

    # os.listdir order is arbitrary and may include stray files, so pick the
    # root deterministically among directories only.
    top_level_dirs = sorted(
        name for name in os.listdir(extracted_dir)
        if os.path.isdir(os.path.join(extracted_dir, name))
    )
    if not top_level_dirs:
        raise FileNotFoundError(f"No folder found inside extracted directory '{extracted_dir}'.")
    croot = os.path.join(extracted_dir, top_level_dirs[0])

    folder_paths = list_folders(croot)

    total_videos = len(folder_paths)
    for idx, folder in enumerate(folder_paths, start=1):
        video_file_path, annotation_file_path = find_video_and_annotation_paths(folder)

        if not (video_file_path and annotation_file_path):
            continue
        print(f"Processing Video {idx} of {total_videos}")

        # Determine output path
        if separate_folders:
            output_base_path = os.path.join(yolo_root, os.path.basename(folder))
        else:
            output_base_path = yolo_root

        create_yolo_labels(video_file_path, annotation_file_path, output_base_path)

    # Compress the yolo annotation folder; it only exists if something was exported
    if os.path.isdir(yolo_root):
        compress_folder(yolo_root)
    else:
        print(f"No videos were processed; '{yolo_root}' was not created, skipping compression.")

    # Cleanup
    if delete_extracted:
        print(f"Deleting extracted folder: '{extracted_dir}'.")
        shutil.rmtree(extracted_dir)

    if delete_yolo_annotation and os.path.isdir(yolo_root):
        print(f"Deleting yolo_annotation folder after compression.")
        shutil.rmtree(yolo_root)


# Example usage
if __name__ == "__main__":
    # One output folder per dataset; remove intermediate data once zipped
    main(
        tar_path='../../../Downloads/833512_321721_10_new_fps.tar.gz',
        output_dir='./extracted',
        separate_folders=True,
        delete_extracted=True,
        delete_yolo_annotation=True,
    )


Extracting '../../../Downloads/833512_321721_10_new_fps.tar.gz' to './extracted'.
Processing Video 1 of 2
Processing Video 2 of 2
Compressing folder 'yolo_annotation' to 'yolo_annotation.zip'.
Deleting extracted folder: './extracted'.
Deleting yolo_annotation folder after compression.


# Final YOLO pipeline — command-line version

In [35]:
import tarfile
import os
import cv2
import json
import shutil
import sys

def extract_tar(tar_path, output_dir, delete_tar=False):
    """Extract a tar archive into ``output_dir`` and return ``output_dir``.

    Extraction is confined to ``output_dir``: tarfile's 'data' filter is used
    when the running Python offers it, otherwise member paths are validated
    manually before anything is written.

    Parameters:
    - tar_path: str, path to the tar file.
    - output_dir: str, destination directory (created if missing).
    - delete_tar: bool, if True the tar file is removed after extraction.

    Raises:
    - tarfile.TarError (or a subclass) for unreadable archives or members
      rejected by the filter.
    - ValueError on interpreters without extraction filters when a member
      would land outside ``output_dir``.
    """
    os.makedirs(output_dir, exist_ok=True)
    with tarfile.open(tar_path, 'r') as tar:
        print(f"Extracting '{tar_path}' to '{output_dir}'.")
        if hasattr(tarfile, 'data_filter'):
            # Blocks absolute paths, '..' escapes and links leaving output_dir
            tar.extractall(path=output_dir, filter='data')
        else:
            # Older Python: check every member path before extracting
            dest_root = os.path.realpath(output_dir)
            members = tar.getmembers()
            for member in members:
                target = os.path.realpath(os.path.join(dest_root, member.name))
                if os.path.commonpath([dest_root, target]) != dest_root:
                    raise ValueError(
                        f"Refusing to extract '{member.name}' outside '{output_dir}'."
                    )
            tar.extractall(path=output_dir, members=members)
    if delete_tar:
        print(f"Deleting tar file: '{tar_path}'.")
        os.remove(tar_path)
    return output_dir

def list_folders(directory):
    """Return the paths of the sub-directories directly inside ``directory``.

    Entries are sorted by name because os.listdir yields them in arbitrary
    order; sorting makes the processing order reproducible between runs.
    Plain files are skipped.
    """
    candidates = (os.path.join(directory, name) for name in sorted(os.listdir(directory)))
    return [path for path in candidates if os.path.isdir(path)]

def find_video_and_annotation_paths(folder):
    """Locate one video file and its annotation file inside a dataset folder.

    The video is looked up in ``<folder>/video`` and the annotation in
    ``<folder>/ann``. Hidden entries (names starting with '.') and
    sub-directories are ignored, and candidates are taken in name order so
    the choice is reproducible. When an annotation named
    ``<video file name>.json`` exists it is preferred, so the pair matches.

    Only the first video of a folder is returned.

    Parameters:
    - folder: str, path to the dataset folder.

    Returns:
    - (video_file_path, annotation_file_path); either element is None when
      the corresponding sub-folder is missing or holds no usable file.
    """
    def visible_files(directory):
        # A missing sub-folder simply means "nothing found"
        if not os.path.isdir(directory):
            return []
        return [
            name for name in sorted(os.listdir(directory))
            if not name.startswith('.') and os.path.isfile(os.path.join(directory, name))
        ]

    video_path = os.path.join(folder, 'video')
    annotation_path = os.path.join(folder, 'ann')
    video_names = visible_files(video_path)
    annotation_names = visible_files(annotation_path)

    video_file_path = os.path.join(video_path, video_names[0]) if video_names else None

    annotation_file_path = None
    if annotation_names:
        chosen = annotation_names[0]
        # Prefer the annotation that belongs to the selected video
        if video_names and video_names[0] + '.json' in annotation_names:
            chosen = video_names[0] + '.json'
        annotation_file_path = os.path.join(annotation_path, chosen)

    return video_file_path, annotation_file_path

def convert_to_yolo(exterior, img_width, img_height):
    """Convert box corner points to a normalized (cx, cy, w, h) tuple.

    The box is the axis-aligned bounding box of all points in ``exterior``,
    so the corners may be given in any order and the width and height are
    never negative.

    Parameters:
    - exterior: sequence of (x, y) points in pixels (two opposite corners
      for a rectangle).
    - img_width: image width in pixels, used to normalize x values.
    - img_height: image height in pixels, used to normalize y values.

    Returns:
    - (center_x, center_y, bbox_width, bbox_height), each divided by the
      corresponding image dimension.
    """
    xs = [point[0] for point in exterior]
    ys = [point[1] for point in exterior]
    x_min, x_max = min(xs), max(xs)
    y_min, y_max = min(ys), max(ys)
    center_x = (x_min + x_max) / 2 / img_width
    center_y = (y_min + y_max) / 2 / img_height
    bbox_width = (x_max - x_min) / img_width
    bbox_height = (y_max - y_min) / img_height
    return center_x, center_y, bbox_width, bbox_height

def create_yolo_labels(video_path, annotation_path, output_base_path):
    """Export the annotated frames of one video as YOLO images and labels.

    For every entry of the annotation's ``frames`` list whose index is below
    the frame count reported by OpenCV, the frame is written to
    ``<output_base_path>/images/<video>_frame_<index>.jpg`` and one line
    ``class_id center_x center_y width height`` per figure is written to the
    matching ``.txt`` file under ``<output_base_path>/labels``.

    Class ids are the positions of the class titles in sorted order, so they
    are contiguous (0..n-1) and independent of how many objects share a
    class or of their order in the file. Figures whose ``objectKey`` matches
    no object are skipped.

    File names carry the video's base name so that several videos can share
    one ``output_base_path`` without overwriting each other's frames (videos
    sharing a base name still collide).

    Parameters:
    - video_path: str, path to the video file opened with OpenCV.
    - annotation_path: str, path to the JSON annotation file; it must provide
      size.height, size.width, objects[].key, objects[].classTitle,
      frames[].index and frames[].figures[].
    - output_base_path: str, directory receiving the ``images`` and
      ``labels`` sub-folders (created if missing).

    Returns:
    - None
    """
    images_dir = os.path.join(output_base_path, 'images')
    labels_dir = os.path.join(output_base_path, 'labels')
    os.makedirs(images_dir, exist_ok=True)
    os.makedirs(labels_dir, exist_ok=True)

    with open(annotation_path) as f:
        annotations = json.load(f)

    height = annotations['size']['height']
    width = annotations['size']['width']

    # Contiguous, order-independent class ids
    class_titles = sorted({obj['classTitle'] for obj in annotations['objects']})
    class_mapping = {title: class_id for class_id, title in enumerate(class_titles)}
    # Direct objectKey -> class id lookup; a missing key no longer raises StopIteration
    class_by_object = {obj['key']: class_mapping[obj['classTitle']] for obj in annotations['objects']}

    video_stem = os.path.splitext(os.path.basename(video_path))[0]

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Could not open video: '{video_path}'.")
            return
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        for frame in annotations['frames']:
            frame_index = frame['index']
            if frame_index >= total_frames:
                continue
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
            ret, image = cap.read()
            if not ret:
                continue

            frame_name = f"{video_stem}_frame_{frame_index:04d}"
            cv2.imwrite(os.path.join(images_dir, f"{frame_name}.jpg"), image)

            with open(os.path.join(labels_dir, f"{frame_name}.txt"), 'w') as label_file:
                for figure in frame['figures']:
                    class_id = class_by_object.get(figure['objectKey'])
                    if class_id is None:
                        continue  # figure refers to an unknown object
                    geometry = figure['geometry']['points']['exterior']
                    center_x, center_y, bbox_width, bbox_height = convert_to_yolo(geometry, width, height)
                    label_file.write(f"{class_id} {center_x} {center_y} {bbox_width} {bbox_height}\n")
    finally:
        # Release the capture even if reading or writing fails
        cap.release()

def compress_folder(folder_path):
    """Create ``<folder_path>.zip`` from everything inside ``folder_path``.

    The folder is used both as the archive's base name and as its root
    directory. Returns None.
    """
    archive_format = 'zip'
    print(f"Compressing folder '{folder_path}' to '{folder_path}.zip'.")
    shutil.make_archive(folder_path, archive_format, root_dir=folder_path)

def main(tar_path, output_dir, separate_folders=False, delete_extracted=False, delete_yolo_annotation=False):
    """Extract a dataset archive, export YOLO data and zip the result.

    The archive is extracted into ``output_dir``; the first directory (by
    name) found there is treated as the dataset root. Every sub-folder of
    the root that yields both a video and an annotation file is exported
    into ``yolo_annotation``, which is then compressed to
    ``yolo_annotation.zip``.

    Parameters:
    - tar_path: str, path to the dataset tar archive.
    - output_dir: str, directory the archive is extracted into.
    - separate_folders: bool, if True each dataset folder is exported to
      ``yolo_annotation/<folder name>``; otherwise all share
      ``yolo_annotation``.
    - delete_extracted: bool, if True the extraction directory is removed
      at the end.
    - delete_yolo_annotation: bool, if True the ``yolo_annotation`` folder
      is removed after compression.

    Raises:
    - FileNotFoundError if ``output_dir`` contains no directory after
      extraction.
    """
    yolo_root = 'yolo_annotation'
    extracted_dir = extract_tar(tar_path, output_dir)

    # os.listdir order is arbitrary and may include stray files, so pick the
    # root deterministically among directories only.
    top_level_dirs = sorted(
        name for name in os.listdir(extracted_dir)
        if os.path.isdir(os.path.join(extracted_dir, name))
    )
    if not top_level_dirs:
        raise FileNotFoundError(f"No folder found inside extracted directory '{extracted_dir}'.")
    croot = os.path.join(extracted_dir, top_level_dirs[0])

    folder_paths = list_folders(croot)

    total_videos = len(folder_paths)
    for idx, folder in enumerate(folder_paths, start=1):
        video_file_path, annotation_file_path = find_video_and_annotation_paths(folder)

        if not (video_file_path and annotation_file_path):
            continue
        print(f"Processing Video {idx} of {total_videos}")

        # Determine output path
        if separate_folders:
            output_base_path = os.path.join(yolo_root, os.path.basename(folder))
        else:
            output_base_path = yolo_root

        create_yolo_labels(video_file_path, annotation_file_path, output_base_path)

    # Compress the yolo annotation folder; it only exists if something was exported
    if os.path.isdir(yolo_root):
        compress_folder(yolo_root)
    else:
        print(f"No videos were processed; '{yolo_root}' was not created, skipping compression.")

    # Cleanup
    if delete_extracted:
        print(f"Deleting extracted folder: '{extracted_dir}'.")
        shutil.rmtree(extracted_dir)

    if delete_yolo_annotation and os.path.isdir(yolo_root):
        print(f"Deleting yolo_annotation folder after compression.")
        shutil.rmtree(yolo_root)

# Example usage from command line
if __name__ == "__main__":
    # The script name plus five positional arguments are required
    if len(sys.argv) >= 6:
        tar_path, output_dir = sys.argv[1], sys.argv[2]
        # A flag is True only for the text 'true' (case-insensitive)
        separate_folders, delete_extracted, delete_yolo_annotation = [
            raw_flag.lower() == 'true' for raw_flag in sys.argv[3:6]
        ]
        main(tar_path, output_dir, separate_folders, delete_extracted, delete_yolo_annotation)
    else:
        print("Usage: python your_script.py <tar_path> <output_dir> <separate_folders> <delete_extracted> <delete_yolo_annotation>")


Usage: python your_script.py <tar_path> <output_dir> <separate_folders> <delete_extracted> <delete_yolo_annotation>
