# Detección de velocidad angular

Una vez tenemos los archivos de trayectorias experimentales se ejecutan los siguientes pasos para el algoritmo:
1. Para cada video se hace un bucle que recorre todas las partículas
2. Para cada particula se calculan los perfiles de luminosidad de las aspas en cada fotograma
3. Para cada par de perfiles de luminosidad consecutivos se calcula su "time-lagged correlation function"
4. Se detecta (y se refina) el máximo de esas correlaciones que coincide con el desplazamiento angular de la particula entre esos dos fotogramas

In [1]:
import glob
import pims
import multiprocessing as mp
from functools import partial
import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None  # default='warn'
from tqdm import tqdm
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter

In [2]:
SHAPE = [832, 800] # Dimensiones del video

Tras importar las librerias necesarias defino algunas funciones útiles que llamaré más adelante:

In [7]:
def angle_from_2D_points(array_x, array_y):
    """ Given two 1D arrays, corresponding to 'x' and 'y' coordinates of 2D points,
        this function returns the angle of those points directions

    Parameters
    ----------
    array_x : array
        1D array of 'x' positions, ints or floats.        
    array_y : array
        1D array of 'y' positions, ints or floats.   
    
    Returns
    -------
    array
        A 1D numpy array with points angles expressed in degrees.
    """

    hipotenusas = np.linalg.norm([array_x, array_y], axis=0)
    senos = np.abs(array_y / hipotenusas)
    rad_angles = np.arcsin(senos)

    indices_cuadrante_sup_der = np.intersect1d(np.where(np.sign(array_y)>0), np.where(np.sign(array_x)>0), assume_unique=True)
    indices_cuadrante_sup_izq = np.intersect1d(np.where(np.sign(array_y)>0), np.where(np.sign(array_x)<0), assume_unique=True)
    indices_cuadrante_inf_der = np.intersect1d(np.where(np.sign(array_y)<0), np.where(np.sign(array_x)>0), assume_unique=True)
    indices_cuadrante_inf_izq = np.intersect1d(np.where(np.sign(array_y)<0), np.where(np.sign(array_x)<0), assume_unique=True)

    indices_eje_izquierdo = np.intersect1d(np.where(np.sign(array_y)==0), np.where(np.sign(array_x)<0), assume_unique=True)
    indices_eje_inferior = np.intersect1d(np.where(np.sign(array_y)<0), np.where(np.sign(array_x)==0), assume_unique=True)

    rad_angles[indices_cuadrante_sup_der] = rad_angles[indices_cuadrante_sup_der] + 0
    rad_angles[indices_cuadrante_sup_izq] = np.pi - rad_angles[indices_cuadrante_sup_izq]
    rad_angles[indices_cuadrante_inf_der] = 2*np.pi - rad_angles[indices_cuadrante_inf_der]
    rad_angles[indices_cuadrante_inf_izq] = rad_angles[indices_cuadrante_inf_izq] + np.pi

    rad_angles[indices_eje_izquierdo] = np.pi
    rad_angles[indices_eje_inferior] = 3*np.pi/2


    deg_angles = np.rad2deg(rad_angles)
    return deg_angles



def maskImage(img, mask):
    """ Masks an input image

    Parameters
    ----------
    img : array
        Input image
    mask : array
        True/False array with same shape as input image

    Returns
    -------
    masked_img : array
        output masked image

    """
    masked_img = img.copy()
    masked_img[~mask] = 0
    
    return masked_img



def fast_ring_mask(center, XX, YY, r_min=15, r_max=22):
    """ Creates an OpenCV circular mask

    Parameters
    ----------
    XX, YY : numpy arrays
        XX, YY = np.ogrid[:800, :1280] where 800 and 1280 are height and width respectively
    center : tuple or list, optional
        Pair of coordinates for the mask's central point. If not specified uses: [w/2, h/2]
    r_min, rmax : float, optional
        Values of the mask radiuses of the ring.
    
    Returns
    -------
    mask : array
        Mask for using with the image (array of True/False values)
    """
    dist_from_center = np.linalg.norm([XX - center[1], YY - center[0]])
    mask = np.logical_and(dist_from_center <= r_max, dist_from_center >= r_min)
    return mask
# Función parcial a la que solo hay que pasarle ya el centro de la partícula y crea la máscara
XX, YY = np.ogrid[:SHAPE[1], :SHAPE[0]]
partial_ring_mask = partial(fast_ring_mask, XX=XX, YY=YY, r_min=15, r_max=23) 



def get_brightness_profile_single_particle(row, video_path):
    """
    This function calculates the brightness profile of a particle from its position
    and image

    Parameters
    ----------
    row : list
        Lista con los siguientes elementos:
            0 -> Index of the row in the original trajectories dataframe 
            1 -> nº frame
            2 -> position, x particle center
            3 -> positión, y particle center
    video_path : str
        Path to the .cine video file corresponding to the current experiment

    Returns
    -------
    Tuple withe the following items:
    indice : int
        Index of the row in the original trajectories dataframe 
    brillo : array
        Array with sorted (by angle) values of brighness extrected from the blades image

    """
    # tamaño ventana filtro savgol debe ser 21 para r_min=15, r_max=20, y 29 para r_min=15, r_max=23
    indice, frame, x_0, y_0 = row
    video=pims.Cine(video_path)
    frame_orig = video.get_frame(int(frame)-1) #EL -1 ES PARA CORREGIR QUE EN LOS DATAFRAME EMPIEZO EN FRAME 1 Y NO 0, QUITAR SI FALLA

    mask = partial_ring_mask([x_0, y_0])
    frame = maskImage(frame_orig, mask)

    y, x = np.where(frame != 0)
    brillo = frame[(y,x)]
    brillo = brillo * (255/brillo.max())
    x_rel_to_center = x - x_0
    y_rel_to_center = y - y_0
    angulos = angle_from_2D_points(x_rel_to_center, y_rel_to_center)
    sort_inds = np.argsort(angulos)
    angulos = angulos[sort_inds]
    brillo = brillo[sort_inds]
    brillo = savgol_filter(brillo, window_length=29, polyorder=3)

    return (int(indice), brillo)



def crosscorr_lag(datax, datay, lag=0, wrap=False):
    """ Lag-N cross correlation. 
    Shifted data filled with NaNs 
    
    Parameters
    ----------
    lag : int, default 0
    datax, datay : pandas.Series objects of equal length
    Returns
    ----------
    crosscorr : float
    """
    datax = pd.Series(datax)
    datay = pd.Series(datay)
    if wrap:
        shiftedy = datay.shift(lag)
        shiftedy.iloc[:lag] = datay.iloc[-lag:].values
        return datax.corr(shiftedy)
    else: 
        return datax.corr(datay.shift(lag))
    
    
    
def find_displacement_angle_from_lagged_correlation(pair_of_arrays, N_ASPAS=14, N_NEIGHBORS_FOR_SUBPIXEL=4):
    """
    Given two consecutive brightness profiles corresponding to the same particle, returns
    the angular displacement between them by means of a lagged cross correlation (similar
    to what is used in PIV techniques)

    Parameters
    ----------
    pair_of_arrays : list
        List with 2 consecutive arrays each containing the brightness profile of particles in consecutive frames   
    N_ASPAS : int, optional
        Expected number of blades in the profile. The default is 14.
    N_NEIGHBORS_FOR_SUBPIXEL : int, optional
        Number of points used to refine the detected maximum of the lagged cross-correlation. The default is 4.

    Returns
    -------
    angulo_desplazamiento_rad : float
        Value, in radians, of the angular displacement between the two brighness profiles
    """
    
    # a y b son los arrays de luminosidades
    a = pair_of_arrays[0]
    b = pair_of_arrays[1]

    # Nº de puntos (indices) en el array que corresponden a un intervalo angular de media aspa   
    puntos_por_media_aspa = int(len(a)/(2*N_ASPAS))
    # El lag maximo para hacer la correlacion entre los dos arrays oscila entre -0.5aspas y +0.5aspas
    valores_lag = np.arange(-puntos_por_media_aspa, puntos_por_media_aspa)
    
    # Calculamos las correlaciones para esos lags
    correlaciones = []
    for lag in valores_lag:
        correlaciones.append(crosscorr_lag(a, b, lag=lag))
        
    # Ahora tenemos que calcular el máximo de esta curva y afinar para tener más precisión
    first_guess_max_index = np.argmax(correlaciones)
    # Calculamos el máximo de la serie y cogemos un entorno a su alrededor donde interpolaremos
    # con más puntos un polinomio de tercer grado para afinar la deteccion del máximo
    # Los dos primero ifs son para tratar los casos extremos
    if first_guess_max_index<=N_NEIGHBORS_FOR_SUBPIXEL:
        surrounding_region = correlaciones[0 : first_guess_max_index+N_NEIGHBORS_FOR_SUBPIXEL]
        x = np.arange(valores_lag[0], valores_lag[first_guess_max_index+N_NEIGHBORS_FOR_SUBPIXEL])
    elif first_guess_max_index>=len(valores_lag)-N_NEIGHBORS_FOR_SUBPIXEL:
        surrounding_region = correlaciones[first_guess_max_index-N_NEIGHBORS_FOR_SUBPIXEL :]
        x = np.arange(valores_lag[first_guess_max_index-N_NEIGHBORS_FOR_SUBPIXEL], valores_lag[-1]+1)
    else:
        surrounding_region = correlaciones[first_guess_max_index-N_NEIGHBORS_FOR_SUBPIXEL+1 : first_guess_max_index+N_NEIGHBORS_FOR_SUBPIXEL]
        x = np.arange(valores_lag[first_guess_max_index-N_NEIGHBORS_FOR_SUBPIXEL+1], valores_lag[first_guess_max_index+N_NEIGHBORS_FOR_SUBPIXEL])
    # Aproximamos a una cúbica       
    f = interp1d(x, surrounding_region, kind = 'cubic')   
    x_new = np.linspace(x[0], x[-1], 300) # Nuevo array de lags para la interpolacion, con 300 valores en lugar de 2*N_NEIGHBORS_FOR_SUBPIXEL
    y_new = f(x_new)
    second_guess_max_index = np.argmax(y_new)
    
    # En todo el array del perfil de brillo de las aspas hay 2*pi radianes -> cada valor representa (2*np.pi/len(a))
    # El desplazamiento se multiplica por eso
    angulo_desplazamiento_rad = (2*np.pi/len(a)) * x_new[second_guess_max_index]
    return angulo_desplazamiento_rad


def detect_spin_from_correlation_brightness_profile_video(video_file, data_file, frame_max='max', PARALLEL=True):
    """
    Calculates the angular velocity of disks with blades

    Parameters
    ----------
    video_file : str
        Path to the .cine experimental movie
    data_file : str
        Path to the .pkl.xz experimental trajectories file
    frame_max : int, optional
        Maximum number of frames to process. The default is 'max'.
    PARALLEL : bool, optional
        Use multiprocessing or not. The default is True

    Returns
    -------
    new_df : pandas DataFrame
        Dataframe with an added column 'w' representing the instantaneous angular displacement in rad/frame

    """
    # GRADOS_POR_ASPA = 360/14.
    df = pd.read_pickle(data_file, compression='xz')
    df['indice'] = df.index

    rows = df[['indice','frame','x','y']].values
    N = len(rows)
    if frame_max != 'max':
        sub = df[df.frame <= frame_max]
        rows = sub[['indice','frame','x','y']].values
        N = len(rows)

    # Función parcial, ahora solo acepta como entrada una lista, de la forma [indice, n_frame, x, y]
    partial_get_brightness_profile_single_particle = partial(get_brightness_profile_single_particle, video_path=video_file)
    # Calculamos los angulos de los picos de brillo alrededor de las partículas.
    # Usamos múltiples procesadores
    N_CORES = mp.cpu_count()
    print(f'Computing angular brightness profiles using {N_CORES} cores \n')
    with mp.Pool(processes=N_CORES) as pool:
        dict_indices_max_mins = dict(list(tqdm(pool.imap(partial_get_brightness_profile_single_particle, rows), total=N, desc=f"{video_file}", unit=" particles")))

    df['array_brillos'] = df['indice'].map(dict_indices_max_mins)
    
    
    # Ahora, para cada partícula tengo que ir haciendo la correlacion frame a frame 
    # de los perfiles de brillo para calcular la velocidad angular
    new_df = []
    for t in set(df.track):
        sub = df[df.track==t]
        
        # Tengo que iterar sobre tuplas de brillos correspondientes a frames consecutivos:
        brighness_profile_pairs = [sub.iloc[i:i+2]['array_brillos'].values for i in range(len(sub))]
        
        if PARALLEL==True:
            N_CORES = mp.cpu_count()
            print(f'Correlating consecutive frames using {N_CORES} cores \n')
            with mp.Pool(processes=N_CORES) as pool:
                desplazamientos_angulares = list(tqdm(pool.imap(find_displacement_angle_from_lagged_correlation, brighness_profile_pairs[:-1])))
        else:
            desplazamientos_angulares = list(tqdm(map(find_displacement_angle_from_lagged_correlation, brighness_profile_pairs[:-1])))
        
        desplazamientos_angulares.append(desplazamientos_angulares[-1]) #El ultimo elemento no lo podemos clcular, repetimos el anterior
        sub['w'] = np.array(desplazamientos_angulares)
        new_df.append(sub)
        
    new_df = pd.concat(new_df)
        
    #return new_df[['frame','track','x','y','w']]
    return new_df

Ahora, ya podemos ejecutar la detección de ángulos, primero definimos algunas variables y directorios:

In [6]:
FOLDER = 'C:/Users/malopez/Desktop/pruebasFran/' # Directorio donde están los archivos .pkl y .cine
PARALLEL = True # Usado para paralelizar o no ciertas funciones

track_files = glob.glob(FOLDER + '*_trajectories.pkl.xz', recursive=True)
track_files

['C:/Users/malopez/Desktop/pruebasFran\\105fa8bdab0e675f2ce62ab9bbb422ae_raw_trajectories.pkl.xz',
 'C:/Users/malopez/Desktop/pruebasFran\\2dad36cbc58c33e0410d2b9ee1ff9cdc_raw_trajectories.pkl.xz',
 'C:/Users/malopez/Desktop/pruebasFran\\4195dccd212da00d52f76d98cf7e413b_raw_trajectories.pkl.xz',
 'C:/Users/malopez/Desktop/pruebasFran\\47e6c4f6901cce30755826711f4a43bf_raw_trajectories.pkl.xz',
 'C:/Users/malopez/Desktop/pruebasFran\\643e88ebecb763c9b60732454a4eae52_raw_trajectories.pkl.xz',
 'C:/Users/malopez/Desktop/pruebasFran\\679bb36091b5ffccddb7e94618a5e6f6_raw_trajectories.pkl.xz',
 'C:/Users/malopez/Desktop/pruebasFran\\7da0eb99bd3172dde3244f91a6caee41_raw_trajectories.pkl.xz']

Y por último iteramos sobre todos estos archivos para sacar las velocidades angulares:

In [None]:
    for track_file in track_files:
        # Leo la información para saber cual es el archivo .cine correspondiente
        experiment_id = os.path.split(track_file)[-1].split('_')[0]
        info_file = folder + f'{experiment_id}_experiment_info.txt'
        with open(info_file) as f:
            jsonstr = json.load(f)
        video_file = folder + os.path.split(jsonstr['original_file'])[-1]

        # Llamo a la funcion que correlaciona perfiles de luminosidad
        df = detect_spin_from_correlation_brightness_profile_video(video_file, track_file, frame_max='max', PARALLEL=PARALLEL)
        new_df = df[['frame','track','x','y','w','natural_spin']]
        # Paso de radianes por fotograma a aspas por fotograma
        new_df['w'] = 14 * new_df['w'] / (2*np.pi)
        # Guardo en un nuevo archivo
        new_df.to_pickle(folder + f'{experiment_id}_w.pkl.xz', compression='xz')
