# Movie Dialogue Analysis - Based on Three Star Wars Movies
***This part is done by Yufei Zhang(25405381)***

## 1. Extract Star Wars movied data from archive

In [12]:
import pandas as pd, re, unicodedata
from pathlib import Path


SCHEMAS = {
    "titles": ["movieID","title","year","rating","votes","genres"],
    "characters": ["characterID","character","movieID","movie","gender","position"],
    "lines": ["lineID","characterID","movieID","character","text"],
    "conversations": ["character1ID","character2ID","movieID","utteranceIDs"],
}

def load_tsv(path: Path, names):
    df = pd.read_csv(path, sep="\t", header=None, names=names, engine="python", on_bad_lines="skip")
    for c in names:
        if df[c].dtype == object:
            df[c] = df[c].astype(str).str.strip().str.strip("'").str.strip('"')
    return df

def norm(s):
    if not isinstance(s,str): return ""
    s = unicodedata.normalize("NFKC", s).lower()
    s = re.sub(r"\s+", " ", s)
    return s.strip()

titles = load_tsv("movie_titles_metadata.tsv", SCHEMAS["titles"])
characters = load_tsv("movie_characters_metadata.tsv", SCHEMAS["characters"])
lines = load_tsv("movie_lines.tsv", SCHEMAS["lines"])
convos = load_tsv("movie_conversations.tsv", SCHEMAS["conversations"])

titles["n"] = titles["title"].map(norm)

mask_iv_plain = (titles["n"]=="star wars") & (titles["year"].astype(str).str.contains("1977"))
mask_v  = titles["n"].str.contains("empire strikes back", na=False)
mask_vi = titles["n"].str.contains("return of the jedi", na=False)

tri = titles[mask_iv_plain | mask_v | mask_vi].copy()
sw_ids = set(tri["movieID"].astype(str))

sw_movies = titles[titles["movieID"].astype(str).isin(sw_ids)][["movieID","title","year","rating","votes","genres"]].drop_duplicates()
sw_characters = characters[characters["movieID"].astype(str).isin(sw_ids)].copy()
sw_lines = lines[lines["movieID"].astype(str).isin(sw_ids)].copy()
sw_convos = convos[convos["movieID"].astype(str).isin(sw_ids)].copy()

sw_movies.to_csv("starwars_core/sw_movies.csv", index=False, encoding="utf-8")
sw_characters.to_csv("starwars_core/sw_characters.csv", index=False, encoding="utf-8")
sw_lines.to_csv("starwars_core/sw_lines.csv", index=False, encoding="utf-8")
sw_convos.to_csv("starwars_core/sw_conversations.csv", index=False, encoding="utf-8")

len(sw_movies), len(sw_characters), len(sw_lines), len(sw_convos), sw_movies

(3,
 42,
 1115,
 371,
     movieID                                       title  year  rating  \
 337    m337          star wars: the empire strikes back  1982     8.0   
 489    m489  star wars: episode vi - return of the jedi  1983     8.3   
 529    m529                                   star wars  1977     8.8   
 
         votes                                        genres  
 337      42.0  ['animation' 'adventure' 'action' 'fantasy']  
 489  215058.0     ['action' 'adventure' 'fantasy' 'sci-fi']  
 529  326619.0     ['action' 'adventure' 'fantasy' 'sci-fi']  )

# 2. Carry out data preprocessing and cleaning. 
Since some roles have no clear direction, such as the Blue Leader, we will delete them.

***This part is done by Yufei Zhang(25405381)***

In [13]:
import os, time, pandas as pd

CORE_DIR = "starwars_core"
OUT_DIR  = "starwars_filtered"
os.makedirs(OUT_DIR, exist_ok=True)

def ok(p): 
    return "✅" if os.path.exists(p) else "❌"

print("Files check:")
print(" -", ok(f"{CORE_DIR}/sw_movies.csv"), f"{CORE_DIR}/sw_movies.csv")
print(" -", ok(f"{CORE_DIR}/sw_characters.csv"), f"{CORE_DIR}/sw_characters.csv")
print(" -", ok(f"{CORE_DIR}/sw_lines.csv"), f"{CORE_DIR}/sw_lines.csv")
print(" -", ok(f"{CORE_DIR}/sw_conversations.csv"), f"{CORE_DIR}/sw_conversations.csv")

t0 = time.time()
def log(msg):
    dt = time.time()-t0
    print(f"[{dt:6.1f}s] {msg}")


Files check:
 - ✅ starwars_core/sw_movies.csv
 - ✅ starwars_core/sw_characters.csv
 - ✅ starwars_core/sw_lines.csv
 - ✅ starwars_core/sw_conversations.csv


In [14]:
# buttons to control output
MAKE_NETWORK   = True
MAKE_WORDCLOUD = False   

# having pictures for these characters
avatar_whitelist = {
    "ackbar": "ackbar.png",
    "admiral piett": "admiral_piett.png",
    "ben kenobi": "ben_kenobi.png",
    "biggs": "biggs.png",
    "boushh": "boushh.png",
    "c-3po": "c3po.png",
    "chewie": "chewie.png",
    "han": "han.png",
    "lando": "lando.png",
    "leia": "leia.png",
    "luke": "luke.png",
    "ninedenine": "ninedenine.png",
    "owen": "owen.png",
    "rieekan": "rieekan.png",
    "vader": "vader.png",
    "wedge": "wedge.png",
    "yoda": "yoda.png",
}

# aliases for character names
aliases = {
    "threepio": "c-3po",
    "piett": "admiral piett",
    "darth vader": "vader",
    "princess leia": "leia",
    "lando calrissian": "lando",
    "ben": "ben kenobi",
}

import re
def norm_name(s: str) -> str:
    return re.sub(r"\s+", " ", str(s).strip().lower())

def canon_name(s: str) -> str:
    n = norm_name(s)
    return aliases.get(n, n)


In [None]:
log("Loading core tables (robust)...")
import json, re

# make sure to load sw_movies.csv or sw_titles.csv
titles_path = None
for nm in ["sw_movies.csv", "sw_titles.csv"]:
    p = f"{CORE_DIR}/{nm}"
    if os.path.exists(p):
        titles_path = p
        break
if titles_path is None:
    raise FileNotFoundError("Missing sw_movies.csv / sw_titles.csv in starwars_core/")

titles = pd.read_csv(titles_path)
chars  = pd.read_csv(f"{CORE_DIR}/sw_characters.csv")
lines  = pd.read_csv(f"{CORE_DIR}/sw_lines.csv")            # line_num already included
convs  = pd.read_csv(f"{CORE_DIR}/sw_conversations.csv")

log(f"titles={titles.shape}, chars={chars.shape}, lines={lines.shape}, convs={convs.shape}")
log(f"convs columns: {list(convs.columns)}")

# normalize character names
chars["character_norm"] = chars["character"].map(canon_name).map(norm_name)
lines["character_norm"] = lines["character"].map(canon_name).map(norm_name)

# position already exists in chars
def parse_utter_list(cell):
    s = str(cell).strip().replace("'", '"')
    s = re.sub(r"\[\s*([^\]]+)\s*\]", lambda m: "[" + ",".join(m.group(1).split()) + "]", s)
    if not s.startswith("["):
        return re.findall(r"L\d+", s)
    try:
        raw = json.loads(s)
        return [str(x).strip('"') for x in raw]
    except Exception:
        return re.findall(r"L\d+", s)

# find utterances column
utter_col = None
for c in convs.columns:
    lc = c.strip().lower()
    if "parsed" in lc and "utter" in lc:
        utter_col = c
        break
if utter_col is None:
    for c in convs.columns:
        lc = c.strip().lower()
        if "utter" in lc:   # utterances
            utter_col = c
            break

if utter_col is None:
    raise KeyError("sw_conversations.csv missing utterances column")

# get parsed_utterances
if "parsed_utterances" == utter_col:
    # have parsed_utterances, but may be string, need to parse again
    convs["parsed_utterances"] = convs[utter_col].apply(parse_utter_list)
else:
    # just original utterances, need to parse
    convs["parsed_utterances"] = convs[utter_col].apply(parse_utter_list)

log(f"Using utterances column: {utter_col}")
sample = convs["parsed_utterances"].iloc[0] if len(convs) else "N/A"
log(f"Sample parsed_utterances: {sample}")

# filter characters by whitelist
keep_names = set(avatar_whitelist.keys())
log(f"Whitelist characters ({len(keep_names)}): {sorted(keep_names)}")

chars_keep = chars[chars["character_norm"].isin(keep_names)].copy()
lines_keep = lines[lines["character_norm"].isin(keep_names)].copy()
log(f"Filtered chars={chars_keep.shape}, lines={lines_keep.shape}")


[   0.0s] Loading core tables (robust)...
[   0.1s] titles=(3, 6), chars=(42, 6), lines=(1115, 5), convs=(371, 4)
[   0.1s] convs columns: ['character1ID', 'character2ID', 'movieID', 'utteranceIDs']
[   0.1s] Using utterances column: utteranceIDs
[   0.1s] Sample parsed_utterances: ['L191961', 'L191962', 'L191963', 'L191964', 'L191965', 'L191966']
[   0.1s] Whitelist characters (17): ['ackbar', 'admiral piett', 'ben kenobi', 'biggs', 'boushh', 'c-3po', 'chewie', 'han', 'lando', 'leia', 'luke', 'ninedenine', 'owen', 'rieekan', 'vader', 'wedge', 'yoda']
[   0.1s] Filtered chars=(32, 7), lines=(1054, 6)


In [16]:
import re

# normalize column names to lower case and strip spaces
def normalize_cols(df):
    df = df.copy()
    df.columns = [c.strip().lower() for c in df.columns]
    return df

lines = normalize_cols(lines)
chars = normalize_cols(chars)
convs = normalize_cols(convs)

# make a mapping for lines table
col_map_lines = {
    "lineid": "lineID",
    "characterid": "characterID",
    "movieid": "movieID",
    "character": "character",
    "text": "text",
    "line_num": "line_num",   # if exists
}
# restore desired casing (only for code convenience; can use all lower case too)
def remap(df, cmap):
    for k, v in list(cmap.items()):
        if k in df.columns and v not in df.columns:
            df.rename(columns={k: v}, inplace=True)
    return df

lines = remap(lines, col_map_lines)

# if line_num not exists, try to extract from lineID
if "line_num" not in lines.columns:
    def line_num_from_id(lid):
        m = re.search(r"(\d+)", str(lid))
        return int(m.group(1)) if m else None
    lines["line_num"] = lines["lineID"].apply(line_num_from_id)

# fill character_norm if missing
if "character_norm" not in lines.columns:
    lines["character_norm"] = lines["character"].map(canon_name).map(norm_name)

# filter by whitelist
keep_names = set(avatar_whitelist.keys())
lines_keep = lines[lines["character_norm"].isin(keep_names)].copy()

# check required columns
missing_cols = [c for c in ["lineID","movieID","line_num","character_norm"] if c not in lines_keep.columns]
if missing_cols:
    raise KeyError(f"lines_keep missing：{missing_cols}；please check the previous cell.")

# make a quick snapshot
print("lines_keep columns:", list(lines_keep.columns))
print("example：")
display(lines_keep.head(3)[["lineID","movieID","character_norm","line_num","text"]])


lines_keep columns: ['lineID', 'characterID', 'movieID', 'character', 'text', 'character_norm', 'line_num']
example：


Unnamed: 0,lineID,movieID,character_norm,line_num,text
0,L191966,m337,vader,191966,And code the signal to my private chamber.
2,L191964,m337,vader,191964,Move this ship out of the asteroid field and i...
4,L191962,m337,vader,191962,The Emperor?


# 3. Build the nodes and edges of conversations
***This part is done by Yufei Zhang(25405381)***

In [17]:
from collections import defaultdict
from tqdm.auto import tqdm
import re, json
import pandas as pd

# normalize column names (strip spaces only, keep case)
def norm_cols(df):
    df = df.copy()
    df.columns = [c.strip() for c in df.columns]
    return df

convs = norm_cols(convs)

# find movieID column
movie_col = None
for c in convs.columns:
    lc = c.lower()
    if lc in ("movieid", "movie_id", "movie"):
        movie_col = c
        break
if movie_col is None:
    raise KeyError(f"无法在 sw_conversations.csv 中识别电影列名，现有列：{list(convs.columns)}")

# rename to movieID if needed
if movie_col != "movieID":
    convs = convs.rename(columns={movie_col: "movieID"})

# deal with parsed_utterances column
def parse_utter_list(cell):
    s = str(cell).strip().replace("'", '"')
    s = re.sub(r"\[\s*([^\]]+)\s*\]", lambda m: "[" + ",".join(m.group(1).split()) + "]", s)
    if not s.startswith("["):
        return re.findall(r"L\d+", s)
    try:
        raw = json.loads(s)
        return [str(x).strip('"') for x in raw]
    except Exception:
        return re.findall(r"L\d+", s)

if "parsed_utterances" not in convs.columns:
    # try to find utterances column
    utter_col = None
    for c in convs.columns:
        if "utter" in c.lower():
            utter_col = c
            break
    if utter_col is None:
        raise KeyError(f"找不到 utterances 列，现有列：{list(convs.columns)}")
    convs["parsed_utterances"] = convs[utter_col].apply(parse_utter_list)
else:
    
    convs["parsed_utterances"] = convs["parsed_utterances"].apply(parse_utter_list)

# make sure lines_keep has required columns
required = {"lineID","movieID","character_norm","line_num"}
missing = required - set(lines_keep.columns)
if missing:
    raise KeyError(f"lines_keep 缺列 {missing}，请检查前一单元格。")

line_lookup = lines_keep.set_index("lineID")[["movieID","character_norm","line_num"]].to_dict(orient="index")

# generate events (use column unpacking to avoid attribute access)
events = []
total_rows = len(convs)
for mov, utts in tqdm(convs[["movieID","parsed_utterances"]].itertuples(index=False, name=None), total=total_rows):
    prev = None
    for lid in utts:
        info = line_lookup.get(lid)
        if not info:
            continue
        spk = info["character_norm"]
        t   = info["line_num"]
        if prev is not None and spk != prev:
            a, b = sorted([prev, spk])
            events.append({"movieID": mov, "t": t, "src": a, "dst": b})
        prev = spk

events_df = pd.DataFrame(events)
log(f"events_df={events_df.shape}")


100%|██████████| 371/371 [00:00<00:00, 185579.82it/s]

[   0.2s] events_df=(655, 4)





In [18]:
# edge weights (total)
edges_total = events_df.groupby(["movieID","src","dst"]).size().reset_index(name="weight")

# cumulative edges (time)
def cumulative_edges(df):
    df = df.sort_values("t")
    cum = defaultdict(int)
    out = []
    for _, r in df.iterrows():
        key = (r["src"], r["dst"])
        cum[key] += 1
        out.append({"movieID": r["movieID"], "t": r["t"], "src": r["src"], "dst": r["dst"], "weight": cum[key]})
    return pd.DataFrame(out)

edges_time = (pd.concat([cumulative_edges(g) for _, g in events_df.groupby("movieID")], ignore_index=True)
              if len(events_df) else pd.DataFrame(columns=["movieID","t","src","dst","weight"]))

log(f"edges_total={edges_total.shape}, edges_time={edges_time.shape}")


[   0.2s] edges_total=(40, 4), edges_time=(655, 5)


In [19]:
# analysis checks
from collections import Counter

# if avatar whitelist characters are really in lines_keep?
print("the time that characters show：")
print(lines_keep["character_norm"].value_counts().reindex(sorted(keep_names)).fillna(0).astype(int))

# number of utterances that can be mapped to line_lookup
total_utts = 0
hit_utts = 0
miss_examples = []
for mov, utts in convs[["movieID","parsed_utterances"]].itertuples(index=False, name=None):
    total_utts += len(utts)
    for lid in utts:
        if lid in line_lookup:
            hit_utts += 1
        elif len(miss_examples) < 10:
            miss_examples.append(lid)

hit_rate = hit_utts / total_utts if total_utts else 0
print(f"\nutterance mapping hitted: {hit_rate:.2%}  (hit {hit_utts} / sum {total_utts})")
if miss_examples:
    print("miss lineID：", miss_examples)

# look at a sample conversation
if len(convs):
    sample = convs.iloc[0]
    print("\nconversations example：")
    print("movieID:", sample["movieID"])
    print("parsed_utterances (first 10):", sample["parsed_utterances"][:10])


the time that characters show：
character_norm
ackbar             6
admiral piett      9
ben kenobi        93
biggs              5
boushh             3
c-3po             60
chewie            17
han              248
lando             70
leia             177
luke             232
ninedenine         4
owen               4
rieekan           12
vader             64
wedge              6
yoda              44
Name: count, dtype: int32

utterance mapping hitted: 94.53%  (hit 1054 / sum 1115)
miss lineID： ['L191961', 'L191963', 'L191965', 'L191995', 'L191997', 'L191999', 'L192001', 'L192006', 'L192008', 'L192010']

conversations example：
movieID: m337
parsed_utterances (first 10): ['L191961', 'L191962', 'L191963', 'L191964', 'L191965', 'L191966']


In [20]:
# build nodes table
import pandas as pd

def ensure_col(df, name, fill=None):
    if name not in df.columns:
        df[name] = fill
    return df

# chars_keep from previous cells
print("chars_keep columns:", list(chars_keep.columns))

# fill required columns
for c in ["movieID","character_norm","movie","gender","credit_pos"]:
    chars_keep = ensure_col(chars_keep, c, None)

# credit_pos coerce to numeric
chars_keep["credit_pos"] = pd.to_numeric(chars_keep["credit_pos"], errors="coerce")

# build nodes
nodes = (
    chars_keep
    .sort_values(["movieID","character_norm","credit_pos"], na_position="last")
    .drop_duplicates(subset=["movieID","character_norm"])
    [["movieID","character_norm","movie","gender","credit_pos"]]
    .rename(columns={"character_norm":"character"})
)

# the faction mapping
faction_map = {
    "ackbar": "Rebel Alliance",
    "admiral piett": "Galactic Empire",
    "ben kenobi": "Jedi",
    "biggs": "Rebel Alliance",
    "boushh": "Unknown",
    "c-3po": "Droid",
    "chewie": "Rebel Alliance",
    "han": "Rebel Alliance",
    "lando": "Rebel Alliance",
    "leia": "Rebel Alliance",
    "luke": "Jedi",
    "ninedenine": "Unknown",
    "owen": "Unknown",
    "rieekan": "Rebel Alliance",
    "vader": "Galactic Empire",
    "wedge": "Rebel Alliance",
    "yoda": "Jedi",
}
nodes["faction"] = nodes["character"].map(lambda x: faction_map.get(x, "Unknown"))

# picture filenames
nodes["avatar_png"] = nodes["character"].map(lambda n: avatar_whitelist.get(n))

# save nodes and edges
nodes.to_csv(f"{OUT_DIR}/nodes.csv", index=False)

# prevent missing edges tables
if 'edges_total' not in globals():
    edges_total = pd.DataFrame(columns=["movieID","src","dst","weight"])
if 'edges_time' not in globals():
    edges_time = pd.DataFrame(columns=["movieID","t","src","dst","weight"])

edges_total.to_csv(f"{OUT_DIR}/edges_total.csv", index=False)
edges_time.to_csv(f"{OUT_DIR}/edges_time.csv", index=False)

log("done：nodes.csv / edges_total.csv / edges_time.csv")
display(nodes.head(10))


chars_keep columns: ['characterID', 'character', 'movieID', 'movie', 'gender', 'position', 'character_norm']
[   0.2s] done：nodes.csv / edges_total.csv / edges_time.csv


Unnamed: 0,movieID,character,movie,gender,credit_pos,faction,avatar_png
1,m337,admiral piett,star wars: the empire strikes back,?,,Galactic Empire,admiral_piett.png
2,m337,ben kenobi,star wars: the empire strikes back,?,,Jedi,ben_kenobi.png
13,m337,c-3po,star wars: the empire strikes back,?,,Droid,c3po.png
3,m337,chewie,star wars: the empire strikes back,?,,Rebel Alliance,chewie.png
7,m337,han,star wars: the empire strikes back,M,,Rebel Alliance,han.png
8,m337,lando,star wars: the empire strikes back,m,,Rebel Alliance,lando.png
9,m337,leia,star wars: the empire strikes back,F,,Rebel Alliance,leia.png
10,m337,luke,star wars: the empire strikes back,m,,Jedi,luke.png
12,m337,rieekan,star wars: the empire strikes back,?,,Rebel Alliance,rieekan.png
14,m337,vader,star wars: the empire strikes back,?,,Galactic Empire,vader.png


# 4. Build the emotional visualization library and interactive website
***This part is done by Yufei Zhang(25405381)***

In [21]:
import re, pandas as pd, numpy as np
from pathlib import Path

nodes = pd.read_csv("starwars_filtered/nodes.csv")
edges_time = pd.read_csv("starwars_filtered/edges_time.csv")
edges_total = pd.read_csv("starwars_filtered/edges_total.csv")
sw_chars = pd.read_csv("starwars_core/sw_characters.csv")
sw_lines = pd.read_csv("starwars_core/sw_lines.csv")
sw_movies = pd.read_csv("starwars_core/sw_movies.csv")

# normalize column names
for df in (nodes, edges_time, edges_total, sw_chars, sw_lines, sw_movies):
    df.columns = [c.strip().lower() for c in df.columns]

# avatar mapping
AVATAR_DIR = Path("assets/avatars")
def norm_key(s: str) -> str:
    return re.sub(r"[^a-z0-9_]+", "", str(s).lower().strip().replace(" ", "_"))

avatar_index = {}
if AVATAR_DIR.exists():
    for p in AVATAR_DIR.glob("*.png"):
        avatar_index[norm_key(p.stem)] = f"/assets/avatars/{p.name}"


# nodes processing
if "character" not in nodes.columns:
    # normalize common alternative column names
    for alt in ["name", "label", "char", "speaker"]:
        if alt in nodes.columns:
            nodes.rename(columns={alt: "character"}, inplace=True)
            break
assert "character" in nodes.columns, "nodes.csv need one of the colomn（character/name/label/char/speaker ）"

nodes["character"] = nodes["character"].astype(str)
nodes["id"] = nodes.get("id", pd.Series(index=nodes.index, dtype=object))
if nodes["id"].isna().all():
    nodes["id"] = nodes["character"].map(norm_key)

# if id exists but not string, convert to string
nodes = nodes.drop_duplicates(subset=["id"]).reset_index(drop=True)

# if no avatar_png column, create one
if "avatar_png" not in nodes.columns:
    nodes["avatar_png"] = None

avatar_index = {}
if AVATAR_DIR.exists():
    for p in AVATAR_DIR.glob("*.png"):
        avatar_index[norm_key(p.stem)] = f"/assets/avatars/{p.name}"

def fill_avatar(row):
    cur = str(row.get("avatar_png") or "").strip()
    if cur:
        return cur
    return avatar_index.get(row["id"], None)

nodes["avatar_png"] = nodes.apply(fill_avatar, axis=1)

# whitelist: must have avatar
nodes_whitelist = nodes[nodes["avatar_png"].notna() & (nodes["avatar_png"].astype(str) != "")]
nodes_whitelist = nodes_whitelist.drop_duplicates(subset=["id"]).copy()
nodes_whitelist["id_norm"] = nodes_whitelist["id"]  
whitelist_ids = set(nodes_whitelist["id"].astype(str))

print(f"[Whitelist] avater: {len(nodes_whitelist)} / {len(nodes)}")

# make sure sw_lines has 'character' column
sw_lines = sw_lines.copy()
if "character" not in sw_lines.columns or sw_lines["character"].isna().all():
    cid_col = "characterid" if "characterid" in sw_characters.columns else (
              "character_id" if "character_id" in sw_characters.columns else None)
    assert cid_col is not None, "sw_characters.csv 需要 characterid/character_id 以把台词里的ID映射到姓名"
    id2name = dict(zip(sw_characters[cid_col].astype(str), sw_characters["character"].astype(str)))
    key_in_lines = "character_id" if "character_id" in sw_lines.columns else ("characterid" if "characterid" in sw_lines.columns else None)
    assert key_in_lines is not None, "sw_lines.csv 缺少 character 或 character_id/characterid"
    sw_lines["character"] = sw_lines[key_in_lines].astype(str).map(id2name)

# line_num
if "line_num" not in sw_lines.columns:
    def line_num_from_id(x):
        m = re.search(r"(\d+)", str(x));  return int(m.group(1)) if m else None
    sw_lines["line_num"] = sw_lines["lineid"].map(line_num_from_id) if "lineid" in sw_lines.columns else sw_lines.reset_index().index

sw_lines["char_norm"] = sw_lines["character"].astype(str).map(norm_key)
norm2id = dict(zip(nodes_whitelist["id_norm"], nodes_whitelist["id"].astype(str)))
sw_lines["character_id"] = sw_lines["char_norm"].map(norm2id)

# just whitelist lines
lines_whitelist = sw_lines[sw_lines["character_id"].notna()].copy()
lines_whitelist["line_num"] = pd.to_numeric(lines_whitelist["line_num"], errors="coerce")

print(f"sum line：{len(sw_lines)}，whitelist line：{len(lines_whitelist)}，whitelist avater：{lines_whitelist['character_id'].nunique()}")

# fill sentiment (offline)
POS = {"good","great","excellent","love","like","hope","friend","happy","joy","win","calm","safe","brave","peace","trust","support"}
NEG = {"bad","terrible","awful","hate","kill","fear","anger","war","enemy","sad","cry","hurt","lose","pain","threat","betray","death","dark"}

def classify_sentiment_offline(text: str):
    t = re.sub(r"[^a-zA-Z']+", " ", str(text)).lower().split()
    pos = sum(1 for w in t if w in POS); neg = sum(1 for w in t if w in NEG)
    total = pos + neg
    comp = 0.0 if total==0 else (pos-neg)/total
    lab = "pos" if comp>0.05 else ("neg" if comp<-0.05 else "neu")
    return lab, comp

if not lines_whitelist.empty:
    sent_df = lines_whitelist["text"].astype(str).apply(
        lambda t: pd.Series(classify_sentiment_offline(t), index=["sent_label","compound"])
    )
    lines_whitelist = pd.concat([lines_whitelist, sent_df], axis=1)

assert "id" in nodes.columns and "character" in nodes.columns, "nodes.csv need: id, character"
nodes["id"] = nodes["id"].astype(str)
nodes["character"] = nodes["character"].astype(str)

if "avatar_png" not in nodes.columns:
    nodes["avatar_png"] = None

def fill_avatar(row):
    if pd.notna(row.get("avatar_png")) and str(row["avatar_png"]).strip():
        return row["avatar_png"]
    k = norm_key(row["id"])  # make sure key is normalized
    return avatar_index.get(k, None)

nodes["avatar_png"] = nodes.apply(fill_avatar, axis=1)

# whitelist: must have avatar
nodes_whitelist = nodes[nodes["avatar_png"].notna() & (nodes["avatar_png"].astype(str) != "")]
nodes_whitelist = nodes_whitelist.drop_duplicates(subset=["id"]).copy()
nodes_whitelist["id_norm"] = nodes_whitelist["id"].astype(str)  
whitelist_set = set(nodes_whitelist["id_norm"])

# make sure sw_lines has 'character' column
sw_lines2 = sw_lines.copy()
if "character" not in sw_lines2.columns or sw_lines2["character"].isna().all():
    cid_col_chars = "characterid" if "characterid" in sw_chars.columns else (
        "character_id" if "character_id" in sw_chars.columns else None
    )
    assert cid_col_chars is not None, "sw_characters.csv 需要 characterid/character_id 用于ID→姓名映射"
    id2name = dict(zip(sw_chars[cid_col_chars].astype(str), sw_chars["character"].astype(str)))

    key_in_lines = "character_id" if "character_id" in sw_lines2.columns else (
        "characterid" if "characterid" in sw_lines2.columns else None
    )
    assert key_in_lines is not None, "sw_lines.csv 缺少 character 或 character_id/characterid"
    sw_lines2["character"] = sw_lines2[key_in_lines].astype(str).map(id2name)

sw_lines2["char_key"] = sw_lines2["character"].astype(str).map(norm_key)

wl = sw_lines2[sw_lines2["char_key"].isin(whitelist_set)].copy()

if "line_num" not in wl.columns:
    def line_num_from_id(x):
        m = re.search(r"(\d+)", str(x))
        return int(m.group(1)) if m else None
    wl["line_num"] = wl["lineid"].map(line_num_from_id) if "lineid" in wl.columns else wl.reset_index().index

wl["line_num"] = pd.to_numeric(wl["line_num"], errors="coerce")
wl["text"] = wl.get("text", "").astype(str)

# only keep needed columns
cols_keep = []
for c in ["movieid","lineid","line_num","character","text","sent_label","compound","char_key"]:
    if c in wl.columns: cols_keep.append(c)
lines_whitelist = wl.loc[:, cols_keep].copy()

# de-duplicate columns if any
lines_whitelist = lines_whitelist.loc[:, ~lines_whitelist.columns.duplicated(keep="last")]

# rebuild char2lines
char2lines = {
    key: df.sort_values("line_num").reset_index(drop=True)
    for key, df in lines_whitelist.groupby("char_key")
}

print(f"[Rebuilt] whitelist lines = {len(lines_whitelist)}  | characters with lines = {len(char2lines)}")


[Whitelist] avater: 17 / 17
sum line：1115，whitelist line：897，whitelist avater：15
[Rebuilt] whitelist lines = 897  | characters with lines = 15


In [22]:
# ===================================================
# Final Cell — factions from nodes.csv + movie titles in dropdown
# ===================================================
from pathlib import Path
import re, socket
import pandas as pd
from jupyter_dash import JupyterDash, jupyter_app
from dash import Dash, html, dcc, Input, Output, State
import dash_cytoscape as cyto
import plotly.express as px
import plotly.graph_objects as go

# ------------------ Config ------------------
AVATAR_DIR = Path("assets/avatars")
BG_COLOR   = "#0b0f19"

# key: normalized faction name; value: color hex
DEFAULT_FACTION_COLORS = {
    "rebellion":   "#4ade80",
    "rebel":       "#4ade80",
    "resistance":  "#4ade80",
    "empire":      "#f87171",
    "imperial":    "#f87171",
    "jedi":        "#60a5fa",
    "sith":        "#f43f5e",
    "smuggler":    "#fbbf24",
    "bounty_hunter": "#fb7185",
    "droid":       "#22d3ee",
    "neutral":     "#a78bfa",
    "first_order": "#f471b5"
}

# reusable color palette
PALETTE = [
    "#22d3ee", "#a78bfa", "#f59e0b", "#34d399", "#60a5fa",
    "#fb7185", "#f472b6", "#93c5fd", "#fbbf24", "#10b981",
    "#f43f5e", "#86efac", "#fda4af", "#d8b4fe", "#7dd3fc"
]

def norm_key(s:str) -> str:
    return re.sub(r"[^a-z0-9]+", "", str(s).strip().lower())

def natural_key(s:str):
    return [int(t) if t.isdigit() else t for t in re.split(r'(\d+)', str(s))]

# data copies
_nodes = nodes.copy()
_edges = edges_total.copy()

if "id" not in _nodes.columns and "character" in _nodes.columns:
    _nodes["id"] = _nodes["character"]

_nodes["id_norm"] = _nodes["id"].astype(str).map(norm_key)
if "character" not in _nodes.columns:
    _nodes["character"] = _nodes["id"]

for c in ("src","dst"):
    _edges[c] = _edges[c].astype(str).map(norm_key)

if "weight" not in _edges.columns:
    _edges["weight"] = 1
if "movieid" not in _edges.columns:
    _edges["movieid"] = ""   

# user override factions
def load_nodes_override():
    for fn in ["nodes.csv", "proj/nodes.csv", "starwars_filtered/nodes.csv"]:
        p = Path(fn)
        if p.exists():
            try:
                df = pd.read_csv(p)
                return df
            except Exception:
                pass
    return None

override = load_nodes_override()
override_map = {}
if override is not None:
    # use "id" or "character" as key
    key_col = "id" if "id" in override.columns else \
              ("character" if "character" in override.columns else None)
    fac_col = "faction" if "faction" in override.columns else None
    if key_col and fac_col:
        odf = override[[key_col, fac_col]].copy()
        odf["id_norm"] = odf[key_col].astype(str).map(norm_key)
        for _, r in odf.iterrows():
            kid = r["id_norm"]
            fct = str(r[fac_col]).strip()
            if kid and fct:
                override_map[kid] = fct

# build node_map
node_map = {}
for _, r in _nodes.iterrows():
    kid = r["id_norm"]
    if kid not in node_map:
        node_map[kid] = {
            "id_norm": kid,
            "label": str(r.get("character", r.get("id", kid))),
            "avatar_png": str(r.get("avatar_png", "") or ""),
            "faction": str(r.get("faction", "") or "")
        }
# apply overrides
for kid, fac in override_map.items():
    if kid in node_map:
        node_map[kid]["faction"] = fac

edge_ids = set(_edges["src"].tolist() + _edges["dst"].tolist())
for mid in edge_ids:
    if mid not in node_map:
        node_map[mid] = {"id_norm": mid, "label": mid, "avatar_png": "", "faction": ""}


def load_movie_titles_from_csv():
    candidates = [
        r"E:\AIDM\AIDM7330_programming\proj\starwars_core\sw_movies.csv",
        "starwars_core/sw_movies.csv",
        "proj/starwars_core/sw_movies.csv",
    ]
    for fp in candidates:
        p = Path(fp)
        if p.exists():
            try:
                df = pd.read_csv(p)
                # normalize columns
                df.columns = [c.lower() for c in df.columns]
                if {"movieid","title"} <= set(df.columns):
                    # use str keys
                    mp = {str(r["movieid"]): str(r["title"]).strip().title()
                          for _, r in df.iterrows()}
                    return mp
            except Exception:
                pass
    return {}

movie_title_map = load_movie_titles_from_csv()

def collect_movie_ids():
    vals = []
    if "movieid" in _edges.columns:
        vals.extend(_edges["movieid"].dropna().astype(str).tolist())
    if 'sw_lines' in globals() and "movieid" in sw_lines.columns:
        vals.extend(sw_lines["movieid"].dropna().astype(str).tolist())
    vals = sorted(sorted(set(vals)), key=natural_key)
    return vals

movie_ids = collect_movie_ids()

# if no titles, just use IDs
MOVIE_OPTIONS = [{"label": "All (Trilogy)", "value": "all"}] + [
    {"label": movie_title_map.get(mid, mid), "value": mid} for mid in movie_ids
]


# ---------------- Sentiment (lightweight) ----------------
POS_WORDS = set("good great love hope happy peace rescue freedom victory friend win brave help".split())
NEG_WORDS = set("bad hate kill dark fear death loss war attack betray fail enemy angry".split())

def tiny_sent_score(text: str) -> int:
    t = re.findall(r"[a-z']+", str(text).lower())
    pos = sum(w in POS_WORDS for w in t)
    neg = sum(w in NEG_WORDS for w in t)
    return 1 if pos>neg else (-1 if neg>pos else 0)

def compute_sentiment_counts(lines_df: pd.DataFrame) -> dict:
    if lines_df.empty: 
        return {"pos":0,"neg":0,"neu":0}
    col = None
    for c in ["sentiment","sent","polarity_label"]:
        if c in lines_df.columns:
            col = c; break
    if col is None:
        vals = lines_df["text"].astype(str).map(tiny_sent_score)
    else:
        vals = lines_df[col].astype(int).clip(-1,1)
    return {
        "pos": int((vals== 1).sum()),
        "neg": int((vals==-1).sum()),
        "neu": int((vals== 0).sum())
    }

# collect faction colors
def build_faction_color_map():
    facs = sorted({str(v.get("faction","")).strip() for v in node_map.values() if v is not None})
    facs_l = [f.lower() for f in facs if f]
    mapping = {}
    used = set()
    for f in facs_l:
        if f in DEFAULT_FACTION_COLORS:
            mapping[f] = DEFAULT_FACTION_COLORS[f]
            used.add(DEFAULT_FACTION_COLORS[f])
    # color assignment from palette
    palette_iter = (c for c in PALETTE if c not in used)
    for f in facs_l:
        if f not in mapping:
            mapping[f] = next(palette_iter, "#94a3b8")
    return mapping

FACTION_COLORS = build_faction_color_map()

# -------------- Filter helpers ----------------
def filter_edges_by_movie(df: pd.DataFrame, movie_sel):
    if movie_sel == "all" or "movieid" not in df.columns: 
        return df.copy()
    return df[df["movieid"].astype(str) == str(movie_sel)].copy()

def build_elements_for_movie(movie_sel):
    et = filter_edges_by_movie(_edges, movie_sel)
    keep_ids = set(et["src"].tolist() + et["dst"].tolist())
    if et.empty:
        w_min = w_max = 1
    else:
        w_min = int(et["weight"].min()); w_max = int(et["weight"].max())
        w_min = max(1, w_min); w_max = max(w_min, w_max)

    elems = []
    for kid in keep_ids:
        info = node_map[kid]
        label = info["label"]
        img_name = (AVATAR_DIR / info.get("avatar_png","")).name
        fac = str(info.get("faction","")).lower()
        color = FACTION_COLORS.get(fac, "#94a3b8")
        style = {"border-color": color}
        data  = {"id": kid, "label": label}
        if img_name and (AVATAR_DIR / img_name).exists():
            data["img"] = f"assets/avatars/{img_name}"
        else:
            style["background-color"] = "#999999"
        elems.append({"data": data, "style": style})

    for _, e in et.iterrows():
        elems.append({
            "data": {"source": e["src"], "target": e["dst"], "weight": int(e["weight"]) }
        })

    stylesheet = [
        {"selector": "node", "style": {
            "label": "data(label)",
            "text-opacity": 0,           
            "width": 44, "height": 44,
            "background-fit": "cover",
            "background-image": "data(img)",
            "color": "#fff",
            "font-size": 12,
            "text-valign": "top",
            "text-outline-width": 2,
            "text-outline-color": "#000",
            "border-width": 3,
        }},
        {"selector": "edge", "style": {
            "line-color": "#6b7280",
            "width": f"mapData(weight, {w_min}, {w_max}, 1, 8)"
        }},
        {"selector": ".hovered", "style": {"text-opacity": 1}},
    ]
    return elems, stylesheet

# -------------- Bar chart helper ----------------
def character_bar_for_movie(movie_sel):
    if 'sw_lines' not in globals():
        return go.Figure().update_layout(template="plotly_dark", height=260, margin=dict(l=20,r=20,t=10,b=30))
    df = sw_lines.copy()
    if "movieid" in df.columns and movie_sel!="all":
        df = df[df["movieid"].astype(str) == str(movie_sel)]
    if df.empty:
        return go.Figure().update_layout(template="plotly_dark", height=260, margin=dict(l=20,r=20,t=10,b=30))
    ser = df.groupby("character")["text"].count().sort_values(ascending=False).head(20)
    fig = px.bar(ser, orientation="v", labels={"value":"Lines", "character":"Character"})
    fig.update_layout(template="plotly_dark", height=260, margin=dict(l=20,r=20,t=10,b=30), xaxis_tickangle=-30)
    return fig

# -------------- Sentiment charts ----------------
def sentiment_figs_for(char_key, movie_sel):
    if 'sw_lines' not in globals():
        return go.Figure(), go.Figure()
    df = sw_lines.copy()
    df["k"] = df["character"].astype(str).map(norm_key)
    if "movieid" in df.columns and movie_sel!="all":
        df = df[df["movieid"].astype(str) == str(movie_sel)]
    df = df[df["k"]==char_key]

    counts = compute_sentiment_counts(df)
    pie = px.pie(values=[counts["pos"], counts["neu"], counts["neg"]],
                 names=["Positive","Neutral","Negative"], hole=0.45)\
            .update_layout(template="plotly_dark", height=260, margin=dict(l=10,r=10,t=10,b=10))

    if "line_num" in df.columns and not df.empty:
        temp = df[["line_num","text"]].sort_values("line_num").copy()
        temp["s"] = temp["text"].astype(str).map(tiny_sent_score)
        temp["ma"] = temp["s"].rolling(20, min_periods=1).mean()
        line = go.Figure()
        line.add_trace(go.Scatter(x=temp["line_num"], y=temp["ma"], mode="lines"))
        line.update_layout(template="plotly_dark", height=260, margin=dict(l=10,r=10,t=10,b=10),
                           yaxis_title="Sentiment (rolling)", xaxis_title="Line order")
    else:
        line = go.Figure().update_layout(template="plotly_dark", height=260, margin=dict(l=10,r=10,t=10,b=10))
    return pie, line

# -------------- Legends ----------------
def legend_block():
    # fraction colors
    items = []
    for fac, col in sorted(FACTION_COLORS.items()):
        if not fac: 
            continue
        items.append(html.Div([
            html.Span(style={"display":"inline-block","width":"12px","height":"12px",
                             "border":"2px solid "+col,"borderRadius":"50%","marginRight":"8px"}),
            html.Span(fac.capitalize())
        ], style={"marginRight":"16px","display":"inline-flex","alignItems":"center"}))
    edge_legend = html.Div("Edge width = number of dialogues between two characters", style={"opacity":0.8})
    return html.Div([
        html.Div(items, style={"display":"flex","flexWrap":"wrap","gap":"10px","marginBottom":"6px"}),
        edge_legend
    ], style={"color":"#cbd5e1","fontSize":"13px"})

# -------------- Dash App ----------------
try:
    for k, srv in list(jupyter_app._server_threads.items()):
        try: srv.kill()
        except Exception: pass
    jupyter_app._server_threads = {}
except Exception: pass

def get_free_port():
    s = socket.socket(); s.bind(('',0)); p=s.getsockname()[1]; s.close(); return p
PORT = get_free_port()

app = JupyterDash(__name__)

app.layout = html.Div([
    html.Div([
        html.Div([
            html.Label("Movie", style={"color":"#e5e7eb","fontWeight":"600"}),
            dcc.Dropdown(
                options=[{"label": "All (Trilogy)", "value": "all"}] + 
                        [{"label": movie_title_map.get(mid, mid), "value": mid} for mid in movie_ids],
                value="all", id="movie_dd", clearable=False,
                style={"backgroundColor":"#111827","color":"#111827"}
            )
        ], style={"width":"360px","marginRight":"16px"}),
        html.Div(legend_block(), style={"flex":"1"})
    ], style={"display":"flex","alignItems":"center","gap":"12px","padding":"8px 12px"}),

    cyto.Cytoscape(
        id="cy",
        elements=[],
        stylesheet=[],
        layout={"name": "cose", "randomize": True, "idealEdgeLength": 120, "nodeOverlap": 10},
        style={"width":"100%","height":"520px","backgroundColor":BG_COLOR},
        minZoom=0.2, maxZoom=2.5
    ),

    html.Div([
        html.Div([
            html.H4("Character Sentiment", style={"color":"#e5e7eb","margin":"6px 0"}),
            html.Div(id="char_name", style={"color":"#93c5fd","marginBottom":"6px"}),
            html.Div([
                dcc.Graph(id="sent_pie", figure=go.Figure(), config={"displayModeBar": False}),
                dcc.Graph(id="sent_line", figure=go.Figure(), config={"displayModeBar": False})
            ], style={"display":"grid","gridTemplateColumns":"1fr 1fr","gap":"8px"})
        ], style={"flex":"1","padding":"0 12px"}),

        html.Div([
            html.H4("Lines per Character (selected movie)", style={"color":"#e5e7eb","margin":"6px 0"}),
            dcc.Graph(id="bar_lines", figure=go.Figure(), config={"displayModeBar": False})
        ], style={"flex":"1","padding":"0 12px"})
    ], style={"display":"flex","gap":"12px","padding":"6px 0"})
], style={"fontFamily":"Inter, system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial, sans-serif"})

# ----------- Callbacks ----------------
@app.callback(
    Output("cy","elements"),
    Output("cy","stylesheet"),
    Input("movie_dd","value")
)
def refresh_graph(movie_sel):
    elems, stylesheet = build_elements_for_movie(movie_sel)
    return elems, stylesheet

@app.callback(
    Output("cy","stylesheet", allow_duplicate=True),
    Input("cy","mouseoverNodeData"),
    State("cy","stylesheet"),
    prevent_initial_call=True
)
def hover_label(node_data, cur_styles):
    base = [s for s in cur_styles if s.get("selector") != "node.hovered"]
    if node_data and "id" in node_data:
        return base + [{"selector": f'node[id = "{node_data["id"]}"]', "style": {"text-opacity": 1}, "classes": "hovered"}]
    return base

@app.callback(
    Output("char_name","children"),
    Output("sent_pie","figure"),
    Output("sent_line","figure"),
    Input("cy","tapNodeData"),
    State("movie_dd","value")
)
def on_tap(node_data, movie_sel):
    if not node_data:
        return "Click a character node...", go.Figure(), go.Figure()
    kid = node_data["id"]
    label = node_map.get(kid, {}).get("label", kid)
    pie, line = sentiment_figs_for(kid, movie_sel)
    return f"{label}", pie, line

@app.callback(
    Output("bar_lines","figure"),
    Input("movie_dd","value")
)
def on_movie_change(movie_sel):
    return character_bar_for_movie(movie_sel)

print(f"[INFO] factions (unique): {sorted({str(v.get('faction','')) for v in node_map.values()})}")
print(f"[INFO] movie_ids: {movie_ids} (titles available: {len(movie_title_map)})")
print(f"Running at http://127.0.0.1:{PORT}")
app.run_server(mode="external", host="127.0.0.1", port=PORT, debug=False)


[INFO] factions (unique): ['Droid', 'Galactic Empire', 'Jedi', 'Rebel Alliance', 'Unknown']
[INFO] movie_ids: ['m337', 'm489', 'm529'] (titles available: 3)
Running at http://127.0.0.1:63330



JupyterDash is deprecated, use Dash instead.
See https://dash.plotly.com/dash-in-jupyter for more details.



Dash app running on http://127.0.0.1:63330/


Exception in thread Thread-36 (run):
Traceback (most recent call last):
  File "e:\Anaconda\envs\starwars-net\Lib\threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "e:\Anaconda\envs\starwars-net\Lib\site-packages\ipykernel\ipkernel.py", line 788, in run_closure
    _threading_Thread_run(self)
  File "e:\Anaconda\envs\starwars-net\Lib\threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "e:\Anaconda\envs\starwars-net\Lib\site-packages\retrying.py", line 55, in wrapped_f
    return Retrying(*dargs, **dkw).call(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "e:\Anaconda\envs\starwars-net\Lib\site-packages\retrying.py", line 289, in call
    raise attempt.get()
          ^^^^^^^^^^^^^
  File "e:\Anaconda\envs\starwars-net\Lib\site-packages\retrying.py", line 326, in get
    raise exc.with_traceback(tb)
  File "e:\Anaconda\envs\starwars-net\Lib\site-packages\retrying.py", line 273, in call
    attempt = Attemp