In [2]:
import sys
from loguru import logger
from bs4 import BeautifulSoup
import pandas as pd
import regex as re


## Logger configuration
# Remove the handlers registered so far (loguru ships with a default stderr
# handler) so the sink added below is the only one and messages are not duplicated.
logger.remove()
# Send records at INFO level and above to stderr, formatted as
# "HH:mm:ss | LEVEL | message". DEBUG calls elsewhere in this file stay silent
# unless `level` is lowered here.
logger.add(sys.stderr, 
           level="INFO",
           format="{time:HH:mm:ss} | {level} | {message}")

## Convert XML soup to JSON format
def xml_to_json(element):
    """
    Recursively convert a BeautifulSoup XML node into plain Python data.

    Parameters
    ----------
    element : str or BeautifulSoup tag
        Node to convert. Plain strings are returned unchanged.

    Returns
    -------
    None, str or dict
        * ``None`` for an element that has no contents at all.
        * The stripped text for an element with non-blank text and no child tags.
        * Otherwise a dict keyed by child tag name. A tag name that occurs more
          than once maps to a list of the converted children, in document order.
          An element with neither child tags nor non-blank text yields ``{}``.

    Notes
    -----
    Tag attributes, and text that sits alongside child tags, are not captured.
    """
    if isinstance(element, str):
        return element

    if not element.contents:
        return element.string

    result = {}

    for child in element.children:

        # Text nodes between child tags are skipped; only tags become keys
        if isinstance(child, str):
            continue

        converted = xml_to_json(child)

        if child.name not in result:
            result[child.name] = converted
        else:
            # Second occurrence of a tag name: promote the stored value to a list
            if not isinstance(result[child.name], list):
                result[child.name] = [result[child.name]]
            result[child.name].append(converted)

    # Child tags always win. `Tag.string` delegates to a sole child tag, so
    # consulting it when children exist would collapse <a><b>x</b></a> to "x"
    # and lose the "b" key.
    if result:
        return result

    ### Directly capture text nodes without 'text' key
    text = element.string
    if text and text.strip():
        return text.strip()

    return result

In [5]:
def _pick_full_name(translation_element, allow_any_script=False):
    """
    Choose a ``formattedFullName`` from a ``translation`` node.

    Returns a ``(found, name)`` pair. A single translation (a dict) is always
    used, whatever its script. From a list, the last entry whose ``script`` is
    "Latin" is used; when there is none and ``allow_any_script`` is true, the
    first entry is used instead.
    """
    if isinstance(translation_element, dict):
        return True, translation_element["formattedFullName"]

    if isinstance(translation_element, list):
        latin_names = [
            trans["formattedFullName"]
            for trans in translation_element
            if trans["script"] == "Latin"
        ]
        if latin_names:
            return True, latin_names[-1]
        if allow_any_script and translation_element:
            return True, translation_element[0]["formattedFullName"]

    return False, None


def find_relationships(entity):
    """
    Given a JSON entity, return all available relationship information
    AND the entity's ``[name, type]`` pair.

    Parameters
    ----------
    entity : dict
        Converted entity record. Must provide ``generalInfo.entityType`` and
        ``names.name``; ``relationships`` is optional and may be ``None``.

    Returns
    -------
    (rel_list, type_return)
        ``type_return`` is ``[entity_name, entity_type]``.
        ``rel_list`` is:

        * ``None`` when the entity has no ``relationships`` key or it is ``None``;
        * a flat ``[entity_name, rel_type, related_entity]`` list when exactly one
          relationship is present (``[]`` if its related entity is ``None``);
        * a list of such triples when several relationships are present.
          Relationships whose related entity is ``None`` are left out.

    Raises
    ------
    ValueError
        If no name can be determined for the entity.
    KeyError
        If an expected key is missing from the record.
    """

    ## Record entity type
    entity_type = entity["generalInfo"]["entityType"]

    ## Collect entity name
    name_ele = entity["names"]["name"]

    if isinstance(name_ele, dict):
        ### Only one name entry exists, so it is used as-is
        name_entries = [name_ele]
        primary_entries = name_entries
    elif isinstance(name_ele, list):
        ### Aliases are present. Collect only the primary name
        name_entries = name_ele
        primary_entries = [name for name in name_ele if name["isPrimary"] == "true"]
    else:
        name_entries = primary_entries = []

    found, entity_name = False, None
    for entry in primary_entries:
        hit, candidate = _pick_full_name(entry["translations"]["translation"])
        if hit:
            #### No early exit: if several entries match, the last one wins
            found, entity_name = True, candidate

    if not found:
        ### Without this fallback `entity_name` would never be assigned (no Latin
        ### translation, or no name flagged primary) and the lines below would
        ### fail with an UnboundLocalError.
        for entry in (primary_entries or name_entries):
            found, entity_name = _pick_full_name(
                entry["translations"]["translation"], allow_any_script=True
            )
            if found:
                break

        entity_id = entity["generalInfo"].get("identityId")
        if not found:
            raise ValueError(f"No usable name found for entity {entity_id}")
        logger.warning(
            f"Entity {entity_id}: no primary Latin name found, using {entity_name!r}"
        )

    ## Confirm entity includes relationship information, if not just return entity name and type
    type_return = [entity_name, entity_type]
    logger.debug(f"type_return = {type_return}")

    relationships_node = entity.get("relationships")
    if relationships_node is None:
        return None, type_return

    ## Collect relationship information
    relationships = relationships_node["relationship"]
    rel_list = []

    ### If relationships is a dict, only one relationship is present.
    ### The result is then a single flat triple, not a list of triples.
    if isinstance(relationships, dict):

        rel_type = relationships["type"]
        rel_entity = relationships["relatedEntity"]

        if rel_entity is not None:
            rel_list = [entity_name, rel_type, rel_entity]

    ### If relationships is a list, multiple relationships are present
    elif isinstance(relationships, list):

        for rel in relationships:

            rel_type = rel["type"]
            rel_entity = rel["relatedEntity"]

            if rel_entity is not None:
                rel_list.append([entity_name, rel_type, rel_entity])

    return rel_list, type_return

def format_name(entity_name):
    """
    Standardize the format for entity names retrieved from "formattedFullName"

    Parameters
    ----------
    entity_name : str
        Raw name, e.g. ``"DOE, John"``.

    Returns
    -------
    str
        The name with the part before the first ``", "`` moved to the end,
        title-cased, and with any parenthetical text upper-cased,
        e.g. ``"John Doe"``.
    """

    # Arrange name based on comma location, if present.
    # Split on the first comma only, so any further comma-separated parts
    # (suffixes and the like) stay in the name instead of being dropped.
    if ", " in entity_name:
        leading, remainder = entity_name.split(", ", 1)
        entity_name = f"{remainder} {leading}"

    # Apply title-case formatting
    entity_name = entity_name.title()

    # Capitalize any parenthetical text (title-casing leaves it as e.g. "(Aka)")
    return re.sub(r'\([^\)]+\)', lambda match: match.group(0).upper(), entity_name)



## Main Function
def rel_extractor_main(entity_data):
    """
    Build the relationship table and the entity-type table for a set of entities.

    Parameters
    ----------
    entity_data : iterable of dict
        Converted entity records. Each must provide ``generalInfo.identityId``
        plus whatever ``find_relationships`` reads.

    Returns
    -------
    (rel_df, type_df) : tuple of pandas.DataFrame
        ``rel_df`` has columns ``entity_1``, ``relationship``, ``entity_2``, with
        both entity columns passed through ``format_name``.
        ``type_df`` has columns ``entity_name``, ``entity_type`` and one row per
        input entity; its names are left unformatted.
    """

    relationships = []
    entity_types = []

    for entity in entity_data:

        # Single quotes inside the f-string keep this line valid on Python
        # versions older than 3.12, which reject reused quote characters.
        logger.debug(f"Extracting from entity: {entity['generalInfo']['identityId']}")
        rel_search, e_type = find_relationships(entity)
        entity_types.append([e_type[0], e_type[1]])

        # None or empty: nothing to record for this entity
        if not rel_search:
            continue

        # find_relationships returns either a list of triples or one flat
        # triple; tell them apart by the first element.
        if isinstance(rel_search[0], (list, tuple)):
            relationships.extend(rel_search)
        else:
            relationships.append(rel_search)

    ## Convert relationships into a dataframe, apply formatting
    rel_df = pd.DataFrame(relationships, columns=['entity_1', 'relationship', 'entity_2'])
    rel_df["entity_1"] = rel_df["entity_1"].apply(format_name)
    rel_df["entity_2"] = rel_df["entity_2"].apply(format_name)

    type_df = pd.DataFrame(entity_types, columns=["entity_name", "entity_type"])

    ## Both frames are returned; saving them is left to the caller
    return rel_df, type_df

In [6]:
def main(input_file):
    """
    Accepts an XML of US OFAC sanctions information, returning DataFrames of
    relationship edges and entity types.

    Parameters
    ----------
    input_file : str or path-like
        Path to the sanctions XML file.

    Returns
    -------
    (rel_df, type_df) : tuple of pandas.DataFrame
        The relationship table and the entity-name/entity-type table produced
        by ``rel_extractor_main``. Nothing is written to disk here.

    Raises
    ------
    FileNotFoundError
        If ``input_file` does not exist (logged, then re-raised).
    Exception
        Any other error raised while reading the file (logged, then re-raised).
    """

    ## Load XML Sanctions Data
    try:
        with open(input_file, "rb") as file:
            xml_data = file.read()
        logger.debug(f"{input_file} loaded")

    except FileNotFoundError:
        logger.error(f"Input file not found: {input_file}")
        # Re-raise: carrying on would only fail later on the unassigned `xml_data`
        raise

    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        raise

    ## Convert XML to JSON format, isolate entity data
    soup = BeautifulSoup(xml_data, features='xml')

    entity_json = xml_to_json(soup)
    entity_data = entity_json['sanctionsData']["entities"]["entity"]

    ### A file holding a single <entity> converts to a dict rather than a list
    if isinstance(entity_data, dict):
        entity_data = [entity_data]

    entity_data = [
        entity
        for entity in entity_data
        if entity["generalInfo"]["entityType"] in ["Individual", "Entity"]
    ]
    logger.info(f"Entities found: {len(entity_data)}")

    ## Extract relationship data and create an entity type map df
    rel_df, type_df = rel_extractor_main(entity_data)

    ### Result information
    logger.info(f"Relationships found: {rel_df.shape[0]}")
    logger.info(f"Entity types mapped: {type_df.shape[0]}")

    return rel_df, type_df
    

# Run the full pipeline on the sanctions XML export. The path is relative to the
# notebook's working directory.
rel_df, type_df = main("XML Data/mideast_sanctions.xml")

# Persist only the relationship table, without the DataFrame index column.
# type_df is kept in memory and is not written to disk here.
rel_df.to_csv("IRGC_sanctions.csv", index = False)

21:31:46 | INFO | Entities found: 925
21:31:46 | INFO | Relationships found: 189
21:31:46 | INFO | Entity types mapped: 925
