# 05 - Instance Data Translator

Translates RDF instance data into labeled property graph (LPG) node and edge records.

**Input**:
- `bronze_triples` - Raw RDF triples
- `silver_node_types` - Node type definitions (from F4.1)
- `silver_properties` - Property mappings (from F4.2)

**Output**:
- `silver_nodes` - Node records with properties
- `silver_edges` - Edge records linking nodes

## Translation Rules

| RDF Pattern | LPG Output |
|-------------|------------|
| Subject with rdf:type | Node with labels |
| DatatypeProperty triple | Node property |
| ObjectProperty triple | Edge record |

In [None]:
# Configuration: names of the tables this notebook reads and writes.
# Inputs
INPUT_TRIPLES = "bronze_triples"  # raw RDF triples
INPUT_NODE_TYPES = "silver_node_types"  # node type definitions
INPUT_PROPERTIES = "silver_properties"  # property mappings
# Outputs
OUTPUT_NODES = "silver_nodes"  # node records with properties
OUTPUT_EDGES = "silver_edges"  # edge records linking nodes

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType, MapType, BooleanType
from pyspark.sql.window import Window
import re
import hashlib

# RDF namespace constants
# rdf:type triples supply node labels; rdfs:label and skos:prefLabel are
# always treated as node properties and feed the node display name.
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
SKOS_PREFLABEL = "http://www.w3.org/2004/02/skos/core#prefLabel"

# Schema predicate URIs to exclude from instance data
# (any triple using one of these predicates is dropped, whatever its subject)
SCHEMA_PREDICATES = {
    "http://www.w3.org/2000/01/rdf-schema#subClassOf",
    "http://www.w3.org/2000/01/rdf-schema#domain",
    "http://www.w3.org/2000/01/rdf-schema#range",
    "http://www.w3.org/2002/07/owl#equivalentClass",
    "http://www.w3.org/2002/07/owl#disjointWith",
    "http://www.w3.org/2002/07/owl#inverseOf",
}

# Schema type URIs (subjects with these types are schema, not instances)
# A subject that has rdf:type equal to any of these has ALL of its triples
# excluded from the instance data.
SCHEMA_TYPES = {
    "http://www.w3.org/2002/07/owl#Class",
    "http://www.w3.org/2002/07/owl#ObjectProperty",
    "http://www.w3.org/2002/07/owl#DatatypeProperty",
    "http://www.w3.org/2002/07/owl#AnnotationProperty",
    "http://www.w3.org/2002/07/owl#Ontology",
    "http://www.w3.org/2000/01/rdf-schema#Class",
    "http://www.w3.org/1999/02/22-rdf-syntax-ns#Property",
    "http://www.w3.org/ns/shacl#NodeShape",
    "http://www.w3.org/ns/shacl#PropertyShape",
}

def extract_local_name(uri: str) -> str:
    """Return the local-name portion of a URI or blank-node label.

    Blank-node labels lose their ``_:`` prefix. Otherwise the text after
    the last ``#`` is returned when the URI contains one, else the text
    after the last ``/``. A value with neither separator is returned
    unchanged, and ``None`` is passed through as ``None``.
    """
    if uri is None:
        return None
    if uri.startswith("_:"):
        return uri[2:]
    # '#' takes priority over '/', so check the separators in that order.
    for separator in ("#", "/"):
        if separator in uri:
            return uri.rpartition(separator)[2]
    return uri

def generate_node_id(uri: str, graph: str) -> str:
    """Derive a deterministic node ID from a URI.

    - ``None`` is passed through as ``None``.
    - Blank nodes (``_:`` prefix) get ``"blank_"`` plus the first 12 hex
      digits of the MD5 of ``"<graph>:<uri>"``, so the same blank label in
      different graphs yields different IDs.
    - Named nodes use their local name when it is non-empty and at most
      50 characters long; otherwise the first 16 hex digits of the MD5 of
      the full URI.

    NOTE(review): the local name is not namespace-qualified, so two URIs
    from different namespaces that share a local name get the same ID.
    """
    if uri is None:
        return None

    def _md5_prefix(text: str, length: int) -> str:
        return hashlib.md5(text.encode()).hexdigest()[:length]

    if uri.startswith("_:"):
        # Blank-node labels are only meaningful within their graph.
        return "blank_" + _md5_prefix(f"{graph}:{uri}", 12)

    local = extract_local_name(uri)
    use_local_name = bool(local) and len(local) <= 50
    return local if use_local_name else _md5_prefix(uri, 16)

def sanitize_property_name(name: str) -> str:
    """Convert a property name to a camelCase identifier.

    Non-alphanumeric characters act as word separators. The first word
    starts lower-case and every following word starts upper-case, while
    capitalisation inside a word is preserved, so names that are already
    camelCase (``prefLabel``, ``hasFirstName``) come through unchanged.
    Words written entirely in upper case (``HAS_NAME``, ``ID``) are
    normalised (``hasName``, ``id``).

    Returns ``None`` for ``None``, ``"unknownProperty"`` when the name
    contains no alphanumeric characters, and prefixes ``"p"`` when the
    result would not start with a letter.
    """
    if name is None:
        return None
    words = re.sub(r'[^a-zA-Z0-9]', ' ', name).split()
    if not words:
        return "unknownProperty"

    def _first_word(word: str) -> str:
        # Lower-case only the initial so inner capitals survive;
        # an all-caps word is lowered as a whole ("ID" -> "id").
        if word.isupper():
            return word.lower()
        return word[0].lower() + word[1:]

    def _later_word(word: str) -> str:
        # Upper-case only the initial; an all-caps word becomes Titlecase.
        if word.isupper():
            return word.capitalize()
        return word[0].upper() + word[1:]

    result = _first_word(words[0]) + ''.join(_later_word(w) for w in words[1:])
    if not result[0].isalpha():
        # Identifiers must start with a letter.
        result = "p" + result
    return result

# Spark UDF wrappers around the helpers above; each returns a string column.
extract_local_name_udf = F.udf(f=extract_local_name, returnType=StringType())
generate_node_id_udf = F.udf(f=generate_node_id, returnType=StringType())
sanitize_property_name_udf = F.udf(f=sanitize_property_name, returnType=StringType())

In [None]:
# Read the three input tables, reporting the row count of each as it loads.

# Raw RDF triples
df_triples = spark.table(INPUT_TRIPLES)
n_triples = df_triples.count()
print(f"Loaded {n_triples} triples from '{INPUT_TRIPLES}'")

# Node type definitions, used to map rdf:type URIs to labels
df_node_types = spark.table(INPUT_NODE_TYPES)
n_node_types = df_node_types.count()
print(f"Loaded {n_node_types} node types from '{INPUT_NODE_TYPES}'")

# Property mappings (node property vs. edge)
df_properties = spark.table(INPUT_PROPERTIES)
n_properties = df_properties.count()
print(f"Loaded {n_properties} properties from '{INPUT_PROPERTIES}'")

In [None]:
# Schema subjects are resources declared (via rdf:type) as a class, property,
# ontology or SHACL shape - they describe the model and are not instances.
is_type_triple = F.col("predicate") == RDF_TYPE
has_schema_type = F.col("object").isin(list(SCHEMA_TYPES))

df_schema_subjects = (
    df_triples
    .where(is_type_triple & has_schema_type)
    .select("subject")
    .distinct()
)

# Materialise on the driver for the exclusion filter that follows.
schema_subjects = {row.subject for row in df_schema_subjects.collect()}
print(f"Found {len(schema_subjects)} schema subjects to exclude")

In [None]:
# Filter to instance triples only (exclude schema definitions).
#
# Schema subjects are removed with a distributed left-anti join against
# df_schema_subjects instead of an IN-list built from a driver-side Python
# set: the IN-list embeds every schema subject as a literal in the query
# plan, which does not scale with ontology size.
df_instance_triples = (
    df_triples.alias("t")
    # The previous NOT IN filter never kept rows with a NULL subject;
    # an anti-join would, so drop them explicitly to keep the same result.
    .filter(F.col("subject").isNotNull())
    .filter(~F.col("predicate").isin(list(SCHEMA_PREDICATES)))
    .join(
        df_schema_subjects.alias("schema"),
        F.col("t.subject") == F.col("schema.subject"),
        "left_anti",  # keep only triples whose subject is NOT a schema subject
    )
)

print(f"Instance triples: {df_instance_triples.count()}")

In [None]:
# Every distinct (subject, graph) pair in the instance data is a candidate
# node; attach the generated node ID in the same pipeline.
df_subjects = (
    df_instance_triples
    .select(F.col("subject").alias("uri"), F.col("graph"))
    .distinct()
    .withColumn("node_id", generate_node_id_udf(F.col("uri"), F.col("graph")))
)

print(f"Found {df_subjects.count()} unique instance subjects")

In [None]:
# rdf:type declarations of instances become node labels.
df_types = (
    df_instance_triples
    .where(F.col("predicate") == RDF_TYPE)
    .select(
        F.col("subject").alias("uri"),
        F.col("object").alias("type_uri"),
        F.col("graph"),
    )
)

# Lookup from class URI to the label defined in the node types table.
type_label_lookup = df_node_types.select(
    F.col("class_uri").alias("type_uri"),
    F.col("node_type").alias("label"),
)

# Left join keeps types that have no node type definition; those fall back
# to the local name of the type URI.
df_types_labeled = (
    df_types
    .join(type_label_lookup, on="type_uri", how="left")
    .withColumn(
        "label",
        F.coalesce(F.col("label"), extract_local_name_udf(F.col("type_uri"))),
    )
)

# One row per (node, graph) carrying the sets of labels and type URIs.
df_node_labels = df_types_labeled.groupBy("uri", "graph").agg(
    F.collect_set("label").alias("labels"),
    F.collect_set("type_uri").alias("type_uris"),
)

print(f"Nodes with type declarations: {df_node_labels.count()}")

In [None]:
# Split the property mappings into the predicates that become node
# properties and the predicates that become edges.
def _predicates_mapped_as(mapping_type):
    """Collect the property URIs whose mapping_type equals the given value."""
    matching = df_properties.filter(F.col("mapping_type") == mapping_type)
    return {row.property_uri for row in matching.collect()}


node_prop_predicates = _predicates_mapped_as("node_property")
edge_predicates = _predicates_mapped_as("edge")

print(f"Node property predicates: {len(node_prop_predicates)}")
print(f"Edge predicates: {len(edge_predicates)}")

In [None]:
# Select the triples that become node properties: mapped datatype
# properties, rdfs:label / skos:prefLabel, and any triple with a literal
# object. rdf:type triples are always excluded.
label_predicates = [RDFS_LABEL, SKOS_PREFLABEL]
all_node_prop_predicates = node_prop_predicates | set(label_predicates)

is_mapped_property = F.col("predicate").isin(list(all_node_prop_predicates))
is_literal_object = F.col("object_type") == "literal"  # All literals are node properties
is_not_type_triple = F.col("predicate") != RDF_TYPE

df_node_props = df_instance_triples.filter(
    (is_mapped_property | is_literal_object) & is_not_type_triple
)

print(f"Node property triples: {df_node_props.count()}")

In [None]:
# Name each property after the sanitized local name of its predicate.
predicate_local_name = extract_local_name_udf(F.col("predicate"))
df_node_props = df_node_props.withColumn(
    "property_name",
    sanitize_property_name_udf(predicate_local_name),
)

# A single value is stored as-is; several values are stored as a JSON array.
single_or_json_value = (
    F.when(F.size("values") == 1, F.col("values").getItem(0))
    .otherwise(F.to_json(F.col("values")))
)

# One row per (subject, graph, property) with every value collected.
df_props_grouped = (
    df_node_props
    .groupBy("subject", "graph", "property_name")
    .agg(
        F.collect_list("object").alias("values"),
        F.first("datatype").alias("datatype"),
        F.first("lang").alias("lang"),
    )
    .withColumn("value", single_or_json_value)
)

df_props_grouped.show(5, truncate=60)

In [None]:
# Build one properties map per node.
#
# df_props_grouped already holds one row per (subject, graph, property_name),
# so the map is assembled directly from (name, value) pairs. This avoids
# pivoting into one column per property - which fails once the number of
# distinct property names exceeds spark.sql.pivotMaxValues - only to fold
# the columns back into a map and strip the NULL entries again.
df_prop_entries = df_props_grouped.filter(
    # Map keys may not be NULL, and NULL values were never kept in the map.
    F.col("property_name").isNotNull() & F.col("value").isNotNull()
)

# Distinct property names, reported for visibility.
prop_cols = sorted(
    row.property_name
    for row in df_prop_entries.select("property_name").distinct().collect()
)
print(f"Property columns extracted: {len(prop_cols)}")

# Property names are unique within each (subject, graph) group, so
# map_from_entries never sees a duplicate key.
df_props_map = df_prop_entries.groupBy("subject", "graph").agg(
    F.map_from_entries(
        F.collect_list(F.struct(F.col("property_name"), F.col("value")))
    ).alias("properties")
)

In [None]:
# Build final nodes table: subjects + labels + properties.
# Graph columns are compared null-safely so that nodes whose graph is NULL
# still pick up their labels and properties.
df_nodes = df_subjects.alias("s").join(
    df_node_labels.alias("l"),
    (F.col("s.uri") == F.col("l.uri"))
    & F.col("s.graph").eqNullSafe(F.col("l.graph")),
    "left"
).join(
    df_props_map.select("subject", "graph", "properties").alias("p"),
    (F.col("s.uri") == F.col("p.subject"))
    & F.col("s.graph").eqNullSafe(F.col("p.graph")),
    "left"
).select(
    F.col("s.node_id").alias("id"),
    F.col("s.uri"),
    # Nodes without any rdf:type get the generic "Entity" label.
    F.coalesce(F.col("l.labels"), F.array(F.lit("Entity"))).alias("labels"),
    F.col("l.type_uris"),
    F.col("p.properties"),
    F.col("s.uri").startswith("_:").alias("is_blank_node"),
    F.col("s.graph").alias("source_graph")
)

# Property keys are produced by sanitize_property_name, so derive the lookup
# keys for rdfs:label and skos:prefLabel through the same function instead of
# hard-coding them; a hard-coded key silently misses whenever the sanitizer
# spells the name differently.
label_key = sanitize_property_name(extract_local_name(RDFS_LABEL))
pref_label_key = sanitize_property_name(extract_local_name(SKOS_PREFLABEL))

# Add display name from properties (prefer label > prefLabel > id)
df_nodes = df_nodes.withColumn(
    "display_name",
    F.coalesce(
        F.col("properties")[label_key],
        F.col("properties")[pref_label_key],
        F.col("id")
    )
)

print(f"Total nodes: {df_nodes.count()}")

In [None]:
# Preview the first nodes under a banner heading.
banner = "=" * 60
print("\n" + banner)
print("SAMPLE NODES")
print(banner)

sample_node_columns = ["id", "display_name", "labels", "source_graph"]
df_nodes.select(*sample_node_columns).show(15, truncate=40)

In [None]:
# Preview nodes that carry a properties map.
print("\nSample node properties:")
nodes_having_properties = df_nodes.where(F.col("properties").isNotNull())
nodes_having_properties.select("id", "labels", "properties").show(10, truncate=80)

In [None]:
# URI -> node ID lookup (graph column dropped) used when building edges.
node_id_map = df_subjects.select("uri", "node_id").dropDuplicates()

In [None]:
# Edge candidates: instance triples whose object is a URI, excluding
# rdf:type declarations and schema predicates.
object_is_uri = F.col("object_type") == "uri"
not_type_predicate = F.col("predicate") != RDF_TYPE
not_schema_predicate = ~F.col("predicate").isin(list(SCHEMA_PREDICATES))

df_edge_triples = df_instance_triples.filter(
    object_is_uri & not_type_predicate & not_schema_predicate
)

print(f"Edge candidate triples: {df_edge_triples.count()}")

In [None]:
# Build edges with source and target IDs.
#
# Blank-node IDs are derived from (graph, label), so one blank label that
# occurs in several graphs maps to several node IDs. Resolving endpoints by
# URI alone would pair an edge with the blank nodes of every graph, so
# blank-node endpoints are matched on graph as well. Named nodes keep the
# URI-only match, which lets an edge point at a node defined in another graph.
def _blank_scope(uri_col, graph_col):
    """Return the graph column for blank-node labels and NULL for named URIs."""
    return F.when(F.col(uri_col).startswith("_:"), F.col(graph_col))


# One row per named URI, and one row per (blank label, graph).
node_lookup = df_subjects.select(
    F.col("uri"),
    _blank_scope("uri", "graph").alias("scope"),
    F.col("node_id"),
).distinct()

# Inner joins drop edges whose endpoint is not a known node.
df_edges = df_edge_triples.alias("e").join(
    node_lookup.alias("src"),
    (F.col("e.subject") == F.col("src.uri"))
    & _blank_scope("e.subject", "e.graph").eqNullSafe(F.col("src.scope")),
    "inner"
).join(
    node_lookup.alias("tgt"),
    (F.col("e.object") == F.col("tgt.uri"))
    & _blank_scope("e.object", "e.graph").eqNullSafe(F.col("tgt.scope")),
    "inner"
).select(
    F.col("src.node_id").alias("source_id"),
    F.col("tgt.node_id").alias("target_id"),
    sanitize_property_name_udf(extract_local_name_udf(F.col("e.predicate"))).alias("type"),
    F.col("e.predicate").alias("predicate_uri"),
    F.col("e.subject").alias("source_uri"),
    F.col("e.object").alias("target_uri"),
    F.col("e.graph").alias("source_graph")
)

# Generate stable edge ID: <source_id>_<type>_<target_id>
df_edges = df_edges.withColumn(
    "id",
    F.concat(
        F.col("source_id"),
        F.lit("_"),
        F.col("type"),
        F.lit("_"),
        F.col("target_id")
    )
)

print(f"Total edges: {df_edges.count()}")

In [None]:
# Preview the first edges under a banner heading.
banner = "=" * 60
print("\n" + banner)
print("SAMPLE EDGES")
print(banner)

sample_edge_columns = ["source_id", "type", "target_id", "source_graph"]
df_edges.select(*sample_edge_columns).show(15, truncate=30)

In [None]:
# Number of edges per edge type, most frequent first.
print("\nEdge type distribution:")
edge_type_counts = df_edges.groupBy("type").count()
edge_type_counts.orderBy(F.col("count").desc()).show(20, truncate=40)

In [None]:
# Validation
print("\n" + "=" * 60)
print("VALIDATION")
print("=" * 60)

# Check for orphan edges (edges to nodes not in our node set).
# Anti-joins keep the check distributed instead of collecting every node and
# edge ID to the driver, and they count edges - which is what the warning
# messages report - rather than distinct IDs.
known_node_ids = df_nodes.select(F.col("id").alias("node_id")).distinct()

orphan_source_edges = df_edges.alias("e").join(
    known_node_ids.alias("n"),
    F.col("e.source_id") == F.col("n.node_id"),
    "left_anti"
).count()
orphan_target_edges = df_edges.alias("e").join(
    known_node_ids.alias("n"),
    F.col("e.target_id") == F.col("n.node_id"),
    "left_anti"
).count()

if orphan_source_edges:
    print(f"WARNING: {orphan_source_edges} edges have source IDs not in nodes")
else:
    print("[OK] All edge sources exist in nodes")

if orphan_target_edges:
    print(f"WARNING: {orphan_target_edges} edges have target IDs not in nodes")
else:
    print("[OK] All edge targets exist in nodes")

# Check for duplicate node IDs (count once and reuse the result)
dup_nodes = df_nodes.groupBy("id").count().filter(F.col("count") > 1)
dup_node_count = dup_nodes.count()
if dup_node_count > 0:
    print(f"WARNING: {dup_node_count} duplicate node IDs")
    dup_nodes.show(5)
else:
    print("[OK] No duplicate node IDs")

# Check blank nodes
blank_count = df_nodes.filter(F.col("is_blank_node")).count()
print(f"\nBlank nodes: {blank_count}")

In [None]:
# Summary statistics for the generated nodes and edges.
banner = "=" * 60
print("\n" + banner)
print("SUMMARY")
print(banner)

# total_nodes / total_edges are reused by the save and completion cells.
total_nodes = df_nodes.count()
total_edges = df_edges.count()
nodes_with_props = df_nodes.filter(F.size("properties") > 0).count()
nodes_with_labels = df_nodes.filter(F.size("labels") > 0).count()
unique_labels = df_nodes.select(F.explode("labels").alias("label")).distinct().count()
unique_edge_types = df_edges.select("type").distinct().count()

summary_lines = [
    f"\nNodes: {total_nodes}",
    f"  - With properties: {nodes_with_props}",
    f"  - With labels: {nodes_with_labels}",
    f"  - Unique label types: {unique_labels}",
    f"\nEdges: {total_edges}",
    f"  - Unique edge types: {unique_edge_types}",
]
print("\n".join(summary_lines))

# Per-graph breakdown: node and edge counts side by side; the outer join
# keeps graphs that have only nodes or only edges.
print("\nPer-graph breakdown:")
nodes_per_graph = df_nodes.groupBy("source_graph").count().withColumnRenamed("count", "nodes")
edges_per_graph = df_edges.groupBy("source_graph").count().withColumnRenamed("count", "edges")
nodes_per_graph.join(edges_per_graph, "source_graph", "outer").show()

In [None]:
# Persist the nodes as a Delta table, replacing existing data and schema.
nodes_writer = (
    df_nodes.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
nodes_writer.saveAsTable(OUTPUT_NODES)

print(f"Saved {total_nodes} nodes to '{OUTPUT_NODES}'")

In [None]:
# Persist the edges as a Delta table, replacing existing data and schema.
edges_writer = (
    df_edges.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
edges_writer.saveAsTable(OUTPUT_EDGES)

print(f"Saved {total_edges} edges to '{OUTPUT_EDGES}'")

In [None]:
# Final banner listing the output tables and the next notebook to run.
banner = "=" * 60
completion_lines = [
    "\n" + banner,
    "INSTANCE TRANSLATION COMPLETE",
    banner,
    "\nOutput tables:",
    f"  - {OUTPUT_NODES}: {total_nodes} nodes",
    f"  - {OUTPUT_EDGES}: {total_edges} edges",
    "\nNext: Run 06_graph_builder to generate Graph Model JSON",
]
print("\n".join(completion_lines))