In [1]:
from CTMC import CTMC
import mido
import math

In [2]:
import numpy as np
from mido import MidiFile, MidiTrack, Message, MAX_PITCHWHEEL
import random
from collections import defaultdict, Counter

In [2]:
class DTMC:
    """
    n-th order DTMC implementation specifically for this project; 
    certain extensions made to basic DTMC to handle modulation, midi file idiosyncracies, and song production
    """
    def __init__(self, transition_matrix=None, state_space=None, ngram=3, smoothing=.001):
        self.transition_matrix = transition_matrix if transition_matrix else defaultdict(lambda: defaultdict(int))
        self.state_space = state_space if state_space else set()
        self.time = 0
        self.smoothing = smoothing
        self.ngram = ngram
        self.notes = None
        
    def initialize(self, state=None):
        if state:
            self.current = state
        else:
            self.current = list(self.state_space)[np.random.randint(len(self.state_space))]
        
    def update(self, notes, reset=False):
        self.notes = notes
        notes = ['NOP' for _ in range(self.ngram - 1)] + notes
        prev = 'START'
        
        if reset:
            self.state_space = set()
            self.transition_matrix = defaultdict(lambda: defaultdict(int))

        for i, note in enumerate(notes[:1-self.ngram]):
            curr = tuple([notes[j] for j in range(i, i + self.ngram)])
            self.state_space.add(curr)
            self.transition_matrix[prev][curr] += 1
            prev = curr
            
        for state in self.state_space:
            for other_state in self.state_space:
                if other_state != state and other_state not in self.transition_matrix[state]:
                    self.transition_matrix[state][other_state] = self.smoothing
    
    def step(self):
        self.time += 1
        
        candidates, counts = [], []
        for cand in self.transition_matrix[self.current]:
            candidates.append(cand)
            counts.append(self.transition_matrix[self.current][cand])

        counts = np.array(counts) / np.sum(counts)
        if len(candidates) == 0:
            print('RANDOM SWITCH')
            self.current = list(self.state_space)[np.random.randint(len(self.state_space))]
            return self.current[-1]
        else:
            index = np.random.choice(range(len(candidates)), p=counts)
            self.current = candidates[index]
            return self.current[-1]
    
    def generate_song(self, length=50):
        notes = []
        for _ in range(length):
            note = self.step()
            notes.append(note)
        return notes
    
    def sync_pair(self, other):
        def compute_matches(set1, set2):
            total_size = len(set1) + len(set2)
            overlap_size = len(set1.union(set2))
            return total_size - overlap_size
        
        def modulate(states, b):
            def mod_ngram(ngram, b):
                modified = []
                for elem in ngram:
                    if type(elem) == int:
                        modified.append(elem + b)
                    else:
                        modified.append(elem)
                return tuple(modified)
            return {mod_ngram(state, b) for state in states}
        
        most_matches = 0
        best_b = None
        
        for i in range(-30, 30):
            modulated = modulate(self.state_space, i)
            matches = compute_matches(modulated, other.state_space)
            if matches > most_matches:
                best_b = i
                most_matches = matches
                
        self.notes = [note + best_b for note in self.notes]
        self.update(self.notes, reset=True)

In [1]:
import sys
import time

class CTMC:
    """
    vanilla CTMC implementation
    """
    def __init__(self, Q, pi_zero):
        self.curr_state = np.random.choice(a=np.arange(pi_zero.size), p=pi_zero)
        self.Q = Q

    def step(self):
        rates_to_run = self.Q[self.curr_state]

        times = []
        for rate in rates_to_run:
            if rate <= 0:
                times.append(sys.maxsize)
            else:
                times.append(np.random.exponential(1/rate))

        idx = np.argmin(times)
        next_transition_state = idx
        next_transition_time = times[idx]
        self.curr_state = next_transition_state
        return self.curr_state, next_transition_time


In [4]:
def midi_from_notes(notes, filename='test.mid', timestep=300):
    outfile = MidiFile()
    track = MidiTrack()
    outfile.tracks.append(track)

    track.append(Message('program_change', program=12))

    for i, note in enumerate(notes):
        if note:
            track.append(Message('note_on', note=note, velocity=100, time=0))
            track.append(Message('note_off', note=note, velocity=100, time=timestep))
        else:
            prev = track.pop()
            track.append(Message('note_off', note=prev.note, velocity=100, time=prev.time + timestep))
            
    outfile.save(filename)
    
def notes_to_DTMC(notes, ngram=3):
    dtmc = DTMC(ngram=ngram)
    dtmc.update(notes)
    return dtmc

def notes_from_midi(midifile):
    midi_file = MidiFile(midifile)
    track = midi_file.tracks[0]
    keep_track = []
    for msg in track:
        if not msg.is_meta and msg.type == 'note_on' and msg.velocity > 0:
            keep_track.append(msg.note)
            
    return keep_track

In [5]:
def find_num_shared_ngrams(dtmc1, dtmc2):
    #compare the keys in the DTMC transition matrix; tally up number of the same things
    dtmc1_keys = set(dtmc1.transition_matrix.keys())
    dtmc2_keys = set(dtmc2.transition_matrix.keys())
    return len(dtmc1_keys & dtmc2_keys)

def transitions_from_ngram_counts(sim_ngram_counts):
    #perhaps change this for later
    #for now:
    #setting the correct 'self loop' value
    #multiply entire matrix by single factor such that fastest expected transition time is 4 seconds
    
    fastest_transition = 0
    for i, row in enumerate(sim_ngram_counts):
        row[i] = -sum(row)
        if fastest_transition > row[i]:
            fastest_transition = row[i]
    
    transition_matrix = sim_ngram_counts * (-0.2/fastest_transition)
        
    return transition_matrix
        
    
    

In [6]:
all_dtmcs = []
files = ['witchcraft.mid', 'sevennationarmy.mid']
for file in files:
    notes = notes_from_midi('./midis/' + file)
    dtmc = notes_to_DTMC(notes, 3)
    if len(all_dtmcs) > 0:
        dtmc.sync_pair(all_dtmcs[0])
    all_dtmcs.append(dtmc)

In [7]:
sim_ngram_counts = np.zeros((len(all_dtmcs), len(all_dtmcs)))
for i in range(len(all_dtmcs)):
    for j in range(i, len(all_dtmcs)):
        if j == i:
            sim_ngram_counts[i][j] = 0
        else:
            x = find_num_shared_ngrams(all_dtmcs[i], all_dtmcs[j])
            sim_ngram_counts[i][j] = x
            sim_ngram_counts[j][i] = x

In [8]:
ctmc_transition_matrix = transitions_from_ngram_counts(sim_ngram_counts)

In [9]:
test_ctmc = CTMC(ctmc_transition_matrix, np.array([1,0,0]))

In [36]:
#now, actually generate some midi files 
i = 0
last_state = None
notes = []
while i < 10: #just mash 10 songs 2gether
    song, time = test_ctmc.step()
    current_song_dtmc = all_dtmcs[song]
    
    current_song_dtmc.initialize(last_state)
    
    #assuming 196bpm = 3.2666 notes/second
    num_steps = math.ceil(3.2666*time)
        
    notes.extend(current_song_dtmc.generate_song(num_steps))
    #saves ngram corresponding to last n notes for lookup in next dtmc to step to
    print(song, time)
    last_state = tuple(notes[-3:])
    i+=1

1 1.575216750231989
RANDOM SWITCH
0 4.057168479383952
RANDOM SWITCH
1 9.20795490685298
0 4.838455128963014
RANDOM SWITCH
1 5.742336550169821
RANDOM SWITCH
0 7.70105154515946
RANDOM SWITCH
1 1.7799885415733776
RANDOM SWITCH
0 0.33752406047257605
RANDOM SWITCH
1 11.60684878368205
RANDOM SWITCH
0 0.3138885991922355


In [11]:
midi_from_notes(notes, filename='switches.mid')