In [1]:
%pip install faker

^C
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip




In [None]:
import re
import json
import hashlib
from faker import Faker
from rdflib import Graph, Literal, URIRef, Namespace
from rdflib.namespace import RDF
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from openai import AzureOpenAI
from types import SimpleNamespace

# Namespaces
# rdflib Namespace objects: attribute access (e.g. SCHEMA.Person) yields the
# URIRef formed by appending the attribute name to the base URI given here.
# NOTE(review): these base URIs presumably mirror the @prefix declarations of
# the input Turtle file — confirm against the data before changing them.
SCHEMA = Namespace("https://schema.org/")
APP = Namespace("http://application.com/")
PRO = Namespace("http://process.com/")
ORG1 = Namespace("http://organization.com/")
EMP = Namespace("http://employee.com/")
FIN = Namespace("http://financial.com/")

# Load configuration
def load_config():
    """Load ``config.json`` from the current working directory.

    Every JSON object in the file is converted to a ``SimpleNamespace`` so
    nested settings can be read with attribute access (``config.chat.model``).

    Returns:
        The parsed configuration (a ``SimpleNamespace`` for a JSON object).

    Raises:
        FileNotFoundError: if ``config.json`` does not exist; the original
            OS error is attached as ``__cause__``.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the platform
        # default encoding (cp1252 on many Windows setups).
        with open(r"config.json", encoding="utf-8") as f:
            config = json.load(f, object_hook=lambda d: SimpleNamespace(**d))
    except FileNotFoundError as exc:
        raise FileNotFoundError("Config file not found. Please check the path.") from exc
    # Only list the top-level setting names: notebook output is saved with the
    # file, so endpoint URLs / secret names should not be echoed into it.
    print("Config loaded:", sorted(vars(config)))
    return config

# Initialize Azure OpenAI client
def initialize_azure_client(config):
    """Build an ``AzureOpenAI`` client whose API key is read from Key Vault.

    Reads ``config.key_vault_url`` and ``config.dev_secret_name`` to fetch the
    secret, then ``config.chat.api_version`` and ``config.chat.azure_endpoint``
    to construct the client.

    Returns:
        The constructed ``AzureOpenAI`` instance.
    """
    vault_url = config.key_vault_url
    credential = DefaultAzureCredential()
    vault = SecretClient(vault_url=vault_url, credential=credential)
    api_key_secret = vault.get_secret(config.dev_secret_name)

    client_kwargs = {
        "api_key": api_key_secret.value,
        "api_version": config.chat.api_version,
        "azure_endpoint": config.chat.azure_endpoint,
    }
    azure_llm = AzureOpenAI(**client_kwargs)
    print("AzureOpenAI client initialized")
    return azure_llm

# LLM-based text generation
def _fallback_anonymized_value(entity_type, field_name, old_value):
    """Produce a Faker-based replacement when the LLM yields nothing usable."""
    generators = {
        ("Organization", "name"): lambda fake: fake.company(),
        ("Organization", "description"): lambda fake: fake.catch_phrase(),
        ("Person", "jobTitle"): lambda fake: fake.job(),
        ("Application", "appName"): lambda fake: fake.word().title() + "App",
        ("Application", "appDescription"): lambda fake: fake.sentence(nb_words=8),
        ("Process", "title"): lambda fake: "Process " + fake.word().title(),
        ("Process", "description"): lambda fake: fake.sentence(nb_words=10),
    }
    make_value = generators.get((entity_type, field_name))
    if make_value is None:
        return "Anonymized Placeholder"
    # Seed from the old value so the same input always gets the same fallback.
    return make_value(deterministic_faker(str(old_value)))


def generate_with_llm(llm, config, entity_type, field_name, old_value):
    """Ask the chat model for an anonymized replacement of ``old_value``.

    Args:
        llm: client exposing ``chat.completions.create(model=..., messages=...)``.
        config: settings object; ``config.chat.model`` names the deployment.
        entity_type: entity label used in the prompt (e.g. "Organization").
        field_name: field label used in the prompt (e.g. "name").
        old_value: the original value to be replaced.

    Returns:
        The model's answer with surrounding whitespace stripped. If the model
        returns ``None`` or only whitespace, a deterministic Faker-based value
        for known (entity_type, field_name) pairs, otherwise the string
        "Anonymized Placeholder".

    Raises:
        Exception: any error raised by the completion call is logged and
            re-raised unchanged.
    """
    prompt = [
        {
            "role": "system",
            "content": f"""
            You are an AI assistant tasked with generating anonymized values for a financial department's software industry knowledge graph.
            Generate a realistic and contextually appropriate replacement for the given {entity_type} {field_name}.
            - Ensure the output aligns with a financial/software context (e.g., company names, job titles, process titles, or app names).
            - Avoid using the original value: '{old_value}'.
            - If the original value contains codes, abbreviations, or separators (e.g., '-', 'SE', 'FIN'), create a plausible name that mimics corporate or technical naming conventions.
            - Keep the length and tone similar to the original where possible.

            Original {field_name}: '{old_value}'
            Provide a single anonymized value as output.
            """
        }
    ]
    try:
        response = llm.chat.completions.create(
            model=config.chat.model,
            messages=prompt
        )
        content = response.choices[0].message.content
    except Exception as e:
        print(f"Error in generate_with_llm: {e}, old_value: '{old_value}'")
        raise

    cleaned = content.strip() if content is not None else ""
    if cleaned:
        return cleaned

    # An empty or whitespace-only completion is as unusable as None: returning
    # it would store an empty literal in the graph.
    print(f"Warning: LLM returned no content for {entity_type} {field_name} with old_value '{old_value}'")
    return _fallback_anonymized_value(entity_type, field_name, old_value)

# Deterministic Faker
def deterministic_faker(seed_str, locale="en_US"):
    """Return a ``Faker`` instance seeded from ``seed_str``.

    The seed is the SHA-256 digest of the UTF-8 encoded string, reduced
    modulo 10**8, so equal seed strings give equally seeded generators.
    """
    generator = Faker(locale)
    digest_hex = hashlib.sha256(seed_str.encode('utf-8')).hexdigest()
    seed_value = int(digest_hex, 16) % (10**8)
    generator.seed_instance(seed_value)
    return generator

# Anonymization Functions
def anonymize_person(graph, person, llm, config):
    """Replace the personal attributes of ``person`` in ``graph`` with fake values.

    Overwrites givenName, familyName, email, jobTitle, telephone, location,
    gid and id (all in the SCHEMA namespace) on the subject. Names, phone,
    city and gid come from a Faker seeded with the person's original gid
    (or the subject URI when no gid is present); the job title comes from the
    LLM when an original title exists.

    Args:
        graph: rdflib graph, modified in place.
        person: subject node of the person.
        llm: chat client passed through to ``generate_with_llm``.
        config: settings passed through to ``generate_with_llm``.

    Returns:
        dict mapping each field name to ``{"old": ..., "new": ...}``.
    """
    person_str = str(person)
    old_gid = str(graph.value(person, SCHEMA.gid) or "")
    # Seed from the gid when there is one, otherwise from the subject URI.
    fake = deterministic_faker(old_gid or person_str)

    old_given = str(graph.value(person, SCHEMA.givenName) or "")
    old_family = str(graph.value(person, SCHEMA.familyName) or "")
    old_email = str(graph.value(person, SCHEMA.email) or "")
    old_job = str(graph.value(person, SCHEMA.jobTitle) or "")
    old_tel = str(graph.value(person, SCHEMA.telephone) or "")
    old_location = str(graph.value(person, SCHEMA.location) or "")
    old_id = str(graph.value(person, SCHEMA.id) or "")

    # The order of Faker calls below is kept fixed so seeded output is stable.
    given_parts = old_given.split()
    new_given = " ".join([fake.first_name() for _ in given_parts]) if given_parts else fake.first_name()
    new_family = fake.last_name()
    new_email = f"{new_given.lower().replace(' ', '.')}.{new_family.lower()}@example.net"
    new_job = generate_with_llm(llm, config, "Person", "jobTitle", old_job) if old_job else fake.job()
    new_tel = fake.phone_number()
    new_location = fake.city()
    new_gid = fake.bothify(text="Z########")

    # Derive the id from the subject identifier. A parsed Turtle subject is a
    # fully expanded URI, so besides the literal "emp:" form also accept
    # subjects that live in the EMP namespace.
    emp_ns = str(EMP)
    if "emp:" in person_str:
        new_id = person_str.split("emp:")[-1]
    elif person_str.startswith(emp_ns) and len(person_str) > len(emp_ns):
        new_id = person_str[len(emp_ns):]
    else:
        new_id = old_id

    graph.set((person, SCHEMA.givenName, Literal(new_given)))
    graph.set((person, SCHEMA.familyName, Literal(new_family)))
    graph.set((person, SCHEMA.email, Literal(new_email)))
    graph.set((person, SCHEMA.jobTitle, Literal(new_job)))
    graph.set((person, SCHEMA.telephone, Literal(new_tel)))
    graph.set((person, SCHEMA.location, Literal(new_location)))
    graph.set((person, SCHEMA.gid, Literal(new_gid)))
    graph.set((person, SCHEMA.id, Literal(new_id)))

    return {
        "givenName": {"old": old_given, "new": new_given},
        "familyName": {"old": old_family, "new": new_family},
        "email": {"old": old_email, "new": new_email},
        "jobTitle": {"old": old_job, "new": new_job},
        "telephone": {"old": old_tel, "new": new_tel},
        "location": {"old": old_location, "new": new_location},
        # Record the real old gid ("" when absent), not the subject URI that
        # was only used as a seed fallback.
        "gid": {"old": old_gid, "new": new_gid},
        "id": {"old": old_id, "new": new_id}
    }

def anonymize_organization(graph, org, llm, config):
    """Replace name and description of ``org`` and normalise its id triples.

    Removes every ``SCHEMA.orgID`` triple of the subject, sets
    ``SCHEMA.orgId`` to the id derived from the subject identifier (or the
    existing ``orgId`` value when none can be derived), and replaces name and
    description with LLM output (Faker output when the original is empty).

    Args:
        graph: rdflib graph, modified in place.
        org: subject node of the organization.
        llm: chat client passed through to ``generate_with_llm``.
        config: settings passed through to ``generate_with_llm``.

    Returns:
        dict with old/new pairs for name, orgId and description, plus the
        removed ``orgID`` value under ``"orgID_removed"``.
    """
    org_str = str(org)
    old_real_orgId = str(graph.value(org, SCHEMA.orgId) or "")
    old_orgID = str(graph.value(org, SCHEMA.orgID) or "")

    # A parsed Turtle subject is a fully expanded URI, so besides the literal
    # "org1:" form also accept subjects that live in the ORG1 namespace.
    org_ns = str(ORG1)
    if "org1:" in org_str:
        new_orgId = org_str.split("org1:")[-1]
    elif org_str.startswith(org_ns) and len(org_str) > len(org_ns):
        new_orgId = org_str[len(org_ns):]
    else:
        new_orgId = old_real_orgId
    fake = deterministic_faker(new_orgId)

    old_name = str(graph.value(org, SCHEMA.name) or "")
    old_description = str(graph.value(org, SCHEMA.description) or "")

    new_name = generate_with_llm(llm, config, "Organization", "name", old_name) if old_name else fake.company()
    new_description = generate_with_llm(llm, config, "Organization", "description", old_description) if old_description else fake.catch_phrase()

    graph.remove((org, SCHEMA.orgID, None))
    graph.set((org, SCHEMA.name, Literal(new_name)))
    graph.set((org, SCHEMA.description, Literal(new_description)))
    graph.set((org, SCHEMA.orgId, Literal(new_orgId)))

    return {
        "name": {"old": old_name, "new": new_name},
        "orgID_removed": old_orgID,
        "orgId": {"old": old_real_orgId, "new": new_orgId},
        "description": {"old": old_description, "new": new_description}
    }

def anonymize_application(graph, app, llm, config):
    """Replace descriptive attributes and links of ``app`` in ``graph``.

    Sets appName and appDescription (LLM output, or Faker output when the
    original is empty), appId (derived from the subject identifier, or the
    existing value when none can be derived), accessLink and appLink (fake
    URLs), and removes every appImage triple of the subject.

    Args:
        graph: rdflib graph, modified in place.
        app: subject node of the application.
        llm: chat client passed through to ``generate_with_llm``.
        config: settings passed through to ``generate_with_llm``.

    Returns:
        dict mapping each field name to ``{"old": ..., "new": ...}``;
        ``appImage`` has ``new`` set to ``None`` because it is removed.
    """
    app_str = str(app)
    old_appId = str(graph.value(app, APP.appId) or "")

    # A parsed Turtle subject is a fully expanded URI, so besides the literal
    # "app:" form also accept subjects that live in the APP namespace.
    app_ns = str(APP)
    if "app:" in app_str:
        new_appId = app_str.split("app:")[-1]
    elif app_str.startswith(app_ns) and len(app_str) > len(app_ns):
        new_appId = app_str[len(app_ns):]
    else:
        new_appId = old_appId
    fake = deterministic_faker(new_appId)

    old_name = str(graph.value(app, APP.appName) or "")
    old_description = str(graph.value(app, APP.appDescription) or "")
    old_accessLink = str(graph.value(app, APP.accessLink) or "")
    old_appLink = str(graph.value(app, APP.appLink) or "")
    old_appImage = str(graph.value(app, APP.appImage) or "")

    # The order of Faker calls below is kept fixed so seeded output is stable.
    new_name = generate_with_llm(llm, config, "Application", "appName", old_name) if old_name else fake.word().title() + "App"
    new_description = generate_with_llm(llm, config, "Application", "appDescription", old_description) if old_description else fake.sentence(nb_words=8)
    new_accessLink = fake.url()
    new_appLink = fake.url()

    graph.set((app, APP.appName, Literal(new_name)))
    graph.set((app, APP.appDescription, Literal(new_description)))
    graph.set((app, APP.appId, Literal(new_appId)))
    graph.set((app, APP.accessLink, Literal(new_accessLink)))
    graph.set((app, APP.appLink, Literal(new_appLink)))
    graph.remove((app, APP.appImage, None))

    return {
        "appName": {"old": old_name, "new": new_name},
        "appDescription": {"old": old_description, "new": new_description},
        "appId": {"old": old_appId, "new": new_appId},
        "accessLink": {"old": old_accessLink, "new": new_accessLink},
        "appLink": {"old": old_appLink, "new": new_appLink},
        "appImage": {"old": old_appImage, "new": None}
    }

def anonymize_process(graph, proc, llm, config):
    """Replace title, description and reference URL of ``proc`` in ``graph``.

    Sets title and description (LLM output, or Faker output when the original
    is empty), processId (derived from the subject identifier, or the
    existing value when none can be derived) and referenceUrls (one fake URL).

    Args:
        graph: rdflib graph, modified in place.
        proc: subject node of the process.
        llm: chat client passed through to ``generate_with_llm``.
        config: settings passed through to ``generate_with_llm``.

    Returns:
        dict mapping each field name to ``{"old": ..., "new": ...}``.
    """
    proc_str = str(proc)
    old_processId = str(graph.value(proc, PRO.processId) or "")

    # A parsed Turtle subject is a fully expanded URI, so besides the literal
    # "pro:" form also accept subjects that live in the PRO namespace.
    pro_ns = str(PRO)
    if "pro:" in proc_str:
        new_processId = proc_str.split("pro:")[-1]
    elif proc_str.startswith(pro_ns) and len(proc_str) > len(pro_ns):
        new_processId = proc_str[len(pro_ns):]
    else:
        new_processId = old_processId
    fake = deterministic_faker(new_processId)

    old_title = str(graph.value(proc, PRO.title) or "")
    old_description = str(graph.value(proc, PRO.description) or "")
    old_referenceUrls = str(graph.value(proc, PRO.referenceUrls) or "")

    # The order of Faker calls below is kept fixed so seeded output is stable.
    new_title = generate_with_llm(llm, config, "Process", "title", old_title) if old_title else "Process " + fake.word().title()
    new_description = generate_with_llm(llm, config, "Process", "description", old_description) if old_description else fake.sentence(nb_words=10)
    new_referenceUrls = fake.url()

    graph.set((proc, PRO.title, Literal(new_title)))
    graph.set((proc, PRO.description, Literal(new_description)))
    graph.set((proc, PRO.processId, Literal(new_processId)))
    graph.set((proc, PRO.referenceUrls, Literal(new_referenceUrls)))

    return {
        "title": {"old": old_title, "new": new_title},
        "description": {"old": old_description, "new": new_description},
        "processId": {"old": old_processId, "new": new_processId},
        "referenceUrls": {"old": old_referenceUrls, "new": new_referenceUrls}
    }

def _replace_query_value(url, param, value):
    """Return ``url`` with the value of query parameter ``param`` set to ``value``."""
    # The look-behind keeps "id=" from matching inside longer parameter names
    # such as "employeeGid=" or "process_id=".
    pattern = r"(?<![A-Za-z0-9_])(" + re.escape(param) + r"=)[^&]+"
    # A function replacement inserts the value verbatim (no backslash or
    # group-reference processing of the new value).
    return re.sub(pattern, lambda match: match.group(1) + value, url)


def anonymize_sources(graph):
    """Rewrite every ``SCHEMA.sources`` value in ``graph`` to hide real hosts and ids.

    The two known company hosts are replaced by ``https://example.company.cloud``
    and the ``employeeGid``, ``id``, ``process_id`` and ``appId`` query
    parameters are replaced by the subject's current (already anonymized)
    gid / id / processId / appId values. ``id``, ``process_id`` and ``appId``
    are only rewritten for subjects typed as Person, Process and Application
    respectively.

    Args:
        graph: rdflib graph, modified in place.

    Returns:
        dict mapping each original source string to its rewritten form
        (identical when nothing changed).
    """
    # (query parameter, predicate holding the new value, required rdf:type or None)
    id_rules = (
        ("employeeGid", SCHEMA.gid, None),
        ("id", SCHEMA.id, SCHEMA.Person),
        ("process_id", PRO.processId, PRO.Process),
        ("appId", APP.appId, APP.Application),
    )

    mapping_records = {}
    # Snapshot the triples first: the loop rewrites the very triples it visits.
    for s, _, o in list(graph.triples((None, SCHEMA.sources, None))):
        old_source = str(o)
        new_source = old_source
        new_source = re.sub(r"https://cosmos\.siemens-energy\.cloud", "https://example.company.cloud", new_source)
        new_source = re.sub(r"https://finance-center\.mosaic\.siemens-energy\.cloud", "https://example.company.cloud", new_source)

        for param, predicate, required_type in id_rules:
            if f"{param}=" not in new_source:
                continue
            if required_type is not None and (s, RDF.type, required_type) not in graph:
                continue
            anonymized_value = str(graph.value(s, predicate) or "")
            if anonymized_value:
                new_source = _replace_query_value(new_source, param, anonymized_value)

        if new_source != old_source:
            # Replace only this triple; graph.set() would drop every other
            # sources value attached to the same subject.
            graph.remove((s, SCHEMA.sources, o))
            graph.add((s, SCHEMA.sources, Literal(new_source)))
        mapping_records[old_source] = new_source
    return mapping_records

def anonymize_all_entities(graph, llm, config):
    """Anonymize every Person, Organization, Application and Process, then sources.

    Entities are processed type by type in that order; source URLs are
    rewritten last so they can pick up the already anonymized ids.

    Args:
        graph: rdflib graph, modified in place.
        llm: chat client passed through to the per-entity functions.
        config: settings passed through to the per-entity functions.

    Returns:
        dict keyed by subject URI string, each value
        ``{"type": <label>, "mapping": <per-field record>}``; when any source
        was seen, an extra ``"sources"`` entry holds the old→new URL mapping.
    """
    entity_handlers = (
        ("Person", SCHEMA.Person, anonymize_person),
        ("Organization", SCHEMA.Organization, anonymize_organization),
        ("Application", APP.Application, anonymize_application),
        ("Process", PRO.Process, anonymize_process),
    )

    overall_mapping = {}
    for type_label, rdf_class, handler in entity_handlers:
        # Materialise the subjects before calling the handler: handlers add
        # and remove triples, and graph.subjects() is a lazy generator.
        for subject in list(graph.subjects(RDF.type, rdf_class)):
            overall_mapping[str(subject)] = {
                "type": type_label,
                "mapping": handler(graph, subject, llm, config),
            }

    sources_mapping = anonymize_sources(graph)
    if sources_mapping:
        overall_mapping["sources"] = {"type": "sources", "mapping": sources_mapping}
    return overall_mapping

if __name__ == "__main__":
    # Relative paths use forward slashes so they resolve on Windows as well
    # as on POSIX systems (a backslash is an ordinary filename character there).
    input_ttl = "kgCreation/ExtendedFinKG_Pro.ttl"
    output_ttl = "anonymize/ExtendedFinKG_Pro_anonymized_llm_test.ttl"
    mapping_file = "anonymize/anonymization_mapping_llm_test.json"

    config = load_config()
    llm = initialize_azure_client(config)

    # Smoke-test the LLM connection before touching any data.
    try:
        test_response = generate_with_llm(llm, config, "Organization", "name", "TestCorp")
        print(f"Test LLM response: {test_response}")
    except Exception as e:
        print(f"LLM test failed: {e}")
        raise

    print("Loading RDF data...")
    g = Graph()
    g.parse(input_ttl, format="turtle")

    print("Anonymizing RDF entities with LLM...")
    overall_mapping = anonymize_all_entities(g, llm, config)

    g.serialize(destination=output_ttl, format="turtle")
    print(f"Updated RDF graph saved to {output_ttl}")

    # NOTE: the mapping file stores the original values next to their
    # replacements, so it is as sensitive as the input data.
    with open(mapping_file, "w", encoding="utf-8") as f:
        json.dump(overall_mapping, f, indent=2)
    print(f"Overall mapping saved to {mapping_file}")
    


In [None]:
import re
import json
import pandas as pd

def load_mapping(json_file):
    """Read the JSON mapping file written by the RDF anonymization step.

    Returns the parsed JSON content unchanged.
    """
    with open(json_file, "r") as handle:
        return json.load(handle)

def flatten_mapping(overall_mapping, min_length=3):
    """Flatten the nested anonymization mapping into ``{original: replacement}``.

    - Person records contribute the full name (givenName + familyName) as one
      entry, plus every other field that has an old/new pair.
    - Records of type ``"sources"`` are plain ``{old_url: new_url}`` dicts and
      are copied entry by entry.
    - All other records contribute each field that has an old/new pair.

    Entries are skipped when the original is shorter than ``min_length``,
    when either side is empty, or when either side is ``None`` (e.g. a field
    that was removed rather than replaced). Values that are not old/new dicts
    (such as ``"orgID_removed"``) carry no replacement and are ignored.

    Args:
        overall_mapping: dict of ``{entity: {"type": ..., "mapping": {...}}}``.
        min_length: minimum length of an original string to be included.

    Returns:
        dict mapping original strings to their replacement strings.
    """
    flat = {}

    def _add(orig, new_val):
        # None means "no value"; str(None) would otherwise inject the text
        # "None" as a key or as a replacement.
        if orig is None or new_val is None:
            return
        orig, new_val = str(orig), str(new_val)
        if len(orig) >= min_length and orig and new_val:
            flat[orig] = new_val

    for entity, record in overall_mapping.items():
        entity_type = record.get("type")
        mapping = record.get("mapping", {})

        if entity_type == "sources":
            for old_source, new_source in mapping.items():
                _add(old_source, new_source)
            continue

        skipped_fields = ()
        if entity_type == "Person":
            # Combine givenName and familyName into a full name
            given = mapping.get("givenName", {})
            family = mapping.get("familyName", {})
            old_full_name = f"{given.get('old', '')} {family.get('old', '')}".strip()
            new_full_name = f"{given.get('new', '')} {family.get('new', '')}".strip()
            _add(old_full_name, new_full_name)
            skipped_fields = ("givenName", "familyName")

        for field, vals in mapping.items():
            if field in skipped_fields:
                continue
            if isinstance(vals, dict) and "old" in vals and "new" in vals:
                _add(vals["old"], vals["new"])
    return flat

def anonymize_text(text, flat_mapping, use_word_boundaries=False):
    """Replace every occurrence of a key of ``flat_mapping`` in ``text``.

    All keys are matched in a single left-to-right pass, so text that has
    already been replaced is never scanned again (a replacement that happens
    to contain another key stays intact). Where several keys match at the
    same position, the longest one wins. Replacement values are inserted
    verbatim.

    Args:
        text: value to process; anything that is not a ``str`` is returned
            unchanged (e.g. NaN cells).
        flat_mapping: dict of ``{original: replacement}`` strings.
        use_word_boundaries: when True, a key only matches if it is not
            directly preceded or followed by a word character.

    Returns:
        The text with all matches replaced.
    """
    if not isinstance(text, str):
        return text
    # Longest first so that e.g. a full name is preferred over a shorter key
    # starting at the same position; empty keys would match everywhere.
    originals = sorted((key for key in flat_mapping if key), key=len, reverse=True)
    if not originals:
        return text

    alternation = "|".join(re.escape(original) for original in originals)
    if use_word_boundaries:
        # Look-arounds instead of \b: \b never matches next to a key that
        # itself starts or ends with a non-word character (e.g. "C++").
        pattern = r"(?<!\w)(?:" + alternation + r")(?!\w)"
    else:
        pattern = "(?:" + alternation + ")"
    # A function replacement avoids interpreting backslashes or group
    # references inside the new value.
    return re.sub(pattern, lambda match: str(flat_mapping[match.group(0)]), text)

def process_excel_file(input_excel, output_excel, flat_mapping, columns_to_process=("Query", "Ground Truth Answer"), use_word_boundaries=False):
    """Anonymize selected text columns of an Excel file and save the result.

    Args:
        input_excel: path of the workbook to read.
        output_excel: path the processed workbook is written to (no index column).
        flat_mapping: dict of ``{original: replacement}`` strings.
        columns_to_process: column names to rewrite; a single name may be
            passed as a plain string. Missing columns only produce a warning.
        use_word_boundaries: forwarded to ``anonymize_text``.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(columns_to_process, str):
        columns_to_process = [columns_to_process]

    # Load Excel file
    df = pd.read_excel(input_excel)
    # Process each specified column
    for col in columns_to_process:
        if col in df.columns:
            df[col] = df[col].apply(lambda cell: anonymize_text(cell, flat_mapping, use_word_boundaries))
        else:
            print(f"Warning: Column '{col}' not found in Excel file.")
    # Save the updated Excel file
    df.to_excel(output_excel, index=False)
    print(f"Anonymized Excel file saved to: {output_excel}")

if __name__ == "__main__":
    # Relative paths use forward slashes so they resolve on Windows as well
    # as on POSIX systems (a backslash is an ordinary filename character there).
    mapping_file = "anonymize/anonymization_mapping_llm_test.json"
    input_excel = "data/LLMEval_1.xlsx"
    output_excel = "data/LLMEval_anonymized_llm.xlsx"

    # Load and flatten the mapping
    overall_mapping = load_mapping(mapping_file)
    flat_mapping = flatten_mapping(overall_mapping, min_length=3)

    # Process the Excel file
    process_excel_file(
        input_excel=input_excel,
        output_excel=output_excel,
        flat_mapping=flat_mapping,
        columns_to_process=["Query", "Ground Truth Answer"],
        use_word_boundaries=True
    )
