In [None]:
# Root directory that is walked for .tif images (passed to extract_image_metadata further down).
base_dir = '/home/labs/hornsteinlab/Collaboration/MOmaps/input/images/raw/Cory/indi-image-pilot-20241128'
# Location of the NOVA checkout; exported as the NOVA_HOME environment variable and added to sys.path in the import cell.
NOVA_HOME = '/home/labs/hornsteinlab/Collaboration/NOVA_GAL/NOVA'

In [1]:
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import cv2
from matplotlib.backends.backend_pdf import PdfPages
# Export the NOVA checkout location and put it on sys.path before the project import
# that follows (presumably so that `src.preprocessing...` resolves from that checkout).
os.environ['NOVA_HOME'] = NOVA_HOME
sys.path.insert(1, os.getenv('NOVA_HOME'))
from src.preprocessing.preprocessing_utils import rescale_intensity, fit_image_shape
%load_ext autoreload
%autoreload 2

In [2]:
# Utility functions

def show_images(
    df: pd.DataFrame,
    marker: str,
    samples: int = 5,
    show_DAPI: bool = True,
    batch: int = None,
    rep: int = None,
    condition: str = None,
    cell_line: str = None
) -> None:
    """
    Plot a shuffled selection of images for one marker, optionally followed by their DAPI counterparts.

    The DataFrame is first narrowed with get_specific_imgs, then shuffled with a
    fixed seed (random_state=1) so that repeated calls show the same images.

    Parameters:
        df (pd.DataFrame): Image metadata table; must provide a 'Path' column.
        marker (str): Marker whose images are displayed.
        samples (int, optional): Maximum number of images to display. Defaults to 5.
        show_DAPI (bool, optional): If True, each image is followed by the image at the
            path returned by get_dapi_path. Defaults to True.
        batch (int, optional): Batch number to filter by. Defaults to None (no filter).
        rep (int, optional): Replicate number to filter by. Defaults to None (no filter).
        condition (str, optional): Condition to filter by. Defaults to None (no filter).
        cell_line (str, optional): Cell line to filter by. Defaults to None (no filter).

    Returns:
        None
    """
    matching = get_specific_imgs(
        df,
        marker=marker,
        batch=batch,
        rep=rep,
        condition=condition,
        cell_line=cell_line,
    )
    # frac=1 returns every row in random order; the fixed seed keeps the order reproducible
    shuffled = matching.sample(frac=1, random_state=1)
    chosen_paths = shuffled.Path.values[:samples]

    for position, target_path in enumerate(chosen_paths, start=1):
        print(position)
        # Marker image first, then its path
        show_processed_tif(target_path)
        print(target_path)

        if not show_DAPI:
            continue

        # Matching DAPI image, followed by a separator line
        dapi_file_name = get_dapi_path(target_path, marker)
        print(dapi_file_name)
        show_processed_tif(dapi_file_name)
        print('--------------------------------')

def get_dapi_path(path, marker1, marker2='DAPI'):
    """
    Build the path of a companion image by swapping one marker name for another.

    Every occurrence of ``marker1`` in ``path`` is substituted, not only the
    marker folder, so the result depends on ``marker1`` not appearing elsewhere
    in the path.

    Parameters:
        path (str): Original file path.
        marker1 (str): Marker name to look for in the path.
        marker2 (str, optional): Marker name to substitute. Defaults to 'DAPI'.

    Returns:
        str: The path with ``marker1`` replaced by ``marker2``.
    """
    swapped_path = path.replace(marker1, marker2)
    return swapped_path

def show_label(path):
    """
    Return the trailing components of a path for use as a plot label.

    Parameters:
        path (str): File path using '/' as the separator.

    Returns:
        list: Up to the last seven '/'-separated components of ``path``.
    """
    return path.split("/")[-7:]

def process_tif(path):
    """
    Read an image file and run it through the project's shape and intensity helpers.

    Parameters:
        path (str): Path to the image file.

    Returns:
        ndarray: The result of rescale_intensity(fit_image_shape(img, (1024, 1024))).

    Raises:
        OSError: If OpenCV cannot read the file (missing, unreadable or unsupported).
    """
    img = cv2.imread(path, cv2.IMREAD_ANYDEPTH)
    # cv2.imread signals failure by returning None instead of raising; fail here
    # with the offending path rather than with an opaque error further down.
    if img is None:
        raise OSError(f"Could not read image (missing or unsupported file): {path}")
    # Project helpers; presumably these fit the image to 1024x1024 and normalise
    # its intensity range - confirm against src.preprocessing.preprocessing_utils.
    img = fit_image_shape(img, (1024, 1024))
    img = rescale_intensity(img)
    return img
    
def show_processed_tif(path):
    """
    Load an image with process_tif and display it with the tile grid overlaid.

    The figure is titled with the output of show_label(path) and the image
    shape is printed before the figure is shown.

    Parameters:
        path (str): Path to the image file.

    Returns:
        None
    """
    processed = process_tif(path)

    # One 7x7 inch figure per image, drawn in grayscale with the grid on top
    fig, ax = plt.subplots(figsize=(7, 7))
    ax.imshow(processed, cmap='gray')
    put_tiles_grid(image=processed, ax=ax)
    ax.axis('off')
    ax.set_title(show_label(path), color='purple')

    print(f"Img shape: {processed.shape}")
    plt.show()
    
def put_tiles_grid(image, ax, num_blocks=10, block_size=100):
    """
    Overlay a dashed square grid on an axes and remove its tick marks.

    The grid covers ``num_blocks * block_size`` pixels in each direction,
    starting at 0, and only the interior separators are drawn. With the
    defaults this is a 10x10 grid of 100-pixel blocks spanning 0..1000.

    NOTE(review): process_tif in this file requests a (1024, 1024) shape, so the
    default grid may stop short of the image border - confirm which tile
    geometry is intended and pass num_blocks/block_size accordingly.

    Parameters:
        image: Unused; kept so existing calls (positional or ``image=...``) keep working.
        ax: Matplotlib axes (or any object with plot/set_xticks/set_yticks) to draw on.
        num_blocks (int, optional): Number of blocks per side. Defaults to 10.
        block_size (int, optional): Side length of one block in pixels. Defaults to 100.

    Returns:
        None
    """
    extent = num_blocks * block_size
    for i in range(1, num_blocks):
        offset = i * block_size
        # Horizontal dashed line
        ax.plot([0, extent], [offset, offset], linestyle='--', lw=1, alpha=0.5, color='pink')
        # Vertical dashed line
        ax.plot([offset, offset], [0, extent], linestyle='--', lw=1, alpha=0.5, color='pink')
    # Remove x and y axis ticks
    ax.set_xticks([])
    ax.set_yticks([])

def get_specific_imgs(
    df: pd.DataFrame,
    marker: str = None,
    batch: int = None,
    rep: int = None,
    condition: str = None,
    cell_line: str = None
) -> pd.DataFrame:
    """
    Select the rows of an image-metadata table that match every given criterion.

    Criteria left as None are ignored. Batch and replicate numbers are compared
    against the strings 'batch<N>' and 'rep<N>' respectively.

    Parameters:
        df (pd.DataFrame): The input DataFrame containing image data.
        marker (str, optional): Value required in the 'Marker' column. Defaults to None.
        batch (int, optional): Batch number; matched as 'batch<N>' in 'Batch'. Defaults to None.
        rep (int, optional): Replicate number; matched as 'rep<N>' in 'Rep'. Defaults to None.
        condition (str, optional): Value required in the 'Condition' column. Defaults to None.
        cell_line (str, optional): Value required in the 'CellLine' column. Defaults to None.

    Returns:
        pd.DataFrame: A new DataFrame holding only the matching rows (a full copy
        when no criterion is given).
    """
    # Column -> required value, in the order the filters are applied
    required = {
        'Marker': marker,
        'Batch': None if batch is None else f'batch{str(batch)}',
        'Rep': None if rep is None else f'rep{str(rep)}',
        'Condition': condition,
        'CellLine': cell_line,
    }

    selected = df.copy()
    for column, wanted in required.items():
        if wanted is None:
            continue
        selected = selected[selected[column] == wanted]
    return selected

def create_img_pdf_report(df, marker, condition, output_file, reps=8, batches=3, samples=3):
    """
    Create a PDF report of images with one page per batch, one row per rep and
    ``samples`` randomly chosen images per row.

    Only rows with cell line 'WT' are used. Images are drawn with a fixed seed
    (random_state=1) so the report is reproducible. If a batch/rep combination
    has fewer than ``samples`` images, the available ones are shown and the
    remaining cells of that row are left blank.

    Parameters:
        df (pd.DataFrame): DataFrame with image data (see get_specific_imgs for the columns used).
        marker (str): Marker to filter by; also used as a prefix of the output file name.
        condition (str): Condition to filter by ('Untreated' or 'Stress').
        output_file (str): File name suffix; the report is written to '<marker>_<output_file>'.
        reps (int, optional): Number of replicates (rows per page). Defaults to 8.
        batches (int, optional): Number of batches (pages). Defaults to 3.
        samples (int, optional): Images per replicate (columns per page). Defaults to 3.

    Returns:
        None
    """
    with PdfPages(f'{marker}_{output_file}') as pdf:
        for batch in range(1, batches + 1):
            # squeeze=False keeps `axes` two-dimensional even when reps == 1 or samples == 1
            fig, axes = plt.subplots(reps, samples, figsize=(12, 24), squeeze=False)
            try:
                fig.suptitle(f"Batch {batch} - Condition: {condition}", fontsize=16)

                # Hide every frame up front so cells without an image stay blank
                for ax in axes.flat:
                    ax.axis('off')

                for rep in range(1, reps + 1):
                    print(marker, batch, rep, condition)
                    images = get_specific_imgs(
                        df, marker=marker, batch=batch, rep=rep, condition=condition, cell_line='WT')
                    if images.empty:
                        continue
                    # Never request more rows than exist; sample() raises otherwise
                    images = images.sample(n=min(samples, len(images)), random_state=1)

                    for i, path in enumerate(images['Path']):
                        img = process_tif(path)

                        ax = axes[rep - 1, i]
                        ax.imshow(img, cmap='gray')
                        ax.set_title(f"Rep {rep} - Img {i+1}")

                # Leave room at the top for the suptitle
                fig.tight_layout(rect=[0, 0.03, 1, 0.95])
                pdf.savefig(fig)
            finally:
                # Release the figure even if reading or plotting an image failed
                plt.close(fig)
            
def extract_image_metadata(base_dir):
    """
    Traverse through a directory structure and extract metadata for .tif images.

    Metadata is derived from the folder names on the path of each image:
    the first component starting with 'batch', 'panel' or 'rep' gives Batch,
    Panel and Rep; CellLine and Condition are the components one and three
    levels below the batch folder; Marker is the folder containing the file.
    Fields that cannot be derived are set to None.

    NOTE(review): the components of ``base_dir`` itself are searched too, so a
    parent folder whose name starts with 'batch', 'panel' or 'rep' would be
    picked up - confirm this cannot happen for the directories in use.

    Args:
        base_dir (str): The base directory containing the images.

    Returns:
        pd.DataFrame: One row per .tif file with columns
                      ['Path', 'RootFolder', 'Marker', 'Condition', 'CellLine',
                       'Batch_Rep', 'Rep', 'Batch', 'Panel']. The columns are
                       present even when no image is found.
    """
    columns = ['Path', 'RootFolder', 'Marker', 'Condition', 'CellLine',
               'Batch_Rep', 'Rep', 'Batch', 'Panel']
    records = []

    for root, _dirs, files in os.walk(base_dir):
        tif_files = [name for name in files if name.endswith('.tif')]
        if not tif_files:
            continue

        # Everything below depends only on the directory, so compute it once per folder
        parts = root.split(os.sep)
        batch = next((p for p in parts if p.startswith('batch')), None)
        panel = next((p for p in parts if p.startswith('panel')), None)
        rep = next((p for p in parts if p.startswith('rep')), None)
        marker = parts[-1]  # "Marker" is the last folder

        cell_line = None
        condition = None
        if batch is not None:
            batch_idx = parts.index(batch)
            # "CellLine" is the first level after "Batch", "Condition" the third;
            # shallower folders leave the field as None instead of raising IndexError
            if batch_idx + 1 < len(parts):
                cell_line = parts[batch_idx + 1]
            if batch_idx + 3 < len(parts):
                condition = parts[batch_idx + 3]

        batch_rep = f'{batch}/{rep}' if batch and rep else None

        for name in tif_files:
            records.append({
                'Path': os.path.join(root, name),
                'RootFolder': base_dir,
                'Marker': marker,
                'Condition': condition,
                'CellLine': cell_line,
                'Batch_Rep': batch_rep,
                'Rep': rep,
                'Batch': batch,
                'Panel': panel
            })

    # Passing the columns explicitly keeps the schema stable for an empty result
    return pd.DataFrame(records, columns=columns)

In [3]:
# Build the image-metadata table by scanning base_dir for .tif files.
df = extract_image_metadata(base_dir)

In [7]:
# Example Usage:

# Show images for the marker 'P54'
# Filters: default parameters for all other options
# show_images(df, 'P54')

# Show images for the marker 'G3BP1'
# Exclude DAPI, limit to 3 samples, and filter by Batch 1, Rep 1, WT cell line, and Untreated condition
# show_images(df, marker='G3BP1', show_DAPI=False, samples=3, batch=1, rep=1, cell_line='WT', condition='Untreated')

# Create a PDF report for the marker 'TIA1' under the 'Untreated' condition
# The report is saved as '<marker>_<output_file>', i.e. 'TIA1_Untreated_Report.pdf' for the call below
# create_img_pdf_report(df, marker='TIA1', condition='Untreated', output_file='Untreated_Report.pdf')