<a href="https://colab.research.google.com/github/56sarager/Colabs/blob/main/Laser_Tweezer_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Brownain Motion Analysis
Google Drive link: https://drive.google.com/drive/folders/1r6tCrijdmC0SwljzmJbzgc6kOWLlm8AE?usp=sharing <br>
To use: upload Brownian zip file from provided Google Drive and run all the cells in the drop down. For each of the input videos a Plotly plot of the squared displacement of detected particles will be generated with zoom, on hover, and trace selection functionality enabled. A similar plot will be generated in Matplotlib and saved for use in lab reports. All data from the analysis is saved to a csv file and the input video is returned with annotations specifying the particles' velocities. Sample saved outputs of this cell may be found in the Brownian folder of the Google Drive.<br>
Methodology: Break the videos into frames. Take masks of the frames using specified brightness/ z-threshold, minimum particle radius, and maximum particle radius. Track the particles from frame to frame by assuming the particles closest to each other are the same particles from frame to frame. Calculate the center to center displacement of the particles from frame to frame and use this displacement as well as the time interval between frames to calculate the velocity. Square the displacement in order to calculate mean square displacment. Save all of this data to generate the desired outputs stated above.

In [None]:
import zipfile
import os

def unzip_file(zip_path, extract_to):
    os.makedirs(extract_to, exist_ok=True)

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

    print(f"Extracted all files to {extract_to}")

zip_file_path = '/content/Brownian.zip'
extraction_directory = 'Brownian'
unzip_file(zip_file_path, extraction_directory)

Extracted all files to Brownian


In [None]:
import numpy as np
import pandas as pd
import cv2
from scipy.spatial import distance
import os
import plotly.graph_objects as go
import matplotlib.pyplot as plt

max_particle_radius = 40
detection_threshold = 50

position_error = 3
time_interval_error = 1/30

lower_red = np.array([0, 30, 30])
upper_red = np.array([10, 255, 255])

px_to_um = 1

def calculate_velocity(prev_position, curr_position, time_interval):
    if time_interval == 0:
        return 0
    dist = distance.euclidean(prev_position, curr_position)
    return (dist * px_to_um) / time_interval

def calculate_velocity_error(distance, time_interval):
    if distance == 0 or time_interval == 0:
        return 0
    pos_error_term = (position_error / distance) ** 2 if distance != 0 else 0
    time_error_term = (time_interval_error / time_interval) ** 2 if time_interval != 0 else 0
    error = np.sqrt(pos_error_term + time_error_term)
    return error

def calculate_msd(positions):
    if len(positions) < 2:
        return 0
    displacements = [(positions[t][0] - positions[t-1][0])**2 + (positions[t][1] - positions[t-1][1])**2
                     for t in range(1, len(positions))]
    return np.mean(displacements) * (px_to_um ** 2)

def calculate_msd_error(distance):
    if distance == 0:
        return 0
    msd_error = (position_error / distance) if distance != 0 else 0
    return msd_error

def plot_msd_with_matplotlib(df, red_tinted_particles, video_duration, total_frames, size, video_name):
    plt.figure(figsize=(10, 6))
    for pid, group in df.groupby('particle_id'):
        label = f"Particle {pid}"
        if pid == 1:
            label += " (Mean)"
        if pid in red_tinted_particles:
            label += " (Red-Tinted)"
        plt.plot(group['frame'] / total_frames * video_duration, group['msd'], label=label)
    plt.title(f"Square Displacement Over Time - {size} Micron Polybeads", fontsize=14)
    plt.xlabel("Time (s)", fontsize=12)
    plt.ylabel("SD (µm²)", fontsize=12)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()

    image_path = video_name + '_msd_plot.png'
    plt.savefig(image_path)
    plt.close()
    print(f"MSD plot saved to {image_path}")

def process_video(video_path, output_video_path):
    size = video_path.split('.')[0].split('_')[-1].split('m')[0]
    cap = cv2.VideoCapture(video_path)
    #.wmv file saved incorrectly. Must define properties
    fps = 30
    total_frames = fps*60
    video_duration = total_frames / fps
    time_interval = 1.0 / fps if fps > 0 else 0
    frame_width = 1920
    frame_height = 1080

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    particle_positions = {}
    msd_per_frame = {}
    data_records = []
    red_tinted_particles = set()

    first_frame = True
    frame_count = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (15, 15), 0)
        _, thresh = cv2.threshold(blurred, detection_threshold, 255, cv2.THRESH_BINARY_INV)
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        current_positions = []
        hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        for contour in contours:
            ((x, y), radius) = cv2.minEnclosingCircle(contour)
            if min_particle_radius < radius < max_particle_radius:
                center = (int(x), int(y))
                current_positions.append(center)

                if first_frame:
                    particle_id = len(particle_positions) + 1
                    particle_positions[particle_id] = [center]
                    msd_per_frame[particle_id] = []
                else:
                    for pid, positions in particle_positions.items():
                        if distance.euclidean(positions[-1], center) < 10:
                            particle_positions[pid].append(center)
                            if len(particle_positions[pid]) > 1:
                                prev_position = particle_positions[pid][-2]
                                velocity = calculate_velocity(prev_position, center, time_interval)
                                velocity_error = calculate_velocity_error(distance.euclidean(prev_position, center), time_interval)
                                msd = calculate_msd(particle_positions[pid])
                                msd_per_frame[pid].append(msd)
                                msd_error = calculate_msd_error(distance.euclidean(prev_position, center))
                                x_int, y_int = int(x), int(y)
                                mask = cv2.inRange(hsv_frame, lower_red, upper_red)
                                if np.any(mask[y_int-5:y_int+5, x_int-5:x_int+5]):
                                    red_tinted_particles.add(pid)
                                label = f"Particle {pid}"
                                if pid == 1:
                                    label += " (Mean)"
                                if pid in red_tinted_particles:
                                    label += " (Red-Tinted)"
                                data_records.append({
                                    'frame': frame_count,
                                    'particle_id': label,
                                    'x': center[0],
                                    'y': center[1],
                                    'velocity': velocity,
                                    'velocity_error': velocity_error,
                                    'msd': msd,
                                    'msd_error': msd_error
                                })

                                cv2.arrowedLine(frame, prev_position, center, (255, 0, 0), 2)
                                cv2.putText(frame, f'ID={pid} V={velocity:.2f}, MSD={msd:.2f}',
                                            (center[0] + 10, center[1] - 10),
                                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)

                            cv2.putText(frame, f'{pid}', (center[0] - 10, center[1] - 10),
                                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

                            x_int, y_int = int(x), int(y)
                            mask = cv2.inRange(hsv_frame, lower_red, upper_red)
                            if np.any(mask[y_int-5:y_int+5, x_int-5:x_int+5]):
                                red_tinted_particles.add(pid)

        out.write(frame)

        frame_count += 1
        first_frame = False

    cap.release()
    out.release()

    df = pd.DataFrame(data_records)

    fig = go.Figure()
    for pid, group in df.groupby('particle_id'):
        label = f"Particle {pid}"
        if pid == 1:
            label += " (Average)"
        if pid in red_tinted_particles:
            label += " (Red-Tinted)"
        fig.add_trace(go.Scatter(x=group['frame'] / total_frames * video_duration, y=group['msd'], mode='lines+markers', name=label))
    fig.update_layout(title=f"Square Displacement Over Time- {size} Micron Polybeads",
                      xaxis_title="Time (s)",
                      yaxis_title="SD (µm²)",
                      template="plotly_dark")
    config = {'scrollZoom': True}
    fig.show(config=config)

    csv_path = video_name + '_analysis.csv'
    df.to_csv(csv_path, index=False)
    plot_msd_with_matplotlib(df, red_tinted_particles, video_duration, total_frames, size, video_name)
    print(f"Data saved to {csv_path}")
    max_value = df["msd_error"].max()
    min_value = df["msd_error"].min()
    average_value = df["msd_error"].mean()

    print(f"Max. MSD uncertainty: {max_value}")
    print(f"Min. MSD uncertainty: {min_value}")
    print(f"Ave. MSD uncertainty: {average_value}")
    print(f"Annotated video saved to {output_video_path}")

path = ['/content/Brownian/Brownian/bm_1m.wmv', '/content/Brownian/Brownian/bm_2m.wmv', '/content/Brownian/Brownian/bm_3m.wmv']
for video_path in path:
    if video_path == '/content/Brownian/Brownian/bm_1m.wmv':
        min_particle_radius = 3
    elif video_path == '/content/Brownian/Brownian/bm_2m.wmv':
        min_particle_radius = 10
    elif video_path == '/content/Brownian/Brownian/bm_3m.wmv':
        min_particle_radius = 15
    video_name, video_extension = os.path.splitext(video_path)
    output_video_path = video_name + '_analysis.mp4'
    process_video(video_path, output_video_path)

MSD plot saved to bm_3m_msd_plot.png
Data saved to bm_3m_analysis.csv
Max. MSD uncertainty: 3.0
Min. MSD uncertainty: 0.0
Ave. MSD uncertainty: 1.7314498595802843
Annotated video saved to bm_3m_analysis.mp4


#Radial Trapping
Google Drive link: https://drive.google.com/drive/folders/1r6tCrijdmC0SwljzmJbzgc6kOWLlm8AE?usp=sharing <br>
To use: upload R Trapping zip file from provided Google Drive and run all the cells in the drop down. For each of the input videos and annotated output video will be generated along with csv files containing velocity and velocity uncertainty data. The final output of this cet of cells is the average velocity and velocity uncertainty for various particle sizes and the calculated Stoke's Drag Force along with the associated uncertainty. It is noted that it is unlikely usable data will be output for particle radii of 1, 3, and 10 microns. The intensity of the focus of the laser is simply to faint to be detected in the 1 micron video and the 10 micron video has particles stuck to the walls of the slide. <br>
Methodology: Methodology same as in Brownian Motion Analysis case, but plots are not desired so that code has been removed. Code is added to manipulate the dataframes and achieve the desired outputs stated above.

In [None]:
import zipfile
import os

def unzip_file(zip_path, extract_to):
    os.makedirs(extract_to, exist_ok=True)

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

    print(f"Extracted all files to {extract_to}")

zip_file_path = '/content/R Trapping.zip'
extraction_directory = 'R'
unzip_file(zip_file_path, extraction_directory)

In [None]:
import numpy as np
import pandas as pd
import cv2
from scipy.spatial import distance
import os
import plotly.graph_objects as go

min_particle_radius = 5
max_particle_radius = 40
detection_threshold = 50

lower_red = np.array([0, 30, 30])
upper_red = np.array([10, 255, 255])

px_to_um = 1

def calculate_velocity(prev_position, curr_position, time_interval):
    if time_interval == 0:
        return 0
    dist = distance.euclidean(prev_position, curr_position)
    return (dist * px_to_um) / time_interval

def calculate_velocity_error(distance, time_interval):
    if distance == 0 or time_interval == 0:
        return 0
    pos_error_term = (position_error / distance) ** 2 if distance != 0 else 0
    time_error_term = (time_interval_error / time_interval) ** 2 if time_interval != 0 else 0
    error = np.sqrt(pos_error_term + time_error_term)
    return error

def process_video(video_path, output_video_path):
    size = video_path.split('.')[0].split('_')[-1].split('m')[0]
    cap = cv2.VideoCapture(video_path)
    fps = 30
    total_frames = fps*vd
    video_duration = total_frames / fps
    time_interval = 1.0 / fps if fps > 0 else 0
    frame_width = 1920
    frame_height = 1080

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    particle_positions = {}
    msd_per_frame = {}
    data_records = []
    red_tinted_particles = set()

    first_frame = True
    frame_count = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (15, 15), 0)
        _, thresh = cv2.threshold(blurred, detection_threshold, 255, cv2.THRESH_BINARY_INV)
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        current_positions = []
        hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        for contour in contours:
            ((x, y), radius) = cv2.minEnclosingCircle(contour)
            if min_particle_radius < radius < max_particle_radius:
                center = (int(x), int(y))
                current_positions.append(center)

                if first_frame:
                    particle_id = len(particle_positions) + 1
                    particle_positions[particle_id] = [center]
                else:
                    for pid, positions in particle_positions.items():
                        if distance.euclidean(positions[-1], center) < 10:
                            particle_positions[pid].append(center)
                            if len(particle_positions[pid]) > 1:
                                prev_position = particle_positions[pid][-2]
                                velocity = calculate_velocity(prev_position, center, time_interval)
                                velocity_error = calculate_velocity_error(distance.euclidean(prev_position, center), time_interval)
                                x_int, y_int = int(x), int(y)
                                mask = cv2.inRange(hsv_frame, lower_red, upper_red)
                                if np.any(mask[y_int-5:y_int+5, x_int-5:x_int+5]):
                                    red_tinted_particles.add(pid)
                                label = f"Particle {pid}"
                                if pid == 1:
                                    label += " (Average)"
                                if pid in red_tinted_particles:
                                    label += " (Red-Tinted)"
                                data_records.append({
                                    'frame': frame_count,
                                    'particle_id': label,
                                    'x': center[0],
                                    'y': center[1],
                                    'velocity': velocity,
                                    'velocity_error': velocity_error
                                })

                                cv2.arrowedLine(frame, prev_position, center, (255, 0, 0), 2)
                                cv2.putText(frame, f'ID={pid} V={velocity:.2f}',
                                            (center[0] + 10, center[1] - 10),
                                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)

                            cv2.putText(frame, f'{pid}', (center[0] - 10, center[1] - 10),
                                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

                            x_int, y_int = int(x), int(y)
                            mask = cv2.inRange(hsv_frame, lower_red, upper_red)
                            if np.any(mask[y_int-5:y_int+5, x_int-5:x_int+5]):
                                red_tinted_particles.add(pid)

        out.write(frame)

        frame_count += 1
        first_frame = False

    cap.release()
    out.release()

    df = pd.DataFrame(data_records)

    csv_path = video_name + '_analysis.csv'
    df.to_csv(csv_path, index=False)
    print(f"Data saved to {csv_path}")
    print(f"Annotated video saved to {output_video_path}")

path = ['/R/R/R(1)_1m.wmv','/R/R/R_1m.wmv', '/R/R/R_2m.wmv', '/R/R/R_3m.wmv', '/R/R/R(2)_6m.avi', '/R/R/R_6m.avi', '/R/R/R_10m.wmv']
for video_path in path:
    if video_path == 'R(1)_1m.wmv':
        vd = 120
    if video_path == 'R_1m.wmv':
        vd = 120
    if video_path == 'R_2m.wmv':
        vd = 120
    if video_path == 'R_3m.wmv':
        vd = 60
    if video_path == 'R_6m.avi':
        vd = 50
    if video_path == 'R(2)_6m.avi':
        vd = 50
    if video_path == 'R_10m.wmv':
        vd = 60
    video_name, video_extension = os.path.splitext(video_path)
    output_video_path = video_name + '_analysis.mp4'
    process_video(video_path, output_video_path)

Data saved to R(1)_1m_analysis.csv
Annotated video saved to R(1)_1m_analysis.mp4
Data saved to R_1m_analysis.csv
Annotated video saved to R_1m_analysis.mp4
Data saved to R_2m_analysis.csv
Annotated video saved to R_2m_analysis.mp4
Data saved to R_3m_analysis.csv
Annotated video saved to R_3m_analysis.mp4
Data saved to R(2)_6m_analysis.csv
Annotated video saved to R(2)_6m_analysis.mp4
Data saved to R_6m_analysis.csv
Annotated video saved to R_6m_analysis.mp4
Data saved to R_10m_analysis.csv
Annotated video saved to R_10m_analysis.mp4


In [None]:
df = pd.read_csv('/content/R(2)_6m_analysis.csv')
red_tint_df = df[df['particle_id'].str.contains('Red-Tint', na=False)]
base_string = "/content/R_{}m_analysis.csv"
numbers = [1, 2, 3, 6, 10]
paths = [base_string.format(num) for num in numbers]
for path in paths:
    df = pd.read_csv(path)
    df2 = df[df['particle_id'].str.contains('Red-Tint', na=False)]
    df2['video_path'] = path
    red_tint_df = pd.concat([red_tint_df, df2], ignore_index=True)
df = pd.read_csv('/content/R(2)_6m_analysis.csv')
frame_ranges = [(23,27)]*30
red_tint_df = df[df['particle_id'].str.contains('Red-Tint', na=False)]
red_tint_df[red_tint_df['frame'].apply(lambda x: any(start <= x <= end for start, end in frame_ranges))]
red_tint_df['video_path'] = path

base_string = "/content/R_{}m_analysis.csv"
numbers = [1, 2, 3, 6, 10]
paths = [base_string.format(num) for num in numbers]

for path in paths:
    if path == '/content/R_1m_analysis.csv':
        frame_ranges = [(75,79)*30]
    if path == '/content/R_2m_analysis.csv':
        frame_ranges = [(30,47), (20,27), (10,17), (55,60)]*30
    if path == '/content/R_3m_analysis.csv':
        frame_ranges = [(23,27),(27,36),(38,43),(50,54)]*30
    if path == '/content/R_6m_analysis.csv':
        frame_ranges = [(39,45)]*30
    if path == '/content/R_10m_analysis.csv':
        frame_ranges = [(12,22)]*30
    df = pd.read_csv(path)
    df2 = df[df['particle_id'].str.contains('Red-Tint', na=False)]
    df2 = df2[df2['frame'].apply(lambda x: any(start <= x <= end for start, end in frame_ranges))]
    df2['video_path'] = path
    red_tint_df = pd.concat([red_tint_df, df2], ignore_index=True)
velocity_averages = red_tint_df.groupby('video_path')['velocity'].mean().reset_index()
velocity_error = red_tint_df.groupby('video_path')['velocity_error'].mean().reset_index()
eta = 10**(-3) #Ns/m^2
def extract_r_from_path(path):
    import re
    match = re.search(r'_(\d+)m', path)
    return int(match.group(1)) if match else None
velocity_averages['r'] = velocity_averages['video_path'].apply(extract_r_from_path)
velocity_averages['velocity_error'] = velocity_error['velocity_error']
velocity_averages['stokes_drag'] = 6 * np.pi * eta * velocity_averages['r'] * velocity_averages['velocity']*10**(-12)
velocity_averages['stokes_drag_error'] = 6 * np.pi * eta * velocity_averages['r'] * velocity_error['velocity_error']*10**(-12)
print(velocity_averages)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  red_tint_df['video_path'] = path
