Spot detection on 1 FOV so far

In [1]:
import matplotlib.pyplot as plt
import pandas as pd
from PIL import Image
from pathlib import Path
from skimage import filters, measure, exposure, morphology, io
from skimage.measure import regionprops
from collections import defaultdict
import numpy as np
import glob
import time
import pickle
import re
import os
import gc
import logging
import csv
import napari
from collections import Counter

In [5]:
#To check if I have the right saved names of my data

for file_name in os.listdir(r"C:\Users\s369577\Desktop\test"):
    print(file_name)

.DS_Store
out_opt_flow_registered_X10_Y2_c01_Alexa_488.tif
out_opt_flow_registered_X10_Y2_c01_Alexa_568.tif
out_opt_flow_registered_X10_Y2_c01_Alexa_647.tif
out_opt_flow_registered_X10_Y2_c01_Atto_425.tif
out_opt_flow_registered_X10_Y2_c01_Atto_490LS.tif
out_opt_flow_registered_X10_Y2_c01_DAPI.tif
out_opt_flow_registered_X10_Y2_c02_Alexa_488.tif
out_opt_flow_registered_X10_Y2_c02_Alexa_568.tif
out_opt_flow_registered_X10_Y2_c02_Alexa_647.tif
out_opt_flow_registered_X10_Y2_c02_Atto_425.tif
out_opt_flow_registered_X10_Y2_c02_Atto_490LS.tif
out_opt_flow_registered_X10_Y2_c02_DAPI.tif
out_opt_flow_registered_X10_Y2_c03_Alexa_488.tif
out_opt_flow_registered_X10_Y2_c03_Alexa_568.tif
out_opt_flow_registered_X10_Y2_c03_Alexa_647.tif
out_opt_flow_registered_X10_Y2_c03_Atto_425.tif
out_opt_flow_registered_X10_Y2_c03_Atto_490LS.tif
out_opt_flow_registered_X10_Y2_c03_DAPI.tif
out_opt_flow_registered_X10_Y2_c04_Alexa_488.tif
out_opt_flow_registered_X10_Y2_c04_Alexa_568.tif
out_opt_flow_registered_X

In [2]:
# Set up logging
logging.basicConfig(filename='spot_detection.log', filemode="w", level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


# Had to check if my directory and the names were correct (yes i checked it two times cuz of trust issues)
# print("Sample files in directory:")
# for f in os.listdir(r"C:\Users\s369577\Desktop\test"):
#     print(f)
#     if f.startswith("out_opt_flow_registered"):
#         break
#threshold = 0.5  # Threshold for intensity to determine presence of a base

spot_summary = []
total_spots_detected = 0  # Initialize spot counter

def find_fovs(data_directory):
    """Scan the data directory to find all unique FOVs."""
    fovs = set()
    for file_name in os.listdir(data_directory):
        if "out_opt_flow_registered" in file_name:
            parts = file_name.split("_")
            # Now find the X and Y identifiers explicitly
            for i, part in enumerate(parts):
                if part.startswith("X") and i + 1 < len(parts) and parts[i+1].startswith("Y"):
                    fov = f"{part}_{parts[i+1]}"
                    fovs.add(fov)
                    #print(fov) just checking if fov is correct
    return list(fovs)

def load_images(fovs, channel_names, rounds, data_directory):
    """Load images for each FOV, channel, and round."""
    fov_images = {}
    for fov in fovs:
        fov_images[fov] = []
        for round_idx in range(rounds):
            round_images = []
            for channel in channel_names:
                file_pattern = f"{data_directory}/out_opt_flow_registered_{fov}_c{round_idx+1:02d}_{channel}.tif"
                file_path = glob.glob(file_pattern)
                if file_path:
                    image = np.array(Image.open(file_path[0]))
                    round_images.append(image)
                else:
                    logging.warning(f"File not found for {file_pattern}")
                    round_images.append(None)
            fov_images[fov].append(round_images)
    return fov_images

def preprocess_images(images):
    """Preprocess images (Gaussian blur and normalization)."""
    processed_images = []
    for round_images in images:
        round_processed_images = []
        for image in round_images:
            if image is not None:
                blurred = filters.gaussian(image, sigma=1)
                normalized = exposure.rescale_intensity(blurred, out_range='uint8')
                round_processed_images.append(normalized)
            else:
                round_processed_images.append(None)
        processed_images.append(round_processed_images)
    return processed_images

def detect_spots(image):
    """Detect spots in an image using Otsu's thresholding."""
    if image is not None:
        threshold_value = filters.threshold_otsu(image)
        binary_image = image > threshold_value
        labeled_image, num_features = measure.label(binary_image, return_num=True)
        #Filter out very small regions (e.g., noise) by area
        spots = [s for s in measure.regionprops(labeled_image) if s.area >= 3]
        return spots
    else:
        return []

def combine_spots(spots_list):
    """Combine spots from all rounds and channels."""
    all_spots = []
    for round_spots in spots_list:
        for channel_spots in round_spots:
            all_spots.extend(channel_spots)
    return all_spots

def visualize_spots(image, spots, output_dir, fov, channel, round_idx = None):
    """Visualize detected spots on an image and save the result."""
    if image is not None:
        plt.imshow(image, cmap='gray')
        for spot in spots:
            y, x = spot.centroid
            plt.plot(x, y, 'ro')
        suffix = f"_round{round_idx+1}" if round_idx is not None else ""
        output_path = f"{output_dir}/{fov}_{channel}{suffix}_spots.png"
        plt.savefig(output_path)
        plt.close()
        logging.info(f"Saved visualization for {fov}_{channel} to {output_path}")


DETECTING THE SPOtS


In [3]:

# Example usage
data_directory = r"C:\Users\s369577\Desktop\F1 CCTB\data\selected-tiles\selected-tiles" #data directory
output_directory = r"C:\Users\s369577\Desktop\F1 CCTB\results\spot_detected"  #output directory
channel_names = ['Atto_425', 'DAPI', 'Alexa_488', 'Alexa_568', 'Alexa_647', 'Atto_490LS']
rounds = 4

# Initialize CSV and record already processed FOVs
csv_path = os.path.join(output_directory, "spot_summary.csv")
processed_fovs = set()
file_exists = os.path.exists(csv_path)
if file_exists:
    with open(csv_path, mode="r", newline="") as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            processed_fovs.add(row["FOV"])

# Step 1: Identify all FOVs
fovs = find_fovs(data_directory)
total_fovs = len(fovs)

# Step 2: Load images for all FOVs
fov_images = load_images(fovs, channel_names, rounds, data_directory)

# Step 3: Process each FOV (skip already processed)
for idx, (fov, images) in enumerate(fov_images.items(), 1):
    if fov in processed_fovs:
        print(f"Skipping already processed FOV: {fov} ({idx}/{total_fovs})")
        continue

    start_time = time.time()
    print(f"Processing FOV: {fov} ({idx}/{total_fovs})")
    logging.info(f"Processing FOV: {fov}")
    processed_images = preprocess_images(images)

    # Detect spots in each round and channel
    unique_coords = set()

    for round_images in processed_images:
        for image in round_images:
            spots = detect_spots(image)
            for spot in spots:
                y, x = map(int, spot.centroid)
                unique_coords.add((y, x)) #store rounded coordinates of spots

    total_spots_detected += len(unique_coords)  # Count unique spots

    coords_output_dir = r"C:\Users\s369577\Desktop\F1 CCTB\results\spot_detected\coords"
    os.makedirs(coords_output_dir, exist_ok=True)
    coord_path = os.path.join(coords_output_dir, f"{fov}_coords.pkl")
    with open(coord_path, 'wb') as f:
        pickle.dump(list(unique_coords), f)

    # Generate summary and spot visualizations per round and channel
    spot_summary = []
    for round_idx, round_images in enumerate(images):
        for channel_idx, image in enumerate(round_images):
            if image is not None:
                channel = channel_names[channel_idx]
                spots = detect_spots(image)
                spot_count = len(spots)
                if spot_count > 0:
                    visualize_spots(image, spots, output_directory, fov, channel, round_idx)
                spot_summary.append([fov, round_idx + 1, channel, spot_count])

    # Append spot data to CSV
    with open(csv_path, mode='a', newline='') as csvfile:
        writer = csv.writer(csvfile)
        if not file_exists:
            writer.writerow(["FOV", "Round", "Channel", "Spot Count"])
        for row in spot_summary:
            writer.writerow(row)

    # Free memory after each FOV
    del processed_images
    del unique_coords
    del spot_summary
    gc.collect()

    print(f"Done: {fov} in {time.time() - start_time:.1f} seconds\n")

# Finish logging
logging.shutdown()
print(f"Total unique spots detected across all FOVs: {total_spots_detected}")


⏩ Skipping already processed FOV: X17_Y0 (1/190)
⏩ Skipping already processed FOV: X12_Y7 (2/190)
⏩ Skipping already processed FOV: X16_Y0 (3/190)
⏩ Skipping already processed FOV: X19_Y11 (4/190)
🔄 Processing FOV: X21_Y4 (5/190)
✅ Done: X21_Y4 in 18.6 seconds

⏩ Skipping already processed FOV: X14_Y1 (6/190)
🔄 Processing FOV: X19_Y14 (7/190)
✅ Done: X19_Y14 in 7.2 seconds

⏩ Skipping already processed FOV: X11_Y1 (8/190)
⏩ Skipping already processed FOV: X13_Y15 (9/190)
⏩ Skipping already processed FOV: X10_Y6 (10/190)
⏩ Skipping already processed FOV: X16_Y15 (11/190)
🔄 Processing FOV: X20_Y5 (12/190)
✅ Done: X20_Y5 in 17.0 seconds

⏩ Skipping already processed FOV: X18_Y3 (13/190)
🔄 Processing FOV: X16_Y3 (14/190)
✅ Done: X16_Y3 in 14.3 seconds

⏩ Skipping already processed FOV: X11_Y3 (15/190)
⏩ Skipping already processed FOV: X19_Y2 (16/190)
🔄 Processing FOV: X17_Y5 (17/190)
✅ Done: X17_Y5 in 17.3 seconds

🔄 Processing FOV: X14_Y5 (18/190)
✅ Done: X14_Y5 in 20.9 seconds

⏩ Skippin

In [10]:
# FAILSAVE TEST
#  Load the first .pkl file manually
path = r"C:\Users\s369577\Desktop\F1 CCTB\results\spot_detected\coords\X10_Y2_coords.pkl"
with open(path, "rb") as f:
    coords = pickle.load(f)

print(coords[:5])  # Preview first 5 spot coordinates
print(f"Total spots: {len(coords)}")

[(206, 217), (194, 917), (306, 304), (677, 742), (698, 536)]
Total spots: 3194


Barcode decoding

In [4]:
# Configuration
data_directory = r"C:\Users\s369577\Desktop\F1 CCTB\data\selected-tiles\selected-tiles"
spot_coord_dir = r"C:\Users\s369577\Desktop\F1 CCTB\results\spot_detected\coords"
output_csv_path = r"C:\Users\s369577\Desktop\F1 CCTB\results\Barcode\barcode.csv"
summary_csv_path = r"C:\Users\s369577\Desktop\F1 CCTB\results\Barcode\barcode_summary.csv"
tagl_path = r"C:\Users\s369577\Desktop\F1 CCTB\data\taglist\taglist.csv"
channel_names = ['Atto_425', 'DAPI', 'Alexa_488', 'Alexa_568', 'Alexa_647', 'Atto_490LS']
rounds = 4

# Barcode base mapping
channel_to_base = {
    'Atto_425': 'A',
    'Alexa_488': 'C',
    'Alexa_568': 'G',
    'Alexa_647': 'T',
}

decoding_channels = list(channel_to_base.keys())

def load_and_preprocess_images(fov, rounds, channel_names):
    processed = []
    for round_idx in range(rounds):
        round_processed = []
        for channel in channel_names:
            pattern = f"{data_directory}/out_opt_flow_registered_{fov}_c{round_idx+1:02d}_{channel}.tif"
            files = glob.glob(pattern)
            if files:
                with Image.open(files[0]) as img:
                    image = np.array(img)
                blurred = filters.gaussian(image, sigma=1)
                normalized = exposure.rescale_intensity(blurred, out_range='uint8')
                round_processed.append(normalized)
            else:
                round_processed.append(None)
        processed.append(round_processed)
    return processed

# Load barcode reference
taglist_df = pd.read_csv(tagl_path)
barcode_to_gene = dict(zip(taglist_df['Code'], taglist_df['Name']))
gene_counter = Counter()

# Start decoding
fovs = find_fovs(data_directory)
with open(output_csv_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["FOV", "Y", "X", "Barcode", "Gene"])

    for fov in fovs:
        processed_images = load_and_preprocess_images(fov, rounds, channel_names)

        # Load saved DAPI-based spot coordinates from pickle
        coord_path = os.path.join(spot_coord_dir, f"{fov}_coords.pkl")
        if not os.path.exists(coord_path):
            continue

        with open(coord_path, 'rb') as f:
            coordinates = pickle.load(f)

        for (y, x) in coordinates:
            base_sequence = []

            intensity_threshold = 0.3  # Adjust this threshold as needed

            for round_idx in range(rounds):
                max_intensity = -np.inf
                best_base = None
                for channel_idx, channel in enumerate(channel_names):
                    if channel not in decoding_channels:
                        continue
                    image = processed_images[round_idx][channel_idx]
                    if image is None or y >= image.shape[0] or x >= image.shape[1]:
                        continue
                    intensity = image[y, x]
                    if intensity > max_intensity and intensity > intensity_threshold:
                        max_intensity = intensity
                        best_base = channel_to_base[channel]
                base_sequence.append(best_base)

            if None in base_sequence:
                continue

            barcode = ''.join(base_sequence)
            gene = barcode_to_gene.get(barcode, "invalid")
            gene_counter[gene] += 1
            writer.writerow([fov, y, x, barcode, gene])

# Write gene summary
with open(summary_csv_path, 'w', newline='') as summary_file:
    writer = csv.writer(summary_file)
    writer.writerow(['Gene', 'Count'])
    for gene, count in gene_counter.most_common():
        writer.writerow([gene, count])


In [10]:
barcode_csv = r"C:\Users\s369577\Desktop\F1 CCTB\results\Barcode\barcode.csv"
output_path = r"C:\Users\s369577\Desktop\F1 CCTB\results\Barcode\barcode_summary.csv"

df = pd.read_csv(barcode_csv)

df_filtered = df[df['Gene'] != 'invalid']

summary = df_filtered.groupby(["FOV", "Gene"]).size().reset_index(name='Count')

summary.to_csv(output_path, index=False)

print(f" Saved FOV-Gene summary to: {output_path}")

 Saved FOV-Gene summary to: C:\Users\s369577\Desktop\F1 CCTB\results\Barcode\barcode_summary.csv


Napari

In [6]:
# Configuration
fov = "X10_Y2"  # Change to desired FOV
data_dir = r"C:\Users\s369577\Desktop\F1 CCTB\data\selected-tiles\selected-tiles"
coords_dir = r"C:\Users\s369577\Desktop\F1 CCTB\results\spot_detected\coords"
barcode_csv = r"C:\Users\s369577\Desktop\F1 CCTB\results\Barcode\barcode.csv"
channels = ['Atto_425', 'Alexa_488', 'Alexa_568', 'Alexa_647']
channel_colors = {
    'Atto_425': 'magenta',
    'Alexa_488': 'yellow',
    'Alexa_568': 'red',
    'Alexa_647': 'green',
}

# Load all 4 rounds for each channel and stack them
layers = []
for channel in channels:
    for round_idx in range(4):  # 0-based index for 4 rounds
        pattern = os.path.join(data_dir, f"out_opt_flow_registered_{fov}_c{round_idx+1:02d}_{channel}.tif")
        if os.path.exists(pattern):
            image = np.array(Image.open(pattern))
            name = f"{channel}_R{round_idx+1}"
            color = channel_colors[channel]
            layers.append((image, name, color))
        else:
            print(f"Image missing: {pattern}")

# Load decoded coordinates and metadata
coords = []
barcodes = []
genes = []

if os.path.exists(barcode_csv):
    import pandas as pd
    df = pd.read_csv(barcode_csv)
    df_fov = df[df['FOV'] == fov]
    coords = df_fov[['Y', 'X']].to_numpy()
    barcodes = df_fov['Barcode'].tolist()
    genes = df_fov['Gene'].tolist()
else:
    print(f" Barcode CSV not found at: {barcode_csv}")

# Start napari viewer
viewer = napari.Viewer()

# Add round & channel images
for img, name, color in layers:
    viewer.add_image(img, name=name, colormap=color, blending='additive')

# Add predicted points if available
if len(coords) > 0:
    points_layer = viewer.add_points(
        coords,
        size=5,
        face_color='white',
        name='decoded_spots',
        properties={'barcode': barcodes, 'gene': genes}
    )
    # Set edge_color separately
    points_layer.edge_color = 'black'

napari.run()

FOR ALL FOVS
(DOch nicht gemacht da sorgen um den Rechner hier)

In [None]:

# # Configuration
# data_dir = r"C:\Users\s369577\Desktop\F1 CCTB\data\selected-tiles\selected-tiles"
# coords_dir = r"C:\Users\s369577\Desktop\F1 CCTB\results\spot_detected\coords"
# barcode_csv = r"C:\Users\s369577\Desktop\F1 CCTB\results\Barcode\barcode.csv"
# channels = ['Atto_425', 'Alexa_488', 'Alexa_568', 'Alexa_647']
# channel_colors = {
#     'Atto_425': 'magenta',
#     'Alexa_488': 'yellow',
#     'Alexa_568': 'red',
#     'Alexa_647': 'green',
# }

# # Load FOV list
# def find_fovs(directory):
#     fovs = set()
#     for name in os.listdir(directory):
#         if "out_opt_flow_registered" in name:
#             parts = name.split("_")
#             for i, part in enumerate(parts):
#                 if part.startswith("X") and i + 1 < len(parts) and parts[i + 1].startswith("Y"):
#                     fovs.add(f"{part}_{parts[i + 1]}")
#     return sorted(fovs)

# fovs = find_fovs(data_dir)
# df = pd.read_csv(barcode_csv)

# # Loop through each FOV
# for fov in fovs:
#     viewer = napari.Viewer()
#     layers = []

#     # Load images
#     for channel in channels:
#         for round_idx in range(4):
#             path = os.path.join(data_dir, f"out_opt_flow_registered_{fov}_c{round_idx+1:02d}_{channel}.tif")
#             if os.path.exists(path):
#                 img = np.array(Image.open(path))
#                 name = f"{channel}_R{round_idx+1}"
#                 color = channel_colors[channel]
#                 viewer.add_image(img, name=name, colormap=color, blending='additive')
#             else:
#                 print(f"Missing: {path}")

#     # Load decoded spots for this FOV
#     df_fov = df[df["FOV"] == fov]
#     if not df_fov.empty:
#         coords = df_fov[['Y', 'X']].to_numpy()
#         barcodes = df_fov['Barcode'].tolist()
#         genes = df_fov['Gene'].tolist()

#         points_layer = viewer.add_points(
#             coords,
#             size=5,
#             face_color='white',
#             name='decoded_spots',
#             properties={'barcode': barcodes, 'gene': genes}
#         )
#         points_layer.edge_color = 'black'

#     print(f"Viewing {fov}. Close Napari window to proceed to the next.")
#     napari.run()