# 0. Description

This is a jupyter notebook file (python script) that aligns two binary images that are misaligned by unknown amount of offset in the horiztonal and/or vertical directions. This script will overlay the given two images in all possible ways, pixel by pixel, and select the best aligned images. It assumes the images are in the same scale of units, in the same orientation, and expected to have some degree of overlap.

This script returns a dataframe of pixel coordinates of both images (labelled such that the pixel source is clear) and the aligned images

For the current version, the script is hard-coded such that one of the image is exactly 2408 pixels by 2408 pixels and named the two images to be 'cell' and 'collagen', but the script can be amended for a general case. This repository will further be improved such that it will include an example toy images and be more generalizable for use.

This python code was ran in version 3.7.4.

# 1. Set up

## 1.1 libraries

In [1]:
import sys
print("print version")
print(sys.version)

import os
import time
# insert your path to python libraries here
sys.path.insert(1, "/path/to/lib/")
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
import re
import subprocess
from PIL import Image
from IPython.display import display
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from functools import wraps

print version
3.7.4 (default, Sep 11 2019, 11:24:51) 
[GCC 6.2.0]


## 1.2 variables

In [2]:
# insert your path to project here
path_project = '/path/to/project/'

path_image_cell = f'{path_project}data/image_cell_231019/png_converted/'
path_image_collagen = f'{path_project}data/image_collagen_230904/'

path_df_collagen = f'{path_project}data/df_collagen_231023/'
path_df_collagen_mod = f'{path_project}data/df_collagen_modified_231019/'
vec_filenames_df_collagen = os.listdir(f'{path_df_collagen}')

path_df_cell = f'{path_project}data/df_cell_231023/'

path_map = f'{path_project}data/'
file_map = f'{path_map}core_patient_mapping.csv'

file_df_cell = f'{path_project}data/df_cell.csv'
df_cell = pd.read_csv(file_df_cell)
df_cell['id'] = df_cell['tumor_ids'].astype(str) + '_' + df_cell['core_ids'].astype(str)

path_plot_overlap = f"{path_project}plot/overlap_231030/"
path_plot_overlap_shifted = f"{path_project}plot/overlap_shifted_231030/"
path_df_collagen_modified = f"{path_project}data/df_collagen_modified_231030/"
path_map_leftover = f'{path_project}script/overlay_collagen_cell/'

colormap_scatter = {'cell': 'blue', 'collagen': 'red'}

## 1.3 function

In [3]:
def timeit(func):
    """Decorator to measure execution time of a function."""
    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        total_time = end_time - start_time
        print(f'\nFunction <{func.__name__}> Took {total_time:.4f} seconds')
        return result
    return timeit_wrapper

def translate_R1(c, min_input, max_input, min_output, max_output):
    """Translates a value from one range to another."""
    return ((c - min_input) / (max_input - min_input) * (max_output - min_output) + min_output)

@timeit
def get_df_map():
    """Reads and processes mapping data from a CSV file."""
    df_map = pd.read_csv(file_map)
    df = df_map

    # Melt the DataFrame to reshape data
    df_map_melted = pd.melt(df, id_vars=['TumorID', 'TMA', 'Block_ID_cust', 'Variant'],
                            value_vars=['Core 1', 'Core 2'], 
                            var_name='CoreType', value_name='Value')

    # Drop rows with NaN in the 'Value' column
    df_map_melted = df_map_melted.dropna(subset=['Value'])

    # Clean up and format data in 'CoreType' and 'Value' columns
    df_map_melted['CoreType'] = df_map_melted['CoreType'].str.replace(' ', '')
    df_map_melted = df_map_melted.applymap(lambda x: x.strip() if isinstance(x, str) else x)
    df_map_melted['CoreID'] = '1,' + df_map_melted['Value'].astype(str)
    df_map_melted['CoreID_modified'] = (df_map_melted['TMA'].astype(str) + '.' +
                                        df_map_melted['Variant'].astype(str) + '_' +
                                        df_map_melted['CoreID'].astype(str))
    df_map_melted = df_map_melted[['TumorID', 'CoreID_modified']]
    return df_map_melted

def get_filename(index_image):
    """Retrieves the filename based on image index and mapping data."""
    for index_temp, row in df_filenames_png[['key_PNG', 'filename']].iloc[(index_image):(index_image + 1)].iterrows():
        i = row['key_PNG']
        df_match = df_map_melted[df_map_melted['CoreID_modified'] == i]

        if df_match.shape[0] == 0:
            print(f'<get_filename> WARNING: no match for {i}; skip')
        elif df_match.shape[0] != 1:
            print(f'<get_filename> ERROR: more than one match')
        else:
            print(f'<get_filename> SUCCESS: one match for {i}')
            i_tumor_id, i_core_id = df_match[['TumorID', 'CoreID_modified']].iloc[0].values
            i_filename_cell = row['filename']
            i_filename_collagen = re.sub(r"_phenotype_map-\d+", "", i_filename_cell)
            i_filename_output = f'{i_tumor_id}_{i_core_id}'
            return i_filename_cell, i_filename_collagen, i_filename_output

def fn_reconstruct(df, img=None, bool_color_white=True, bool_save=False, path_save=None, name_plot=None):
    """Reconstructs an image from a DataFrame of pixel coordinates and RGB values."""
    # Create a blank image array
    if img is None:
        reconstructed_img = np.zeros((max(df['x']) + 1, max(df['y']) + 1, 3), dtype=np.uint8)
        print(f"<fn_reconstruct> image size: {max(df['x']) + 1, max(df['y']) + 1}")
    else:
        reconstructed_img = np.zeros((img.size[1], img.size[0], 3), dtype=np.uint8)
        print(f'<fn_reconstruct> image size: {img.size[1], img.size[0]}')

    # Fill the image array with pixel values
    if bool_color_white:
        reconstructed_img[df['x'].values, df['y'].values] = [255, 255, 255]
    else:
        reconstructed_img[df['x'].values, df['y'].values] = df[['red', 'green', 'blue']].values

    # Display the reconstructed image
    plt.imshow(reconstructed_img)
    plt.show()

    # Save the image if required
    if bool_save and path_save and name_plot:
        file_path = os.path.join(path_save, f"{name_plot}.png")
        plt.savefig(file_path)
    plt.close()

def fn_image_to_df(file_image):
    """Converts an image file into a DataFrame containing pixel coordinates and RGB values, excluding white pixels."""
    # Open the image and convert it to RGB
    colourImg = Image.open(file_image)
    colourPixels = colourImg.convert("RGB")

    # Get image dimensions
    width, height = colourImg.size

    # Reshape the RGB data array to match image dimensions
    colourArray = colourArray.reshape((height, width, 3))

    # Create an array of pixel indices
    indicesArray = np.moveaxis(np.indices((height, width)), 0, 2)

    # Combine the indices and color data, and reshape into a 2D array
    allArray = np.dstack((indicesArray, colourArray)).reshape((-1, 5))

    # Create a DataFrame from the array
    df = pd.DataFrame(allArray, columns=["y", "x", "red", "green", "blue"])

    # Exclude white pixels (where R=G=B=255)
    df_subset = df[~((df['red'] == 255) & (df['green'] == 255) & (df['blue'] == 255))]
    return colourImg, df_subset
        
# get a dataframe df_filenames_png which has 
def fn_get_df_filenames_png(path_image_cell):
    """Generates a DataFrame from filenames in the specified directory using a regex pattern."""
    filenames_png = os.listdir(path_image_cell)

    # Regex pattern to capture the required portions of the filename
    pattern = r"HTMA_(\d{3})\.(\d)_Core\[(\d+,\d+,\d+)\]"

    # Lists to store the extracted values
    three_digits = []
    single_digit = []
    comma_separated_digits = []

    # Iterate through each filename and extract the desired information
    for filename in filenames_png:
        match = re.search(pattern, filename)
        if match:
            three_digits.append(match.group(1))
            single_digit.append(match.group(2))
            comma_separated_digits.append(match.group(3))
        else:
            print('no match for:', filename)

    # Convert the lists to a DataFrame
    df_filenames_png = pd.DataFrame({
        'ThreeDigits': three_digits,
        'SingleDigit': single_digit,
        'CommaSeparatedDigits': comma_separated_digits
    })

    # Create 'key_PNG' column by combining the captured patterns
    df_filenames_png['key_PNG'] = (df_filenames_png['ThreeDigits'].astype(str) + 
                                   '.' + 
                                   df_filenames_png['SingleDigit'].astype(str) + 
                                   '_' + 
                                   df_filenames_png['CommaSeparatedDigits'].astype(str))
    df_filenames_png['filename'] = filenames_png

    return(df_filenames_png)

# Create a DataFrame from the list of filenames
def fn_create_df_map(df_cell, path_df_cell, path_df_collagen, df_filenames_png):
    """Creates a DataFrame mapping cell and collagen files to their respective data."""
    df_image_cell_files = pd.DataFrame({'file_cell': os.listdir(path_df_cell)})
    df_image_cell_files['file_cell'] = df_image_cell_files['file_cell'].str.replace('.tsv$', '', regex=True)

    df_image_collagen_files = pd.DataFrame({'file_collagen': os.listdir(path_df_collagen)})
    df_image_collagen_files['file_collagen'] = df_image_collagen_files['file_collagen'].str.replace('.tsv$', '', regex=True)

    df_map = pd.merge(
        pd.merge(df_cell[['id', 'core_ids']].drop_duplicates(), 
                 df_image_cell_files, 
                 how = 'left', left_on = 'id', right_on = 'file_cell'), 
        df_image_collagen_files,
        how = 'left', left_on = 'id', right_on = 'file_collagen')
    df_map_temp = pd.merge(
        df_map,
        df_filenames_png[['key_PNG', 'filename']],
        how = 'left', left_on = 'core_ids', right_on = 'key_PNG'
    )

    df_map_temp['filename_cell'] = df_map_temp['filename']
    df_map_temp['filename_collagen'] = df_map_temp['filename'].str.replace('_phenotype_map-14.png', '.png', regex=True)
    df_map_temp.drop('filename', axis = 1, inplace = True)
    return(df_map_temp)

def create_pixel_offset_tuples(width, height):
    """Generates a dictionary of pixel offset tuples for given width and height."""
    if (width > 2408) or (height > 2408):
        print(f"<create_pixel_offset_tuples> wrong width and height: {width} {height}")
        
    # Subtracting 2409 from A and B
    n_pixel_offset_x = 2409 - width
    n_pixel_offset_y = 2409 - height
    
    # Creating vec_offset_x and vec_offset_y arrays
    vec_offset_x = np.arange(n_pixel_offset_x)
    vec_offset_y = np.arange(n_pixel_offset_y)
    
    # Creating all combinations of pairs from vec_offset_x and vec_offset_y
    pixel_pairs = [(x, y) for x in vec_offset_x for y in vec_offset_y]
    
    # Creating a dictionary with tuples as keys and empty values
    pixel_dict = {pair: None for pair in pixel_pairs}
    
    return pixel_dict
def save_df(df, filename_output, TYPE=None):
    """Saves the given DataFrame to a TSV file based on the specified TYPE."""
    if TYPE == 'collagen':
        # Construct the filename for collagen and save the DataFrame
        file_df_collagen = f'{path_df_collagen}{filename_output}.tsv'
        df.to_csv(file_df_collagen, sep='\t', index=None)
    elif TYPE == 'cell':
        # Construct the filename for cell and save the DataFrame
        file_df_cell = f'{path_df_cell}{filename_output}.tsv'
        df.to_csv(file_df_cell, sep='\t', index=None)
    else:
        print(f'<save_df> ERROR: TYPE must be collagen or cell')

def count_overlap(df1, df2):
    """Counts the number of overlapping pixels between two DataFrames."""
    df_merge_temp = pd.merge(df1[['x', 'y']], df2[['x', 'y']], 
                             how = 'outer', left_on = ['x', 'y'], right_on = ['x', 'y'], indicator = True)
    return(sum(df_merge_temp['_merge'] == 'both'))

def compute_overlap(keys_subset, df_image_collagen, df_image_cell):
    """Computes the overlap of pixels between collagen and cell images for each offset key."""
    result = []
    for keys in keys_subset:
        x, y = keys
        df_temp = pd.DataFrame({'x': df_image_collagen['x'] + x, 'y': df_image_collagen['y'] + y})
        overlap = count_overlap(df_image_cell, df_temp)
        result.append((keys, overlap))
    return result

from functools import partial
from concurrent.futures import ProcessPoolExecutor
import numpy as np

@timeit
def parallel_compute_overlap(dict_offset, df_image_collagen, df_image_cell, num_splits=10):
    """Performs parallel computation of pixel overlap between collagen and cell images."""
    print(f'<parallel_compute_overlap> trying {len(dict_offset.keys())} combinations')
    
    # Convert the keys to a NumPy array for proper splitting
    keys_array = np.array(list(dict_offset.keys()), dtype=object)

    # Splitting the keys for parallel processing
    keys_split = np.array_split(keys_array, num_splits)
    
    # Creating partial function for compute_overlap with fixed arguments
    partial_func = partial(compute_overlap, df_image_collagen=df_image_collagen, df_image_cell=df_image_cell)
    
    # Using ProcessPoolExecutor for parallel processing
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(partial_func, keys_split))
    
    # Updating dict_offset with results from parallel computation
    for subset in results:
        for keys, overlap in subset:
            dict_offset[tuple(keys)] = overlap
        
    return dict_offset

def transform_df_image_collagen(df_image_collagen, df_image_cell, df_cell_subset, 
                                bool_save=False, 
                                path_save=path_df_collagen_modified,
                                name_df=None):
    """Transforms the coordinates of collagen DataFrame based on cell DataFrame and optional saving."""
    # Calculating minimum and maximum coordinates for cell image and cell DataFrame
    min_x_cell_image = min(df_image_cell['x'])
    max_x_cell_image = max(df_image_cell['x'])
    min_y_cell_image = min(df_image_cell['y'])
    max_y_cell_image = max(df_image_cell['y'])

    min_x_cell_df = min(df_cell_subset['X'])
    max_x_cell_df = max(df_cell_subset['X'])
    min_y_cell_df = min(df_cell_subset['Y'])
    max_y_cell_df = max(df_cell_subset['Y'])

    # Applying translation to y and x coordinates
    df_image_collagen['y_trans'] = df_image_collagen['y'].apply(
        lambda c: translate_R1(c, min_y_cell_image, max_y_cell_image, min_y_cell_df, max_y_cell_df)
    )
    df_image_collagen['x_trans'] = df_image_collagen['x'].apply(
        lambda c: translate_R1(c, min_x_cell_image, max_x_cell_image, min_x_cell_df, max_x_cell_df)
    )

    # Creating a modified DataFrame with translated coordinates
    df_image_collagen_mod = df_image_collagen[['x_trans', 'y_trans']]
    df_image_collagen_mod.columns = ['x', 'y']

    # Optionally save the modified DataFrame
    if bool_save:
        file_df_image_collagen_mod = f'{path_save}{name_df}.tsv'
        df_image_collagen_mod[['x', 'y']].to_csv(file_df_image_collagen_mod, index=False, sep='\t')

    return df_image_collagen_mod

@timeit
def fn_align(index_core, df_map):
    """Aligns and visualizes the overlap between collagen and cell images for a given core index."""
    # Retrieve the specific row from df_map based on the index
    row = df_map.iloc[index_core]
    ID = row['id']
    
    print(f'<fn_align> starting with index: {index_core}, ID: {ID}')
    
    # Constructing file paths
    filename_df_image_collagen = f"{path_df_collagen}{ID}.tsv"
    filename_df_image_cell = f"{path_df_cell}{ID}.tsv"
    filename_image_collagen = f"{path_image_collagen}{row['filename_collagen']}"
    file_paths = [filename_df_image_collagen, filename_df_image_cell, filename_image_collagen]

    # Checking if all required files exist
    if not all(os.path.isfile(path) for path in file_paths):
        print(f"<fn_align> some files don't exist for {ID}")
        return

    # Loading the data from files
    df_cell_subset = df_cell[df_cell['id'] == ID]
    df_image_collagen = pd.read_csv(filename_df_image_collagen, sep='\t')[['x', 'y']]
    df_image_cell = pd.read_csv(filename_df_image_cell, sep='\t')[['x', 'y']]
    image_collagen = Image.open(filename_image_collagen)

    width, height = image_collagen.size

    # Aligning and processing images if size is not standard
    if (width != 2408) or (height != 2408):
        dict_offset = create_pixel_offset_tuples(width, height)
        print(f'<fn_align> image_collagen size: {width} x {height}')
        
        updated_dict_offset = parallel_compute_overlap(dict_offset, df_image_collagen, df_image_cell)
        key_with_highest_value = max(updated_dict_offset, key=updated_dict_offset.get)
        print(f"<fn_align> key with highest value {key_with_highest_value}")

        # Adjusting collagen image coordinates based on the offset
        x, y = key_with_highest_value
        df_image_collagen['x'] = df_image_collagen['x'] + x
        df_image_collagen['y'] = df_image_collagen['y'] + y

    # Setting colors for visualization
    df_image_collagen[['red', 'green', 'blue']] = [255, 0, 0]
    df_image_cell[['red', 'green', 'blue']] = [255, 255, 255]

    # Combining and reconstructing the image
    df_temp = pd.concat([df_image_collagen, df_image_cell], axis=0)
    fn_reconstruct(df_temp, bool_color_white=False, bool_save=True, path_save=path_plot_overlap, name_plot=ID)

    # Transforming and saving the modified collagen image
    df_image_collagen_modified = transform_df_image_collagen(df_image_collagen, df_image_cell, df_cell_subset, 
                                                             bool_save=True, path_save=path_df_collagen_modified, name_df=ID)
    
    # Preparing data for scatter plot
    df_cell_subset_temp = df_cell_subset[['X', 'Y']].copy()
    df_cell_subset_temp.columns = ['x', 'y']
    df_cell_subset_temp['type'] = 'cell'
    df_image_collagen_modified = df_image_collagen_modified.copy()
    df_image_collagen_modified['type'] = 'collagen'
    
    # Plotting and saving the scatter plot
    df_temp = pd.concat([df_image_collagen_modified, df_cell_subset_temp], axis=0)
    colors = df_temp['type'].map(colormap_scatter)
    plt.scatter(df_temp['x'], df_temp['y'], color=colors)
    plt.savefig(f'{path_plot_overlap_shifted}{ID}.png')
    plt.show()

def update_map(df_map, vec_completed, bool_save=False, path_save=None, name_df=None):
    """Updates the mapping DataFrame to exclude completed jobs, with optional saving."""
    # Merging df_map with completed file list to identify unfinished jobs
    df_map_temp = pd.merge(df_map, pd.DataFrame({'file_plot': vec_completed}),
                           how='left', left_on='id', right_on='file_plot')
    
    # Keeping only the rows without matches in vec_completed
    result = df_map_temp[df_map_temp['file_plot'].isna()].copy()
    
    # Dropping the temporary 'file_plot' column
    result.drop('file_plot', axis=1, inplace=True)
    
    # Saving the updated DataFrame if requested
    if bool_save:
        result.to_csv(f'{path_save}{name_df}.tsv', sep='\

# main

In [4]:
df_map_melted = get_df_map()
df_filenames_png = fn_get_df_filenames_png(path_image_cell)
df_map = fn_create_df_map(df_cell, path_df_cell, path_df_collagen, df_filenames_png)

In [1]:
for i in range(13, 14):
    print(f"starting index {i}")
    # run functions here
    # fn_align(i, df_map)

starting index 13
