# Resolve Schema

**input**
```
+schema_path: str
```

**methods**
```
+read_json(schema_path: str): dict
+split_json(schema: dict): list
+resolver(entity1: dict, entity2: dict): dict
+resolve_defs(terms: dict, defs: dict): dict
+node_order(schema: dict): list
+resolve_nodes(nodeList: list, splitJsonList: list): list
+recombine_nodes(resolvedList: list): dict
```

In [47]:
def find_fk(data: dict) -> str:
    """
    Return the first key of ``data`` whose value is a link object.

    A link object is a dict that contains a 'submitter_id' entry. Keys are
    examined in the dictionary's iteration order.

    Parameters:
    - data (dict): A single record to inspect.

    Returns:
    - str: The name of the first key holding a link object.

    Raises:
    - ValueError: If no value in ``data`` is a dict with a 'submitter_id' entry.
    """
    not_found = object()  # sentinel, so any real key (even None) is returned as-is
    link_keys = (
        name
        for name, entry in data.items()
        if isinstance(entry, dict) and 'submitter_id' in entry
    )
    first_match = next(link_keys, not_found)
    if first_match is not_found:
        raise ValueError("Foreign key not found in the provided data.")
    return first_match

# Example record used to exercise find_fk: the only dict-valued entry that
# carries a 'submitter_id' is "subjects", so that is the key find_fk reports.
data = {
    "alternate_timepoint": None,
    "baseline_timepoint": True,
    "freeze_thaw_cycles": "The samples were freezer thawed not all the same number of times, due to the different volumes that were in the tubes.",
    "volume_or_mass": "150ul",
    "sample_collection_method": "blood draw",
    "sample_id": 101,
    "sample_in_preservation": "frozen",
    "sample_in_storage": "yes",
    "sample_provider": "Baker",
    "sample_source": None,
    "sample_storage_method": "frozen, -80C freezer",
    "sample_type": "plasma",
    "storage_location": "Baker",
    "type": "sample",
    "key_fk": "subject-example-990910001",
    "key_pk": "sample-example-0000101",
    "subjects": {
        "submitter_id": "subject-example-990910001"
    },
    "submitter_id": "sample-example-0000101"
}

# Look up the link (foreign key) field of the example record.
find_fk(data)

'subjects'

In [90]:
import json
from collections import defaultdict, deque
import os
import pandas as pd
from pydantic import BaseModel, create_model, ValidationError
from typing import Dict, Any, List

class ResolveSchema:
    """
    Load a bundled JSON schema, order its nodes and resolve '$ref' entries.

    The bundle is a single JSON object keyed by node file name (for example
    "sample.yaml"); each value is that node's schema. On construction the
    class reads the bundle, derives the node dependency order from the
    'links' sections and resolves references, first inside the definitions
    schema (against the terms schema) and then inside every other node
    (against the resolved definitions).
    """

    # Bundle entries skipped by default when collecting node pairs.
    _DEFAULT_EXCLUDED_NODES = ("_definitions.yaml", "_terms.yaml", "_settings.yaml", "program.yaml")

    def __init__(self, schema_path: str):
        """
        Initialize the ResolveSchema class.

        Parameters:
        - schema_path (str): The path to the JSON schema file.
        """
        self.schema_path = schema_path
        self.schema = self.read_json(self.schema_path)
        self.nodes = self.get_nodes()
        self.node_pairs = self.get_all_node_pairs()
        self.node_order = self.get_node_order(edges=self.node_pairs)
        self.schema_list = self.split_json()
        self.schema_def = self.return_schema("_definitions.yaml")
        self.schema_term = self.return_schema("_terms.yaml")
        self.schema_def_resolved = self.resolve_references(self.schema_def, self.schema_term)
        self.schema_list_resolved = self.resolve_all_references()

    def read_json(self, path: str) -> dict:
        """
        Read a JSON file and return its contents as a dictionary.

        Parameters:
        - path (str): The path to the JSON file.

        Returns:
        - dict: The contents of the JSON file.
        """
        # JSON text is UTF-8 by specification; do not rely on the platform default.
        with open(path, encoding="utf-8") as f:
            return json.load(f)

    def get_nodes(self) -> list:
        """
        Retrieve all node names from the schema.

        Returns:
        - list: A list of node names (the top-level keys of the bundle).
        """
        return list(self.schema.keys())

    def get_node_link(self, node_name: str) -> tuple:
        """
        Retrieve the links and ID for a given node.

        When the first link entry contains a 'subgroup', the subgroup is
        returned in place of the links. An empty or non-list 'links' value is
        returned unchanged instead of being indexed.

        Parameters:
        - node_name (str): The name of the node.

        Returns:
        - tuple: A tuple containing the node ID and its links.
        """
        links = self.schema[node_name]['links']
        node_id = self.schema[node_name]['id']
        # Only peek at the first entry when there is one to peek at.
        if isinstance(links, list) and links and 'subgroup' in links[0]:
            return node_id, links[0]['subgroup']
        return node_id, links

    def find_upstream_downstream(self, node_name: str) -> list:
        """
        Takes a node name and returns the upstream and downstream nodes.

        Parameters:
        - node_name (str): The name of the node.

        Returns:
        - list: A list of (upstream, downstream) tuples, one per link, where
          upstream is the link's 'target_type' and downstream is the node ID.
          A link without a target type (or a node without an ID) is reported
          and contributes a (None, None) placeholder.
        """
        node_id, links = self.get_node_link(node_name)

        # Ensure links is a list
        if isinstance(links, dict):
            links = [links]

        results = []
        for link in links:
            target_type = link.get("target_type")

            if not node_id or not target_type:
                print("Missing essential keys in link:", link)
                results.append((None, None))
                continue

            results.append((target_type, node_id))

        return results

    def get_all_node_pairs(self, excluded_nodes=None) -> list:
        """
        Retrieve all node pairs, excluding specified nodes.

        Parameters:
        - excluded_nodes (list): Node names to exclude. When omitted,
          "_definitions.yaml", "_terms.yaml", "_settings.yaml" and
          "program.yaml" are excluded.

        Returns:
        - list: A list of (upstream, downstream) node pairs.
        """
        # A None sentinel replaces the former mutable list default.
        if excluded_nodes is None:
            excluded_nodes = self._DEFAULT_EXCLUDED_NODES

        node_pairs = []
        for node in self.nodes:
            if node not in excluded_nodes:
                node_pairs.extend(self.find_upstream_downstream(node))
        return node_pairs

    def get_node_order(self, edges: list) -> list:
        """
        Determine the order of nodes based on their dependencies.

        Nodes are sorted topologically (Kahn's algorithm). If
        "core_metadata_collection" is among the sorted nodes it is moved to
        the end. Nodes whose in-degree never reaches zero (for example
        members of a cycle) do not appear in the result.

        Parameters:
        - edges (list): A list of (upstream, downstream) tuples.

        Returns:
        - list: A list of nodes in topological order.
        """
        # Build graph representation
        graph = defaultdict(list)
        in_degree = defaultdict(int)

        for upstream, downstream in edges:
            graph[upstream].append(downstream)
            in_degree[downstream] += 1
            if upstream not in in_degree:
                in_degree[upstream] = 0

        # Perform Topological Sorting (Kahn's Algorithm)
        sorted_order = []
        zero_in_degree = deque([node for node in in_degree if in_degree[node] == 0])

        while zero_in_degree:
            node = zero_in_degree.popleft()
            sorted_order.append(node)

            for neighbor in graph[node]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    zero_in_degree.append(neighbor)

        # Ensure core_metadata_collection is last. Not every schema defines
        # that node, so only move it when it is actually present.
        if "core_metadata_collection" in sorted_order:
            sorted_order.remove("core_metadata_collection")
            sorted_order.append("core_metadata_collection")

        return sorted_order

    def split_json(self) -> list:
        """
        Split the schema into a list of individual node schemas.

        Returns:
        - list: A list of node schemas, in the same order as self.nodes.
        """
        return [self.schema[node] for node in self.nodes]

    @staticmethod
    def _find_by_id(schemas: list, target_id: str) -> dict:
        """
        Return the first schema in ``schemas`` whose 'id' equals ``target_id``.

        A trailing '.yaml' on ``target_id`` is ignored. Prints a message and
        returns None when nothing matches.
        """
        if target_id.endswith('.yaml'):
            target_id = target_id[:-5]

        result = next((item for item in schemas if item.get('id') == target_id), None)
        if result is None:
            print(f"{target_id} not found")
        return result

    def return_schema(self, target_id: str) -> dict:
        """
        Retrieves the first unresolved node schema whose 'id' key matches the target_id.

        Parameters:
        - target_id (str): The value of the 'id' key to match; a trailing '.yaml' is ignored.

        Returns:
        - dict: The dictionary that matches the target_id, or None if not found.
        """
        return self._find_by_id(self.schema_list, target_id)

    def resolve_references(self, schema: dict, reference: dict) -> dict:
        """
        Takes a gen3 jsonschema draft 4 as a dictionary and recursively resolves any references using a reference schema which has no references.

        A '$ref' of the form "<file>#/<path>" is looked up in ``reference``;
        a '$ref' without a file part ("#/<path>") is looked up in ``schema``
        itself. Keys that sit next to a '$ref' override the referenced content.

        Parameters:
        - schema (dict): The JSON node to resolve references in.
        - reference (dict): The schema containing the references.

        Returns:
        - dict: The resolved JSON node with references resolved.
        """

        def resolve_node(node):
            if isinstance(node, dict):
                if '$ref' in node:
                    ref_path = node['$ref']
                    ref_file, ref_key = ref_path.split('#')
                    ref_file = ref_file.strip()
                    ref_key = ref_key.strip('/')

                    # if a reference file is in the reference, load the pre-defined reference, if no file exists, then use the schema itself as reference
                    ref_content = reference if ref_file else schema

                    for part in ref_key.split('/'):
                        ref_content = ref_content[part]

                    resolved_content = resolve_node(ref_content)
                    # Merge resolved content with the current node, excluding the $ref key
                    return {**resolved_content, **{k: resolve_node(v) for k, v in node.items() if k != '$ref'}}
                return {k: resolve_node(v) for k, v in node.items()}
            if isinstance(node, list):
                return [resolve_node(item) for item in node]
            return node

        return resolve_node(schema)

    def resolve_all_references(self) -> list:
        """
        Resolves references in all other schema dictionaries using the resolved definitions schema.

        The "_definitions.yaml" and "_terms.yaml" entries are skipped. A node
        that fails to resolve is reported on stdout and left out of the result.

        Returns:
        - list: A list of resolved schema dictionaries.
        """
        resolved_schema_list = []
        for node in self.nodes:

            if node in ("_definitions.yaml", "_terms.yaml"):
                continue

            try:
                resolved_schema = self.resolve_references(self.schema[node], self.schema_def_resolved)
                resolved_schema_list.append(resolved_schema)
                print(f"Resolved {node}")
            except KeyError as e:
                print(f"Error resolving {node}: Missing key {e}")
            except Exception as e:
                # Best effort: one bad node must not stop the remaining nodes.
                print(f"Error resolving {node}: {e}")

        return resolved_schema_list

    def return_resolved_schema(self, target_id: str) -> dict:
        """
        Retrieves the first resolved node schema whose 'id' key matches the target_id.

        Parameters:
        - target_id (str): The value of the 'id' key to match; a trailing '.yaml' is ignored.

        Returns:
        - dict: The dictionary that matches the target_id, or None if not found.
        """
        return self._find_by_id(self.schema_list_resolved, target_id)
    
    
    
#### Parse Data Class

class ParseData():
    """
    Parses JSON data files into a dictionary keyed by node name.

    The node name of a file is its base name without the '.json' extension.
    Either a folder of '.json' files or a single file path can be supplied.
    """

    def __init__(self, data_folder_path: str = None, data_file_path: str = None, link_suffix: str = 's'):
        """
        Parameters:
        - data_folder_path (str): Folder whose '.json' files are loaded.
        - data_file_path (str): Single file to load when no folder is given.
        - link_suffix (str): Suffix appended to a node name to build the name
          of its link key (e.g. 'sample' + 's').
        """
        self.folder_path = data_folder_path
        self.file_path = data_file_path
        # Must be set before load_json_data runs, because that method reads it.
        self.link_suffix = link_suffix
        self.file_path_list = self.list_data_files()
        self.data_dict = self.load_json_data(self.file_path_list)
        self.data_nodes = self.get_node_names()

    @staticmethod
    def _strip_json_suffix(file_name: str) -> str:
        """Return ``file_name`` without a trailing '.json' (other names unchanged)."""
        return file_name[:-5] if file_name.endswith('.json') else file_name

    def read_json(self, path: str) -> dict:
        """
        Read a JSON file and return the parsed content.

        Parameters:
        - path (str): The path to the JSON file.

        Returns:
        - dict: The parsed JSON document (whatever json.load yields for the file).
        """
        # JSON text is UTF-8 by specification; do not rely on the platform default.
        with open(path, encoding="utf-8") as f:
            return json.load(f)

    def list_data_files(self) -> list:
        """
        Lists all JSON data files in the specified folder or returns the single file path.

        This method checks if a folder path is provided. If so, it lists all files in the folder
        that have a '.json' extension and returns their absolute paths in sorted order. If no
        folder path is provided, it returns the single file path specified during initialization.

        Returns:
        - list: A list of absolute file paths to JSON files.
        """
        if self.folder_path:
            # os.listdir returns entries in arbitrary order; sort for reproducible results.
            return [
                os.path.abspath(os.path.join(self.folder_path, f))
                for f in sorted(os.listdir(self.folder_path))
                if f.endswith(".json")
            ]
        return [self.file_path]

    def load_json_data(self, json_paths: list) -> dict:
        """
        Loads JSON data from a list of file paths.

        Each file is parsed and stored under its node name. When the parsed
        document has a top-level 'submitter_id', that value is also stored in
        the document under the key '<node name><link_suffix>'.

        Parameters:
        - json_paths (list): A list of file paths to JSON files.

        Returns:
        - dict: A dictionary mapping each node name to the data loaded from its file.
        """
        json_files = {}
        for file in json_paths:
            json_data = self.read_json(file)

            # Strip only the trailing extension so the key matches get_node_names.
            file_basename = self._strip_json_suffix(os.path.basename(file))
            if 'submitter_id' in json_data:
                json_data[f"{file_basename}{self.link_suffix}"] = json_data['submitter_id']
            json_files[file_basename] = json_data
            print(f"Loaded {file}")

        return json_files

    def get_node_names(self) -> list:
        """
        Retrieves the names of nodes from the JSON files.

        For each path ending in '.json' the node name is the base name without
        the extension; any other path is returned unchanged.

        Returns:
        - list: A list of node names extracted from the JSON file paths.
        """
        node_names = []
        for node in self.file_path_list:
            if node.endswith('.json'):
                node_names.append(self._strip_json_suffix(os.path.basename(node)))
            else:
                node_names.append(node)
        return node_names

    def return_data(self, node: str) -> dict:
        """
        Retrieves data for a specified node.

        Parameters:
        - node (str): The name of the node for which data is to be retrieved.

        Returns:
        - dict: The data loaded for the specified node.

        Raises:
        - KeyError: If no data was loaded for ``node``.
        """
        return self.data_dict[node]
            

class ParseXlsxMetadata():
    """
    Converts a specified sheet from the metadata dictionary to a JSON file.
    Also formats and renames the primary and foreign keys into a gen3 compatible format
    """
    def __init__(self, xlsx_path: str, link_suffix: str = 's'):
        """
        Args:
        - xlsx_path (str): The path to the Excel file to be parsed.
        - link_suffix (str): Suffix appended to the foreign-key node name to
          build the name of the link column (e.g. 'subject' + 's').
        """
        self.xlsx_path = xlsx_path
        self.link_suffix = link_suffix
        self.xlsx_data_dict = self.parse_metadata_template()
        self.sheet_names = self.get_sheet_names()

    def parse_metadata_template(self) -> dict:
        """
        Parses an Excel file and converts each sheet into a DataFrame.

        This function reads the Excel file at `self.xlsx_path` and loads each sheet
        into a dictionary where the keys are the sheet names and the values are the DataFrames
        representing the data in those sheets. The first row of each DataFrame is removed.

        Returns:
        - dict: A dictionary where each key is a sheet name and each value is a DataFrame
        containing the data from that sheet, with the first row removed.
        """
        # sheet_name=None loads every sheet into a {sheet name: DataFrame} dict
        pd_dict = pd.read_excel(self.xlsx_path, sheet_name=None)

        # in each pandas data frame in the dict, remove the first row
        return {name: frame.iloc[1:, :] for name, frame in pd_dict.items()}

    def get_sheet_names(self) -> list:
        """Return the names of all parsed sheets."""
        return list(self.xlsx_data_dict.keys())

    def get_pk_fk_pairs(self, sheet_name: str) -> tuple:
        """
        Extracts the primary key (PK) and foreign key (FK) column names from a specified sheet.

        This method retrieves the first two column names from the given sheet in the Excel data dictionary,
        assuming the first column is the primary key and the second column is the foreign key.

        Args:
        - sheet_name (str): The name of the sheet from which to extract the PK and FK.

        Returns:
        - tuple: A tuple containing the primary key and foreign key column names.
        """
        sheet = self.xlsx_data_dict[sheet_name]
        pk, fk = sheet.columns[:2].tolist()[:2]
        return pk, fk

    def pd_to_json(self, sheet_name: str, json_path: str) -> None:
        """
        Converts a specified sheet from the metadata dictionary to a JSON file.
        Also formats and renames the primary and foreign keys into a gen3 compatible format

        The written records contain every non-'_uid' column of the sheet plus:
        'type' (the sheet name), 'key_pk' / 'key_fk' (raw key values),
        'submitter_id' (the primary key) and '<fk node><link_suffix>'
        (the foreign key wrapped as {"submitter_id": value}). Missing cells are
        written as JSON null. The cached sheet itself is left untouched.

        Args:
        - sheet_name (str): The name of the sheet to convert to JSON.
        - json_path (str): The path to the JSON file to be saved.

        Returns:
        - None
        """
        pk, fk = self.get_pk_fk_pairs(sheet_name)

        # Work on a copy so the helper columns never leak into the cached sheet.
        df = self.xlsx_data_dict[sheet_name].copy()
        df['type'] = sheet_name  # add node / entity name
        fk_name = fk.split('_uid')[0]  # getting foreign key node name

        df['key_fk'] = df[fk].tolist()  # creating var for fk
        df['key_pk'] = df[pk].tolist()  # creating var for pk

        df[f"{fk_name}{self.link_suffix}"] = df[fk].apply(lambda x: {"submitter_id": x})  # format foreign key
        # Cast to object first: numeric columns cannot hold None, so without the
        # cast NaN survives and json.dump emits the non-standard token NaN.
        df_cleaned = df.astype(object).where(pd.notnull(df), None)
        df_cleaned['submitter_id'] = df_cleaned[pk].tolist()  # adding primary key as submitter_id key
        df_cleaned = df_cleaned.loc[:, ~df_cleaned.columns.str.endswith('_uid')]  # removing _uid columns
        data_list = df_cleaned.to_dict(orient='records')  # converting to dict

        with open(json_path, 'w') as f:
            json.dump(data_list, f)

        
class TestLinkage(ResolveSchema, ParseData):
    """
    Checks that the foreign keys found in loaded data point at existing primary keys.

    On construction the schema is resolved, the data files are loaded through
    a ParseData instance, a linkage config is derived from the data and the
    links are validated. The outcome is stored in `link_validation_results`.
    """

    # The class name matches pytest's default ``Test*`` pattern; opt out of
    # collection so test runs do not try to treat this class as a test case.
    __test__ = False

    def __init__(self, schema_path, data_folder_path: str = None, data_file_path: str = None, link_suffix: str = 's'):
        """
        Args:
            schema_path (str): Path to the JSON schema file.
            data_folder_path (str): Folder of '.json' data files.
            data_file_path (str): Single data file, used when no folder is given.
            link_suffix (str): Suffix appended to a node name to form its key name.
        """
        super().__init__(schema_path)
        self.data_inst = ParseData(data_folder_path, data_file_path, link_suffix=link_suffix)
        self.data_dict = self.data_inst.data_dict
        self.link_suffix = self.data_inst.link_suffix
        self.data_nodes = self.data_inst.data_nodes
        # Pass the configured suffix on; otherwise the config is always built with 's'.
        self.linkage_config = self.generate_config(self.data_dict, link_suffix=self.link_suffix)
        self.link_validation_results = self.validate_links(data_map=self.data_dict, config=self.linkage_config)

    def _find_fk(self, data: dict) -> str:
        """
        Return the first key of ``data`` whose value is a dict containing
        'submitter_id', or None when there is no such key.
        """
        for key, value in data.items():
            if isinstance(value, dict) and 'submitter_id' in value:
                return key
        return None

    @staticmethod
    def _key_values(records: list, field: str) -> list:
        """
        Collect the non-empty values stored under ``field`` in ``records``.

        A value that is a dict with a 'submitter_id' entry is unwrapped to
        that entry; every other truthy value is kept as it is.
        """
        values = []
        for record in records:
            value = record.get(field)
            if not value:
                continue
            # Check the type first: on a plain string `in` would be a substring test.
            if isinstance(value, dict) and 'submitter_id' in value:
                values.append(value['submitter_id'])
            else:
                values.append(value)
        return values

    def generate_config(self, data_map, link_suffix: str = 's') -> dict:
        """
        Derive the linkage config from the first record of every entity.

        Args:
            data_map (dict): Maps each entity name to its list of records.
            link_suffix (str): Suffix appended to the entity name to form the primary key name.

        Returns:
            dict: ``{entity: {"primary_key": ..., "foreign_key": ...}}``; the
            foreign key is None when the first record holds no link object or
            the entity has no records.
        """
        config = {}
        for node, data in data_map.items():
            # An entity without records has nothing to inspect for a foreign key.
            fk = self._find_fk(data[0]) if data else None
            config[node] = {
                "primary_key": f"{node}{link_suffix}",
                "foreign_key": f"{fk}" if fk else None,
            }
        return config

    def generate_models(self, config: Dict[str, Any]) -> Dict[str, type]:
        """
        Dynamically generate Pydantic models based on a configuration dictionary.

        Args:
            config (dict): A dictionary defining the entities and their fields.

        Returns:
            dict: A dictionary of dynamically created Pydantic models.
        """
        models = {}

        for entity_name, entity_config in config.items():
            fields = {}

            # Add primary key field
            pk = entity_config.get("primary_key")
            if pk:
                fields[pk] = (str, ...)  # Primary key is required

            # Add foreign key field
            fk = entity_config.get("foreign_key")
            if fk:
                fields[fk] = (str, None)  # Foreign key is optional

            # Create the model dynamically
            models[entity_name] = create_model(entity_name.capitalize(), **fields)

        return models

    def test_config_links(self, config_map: Dict[str, Any]) -> Any:
        """
        Check that every configured foreign key names the primary key of another entity.

        Args:
            config_map (dict): The entity linkage config.

        Returns:
            The string "valid" when no link is broken, otherwise a dict
            mapping each offending entity to its unmatched foreign key.
        """
        broken_links = {}

        for key, value in config_map.items():
            fk = value['foreign_key']

            # Check if fk of the current key matches with the primary key of any of the other entities
            match_found = any(fk == v['primary_key'] for k, v in config_map.items() if k != key)

            if not match_found and fk is not None:
                broken_links[key] = fk

        if not broken_links:
            print("Config Map Validated")
            return "valid"

        print("Broken Config Map Links ('entity': 'foreign_key')")
        return broken_links

    def get_foreign_keys(self, data_map: Dict[str, List[Dict[str, Any]]], config: Dict[str, Any]) -> dict:
        """
        Uses the config to read the entity data from the data_map, and then uses the FK key outlined in the config to find the foreign key values.
        Args:
            data_map (Dict[str, List[Dict[str, Any]]]): The data map containing the entity data
            config (Dict[str, Any]): The config dictionary

        Returns:
            dict: dictionary of entities and their foreign key values
        """
        return {
            config_entity: self._key_values(data_map[config_entity], config_keys['foreign_key'])
            for config_entity, config_keys in config.items()
        }

    def get_primary_keys(self, data_map: Dict[str, List[Dict[str, Any]]], config: Dict[str, Any]) -> dict:
        """
        Uses the config to read the entity data from the data_map, and then uses the PK key outlined in the config to find the primary key values.
        Args:
            data_map (Dict[str, List[Dict[str, Any]]]): The data map containing the entity data
            config (Dict[str, Any]): The config dictionary

        Returns:
            dict: dictionary of entities and their primary key values
        """
        return {
            config_entity: self._key_values(data_map[config_entity], config_keys['primary_key'])
            for config_entity, config_keys in config.items()
        }

    def validate_links(self, data_map: Dict[str, List[Dict[str, Any]]], config: Dict[str, Any]) -> Dict[str, List[str]]:
        """
        Verifies Config file, then extracts primary and foreign key values from the data map. Then uses the foreign key values to validate the primary key values.

        Args:
            data_map (Dict[str, List[Dict[str, Any]]]): Contains the data for each entity
            config (Dict[str, Any]): The entity linkage config

        Returns:
            Dict[str, List[str]]: Dictionary of entities and the foreign key
            values that match no primary key of any entity. If the config
            itself is invalid, the broken config links reported by
            test_config_links are returned instead.
        """

        # validating config map
        valid_config = self.test_config_links(config)
        if valid_config != "valid":
            print("Invalid Config Map")
            print(config)
            return valid_config

        fk_entities = self.get_foreign_keys(data_map, config)
        pk_entities = self.get_primary_keys(data_map, config)

        validation_results = {}
        for entity, fk_values in fk_entities.items():
            # A foreign key is invalid when no entity lists it among its primary keys.
            invalid_keys = [fk for fk in fk_values if all(fk not in pk_values for pk_values in pk_entities.values())]
            validation_results[entity] = invalid_keys
            print(f"Entity '{entity}' has {len(invalid_keys)} invalid foreign keys: {invalid_keys}")
        return validation_results
        
        

In [88]:


def find_fk(data: dict) -> str:
    """
    Return the first key of ``data`` whose value is a link object.

    A link object is a dict that contains a 'submitter_id' entry. Keys are
    examined in the dictionary's iteration order.

    Parameters:
    - data (dict): A single record to inspect.

    Returns:
    - str: The name of the first key holding a link object, or None when the
      record has no such key.
    """
    link_keys = (
        name
        for name, entry in data.items()
        if isinstance(entry, dict) and 'submitter_id' in entry
    )
    return next(link_keys, None)
    

def generate_config(data_map, link_suffix: str = 's') -> dict:
    """
    Derive the linkage config from the first record of every entity.

    Args:
        data_map (dict): Maps each entity name to its list of records.
        link_suffix (str): Suffix appended to the entity name to form the primary key name.

    Returns:
        dict: ``{entity: {"primary_key": ..., "foreign_key": ...}}`` where the
        foreign key is the link field reported by find_fk for the first
        record, or None when find_fk reports none.
    """
    config = {}
    for node, data in data_map.items():
        fk = find_fk(data[0])
        if fk:
            foreign_key = f"{fk}"
        elif fk is None:
            foreign_key = None
        else:
            # A falsy key other than None gets no config entry.
            continue
        config[node] = {"primary_key": f"{node}{link_suffix}", "foreign_key": foreign_key}
    return config



def generate_models(config: Dict[str, Any]) -> Dict[str, type]:
    """
    Build one Pydantic model per entity described in a configuration dictionary.

    Each model gets a required string field named after the entity's primary
    key and, when a foreign key is configured, an optional string field
    (default None) named after it. The model name is the capitalized entity name.

    Args:
        config (dict): A dictionary defining the entities and their fields.

    Returns:
        dict: A dictionary of dynamically created Pydantic models, keyed by entity name.
    """

    def _field_definitions(entity_config):
        # Field definitions use the (type, default) form; `...` marks a required field.
        definitions = {}
        primary = entity_config.get("primary_key")
        if primary:
            definitions[primary] = (str, ...)
        foreign = entity_config.get("foreign_key")
        if foreign:
            definitions[foreign] = (str, None)
        return definitions

    return {
        entity_name: create_model(entity_name.capitalize(), **_field_definitions(entity_config))
        for entity_name, entity_config in config.items()
    }


def test_config_links(config_map: Dict[str, Any]) -> dict:
    broken_links = {}

    for key, value in config_map.items():
        fk = value['foreign_key']
        
        # Check if fk of the current key matches with the primary key of any of the other entities
        match_found = any(fk == v['primary_key'] for k, v in config_map.items() if k != key)
        
        if not match_found and fk is not None:
            broken_links[key] = fk
    
    if len(broken_links) == 0:
        return print("Config Map Validated")
    elif len(broken_links) > 0:
        print("Broken Config Map Links ('entity': 'foreign_key')")
        return broken_links
    


def get_foreign_keys(data_map: Dict[str, List[Dict[str, Any]]], config: Dict[str, Any]) -> dict:
    """
    Uses the config to read the entity data from the data_map, and then uses the FK key outlined in the config to find the foreign key values.

    A value that is a dict with a 'submitter_id' entry is unwrapped to that
    entry; any other non-empty value is kept as it is. Records without a
    value for the foreign key are skipped.

    Args:
        data_map (Dict[str, List[Dict[str, Any]]]): The data map containing the entity data
        config (Dict[str, Any]): The config dictionary

    Returns:
        dict: dictionary of entities and their foreign key values
    """
    fk_entities = {}

    for config_entity, config_keys in config.items():
        fk_field = config_keys['foreign_key']
        records_list = []

        for record in data_map[config_entity]:
            fk = record.get(fk_field)
            if not fk:
                continue
            # Check the type first: on a plain string `in` is a substring test
            # (and on an int it raises), so only dicts may be unwrapped.
            if isinstance(fk, dict) and 'submitter_id' in fk:
                records_list.append(fk['submitter_id'])
            else:
                records_list.append(fk)

        fk_entities[config_entity] = records_list
    return fk_entities
    
    
def get_primary_keys(data_map: Dict[str, List[Dict[str, Any]]], config: Dict[str, Any]) -> dict:
    """
    Uses the config to read the entity data from the data_map, and then uses the PK key outlined in the config to find the primary key values.

    A value that is a dict with a 'submitter_id' entry is unwrapped to that
    entry; any other non-empty value is kept as it is. Records without a
    value for the primary key are skipped.

    Args:
        data_map (Dict[str, List[Dict[str, Any]]]): The data map containing the entity data
        config (Dict[str, Any]): The config dictionary

    Returns:
        dict: dictionary of entities and their primary key values
    """
    pk_entities = {}

    for config_entity, config_keys in config.items():
        pk_field = config_keys['primary_key']
        records_list = []

        for record in data_map[config_entity]:
            pk = record.get(pk_field)
            if not pk:
                continue
            # Check the type first: on a plain string `in` is a substring test
            # (and on an int it raises), so only dicts may be unwrapped.
            if isinstance(pk, dict) and 'submitter_id' in pk:
                records_list.append(pk['submitter_id'])
            else:
                records_list.append(pk)

        pk_entities[config_entity] = records_list
    return pk_entities
    
    
def validate_links(data_map: Dict[str, List[Dict[str, Any]]], config: Dict[str, Any]) -> Dict[str, List[str]]:
    """
    Report, per entity, the foreign key values that match no primary key value.

    The config is first passed to ``test_config_links``. Foreign and primary key values are
    then extracted with ``get_foreign_keys`` and ``get_primary_keys``. A foreign key value
    is considered valid when it appears among the primary keys of ANY entity, not only the
    entity it is meant to point at. A summary line is printed for every entity.

    Args:
        data_map (Dict[str, List[Dict[str, Any]]]): Contains the data for each entity
        config (Dict[str, Any]): The entity linkage config

    Returns:
        Dict[str, List[str]]: For each entity, the list of its unmatched foreign key values
    """
    test_config_links(config)
    foreign_by_entity = get_foreign_keys(data_map, config)
    primary_by_entity = get_primary_keys(data_map, config)

    report = {}
    for entity_name in foreign_by_entity:
        unmatched = []
        for candidate in foreign_by_entity[entity_name]:
            # A candidate is fine as soon as one entity lists it as a primary key.
            is_known = any(candidate in known_keys for known_keys in primary_by_entity.values())
            if not is_known:
                unmatched.append(candidate)
        report[entity_name] = unmatched
        print(f"Entity '{entity_name}' has {len(unmatched)} invalid foreign keys: {unmatched}")
    return report

# def validate_relationships(data_map: Dict[str, List[Dict[str, Any]]], config: Dict[str, Any]) -> Dict[str, List[str]]:
    



# Toy fixture for link validation: each entity name maps to a list of records. In every
# record one key holds the record's own identifier and, except for "subject", a second
# key holds a nested {"submitter_id": ...} reference to a record of another entity.
data_map = {
    "metabolomics_file": [
        {"metabolomics_files": "metabolomics_file_1", "metabolomics_assays": {"submitter_id": "metabolomics_assay_1"}},
        {"metabolomics_files": "metabolomics_file_2", "metabolomics_assays": {"submitter_id": "metabolomics_assay_2"}},
        {"metabolomics_files": "metabolomics_file_3", "metabolomics_assays": {"submitter_id": "metabolomics_assay_3"}}, # resolves: metabolomics_assay_3 is defined below
    ],
    "metabolomics_assay": [
        {"metabolomics_assays": "metabolomics_assay_1", "samples": {"submitter_id": "sample_1"}},
        {"metabolomics_assays": "metabolomics_assay_2", "samples": {"submitter_id": "sample_2"}},
        {"metabolomics_assays": "metabolomics_assay_3", "samples": {"submitter_id": "sample_8"}}, # broken link: no sample_8 record exists
    ],
    "sample": [
        {"samples": "sample_1", "subjects": {"submitter_id": "subject_1"}},
        {"samples": "sample_2", "subjects": {"submitter_id": "subject_2"}},
        {"samples": "sample_3", "subjects": {"submitter_id": "subject_3"}},
    ],
    "subject": [
        {"subjects": "subject_1"},
        {"subjects": "subject_2"},
        {"subjects": "subject_3"},
    ]
}


# NOTE(review): generate_config is defined elsewhere in the notebook; presumably it derives
# each entity's primary/foreign key field names from the records, with link_suffix='s'
# describing the plural naming of link fields -- confirm against its definition.
link_config = generate_config(data_map, link_suffix='s')

# Build one model per entity from the derived link config.
models = generate_models(link_config)


In [89]:
validate_links(data_map, link_config)

Config Map Validated
Entity 'metabolomics_file' has 0 invalid foreign keys: []
Entity 'metabolomics_assay' has 1 invalid foreign keys: ['sample_8']
Entity 'sample' has 0 invalid foreign keys: []
Entity 'subject' has 0 invalid foreign keys: []


{'metabolomics_file': [],
 'metabolomics_assay': ['sample_8'],
 'sample': [],
 'subject': []}

In [78]:
# Only the value of the cell's last expression is displayed, so the foreign key result
# on the first line is computed and then discarded; assign it to a name to inspect it.
get_foreign_keys(data_map, link_config)
get_primary_keys(data_map, link_config)

Foreign Key Entities ('entity': 'foreign_keys'):
Primary Key Entities ('entity': 'primary_keys'):


{'metabolomics_file': ['metabolomics_file_1',
  'metabolomics_file_2',
  'metabolomics_file_3'],
 'metabolomics_assay': ['metabolomics_assay_1',
  'metabolomics_assay_2',
  'metabolomics_assay_3'],
 'sample': ['sample_1', 'sample_2', 'sample_3'],
 'subject': ['subject_1', 'subject_2', 'subject_3']}

In [75]:
test_config_links(link_config)

All links are valid.


In [76]:
link_config


{'metabolomics_file': {'primary_key': 'metabolomics_files',
  'foreign_key': 'metabolomics_assays'},
 'metabolomics_assay': {'primary_key': 'metabolomics_assays',
  'foreign_key': 'samples'},
 'sample': {'primary_key': 'samples', 'foreign_key': 'subjects'},
 'subject': {'primary_key': 'subjects', 'foreign_key': None}}

In [76]:
# # Converting xlsx to json
# metadata_inst = ParseXlsxMetadata('../data/lipid_metadata_example.xlsx')
# metadata_sheets = metadata_inst.sheet_names

# for sheet in metadata_sheets:
#     metadata_inst.pd_to_json(sheet, f'../data/lipid_pass/{sheet}.json')

In [127]:
linkage_class.data_dict

{'metabolomics_file': [{'alternate_timepoint': '1a914a1577',
   'baseline_timepoint': True,
   'cv': 56.94475432813319,
   'data_category': 'mass spec analysed',
   'data_format': 'wiff',
   'data_type': 'MS/MS',
   'file_format': 'e387cadce7',
   'file_name': 'dummy_metab',
   'file_size': 87,
   'ga4gh_drs_uri': '150bf4b457',
   'md5sum': '756c381b71c2a7d346c72998ab334c00',
   'metabolomic_unit': 'pmol/mL',
   'metabolomics_assays': {'submitter_id': 'metabolomics_assay_356580ff6d'},
   'submitter_id': 'metabolomics_file_547f3d4417',
   'type': 'metabolomics_file'},
  {'alternate_timepoint': '578a14ee53',
   'baseline_timepoint': True,
   'cv': 43.00152620641602,
   'data_category': 'mass spec analysed',
   'data_format': 'wiff',
   'data_type': 'MS/MS',
   'file_format': '47a60862ef',
   'file_name': 'dummy_metab',
   'file_size': 0,
   'ga4gh_drs_uri': '2beb8c16ea',
   'md5sum': '43640335849622369f4843b817c1da2e',
   'metabolomic_unit': 'umol/mL',
   'metabolomics_assays': {'submitt

In [125]:
link_config

{'metabolomics_file': {'primary_key': 'submitter_id',
  'foreign_key': 'metabolomics_assays'},
 'medical_history': {'primary_key': 'submitter_id', 'foreign_key': 'subjects'},
 'metabolomics_assay': {'primary_key': 'submitter_id',
  'foreign_key': 'samples'},
 'sample': {'primary_key': 'submitter_id', 'foreign_key': 'subjects'},
 'subject': {'primary_key': 'submitter_id', 'foreign_key': None}}

In [92]:
# Testing linkage
linkage_class = TestLinkage(schema_path = "../schema/gen3_test_schema.json", data_folder_path = "../data/fail", link_suffix = "s")



Resolved demographic.yaml
Resolved project.yaml
Resolved serum_marker_assay.yaml
Resolved alignment_workflow.yaml
Resolved imaging_file.yaml
Resolved lipidomics_assay.yaml
Resolved metabolomics_file.yaml
Resolved acknowledgement.yaml
Resolved medical_history.yaml
Resolved _settings.yaml
Resolved blood_pressure_test.yaml
Resolved genomics_assay.yaml
Resolved variant_file.yaml
Resolved program.yaml
Resolved serum_marker_file.yaml
Resolved proteomics_assay.yaml
Resolved sample.yaml
Resolved unaligned_reads_file.yaml
Resolved aligned_reads_index_file.yaml
Resolved variant_workflow.yaml
Resolved proteomics_file.yaml
Resolved exposure.yaml
Resolved metabolomics_assay.yaml
Resolved lipidomics_mapping_file.yaml
Resolved lipidomics_file.yaml
Resolved aligned_reads_file.yaml
Resolved lab_result.yaml
Resolved medication.yaml
Resolved publication.yaml
Resolved subject.yaml
Resolved core_metadata_collection.yaml
Loaded /Users/harrijh/projects/gen3-data-validator/data/fail/metabolomics_file.json
Loa

In [13]:
data_map = linkage_class.data_dict
linkage_class.validate_relationships(data_map, link_config)


Generated models: {'metabolomics_file': <class '__main__.Metabolomics_file'>, 'medical_history': <class '__main__.Medical_history'>, 'metabolomics_assay': <class '__main__.Metabolomics_assay'>, 'sample': <class '__main__.Sample'>, 'subject': <class '__main__.Subject'>}
Validating entity: metabolomics_file with records: [{'alternate_timepoint': '1a914a1577', 'baseline_timepoint': True, 'cv': 56.94475432813319, 'data_category': 'mass spec analysed', 'data_format': 'wiff', 'data_type': 'MS/MS', 'file_format': 'e387cadce7', 'file_name': 'dummy_metab', 'file_size': 87, 'ga4gh_drs_uri': '150bf4b457', 'md5sum': '756c381b71c2a7d346c72998ab334c00', 'metabolomic_unit': 'pmol/mL', 'metabolomics_assays': {'submitter_id': 'metabolomics_assay_356580ff6d'}, 'submitter_id': 'metabolomics_file_547f3d4417', 'type': 'metabolomics_file'}, {'alternate_timepoint': '578a14ee53', 'baseline_timepoint': True, 'cv': 43.00152620641602, 'data_category': 'mass spec analysed', 'data_format': 'wiff', 'data_type': 'MS

KeyError: 'metabolomics_files'

In [93]:
# Example configuration and data map
# Each entity declares the field holding its own identifier ("primary_key") and the field
# referencing another entity ("foreign_key", None when the entity links to nothing).
config_map = {
    "samples": {"primary_key": "sample_id", "foreign_key": "subject_id"},
    "files": {"primary_key": "file_id", "foreign_key": "sample_id"},
    "subjects": {"primary_key": "subject_id", "foreign_key": "project_id"},
    "project": {"primary_key": "project_id", "foreign_key": None}
}

# Records use flat string keys here (no nested {"submitter_id": ...} wrappers).
data_map = {
    "samples": [
        {"sample_id": "sample_1", "subject_id": "subject_1"},
        {"sample_id": "sample_2", "subject_id": "subject_3"},  # Invalid FK
        {"sample_id": "sample_3", "subject_id": "subject_4"}, # Invalid FK
        {"sample_id": "sample_4", "subject_id": "subject_5"} # Invalid FK
    ],
    "files": [
        {"file_id": "file_1", "sample_id": "sample_1"},
        {"file_id": "file_2", "sample_id": "sample_27"}  # Invalid FK
    ],
    "subjects": [
        {"subject_id": "subject_1", "project_id": "project_1"},  
        {"subject_id": "subject_2", "project_id": "project_2"}, # Missing project 2
    ],
    "project": [
        {"project_id": "project_1"}
    ]
}

# Runs the link check through the TestLinkage instance rather than the module-level function.
linkage_class.validate_links(data_map, config_map)

Config Map Validated
Entity 'samples' has 3 invalid foreign keys: ['subject_3', 'subject_4', 'subject_5']
Entity 'files' has 1 invalid foreign keys: ['sample_27']
Entity 'subjects' has 1 invalid foreign keys: ['project_2']
Entity 'project' has 0 invalid foreign keys: []


{'samples': ['subject_3', 'subject_4', 'subject_5'],
 'files': ['sample_27'],
 'subjects': ['project_2'],
 'project': []}

In [19]:
# Trying pydantic code:

from typing import Any, Dict, List, Optional

from pydantic import BaseModel, ValidationError, create_model


def generate_models(config: Dict[str, Any]) -> Dict[str, type]:
    """
    Build one Pydantic model per entity described in a link configuration.

    Every model gets at most two string fields: the entity's primary key field, declared
    as required, and its foreign key field, declared with a default of ``None``. A field
    is only added when the corresponding config entry is truthy. The model class is named
    after the entity with ``str.capitalize()`` applied.

    Args:
        config (dict): Maps entity names to dicts that may carry "primary_key" and
            "foreign_key" entries naming the respective fields.

    Returns:
        dict: Maps each entity name to its dynamically created Pydantic model.
    """
    built = {}

    for name, spec in config.items():
        primary_field = spec.get("primary_key")
        foreign_field = spec.get("foreign_key")

        field_defs = {}
        if primary_field:
            # Ellipsis as the default marks the field as required.
            field_defs[primary_field] = (str, ...)
        if foreign_field:
            # Assigned second, so it wins if both config entries name the same field.
            field_defs[foreign_field] = (str, None)

        built[name] = create_model(name.capitalize(), **field_defs)

    return built


def validate_relationships(
    data_map: Dict[str, List[Dict[str, Any]]],
    models: Dict[str, type],
    link_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, List[str]]:
    """
    Validate primary key and foreign key relationships between entities.

    Two passes are made. First every record is instantiated against its entity's model
    and any ``ValidationError`` is recorded. Then, for each entity with a foreign key
    field, the referenced entity is resolved as the one whose primary key field has the
    SAME NAME as that foreign key field, and every truthy foreign key value that is not
    among the referenced entity's primary key values is reported. Foreign keys whose field
    name matches no entity's primary key field are not checked.

    Records that omit a primary or foreign key field are tolerated in the second pass
    (the model pass is what reports a missing required field). Key values must be
    hashable because they are compared as sets.

    Args:
        data_map (dict): A dictionary containing lists of entity data.
        models (dict): A dictionary of dynamically created Pydantic models.
        link_config (dict, optional): Maps entity names to dicts with "primary_key" and
            "foreign_key" entries. When omitted, the module-level ``config_map`` is used,
            which is what earlier callers of the two-argument form relied on.

    Returns:
        dict: A dictionary containing lists of error messages for each entity. Entities
        without errors are absent.

    Raises:
        KeyError: if an entity in ``data_map`` has no entry in ``models`` or in the
            link configuration.
    """
    if link_config is None:
        # Backward compatibility with the original two-argument call.
        link_config = config_map

    errors: Dict[str, List[str]] = {}

    # Validate each entity's data against its model
    for entity_name, records in data_map.items():
        model = models[entity_name]
        entity_errors = []

        for record in records:
            try:
                model(**record)
            except ValidationError as e:
                entity_errors.append(f"Validation error in {entity_name}: {e}")

        if entity_errors:
            errors[entity_name] = entity_errors

    # Collect the primary key values present for each entity. Records lacking the field
    # were already reported by the model pass, so they are skipped instead of raising.
    pk_sets = {}
    for entity_name, records in data_map.items():
        pk_field = link_config[entity_name]["primary_key"]
        pk_sets[entity_name] = {record[pk_field] for record in records if pk_field in record}

    # Perform cross-entity FK-PK validation
    for entity_name, entity_config in link_config.items():
        fk_field = entity_config.get("foreign_key")
        if not fk_field:
            continue

        # The target entity is the one whose primary key field shares the FK field's name.
        fk_entity = next(
            (e for e, c in link_config.items() if c["primary_key"] == fk_field),
            None,
        )
        if not fk_entity:
            continue

        # The model declares the foreign key optional, so a record may omit it entirely.
        fk_values = {
            record[fk_field]
            for record in data_map.get(entity_name, [])
            if record.get(fk_field)
        }
        missing_keys = fk_values - pk_sets.get(fk_entity, set())

        if missing_keys:
            errors.setdefault(entity_name, []).append(
                f"Invalid foreign keys in {entity_name}: {missing_keys}"
            )

    return errors


# Example configuration and data map
# validate_relationships reads this module-level config_map directly (it is not passed
# as an argument), so it must be defined before the call below.
config_map = {
    "samples": {"primary_key": "sample_id", "foreign_key": "subject_id"},
    "files": {"primary_key": "file_id", "foreign_key": "sample_id"},
    "subjects": {"primary_key": "subject_id", "foreign_key": None}
}

data_map = {
    "samples": [
        {"sample_id": "sample_1", "subject_id": "subject_1"},
        {"sample_id": "sample_2", "subject_id": "subject_3"},  # Invalid FK
        {"sample_id": "sample_3", "subject_id": "subject_4"},  # Invalid FK
        {"sample_id": "sample_4", "subject_id": "subject_5"}  # Invalid FK
    ],
    "files": [
        {"file_id": "file_1", "sample_id": "sample_1"},
        {"file_id": "file_2", "sample_id": "sample_27"}  # Invalid FK
    ],
    "subjects": [
        {"subject_id": "subject_1"},  # only subject_1 and subject_2 exist; subject_3/4/5 are absent
        {"subject_id": "subject_2"},
    ]
}



# Generate models and validate relationships
models = generate_models(config_map)

# validate_relationships already catches ValidationError for each record and returns it
# as a message, so the handlers below only act as a safety net.
try:
    error_results = validate_relationships(data_map, models)
    if error_results:
        print("Validation Errors:")
        for entity, errs in error_results.items():
            for err in errs:
                print(err)
except ValidationError as e:
    print(e)
except ValueError as e:
    print(e)

# NOTE: if the call above raised, error_results was never assigned and this line fails
# with NameError.
error_results


Validation Errors:
Invalid foreign keys in samples: {'subject_3', 'subject_4', 'subject_5'}
Invalid foreign keys in files: {'sample_27'}


{'samples': ["Invalid foreign keys in samples: {'subject_3', 'subject_4', 'subject_5'}"],
 'files': ["Invalid foreign keys in files: {'sample_27'}"]}