In [6]:
import pandas as pd
import pathlib
import numpy as np
from numpy import log as ln
import scipy.stats as stats
from scipy import ndimage
import multiprocessing
import time

In [7]:
def walk(path):
    """Collects paths for files in path parameter

    Parameters
    ----------
    path: str or pathlib.Path() object
            Path to metadata folder containing IDR study directories

    Returns
    -------
    PosixPath object
    """
    for p in pathlib.Path(path).iterdir(): 
        if p.is_dir(): 
            yield from walk(p)
            continue
        yield p.resolve()

# Define study metadata directory
studies_metadata_dir = pathlib.Path(
    '../data/metadata')

# Collect metadata file paths
metadata_files = list(walk(studies_metadata_dir))


In [19]:
# Define stats
# Relative and absolute frequencies
def category_frequencies(attribute_elements):
    """"""
    total_instances = sum(attribute_elements.values())
    rel_freq_dict = dict()
    abs_freq_list = list()
    for image_attribute in attribute_elements.keys():
        abs_freq_list.append(attribute_elements[image_attribute])
        rel_freq_dict[image_attribute] = attribute_elements[image_attribute] / total_instances
    return rel_freq_dict, abs_freq_list
    
# Shannon Index
def h_index(p):
    """Calculates the Shannon Index of a set of unique attribute instances.
    Parameters
    ----------
    p: dict
        Dictionary of relative frequencies for each unique element in an attribute column.

    Returns
    -------
    h: float
        Shannon Index value
    
    results: list
        List of each -p_iln(p_i) value to use for Normalized Median Evenness statistic.
    """
    results = list()
    for entry in p.values():
        results.append(entry * ln(entry))
    
    results = np.array(results)
    h = -(sum(results))
    return h, results

# Pielou's Evenness
def pielou(h, s):
    """"""
    return h / ln(s)

# Normalized Median Evenness
def nme(h_list):
    """"""
    temp_list = list()
    for h_value in h_list:
        temp_list.append(-1.0 * h_value)
    temp_list = np.array(temp_list)
    nme = ndimage.median(temp_list) / temp_list.max()
    return nme

def gini_coef(x):
    x = np.array(x)
    total = 0
    for i, xi in enumerate(x[:-1], 1):
        total += np.sum(np.abs(xi - x[i:]))
    return total / (len(x)**2 * np.mean(x))

def stats_pipeline(attribute_elements):
    # Richness
    s = len(attribute_elements.keys())

    # Shannon Index
    rel_frequencies, abs_frequencies = category_frequencies(attribute_elements=attribute_elements)
    h, pi_list = h_index(p=rel_frequencies)

    # Calculate Normalized Median Evenness
    nme_result = nme(pi_list)

    # Calculate Pielou's evenness
    j = pielou(h=h, s=s)

    # Calculate Gini coefficient
    gc = gini_coef(abs_frequencies)

    return s, h, nme_result, j, gc
    

In [9]:
# Define stat collection pipeline
def collect_study_stats(metadata_file_path, results_list, na_cols=["pixel_size_x", "pixel_size_y"]):
    """Collecting statistics within a single file"""

    # Read parquet into pandas df
    metadata_df = pd.read_parquet(metadata_file_path)

    # Extract metadata from file name and dataframe
    metadata_pq = metadata_file_path.name
    study_name = metadata_pq.split('.')[0]
    attribute_names = metadata_df.columns.to_list()

    # Remove irrelevant attributes
    for attribute in na_cols:
        attribute_names.remove(attribute)

    # Collect statistics for each attribute
    for attribute in attribute_names:
        unique_entries = metadata_df[attribute].unique()
        attribute_elements = dict()
        for element in unique_entries:
            attribute_elements[element] = len(metadata_df[metadata_df[attribute] == element])

        s, h, nme_result, j, gc = stats_pipeline(attribute_elements=attribute_elements)

        # Append stats to attribute_results
        results_list.append([study_name,
                                attribute, 
                                s,
                                h, 
                                nme_result, 
                                j,
                                gc])

    return stat_results_df

In [95]:
 # Collect metadata
 all_results_list = list()
 
 # Initialize Pool object
start = time.time()
available_cores = len(os.sched_getaffinity(0))
pool = multiprocessing.Pool(processes=available_cores)
print(
    f"\nNow processing {len(metadata_files)} studies with {available_cores} cpu cores.\n"
)

# Build the iterative object for pool.starmap()
metadata_file_paths = list(zip(metadata_files, all_results_list))


for metadata_path in metadata_file_paths:
    collect_study_stats(metadata_path, all_results_list)

stat_results_df = pd.DataFrame(data=all_results_list, columns=['Study_Name', 'Attribute', 'S', 'H', 'NME', 'J', 'GC'])
print(stat_results_df)

# Make directories
stats_dir = pathlib.Path("../data/statistics")
pathlib.Path.mkdir(stats_dir, exist_ok=True)

# Save individual stats as parquet file
output_file = pathlib.Path(
        screen_dir, f"individual_studies_diversity.parquet.gzip"
    )
stat_results_df.to_parquet(output_file, compression="gzip")



Now processing 3 studies with 16 cpu cores.



  nme = ndimage.median(temp_list) / temp_list.max()
  return h / ln(s)
  nme = ndimage.median(temp_list) / temp_list.max()
  return h / ln(s)
  nme = ndimage.median(temp_list) / temp_list.max()
  return h / ln(s)


                                 Study_Name             Attribute      S  \
0     idr0080-way-perturbation_screenA_2701             screen_id      1   
1     idr0080-way-perturbation_screenA_2701            study_name      1   
2     idr0080-way-perturbation_screenA_2701              plate_id     18   
3     idr0080-way-perturbation_screenA_2701            plate_name     18   
4     idr0080-way-perturbation_screenA_2701               well_id   6912   
5     idr0080-way-perturbation_screenA_2701        imaging_method      1   
6     idr0080-way-perturbation_screenA_2701                sample      1   
7     idr0080-way-perturbation_screenA_2701              organism      1   
8     idr0080-way-perturbation_screenA_2701         organism_part      1   
9     idr0080-way-perturbation_screenA_2701             cell_line      3   
10    idr0080-way-perturbation_screenA_2701                strain      1   
11    idr0080-way-perturbation_screenA_2701       gene_identifier     53   
12    idr008

In [20]:
def collect_databank_stats(metadata_dir, na_cols=["pixel_size_x", "pixel_size_y"]):
    """Statistics pipeline for computation accross a databank
    """
    metadata_directory = pathlib.Path("../data/metadata")

    # Open and concatinate study metadata dataframes from .parquet files
    databank_metadata = pd.concat([pd.read_parquet(study_metadata_file) for study_metadata_file in walk(metadata_directory)])
    
    # Get image_attribute names
    attribute_names = databank_metadata.columns.to_list()

    # Remove irrelevant attributes
    for attribute in na_cols:
        attribute_names.remove(attribute)

    results_list = list()
    # Collect statistics for each attribute
    for attribute in attribute_names:
        unique_entries = databank_metadata[attribute].unique()
        attribute_elements = dict()
        for element in unique_entries:
            attribute_elements[element] = len(databank_metadata[databank_metadata[attribute] == element])

        s, h, nme_result, j, gc = stats_pipeline(attribute_elements=attribute_elements)

        # Append stats to attribute_results
        results_list.append([
                                attribute, 
                                s,
                                h, 
                                nme_result, 
                                j,
                                gc])
    
    stat_results_df = pd.DataFrame(data=results_list, columns=['Attribute', 'S', 'H', 'NME', 'J', 'GC'])

    return stat_results_df
    
databank_stats = collect_databank_stats(metadata_dir=studies_metadata_dir)
print(databank_stats)


NameError: name 'study_name' is not defined