In [None]:
"""
Get cell type info from the MANC dataset via neuPrint.
"""
import pandas as pd
import numpy as np
import re
import neuprint

# neuPrint connection: paste a personal auth token into `token` before running
# (avoid committing the notebook with the token filled in)
token = ""
np_client = neuprint.Client(
    'https://neuprint.janelia.org',
    dataset='manc:v1.2.1',
    token=token,
)


In [None]:
# get cell type info: every distinct combination of the listed neuron properties,
# together with the number of neurons that share that combination
neuron_properties = [
    'systematicType', 'type', 'class', 'subclass',
    'somaNeuromere', 'hemilineage', 'birthtime',
    'target', 'origin', 'celltypePredictedNt', 'entryNerve', 'exitNerve', 'longTract',
]
returned_fields = ', '.join('n.' + prop for prop in neuron_properties)
query = "MATCH (n:Neuron) RETURN DISTINCT " + returned_fields + ", count(n) AS count"
np_results_raw = np_client.fetch_custom(query)

In [None]:
# Bind a working name to the query result. This is the same object, not a copy;
# the cleaning cells below rebind `np_results`, so re-running this cell restarts
# the cleaning from the fetched result without querying neuPrint again.
np_results = np_results_raw

In [None]:
# rename columns: drop the 'n.' prefix left by the query, then rename 'class' to 'cell_class'
np_results = (
    np_results
    .rename(columns=lambda column_name: column_name.removeprefix('n.'))
    .rename(columns={'class': 'cell_class'})
)

In [None]:
# some preprocessing
# first discard rows with no type at all ...
has_type = np_results['type'].notnull()
np_results = np_results[has_type]
# ... then discard types whose name ends in xx / XX instead of a number
type_names = np_results['type'].str
is_placeholder = type_names.endswith('xx') | type_names.endswith('XX')
np_results = np_results[~is_placeholder]

In [None]:
# read new_cell_FBbt_ids.tsv (tab-separated); only cell types listed in it are kept
fbbt_mapping = pd.read_csv(
    'new_cell_FBbt_ids.tsv',
    sep='\t',
    low_memory=False,
)

In [None]:
# keep only rows whose type appears in the 'type' column of fbbt_mapping
in_fbbt_mapping = np_results['type'].isin(fbbt_mapping['type'])
np_results = np_results[in_fbbt_mapping]

In [None]:
# convert null-like values to nulls (whole-cell matches only)
null_like_values = ['<NA>', 'NA', 'TBD', 'none', 'None', 'unclear', 'unknown']
np_results = np_results.replace(dict.fromkeys(null_like_values, None))

In [None]:
# minor replacements: per-column value fixes, then a table-wide 'AbNTBD' -> 'AbN'
np_results = np_results.assign(
    cell_class=np_results['cell_class'].replace('Sensory TBD', 'sensory neuron'),
    birthtime=np_results['birthtime'].replace('early secondary', 'secondary'),
    type=np_results['type'].replace('oviDN', 'DNad001'),
    # substring fix inside origin strings; nulls are skipped
    origin=np_results['origin'].map(lambda region: region.replace('tct', 'Tct'), na_action='ignore'),
).replace('AbNTBD', 'AbN')

In [None]:
# split target and origin on . if the . is not followed by a space
# drop sides from target, origin
def process_regions(x):
    """Normalise a raw target/origin string into '|'-separated region names.

    Processing steps:
      1. strip leading/trailing dots and spaces;
      2. turn '. ' (dot followed by a space) into a plain space so it is not
         treated as a separator;
      3. remove 'LHS ' / 'RHS ' side markers;
      4. split on '.' or '_', strip commas from each piece, and drop the
         side-only pieces 'L', 'R', 'RL', 'LR' as well as empty pieces
         (produced by doubled or trailing separators).

    Parameters
    ----------
    x : str or None
        Raw region string.

    Returns
    -------
    str or None
        None for empty/falsy input; otherwise the unique region names joined
        with '|', in sorted order so the result is the same on every run.
    """
    if not x:
        return None

    x = x.strip('. ')
    # raw strings: '\.' in a normal string literal is an invalid escape sequence
    x = re.sub(r'\. ', ' ', x)
    x = re.sub(r'[LR]HS ', '', x)

    regions = {piece.strip(',') for piece in re.split(r'\.|_', x)}
    regions -= {'L', 'R', 'RL', 'LR'}
    regions.discard('')
    return '|'.join(sorted(regions))
# normalise both region columns; nulls are left untouched
np_results = np_results.assign(
    target=np_results['target'].map(process_regions, na_action='ignore'),
    origin=np_results['origin'].map(process_regions, na_action='ignore'),
)

In [None]:
# split nerves on space or _ and drop commas and L/R
def process_nerves(x):
    """Normalise a raw entry/exit nerve string into '|'-separated nerve names.

    The string is split on spaces and underscores; commas are stripped from
    each piece; side-only pieces ('L', 'R') and empty pieces (produced by
    repeated separators) are dropped.

    Parameters
    ----------
    x : str or None
        Raw nerve string.

    Returns
    -------
    str or None
        None for empty/falsy input; otherwise the unique nerve names joined
        with '|', in sorted order so the result is the same on every run.
    """
    if not x:
        return None

    nerves = {piece.strip(',') for piece in re.split(r' |_', x)}
    nerves -= {'L', 'R'}
    nerves.discard('')
    return '|'.join(sorted(nerves))
# normalise both nerve columns; nulls are left untouched
np_results = np_results.assign(
    entryNerve=np_results['entryNerve'].map(process_nerves, na_action='ignore'),
    exitNerve=np_results['exitNerve'].map(process_nerves, na_action='ignore'),
)

In [None]:
# aggregate duplicate info rows and sum count:
# one row per type, each info column collected into a set of the values seen
set_valued_columns = [
    "cell_class", "hemilineage", 'systematicType', "subclass", 'somaNeuromere',
    "birthtime", "celltypePredictedNt", "target", "origin",
    "entryNerve", "exitNerve", "longTract",
]
aggregations = dict.fromkeys(set_valued_columns, set)
aggregations['count'] = 'sum'

np_results_grouped = (
    np_results
    .groupby(["type"], dropna=False)
    .agg(aggregations)
    .reset_index()
)

In [None]:
# attempt to find common denominators for multiple value fields
segment_prefixes = ['prothoracic ', 'mesothoracic ', 'metathoracic ', 'front ', 'middle ', 'hind ']
segment_suffixes = ['T1', 'T2', 'T3', ' A1', ' A2', ' A3']

abdominal_nerves = ['AbN', 'AbNT', 'AbN1', 'AbN2', 'AbN3', 'AbN4', 'AbN5']
tectulums = ['Tct', 'LTct', 'HTct', 'IntTct', 'UTct', 'WTct']


def _is_missing_value(value):
    """Return True for None or a float NaN (NaN is the only float unequal to itself)."""
    return value is None or (isinstance(value, float) and value != value)


def map_multiple_to_terms_in_common(input_set):
    """Reduce the '|'-joined term strings seen for one type to their shared terms.

    1. check for an entry of None/NaN (at least one individual has no info)
    2. unpack lists from set of strings (giving list of sets)
    3. consider 'multi' to match all: drop entries that are exactly {'multi'}
       (if only one entry remains, return it)
    4. check if an item appears in all sets and add to common_terms set
    5. remove any common terms from the listed sets
    6. remove segment specificity substrings (prefixes and suffixes above);
       each term contributes every prefix/suffix-stripped variant, including
       the unstripped term itself
    7. check if an item appears in all sets and add to common_terms set
    8. check if all sets contain an abdominal nerve (if so add 'AbN')
    9. check if all sets contain a type of tectulum (if so add 'Tct')

    Parameters
    ----------
    input_set : iterable of (str or None)
        Distinct '|'-joined term strings recorded for one cell type.

    Returns
    -------
    set
        The terms in common, or {None} when information is missing or
        nothing is shared.
    """
    if any(_is_missing_value(x) for x in input_set):
        return {None}

    unpacked_set = [set(x.split('|')) for x in input_set if x]
    if not unpacked_set:
        return {None}
    if len(unpacked_set) == 1:
        return unpacked_set[0]

    # The elements are sets, so list.remove('multi') could never match one;
    # compare against the one-element set {'multi'} instead.
    informative_sets = [terms for terms in unpacked_set if terms != {'multi'}]
    if not informative_sets:
        return {'multi'}
    if len(informative_sets) == 1:
        return informative_sets[0]

    common_terms = set.intersection(*informative_sets)
    remaining_sets = [terms - common_terms for terms in informative_sets]

    neutralised_list_of_sets = [
        {term.removeprefix(p).removesuffix(s)
         for term in terms for p in segment_prefixes for s in segment_suffixes}
        for terms in remaining_sets
    ]
    common_terms |= set.intersection(*neutralised_list_of_sets)

    if all(any(term in abdominal_nerves for term in terms) for terms in neutralised_list_of_sets):
        common_terms.add('AbN')

    if all(any(term in tectulums for term in terms) for terms in neutralised_list_of_sets):
        common_terms.add('Tct')

    return common_terms if common_terms else {None}

In [None]:
# for each multi-valued field add a 'common_<field>' column holding the terms
# shared across the type's individuals, then drop the original field
cols_to_process = ['target', 'origin', 'entryNerve', 'exitNerve', 'longTract']
common_term_columns = {
    'common_' + col: np_results_grouped[col].map(map_multiple_to_terms_in_common)
    for col in cols_to_process
}
np_results_grouped = (
    np_results_grouped
    .assign(**common_term_columns)
    .drop(columns=cols_to_process)
)

In [None]:
# fix only some of a type classed as ascending
def drop_ascending(set_of_classes):
    """Collapse several class labels for one type into a single class where possible.

    Parameters
    ----------
    set_of_classes : set
        Class labels recorded for the individuals of one type.

    Returns
    -------
    set or str
        - the input set unchanged when it has at most one member;
        - 'sensory neuron' / 'afferent neuron' (a plain string) when every
          label contains 'sensory' / 'afferent' respectively;
        - {None} when the labels share neither keyword.

    NOTE(review): the collapsed value is a str while the other branches return
    sets; kept that way so existing consumers of the column see the same shape.
    """
    if len(set_of_classes) <= 1:
        return set_of_classes

    for t in ['sensory', 'afferent']:
        # Return on the first keyword shared by every label; continuing the
        # loop would let a later keyword overwrite an earlier match.
        # Non-string members (e.g. None) can never match a keyword.
        if all(isinstance(x, str) and t in x for x in set_of_classes):
            return t + ' neuron'
    return {None}
    
# collapse multi-label class sets per type
np_results_grouped = np_results_grouped.assign(
    cell_class=np_results_grouped['cell_class'].map(drop_ascending)
)

In [None]:
# map {None} to empty set (don't want to lose None from sets len > 1)
def _empty_lone_none_set(value):
    """Return a one-element set with None removed; any other value is returned unchanged."""
    if type(value) is set and len(value) == 1:
        return value - {None}
    return value

np_results_grouped_filtered = np_results_grouped.map(_empty_lone_none_set)

In [None]:
# write the per-type summary as tab-separated text, without the row index
np_results_grouped_filtered.to_csv(
    'typing_info.tsv',
    index=None,
    sep='\t',
)

In [None]:
# FBbt mapping files - probably don't need to update this

# get unique values from two columns of collections (the common_* columns hold sets)
# explode() gives each member of a cell's collection its own row;
# drop_duplicates() then keeps the first occurrence of each value.
# These read np_results_grouped (not the *_filtered frame), so {None} cells are still present.
entry = np_results_grouped['common_entryNerve'].explode().drop_duplicates()
# NOTE(review): the name `exit` shadows the builtin `exit` for the rest of the session.
exit = np_results_grouped['common_exitNerve'].explode().drop_duplicates()
# NOTE(review): Series.rename with a dict remaps index labels rather than the Series
# name, so this rename looks like a no-op here — confirm the intent.
# `nerves` is not used by any of the cells shown.
nerves = pd.concat([exit, entry.rename({'common_entryNerve':'common_exitNerve'})]).drop_duplicates()



In [None]:
# flatten the per-type class sets to one row per class, then keep each class once
exploded_classes = np_results_grouped['cell_class'].explode()
cell_class = exploded_classes.drop_duplicates()

In [None]:
# write the unique classes as tab-separated text, without the row index
# TODO: change to do others too
cell_class.to_csv(
    'class_FBbt_map2.tsv',
    index=None,
    sep='\t',
)