# Parser for PAS-X Data File
This notebook focuses on parsing data from PAS-X software to extract and model/organize high-cell density fermentation data.  

A PAS-X file contains data about one project, which contains several batches of different runs; each run contains data for several features.   

The <b>primary objective</b> is to parse the PAS-X data file, using the configuration files parser_config and run_config, in order to generate the JSON document structures for our graph: node documents and edge documents for collections such as project, batch_cell_cultures, run, strain, species, initial_conditions and process_conditions.   

In [51]:
# 1. Install dependencies
#!pip install pandas
#!pip install pyarango

In [1]:
# 2. Import modules
import os
import yaml
import json
import hashlib
from functools import reduce
import operator
import pandas as pd

In [2]:
# 3. Define file paths
# All paths are relative to the directory the notebook is run from.

# Input data: the PAS-X JSON export and the strain -> organism mapping table
data_dir = '../data'
data_path = os.path.join(data_dir, "20240328_dataset for ambrDB_DDBproject.json")
# Mapping configuration files (parser_config for nodes, run_config for run variables)
config_dir = '../config'
config_path = os.path.join(config_dir, 'parser_config.yaml')
run_config_path = os.path.join(config_dir, 'run_config.yaml')
strain_mapping_path = os.path.join(data_dir, "strains_organism.csv")
# Output files: one JSON document for the nodes and one for the edges
output_dir = '../output'
nodes_path = os.path.join(output_dir, "fermentdb_nodes_full.json")
edges_path = os.path.join(output_dir, "fermentdb_edges_full.json")
# iModulon tables, indexed as imodulon_files[organism][dataset] -> CSV path
imodulon_files = {'e_coli': {'precise1k': os.path.join(data_dir, 'iM_table.csv')}}

### 1. Load data and config files

In [3]:
# Load PASX data file
# The file is opened in binary mode so that json.load detects the encoding
# (UTF-8, UTF-16 or UTF-32) itself instead of decoding with the platform's
# locale default, which is what a text-mode open() without `encoding` uses.
with open(data_path, 'rb') as dbfile:
    data = json.load(dbfile)

# Notebook display: top-level keys of the export
data.keys()

dict_keys(['end', 'created_by', 'editable', 'unit_operations', 'workflow', 'creation_time', 'project', 'description', 'status', 'name', 'event_names', 'key_variable', 'id', 'type', 'result_notes', 'progress', 'country', 'manager', 'trending_settings', 'deletable', 'modification_time', 'batch_phase_names', 'tags', 'start', 'departments', 'editable_plots', 'sites', 'batches'])

In [4]:
# Read parser_config.yaml with the safe YAML loader
with open(config_path) as cfile:
    config = yaml.safe_load(cfile)

# Notebook display: the parsed mapping
config

{'Project': {'_key': 'project',
  'id': 'project',
  'name': 'name',
  'description': 'description',
  'progress': 'progress',
  'status': 'status',
  'deletable': 'deletable',
  'editable': 'editable',
  'type': 'type',
  'creation_time': 'creation_time',
  'tags': 'tags',
  'modification_time': 'modification_time',
  'start': 'start',
  'end': 'end'},
 'Country': {'_key': 'country', 'id': 'country', 'name': 'country'},
 'User': {'_key': 'manager/username',
  'id': 'manager/username',
  'name': 'manager/first_name',
  'surname': 'manager/last_name'},
 'Batch_cell_culture': {'_key': 'id', 'id': 'id', 'name': 'name'},
 'Run': {'batches': {'id': 'id',
   'name': 'name',
   'description': 'description',
   'creation_time': 'creation_time',
   'modification_time': 'modification_time',
   'batch_start': 'batch_start',
   'batch_end': 'batch_end',
   'first_timestamp': 'first_timestamp',
   'last_timestamp': 'last_timestamp',
   'variables': 'variables',
   'phases': 'phases'}}}

In [5]:
# Read run_config.yaml with the safe YAML loader
with open(run_config_path) as cfile:
    rconfig = yaml.safe_load(cfile)

# Notebook display: the parsed mapping
rconfig

{'Run': {'id': 'id',
  'Culture Type': 'culture_type',
  'Seed': 'seed',
  'Sample ID': 'sample_id',
  'Experiment': 'experiment',
  'Replicate #': 'replicate_number',
  'Strain Batch': 'strain_batch',
  'Container ID (calculated)': 'container_id',
  'Control?': 'is_control',
  'Comments': 'comments',
  'Inducer': 'inducer',
  'Volume Unit_INDUCTION': 'volume_unit_induction',
  'Condition': 'condition'},
 'Fermenter': {'Container Type': 'name'},
 'Initial_condition': {'SOP': 'SOP',
  'Aeration Gas Type': 'aeration_gas_type',
  'Aeration Profile': 'aeration_profile',
  'Maximum Aeration (slpm)': 'maximum_aeration_slpm',
  'Minimum Aeration (slpm)': 'minimum_aeration_slpm',
  'Maximum Stirring or Shaking Speed (rpm)': 'maximum_stirring_speed_rpm',
  'Minimum Stirring or Shaking Speed (rpm)': 'minimum_stirring_speed_rpm',
  'DO Control Setpoint (%)': 'DO_control_setpoint_percentage',
  'DO Control Cascade Level 1': 'DO_control_cascade_level1',
  'DO Control Cascade Level 2': 'DO_control_c

### 2. Create node collections dictionary structure
Create a dictionary structure for nodes collections with parser_config file and PASX data file. 

In [7]:
# Function Definitions

def get_from_nested_dict(data_dict:dict, map_list:list):
    '''
    Extracts a value from a nested structure by following a list of keys,
    one key per nesting level.

    The lookup is best-effort: when the path cannot be followed, None is
    returned instead of raising.

    parameters:
        data_dict (dict): dictionary structure
        map_list (list): list of nested keys, outermost first. An empty list
            returns data_dict itself.

    return:
        nested_value: value found at the end of the key path, or None when a
        key is missing, an index is out of range, or an intermediate value
        cannot be subscripted.

    example:
        >>> data_dict = {'a': {'b': {'c': 5}}}
        >>> value = get_from_nested_dict(data_dict=data_dict, map_list=['a', 'b', 'c'])
        >>> print(value)
        5
        >>> print(get_from_nested_dict(data_dict=data_dict, map_list=['a', 'x']))
        None
    '''
    try:
        return reduce(operator.getitem, map_list, data_dict)
    except (KeyError, IndexError, TypeError):
        # Only lookup failures mean "path not present". Anything else
        # (e.g. KeyboardInterrupt) must propagate rather than be swallowed.
        return None

def get_hash(key, prefix=""):
    '''
    Build a short, deterministic identifier from a string.

    The key is encoded as UTF-8 (sha1() expects bytes) and hashed with SHA-1.
    The hexadecimal digest is read as an integer and reduced modulo 10**8, so
    the numeric part consists of at most eight decimal digits (leading zeros
    are not kept). The prefix is placed in front of that number.

    parameters:
        key (str): Input string to be hashed.
        prefix (str, optional): Prefix to prepend to the hash value. Defaults to "".

    return:
        str: prefix followed by the decimal digits of the reduced hash.

    example:
        >>> get_hash("example_key", prefix="HASH_").startswith("HASH_")
        True
        >>> get_hash("example_key").isdigit()
        True
    '''
    digest_hex = hashlib.sha1(key.encode("utf-8")).hexdigest()
    reduced = int(digest_hex, 16) % 100_000_000

    return f"{prefix}{reduced}"


def get_collections_from_pasx(data: dict, config: dict) -> dict:
    '''
    This function creates a dictionary with the node collections expected
    in FermentDB. It requires a dictionary with the data exported from PASX
    in json format, and a configuration that specifies the mapping between
    PASX and FermentDB structure. See example in '/config/parser_config.yaml'

    Each entry of a collection's mapping is either:
      - a string: a '/'-separated path into `data`; the value found there
        (or None when the path is absent) is stored under the attribute name.
        For the '_key' attribute the value is converted to a string and, for
        the Project, Batch_cell_culture, User and Country collections, hashed
        with the collection's prefix (P, B, U, Co).
      - a dict: the attribute name is a key of `data` holding a list of
        records; the collection becomes a list with one document per record,
        built from the nested mapping.

    parameters:
        data (dict): dictionary with the data read from PASX in json format
        config (dict): mapping configuration to adapt to FermentDB structure

    return:
        collections (dict): dictionary with the expected node objects in FermentDB

    raises:
        KeyError: when a dict-valued mapping names a key that is not in `data`.

    example:
        >>> collections = get_collections_from_pasx(data=pasx_json_dict, config=parser_config_dict)
    '''
    # Prefix used when hashing the '_key' of each single-document collection.
    key_prefixes = {'Project': 'P',
                    'Batch_cell_culture': 'B',
                    'User': 'U',
                    'Country': 'Co'}

    collections = {}
    for collection_name, mapping in config.items():
        collections[collection_name] = {}
        for attribute, source in mapping.items():
            if not isinstance(source, dict):
                value = get_from_nested_dict(data, source.split('/'))
                if attribute == '_key':
                    value = str(value)
                    if collection_name in key_prefixes:
                        value = get_hash(value, prefix=key_prefixes[collection_name])
                collections[collection_name].update({attribute: value})
            else:
                # Nested mapping: one document per record of data[attribute].
                documents = []
                collections[collection_name] = documents
                for nested_data in data[attribute]:
                    document = {}
                    for nested_attribute, nested_source in source.items():
                        value = get_from_nested_dict(nested_data, nested_source.split('/'))
                        if nested_attribute == '_key':
                            value = str(value)
                        document[nested_attribute] = value
                    documents.append(document)

    return collections

In [8]:
# Build the node collections from the PAS-X export and the parser configuration
collections = get_collections_from_pasx(data=data, config=config)

# print(collections["Project"])
# print(collections["User"])
# print(collections["Country"])
# print(collections["Batch_cell_culture"])

### 3. Create edges collection and update node collections
- Create a dictionary for edges collections.
- Extract variables and phases from the Run collection and create the node collections Initial_condition, Process_condition, Fermenter, Strain and Phase_event. 

In [9]:
# Function Definitions

def get_unique_json_from_list_of_dicts(d, unique_key='id'):
    '''
    De-duplicate a list of dictionaries on the value stored under one key.

    When several dictionaries share the same value for `unique_key`, the last
    one in the list is kept, at the position where that value first appeared.

    parameters:
        d (list): list of dictionaries; each must contain `unique_key`.
        unique_key (str, optional): Key to determine uniqueness. Default to 'id'.

    return:
        list: list of unique dictionaries.

    example:
        >>> example_list = [
        ...     {"id": 1, "name": "Alice"},
        ...     {"id": 2, "name": "Bob"},
        ...     {"id": 1, "name": "Charlie"},  # Duplicate id: replaces Alice
        ...     {"id": 3, "name": "Alice"}     # Duplicate name only: kept
        ... ]
        >>> get_unique_json_from_list_of_dicts(example_list)
        [{'id': 1, 'name': 'Charlie'}, {'id': 2, 'name': 'Bob'}, {'id': 3, 'name': 'Alice'}]
    '''
    latest_by_key = {}
    for document in d:
        latest_by_key[document[unique_key]] = document

    return list(latest_by_key.values())

def get_run_conditions(collections, rconfig):
    '''
    Update node collections structure with nodes: initial condition, process conditions, fermenter, and strain.
    Generate edges structure with collections: has_initial_condition, has_condition, has_measured_imodulon,
    cultures_strain, and uses_fermenter.

    Side effects on `collections` (modified in place):
    - every run in collections['Run'] gets a hashed '_key', receives the run-level variables as
      attributes (renamed through rconfig['Run']) and loses its 'variables' list;
    - every variable dict gets a '_key' and loses its 'data', 'timestamps', 'unit',
      'categorical_data', 'raw_data', 'datetime_data' and 'errors' entries;
    - the 'Initial_condition', 'Process_condition', 'Fermenter' and 'Strain' collections are (re)written,
      de-duplicated on '_key'.

    parameter:
    - collections (dict): A dictionary containing collections of data. Must contain 'Run', a list of run
      dicts with 'name', 'id' and 'variables'.
    - rconfig (dict): run configuration with the sections 'Run', 'Fermenter' and 'Initial_condition';
      the variable names listed in each section decide how a variable is classified.

    returns:
    - edges (dict): A dictionary containing edges data representing the relationships between runs, conditions, and strain.

    raises:
    - KeyError: when a variable lacks one of the entries that are popped from it.
    '''
    iconditions_collection = []
    pconditions_collection = []
    fermenter_collection = []
    strain_collection = []
    edges = {'has_initial_condition': {'edges':[],
                                        'from_collection': ['Run'],
                                        'to_collection': ['Initial_condition']},
                'has_condition': {'edges': [],
                                'from_collection': ['Run'],
                                'to_collection': ['Process_condition']},
                'has_measured_imodulon': {'edges':[],
                              'from_collection': ['Run'],
                              'to_collection':['iModulon']},
                'cultures_strain': {'edges': [],
                                'from_collection': ['Run'],
                                'to_collection': ['Strain']},
                'uses_fermenter': {'edges': [],
                                'from_collection': ['Run'],
                                'to_collection': ['Fermenter']},
            }
    
    for run in collections['Run']:
        # Run key: hash of "<name>_<id>" with prefix "R"
        run["_key"] = run["name"]+"_"+str(run['id'])
        run["_key"] = get_hash(run["_key"], prefix="R")
        for variable in run['variables']:
            # Default variable key: hash of the variable name with prefix "C"
            # (overwritten below for fermenters and iModulons)
            variable["_key"] = str(variable['name'])
            variable["_key"] = get_hash(variable["_key"], prefix="C")
            # Measurements are moved out of the node document; those that are
            # kept end up on the edges instead.
            data = variable.pop('data')
            timestamps = variable.pop('timestamps')
            unit = variable.pop('unit')
            _ = variable.pop('categorical_data')
            _ = variable.pop('raw_data')
            _ = variable.pop('datetime_data')
            _ = variable.pop('errors')
            
            # Classify the variable by name; the first matching branch wins.
            if variable['name'] in rconfig['Run']:
                # Run-level attribute: a list value is reduced to its first nested element
                if type(data) == list:
                    data = data[0][0]
                if variable['name'] == "Strain Batch":
                    # The strain name is the part of the strain batch before the first '-'
                    strain = '_'.join(data.split('-')[:1])
                    strain_key = get_hash(strain, prefix="S")
                    strain_collection.append({'_key': strain_key,
                                            'name': strain,
                                            'rank': 'strain'})
                    edges['cultures_strain']['edges'].append({'_from': f"Run/{run['_key']}",
                                                            '_to': f'Strain/{strain_key}',
                                                            'strain_batch': data})
                run.update({rconfig['Run'][variable['name']]: data}) # Add to edge cultures strain and delete?
            elif variable['name'] in rconfig['Fermenter']:
                # NOTE(review): unlike the Run branch, data is not unwrapped from a list here,
                # and get_hash calls .encode() on it - confirm data is a string for these variables.
                variable["_key"] = get_hash(data, prefix="F")
                fermenter_collection.append({'_key': variable["_key"],
                                             'name': data})
                edges['uses_fermenter']['edges'].append({'_from': f"Run/{run['_key']}",
                           '_to': f"Fermenter/{variable['_key']}"})
            elif variable['name'] in rconfig['Initial_condition']:
                iconditions_collection.append(variable)
                edges['has_initial_condition']['edges'].append({'_from': f"Run/{run['_key']}",
                           '_to': f"Initial_condition/{variable['_key']}",
                           'data': data,
                           'unit': unit})
            elif '_RNAseq' in variable['name']:
                # Only an edge is created here (no node is appended), and only
                # when at least one value is non-zero.
                if not all(v == 0 for v in data):
                    # The name is cleaned ('_RNAseq' removed, '_' -> ' ') and hashed with prefix "iM"
                    variable['name'] = ' '.join(variable['name'].replace('_RNAseq', '').split('_'))
                    variable["_key"] = get_hash(variable["name"], prefix="iM")
                    edges['has_measured_imodulon']['edges'].append({'_from': f"Run/{run['_key']}",
                                                           '_to': f"iModulon/{variable['_key']}",
                                                           'data': data,
                                                           'timestamps': timestamps})
            else:
                # Everything else is a process condition: the node is always kept,
                # the edge only when at least one value is non-zero.
                pconditions_collection.append(variable)
                if not all(v == 0 for v in data):
                    edges['has_condition']['edges'].append({'_from': f"Run/{run['_key']}",
                            '_to': f"Process_condition/{variable['_key']}",
                            'data': data,
                            'unit': unit,
                            'timestamps': timestamps})
        # The variables now live in their own collections / edges
        del run['variables']
    
    # The same condition, fermenter or strain appears in many runs: keep one node per '_key'
    iconditions_collection = get_unique_json_from_list_of_dicts(d=iconditions_collection, 
                                                                    unique_key='_key')
    pconditions_collection = get_unique_json_from_list_of_dicts(d=pconditions_collection, 
                                                                    unique_key='_key')
    fermenter_collection = get_unique_json_from_list_of_dicts(d=fermenter_collection, 
                                                                    unique_key='_key')
    strain_collection = get_unique_json_from_list_of_dicts(d=strain_collection, 
                                                                    unique_key='_key')
    collections['Initial_condition'] = iconditions_collection
    collections['Process_condition'] = pconditions_collection
    collections['Fermenter'] = fermenter_collection
    collections['Strain'] = strain_collection
    
    return edges

def get_run_phases(collections):
    '''
    Retrieves phases data from Run collection and create Phase_event node collection and has_phase edge collection.

    Each phase node is keyed by the hash of its name (prefix "PH"), so phases
    with the same name in different runs share one node. The run-specific
    attributes (start, end, comment, created_by, relative_start, relative_end)
    are moved from the phase onto the has_phase edge.

    Side effects on `collections` (modified in place): the phase dicts lose the
    attributes listed above and gain '_key'; every run loses its 'phases'
    list; collections['Phase_event'] is (re)written.

    parameter:
    - collections (dict): A dictionary containing collections of data. Every run
      in collections['Run'] that has phases must already carry a '_key'.

    returns:
    - dict: A dictionary containing edges data representing the relationships between runs and phases.

    raises:
    - KeyError: when a phase lacks one of the attributes moved to the edge.
    '''
    phases_collection = []
    edges = {'has_phase': {'edges':[],
                           'from_collection': ['Run'],
                           'to_collection':['Phase_event']}}
    phase_edges = edges['has_phase']['edges']

    for run in collections['Run']:
        for phase in run['phases']:
            phase["_key"] = get_hash(phase['name'], prefix="PH")
            attributes = {"event_start": phase.pop("start"),
                          "event_end": phase.pop("end"),
                          "comment": phase.pop("comment"),
                          "created_by": phase.pop("created_by"),
                          "start": phase.pop("relative_start"),
                          "end": phase.pop("relative_end")}
            attributes.update({'_from': f"Run/{run['_key']}",
                               '_to': f"Phase_event/{phase['_key']}"})
            phases_collection.append(phase)
            phase_edges.append(attributes)
        del run['phases']

    # De-duplicate once, after all phases are collected. Doing it after every
    # append gives the same list (last document per key, first-seen order)
    # but rebuilds it for every phase.
    collections['Phase_event'] = get_unique_json_from_list_of_dicts(d=phases_collection,
                                                                    unique_key='_key')

    return edges
    

def generate_project_edges(collections, edges):
    '''
    Add the project-level edge collections to `edges`:
    has_batch (Project -> Batch_cell_culture), created_by (Project -> User)
    and has_run (Batch_cell_culture -> Run).

    parameter:
        collections (dict): node collections. 'Project', 'User' and
            'Batch_cell_culture' are single documents (dicts) with a '_key';
            'Project' also needs 'creation_time'. 'Run' is a list of documents
            with a '_key'.
        edges (dict): edge collections; updated in place.

    returns:
        None: This function updates the 'edges' dictionary in place.

    example:
        collections = {
        'Project': {'_key': 'project_key', 'creation_time': '2024-05-01T12:00:00Z'},
        'User': {'_key': 'user_key'},
        'Batch_cell_culture': {'_key': 'batch_key'},
        'Run': [{'_key': 'run_key1'}, {'_key': 'run_key2'}, ...]
        }
        edges = {}
        generate_project_edges(collections, edges)
        # edges now holds 'has_batch', 'created_by' and 'has_run'.
    '''
    project_doc = collections['Project']
    user_doc = collections['User']
    batch_doc = collections['Batch_cell_culture']

    # Document handles in "<collection>/<_key>" form
    project_ref = f"Project/{project_doc['_key']}"
    batch_ref = f"Batch_cell_culture/{batch_doc['_key']}"
    user_ref = f"User/{user_doc['_key']}"

    has_batch = {'edges': [{'_from': project_ref,
                            '_to': batch_ref}],
                 'from_collection': ['Project'],
                 'to_collection': ['Batch_cell_culture']}
    created_by = {'edges': [{'_from': project_ref,
                             '_to': user_ref,
                             'creation_date': project_doc['creation_time']}],
                  'from_collection': ['Project'],
                  'to_collection': ['User']}
    edges.update({'has_batch': has_batch, 'created_by': created_by})

    # One has_run edge per run, all starting at the single batch document
    run_edges = []
    edges['has_run'] = {'edges': run_edges,
                        'from_collection': ['Batch_cell_culture'],
                        'to_collection': ['Run']}
    for run in collections['Run']:
        run_edges.append({'_from': batch_ref,
                          '_to': f"Run/{run['_key']}"})

### 4. Create iModulon node and edges collection

In [10]:
#  Define iModulon function 
def get_imodulon_collection(organism, dataset, table_path):
    '''
    Build the iModulon node documents for one organism and dataset.
    - Reads the CSV table into a pandas DataFrame (first row is the header).
    - Adds a '_key' column by hashing the 'name' column with prefix "iM".
    - Adds a 'linkout' column with one iModulonDB URL per entry, built from the 'k' column.
    - Replaces missing values with the string 'NaN' and drops the 'k' column, which is no longer
      needed because its values are incorporated into the linkout column.

    parameters:
        organism (str): The name of the organism (e.g., 'e_coli').
        dataset (str): The name of the dataset (e.g., 'precise1k').
        table_path (str): The file path to the CSV file containing the iModulon data (e.g., '/path/to/iM_table.csv').
            The table must contain the columns 'name' and 'k'.

    return:
        data_dict: A list of dictionaries (one per table row) representing processed iModulon data
    '''
    link_prefix = f'https://imodulondb.org/iModulon.html?organism={organism}&dataset={dataset}&k='

    table = pd.read_csv(table_path, sep=',', header=0)
    table['_key'] = table['name'].apply(lambda name: get_hash(name, prefix="iM"))
    table['linkout'] = table['k'].apply(lambda k: link_prefix + str(k))

    table = table.fillna('NaN')
    table = table.drop('k', axis=1)

    return table.to_dict(orient='records')

In [11]:
# Build the iModulon node collection from every configured organism/dataset table
collections['iModulon'] = []
for organism, datasets in imodulon_files.items():
    for dataset, imodulon_path in datasets.items():
        imodulon_documents = get_imodulon_collection(organism=organism,
                                                     dataset=dataset,
                                                     table_path=imodulon_path)
        collections['iModulon'].extend(imodulon_documents)

In [13]:
# Create edges with conditions collections, phases collections and project edges
# The order of these three calls matters: get_run_conditions assigns run['_key'],
# which get_run_phases and generate_project_edges read when building their edges.

edges = get_run_conditions(collections, rconfig)
# Adds the 'has_phase' edge collection and writes collections['Phase_event']
edges.update(get_run_phases(collections))
# Adds 'has_batch', 'created_by' and 'has_run' to edges in place (returns None)
generate_project_edges(collections, edges)

### 5. Create species node and edge collections.


In [14]:
with open(strain_mapping_path, 'r') as strain_file:
    strain_mapping = pd.read_csv(strain_file, sep=',')

# From df to dict: {strain: {'name': organism name, 'taxid': taxonomy id}},
# taken from the first, second and third column of the table by position.
# itertuples(index=False, name=None) yields plain tuples, so row[0] is real
# positional access; integer indexing on the label-indexed Series returned by
# iterrows() is deprecated in pandas and emits a FutureWarning.
strain_mapping = {row[0]: {'name': row[1], 'taxid': row[2]}
                  for row in strain_mapping.itertuples(index=False, name=None)}


  strain_mapping = {r[0]: {'name': r[1], 'taxid':r[2]} for i,r in strain_mapping.iterrows()}


In [15]:
# Build the Species nodes and the Strain -> Species 'belongs_to' edges
species = []

# Make sure the 'belongs_to' edge collection exists (an existing one is kept)
edges.setdefault('belongs_to', {
    'edges': [],
    'from_collection': ['Strain'],
    'to_collection': ['Species']
})

for strain in collections['Strain']:
    strain_key = strain['_key']
    name = strain['name']
    # Strains without an entry in the mapping table get no species node or edge
    if name not in strain_mapping:
        continue
    organism = strain_mapping[name]['name']
    taxid = str(strain_mapping[name]['taxid'])
    # The taxonomy id doubles as the species document key
    species.append({'_key': taxid, 'name': organism, 'rank': 'species'})
    edges['belongs_to']['edges'].append({'_from': f'Strain/{strain_key}',
                                         '_to': f'Species/{taxid}'})

# Several strains can map to the same species: keep one node per taxid
collections['Species'] = get_unique_json_from_list_of_dicts(d=species, unique_key='_key')

### 6. Create institution node collection and edges collection

In [16]:
# Generate an Institution node collection (a single document) and its edges
# The key is computed outside the `if` because the edges below need it even
# when collections['Institution'] already exists (e.g. when this cell is
# re-run); otherwise institution_key would be undefined in that case.
institution_key = get_hash("NNFCB", prefix="I")
if 'Institution' not in collections:
    collections['Institution'] = [{'_key': institution_key,
                                   'name': 'NNFCB - Novo Nordisk Foundation Center for Biosustainability (DTU Biosustain)',
                                   'address': 'Building 220, Kemitorvet. 2800 Kgs. Lyngby',
                                   'email': 'biosustain@biosustain.dtu.dk',
                                   'phone_number':'+45 45 25 80 00'
                                   }]

# Project, User and Country are single documents (dicts), not lists
project = collections['Project']
user = collections['User']
country = collections['Country']

# created_at: Project -> Institution, works_at: User -> Institution,
# from: Institution -> Country (carries the project's creation time)
edges.update({'created_at': {'edges':[{'_from': f"Project/{project['_key']}",
                                       '_to': f"Institution/{institution_key}"}],
                             'from_collection': ['Project'],
                             'to_collection':['Institution']},
              'works_at': {'edges':[{'_from': f"User/{user['_key']}",
                                     '_to': f"Institution/{institution_key}"}],
                           'from_collection': ['User'],
                           'to_collection':['Institution']},
              'from': {'edges':[{'_from': f"Institution/{institution_key}",
                                 '_to': f"Country/{country['_key']}",
                                 'creation_date': project['creation_time']}],
                       'from_collection': ['Institution'],
                       'to_collection':['Country']}})

# ## Create 'created_at' edge
# if 'created_at' not in edges:
#         edges['created_at'] = {'edges': [],
#                                 'from_collection': ['Project'],
#                                 'to_collection': ['Institution']}

# for project in collections['Project']:
#         print(f"Project: {project}")
#         project_key = int(project["_key"])
#         edges['created_at']['edges'].append({
#                 '_from': f"Project/{project_key}",
#                 '_to': f"Institution/{institution_key}"
#         })

# ## Create 'works_at' edge
# if 'works_at' not in edges:
#         edges['works_at'] = {'edges':[], 
#                                 'from_collection': ['User'],
#                                 'to_collection': ['Institution']}

# for user in collections['User']:
#         user_key = int(user["_key"])
#         edges['works_at']['edges'].append({
#                 '_from': f"User/{user_key}",
#                 '_to': f"Institution/{institution_key}"
#         })



# ## Create 'from' edge
# if 'from' not in edges:
#         edges['from'] = {'edges': [],
#                                 'from_collection': ['Institution'],
#                                 'to_collection': ['Country']}

# for country in collections['Country']:
#         country_key = int(country["_key"])
#         edges['from']['edges'].append({
#                 '_from': f"Institution/{institution_key}",
#                 '_to': f"Country/{country_key}"
#         })




### 7. Output nodes and edges collections as two JSON files

In [17]:
# Output nodes collections 
# Create the output directory if it does not exist yet
os.makedirs(os.path.dirname(nodes_path), exist_ok = True)
# Serialise before opening the file: if json.dumps fails, an existing
# output file has not yet been truncated by open(..., 'w').
nodes_str = json.dumps(collections)
with open(nodes_path, 'w') as out:
    out.write(nodes_str)

In [18]:
# Output edges collections
# Create the output directory if it does not exist yet
os.makedirs(os.path.dirname(edges_path), exist_ok = True)
# Serialise before opening the file: if json.dumps fails, an existing
# output file has not yet been truncated by open(..., 'w').
edges_str = json.dumps(edges)
with open(edges_path, 'w') as out:
    out.write(edges_str)