# Configuration

In [36]:
# nothing so far

# Imports

In [55]:
import itertools
import os
from glob import glob
from more_itertools import only
from pathlib import Path

import matplotlib.pyplot as plt
import matplotlib.patheffects as path_effects
from matplotlib import collections as mc
from matplotlib.path import Path as matpltPath

import seaborn as sns
from statannotations.Annotator import Annotator

import numpy as np
from numpy.linalg import norm
from scipy.interpolate import interpn

import pandas as pd
from tqdm import tqdm
import unittest

# Shared variables

# Functions

In [38]:
def polarity_scale(ellipsoid_radii, principle_moments):
    """ This function computes the scale for polarity vectors which is used in the rest of the notebook. 

    Args:
        ellipsoid_radii (np.array of size (...) x 3): array of radii, sorted at the last axis
        principle_moments (np.array of size (...) x 3): array of radii, sorted at the last axis

    Returns:
        np.array of size (...): array containg the new scale coefficients
    """

    with np.errstate(invalid='ignore'):  # we set z = 0 whenever we have no data:
        z = 1 - np.sqrt( ellipsoid_radii[..., 0] * ellipsoid_radii[..., 1] ) / ellipsoid_radii[..., 2]
        z = np.nan_to_num(z)
    return z

In [39]:
def volume_of_revolution(r0, dy, dr):
    return np.pi * dy * (r0**2 + 2*r0*dr/2 + dr**2/3)

def compute_volume_area(xs, ys):
    """ Computes the volume and area of one embryo

    Args:
        xs (np.array): x coordinates of embryo boundary
        ys (np.array): y coordinates of embryo boundary

    Returns:
        tupel(float, float): volume, area
    """

    vol = 0.0
    area = 0.0

    for i in range(1, len(ys)):
        x0, y0 = xs[i-1], ys[i-1]
        x1, y1 = xs[i], ys[i]
        r0 = x0 
        dy = y1-y0 
        dr = x1-x0

        vol += volume_of_revolution(r0, dy, dr) 
        area += dy * (r0 + 0.5*dr)

    return np.abs(vol), np.abs(area) 

# test
xs_test = [np.sin(a) for a in np.linspace(0,np.pi,10000)]
ys_test = [np.cos(a) for a in np.linspace(0,np.pi,10000)]
assert( np.abs( compute_volume_area(xs_test, ys_test)[0] - np.pi * 1**2 * 4/3 ) < 1e-5)

In [40]:
def rescale_data(data, scale_by_volume = True):
    """Rescales the data according to the volume or area 

    Args:
        data (Dict like): input dataset
        scale_by_volume (bool, optional): Use volume or area. Defaults to True.

    Returns:
        Dict: rescaled data
    """

    embryo_X, embryo_Y = data["embryo_X"], data["embryo_Y"]

    vol, area = compute_volume_area(embryo_X, embryo_Y)
    if scale_by_volume:
        f = ((4.0/3.0*np.pi) / vol)**(1.0/3.0)
    else:
        f = (np.pi / area)**(1.0/2.0)


    

    new_data = {}

    for k in ["xs","ys","embryo_X","embryo_Y", "embryo_avg_mag", "embryo_min_mag",  "embryo_max_mag", "embryo_std_mag","cut_plane_height"]: 
        new_data[k] = f * data[k]

    for k in ["major_axes", "principle_moments", "ellipsoid_radii", "embryo_bnd_angls", "cell_labels"]:
        new_data[k] = data[k]

    new_data["scale"] = f

    # we do not scale the major_axes, principle_moments and ellipsoid_radii since these lengths carry a meaning and we assume that embryo differences
    # are explained by variations in cell count and not by variations in individual cell size

    return new_data, f 

def load_rescaled_data(fn, scale_by_volume = True):
    """ Loads and rescaled data directly.

    Args:
        fn (str): filename
        scale_by_volume (bool, optional): Normalize volume or area . Defaults to True.

    Returns:
        Dict: rescaled dataset
    """
    return rescale_data(np.load(fn, scale_by_volume))[0]

# test: 
# by_volume = True
# data_rescaled, _  = rescale_data(data, scale_by_volume=by_volume)
# _, f = rescale_data(data_rescaled, scale_by_volume=by_volume)
# print(f)
# assert(np.isclose(f, 1.0))

In [41]:
def interpolate_vectors(xs, ys, major_axes, radii, moments, xs_unif, ys_unif):
    """ Interpolate a vectorfield onto a new grid. This function is used to ensure what we can overlay vectors.

    Args:
        xs (np.array): x coordinates
        ys (np.array): y coordinates
        major_axes (np.array): array containing the vector field of major axes (normalized)
        radii (np.array): ellipsolid radii
        moments (np.array): principle components
        xs_unif (np.array): new x coordinates
        ys_unif (np.array): new y coordinates

    Returns:
        np.array: returns an array of shape 'number of embryos' x 'len(xs_unif)' x 'len(ys_unif)' x 3 with the new vector field
        of polartities which are scaled using the `polarity_scale` function.
    """

    scale = polarity_scale(radii, moments)
    vecs = scale[...,None] * major_axes

    pts_unif = np.stack(np.meshgrid(xs_unif, ys_unif, indexing='ij'),axis=-1).reshape(-1, 2)  # get N x 2 array of all xy coordinates
    
    out = np.empty((vecs.shape[0], len(xs_unif), len(ys_unif), vecs.shape[3]))  # num embryos x 'X' x 'Y' x 'dim'

    for k_slice in range(vecs.shape[0]):
        slice = vecs[k_slice, ...]
        vecs_unif = interpn((xs, ys), slice, pts_unif, bounds_error=False, fill_value = 0.0)
        out[k_slice,...] = np.reshape(vecs_unif, (len(xs_unif), len(ys_unif), -1))
    
    return out

In [53]:
def average_boundary(datas, boundary_type = "avg", stage = ""):
    mags = np.mean(np.array([data[f"embryo_{boundary_type}_mag"] for data in datas]), axis = 0)
    angls = datas[0]["embryo_bnd_angls"]

    s = np.sin(angls); c = np.cos(angls)
    X = mags * s 
    Y = mags * c 

    # save files as well: 
    np.savez(f"{boundary_type}_boundary_XY_stage={stage.replace("/", "_")}.npz", X = X, Y = Y, mags = mags, angls = angls)

    return angls, mags, X, Y 

## Weighted nematic average

In [42]:
def normalized(a, axis=-1, order=2):
    """Normalize vectors

    Args:
        a (np.array): vectors
        axis (int, optional): normalize along axis. Defaults to -1.
        order (int, optional): order of the norm. Defaults to 2.

    Returns:
        np.array: normalized vectors
    """

    l2 = np.atleast_1d(np.linalg.norm(a, order, axis))
    l2[l2==0] = 1
    return a / np.expand_dims(l2, axis)

In [43]:
def weighted_nematic_mean(u):
    """Compute weighted nematic mean value which is given as the maximal eigenvalue * eigenvector of the tensor
    1/n sum(i=1)^n (w[i] dim/(dim-1)*u[i] u[i]^T - 1/(dim-1)*I) where w[i] = norm(u[i]).

    Args:
        u (np.array of size (...) x dim): array of vectors to average. The average is taken with respect to the inital axes.

    Returns:
        np.array of size dim: weighted nematic average
    """

    u = np.reshape(u, (-1, u.shape[-1]))
    
    dim = u.shape[-1]
    n = u.shape[0]

    w = np.linalg.norm(u, axis = -1)

    if w.sum() == 0:
        # shortcut for empty cases 
        return np.zeros((dim,))
    
    v = normalized(u, axis = -1)

    
    A = np.einsum('i,ij,ik -> jk', w, v, v) / n
    omega = dim/(dim-1)*A - np.sum(w)/n * np.eye(dim)/(dim-1)

    eigenvalues, eigenvectors = np.linalg.eigh(omega)

    return eigenvalues[-1] * eigenvectors[:,-1]

# Test:
# u = np.random.randn(10000, 2, 2)
# u_mean = weighted_nematic_mean(u)
# assert( np.linalg.norm(u_mean) < 0.05 )

In [52]:
def compute_nematic_mean(vecs):
    """ Apply the weighted nematic mean function to all pixels and average over the first axis.

    Args:
        vecs (np.array of size (#embryodata x #xs x #ys x 3)): vectorfield

    Returns:
        np.array of size (#xs x #ys x 3): nematic mean
    """

    # combine all data 
    out = np.zeros(vecs.shape[1:])

    # iterate over all pixels 
    for i in tqdm(range(vecs.shape[1])): 
        for j in range(vecs.shape[2]):
            data_at_pixel = vecs[:,i,j,:]

            mean_at_pixel = weighted_nematic_mean(data_at_pixel) 

            out[i,j,:] = mean_at_pixel 

    return out 

In [54]:
def averageExpData(data, xs, zs, boundary_x, boundary_z):

    X, Z = np.meshgrid(xs, zs)  # Create a grid
    pointsXZ = np.c_[X.ravel(), Z.ravel()]
    polygon = matpltPath(np.array([boundary_x, boundary_z]).T)
    inside = polygon.contains_points(pointsXZ)
    inside_mask = inside.reshape(X.shape)

    # keep only points and data inside the exp boundary
    data_inside = data[inside_mask]
    X_inside = X[inside_mask]
    Z_inside = Z[inside_mask]

    # calculate volume element
    dx = xs[1] - xs[0]  # x resolution
    dz = zs[1] - zs[0]  # z resolution
    volume_element = 2*np.pi*X_inside*dx*dz  # 2*pi*r*dr*dz, but r=x in cylyndrical coordinates

    # perform the integration
    numerator = np.sum(data_inside * volume_element)
    denominator = np.sum(volume_element)
    return numerator/denominator


## Plot helpers

In [44]:
def plot_boundary(ax, data, boundary_type, **kwargs):
    """ Plot the embryo boundary.

    Args:
        ax (Axis): current axis
        datas (Dict): dataset
        boundary_type (str): can be 'min', 'max' or 'avg'. 
    """
    
    R = data[f"embryo_{boundary_type}_mag"]
    embryo_bnd_angls = data["embryo_bnd_angls"]

    s = np.sin(embryo_bnd_angls)
    c = np.cos(embryo_bnd_angls)

    ax.plot(R * s, R * c, **kwargs)

def plot_all_boundaries(ax, data):
    plot_boundary(ax, data, "min", linewidth = 0.8, color = "darkgreen", linestyle = "dashed")
    plot_boundary(ax, data, "max", linewidth = 0.8, color = "orange", linestyle = "dashed")
    plot_boundary(ax, data, "avg", linewidth = 0.8, color = "black", linestyle = "solid")

In [None]:
# helper function used later to plot axes as lines which are centered at a pixel
def create_axes(xs, ys, nematic_axes, steps, l):

    xs = xs[::steps[0]]
    ys = ys[::steps[1]]
    grid = itertools.product(xs, ys)

    uv = nematic_axes[::steps[0], ::steps[1], :]

    return [[(x-l*vx,y-l*vy),(x+l*vx,y+l*vy)] for ((x,y),vx, vy) in zip(grid, uv[:,:,0].flatten(), uv[:,:,1].flatten())]

def plot_axes(ax, xs, ys, nematic_axes, steps = (10, 10), l = 0.5, lw = 1.6):
    
    line_data = create_axes(xs, ys, nematic_axes, steps=steps, l = l)
    lines = mc.LineCollection(line_data, color = "black", linewidth = lw, path_effects = [path_effects.Stroke(capstyle="round")])
    ax.add_collection(lines)

# Tests

This section contains unit tests for the functions defined above.

In [None]:
class TestAnalysisFunctions(unittest.TestCase):
    def test_polarity_scale(self):
        # Test with infinite cylinder
        radii = np.array([[1, 0, 0]])
        moments = np.array([[1, 0, 0]])
        result = polarity_scale(radii, moments)
        self.assertAlmostEqual(result[0], 0.0)

        # Test with flat shape
        radii_zero = np.array([[0, 1, 1]])
        moments = np.array([[0, 1, 1]])
        result_zero = polarity_scale(radii_zero, moments)
        self.assertEqual(result_zero[0], 1.0)

    def test_volume_area(self):
        # Test sphere volume (already implemented)
        xs_sphere = [np.sin(a) for a in np.linspace(0, np.pi, 10000)]
        ys_sphere = [np.cos(a) for a in np.linspace(0, np.pi, 10000)]
        vol, area = compute_volume_area(xs_sphere, ys_sphere)
        self.assertAlmostEqual(vol, np.pi * 4/3, places=5)

        # Test cylinder volume
        xs_cylinder = [1.0] * 1000  # radius = 1
        ys_cylinder = np.linspace(0, 2, 1000)  # height = 2
        vol_cyl, area_cyl = compute_volume_area(xs_cylinder, ys_cylinder)
        self.assertAlmostEqual(vol_cyl, np.pi * 2, places=5)  # V = πr²h

    def test_weighted_nematic_mean_random(self):
        # Test with random normal distributed vectors
        u = np.random.randn(100000, 2)
        u_mean = weighted_nematic_mean(u)
        self.assertLess(np.linalg.norm(u_mean), 0.01) # should be small 

    def test_weighted_nematic_mean_identical(self):
        # Test with identical vectors
        u = np.tile(np.array([1.0, 2.0]), 10).reshape(-1,2)
        u_mean = weighted_nematic_mean(u)

        # Should return a vector in the same direction
        expected = np.array([1.0, 2.0])
        np.testing.assert_array_almost_equal(np.abs(u_mean), np.abs(expected))

    def test_weighted_nematic_mean_single_nonzero(self):
        # Test with one directed vector and rest zeros
        u = np.vstack((np.array([[1,2]]), np.zeros((100,2))))
        u_mean = weighted_nematic_mean(u)
        np.testing.assert_array_less(np.abs(u_mean), 0.05)

# Run the tests
unittest.main(argv=[''], exit=False)

.......
----------------------------------------------------------------------
Ran 5 tests in 0.048s

OK
..
----------------------------------------------------------------------
Ran 5 tests in 0.048s

OK


<unittest.main.TestProgram at 0x14e9019b4a0>