In [1]:
import h5py
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.base import BaseEstimator, TransformerMixin

from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.models import Model

# -----------------------------
# 1. Functions for Patch Extraction and CNN Features
# -----------------------------
def extract_patch(image, center, patch_size):
    """
    Extract a square patch from the image centered at the given coordinate.
    Assumes image shape is (height, width, channels) and center is (x, y).

    Parameters:
      image (ndarray): Source image indexed as image[row, col, channel].
      center (sequence): (x, y) position; each value is truncated with int().
      patch_size (int): Requested side length of the patch in pixels.

    Returns:
      patch (ndarray): A slice of `image`. It is patch_size x patch_size when the
      window lies fully inside the image, smaller when the window overlaps a
      border (the window is cropped, not padded), and empty along an axis when
      the window lies entirely outside the image on that axis.
    """
    x, y = int(center[0]), int(center[1])
    half_size = patch_size // 2
    height, width = image.shape[0], image.shape[1]

    # Span exactly `patch_size` pixels from the start so odd sizes are not
    # shortened by one pixel (start + 2 * half_size would lose a pixel).
    y_start = y - half_size
    x_start = x - half_size

    # Clamp BOTH bounds into [0, dim]. A negative upper bound would otherwise
    # be interpreted by NumPy as an index counted from the end of the axis and
    # silently return an unrelated region of the image.
    y_min = min(max(y_start, 0), height)
    y_max = min(max(y_start + patch_size, 0), height)
    x_min = min(max(x_start, 0), width)
    x_max = min(max(x_start + patch_size, 0), width)

    patch = image[y_min:y_max, x_min:x_max, :]
    return patch

def extract_cnn_features(patch, cnn_model):
    """
    Turn one image patch into a 1-D feature vector using a CNN.

    The patch is resized to 224x224 with OpenCV, given a leading batch axis,
    passed through Keras' ResNet50 `preprocess_input`, and fed to
    `cnn_model.predict`. The prediction is flattened before being returned.

    Parameters:
      patch (ndarray): Image patch to embed.
        # NOTE(review): presumably (height, width, 3) and non-empty; confirm with caller.
      cnn_model (Model): Object exposing `predict(batch, verbose=0)`.

    Returns:
      features (ndarray): Flattened output of `cnn_model.predict`.
    """
    target_size = (224, 224)
    resized = cv2.resize(patch, target_size)
    # The model consumes batches, so wrap the single patch in a batch of one.
    batch = preprocess_input(resized[np.newaxis, ...])
    embedding = cnn_model.predict(batch, verbose=0)
    return embedding.flatten()

# -----------------------------
# 2. Custom Transformer to Extract CNN Features from Patches
# -----------------------------
class PatchFeatureExtractor(BaseEstimator, TransformerMixin):
    """
    Scikit-learn style transformer that maps coordinates to CNN features.

    For every coordinate row it cuts a patch out of a fixed image with
    `extract_patch` and embeds it with `extract_cnn_features`.
    """

    def __init__(self, image, patch_size, cnn_model):
        """
        Parameters:
          image (ndarray): The image patches are cut from.
          patch_size (int): Side length (in pixels) of the square patch.
          cnn_model (Model): Model used to embed each patch.
        """
        self.image = image
        self.patch_size = patch_size
        self.cnn_model = cnn_model

    def fit(self, X, y=None):
        """Nothing to learn; return the transformer unchanged."""
        return self

    def transform(self, X):
        """
        Embed one patch per row of X.

        Parameters:
          X (iterable): Rows of [x, y] coordinates, expected shape (n_samples, 2).

        Returns:
          ndarray: Per-sample feature vectors stacked with `np.array`.
        """
        return np.array([
            extract_cnn_features(
                extract_patch(self.image, xy, self.patch_size),
                self.cnn_model,
            )
            for xy in X
        ])

# -----------------------------
# 3. Pipeline Class for the Elucidata Challenge
# -----------------------------
class CellTypePipeline:
    """
    Pipeline for loading data, extracting image patch features using a CNN,
    training a multi-output regression model, and generating a submission file.

    Typical order of calls: `initialize_cnn_model`, `load_train_data`,
    `load_train_images`, `prepare_training_set`, `train`, then
    `transform_features` / `predict` / `create_submission` for a test slide.
    """

    def __init__(self, h5_file_path, patch_size=64):
        """
        Parameters:
          h5_file_path (str): Path to the HDF5 file holding spots and images.
          patch_size (int): Side length (in pixels) of the patches to extract.
        """
        self.h5_file_path = h5_file_path
        self.patch_size = patch_size
        self.train_spot_tables = {}   # slide name -> DataFrame of spot rows
        self.train_images = {}        # slide name -> image ndarray
        self.cell_type_columns = None  # set by prepare_training_set
        self.cnn_model = None  # set by initialize_cnn_model
        self.feature_extractor_pipeline = None  # set by prepare_training_set

    # -- private HDF5 helpers -------------------------------------------------
    def _read_group(self, group_path):
        """Read every member of an HDF5 group into a {name: ndarray} dict."""
        with h5py.File(self.h5_file_path, "r") as f:
            group = f[group_path]
            # Materialise arrays while the file is still open.
            return {name: np.array(group[name]) for name in group.keys()}

    def _read_member(self, group_path, slide_id, missing_message):
        """Read one member of an HDF5 group, raising ValueError if absent."""
        with h5py.File(self.h5_file_path, "r") as f:
            group = f[group_path]
            if slide_id not in group:
                raise ValueError(missing_message)
            return np.array(group[slide_id])

    # -- model ----------------------------------------------------------------
    def initialize_cnn_model(self):
        """
        Initialize a pre-trained ResNet50 model (without top layers) for feature extraction.

        Uses ImageNet weights and global average pooling; the result is stored
        in `self.cnn_model`.
        """
        base_model = ResNet50(weights='imagenet', include_top=False, pooling='avg')
        self.cnn_model = Model(inputs=base_model.input, outputs=base_model.output)
        print("CNN feature extractor initialized.")

    # -- data loading ---------------------------------------------------------
    def load_train_data(self):
        """
        Load training spot data from the H5 file and store each slide as a DataFrame.

        Reads every member of the "spots/Train" group into
        `self.train_spot_tables`, keyed by slide name.
        """
        for slide_name, spot_array in self._read_group("spots/Train").items():
            self.train_spot_tables[slide_name] = pd.DataFrame(spot_array)
        print("Training spot data loaded successfully.")

    def load_train_images(self):
        """
        Load training HE images from the H5 file.

        Reads every member of the "images/Train" group into `self.train_images`,
        keyed by slide name. Adjust the group key if your H5 file uses a
        different naming convention.
        """
        self.train_images.update(self._read_group("images/Train"))
        print("Training images loaded successfully.")

    def load_test_data(self, slide_id):
        """
        Load test spot data for a given slide.

        Returns:
          DataFrame built from the "spots/Test/<slide_id>" dataset.

        Raises:
          ValueError: If `slide_id` is not present under "spots/Test".
        """
        spot_array = self._read_member(
            "spots/Test", slide_id,
            f"Slide {slide_id} not found in test spot data.",
        )
        test_df = pd.DataFrame(spot_array)
        print(f"Test spot data for slide {slide_id} loaded successfully.")
        return test_df

    def load_test_image(self, slide_id):
        """
        Load test HE image for a given slide.

        Returns:
          ndarray read from the "images/Test/<slide_id>" dataset.

        Raises:
          ValueError: If `slide_id` is not present under "images/Test".
        """
        image_array = self._read_member(
            "images/Test", slide_id,
            f"Slide {slide_id} not found in test images.",
        )
        print(f"Test image for slide {slide_id} loaded successfully.")
        return image_array

    # -- feature extraction ---------------------------------------------------
    def prepare_training_set(self, slide_id='S_1'):
        """
        Prepare training features and targets using image patches for a given slide.
        Uses the HE image to extract patches and then CNN features.

        Side effects: sets `self.cell_type_columns` and fits and stores
        `self.feature_extractor_pipeline` (patch extractor + StandardScaler).
        If the CNN has not been initialized yet, it is initialized here.

        Returns:
          (X_features, y): standardized CNN features and float target matrix.

        Raises:
          ValueError: If the slide's spots or image have not been loaded.
        """
        if slide_id not in self.train_spot_tables:
            raise ValueError(f"Slide {slide_id} not found in training spot data.")
        if slide_id not in self.train_images:
            raise ValueError(f"Slide {slide_id} image not loaded.")

        # Without a model the extractor would fail later with an opaque
        # "'NoneType' object has no attribute 'predict'"; initialize on demand.
        if self.cnn_model is None:
            self.initialize_cnn_model()

        df = self.train_spot_tables[slide_id]
        # Columns named 'x' and 'y' are the spot coordinates; every other
        # column is treated as a cell type abundance target.
        feature_cols = ['x', 'y']
        target_cols = [col for col in df.columns if col not in feature_cols]
        self.cell_type_columns = target_cols

        # Extract coordinates (for patch extraction)
        X_coords = df[feature_cols].values.astype(float)
        # Cell type abundance targets
        y = df[target_cols].values.astype(float)

        # Build a feature extractor pipeline that will extract CNN features from each patch.
        he_image = self.train_images[slide_id]
        patch_extractor = PatchFeatureExtractor(he_image, self.patch_size, self.cnn_model)
        self.feature_extractor_pipeline = Pipeline([
            ('patch_extractor', patch_extractor),
            ('scaler', StandardScaler())
        ])
        # Extract features for training
        X_features = self.feature_extractor_pipeline.fit_transform(X_coords)
        print(f"Extracted CNN features for slide {slide_id}.")
        return X_features, y

    def transform_features(self, image, coords):
        """
        Extract CNN features for `coords` on another image, standardized with
        the scaler that was fitted in `prepare_training_set`.

        Reusing the fitted scaler keeps the features on the same scale the
        regressor was trained on.

        Parameters:
          image (ndarray): Image to cut patches from.
          coords (ndarray): Rows of [x, y] coordinates.

        Returns:
          ndarray of standardized features, one row per coordinate.

        Raises:
          RuntimeError: If `prepare_training_set` has not been called yet.
        """
        if self.feature_extractor_pipeline is None:
            raise RuntimeError(
                "prepare_training_set() must be called before transform_features()."
            )
        extractor = PatchFeatureExtractor(image, self.patch_size, self.cnn_model)
        raw_features = extractor.transform(coords)
        fitted_scaler = self.feature_extractor_pipeline.named_steps['scaler']
        return fitted_scaler.transform(raw_features)

    # -- regression -----------------------------------------------------------
    def build_regression_pipeline(self):
        """
        Build and return a regression pipeline that uses the pre-extracted CNN features.

        The single step is a MultiOutputRegressor wrapping a 100-tree
        RandomForestRegressor with a fixed random_state of 42.
        """
        pipeline = Pipeline([
            ('regressor', MultiOutputRegressor(RandomForestRegressor(n_estimators=100, random_state=42)))
        ])
        return pipeline

    def train(self, X, y):
        """
        Train the regression model on the provided features and targets.

        Returns:
          The fitted regression pipeline.
        """
        reg_pipeline = self.build_regression_pipeline()
        reg_pipeline.fit(X, y)
        print("Regression model training complete.")
        return reg_pipeline

    def predict(self, reg_model, X_test):
        """
        Predict cell type abundances on test features.

        Returns whatever `reg_model.predict(X_test)` returns.
        """
        predictions = reg_model.predict(X_test)
        return predictions

    # -- output ---------------------------------------------------------------
    def create_submission(self, test_df, predictions, submission_filename="submission.csv"):
        """
        Create a submission CSV file with predicted cell type abundances.

        The file has an 'ID' column taken from `test_df.index`, followed by one
        column per entry of `self.cell_type_columns`.
        """
        pred_df = pd.DataFrame(predictions, columns=self.cell_type_columns, index=test_df.index)
        pred_df.insert(0, 'ID', pred_df.index)
        pred_df.to_csv(submission_filename, index=False)
        print(f"Submission file '{submission_filename}' created!")

# -----------------------------
# 4. Example Usage
# -----------------------------
if __name__ == "__main__":
    # End-to-end example: train on slide S_1, predict slide S_7, write a CSV.

    # Path to the provided H5 data file
    h5_file_path = "/kaggle/input/el-hackathon-2025/elucidata_ai_challenge_data.h5"

    # Initialize the pipeline with desired patch size (in pixels)
    pipeline_obj = CellTypePipeline(h5_file_path, patch_size=64)

    # Initialize the CNN feature extractor (ResNet50)
    pipeline_obj.initialize_cnn_model()

    # Load training spots and images
    pipeline_obj.load_train_data()
    pipeline_obj.load_train_images()

    # Prepare training features and targets from one slide (e.g., 'S_1')
    X_train, y_train = pipeline_obj.prepare_training_set(slide_id='S_1')

    # Train regression model on extracted CNN features
    reg_model = pipeline_obj.train(X_train, y_train)

    # Load test data and image for slide S_7 (as per challenge description)
    test_df = pipeline_obj.load_test_data(slide_id='S_7')
    test_image = pipeline_obj.load_test_image(slide_id='S_7')

    # Extract raw CNN features for the test slide from its own HE image
    test_patch_extractor = PatchFeatureExtractor(test_image, pipeline_obj.patch_size, pipeline_obj.cnn_model)
    X_test_coords = test_df[['x', 'y']].values.astype(float)
    X_test_raw = test_patch_extractor.transform(X_test_coords)

    # Standardize with the scaler fitted on the TRAINING features. Fitting a
    # new scaler on the test slide would put the test features on a different
    # scale than the one the regressor learned its split thresholds on.
    train_scaler = pipeline_obj.feature_extractor_pipeline.named_steps['scaler']
    X_test_features = train_scaler.transform(X_test_raw)

    # Predict cell type abundances for test data
    predictions = pipeline_obj.predict(reg_model, X_test_features)

    # Create submission file
    pipeline_obj.create_submission(test_df, predictions, submission_filename="submission.csv")


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m94765736/94765736[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
CNN feature extractor initialized.
Training spot data loaded successfully.
Training images loaded successfully.
Extracted CNN features for slide S_1.
Regression model training complete.
Test spot data for slide S_7 loaded successfully.
Test image for slide S_7 loaded successfully.
Submission file 'submission.csv' created!
