In [1]:
# LCS
import numpy as np
from tqdm.notebook import tqdm
from joblib import Parallel, delayed

class LCS:
    """Collection of LCS diagnostics computed on a gridded domain.

    The constructor takes no arguments and sets no attributes; every method
    reads the grid, time and flow attributes it needs directly from ``self``.
    """

    def __init__(self):
        """Create an instance without initialising any attribute."""

In [2]:
    def _FTLE_(self):
        
        if hasattr(self, 'C') == False:
        
            self._cauchy_green_strain()
            
        print("=================FTLE=================")
    
        self.FTLE = np.zeros((self.len_Y, self.len_X, self.dim))*np.nan
        
        for i in range(self.len_Y):
        
            for j in range(self.len_X):
                
                lambda_min, lambda_max, v_min, v_max = self._eigenvalues_and_eigenvectors(self.C[i, j, :, :])
                        
                if lambda_min > 0:
                    
                    self.FTLE[i, j, 0] = 1/(2*(self.lenT))*np.log(lambda_min)
                    self.FTLE[i, j, 1] = 1/(2*(self.lenT))*np.log(lambda_max)
                        
        return self.FTLE[:,:,0], self.FTLE[:,:,1]

In [3]:
    def _PRA_(self):
            
        if hasattr(self, 'grad_Fmap_grid') == False:
        
            self._grad_Fmap_grid()
            
        print("=================PRA=================")
            
        self.PRA = np.zeros(self.X_domain.shape)
            
        for i in range(self.len_Y):
        
            for j in range(self.len_X):
                
                U, S, V = self._svd(self.grad_Fmap_grid[i, j, :, :])
                
                self.PRA[i, j] = np.arccos(U[0, 0]*V[0, 0]+U[0, 1]*V[0, 1])
        
        return self.PRA

In [4]:
    def _LAVD_(self):
        
        if hasattr(self, 'trajectory_grid') == False:
            
            self._trajectory_grid()
            
        print("=================LAVD=================")
            
        def parallelization(k, t):
                
            self.omega = self._vorticity(t)
                
            spatially_averaged_vorticity = np.nanmean(self.omega.ravel())
                
            LVD = np.zeros((self.len_Y, self.len_X))
            
            for i in range(self.len_Y):
            
                for j in range(self.len_X):
                    
                    x = np.array([self.trajectory_grid[i, j, 0, k], self.trajectory_grid[i, j, 1, k]]).reshape(1, -1)

                    W = self._vorticity_tensor(x, t)
                    
                    omega = W[0, 1]-W[1, 0]
                
                    LVD[i, j] = np.abs(omega-spatially_averaged_vorticity)
                    
            return LVD
        
        self.LVD = np.array(Parallel(n_jobs=self.Ncores, verbose = 1)(delayed(parallelization)(k, t) for k, t in tqdm(enumerate(self.time), total=len(self.time))))
        
        self.LAVD = np.nanmean(self.LVD, axis = 0)
        
        return self.LAVD

In [5]:
    def _TRA_(self):
        
        if hasattr(self, 'trajectory_grid') == False:
            
            self._trajectory_grid()

        print("=================TRA=================")
        
        self.TRA = np.zeros((self.len_Y, self.len_X))
            
        for i in range(self.len_Y):
            
            for j in range(self.len_X):
                    
                velx0 = self.velocity_grid[i, j, 0, 0]
                vely0 = self.velocity_grid[i, j, 1, 0]
                    
                vel0 = np.sqrt(velx0**2+vely0**2)
                    
                velxN = self.velocity_grid[i, j, 0, -1]
                velyN = self.velocity_grid[i, j, 1, -1]
                
                vel1 = np.sqrt(velxN**2+velyN**2)
                
                self.TRA[i, j] = np.abs(np.arccos((velx0*velxN+vely0*velyN)/(vel0*vel1)))
        
        return self.TRA

In [6]:
    def _TRA_bar_(self):
        
        if hasattr(self, 'trajectory_grid') == False:
            
            self._trajectory_grid()

        print("=================TRA_bar=================")
        
        self.TR_bar = np.zeros((self.len_Y, self.len_X, self.lenT-2))
        
        for k in tqdm(range(self.lenT-2), total = self.lenT-2):
            
            for i in range(self.len_Y):
            
                for j in range(self.len_X):
                    
                    velx0 = self.velocity_grid[i, j, 0, k]
                    vely0 = self.velocity_grid[i, j, 1, k]
                    
                    vel0 = np.sqrt(velx0**2+vely0**2)
                    
                    velx1 = self.velocity_grid[i, j, 0, k + 1]
                    vely1 = self.velocity_grid[i, j, 1, k + 1]
                
                    vel1 = np.sqrt(velx1**2+vely1**2)
                
                    self.TR_bar[i, j, k-1] = np.abs(np.arccos((velx0*velx1+vely0*vely1)/(vel0*vel1)))
        
        self.TRA_bar = np.nanmean(self.TR_bar, axis = 2)/(self.tN-self.t0)
        
        return self.TRA_bar

In [7]:
    def _TSE_(self):
    
        if hasattr(self, 'trajectory_grid') == False:
            
            self._trajectory_grid()

        print("=================TSE=================")
        
        self.TSE = np.zeros((self.len_Y, self.len_X))
            
        for i in range(self.len_Y):
            
            for j in range(self.len_X):
                    
                velx0 = self.velocity_grid[i, j, 0, 0]
                vely0 = self.velocity_grid[i, j, 1, 0]
                    
                vel0 = np.sqrt(velx0**2+vely0**2)
                    
                velxN = self.velocity_grid[i, j, 0, -1]
                velyN = self.velocity_grid[i, j, 1, -1]
                
                velN = np.sqrt(velxN**2+velyN**2)
                
                self.TSE[i, j] = 1/(self.tN-self.t0)*np.log(velN/vel0)
        
        return self.TSE

In [8]:
    def _TSE_bar_(self):
        
        if hasattr(self, 'trajectory_grid') == False:
            
            self._trajectory_grid()

        print("=================TSE_bar=================")
        
        self.TSE_bar = np.zeros((self.len_Y, self.len_X, self.lenT-2))
        
        for k in tqdm(range(self.lenT-2), total = self.lenT-2):
            
            for i in range(self.len_Y):
            
                for j in range(self.len_X):
                    
                    velx0 = self.velocity_grid[i, j, 0, k]
                    vely0 = self.velocity_grid[i, j, 1, k]
                    
                    vel0 = np.sqrt(velx0**2+vely0**2)
                    
                    velx1 = self.velocity_grid[i, j, 0, k + 1]
                    vely1 = self.velocity_grid[i, j, 1, k + 1]
                
                    vel1 = np.sqrt(velx1**2+vely1**2)
                
                    self.TSE_bar[i, j, k] = np.abs(np.log(vel1/vel0))
        
        self.TSE_bar = np.nanmean(self.TSE_bar, axis = 2)/(self.tN-self.t0)
        
        return self.TRA_bar

In [9]:
    def _hyperbolic_LCS_local_variational_theory(self, max_distance = 1, type = "shrinklines", max_line_length = 10, step_size = None):
        
        if step_size is None:
            step_size = self.dx/20
        
        self.lambda_max = np.zeros((self.len_Y, self.len_X))
        self.eigenvector_max = np.zeros((self.len_Y, self.len_X, self.dim))
        self.lambda_min = np.zeros((self.len_Y, self.len_X))
        self.eigenvector_min = np.zeros((self.len_Y, self.len_X, self.dim))
        
        for i in range(self.len_Y):
        
            for j in range(self.len_X):
        
                lambda_min, lambda_max, v_min, v_max = self._eigenvalues_and_eigenvectors(self.C[i, j, :, :])
        
                if np.isfinite(lambda_max) and np.isfinite(lambda_min):
                
                    self.lambda_max[i, j] = lambda_max
                    self.lambda_min[i, j] = lambda_min
                    self.eigenvector_max[i, j, :] = v_max
                    self.eigenvector_min[i, j, :] = v_min

        if type == "shrinklines":
    
            self.eigen = self.eigenvector_min
        
            # Find local maxima of the max eigenvalue field
            peak_x, peak_y, peak_field = self._find_2D_peaks(max_distance, self.X_domain, self.Y_domain, self.lambda_max)
            
        elif type == "stretchlines":
    
            self.eigen = self.eigenvector_max
        
            # Find local minima of the max eigenvalue field
            peak_x, peak_y, peak_field = self._find_2D_peaks(max_distance, self.X_domain, self.Y_domain, -self.lambda_max)
        
        else:    
            print("Variable type should either be strainlines or stretchlines")
        
        self.Interp_lambda_min = self._gridded_Interpolation(self.Y_domain, self.X_domain, self.lambda_min, "cubic")
        self.Interp_lambda_max = self._gridded_Interpolation(self.Y_domain, self.X_domain, self.lambda_max, "cubic")
        self.Interp_eigen_x = self._gridded_Interpolation(self.Y_domain, self.X_domain, self.eigen[:,:,0], "cubic")
        self.Interp_eigen_y = self._gridded_Interpolation(self.Y_domain, self.X_domain, self.eigen[:,:,1], "cubic")
    
        x_strainlines = []
        y_strainlines = []
        
        for i in tqdm(range(len(peak_x))):
            
            x = np.array([peak_x[i], peak_y[i]])
            bool_loc_max = True
            
            for j in range(len(x_strainlines)):
                    
                if np.sqrt((x[0]-x_strainlines[j])**2+(x[1]-y_strainlines[j])**2) < max_distance:
                    bool_loc_max = False
                    break
            
            x_forward_update = x
            x_backward_update = x
        
            dist_forward = 0
            dist_backward = 0
            dist_total = 0
        
            counter = 0
        
            while dist_total <= max_line_length and bool_loc_max == True:
                
                if counter == 0:
                    
                    vx = self.Interp_eigen_x(x[1], x[0])[0][0]
                    vy = self.Interp_eigen_y(x[1], x[0])[0][0]
                    
                    x_prime_forward = np.array([vx, vy])
                    x_prime_backward = -np.array([vx, vy])
                    
                if x_forward_update is not None:
                    
                    x_forward = x_forward_update
                    x_strainlines.append(x_forward[0])
                    y_strainlines.append(x_forward[1])
                    x_forward_update, x_prime_forward = self._RK4_tensorlines_orientational_discontinuity(x_forward, x_prime_forward, step_size)
                    
                    if x_forward_update is not None:
                        dist_forward = np.sqrt((x_forward_update[0]-x_forward[0])**2+(x_forward_update[1]-x_forward[1])**2)
                
                if x_backward_update is not None:
                    
                    x_backward = x_backward_update
                    x_strainlines.append(x_backward[0])
                    y_strainlines.append(x_backward[1])
                    x_backward_update, x_prime_backward = self._RK4_tensorlines_orientational_discontinuity(x_backward, x_prime_backward, step_size)
                    if x_backward_update is not None:
                        dist_backward = np.sqrt((x_backward_update[0]-x_backward[0])**2+(x_backward_update[1]-x_backward[1])**2)
        
                dist_total += dist_forward+dist_backward
                dist_forward = 0
                dist_backward = 0
            
                if x_forward_update is None and x_backward_update is None:
                    bool_loc_max = False
                
                counter += 1
    
        return x_strainlines, y_strainlines

In [10]:
    def _elliptic_LCS_local_variational_theory(self):
        
        print("=================Elliptic LCS from local variational theory=================")

In [11]:
    def _hyperbolic_LCS_global_variational_theory(self):
        
        print("=================Hyperbolic LCS from global variational theory=================")

In [12]:
    def _elliptic_LCS_global_variational_theory(self):
        
        print("=================Elliptic LCS from global variational theory=================")

In [13]:
    def _vorticity(self, t):
        
        self.omega = np.zeros((self.len_Y, self.len_X))
        
        for i in range(self.len_Y):
            
            for j in range(self.len_X):
                
                x = np.array([self.X_domain[i, j], self.Y_domain[i, j]]).reshape(1, -1)
    
                W = self._vorticity_tensor(x, t)
                
                self.omega[i, j] = W[0, 1]-W[1, 0]
                
        return self.omega              

In [14]:
    def _find_ridges(self, Field, threshold = None, type = "ridge", method = "threshold", resolution = 1, ds = 1, n_iterations = 100):
        
        if type == "ridge":
            
            sign = 1
            
            if threshold is None:
            
                print("Threshold value is None --> Specify threshold.")
                print("If not specified, threshold is set to ", 0.1, " of the maximum value of the given scalar field")
            
                threshold = .1*np.nanmax(Field)
            
        elif type == "trench":
            
            sign = -1
                  
            if threshold is None:
            
                print("Threshold value is None --> Specify threshold.")
                print("If not specified, threshold is set to ", 0.1, " of the maximum value of the given scalar field")
            
                threshold = .1*np.nanmin(Field)
            
        if method == "threshold":
            
            mask = (Field >= threshold)
            
            extrema_x = self.X_domain[mask].ravel()
            extrema_y = self.Y_domain[mask].ravel()
            
            return extrema_x, extrema_y
            
        elif method == "gradient":
            
            Field[np.isnan(Field)] = 0
            
            Interpolant_Field = self._Interpolation(self.Y_domain, self.X_domain, Field, method = "cubic")
            
            grad_Field = np.zeros((Field.shape[0], Field.shape[1], 2))
            
            for i in range(1, Field.shape[0]-1):
                
                for j in range(1, Field.shape[1]-1):
                    
                    dy = (self.Y_domain[i+1,0] - self.Y_domain[i-1, 0])/2
                    dx = (self.X_domain[0, j+1] - self.X_domain[0, j-1])/2

                    grad_Field[i, j, 0] = (Interpolant_Field(self.Y_domain[i, j], self.X_domain[i, j]+.1*dx)[0][0]-Interpolant_Field(self.Y_domain[i, j], self.X_domain[i, j]-.1*dx)[0][0])/(2*.1*dx)
                    grad_Field[i, j, 1] = (Interpolant_Field(self.Y_domain[i, j]+.1*dy, self.X_domain[i, j])[0][0]-Interpolant_Field(self.Y_domain[i, j]-.1*dy, self.X_domain[i, j])[0][0])/(2*.1*dy)
        
            grad_Fieldx = grad_Field[:, :, 0]
            grad_Fieldy = grad_Field[:, :, 1]
            grad_Fieldx[np.isnan(grad_Fieldx)] = 0
            grad_Fieldy[np.isnan(grad_Fieldy)] = 0
            
            Interpolant_gradx_Field = self._Interpolation(self.Y_domain, self.X_domain, grad_Fieldx)
            Interpolant_grady_Field = self._Interpolation(self.Y_domain, self.X_domain, grad_Fieldy)
            
            x_grid = np.linspace(np.min(self.X_domain), np.max(self.X_domain), self.Y_domain.shape[0]*resolution)
            y_grid = np.linspace(np.min(self.Y_domain), np.max(self.Y_domain), self.Y_domain.shape[1]*resolution)
            
            extrema_x = []
            extrema_y = []
            
            for x in tqdm(x_grid, total = len(x_grid)):
                    
                for y in y_grid:
                    
                    x_eval = x
                    y_eval = y
    
                    loc = self._check_location(np.array([x_eval, y_eval]).reshape(1, -1))[0]
                        
                    gradient = 10
                    
                    iter = 0
                        
                    while iter < n_iterations and loc == "IN" and Interpolant_Field(y_eval, x_eval)[0][0] > threshold:
                            
                        loc = self._check_location(np.array([x_eval, y_eval]).reshape(1, -1))[0]
                        
                        gradx = Interpolant_gradx_Field(y_eval, x_eval)[0][0]
                        grady = Interpolant_grady_Field(y_eval, x_eval)[0][0]
                        
                        gradient = np.sqrt(gradx**2+grady**2)
                        
                        x_eval = x_eval + sign * ds * gradx/gradient*dx
                        y_eval = y_eval + sign * ds * grady/gradient*dy
                        
                        iter += 1
                    
                        extrema_x.append(x_eval)
                        extrema_y.append(y_eval)
            
            return extrema_x, extrema_y

        else:
            
            print("The method argument is not valid. Use either threshold or gradient")

In [15]:
    def _find_2D_peaks(self, max_distance, X, Y, Field):
        
        def _find_all_local_maxima(X, Y, Field):
            
            loc_max_x, loc_max_y, loc_max_field = [], [], []
            
            for i in range(2, X.shape[0]-2):
                
                for j in range(2, Y.shape[1]-2):
                    
                    if np.isfinite(Field[i, j]) and Field[i, j] > Field[i+1, j] and Field[i, j] > Field[i-1, j] and Field[i, j] > Field[i, j+1] and Field[i, j] > Field[i, j-1]:
                        
                        loc_max_x.append(X[i, j])
                        loc_max_y.append(Y[i, j])
                        loc_max_field.append(Field[i, j])
            
            return loc_max_x, loc_max_y, loc_max_field
        
        loc_max_x, loc_max_y, loc_max_field = _find_all_local_maxima(X, Y, Field)
        
        n_loc_max = len(loc_max_x)
        
        peak_x, peak_y, peak_field = [], [], []
        
        for i in range(n_loc_max):
            
            bool_loc_max = True
    
            for j in range(n_loc_max):
            
                if i != j and loc_max_field[i] < loc_max_field[j] and np.sqrt((loc_max_x[i]-loc_max_x[j])**2+(loc_max_y[i]-loc_max_y[j])**2) <= max_distance:
                    
                    bool_loc_max = False
                
            if bool_loc_max:
                
                peak_x.append(loc_max_x[i])
                peak_y.append(loc_max_y[i])
                peak_field.append(loc_max_field[i])
                
        return peak_x, peak_y, peak_field

In [16]:
    def _differential_system_tensorlines_orientational_discontinuity(self, x, x_prime):
                
        # Check for orientational discontinuity by introducing appropriate scaling
                
        vx = self.Interp_eigen_x(x[1], x[0])[0][0]
        vy = self.Interp_eigen_y(x[1], x[0])[0][0]
            
        lambda_max = self.Interp_lambda_max(x[1], x[0])[0][0]
        lambda_min = self.Interp_lambda_min(x[1], x[0])[0][0]
            
        alpha = ((lambda_max-lambda_min)/(lambda_max+lambda_min))**2
            
        scaling = np.sign(vx*x_prime[0]+vy*x_prime[1])*alpha
            
        return scaling*np.array([vx, vy])

In [18]:
    def _RK4_tensorlines_orientational_discontinuity(self, x, x_prime, ds):
        
        # Define starting point.
        x1 = x
        
        # If x is outside defined domain --> vel is None --> _RK4 returns "None" and integration will stop.
            
        # Compute x_prime at the beginning of the time-step
        
        loc = self._check_location(x1)[0]
        
        if loc != "IN" or self.Interp_lambda_max(x1[1], x1[0])[0][0] < 1:
            return None, None
        x_prime = self._differential_system_tensorlines_orientational_discontinuity(x1, x_prime)  
        k1 = ds * x_prime

        #  position and time at the first midpoint.
        x2 = x1 + .5 * k1
        loc = self._check_location(x2)[0]
        
        if loc != "IN" or self.Interp_lambda_max(x2[1], x2[0])[0][0] < 1:
            return None, None
        
        # Compute x_prime at the first midpoint.
        x_prime = self._differential_system_tensorlines_orientational_discontinuity(x2, x_prime)   
        k2 = ds * x_prime

        # Update position at the second midpoint.
        x3 = x1 + .5 * k2
    
        loc = self._check_location(x3)[0]
        if loc != "IN" or self.Interp_lambda_max(x3[1], x3[0])[0][0] < 1: 
            return None, None
    
        # Compute velocity at the second midpoint.
        x_prime = self._differential_system_tensorlines_orientational_discontinuity(x3, x_prime)   
        k3 = ds * x_prime
    
        # Update position at the endpoint.
        x4 = x1 + k3
    
        loc = self._check_location(x4)[0]
        if loc != "IN" or self.Interp_lambda_max(x4[1], x4[0])[0][0] < 1:
            return None, None
    
        # Compute velocity at the end of the time-step.
        x_prime = self._differential_system_tensorlines_orientational_discontinuity(x4, x_prime)    
        k4 = ds * x_prime
    
        # define list for velocity and positions of particle
        x_prime_update = []
        x_update = []
        
        # Compute velocity
        for j in range(self.dim):
            # Update velocity of particles
            x_prime_update.append(1.0 / 6.0*(k1[j] + 2 * k2[j] + 2 * k3[j] + k4[j])/ds)
    
        # Integration x <-- x + x_prime*ds
        for j in range(self.dim):
            # Update position of particles
            x_update.append(x[j] + x_prime_update[j]*ds)

        x_update = np.array(x_update)
        x_prime_update = np.array(x_prime_update)
        
        if self._check_location(x_update)[0] != "IN" or self.Interp_lambda_max(x[1], x[0])[0][0] < 1: 
            return None, None
    
        return x_update, x_prime_update