In [None]:
# Well Cropper
# generate x-y coordinates of well centers, will be used to crop videos

""" import required libraries """

import os
import math
import numpy as np
import cv2
import sys

""" setup custom dataclasses and functions """

# prints message in console and saves it to logs, if enabled
def report(text):
    print(text)
    log.append(text + '\n')


""" initial setup """

WellConfig = [27, 1, 3]  # well identification binarizing thresholds
ProConfig = [21, 1.5, 3] # well identification binarizing thresholds
Blur_val = 9 # blurring threshold for plate identification

fps = 1 # acquisition frame rate (per minute)
shorten = True # limit processing to first X frames
desired_frames = 360 # process these many seconds
skip_frames = False # skip first X frames before processing the video
timeskip = 360 # amount of frames to skip

harsh = False # onlhy process videos with matching amount of wells found
Radius_censor = 0 # shrink identified wells by X pixels
PlateDim = [8, 12] # multi-well plate dimensions in terms of wells

trouble_image = True # save troubleshooting frames
trouble_reading = False # print a message every time you read a frame

force_well_radius = True # standardize individual well video size
forced_radius = 156

write_indiv_videos = True # produce a zoomed-in video for each well
full_video = False # save original-sized marked videos
create_logs = False # create a .txt log file

log = []
if create_logs:
    log.append('WellConfig: [' + str(WellConfig[0]) + ', ' + str(WellConfig[1]) + ', ' + str(WellConfig[2]) + ']\n')
    log.append('ProConfig: [' + str(ProConfig[0]) + ', ' + str(ProConfig[1]) + ', ' + str(ProConfig[2]) + ']\n')
    log.append('BlurVal: ' + str(Blur_val) + '\n')
    log.append('RadiusCens: ' + str(Radius_censor) + '\n')
    if shorten:
        log.append('ProcessDur: ' + str(desired_frames) + ' sec\n')
    else:
        log.append('ProcessDur: Full\n')
    log.append('\n')

""" execute video processing """
file =  # enter the path to the video file

cap = cv2.VideoCapture(file)
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

path =  # enter the path to the output directory for images that help with troubleshooting

""" find wells from first frame """

ret, frametemp = cap.read()
framer = cv2.cvtColor(frametemp, cv2.COLOR_BGR2GRAY)

bin_adaptive_im = cv2.adaptiveThreshold(framer,
                                        1,
                                        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                        cv2.THRESH_BINARY,
                                        WellConfig[0],
                                        WellConfig[1])

kernel = np.ones((WellConfig[2], WellConfig[2]), np.uint8)
bin_im = bin_adaptive_im
bin_im = cv2.morphologyEx(bin_im, cv2.MORPH_CLOSE, kernel)
bin_im = cv2.morphologyEx(bin_im, cv2.MORPH_OPEN, kernel)

# make plate image
well_im = np.stack([bin_im * 255, bin_im * 255, bin_im * 255], axis = -1)
well_im = cv2.medianBlur(well_im, Blur_val)

well_im = cv2.cvtColor(well_im, cv2.COLOR_BGR2GRAY)
ray_im = well_im.copy()

if trouble_image:
    cv2.imwrite(os.path.join(path, 'trb_processed.jpg'), well_im)

circles = cv2.HoughCircles(well_im,
                           method = cv2.HOUGH_GRADIENT,
                           dp = 1,
                           minDist = 75,
                           param1 = 100,
                           param2 = 30,
                           minRadius = 100,
                           maxRadius = 160)

circles = np.uint16(np.around(circles))

formWells = circles[0, :]  # formatted Plate coordinate list

N_wells = len(formWells) # counts found plates

N_expected = PlateDim[0] * PlateDim[1]
if N_wells != N_expected:
    if harsh:
        message = 'ERROR: ' + str(N_wells) + ' wells identified; ' + str(N_expected) + ' expected'
        report(message)
        cap.release()
        sys.exit()
    else:
        message = 'WARNING: ' + str(N_wells) + ' wells identified'
        report(message)

if trouble_image:
    well_im = cv2.cvtColor(well_im, cv2.COLOR_GRAY2RGB)
    for i in circles[0, :]:
        cv2.circle(well_im, (i[0], i[1]), i[2], (0, 0, 255), 5)
        cv2.circle(well_im, (i[0], i[1]), 2, (255, 0, 0), 25)
    
    cv2.imwrite(os.path.join(path, 'trb_simple_wells.jpg'), well_im)
    ray_im_out = cv2.cvtColor(ray_im, cv2.COLOR_GRAY2RGB)


""" name wells """
wells = formWells

if force_well_radius:
    for well in wells:
        well[2] = forced_radius

Xmin = 6000
Xmax = 0
Ymin = 6000
Ymax = 0
for well in wells:
    Xmin = min(Xmin, well[0])
    Xmax = max(Xmax, well[0])
    Ymin = min(Ymin, well[1])
    Ymax = max(Ymax, well[1])
Xstep = (Xmax - Xmin) / PlateDim[1]
Ystep = (Ymax - Ymin) / PlateDim[0]

namedWells = []
for well in wells:
    if well[0] == Xmin:
        Xc = 1
    else:
        Xc = int((well[0] - Xmin - 1) // Xstep + 1)
    if well[1] == Ymin:
        Yc = 1
    else:
        Yc = int((well[1] - Ymin - 1) // Ystep + 1)
        
    namedWells.append([[Yc, Xc], well])

In [None]:
# Video Test YAML File Generator

import yaml
import os

def generate_yaml_files(output_dir, crop_centers):
    base_config = {
        "model_path": "",                       # enter the path to the model, i.e. the best checkpoint file
        "model": {
            "name": "ResidualUNet3D",
            "in_channels": 3,                  # 3: if you use RGB images; 1: if you use grayscale images
            "out_channels": 2,
            "layer_order": "gcr",
            "f_maps": 64,                      # match the f_maps used in training
            "num_groups": 8,
            "final_sigmoid": False
        },
        "predictor": {
            "name": "VideoPredictor"
        },
        "loaders": {
            "dataset": "VideoDataset",
            "batch_size": 2,
            "num_workers": 8,
            "test": {
                "file_paths": ["/path/to/video/file"],   # enter the path to the video file
                "spatial_crop": 320,
                "frame_crop": 100,               # number of frames that is processed in one batch
                "frame_range": [700000, 900000], # range of frames specified for processing
                "global_normalization": True,
                "transformer": {
                    "raw": [
                        {"name": "Standardize"},
                        {"name": "ToTensor", "expand_dims": True}
                    ]
                }
            }
        }
    }
    
    os.makedirs(output_dir, exist_ok=True)

    for well_data in namedWells:
        well_name = chr(well_data[0][0] + 64) + str(well_data[0][1])
        crop_center = [int(well_data[1][1]), int(well_data[1][0])]  # [y, x] format
        
        config = base_config.copy()
        config["loaders"]["output_dir"] = f"/path/to/output/directory/{well_name}" # enter the path to the output directory for predictions
        config["loaders"]["test"]["crop_center"] = crop_center
        
        yaml_path = os.path.join(output_dir, f"test_config-VideoTest-{well_name}.yml")
        with open(yaml_path, "w") as yaml_file:
            yaml.dump(config, yaml_file, default_flow_style=None, sort_keys=False)


generate_yaml_files("/path/to/save/yaml/files", namedWells)


In [None]:
# Prob Field YAML File Generator

import yaml
import os

def generate_yaml_files(output_dir, crop_centers):
    base_config = {
        "model_path": "",                       # enter the path to the model, i.e. the best checkpoint file
        "model": {
            "name": "ResidualUNet3D",
            "in_channels": 3,                   # 3: if you use RGB images; 1: if you use grayscale images
            "out_channels": 2,
            "layer_order": "gcr",
            "f_maps": 64,                       # match the f_maps used in training
            "num_groups": 8,
            "final_sigmoid": False
        },
        "predictor": {
            "name": "ProbFieldPredictor"
        },
        "loaders": {
            "dataset": "ProbFieldDataset",
            "batch_size": 8,
            "num_workers": 8,
            "test": {
                "frame_crop": 1000,
                "frame_range": [1, 200001],     # range of frames (in predicted videos of cropped wells) specified for processing
                "threshold": 250,               # binarization threshold
                "Eclosion": True
            }
        }
    }
    
    os.makedirs(output_dir, exist_ok=True)

    for well_data in namedWells:
        well_name = chr(well_data[0][0] + 64) + str(well_data[0][1])
        
        config = base_config.copy()
        config["loaders"]["test"]["file_paths"] = [f"/path/to/predicted/videos/{well_name}/predicted_video_names.avi"]  # # enter the path to the predicted videos
        config["loaders"]["output_dir"] = f"/path/to/output/directory/{well_name}/"  # enter the path to the output directory
        yaml_path = os.path.join(output_dir, f"test_config-ProbField-{well_name}.yml")
        with open(yaml_path, "w") as yaml_file:
            yaml.dump(config, yaml_file, default_flow_style=None, sort_keys=False)

generate_yaml_files("/path/to/save/yaml/files", namedWells)
