In [53]:
import os
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool

import cv2
import numpy as np
from scipy.spatial import distance
from skimage.feature import blob_log
from tqdm.notebook import tqdm


In [6]:

# Machine-specific locations used by the notebook. Every directory constant
# ends with a separator because later cells build file names by plain string
# concatenation (e.g. MOVIE_PATH + 'np/height.npy').
PROJECT_PATH = 'C:/Users/amityu/DataspellProjects/gel_sheets/'
#DATA_PATH = 'C:/Users/amityu/Gel_Sheet_Data/'
# Forward slashes throughout: the previous raw string r'...\\' ended in two
# literal backslashes, which then leaked into MOVIE_PATH.
DATA_PATH = 'D:/amityu/backoffice_data/'
movie = '130721_CCA60_RAW'
ADD_PATH = os.path.join(PROJECT_PATH, "add_data/")
MOVIE_PATH = DATA_PATH + movie + '/'
GRAPH_PATH = 'C:/Users/amityu/Gel_Sheet_Graph/'

In [7]:
# Read the stored height array for the selected movie; later cells iterate
# over its first axis one frame at a time.
image_sequence = np.load(file=MOVIE_PATH + 'np/height.npy')

In [8]:
def preprocess_image(gray_image):
    """Min-max stretch a frame to the 0-255 range, then smooth it.

    The stretched frame is passed through a Gaussian blur with a 5x5 kernel
    (sigma argument 0) and the blurred result is returned.
    """
    stretched = cv2.normalize(gray_image, None, 0, 255, cv2.NORM_MINMAX)
    return cv2.GaussianBlur(stretched, (5, 5), 0)

def detect_blobs(image):
    """Run Laplacian-of-Gaussian blob detection on ``image``.

    Calls ``blob_log`` with ``max_sigma=40``, ``num_sigma=10`` and
    ``threshold=0.1``. The third column of the result is scaled by sqrt(2)
    so that it holds a radius, and the array is returned.
    """
    detections = blob_log(image, threshold=0.1, num_sigma=10, max_sigma=40)
    sigma_to_radius = np.sqrt(2)
    # Column 2 is overwritten in place with the radius values.
    detections[:, 2] = sigma_to_radius * detections[:, 2]
    return detections



In [9]:
# Shared tracking state: `tracks` maps a track id to the list of positions
# recorded for it, and `next_track_id` is the id handed to the next new track.
next_track_id = 0
tracks = dict()


In [10]:
def track_blobs(previous_blobs, current_blobs, max_distance=20):
    """Carry track ids over from the previous frame's blobs to the current ones.

    A current blob inherits the id of a previous blob when the Euclidean
    distance between their first two columns is strictly below
    ``max_distance``. Candidate pairs are accepted from the shortest distance
    upwards and each previous blob can be used only once, so a track id is
    never given to two blobs of the same frame. Blobs left without a match
    start new tracks numbered from the module-level ``next_track_id``
    counter, which is advanced for each new track.

    Parameters
    ----------
    previous_blobs : array-like or None
        Blobs of the previous frame; column 3 is read as the track id.
        ``None`` or an empty array means every current blob starts a new track.
    current_blobs : array-like
        Blobs of the current frame, without a track-id column.
    max_distance : float, optional
        Exclusive upper bound on the matching distance.

    Returns
    -------
    numpy.ndarray
        ``current_blobs`` with one extra trailing column holding the track id.
    """
    global next_track_id

    current_blobs = np.asarray(current_blobs, dtype=float)
    n_current = current_blobs.shape[0]
    # Extra trailing column receives the track id.
    tracked = np.hstack((current_blobs, np.zeros((n_current, 1))))
    matched = np.zeros(n_current, dtype=bool)

    n_previous = 0 if previous_blobs is None else len(previous_blobs)
    if n_current and n_previous:
        previous_blobs = np.asarray(previous_blobs, dtype=float)
        dists = distance.cdist(current_blobs[:, :2], previous_blobs[:, :2])

        # Only pairs inside the gate are candidates (NaN distances drop out).
        rows, cols = np.nonzero(dists < max_distance)
        # Stable sort keeps the lower (current, previous) index pair on ties.
        order = np.argsort(dists[rows, cols], kind='stable')

        previous_taken = np.zeros(n_previous, dtype=bool)
        for i, j in zip(rows[order], cols[order]):
            if matched[i] or previous_taken[j]:
                continue
            tracked[i, 3] = previous_blobs[j, 3]
            matched[i] = True
            previous_taken[j] = True

    # Remaining blobs open new tracks, in their original order.
    for i in np.flatnonzero(~matched):
        tracked[i, 3] = next_track_id
        next_track_id += 1
    return tracked


In [17]:
# Keep only the first five frames. This overwrites `image_sequence`, so the
# array has to be loaded again to get the remaining frames back.
image_sequence = image_sequence[0:5]

In [11]:
# Detect blobs in every frame and link them into tracks.
# visualize_tracking must already be defined in the kernel when this cell runs.

# Start from a clean state so that re-running this cell does not append to
# tracks left over from an earlier run (ids restart at 0 on the first frame).
tracks = {}
next_track_id = 0
previous_blobs = None

try:
    for image in tqdm(image_sequence):
        # Preprocess the image
        preprocessed_image = preprocess_image(image)

        # Detect blobs
        blobs = detect_blobs(preprocessed_image)

        if previous_blobs is None:
            # First frame: every blob opens its own track, numbered 0..n-1.
            blobs = np.hstack((blobs, np.arange(len(blobs)).reshape(-1, 1)))
            next_track_id = len(blobs)
        else:
            # Later frames: inherit ids from the previous frame where possible.
            blobs = track_blobs(previous_blobs, blobs)

        # Append this frame's position to each track's history.
        for blob in blobs:
            track_id = int(blob[3])
            # Copy so the stored position is not a view that keeps the whole
            # per-frame blob array alive.
            position = blob[:2].copy()
            tracks.setdefault(track_id, []).append(position)

        previous_blobs = blobs.copy()
        visualize_tracking(image.copy(), blobs)
finally:
    # Close the preview window even when the loop is interrupted.
    cv2.destroyAllWindows()


  0%|          | 0/62 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [55]:
def get_blobs(_image):
    """Preprocess a single frame and detect the blobs in it.

    Returns a ``(preprocessed_frame, blobs)`` tuple, where ``blobs`` is the
    result of ``detect_blobs`` applied to the preprocessed frame.
    """
    prepared = preprocess_image(_image)
    return prepared, detect_blobs(prepared)

In [37]:
def select_random_blobs(_blobs, num_blobs=30, rng=None):
    """Return at most ``num_blobs`` blobs, drawn at random without replacement.

    Parameters
    ----------
    _blobs : numpy.ndarray
        Detected blobs, one per row.
    num_blobs : int, optional
        Maximum number of blobs to return.
    rng : numpy.random.Generator or numpy.random.RandomState, optional
        Source of randomness. Pass a seeded generator to get a reproducible
        sample; when omitted, NumPy's global random state is used.

    Returns
    -------
    numpy.ndarray
        ``_blobs`` itself when it has ``num_blobs`` rows or fewer, otherwise
        a new array with ``num_blobs`` distinct rows of ``_blobs``.
    """
    num_detected_blobs = len(_blobs)
    if num_detected_blobs <= num_blobs:
        # Nothing to drop: hand the input back unchanged.
        return _blobs

    sampler = np.random if rng is None else rng
    indices = sampler.choice(num_detected_blobs, num_blobs, replace=False)
    return _blobs[indices]


In [None]:





if __name__ == '__main__':
    # Process-based pools must be able to import the worker function from
    # __main__ in the child processes. That does not work for a function
    # defined in a notebook cell under the "spawn" start method (the default
    # on Windows), so Pool().map would never return here. A thread pool runs
    # get_blobs inside this kernel and yields the same list of
    # (preprocessed_image, blobs) tuples.
    with ThreadPool() as p:
        result = p.map(get_blobs, image_sequence[:6])

In [48]:
def visualize_tracking(image, blobs, show_ids=False):
    """Show ``image`` in an OpenCV window with one circle drawn per blob.

    Parameters
    ----------
    image : numpy.ndarray
        Frame to display. It is rescaled to ``uint8`` first via
        ``normalize_image_for_display``; the array passed in is not drawn on.
    blobs : iterable of sequences
        Each blob provides ``y, x, r`` as its first three values. A fourth
        value, when present, is taken as the track id.
    show_ids : bool, optional
        When true, blobs that carry a fourth value are labelled ``ID: <n>``.
    """
    canvas = normalize_image_for_display(image)

    # On a single-channel image OpenCV uses only the first colour component,
    # so (0, 255, 0) would be drawn as black. Promote grayscale to BGR so the
    # overlay colours come out as intended.
    if canvas.ndim == 2:
        canvas = cv2.cvtColor(canvas, cv2.COLOR_GRAY2BGR)

    for blob in blobs:
        y, x, r = blob[:3]  # ignore any trailing track-id column here
        center = (int(x), int(y))
        cv2.circle(canvas, center, int(r), (0, 255, 0), 2)
        if show_ids and len(blob) > 3:
            cv2.putText(canvas, f'ID: {int(blob[3])}', center,
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

    # Show the image with tracking overlays
    cv2.imshow('Tracking', canvas)

    cv2.waitKey(1)  # short wait so the window gets a chance to refresh


In [51]:
# Preview at most 30 randomly chosen blobs on a preprocessed frame. Relies on
# `preprocessed_image` and `blobs` already existing in the kernel.
visualize_tracking(preprocessed_image, select_random_blobs(blobs, num_blobs=30))

In [50]:
# Close any OpenCV windows still open, e.g. the 'Tracking' preview window.
cv2.destroyAllWindows()


In [30]:
def normalize_image_for_display(image):
    """Min-max rescale ``image`` to the 0-255 range and return it as ``uint8``."""
    return cv2.normalize(image, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)