In [1]:
# Display the kernel's current working directory before any path changes are made.
import os
%pwd

'c:\\09_AHFID\\CervicalAI-Screen\\notebook'

In [2]:
# Step up one directory so the relative paths used below resolve from the parent folder.
# NOTE(review): this is relative to the *current* directory, so re-running this cell
# without restarting the kernel climbs one more level each time — confirm before re-running.
os.chdir('../')
%pwd

'c:\\09_AHFID\\CervicalAI-Screen'

In [3]:
# 01_data_ingestion.ipynb
# Download and organize cervical image data for semi-supervised learning

import filecmp
import json
import os
import shutil
import tarfile
import urllib.request
import zipfile
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path

import gdown

In [4]:
# Configuration: every artifact path is derived from ARTIFACTS_DIR, which is
# relative to the current working directory.
ARTIFACTS_DIR = Path("artifacts")
RAW_DATA_DIR = ARTIFACTS_DIR / "raw_data"
PROCESSED_DATA_DIR = ARTIFACTS_DIR / "via_cervix_ssl"

# Create directories. parents=True keeps this cell working if any of the paths
# above is later changed to a more deeply nested location.
ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)
RAW_DATA_DIR.mkdir(parents=True, exist_ok=True)
PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True)

print(f"Working directory: {Path.cwd()}")
print(f"Artifacts directory: {ARTIFACTS_DIR.absolute()}")

Working directory: c:\09_AHFID\CervicalAI-Screen
Artifacts directory: c:\09_AHFID\CervicalAI-Screen\artifacts


In [5]:
def download_from_gdrive(file_id, output_path):
    """Download a file from Google Drive using gdown.

    Args:
        file_id: Google Drive file ID (the value of ``id=`` in the download URL).
        output_path: Destination path for the downloaded file.

    Returns:
        True when gdown reports a downloaded file, False on any failure.
    """
    url = f"https://drive.google.com/uc?id={file_id}"

    try:
        print("Downloading data from Google Drive...")
        print(f"URL: {url}")
        # Depending on the gdown version, a failed download (permissions, quota,
        # bad ID) is signalled by a None return value rather than an exception,
        # so the result has to be checked explicitly.
        downloaded = gdown.download(url, str(output_path), quiet=False)
        if downloaded is None:
            raise RuntimeError("gdown did not produce an output file")
        print(f"Download completed: {output_path}")
        return True
    except Exception as e:
        print(f"Download failed: {e}")
        print("Please check:")
        print("1. File ID is correct")
        print("2. File sharing is enabled (Anyone with the link can view)")
        print("3. Internet connection is stable")
        return False

In [6]:
def download_from_url(url, output_path):
    """Fetch `url` with urllib and store it at `output_path`.

    Returns True when the transfer finishes, False if any exception occurs
    (the exception text is printed rather than propagated).
    """
    succeeded = False
    try:
        print(f"Downloading from URL: {url}")
        urllib.request.urlretrieve(url, output_path)
        print(f"Download completed: {output_path}")
        succeeded = True
    except Exception as err:
        print(f"Download failed: {err}")
    return succeeded

In [7]:
def extract_archive(archive_path, extract_to):
    """Extract a zip or tar archive.

    Args:
        archive_path: Path (or string) of the archive. Supported names end in
            ``.zip``, ``.tar``, ``.tar.gz`` or ``.tgz`` (case-insensitive).
        extract_to: Directory that receives the extracted contents.

    Returns:
        True on success, False if the format is unsupported or extraction fails
        (the error is printed rather than raised).
    """
    archive_path = Path(archive_path)
    try:
        print(f"Extracting {archive_path} to {extract_to}")

        # Match on the full file name: Path.suffix only yields the last
        # component ('.gz' for 'data.tar.gz'), so it can never equal '.tar.gz'.
        file_name = archive_path.name.lower()

        if file_name.endswith('.zip'):
            with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                zip_ref.extractall(extract_to)
        elif file_name.endswith(('.tar', '.tar.gz', '.tgz')):
            with tarfile.open(archive_path, 'r:*') as tar_ref:
                if hasattr(tarfile, 'data_filter'):
                    # The archive is downloaded content: where the interpreter
                    # supports extraction filters, refuse members that would
                    # escape extract_to (absolute paths, '..', unsafe links).
                    tar_ref.extractall(extract_to, filter='data')
                else:
                    tar_ref.extractall(extract_to)
        else:
            raise ValueError(f"Unsupported archive format: {archive_path.suffix}")

        print("Extraction completed")
        return True
    except Exception as e:
        print(f"Extraction failed: {e}")
        return False

In [8]:
def copy_images(source_dir, target_dir, class_name):
    """Copy every image found under `source_dir` (recursively) into `target_dir`.

    Name clashes between *different* files are resolved by appending ``_1``,
    ``_2``, ... to the stem. A file whose bytes are already present in
    `target_dir` under one of those candidate names is not copied again, so
    calling this function repeatedly does not multiply the dataset.

    Args:
        source_dir: Directory searched recursively for images.
        target_dir: Flat destination directory (created if missing).
        class_name: Label used only in the progress message.

    Returns:
        Number of source images now available in `target_dir`
        (newly copied + already present). 0 if `source_dir` does not exist.
    """
    valid_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}

    if not source_dir.exists():
        print(f"Warning: {source_dir} does not exist")
        return 0

    target_dir.mkdir(parents=True, exist_ok=True)

    copied = 0
    already_present = 0

    for img_path in source_dir.rglob("*"):
        if not (img_path.is_file() and img_path.suffix.lower() in valid_extensions):
            continue

        # Walk the candidate names (name, name_1, name_2, ...) until a free slot
        # or a byte-identical copy from an earlier run is found.
        target_path = target_dir / img_path.name
        counter = 1
        is_duplicate = False
        try:
            while target_path.exists():
                if filecmp.cmp(img_path, target_path, shallow=False):
                    is_duplicate = True
                    break
                target_path = target_dir / f"{img_path.stem}_{counter}{img_path.suffix}"
                counter += 1

            if is_duplicate:
                already_present += 1
                continue

            shutil.copy2(img_path, target_path)
            copied += 1
        except Exception as e:
            # Best effort: report the file and keep going with the rest.
            print(f"Error copying {img_path}: {e}")

    if already_present:
        print(f"Skipped {already_present} images already present in {target_dir}")
    print(f"Copied {copied} images from {class_name}")
    return copied + already_present

In [9]:
def find_data_directory(extract_dir):
    """Locate the dataset root inside an extracted archive.

    Selection order:
      1. Directories that directly contain a class folder (negative, positive,
         suspicious, normal, abnormal; case-insensitive). `extract_dir` itself
         is included. The shallowest such directory wins, because class
         folders are later copied recursively.
      2. Otherwise, directories whose name contains a dataset keyword
         (cervix, cervical, via, data, images); the deepest one wins.
      3. Otherwise `extract_dir` itself.

    Args:
        extract_dir: Directory the archive was extracted into.

    Returns:
        The Path judged most likely to hold the class folders.
    """
    name_patterns = ('cervix', 'cervical', 'via', 'data', 'images')
    class_names = {'negative', 'positive', 'suspicious', 'normal', 'abnormal'}

    class_parents = []
    name_matches = []

    def contains_class_folder(directory):
        child_dirs = {child.name.lower() for child in directory.iterdir() if child.is_dir()}
        return bool(child_dirs & class_names)

    # rglob never yields extract_dir itself, so check it separately.
    if contains_class_folder(extract_dir):
        class_parents.append(extract_dir)

    for item in extract_dir.rglob("*"):
        if not item.is_dir():
            continue
        if any(pattern in item.name.lower() for pattern in name_patterns):
            name_matches.append(item)
        if contains_class_folder(item):
            class_parents.append(item)

    if class_parents:
        # A folder that holds class directories is far stronger evidence than a
        # keyword in a folder name (e.g. 'Negative/images' must not win).
        # Depth first, then path text, so ties resolve deterministically.
        return min(class_parents, key=lambda p: (len(p.parts), str(p)))

    if name_matches:
        # Keyword-only fallback: prefer deeper nested directories.
        return max(name_matches, key=lambda p: len(p.parts))

    # Default to the extraction directory itself.
    return extract_dir

In [10]:
def organize_ssl_structure(source_dir, target_dir):
    """Organize extracted data into the SSL layout with binary classes.

    Resulting layout under `target_dir`:
        labeled/Negative, labeled/Positive, unlabeled

    Source folders are matched by name, ignoring case and surrounding
    whitespace: negative/normal map to Negative;
    positive/suspicious/suspicious cancer/cancer/abnormal map to Positive;
    unlabeled/unlabelled/unknown go to the unlabeled pool. Other folders are
    reported and skipped. If no labeled image was copied, image files lying
    directly in the source directory are treated as unlabeled.

    Args:
        source_dir: Directory the archive was extracted into.
        target_dir: Root of the organized dataset.

    Returns:
        Counter with the keys 'Negative', 'Positive' and 'Unlabeled'
        (only keys that received images are present).
    """
    # Create SSL directory structure
    labeled_dir = target_dir / "labeled"
    unlabeled_dir = target_dir / "unlabeled"

    # Binary classification: Negative vs Positive (includes suspicious)
    class_dirs = {
        'Negative': labeled_dir / "Negative",
        'Positive': labeled_dir / "Positive",
    }
    for directory in (labeled_dir, unlabeled_dir, *class_dirs.values()):
        directory.mkdir(parents=True, exist_ok=True)

    # Find actual data directory
    actual_source = find_data_directory(source_dir)
    print(f"Using source directory: {actual_source}")

    # Normalized (lower-case) source folder name -> target class
    folder_mapping = {
        'negative': 'Negative',
        'normal': 'Negative',
        'positive': 'Positive',
        'suspicious cancer': 'Positive',
        'suspicious': 'Positive',
        'cancer': 'Positive',
        'abnormal': 'Positive',
    }
    unlabeled_names = {'unlabeled', 'unlabelled', 'unknown'}
    # Same extension set as copy_images, so flat and foldered datasets accept
    # the same file types (the flat fallback used to miss '.tif').
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}

    total_labeled = 0
    class_counts = Counter()

    # Process labeled data
    for item in actual_source.iterdir():
        if not item.is_dir():
            continue

        folder_name = item.name
        normalized = folder_name.strip().lower()
        target_class = folder_mapping.get(normalized)

        if target_class is not None:
            count = copy_images(item, class_dirs[target_class], folder_name)
            class_counts[target_class] += count
            total_labeled += count
        elif normalized in unlabeled_names:
            # Handle unlabeled data
            count = copy_images(item, unlabeled_dir, folder_name)
            class_counts['Unlabeled'] += count
        else:
            print(f"Unknown folder: {folder_name} - skipping")

    # Handle case where all images are in a single directory
    if total_labeled == 0:
        print("No class folders found. Checking for direct image files...")
        image_files = [f for f in actual_source.iterdir()
                       if f.is_file() and f.suffix.lower() in image_extensions]

        if image_files:
            print(f"Found {len(image_files)} images without labels - treating as unlabeled")
            for img_file in image_files:
                try:
                    shutil.copy2(img_file, unlabeled_dir / img_file.name)
                    class_counts['Unlabeled'] += 1
                except Exception as e:
                    print(f"Error copying {img_file}: {e}")

    return class_counts

In [11]:
def print_data_summary(data_dir, class_counts):
    """Print a report of what is on disk under `data_dir`.

    Counts are taken from the filesystem (``labeled/<class>`` and
    ``unlabeled``); `class_counts` is accepted but not read here.
    Returns None.
    """
    def count_files(folder):
        # Number of regular files directly inside `folder`.
        return sum(1 for entry in folder.iterdir() if entry.is_file())

    banner = "=" * 60
    print("\n" + banner)
    print("DATA ORGANIZATION SUMMARY")
    print(banner)

    labeled_root = data_dir / "labeled"
    unlabeled_root = data_dir / "unlabeled"

    # Per-class tallies of the labeled pool
    print("LABELED DATA:")
    n_labeled = 0
    for class_folder in labeled_root.iterdir():
        if not class_folder.is_dir():
            continue
        n_in_class = count_files(class_folder)
        print(f"  {class_folder.name}: {n_in_class} images")
        n_labeled += n_in_class

    print(f"  Total labeled: {n_labeled} images")

    # Unlabeled pool
    print("\nUNLABELED DATA:")
    if unlabeled_root.exists():
        n_unlabeled = count_files(unlabeled_root)
        print(f"  Unlabeled: {n_unlabeled} images")
    else:
        n_unlabeled = 0
        print("  No unlabeled data found")

    print(f"\nTOTAL DATASET: {n_labeled + n_unlabeled} images")

    if n_labeled > 0 and n_unlabeled > 0:
        ratio = n_unlabeled / n_labeled
        print(f"Unlabeled/Labeled ratio: {ratio:.1f}:1")
        print(f"SSL data ratio: {ratio:.1f}x more unlabeled data")

        # SSL suitability assessment: first threshold reached wins
        verdicts = (
            (5, "✓ Excellent ratio for semi-supervised learning"),
            (3, "✓ Very good ratio for semi-supervised learning"),
            (1, "✓ Good ratio for semi-supervised learning"),
        )
        print(next(
            (text for floor, text in verdicts if ratio >= floor),
            "⚠ Limited unlabeled data - SSL benefits may be modest",
        ))
    elif n_unlabeled == 0:
        print("⚠ No unlabeled data found - will use supervised learning only")

    # Binary classification assessment
    print("\nBINARY CLASSIFICATION SETUP:")
    print("  Class 0 (Negative): Normal/healthy cervical images")
    print("  Class 1 (Positive): Abnormal/suspicious cervical images")
    print("  This binary setup is optimal for screening applications")

In [12]:
def validate_ssl_structure(data_dir):
    """Check that `data_dir` holds a usable SSL layout and print the findings.

    Critical problems (missing ``labeled`` directory, missing or empty
    ``Negative``/``Positive`` folders) make the function return False.
    Small sample counts and a missing/empty ``unlabeled`` directory are
    reported as warnings only, and the function returns True.
    """
    print("\nVALIDATING SSL DATA STRUCTURE...")

    def count_files(folder):
        # Number of regular files directly inside `folder`.
        return sum(1 for entry in folder.iterdir() if entry.is_file())

    labeled_root = data_dir / "labeled"
    unlabeled_root = data_dir / "unlabeled"

    problems = []
    cautions = []

    # Labeled side: both classes must exist and be non-empty
    if labeled_root.exists():
        for class_name in ("Negative", "Positive"):
            class_folder = labeled_root / class_name
            if not class_folder.exists():
                problems.append(f"Missing class directory: {class_name}")
                continue
            n_files = count_files(class_folder)
            if n_files == 0:
                problems.append(f"Empty class directory: {class_name}")
            elif n_files < 10:
                cautions.append(f"Very few samples in {class_name}: {n_files} images")
    else:
        problems.append("Missing labeled directory")

    # Unlabeled side: absence only limits SSL, it is never fatal
    if unlabeled_root.exists():
        n_files = count_files(unlabeled_root)
        if n_files == 0:
            cautions.append("No unlabeled images found - SSL benefits limited")
        elif n_files < 50:
            cautions.append(f"Few unlabeled samples: {n_files} images")
    else:
        cautions.append("Missing unlabeled directory - SSL benefits limited")

    # Report
    if problems:
        print("❌ CRITICAL ISSUES FOUND:")
        for problem in problems:
            print(f"  - {problem}")
        return False

    print("✅ Data structure validation passed")

    if cautions:
        print("⚠ WARNINGS:")
        for caution in cautions:
            print(f"  - {caution}")

    return True

In [13]:
def create_data_manifest(data_dir):
    """Create a detailed manifest of the organized data.

    Writes ``data_manifest.json`` into `data_dir` and returns the same content.

    Args:
        data_dir: Root of the organized dataset (holds ``labeled`` and
            ``unlabeled``).

    Returns:
        dict with the creation timestamp (UTC, ISO 8601), the working directory
        at creation time, class descriptions, directory paths and per-folder
        file counts.
    """
    manifest = {
        # A real timestamp; this field previously held the working directory.
        "created_at": datetime.now(timezone.utc).isoformat(),
        # The working directory is kept under its own key because the paths
        # below may be relative to it.
        "working_dir": str(Path.cwd()),
        "data_structure": "semi_supervised_learning",
        "classification_type": "binary",
        "classes": {
            "0": "Negative (Normal/Healthy)",
            "1": "Positive (Abnormal/Suspicious)"
        },
        "directories": {
            "labeled": str(data_dir / "labeled"),
            "unlabeled": str(data_dir / "unlabeled")
        },
        "file_counts": {}
    }

    # Count files in each directory
    labeled_dir = data_dir / "labeled"
    unlabeled_dir = data_dir / "unlabeled"

    # Guarded like the unlabeled branch so a missing folder yields an empty
    # count section instead of an exception.
    if labeled_dir.exists():
        for class_dir in labeled_dir.iterdir():
            if class_dir.is_dir():
                count = sum(1 for f in class_dir.iterdir() if f.is_file())
                manifest["file_counts"][class_dir.name] = count

    if unlabeled_dir.exists():
        count = sum(1 for f in unlabeled_dir.iterdir() if f.is_file())
        manifest["file_counts"]["Unlabeled"] = count

    # Save manifest (explicit encoding so the file does not depend on the
    # platform's default code page)
    manifest_path = data_dir / "data_manifest.json"
    with open(manifest_path, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)

    print(f"Data manifest saved: {manifest_path}")
    return manifest

In [14]:
# Main execution function
def main():
    """Run the ingestion pipeline end to end.

    Steps: obtain an archive, extract it, organize the images into the SSL
    layout, print a summary, validate the layout, then write the manifest and
    ``data_metadata.json``.

    Archive sources are tried in order and each one is a real fallback for the
    previous: Google Drive (a valid, previously downloaded zip is reused
    instead of downloading again), direct URL, the configured local file, and
    finally any archive already sitting in RAW_DATA_DIR.

    Returns:
        True if every step succeeded, False otherwise.
    """
    print("="*60)
    print("CERVICAL IMAGE DATA INGESTION FOR SSL")
    print("="*60)

    # Configuration options
    print("Data source options:")
    print("1. Google Drive file ID")
    print("2. Direct URL")
    print("3. Local file path")
    print("4. Skip download (data already extracted)")

    # For this script, you need to configure your data source:

    # Option 1: Google Drive
    GDRIVE_FILE_ID = "1Sw8aSal9R2Kh7PoD6Ma3QpXevCh2Kcrs"

    # Option 2: Direct URL
    DIRECT_URL = "https://example.com/cervical_data.zip"  # Replace with actual URL

    # Option 3: Local file
    LOCAL_FILE = RAW_DATA_DIR / "cervical_data.zip"  # Place your file here

    archive_path = None

    # 1. Google Drive (if an ID is provided)
    if GDRIVE_FILE_ID != "YOUR_GOOGLE_DRIVE_FILE_ID_HERE":
        gdrive_archive = RAW_DATA_DIR / "via-cervix.zip"
        if zipfile.is_zipfile(gdrive_archive):
            # Re-running the notebook should not repeat a long download when a
            # readable archive is already on disk.
            print(f"Using previously downloaded archive: {gdrive_archive}")
            archive_path = gdrive_archive
        elif download_from_gdrive(GDRIVE_FILE_ID, gdrive_archive) and gdrive_archive.exists():
            # Also require the file on disk: a reported success without an
            # output file must not reach the extraction step.
            archive_path = gdrive_archive

    # 2. Direct URL, if configured and nothing has been obtained yet
    if archive_path is None and DIRECT_URL != "https://example.com/cervical_data.zip":
        url_archive = RAW_DATA_DIR / "cervical_data_url.zip"
        if download_from_url(DIRECT_URL, url_archive) and url_archive.exists():
            archive_path = url_archive

    # 3. Local file
    if archive_path is None and LOCAL_FILE.exists():
        archive_path = LOCAL_FILE
        print(f"Using local file: {archive_path}")

    # 4. Any existing archive in the raw data directory
    if archive_path is None:
        print("No configured data source succeeded. Looking for existing files...")
        for pattern in ["*.zip", "*.tar", "*.tar.gz", "*.tgz"]:
            # Sorted so the choice does not depend on directory listing order.
            existing_files = sorted(RAW_DATA_DIR.glob(pattern))
            if existing_files:
                archive_path = existing_files[0]
                print(f"Found existing archive: {archive_path}")
                break

    if archive_path is None:
        print("\n❌ No data source available. Please:")
        print("1. Set GDRIVE_FILE_ID to your Google Drive file ID, OR")
        print("2. Set DIRECT_URL to a direct download link, OR")
        print("3. Place your data file in the raw_data directory")
        print("\nFor Google Drive:")
        print("- Share your file publicly (Anyone with link can view)")
        print("- Copy the file ID from the sharing URL")
        return False

    # Extract the archive
    extract_dir = RAW_DATA_DIR / "extracted"
    extract_dir.mkdir(parents=True, exist_ok=True)

    if extract_archive(archive_path, extract_dir):
        print("Archive extracted successfully")
    else:
        print("❌ Failed to extract archive")
        return False

    # Organize data for SSL
    print("\nOrganizing data for semi-supervised learning...")
    class_counts = organize_ssl_structure(extract_dir, PROCESSED_DATA_DIR)

    if sum(class_counts.values()) == 0:
        print("❌ No images found in the extracted data")
        print("Please check the archive contents and directory structure")
        return False

    # Print summary
    print_data_summary(PROCESSED_DATA_DIR, class_counts)

    # Validate structure
    if not validate_ssl_structure(PROCESSED_DATA_DIR):
        print("❌ Data validation failed")
        return False

    # Create manifest (written to disk; the returned dict is not needed here)
    create_data_manifest(PROCESSED_DATA_DIR)

    # Create metadata for next notebooks
    metadata = {
        "data_dir": str(PROCESSED_DATA_DIR),
        "labeled_dir": str(PROCESSED_DATA_DIR / "labeled"),
        "unlabeled_dir": str(PROCESSED_DATA_DIR / "unlabeled"),
        "classes": ["Negative", "Positive"],
        "num_classes": 2,
        "ssl_enabled": class_counts.get("Unlabeled", 0) > 0,
        "class_mapping": {
            "0": "Negative (Normal/Healthy)",
            "1": "Positive (Abnormal/Suspicious)"
        },
        "file_counts": dict(class_counts),
        "binary_classification": True
    }

    metadata_path = ARTIFACTS_DIR / "data_metadata.json"
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2)

    print(f"\nMetadata saved: {metadata_path}")

    print("\n" + "="*60)
    print("DATA INGESTION COMPLETED SUCCESSFULLY")
    print("="*60)
    print("✅ Data organized for semi-supervised learning")
    print("✅ Binary classification setup (Negative vs Positive)")
    print("✅ Ready for model preparation")
    print("\nNext step: Run 02_prepare_base_model.ipynb")

    return True

In [15]:
# Execute the main function
if __name__ != "__main__":
    # Imported rather than executed directly: only announce availability.
    print("Script loaded. Run main() to execute data ingestion.")
else:
    success = main()

    if not success:
        # One write producing exactly the same lines as the individual prints.
        print("\n".join([
            "\n" + "=" * 60,
            "TROUBLESHOOTING GUIDE",
            "=" * 60,
            "If data ingestion failed, try:",
            "1. Check your internet connection",
            "2. Verify Google Drive file permissions",
            "3. Ensure the archive contains cervical image folders",
            "4. Check file formats (supported: ZIP, TAR, TAR.GZ)",
            "5. Verify folder structure contains class directories",
        ]))

CERVICAL IMAGE DATA INGESTION FOR SSL
Data source options:
1. Google Drive file ID
2. Direct URL
3. Local file path
4. Skip download (data already extracted)
Downloading data from Google Drive...
URL: https://drive.google.com/uc?id=1Sw8aSal9R2Kh7PoD6Ma3QpXevCh2Kcrs


Downloading...
From (original): https://drive.google.com/uc?id=1Sw8aSal9R2Kh7PoD6Ma3QpXevCh2Kcrs
From (redirected): https://drive.google.com/uc?id=1Sw8aSal9R2Kh7PoD6Ma3QpXevCh2Kcrs&confirm=t&uuid=7dc23493-2bc6-44c9-807e-9582aebf99ac
To: c:\09_AHFID\CervicalAI-Screen\artifacts\raw_data\via-cervix.zip
100%|██████████| 138M/138M [08:31<00:00, 269kB/s] 


Download completed: artifacts\raw_data\via-cervix.zip
Extracting artifacts\raw_data\via-cervix.zip to artifacts\raw_data\extracted
Extraction completed
Archive extracted successfully

Organizing data for semi-supervised learning...
Using source directory: artifacts\raw_data\extracted\via-cervix
Copied 92 images from Negative
Copied 98 images from Positive
Copied 6377 images from Unlabeled

DATA ORGANIZATION SUMMARY
LABELED DATA:
  Negative: 92 images
  Positive: 98 images
  Total labeled: 190 images

UNLABELED DATA:
  Unlabeled: 6377 images

TOTAL DATASET: 6567 images
Unlabeled/Labeled ratio: 33.6:1
SSL data ratio: 33.6x more unlabeled data
✓ Excellent ratio for semi-supervised learning

BINARY CLASSIFICATION SETUP:
  Class 0 (Negative): Normal/healthy cervical images
  Class 1 (Positive): Abnormal/suspicious cervical images
  This binary setup is optimal for screening applications

VALIDATING SSL DATA STRUCTURE...
✅ Data structure validation passed
Data manifest saved: artifacts\via_c