# AI Music Analysis Improved

## Setup Code

In [None]:
# Imports
from openai import OpenAI
import os
from pathlib import Path
from langchain_community.document_loaders import PyPDFLoader
from langchain.document_loaders import PDFPlumberLoader
from langchain_core.documents import Document
from langchain.tools import tool
from typing_extensions import List, TypedDict, Optional
from langchain_core.prompts import ChatPromptTemplate
from typing import Optional, List
from pydantic import BaseModel, Field
from langchain.chat_models import init_chat_model
import io
import pandas as pd
from langgraph.graph import START, StateGraph
from langchain_community.document_loaders import TextLoader
from music21 import converter, interval, note, metadata, stream, pitch
from pathlib import Path
import json
from langchain_experimental.tools.python.tool import PythonREPLTool
from collections import Counter
import xml.etree.ElementTree as ET

In [28]:
# Load the 9 sample documents as XML


def loadXML(filepath: str):
    """Read one file through ``TextLoader`` and return it as a single Document.

    Args:
        filepath: Path of the file to load.

    Returns:
        A ``Document`` whose ``page_content`` is every loaded page's text
        concatenated in order, and whose ``metadata`` is the first page's
        metadata.

    Raises:
        IndexError: If the loader returns no pages.
    """
    loaded_pages = TextLoader(filepath).load()
    combined_text = "".join(page.page_content for page in loaded_pages)
    return Document(page_content=combined_text, metadata=loaded_pages[0].metadata)


directory = Path(r"C:\Users\charl\Documents\VSCode\Music Analysis\MEI Sample")
# Only regular files are loaded; sub-directories of the sample folder are skipped.
file_paths = []
for entry in directory.iterdir():
    if entry.is_file():
        file_paths.append(str(entry))
# One Document per file, in directory-listing order.
docs: List[Document] = [loadXML(path) for path in file_paths]

In [29]:
# Check loaded content

# print(docs[5])

In [None]:
# Parse each score with music21 and summarise it as a plain, JSON-serialisable dict

def _mei_element_text(element, default):
    """Return the whitespace-normalised text of *element*, or *default*.

    The text of the element and of all its descendants is used, so values
    wrapped in child elements (for example ``<title><titlePart>...``) are
    found as well. *default* is returned when the element is missing or
    contains no visible text.
    """
    if element is None:
        return default
    text = " ".join("".join(element.itertext()).split())
    return text or default


def extract_title_composer_from_mei(filepath):
    """Best-effort lookup of a work's title and composer in an MEI file.

    The title is taken from ``workList/work/title`` and falls back to
    ``fileDesc/titleStmt/title``. The composer is taken from
    ``workList/work/composer`` and falls back to any ``persName`` element
    with ``role="composer"``. The two lookups are independent: a missing or
    empty title does not prevent the composer from being returned, and vice
    versa.

    Args:
        filepath: Path of the MEI (XML) file to read.

    Returns:
        A ``(title, composer)`` tuple of strings. A value that cannot be
        found is replaced by ``"Unknown Title"`` / ``"Unknown Composer"``.
        If the file cannot be read or is not well-formed XML, both
        placeholders are returned.
    """
    try:
        root = ET.parse(filepath).getroot()
    except (ET.ParseError, OSError, ValueError):
        # Unreadable, malformed or undecodable file: metadata is optional,
        # so fall back to the placeholders rather than failing the caller.
        return "Unknown Title", "Unknown Composer"

    ns = {'mei': 'http://www.music-encoding.org/ns/mei'}

    # Try <workList>/<work>/<title>
    title = root.find('.//mei:workList/mei:work/mei:title', ns)
    if title is None:
        # fallback to <fileDesc>/<titleStmt>/<title>
        title = root.find('.//mei:fileDesc/mei:titleStmt/mei:title', ns)

    # Try <workList>/<work>/<composer>
    composer = root.find('.//mei:workList/mei:work/mei:composer', ns)
    if composer is None:
        # fallback to <persName role="composer">
        composer = root.find('.//mei:persName[@role="composer"]', ns)

    return (
        _mei_element_text(title, "Unknown Title"),
        _mei_element_text(composer, "Unknown Composer"),
    )


def musicxml_to_summary_dict(filepath):
    """Summarise one score file as a plain, JSON-serialisable dictionary.

    The file is parsed with ``music21.converter.parse``. The title and
    composer come from ``extract_title_composer_from_mei``, which reads the
    same file as MEI XML.

    Args:
        filepath: Path of the score file, as a string.

    Returns:
        On success, a dict with the keys ``title``, ``composer``,
        ``mei_path``, ``num_parts`` and ``parts``. Each entry of ``parts``
        holds ``part_id``, ``num_notes``, ``pitches`` (note names with
        octave), ``intervals`` (directed interval names between consecutive
        notes) and ``analyzed_key``. Only single notes are counted; chords
        are skipped by the ``isNote`` filter.

        If the file cannot be parsed, ``{"error": "Failed to parse file: ..."}``
        is returned instead.
    """
    try:
        score = converter.parse(filepath)
    except Exception as e:
        # Best-effort: report the failure in the result so a batch run can
        # continue with the remaining files.
        return {"error": f"Failed to parse file: {e}"}

    title, composer = extract_title_composer_from_mei(filepath)

    part_data = []
    for part in score.parts:
        part_id = part.id or f"Part_{len(part_data)+1}"
        notes = [n for n in part.recurse().notes if n.isNote]
        pitches = [n.nameWithOctave for n in notes]

        # Melodic interval between each note and the one that follows it.
        intervals = [
            interval.Interval(noteStart=previous, noteEnd=current).directedName
            for previous, current in zip(notes, notes[1:])
        ]

        analyzed_key = part.analyze('key').name

        part_data.append({
            "part_id": part_id,
            "num_notes": len(pitches),
            "pitches": pitches,
            "intervals": intervals,
            "analyzed_key": analyzed_key
        })

    return {
        "title": title,
        "composer": composer,
        "mei_path": filepath,
        "num_parts": len(part_data),
        "parts": part_data
    }

# === Example usage ===
if __name__ == "__main__":
    directory = Path("C:/Users/charl/Documents/VSCode/Music Analysis/MEI Sample")
    summaries = {}

    # One summary per *.mei file, keyed by the file name without its extension.
    for mei_file in directory.glob("*.mei"):
        print(f"Processing {mei_file.name}...")
        summaries[mei_file.stem] = musicxml_to_summary_dict(str(mei_file))

    # Persist the summaries so later cells can reload them without re-parsing.
    with open("music_summaries.json", "w") as out_file:
        json.dump(summaries, out_file, indent=2)


Processing Bach_BWV_0772.mei...
Processing Bach_BWV_0778.mei...
Processing Bach_BWV_0786.mei...
Processing Bartok_Mikrokosmos_022.mei...
Processing Bartok_Mikrokosmos_031.mei...
Processing Bartok_Mikrokosmos_104.mei...
Processing Morley_1595_01_Go_ye_my_canzonettes.mei...
Processing Morley_1595_07_Leave_now_mine_eyes.mei...
Processing Morley_1595_12_Flora_wilt_thou.mei...


In [31]:
def inspect_music21_object(score):
    """Print a short structural report about a parsed music21 object.

    The object's type is always printed. For a ``stream.Score`` the number
    of parts is printed, followed by details of the first part only: its
    type and id, its measure count, the element types in its first measure
    (when it has measures) and the names of the single notes among its
    first five note/chord elements. For a ``stream.Part`` a one-line notice
    is printed. Other object types produce no further output.

    Args:
        score: The object returned by ``converter.parse``.
    """
    print(f"Top-level object type: {type(score)}")

    if not isinstance(score, stream.Score):
        if isinstance(score, stream.Part):
            print("Parsed a single Part instead of a full Score.")
        return

    print(f"Number of parts: {len(score.parts)}")
    for part_number, part in enumerate(score.parts, start=1):
        print(f"  - Part {part_number}: {type(part)} with id='{part.id}'")

        measures = list(part.getElementsByClass(stream.Measure))
        print(f"    Number of measures: {len(measures)}")
        if measures:
            element_types = [type(element) for element in measures[0]]
            print(f"    First measure contents: {element_types}")

        # Take the first five elements first, then keep only single notes,
        # so fewer than five names are shown when chords are among them.
        leading = list(part.recurse().notes)[:5]
        names = [item.nameWithOctave for item in leading if item.isNote]
        print(f"    First 5 notes: {names}")
        break  # Remove this to inspect all parts


def reinspect_from_json(json_path, score_directory):
    """Re-parse and inspect every score named in a summaries JSON file.

    Each top-level key of the JSON file is treated as a file stem. The
    first file in ``score_directory`` matching ``<stem>.*`` is parsed and
    passed to ``inspect_music21_object``. A missing file or a failure while
    parsing/inspecting is reported on stdout and the loop continues.

    Args:
        json_path: Path of the JSON file whose keys name the scores.
        score_directory: Directory that contains the score files.
    """
    with open(json_path, "r", encoding="utf-8") as handle:
        summaries = json.load(handle)

    score_dir = Path(score_directory)

    for name in summaries.keys():
        first_match = next(iter(score_dir.glob(f"{name}.*")), None)
        if first_match is None:
            print(f"\n No file found for {name}")
        else:
            filepath = str(first_match)
            print(f"\nInspecting: {filepath}")
            try:
                inspect_music21_object(converter.parse(filepath))
            except Exception as e:
                print(f"Failed to parse {name}: {e}")

# === Example usage ===
if __name__ == "__main__":
    # The summaries written by the export cell, and the folder holding the
    # score files whose stems are the keys of that JSON file.
    SCORE_DIR = r"C:\Users\charl\Documents\VSCode\Music Analysis\MEI Sample"
    JSON_PATH = r"C:\Users\charl\Documents\VSCode\Music Analysis\music_summaries.json"

    reinspect_from_json(json_path=JSON_PATH, score_directory=SCORE_DIR)


Inspecting: C:\Users\charl\Documents\VSCode\Music Analysis\MEI Sample\Bach_BWV_0772.mei
Top-level object type: <class 'music21.stream.base.Score'>
Number of parts: 2
  - Part 1: <class 'music21.stream.base.Part'> with id='2368170020592'
    Number of measures: 22
    First measure contents: [<class 'music21.clef.TrebleClef'>, <class 'music21.stream.base.Voice'>]
    First 5 notes: ['C4', 'D4', 'E4', 'F4', 'D4']

Inspecting: C:\Users\charl\Documents\VSCode\Music Analysis\MEI Sample\Bach_BWV_0778.mei
Top-level object type: <class 'music21.stream.base.Score'>
Number of parts: 2
  - Part 1: <class 'music21.stream.base.Part'> with id='2368164670176'
    Number of measures: 23
    First measure contents: [<class 'music21.clef.TrebleClef'>, <class 'music21.key.Key'>, <class 'music21.meter.base.TimeSignature'>, <class 'music21.stream.base.Voice'>]
    First 5 notes: ['B4', 'A4', 'G4', 'F#4', 'G4']

Inspecting: C:\Users\charl\Documents\VSCode\Music Analysis\MEI Sample\Bach_BWV_0786.mei
Top-lev

### Tools

In [72]:

# Tool: report the key stored for the FIRST part of the score dictionary
# ("analyzed_key"), or "Unknown" when no part or key is available.
# NOTE: LangChain's @tool uses the docstring as the tool description given to
# the model, so the docstring text is runtime content and is left unchanged.
@tool
def get_overall_key(data: dict) -> str:
    """Returns the overall analyzed key from the parsed score dictionary."""
    # `or [{}]` covers a missing, None or empty "parts" value; indexing an
    # empty list would otherwise raise IndexError instead of answering "Unknown".
    parts = data.get("parts") or [{}]
    return parts[0].get("analyzed_key", "Unknown")

# Tool: look up the stored "intervals" list of one part. A bad index or a
# missing key yields a one-element list holding an explanatory message.
# The docstring is the tool description shown to the model; keep it as is.
@tool
def list_horizontal_intervals_part(data: dict, part_index: int = 0) -> list:
    """Returns the list of melodic intervals for a given part index."""
    try:
        selected_part = data["parts"][part_index]
        return selected_part["intervals"]
    except (IndexError, KeyError):
        return ["Invalid part index or missing data"]

# Tool: compare the opening melodic intervals of the first two parts.
# The docstring is the tool description shown to the model; keep it as is.
# NOTE(review): "partial" only requires that ONE of the first five intervals
# of part 1 occurs somewhere in the first ten of part 2, which is a very weak
# signal -- confirm this is the intended heuristic.
@tool
def detect_imitation(data: dict) -> str:
    """
    Detects basic melodic imitation by comparing first 10 intervals of each part.
    """
    try:
        leader = data["parts"][0]["intervals"][:10]
        follower = data["parts"][1]["intervals"][:10]

        if leader == follower:
            return "Perfect imitation in first 10 intervals."

        for candidate in leader[:5]:
            if candidate in follower:
                return "Partial imitation detected."

        return "No clear imitation found."
    except Exception as e:
        return f"Error in detection: {e}"
    
# Tool: name the interval between the i-th stored pitch of part 1 and the
# i-th stored pitch of part 2, for at most the first 50 pairs. A pair whose
# interval cannot be built is reported as "error@<index>"; if the pitch lists
# themselves cannot be read, a one-element list with an error message is
# returned.
# The docstring is the tool description shown to the model; keep it as is.
# NOTE(review): pitches are paired by list position, not by onset time, so
# the pairs are truly simultaneous only when both parts move note-for-note.
@tool
def analyze_vertical_intervals(data: dict) -> list:
    """
    Analyzes vertical harmonic intervals between the first two parts in a duet.
    Returns a list of interval names (e.g. ['P5', 'm6', 'M3']) where both parts have aligned pitches.
    """
    try:
        # Only the first 50 results are returned (LLM context length), so
        # only those pairs are built instead of one interval per note.
        upper = data["parts"][0]["pitches"][:50]
        lower = data["parts"][1]["pitches"][:50]
    except Exception as e:
        return [f"Error processing parts: {e}"]

    intervals = []
    for i, (upper_name, lower_name) in enumerate(zip(upper, lower)):
        try:
            iv = interval.Interval(
                noteStart=pitch.Pitch(upper_name),
                noteEnd=pitch.Pitch(lower_name),
            )
            intervals.append(iv.name)
        except Exception:
            intervals.append(f"error@{i}")

    return intervals
    

# Tool: inspect the last three index-paired intervals of the first two parts
# and report whether they end on a unison/octave that is preceded by a
# fourth, fifth or sixth.
# The docstring is the tool description shown to the model; keep it as is.
# NOTE(review): pitches are paired by position counted from the START of each
# part, so when the parts have different note counts the shorter part's ending
# is compared with notes from the middle of the longer part -- confirm whether
# pairing the last notes of each part would match the intent better.
@tool
def detect_cadence_end(data: dict) -> str:
    """
    Detects if the end of a duet contains a cadence-like vertical interval (e.g., P5 -> P8).
    Looks at the final 3 vertical intervals.
    """
    try:
        part1 = data["parts"][0]["pitches"]
        part2 = data["parts"][1]["pitches"]
        min_len = min(len(part1), len(part2))

        if min_len < 3:
            return "Not enough material to check cadence."

        endings = []
        for i in range(min_len - 3, min_len):
            try:
                p1 = pitch.Pitch(part1[i])
                p2 = pitch.Pitch(part2[i])
                iv = interval.Interval(noteStart=p1, noteEnd=p2)
                endings.append(iv.name)
            except Exception:
                # Was a bare `except:`, which also swallowed
                # KeyboardInterrupt and SystemExit.
                endings.append("error")

        if endings[-1] in ["P8", "P1"] and any(e in ["P4", "P5", "m6", "M6"] for e in endings[:-1]):
            return f"Cadence-like ending detected: {endings}"
        return f"No clear cadence. Last intervals: {endings}"
    except Exception as e:
        return f"Error analyzing cadence: {e}"

# Tool: re-parse the file named by data["mei_path"] and return a text table
# with one row per distinct pitch and its number of occurrences. Rows are
# ordered by the pitch label as a string. Any failure is returned as a
# "count_notes failed: ..." message instead of being raised.
# The docstring is the tool description shown to the model; keep it as is.
@tool
def count_notes(data: dict, pitch_with_octave: bool = True) -> str:
    """
    Returns a table of pitch counts using the 'mei_path' field in the score dictionary.
    """
    try:
        mei_path = data.get("mei_path")
        if not mei_path:
            return "Error: 'mei_path' not found in input."

        parsed_score = converter.parse(mei_path)

        # Label each single note either with or without its octave number.
        label_attribute = "nameWithOctave" if pitch_with_octave else "name"
        labels = [
            getattr(element, label_attribute)
            for element in parsed_score.recurse().notes
            if isinstance(element, note.Note)
        ]

        if not labels:
            return "No notes found in the score."

        table = pd.DataFrame(Counter(labels).items(), columns=["Pitch", "Count"])
        table = table.sort_values("Pitch").reset_index(drop=True)
        return table.to_string(index=False)
    except Exception as e:
        return f"count_notes failed: {e}"


# Tool: find the first run of `length` consecutive melodic intervals in
# part 1 that also occurs anywhere in part 2. Any failure is returned as an
# "Error finding motif: ..." message instead of being raised.
# The docstring is the tool description shown to the model; keep it as is.
@tool
def find_motif_matches(data: dict, length: int = 4) -> str:
    """
    Looks for repeated interval motifs of a given length in both parts.
    Returns the first matching motif if found.
    """
    try:
        if length < 1:
            # A zero-length window would "match" as the empty tuple.
            return "Error finding motif: length must be at least 1"

        first_intervals = data["parts"][0]["intervals"]
        second_intervals = data["parts"][1]["intervals"]

        # Build part 2's windows once as a set so each lookup is O(1)
        # instead of scanning a list for every window of part 1.
        second_grams = {
            tuple(second_intervals[i:i + length])
            for i in range(len(second_intervals) - length + 1)
        }

        # Walk part 1 in order so the earliest matching motif is reported.
        for i in range(len(first_intervals) - length + 1):
            gram = tuple(first_intervals[i:i + length])
            if gram in second_grams:
                return f"Motif match found: {gram}"
        return "No matching motifs found between parts."
    except Exception as e:
        return f"Error finding motif: {e}"



# Everything the model may call: the score-dictionary tools defined above
# plus LangChain's Python REPL tool.
tools = [
    get_overall_key,
    list_horizontal_intervals_part,
    detect_imitation,
    analyze_vertical_intervals,
    detect_cadence_end,
    count_notes,
    find_motif_matches,
    PythonREPLTool(),
]

In [73]:
import getpass
import os

# Prompt for the key only when the environment does not already hold a
# non-empty one, so the secret is never written into the notebook.
if os.environ.get("OPENAI_API_KEY", "") == "":
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain.chat_models import init_chat_model

# Plain chat model, used for the "without tools" comparison.
llm = init_chat_model("gpt-4o", model_provider="openai")

# Same model with the tool list bound, so it can request tool calls.
llm_with_tools = llm.bind_tools(tools)

In [74]:
# Reload the per-score summaries written earlier; keys are score file stems.
all_scores = json.loads(Path("music_summaries.json").read_text())

## Queries

In [81]:
# Prompt shared by ask_llm_tools and ask_llm_no_tools: a fixed system message
# plus a human message with two placeholders, {question} and {context}.
# NOTE(review): the system text tells the model it is "using tools" even when
# the prompt is sent through the model that has no tools bound.
template = ChatPromptTemplate([
    ("system", """You are an expert on music analysis. 
You are analyzing two-part scores using tools. 
You are given one score at a time as a dictionary named `data`. 
If you use a tool, you must pass this dictionary as the `data` parameter.

Only use the information provided in `data`. If you have uncertainty, express it."""),
    
    # {context} receives the score dictionary; it is rendered into the message as text.
    ("human", "Question: {question}\n\nContext (pass to tool as `data`):\n{context}")
])


In [83]:
def ask_llm_tools(question: str, score_name: str, max_tool_rounds: int = 3):
    """Ask the tool-enabled model about one score and return its final answer.

    The model is prompted with the question and the score dictionary. When
    its reply contains tool calls, each call is executed and the results are
    sent back, until the model answers in plain text or ``max_tool_rounds``
    rounds of tool execution have been used. Previously the tool calls were
    only printed, so a reply consisting solely of tool calls produced an
    empty string.

    Args:
        question: The question to ask about the score.
        score_name: Key of the score in ``all_scores``.
        max_tool_rounds: Maximum number of times tool calls are executed and
            fed back. ``0`` gives a single model call with no tool execution.

    Returns:
        The ``content`` of the model's last reply.

    Raises:
        KeyError: If ``score_name`` is not in ``all_scores`` or the model
            requests a tool that is not in ``tools``.
    """
    score = all_scores[score_name]
    messages = template.invoke(
        {
            "question": question,
            "context": score
        }
    ).to_messages()
    tools_by_name = {candidate.name: candidate for candidate in tools}

    rounds_used = 0
    while True:
        response = llm_with_tools.invoke(messages)
        tool_calls = getattr(response, 'tool_calls', None)
        print("Raw result:", response)
        print("Content:", response.content)
        print("Tool calls:", tool_calls)

        if not tool_calls or rounds_used >= max_tool_rounds:
            return response.content
        rounds_used += 1

        # The reply that requested the tools must precede their results.
        messages.append(response)
        for call in tool_calls:
            selected = tools_by_name[call["name"]]
            arguments = dict(call["args"])
            if "data" in selected.args:
                # Always pass the stored score: the model may omit `data` or
                # send back an altered copy of it.
                arguments["data"] = score
            # SECURITY NOTE: `tools` includes a Python REPL tool, so this line
            # can execute model-written code on the local machine.
            # Invoking a tool with the full tool-call dict returns a message
            # already linked to that call's id.
            messages.append(selected.invoke({**call, "args": arguments}))

def ask_llm_no_tools(question: str, score_name: str):
    """Ask the plain model (no tools bound) a question about one score.

    Args:
        question: The question to ask about the score.
        score_name: Key of the score in ``all_scores``; its dictionary is
            placed into the prompt as the context.

    Returns:
        The ``content`` of the model's reply.
    """
    prompt_inputs = {"question": question, "context": all_scores[score_name]}
    reply = llm.invoke(template.invoke(prompt_inputs))
    return reply.content

### Tabular Data: LLM with tools vs LLM vs Correct response

LLM With Tools:

In [84]:
# Tool-enabled run: request a per-pitch count table for Bach_BWV_0772 and print what the function returns.
print(ask_llm_tools("Return tabular data on the following piece of music. That is, return a table of how many of each notes there is throughout the piece.","Bach_BWV_0772"))

Raw result: content='' additional_kwargs={'tool_calls': [{'id': 'call_82j6IjCTgX5Yq5idubYAfucJ', 'function': {'arguments': '{"pitch_with_octave":true}', 'name': 'count_notes'}, 'type': 'function'}], 'refusal': None} response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 4574, 'total_tokens': 4591, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_a288987b44', 'id': 'chatcmpl-Bvt32JOeEUW51hsKK4M6naBnxTcmK', 'service_tier': 'default', 'finish_reason': 'tool_calls', 'logprobs': None} id='run--4ca4e01d-837b-41b3-af64-b2110b1848e1-0' tool_calls=[{'name': 'count_notes', 'args': {'pitch_with_octave': True}, 'id': 'call_82j6IjCTgX5Yq5idubYAfucJ', 'type': 'tool_call'}] usage_metadata={'input_tokens': 4574, 'output_tokens': 17, 'total_tokens': 4591, 'input_toke

LLM Without Tools:

In [42]:
# Same question and score as the tool-enabled run, sent to the model without tools for comparison.
print(ask_llm_no_tools("Return tabular data on the following piece of music. That is, return a table of how many of each notes there is throughout the piece.","Bach_BWV_0772"))

Based on the provided context, here is a table displaying how many of each pitch there is throughout the piece "Invention No. 1 in C major" by Johann Sebastian Bach. This data is extracted from the two-parts in the JSON dictionary.

| Pitch | Count |
|-------|-------|
| C2    | 1     |
| G2    | 3     |
| D2    | 1     |
| A2    | 2     |
| G3    | 13    |
| C3    | 9     |
| D3    | 14    |
| E3    | 14    |
| F3    | 4     |
| F#3   | 8     |
| G#3   | 2     |
| A3    | 13    |
| B3    | 17    |
| B-3   | 7     |
| C4    | 16    |
| D4    | 15    |
| E4    | 18    |
| F4    | 16    |
| G4    | 15    |
| A4    | 13    |
| B4    | 19    |
| C5    | 20    |
| C#5   | 2     |
| D5    | 20    |
| E5    | 20    |
| F5    | 19    |
| G#4   | 4     |
| G5    | 14    |
| A5    | 9     |
| G#5   | 1     |
| B5    | 3     |
| C6    | 3     |
| E5    | 14    |

This tabulation sums up the frequency of each pitch across the two parts provided, based on the 'pitches' arrays in the JSON data. The c

Correct Response:

In [39]:
def count_notes(filepath, pitch_with_octave=True):
    """
    Tally how often each pitch occurs among the single notes of a score.

    Args:
        filepath (str): Path to the MEI/MusicXML file.
        pitch_with_octave (bool): If True, label pitches with their octave
            (e.g., 'C4'); otherwise use the bare pitch name (e.g., 'C').

    Returns:
        pd.DataFrame: Columns "Pitch" and "Count", one row per distinct
        pitch label, ordered by the label as a string.
    """
    parsed_score = converter.parse(filepath)
    label_attribute = "nameWithOctave" if pitch_with_octave else "name"

    # Only note.Note elements are counted; chords are skipped.
    tally = Counter(
        getattr(element, label_attribute)
        for element in parsed_score.recurse().notes
        if isinstance(element, note.Note)
    )

    table = pd.DataFrame(tally.items(), columns=["Pitch", "Count"])
    return table.sort_values("Pitch").reset_index(drop=True)

# Reference answer: count the pitches of Bach_BWV_0772 directly from the score file.
count_notes(r"C:\Users\charl\Documents\VSCode\Music Analysis\MEI Sample\Bach_BWV_0772.mei")

Unnamed: 0,Pitch,Count
0,A2,3
1,A3,21
2,A4,28
3,A5,10
4,B-3,4
5,B-4,4
6,B2,4
7,B3,19
8,B4,26
9,B5,3
