# In [None]:
import numpy as np
from scipy.optimize import linear_sum_assignment
import os

# Define a function to calculate the distance between bubbles
def bubble_distance(b1, b2):
    """Return a dissimilarity score between two bubbles.

    Each bubble is indexed positionally: elements 0 and 1 are used as the
    centre coordinates, element 2 as the size and element 3 as the velocity.

    The score is the plain (unweighted) sum of three terms: the Euclidean
    distance between the centres, the absolute size difference and the
    absolute velocity difference.
    """
    # Offsets between the two centres along each axis.
    dx = b1[0] - b2[0]
    dy = b1[1] - b2[1]
    centre_gap = np.sqrt(dx**2 + dy**2)

    # Mismatch in the remaining two attributes.
    size_gap = np.abs(b1[2] - b2[2])
    speed_gap = np.abs(b1[3] - b2[3])

    return centre_gap + size_gap + speed_gap

# Define a function to track bubbles across frames
def track_bubbles(folder_path, max_distance=10):
    """Link bubbles across consecutive frames into tracks.

    Frames are read from ``folder_path`` as ``frame0.npy``, ``frame1.npy``,
    ... Each frame is expected to be a sequence of bubbles that
    ``bubble_distance`` can compare. Bubbles in consecutive frames are paired
    by solving a minimum-cost assignment over the pairwise
    ``bubble_distance`` values; a pairing is accepted only when its cost is
    strictly below ``max_distance``.

    Args:
        folder_path: Directory holding the ``frame<N>.npy`` files.
        max_distance: Exclusive upper bound on the cost of an accepted
            match. Defaults to 10.

    Returns:
        A list of tracks, each a list of bubbles in frame order. Tracks that
        ended before the last frame come first, followed by the tracks still
        alive in the last frame (ordered like the bubbles of that frame).

    Raises:
        FileNotFoundError: If ``frame0.npy`` or any later frame up to the
            number of frame files present in the folder is missing.
    """
    # Load the bubbles from the first frame
    prev_frame = np.load(os.path.join(folder_path, "frame0.npy"))

    # Only count files that follow the frame naming scheme, so unrelated
    # files in the folder do not trigger loads of nonexistent frames.
    frame_count = sum(
        1
        for name in os.listdir(folder_path)
        if name.startswith("frame")
        and name.endswith(".npy")
        and name[len("frame"):-len(".npy")].isdigit()
    )

    # active[j] is the track whose latest bubble is prev_frame[j]; keeping
    # this list aligned with the reference frame is what makes the row index
    # returned by the assignment a valid track index.
    active = [[bubble] for bubble in prev_frame]
    # Tracks whose bubble found no acceptable successor.
    finished = []

    # Loop over the remaining frames
    for i in range(1, frame_count):
        cur_frame = np.load(
            os.path.join(folder_path, "frame{}.npy".format(i))
        )
        # next_active[k] will hold the track ending at cur_frame[k].
        next_active = [None] * len(cur_frame)

        # The assignment is only meaningful when both frames have bubbles.
        if len(prev_frame) > 0 and len(cur_frame) > 0:
            cost = np.array(
                [
                    [bubble_distance(prev, cur) for cur in cur_frame]
                    for prev in prev_frame
                ]
            )
            # Use the Hungarian algorithm to pair bubbles between frames
            row_ind, col_ind = linear_sum_assignment(cost)
            for j, k in zip(row_ind, col_ind):
                if cost[j, k] < max_distance:
                    next_active[k] = active[j] + [cur_frame[k]]
                    # Mark the old track as continued so it is not retired.
                    active[j] = None

        # Retire tracks that were not continued into this frame.
        finished.extend(track for track in active if track is not None)

        # Every bubble without an accepted match starts a new track. This
        # includes bubbles the solver paired but whose cost was too high.
        for k, bubble in enumerate(cur_frame):
            if next_active[k] is None:
                next_active[k] = [bubble]

        active = next_active
        prev_frame = cur_frame

    # Return ended tracks followed by the tracks alive in the last frame
    return finished + active


# In [None]:
import cv2
# Module-level tuning parameters.
# NOTE(review): max_frames is not referenced by the code visible in this
# file; presumably an upper bound on the number of frames to process - confirm.
max_frames = 180
# Contour area that a contour must exceed to be kept by extract_bubbles.
bubble_size_thresh = 5
# NOTE(review): bubble_vel_thresh is not referenced by the code visible in
# this file; presumably a limit on bubble velocity - confirm.
bubble_vel_thresh = 10 
def extract_bubbles(image):
    """Detect bubbles in a BGR image and return them as ``Bubble`` objects.

    The image is converted to grayscale, median-blurred with a 5-pixel
    kernel and binarised with Otsu's threshold. External contours of the
    binary image whose area exceeds ``bubble_size_thresh`` become bubbles.

    Each bubble is built as ``Bubble(index, x, y, size)`` where ``index`` is
    the contour's position in the contour list, ``(x, y)`` is the contour
    centroid truncated to integers and ``size`` is the square root of the
    contour area. ``Bubble`` is defined outside this section of the file.
    """
    # Grayscale -> denoise -> Otsu binarisation.
    smoothed = cv2.medianBlur(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY), 5)
    binary = cv2.threshold(
        smoothed, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
    )[1]

    # Outer contours only.
    outlines, _ = cv2.findContours(
        binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )

    found = []
    for index, outline in enumerate(outlines):
        area = cv2.contourArea(outline)
        # Discard contours that are too small to count as a bubble.
        if area <= bubble_size_thresh:
            continue

        moments = cv2.moments(outline)
        mass = moments["m00"]
        # A zero zeroth moment would make the centroid undefined.
        if mass == 0:
            continue

        cx = int(moments["m10"] / mass)
        cy = int(moments["m01"] / mass)
        found.append(Bubble(index, cx, cy, np.sqrt(area)))

    return found