# Packages

In [None]:
import tensorflow as tf
import numpy as np

# Base setting

In [None]:
# Dataset years; each entry names one `note_<year>.npy` file.
year = ['2004', '2006', '2008', '2009', '2011', '2013', '2014', '2015', '2017', '2018']

# Batch size used for training.
BATCH = 10

# Sequence and model dimensions.
max_sequence_length = 128
vocab_size = 128
d_model = 128

# Transformer hyperparameters.
num_layers = 5
num_heads = 8
ffn_hidden = 1024
drop_prob = 0.1

In [None]:
def make_sort_indx(x,batch_size,depth=None):
    '''
    Build the index triples for one middle value ``x``.

    The result lists ``[m, x, n]`` triples in batch-major order:
    [[0,x,0],[0,x,1],...,[m,x,n-1],[m,x,n]]
    with ``m`` running over ``range(batch_size)`` and ``n`` over ``range(depth)``.

    Args:
        x: value placed in the middle slot of every triple.
        batch_size: number of distinct ``m`` values.
        depth: number of distinct ``n`` values; defaults to the module-level
            ``d_model`` when omitted.

    Returns:
        A list of ``batch_size * depth`` three-element lists of Python ints.
    '''
    if depth is None:
        depth = d_model
    position = int(x)
    # Plain Python produces the same ordering the meshgrid/gather version did,
    # without creating tensors for every call.
    return [[m, position, n] for m in range(int(batch_size)) for n in range(int(depth))]

def indices_dict(batch_size):
    '''
    Build one index table per value of x in [m,x,n].

    Calls ``make_sort_indx(x, batch_size)`` for every x in
    ``range(max_sequence_length)`` and stacks the results into a NumPy array
    whose first axis is x.
    '''
    per_position = [make_sort_indx(pos, batch_size) for pos in range(max_sequence_length)]
    return np.array(per_position)

In [None]:
# Mount Google Drive at /content/drive (requires the Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Data Preprocessing

In [None]:
from google.colab import files

In [None]:
# Holds one note array per dataset year.
sequences = []

In [None]:
# Load the pre-built .npy note arrays, one per year.
# The list is rebuilt (not appended to) so re-running this cell cannot duplicate data.
sequences = [
    np.load(f'./drive/MyDrive/music_note_dataset/note_{y}.npy')
    for y in year
]

In [None]:
sequences[0]

array([[71, 71, 55, ..., 76, 50, 66],
       [67, 64, 54, ..., 74, 55, 73],
       [74, 57, 76, ..., 73, 74, 55],
       ...,
       [92, 92, 53, ..., 53, 47, 55],
       [79, 53, 86, ..., 43, 36, 48],
       [67, 43, 36, ..., 53, 43, 53]], dtype=int32)

In [None]:
[i.shape for i in sequences]

[(12674, 128),
 (13454, 128),
 (8870, 128),
 (12260, 128),
 (9480, 128),
 (8472, 128),
 (12930, 128),
 (8264, 128),
 (9374, 128),
 (13596, 128)]

In [None]:
# Stack every year's rows into a single array along axis 0.
X = np.concatenate(sequences, axis=0)

In [None]:
X[0]

array([71, 71, 55, 71, 59, 55, 59, 62, 62, 72, 71, 67, 72, 57, 74, 67, 72,
       74, 74, 72, 72, 67, 74, 67, 66, 66, 72, 57, 71, 64, 71, 72, 72, 74,
       59, 64, 62, 66, 62, 67, 66, 59, 74, 71, 71, 74, 78, 74, 59, 59, 79,
       78, 76, 60, 79, 60, 76, 79, 74, 79, 59, 74, 79, 59, 72, 79, 57, 72,
       71, 71, 72, 66, 57, 67, 81, 72, 66, 72, 55, 81, 72, 71, 71, 69, 59,
       59, 69, 67, 55, 71, 62, 67, 54, 62, 54, 67, 52, 67, 52, 50, 71, 50,
       49, 64, 76, 64, 76, 69, 52, 52, 69, 76, 57, 67, 76, 49, 67, 66, 66,
       49, 67, 49, 76, 67, 67, 76, 50, 66], dtype=int32)

In [None]:
X.shape

(109374, 128)

In [None]:
# Convert the stacked note matrix into a constant tensor for training.
# np.asarray lets this cell be re-run after X has already become a tensor
# (a tensor has no .reshape method, so the original form failed on re-run).
X_array = np.asarray(X)
X = tf.constant(X_array.reshape((X_array.shape[0], X_array.shape[1])))

# Generator

In [None]:
import math
import random
from tensorflow.keras import Model,Sequential,activations
from tensorflow.keras.layers import Dense,Dropout,Input

### PositionalEncoding

In [None]:
class PositionalEncoding(tf.keras.layers.Layer):
  """Sinusoidal position table multiplied element-wise by a trainable weight matrix."""

  def __init__(self,d_model,max_sequence_length):
    super().__init__()
    self.d_model = d_model
    self.max_sequence_length = max_sequence_length

    # Trainable (max_sequence_length, d_model) matrix that scales the table.
    self.weights_var = self.add_weight(
      name='positional_encoding_weights',
      shape=(max_sequence_length,d_model),
      initializer='glorot_uniform',
      trainable=True,
    )

  def call(self):
    """Return the weighted (max_sequence_length, d_model) table; takes no inputs."""
    seq_len = self.max_sequence_length
    even_dims = tf.range(start=0,limit=self.d_model,delta=2,dtype=tf.float32)
    wavelengths = tf.pow(10000.0,even_dims/self.d_model)
    positions = tf.reshape(tf.range(seq_len,dtype=tf.float32),[seq_len,1])
    angles = positions/wavelengths
    # Stack sin/cos on a trailing axis, then flatten so they alternate along the feature axis.
    interleaved = tf.stack([tf.math.sin(angles),tf.math.cos(angles)],axis=2)
    table = tf.reshape(interleaved,[interleaved.shape[0],interleaved.shape[1]*interleaved.shape[2]])
    table = tf.cast(table,dtype=tf.float32)
    return table * self.weights_var

### Embedding

In [None]:
class SequenceEmbedding(tf.keras.layers.Layer):
  """Embeds a batch with TemporalEmbedding, adds the positional table and applies dropout."""

  def __init__(self,max_sequence_length,d_model,lan_to_index,START_TOKEN=None,END_TOKEN=None,PADDING_TOKEN=None):
    super().__init__()
    self.max_sequence_length = max_sequence_length
    self.lan_to_index = lan_to_index
    self.START_TOKEN = START_TOKEN
    self.END_TOKEN = END_TOKEN
    self.PADDING_TOKEN = PADDING_TOKEN

    # Sub-layers are created in the same order as before.
    self.embedding = TemporalEmbedding(max_sequence_length=max_sequence_length,d_model=d_model)
    self.position_encoder = PositionalEncoding(d_model,max_sequence_length)
    self.dropout = Dropout(0.1)

  def call(self,x,batch_size):
    """Return dropout(embedding(x) + positional table)."""
    embedded = self.embedding(x,batch_size)
    # PositionalEncoding.call takes no inputs, so it is invoked directly.
    positional = self.position_encoder.call()
    return self.dropout(embedded+positional)

# def get_temporal_embeddings(position, d_model):
#     angle_rates = 1 / np.power(10000, (2 * (np.arange(d_model) // 2)) / np.float32(d_model))
#     return tf.constant(position * angle_rates,dtype=tf.float32)
def get_temporal_embeddings(position, d_model):
    """Scale ``position`` by one inverse-frequency rate per feature index.

    Feature indices 2k and 2k+1 share the rate 1 / 10000^(2k / d_model).
    """
    feature_idx = tf.range(d_model, dtype=tf.float32)
    depth = tf.cast(d_model, dtype=tf.float32)
    exponent = 2.0 * (feature_idx // 2.0) / depth
    angle_rates = 1 / tf.pow(10000.0, exponent)
    return position * angle_rates

class TemporalEmbedding(tf.keras.layers.Layer):
  """Adds per-position temporal terms to each input row and scales by a trainable matrix.

  Each input row is broadcast along a new position axis, the output of
  ``get_temporal_embeddings`` for positions 0..max_sequence_length-1 is added,
  and the sum is multiplied element-wise by ``temporal_weights``.
  """

  def __init__(self,max_sequence_length,d_model):
    super(TemporalEmbedding,self).__init__()

    self.d_model = d_model
    self.max_sequence_length = max_sequence_length
    # NOTE(review): the weight is created as (d_model, max_sequence_length) but is
    # multiplied with a (batch, max_sequence_length, input_width) tensor in call(),
    # so broadcasting only works when those sizes agree — confirm the intended shape.
    self.temporal_weights = self.add_weight(
      shape=(d_model,max_sequence_length ),
      initializer='glorot_uniform',
      trainable=True,
      name='temporal_weights'
      )

  def call(self,inputs,batch_size):
    """Embed a batch of rows.

    Args:
      inputs: tensor of shape (N, input_width); cast to float32.
      batch_size: unused; kept for caller compatibility (the batch size is
        taken from ``inputs`` through broadcasting).

    Returns:
      Tensor of shape (N, max_sequence_length, input_width).
    """
    inputs = tf.cast(inputs,dtype=tf.float32)

    # Positions 0..max_sequence_length-1 with shape (1, seq, 1). Broadcasting over
    # the batch axis gives every row the same 0..seq-1 ramp. The earlier
    # while_loop built a (seq, batch, 1) tensor and reshaped (not transposed) it
    # to (batch, seq, 1), which scrambled positions whenever batch > 1.
    seq_len = self.max_sequence_length
    positions = tf.reshape(tf.range(seq_len,dtype=tf.float32),[1,seq_len,1])

    input_width = inputs.shape[-1]
    time_embedding = get_temporal_embeddings(positions,input_width) # 1 x seq x input_width

    # (N, 1, input_width) + (1, seq, input_width) -> (N, seq, input_width)
    combined_embedding = inputs[:,tf.newaxis,:] + time_embedding
    return combined_embedding * self.temporal_weights

### Multihead Attention 層


In [None]:
def scaled_dot_product(q,k,v,mask=None):
    """Scaled dot-product attention.

    Args:
        q, k, v: tensors whose last two axes are (sequence, head_dim).
        mask: when truthy, a causal mask is applied so each query position only
            attends to key positions at or before it. The value itself is not
            used as a mask tensor.

    Returns:
        (values, attention): the attention-weighted sum of ``v`` and the
        softmax attention weights.
    """
    d_k = q.shape[-1]
    # transpose_b swaps the last two axes of k, matching the hand-built permutation.
    scaled = tf.matmul(q,k,transpose_b=True)/math.sqrt(d_k)

    if mask:
        # Build the mask from the dynamic shape so an unknown batch dimension
        # (graph mode) does not break it, as tf.fill(scaled.shape, ...) did.
        rows = tf.shape(scaled)[-2]
        cols = tf.shape(scaled)[-1]
        keep = tf.linalg.band_part(tf.ones([rows,cols],dtype=scaled.dtype),-1,0) # lower triangle incl. diagonal
        neg_inf = tf.constant(float('-inf'),dtype=scaled.dtype)
        scaled = tf.where(keep > 0,scaled,neg_inf)
    attention = tf.nn.softmax(scaled,axis=-1)
    values = tf.matmul(attention,v)
    return values,attention

In [None]:

class MultiHeadAttention(tf.keras.layers.Layer):
  """Multi-head self-attention with a fused q/k/v projection."""

  def __init__(self,d_model,num_heads):
    super().__init__()
    self.d_model = d_model
    self.num_heads = num_heads
    self.head_dim = d_model // num_heads
    self.qkv_layer = Dense(3*d_model,input_shape=(d_model,)) # d_model -> 3*d_model
    self.linear_layer = Dense(d_model,input_shape=(d_model,)) # d_model -> d_model
    # NOTE(review): this marks the layer and its sub-layers non-trainable, so their
    # variables are excluded from trainable_variables — confirm this is intended.
    self.trainable = False

  def call(self,x,mask=None):
    """Apply self-attention to ``x`` of shape (batch, seq, d_model).

    Args:
      x: input tensor; its sequence length must be statically known.
      mask: forwarded to ``scaled_dot_product``.

    Returns:
      Tensor of shape (batch, seq, d_model).
    """
    sequence_length = x.shape[1]
    batch_size = tf.shape(x)[0]

    qkv = self.qkv_layer(x) # batch x seq x 3*d_model
    qkv = tf.reshape(qkv,[batch_size,sequence_length,self.num_heads,3*self.head_dim])
    qkv = tf.transpose(qkv,perm=[0,2,1,3]) # batch x heads x seq x 3*head_dim
    q,k,v = tf.split(qkv,3,axis=-1) # each batch x heads x seq x head_dim
    values,attention = scaled_dot_product(q,k,v,mask)
    # Move the head axis back next to head_dim before merging. Reshaping
    # (batch, heads, seq, head_dim) directly would interleave heads and positions.
    values = tf.transpose(values,perm=[0,2,1,3]) # batch x seq x heads x head_dim
    values = tf.reshape(values,[batch_size,sequence_length,self.num_heads*self.head_dim])
    out = self.linear_layer(values)
    return out

In [None]:
class MultiHeadCrossAttention(tf.keras.layers.Layer):
  """Multi-head attention where keys/values come from ``x`` and queries from ``y``."""

  def __init__(self,d_model,num_heads):
    super().__init__()
    self.d_model = d_model
    self.num_heads = num_heads
    self.head_dim = d_model // num_heads
    self.kv_layer = Dense(2*d_model,input_shape=(d_model,)) # d_model -> 2*d_model
    self.q_layer = Dense(d_model,input_shape=(d_model,)) # d_model -> d_model
    self.linear_layer = Dense(d_model,input_shape=(d_model,)) # d_model -> d_model
    # NOTE(review): this marks the layer and its sub-layers non-trainable, so their
    # variables are excluded from trainable_variables — confirm this is intended.
    self.trainable = False

  def call(self,x,y,mask=None):
    """Attend from ``y`` (queries) over ``x`` (keys and values).

    Args:
      x: tensor of shape (batch, kv_seq, d_model) projected to keys and values.
      y: tensor of shape (batch, q_seq, d_model) projected to queries.
      mask: forwarded to ``scaled_dot_product``.

    Returns:
      Tensor of shape (batch, q_seq, d_model).
    """
    kv_length = x.shape[1]
    # The query is reshaped with its own length rather than the key/value length.
    q_length = y.shape[1]
    batch_size = tf.shape(x)[0]

    kv = self.kv_layer(x) # batch x kv_seq x 2*d_model
    q = self.q_layer(y) # batch x q_seq x d_model
    kv = tf.reshape(kv,[batch_size,kv_length,self.num_heads,2*self.head_dim])
    q = tf.reshape(q,[batch_size,q_length,self.num_heads,self.head_dim])
    kv = tf.transpose(kv,perm=[0,2,1,3]) # batch x heads x kv_seq x 2*head_dim
    q = tf.transpose(q,perm=[0,2,1,3]) # batch x heads x q_seq x head_dim
    k,v = tf.split(kv,2,axis=-1)
    values,attention = scaled_dot_product(q,k,v,mask)
    # Restore (batch, seq, heads, head_dim) before merging heads. Reshaping the
    # (batch, heads, seq, head_dim) tensor directly would mix heads with positions.
    values = tf.transpose(values,perm=[0,2,1,3])
    values = tf.reshape(values,[batch_size,q_length,self.num_heads*self.head_dim])
    out = self.linear_layer(values)
    return out

### Layer Normalization

In [None]:
class LayerNorm(tf.keras.layers.Layer):
  """Normalizes over the trailing axes given by ``parameter_shape``, then applies a learned scale and shift."""

  def __init__(self,parameter_shape,eps=1e-5):
    super().__init__()
    self.parameter_shape = parameter_shape
    self.eps = eps
    self.gamma = tf.Variable(tf.ones(parameter_shape)) # scale
    self.beta = tf.Variable(tf.zeros(parameter_shape)) # shift

  def call(self,x):
    """Return gamma * (x - mean) / sqrt(var + eps) + beta."""
    n_axes = len(self.parameter_shape)
    axes = list(range(-1,-n_axes-1,-1)) # [-1, -2, ..., -n_axes]
    mean = tf.reduce_mean(x,axis=axes,keepdims=True)
    centered = x-mean
    variance = tf.reduce_mean(centered**2,axis=axes,keepdims=True)
    normalized = centered/tf.math.sqrt(variance+self.eps)
    return self.gamma * normalized + self.beta

### Feed Forward (Fully Connected Network)

In [None]:
class PositionwiseFeedForward(tf.keras.layers.Layer):
  """Two Dense layers with ReLU and dropout in between: d_model -> hidden -> d_model."""

  def __init__(self,d_model,hidden,drop_prob=0.1):
    super(PositionwiseFeedForward,self).__init__()
    self.linear1 = Dense(hidden,input_shape=(d_model,)) # d_model -> hidden
    self.linear2 = Dense(d_model,input_shape=(hidden,)) # hidden -> d_model
    self.relu = tf.keras.layers.ReLU()
    self.dropout = Dropout(drop_prob)
    # Marks this layer non-trainable.
    self.trainable = False

  def call(self,x):
    """Return linear2(dropout(relu(linear1(x))))."""
    hidden = self.relu(self.linear1(x))
    hidden = self.dropout(hidden)
    return self.linear2(hidden)

## Encoder

### Encoder Block(Layer)

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
  """One encoder block: self-attention and feed-forward, each followed by dropout, residual add and LayerNorm."""

  def __init__(self,d_model,ffn_hidden,num_heads,drop_prob):
    super(EncoderLayer,self).__init__()
    self.attention = MultiHeadAttention(d_model=d_model,num_heads=num_heads)
    self.norm1 = LayerNorm(parameter_shape=[d_model])
    self.dropout1 = Dropout(drop_prob)
    self.ffn = PositionwiseFeedForward(d_model=d_model,hidden=ffn_hidden,drop_prob=drop_prob )
    self.norm2 = LayerNorm(parameter_shape=[d_model])
    self.dropout2 = Dropout(drop_prob)
    # Marks this layer non-trainable.
    self.trainable = False

  def call(self,x,mask):
    """Run the block on ``x``; ``mask`` is forwarded to the attention layer."""
    attended = self.dropout1(self.attention(x,mask=mask))
    x = self.norm1(attended+x)
    transformed = self.dropout2(self.ffn(x))
    return self.norm2(transformed+x)

In [None]:
# Encoder
class Encoder(tf.keras.layers.Layer):
  """Sequence embedding followed by a stack of ``num_layers`` EncoderLayer blocks."""

  def __init__(self,d_model,ffn_hidden,num_heads,drop_prob,num_layers,
                max_sequence_length,lan_to_index=None,START_TOKEN=None,END_TOKEN=None,PADDING_TOKEN=None):
    super().__init__()
    self.sequence_embedding = SequenceEmbedding(max_sequence_length,d_model,lan_to_index,START_TOKEN,END_TOKEN,PADDING_TOKEN)
    # The Sequential is kept as the container so the attribute layout is unchanged.
    self.layers = Sequential()
    for _ in range(num_layers):
        self.layers.add(EncoderLayer(d_model,ffn_hidden,num_heads,drop_prob))

    # Marks this layer and its sub-layers non-trainable.
    self.trainable = False

  def call(self,x,mask,batch_size):
    """Embed ``x`` and run it through every encoder block.

    Args:
      x: input batch passed to the sequence embedding.
      mask: forwarded unchanged to every EncoderLayer.
      batch_size: forwarded to the sequence embedding.
    """
    x = self.sequence_embedding(x,batch_size)
    # Each block is called explicitly: Sequential's call signature is
    # (inputs, training, mask), so `self.layers(x, mask)` handed the mask to
    # the `training` slot instead of to the layers.
    for layer in self.layers.layers:
      x = layer(x,mask)
    return x

## Decoder
由於GAN中只關注身為Generator的Transformer的產出，不需要使用到Decoder的部分，所以在Transformer中也沒有加入Decoder，但為了完整起見，還是將code呈現上來

### Decoder Block(Layer)

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
  """One decoder block: self-attention, cross-attention and feed-forward.

  Each sub-layer is followed by dropout, a residual add and LayerNorm.
  """

  def __init__(self,d_model,ffn_hidden,num_heads,drop_prob):

    super(DecoderLayer,self).__init__()
    self.self_attention = MultiHeadAttention(d_model=d_model,num_heads=num_heads)
    self.norm1 = LayerNorm(parameter_shape=[d_model])
    self.dropout1 = Dropout(drop_prob)
    # cross attention here
    self.encoder_decoder_attention =  MultiHeadCrossAttention(d_model=d_model,num_heads=num_heads)
    self.norm2 = LayerNorm(parameter_shape=[d_model])
    self.dropout2 = Dropout(drop_prob)
    self.ffn = PositionwiseFeedForward(d_model=d_model,hidden=ffn_hidden,drop_prob=drop_prob)
    self.norm3 = LayerNorm(parameter_shape=[d_model])
    self.dropout3 = Dropout(drop_prob)
    # Marks this layer and its sub-layers non-trainable.
    self.trainable = False

  def call(self,x,y,mask=None,cross_mask=None):
    """Run the block.

    Args:
      x: encoder output used for keys/values in cross-attention.
      y: decoder state used for self-attention and as cross-attention queries.
      mask: forwarded to self-attention.
      cross_mask: forwarded to cross-attention.

    Returns:
      Updated decoder state with the same shape as ``y``.
    """
    # 1) self-attention
    residual = y
    y = self.self_attention(y,mask=mask)
    y = self.dropout1(y)
    y = self.norm1(y+residual)

    # 2) cross-attention over the encoder output
    residual = y
    y = self.encoder_decoder_attention(x,y,mask=cross_mask)
    y = self.dropout2(y)
    y = self.norm2(y+residual)

    # 3) feed-forward with its own residual and norm. Previously the FFN ran
    # before cross-attention with no residual, and norm3/dropout3 were unused.
    residual = y
    y = self.ffn(y)
    y = self.dropout3(y)
    y = self.norm3(y+residual)
    return y

In [None]:
# A custom Sequential is needed because each decoder layer takes two inputs (x, y).
class SequentialDecoder(Sequential):
  """Sequential variant that threads (x, y, mask, cross_mask) through every layer."""

  def call(self,*inputs):
    """Feed ``y`` through each layer in turn, reusing ``x`` and both masks."""
    encoder_out,decoder_state,mask,cross_mask = inputs
    for block in self.layers:
        decoder_state = block(encoder_out,decoder_state,mask,cross_mask)
    return decoder_state

In [None]:
# Decoder
class Decoder(tf.keras.layers.Layer):
  """Sequence embedding followed by a stack of DecoderLayer blocks applied prefix by prefix."""

  def __init__(self,d_model,ffn_hidden,num_heads,drop_prob,num_layers,
              max_sequence_length,lan_to_index=None,START_TOKEN=None,END_TOKEN=None,PADDING_TOKEN=None):

    super().__init__()
    self.sequence_embedding = SequenceEmbedding(max_sequence_length,d_model,lan_to_index,START_TOKEN,END_TOKEN,PADDING_TOKEN)
    self.layers  = SequentialDecoder()
    for _ in range(num_layers):
      self.layers.add(DecoderLayer(d_model,ffn_hidden,num_heads,drop_prob))
    # Marks this layer and its sub-layers non-trainable.
    self.trainable = False


  def call(self,x,y,mask,cross_mask,AT_table,batch_):
    """Embed ``y`` and update it through a loop over growing prefixes.

    Args:
      x: encoder output; sliced to the first i+1 positions on each iteration.
      y: decoder input, embedded by ``sequence_embedding`` before the loop.
      mask, cross_mask: forwarded to the decoder layers.
      AT_table: index table supplying the ``tensor_scatter_nd_add`` indices;
        presumably the array built by ``indices_dict`` — confirm against caller.
      batch_: forwarded to the sequence embedding.

    Returns:
      The embedded ``y`` after all scatter-add updates.
    """
    # batch_size is computed here but not used below.
    batch_size = tf.shape(y)[0]
    y = self.sequence_embedding(y,batch_)

    # The loop runs once per position along axis 1 of the embedded y.
    num = int(y.shape[1])
    for i in range(num):

      # Run the decoder stack on the first i+1 positions of x and y.
      att_output = self.layers(x[:,:i+1,:],y[:,:i+1,:],mask,cross_mask)
      # Flatten so each value pairs with one index row below.
      att_output = tf.reshape(att_output,[-1])

      # A pre-built index table is used so the same indices are not recomputed
      # inside the network on every pass.
      # The indices grow together with the y[:,:i+1,:] prefix.
      indices = AT_table[:i+1,:,:]
      indices = tf.reshape(indices,[indices.shape[0]*indices.shape[1],indices.shape[2]])
      # NOTE(review): assumes the flattened att_output order matches the row
      # order of `indices` — confirm.
      y = tf.tensor_scatter_nd_add(y,indices,att_output)

    return y

## Transformer

In [None]:
# Without Decoder
class TransformerEncoderOnly(Model):
  """Generator that runs only the encoder, a Dense projection and an argmax."""

  def __init__(self,d_model,ffn_hidden,num_heads,drop_prob,num_layers,
              max_sequence_length,vocab_size,origin_to_index=None,transform_to_index=None,
                START_TOKEN=None,END_TOKEN=None,PADDING_TOKEN=None):

    super().__init__()
    self.start = START_TOKEN
    self.padding = PADDING_TOKEN
    self.end = END_TOKEN
    self.transform_to_indx = transform_to_index
    self.vocab_size = vocab_size
    self.max_seq_len = max_sequence_length
    # Not used by call(); depends on the module-level BATCH.
    self.input_layer = Input(shape=(BATCH,max_sequence_length))
    self.encoder = Encoder(d_model,ffn_hidden,num_heads,drop_prob,num_layers,max_sequence_length,origin_to_index,START_TOKEN,END_TOKEN,PADDING_TOKEN)
    # NOTE(review): the decoder is built but never called here; it is kept so the
    # model's attribute layout stays the same — confirm before removing.
    self.decoder = Decoder(d_model,ffn_hidden,num_heads,drop_prob,num_layers,max_sequence_length,transform_to_index,START_TOKEN,END_TOKEN,PADDING_TOKEN)
    self.linear = Dense(vocab_size,input_shape=(d_model,))
    # Marks the whole model non-trainable.
    self.trainable = False

  def call(self,inputs,batch_=1,encoder_mask=None,decoder_mask=None,cross_mask=None,):
    """Generate one output index per position.

    Args:
      inputs: 2-D batch (batch, sequence); reshaped and cast to float32.
      batch_: forwarded to the encoder.
      encoder_mask: forwarded to the encoder.
      decoder_mask, cross_mask: accepted for signature parity; unused here.

    Returns:
      Integer tensor of shape (batch, sequence) from ``tf.argmax`` over the
      last axis of the Dense output. Being integer indices, this output
      carries no gradient back to the model.
    """
    x = inputs
    batch_size = tf.shape(x)[0]

    x = tf.reshape(x,[batch_size,x.shape[1]])
    x = tf.cast(x,dtype=tf.float32)

    out = self.encoder(x,encoder_mask,batch_)
    out = self.linear(out)
    out = tf.argmax(out,axis=2)
    return out

In [None]:
# With Decoder
class Transformer(Model):
  """Encoder-decoder variant: encoder, decoder, Dense projection, then argmax."""

  def __init__(self,d_model,ffn_hidden,num_heads,drop_prob,num_layers,
              max_sequence_length,vocab_size,origin_to_index,transform_to_index,
                START_TOKEN,END_TOKEN,PADDING_TOKEN):

    super().__init__()
    self.start = START_TOKEN
    self.padding = PADDING_TOKEN
    self.end = END_TOKEN
    self.transform_to_indx = transform_to_index
    self.vocab_size = vocab_size
    self.max_seq_len = max_sequence_length
    self.encoder = Encoder(d_model,ffn_hidden,num_heads,drop_prob,num_layers,max_sequence_length,origin_to_index,START_TOKEN,END_TOKEN,PADDING_TOKEN)
    self.decoder = Decoder(d_model,ffn_hidden,num_heads,drop_prob,num_layers,max_sequence_length,transform_to_index,START_TOKEN,END_TOKEN,PADDING_TOKEN)
    self.linear = Dense(vocab_size,input_shape=(d_model,))
    # Marks the whole model non-trainable.
    self.trainable = False

  def call(self,inputs,y,AT_table,batch_,encoder_mask=None,decoder_mask=None,cross_mask=None,):
    """Encode ``inputs``, decode ``y`` against the result and return argmax indices.

    ``y`` is reshaped to (batch, y.shape[1]) and cast to float32 before decoding;
    ``AT_table`` and ``batch_`` are forwarded to the decoder.
    """
    batch_size = tf.shape(inputs)[0]

    target = tf.reshape(y,[batch_size,y.shape[1]])
    target = tf.cast(target,dtype=tf.float32)

    encoded = self.encoder(inputs,encoder_mask,batch_)
    decoded = self.decoder(encoded,target,decoder_mask,cross_mask,AT_table,batch_)
    logits = self.linear(decoded)
    return tf.argmax(logits,axis=2)

# Discriminator

In [None]:
from tensorflow.keras.layers import InputLayer,LSTM,Dropout,LeakyReLU

In [None]:
class Discriminator(Model):
  """LSTM-based binary classifier: LSTM(256) -> Dropout -> LeakyReLU -> Dense(1, sigmoid)."""

  def __init__(self,input_shape):
    super(Discriminator, self).__init__()
    self.model = tf.keras.Sequential([
        InputLayer(input_shape=input_shape),
        LSTM(256,input_shape=input_shape),
        Dropout(0.2),
        LeakyReLU(0.2),
        Dense(1, activation='sigmoid')
    ])

  def call(self, inputs, training=None):
    """Return one sigmoid score per input sequence.

    Args:
      inputs: batch matching the ``input_shape`` given at construction.
      training: forwarded to the inner Sequential so Dropout follows the
        caller's mode; ``None`` lets Keras decide from the call context.
    """
    return self.model(inputs, training=training)

# GAN

In [None]:
# Gradient-clipping threshold read by TransformerGAN.train_step.
clip_value = 1.0

In [None]:
class TransformerGAN(Model):
  """GAN wrapper that trains a discriminator once and a generator twice per batch."""

  def __init__(self,generator,discriminator,*args,**kwargs):
    # Pass through args and kwargs to the base class
    super().__init__(*args,**kwargs)

    # Keep references to the generator and discriminator
    self.generator = generator
    self.discriminator = discriminator
    # Index table for the decoder variant; not used by train_step below.
    self.table  = indices_dict(BATCH)

  def compile(self,g_opt,d_opt,g_loss,d_loss,*args,**kwargs):
    """Store the per-network optimizers and loss functions."""
    # Compile with the base class
    super().compile(*args,**kwargs)

    self.g_opt = g_opt
    self.d_opt = d_opt
    self.g_loss = g_loss
    self.d_loss = d_loss
    self.g_loss_series = []
    self.d_loss_series = []

  def _generator_step(self,batch,batch_size,clip_norm=None):
    """Run one generator update and return its loss.

    Args:
      batch: float32 input batch.
      batch_size: dynamic batch size of ``batch``.
      clip_norm: when given, gradients are clipped by global norm to this value.
    """
    with tf.GradientTape() as g_tape:
      # NOTE(review): re-seeding inside the step is kept from the original code;
      # confirm it is intended, since it affects how the noise is drawn.
      tf.random.set_seed(100)

      noise = tf.random.normal(shape=(tf.shape(batch)[0],tf.shape(batch)[1]))
      # Encoder-only generator: the noise is added to the batch.
      inputs = batch + noise
      gen_music = self.generator(inputs,BATCH,training=True)
      gen_music = tf.reshape(gen_music,[batch_size,1,gen_music.shape[1]])
      predicted_labels = self.discriminator(gen_music,training=False)
      # Target is zeros, the label the discriminator step assigns to real data.
      total_g_loss = self.g_loss(tf.zeros_like(predicted_labels),predicted_labels)

    gen_vars = self.generator.trainable_variables
    ggrad = g_tape.gradient(total_g_loss,gen_vars)
    if clip_norm is not None:
      # clip_by_global_norm returns (clipped_list, global_norm); only the list
      # may be zipped with the variables.
      ggrad,_ = tf.clip_by_global_norm(ggrad,clip_norm)
    self.g_opt.apply_gradients(zip(ggrad,gen_vars))
    return total_g_loss

  def train_step(self,data):
    """Train on one batch.

    Args:
      data: ``(batch, label)`` pair; ``batch`` feeds the generator and
        ``label`` is the real sample shown to the discriminator.

    Returns:
      Dict with ``d_loss`` and the ``g_loss`` of the last generator update.
    """
    batch,label = data
    batch = tf.cast(batch,dtype=tf.float32)

    batch_size = tf.shape(batch)[0]
    noise = tf.random.normal(shape=(batch_size,tf.shape(batch)[1]))

    with tf.GradientTape() as d_tape:
      # Encoder-only generator: the noise is added to the batch.
      inputs = batch + noise
      generated_music = self.generator(inputs,BATCH,training=False)
      # Add a length-1 time axis to match the LSTM input shape.
      y_real = tf.reshape(label,[batch_size,1,label.shape[1]])
      generated_music = tf.reshape(generated_music,[batch_size,1,batch.shape[1]])

      yhat_real = self.discriminator(y_real,training=True)
      yhat_fake = self.discriminator(generated_music,training=True)
      yhat_realfake = tf.concat([yhat_real,yhat_fake],axis=0)

      # Targets: 0 for real, 1 for generated, softened with uniform noise
      # (+[0, 0.5) on real, -[0, 0.5) on generated).
      y_realfake = tf.concat([tf.zeros_like(yhat_real),tf.ones_like(yhat_fake)],axis=0)
      noise_real = 0.5*tf.random.uniform(tf.shape(yhat_real))
      noise_fake = -0.5*tf.random.uniform(tf.shape(yhat_fake))
      y_realfake += tf.concat([noise_real,noise_fake],axis=0)

      total_d_loss = self.d_loss(y_realfake,yhat_realfake)

    # Gradients are taken after leaving the tape context.
    disc_vars = self.discriminator.trainable_variables
    dgrad = d_tape.gradient(total_d_loss,disc_vars)
    self.d_opt.apply_gradients(zip(dgrad,disc_vars))

    # Two generator updates per discriminator update, as before: the first
    # with gradient clipping, the second without.
    self._generator_step(batch,batch_size,clip_norm=clip_value)
    total_g_loss = self._generator_step(batch,batch_size)

    return {"d_loss": total_d_loss,"g_loss": total_g_loss}

# Training

In [None]:
import random
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy,MeanSquaredError

In [None]:
# Number of training epochs passed to fit().
epochs_ = 300
# NOTE(review): batchSize is not referenced by any visible cell (fit uses BATCH) — confirm it is still needed.
batchSize = 4
# Gradient clipping to avoid exploding gradients.
# Gradient clipping threshold
clip_value = 1.0
# clipvalue=clip_value

In [None]:
# Build the encoder-only generator and the discriminator (input shape (1, max_sequence_length)).
generator = TransformerEncoderOnly(d_model,ffn_hidden,num_heads,drop_prob,num_layers,max_sequence_length,
                        vocab_size)
discriminator = Discriminator(input_shape=(1,max_sequence_length))

In [None]:
# Separate Adam optimizers for the generator and the discriminator.
g_opt = Adam(learning_rate=1e-4)
d_opt = Adam(learning_rate=1e-4)
# g_loss = BinaryCrossentropy()
# Generator loss is mean squared error; discriminator loss is binary cross-entropy.
g_loss = MeanSquaredError()
d_loss = BinaryCrossentropy()

In [None]:
# Wrap both networks in the GAN model and attach the optimizers and losses.
gan = TransformerGAN(generator,discriminator)
gan.compile(g_opt,d_opt,g_loss,d_loss)

In [None]:
# X is passed as both input and target; train_step unpacks them as (batch, label).
# Training is pinned to the first GPU.
with tf.device('/device:GPU:0'):
  hist = gan.fit(X,X,batch_size=BATCH,epochs=epochs_,verbose=1)

Epoch 1/300
(None, 128)
(None, 128)
(None, 128)
(None, 128)
(None, 128)
(None, 128)
Epoch 2/300
Epoch 3/300
Epoch 4/300
Epoch 5/300
Epoch 6/300
Epoch 7/300
Epoch 8/300
Epoch 9/300
Epoch 10/300
Epoch 11/300
Epoch 12/300
Epoch 13/300
Epoch 14/300
Epoch 15/300
Epoch 16/300
Epoch 17/300
Epoch 18/300
Epoch 19/300
Epoch 20/300
Epoch 21/300
Epoch 22/300
Epoch 23/300
Epoch 24/300
Epoch 25/300
Epoch 26/300
Epoch 27/300
Epoch 28/300
Epoch 29/300
Epoch 30/300
Epoch 31/300
Epoch 32/300
Epoch 33/300
Epoch 34/300
Epoch 35/300
Epoch 36/300
Epoch 37/300
Epoch 38/300
Epoch 39/300
Epoch 40/300
Epoch 41/300
Epoch 42/300
Epoch 43/300
Epoch 44/300
Epoch 45/300
Epoch 46/300
Epoch 47/300
Epoch 48/300
Epoch 49/300
Epoch 50/300
Epoch 51/300
Epoch 52/300
Epoch 53/300
Epoch 54/300
Epoch 55/300
Epoch 56/300
Epoch 57/300
Epoch 58/300
Epoch 59/300
Epoch 60/300
Epoch 61/300
Epoch 62/300
Epoch 63/300
Epoch 64/300
Epoch 65/300
Epoch 66/300
Epoch 67/300
Epoch 68/300
Epoch 69/300
Epoch 70/300
Epoch 71/300
Epoch 72/300
E

In [None]:
# Save the generator and discriminator weights to Drive in TensorFlow checkpoint format.
generator.save_weights('./drive/MyDrive/model_weights/generator_weights',save_format='tf')

discriminator.save_weights('./drive/MyDrive/model_weights/discriminator_weights',save_format='tf')

# Predict

### Load weights

In [None]:
# Rebuild the models with the same arguments used for training. The earlier call
# passed names that are not defined in this notebook and one positional argument
# more than the constructor accepts.
generator = TransformerEncoderOnly(d_model,ffn_hidden,num_heads,drop_prob,num_layers,max_sequence_length,
                        vocab_size)
discriminator = Discriminator(input_shape=(1,max_sequence_length))

# Load the generator and discriminator weights
generator.load_weights('./drive/MyDrive/model_weights/generator_weights')
discriminator.load_weights('./drive/MyDrive/model_weights/discriminator_weights')

# Gradient clipping threshold
clip_value = 1.0

g_opt = Adam(learning_rate=1e-6,clipvalue=clip_value)
d_opt = Adam(learning_rate=1e-6)
g_loss = MeanSquaredError()
d_loss = BinaryCrossentropy()

# Create the new GAN model once, around the restored networks
predict_gan = TransformerGAN(generator,discriminator)
predict_gan.compile(g_opt,d_opt,g_loss,d_loss)

Create a sequence e.g. [0,0,0,...,0]

In [None]:
# Seed input for generation: a single all-zero sequence of length 128.
start = tf.zeros([1,128])


Predict

In [None]:
# Use the generator restored from disk. `gan` only exists if the training
# cells were run in this session.
out = predict_gan.generator(start,batch_=1)
out = out.numpy()

Run this if you don't have mido

In [None]:
!pip install mido

In [None]:
from mido import MidiFile , MidiTrack, Message

Create a class that generates a MIDI file from the output

In [None]:
class Midi_():
  """Collects note messages on a single MIDI track and writes them to a file."""

  def __init__(self):
    self.mid = MidiFile()
    self.track = MidiTrack()

  def play_part(self,note, len_ ,note_bias=0,vel=1,delay=0,change=False,double=False):
    """Append the messages for one note to the track.

    Args:
      note: MIDI note number.
      len_: note length in beats; sets the time of the closing message.
      note_bias: unused; kept for caller compatibility.
      vel: velocity multiplier; the message velocity is ``round(64*vel)``.
      delay: delay in beats before the opening message.
      change: when True, also append a control_change pair (control 64).
      double: when True, the note is written on channel 1 wrapped in
        program_change messages (program 41, then 0) instead of the plain
        note_on/note_off pair.
    """
    # Time units per beat, used to turn beats into message `time` values.
    beat_time = 60*60*10/75
    velocity = round(64*vel)
    t_start = round(delay*beat_time)
    t_end = round(beat_time*len_)

    # The three independent checks are kept in their original order because
    # they determine the order of messages on the track.
    if not double:
      self.track.append(Message("note_on",  note=note,velocity=velocity,time=t_start))
      self.track.append(Message("note_off", note=note,velocity=velocity,time=t_end))
    if change:
      self.track.append(Message("control_change",channel=0,control=64,value=64,time=t_start))
      self.track.append(Message("control_change",channel=0,control=64,value=0,time=t_end))
    if double:
      self.track.append(Message("program_change", channel=1,program=41 ,time = t_start))
      self.track.append(Message("note_on",channel=1, note=note,velocity=velocity,time = t_start))
      self.track.append(Message("note_off",channel=1, note=note,velocity=velocity,time = t_end))
      self.track.append(Message("program_change", channel=1,program=0 ,time=t_end))

  def make_file(self,notes,name='new_song.mid'):
    """Write the first row of ``notes`` as half-beat notes and save the file.

    Args:
      notes: 2-D sequence of note numbers; only ``notes[0]`` is used.
      name: output path of the MIDI file.
    """
    for n in notes[0]:
      self.play_part(int(n),0.5)

    self.mid.tracks.append(self.track)

    self.mid.save(name)

In [None]:
# Write the generated notes to a MIDI file on Drive.
midi_encode = Midi_()
midi_encode.make_file(
    notes=out,
    name='./drive/MyDrive/music_note_dataset/test16_2018_100.mid',
)