***
*Project:* Deep Active Inference

*Author:* Jingwei Liu (Music department, UC San Diego)
***

# <span style="background-color:darkorange; color:white; padding:2px 6px">Tutorial</span> 


# 12 Tone First Species Counterpoint

### Generating Rules:

Rules for melodic lines:
1. No note may be followed by the same note
2. Each note may appear a maximum of 3 times in length of 10
3. No more than 2 leaps larger than a 4th (>= 5) in length of 10
4. Leaps larger than a 3rd (>=5) should be followed by a change of direction and a step-wise motion (1 or 2)

Numerical restrictions:
5. Cantus range: G2 - C5 (43 - 72); Counterpoint range:  G3 - C6 (55 - 84)
6. Valid melodic intervals: m2(1), M2(2), m3(3), M3(4), P4(5), P5(7), m6(8), P8(12)
7. Valid harmonic intervals: m3(3), M3(4), P5(7), m6(8), M6(9), P8(12)

Rules for harmonic counters:
8. Max of 3 of the same harmonic interval in length of 10
9. A perfect interval (P5 or P8) must be approached with contrary motion and at least one of the voices moving by step

In [1]:
import numpy as np
import pandas as pd
import csv
from scamp import *
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dense,Flatten,Softmax,Input
import keras.backend as K 

Using TensorFlow backend.


In [2]:
# Load the note pairs; the first CSV column is used as the row index,
# leaving the two data columns 'Cantus' and 'Counter'.
df = pd.read_csv('Species.csv', sep=",",index_col = 0)
print(df)

      Cantus  Counter
0         58       67
1         66       69
2         64       71
3         68       76
4         66       74
...      ...      ...
9995      64       72
9996      56       60
9997      57       61
9998      61       65
9999      56       64

[10000 rows x 2 columns]


In [3]:
# Number of rows in the data frame, i.e. number of (cantus, counterpoint) note pairs.
n = df.shape[0]
n

10000

In [4]:
# Build a (2, 2*n) integer table that interleaves the two voices:
#   row 0: pitch class index (MIDI pitch minus the lowest pitch of that voice's range)
#   row 1: voice flag (0 = cantus, 1 = counterpoint)
# Even columns hold the cantus note of a pair, odd columns the counterpoint note.
# The table is filled with vectorised NumPy slicing instead of one
# `tf.Variable.assign` call per element, and no float index (`i/2`) is used.
cantus_lowest = 43   # G2, bottom of the cantus range
counter_lowest = 55  # G3, bottom of the counterpoint range

interleaved = np.empty((2, 2 * n), dtype=np.int32)
interleaved[0, 0::2] = df['Cantus'].to_numpy() - cantus_lowest
interleaved[0, 1::2] = df['Counter'].to_numpy() - counter_lowest
interleaved[1, 0::2] = 0
interleaved[1, 1::2] = 1

X = tf.Variable(interleaved, dtype='int32')  # any data tensor
X

<tf.Variable 'Variable:0' shape=(2, 20000) dtype=int32, numpy=
array([[15, 12, 23, ..., 10, 13,  9],
       [ 0,  1,  0, ...,  1,  0,  1]])>

In [5]:
# One-hot encode the pitch class (30 classes per voice) and append the voice
# flag as a 31st feature, giving one row per note: shape (2*n, depth + 1).
# NOTE: this cell overwrites `X`, so it is not idempotent -- re-run the
# previous cell before running this one again.
depth = 30
voice_flag_column = tf.cast(tf.reshape(X[1,:], [-1, 1]), tf.float32)  # -1: infer row count instead of hard-coding 20000
X = tf.concat([tf.one_hot(X[0,:], depth), voice_flag_column], 1)
X

<tf.Tensor: shape=(20000, 31), dtype=float32, numpy=
array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 1.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 1.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 1.]], dtype=float32)>

In [6]:
# Split the note rows in order: the first 16000 rows for training, the next 3000
# for validation, and everything from row 19000 onwards for testing.
# All split points are even, so a cantus row (even index) and the counterpoint
# row that follows it always land in the same split.
train_dataset = tf.data.Dataset.from_tensor_slices(X[:16000,:])
valid_dataset = tf.data.Dataset.from_tensor_slices(X[16000:19000,:])
test_dataset = tf.data.Dataset.from_tensor_slices(X[19000:,:])

In [7]:
# Windowing parameters: each example is `n_steps` consecutive note rows.
n_steps = 20
batch_size = 18
window_length = n_steps

# Slide over the test split two rows at a time (one cantus/counterpoint pair),
# so every window starts on a cantus row; incomplete windows are dropped.
dataset = (
    test_dataset
    .window(window_length, shift=2, drop_remainder=True)
    .flat_map(lambda window: window.batch(window_length))
)

# Shuffle the windows and group them into fixed-size batches.
shuffle_dataset = (
    dataset
    .shuffle(20000)
    .batch(batch_size, drop_remainder=True)
)

In [8]:
def creat_model(batch_size,Type):
    """Build the attention-RNN network and the sub-models that share its layers.

    The network reads a sequence of note rows (one-hot pitch + voice flag),
    summarises it with two stacked LSTMs and an attention-weighted sum, predicts
    the next note (`output_A`), and then predicts the note after that either
    from an externally supplied note (`input_B` -> `output_B1`) or from its own
    `output_A` distribution with a voice flag appended (-> `output_B2`).

    Parameters
    ----------
    batch_size : int
        Number of sequences per batch. The voice-flag tensor appended to
        `output_A` is created with this fixed size, so every model that
        produces `output_B2` must be run on batches of exactly this size.
    Type : str
        "_0_1" appends voice flag 0 to `output_A`; "_1_0" appends voice flag 1.

    Returns
    -------
    tuple of six keras Models, in this order:
        model      : [input_A, input_B] -> [output_RNN, output_A, output_B1, output_B2]
        aux_model1 : [input_A, input_B] -> output_B1
        aux_model2 : input_A            -> [output_A, output_B2]
        sub_model1 : input_A            -> output_A
        sub_model2 : input_A            -> output_B2
        att_model  : [input_A, input_B] -> attention weights `alpha`

    Raises
    ------
    ValueError
        If `Type` is not "_0_1" or "_1_0".
    """
    # Validate before building any layer: previously an unknown Type left
    # `input_output_A` undefined and failed later with an obscure error.
    if Type not in ("_0_1", "_1_0"):
        raise ValueError('Type must be "_0_1" or "_1_0", got %r' % (Type,))

    rnn_units = 12
    n_notes = 30
    n_features = n_notes + 1  # one-hot pitch plus the voice flag

    # Shape comments below use B = batch size and T = sequence length.
    input_A = keras.layers.Input(shape=(None,n_features), name="input_A") # (B, T, 31)
    input_B = keras.layers.Input(shape=(1,n_features), name="input_B") # (B, 1, 31)

    hidden1 = keras.layers.LSTM(rnn_units, return_sequences=True)(input_A) # (B, T, 12)
    hidden2 = keras.layers.LSTM(rnn_units, return_sequences=True)(hidden1) # (B, T, 12)
    
    # Per-time-step pitch distribution.
    output_RNN = keras.layers.Dense(n_notes, activation='softmax', name = 'output_RNN')(hidden2) # (B, T, 30)

    # Attention: one scalar score per time step, normalised over time with softmax.
    e = keras.layers.Dense(1, activation='tanh')(hidden2) # (B, T, 1)
    e = keras.layers.Reshape([-1])(e) # (B, T)

    alpha = keras.layers.Activation('softmax')(e) # (B, T)
    # Broadcast the weights over the hidden units, then take the weighted sum over time.
    c = keras.layers.Permute([2, 1])(keras.layers.RepeatVector(rnn_units)(alpha)) # (B, T, 12)
    c = keras.layers.Multiply()([hidden2, c]) # (B, T, 12)
    c = keras.layers.Lambda(lambda xin: K.sum(xin, axis=1), output_shape=(rnn_units,))(c) # (B, 12)

    output_A = keras.layers.Dense(n_notes, activation = 'softmax', name = 'output_A')(c) # (B, 30)
    reshape = tf.reshape(output_A,[-1,1,n_notes]) # (B, 1, 30)
    # Turn the predicted distribution into a note-row-shaped input by appending a voice flag.
    if Type == "_0_1":
        zero = tf.reshape(tf.zeros(batch_size),[batch_size,1,1])
        input_output_A = tf.concat((reshape,zero),2) # (B, 1, 31), voice flag 0
    elif Type == "_1_0":
        one = tf.reshape(tf.ones(batch_size),[batch_size,1,1])
        input_output_A = tf.concat((reshape,one),2) # (B, 1, 31), voice flag 1

    # Two single-step RNNs, both started from the attention context `c`:
    # aux1 consumes the externally supplied note, aux2 the model's own prediction.
    aux1 = keras.layers.SimpleRNN(rnn_units)(input_B, initial_state=c) # (B, 12)
    aux2 = keras.layers.SimpleRNN(rnn_units)(input_output_A, initial_state=c) # (B, 12)
    output_B1 = keras.layers.Dense(n_notes, activation = 'softmax', name = 'output_B1')(aux1) # (B, 30)
    output_B2 = keras.layers.Dense(n_notes, activation = 'softmax', name = 'output_B2')(aux2) # (B, 30)

    model = keras.models.Model([input_A, input_B], [output_RNN, output_A, output_B1, output_B2])
    
    aux_model1 = keras.models.Model([input_A, input_B], output_B1)
    aux_model2 = keras.models.Model(input_A, [output_A, output_B2])
    
    sub_model1 = keras.models.Model(input_A, output_A)
    sub_model2 = keras.models.Model(input_A, output_B2)
    
    att_model = keras.models.Model([input_A, input_B], alpha)
    
    return model,aux_model1,aux_model2,sub_model1,sub_model2,att_model

In [9]:
# Build two independent networks, one per Type. Each call returns the full model
# followed by five sub-models that share its layers (see creat_model's return order).
model_0_1_,aux_model1_0_1_,aux_model2_0_1_, sub_model1_0_1_,sub_model2_0_1_,att_model_0_1_ = creat_model(batch_size,'_0_1')
model_1_0_,aux_model1_1_0_,aux_model2_1_0_, sub_model1_1_0_,sub_model2_1_0_,att_model_1_0_ = creat_model(batch_size,'_1_0')

In [10]:
# Print the layer-by-layer structure and parameter counts of the "_0_1" network.
model_0_1_.summary()

Model: "functional_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_A (InputLayer)            [(None, None, 31)]   0                                            
__________________________________________________________________________________________________
lstm (LSTM)                     (None, None, 12)     2112        input_A[0][0]                    
__________________________________________________________________________________________________
lstm_1 (LSTM)                   (None, None, 12)     1200        lstm[0][0]                       
__________________________________________________________________________________________________
dense (Dense)                   (None, None, 1)      13          lstm_1[0][0]                     
_______________________________________________________________________________________

In [11]:
# Restore previously saved weights into the two full models. The sub-models built
# by creat_model are defined on the same layer outputs, so they use these weights too.
# NOTE(review): the .h5 files are not produced in this notebook -- presumably saved by a training notebook; confirm.
model_0_1_.load_weights('model_0_1_weight.h5')
model_1_0_.load_weights('model_1_0_weight.h5')

In [14]:
seq_length = 10   # number of (observation, action) note pairs generated per sequence
n_pitches = 30    # size of the one-hot pitch vocabulary produced by the models

# How a note is chosen from a predicted distribution:
#   "max"    - the most probable note
#   "random" - a note sampled according to the distribution
#generate_mode = "max"
generate_mode = "random"


# Which pair of predictions is appended to the running sequence at every step.
mode = "first meet"
#mode = "brain 1"
#mode = "brain 2"
#mode = "brain 1_inside"
#mode = "brain 2_inside"

# Fail early on a typo instead of silently generating all-zero notes
# (unknown generate_mode) or never extending the sequence (unknown mode).
if generate_mode not in ("max", "random"):
    raise ValueError("unknown generate_mode: %r" % (generate_mode,))
if mode not in ("first meet", "brain 1", "brain 2", "brain 1_inside", "brain 2_inside"):
    raise ValueError("unknown mode: %r" % (mode,))


def _choose_notes(probs, how):
    """Pick one note index per row of `probs` (rows are probability distributions).

    how == "max" returns the first index of the row maximum;
    how == "random" samples an index with the row as probabilities.
    """
    probs = np.asarray(probs)
    if how == "max":
        return np.argmax(probs, axis=1)
    return np.array([np.random.choice(probs.shape[1], 1, p=row)[0] for row in probs])


def _as_note_rows(notes, voice_flag):
    """Encode note indices as a (len(notes), 1, n_pitches + 1) float tensor.

    Each row is the one-hot pitch followed by the voice flag
    (0 = cantus, 1 = counterpoint), i.e. the same layout as the data rows.
    """
    # Cast explicitly: the prediction arrays are float, one-hot indices must be integers.
    indices = np.asarray(notes).astype(np.int64)
    one_hot = tf.reshape(tf.one_hot(indices, n_pitches), [-1, 1, n_pitches])
    shape = [len(indices), 1, 1]
    flag = tf.ones(shape) if voice_flag else tf.zeros(shape)
    return tf.concat((one_hot, flag), 2)


def _print_match_table(title, notes, weights):
    """Print `note/(weight)` per sequence and step, with row, column and overall means."""
    n_rows, n_cols = notes.shape
    header = ["step" + str(j) for j in range(n_cols)] + ["sequence match"]
    labels = ["sequence" + str(i) for i in range(n_rows)] + ["step match"]

    table = []
    for i in range(n_rows):
        cells = [str(int(notes[i,j])) + "/(" + '%.2f'%(weights[i,j]) + ")" for j in range(n_cols)]
        cells.append('%.2f'%(np.sum(weights[i,:])/n_cols))
        table.append(cells)

    last_row = ['%.2f'%(np.sum(weights[:,j])/n_rows) for j in range(n_cols)]
    last_row.append('%.2f'%(np.sum(weights)/(n_cols*n_rows)))
    table.append(last_row)

    format_row = "{:>11}" * (n_cols + 2)
    print(format_row.format(title, *header))
    for label, cells in zip(labels, table):
        print(format_row.format(label, *cells))


for window in shuffle_dataset.take(1):
    sequence = window   # grows by two rows per generated step
    initial = window    # the most recent rows, fed to the models
    context_length = int(window.shape[1])  # keep the model context as long as the seed window

    # Naming: pred_<model>_<flag>. Judging by the labels printed below, the "_0_1"
    # model is "musician 1" and the "_1_0" model is "musician 2"; rows with flag 0
    # are reported as observations and rows with flag 1 as actions.
    pred_0_1_1 = np.zeros((batch_size,seq_length))
    pred_0_1_0 = np.zeros((batch_size,seq_length))
    pred2_0_1_1 = np.zeros((batch_size,seq_length))
    
    pred_1_0_0 = np.zeros((batch_size,seq_length))
    pred_1_0_1 = np.zeros((batch_size,seq_length))
    pred2_1_0_0 = np.zeros((batch_size,seq_length))
    
    # Probability one model assigned to the note the other model actually chose.
    obv_weight = np.zeros((batch_size,seq_length))
    act_weight = np.zeros((batch_size,seq_length))

    batch_rows = np.arange(batch_size)

    for s in range(seq_length):

        # "_0_1" model: next-note distribution, then the distribution that follows its own prediction.
        [pred_o,a] = aux_model2_0_1_.predict(initial) # (batch, 30) each
        pred_0_1_0[:,s] = _choose_notes(pred_o, generate_mode)
        pred_0_1_1[:,s] = _choose_notes(a, generate_mode)

        # "_1_0" model: same two outputs from the other network.
        [pred_a,o] = aux_model2_1_0_.predict(initial) # (batch, 30) each
        pred_1_0_1[:,s] = _choose_notes(pred_a, generate_mode)
        pred_1_0_0[:,s] = _choose_notes(o, generate_mode)

        # Cross-model agreement: how likely each model found the other's choice.
        obv_weight[:,s] = pred_o[batch_rows, pred_1_0_0[:,s].astype(int)]
        act_weight[:,s] = pred_a[batch_rows, pred_0_1_1[:,s].astype(int)]

        # Encode the chosen notes as model inputs, shape (batch, 1, 31).
        out_0_1_0 = _as_note_rows(pred_0_1_0[:,s], 0)  # flag 0 (cantus)
        out_0_1_1 = _as_note_rows(pred_0_1_1[:,s], 1)  # flag 1 (counterpoint)
        out_1_0_1 = _as_note_rows(pred_1_0_1[:,s], 1)  # flag 1 (counterpoint)
        out_1_0_0 = _as_note_rows(pred_1_0_0[:,s], 0)  # flag 0 (cantus)

        # Second prediction of each model, conditioned on its own chosen first note.
        a2 = aux_model1_0_1_.predict([initial,out_0_1_0])
        o2 = aux_model1_1_0_.predict([initial,out_1_0_1])
        pred2_0_1_1[:,s] = _choose_notes(a2, generate_mode)
        pred2_1_0_0[:,s] = _choose_notes(o2, generate_mode)

        out2_0_1_1 = _as_note_rows(pred2_0_1_1[:,s], 1)  # flag 1 (counterpoint)
        out2_1_0_0 = _as_note_rows(pred2_1_0_0[:,s], 0)  # flag 0 (cantus)

        # The two rows appended this step, in order, for each mode.
        appended = {
            "first meet": (out_1_0_0, out_0_1_1),
            "brain 1": (out_0_1_0, out2_0_1_1),
            "brain 2": (out2_1_0_0, out_1_0_1),
            "brain 1_inside": (out_0_1_0, out_0_1_1),
            "brain 2_inside": (out_1_0_0, out_1_0_1),
        }[mode]
        sequence = tf.concat((sequence,) + appended, 1)

        initial = sequence[:,-context_length:,:]

    if mode == "first meet":
        print('collective_action:',pred_0_1_1)
        print('collective_observation:',pred_1_0_0)
        print('')
        _print_match_table("act/(pred_act_weight)", pred_0_1_1, act_weight)
        print('')
        _print_match_table("obv/(pred_obv_weight)", pred_1_0_0, obv_weight)
    elif mode == "brain 1":
        print('musician1_action:',pred2_0_1_1)
        print('musician1_observation:',pred_0_1_0)
    elif mode == "brain 2":
        print('musician2_action:',pred_1_0_1)
        print('musician2_observation:',pred2_1_0_0)
    elif mode == "brain 1_inside":
        print('musician1_action_pred:',pred_0_1_1)
        print('musician1_observation_pred:',pred_0_1_0)
    elif mode == "brain 2_inside":
        print('musician2_action_pred:',pred_1_0_1)
        print('musician2_observation_pred:',pred_1_0_0)
    print('')
    
    

collective_action: [[24. 26. 23. 20. 15. 22. 16. 13. 13. 17.]
 [ 9.  8. 15.  7.  9. 11. 23. 13. 17. 11.]
 [23. 18. 18. 23. 24. 27. 20. 22. 18. 21.]
 [ 0. 11. 14. 12. 14. 18. 15. 11. 16. 16.]
 [15. 18.  9. 12. 13. 14.  9. 14. 14. 11.]
 [13.  9. 14.  7.  6.  6. 12. 11.  8. 15.]
 [13. 22. 12.  9. 17. 15. 18. 17. 21. 23.]
 [12. 16.  4.  3. 10.  9.  8.  4.  0.  4.]
 [13.  5.  7.  3.  0.  5.  6.  8.  6.  9.]
 [ 9.  7.  3. 10. 11.  8.  7. 13.  8.  9.]
 [ 4.  2.  3.  8. 15.  7. 17. 16. 11. 18.]
 [18. 16. 15. 10.  7.  8. 14. 14. 15. 13.]
 [ 3.  0.  9.  1.  1.  5.  9.  0.  6.  7.]
 [18. 16. 24. 17. 17. 15. 17. 18. 16. 15.]
 [ 2.  5.  5.  7.  6.  1.  2. 11. 13. 12.]
 [17.  4.  8. 15. 13. 10. 15.  9.  9. 14.]
 [16. 17. 11. 10. 12. 15. 12. 15. 17. 21.]
 [ 3. 10. 12. 11. 17. 14. 14. 20. 25. 10.]]
collective_observation: [[26. 27. 27. 28. 25. 22. 18. 17. 13. 19.]
 [13. 15. 11. 16. 19. 22. 17. 20. 15. 15.]
 [22. 19. 25. 29. 26. 22. 24. 26. 29. 23.]
 [17. 15. 18. 17. 19. 21. 24. 23. 23. 27.]
 [20. 22. 

In [15]:
# Decode one generated sequence back into two lists of MIDI pitches.
sample = 7  # index of the sequence within the batch
music = tf.reshape(sequence[sample,:,:],[-1,31]).numpy()  # (rows, 30 one-hot pitch columns + voice flag)
#music = music[-20:,:]
row,col = music.shape

# Vectorised decode: the hot column of each row is the pitch class, the last column the voice.
pitch_class = np.argmax(music[:, :-1], axis=1)
voice_flag = music[:, -1]

# Undo the offsets applied when the data was encoded (cantus starts at 43, counterpoint at 55).
cantus_lst = list(pitch_class[voice_flag == 0] + 43)
counter_lst = list(pitch_class[voice_flag == 1] + 55)

In [16]:
# MIDI pitches of the cantus voice (seed window followed by generated notes).
cantus_lst

[51,
 55,
 63,
 61,
 49,
 51,
 52,
 56,
 52,
 51,
 48,
 52,
 52,
 56,
 58,
 56,
 57,
 54,
 58,
 56]

In [17]:
# MIDI pitches of the counterpoint voice (seed window followed by generated notes).
counter_lst

[63,
 59,
 66,
 65,
 58,
 59,
 60,
 59,
 56,
 58,
 67,
 71,
 59,
 58,
 65,
 64,
 63,
 59,
 55,
 59]

In [18]:
# Harmonic interval in semitones at each step (counterpoint pitch minus cantus pitch),
# to compare against the valid harmonic intervals listed in the rules.
np.array(counter_lst) - np.array(cantus_lst) # real harmonic

array([12,  4,  3,  4,  9,  8,  8,  3,  4,  7, 19, 19,  7,  2,  7,  8,  6,
        5, -3,  3], dtype=int64)

In [19]:
# Melodic intervals of the counterpoint voice: signed semitone steps between consecutive notes.
np.diff(counter_lst)

array([ -4,   7,  -1,  -7,   1,   1,  -1,  -3,   2,   9,   4, -12,  -1,
         7,  -1,  -1,  -4,  -4,   4], dtype=int64)

In [20]:
# Melodic intervals of the cantus voice: signed semitone steps between consecutive notes.
np.diff(cantus_lst)

array([  4,   8,  -2, -12,   2,   1,   4,  -4,  -1,  -3,   4,   0,   4,
         2,  -2,   1,  -3,   4,  -2], dtype=int64)

In [22]:
# Render both voices with scamp and open the transcription as a score.
# NOTE: `s` is reused here for the scamp Session; earlier it was a loop index.
s = Session()
s.tempo = 200
piano1 = s.new_part("piano")
piano2 = s.new_part("piano")

def cantus():
    """Play every pitch in cantus_lst on piano2, one note after another."""
    for i in cantus_lst:
        # arguments are presumably (pitch, volume, length in beats) -- TODO confirm against the scamp docs
        piano2.play_note(i,1,4)
        
def counter():
    """Play every pitch in counter_lst on piano1, one note after another."""
    for i in counter_lst:
        piano1.play_note(i,1,4)
        
        
# NOTE(review): presumably skips real-time waiting up to beat 100 so the transcription finishes quickly -- confirm.
s.fast_forward_to_beat(100)

# Record both voices while they run as parallel child processes, then stop once both are done.
s.start_transcribing()
s.fork(counter)
s.fork(cantus)
s.wait_for_children_to_finish()
performance = s.stop_transcribing()
performance.to_score(title = "First Species Counterpoint", composer = "My programme",time_signature = "4/4").show_xml()

Using preset Piano Merlin for piano
Using preset Piano Merlin for piano
