In [1]:
import json

# Load the processed performance recordings and the per-song learning tips.
# The encoding is passed explicitly so that any non-ASCII text in the JSON
# files is decoded the same way regardless of the platform's locale default.
file_path2 = 'processed_performance_data 2.json'
with open(file_path2, 'r', encoding='utf-8') as file:
    data = json.load(file)

learning_tips_path = 'learning_points.json'
with open(learning_tips_path, 'r', encoding='utf-8') as file:
    tips = json.load(file)

# Quick sanity check: number of recordings and the fields of the first one.
print(len(data), data[0].keys())
# Distinct song ids present in the loaded recordings.
song_ids = {s['song_id'] for s in data}

323 dict_keys(['recording_id', 'assignment_id', 'song_id', 'answers', 'measure_indices', 'performance_data'])


In [2]:
# Take the first loaded recording as the reference sample.
sample = data[0] 
# The recording's song id is used directly as the key into `tips`.
song_type = sample['song_id']
# Both values come from that single song's entry in `tips`; a later cell reads
# these two names as notebook-level globals when calling prepare_llm_input.
learning_points = tips[song_type]['learning_points']
lyrics_per_measure = tips[song_type]['lyrics_per_measure']


In [None]:
# Korean solfege name for each of the 12 pitch classes; midi_to_note_name
# indexes this list with `midi_num % 12`. The Western letter name of each
# entry is given alongside for reference.
NOTE_NAMES = [
    '도',    # C
    '도#',   # C#
    '레',    # D
    '레#',   # D#
    '미',    # E
    '파',    # F
    '파#',   # F#
    '솔',    # G
    '솔#',   # G#
    '라',    # A
    '라#',   # A#
    '시',    # B
]
# Twelve yul labels: six base labels followed by the same six with a '청' prefix.
# NOTE(review): not referenced by any of the cells visible here.
YUL_NAMES = [
    '황', '태', '중', '임', '남', '무',
    '청황', '청태', '청중', '청임', '청남', '청무',
]


In [2]:
import json
from collections import defaultdict, Counter

# --- MIDI Number to Note Name ---
def midi_to_note_name(midi_num):
    """Return the Korean solfege name of a MIDI note number with a register prefix.

    The pitch class (``midi_num % 12``) selects the name from ``NOTE_NAMES``.
    The octave is computed as ``midi_num // 12 - 1``:

    * octave 3 returns the bare name,
    * octave 2 or lower is prefixed with "낮은",
    * octave 4 or higher is prefixed with "높은".
    """
    octave_index, pitch_class = divmod(midi_num, 12)
    octave = octave_index - 1
    name = NOTE_NAMES[pitch_class]

    if octave == 3:
        return name
    prefix = "낮은" if octave < 3 else "높은"
    return f"{prefix} {name}"

# --- Pitch Difference Calculation ---
def pitch_difference(answer_pitch, performed_pitches):
    """Return the signed gap between the mean performed pitch and the answer pitch.

    Args:
        answer_pitch: Target pitch; the value -1 marks an entry with no target.
        performed_pitches: Sequence of performed pitch values for the note.

    Returns:
        ``mean(performed_pitches) - answer_pitch`` (positive when the
        performance is above the target), or ``None`` when ``answer_pitch``
        is -1 or ``performed_pitches`` is empty/falsy.
    """
    if answer_pitch != -1 and performed_pitches:
        mean_performed = sum(performed_pitches) / len(performed_pitches)
        return mean_performed - answer_pitch
    return None

# --- Pitch Level Classification with Octave Consideration ---
def classify_pitch_level(pitches):
    """Classify a group of pitches into a single register label.

    The highest register present wins:

    * any pitch >= 60          -> "고음역대" (high register)
    * else any pitch in (52, 60) -> "중간 음역대" (middle register)
    * otherwise                 -> "저음역대" (low register; also returned
      for an empty input)

    The middle band spans everything strictly between the low ceiling (52)
    and the high floor (60). It previously stopped at 57 and started at 53,
    so pitches 58-59 and fractional values between 52 and 53 matched no band
    and were reported as low register.

    Args:
        pitches: Iterable of numeric pitch values (may be consumed once only
            if it is a list or other re-iterable sequence).

    Returns:
        One of the three register label strings above.
    """
    if any(p >= 60 for p in pitches):
        return "고음역대"
    if any(52 < p < 60 for p in pitches):
        return "중간 음역대"
    return "저음역대"

# --- Measure-Level Musical Feature Analysis ---
def measure_features(measure, answers, measure_indices, performance_data, lyrics_per_measure=None):
    """Describe one measure's target melody and how accurately it was performed.

    A note is scored only when it belongs to ``measure``, its answer pitch is
    not -1, and its performance entry is non-empty.

    Args:
        measure: Measure index to analyse.
        answers: Target pitch per note (-1 for entries without a target).
        measure_indices: Measure index per note, parallel to ``answers``.
        performance_data: Performed pitch values per note, parallel to ``answers``.
        lyrics_per_measure: Optional mapping from ``str(measure)`` to an
            indexable collection of syllables.
            NOTE(review): syllables are matched by position among the *scored*
            notes, so a skipped note shifts the alignment - confirm intended.

    Returns:
        ``(features, abs_mean_diff, error_count)`` where ``features`` is a dict
        with keys ``pitch_level``, ``pitch_variability``, ``pitch_direction``,
        ``note_density``, ``pitch_error`` and ``frequent_wrong_syllables``;
        ``abs_mean_diff`` is the absolute value of the mean signed pitch
        difference; ``error_count`` is the number of scored notes whose
        difference is at least 0.5 in magnitude. When no note is scored the
        result is ``({}, 0.0, 0)``.
    """
    scored = [
        (target, played)
        for target, bar, played in zip(answers, measure_indices, performance_data)
        if bar == measure and target != -1 and played
    ]
    if not scored:
        return {}, 0.0, 0

    targets = [target for target, _ in scored]
    pitch_range = max(targets) - min(targets)

    # Melodic contour of the target pitches.
    steps = list(zip(targets, targets[1:]))
    if all(t == targets[0] for t in targets):
        direction = "유지"
    elif all(prev <= nxt for prev, nxt in steps):
        direction = "상행"
    elif all(prev >= nxt for prev, nxt in steps):
        direction = "하행"
    else:
        direction = "복합적 이동"

    # Density label from the number of scored notes in the measure.
    note_count = len(targets)
    if note_count >= 6:
        note_density = "빠른 패시지"
    elif note_count >= 3:
        note_density = "보통"
    else:
        note_density = "느린 패시지"

    pitch_level = classify_pitch_level(targets)

    # One signed difference per scored note, computed once and reused below.
    diffs = [pitch_difference(target, played) for target, played in scored]
    valid_diffs = [d for d in diffs if d is not None]
    avg_diff = sum(valid_diffs) / len(valid_diffs) if valid_diffs else 0
    error_count = sum(1 for d in valid_diffs if abs(d) >= 0.5)

    # Verbal rating of the mean signed difference.
    magnitude = abs(avg_diff)
    if magnitude > 1.5:
        level = "매우 높음" if avg_diff > 0 else "매우 낮음"
    elif magnitude < 0.5:
        level = "정확"
    else:
        level = "조금 높음" if avg_diff > 0 else "조금 낮음"

    # Syllables sung on notes that were off by at least 0.5.
    wrong_lyrics = []
    measure_key = str(measure)
    if lyrics_per_measure and measure_key in lyrics_per_measure:
        syllables = lyrics_per_measure[measure_key]
        for position, ((target, _), diff) in enumerate(zip(scored, diffs)):
            if diff and abs(diff) >= 0.5 and position < len(syllables):
                wrong_lyrics.append(f"{syllables[position]}({midi_to_note_name(target)})")

    if pitch_range >= 5:
        variability = '음정 변화가 많음'
    else:
        variability = '음정 변화 적음'

    features = {
        'pitch_level': pitch_level,
        'pitch_variability': variability,
        'pitch_direction': direction,
        'note_density': note_density,
        'pitch_error': f"{level} ({avg_diff:+.1f})",
        'frequent_wrong_syllables': wrong_lyrics[:3],  # at most 3 entries
    }
    return features, magnitude, error_count

# --- Build the LLM input by summarizing only the critical measures ---
def prepare_llm_input(answers, measure_indices, performance_data, top_n=3, lyrics_per_measure=None, learning_points=None):
    """Summarize a recording into overall accuracy plus its most problematic measures.

    Args:
        answers: Target pitch per note (-1 for entries without a target).
        measure_indices: Measure index per note, parallel to ``answers``.
        performance_data: Performed pitch values per note, parallel to ``answers``.
        top_n: How many measures to take from each ranking (by absolute mean
            error and by error count); the two selections are merged.
        lyrics_per_measure: Optional mapping ``str(measure) -> lyrics``; when a
            selected measure has an entry it is attached under ``'lyrics'``.
        learning_points: Optional mapping ``str(measure) -> learning point``;
            attached under ``'learning_point'`` when present.

    Returns:
        A dict with two keys:

        * ``"overall_summary"``: ``pitch_accuracy_percent`` (share of scored
          notes whose absolute difference is below 0.75, rounded to 2 places;
          100 when nothing is scored) and ``frequent_wrong_notes`` (names of up
          to three answer pitches that were missed most often).
        * ``"critical_measures"``: ``str(measure) -> feature dict`` for the
          selected measures, ordered by the error ranking first and then by the
          error-count ranking.
    """
    all_measures = sorted(set(measure_indices))

    # Per-measure features and the two statistics used for ranking.
    analysis = {}
    error_stats = {}
    for m in all_measures:
        result, avg_err, err_count = measure_features(m, answers, measure_indices, performance_data, lyrics_per_measure)
        analysis[str(m)] = result
        error_stats[m] = {'avg_error': avg_err, 'error_count': err_count}

    # Note-level accuracy over the whole recording.
    pitch_accurate = 0
    total = 0
    note_errors = defaultdict(list)
    for ans, perf in zip(answers, performance_data):
        if ans == -1 or not perf:
            continue
        total += 1
        deviation = abs(pitch_difference(ans, perf))
        if deviation < 0.75:
            pitch_accurate += 1
        elif deviation >= 0.75:
            # Kept as an explicit test: a NaN deviation satisfies neither
            # comparison and is counted in `total` only.
            note_errors[ans].append(deviation)

    pitch_accuracy_percent = (pitch_accurate / total * 100) if total else 100

    # Answer pitches ranked by how often, then by how badly, they were missed.
    top_wrong_notes = sorted(note_errors.items(), key=lambda x: (len(x[1]), sum(x[1])), reverse=True)[:3]
    frequent_wrong_notes = [
        {
            # "midi": int(n),
            "note": midi_to_note_name(int(n))
        } for n, _ in top_wrong_notes
    ]

    top_by_error = sorted(error_stats.items(), key=lambda x: x[1]['avg_error'], reverse=True)[:top_n]
    top_by_count = sorted(error_stats.items(), key=lambda x: x[1]['error_count'], reverse=True)[:top_n]
    # De-duplicate while keeping ranking order. A set of strings iterates in an
    # order that changes with hash randomization, which made the key order of
    # `critical_measures` differ from one interpreter run to the next.
    selected = list(dict.fromkeys(str(m) for m, _ in top_by_error + top_by_count))

    critical_measures = {}
    for m in selected:
        critical_measures[m] = analysis[m]
        if lyrics_per_measure and m in lyrics_per_measure:
            critical_measures[m]['lyrics'] = lyrics_per_measure[m]
        if learning_points and m in learning_points:
            critical_measures[m]['learning_point'] = learning_points[m]

    return {
        "overall_summary": {
            "pitch_accuracy_percent": round(pitch_accuracy_percent, 2),
            "frequent_wrong_notes": frequent_wrong_notes
        },
        "critical_measures": critical_measures
    }


In [18]:
import json

# Load the recordings to analyse.
# NOTE(review): absolute, machine-specific path - consider a configurable data directory.
with open("/home/habang/Education/LLMFeedback/dataset/sample.json", "r", encoding="utf-8") as f:
    recordings = json.load(f)

# Analysis result per recording, keyed by recording_id.
all_results = {}

# Run the analysis for every recording.
for rec in recordings:
    answers = rec["answers"]
    measure_indices = rec["measure_indices"]
    performance_data = rec["performance_data"]
    recording_id = rec["recording_id"]

    # Use the lyrics / learning points of this recording's own song. Previously
    # every recording was analysed with the values taken from data[0]'s song.
    # The notebook-level values remain the fallback when the recording has no
    # song_id or `tips` has no entry for it.
    rec_tips = tips.get(rec.get("song_id")) or {}

    result = prepare_llm_input(
        answers,
        measure_indices,
        performance_data,
        lyrics_per_measure=rec_tips.get("lyrics_per_measure", lyrics_per_measure),
        learning_points=rec_tips.get("learning_points", learning_points)
    )
    all_results[recording_id] = result

# Print the result for recording_id=103 as an example.
import pprint
pprint.pprint(all_results[103], sort_dicts=False)


{'overall_summary': {'pitch_accuracy_percent': 34.23,
                     'frequent_wrong_notes': [{'note': '미'},
                                              {'note': '도'},
                                              {'note': '솔'}]},
 'critical_measures': {'30': {'pitch_level': '저음역대',
                              'pitch_variability': '음정 변화가 많음',
                              'pitch_direction': '복합적 이동',
                              'note_density': '보통',
                              'pitch_error': '매우 높음 (+7.9)',
                              'frequent_wrong_syllables': []},
                       '21': {'pitch_level': '중간 음역대',
                              'pitch_variability': '음정 변화 적음',
                              'pitch_direction': '복합적 이동',
                              'note_density': '빠른 패시지',
                              'pitch_error': '매우 높음 (+3.8)',
                              'frequent_wrong_syllables': ['가(라)',
                                                