In [1]:
import os
import zipfile
import shutil
import json
from pathlib import Path
from collections import defaultdict
import datetime

class DefectDatasetMerger:
    """Extract defect-image dataset archives and merge them into one tree.

    All paths are relative to ``base_dir``.  The pipeline (see ``run``):

    1. unzip each configured archive into ``extracted_datasets/<name>``;
    2. count the images of every extracted dataset and save the numbers;
    3. copy all images into ``final_merged_dataset/<name>/...``, keeping the
       relative layout each archive had;
    4. count the merged tree and save the numbers;
    5. write a plain-text report comparing the counts before and after.

    An "image" is any file whose suffix (case-insensitive) is listed in
    ``image_extensions``.  The "class" of an image is simply the name of the
    directory that directly contains it, so identically named folders in
    different places are counted as one class.
    """

    def __init__(self, base_dir="."):
        """Set up the working paths and the table of known archives.

        Args:
            base_dir: Directory that holds the ``.zip`` archives and receives
                the output directories.  Nothing is created on disk here.
        """
        self.base_dir = Path(base_dir)
        self.extract_dir = self.base_dir / "extracted_datasets"
        self.merged_dir = self.base_dir / "final_merged_dataset"
        self.metadata_dir = self.base_dir / "metadata"

        # Dataset configurations; "extracted" is filled in by
        # extract_datasets() and stays None for archives that were not found.
        self.datasets = {
            "pcb-dataset": {"zip": "pcb-dataset.zip", "extracted": None},
            "neu-cls": {"zip": "neu-cls.zip", "extracted": None},
            "mvtec-ad": {"zip": "mvtec-ad.zip", "extracted": None},
        }

        # Supported image extensions (compared against the lower-cased suffix)
        self.image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}

    @staticmethod
    def _print_banner(title):
        """Print ``title`` framed by two 60-character rules after a blank line."""
        rule = "=" * 60
        print("\n" + rule)
        print(title)
        print(rule)

    def _iter_image_files(self, directory):
        """Yield the path of every image file below ``directory``.

        Files are visited in ``os.walk`` order.  A missing directory yields
        nothing because ``os.walk`` ignores the error by default.
        """
        for root, _dirs, files in os.walk(directory):
            for file_name in files:
                if Path(file_name).suffix.lower() in self.image_extensions:
                    yield Path(root) / file_name

    def create_directories(self):
        """Create the extraction, merge and metadata directories.

        ``parents=True`` lets this succeed even when ``base_dir`` itself does
        not exist yet.
        """
        for directory in (self.extract_dir, self.merged_dir, self.metadata_dir):
            directory.mkdir(parents=True, exist_ok=True)
        print("✓ Created directory structure")

    def extract_datasets(self):
        """Extract every configured archive that exists in ``base_dir``.

        A previous extraction of the same dataset is deleted first.  Missing
        archives are reported and skipped; their ``"extracted"`` entry is left
        untouched.  Errors raised by ``zipfile`` (e.g. a corrupt archive)
        propagate to the caller.
        """
        self._print_banner("STEP 1: EXTRACTING DATASETS")

        extracted_now = 0
        for dataset_name, info in self.datasets.items():
            zip_path = self.base_dir / info["zip"]
            extract_path = self.extract_dir / dataset_name

            if not zip_path.is_file():
                print(f"⚠ Warning: {info['zip']} not found. Skipping...")
                continue

            print(f"\n📦 Extracting {dataset_name}...")

            # Remove existing extraction if present so stale files from an
            # older archive cannot leak into the counts.
            if extract_path.exists():
                shutil.rmtree(extract_path)
            extract_path.mkdir(parents=True, exist_ok=True)

            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_path)

            info["extracted"] = extract_path
            extracted_now += 1
            print(f"✓ Extracted to: {extract_path}")

        configured = len(self.datasets)
        if extracted_now == configured:
            print("\n✓ All datasets extracted successfully!")
        else:
            # Do not claim full success when archives were skipped.
            print(
                f"\n⚠ Extracted {extracted_now} of {configured} datasets "
                f"({configured - extracted_now} skipped)"
            )

    def count_images(self, directory):
        """Count the images below ``directory``, grouped by parent folder name.

        Args:
            directory: Root of the tree to scan.

        Returns:
            A tuple ``(total, per_class)`` where ``per_class`` maps the name
            of the directory directly containing an image to the number of
            images found under that name.
        """
        class_counts = defaultdict(int)
        for image_path in self._iter_image_files(directory):
            # The class is taken from the immediate parent directory only.
            class_counts[image_path.parent.name] += 1

        return sum(class_counts.values()), dict(class_counts)

    def generate_individual_metadata(self):
        """Count the images of each extracted dataset and save the result.

        Writes ``individual_datasets_metadata.json`` into the metadata
        directory (created if necessary).

        Returns:
            Dict mapping dataset name to its metadata dict (name, extraction
            path, image total, class count, per-class counts, timestamp).
            Datasets that were not extracted are absent.
        """
        self._print_banner("STEP 2: GENERATING INDIVIDUAL DATASET METADATA")

        individual_metadata = {}

        for dataset_name, info in self.datasets.items():
            if info["extracted"] is None:
                continue

            print(f"\n📊 Analyzing {dataset_name}...")

            total_images, class_counts = self.count_images(info["extracted"])

            individual_metadata[dataset_name] = {
                "dataset_name": dataset_name,
                "extraction_path": str(info["extracted"]),
                "total_images": total_images,
                "num_classes": len(class_counts),
                "classes": class_counts,
                "analysis_timestamp": datetime.datetime.now().isoformat(),
            }

            # sorted() is stable, so ties keep their discovery order.
            top_classes = sorted(
                class_counts.items(), key=lambda item: item[1], reverse=True
            )[:5]

            print(f"  Total Images: {total_images}")
            print(f"  Number of Classes: {len(class_counts)}")
            print(f"  Top 5 Classes: {dict(top_classes)}")

        # Save individual metadata
        self.metadata_dir.mkdir(parents=True, exist_ok=True)
        metadata_file = self.metadata_dir / "individual_datasets_metadata.json"
        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump(individual_metadata, f, indent=2)

        print(f"\n✓ Individual metadata saved to: {metadata_file}")

        return individual_metadata

    def merge_datasets(self):
        """Copy the images of every extracted dataset into the merged tree.

        Any existing merged directory is deleted first.  Each dataset gets its
        own sub-directory and keeps the relative layout of its archive, so
        files from different datasets cannot overwrite each other.

        Returns:
            Dict with ``total_images_copied``, ``datasets_merged`` (a list of
            ``{"name", "images_copied"}`` dicts) and ``file_mapping`` (one
            ``{"source", "destination", "dataset"}`` dict per copied file).
        """
        self._print_banner("STEP 3: MERGING DATASETS")

        # Start from an empty merged directory
        if self.merged_dir.exists():
            shutil.rmtree(self.merged_dir)
        self.merged_dir.mkdir(parents=True, exist_ok=True)

        merge_stats = {
            "total_images_copied": 0,
            "datasets_merged": [],
            "file_mapping": [],
        }

        for dataset_name, info in self.datasets.items():
            source_root = info["extracted"]
            if source_root is None:
                continue

            print(f"\n📁 Merging {dataset_name}...")
            dataset_dir = self.merged_dir / dataset_name
            dataset_dir.mkdir(exist_ok=True)

            images_copied = 0

            for src_path in self._iter_image_files(source_root):
                # Mirror the archive's relative layout under the dataset dir
                dst_path = dataset_dir / src_path.relative_to(source_root)
                dst_path.parent.mkdir(parents=True, exist_ok=True)

                # copy2 also preserves file metadata such as timestamps
                shutil.copy2(src_path, dst_path)
                images_copied += 1

                merge_stats["file_mapping"].append({
                    "source": str(src_path),
                    "destination": str(dst_path),
                    "dataset": dataset_name,
                })

            merge_stats["total_images_copied"] += images_copied
            merge_stats["datasets_merged"].append({
                "name": dataset_name,
                "images_copied": images_copied,
            })

            print(f"  ✓ Copied {images_copied} images")

        print(f"\n✓ Total images merged: {merge_stats['total_images_copied']}")

        return merge_stats

    def generate_merged_metadata(self, merge_stats, individual_metadata):
        """Count the merged tree and save the result as JSON.

        Writes ``merged_dataset_metadata.json`` into the metadata directory
        (created if necessary).

        Args:
            merge_stats: Result of ``merge_datasets``; only its
                ``"datasets_merged"`` list is used.
            individual_metadata: Result of ``generate_individual_metadata``.

        Returns:
            The metadata dict that was written.
        """
        self._print_banner("STEP 4: GENERATING MERGED DATASET METADATA")

        # Re-scan the merged tree instead of trusting the copy counters, so
        # the report can verify that nothing was lost.
        total_images, class_counts = self.count_images(self.merged_dir)
        datasets_merged = merge_stats["datasets_merged"]

        merged_metadata = {
            "merged_dataset_path": str(self.merged_dir),
            "merge_timestamp": datetime.datetime.now().isoformat(),
            "total_images": total_images,
            "num_datasets": len(datasets_merged),
            "datasets_included": datasets_merged,
            "total_classes": len(class_counts),
            "class_distribution": class_counts,
            "individual_datasets_summary": {
                dataset_name: {
                    "original_images": metadata["total_images"],
                    "original_classes": metadata["num_classes"],
                }
                for dataset_name, metadata in individual_metadata.items()
            },
        }

        # Save merged metadata
        self.metadata_dir.mkdir(parents=True, exist_ok=True)
        metadata_file = self.metadata_dir / "merged_dataset_metadata.json"
        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump(merged_metadata, f, indent=2)

        print("\n📊 Merged Dataset Statistics:")
        print(f"  Total Images: {total_images}")
        print(f"  Number of Datasets: {len(datasets_merged)}")
        print(f"  Total Classes: {len(class_counts)}")

        print(f"\n✓ Merged metadata saved to: {metadata_file}")

        return merged_metadata

    def generate_summary_report(self, individual_metadata, merged_metadata):
        """Write a human-readable report and print a short console summary.

        The report (``merge_summary_report.txt`` in the metadata directory)
        lists every individual dataset, the merged dataset, and a check that
        the image total before the merge equals the total after it.

        Args:
            individual_metadata: Result of ``generate_individual_metadata``.
            merged_metadata: Result of ``generate_merged_metadata``.
        """
        self._print_banner("STEP 5: GENERATING SUMMARY REPORT")

        self.metadata_dir.mkdir(parents=True, exist_ok=True)
        report_file = self.metadata_dir / "merge_summary_report.txt"

        heavy_rule = "=" * 70
        light_rule = "-" * 70
        total_after = merged_metadata['total_images']

        # UTF-8 is required: the report contains non-ASCII status glyphs.
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(heavy_rule + "\n")
            f.write("DEFECT DETECTION DATASET MERGE REPORT\n")
            f.write(heavy_rule + "\n\n")
            f.write(f"Generated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

            # Individual datasets section
            f.write(light_rule + "\n")
            f.write("INDIVIDUAL DATASETS (BEFORE MERGE)\n")
            f.write(light_rule + "\n\n")

            total_before = 0
            for dataset_name, metadata in individual_metadata.items():
                f.write(f"Dataset: {dataset_name}\n")
                f.write(f"  Location: {metadata['extraction_path']}\n")
                f.write(f"  Total Images: {metadata['total_images']}\n")
                f.write(f"  Number of Classes: {metadata['num_classes']}\n")
                f.write(f"  Classes: {', '.join(metadata['classes'])}\n\n")
                total_before += metadata['total_images']

            f.write(f"TOTAL IMAGES (ALL DATASETS): {total_before}\n\n")

            # Merged dataset section
            f.write(light_rule + "\n")
            f.write("MERGED DATASET (AFTER MERGE)\n")
            f.write(light_rule + "\n\n")
            f.write(f"Location: {merged_metadata['merged_dataset_path']}\n")
            f.write(f"Total Images: {total_after}\n")
            f.write(f"Number of Datasets Merged: {merged_metadata['num_datasets']}\n")
            f.write(f"Total Classes: {merged_metadata['total_classes']}\n\n")

            f.write("Images per Dataset:\n")
            for dataset in merged_metadata['datasets_included']:
                f.write(f"  - {dataset['name']}: {dataset['images_copied']} images\n")

            # Verification section: only the totals are compared.
            f.write("\n" + light_rule + "\n")
            f.write("VERIFICATION\n")
            f.write(light_rule + "\n\n")
            f.write(f"Images before merge: {total_before}\n")
            f.write(f"Images after merge:  {total_after}\n")

            if total_before == total_after:
                f.write("✓ SUCCESS: All images preserved during merge!\n")
            else:
                f.write(f"⚠ WARNING: Image count mismatch! Difference: {abs(total_before - total_after)}\n")

            f.write("\n" + heavy_rule + "\n")

        print(f"✓ Summary report saved to: {report_file}")

        # Print summary to console
        self._print_banner("FINAL SUMMARY")
        print(f"\n✓ Individual datasets analyzed: {len(individual_metadata)}")
        print(f"✓ Total images merged: {total_after}")
        print(f"✓ Merged dataset location: {self.merged_dir}")
        print(f"✓ Metadata location: {self.metadata_dir}")
        print("\n" + "=" * 60)

    def run(self):
        """Execute the complete extraction and merging process.

        Stops right after the extraction step when no archive has been
        extracted, so that an existing merged dataset is not wiped and
        replaced by an empty one.
        """
        self._print_banner("DEFECT DETECTION DATASET MERGER")
        print("\nStarting dataset extraction and merging process...")

        # Preparation: create the output directories
        self.create_directories()

        # Step 1: Extract datasets
        self.extract_datasets()

        if all(info["extracted"] is None for info in self.datasets.values()):
            # merge_datasets() deletes the merged directory before copying;
            # with nothing to copy that would only destroy earlier results.
            print(f"\n⚠ Warning: no dataset archives available in {self.base_dir}. Nothing to merge.")
            return

        # Step 2: Generate individual metadata
        individual_metadata = self.generate_individual_metadata()

        # Step 3: Merge datasets
        merge_stats = self.merge_datasets()

        # Step 4: Generate merged metadata
        merged_metadata = self.generate_merged_metadata(merge_stats, individual_metadata)

        # Step 5: Generate summary report
        self.generate_summary_report(individual_metadata, merged_metadata)

        print("\n✓ Process completed successfully!")
        print(f"\nYour merged dataset is ready at: {self.merged_dir}")
        print(f"Check the metadata folder for detailed reports: {self.metadata_dir}")


if __name__ == "__main__":
    # Entry point when run as a script or notebook cell: process the archives
    # located in the current working directory. The instance stays bound to
    # the module-level name `merger`.
    merger = DefectDatasetMerger(base_dir=".")
    merger.run()


DEFECT DETECTION DATASET MERGER

Starting dataset extraction and merging process...
✓ Created directory structure

STEP 1: EXTRACTING DATASETS

📦 Extracting pcb-dataset...
✓ Extracted to: extracted_datasets\pcb-dataset

📦 Extracting neu-cls...
✓ Extracted to: extracted_datasets\neu-cls

📦 Extracting mvtec-ad...
✓ Extracted to: extracted_datasets\mvtec-ad

✓ All datasets extracted successfully!

STEP 2: GENERATING INDIVIDUAL DATASET METADATA

📊 Analyzing pcb-dataset...
  Total Images: 1396
  Number of Classes: 13
  Top 5 Classes: {'Open_circuit': 116, 'Short': 116, 'Spurious_copper': 116, 'Open_circuit_rotation': 116, 'Short_rotation': 116}

📊 Analyzing neu-cls...
  Total Images: 1800
  Number of Classes: 1
  Top 5 Classes: {'images': 1800}

📊 Analyzing mvtec-ad...
  Total Images: 6612
  Number of Classes: 49
  Top 5 Classes: {'good': 4096, 'color': 186, 'scratch': 182, 'crack': 168, 'combined': 110}

‚úì Individual metadata saved to: metadata\individual_dat