In [1]:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple, Iterator
from enum import Enum
import asyncio
from pathlib import Path
import numpy as np
from collections import defaultdict, deque
import mido
import time

# Core Domain Entities

@dataclass
class Note:
    """A single pitched event: what sounds, how hard, when, and for how long.

    Field order is significant because instances are also built
    positionally: ``Note(pitch, velocity, start_time, duration[, channel])``.
    """

    # MIDI note number (0-127)
    pitch: int
    # Note velocity (0-127)
    velocity: int
    # Absolute time at which the note starts
    start_time: float
    # Length of time the note keeps sounding
    duration: float
    # MIDI channel the note belongs to; defaults to the first channel
    channel: int = 0

@dataclass
class Rest:
    """A stretch of silence separating notes.

    Built positionally as ``Rest(start_time, duration)``.
    """

    # Time at which the silence begins
    start_time: float
    # Length of the silence
    duration: float

@dataclass
class Track:
    """One track's worth of musical events plus its timing resolution.

    ``notes`` and ``rests`` each default to a fresh empty list per instance,
    so tracks never share event storage.
    """

    # Human-readable track name
    name: str
    # Notes belonging to this track
    notes: List[Note] = field(default_factory=list)
    # Silences belonging to this track
    rests: List[Rest] = field(default_factory=list)
    # Timing resolution: MIDI ticks per beat
    ticks_per_beat: int = 480
    # Tempo in microseconds per beat
    tempo: int = 500000

@dataclass
class TransitionTable:
    """Transition probabilities of a Markov chain plus summary statistics.

    ``transitions`` maps a state label to a mapping of successor label ->
    probability. The two statistics stay ``None`` until something computes
    them.
    """

    # state -> {next_state -> probability}; a fresh dict per instance
    transitions: Dict[str, Dict[str, float]] = field(default_factory=dict)
    # Kemeny constant of the chain, when computed
    kemeny_constant: Optional[float] = None
    # Entropy of the chain, when computed
    entropy: Optional[float] = None

# MIDI Analysis Classes

class MidiEventExtractor:
    """Extracts musical events from MIDI data.

    Walks a sequence of mido-style messages (objects exposing ``type``,
    ``time`` and, for note messages, ``note``/``velocity``/``channel``) and
    produces a ``Track`` whose notes and rests are timed in seconds.
    """

    def __init__(self):
        self.active_notes: Dict[int, float] = {}  # note -> start_time
        self.last_event_time = 0.0
        # note -> velocity of the note-on that started it
        self._active_velocities: Dict[int, int] = {}

    def extract_track(self, midi_track, ticks_per_beat: int = 480) -> Track:
        """Extract Notes and Rests from a MIDI track.

        Args:
            midi_track: Iterable of messages whose ``time`` is a delta in
                ticks. Its optional ``name`` attribute names the result.
            ticks_per_beat: Timing resolution used to convert ticks to
                seconds.

        Returns:
            A ``Track`` with one ``Note`` per matched note-on/note-off pair
            and one ``Rest`` per silent gap between notes. Notes that are
            never released are dropped.

        NOTE(review): only ``set_tempo`` events inside this same track are
        honoured; tempo changes stored in a different track of the file are
        not visible here.
        """
        track = Track(name=getattr(midi_track, 'name', 'Untitled'))
        track.ticks_per_beat = ticks_per_beat

        # Start from a clean slate so hanging notes left over from a
        # previously extracted track cannot be closed by this one.
        self.active_notes.clear()
        self._active_velocities = {}

        tempo = track.tempo
        current_time = 0.0
        last_note_end = 0.0

        for message in midi_track:
            # The delta preceding a message elapses at the tempo in force
            # *before* that message, so convert first, then apply changes.
            current_time += self._ticks_to_seconds(
                message.time, ticks_per_beat, tempo
            )

            if message.type == 'set_tempo':
                tempo = message.tempo
                track.tempo = tempo
                continue

            if message.type not in ('note_on', 'note_off'):
                continue

            if message.type == 'note_on' and message.velocity > 0:
                # A rest only exists if nothing is sounding right now and at
                # least one note has already finished.
                if (not self.active_notes and track.notes
                        and current_time > last_note_end):
                    track.rests.append(
                        Rest(last_note_end, current_time - last_note_end)
                    )

                self.active_notes[message.note] = current_time
                self._active_velocities[message.note] = message.velocity

            elif message.note in self.active_notes:
                # Reached for every note_off (whatever its release velocity)
                # and for note_on with velocity 0, which also means "off".
                start_time = self.active_notes.pop(message.note)
                velocity = self._active_velocities.pop(message.note, 64)

                track.notes.append(Note(
                    pitch=message.note,
                    velocity=velocity,  # strike velocity, not release
                    start_time=start_time,
                    duration=current_time - start_time,
                    channel=message.channel,
                ))
                last_note_end = max(last_note_end, current_time)

        return track

    def _ticks_to_seconds(self, ticks: int, ticks_per_beat: int,
                          tempo: int = 500000) -> float:
        """Convert MIDI ticks to seconds.

        Args:
            ticks: Number of ticks to convert.
            ticks_per_beat: Ticks that make up one beat.
            tempo: Microseconds per beat.
        """
        return (ticks / ticks_per_beat) * (tempo / 1_000_000)

class MarkovChainBuilder:
    """Derives first-order Markov chains from observed musical sequences.

    Every ``build_*`` method counts how often one state label is directly
    followed by another and hands the counts to ``_normalize_transitions``.
    """

    def _count_successors(self, label_pairs):
        """Tally ``(current, next)`` label pairs into nested counters."""
        tally = defaultdict(lambda: defaultdict(int))
        for origin, successor in label_pairs:
            tally[origin][successor] += 1
        return tally

    def build_note_chain(self, notes: List[Note]) -> TransitionTable:
        """Chain over pitches: states are the pitch numbers as strings."""
        adjacent_pitches = (
            (str(earlier.pitch), str(later.pitch))
            for earlier, later in zip(notes, notes[1:])
        )
        return self._normalize_transitions(
            self._count_successors(adjacent_pitches)
        )

    def build_duration_chain(self, durations: List[float]) -> TransitionTable:
        """Chain over durations, snapped to a grid so states are discrete."""
        snapped = []
        for raw in durations:
            snapped.append(self._quantize_duration(raw))

        adjacent_durations = (
            (str(earlier), str(later))
            for earlier, later in zip(snapped, snapped[1:])
        )
        return self._normalize_transitions(
            self._count_successors(adjacent_durations)
        )

    def build_content_chain(self, track: Track) -> TransitionTable:
        """Chain over the track's notes and rests in chronological order.

        Notes contribute their pitch (as a string) and rests contribute the
        literal label ``'rest'``.
        """
        timeline = [
            ('note', str(note.pitch), note.start_time) for note in track.notes
        ]
        timeline.extend(
            ('rest', 'rest', rest.start_time) for rest in track.rests
        )
        # Stable sort: events sharing a start time keep notes-before-rests.
        timeline.sort(key=lambda event: event[2])

        adjacent_labels = (
            (earlier[1], later[1])
            for earlier, later in zip(timeline, timeline[1:])
        )
        return self._normalize_transitions(
            self._count_successors(adjacent_labels)
        )

    def _quantize_duration(self, duration: float,
                           quantization: float = 0.1) -> float:
        """Snap ``duration`` to the nearest multiple of ``quantization``."""
        steps = round(duration / quantization)
        return steps * quantization

    def _normalize_transitions(self, transitions: Dict) -> TransitionTable:
        """Turn per-state successor counts into probability rows.

        Each row is divided by its own total, then the table's Kemeny
        constant and entropy fields are filled in.
        """
        table = TransitionTable()

        for state, successor_counts in transitions.items():
            row_total = sum(successor_counts.values())
            row = {}
            for successor, count in successor_counts.items():
                row[successor] = count / row_total
            table.transitions[state] = row

        table.kemeny_constant = self._calculate_kemeny_constant(table)
        table.entropy = self._calculate_entropy(table)

        return table

    def _calculate_kemeny_constant(self, table: TransitionTable) -> float:
        """Placeholder for the Kemeny constant.

        Currently just reports how many states have outgoing transitions;
        the real quantity needs matrix operations that are not implemented.
        """
        return len(table.transitions)

    def _calculate_entropy(self, table: TransitionTable) -> float:
        """Mean Shannon entropy (in bits) of the table's rows.

        Returns 0.0 for a table without any states.
        """
        rows = table.transitions
        if not rows:
            return 0.0

        accumulated = 0.0
        for distribution in rows.values():
            row_entropy = 0.0
            for p in distribution.values():
                if p > 0:
                    row_entropy -= p * np.log2(p)
            accumulated += row_entropy
        return accumulated / len(rows)

class EntropyAnalyzer:
    """Measures how varied a track's events are over a sliding window."""

    def __init__(self, window_size: int = 10):
        # Number of consecutive events examined per entropy sample
        self.window_size = window_size

    def analyze_track_entropy(self, track: Track) -> List[Tuple[float, float]]:
        """Slide a window over the track and score each position.

        Notes are labelled by pitch and rests by ``'rest'``; all events are
        ordered by start time. Returns ``(entropy, timestamp)`` pairs where
        the timestamp is the start time of the window's first event. A track
        with fewer events than the window size yields an empty list.
        """
        timeline = [(str(note.pitch), note.start_time) for note in track.notes]
        timeline.extend(('rest', rest.start_time) for rest in track.rests)
        timeline.sort(key=lambda item: item[1])

        samples = []
        for offset in range(len(timeline) - self.window_size + 1):
            window = timeline[offset:offset + self.window_size]
            labels = [label for label, _ in window]
            score = self._calculate_window_entropy(labels)
            samples.append((score, timeline[offset][1]))

        return samples

    def _calculate_window_entropy(self, events: List[str]) -> float:
        """Shannon entropy (bits) of the label frequencies in ``events``."""
        from collections import Counter

        frequency = Counter(events)
        event_count = len(events)

        result = 0.0
        for occurrences in frequency.values():
            share = occurrences / event_count
            result -= share * np.log2(share)

        return result

class MidiComposer:
    """Generates new MIDI compositions from Markov chains."""

    # States used to start (and, at dead ends, restart) the duration walks.
    DEFAULT_NOTE_DURATION = "0.5"
    DEFAULT_REST_DURATION = "0.1"

    def __init__(self, seed: Optional[int] = None):
        """Create a composer.

        Args:
            seed: Optional seed for the random generator; pass a value to
                make generated tracks reproducible. ``None`` keeps the
                previous behaviour of a freshly seeded generator.
        """
        self.rng = np.random.default_rng(seed)

    def generate_track(self, note_chain: TransitionTable,
                       duration_chain: TransitionTable,
                       rest_chain: TransitionTable,
                       length: int = 100) -> Track:
        """Generate a new track by walking the three transition tables.

        Each iteration emits the current note (unless the state is
        ``'rest'``), then a rest if its duration is positive, then advances
        the note state. Note and rest durations are walked as their own
        Markov chains, so each duration depends on the previous one instead
        of always being drawn from the default state.

        Args:
            note_chain: Chain whose states are pitch numbers as strings
                (or ``'rest'``).
            duration_chain: Chain whose states parse as float durations.
            rest_chain: Chain whose states parse as float rest durations.
            length: Number of iterations to run.

        Returns:
            A ``Track`` named ``"Generated"``.
        """
        track = Track(name="Generated")
        current_time = 0.0

        start_note = self._get_most_common_state(note_chain)
        current_note = start_note
        duration_state = self.DEFAULT_NOTE_DURATION
        rest_state = self.DEFAULT_REST_DURATION

        for _ in range(length):
            # Generate note
            if current_note != 'rest':
                duration_state = self._step(
                    duration_chain, duration_state, self.DEFAULT_NOTE_DURATION
                )
                duration = float(duration_state)
                track.notes.append(Note(
                    pitch=int(current_note),
                    velocity=80,
                    start_time=current_time,
                    duration=duration,
                ))
                current_time += duration

            # Generate rest
            rest_state = self._step(
                rest_chain, rest_state, self.DEFAULT_REST_DURATION
            )
            rest_duration = float(rest_state)
            if rest_duration > 0:
                track.rests.append(Rest(current_time, rest_duration))
                current_time += rest_duration

            # Get next note
            current_note = self._step(note_chain, current_note, start_note)

        return track

    def _step(self, chain: TransitionTable, state: str,
              restart_state: str) -> str:
        """Advance one step, restarting from ``restart_state`` at dead ends.

        A state with no outgoing transitions would otherwise be repeated for
        the rest of the track. If ``restart_state`` has no transitions
        either, it is returned unchanged.
        """
        if state not in chain.transitions:
            state = restart_state
        return self._sample_from_chain(chain, state)

    def _get_most_common_state(self, chain: TransitionTable) -> str:
        """Pick the starting state for a walk.

        Returns the state with the largest number of distinct successors
        (first one wins on ties), or ``"60"`` (Middle C) for an empty chain.
        The table stores probabilities only, so true occurrence counts are
        not available here.
        """
        if not chain.transitions:
            return "60"  # Middle C
        return max(chain.transitions,
                   key=lambda state: len(chain.transitions[state]))

    def _sample_from_chain(self, chain: TransitionTable,
                           current_state: str) -> str:
        """Sample the next state from ``current_state``'s probabilities.

        Returns ``current_state`` itself when the chain has no row for it.
        """
        if current_state not in chain.transitions:
            return current_state

        row = chain.transitions[current_state]
        states = list(row)
        probabilities = list(row.values())

        # rng.choice yields a numpy string scalar; hand back a plain str.
        return str(self.rng.choice(states, p=probabilities))

class MidiInterface:
    """Handles MIDI I/O operations: port access plus file load and save."""

    def __init__(self):
        self.input_port: Optional[mido.ports.BaseInput] = None
        self.output_port: Optional[mido.ports.BaseOutput] = None

    def list_ports(self) -> Tuple[List[str], List[str]]:
        """List available MIDI ports as ``(input_names, output_names)``."""
        return mido.get_input_names(), mido.get_output_names()

    def open_ports(self, input_name: Optional[str] = None,
                   output_name: Optional[str] = None):
        """Open the named MIDI ports; a name left as ``None`` is skipped.

        Opening is best-effort: a failure is reported on stdout instead of
        being raised, and the corresponding port attribute keeps its
        previous value.
        """
        try:
            if input_name:
                self.input_port = mido.open_input(input_name)
            if output_name:
                self.output_port = mido.open_output(output_name)
        except Exception as e:
            print(f"Failed to open MIDI ports: {e}")

    def load_midi_file(self, filepath: Path) -> List[Track]:
        """Load a MIDI file and extract every track that contains notes."""
        midi_file = mido.MidiFile(filepath)

        tracks = []
        for midi_track in midi_file.tracks:
            # A fresh extractor per track keeps unfinished notes of one
            # track from being matched against events of the next.
            extractor = MidiEventExtractor()
            track = extractor.extract_track(midi_track, midi_file.ticks_per_beat)
            if track.notes:  # Only include tracks with notes
                tracks.append(track)

        return tracks

    def save_track_to_midi(self, track: Track, filepath: Path):
        """Save a Track object to a single-track MIDI file.

        Note times are converted to ticks using ``track.tempo`` and
        ``track.ticks_per_beat``, and the tempo is written to the file so
        playback matches those times. Rests are not written explicitly;
        they are the gaps between notes.
        """
        midi_file = mido.MidiFile(ticks_per_beat=track.ticks_per_beat)
        midi_track = mido.MidiTrack()
        midi_file.tracks.append(midi_track)

        # Declare the tempo the tick values below are computed against.
        midi_track.append(
            mido.MetaMessage('set_tempo', tempo=track.tempo, time=0)
        )

        # ticks/second = (ticks/beat) / (seconds/beat)
        ticks_per_second = track.ticks_per_beat * 1_000_000 / track.tempo

        events = []
        for note in track.notes:
            # Work in absolute ticks so rounding error does not accumulate
            # from one delta to the next.
            start_tick = max(0, round(note.start_time * ticks_per_second))
            end_tick = round(
                (note.start_time + note.duration) * ticks_per_second
            )
            events.append((start_tick, 'note_on', note))
            events.append((max(start_tick, end_tick), 'note_off', note))

        # Stable sort: a note's own note_on stays ahead of its note_off
        # even when both land on the same tick.
        events.sort(key=lambda event: event[0])

        previous_tick = 0
        for tick, event_type, note in events:
            delta_time = tick - previous_tick
            previous_tick = tick

            if event_type == 'note_on':
                msg = mido.Message('note_on', channel=note.channel,
                                   note=note.pitch, velocity=note.velocity,
                                   time=delta_time)
            else:
                msg = mido.Message('note_off', channel=note.channel,
                                   note=note.pitch, velocity=0,
                                   time=delta_time)

            midi_track.append(msg)

        midi_file.save(filepath)

# Main Application Class

class MarkovMidiGenerator:
    """Main application orchestrating all components.

    Typical use: ``analyze_midi_file`` to learn chains from a file, then
    ``generate_composition`` with one of the resulting ``track_<i>`` names.
    """

    def __init__(self):
        self.midi_interface = MidiInterface()
        self.chain_builder = MarkovChainBuilder()
        self.entropy_analyzer = EntropyAnalyzer()
        self.composer = MidiComposer()

        # Storage for learned patterns, keyed by "track_<index>"
        self.tracks: List[Track] = []
        self.note_chains: Dict[str, TransitionTable] = {}
        self.duration_chains: Dict[str, TransitionTable] = {}
        self.content_chains: Dict[str, TransitionTable] = {}
        # Chains over quantized rest durations, used when generating rests
        self.rest_chains: Dict[str, TransitionTable] = {}

    def analyze_midi_file(self, filepath: Path) -> Dict[str, Dict[str, object]]:
        """Analyze a MIDI file and build all Markov chains.

        Chains learned from a previously analysed file are discarded first,
        so track names always refer to the file analysed last.

        Returns:
            Per-track summary keyed by ``track_<index>`` with the entries
            ``notes``, ``rests``, ``kemeny_constant``, ``entropy`` and
            ``entropy_timeline``.
        """
        self.tracks = self.midi_interface.load_midi_file(filepath)

        # Drop stale entries, e.g. "track_3" from an earlier, larger file.
        self.note_chains.clear()
        self.duration_chains.clear()
        self.content_chains.clear()
        self.rest_chains.clear()

        results = {}
        for i, track in enumerate(self.tracks):
            track_name = f"track_{i}"

            # Build chains
            note_chain = self.chain_builder.build_note_chain(track.notes)
            duration_chain = self.chain_builder.build_duration_chain(
                [n.duration for n in track.notes]
            )
            rest_chain = self.chain_builder.build_duration_chain(
                [r.duration for r in track.rests]
            )
            content_chain = self.chain_builder.build_content_chain(track)

            # Store chains
            self.note_chains[track_name] = note_chain
            self.duration_chains[track_name] = duration_chain
            self.rest_chains[track_name] = rest_chain
            self.content_chains[track_name] = content_chain

            # Analyze entropy
            entropy_analysis = self.entropy_analyzer.analyze_track_entropy(track)

            results[track_name] = {
                'notes': len(track.notes),
                'rests': len(track.rests),
                'kemeny_constant': note_chain.kemeny_constant,
                'entropy': note_chain.entropy,
                'entropy_timeline': entropy_analysis
            }

        return results

    def generate_composition(self, track_name: str, length: int = 100) -> Track:
        """Generate new composition based on learned patterns.

        Args:
            track_name: Key of a previously analysed track.
            length: Number of generation steps passed to the composer.

        Raises:
            ValueError: If nothing has been learned for ``track_name``.
        """
        if track_name not in self.note_chains:
            raise ValueError(f"No learned patterns for track: {track_name}")

        note_chain = self.note_chains[track_name]
        duration_chain = self.duration_chains[track_name]
        content_chain = self.content_chains[track_name]

        # The composer parses rest states as float durations, which the
        # content chain's pitch/'rest' labels are not; use the chain learned
        # from rest durations, falling back to the content chain only when
        # no rest chain was stored for this track.
        rest_chain = self.rest_chains.get(track_name, content_chain)

        return self.composer.generate_track(
            note_chain, duration_chain, rest_chain, length
        )

In [2]:
# Test cell - add this to your notebook
def test_basic_functionality():
    """Smoke-test the core classes, printing one line per successful step.

    Builds a note, puts it on a track, and queries the available MIDI
    ports. Always returns ``True`` when no step raises.
    """
    # Step 1: a single Middle-C note
    sample_note = Note(pitch=60, velocity=80, start_time=0.0, duration=1.0)
    print(f"✅ Created note: {sample_note}")

    # Step 2: a track holding that note
    sample_track = Track(name="Test Track")
    sample_track.notes.append(sample_note)
    note_count = len(sample_track.notes)
    print(f"✅ Created track with {note_count} notes")

    # Step 3: port discovery (only the outputs are reported)
    _input_names, output_names = MidiInterface().list_ports()
    print(f"✅ Found {len(output_names)} MIDI output ports")

    return True

# Run the smoke test as soon as this cell executes; its progress lines are
# printed and its return value becomes the cell's result.
test_basic_functionality()

✅ Created note: Note(pitch=60, velocity=80, start_time=0.0, duration=1.0, channel=0)
✅ Created track with 1 notes
✅ Found 0 MIDI output ports


True