## 1.0 Environment Setup

In [None]:
# Simple Fixed Version - Run this cell first
!apt-get update -qq > /dev/null
!apt-get install -y openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark==3.5.0 opencv-python-headless -q

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dataproc-spark-connect 0.8.3 requires pyspark[connect]~=3.5.1, but you have pyspark 3.5.0 which is incompatible.[0m[31m
[0m

## 2.0 Mount Drive

In [1]:
# Mount Google Drive at /content/drive; PATCHES_DIR and OUT_DIR in the configuration
# cell both live under /content/drive/MyDrive, so this must run before the pipeline.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 3.0 Import and Configuration

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import cv2, os, shutil, numpy as np
from PIL import Image
import logging
from pathlib import Path

# Configure logging
# Root logger at INFO level; `logger` is the module-level logger that the
# functions defined in the later cells use for warnings and errors.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
# PATCHES_DIR: input root scanned by main_improved(); each sub-directory is treated
#              as one slide and its image files as that slide's patches.
# OUT_DIR:     output root; copy_files_distributed() copies kept patches into
#              OUT_DIR/<slide>/.
# NOTE(review): the input folder is named "patches_normalized" and the output
# "patches" - confirm this direction is intended and not swapped.
PATCHES_DIR = '/content/drive/MyDrive/Colab Notebooks/WSI-MIL-Pipeline/data/patches_normalized'
OUT_DIR = '/content/drive/MyDrive/Colab Notebooks/WSI-MIL-Pipeline/data/patches'
# Create the output root at import/cell-run time (no error if it already exists).
os.makedirs(OUT_DIR, exist_ok=True)

## 4.0 Tissue Fraction Function

In [None]:
def tissue_fraction_improved(path, saturation_min=10, value_max=240):
    """Return the fraction of pixels in an image file that look like tissue.

    The image is read with OpenCV, converted from BGR to HSV, and a pixel is
    counted as tissue when its saturation is strictly greater than
    ``saturation_min`` AND its value channel is strictly less than ``value_max``.

    Args:
        path: Filesystem path of the image. ``None`` or an empty string is
            treated as "no image".
        saturation_min: Exclusive lower bound on the HSV saturation channel
            (default 10, the original hard-coded threshold).
        value_max: Exclusive upper bound on the HSV value channel
            (default 240, the original hard-coded threshold).

    Returns:
        float: Mean of the boolean tissue mask, i.e. a number in [0.0, 1.0].
        0.0 is also returned when the path is missing/empty, is not a regular
        file, cannot be decoded, or any exception occurs while processing
        (the exception is logged as a warning, never raised).
    """
    # A null/empty path is an expected "nothing to score" case, not an error:
    # answer directly instead of relying on an exception from os.path.
    if not path:
        return 0.0

    try:
        # Directories and dangling paths cannot be decoded; skip them early.
        if not os.path.isfile(path):
            return 0.0

        img = cv2.imread(path)
        # imread signals an unreadable file by returning None instead of raising.
        if img is None or img.size == 0:
            return 0.0

        hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        saturation = hsv[:, :, 1]
        value = hsv[:, :, 2]

        # Tissue = sufficiently coloured (saturation) and not near-white (value).
        tissue_mask = (value < value_max) & (saturation > saturation_min)
        return float(np.mean(tissue_mask))

    except Exception as e:
        # Best effort: one bad patch must not abort the whole job.
        logger.warning(f"Failed to process {path}: {e}")
        return 0.0

## 5.0 Distributed File Operation Function

In [None]:
def copy_files_distributed(row_iterator, out_dir=None):
    """Copy every patch in one partition into ``<out_dir>/<slide>/``.

    Intended as the callback for ``DataFrame.foreachPartition``: it receives an
    iterator of rows and copies each row's file with ``shutil.copy2`` (contents
    plus file metadata). Copying is best effort - a failure on one row is logged
    and the remaining rows are still processed.

    Args:
        row_iterator: Iterable of row objects exposing ``.path`` (source file)
            and ``.slide`` (name of the destination sub-directory).
        out_dir: Destination root. Defaults to the module-level ``OUT_DIR``
            when omitted, which is the original behaviour.

    Returns:
        None.
    """
    dest_root = Path(OUT_DIR if out_dir is None else out_dir)
    # Slide directories already created by this call, so mkdir runs once per
    # slide instead of once per patch.
    prepared_dirs = set()

    for row in row_iterator:
        # Reset per row so the error message never reports a previous row's
        # path (or hits an unbound name) when reading row.path itself fails.
        source_path = None
        try:
            source_path = row.path
            dest_dir = dest_root / row.slide
            if dest_dir not in prepared_dirs:
                dest_dir.mkdir(parents=True, exist_ok=True)
                prepared_dirs.add(dest_dir)

            shutil.copy2(source_path, dest_dir)

        except Exception as e:
            logger.error(f"Failed to copy {source_path}: {e}")

## 6.0 Main Orchestration Logic Function

In [None]:
def _list_patch_rows(patches_dir, extensions=('.png', '.jpg', '.jpeg', '.tiff')):
    """Return ``(slide, full_path)`` tuples for the image files under ``patches_dir``.

    Each immediate sub-directory of ``patches_dir`` is treated as a slide; files
    directly inside it whose lower-cased name ends with one of ``extensions``
    are listed. Non-directory entries at the top level are ignored.
    """
    rows = []
    for slide in os.listdir(patches_dir):
        slide_dir = os.path.join(patches_dir, slide)
        if not os.path.isdir(slide_dir):
            continue
        rows.extend(
            (slide, os.path.join(slide_dir, fn))
            for fn in os.listdir(slide_dir)
            if fn.lower().endswith(extensions)
        )
    return rows


def main_improved(min_tissue_fraction=0.05):
    """Filter patches by tissue content and copy the kept ones to ``OUT_DIR``.

    Steps: list the patch files under ``PATCHES_DIR``, score each one with
    ``tissue_fraction_improved`` through a Spark UDF, keep those whose score is
    strictly greater than ``min_tissue_fraction``, copy the kept files with
    ``copy_files_distributed``, and log how many were kept.

    Args:
        min_tissue_fraction: Exclusive lower bound on the tissue fraction for a
            patch to be kept (default 0.05, the original hard-coded threshold).

    Returns:
        None. Returns early (after logging a warning) when no patch files exist.

    Raises:
        ValueError: If ``PATCHES_DIR`` does not exist.
        Exception: Any failure inside the pipeline is logged and re-raised.

    The Spark session obtained here is always stopped before returning.
    """
    # Validate input directory before paying for a Spark session.
    if not os.path.exists(PATCHES_DIR):
        raise ValueError(f"Input directory {PATCHES_DIR} does not exist")

    spark = SparkSession.builder \
        .master('local[*]') \
        .appName('TissueSegmentation') \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()

    try:
        rows = _list_patch_rows(PATCHES_DIR)
        if not rows:
            logger.warning("No patch files found!")
            return

        df = spark.createDataFrame(rows, ['slide', 'path'])
        tissue_udf = udf(tissue_fraction_improved, DoubleType())

        # The filtered frame feeds two separate Spark actions below (count and
        # foreachPartition). Without caching, each action would re-run the UDF,
        # i.e. decode and score every image twice.
        filtered_df = df.filter(tissue_udf(df.path) > min_tissue_fraction).cache()

        # Counting first materialises the cache, so the copy step reuses it.
        filtered_count = filtered_df.count()
        filtered_df.foreachPartition(copy_files_distributed)

        # The DataFrame was built from `rows`, so its size is already known;
        # no extra Spark job is needed for the total.
        total_count = len(rows)
        logger.info(f"Processed {total_count} patches, kept {filtered_count} "
                   f"({filtered_count/total_count*100:.1f}%)")

    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        raise

    finally:
        spark.stop()

## 7.0 Execution Block

In [None]:
# Run the pipeline. Inside a notebook kernel __name__ is '__main__', so this
# executes whenever the cell is run; the guard only has an effect if the code
# is exported to a module and imported.
if __name__ == '__main__':
    main_improved()