# Film-KG — TSV-only Pipeline + Rekursive Traversierung

Dieses Notebook:
1) Lädt deine `movie_kg_triples.tsv`.
2) **Backup** der TSV-Datei anlegen.
3) Vorverarbeitung: kanonisierte Character-URIs `ex:char/<slug>` + `ex:featuresCharacter` **ins TSV anhängen** (und in den In-Memory-Graph einfügen).
4) Ableitung:
   - `ex:SAME_UNIVERSE` (Self-Join über Character) **nur TSV-Append**
   - `ex:CREATIVE_PAIR` (Director×Actor ≥2) **nur TSV-Append**
5) **Rekursive Traversierung** über `ex:SAME_UNIVERSE` (SPARQL Property Path **und** Python-BFS).

## 0) Setup & Konfiguration

In [10]:
!python -c "import rdflib" 2>/dev/null || pip -q install rdflib==7.0.0
from rdflib import Graph, Namespace, URIRef, Literal
from rdflib.namespace import RDF, RDFS, XSD
from pathlib import Path
from collections import defaultdict
from itertools import combinations
import shutil, re
from datetime import datetime

DATA_PATH = Path("../data/kg/triples/movie_kg_triples.tsv")  # bestehende KG-Datei (TSV)
OUT_DIR = Path("../data/kg/triples"); OUT_DIR.mkdir(parents=True, exist_ok=True)
BATCH_SIZE = 50000  # Tripel je Ausgabedatei

SCHEMA = Namespace("http://schema.org/")
EX     = Namespace("http://example.org/")
CHAR_NS = Namespace(str(EX) + "char/")

print("Data:", DATA_PATH.resolve())
print("Output:", OUT_DIR.resolve())


Data: /Users/tschaffel/PycharmProjects/letterboxd-KG/data/kg/triples/movie_kg_triples.tsv
Output: /Users/tschaffel/PycharmProjects/letterboxd-KG/data/kg/triples


## 1) Daten laden (robuster TSV-Parser)

In [11]:
g = Graph(); g.bind("schema", SCHEMA); g.bind("ex", EX); g.bind("rdf", RDF)
prefix_map = {"schema": str(SCHEMA), "rdf": str(RDF), "rdfs": str(RDFS), "xsd": str(XSD), "ex": str(EX)}

def parse_term(term: str):
    term = term.strip()
    if len(term) >= 2 and term[0] == '"' and term[-1] == '"':
        return Literal(term[1:-1])
    if term.startswith("http://") or term.startswith("https://"):
        return URIRef(term)
    if ":" in term:
        pfx, local = term.split(":", 1)
        if pfx in prefix_map:
            return URIRef(prefix_map[pfx] + local)
    return Literal(term)

count = 0
with DATA_PATH.open("r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split("\t")
        if len(parts) != 3:
            continue
        s, p, o = map(parse_term, parts)
        g.add((s, p, o))
        count += 1
print("Geladene Tripel:", count)
print("Beispiel-Tripel:")
for i, (s,p,o) in enumerate(g):
    print("-", s, p, o)
    if i >= 4: break


Geladene Tripel: 87251
Beispiel-Tripel:
- movie9397 http://schema.org/actor person175533
- person2512504 http://www.w3.org/1999/02/22-rdf-syntax-ns#type http://schema.org/Person
- movie58151 http://schema.org/duration runtime106
- movie581997 http://schema.org/name Batman vs Teenage Mutant Ninja Turtles
- movie7191 http://schema.org/actor person6858


## 2) Backup & TSV-Append-Helfer

In [12]:
BACKUP_DIR = OUT_DIR / "backups"; BACKUP_DIR.mkdir(parents=True, exist_ok=True)
BACKUP_PATH = BACKUP_DIR / f"movie_kg_triples_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.tsv"

# Backup anlegen
shutil.copy2(DATA_PATH, BACKUP_PATH)
print("Backup gespeichert:", BACKUP_PATH)

def term_to_str(term):
    if isinstance(term, URIRef):
        # Versuche Prefix-Kurzform
        for pfx, ns in prefix_map.items():
            if str(term).startswith(ns):
                return f"{pfx}:{str(term)[len(ns):]}"
        return str(term)
    elif isinstance(term, Literal):
        s = str(term).replace('"', '\"')
        return f'"{s}"'
    else:
        return str(term)

def append_triples_tsv(triples):
    with DATA_PATH.open("a", encoding="utf-8") as f:
        for (s,p,o) in triples:
            f.write(term_to_str(s) + "\t" + term_to_str(p) + "\t" + term_to_str(o) + "\n")


Backup gespeichert: ../data/kg/triples/backups/movie_kg_triples_backup_20250916_231813.tsv


## 3) Vorverarbeitung: Character-URIs + `ex:featuresCharacter` (TSV-only)

In [13]:
canon_re = re.compile(r"\s*\([^)]*\)")
def canonize(text: str) -> str:
    s = canon_re.sub("", text)
    return s.strip().lower()

def slugify(text: str) -> str:
    t = re.sub(r"[^a-z0-9]+", "-", text)
    t = re.sub(r"-+", "-", t).strip("-")
    return t or "x"

char_uri_by_canon = {}
added_nodes = 0; added_edges = 0

new_feature_triples = []  # für optionales Mitschreiben in TSV

for f, _, ch in g.triples((None, SCHEMA.character, None)):
    c = canonize(str(ch))
    if not c:
        continue
    uri = char_uri_by_canon.get(c)
    if uri is None:
        uri = URIRef(CHAR_NS + slugify(c))
        char_uri_by_canon[c] = uri
        if (uri, RDF.type, EX.Character) not in g:
            g.add((uri, RDF.type, EX.Character))
            g.add((uri, EX.canonName, Literal(c)))
            new_feature_triples.append((uri, RDF.type, EX.Character))
            new_feature_triples.append((uri, EX.canonName, Literal(c)))
            added_nodes += 1
    if (f, EX.featuresCharacter, uri) not in g:
        g.add((f, EX.featuresCharacter, uri))
        new_feature_triples.append((f, EX.featuresCharacter, uri))
        added_edges += 1

print("Neue Character-Knoten:", added_nodes)
print("Neue featuresCharacter-Kanten:", added_edges)

# Optional: diese neuen Vorverarbeitungs-Tripel direkt ins TSV anhängen
#if new_feature_triples:
#    append_triples_tsv(new_feature_triples)
#    print("Vorverarbeitungs-Tripel ins TSV angehängt:", len(new_feature_triples))


Neue Character-Knoten: 5392
Neue featuresCharacter-Kanten: 6645


## 4) SAME_UNIVERSE (Self-Join über Character) — **TSV-only**

In [14]:
from itertools import combinations
films_by_char = defaultdict(list)
for f, _, c in g.triples((None, EX.featuresCharacter, None)):
    films_by_char[c].append(f)

new_su_triples = []
seen = set()
for char_uri, films in films_by_char.items():
    if len(films) < 2:
        continue
    films_sorted = sorted(set(films), key=str)
    for f1, f2 in combinations(films_sorted, 2):
        key = (str(f1), str(f2))
        if key in seen:
            continue
        seen.add(key)
        triple = (f1, EX.sameUniverse, f2)
        new_su_triples.append(triple)
        g.add(triple)  # auch in-memory für Traversierung

append_triples_tsv(new_su_triples)
print("SAME_UNIVERSE Tripel angehängt:", len(new_su_triples))


SAME_UNIVERSE Tripel angehängt: 1576


## 5) CREATIVE_PAIR (Director×Actor ≥2) — **TSV-only**

In [15]:
pair_counts = defaultdict(int)
for f, _, d in g.triples((None, SCHEMA.director, None)):
    for _, _, a in g.triples((f, SCHEMA.actor, None)):
        pair_counts[(d, a)] += 1

new_cp_triples = []
for (d, a), n in pair_counts.items():
    if n >= 2:
        new_cp_triples.append((d, EX.creativePair, a))
        new_cp_triples.append((d, EX.creativePairRoles, Literal("Director,Actor")))
        new_cp_triples.append((d, EX.creativePairCount, Literal(n)))
        g.add((d, EX.creativePair, a))
        g.add((d, EX.creativePairRoles, Literal("Director,Actor")))
        g.add((d, EX.creativePairCount, Literal(n)))

append_triples_tsv(new_cp_triples)
print("CREATIVE_PAIR Tripel angehängt:", len(new_cp_triples))


CREATIVE_PAIR Tripel angehängt: 2763


## 6) Rekursive Traversierung über `ex:SAME_UNIVERSE`


In [38]:
from rdflib import URIRef, Literal

def _resolve_seed(seed_str: str):
    s = seed_str.strip()
    if s.startswith("http://") or s.startswith("https://"):
        term = URIRef(s)
        return term, f"<{term}>"
    uri = URIRef(str(EX) + s)
    if (uri, None, None) in g or (None, None, uri) in g:
        return uri, f"<{uri}>"
    lit = Literal(s)
    return lit, f"\"{lit}\""

def sparql_same_universe_bidir(seed_str: str, limit: int = 500, include_seed: bool = False):
    seed_term, sparql_seed = _resolve_seed(seed_str)
    q = (
            "PREFIX ex: <http://example.org/>\n"
            "SELECT DISTINCT ?g WHERE {\n"
            f"  VALUES ?seed {{ {sparql_seed} }}\n"
            "  ?seed (ex:sameUniverse | ^ex:sameUniverse)+ ?g .\n"
            "} LIMIT " + str(limit)
    )
    rows = [row[0] for row in g.query(q)]
    if not include_seed:
        rows = [uri for uri in rows if uri != seed_term]
    return [str(u) for u in rows]

res = sparql_same_universe_bidir("movie452522", limit=1000, include_seed=False)
print(len(res), "Filme im selben Universe")
res[:10]


1 Filme im selben Universe


['movie1923']

## 7) Strong pair

In [43]:
# --- Rule 3: Strong Cast Pair (SQL version; Movie -> Actor direction fixed) ---
import pandas as pd
import sqlite3
import re

# ===================== CONFIG =====================
TRIPLES_TSV       = "../data/kg/triples/movie_kg_triples.tsv"  # path to your triples
CAST_PREDICATE    = "schema:actor"              # <-- set your movie->actor predicate here
MIN_COAPPS        = 3                       # threshold for a "strong" pair
MAKE_SYMMETRIC    = True                    # also emit B->A for strongCastWith
OUT_STRONG_PAIRS  = "../data/kg/triples/derived_strong_cast_pairs.tsv"
OUT_FEATURE_ANN   = "../data/kg/triples/derived_features_strong_cast.tsv"
# ==================================================

# ---------- Robust load of triples -> (subject, predicate, object) -------------
def load_triples(path):
    # Try TSV with header; if not, fallback strategies
    try:
        df = pd.read_csv(path, sep="\t", dtype="string")
    except Exception:
        df = pd.read_csv(path, sep=",", dtype="string")

    cols = [c.strip().lower() for c in df.columns.tolist()]
    # common header variants
    mapping_opts = [
        {"subject":"subject","predicate":"predicate","object":"object"},
        {"s":"subject","p":"predicate","o":"object"},
        {"head":"subject","relation":"predicate","tail":"object"},
        {"h":"subject","r":"predicate","t":"object"},
        {"source":"subject","relation":"predicate","target":"object"},
        {"from":"subject","edge":"predicate","to":"object"},
    ]
    applied = None
    for cand in mapping_opts:
        if set(cand.keys()).issubset(set(cols)):
            applied = {}
            for k,v in cand.items():
                # find real column name as present
                real = df.columns[cols.index(k)]
                applied[real] = v
            break

    if applied:
        df = df.rename(columns=applied)
    else:
        # if no recognizable headers, assume first three columns are s/p/o
        if df.shape[1] < 3:
            # final fallback: read as no-header
            df = pd.read_csv(path, sep="\t", header=None, dtype="string")
            if df.shape[1] < 3:
                df = pd.read_csv(path, sep=",", header=None, dtype="string")
            if df.shape[1] < 3:
                raise ValueError("Could not detect three columns for triples.")
        df = df.iloc[:, :3].copy()
        df.columns = ["subject","predicate","object"]

    # Canonicalize & drop NA
    df = df[["subject","predicate","object"]].astype("string").dropna()
    return df

trip = load_triples(TRIPLES_TSV)

# ---------- Filter to Movie -> Actor predicate (direction fixed) ---------------
cast = (
    trip.loc[trip["predicate"] == CAST_PREDICATE, ["subject","object"]]
    .dropna()
    .rename(columns={"subject":"movie","object":"actor"})
    .drop_duplicates()
    .astype("string")
)

if cast.empty:
    raise ValueError(f"No triples found with predicate '{CAST_PREDICATE}'. "
                     f"Check CAST_PREDICATE or your triples file.")

# Optional: strip angle brackets if your IRIs are like <...>
# cast["movie"] = cast["movie"].str.replace(r"^<|>$", "", regex=True)
# cast["actor"] = cast["actor"].str.replace(r"^<|>$", "", regex=True)

# ---------- SQL pipeline in SQLite --------------------------------------------
con = sqlite3.connect(":memory:")
cast.to_sql("cast", con, index=False, if_exists="replace")

# 1) All unordered actor pairs within each movie (actorA < actorB to canonicalize)
con.execute("""
            CREATE INDEX IF NOT EXISTS idx_cast_movie ON cast(movie);
            """)

# 2) Co-appearance counts across all movies
#    - Build pairs per movie via self-join
#    - Count distinct movies per pair
pair_counts_sql = f"""
WITH pairs AS (
  SELECT c1.movie AS movie,
         CASE WHEN c1.actor < c2.actor THEN c1.actor ELSE c2.actor END AS actorA,
         CASE WHEN c1.actor < c2.actor THEN c2.actor ELSE c1.actor END AS actorB
  FROM cast c1
  JOIN cast c2
    ON c1.movie = c2.movie
   AND c1.actor < c2.actor            -- ensures unordered pair once per movie
)
SELECT actorA, actorB, COUNT(DISTINCT movie) AS coapps
FROM pairs
GROUP BY actorA, actorB
HAVING coapps >= {int(MIN_COAPPS)};
"""

df_strong = pd.read_sql_query(pair_counts_sql, con)

# 3) Emit strongCastWith triples (optionally symmetric)
if df_strong.empty:
    trip_strong_pairs = pd.DataFrame(columns=["subject","predicate","object"])
else:
    base = pd.DataFrame({
        "subject": df_strong["actorA"],
        "predicate": "strongCastWith",
        "object": df_strong["actorB"]
    })
    if MAKE_SYMMETRIC:
        sym = pd.DataFrame({
            "subject": df_strong["actorB"],
            "predicate": "strongCastWith",
            "object": df_strong["actorA"]
        })
        trip_strong_pairs = pd.concat([base, sym], ignore_index=True).drop_duplicates()
    else:
        trip_strong_pairs = base

trip_strong_pairs.to_csv(OUT_STRONG_PAIRS, sep="\t", index=False)

# 4) Annotate movies that feature at least one strong pair
#    Reuse the CTE of strong pairs and join with within-movie pairs
feature_sql = f"""
WITH pairs AS (
  SELECT c1.movie AS movie,
         CASE WHEN c1.actor < c2.actor THEN c1.actor ELSE c2.actor END AS actorA,
         CASE WHEN c1.actor < c2.actor THEN c2.actor ELSE c1.actor END AS actorB
  FROM cast c1
  JOIN cast c2
    ON c1.movie = c2.movie
   AND c1.actor < c2.actor
),
strong AS (
  SELECT actorA, actorB
  FROM (
    SELECT CASE WHEN c1.actor < c2.actor THEN c1.actor ELSE c2.actor END AS actorA,
           CASE WHEN c1.actor < c2.actor THEN c2.actor ELSE c1.actor END AS actorB,
           COUNT(DISTINCT c1.movie) AS coapps
    FROM cast c1
    JOIN cast c2
      ON c1.movie = c2.movie
     AND c1.actor < c2.actor
    GROUP BY actorA, actorB
  )
  WHERE coapps >= {int(MIN_COAPPS)}
)
SELECT p.movie, p.actorA, p.actorB
FROM pairs p
JOIN strong s
  ON p.actorA = s.actorA AND p.actorB = s.actorB;
"""

df_feats = pd.read_sql_query(feature_sql, con)

if df_feats.empty:
    trip_features = pd.DataFrame(columns=["subject","predicate","object"])
else:
    trip_features = pd.DataFrame({
        "subject": df_feats["movie"],
        "predicate": "featuresStrongCastPair",
        "object": (df_feats["actorA"] + "::" + df_feats["actorB"])
    }).drop_duplicates()

trip_features.to_csv(OUT_FEATURE_ANN, sep="\t", index=False)

con.close()

# ---------- Report -------------------------------------------------------------
print("✅ Strong cast (SQL) rule applied.")
print(f"Cast edges (Movie -> Actor) loaded: {len(cast):,}")
print(f"Strong pairs (canonical, coapps ≥ {MIN_COAPPS}): {len(df_strong):,}")
print(f"Wrote: {OUT_STRONG_PAIRS}  ({len(trip_strong_pairs):,} triples)")
print(f"Wrote: {OUT_FEATURE_ANN}   ({len(trip_features):,} triples)")


✅ Strong cast (SQL) rule applied.
Cast edges (Movie -> Actor) loaded: 12,586
Strong pairs (canonical, coapps ≥ 3): 674
Wrote: ../data/kg/triples/derived_strong_cast_pairs.tsv  (1,348 triples)
Wrote: ../data/kg/triples/derived_features_strong_cast.tsv   (2,445 triples)


In [46]:
# --- Rule 3 (SQL) with prefixed preds + castPairIDs ---------------------------
import pandas as pd
import sqlite3

# ===================== CONFIG =====================
TRIPLES_TSV         = "../data/kg/triples/movie_kg_triples.tsv"   # path to your triples
CAST_PREDICATE      = "schema:actor"               # movie -> actor predicate in your KG
MIN_COAPPS          = 3                        # threshold for strong pair
MAKE_SYMMETRIC      = True                     # also write B->A for strongCastWith
PRED_STRONG         = "ex:strongCastWith"
PRED_FEATURES       = "ex:featuresStrongCastPair"
PRED_CASTPAIR_ACTOR = "schema:actor"

OUT_STRONG_PAIRS    = "../data/kg/triples/derived_strong_cast_pairs.tsv"
OUT_FEATURE_ANN     = "../data/kg/triples/derived_features_strong_cast.tsv"
OUT_CASTPAIR_NODES  = "../data/kg/triples/derived_castpair_nodes.tsv"
# ==================================================

# ---------- Robust load of triples -> (subject, predicate, object) -------------
def load_triples(path):
    # Try TSV, fallback to CSV
    try:
        df = pd.read_csv(path, sep="\t", dtype="string")
    except Exception:
        df = pd.read_csv(path, sep=",", dtype="string")

    cols = [c.strip().lower() for c in df.columns.tolist()]
    mapping_opts = [
        {"subject":"subject","predicate":"predicate","object":"object"},
        {"s":"subject","p":"predicate","o":"object"},
        {"head":"subject","relation":"predicate","tail":"object"},
        {"h":"subject","r":"predicate","t":"object"},
        {"source":"subject","relation":"predicate","target":"object"},
        {"from":"subject","edge":"predicate","to":"object"},
    ]
    applied = None
    for cand in mapping_opts:
        if set(cand.keys()).issubset(set(cols)):
            applied = {}
            for k,v in cand.items():
                real = df.columns[cols.index(k)]
                applied[real] = v
            break
    if applied:
        df = df.rename(columns=applied)
    else:
        # assume first three columns are s/p/o
        if df.shape[1] < 3:
            # try reading as no header
            df = pd.read_csv(path, sep="\t", header=None, dtype="string")
            if df.shape[1] < 3:
                df = pd.read_csv(path, sep=",", header=None, dtype="string")
            if df.shape[1] < 3:
                raise ValueError("Could not detect three columns for triples.")
        df = df.iloc[:, :3].copy()
        df.columns = ["subject","predicate","object"]

    return df[["subject","predicate","object"]].astype("string").dropna()

trip = load_triples(TRIPLES_TSV)

# ---------- Filter to Movie -> Actor predicate (direction fixed) ---------------
cast = (
    trip.loc[trip["predicate"] == CAST_PREDICATE, ["subject","object"]]
    .dropna()
    .rename(columns={"subject":"movie","object":"actor"})
    .drop_duplicates()
    .astype("string")
)

if cast.empty:
    raise ValueError(f"No triples found with predicate '{CAST_PREDICATE}' (Movie -> Actor).")

# ---------- SQL pipeline in SQLite --------------------------------------------
con = sqlite3.connect(":memory:")
cast.to_sql("cast", con, index=False, if_exists="replace")
con.execute("CREATE INDEX IF NOT EXISTS idx_cast_movie ON cast(movie);")

# Pairs per movie (unordered via actorA < actorB) and strong pair detection
pair_counts_sql = f"""
WITH pairs AS (
  SELECT c1.movie AS movie,
         CASE WHEN c1.actor < c2.actor THEN c1.actor ELSE c2.actor END AS actorA,
         CASE WHEN c1.actor < c2.actor THEN c2.actor ELSE c1.actor END AS actorB
  FROM cast c1
  JOIN cast c2
    ON c1.movie = c2.movie
   AND c1.actor < c2.actor
)
SELECT actorA, actorB, COUNT(DISTINCT movie) AS coapps
FROM pairs
GROUP BY actorA, actorB
HAVING coapps >= {int(MIN_COAPPS)};
"""
df_strong = pd.read_sql_query(pair_counts_sql, con)

# If no strong pairs, write empty outputs and finish
if df_strong.empty:
    pd.DataFrame(columns=["subject","predicate","object"]).to_csv(OUT_STRONG_PAIRS, sep="\t", index=False)
    pd.DataFrame(columns=["subject","predicate","object"]).to_csv(OUT_FEATURE_ANN,  sep="\t", index=False)
    pd.DataFrame(columns=["subject","predicate","object"]).to_csv(OUT_CASTPAIR_NODES, sep="\t", index=False)
    con.close()
    print("✅ Strong cast (SQL) rule applied. No strong pairs found.")
    print(f"Wrote: {OUT_STRONG_PAIRS} (0), {OUT_FEATURE_ANN} (0), {OUT_CASTPAIR_NODES} (0)")
else:
    # ---------- Assign deterministic castPairIDs (castPair1, castPair2, ...) ----
    # Sort for stable numbering
    df_strong = df_strong.sort_values(["actorA","actorB"]).reset_index(drop=True)
    df_strong["castPairID"] = ["castPair{}".format(i+1) for i in range(len(df_strong))]

    # ---------- (1) ex:strongCastWith triples between the two actors -----------
    base = pd.DataFrame({
        "subject": df_strong["actorA"],
        "predicate": PRED_STRONG,
        "object": df_strong["actorB"]
    })
    if MAKE_SYMMETRIC:
        sym = pd.DataFrame({
            "subject": df_strong["actorB"],
            "predicate": PRED_STRONG,
            "object": df_strong["actorA"]
        })
        trip_strong_pairs = pd.concat([base, sym], ignore_index=True).drop_duplicates()
    else:
        trip_strong_pairs = base
    trip_strong_pairs.to_csv(OUT_STRONG_PAIRS, sep="\t", index=False)

    # ---------- (2) ex:featuresStrongCastPair: Movie -> castPairID -------------
    # Recompute movie-level pairs in SQL and join to strong pairs to know which movie has which castPairID
    movie_pairs_sql = """
                      WITH pairs AS (
                          SELECT c1.movie AS movie,
                                 CASE WHEN c1.actor < c2.actor THEN c1.actor ELSE c2.actor END AS actorA,
                                 CASE WHEN c1.actor < c2.actor THEN c2.actor ELSE c1.actor END AS actorB
                          FROM cast c1
                                   JOIN cast c2
                                        ON c1.movie = c2.movie
                                            AND c1.actor < c2.actor
                      )
                      SELECT movie, actorA, actorB
                      FROM pairs; \
                      """
    df_movie_pairs = pd.read_sql_query(movie_pairs_sql, con)

    # Join movie pairs with strong list to get castPairID per movie
    df_join = df_movie_pairs.merge(
        df_strong[["actorA","actorB","castPairID"]],
        on=["actorA","actorB"],
        how="inner"
    ).drop_duplicates()

    trip_features = pd.DataFrame({
        "subject": df_join["movie"],
        "predicate": PRED_FEATURES,
        "object": df_join["castPairID"]
    }).drop_duplicates()
    trip_features.to_csv(OUT_FEATURE_ANN, sep="\t", index=False)

    # ---------- (3) CastPair node → schema:actor → Person triples --------------
    # For each castPairID, add two triples linking to the two actors
    cp_actor_a = pd.DataFrame({
        "subject": df_strong["castPairID"],
        "predicate": PRED_CASTPAIR_ACTOR,
        "object": df_strong["actorA"]
    })
    cp_actor_b = pd.DataFrame({
        "subject": df_strong["castPairID"],
        "predicate": PRED_CASTPAIR_ACTOR,
        "object": df_strong["actorB"]
    })
    trip_castpair_nodes = pd.concat([cp_actor_a, cp_actor_b], ignore_index=True).drop_duplicates()
    trip_castpair_nodes.to_csv(OUT_CASTPAIR_NODES, sep="\t", index=False)

    con.close()

    # ---------- Report ---------------------------------------------------------
    print("✅ Strong cast (SQL) rule applied.")
    print(f"Strong pairs (canonical, coapps ≥ {MIN_COAPPS}): {len(df_strong):,}")
    print(f"Wrote:")
    print(f"  - {OUT_STRONG_PAIRS}     ({len(trip_strong_pairs):,} triples, {PRED_STRONG})")
    print(f"  - {OUT_FEATURE_ANN}      ({len(trip_features):,} triples, {PRED_FEATURES})")
    print(f"  - {OUT_CASTPAIR_NODES}   ({len(trip_castpair_nodes):,} triples, {PRED_CASTPAIR_ACTOR})")


✅ Strong cast (SQL) rule applied.
Strong pairs (canonical, coapps ≥ 3): 674
Wrote:
  - ../data/kg/triples/derived_strong_cast_pairs.tsv     (1,348 triples, ex:strongCastWith)
  - ../data/kg/triples/derived_features_strong_cast.tsv      (2,445 triples, ex:featuresStrongCastPair)
  - ../data/kg/triples/derived_castpair_nodes.tsv   (1,348 triples, schema:actor)


In [47]:
# --- Rule 3 (SQL) -> append new triples directly to movie_kg_triples.tsv ------
import pandas as pd
import sqlite3
import os, csv, shutil
from datetime import datetime

# ===================== CONFIG =====================
TRIPLES_TSV         = "../data/kg/triples/movie_kg_triples.tsv"   # dein KG
CAST_PREDICATE      = "schema:actor"   # Movie -> Actor
MIN_COAPPS          = 3                # threshold for strong pair
MAKE_SYMMETRIC      = True             # B->A für ex:strongCastWith
PRED_STRONG         = "ex:strongCastWith"
PRED_FEATURES       = "ex:featuresStrongCastPair"
PRED_CASTPAIR_ACTOR = "schema:actor"

MAKE_BACKUP         = True             # vor dem Anhängen Backup erzeugen
BACKUP_DIR          = "../data/kg/triples/backups"  # wohin sichern
# ==================================================

# ---------- helpers: load triples + safe append --------------------------------
def load_triples(path):
    try:
        df = pd.read_csv(path, sep="\t", dtype="string")
    except Exception:
        df = pd.read_csv(path, sep=",", dtype="string")
    cols = [c.strip().lower() for c in df.columns.tolist()]
    mapping_opts = [
        {"subject":"subject","predicate":"predicate","object":"object"},
        {"s":"subject","p":"predicate","o":"object"},
        {"head":"subject","relation":"predicate","tail":"object"},
        {"h":"subject","r":"predicate","t":"object"},
        {"source":"subject","relation":"predicate","target":"object"},
        {"from":"subject","edge":"predicate","to":"object"},
    ]
    applied = None
    for cand in mapping_opts:
        if set(cand.keys()).issubset(set(cols)):
            applied = {}
            for k,v in cand.items():
                real = df.columns[cols.index(k)]
                applied[real] = v
            break
    if applied:
        df = df.rename(columns=applied)
    else:
        if df.shape[1] < 3:
            df = pd.read_csv(path, sep="\t", header=None, dtype="string")
            if df.shape[1] < 3:
                df = pd.read_csv(path, sep=",", header=None, dtype="string")
            if df.shape[1] < 3:
                raise ValueError("Could not detect three columns for triples.")
        df = df.iloc[:, :3].copy()
        df.columns = ["subject","predicate","object"]
    return df[["subject","predicate","object"]].astype("string").dropna()

def sanitize_cell(x: str) -> str:
    if pd.isna(x): return ""
    s = str(x)
    return s.replace("\t"," ").replace("\r"," ").replace("\n"," ").strip()

def append_triples_safely(df_triples: pd.DataFrame, path: str):
    # erwartet Spalten: subject, predicate, object
    df = df_triples[["subject","predicate","object"]].copy()
    for c in ["subject","predicate","object"]:
        df[c] = df[c].map(sanitize_cell)

    # optional: Backup
    if MAKE_BACKUP and os.path.exists(path):
        os.makedirs(BACKUP_DIR, exist_ok=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = os.path.join(BACKUP_DIR, f"movie_kg_triples_backup_{ts}.tsv")
        shutil.copy2(path, backup_path)
        print("📂 Backup gespeichert unter:", backup_path)

    # falls Datei existiert, sicherstellen, dass sie mit \n endet
    if os.path.exists(path):
        with open(path, "rb") as f:
            try:
                f.seek(-1, os.SEEK_END)
                if f.read(1) != b"\n":
                    with open(path, "ab") as g:
                        g.write(b"\n")
            except OSError:
                pass  # leere Datei

    # tatsächlich anhängen (ohne Header!)
    df.to_csv(
        path,
        mode="a",
        header=False,
        index=False,
        sep="\t",
        lineterminator="\n",
        encoding="utf-8",
        quoting=csv.QUOTE_MINIMAL,
        escapechar="\\",
    )

# ---------- load triples + filter cast ----------------------------------------
trip = load_triples(TRIPLES_TSV)

cast = (
    trip.loc[trip["predicate"] == CAST_PREDICATE, ["subject","object"]]
    .dropna()
    .rename(columns={"subject":"movie","object":"actor"})
    .drop_duplicates()
    .astype("string")
)

if cast.empty:
    raise ValueError(f"No triples found with predicate '{CAST_PREDICATE}' (Movie -> Actor).")

# ---------- SQL pipeline in SQLite --------------------------------------------
con = sqlite3.connect(":memory:")
cast.to_sql("cast", con, index=False, if_exists="replace")
con.execute("CREATE INDEX IF NOT EXISTS idx_cast_movie ON cast(movie);")

pair_counts_sql = f"""
WITH pairs AS (
  SELECT c1.movie AS movie,
         CASE WHEN c1.actor < c2.actor THEN c1.actor ELSE c2.actor END AS actorA,
         CASE WHEN c1.actor < c2.actor THEN c2.actor ELSE c1.actor END AS actorB
  FROM cast c1
  JOIN cast c2
    ON c1.movie = c2.movie
   AND c1.actor < c2.actor
)
SELECT actorA, actorB, COUNT(DISTINCT movie) AS coapps
FROM pairs
GROUP BY actorA, actorB
HAVING coapps >= {int(MIN_COAPPS)};
"""
df_strong = pd.read_sql_query(pair_counts_sql, con)

if df_strong.empty:
    con.close()
    print("✅ Strong cast (SQL) rule angewendet. Keine starken Paare gefunden. Nichts angehängt.")
else:
    # stabile IDs
    df_strong = df_strong.sort_values(["actorA","actorB"]).reset_index(drop=True)
    df_strong["castPairID"] = [f"castPair{i+1}" for i in range(len(df_strong))]

    # (1) ex:strongCastWith
    base = pd.DataFrame({
        "subject":  df_strong["actorA"],
        "predicate": PRED_STRONG,
        "object":   df_strong["actorB"]
    })
    if MAKE_SYMMETRIC:
        sym = pd.DataFrame({
            "subject":  df_strong["actorB"],
            "predicate": PRED_STRONG,
            "object":   df_strong["actorA"]
        })
        trip_strong_pairs = pd.concat([base, sym], ignore_index=True).drop_duplicates()
    else:
        trip_strong_pairs = base

    # (2) ex:featuresStrongCastPair: Movie -> castPairID
    movie_pairs_sql = """
                      WITH pairs AS (
                          SELECT c1.movie AS movie,
                                 CASE WHEN c1.actor < c2.actor THEN c1.actor ELSE c2.actor END AS actorA,
                                 CASE WHEN c1.actor < c2.actor THEN c2.actor ELSE c1.actor END AS actorB
                          FROM cast c1
                                   JOIN cast c2
                                        ON c1.movie = c2.movie
                                            AND c1.actor < c2.actor
                      )
                      SELECT movie, actorA, actorB
                      FROM pairs; \
                      """
    df_movie_pairs = pd.read_sql_query(movie_pairs_sql, con)
    con.close()

    df_join = df_movie_pairs.merge(
        df_strong[["actorA","actorB","castPairID"]],
        on=["actorA","actorB"],
        how="inner"
    ).drop_duplicates()

    trip_features = pd.DataFrame({
        "subject":  df_join["movie"],
        "predicate": PRED_FEATURES,
        "object":   df_join["castPairID"]
    }).drop_duplicates()

    # (3) CastPair node -> schema:actor -> Person (2 Triples je Pair)
    cp_actor_a = pd.DataFrame({
        "subject":  df_strong["castPairID"],
        "predicate": PRED_CASTPAIR_ACTOR,
        "object":   df_strong["actorA"]
    })
    cp_actor_b = pd.DataFrame({
        "subject":  df_strong["castPairID"],
        "predicate": PRED_CASTPAIR_ACTOR,
        "object":   df_strong["actorB"]
    })
    trip_castpair_nodes = pd.concat([cp_actor_a, cp_actor_b], ignore_index=True).drop_duplicates()

    # --- combine all new triples and append to main KG ------------------------
    all_new = pd.concat(
        [trip_strong_pairs, trip_features, trip_castpair_nodes],
        ignore_index=True
    ).drop_duplicates()

    if all_new.empty:
        print("ℹ️ Keine neuen Triples zum Anhängen gefunden.")
    else:
        append_triples_safely(all_new, TRIPLES_TSV)
        print("✅ Neue Triples angehängt an:", TRIPLES_TSV)
        print(f"   + ex:strongCastWith:        {(trip_strong_pairs.shape[0]):,}")
        print(f"   + ex:featuresStrongCastPair:{(trip_features.shape[0]):,}")
        print(f"   + schema:actor (CastPair):  {(trip_castpair_nodes.shape[0]):,}")


📂 Backup gespeichert unter: ../data/kg/triples/backups/movie_kg_triples_backup_20250920_172308.tsv
✅ Neue Triples angehängt an: ../data/kg/triples/movie_kg_triples.tsv
   + ex:strongCastWith:        1,348
   + ex:featuresStrongCastPair:2,445
   + schema:actor (CastPair):  1,348
