In [None]:
# Associate manual annotations of cell types (x, y positions)
# with single cells
# authors: Pacome Prompsy
# contact: pacome.prompsy@chuv.ch
# Guenova Lab
# CHUV (Centre Hospitalier Universitaire Vaudois), Lausanne, Suisse


In [None]:
import sys
sys.path.append(".")
from torch.utils.tensorboard import SummaryWriter
import os
import torch
import argparse
import numpy as np
from torch.utils.data import DataLoader, WeightedRandomSampler
import json
import os
import pandas as pd
import numpy as np
from tifffile import imread
import random
import cv2
import tifffile
from scipy.spatial.distance import cdist
import re
import fileinput
import shutil

In [None]:
# --- Output locations -------------------------------------------------------
# Root of the CellSighter marker workflow; the "Predictions" tree is created
# underneath it by the export loop.
output_dir = "../output/CellSighter/marker/"
# NOTE(review): not referenced by the cells visible in this notebook section --
# presumably consumed by a later training/prediction step; confirm before removing.
training_dir = "../output/CellSighter/marker/marker_classification/"
config_file = "../output/CellSighter/marker/Predictions/config.json"

# --- Input locations --------------------------------------------------------
# CSV read with pandas; the columns used are "Marker" and "PassOverallQuality".
marker_file = "../annotation/marker_metadata.csv"
# One sub-folder per sample, each holding one "<marker>.tiff" per channel.
tiff_dir = "../output/input/"
# Holds "<sample>_whole_cell.tiff" segmentation images.
segmentation_dir = "../output/segmentation/"
# Holds "<sample>_cell_table_size_normalized.csv.gz" per-cell tables.
cell_table_dir = "../output/cell_table/"

In [None]:
# Read the marker metadata and keep the names of the markers flagged as passing
# overall quality. DAPI is left out of the list because the export loop appends
# it as the second channel of every marker image.
marker_df = pd.read_csv(marker_file)

passed_quality = marker_df["PassOverallQuality"] == True  # noqa: E712 (element-wise comparison)
not_dapi = marker_df["Marker"] != "DAPI"

# NumPy array of marker names, in the order they appear in the metadata file.
marker_unique = marker_df.loc[passed_quality & not_dapi, "Marker"].values

In [None]:
# Build the CellSighter "Predictions" inputs. For every selected sample and every
# marker in `marker_unique` that has a "<marker>.tiff" image for that sample, the
# following files are written under <output_dir>/Predictions/<marker>/ :
#   CellTypes/cells/<name>.npz         whole-cell segmentation image
#   CellTypes/cells2labels/<name>.npz  one class per cell label (-1 = unlabelled)
#   CellTypes/data/images/<name>.npz   (marker, DAPI) image stack, channels last
#   channels.txt                       channel names, in stack order

# Matching of manual annotation points to segmented cells (centroid coordinate units).
SEARCH_HALF_WIDTH = 200   # half-size of the square window used to pre-select candidate cells
MAX_MATCH_DISTANCE = 150  # a point is assigned to its nearest cell only if closer than this

CELL_COLUMNS = ['centroid-0', 'centroid-1', 'label']

# Leave empty to export unlabelled data (every cell gets class -1). When set,
# "<marker>+.csv" / "<marker>-.csv" point annotations are read from
# ../output/manual_annotation_marker/<sample>-<annotator>/ and mapped onto cells.
annotator = ""

base_dir = os.path.join(output_dir, "Predictions")
os.makedirs(base_dir, exist_ok=True)  # also creates output_dir when missing

# os.listdir() returns entries in arbitrary order; sort first so that the
# [63:68] batch always designates the same samples.
for sample in sorted(os.listdir(tiff_dir))[63:68]:
    print(sample)
    name = (sample + "-" + annotator) if annotator else sample
    annotation_dir = "../output/manual_annotation_marker/" + name

    # Inputs shared by all markers of this sample. They are loaded on first use
    # so that a sample without any marker image triggers no read at all.
    whole_cell = None
    cell_table = None
    dapi_image = None

    for marker in marker_unique:
        print(marker)

        # Output folders for this marker (created even if the sample lacks the marker).
        marker_dir = os.path.join(base_dir, marker)
        celltypes_dir = os.path.join(marker_dir, "CellTypes")
        cells_dir = os.path.join(celltypes_dir, "cells")
        cells2labels_dir = os.path.join(celltypes_dir, "cells2labels")
        data_dir = os.path.join(celltypes_dir, "data")
        images_dir = os.path.join(data_dir, "images")
        for directory in (cells_dir, cells2labels_dir, images_dir):
            os.makedirs(directory, exist_ok=True)  # parents are created as needed

        marker_path = os.path.join(tiff_dir, sample, marker + ".tiff")
        if not os.path.isfile(marker_path):
            # No image of this marker for this sample: nothing to export.
            continue

        if whole_cell is None:
            # Segmentation, cell table and DAPI image do not depend on the marker.
            whole_cell = imread(os.path.join(segmentation_dir, sample + "_whole_cell.tiff"))
            cell_table = pd.read_csv(
                os.path.join(cell_table_dir, sample + "_cell_table_size_normalized.csv.gz")
            )[CELL_COLUMNS]
            dapi_image = imread(os.path.join(tiff_dir, sample, "DAPI.tiff"))

        # Save the segmentation in the "cells" folder
        np.savez(os.path.join(cells_dir, name + ".npz"), data=whole_cell)

        # Work on a copy: the "class" column must not be written into a slice of
        # the shared per-sample table, and must start from -1 for every marker.
        cells = cell_table.copy()
        cells["class"] = -1

        if annotator:
            # Load the positive (class 1) and negative (class 0) annotation points.
            # The class comes from the file suffix directly, so marker names that
            # contain regex metacharacters are handled correctly.
            dfs = []
            for suffix, cell_class in (("+", 1), ("-", 0)):
                df = pd.read_csv(os.path.join(annotation_dir, marker + suffix + ".csv"))
                df['class'] = cell_class
                dfs.append(df)
            result_df = pd.concat(dfs, ignore_index=True)

            # Centroids and labels as a plain array; invariant across annotation points.
            cells_locations = cells[CELL_COLUMNS].values

            for _, row in result_df.iterrows():
                point_x, point_y, cell_class = row["axis-0"], row["axis-1"], row["class"]

                # Cheap pre-selection of the cells inside a square window around the point.
                in_window = (
                    (cells_locations[:, 0] > point_x - SEARCH_HALF_WIDTH)
                    & (cells_locations[:, 0] < point_x + SEARCH_HALF_WIDTH)
                    & (cells_locations[:, 1] > point_y - SEARCH_HALF_WIDTH)
                    & (cells_locations[:, 1] < point_y + SEARCH_HALF_WIDTH)
                )
                candidates = cells_locations[in_window]
                if candidates.shape[0] == 0:
                    continue

                # Nearest candidate centroid (first one on ties).
                cell_distances = cdist([[point_x, point_y]], candidates[:, 0:2]).flatten()
                nearest = int(np.argmin(cell_distances))
                if cell_distances[nearest] < MAX_MATCH_DISTANCE:
                    cells.loc[cells.label == candidates[nearest, 2], "class"] = cell_class

        # Vector indexed by cell label; labels absent from the cell table stay at -1.
        labels = np.full(int(cells["label"].max()) + 1, -1.0)
        labels[cells["label"].astype(int).to_numpy()] = cells["class"].to_numpy()

        # Save in the "cells2labels" folder
        np.savez(os.path.join(cells2labels_dir, name + ".npz"), data=labels)

        # Stack the marker and DAPI images and move the channel axis last.
        marker_image = imread(marker_path)
        all_markers = np.transpose(np.array([marker_image, dapi_image]), (1, 2, 0))

        # Save in the "data/images" folder
        np.savez(os.path.join(images_dir, name + ".npz"), data=all_markers)

        # Channel names, in the same order as the last axis of the image stack.
        with open(os.path.join(marker_dir, 'channels.txt'), 'w') as f:
            f.write(marker + "\n")
            f.write('DAPI')