In [11]:
# 1. Installation and Setup

!pip install -q tensorflow==2.12.0
!pip install -q scikit-learn pandas numpy matplotlib pillow gradio opencv-python
!pip install -q "huggingface_hub>=0.14.1"


# 2. Initialization

import os
# Set before TensorFlow is imported further down.
# NOTE(review): '3' is presumably the most restrictive native-log filter level — confirm
# against the TensorFlow documentation.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import warnings
# Silences every Python warning for the rest of the session, including deprecation
# warnings from the libraries used below.
warnings.filterwarnings('ignore')


# 3. Synthetic Data Generation

import numpy as np
import cv2
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

def generate_synthetic_medical_images(num_samples=1000, img_size=(224, 224), seed=None):
    """Generate synthetic "tissue" images, 20% of which contain a drawn abnormality.

    Every image starts as Gaussian noise around mid-grey. Every fifth sample
    (index 0, 5, 10, ...) additionally gets a filled reddish ellipse with a few
    short radial lines ("spiculations") and is labelled 1; all others are
    labelled 0. Finally the image is blended with low-amplitude noise and
    clipped to [0, 1].

    Args:
        num_samples: Number of images to generate.
        img_size: ``(height, width)`` of each image.
        seed: Optional integer seed. When ``None`` (default) the global
            ``np.random`` state is used, exactly as before; otherwise a
            private ``RandomState(seed)`` makes the output reproducible.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(num_samples, height, width, 3)``
        with values in [0, 1] and ``y`` has shape ``(num_samples,)`` with
        values 0.0 / 1.0.
    """
    height, width = img_size
    # Keep using the global generator unless a seed is requested, so existing
    # callers that rely on np.random.seed() see no change.
    rng = np.random if seed is None else np.random.RandomState(seed)

    # The original fixed 50 px margin makes randint() fail for sides <= 100;
    # fall back to a proportional margin only in that case.
    shortest_side = min(height, width)
    margin = 50 if shortest_side > 100 else shortest_side // 4

    X = np.zeros((num_samples, height, width, 3))
    y = np.zeros(num_samples)
    lesion_color = (0.8, 0.1, 0.1)  # Reddish abnormality

    for i in range(num_samples):
        # Base tissue texture
        img = rng.normal(0.5, 0.1, (height, width, 3))

        # Add abnormalities (20% of samples)
        if i % 5 == 0:
            y[i] = 1
            # OpenCV points are (x, y) == (column, row): x is bounded by the
            # width and y by the height (the original bounded x by the height).
            center = (int(rng.randint(margin, width - margin)),
                      int(rng.randint(margin, height - margin)))
            axes = (int(rng.randint(15, 40)), int(rng.randint(15, 40)))
            ellipse_angle = int(rng.randint(0, 180))
            cv2.ellipse(img, center, axes, ellipse_angle, 0, 360, lesion_color, -1)

            # Add spiculations
            for _ in range(rng.randint(5, 15)):
                length = rng.randint(5, 20)
                theta = np.radians(rng.randint(0, 360))
                end_point = (
                    int(center[0] + length * np.cos(theta)),
                    int(center[1] + length * np.sin(theta))
                )
                cv2.line(img, center, end_point, lesion_color, 1)

        # Add realistic noise (note: this also scales the image by 0.9)
        noise = rng.normal(0, 0.02, (height, width, 3))
        img = cv2.addWeighted(img, 0.9, noise, 0.1, 0)
        X[i] = np.clip(img, 0, 1)

    return X, y

# Generate and split data
# 20% of the samples are held out; `stratify=y` is passed so the split is stratified
# on the labels.
# NOTE(review): no random_state is given, so the split differs from run to run.
X, y = generate_synthetic_medical_images()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y)


# 4. Improved Model Architecture and Training

import tensorflow as tf
from tensorflow.keras import layers, models, applications, callbacks, regularizers

def build_improved_model(input_shape=(224, 224, 3)):
    """Build a binary classifier on top of a frozen MobileNetV3Small backbone.

    The model expects pixel values in [0, 1], which is what the rest of this
    file feeds it (the synthetic data is clipped to [0, 1] and inference
    divides by 255). Keras' MobileNetV3 ships with built-in preprocessing that
    assumes [0, 255] inputs; left enabled, [0, 1] inputs collapse to an almost
    constant value near -1. The built-in preprocessing is therefore disabled
    and replaced by an explicit [0, 1] -> [-1, 1] rescaling.

    Args:
        input_shape: ``(height, width, channels)`` of the input images.

    Returns:
        An uncompiled ``tf.keras`` ``Model`` mapping an image batch to a
        sigmoid probability of shape ``(batch, 1)``.
    """
    inputs = layers.Input(shape=input_shape)

    # Map [0, 1] pixels to the [-1, 1] range the pretrained weights expect.
    x = layers.Rescaling(scale=2.0, offset=-1.0)(inputs)

    # Using MobileNetV3Small for better efficiency
    base_model = applications.MobileNetV3Small(
        input_shape=input_shape,
        include_top=False,
        weights='imagenet',
        pooling='avg',
        include_preprocessing=False
    )

    # Freeze base model layers
    base_model.trainable = False

    # training=False keeps the backbone's BatchNormalization layers in
    # inference mode while only the new head is being trained.
    x = base_model(x, training=False)

    # Enhanced classification head
    x = layers.Dense(256, activation='relu',
                     kernel_regularizer=regularizers.l2(0.01))(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(1, activation='sigmoid')(x)

    return models.Model(inputs, outputs)

# Build and compile improved model
# (only the build happens here; compile() is called further down)
model = build_improved_model()

# Learning rate schedule
# Starts at 1e-4; with staircase=True the rate is multiplied by 0.96 in discrete
# steps, once every 100 optimizer steps.
initial_learning_rate = 1e-4
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=100,
    decay_rate=0.96,
    staircase=True)

# Custom metric callback that handles serialization
class SerializableHistory(callbacks.Callback):
    """Keras callback that keeps per-epoch metrics as plain Python floats.

    After training, ``self.history`` maps each key found in the epoch logs to
    a list with one ``float`` per epoch in which that key appeared.
    """

    def __init__(self):
        super().__init__()
        self.history = {}

    def on_epoch_end(self, epoch, logs=None):
        """Append every value in ``logs`` (if any) to its per-metric list."""
        epoch_logs = logs if logs else {}
        for metric_name in epoch_logs:
            if metric_name not in self.history:
                self.history[metric_name] = []
            # float() turns NumPy/tensor scalars into native floats.
            self.history[metric_name].append(float(epoch_logs[metric_name]))

# Create callback instance
history_callback = SerializableHistory()

# Compile the model
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
    loss='binary_crossentropy',
    metrics=[
        'accuracy',
        tf.keras.metrics.AUC(name='auc'),
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall')
    ]
)

# Callbacks
# ReduceLROnPlateau is intentionally not used: the optimizer's learning rate is
# driven by `lr_schedule`, and a schedule-driven learning rate cannot be
# overridden by that callback (depending on the Keras version the write is either
# rejected or silently overwritten on the next optimizer step).
callbacks_list = [
    # Stop when validation AUC has not improved for 10 epochs.
    callbacks.EarlyStopping(patience=10, monitor='val_auc', mode='max', restore_best_weights=True),
    history_callback
]

# Calculate class weights
# The positive class is up-weighted by the negative/positive ratio of the
# training labels to compensate for the class imbalance.
num_negative = int(np.sum(y_train == 0))
num_positive = int(np.sum(y_train == 1))
class_weight = {0: 1., 1: num_negative / num_positive}

# Train model
# NOTE(review): the held-out test split doubles as the validation set here, so
# early stopping is tuned on the same data that is later reported as validation AUC.
history = model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=30,
    batch_size=32,
    callbacks=callbacks_list,
    class_weight=class_weight,
    verbose=1
)


# 5. Image Processing Pipeline

from skimage import exposure

class MedicalImagePreprocessor:
    """Turn an arbitrary input image into a contrast-enhanced 3-channel array.

    The pipeline is: normalise channel layout -> resize -> grayscale ->
    adaptive histogram equalization (scikit-image) -> OpenCV CLAHE ->
    3x3 median blur -> replicate to three identical channels.
    """

    def __init__(self, target_size=(224, 224)):
        # Passed straight to cv2.resize as its `dsize` argument.
        self.target_size = target_size
        self.clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))

    def preprocess(self, image):
        """Process any input image to model-compatible format.

        Args:
            image: A NumPy array or anything ``np.array`` can convert (e.g. a
                PIL image). Accepted layouts are ``(H, W)``, ``(H, W, 1)``,
                ``(H, W, 3)`` (RGB) and ``(H, W, 4)`` (RGBA).

        Returns:
            A ``uint8`` array of shape ``(*resized, 3)`` whose three channels
            are identical copies of the enhanced grayscale image.

        Raises:
            ValueError: If the array is not one of the accepted layouts.
        """
        # Convert to numpy array if needed
        if not isinstance(image, np.ndarray):
            image = np.array(image)

        # (H, W, 1) would lose its channel axis in cv2.resize and then break
        # the RGB->gray conversion, so treat it as plain grayscale up front.
        if image.ndim == 3 and image.shape[2] == 1:
            image = np.ascontiguousarray(image[:, :, 0])

        if image.ndim not in (2, 3) or (image.ndim == 3 and image.shape[2] not in (3, 4)):
            raise ValueError(
                f"Unsupported image shape {image.shape}; expected (H, W), "
                "(H, W, 1), (H, W, 3) or (H, W, 4)"
            )

        # cv2.cvtColor does not accept 64-bit floats; float32 is supported.
        if image.dtype == np.float64:
            image = image.astype(np.float32)

        # Handle different image formats
        if image.ndim == 2:  # Grayscale
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
        elif image.shape[2] == 4:  # RGBA
            image = cv2.cvtColor(image, cv2.COLOR_RGBA2RGB)

        # Resize to model's expected input size
        image = cv2.resize(image, self.target_size)

        # Convert to grayscale for processing
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

        # Enhancement pipeline
        enhanced = exposure.equalize_adapthist(gray)
        # The result is scaled by 255 and cast to uint8 before the OpenCV CLAHE step.
        enhanced = self.clahe.apply(np.uint8(enhanced*255))
        enhanced = cv2.medianBlur(enhanced, 3)

        # Convert back to 3 channels
        processed = np.stack([enhanced]*3, axis=-1)

        return processed

# Module-level preprocessor instance used by the Gradio handler below.
preprocessor = MedicalImagePreprocessor(target_size=(224, 224))


# 6. Gradio Interface

import gradio as gr

def analyze_medical_scan(image, sensitivity=0.5):
    """Run the classifier on an uploaded image and build the UI outputs.

    Args:
        image: The uploaded image as a NumPy array, or ``None`` when nothing
            was uploaded.
        sensitivity: Value forwarded to ``get_recommendation`` to pick the
            recommendation text.

    Returns:
        ``(report, figure)``. ``report`` is a dict of native Python types with
        the keys "Risk Score", "Risk Level", "Confidence" and
        "Recommendation"; ``figure`` is a matplotlib figure showing the
        original and the processed image side by side. On failure the result
        is ``({"Error": message}, None)``.
    """
    if image is None:
        # Give a readable message instead of the opaque error the
        # preprocessor would raise on a missing upload.
        return {"Error": "No image provided"}, None

    try:
        # Preprocess image
        processed = preprocessor.preprocess(image)

        # Prepare for model prediction
        img_array = np.expand_dims(processed, axis=0) / 255.0

        # Get prediction and ensure it's serializable
        pred = float(model.predict(img_array, verbose=0)[0][0])

        # Generate visualization
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10,5))
        try:
            ax1.imshow(image)
            ax1.set_title("Original Image")
            ax2.imshow(processed[:,:,0], cmap='gray')
            ax2.set_title("Processed View")
            fig.tight_layout()
        finally:
            # Unregister the figure from pyplot so a long-running server does
            # not accumulate one open figure per request; the Figure object
            # itself stays valid and can still be rendered by the caller.
            plt.close(fig)

        # Format results with native Python types
        risk_level = "High" if pred > 0.7 else "Moderate" if pred > 0.4 else "Low"
        # Confidence is how far the score is from the undecided 0.5 mark, so a
        # clear negative is reported as confident too (the previous pred * 1.2
        # reported near-zero confidence for every low-risk result).
        confidence = min(0.99, max(pred, 1.0 - pred))

        report = {
            "Risk Score": pred,
            "Risk Level": risk_level,
            "Confidence": confidence,
            "Recommendation": get_recommendation(pred, sensitivity)
        }

        return report, fig

    except Exception as e:
        # UI boundary: surface the failure in the JSON panel instead of crashing.
        return {"Error": str(e)}, None

def get_recommendation(score, sensitivity):
    """Map a risk score to a recommendation text.

    A higher ``sensitivity`` lowers the decision threshold, so more scans are
    flagged. (Previously the threshold rose with sensitivity, which made the
    most sensitive setting the least likely to flag anything and made the
    "Urgent" tier unreachable at sensitivity 1.0.) The result at
    sensitivity 0.5 is unchanged.

    Args:
        score: Model output in [0, 1].
        sensitivity: Desired detection sensitivity; values outside [0, 1] are
            clamped to that range.

    Returns:
        One of the four recommendation strings, from "Urgent" to "Normal".
    """
    sensitivity = min(1.0, max(0.0, sensitivity))
    # Threshold runs from 0.7 (sensitivity 0) down to 0.3 (sensitivity 1).
    threshold = 0.3 + 0.4 * (1.0 - sensitivity)

    if score > threshold + 0.3:
        return "Urgent: Immediate specialist consultation recommended"
    if score > threshold:
        return "Concerning: Additional diagnostic tests advised"
    if score > threshold - 0.2:
        return "Monitor: Follow-up screening recommended"
    return "Normal: Routine screening advised"

# Create Gradio interface
# Layout: a header, then a two-column row (inputs on the left, results on the
# right), then a collapsed "Model Information" panel.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # <center>Medical Imaging AI</center>
    ### <center>Cancer Detection System</center>
    """)

    with gr.Row():
        with gr.Column():
            # type="numpy" is requested for the upload handed to the handler.
            scan_input = gr.Image(label="Upload Medical Scan", type="numpy")
            sensitivity = gr.Slider(0.1, 1.0, value=0.7, label="Detection Sensitivity")
            analyze_btn = gr.Button("Analyze", variant="primary")

        with gr.Column():
            report_output = gr.JSON(label="Diagnostic Report")
            visualization = gr.Plot(label="Scan Analysis")

    with gr.Accordion("Model Information", open=False):
        # The f-string is evaluated once, when the UI is built: it shows the
        # validation AUC of the LAST recorded epoch and requires that training
        # has already populated history_callback.history['val_auc'].
        gr.Markdown(f"""
        - *Model*: MobileNetV3Small
        - *Validation AUC*: {history_callback.history['val_auc'][-1]:.4f}
        - *Input Size*: 224×224 pixels
        - *Processing*: CLAHE + Adaptive Histogram Equalization
        """)

    # Wire the button: (image, slider value) -> (JSON report, plot).
    analyze_btn.click(
        fn=analyze_medical_scan,
        inputs=[scan_input, sensitivity],
        outputs=[report_output, visualization]
    )

# Launch the app
# share=True also publishes a temporary public URL in addition to the local one
# (see the "Running on public URL" line in the captured output).
demo.launch(share=True)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v3/weights_mobilenet_v3_small_224_1.0_float_no_top_v2.h5
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
* Running on local URL:  http://127.0.0.1:7860
* Running on public URL: https://2a27790ada4470e62b.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


