In [2]:
import pandas as pd
import re
import numpy as np

# =========================
# 1️⃣ 读取数据
# =========================
df = pd.read_csv('corrected_transposed_JUNE24.csv')

# =========================
# 2️⃣ Section 列
# =========================
section_cols = {
    "Intro": "intro_chordsTransposed",
    "Verse": "verse_chordsTransposed",
    "Pre-Chorus": "pre_chorus_chordsTransposed",
    "Chorus": "chorus_chordsTransposed",
    "Post-Chorus": "post_chorus_chordsTransposed",
    "Bridge": "bridge_chordsTransposed",
    "Outro": "outro_chordsTransposed",
}

# =========================
# 3️⃣ 音高映射
# =========================
PC = {
    "C":0,"C#":1,"Db":1,"D":2,"D#":3,"Eb":3,"E":4,
    "F":5,"F#":6,"Gb":6,"G":7,"G#":8,"Ab":8,
    "A":9,"A#":10,"Bb":10,"B":11
}

ROMANS = ["I","II","III","IV","V","VI","VII"]
MAJOR_SCALE = [0,2,4,5,7,9,11]
MINOR_SCALE = [0,2,3,5,7,8,10]

# =========================
# 4️⃣ 工具函数
# =========================
def split_chords(chord_string):
    if pd.isna(chord_string):
        return []
    s = re.sub(r"[,\|;]", " ", str(chord_string))
    s = s.replace("-", " ")
    s = re.sub(r"\s+", " ", s).strip()
    return s.split()

def parse_chord(sym):
    m = re.match(r"^([A-G])([#b]?)(.*)$", sym)
    if not m:
        return None, "maj"
    root = m.group(1) + (m.group(2) or "")
    rest = m.group(3).lower()
    quality = "min" if rest.startswith("m") and not rest.startswith("maj") else "maj"
    return root, quality

def infer_key(chords):
    if not chords:
        return None, None
    root, quality = parse_chord(chords[0])
    mode = "minor" if quality == "min" else "major"
    return root, mode

def interval_to_roman(interval, mode, quality):
    scale = MAJOR_SCALE if mode == "major" else MINOR_SCALE
    diffs = [(abs(interval - deg), i) for i, deg in enumerate(scale)]
    _, idx = min(diffs)
    numeral = ROMANS[idx]
    if quality == "min":
        numeral = numeral.lower()
    return numeral

def convert_to_roman(chords, key_root, key_mode):
    if key_root not in PC:
        return []
    key_pc = PC[key_root]
    romans = []
    for sym in chords:
        root, quality = parse_chord(sym)
        if root not in PC:
            continue
        interval = (PC[root] - key_pc) % 12
        romans.append(interval_to_roman(interval, key_mode, quality))
    return romans

# =========================
# 5️⃣ 构建 long format
# =========================
rows = []

for _, r in df.iterrows():
    for section_name, col in section_cols.items():
        if col not in df.columns:
            continue

        chords = split_chords(r[col])
        if not chords:
            continue

        key_root, key_mode = infer_key(chords)
        roman_prog = convert_to_roman(chords, key_root, key_mode)

        if not roman_prog:
            continue

        rows.append({
            "song_title": r["song_title"],
            "section": section_name,
            "artist": "Taylor Swift",
            "key": key_root,
            "key_mode": key_mode,
            "roman_progression": roman_prog,
            "chord_progression_raw": chords
        })

long_df = pd.DataFrame(rows)

# =========================
# 6️⃣ 取 toy size = 50
# =========================
toy_df = long_df.head(50).copy()
toy_df["snippet_id"] = ["TS_{:03d}".format(i+1) for i in range(len(toy_df))]

# list 转字符串
toy_df["roman_progression"] = toy_df["roman_progression"].apply(lambda x: str(x))
toy_df["chord_progression_raw"] = toy_df["chord_progression_raw"].apply(lambda x: str(x))

# =========================
# 7️⃣ 输出 toy CSV
# =========================
toy_df = toy_df[[
    "snippet_id",
    "song_title",
    "section",
    "artist",
    "key",
    "key_mode",
    "roman_progression",
    "chord_progression_raw"
]]

toy_df.to_csv("toy_smashchords_tswift_50.csv", index=False)

print("Toy dataset created successfully!")


Toy dataset created successfully!


In [4]:
import pandas as pd
import ast
from collections import Counter, defaultdict

# =========================
# 1) 读 toy dataset
# =========================
toy = pd.read_csv("toy_smashchords_tswift_50.csv")

# roman_progression 在 CSV 里通常是字符串，比如 "[I, V, vi, IV]"
# 这里安全地转回 list
def parse_roman_list(s):
    if pd.isna(s):
        return []
    s = str(s).strip()
    # 兼容两种写法："[I, V, vi]" 或 "['I','V','vi']"
    if s.startswith("[") and "'" not in s:
        # 把 I, V, vi 变成 'I','V','vi'
        items = [x.strip() for x in s.strip("[]").split(",") if x.strip()]
        return items
    # 如果本来就是 python list 字符串，直接 literal_eval
    try:
        return ast.literal_eval(s)
    except Exception:
        # fallback
        return [x.strip() for x in s.strip("[]").split(",") if x.strip()]

toy["roman_list"] = toy["roman_progression"].apply(parse_roman_list)

# =========================
# 2) 统计转移次数 (bigram)
# =========================
transition_counts = Counter()
state_counts = Counter()

for prog in toy["roman_list"]:
    if not prog or len(prog) < 2:
        continue
    for a, b in zip(prog[:-1], prog[1:]):
        transition_counts[(a, b)] += 1
        state_counts[a] += 1

# 所有出现过的“状态”
states = sorted(set([a for a, _ in transition_counts.keys()] + [b for _, b in transition_counts.keys()]))

# =========================
# 3) 转成概率矩阵 (带 Laplace smoothing 防止 0 概率)
# =========================
alpha = 1.0  # smoothing strength (toy 阶段用 1 很稳)

# P(b|a) = (count(a->b)+alpha) / (count(a)+alpha*|V|)
V = len(states)

prob = defaultdict(dict)
for a in states:
    denom = state_counts[a] + alpha * V
    for b in states:
        num = transition_counts.get((a, b), 0) + alpha
        prob[a][b] = num / denom if denom > 0 else 1.0 / V

# =========================
# 4) 导出 counts & probs 为 CSV 矩阵
# =========================
counts_mat = pd.DataFrame(0, index=states, columns=states, dtype=int)
for (a, b), c in transition_counts.items():
    counts_mat.loc[a, b] = c

probs_mat = pd.DataFrame(0.0, index=states, columns=states, dtype=float)
for a in states:
    for b in states:
        probs_mat.loc[a, b] = prob[a][b]

counts_mat.to_csv("transition_counts.csv")
probs_mat.to_csv("transition_probs.csv")

print("Saved: transition_counts.csv, transition_probs.csv")
print("Num states:", V)
print("Top transitions:", transition_counts.most_common(10))

# =========================
# 5) 给 A->B transition 打分的函数
#    (A 的最后一个 chord -> B 的第一个 chord)
# =========================
def markov_score(from_prog, to_prog):
    """
    from_prog/to_prog: list[str]  (roman numerals list)
    return: P(first_of_B | last_of_A)
    """
    if not from_prog or not to_prog:
        return None
    a = from_prog[-1]
    b = to_prog[0]
    if a not in prob or b not in prob[a]:
        # 如果没见过，给一个很小的默认值
        return 1.0 / V
    return prob[a][b]

# =========================
# 6) 示例：对 toy dataset 随便算几个 transition
# =========================
# 用前两条 snippet 做 demo
A = toy.iloc[0]["roman_list"]
B = toy.iloc[1]["roman_list"]
print("Example A:", A)
print("Example B:", B)
print("Markov score P(B0|A_last) =", markov_score(A, B))


Saved: transition_counts.csv, transition_probs.csv
Num states: 14
Top transitions: [(('I', 'V'), 14), (('vi', 'IV'), 12), (('I', 'vi'), 12), (('V', 'V'), 11), (('V', 'II'), 10), (('IV', 'V'), 7), (('III', 'VII'), 6), (('II', 'iii'), 6), (('I', 'IV'), 6), (('i', 'III'), 5)]
Example A: ['I', 'v', 'IV']
Example B: ['i', 'VII', 'III', 'IV']
Markov score P(B0|A_last) = 0.03333333333333333


In [8]:
import pandas as pd
import ast
import math
from collections import defaultdict

# =========================
# 0) Load toy dataset
# =========================
toy = pd.read_csv("toy_smashchords_tswift_50.csv")

def parse_roman_list(s):
    if pd.isna(s):
        return []
    s = str(s).strip()
    # Handle "[I, V, vi, IV]" (no quotes)
    if s.startswith("[") and "'" not in s and '"' not in s:
        items = [x.strip() for x in s.strip("[]").split(",") if x.strip()]
        return items
    # Handle "['I','V','vi','IV']"
    try:
        return ast.literal_eval(s)
    except Exception:
        return [x.strip() for x in s.strip("[]").split(",") if x.strip()]

toy["roman_list"] = toy["roman_progression"].apply(parse_roman_list)

# Filter out empty progressions
toy = toy[toy["roman_list"].map(lambda x: isinstance(x, list) and len(x) > 0)].reset_index(drop=True)

# =========================
# 1) Normalize Roman symbols (light cleanup)
#    Keep b/#, keep case, strip °/+
# =========================
def norm_roman(r):
    r = str(r).strip()
    r = r.replace("°", "").replace("+", "")
    return r

toy["roman_list_norm"] = toy["roman_list"].apply(lambda xs: [norm_roman(x) for x in xs])

# =========================
# 2) Rotation-invariant representation for loop-like progressions
#    Example: [I,V,vi,IV] and [vi,IV,I,V] should be close.
#    We implement "canonical rotation": choose lexicographically smallest rotation.
# =========================
def all_rotations(seq):
    n = len(seq)
    return [tuple(seq[i:]+seq[:i]) for i in range(n)]

def canonical_rotation(seq):
    if not seq:
        return tuple()
    rots = all_rotations(list(seq))
    return min(rots)

toy["canon_loop"] = toy["roman_list_norm"].apply(lambda s: canonical_rotation(s))

# Also create a "set of bigrams" as a robust similarity signal
def bigrams(seq):
    if len(seq) < 2:
        return set()
    return set(zip(seq[:-1], seq[1:]))

toy["bigrams"] = toy["roman_list_norm"].apply(bigrams)

# =========================
# 3) Similarity metrics (Medley-style)
#    Main idea: harmonic pattern similarity >> one-step transition probability
#
#    We'll combine:
#      A) rotation similarity (exact match on canonical loop)  -> strong signal
#      B) bigram Jaccard similarity                           -> robust for different lengths
#      C) optional: length compatibility penalty
# =========================
def jaccard(a, b):
    if not a and not b:
        return 1.0
    inter = len(a & b)
    union = len(a | b)
    return inter / union if union > 0 else 0.0

def loop_match_score(canonA, canonB):
    # exact canonical loop match
    return 1.0 if canonA == canonB and len(canonA) > 0 else 0.0

def length_compatibility(lenA, lenB):
    # 1.0 when equal length, decays with difference
    d = abs(lenA - lenB)
    return 1.0 / (1.0 + d)

# =========================
# 4) Optional key proximity cost (small weight)
#    For medley, you can often transpose, so keep weight low or turn off.
# =========================
CIRCLE = ["C","G","D","A","E","B","F#","C#","Ab","Eb","Bb","F"]
ENHARMONIC = {"Db":"C#","Gb":"F#","Cb":"B","Fb":"E","E#":"F","B#":"C","A#":"Bb","D#":"Eb","G#":"Ab"}

def norm_key(k):
    if k is None or pd.isna(k):
        return None
    k = str(k).strip()
    return ENHARMONIC.get(k, k)

def circle_dist(k1, k2):
    k1 = norm_key(k1); k2 = norm_key(k2)
    if k1 not in CIRCLE or k2 not in CIRCLE:
        return None
    i = CIRCLE.index(k1); j = CIRCLE.index(k2)
    d = abs(i - j)
    return min(d, len(CIRCLE) - d)

def key_similarity(k1, k2):
    """
    Convert circle distance to similarity in [0,1]
    0 steps -> 1.0, 6 steps -> 0.0
    """
    d = circle_dist(k1, k2)
    if d is None:
        return 0.5
    return 1.0 - d / 6.0

# =========================
# 5) Final compatibility score
#    Score(A,B) in [0, ~something], higher = better match for medley stitching.
#    You can tune weights.
# =========================
def compatibility_score(A, B, w_loop=0.45, w_bigram=0.45, w_len=0.10, w_key=0.00):
    # Main: harmonic pattern similarity
    s_loop = loop_match_score(A["canon_loop"], B["canon_loop"])
    s_big = jaccard(A["bigrams"], B["bigrams"])
    s_len = length_compatibility(len(A["roman_list_norm"]), len(B["roman_list_norm"]))
    s_key = key_similarity(A["key"], B["key"]) if w_key > 0 else 0.0

    return (w_loop * s_loop) + (w_bigram * s_big) + (w_len * s_len) + (w_key * s_key)

# =========================
# 6) Compute top similar pairs (for inspection / demo)
# =========================
pairs = []
n = len(toy)

# Change this if you want to allow same song snippets:
REQUIRE_DIFFERENT_SONGS = True

# Set w_key small if you want key proximity as a tie-breaker (e.g., 0.05)
W_KEY = 0.05  # set to 0.0 if you want to ignore key entirely

for i in range(n):
    for j in range(n):
        if i == j:
            continue
        A = toy.iloc[i]
        B = toy.iloc[j]
        if REQUIRE_DIFFERENT_SONGS and A["song_title"] == B["song_title"]:
            continue

        score = compatibility_score(A, B, w_key=W_KEY)
        pairs.append({
            "from_snippet": A["snippet_id"],
            "from_song": A["song_title"],
            "from_section": A["section"],
            "from_key": A["key"],
            "from_prog": A["roman_progression"],
            "to_snippet": B["snippet_id"],
            "to_song": B["song_title"],
            "to_section": B["section"],
            "to_key": B["key"],
            "to_prog": B["roman_progression"],
            "score": score
        })

pairs_df = pd.DataFrame(pairs).sort_values("score", ascending=False)
pairs_df.head(50).to_csv("top_pairs_medley.csv", index=False)
print("Saved: top_pairs_medley.csv (top 50 pairs)")

# =========================
# 7) Build a K-snippet medley chain (K=5 default)
#    Use beam search to avoid getting stuck.
#    Constraint: all songs unique (typical medley).
# =========================
def build_medley_chain(toy_df, K=5, beam_width=30, w_key=W_KEY):
    # Precompute best outgoing neighbors for each snippet to speed up
    # We'll keep top M neighbors per snippet
    M = 40
    neighbors = defaultdict(list)

    # indexable rows
    rows = [toy_df.iloc[i] for i in range(len(toy_df))]

    for i, A in enumerate(rows):
        scored = []
        for j, B in enumerate(rows):
            if i == j:
                continue
            if REQUIRE_DIFFERENT_SONGS and A["song_title"] == B["song_title"]:
                continue
            s = compatibility_score(A, B, w_key=w_key)
            scored.append((s, j))
        scored.sort(reverse=True, key=lambda x: x[0])
        neighbors[i] = scored[:M]

    # Beam search state: (total_score, path_indices, used_song_titles)
    beam = []
    # initialize beam with each snippet as start
    for i, A in enumerate(rows):
        beam.append((0.0, [i], {A["song_title"]}))
    # keep top starts
    beam.sort(reverse=True, key=lambda x: x[0])
    beam = beam[:beam_width]

    for step in range(1, K):
        new_beam = []
        for total, path, used_songs in beam:
            last_i = path[-1]
            for s, j in neighbors[last_i]:
                B = rows[j]
                if B["song_title"] in used_songs:
                    continue  # enforce unique songs in medley
                new_total = total + s
                new_path = path + [j]
                new_used = set(used_songs)
                new_used.add(B["song_title"])
                new_beam.append((new_total, new_path, new_used))

        if not new_beam:
            break

        new_beam.sort(reverse=True, key=lambda x: x[0])
        beam = new_beam[:beam_width]

    best = max(beam, key=lambda x: x[0])
    best_total, best_path, _ = best

    # Convert to readable output
    chain_rows = []
    for idx in best_path:
        r = rows[idx]
        chain_rows.append({
            "snippet_id": r["snippet_id"],
            "song_title": r["song_title"],
            "section": r["section"],
            "key": r["key"],
            "roman_progression": r["roman_progression"]
        })

    out = pd.DataFrame(chain_rows)
    return best_total, out

best_score, best_chain = build_medley_chain(toy, K=5, beam_width=40, w_key=W_KEY)
best_chain.to_csv("best_medley_k5.csv", index=False)

print("Saved: best_medley_k5.csv")
print("Best chain total score:", round(best_score, 4))
print(best_chain)


Saved: top_pairs_medley.csv (top 50 pairs)
Saved: best_medley_k5.csv
Best chain total score: 1.8467
  snippet_id                                     song_title section key  \
0     TS_014                   bad blood (taylor's version)   Verse   C   
1     TS_010                                  midnight rain  Bridge   F   
2     TS_044                                 stay beautiful   Verse  C#   
3     TS_041                     fifteen (taylor's version)  Bridge   C   
4     TS_017  florida!!! (featuring florence + the machine)   Verse   F   

               roman_progression  
0        ['I', 'V', 'II', 'iii']  
1        ['I', 'V', 'II', 'iii']  
2  ['I', 'II', 'V', 'II', 'iii']  
3  ['I', 'iii', 'II', 'V', 'II']  
4             ['I', 'iii', 'II']  
