In [1]:
import os
import cv2
import numpy as np
from scipy.optimize import linear_sum_assignment
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import timeit
from scipy import stats

In [2]:

# Video parameters
width, height = 300, 300
num_frames = 100

output_path = 'data/time_lapse_video/synthetic_video_no_noise_no_bars.mp4'

# cv2.VideoWriter neither creates missing directories nor raises when it cannot
# open the target; it just drops every frame. Create the folder up front and
# verify the writer below so a bad path fails loudly here instead of producing
# an empty/missing video for the analysis cells.
os.makedirs(os.path.dirname(output_path), exist_ok=True)

fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # 'mp4v' for .mp4 format
out = cv2.VideoWriter(output_path, fourcc, 20.0, (width, height))
if not out.isOpened():
    raise IOError(f"Could not open video writer for: {output_path}")

# Define the large stationary ball parameters
ball_radius = 100
center_x, center_y = 150, 150

# Define the frames for the "blip-out" periods (the ball should not be drawn during these frames).
# Each interval is half-open: start <= frame_idx < end, i.e. a single frame each here.
blip_start_frame1 = 9
blip_end_frame1   = 10

blip_start_frame2 = 19
blip_end_frame2   = 20

blip_start_frame3 = 29
blip_end_frame3   = 30

blip_start_frame4 = 39
blip_end_frame4   = 40

blip_intervals = [
    (blip_start_frame1, blip_end_frame1),
    (blip_start_frame2, blip_end_frame2),
    (blip_start_frame3, blip_end_frame3),
    (blip_start_frame4, blip_end_frame4),
]

try:
    for frame_idx in range(num_frames):
        # Create a blank (black) frame as background
        frame = np.full((height, width, 3), (0, 0, 0), dtype=np.uint8)

        # Sweep 0..360 over the video and wrap at 180. OpenCV's 8-bit hue
        # channel is stored as 0..179, so the value wraps (it is NOT mirrored):
        # the hue runs 0 -> 179 during the first half and restarts from 0 in
        # the second half.
        hue_value = int((frame_idx / num_frames) * 360) % 180

        # Create an HSV color and convert it to BGR
        ball_color_hsv = np.array([[[hue_value, 255, 255]]], dtype=np.uint8)
        ball_color_bgr = cv2.cvtColor(ball_color_hsv, cv2.COLOR_HSV2BGR)[0][0]
        ball_color_tuple = tuple(int(c) for c in ball_color_bgr)

        # Only draw the ball if the current frame is not within any blip-out interval.
        in_blip = any(start <= frame_idx < end for start, end in blip_intervals)
        if not in_blip:
            cv2.circle(frame, (center_x, center_y), ball_radius, ball_color_tuple, -1)

        # --- Noise generation, intentionally disabled for the "no_noise_no_bars" video ---
        # NOTE(review): the snippets below cast signed Gaussian samples straight to
        # uint8; revisit that cast before re-enabling them.
        #
        # Generate base random noise for the frame (general noise)
        #base_noise = np.random.normal(0, 0.1, (height, width, 3)).astype(np.uint8)

        # Create horizontal noise bars
        #bar_height = np.random.randint(10, 20)  # Height of each noise band
        #shift_speed = 3  # Vertical shift speed (pixels per frame)
        #shift_position = (frame_idx * shift_speed) % height  # Calculate shifting position

        # Generate bar noise with stronger intensity
        #bar_noise = np.zeros((height, width, 3), dtype=np.uint8)
        #for y in range(shift_position, height, bar_height * 2):  # Skip space between bars
        #    bar_noise_end = min(y + bar_height, height)
        #    bar_noise[y:bar_noise_end] = np.random.normal(0, 10, (bar_noise_end - y, width, 3)).astype(np.uint8)

        # Combine base noise with bar noise and add to the frame
        #combined_noise = cv2.add(base_noise, bar_noise)
        #frame = cv2.add(frame, combined_noise)

        # Write the frame to the video file
        out.write(frame)
finally:
    # Always finalize the container, even if frame generation fails midway.
    # No HighGUI window is ever opened in this cell, so there is nothing to
    # destroy afterwards.
    out.release()


In [3]:
def flatten_histogram(image, num_bins, htype='hue'):
    """
    Build a 1D histogram of ``image`` with ``num_bins`` bins per channel.

    Parameters
    ----------
    image : array
        A 3D colour image or a 2D grayscale image. Colour input is converted
        with the ``cv2.COLOR_RGB2*`` codes, i.e. it is treated as RGB-ordered.
    num_bins : int
        Number of histogram bins (per channel for ``htype='color'``).
    htype : {'hue', 'color', 'intensity'}
        * ``'hue'``       - histogram of the HSV hue channel over the range
          (0, 180); OpenCV hue values run from 0 to 179.
        * ``'color'``     - per-channel histograms over (0, 256), concatenated
          into one vector; a 2D image yields a single histogram instead.
        * ``'intensity'`` - histogram over (0, 256) of the grayscale version
          of the image (a 2D image is used as-is).

    Returns
    -------
    numpy.ndarray
        The bin counts.

    Raises
    ------
    ValueError
        If ``htype`` is not one of the supported names.
    """
    if htype == 'hue':
        hue_plane = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)[:, :, 0]
        counts, _edges = np.histogram(hue_plane, bins=num_bins, range=(0, 180))
        return counts

    if htype == 'intensity':
        # Only colour input needs converting; 2D input is already single-channel.
        if len(image.shape) == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        else:
            gray = image
        counts, _edges = np.histogram(gray, bins=num_bins, range=(0, 256))
        return counts

    if htype == 'color':
        if len(image.shape) != 3:
            # Grayscale image: a single histogram, nothing to concatenate.
            counts, _edges = np.histogram(image, bins=num_bins, range=(0, 256))
            return counts

        per_channel = []
        for plane in cv2.split(image):
            plane_counts, _edges = np.histogram(plane, bins=num_bins, range=(0, 256))
            per_channel.append(plane_counts)
        return np.concatenate(per_channel)

    raise ValueError(f"Unsupported histogram type: {htype}")
        
        
        

In [4]:

def S_from_bins(f, min_, max_):
    """
    Compute the score S for a binned frequency vector.

    The bins are labelled ``min_, min_ + 1, ..., max_`` and ``f[k]`` is the
    frequency of the k-th label. With ``T`` the frequency-weighted mean label,
    the result is ``(max_ - T) / (max_ - min_)``: 1 when all mass sits in the
    lowest bin and 0 when it all sits in the highest bin.

    Parameters
    ----------
    f : sequence of numbers
        Bin frequencies; must have at least ``max_ - min_ + 1`` entries.
    min_, max_ : int
        Labels of the first and last bin.

    Returns
    -------
    float
        The score S.
    """
    weighted_total = sum(
        label * f[position]
        for position, label in enumerate(range(min_, max_ + 1))
    )
    mean_label = weighted_total / sum(f)
    return (max_ - mean_label) / (max_ - min_)


def DS_from_bins(f1, f2, min1, max1, min2, max2):
    """
    Difference ``S1 - S2`` between the scores of two binned frequency vectors.

    For each vector the bins are labelled ``min, min + 1, ..., max`` and the
    score is ``(max - T) / (max - min)``, where ``T`` is the
    frequency-weighted mean bin label.

    Parameters
    ----------
    f1, f2 : sequence of numbers
        Bin frequencies of the first and second distribution.
    min1, max1 : int
        Labels of the first and last bin of ``f1``.
    min2, max2 : int
        Labels of the first and last bin of ``f2``.

    Returns
    -------
    float
        Score of ``f1`` minus score of ``f2``.
    """
    def _score(freqs, lowest, highest):
        # Frequency-weighted mean label, then rescale so lowest -> 1, highest -> 0.
        weighted_total = 0
        for position, label in enumerate(range(lowest, highest + 1)):
            weighted_total += label * freqs[position]
        mean_label = weighted_total / sum(freqs)
        return (highest - mean_label) / (highest - lowest)

    score_first = _score(f1, min1, max1)
    score_second = _score(f2, min2, max2)
    return score_first - score_second


In [6]:
import timeit

# Histogram type used for the whole analysis: process_video passes it to
# flatten_histogram (which accepts 'color', 'hue' or 'intensity'), and make_fig
# embeds it in the names of the figure files it saves.
typ = 'intensity'

def process_video(video_path, num_bins, return_frames):
    """
    Compute per-frame histogram metrics of a video relative to its first frame.

    For every decoded frame a histogram of type ``typ`` (module-level setting)
    is built with ``flatten_histogram`` and compared with the histogram of the
    first frame (the reference):

    * ``'S'``  - ``S_from_bins`` of the frame histogram,
    * ``'DS'`` - ``DS_from_bins`` of the frame histogram vs. the reference,
    * ``'WD'`` - ``scipy.stats.wasserstein_distance`` between the two
      histograms, using the bin positions as support and the counts as weights.

    Parameters
    ----------
    video_path : str
        Path of the video file to read.
    num_bins : int
        Number of histogram bins handed to ``flatten_histogram``.
    return_frames : bool
        Whether to also return the first frame and the frame with the largest
        absolute DS value.

    Returns
    -------
    tuple
        ``(metrics, frame_indices)`` or, when ``return_frames`` is true,
        ``(metrics, frame_indices, first_frame, max_ds_frame, max_ds_value)``.
        ``metrics`` maps ``'DS'``, ``'S'`` and ``'WD'`` to lists with one value
        per processed frame. The returned frames are exactly as decoded by
        OpenCV (BGR order). If no frame could be read, the lists are empty and
        the frame/value entries are ``None``.
    """
    metrics = {'DS': [], 'S': [], 'WD': []}
    frame_indices = []
    first_frame = None
    max_ds_value = None
    max_ds_frame = None
    reference_hist = None

    video = cv2.VideoCapture(video_path)
    try:
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

        start_index = 0
        end_index = total_frames

        for frame_count in range(total_frames):
            success, frame = video.read()
            if not success:
                break
            if frame_count < start_index or frame_count >= end_index:
                continue

            # VideoCapture decodes frames in BGR order, while flatten_histogram
            # converts with COLOR_RGB2* codes. Reorder the channels first so
            # that hue / intensity are computed from the true colours instead
            # of with red and blue swapped.
            if frame.ndim == 3 and frame.shape[2] == 3:
                hist_input = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            else:
                hist_input = frame
            frame_hist = flatten_histogram(hist_input, num_bins, typ)

            if frame_count == start_index:
                # The first processed frame is the reference for all others.
                reference_hist = frame_hist
                first_frame = frame

            # Bin positions are 1-based; the distance only depends on their spacing.
            bin_positions = np.arange(1, len(frame_hist) + 1)
            WD_v = stats.wasserstein_distance(
                bin_positions, bin_positions,
                u_weights=frame_hist, v_weights=reference_hist,
            )

            DS_v = DS_from_bins(
                frame_hist, reference_hist,
                0, len(frame_hist) - 1,
                0, len(reference_hist) - 1,
            )
            S_v = S_from_bins(frame_hist, 0, len(frame_hist) - 1)

            metrics['DS'].append(DS_v)
            metrics['S'].append(S_v)
            metrics['WD'].append(WD_v)

            # Track the frame deviating most from the reference; on ties the
            # earliest such frame is kept.
            if max_ds_value is None or abs(DS_v) > abs(max_ds_value):
                max_ds_value = DS_v
                max_ds_frame = frame

            frame_indices.append(frame_count)
    finally:
        # Free the decoder even if a metric computation raises.
        video.release()

    if return_frames:
        return metrics, frame_indices, first_frame, max_ds_frame, max_ds_value
    else:
        return metrics, frame_indices

    

def make_fig(inputs, lab, first_frame, max_ds_frame, max_ds_value):
    """
    Build and save the three-row summary figure for one video.

    Row 0 shows thumbnails of the video at fixed frame numbers, row 1 plots
    the Wasserstein distance together with |ΔS| (twin axis), and row 2 plots
    ΔS together with S (twin axis). The figure is written as PDF and PNG to
    ``Final_Figs/manuscript/Fig6_{lab}_{typ}.*`` (``typ`` is the module-level
    histogram type) and then closed.

    Parameters
    ----------
    inputs : sequence
        ``inputs[0][0]`` is the S series, ``inputs[1][0]`` the ΔS series,
        ``inputs[1][1]`` the frame indices used as x values, and
        ``inputs[2][0]`` the Wasserstein-distance series.
    lab : str
        Video label; the video is read from ``data/time_lapse_video/{lab}.mp4``.
    first_frame, max_ds_frame, max_ds_value :
        Accepted for interface compatibility; not used by this function.

    Returns
    -------
    None
    """
    fs = 20

    # Frame numbers at which thumbnails are grabbed.
    time_points = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99]
    video_path = f'data/time_lapse_video/{lab}.mp4'
    thumbnails = _grab_thumbnails(video_path, time_points, (80, 80))

    wd = inputs[2][0]
    s = inputs[0][0]
    sd = inputs[1][0]
    sd_abs = np.abs(sd)
    frames = inputs[1][1]

    fig = plt.figure(figsize=(10, 8))
    try:
        gs = gridspec.GridSpec(nrows=3, ncols=1, height_ratios=[0.2, 0.4, 0.4])

        # ---------------------
        # Row 0: Frame images
        # ---------------------
        # Each thumbnail carries its own frame number, so a frame that failed
        # to decode cannot shift the titles of the ones after it. With no
        # thumbnails at all the row is left empty (a 0-column grid is invalid).
        if thumbnails:
            gs_images = gs[0].subgridspec(1, len(thumbnails))
            for col, (point, thumb) in enumerate(thumbnails):
                ax_img = fig.add_subplot(gs_images[0, col])
                ax_img.imshow(cv2.cvtColor(thumb, cv2.COLOR_BGR2RGB))
                ax_img.set_title(f'{point}', fontsize=fs-4)
                ax_img.axis('off')

        # ---------------------
        # Row 1: Top plot
        # ---------------------
        ax = fig.add_subplot(gs[1, 0])
        ax.xaxis.tick_top()
        ax.xaxis.set_label_position('top')

        ax.plot(frames, wd, color='k')
        ax.set_ylabel('Wasserstein Distance', fontsize=fs-2, c='k')
        ax.tick_params(axis='both', labelsize=12)
        ax_twin = ax.twinx()
        ax_twin.plot(frames, sd_abs, color='k')
        ax_twin.set_ylabel(r"$|Δ\mathcal{S}|$", fontsize=fs, c='k')
        ax_twin.tick_params(axis='both', labelsize=12)
        ax_twin.set_xlim(-4, 104)

        # ---------------------
        # Row 2: Bottom plot
        # ---------------------
        ax = fig.add_subplot(gs[2, 0])

        ax.plot(frames, sd, color='k')
        ax.set_ylabel(r"Δ$\mathcal{S}$", fontsize=fs, c='k')
        ax.tick_params(axis='both', labelsize=12)
        ax_twin = ax.twinx()
        ax_twin.plot(frames, s, color='k')
        ax_twin.set_ylabel(r"$\mathcal{S}$", fontsize=fs, c='k')
        ax_twin.tick_params(axis='both', labelsize=12)
        ax_twin.set_xlim(-4, 104)
        ax.set_xlabel('Frame', fontsize=fs)

        fig.patch.set_facecolor('white')
        fig.tight_layout()
        fig.subplots_adjust(hspace=0.15)

        # savefig does not create missing folders.
        os.makedirs('Final_Figs/manuscript', exist_ok=True)
        fig.savefig(f'Final_Figs/manuscript/Fig6_{lab}_'+typ+'.pdf', bbox_inches='tight', format='pdf', dpi=600)
        fig.savefig(f'Final_Figs/manuscript/Fig6_{lab}_'+typ+'.png', bbox_inches='tight', dpi=600)
    finally:
        # Close this specific figure (also on errors) so repeated calls do not
        # accumulate open figures.
        plt.close(fig)


def _grab_thumbnails(video_path, time_points, size):
    """Return ``(frame_number, resized BGR frame)`` pairs for every frame in
    ``time_points`` that could be read from ``video_path``."""
    video = cv2.VideoCapture(video_path)
    grabbed = []
    try:
        for point in time_points:
            video.set(cv2.CAP_PROP_POS_FRAMES, point)
            success, frame = video.read()
            if success:
                grabbed.append((point, cv2.resize(frame, size)))
    finally:
        video.release()
    return grabbed

    

# ---------------------
# Run the analysis for each video
# ---------------------

# Each entry is a one-element list holding the video label (file stem under
# data/time_lapse_video/). Swap the active line to analyse another video.
#lists = [['synthetic_video_with_noise_and_bars']]
#lists = [['synthetic_video_no_noise_no_bars_no_spikes']]
lists = [['synthetic_video_no_noise_no_bars']]

# Number of histogram bins; identical for every video, so set once.
num_bins = 256

for ls in lists:
    lab = ls[0]
    video_path = f'data/time_lapse_video/{lab}.mp4'

    # Skip videos whose output already exists. Previously this check only
    # changed the printed message and the video was processed regardless.
    # NOTE(review): the path tested here is not the path make_fig saves to
    # (Final_Figs/manuscript/Fig6_*), so in practice nothing is skipped unless
    # that file is created elsewhere - confirm which artifact should gate this.
    if os.path.exists(f'Final_Figs/from_image_analysis/{lab}_enhanced.pdf'):
        print('Already exists:', lab)
        continue

    print(f'Processing {lab}...')

    metrics, frame_indices, first_frame, max_ds_frame, max_ds_value = process_video(
        video_path, num_bins, return_frames=True
    )

    # Layout expected by make_fig: [series, frame indices, label] per metric,
    # in the order S, DS, WD.
    inputs = [
        [metrics['S'], frame_indices, 'S'],
        [metrics['DS'], frame_indices, 'DS'],
        [metrics['WD'], frame_indices, 'WD'],
    ]

    make_fig(inputs, lab, first_frame, max_ds_frame, max_ds_value)



Processing synthetic_video_no_noise_no_bars...
