In [None]:
import json
import gzip
from tqdm import tqdm

In [None]:
## latest_public.jsonl.gz comes from https://cov2tree.nyc3.cdn.digitaloceanspaces.com/latest_public.jsonl.gz
## Only the first line of the archive is parsed here; it is expected to hold
## a "mutations" list describing every mutation referenced by the file.
with gzip.open("latest_public.jsonl.gz", "rb") as infile:
    header_line = next(iter(infile), None)
    if header_line is not None:
        mutations = json.loads(header_line)

## lookup table: mutation_id -> {gene, ref, pos, alt}
mutation_list = {}
for entry in mutations["mutations"]:
    mutation_list[entry["mutation_id"]] = {
        "gene": entry['gene'],
        "ref": entry['previous_residue'],
        "pos": entry['residue_pos'],
        "alt": entry['new_residue'],
    }


In [None]:
## Index every record that carries a node_id by that id (as a string).
## Records without a "node_id" key are skipped -- e.g. the first line, which
## holds the mutation table rather than a node.
infile_lines = {}
with gzip.open("latest_public.jsonl.gz", "rb") as infile:
    ## Stream the archive line by line: read().splitlines() keeps the whole
    ## decompressed file plus a list of all its lines in memory at once.
    for raw_line in infile:
        if not raw_line.strip():
            continue  ## tolerate blank lines instead of failing in json.loads
        record = json.loads(raw_line)
        ## Compare against None rather than relying on truthiness: a
        ## truthiness test silently drops the record whose node_id is 0.
        if record and record.get("node_id") is not None:
            infile_lines[str(record["node_id"])] = record

In [None]:
## Mutation records in the order they appear in the header.
## This is a list, not a dict: later cells resolve mutation ids by position
## (mutation_list[idx]), which presumes each id equals its index in the
## header's "mutations" list -- TODO confirm against the file format.
output_to_header_field = {
    "gene": 'gene',
    "ref": 'previous_residue',
    "pos": 'residue_pos',
    "alt": 'new_residue',
}
mutation_list = [
    {out_key: entry[header_key] for out_key, header_key in output_to_header_field.items()}
    for entry in mutations["mutations"]
]


In [None]:
def get_parent_node_for_node_id(input_node_id):
    """Return the ``parent_id`` stored on a node record in ``infile_lines``.

    The id is converted with ``str()`` before the lookup, so int and string
    ids are interchangeable.

    Returns ``False`` when the record has no ``parent_id`` key, or when the
    record's own ``node_id`` does not match the requested id.

    Raises ``KeyError`` when ``infile_lines`` has no entry for the id.
    """
    wanted_key = str(input_node_id)
    record = infile_lines[wanted_key]
    if str(record.get("node_id", False)) != wanted_key:
        return False
    return record.get("parent_id", False)

def get_node_json_by_node_id(input_node_id):
    """Return the record stored in ``infile_lines`` for a node id.

    The id is converted with ``str()`` before the lookup. The stored record
    object itself is returned (not a copy).

    Returns ``None`` when the record's own ``node_id`` does not match the
    requested id.

    Raises ``KeyError`` when ``infile_lines`` has no entry for the id.
    """
    wanted_key = str(input_node_id)
    record = infile_lines[wanted_key]
    return record if str(record.get("node_id", False)) == wanted_key else None

def get_non_lineage_parent_mutations_for_node(input_node_id):
    """Collect the mutation ids carried by a node's unlabelled ancestors.

    Starting at the parent of ``input_node_id``, the walk moves from parent to
    parent and gathers each ancestor's ``"mutations"`` list. It stops at the
    first ancestor whose ``clades["pango"]`` value is anything other than the
    empty string; that ancestor contributes nothing.

    Parameters
    ----------
    input_node_id : int or str
        Id of the node whose ancestors are inspected. The node's own
        mutations are not included.

    Returns
    -------
    list
        Mutation ids, nearest ancestor first. Empty when the node has no
        parent or its direct parent already carries a pango label.

    Notes
    -----
    * The walk is iterative, so a long chain of unlabelled ancestors cannot
      exceed Python's recursion limit.
    * Reaching a node that was already visited (for example a root whose
      ``parent_id`` is its own ``node_id``) ends the walk; a recursive walk
      would never terminate on such data.
    * Only ``False`` / ``None`` mean "no parent". A parent id of ``0`` is a
      real id and is followed.
    * A parent whose record cannot be looked up ends the walk.
    """
    collected = []
    visited = {str(input_node_id)}
    current_id = input_node_id

    while True:
        parent_id = get_parent_node_for_node_id(current_id)
        # identity checks, not truthiness: 0 is a legitimate node id
        if parent_id is False or parent_id is None:
            break

        parent_key = str(parent_id)
        if parent_key in visited:
            # self-parented root or a cycle: every reachable ancestor is done
            break

        try:
            parent_json = get_node_json_by_node_id(parent_id)
        except KeyError:
            # the parent id refers to a record that was never loaded
            break
        if parent_json is None:
            break

        if parent_json.get("clades", {}).get("pango", "") != "":
            # a labelled ancestor is reached; its mutations are not inherited
            break

        collected.extend(parent_json["mutations"])
        visited.add(parent_key)
        current_id = parent_id

    return collected

In [None]:
result_dict = {}

## Record the mutations (amino acid and nucleotide) of the first node seen
## for each PANGO lineage.
for node_record in tqdm(infile_lines.values()):
    lineage = node_record.get("clades", {}).get("pango", False)

    ## skip nodes without a lineage label and lineages that already have an entry
    if not lineage or lineage in result_dict:
        continue

    ## Ancestors that are not themselves a defined PANGO lineage pass their
    ## mutations down to this node, so gather those before the node's own.
    inherited_ids = get_non_lineage_parent_mutations_for_node(str(node_record["node_id"]))
    mutation_ids = inherited_ids + node_record["mutations"]

    ## resolve the ids into mutation records
    result_dict[lineage] = [mutation_list[mutation_id] for mutation_id in mutation_ids]


In [None]:
## Keep only the nucleotide-level records (gene == "nt") of each lineage;
## a lineage left with no such record is dropped entirely.
result_dict = {
    lineage: nt_records
    for lineage, records in result_dict.items()
    if (nt_records := [record for record in records if record['gene'] == "nt"])
}

In [None]:
## Serialise the table as a JavaScript variable assignment and write it out.
with open("../res/test_all_nt.js", "w") as outfile:
    outfile.write("var lineage_mutations_nts = " + json.dumps(result_dict, indent=4))