In [1]:
import tifffile
import numpy as np
from skimage.registration import optical_flow_tvl1
from skimage.transform import warp
from scipy.ndimage import map_coordinates
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import os
import plotly.graph_objects as go
import tqdm

## Load the original image 

In [2]:
# Define the relative path from your script location
relative_path = "MergedmTmG2.tif"

# Get the absolute path based on the current working directory
abs_path = os.path.join(os.getcwd(), relative_path)

# Load the image
MergedmTmG2 = tifffile.imread(abs_path)

# Check the original shape
print(f"Original shape: {MergedmTmG2.shape}")  # Expected: (56, 59, 3, 512, 512)

Original shape: (56, 59, 2, 512, 512)


In [3]:
# # Define the relative file paths
# input_filename = "MergedmTmG2.tif"
# output_filename_ch0 = "MergedmTmG2_Ch0.tif"
# output_filename_ch1 = "MergedmTmG2_Ch1.tif"

# # Get the absolute paths based on the script's working directory
# input_path = os.path.join(os.getcwd(), input_filename)
# output_path_ch0 = os.path.join(os.getcwd(), output_filename_ch0)
# output_path_ch1 = os.path.join(os.getcwd(), output_filename_ch1)

# # Load the image
# whole_image = tifffile.imread(input_path)

# # Split the channels
# channel_0 = whole_image[:, :, 0, :, :]  # First channel
# channel_1 = whole_image[:, :, 1, :, :]  # Second channel

# # Save each channel with metadata
# tifffile.imwrite(output_path_ch0, channel_0, imagej=True, metadata={'axes': 'TZYX'})
# tifffile.imwrite(output_path_ch1, channel_1, imagej=True, metadata={'axes': 'TZYX'})

# print(f"Channel 0 saved at: {output_path_ch0}")
# print(f"Channel 1 saved at: {output_path_ch1}")


In [4]:
# Define the relative paths
filename_ch0 = "MergedmTmG2_Ch0.tif"
filename_ch1 = "MergedmTmG2_Ch1.tif"

# Get the absolute paths based on the script's working directory
path_ch0 = os.path.join(os.getcwd(), filename_ch0)
path_ch1 = os.path.join(os.getcwd(), filename_ch1)

# Load the images
MergedmTmG2_Ch0 = tifffile.imread(path_ch0)
MergedmTmG2_Ch1 = tifffile.imread(path_ch1)

# Check the shape
print(f"MergedmTmG2_Ch0 shape: {MergedmTmG2_Ch0.shape}")  # Expected: (56, 59, 512, 512)
print(f"MergedmTmG2_Ch1 shape: {MergedmTmG2_Ch1.shape}")  # Expected: (56, 59, 512, 512)

MergedmTmG2_Ch0 shape: (56, 59, 512, 512)
MergedmTmG2_Ch1 shape: (56, 59, 512, 512)


In [5]:
print(f"Image stack dimensions (time, z, height, width): {MergedmTmG2_Ch0.shape}")
t, z, height, width = MergedmTmG2_Ch0.shape


Image stack dimensions (time, z, height, width): (56, 59, 512, 512)


## Calculation of 3D optical Flow (Ch0 and Ch1) in whole image

In [6]:
# Function to calculate optical flow for a given channel
def calculate_optical_flow(channel_data, channel_name):
    print(f"Starting Optical Flow Calculation for {channel_name}...")

    # Initialize results lists
    magnitude_results = []
    flow_x_results = []
    flow_y_results = []
    flow_z_results = []

    # Iterate through time steps
    for i in tqdm.tqdm(range(channel_data.shape[0] - 1), desc=f"Calculating Optical Flow ({channel_name})"):
        image0 = channel_data[i].astype(np.float16)
        image1 = channel_data[i + 1].astype(np.float16)

        # Compute optical flow
        flow_z, flow_y, flow_x = optical_flow_tvl1(image0, image1)
        magnitude = np.sqrt(flow_x**2 + flow_y**2 + flow_z**2)

        # Store results
        magnitude_results.append(magnitude)
        flow_x_results.append(flow_x)
        flow_y_results.append(flow_y)
        flow_z_results.append(flow_z)

    print(f"Optical Flow Calculation Completed for {channel_name}.")

    return magnitude_results, flow_x_results, flow_y_results, flow_z_results



In [7]:
# Calculate optical flow for Channel 0
magnitude_ch0, flow_x_ch0, flow_y_ch0, flow_z_ch0 = calculate_optical_flow(MergedmTmG2_Ch0, "Ch0")


Starting Optical Flow Calculation for Ch0...


Calculating Optical Flow (Ch0): 100%|████████| 55/55 [2:51:28<00:00, 187.07s/it]

Optical Flow Calculation Completed for Ch0.





In [8]:
# Calculate optical flow for Channel 1
magnitude_ch1, flow_x_ch1, flow_y_ch1, flow_z_ch1 = calculate_optical_flow(MergedmTmG2_Ch1, "Ch1")


Starting Optical Flow Calculation for Ch1...


Calculating Optical Flow (Ch1): 100%|████████| 55/55 [1:44:39<00:00, 114.18s/it]

Optical Flow Calculation Completed for Ch1.





In [9]:
# Define function to save results
def save_optical_flow_results(magnitude, flow_x, flow_y, flow_z, channel_name):
    print(f"Saving Optical Flow Results for {channel_name}...")

    np.save(os.path.join(output_dir, f"{channel_name}_magnitude.npy"), np.array(magnitude))
    np.save(os.path.join(output_dir, f"{channel_name}_flow_x.npy"), np.array(flow_x))
    np.save(os.path.join(output_dir, f"{channel_name}_flow_y.npy"), np.array(flow_y))
    np.save(os.path.join(output_dir, f"{channel_name}_flow_z.npy"), np.array(flow_z))

    print(f"Results saved in {output_dir}/{channel_name}_*.npy")

# Create a directory for saving results using a relative path
output_dir = "optical_flow_results"
os.makedirs(output_dir, exist_ok=True)  # Ensure directory exists

# Save results for both channels
save_optical_flow_results(magnitude_ch0, flow_x_ch0, flow_y_ch0, flow_z_ch0, "Ch0")
save_optical_flow_results(magnitude_ch1, flow_x_ch1, flow_y_ch1, flow_z_ch1, "Ch1")

print("All results saved successfully!")


Saving Optical Flow Results for Ch0...
Results saved in optical_flow_results/Ch0_*.npy
Saving Optical Flow Results for Ch1...
Results saved in optical_flow_results/Ch1_*.npy
All results saved successfully!


## Load the Optical Flow result 

In [6]:
# Define the relative path for the results directory
results_dir = "optical_flow_results"

# Load results for Channel 0
ch0_magnitude = np.load(os.path.join(results_dir, "Ch0_magnitude.npy"))
ch0_flow_x = np.load(os.path.join(results_dir, "Ch0_flow_x.npy"))
ch0_flow_y = np.load(os.path.join(results_dir, "Ch0_flow_y.npy"))
ch0_flow_z = np.load(os.path.join(results_dir, "Ch0_flow_z.npy"))

# Load results for Channel 1
ch1_magnitude = np.load(os.path.join(results_dir, "Ch1_magnitude.npy"))
ch1_flow_x = np.load(os.path.join(results_dir, "Ch1_flow_x.npy"))
ch1_flow_y = np.load(os.path.join(results_dir, "Ch1_flow_y.npy"))
ch1_flow_z = np.load(os.path.join(results_dir, "Ch1_flow_z.npy"))

print("Results loaded successfully!")

Results loaded successfully!


In [7]:
## Filter the magnitude of both channels

# Define the relative path for results
results_dir = "optical_flow_results"
os.makedirs(results_dir, exist_ok=True)  # Ensure directory exists

# Define threshold values
lower_threshold = 2
upper_threshold = 10

# Function to filter magnitude values
def filter_magnitude(magnitude_data, channel_name):
    print(f"Filtering magnitude for {channel_name}...")

    filtered_magnitude = [
        np.where(
            (m >= lower_threshold) & (m <= upper_threshold),
            m,
            0
        ) for m in magnitude_data
    ]

    # Save filtered magnitudes
    filtered_magnitude_file = os.path.join(results_dir, f"{channel_name}_filtered_magnitude.npy")
    np.save(filtered_magnitude_file, np.array(filtered_magnitude))

    print(f"Filtered magnitude saved at: {filtered_magnitude_file}")

# Apply filtering and saving for both Ch0 and Ch1
filter_magnitude(ch0_magnitude, "Ch0")
filter_magnitude(ch1_magnitude, "Ch1")

print("Filtering complete for both channels.")



Filtering magnitude for Ch0...
Filtered magnitude saved at: optical_flow_results/Ch0_filtered_magnitude.npy
Filtering magnitude for Ch1...
Filtered magnitude saved at: optical_flow_results/Ch1_filtered_magnitude.npy
Filtering complete for both channels.


In [8]:
# Define the relative path for the results directory
results_dir = "optical_flow_results"

# Load results for Channel 0
ch0_filtered_magnitude = np.load(os.path.join(results_dir, "Ch0_filtered_magnitude.npy"))

# Load results for Channel 1
ch1_filtered_magnitude = np.load(os.path.join(results_dir, "Ch1_filtered_magnitude.npy"))

print("Filtered Results loaded successfully!")

Filtered Results loaded successfully!


## Visualization (in napari)

In [None]:
import napari

viewer = napari.Viewer()


viewer.add_image(ch1_flow_z, name="ch1_flow_z")
viewer.add_image(ch1_flow_y, name="ch1_flow_y")
viewer.add_image(ch1_flow_x, name="ch1_flow_x")

viewer.add_image(ch0_flow_z, name="ch0_flow_z")
viewer.add_image(ch0_flow_y, name="ch0_flow_y")
viewer.add_image(ch0_flow_x, name="ch0_flow_x")


viewer.add_image(ch1_magnitude, name="ch1_magnitude")
viewer.add_image(ch0_magnitude, name="ch0_magnitude")

viewer.add_image(ch1_filtered_magnitude, name="ch1_filtered_magnitude")
viewer.add_image(ch0_filtered_magnitude, name="ch0_filtered_magnitude")

viewer.add_image(MergedmTmG2_Ch1[:-1,:,:,:], name="MergedmTmG2_Ch1")
viewer.add_image(MergedmTmG2_Ch0[:-1,:,:,:], name="MergedmTmG2_Ch0")
napari.run()
