In [1]:
pip install python-dotenv

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
from dotenv import load_dotenv

# Pull the key/value pairs from the local .env file into os.environ
load_dotenv()

# Look the key up once, then report whether it is present
api_key = os.environ.get("LLAMA_API_KEY")

print(
    "API Key loaded successfully!"
    if api_key
    else "Error: API Key not found. Check your .env file."
)

API Key loaded successfully!


In [3]:
import os
from pathlib import Path
from dotenv import load_dotenv

# Path('.') is the kernel's current working directory (not necessarily the
# notebook's folder), so this resolves to <cwd>/.env.
env_path = Path('.') / '.env'
load_dotenv(dotenv_path=env_path)

print(f"Looking for .env at: {os.path.abspath(env_path)}")
# Report only whether the key is set: printing the value itself would store
# the secret in the notebook's saved output.
print(f"Key found: {bool(os.getenv('LLAMA_API_KEY'))}")

Looking for .env at: /Users/mc/Desktop/FD1/.env
Key found: [REDACTED]


In [4]:
# Import smoke test: an ImportError raised here means one of these packages is
# missing from the kernel's environment.
import rdflib
import dotenv
print("Libraries installed and ready for mapping!")

Libraries installed and ready for mapping!


In [17]:
import json
from rdflib import Graph, Namespace, URIRef, Literal, RDF, RDFS, OWL

# 1. Load your JSON and Ontology
# NOTE(review): both inputs are absolute paths on one machine; point these two
# constants at your own copies before running elsewhere.
METADATA_PATH = '/Users/mc/Downloads/ontology_ready_metadata.json'
ONTOLOGY_PATH = "/Users/mc/Downloads/data_democratization_ontology.owl"

# Decode explicitly as UTF-8 instead of relying on the platform default encoding.
with open(METADATA_PATH, 'r', encoding='utf-8') as f:
    metadata = json.load(f)

g = Graph()
g.parse(ONTOLOGY_PATH, format="xml")

# 2. Define Namespaces (must match your .owl file)
EX = Namespace("http://example.org/fanduel/data-democratization-ontology#")
# Bound after parsing so the "ex" prefix is the one used when the graph is serialized.
g.bind("ex", EX)

def map_metadata_to_owl():
    """Add table and column individuals from `metadata` to the graph `g`, then save it.

    Reads the module-level `metadata` dict (expects a 'tables' list) and adds
    triples to the module-level graph `g` using terms from `EX`. The result is
    written to enriched_ontology.owl as RDF/XML.

    Raises:
        KeyError: if a table lacks 'physical_name' or 'table_type', or a
            column lacks 'name' or 'data_type'.
    """
    # Loop through tables in JSON
    for table in metadata['tables']:
        physical_name = table['physical_name']
        # Dots are swapped for underscores so the name works as a URI fragment.
        # Computed once per table and reused as the prefix of every column URI.
        safe_name = physical_name.replace('.', '_')
        table_uri = EX[safe_name]

        # Assign Type: EnrichedTable or BaseTable
        table_type = EX.EnrichedTable if table['table_type'] == "ENRICHED" else EX.BaseTable
        g.add((table_uri, RDF.type, table_type))

        # Add metadata properties
        g.add((table_uri, RDFS.label, Literal(physical_name)))
        g.add((table_uri, EX.assetName, Literal(physical_name)))
        # A missing or null description is skipped: rdflib stringifies
        # non-string values, so None would presumably be stored as the text
        # "None" -- TODO confirm against the rdflib version in use.
        description = table.get('description')
        if description is not None:
            g.add((table_uri, EX.businessDefinition, Literal(description)))

        # Map Columns (a table without a 'columns' entry contributes none)
        for col in table.get('columns') or []:
            # Create unique Column URI (Table_Column)
            col_uri = EX[f"{safe_name}_{col['name']}"]
            g.add((col_uri, RDF.type, EX.Column))
            g.add((col_uri, EX.assetName, Literal(col['name'])))
            # Stored as a plain string such as "xsd:string", not as a typed literal.
            g.add((col_uri, EX.inferredDatatype, Literal(f"xsd:{col['data_type']}")))

            # Link Column to Table (both directions)
            g.add((table_uri, EX.hasColumn, col_uri))
            g.add((col_uri, EX.belongsToTable, table_uri))

    # 3. Save the enriched ontology
    g.serialize(destination="enriched_ontology.owl", format="xml")
    print("Mapping complete! Created enriched_ontology.owl")

if __name__ == "__main__":
    map_metadata_to_owl()

Mapping complete! Created enriched_ontology.owl


In [19]:
import ollama
import sys

def get_llama_mapping_logic(prompt):
    """Ask the local llama3 model for rdflib code that implements `prompt`.

    Returns the model's reply text. Any exception raised while building or
    sending the request is caught and returned as a "CONNECTION ERROR" string
    instead of being raised.
    """
    try:
        user_turn = {
            'role': 'user',
            'content': f"Write Python rdflib code for: {prompt}",
        }
        reply = ollama.chat(model='llama3', messages=[user_turn])
        answer = reply['message']['content']
    except Exception as exc:
        return f"\nCONNECTION ERROR: Ensure 'ollama serve' is running in your terminal. \nDetails: {exc}"
    return answer

# Smoke-test the connection with one sample request
print("--- Llama 3 Mapping Logic ---")
result = get_llama_mapping_logic(
    "Map JSON relationship_type 'belongs_to' to OWL property 'ex:relatedToTable'"
)
print(result)

--- Llama 3 Mapping Logic ---
Here is an example of how you can use the `rdflib` library in Python to map a JSON relationship type `'belongs_to'` to an OWL property `ex:relatedToTable`:
```
from rdflib import URIRef, Literal
from rdflib.namespace import RDFS

# Define the namespace for the ex prefix
ex = URIRef("http://example.org/")

# Create a new RDF graph
g = Graph()

# Define the OWL property 'ex:relatedToTable'
related_to_table = URIRef(ex + "relatedToTable")

# Define the JSON relationship type 'belongs_to' as an RDF term
belongs_to = Literal("belongs_to", lang="json")

# Map the JSON relationship type to the OWL property
g.add((URIRef(ex + "Table"), RDFS.relatedTo, related_to_table))
g.add((related_to_table, RDFS.label, belongs_to))

print(g.serialize(format="turtle"))
```
This code defines a new RDF graph `g` and creates an OWL property `ex:relatedToTable`. It then maps the JSON relationship type `'belongs_to'` to this property using the `RDFS.relatedTo` predicate. Finally, it

In [21]:
import json
from rdflib import Graph, Namespace, URIRef, Literal, RDF, RDFS, XSD

# 1. Setup Namespaces (Must match your .owl file precisely)
# EX supplies the ontology's own terms (EX.Column, EX.hasColumn, ...).
EX = Namespace("http://example.org/fanduel/data-democratization-ontology#")
# PROV is only registered as a prefix by enrich_ontology() in this cell; no PROV triples are added there.
PROV = Namespace("http://www.w3.org/ns/prov#")

def enrich_ontology(
    metadata_path='/Users/mc/Downloads/ontology_ready_metadata.json',
    ontology_path="/Users/mc/Downloads/data_democratization_ontology.owl",
    output_path="enriched_ontology.owl",
):
    """Merge table, column and relationship metadata into the ontology and save it.

    Args:
        metadata_path: JSON file with a 'tables' list and an optional
            'relationships' list. Defaults to the original hard-coded location.
        ontology_path: RDF/XML ontology to extend. Defaults to the original
            hard-coded location.
        output_path: where the enriched RDF/XML graph is written.

    Raises:
        KeyError: if a table lacks 'physical_name' or 'table_type', a column
            lacks 'name' or 'data_type', or a relationship lacks
            'source_table' or 'target_table'.
    """
    # Load JSON data (decoded explicitly as UTF-8 rather than the platform default)
    with open(metadata_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Load existing Ontology
    g = Graph()
    g.parse(ontology_path, format="xml")
    g.bind("ex", EX)
    g.bind("prov", PROV)

    # 2. Map Tables
    for table in data['tables']:
        physical_name = table['physical_name']
        # Create a URI-safe name (e.g., replacing dots with underscores)
        safe_name = physical_name.replace('.', '_')
        table_uri = EX[safe_name]

        # Determine Class: EnrichedTable or BaseTable
        table_class = EX.EnrichedTable if table['table_type'] == "ENRICHED" else EX.BaseTable
        g.add((table_uri, RDF.type, table_class))

        # Add Data Properties
        g.add((table_uri, RDFS.label, Literal(physical_name)))
        g.add((table_uri, EX.assetName, Literal(physical_name)))
        # A missing or null description is skipped: rdflib stringifies
        # non-string values, so None would presumably be stored as the text
        # "None" -- TODO confirm against the rdflib version in use.
        description = table.get('description')
        if description is not None:
            g.add((table_uri, EX.businessDefinition, Literal(description)))

        # 3. Map Columns (a table without a 'columns' entry contributes none)
        for col in table.get('columns') or []:
            col_uri = EX[f"{safe_name}_{col['name']}"]
            g.add((col_uri, RDF.type, EX.Column))
            g.add((col_uri, EX.assetName, Literal(col['name'])))
            # Stored as a plain string such as "xsd:string", not as a typed literal.
            g.add((col_uri, EX.inferredDatatype, Literal(f"xsd:{col['data_type']}")))

            # Create Relationships (both directions)
            g.add((table_uri, EX.hasColumn, col_uri))
            g.add((col_uri, EX.belongsToTable, table_uri))

    # 4. Map Relationships (Lineage/Joins)
    for rel in data.get('relationships', []):
        source_uri = EX[rel['source_table'].replace('.', '_')]
        target_uri = EX[rel['target_table'].replace('.', '_')]

        # Using a loose relationship property from your ontology
        g.add((source_uri, EX.relatedToTable, target_uri))

    # Save the new file
    g.serialize(destination=output_path, format="xml")
    print(f"Successfully created {output_path}!")

if __name__ == "__main__":
    enrich_ontology()

Successfully created enriched_ontology.owl!


In [5]:
import ollama
from rdflib import Graph, Namespace, Literal, RDF

# 1. Setup Environment
EX = Namespace("http://example.org/fanduel/data-democratization-ontology#")
# Module-level graph: enhance_fanduel_ontology() below reads and updates this same object.
g = Graph()
# Relative path, so the file is looked up in the kernel's current working directory.
g.parse("enriched_ontology.owl", format="xml")

def get_fan_duel_inference(asset_name, columns, existing_relations):
    """
    Ask the local llama3 model for business context about one table.

    The table name, its column names and its already-known related tables are
    embedded in a fixed prompt; the model's raw text reply is returned.
    """
    column_list = ', '.join(columns)
    relation_list = ', '.join(existing_relations)

    prompt = f"""
    CONTEXT (FanDuel Data Lake):
    Technical Table: {asset_name}
    Technical Columns: {column_list}
    Existing Ontological Links: {relation_list}

    TASK:
    1. Define 'Purpose': Explain what this table does for a FanDuel Business Analyst (e.g., wallet management, fraud detection, marketing).
    2. 'Composable Join': Suggest which table this should join with to create a '360 View' of the customer (e.g., joining ledger lines with user profiles).
    3. 'Governance Risk': Rate 1-5 how sensitive this is for GDPR/AML compliance.
    4. 'Example Query': Write a simple SQL snippet that a user could copy/paste.

    FORMAT:
    Purpose: [text]
    JoinSuggestion: [Table Name]
    RiskScore: [1-5]
    SQLSnippet: [SQL code]
    """

    # Single-turn chat: the whole prompt goes out as one user message
    user_turn = {'role': 'user', 'content': prompt}
    reply = ollama.chat(model='llama3', messages=[user_turn])
    return reply['message']['content']

def _parse_risk_score(raw):
    """Return the first standalone digit 1-5 found in `raw` as an int, or None."""
    import re  # local import: this cell's import list does not include re

    found = re.search(r"(?<!\d)[1-5](?!\d)", raw)
    return int(found.group()) if found else None


def enhance_fanduel_ontology():
    """Annotate every table in the module-level graph `g` with model-generated context.

    For each subject typed EX.EnrichedTable or EX.BaseTable, the table name,
    its column names and its related tables are sent to
    get_fan_duel_inference(). Reply lines starting with "Purpose:",
    "SQLSnippet:" or "RiskScore:" are stored on the table as
    EX.businessDefinition, EX.exampleQuery and EX.governanceScore. The graph
    is then written to fanduel_smart_metadata.owl as RDF/XML.

    Only the text on the same line as a label is captured, so a multi-line
    SQL snippet is truncated to its first line.
    """
    # Snapshot the table URIs before editing the graph: this avoids mutating
    # `g` while iterating over it, and de-duplicates a table that carries both
    # types so the model is called once per table.
    table_uris = list(dict.fromkeys(
        list(g.subjects(RDF.type, EX.EnrichedTable)) + list(g.subjects(RDF.type, EX.BaseTable))
    ))

    for table_uri in table_uris:
        # Fall back to the URI fragment instead of sending the text "None" to the model.
        name_node = g.value(table_uri, EX.assetName)
        asset_name = str(name_node) if name_node is not None else str(table_uri).split('#')[-1]
        # Extract technical columns to give Llama 3 context (columns without a name are left out)
        column_names = (g.value(c, EX.assetName) for c in g.objects(table_uri, EX.hasColumn))
        columns = [str(n) for n in column_names if n is not None]
        # Extract existing relations (e.g., relatedToTable)
        relations = [str(o).split('#')[-1] for o in g.objects(table_uri, EX.relatedToTable)]

        print(f"Generating FanDuel Business Context for: {asset_name}...")
        ai_output = get_fan_duel_inference(asset_name, columns, relations)

        # Semantic Integration Logic
        for raw_line in ai_output.splitlines():
            # Replies are often indented like the prompt's FORMAT section.
            line = raw_line.strip()
            if line.startswith("Purpose:"):
                # Slice off only the leading label; later "Purpose:" text in the value is kept.
                g.set((table_uri, EX.businessDefinition, Literal(line[len("Purpose:"):].strip())))
            elif line.startswith("SQLSnippet:"):
                g.set((table_uri, EX.exampleQuery, Literal(line[len("SQLSnippet:"):].strip())))
            elif line.startswith("RiskScore:"):
                # Replies such as "4/5" or "3 (moderate)" are accepted; a value
                # with no 1-5 digit is skipped rather than aborting the run.
                score = _parse_risk_score(line[len("RiskScore:"):])
                if score is not None:
                    g.set((table_uri, EX.governanceScore, Literal(score * 20)))  # 1-5 maps to 20-100

    g.serialize(destination="fanduel_smart_metadata.owl", format="xml")
    print("Success: fanduel_smart_metadata.owl created with Llama 3 intelligence.")

if __name__ == "__main__":
    enhance_fanduel_ontology()

Generating FanDuel Business Context for: foundation.financial.ledger_lines_enriched...
Generating FanDuel Business Context for: foundation.financial.withdrawals_enriched_v4...
Generating FanDuel Business Context for: foundation.financial.ledger_lines_enriched_v1...
Generating FanDuel Business Context for: foundation.financial.deposits_enriched_v1...
Generating FanDuel Business Context for: foundation.account.verified_user_details...
Generating FanDuel Business Context for: foundation.account.verification_attempt_v1...
Generating FanDuel Business Context for: foundation.account.authgateway_session_created_events...
Generating FanDuel Business Context for: foundation.financial.ledger_account_balances_v1...
Generating FanDuel Business Context for: foundation.financial.deposits_v4...
Generating FanDuel Business Context for: foundation.financial.withdrawals_v4...
Generating FanDuel Business Context for: foundation.financial.worldpay_transactions...
Success: fanduel_smart_metadata.owl create

Retrieval and Grounding 

In [4]:
import json
import rdflib
from rdflib import Graph, Namespace, RDF, RDFS, OWL
from difflib import SequenceMatcher

# Setup Namespaces
# NOTE(review): EX is not referenced by the rest of this cell's code; the
# retriever works from the full URIs it reads out of the graph.
EX = Namespace("http://example.org/fanduel/data-democratization-ontology#")

def get_similarity(a, b):
    """Case-insensitive similarity of two strings, as a difflib ratio between 0 and 1."""
    matcher = SequenceMatcher(None, a.lower(), b.lower())
    return matcher.ratio()

class OntologyRetriever:
    """Indexes an OWL file's classes and properties for fuzzy name lookup.

    On construction the RDF/XML file is parsed and every owl:Class,
    owl:ObjectProperty and owl:DatatypeProperty is cached as a dict with the
    keys 'uri', 'name', 'label', 'comment' and 'tokens'.
    """

    def __init__(self, owl_path):
        """Parse `owl_path` (RDF/XML) and build the class and property caches."""
        self.g = Graph()
        self.g.parse(owl_path, format="xml")

        # Pre-cache classes and properties with their metadata
        self.classes = self._get_entities(OWL.Class)
        self.properties = self._get_entities(OWL.ObjectProperty) + self._get_entities(OWL.DatatypeProperty)

    def _annotation(self, subject, predicate):
        """Return the annotation value as text, or "" when the graph has none."""
        # str(None) is the truthy string "None", so the absence check has to
        # happen before the conversion.
        value = self.g.value(subject, predicate)
        return "" if value is None else str(value)

    def _get_entities(self, entity_type):
        """Return one metadata dict per subject whose rdf:type is `entity_type`."""
        entities = []
        for s in self.g.subjects(RDF.type, entity_type):
            # Local name is whatever follows the last '#' in the URI.
            name = str(s).split('#')[-1]
            entities.append({
                "uri": str(s),
                "name": name,
                "label": self._annotation(s, RDFS.label),
                "comment": self._annotation(s, RDFS.comment),
                "tokens": name.replace('_', ' ').lower()
            })
        return entities

    def retrieve_candidates(self, tech_name, is_table=True, top_k=3, min_score=0.3):
        """Find the best-matching ontology entities for a technical name.

        Args:
            tech_name: table or column name; only the part after the last '.'
                is compared, with underscores turned into spaces.
            is_table: search the cached classes when True, the cached
                properties when False.
            top_k: maximum number of candidates returned (default 3).
            min_score: candidates must score strictly above this (default 0.3).

        Returns:
            Up to `top_k` entity dicts, each with an added 'score' rounded to
            two decimals, ordered from highest to lowest score.
        """
        search_space = self.classes if is_table else self.properties
        candidates = []

        # Clean the technical name (remove prefixes like foundation.financial)
        clean_name = tech_name.split('.')[-1].replace('_', ' ')

        for ent in search_space:
            score = get_similarity(clean_name, ent['tokens'])
            if score > min_score:
                candidates.append({**ent, "score": round(score, 2)})

        # Rank by similarity score
        return sorted(candidates, key=lambda x: x['score'], reverse=True)[:top_k]

def run_retrieval_stage(
    owl_path="data_democratization_ontology.owl",
    metadata_path='ontology_ready_metadata.json',
    output_path='retrieval_grounding_output.json',
):
    """Collect candidate ontology matches for every table and column and save them as JSON.

    Args:
        owl_path: ontology file handed to OntologyRetriever.
        metadata_path: JSON file with a 'tables' list; each table needs
            'physical_name' and a 'columns' list of dicts with 'name'.
        output_path: JSON file that receives the results.

    The defaults are the file names this stage originally hard-coded.
    """
    retriever = OntologyRetriever(owl_path)
    # JSON is read and written explicitly as UTF-8 rather than the platform default.
    with open(metadata_path, 'r', encoding='utf-8') as f:
        metadata = json.load(f)

    retrieval_results = []

    for table in metadata['tables']:
        table_context = {
            "technical_table": table['physical_name'],
            # Tables are matched against classes, columns against properties.
            "table_candidates": retriever.retrieve_candidates(table['physical_name'], is_table=True),
            "columns": [
                {
                    "technical_column": col['name'],
                    "property_candidates": retriever.retrieve_candidates(col['name'], is_table=False)
                }
                for col in table['columns']
            ]
        }
        retrieval_results.append(table_context)

    # Save results for the Generator Stage
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(retrieval_results, f, indent=4)
    print(f"Retrieval & Grounding complete. Results saved to {output_path}")

if __name__ == "__main__":
    run_retrieval_stage()

Retrieval & Grounding complete. Results saved to retrieval_grounding_output.json


Final Grounding

In [5]:
import json
import rdflib
from rdflib import Graph, Namespace, RDF, RDFS, OWL
from difflib import SequenceMatcher

# Setup Namespaces
# NOTE(review): EX is not referenced by the rest of this cell's code; the
# retriever works from the full URIs it reads out of the graph.
EX = Namespace("http://example.org/fanduel/data-democratization-ontology#")

def get_similarity(a, b):
    """Score how alike two strings are, ignoring case (difflib ratio, 0 to 1)."""
    lowered_a, lowered_b = a.lower(), b.lower()
    return SequenceMatcher(a=lowered_a, b=lowered_b).ratio()

class OntologyRetriever:
    """Loads an OWL file and offers fuzzy matching of technical names to its entities.

    The constructor parses the RDF/XML file and caches one dict per
    owl:Class, owl:ObjectProperty and owl:DatatypeProperty, with the keys
    'uri', 'name', 'label', 'comment' and 'tokens'.
    """

    def __init__(self, owl_path):
        """Parse `owl_path` (RDF/XML) and fill `self.classes` and `self.properties`."""
        self.g = Graph()
        self.g.parse(owl_path, format="xml")

        # Pre-cache classes and properties with metadata for 'Grounding'
        self.classes = self._get_entities(OWL.Class)
        self.properties = self._get_entities(OWL.ObjectProperty) + self._get_entities(OWL.DatatypeProperty)

    def _text_or_empty(self, subject, predicate):
        """Return the value for (subject, predicate) as text, or "" if absent."""
        # Check for None before str(): str(None) yields "None", which is
        # truthy and would otherwise be cached as the label/comment text.
        node = self.g.value(subject, predicate)
        if node is None:
            return ""
        return str(node)

    def _get_entities(self, entity_type):
        """Build the metadata dicts for all subjects typed as `entity_type`."""
        entities = []
        for s in self.g.subjects(RDF.type, entity_type):
            # Local name is whatever follows the last '#' in the URI.
            name = str(s).split('#')[-1]
            entities.append({
                "uri": str(s),
                "name": name,
                "label": self._text_or_empty(s, RDFS.label),
                "comment": self._text_or_empty(s, RDFS.comment),
                "tokens": name.replace('_', ' ').lower()
            })
        return entities

    def retrieve_candidates(self, tech_name, is_table=True, top_k=3, min_score=0.3):
        """Finds top candidates from the OWL based on technical metadata.

        Args:
            tech_name: table or column name; only the segment after the last
                '.' is compared, with underscores replaced by spaces.
            is_table: True searches the cached classes, False the properties.
            top_k: maximum number of candidates to return (default 3).
            min_score: a candidate is kept only if it scores strictly above
                this value (default 0.3).

        Returns:
            At most `top_k` entity dicts with an extra 'score' key (rounded
            to two decimals), best match first.
        """
        search_space = self.classes if is_table else self.properties
        candidates = []

        # Clean technical name (e.g., foundation.financial.ledger -> ledger)
        clean_name = tech_name.split('.')[-1].replace('_', ' ')

        for ent in search_space:
            score = get_similarity(clean_name, ent['tokens'])
            if score > min_score:
                candidates.append({**ent, "score": round(score, 2)})

        return sorted(candidates, key=lambda x: x['score'], reverse=True)[:top_k]

def run_retrieval_stage(
    owl_path="data_democratization_ontology.owl",
    metadata_path='ontology_ready_metadata.json',
    output_path='grounding_context.json',
):
    """Write the grounding context (candidate matches per table and column) to JSON.

    Args:
        owl_path: ontology file handed to OntologyRetriever.
        metadata_path: JSON file with a 'tables' list; each table needs
            'physical_name' and a 'columns' list of dicts with 'name'.
        output_path: JSON file that receives the grounding context.

    The defaults are the file names this stage originally hard-coded.
    """
    retriever = OntologyRetriever(owl_path)
    # JSON is read and written explicitly as UTF-8 rather than the platform default.
    with open(metadata_path, 'r', encoding='utf-8') as f:
        metadata = json.load(f)

    retrieval_results = []
    for table in metadata['tables']:
        retrieval_results.append({
            "tech_table": table['physical_name'],
            # Tables are matched against classes (True), columns against properties (False).
            "class_candidates": retriever.retrieve_candidates(table['physical_name'], True),
            "columns": [
                {
                    "tech_column": col['name'],
                    "prop_candidates": retriever.retrieve_candidates(col['name'], False)
                }
                for col in table['columns']
            ]
        })

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(retrieval_results, f, indent=4)
    print(f"Grounding context generated in {output_path}")

if __name__ == "__main__":
    run_retrieval_stage()

Grounding context generated in grounding_context.json


Generator (example)

In [7]:
import json
import ollama
from rdflib import Graph, Namespace, URIRef, Literal, RDF

# Setup Namespaces
# EX builds every URI this cell writes: table and column subjects, the two
# linking predicates, and the selected concept used as the object.
EX = Namespace("http://example.org/fanduel/data-democratization-ontology#")

def get_final_mapping(tech_item, candidates):
    """
    Have llama3 pick one of the retrieved ontology candidates for a technical asset.

    Each candidate is listed in the prompt with its name, score and comment;
    the model's raw reply text is returned unchanged.
    """
    bullet_lines = []
    for cand in candidates:
        bullet_lines.append(
            f"- {cand['name']} (Score: {cand['score']}): {cand['comment']}"
        )
    candidate_summary = "\n".join(bullet_lines)

    prompt = f"""
    TECHNICAL ASSET: {tech_item}
    CANDIDATES FROM ONTOLOGY:
    {candidate_summary}

    TASK:
    1. Select the BEST candidate.
    2. Explain why based on the 'comment' or 'label'.
    3. Return the choice in format: Selection: [Name]
    """

    # Single-turn chat: the whole prompt goes out as one user message
    user_turn = {'role': 'user', 'content': prompt}
    reply = ollama.chat(model='llama3', messages=[user_turn])
    return reply['message']['content']

def run_generator_stage():
    # Load Grounding Context
    with open('grounding_context.json', 'r') as f:
        grounding_data = json.load(f)

    g = Graph()
    g.parse("enriched_ontology.owl", format="xml")

    for entry in grounding_data:
        # Finalize Table Mapping
        table_uri = EX[entry['tech_table'].replace('.', '_')]
        decision = get_final_mapping(entry['tech_table'], entry['class_candidates'])
        
        if "Selection:" in decision:
            selected_class = decision.split("Selection:")[1].strip()
            g.add((table_uri, EX.describesBusinessConcept, EX[selected_class]))

        # Finalize Column Mappings
        for col in entry['columns']:
            col_uri = EX[f"{entry['tech_table'].replace('.', '_')}_{col['tech_column']}"]
            col_decision = get_final_mapping(col['tech_column'], col['prop_candidates'])
            
            if "Selection:" in col_decision:
                selected_prop = col_decision.split("Selection:")[1].strip()
                g.add((col_uri, EX.referencesBusinessConcept, EX[selected_prop]))

    g.serialize(destination="final_grounded_ontology.owl", format="xml")
    print("Generator Stage complete. Final ontology saved as final_grounded_ontology.owl")

if __name__ == "__main__":
    run_generator_stage()

http://example.org/fanduel/data-democratization-ontology#EnrichedTable**

I chose EnrichedTable as the best candidate because it has a score of 0.43, which is not the highest but still relatively high. Additionally, its label "A table produced by enrichment or denormalization (e.g., deposits_enriched_v1)" suggests that it is likely to be related to financial data and is an enriched or denormalized version of a table. This matches with the name of the technical asset being considered, which also has "enriched" in its name.

The other candidates, LedgerLine and LedgerEntity, have lower scores and their labels do not seem directly related to financial data enrichment or denormalization. does not look like a valid URI, trying to serialize this will break.
http://example.org/fanduel/data-democratization-ontology#** postedAsLedgerLine

**Reasoning:** The score for this candidate is the highest (0.62), indicating a strong match between the technical asset "ledger_line_id" and the concept of "

Generator Stage complete. Final ontology saved as final_grounded_ontology.owl
