# File management

In [None]:
# Check every image in a given directory and use PIL to report which bands it has (e.g. RGB vs RGBA)
import os
def check_image_bands(directory):
	"""
	Print the band names of every .jpg/.png file found directly inside `directory`.

	Args:
		directory (str): Folder to scan (not recursive).

	Any exception raised while handling a file is reported on stdout and the scan
	moves on to the next file.
	NOTE(review): `Image` is not imported in this cell; presumably PIL's `Image`
	module is expected to already be in scope -- confirm.
	"""
	image_suffixes = ('.jpg', '.png')
	for entry in os.listdir(directory):
		# Extension match is case-sensitive, so e.g. '.JPG' files are ignored.
		if not entry.endswith(image_suffixes):
			continue
		full_path = os.path.join(directory, entry)
		try:
			with Image.open(full_path) as picture:
				print(f"{entry}: {picture.getbands()}")
		except Exception as err:
			print(f"Error processing {entry}: {err}")

# check_image_bands('/home/data/raw/kaggle_v0/hammer/images')

In [None]:
# Suffixes of the per-image variants; one filtered label file is written per suffix.
suffixes = ['none', 'box', 'gaussian', 'poisson', 'motion']
# Given a folder source_dir containing .txt files with bounding boxes in darknet format,
# write filtered copies into a new folder, keeping only the bounding boxes of certain classes (e.g. 0 and 1).
import glob, os
def filter_bboxes_by_classes(source_dir, output_dir, classes_to_keep):
	"""
	Write class-filtered copies of darknet label files, one copy per entry in `suffixes`.

	For every `<name>.txt` in `source_dir`, the lines whose first token (the class id)
	is in `classes_to_keep` are written to `<output_dir>/<name>_<suffix>.txt` for each
	suffix in the module-level `suffixes` list. A label file with no matching lines
	still produces (empty) output files. Existing output files are never overwritten;
	they are reported and skipped.

	Args:
		source_dir (str): Directory containing the original darknet .txt label files.
		output_dir (str): Directory to write the filtered files to (created if missing).
		classes_to_keep (iterable of int): Class ids whose bounding boxes are kept.

	Raises:
		ValueError: If a non-empty line does not start with an integer class id.
	"""
	os.makedirs(output_dir, exist_ok=True)
	wanted_classes = set(classes_to_keep)

	for label_file in sorted(glob.glob(os.path.join(source_dir, '*.txt'))):
		filtered_lines = []
		with open(label_file, 'r') as f:
			for line in f:
				parts = line.split()
				if parts and int(parts[0]) in wanted_classes:
					filtered_lines.append(line)

		# Build the output path from the file's stem instead of string-replacing inside the
		# full path: replacing breaks when source_dir has a trailing slash or when '.txt'
		# (or the source_dir text) appears elsewhere in the path.
		stem = os.path.splitext(os.path.basename(label_file))[0]
		for suffix in suffixes:
			output_label_file = os.path.join(output_dir, f'{stem}_{suffix}.txt')
			if os.path.exists(output_label_file):
				print(f"Skipping {output_label_file} as it already exists.")
				continue
			with open(output_label_file, 'w') as f:
				f.writelines(filtered_lines)

# filter_bboxes_by_classes('/home/data/processed/kaggle-cnp-v0/synthetic-75965-unsplit/labels-orig',
# 						 '/home/data/processed/kaggle-cnp-v0/synthetic-75965-unsplit/labels',
# 						 [0, 1])

In [None]:
# split the train/images and train/labels files from a given folder into val and test as well
import random
import shutil
import os, glob

random.seed(0)
def split_cnp_into_train_val_test(source_dir, dest_parent_dir=None, train_ratio=0.7, val_ratio=0.2, test_ratio=0.1):
	"""
	Split the images and labels in the source directory into train, val, and test sets.

	The source directory must contain 'images' and 'labels' subdirectories. Files are
	grouped by image ID (the part of the file name before the first underscore) and
	whole IDs are assigned to a split, so files sharing an ID never end up in different
	splits. Files are copied (not moved) to `<dest_parent_dir>/<split>/images` and
	`<dest_parent_dir>/<split>/labels`.

	Each image is paired with the label file that has the same stem
	(`labels/<stem>.txt`); an image without such a label is reported and skipped.

	Split sizes are `int(number_of_ids * ratio)`, so IDs left over by the truncation
	are not copied to any split.

	Shuffling uses the module-level `random` generator; seed it (e.g. `random.seed(0)`)
	right before calling to get a repeatable split.

	Args:
		source_dir (str): Directory containing 'images' and 'labels'.
		dest_parent_dir (str | None): Parent directory for the split folders. Must not
			exist yet. If None, the split folders are created inside `source_dir`.
		train_ratio (float): Fraction of image IDs for the train split.
		val_ratio (float): Fraction of image IDs for the val split.
		test_ratio (float): Fraction of image IDs for the test split.

	Raises:
		FileExistsError: If an explicitly given `dest_parent_dir` already exists, or if
			any of the split directories already exists.
	"""
	if dest_parent_dir is None:
		# source_dir necessarily exists, so the "must not exist" guard cannot apply here;
		# os.makedirs(exist_ok=False) below still refuses to reuse existing split folders.
		dest_parent_dir = source_dir
	elif os.path.exists(dest_parent_dir):
		raise FileExistsError(f"Destination parent directory {dest_parent_dir} already exists. Not risking the overwrite.")

	images_dir = os.path.join(source_dir, 'images')
	labels_dir = os.path.join(source_dir, 'labels')

	images = sorted(path for path in glob.glob(os.path.join(images_dir, '*')) if os.path.isfile(path))

	# Sort the unique IDs before shuffling: set iteration order for strings changes from
	# one interpreter run to the next, which would make the seeded shuffle unrepeatable.
	image_ids = sorted({os.path.basename(img).split('_')[0] for img in images})
	random.shuffle(image_ids)
	train_size = int(len(image_ids) * train_ratio)
	val_size = int(len(image_ids) * val_ratio)
	test_size = int(len(image_ids) * test_ratio)
	print(f'Total unique images {len(image_ids)}, train: {train_size}, val: {val_size}, test: {test_size}')

	# ID -> split lookup (constant-time per image instead of scanning three lists).
	split_of_id = {}
	split_of_id.update(dict.fromkeys(image_ids[:train_size], 'train'))
	split_of_id.update(dict.fromkeys(image_ids[train_size:train_size + val_size], 'val'))
	split_of_id.update(dict.fromkeys(image_ids[train_size + val_size:train_size + val_size + test_size], 'test'))

	for split in ['train', 'val', 'test']:
		os.makedirs(os.path.join(dest_parent_dir, split, 'images'), exist_ok=False) # raise error if directory exists
		os.makedirs(os.path.join(dest_parent_dir, split, 'labels'), exist_ok=False) # raise error if directory exists

	for img in images:
		img_name = os.path.basename(img)
		split = split_of_id.get(img_name.split('_')[0])
		if split is None:
			# ID fell into the remainder dropped by the int() truncation of the split sizes.
			continue

		# Pair by file stem rather than by position in two sorted lists, so a single
		# missing label cannot shift every following image onto the wrong label.
		label = os.path.join(labels_dir, os.path.splitext(img_name)[0] + '.txt')
		if not os.path.isfile(label):
			print(f"Skipping {img}: no matching label file {label}")
			continue

		shutil.copy(img, os.path.join(dest_parent_dir, split, 'images', img_name))
		shutil.copy(label, os.path.join(dest_parent_dir, split, 'labels', os.path.basename(label)))

# split_cnp_into_train_val_test('/home/data/processed/kaggle-cnp-v0/synthetic-75965-unsplit', 
# 					 dest_parent_dir='/home/data/processed/kaggle-cnp-v0/synthetic-75965',
# 					 train_ratio = 0.7, val_ratio = 0.2, test_ratio = 0.1)


Total unique images 15193, train: 10635, val: 3038, test: 1519


In [1]:
import random
import shutil
import os, glob

random.seed(0)
def split_data_into_train_val_test(source_dir, dest_parent_dir=None, train_ratio=0.7, val_ratio=0.2, test_ratio=0.1, shuffle=False):
	"""
	Split the images and labels in the source directory into train, val, and test sets.

	The source directory must contain 'images' and 'labels' subdirectories. Images are
	taken in sorted path order (or in shuffled order when `shuffle` is True): the first
	`int(n * train_ratio)` go to train, the next `int(n * val_ratio)` to val and the next
	`int(n * test_ratio)` to test; any remaining images are not copied. Files are copied
	(not moved) to `<dest_parent_dir>/<split>/images` and `<dest_parent_dir>/<split>/labels`.
	Directories are only created for splits with a non-zero size.

	Each image is paired with the label file that has the same stem
	(`labels/<stem>.txt`); an image without such a label is reported and skipped.

	Args:
		source_dir (str): Directory containing 'images' and 'labels'.
		dest_parent_dir (str | None): Parent directory for the split folders.
			Defaults to `source_dir`.
		train_ratio (float): Fraction of images for the train split.
		val_ratio (float): Fraction of images for the val split.
		test_ratio (float): Fraction of images for the test split.
		shuffle (bool): If True, shuffle the sorted image list with the module-level
			`random` generator before slicing. The default (False) keeps the plain
			sorted-order slicing.

	Raises:
		FileExistsError: If a split directory that has to be created already exists.
	"""
	if dest_parent_dir is None:
		dest_parent_dir = source_dir

	images_dir = os.path.join(source_dir, 'images')
	labels_dir = os.path.join(source_dir, 'labels')

	images = sorted(path for path in glob.glob(os.path.join(images_dir, '*')) if os.path.isfile(path))
	if shuffle:
		random.shuffle(images)

	train_size = int(len(images) * train_ratio)
	val_size = int(len(images) * val_ratio)
	test_size = int(len(images) * test_ratio)
	print(f'Total unique images {len(images)}, train: {train_size}, val: {val_size}, test: {test_size}')

	# Consecutive slices of the image list, one per split.
	images_of_split = {
		'train': images[:train_size],
		'val': images[train_size:train_size + val_size],
		'test': images[train_size + val_size:train_size + val_size + test_size],
	}

	for split, split_images in images_of_split.items():
		if not split_images:
			continue
		dest_dir = os.path.join(dest_parent_dir, split, 'images')
		label_dest_dir = os.path.join(dest_parent_dir, split, 'labels')
		os.makedirs(dest_dir, exist_ok=False) # raise error if directory exists
		os.makedirs(label_dest_dir, exist_ok=False) # raise error if directory exists

		for img in split_images:
			img_name = os.path.basename(img)
			# Pair by file stem rather than by position in two sorted lists, so a single
			# missing label cannot shift every following image onto the wrong label.
			label = os.path.join(labels_dir, os.path.splitext(img_name)[0] + '.txt')
			if not os.path.isfile(label):
				print(f"Skipping {img}: no matching label file {label}")
				continue
			shutil.copy(img, os.path.join(dest_dir, img_name))
			shutil.copy(label, os.path.join(label_dest_dir, os.path.basename(label)))


# Build a train-only subset: with train_ratio=0.1 and val/test ratios of 0, only a
# 'train' folder is created under dest_parent_dir (94 of 946 images in the recorded run).
split_data_into_train_val_test('/home/data/processed/mech_hammer_screwdriver/train/',
					 dest_parent_dir='/home/data/processed/mech_hammer_screwdriver-100train/',
					 train_ratio = 0.1, val_ratio = 0, test_ratio = 0)

# split_data_into_train_val_test('/home/data/raw/objects365',
# 					 dest_parent_dir='/home/data/processed/kaggle_v0/cnp_vik-mid-orig-labels',
# 					 train_ratio = 0.0, val_ratio = 2/3, test_ratio = 1/3)


Total unique images 946, train: 94, val: 0, test: 0


In [None]:
# Given a directory containing text files in darknet format, change class labels in place (e.g. 82 to 30 and 115 to 45).
def change_class_labels_in_darknet_format(directory, class_mappings, drop_unmapped=False):
	"""
	Change class labels in darknet format files in the specified directory (in place).

	Every line whose first token (the class id) is a key of `class_mappings` gets that
	token replaced by the mapped value. Lines with any other class id are kept
	unchanged unless `drop_unmapped` is True.

	Args:
		directory (str): Path to the directory containing .txt files.
		class_mappings (dict): Dictionary mapping old class labels to new class labels.
		(key should be whatever faulty label you have, value should be the correct O365 label)
		drop_unmapped (bool): If True, lines whose class id is not in `class_mappings`
			(and blank lines) are removed from the files, i.e. the files are filtered
			down to the remapped classes. Defaults to False so that relabelling never
			deletes other annotations.

	Raises:
		ValueError: If a non-empty line does not start with an integer class id. The
			offending file is left untouched because nothing is written before the
			whole file has been parsed.
	"""
	for filename in glob.glob(os.path.join(directory, '*.txt')):
		with open(filename, 'r') as file:
			lines = file.readlines()

		# Build the complete new content first; opening the file for writing before
		# parsing would truncate it and lose data if a malformed line raised midway.
		new_lines = []
		for line in lines:
			parts = line.split()
			if parts and int(parts[0]) in class_mappings:
				parts[0] = str(class_mappings[int(parts[0])])
				new_lines.append(' '.join(parts) + '\n')
			elif not drop_unmapped:
				new_lines.append(line)

		if new_lines != lines:
			with open(filename, 'w') as file:
				file.writelines(new_lines)

# change_class_labels_in_darknet_format('/home/data/processed/kaggle_v0/cnp_vik-mid-orig-labels/val/labels', 
									#   {115: 292, 61: 286})

In [None]:
import os,glob
# There is a folder containing text files named [numeric_id]_[none/box/blur/...].txt.
# For each numeric ID, count how many files there are and print the IDs whose count is not 5.
def count_files_by_id(directory, expected_count=5):
	"""
	Count the .txt files per numeric ID in the specified directory and report outliers.

	The ID of a file is the part of its name before the first underscore. Every ID whose
	number of .txt files differs from `expected_count` is printed, followed by the total
	number of directory entries seen (non-.txt entries are reported as skipped but are
	still included in that total).

	Args:
		directory (str): Path to the directory containing text files.
		expected_count (int): Number of files each ID is expected to have; only IDs
			with a different count are printed. Defaults to 5.
	"""
	id_counts = {}

	total_file_count = 0
	# Sorted so that the printed report has a stable order across runs.
	for filename in sorted(glob.glob(os.path.join(directory, '*'))):
		total_file_count += 1
		if not filename.endswith('.txt'):
			print(f"Skipping non-txt file: {filename}")
			continue
		numeric_id = os.path.basename(filename).split('_')[0]
		id_counts[numeric_id] = id_counts.get(numeric_id, 0) + 1

	for numeric_id, count in id_counts.items():
		if count != expected_count:
			print(f"ID: {numeric_id}, Count: {count}")
	print(f'Found {total_file_count} files in total.')
# Report every numeric ID in this labels directory that does not have exactly 5 .txt files.
count_files_by_id('/home/data/processed/kaggle_v0/cnp_vik_full-synthetic-only/labels')

Found 15365 files in total.
