This is a jupyter notebook designed to run the full cell counting pipeline piece by piece to test each individual section. Full implementation of the pipeline can be found in FinalPipeline.py

In [18]:
import numpy as np
import tifffile
import cv2
import json

import torch
import torch.nn as nn
from torchvision import transforms
from efficientnet_pytorch import EfficientNet
 

class ImageProcessor:
    def __init__(self, model_path='path_to_pretrained_model.pth'):
        # Load the pretrained EfficientNet model
        self.NN = EfficientNet.from_pretrained("efficientnet-b7", advprop=False)

        # Load the state dictionary of the custom model
        #custom_model_state_dict = torch.load(model_path)

        # Replace the classifier in the EfficientNet model with the custom classifier
        #self.NN.classifier.load_state_dict(custom_model_state_dict['classifier'])

        # Freeze the weights of the pre-trained model
        for param in self.NN.parameters():
            param.requires_grad = False

        # Assuming 'transform' is a torchvision.transforms object for preprocessing
        self.transform = transforms.Compose([
            transforms.ToTensor(),  
            transforms.RandomHorizontalFlip(0.4),
            transforms.RandomVerticalFlip(0.4),
            transforms.RandomInvert(0.7),
            transforms.RandomRotation(35),
            transforms.ToTensor(),
        ])

    def process_image(self, image_path):
        # Load the image from the .tif file
        image = tifffile.imread(image_path)

        # Apply your defined function to separate slices
        slices, x_coordinates = self.identify_droplet_segments(image)

        # Initialize an empty list to store cell counts and x coordinates
        results = []

        # Iterate over slices and pass through the pretrained neural network
        for i, slice in enumerate(slices):
            input_tensor = self.transform(slice)
            input_tensor = input_tensor.unsqueeze(0)  # Add batch dimension

            with torch.no_grad():
                output = self.NN(input_tensor)

            cell_count = output.item()

            # Append results to the list
            results.append({"x_coordinate": x_coordinates[i], "cell_count": cell_count})

        return results

    def identify_droplet_segments(self, image):

        # 1. Convert the image to greyscale
        grey_image = np.dot(image[..., :3], [0.299, 0.587, 0.114])

        # 2. Apply filtering
        sobel_filtered_image = self.sobel_filter(grey_image)

        # 3. Apply slice on x to get a list of the slices
        segments_x = self.slice_along_x(sobel_filtered_image)

        #setup segments
        segments = []

        #loop through each x segment to get its y segment.
        for segment_x in segments_x:
            print(segment_x)
            slice_y = grey_image[segment_x[0]:segment_x[1], :]
            mean_x_value = np.mean(segment_x)
            segments_y = self.slice_along_y(slice_y)
            full_segment = grey_image[segment_x[0]:segment_x[1], :][:, segments_y[0]:segments_y[1]]
            segments.append(full_segment)

        return segments, mean_x_value

    def sobel_filter(self, gray_arr):
        # Apply Sobel filter to the image
        sobel_x = cv2.Sobel(gray_arr, cv2.CV_64F, 1, 0, ksize=3)
        sobel_y = cv2.Sobel(gray_arr, cv2.CV_64F, 0, 1, ksize=3)
        mask = np.sqrt(sobel_x**2 + sobel_y**2)
        return mask

    def slice_along_x(self, image, threshold_start=0.01, threshold_end=0.01, min_consecutive_frames=5,
                  min_segment_length=400, max_segment_length=600):

        # Calculate variance along the x-axis
        x_variances = np.mean(image, axis=0)

        # Identify segments where the variance exceeds the threshold for at least min_consecutive_frames frames
        segments = []
        droplet_started = False
        consecutive_below_threshold = 0

        for i, value in enumerate(x_variances):
            if value > threshold_start and not droplet_started:
                droplet_started = True
                start_index = i
                consecutive_below_threshold = 0
            elif value <= threshold_end and droplet_started:
                consecutive_below_threshold += 1
                if consecutive_below_threshold >= min_consecutive_frames:
                    droplet_started = False
                    end_index = i - min_consecutive_frames
                    segment_length = end_index - start_index

                    # Check if the segment length is within the desired range
                    if min_segment_length <= segment_length <= max_segment_length:
                        segments.append((start_index, end_index))

                    consecutive_below_threshold = 0

        # If a droplet continues to the end of the image, consider it
        if droplet_started:
            end_index = len(x_variances) - 1
            segment_length = end_index - start_index

            # Check if the segment length is within the desired range
            if min_segment_length <= segment_length <= max_segment_length:
                segments.append((start_index, end_index))

        # Extract slices based on the identified positions
        slices = [image[:, start:end] for start, end in segments]

        return slices

    def slice_along_y(self, image, y_threshold_start=0.01, y_threshold_end=0.01, min_consecutive_frames = 5):

        # Calculate variance along the x-axis
        y_variances = np.mean(image, axis=1)

        # Identify segments where the variance exceeds the threshold for at least min_consecutive_frames frames
        segments = []
        droplet_started = False
        consecutive_below_threshold = 0

        for i, value in enumerate(y_variances):
            if value > y_threshold_start and not droplet_started:
                droplet_started = True
                start_index = i
                consecutive_below_threshold = 0
            elif value <= y_threshold_end and droplet_started:
                consecutive_below_threshold += 1
                if consecutive_below_threshold >= min_consecutive_frames:
                    droplet_started = False
                    end_index = i - min_consecutive_frames
                    segments.append((start_index, end_index))
                    consecutive_below_threshold = 0

        # Extract slices based on the identified positions
        slices = [image[start:end, :] for start, end in segments]

        return slices

# Example usage:
if __name__ == "__main__":

    inputPath = ""
    outputPath = ""
    DropletPositions = []
    CellCounts = []

    # TO ADD: for loop here, looping through each file in inputPath
    image_path = "../Data/Input/15.tif"
    image_processor = ImageProcessor()
    results = image_processor.process_image(image_path)

    # Append results to Droplet Positions and Cell Counts
    for result in results:
        DropletPositions.append(result["x_coordinate"])
        CellCounts.append(result["cell_count"])

    # After: Create a json file from DropletPositions and Cell Counts, Sequence Number, Frame Number
    output_data = {
        "DropletPositions": DropletPositions,
        "CellCounts": CellCounts,
        # Add other relevant information
    }

    # Save results to a .json file
    with open('output.json', 'w') as json_file:
        json.dump(output_data, json_file)


TiffPage 0: TypeError: read_bytes() missing 3 required positional arguments: 'dtype', 'count', and 'offsetsize'


Loaded pretrained weights for efficientnet-b7


UnboundLocalError: local variable 'mean_x_value' referenced before assignment