In [1]:
import sqlite3 as sql
import pandas as pd
import networkx as nx
import numpy as np
from pathlib import Path

In [None]:
# Read the taxonomy dump files: pipe-delimited, no header row
# (presumably the NCBI taxdump layout -- confirm against the data source).
tax_data_dir = Path('../taxonomy/')
tax_data_dir.mkdir(exist_ok=True)

# nodes.dmp: keep the first three columns; the rank text is stripped of
# surrounding whitespace while parsing.
nodes_df = pd.read_csv(
    tax_data_dir / 'nodes.dmp',
    sep="|",
    header=None,
    usecols=[0, 1, 2],
    names=['tax_id', 'parent_tax_id', 'rank'],
    skipinitialspace=True,
    converters={'rank': str.strip},
)

# names.dmp: we don't immediately need names, but load the id/name pairs anyway.
names_df = pd.read_csv(
    tax_data_dir / 'names.dmp',
    sep="|",
    header=None,
    usecols=[0, 1],
    names=['tax_id', 'name'],
    skipinitialspace=True,
    converters={'name': str.strip},
)

In [3]:
# Left-join the node columns onto every name row, so each name keeps its
# parent_tax_id and rank (all rows of names_df are retained).
names_join_df = names_df.merge(nodes_df, how='left', on='tax_id')

# Lookup tables: the joined rows keyed by name, and the node rows keyed by tax id.
joint_by_name = names_join_df.set_index('name')
nodes_by_tax_id = nodes_df.set_index('tax_id')

In [4]:
# track lineage by tax id: one directed edge parent -> child per row of nodes_df.
# Rows whose parent is the taxon itself are dropped by value. This replaces the
# earlier positional "skip the first row", which presumably existed to remove
# the root's self-loop and only worked when the root was the first row of
# nodes.dmp -- confirm against the data if other self-parented rows can occur.
graph = nx.DiGraph(
    nodes_df.query("parent_tax_id != tax_id")[['parent_tax_id', 'tax_id']]
    .to_records(index=False)
    .tolist()
)
# find the root nodes of graph
# In a directed graph, root nodes have in-degree 0 (no incoming edges)
roots = [n for n, d in graph.in_degree() if d == 0]
# there should only be one root; stop here with a clear message otherwise,
# instead of silently picking the first candidate (or failing with a bare IndexError).
if len(roots) != 1:
    raise ValueError(
        f"expected exactly one root in the taxonomy graph, found {len(roots)}"
    )
root = roots[0]


In [5]:
# Ranks reported for every lineage, listed from the most general to the most
# specific; the list order is also the column order of the output table.
ranks = [
    'kingdom',
    'phylum',
    'class',
    'order',
    'family',
    'genus',
]
# Rank used in place of 'kingdom' when a lineage has no kingdom node.
alt_kingdom = 'domain'
n_ranks = len(ranks)
# used to sort the id_df by rank
def check_rank(name):
    """Return the position of ``name`` in ``ranks``, or ``np.inf`` if it is not listed.

    The infinite value makes unlisted ranks compare greater than every listed
    one, so this can be used as a sort key.
    """
    try:
        position = ranks.index(name)
    except ValueError:
        # name is not one of the tracked ranks
        position = np.inf
    return position

# this function expects id_df to be sorted in the same order as
# ranks, so the row for a given rank can be fetched by its position alone.
# if the position is out of range, the rank of the subject is higher than the
# requested one; we'll eventually get an IndexError for everything except species
def get_id_at_rank(id_df, rank_idx):
    """Return the index label at position ``rank_idx`` of ``id_df`` as an ``int``.

    Parameters
    ----------
    id_df : object with an ``index`` attribute supporting positional lookup
        Lineage rows, expected to be ordered like ``ranks``.
    rank_idx : int
        Position to read.

    Returns
    -------
    int or None
        The label at that position, or ``None`` when the position is out of range.
    """
    try:
        label = id_df.index[rank_idx]
    except IndexError:
        # lineage is shorter than the requested position
        return None
    return int(label)

def vect_len(col):
    """Return the length of each string in ``col`` through its ``.str`` accessor."""
    return col.str.len()


# Element-wise version of check_rank, for applying it to a whole array of rank names.
vect_check_rank = np.vectorize(check_rank)

In [6]:
# nodes_by_tax_id is really the edge table (one row per tax_id, holding its parent_tax_id and rank); it is named after nodes.dmp, the file it is derived from
def get_all_parents(node: int, graph: nx.DiGraph, nodes_by_tax_id: pd.DataFrame, root: int = 1):
    """Return the ancestor tax id of ``node`` at each rank listed in ``ranks``.

    The lineage is the path from ``root`` to ``node`` in ``graph`` (``node``
    itself included). Each lineage member is matched to a rank by its ``rank``
    label rather than by its position in the lineage, so a lineage that skips
    a rank (for example one with an order but no class) gets ``None`` for the
    skipped rank instead of having every lower rank shifted up by one column.

    Parameters
    ----------
    node : int
        Tax id whose lineage is resolved.
    graph : nx.DiGraph
        Taxonomy graph whose edges point from parent to child.
    nodes_by_tax_id : pd.DataFrame
        Table indexed by tax id with a ``rank`` column.
    root : int, default 1
        Tax id the lineage starts from.

    Returns
    -------
    dict
        One entry per name in ``ranks`` (the matching tax id as ``int``, or
        ``None`` when the lineage has no member of that rank), followed by
        ``'tax_id'`` set to ``node``.

    Notes
    -----
    Errors from ``nx.shortest_path`` (no path, unknown node) and from the
    ``.loc`` lookup (tax id missing from ``nodes_by_tax_id``) are propagated.
    """
    lineage_ids = nx.shortest_path(graph, root, node)
    lineage_ranks = nodes_by_tax_id.loc[lineage_ids, 'rank'].to_list()

    # Tax id per rank label; if a label repeats along the lineage, the member
    # closest to the root wins.
    id_by_rank = {}
    for tax_id, rank in zip(lineage_ids, lineage_ranks):
        id_by_rank.setdefault(rank, int(tax_id))

    lineage = {rank: id_by_rank.get(rank) for rank in ranks}

    # Fix for lineages that don't have kingdom
    top_rank = ranks[0]
    if lineage[top_rank] is None:
        if alt_kingdom in id_by_rank:
            lineage[top_rank] = id_by_rank[alt_kingdom]
        elif len(lineage_ids) > 2 and lineage_ranks[2] not in ranks:
            # Fall back to the member two steps below the root (lineage
            # position 2), treated as the first meaningful one. It is skipped
            # when the lineage is too short, or when that member already fills
            # one of the lower ranks and must not be reported twice.
            lineage[top_rank] = int(lineage_ids[2])

    lineage['tax_id'] = node
    return lineage


In [7]:
# Don't bother with the nodes of the first level (every row whose parent is
# tax id 1, which includes the root itself): they're useless here and cause errors.
filtered_nodes = nodes_by_tax_id.query("parent_tax_id != 1")

# Resolve one lineage record per remaining taxon and build the table in a single
# constructor call. A list of records belongs in the DataFrame constructor;
# DataFrame.from_dict is documented for dict input. 'Int64' is the nullable
# integer dtype, so ranks missing from a lineage (None) become <NA> instead of
# turning the whole column into floats.
all_parents = pd.DataFrame(
    [
        get_all_parents(tax_id, graph, nodes_by_tax_id)
        for tax_id in filtered_nodes.index
    ],
    dtype='Int64',
)[['tax_id'] + ranks]

In [8]:

# .groupby('rank').agg({'tax_id': lambda x: x.unique()})

In [9]:
# Write the lineage table next to the notebook's parent directory, without the
# row index (tax_id is already a regular column).
all_parents.to_csv(path_or_buf='../tax_parents.csv', index=False)