### Astrocyte analysis

Calculates the following measurements indicated in the block diagram of the methods section (steps (a) and (e) in the diagram):

* Astrocyte density
* Astrocyte extension length
* Astrocyte process branching point density
* Angle entropy
* Fraction of astrocytes contacting blood vessels

In [None]:
from pathlib import Path
import os
import natsort

import numpy as np
import scipy
import matplotlib.pyplot as plt
from PIL import Image
import scipy.ndimage as ndi
import pandas as pd
from skimage.morphology import skeletonize
import networkx as nx
from pyvane.pipeline import DefaultNetworkBuilder
from pyvane.image import Image as pvImage

CHANNEL_VESSEL = 'CD31'
CHANNEL_FIBER = 'GFAP'   # Not used
CHANNEL_ASTRO = 'GFP'

In [3]:
def load_roi(file):
    '''Load ROI for P0.'''
    
    if 'P0' in str(file):
        file_roi = str(file).replace('GFP(astrocytes)', 'masks').replace('.tif', '.png')
        img_roi = plt.imread(file_roi).astype(np.uint8)
        if img_roi.ndim==3:
            img_roi = img_roi[:, :, 0]
        if img_roi.max()>1:
            img_roi = 1*(img_roi==255)
        return img_roi
    else:
        return None

def read_data(file_astro, input_folders, exclude_GFAP=False):
    '''Read data regarding segmented structures.'''

    file_vessel = file_astro.replace(CHANNEL_ASTRO, CHANNEL_VESSEL)
    file_fiber = file_astro.replace(CHANNEL_ASTRO, CHANNEL_FIBER)
    img_bin_astro = np.array(Image.open(input_folders['astro']/file_astro))//255
    img_bin_vessel = np.array(Image.open(input_folders['vessel']/file_vessel))//255
    if exclude_GFAP:
        img_bin_fibers = None
    else:
        img_bin_fibers = np.array(Image.open(input_folders['GFAP']/file_fiber))//255
    img_bin_GFP = np.array(Image.open(input_folders['GFP']/file_astro))//255

    if 'P0' in file_astro:
        img_roi = load_roi(input_folders['ROI']/file_astro)
    else:
        img_roi = np.ones(img_bin_astro.shape)
        
    img_data = {'astro':img_bin_astro, 'vessel':img_bin_vessel, 'GFAP':img_bin_fibers, 'GFP':img_bin_GFP}
        
    return img_data, img_roi

input_folders = {
    'astro':Path('astrocytes/UNET_segmentations'),  # Astrocyte bodies
    'vessel':Path('vessels/UNET_segmentations'),    # Segmented blood vessels
    'GFAP':Path('GFAP/pipeline_labels'),            # GFAP signal
    'GFP':Path('astrocytes/pipeline_labels') ,      # Astrocyte processes
    'ROI':Path('ROI')                               # Regions of interest for P0
}   

pixel_size = (0.454, 0.454)   # microns
channels = ['astro', 'vessel', 'GFAP', 'GFP']   # 'astro' contains only astrocytes bodies identified by the CNN

### Functions for modeling and analyzing astrocytes

In [11]:
class Process:
    '''Stores information about an astrocyte process.'''
    
    def __init__(self, idx, gfp_pixels, skeleton_pixels, has_astro):
        
        self.idx = idx
        self.gfp_pixels = gfp_pixels
        self.skeleton_pixels = skeleton_pixels
        self.has_astro = has_astro

class MarkedProcess:
    """Create process with information regarding the location of its pixels with respect to astrocytes
    and blood vessels.
    
    `ind_individual_astro_pixels` is a dictionary keyed by astrocyte body index. Each key contains the respective 
    indices of the pixels belonging to the astrocyte. These indices can be used in `gfp_pixels` and 
    `gfp_skel_pixels` for finding the actual pixels.
    """
    
    def __init__(self, process, ind_individual_astro_pixels, ind_vessel_pixels, ind_individual_skel_astro_pixels, 
                 ind_skel_vessel_pixels):
        
        self.idx = process.idx
        self.gfp_pixels = process.gfp_pixels
        self.gfp_skel_pixels = process.skeleton_pixels
        self.has_astro = process.has_astro

        self.ind_individual_astro_pixels = ind_individual_astro_pixels
        self.ind_vessel_pixels = ind_vessel_pixels
        self.ind_individual_skel_astro_pixels = ind_individual_skel_astro_pixels
        self.ind_skel_vessel_pixels = ind_skel_vessel_pixels

class ProcessAstroMap:
    '''Maps indices of individual processes to indices of astrocytes, and vice-versa.'''
    
    def __init__(self):
        
        self._astro_to_process = {}
        self._process_to_astro = {}
        
    def add_key_astro(self, astro_idx, process_idx):
        
        self._astro_to_process[astro_idx] = process_idx
        process_to_astro = self._process_to_astro
        if process_idx in process_to_astro:
            process_to_astro[process_idx].append(astro_idx)
        else:
            process_to_astro[process_idx] = astro_idx
        
    def add_key_process(self, astro_indices, process_idx):
        
        for astro_idx in astro_indices:
            self._astro_to_process[astro_idx] = process_idx
        process_to_astro = self._process_to_astro
        if process_idx in process_to_astro:
            process_to_astro[process_idx].extend(astro_indices)
        else:
            process_to_astro[process_idx] = astro_indices
        
    def astro_to_process(self, idx):
        
        return self._astro_to_process[idx]
        
    def process_to_astro(self, idx):
        
        return self._process_to_astro[idx]
    
def get_astrocyte_labels(img_astro):
    '''Labels of connected components.'''
    
    se = np.ones((3, 3))
    img_label_astro, _ = ndi.label(img_astro, se)
    
    return img_label_astro
        
def find_processes(img_data, img_roi):
    """All connected components identified in the union image between astrocyte bodies
    and GFP are considered a process. If the process contains an astrocyte body, the
    respective parameter has_astro is marked as true."""
           
    img_astro_gfp = np.logical_or(img_data['astro'], img_data['GFP'])
    img_process = img_astro_gfp*img_roi
    skeleton = skeletonize(img_process)*img_roi
    
    se = np.ones((3, 3))
    img_label_process, _ = ndi.label(img_process, se)
    img_label_astro, _ = ndi.label(img_data['astro']*img_roi, se)

    process_slices = ndi.find_objects(img_label_process)
    processes = {}
    process_astro_map = ProcessAstroMap()
    for process_idx, process_slice in enumerate(process_slices, start=1):
        subimg_process = img_label_process[process_slice]
        subimg_astro = img_label_astro[process_slice]
        subimg_skeleton = skeleton[process_slice]

        mask = subimg_process==process_idx
        astro_indices = np.unique(subimg_astro[mask])
        if astro_indices[0]==0:
            first_idx = 1
        else:
            first_idx = 0
        if len(astro_indices[first_idx:])>0:
            process_astro_map.add_key_process(astro_indices[first_idx:], process_idx)
            has_astro = True
        else:
            has_astro = False
            
        rows, cols = np.nonzero(mask)
        rows += process_slice[0].start
        cols += process_slice[1].start
        gfp_pixels = list(zip(rows, cols))
        
        subimg_skeleton_masked = mask*subimg_skeleton
        rows, cols = np.nonzero(subimg_skeleton_masked)
        rows += process_slice[0].start
        cols += process_slice[1].start
        skeleton_pixels = list(zip(rows, cols))

        process = Process(process_idx, gfp_pixels, skeleton_pixels, has_astro)
        processes[process_idx] = process
            
    return processes, process_astro_map

def mark_processes(processes, img_data, vessel_dil_radius=2):
    """Identify process and skeleton pixels that are over astrocytes and blood vessels."""
    
    img_vessel = img_data['vessel']
    if vessel_dil_radius>0:
        img_vessel = ndi.binary_dilation(img_vessel, iterations=vessel_dil_radius)

    img_label_astro = get_astrocyte_labels(img_data['astro'])
    
    marked_processes = {}
    for process_idx, process in processes.items():
        
        gfp_pixels = np.array(process.gfp_pixels)
        gfp_skel_pixels = np.array(process.skeleton_pixels)
        
        img_vessel_over_process = img_vessel[gfp_pixels[:,0], gfp_pixels[:,1]]
        ind_vessel_pixels = np.nonzero(img_vessel_over_process)[0].tolist()
        
        skel_vessel_over_process = img_vessel[gfp_skel_pixels[:,0], gfp_skel_pixels[:,1]]
        ind_skel_vessel_pixels = np.nonzero(skel_vessel_over_process)[0].tolist()
        
        if process.has_astro:
            ind_individual_astro_pixels = get_individual_astro_pixels(img_label_astro, gfp_pixels)
            ind_individual_skel_astro_pixels = get_individual_astro_pixels(img_label_astro, gfp_skel_pixels)
        else:
            ind_individual_astro_pixels = {}
            ind_individual_skel_astro_pixels = {}
        
        mp = MarkedProcess(process, ind_individual_astro_pixels, ind_vessel_pixels, 
                           ind_individual_skel_astro_pixels, ind_skel_vessel_pixels)
        marked_processes[process_idx] = mp
        
    return marked_processes

def get_individual_astro_pixels(img_label_astro, gfp_pixels):
    
    astro_indices = img_label_astro[gfp_pixels[:,0], gfp_pixels[:,1]]
    astro_unique_indices = np.unique(astro_indices)
    if astro_unique_indices[0]==0:
        first_idx = 1
    else:
        first_idx = 0
    ind_individual_astro_pixels = {}
    for astro_idx in astro_unique_indices[first_idx:]:
        ind_individual_astro_pixels[astro_idx] = np.nonzero(astro_indices==astro_idx)[0].tolist()

    return ind_individual_astro_pixels

def draw_process(process, img=None, img_shape=None, draw_astro_skel=True):
    '''Draw processes in an image.'''
    
    if img is None and img_shape is None:
        raise ValueError("Either `img` or `img_shape` must be provided.")
    
    if img is None:
        img = np.zeros((img_shape[0], img_shape[1], 3), dtype=np.uint8)        
    
    rows, cols = zip(*process.gfp_pixels)
    img[rows, cols] = (0, 128, 0)
    rows, cols = zip(*process.gfp_skel_pixels)
    img[rows, cols] = (0, 255, 0)
    if len(process.ind_vessel_pixels)>0:
        vessel_pixels = [process.gfp_pixels[ind] for ind in process.ind_vessel_pixels]
        rows, cols = zip(*vessel_pixels)
        #img[rows, cols] = (128, 0, 0)
    if len(process.ind_skel_vessel_pixels)>0:
        vessel_pixels = [process.gfp_skel_pixels[ind] for ind in process.ind_skel_vessel_pixels]
        rows, cols = zip(*vessel_pixels)
        #img[rows, cols] = (255, 0, 0)
    num_astro = len(process.ind_individual_astro_pixels)
    for astro_idx, (_, ind_astro_pixels) in enumerate(process.ind_individual_astro_pixels.items()):
        astro_pixels = [process.gfp_pixels[ind] for ind in ind_astro_pixels]
        if len(astro_pixels)>0:
            rows, cols = zip(*astro_pixels)
            img[rows, cols] = (0, 0, 128)
    if draw_astro_skel:
        for astro_idx, (_, ind_astro_pixels) in enumerate(process.ind_individual_skel_astro_pixels.items()):
            astro_pixels = [process.gfp_skel_pixels[ind] for ind in ind_astro_pixels]
            if len(astro_pixels)>0:
                rows, cols = zip(*astro_pixels)
                img[rows, cols] = (0, 0, 255)
        
    return img

def visualize_results(img_data, processes, hide_no_astro=False, img_roi=None):
    
    if img_roi is None:
        img_roi = np.ones_like(img_data['astro'], dtype=np.uint8)
    
    img_vessel = img_data['vessel']
    img_astro = img_data['astro']
    img = np.zeros((img_vessel.shape[0], img_vessel.shape[1], 3), dtype=np.uint8)  
    rows, cols = np.nonzero(np.logical_and(img_vessel, img_roi))
    rows, cols = np.nonzero(np.logical_and(img_astro, img_roi))
    img[rows, cols, 2] = 80
    
    for process in processes.values():
        if hide_no_astro==False or process.has_astro:
            draw_process(process, img, draw_astro_skel=False)
        
    return img

def identify_relations(img_data, processes):
    '''Identifies astrocytes processes near blood vessels.'''
    
    img_vessel = img_data['vessel']
    img_astro = img_data['astro']
    img_gfp = img_data['GFP']
    
    astro_vessel_paths = {}
    for process in processes.values():
        if process.has_astro:
            astro_vessel_paths[process.idx] = {}
            # Remove all astrocytes from the skeleton
            astro_pixels = []
            for astro_idx, ind_astro_pixels in process.ind_individual_skel_astro_pixels.items():
                astro_pixels.extend([process.gfp_skel_pixels[ind] for ind in ind_astro_pixels])
            process_skeleton = set(process.gfp_skel_pixels) - set(astro_pixels)   # Modified line (20/09/2021)
                
            for astro_idx, ind_astro_pixels in process.ind_individual_skel_astro_pixels.items():
                astro_pixels = [process.gfp_skel_pixels[ind] for ind in ind_astro_pixels]
                astro_stubs_inds = identify_stubs(process_skeleton, astro_pixels)
                astro_stubs = [astro_pixels[ind] for ind in astro_stubs_inds]
                for stub in astro_stubs:
                    paths, costs = follow_path(stub, process, process_skeleton)
                    for path, cost in zip(paths, costs):
                        if img_vessel[path[-1]]>0:
                            if astro_idx in astro_vessel_paths[process.idx]:
                                astro_vessel_paths[process.idx][astro_idx].append((path, cost))
                            else:
                                astro_vessel_paths[process.idx][astro_idx] = [(path, cost)]
                   
                if astro_idx in astro_vessel_paths[process.idx]:
                    unique_paths = remove_paths_with_same_end(astro_vessel_paths[process.idx][astro_idx])
                    astro_vessel_paths[process.idx][astro_idx] = unique_paths

                                
    return astro_vessel_paths
                                    
def identify_stubs(process_skeleton, astro_skel_pixels):
    '''Get first pixel of the skeleton of a process.'''
    
    stubs = []
    for idx, astro_pixel in enumerate(astro_skel_pixels):
        for nei in iterate_neighbors(astro_pixel):
            if nei in process_skeleton:
                stubs.append(idx)
                
    return stubs
        
def iterate_neighbors(pixel_coords):
    """Iterate over neighbors of a given pixel
    
    Parameters
    ----------
    pixel_coord : tuple of int
        Reference pixel.
    Yields
    -------
    tuple of int
        The neighbors of the pixel.
    """

    neis = [(pixel_coords[0]-1, pixel_coords[1]-1), (pixel_coords[0]-1, pixel_coords[1]),
            (pixel_coords[0]-1, pixel_coords[1]+1), (pixel_coords[0], pixel_coords[1]-1),
            (pixel_coords[0], pixel_coords[1]+1), (pixel_coords[0]+1, pixel_coords[1]-1),
            (pixel_coords[0]+1, pixel_coords[1]), (pixel_coords[0]+1, pixel_coords[1]+1)]

    for n in neis:
        yield n   
        
def get_next_points(current_point, points, visited):
        
        next_points = []
        for nei in iterate_neighbors(current_point):
            if nei in points and nei not in visited:
                next_points.append(nei)
            
        return next_points
        
def follow_path(stub, process, process_skeleton):
    '''Follow the skeleton of a given process parametrically.'''

    gfp_skel_pixels = process.gfp_skel_pixels
    if len(process.ind_skel_vessel_pixels)==0:
        return [[]], []
    else:
        vessel_skel_pixels = [process.gfp_skel_pixels[ind] for ind in process.ind_skel_vessel_pixels]
    
    if stub in vessel_skel_pixels:
        # Stub is already inside a blood vessel. Return list with a single path having one pixel
        return [[stub]], [0]
    
    query_points = [stub]
    visited_points = set([stub])
    successors = {}
    predecessors = {}
    cost = {stub:0}
    terminations = set()
    while len(query_points)>0:
        current_point = query_points.pop()
        next_points = get_next_points(current_point, process_skeleton, visited_points)
        if len(next_points)>0:
            successors[current_point] = next_points
            for next_point in next_points:
                if next_point in predecessors:
                    predecessors[next_point].append(current_point)
                else:
                    predecessors[next_point] = [current_point]
                new_cost = cost[current_point] + dist(current_point, next_point)
                if next_point in cost:
                    cost[next_point] = np.minimum(cost[next_point], new_cost)
                else:
                    cost[next_point] = new_cost
                visited_points.add(next_point)
                if next_point in vessel_skel_pixels:
                    terminations.add(next_point)
                else:
                    query_points.append(next_point)
                    
    paths = []
    total_costs = []
    for termination in terminations:
        current_point = termination
        path = [termination]
        while current_point!=stub:
            preds = predecessors[current_point]
            min_cost = 10e10
            next_point = None
            for point in preds:
                if cost[point]<min_cost:
                    next_point = point
            path.append(next_point)
            current_point = next_point
            
        total_cost = cost[path[0]]
        total_costs.append(total_cost)
        paths.append(path[::-1])
            
    return paths, total_costs
    
def dist(point1, point2):
    
    if point1[0]==point2[0] or point1[1]==point2[1]:
        return 1
    else:
        return np.sqrt(2)
        
def remove_paths_with_same_end(identified_paths):

    path_end_info = {}
    paths_to_remove = set()
    for idx, (path, cost) in enumerate(identified_paths):
        path_end = path[-1]
        if path_end in path_end_info:
            if path_end_info[path_end][0]<=cost:
                paths_to_remove.add(idx)
            else:
                path_rem_idx = path_end_info[path_end][1]
                paths_to_remove.add(path_rem_idx)
        else:
            path_end_info[path_end] = (cost, idx)

    unique_paths = []
    for idx in range(len(identified_paths)):
        if idx not in paths_to_remove:
            unique_paths.append(identified_paths[idx])
            
    return unique_paths

def get_astro_vessel_distance(relations):
    '''Distance between astrocyte and blood vessel.'''

    astro_vessel_distance = {}
    for process_idx, astro_data in relations.items():
        if len(astro_data)>0:
            for astro_idx, stubs in astro_data.items():
                min_length = min([length for path, length in stubs])
                astro_vessel_distance[astro_idx] = min_length          
                
    return astro_vessel_distance

def astro_fraction_touching_vessel_from_relations(img_astro, relations, img_roi=None, normalized=True):
    
    area = img_roi.sum()
    astro_vessel_distance = get_astro_vessel_distance(relations)
    num_astro_touching = len(astro_vessel_distance)
    img_lab = get_astrocyte_labels(img_astro)
    num_astro = img_lab.max()
    
    if normalized:
        fraction_touching = num_astro_touching/num_astro  # Fraction of astrocytes touching blood vessels, measurement used in the paper
    else:
        fraction_touching = num_astro_touching/area       # Density of astrocytes touching blood vessels
    
    return fraction_touching

def astro_fraction_touching_vessel(img_data, img_roi, normalized=True):
    '''Main function for calculating the fraction of astrocytes having a process overlapping with a blood vessel.'''

    processes, process_astro_map = find_processes(img_data, img_roi)
    marked_processes = mark_processes(processes, img_data, vessel_dil_radius=0)
    relations = identify_relations(img_data, marked_processes)
    aftv = astro_fraction_touching_vessel_from_relations(img_data['astro'], relations, img_roi, normalized)
    
    return aftv

def get_age_dataframe(dataframe, age):
        
    return dataframe[list(map(lambda x: age in x, dataframe.index))]

### Contact analysis

Measures fraction of astrocytes contacting blood vessels.

In [None]:
pixel_size = np.array(pixel_size)
files = os.listdir(input_folders['astro'])

touching_data = {}
files_tags = []
for file_astro in [files[4*48+1]]:
    
    print(file_astro)
    filename = file_astro.split('.')[0]
    file_tag = '@'.join(filename.split('@')[:-1])
    files_tags.append(file_tag)
    
    img_data, img_roi = read_data(file_astro, input_folders, exclude_GFAP=True)

    aftv = astro_fraction_touching_vessel(img_data, img_roi, normalized=False)
    touching_data[file_tag] = aftv

pd_touching = pd.DataFrame.from_dict(data=touching_data, orient='index', columns=["Fraction"])
column_name = 'Density of astrocytes touching blood vessels (normalized by astrocyte density)'
output_file = '../results/2D analysis astrocytes results (normalized).xlsx'
df = pd.read_excel(output_file, index_col=0, engine='openpyxl')     # Update excel file containing other results
df.loc[pd_touching['Fraction'].index, column_name] = pd_touching['Fraction']
df.to_excel(output_file)


### Astrocyte process angle entropy

Analysis of the complexity of astrocyte processes. The entropy of the angles of the processes are calculated for each astrocyte, and averaged over samples.

In [9]:
def vector_norm(vector):
    """The norm of a vector."""

    return np.sqrt(np.sum(vector**2))

def parametric_line_fit(path):
    """Fit a straight line to the points in `path`. The returned value is an array containing
    points along the fitted line. Note that the points are equally spaced along `path`.

    Parameters
    ----------
    path : list of tuple
        List containing points in the path.

    Returns
    -------
    fitted_coord : ndarray
        Points along the fitted line. The size of the array is the same as the input list `path`.
    """

    path = np.array(path)

    dpath = np.diff(path, axis=0)
    dlength = np.sqrt(np.sum(dpath**2, axis=1))
    s = np.array([0] + (np.cumsum(dlength)/sum(dlength)).tolist())

    fitted_coord = []
    for coord in path.T:
        slope, intercept = scipy.polyfit(s, coord, 1)
        fitted_coord.append(slope*s + intercept)

    return fitted_coord

def point_path_dist(point, path):
    """Calculate all distances between a point and a path.

    Parameters
    ----------
    point : tuple of float
        A point.
    path : list of tuple
        List containing points in the path.

    Returns
    -------
    float
        The requested distances.
    """

    return np.sqrt(np.sum((path-point)**2, axis=1))

def find_path_segment(path, pixel_index, radius):
    """Find the segment in `path` in which all points are a distance smaller than or equal
    to `radius` to the point in `path` defined by `pixel_index`. This is used for extracting
    the segment around a given point in the path for calculating the tortuosity.

    Parameters
    ----------
    path : list of tuple
        List containing points in the path.
    pixel_index : int
        The index of the point in `path` that will be used for extracting the segment.
    radius : float
        The radius used for extracting the segment.

    Returns
    -------
    edge_segment : ndarray
        The extracted segment.
    """

    path = np.array(path)
    pixel = path[pixel_index]

    dist = point_path_dist(pixel, path)
    ind_left, is_left_border = find_left_path_segment_idx(path, pixel_index, radius, dist)
    ind_right, is_right_border = find_right_path_segment_idx(path, pixel_index, radius, dist)

    edge_segment = path[ind_left:ind_right]

    return edge_segment

def points_line_dist(points, v1, v2):
    """Calculate the smallest distance for each point in `points` to the line
    defined by the points `v1` and `v2`.

    Parameters
    ----------
    points : list of tuple
        A set of points.
    v1 : tuple
        The first point of a line.
    v2 : tuple
        The second point of a line.

    Returns
    -------
    distances : ndarray
        The requested distances.
    """

    points, v1, v2 = np.array(points), np.array(v1), np.array(v2)

    if points.ndim==1:
        points = points[None]

    # Line versor
    v12 = v2 - v1
    n12 = v12/vector_norm(v12)

    # Vector from v1 to points
    v1p = v1 - points

    # Scalar product between v1p and n12 (i.e., size of v1p projected into n12)
    v1pProjn12 = np.sum(v1p*n12, axis=1)
    v1pProjn12 = v1pProjn12.reshape(points.shape[0], 1)

    # Vector from points to the closest point in the line
    vpx = v1p - v1pProjn12*n12

    # Norm of all vectors in vpx
    distances = np.sqrt(np.sum(vpx**2, axis=1))

    return distances

def find_left_path_segment_idx(path, pixel_index, radius, dist=None):
    """Auxiliary function for `find_path_segment`. Find the starting index of the
    segment that will be extracted from the path.

    Parameters
    ----------
    path : list of tuple
        List containing points in the path.
    pixel_index : int
        The index of the point in `path` that will be used for extracting the segment.
    radius : float
        The radius used for extracting the segment.
    dist : float, optional
        The distances between the point defined by `pixel_index` and each point in `path`.
        It is calculated if not provided.

    Returns
    -------
    ind_left : int
        Starting index of the segment to be extracted.
    is_left_border : bool
        If True, indicates that the segment begins at the first point of `path`.
    """

    if dist is None:
        pixel = path[pixel_index]
        dist = point_path_dist(pixel, path)
    ind_left = pixel_index
    while ind_left>=0 and dist[ind_left]<=radius:
    #while ind_left>0 and dist[ind_left]<radius:    # To agree with old algorithm
        ind_left -= 1
    ind_left += 1

    if ind_left==0:
        is_left_border = True
    else:
        is_left_border = False

    return ind_left, is_left_border

def find_right_path_segment_idx(path, pixel_index, radius, dist=None):
    """Auxiliary function for `find_path_segment`. Find the last index of the
    segment that will be extracted from the path.

    Parameters
    ----------
    path : list of tuple
        List containing points in the path.
    pixel_index : int
        The index of the point in `path` that will be used for extracting the segment.
    radius : float
        The radius used for extracting the segment.
    dist : float, optional
        The distances between the point defined by `pixel_index` and each point in `path`.
        It is calculated if not provided.

    Returns
    -------
    ind_right : int
        Last index +1 of the segment to be extracted. Thus, the index can be used for
        slicing the `path` (e.g., path[:ind_right]).
    is_right_border : bool
        If True, indicates that the segment ends at the last point of `path`.
    """

    num_pixels = len(path)
    if dist is None:
        pixel = path[pixel_index]
        dist = point_path_dist(pixel, path)
    ind_right = pixel_index
    while ind_right<num_pixels and dist[ind_right]<=radius:
        ind_right += 1
    ind_right -= 1

    if ind_right==num_pixels-1:
        is_right_border = True
    else:
        is_right_border = False

    return ind_right+1, is_right_border

def path_angles(path, scale, return_valid=True):
    """Calculates the angle of a straight line fit around each point in `path`.

    Parameters
    ----------
    path : list of tuple
        List containing points in the path.
    scale : float
        The scale at which the straight line will be fitted.
    return_valid : bool
        If True, regions close to terminations and bifurcations of blood vessels are not considered
        in the calculation. This ignored region becomes larger with the `scale` parameter.

    Returns
    -------
    angles : list of float
        The calculated angles.
    left_border_index : int
        If `return_valid` is True, contains the index of the first point used in the calculationd.
    right_border_index : int
        If `return_valid` is True, contains the index of the last point used in the calculation.
    """

    if len(path)<2 or scale<2:
        raise ValueError("Path must have at least two points for fitting.")

    num_pixels = len(path)
    left_border_index = num_pixels
    right_border_index = 0
    found_left_border = False
    found_right_border = False

    radius = scale/2
    idx_first_pixel, _ = find_right_path_segment_idx(path, 0, radius)
    idx_first_pixel -= 1
    idx_last_pixel, _ = find_left_path_segment_idx(path, num_pixels-1, radius)
    idx_last_pixel += 1
    left_border_index = idx_first_pixel
    right_border_index = idx_last_pixel
    if not return_valid:
        idx_first_pixel = 0
        idx_last_pixel = num_pixels

    angles = []
    for pixel_index in range(idx_first_pixel, idx_last_pixel):
        pixel = path[pixel_index]

        path_segment = find_path_segment(path, pixel_index, scale/2)

        if len(path_segment)<2:
            raise ValueError("Path segment must have at least two points for fitting.")

        line_coords = parametric_line_fit(path_segment)
        v1, v2 = [], []
        for coord in line_coords:
            v1.append(coord[0])   #v1: (row, col) of first point
            v2.append(coord[-1])  #v2: (row, col) of last point

        angle = np.arctan((v2[0]-v1[0])/(v2[1]-v1[1]))
        angles.append(angle)

    return angles, left_border_index, right_border_index

def find_process_index(path, img_skel_label):
    '''Find index of processes contained in `path`.'''

    process_indices = set()
    for pixel in path:
        process_indices.add(img_skel_label[tuple(pixel)])
    if 0 in process_indices:
        process_indices.remove(0)
    if len(process_indices)>1:
        print("Warning, more than one process index found")
    
    return list(process_indices)[0]

def astro_process_angles(marked_processes, img_data, scale, length_threshold=10):
    '''Main function for finding the local angle of astrocyte processes.'''

    img_gfp = img_data['GFP']
    img_skel = np.zeros((img_gfp.shape[0], img_gfp.shape[1]), dtype=np.uint8)
    img_skel_label = np.zeros((img_gfp.shape[0], img_gfp.shape[1]), dtype=int)
    for process_idx, process in enumerate(marked_processes.values(), start=1):
        if process.has_astro:
            astro_pixels = []
            for astro_idx, ind_astro_pixels in process.ind_individual_skel_astro_pixels.items():
                astro_pixels.extend([process.gfp_skel_pixels[ind] for ind in ind_astro_pixels])
            process_skeleton = set(process.gfp_skel_pixels) - set(astro_pixels)
            for pixel in process_skeleton:                
                img_skel[pixel] = 1
                img_skel_label[pixel] = process_idx

    img = pvImage(img_skel)
    nb = DefaultNetworkBuilder(length_threshold=length_threshold)
    graph = nb.apply(img)

    edges_atts = nx.get_edge_attributes(graph, 'path')
    image_angles = {}
    edges_by_process = {}
    for edge, path in edges_atts.items():
        if len(path)>length_threshold:
            path = np.array(path)
            process_idx = find_process_index(path, img_skel_label)
            angles, _, _ = path_angles(path, scale=scale, return_valid=True)
            if len(angles)>0:
                angles = [ang*180/np.pi for ang in angles]
                if process_idx in image_angles:
                    image_angles[process_idx].append(angles)
                    edges_by_process[process_idx].append(path)
                else:
                    image_angles[process_idx] = [angles]          
                    edges_by_process[process_idx] = [path]
                
    return image_angles, edges_by_process, graph, img_skel_label

def entropy(values, bins, use_exp=False, n=None):
    '''Entropy of a list of values.'''
    
    hist, _ = np.histogram(values, bins)
    probs = hist/hist.sum()
       
    ind = probs>0
    entropy = -np.sum(probs[ind]*np.log(probs[ind]))
    
    if use_exp:
        entropy = np.exp(entropy)
        if n is not None:
            entropy /= n
    else:
        if n is not None:
            entropy /= np.log(n)
    
    return entropy

def calculate_angles(files, input_folders, scale, length_threshold=10, vessel_dil_radius=0):
    '''Calculate process angles for all files.'''

    angle_data = {}
    files_tags = []
    for file_astro in files:

        print(file_astro)
        filename = file_astro.split('.')[0]
        file_tag = '@'.join(filename.split('@')[:-1])
        files_tags.append(file_tag)

        img_data, img_roi = read_data(file_astro, input_folders, exclude_GFAP=True)

        processes, process_astro_map = find_processes(img_data, img_roi)
        marked_processes = mark_processes(processes, img_data, vessel_dil_radius=vessel_dil_radius)
        image_angles, edges_by_process, graph, img_skel_label = astro_process_angles(marked_processes, img_data, scale, length_threshold)

        angle_data[file_tag] = image_angles

    return angle_data

def calculate_entropies(angle_data, bins, use_exp=False, n=None):
    '''Angle entropies for the processes of each astrocyte. The values are averaged over the whole sample.'''
    
    entropies = {}
    for file_tag, image_angles in angle_data.items():
        angles = []
        process_entropies = []
        for process_idx, process_paths in image_angles.items():
            process_angles = [ang for path in process_paths for ang in path]
            angles += process_angles

            process_entropy = entropy(process_angles, bins, use_exp, n)

            process_entropies.append(process_entropy)

        image_entropy = entropy(angles, bins, use_exp, n)
        entropies[file_tag] = [image_entropy, np.mean(process_entropies)]

    pd_entropy = pd.DataFrame.from_dict(data=entropies, orient='index', columns=["Image entropy", "Avg process entropy"])
    
    return pd_entropy

In [7]:
output_file = '../results/angle_heterogeneity.xlsx'
length_threshold = 9   # Edges smaller than this are considered spurious and pruned.
scale = 20             # Scale, in pixels, for the straight line fit 
n = 25                 # Number of histogram bins for entropy calculation
bins = np.linspace(-90, 90, n)


pixel_size = np.array(pixel_size)
files = os.listdir(input_folders['astro'])
files = natsort.natsorted(files)

angle_data = calculate_angles(files, input_folders, scale, length_threshold)
pd_entropy = calculate_entropies(angle_data, bins)

pd_entropy.to_excel(output_file)

### Density, length and branchness

Calculates number of astrocytes, length and branching point density of astrocytes processes.

In [None]:
def path_length(path):

    dpath = np.diff(path, axis=0)
    dlengths = np.sqrt(np.sum(dpath**2, axis=1))
    path_length = np.sum(dlengths)

    return path_length

output_file = Path('2D analysis astrocytes extensions_results.xlsx')

files_tags = []
measurements = []
for file_astro in files:

    print(file_astro)
    filename = file_astro.split('.')[0]
    file_tag = '@'.join(filename.split('@')[:-1])
    files_tags.append(file_tag)

    img_data, img_roi = read_data(file_astro, input_folders, exclude_GFAP=True)

    processes, process_astro_map = find_processes(img_data, img_roi)
    marked_processes = mark_processes(processes, img_data, vessel_dil_radius=0)

    # Remove skeleton above astrocyte bodies (i.e., keep only skeleton of processes)
    img_gfp = img_data['GFP']
    img_skel = np.zeros((img_gfp.shape[0], img_gfp.shape[1]), dtype=np.uint8)
    img_skel_label = np.zeros((img_gfp.shape[0], img_gfp.shape[1]), dtype=int)
    for process_idx, process in enumerate(marked_processes.values(), start=1):
        if process.has_astro:
            astro_pixels = []
            for astro_idx, ind_astro_pixels in process.ind_individual_skel_astro_pixels.items():
                astro_pixels.extend([process.gfp_skel_pixels[ind] for ind in ind_astro_pixels])
            process_skeleton = set(process.gfp_skel_pixels) - set(astro_pixels)
            for pixel in process_skeleton:                
                img_skel[pixel] = 1
                img_skel_label[pixel] = process_idx

    img = pvImage(img_skel, pix_size=pixel_size)
    # Build graph of astrocytes processes
    nb = DefaultNetworkBuilder(length_threshold=length_threshold)
    graph = nb.apply(img)

    # Process length
    edges_atts = nx.get_edge_attributes(graph, 'path')
    total_length = 0
    for edge, path in edges_atts.items():
            path = np.array(path)
            length = path_length(path)
            total_length += length*pixel_size[0]

    # Astrocyte density
    img_bin_astro = img_data['astro']
    area = np.sum(img_roi)*np.prod(pixel_size)
    img_bin_astro = img_bin_astro*img_roi
    _, num_comp_astro = ndi.label(img_bin_astro, np.ones((3, 3)))
    astro_density = num_comp_astro/area
    
    # Number of bifurcations
    degrees = np.array(list(dict(graph.degree()).values()))
    num_bif = np.sum(degrees>=3)
    
    measurements.append((1e6*astro_density, 1e3*total_length/area, 1e6*num_bif/area))
    
columns_names = ['Astrocyte Density (1/mm^2)', 'Extension length (mm/mm^2)', 'Extension branching density (1/mm^2)']
quantifications_table = pd.DataFrame(measurements, index=files_tags, columns=columns_names)
quantifications_table.to_excel(output_file, sheet_name='Results', index_label='Filename')