In [1]:
# Show the kernel's working directory before it is changed below.
%pwd

'/home/gourav/ML/PCB Fault Detection/research'

In [2]:
import os
# Step up from the notebook folder to the project root, which is where the
# `src` package imported by later cells lives. The guard keeps the cell
# idempotent: re-running it (or starting the kernel in the project root) no
# longer climbs one directory too far.
if not os.path.isdir("src"):
    os.chdir("../")

In [3]:
# Confirm the working directory after the chdir cell above.
%pwd

'/home/gourav/ML/PCB Fault Detection'

In [6]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataProcessingConfig:
    """Immutable settings for the data-processing stage.

    The directory fields are read by ``DataProcessing``; ``image_size`` is
    supplied by ``ConfigurationManager`` from the params file and was missing
    from this class, which made construction fail with ``TypeError``.
    """
    # Directory whose group sub-folders hold the original dataset.
    source_dir: Path
    # Destination of the flattened raw copy.
    raw_data_dir: Path
    # Destination of copied images and converted label files.
    processed_data_dir: Path
    # Destination of the train/validation split.
    split_data_dir: Path
    # Comma-separated "width,height" string; DataProcessing parses it with
    # split(",") and int().
    image_size: str

In [7]:
# importing constants and utils
from src.YOLO_V8.constants import *
from src.YOLO_V8.utils.common import read_yaml, create_directories

In [8]:
class ConfigurationManager:
    """Loads the project's config and params YAML files and builds stage configs."""

    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH):
        """Read both YAML files via ``read_yaml`` and keep the results on the instance."""
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

    def get_data_process_config(self) -> DataProcessingConfig:
        """Build a ``DataProcessingConfig`` from the ``data_processing`` config
        section plus ``image_size`` taken from the params file."""
        section = self.config.data_processing

        return DataProcessingConfig(
            source_dir=section.source_dir,
            raw_data_dir=section.raw_data_dir,
            processed_data_dir=section.processed_data_dir,
            split_data_dir=section.split_data_dir,
            image_size=self.params.image_size,
        )

In [10]:
import pybboxes as pbx

ModuleNotFoundError: No module named 'pybboxes'

In [9]:
import os
import random
import shutil

import pybboxes as pbx

from src.YOLO_V8 import logger
from src.YOLO_V8.utils.common import get_size

ModuleNotFoundError: No module named 'pybboxes'

In [12]:
def join_path(root_path, join_file):
    """Return ``join_file`` joined onto ``root_path`` with ``os.path.join``."""
    combined = os.path.join(root_path, join_file)
    return combined

In [29]:
class DataProcessing:
    """Copies the raw dataset, converts its labels to YOLO format and splits it.

    The three stages are meant to be called in order: ``get_raw_data`` ->
    ``processed_data`` -> ``split_data``. Directories come from the supplied
    config object; directory creation is delegated to ``create_directories``.
    """

    # File extension appended to a bare file name for each processed
    # sub-folder when splitting. NOTE(review): assumes every image is a
    # ``.jpg`` and every label a ``.txt`` -- confirm against the dataset.
    _SPLIT_EXTENSIONS = {"images": ".jpg", "labels": ".txt"}

    def __init__(self, config: "DataProcessingConfig"):
        """Store the config and parse ``config.image_size`` ("width,height")."""
        self.config = config
        dimensions = self.config.image_size.split(",")
        self.image_width = int(dimensions[0])
        self.image_height = int(dimensions[1])

    def get_raw_data(self):
        """Flatten ``source_dir/<group>/<folder>/*`` into ``raw_data_dir/<folder>/*``.

        Same-named folders from different groups are merged into one target
        folder; a file whose name already exists in the target is not
        overwritten (see ``copy_files``). Entries that are not directories
        are ignored at both levels instead of crashing ``os.listdir``.
        """
        source_dir = self.config.source_dir
        target_dir = self.config.raw_data_dir
        create_directories([target_dir])

        for group in os.listdir(source_dir):
            group_path = join_path(source_dir, group)
            if not os.path.isdir(group_path):
                continue

            for folder in os.listdir(group_path):
                files_source_dir = join_path(group_path, folder)
                if not os.path.isdir(files_source_dir):
                    continue

                files_target_dir = join_path(target_dir, folder)
                create_directories([files_target_dir])

                print(f"Start Getting Raw {folder} --->   ")
                self.copy_files(
                    os.listdir(files_source_dir),
                    files_source_dir,
                    files_target_dir,
                    file_extension=None,
                )

    def get_proper_coordinate_line(self, line):
        """Convert one label line from VOC box format to a YOLO label line.

        Args:
            line: whitespace-separated integers followed by the class token
                as the last field (the integers are handed to pybboxes as a
                ``voc`` box).

        Returns:
            ``"<class> <converted values...>\\n"`` with the box converted by
            ``pbx.convert_bbox`` using the configured image width/height.

        Raises:
            ValueError: if the line is blank or a coordinate is not an integer.
        """
        # split() with no argument tolerates repeated spaces, tabs and "\r\n".
        fields = line.split()
        if not fields:
            raise ValueError("Cannot convert an empty label line.")

        box_class = fields[-1]
        voc_box = [int(value) for value in fields[:-1]]

        yolo_box = pbx.convert_bbox(
            voc_box,
            from_type="voc",
            to_type="yolo",
            image_size=(self.image_width, self.image_height),
        )

        return " ".join([box_class, *map(str, yolo_box)]) + "\n"

    def get_processed_labels(self, labels_source_dir, labels_target_dir):
        """Write a converted copy of every label file in ``labels_source_dir``.

        A file is skipped only when the target already exists as a file and
        ``get_size`` reports the same value for source and target (the
        original tested ``isdir`` on a file path, which is never true, so
        nothing was ever skipped). ``get_size`` is a project helper whose
        return value is not visible here.
        """
        labels_file_count = 0
        for labels_file in os.listdir(labels_source_dir):
            source_path = join_path(labels_source_dir, labels_file)
            target_path = join_path(labels_target_dir, labels_file)

            if os.path.isfile(target_path) and get_size(source_path) == get_size(target_path):
                continue

            # Convert everything before opening the target, so a malformed
            # line cannot leave a truncated output file behind.
            with open(source_path, "r") as source_file:
                converted_lines = [
                    self.get_proper_coordinate_line(raw_line)
                    for raw_line in source_file
                    if raw_line.strip()  # ignore blank lines
                ]

            with open(target_path, "w") as target_file:
                target_file.writelines(converted_lines)

            labels_file_count += 1

        print(f"{labels_file_count} labels files created from {labels_source_dir} to {labels_target_dir}.")

    def processed_data(self):
        """Populate ``processed_data_dir`` from ``raw_data_dir``.

        The ``images`` folder is copied unchanged, the ``labels`` folder is
        converted through ``get_processed_labels``; any other entry is ignored.
        """
        source_dir = self.config.raw_data_dir
        target_dir = self.config.processed_data_dir

        create_directories([target_dir])

        for folder in os.listdir(source_dir):
            folder_source_dir = join_path(source_dir, folder)
            folder_target_dir = join_path(target_dir, folder)

            if folder == "images":
                create_directories([folder_target_dir])
                print(f"Start Getting Processed {folder} -----> ")
                self.copy_files(
                    os.listdir(folder_source_dir),
                    folder_source_dir,
                    folder_target_dir,
                    file_extension=None,
                )
            elif folder == "labels":
                create_directories([folder_target_dir])
                print(f"Start Getting Processed {folder} -----> ")
                self.get_processed_labels(folder_source_dir, folder_target_dir)

    def get_file_names(self, directory):
        """Return the sorted names in ``directory`` with the last extension removed.

        ``os.path.splitext`` keeps dots inside the name ("a.b.jpg" -> "a.b");
        the previous ``split(".")[0]`` cut such names down to "a", which made
        the later copy look for a file that does not exist. Sorting makes the
        result independent of ``os.listdir`` ordering.
        """
        return sorted(os.path.splitext(entry)[0] for entry in os.listdir(directory))

    def copy_files(self, files_names: list, source_dir, target_dir, file_extension=None):
        """Copy the named files from ``source_dir`` to ``target_dir``.

        Args:
            files_names: file names, or bare names when ``file_extension`` is given.
            source_dir: directory to copy from.
            target_dir: existing directory to copy into.
            file_extension: optional suffix appended to every name.

        Files that already exist in ``target_dir`` are left untouched and are
        not counted in the printed total.
        """
        file_count = 0
        for name in files_names:
            if file_extension:
                name = name + file_extension

            file_source_path = join_path(source_dir, name)
            file_target_path = join_path(target_dir, name)

            if not os.path.isfile(file_target_path):
                shutil.copy(file_source_path, file_target_path)
                file_count += 1

        print(f"{file_count} Files copied from {source_dir} to {target_dir}.")

    def split_data(self, train_ratio: float = 0.8, seed=42):
        """Split the processed images/labels into ``train_data`` and ``val_data``.

        Args:
            train_ratio: fraction of files (0..1) placed in the training set.
            seed: seed for the shuffle. A fixed default keeps the split
                reproducible, so re-running does not leak files between the
                two sets (existing files are never removed). Pass ``None``
                for a different split on every call.

        Raises:
            ValueError: if ``train_ratio`` is outside ``[0, 1]``.
        """
        if not 0 <= train_ratio <= 1:
            raise ValueError(f"train_ratio must be between 0 and 1, got {train_ratio}")

        source_dir = self.config.processed_data_dir
        target_dir = self.config.split_data_dir

        files_names = self.get_file_names(join_path(source_dir, "images"))
        random.Random(seed).shuffle(files_names)

        train_dir = join_path(target_dir, "train_data")
        val_dir = join_path(target_dir, "val_data")
        create_directories([target_dir, train_dir, val_dir])

        train_size = int(len(files_names) * train_ratio)
        # Derived from the slice actually used below, so the report is exact.
        val_size = len(files_names) - train_size

        print(f"Traning files = {train_size} and Validation files = {val_size}")

        for folder in os.listdir(source_dir):
            file_extension = self._SPLIT_EXTENSIONS.get(folder)
            if file_extension is None:
                # Unknown entry: skip rather than reuse a stale extension.
                continue

            files_source_dir = join_path(source_dir, folder)
            files_train_dir = join_path(train_dir, folder)
            files_val_dir = join_path(val_dir, folder)

            create_directories([files_train_dir, files_val_dir])

            print(f"Splitting {folder}")

            print(f"Creating training files ----")
            self.copy_files(files_names[:train_size], files_source_dir, files_train_dir, file_extension)

            print(f"Creating validation files ------")
            self.copy_files(files_names[train_size:], files_source_dir, files_val_dir, file_extension)





In [31]:
# Run the three data-processing stages in order: raw copy -> label
# conversion -> train/validation split.
try:
    config = ConfigurationManager()
    data_processing_config = config.get_data_process_config()
    data_processing = DataProcessing(data_processing_config)
    data_processing.get_raw_data()
    data_processing.processed_data()
    data_processing.split_data()

except Exception as e:
    # Record the full traceback and re-raise: printing only the message hid
    # where the failure happened and let later cells run on partial data.
    logger.exception(e)
    raise


In [None]:
# for files in os.listdir(files_folder):
                #     files_source_path = join_path(files_folder, files)
                #     files_target_path = join_path(target_folder, files)
                #     if not os.path.isfile(files_target_path):
                #         shutil.copy(files_source_path, files_target_path)
        
        # images = len(os.listdir(join_path(target_dir,"images")))
        # labels = len(os.listdir(join_path(target_dir,"labels")))
        # print(f"Raw data has been collected.")
        # print(f"Total Images = {images}, Total Labels File= {labels}")
        # logger.info(f"Raw data has been collected.")
        # logger.info(f"Total Images = {images}, Total Labels File= {labels}")