In [5]:
import glob
import random
import shutil
import os
from concurrent.futures import ThreadPoolExecutor

def copy_file(file_path, dest_dir):
    """Copy a single file into ``dest_dir`` and report whether it worked.

    This is a best-effort helper: any ``Exception`` raised by the copy is
    caught and printed rather than propagated, so one bad file does not
    abort a batch of copies.

    Args:
        file_path: Path of the file to copy.
        dest_dir: Destination passed straight to ``shutil.copy`` (a directory
            or a target file path).

    Returns:
        bool: ``True`` if the copy succeeded, ``False`` if it failed. Callers
        that ignore the return value behave exactly as before.
    """
    try:
        shutil.copy(file_path, dest_dir)
    except Exception as err:
        # Deliberately non-fatal: report the failure and let the caller decide.
        print(f"Error copying {file_path}: {err}")
        return False
    print(f"Copied: {file_path}")
    return True

def random_copy_images_multithread(src_dir, dest_dir, num_samples, num_threads=4):
    """Copy a random sample of ``*.jpg`` files from ``src_dir`` to ``dest_dir``.

    Only files directly inside ``src_dir`` whose names match ``*.jpg`` are
    considered. If fewer than ``num_samples`` such files exist, a message is
    printed and nothing is copied (the destination is not created either).

    Args:
        src_dir: Directory searched for ``*.jpg`` files.
        dest_dir: Directory that receives the copies; created if missing.
        num_samples: Number of distinct files to pick at random.
        num_threads: Maximum number of worker threads used for copying.

    Returns:
        None.
    """
    candidates = glob.glob(os.path.join(src_dir, "*.jpg"))
    available = len(candidates)

    # Bail out early when the request cannot be satisfied.
    if available < num_samples:
        print(f"Not enough files to sample. Only {available} available.")
        return

    chosen = random.sample(candidates, num_samples)
    os.makedirs(dest_dir, exist_ok=True)

    # Leaving the ``with`` block waits for every submitted copy to finish.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        for src_path in chosen:
            pool.submit(copy_file, src_path, dest_dir)


In [6]:
import glob
# Root folder of the cleaned dataset; each direct child is one class label.
daatapath = "/home/amir/project/bitbucket/ESD/CLEANED/SQ3"
# Collect every entry directly under the dataset root.
labels = glob.glob(f"{daatapath}/*")
labels

['/home/amir/project/bitbucket/ESD/CLEANED/SQ3/T001FR',
 '/home/amir/project/bitbucket/ESD/CLEANED/SQ3/T001BACK',
 '/home/amir/project/bitbucket/ESD/CLEANED/SQ3/T001DRONE',
 '/home/amir/project/bitbucket/ESD/CLEANED/SQ3/T001FL',
 '/home/amir/project/bitbucket/ESD/CLEANED/SQ3/T001FRONT']

In [7]:
# Usage example (disabled; uncomment the lines below to run):
# source_directory = "/home/amir/project/bitbucket/ESD/CLEANED"
# destination_directory = "/home/amir/project/bitbucket/ESD/TRAINDATA"
# number_of_samples = 3096
# threads = 8
# random_copy_images_multithread(source_directory, destination_directory, number_of_samples, threads)

In [8]:
import os
import shutil
import random
from pathlib import Path

def train_test_split(source_dir, output_dir, val_ratio=0.2, seed=None):
    """Copy a class-per-folder dataset into ``train``/``val`` sub-folders.

    Every direct sub-directory of ``source_dir`` is treated as one class. Its
    files are shuffled and split, then *copied* (the source is left untouched)
    to ``output_dir/train/<class>`` and ``output_dir/val/<class>``.

    Existing files in ``output_dir`` are not removed. Re-running with a
    different random split into the same ``output_dir`` can therefore leave
    the same image in both ``train`` and ``val``; pass a fixed ``seed`` or use
    a fresh output directory to avoid that.

    Args:
        source_dir: Directory containing one sub-directory per class.
        output_dir: Directory that receives the ``train`` and ``val`` trees.
        val_ratio: Fraction of each class assigned to ``val``; must be within
            ``[0, 1]``. The train count is ``int(n * (1 - val_ratio))``.
        seed: Optional seed for a reproducible split. With ``None`` (default)
            the module-level ``random`` generator is used, as before.

    Raises:
        ValueError: If ``val_ratio`` is outside ``[0, 1]``.
    """
    if not 0 <= val_ratio <= 1:
        raise ValueError(f"val_ratio must be between 0 and 1, got {val_ratio!r}")

    source_dir = Path(source_dir)
    output_dir = Path(output_dir)
    train_dir = output_dir / "train"
    val_dir = output_dir / "val"

    # A private generator keeps a seeded run from disturbing global state;
    # without a seed, fall back to the shared ``random`` module.
    rng = random.Random(seed) if seed is not None else random

    # Snapshot the class folders up front and skip the output directory, so an
    # output placed inside ``source_dir`` is never mistaken for a class.
    output_root = output_dir.resolve()
    class_folders = sorted(
        entry
        for entry in source_dir.iterdir()
        if entry.is_dir() and entry.resolve() != output_root
    )

    for class_folder in class_folders:
        class_name = class_folder.name

        # Regular files only: ``shutil.copy`` fails on sub-directories.
        # Sorting first makes a seeded shuffle independent of listing order.
        images = sorted(p for p in class_folder.glob("*") if p.is_file())
        rng.shuffle(images)

        split_idx = int(len(images) * (1 - val_ratio))
        train_images = images[:split_idx]
        val_images = images[split_idx:]

        # Create class directories in train and val
        (train_dir / class_name).mkdir(parents=True, exist_ok=True)
        (val_dir / class_name).mkdir(parents=True, exist_ok=True)

        # Copy images to their respective folders
        for img in train_images:
            shutil.copy(img, train_dir / class_name / img.name)
        for img in val_images:
            shutil.copy(img, val_dir / class_name / img.name)

    print("Dataset split completed!")

# Example usage: split the SQ3 dataset 70/30 into ./TRAINING_DATA
# (a relative path, so it is resolved against the current working directory).

daatapath = "/home/amir/project/bitbucket/ESD/CLEANED/SQ3"
output = "TRAINING_DATA"
train_test_split(source_dir=daatapath, output_dir=output, val_ratio=0.3)


Dataset split completed!
