# **1. Install Tensorflow**

In [None]:
import tensorflow as tf

# Download and import MIT 6.S191 package
!pip install mitdeeplearning
import mitdeeplearning as mdl

import numpy as np
import matplotlib.pyplot as plt

# **2. Why is TensorFlow called TensorFlow**

* it handles flow of Tensors
* Tensors are multidimensional arrays that generalize vectors and matrices 
* its shape defines the number of dimensions and the size of each dimension

a 0-d Tensor, scalar.



In [None]:
sport = tf.constant("Tennis", tf.string)
number = tf.constant(1.41421356237, tf.float64)

print("`sport` is a {} - dimensional Tensor".format(tf.rank(sport).numpy()))
print("`number` is a {} - dimensional Tensor".format(tf.rank(number).numpy()))

In [None]:
# Two 1-d Tensors (vectors): one of strings, one of 64-bit floats.
sports = tf.constant(["Tennis", "Basketball"], tf.string)
numbers = tf.constant([3.141592, 1.414213, 2.71821], tf.float64)

# Convert both rank and shape with .numpy() so the message shows plain values
# (e.g. [2]) rather than the full tf.Tensor repr, matching the matrix example.
print("`sports` is a {}-d Tensor with shape: {}".format(tf.rank(sports).numpy(), tf.shape(sports).numpy()))
print("`numbers` is a {}-d Tensor with shape: {}".format(tf.rank(numbers).numpy(), tf.shape(numbers).numpy()))

In [None]:
### Defining higher-order Tensors ###

matrix = tf.constant([[3.141592, 1.414213],
                      [3.141592, 1.414213],
                      [3.141592, 1.414213]], tf.float64)

print("`matrix` is a {}-d Tensor with shape {}".format(tf.rank(matrix).numpy(), tf.shape(matrix).numpy()))

assert isinstance(matrix, tf.Tensor), "matrix must be a tf Tensor object"
assert tf.rank(matrix).numpy() == 2

- the base `tf.Tensor` class requires tensors to be "**rectangular**"
- every element of same size
- specialized tensors can have different shapes
 - ragged tensors
 - sparse tensors
 




In [None]:
a = tf.constant([[1,2],
                 [3,4]], tf.int32)
b = tf.constant([[1,1],
                 [1,1]])

print(tf.add(a, b), "\nAddition\n")
print(tf.multiply(a, b), "\nMultiplication\n")
print(tf.matmul(a,b), "\nMatrix Multiplication\n")

In [None]:
images = tf.zeros([10, 256, 256, 3])

assert isinstance(images, tf.Tensor)
assert tf.shape(images).numpy().tolist() == [10, 256, 256, 3], "matrix is incorrect shape"

# **3. Computations on Tensors**

In [None]:
def func(a, b):
  """Return (a + b) * (b - 1), built from two named intermediate values."""
  total = a + b      # sum of the two inputs
  shifted = b - 1    # second input shifted down by one
  return total * shifted

In [None]:
a, b = 1.5, 2.5
e_out = func(a,b)
print(e_out)

## **4. Neural networks in TensorFlow**

- Below we consider the example of a simple perceptron defined by just one dense layer: $ y = \sigma(Wx + b)$, where $W$ represents a matrix of weights, $b$ is a bias, $x$ is the input, $\sigma$ is the sigmoid activation function, and $y$ is the output. 
- Tensors can flow through abstract types called Layers -- the building blocks of neural networks. Layers implement common neural networks operations, and are used to update weights, compute losses, and define inter-layer connectivity. We will first define a Layer to implement the simple perceptron defined above.

In [None]:
# n_output_nodes: number of output nodes
# input_shape: shape of input
# x: input to the layer

class OurDenseLayer(tf.keras.layers.Layer):
  """Single dense layer computing y = sigmoid(x @ W + b).

  Args:
    n_output_nodes: number of output nodes, i.e. the number of columns of
      the weight matrix W and of the bias b.
  """

  def __init__(self, n_output_nodes):
    super().__init__()
    self.n_output_nodes = n_output_nodes

  def build(self, input_shape):
    """Create the weight matrix W and bias b for the given input shape."""
    # The last entry of input_shape is the number of input features.
    d = int(input_shape[-1])

    # Pass the variable name by keyword: the position of `name` in
    # add_weight's signature is not the same across Keras versions, so a
    # positional "weight" can end up being interpreted as the shape.
    self.W = self.add_weight(name="weight", shape=[d, self.n_output_nodes])
    self.b = self.add_weight(name="bias", shape=[1, self.n_output_nodes])

  def call(self, x):
    """Apply the affine map followed by a sigmoid to the input `x`."""
    z = tf.add(tf.matmul(x, self.W), self.b)
    y = tf.sigmoid(z)

    return y

# for reproducing the output set random seed
tf.random.set_seed(1)
layer = OurDenseLayer(3)
layer.build((1,2))
x_input = tf.constant([[1,2.]], shape=(1,2))
y = layer.call(x_input)

print(y)
mdl.lab1.test_custom_dense_layer_output(y)


- TensorFlow has defined a number of Layers that are commonly used in neural networks, for example the `Dense` layer.


In [None]:
### Defining a neural network using the Sequential API ###

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense

n_output_nodes = 3

model = Sequential()
dense_layer = Dense(n_output_nodes, activation="sigmoid")

# add dense layer to the model
model.add(dense_layer)


In [None]:
x_input = tf.constant([[1,2.]], shape=(1,2))

model_output = model(x_input)
print(model_output)

In addition to defining models using the Sequential API, we can also define neural networks by directly subclassing the Model class, which groups layers together to enable model training and inference. The Model class captures what we refer to as a "model" or as a "network". Using Subclassing, we can create a class for our model, and then define the forward pass through the network using the call function. Subclassing affords the flexibility to define custom layers, custom training loops, custom activation functions, and custom models. Let's define the same neural network as above now using Subclassing rather than the Sequential model.

In [None]:
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense

class SubclassModel(tf.keras.Model):
  """Model that wraps a single sigmoid-activated Dense layer."""

  def __init__(self, n_output_nodes):
    super().__init__()
    self.dense_layer = Dense(n_output_nodes, activation="sigmoid")

  def call(self, inputs):
    """Forward pass: run `inputs` through the dense layer."""
    outputs = self.dense_layer(inputs)
    return outputs

  

In [None]:
n_output_nodes = 3

model = SubclassModel(n_output_nodes)
x_input = tf.constant([[1,2.]], shape=(1,2))

print(model.call(x_input))

Importantly, Subclassing affords us a lot of flexibility to define custom models. For example, we can use boolean arguments in the call function to specify different network behaviors, for example different behaviors during training and inference. Let's suppose under some instances we want our network to simply output the input, without any perturbation. We define a boolean argument isidentity to control this behavior:

In [None]:
### Defining a model using subclassing and specifying custom behavior ###

from tensorflow.keras import Model
from tensorflow.keras.layers import Dense

class IdentityModel(tf.keras.Model):
  """Dense sigmoid model whose forward pass can be bypassed on request."""

  def __init__(self, n_output_nodes):
    super().__init__()
    self.dense_layer = tf.keras.layers.Dense(n_output_nodes, activation="sigmoid")

  def call(self, inputs, isidentity=False):
    """Return the dense layer's output, or `inputs` untouched if `isidentity`."""
    if not isidentity:
      return self.dense_layer(inputs)
    # Identity mode: hand the input straight back without applying the layer.
    return inputs


In [None]:
n_output_nodes = 3
model = IdentityModel(n_output_nodes)

x_input = tf.constant([[1,2.]], shape=(1,2))

out_activate = model(x_input, False)
out_identity = model(x_input, True)

print("Network output with activation: {}\n network identity output: {}".format(out_activate.numpy(), out_identity.numpy()))

## **5. Automatic differentiation in TensorFlow**

- all forward pass operations are recorded to a gradient tape
- the gradient tape is played backwards to **compute gradient**
- after it is played backwards once, the tape is discarded
- **persistent gradient tapes** are used to compute multiple gradients over the same computation


`tf.GradientTape`

In [None]:
### Gradient computation with GradientTape ###

x = tf.Variable(3.0)

with tf.GradientTape() as tape:
  y = x * x

dy_dx = tape.gradient(y, x)

print(dy_dx)
assert dy_dx.numpy() == 6.0

In training neural networks, we use differentiation and stochastic gradient descent (SGD) to optimize a loss function. Now that we have a sense of how GradientTape can be used to compute and access derivatives, we will look at an example where we use automatic differentiation and SGD to find the minimum of $L=(x-x_f)^2$. Here $x_f$ is a variable for a desired value we are trying to optimize for; $L$ represents a loss that we are trying to minimize. While we can clearly solve this problem analytically ($x_{min}=x_f$), considering how we can compute this using GradientTape sets us up nicely for future labs where we use gradient descent to optimize entire neural network losses.

In [None]:
### Function minimization with automatic differentiation and SGD ###
# Minimizes loss = (x_f - x)**2 over x by repeated gradient steps, recording
# x after every update and plotting its trajectory against the target x_f.

# Initialize a random value for our initial x
# x = tf.Variable([tf.random.normal([1])])
# x is a one-element Variable (shape [1]) drawn from a normal distribution.
x = tf.Variable(tf.random.normal([1]))

print("Initializing x={}".format(x.numpy()))

learning_rate = 1e-1  # learning_rate for SGD
x_f = 4               # target value
history = []          # value of x after each update step

# run SGD for a number of iterations. 
# compute derivative of loss at each iteration.
# perform SGD update
for i in range(500):
  # Only step while x differs from the target; once they compare equal the
  # remaining iterations do nothing and `history` stops growing.
  if x != x_f:
    # Record the forward computation so the tape can differentiate it.
    with tf.GradientTape() as tape:
      loss = (x_f - x)**2

    grad = tape.gradient(loss, x)     # compute derivative of loss wrt x
    new_x = x - learning_rate * grad  # sgd update
    x.assign(new_x)                   # update x
    # x has a single element, so store it as a scalar rather than an array.
    history.append(x.numpy()[0])
    # history.append(x.numpy())

# Prints the last loop index (499 here), not the number of updates performed.
print(i)
print(history)
# plot evolution of x as we optimize towards x_f
plt.plot(history)
# Horizontal reference line at the target value x_f.
plt.plot([-100,500], [x_f, x_f])
plt.legend(('Predicted', 'True'))
plt.xlabel('Iteration')
plt.ylabel('x value')

  

# **Music Generation with RNNs**

In [None]:
import os 
import time
import functools
from IPython import display as ipythondisplay
from tqdm import tqdm
!apt-get install abcmidi timidity > /dev/null 2>&1

len(tf.config.list_physical_devices('GPU')) > 0

In [None]:
# Download Irish folk songs represented in ABC notation, Dataset.
songs = mdl.lab1.load_training_data()

# check one of the songs
example_song = songs[5]



In [None]:
print(songs[:1])

In [None]:
# Convert song from ABC notation to an audio file and play it
mdl.lab1.play_song(example_song)

In [None]:
# join all songs in a single string
songs_joined = "\n\n".join(songs)

vocab = sorted(set(songs_joined))
print("There are", len(vocab), "unique characters in the dataset")


Let's take a step back and consider our prediction task. We're trying to train a RNN model to learn patterns in ABC music, and then use this model to generate (i.e., predict) a new piece of music based on this learned information.

Breaking this down, what we're really asking the model is: given a character, or a sequence of characters, what is the most probable next character? We'll train the model to perform this task.

To achieve this, we will input a sequence of characters to the model, and train the model to predict the output, that is, the following character at each time step. RNNs maintain an internal state that depends on previously seen elements, so information about all characters seen up until a given moment will be taken into account in generating the prediction.

# **Vectorize the text**
Before we begin training our RNN model, we'll need to create a numerical representation of our text-based dataset. To do this, we'll generate two lookup tables: one that maps characters to numbers, and a second that maps numbers back to characters. Recall that we just identified the unique characters present in the text.

In [None]:
# mapping from unique character to unique index
#   we can evaluate `char2idx["d"]`
char2idx = {u:i for i, u in enumerate(vocab)}

# Create a mapping from unique index to unique character
idx2char = np.array(vocab)

Take a peek at this numerical representation:

In [None]:
# Preview: show only the first 20 entries of the character -> index table.
print('{')
for char in list(char2idx)[:20]:
    print('  {:4s}: {:3d},'.format(repr(char), char2idx[char]))
print('  ...\n}')

# Full listing: unlike the preview above, this prints every entry in the table.
for char, index in char2idx.items():
  print(" {:4s}: {:3d},".format(repr(char), index))
print("...\n")

In [None]:
def vectorize_string(string):
  """Convert `string` to a NumPy array holding the `char2idx` index of each character."""
  indices = []
  for char in string:
    # Raises KeyError for any character missing from the lookup table.
    indices.append(char2idx[char])
  return np.array(indices)
  
vectorized_songs = vectorize_string(songs_joined)

In [None]:
print(vectorized_songs[:10])
for num in vectorized_songs[:10]:
  print("{} {}".format(num, repr(idx2char[num])))


# **Create training examples and targets**
Our next step is to actually divide the text into example sequences that we'll use during training. Each input sequence that we feed into our RNN will contain seq_length characters from the text. We'll also need to define a target sequence for each input sequence, which will be used in training the RNN to predict the next character. For each input, the corresponding target will contain the same length of text, except shifted one character to the right.

To do this, we'll break the text into chunks of seq_length+1. Suppose seq_length is 4 and our text is "Hello". Then, our input sequence is "Hell" and the target sequence is "ello".

The batch method will then let us convert this stream of character indices to sequences of the desired size.

In [None]:
n = vectorized_songs.shape[0] - 1

In [None]:
# Preview the start-index sampling used for batching: draw 5 random start
# positions that leave room for a sequence of length 5. Uses `n` (computed in
# the previous cell) instead of a hard-coded dataset size, so the preview
# stays valid if the dataset changes.
print(np.random.choice(n - 5, 5))


In [None]:
### Batch definition to create training examples ###

def get_batch(vectorized_songs, seq_length, batch_size):
  """Sample a random training batch of input/target sequences.

  Args:
    vectorized_songs: 1-d NumPy array of character indices.
    seq_length: number of characters in each sequence.
    batch_size: number of sequences to sample.

  Returns:
    (x_batch, y_batch), each of shape [batch_size, seq_length]; every target
    row is its input row shifted one position to the right in the source array.
  """
  # Highest valid index; keeps one element in reserve for the shifted target.
  last_index = vectorized_songs.shape[0] - 1
  # Random start positions (sampled with replacement) for each example.
  starts = np.random.choice(last_index - seq_length, batch_size)

  inputs, targets = [], []
  for start in starts:
    stop = start + seq_length
    inputs.append(vectorized_songs[start:stop])
    targets.append(vectorized_songs[start + 1:stop + 1])

  batch_shape = [batch_size, seq_length]
  return np.reshape(inputs, batch_shape), np.reshape(targets, batch_shape)

test_args = (vectorized_songs, 10, 2)

if not mdl.lab1.test_batch_func_types(get_batch, test_args) or \
   not mdl.lab1.test_batch_func_shapes(get_batch, test_args) or \
   not mdl.lab1.test_batch_func_next_step(get_batch, test_args): 
   print("======\n[FAIL] could not pass tests")
else: 
   print("======\n[PASS] passed all tests!")

For each of these vectors, each index is processed at a single time step. So, for the input at time step 0, the model receives the index for the first character in the sequence, and tries to predict the index of the next character. At the next timestep, it does the same thing, but additionally, the RNN considers the information from the previous step, i.e., its updated state.

In [None]:
x_batch, y_batch = get_batch(vectorized_songs, seq_length=5, batch_size=1)



In [None]:
for i, (input_idx, target_idx) in enumerate(zip(np.squeeze(x_batch), np.squeeze(y_batch))):
  print("Step: {:3d}".format(i))
  print("input: {} {:s}".format(input_idx, repr(idx2char[input_idx])))
  print("output: {} {:s}\n".format(target_idx, repr(idx2char[target_idx])))

In [None]:
def LSTM(rnn_units):
  """Build a stateful Keras LSTM layer with `rnn_units` units that returns full sequences."""
  layer_options = dict(
      return_sequences=True,
      recurrent_initializer='glorot_uniform',
      recurrent_activation='sigmoid',
      stateful=True,
  )
  return tf.keras.layers.LSTM(rnn_units, **layer_options)

In [None]:
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
  """Assemble the Embedding -> LSTM -> Dense sequential model.

  Args:
    vocab_size: size of the embedding vocabulary and of the Dense output.
    embedding_dim: dimensionality of the embedding vectors.
    rnn_units: number of units in the LSTM layer.
    batch_size: fixed batch dimension of the model input.

  Returns:
    A tf.keras.Sequential model made of the three layers, in that order.
  """
  # Input is [batch_size, None]: fixed batch dimension, unspecified second dimension.
  embedding = tf.keras.layers.Embedding(
      vocab_size, embedding_dim, batch_input_shape=[batch_size, None])
  recurrent = LSTM(rnn_units)
  # Dense layer with `vocab_size` units and no activation specified.
  projection = tf.keras.layers.Dense(vocab_size)
  return tf.keras.Sequential([embedding, recurrent, projection])

model = build_model(len(vocab), embedding_dim=256, rnn_units=1024, batch_size=32)

In [None]:
model.summary()


In [None]:
x, y = get_batch(vectorized_songs, seq_length=100, batch_size=32)

pred = model(x)

# print("Input shape:      ", x.shape, " # (batch_size, sequence_length)")
print(x[0,0],"\n")
print(pred[0,0],"\n")
# print("Prediction shape: ", pred.shape, "# (batch_size, sequence_length, vocab_size)")

## **Predictions from the untrained model**
Let's take a look at what our untrained model is predicting.

To get actual predictions from the model, we sample from the output distribution, which is defined by a softmax over our character vocabulary. This will give us actual character indices. This means we are using a categorical distribution to sample over the example prediction. This gives a prediction of the next character (specifically its index) at each timestep.

Note here that we sample from this probability distribution, as opposed to simply taking the argmax, which can cause the model to get stuck in a loop.

Let's try this sampling out for the first example in the batch.

In [None]:
sampled_indices = tf.random.categorical(pred[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices,axis=-1).numpy()
sampled_indices




In [None]:
print("Input: \n", repr("".join(idx2char[x[0]])))
print()
print("Next Char Predictions: \n", repr("".join(idx2char[sampled_indices])))


# **Training the model: loss and training operations**
 Now it's time to train the model!

At this point, we can think of our next character prediction problem as a standard classification problem. Given the previous state of the RNN, as well as the input at a given time step, we want to predict the class of the next character -- that is, to actually predict the next character.

To train our model on this classification task, we can use a form of the crossentropy loss (negative log likelihood loss). Specifically, we will use the sparse_categorical_crossentropy loss, as it utilizes integer targets for categorical classification tasks. We will want to compute the loss using the true targets -- the labels -- and the predicted targets -- the logits.

Let's first compute the loss using our example predictions from the untrained model:

In [None]:
def compute_loss(labels, logits):
  """Return the sparse categorical cross-entropy of `logits` against integer `labels`."""
  # from_logits=True: `logits` are treated as unnormalized scores.
  return tf.keras.losses.sparse_categorical_crossentropy(
      labels, logits, from_logits=True)

example_batch_loss = compute_loss(y, pred)

print("Scalar Loss: {}".format(example_batch_loss.numpy().mean()))

In [None]:
### Hyperparameter setting and optimization ###

# Optimization parameters:
num_training_iterations = 2000  # Increase this to train longer
batch_size = 4  # Experiment between 1 and 64
seq_length = 100  # Experiment between 50 and 500
learning_rate = 5e-3  # Experiment between 1e-5 and 1e-1

# Model parameters: 
vocab_size = len(vocab)
embedding_dim = 256 
rnn_units = 1024  # Experiment between 1 and 2048

# Checkpoint location: 
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "my_ckpt")

In [None]:
### Define optimizer and training operation ###

model = build_model(vocab_size, embedding_dim, rnn_units, batch_size)

optimizer = tf.keras.optimizers.Adam(learning_rate)

@tf.function
def train_step(x,y):
  """Run one optimization step on the global `model` and return the loss.

  Computes the loss of the model's predictions for `x` against targets `y`,
  then applies the resulting gradients with the global `optimizer`.
  """
  # Record the forward pass so the loss can be differentiated.
  with tf.GradientTape() as tape:
    loss = compute_loss(y, model(x))

  trainable = model.trainable_variables
  gradients = tape.gradient(loss, trainable)
  optimizer.apply_gradients(zip(gradients, trainable))
  return loss

# Per-iteration mean loss, plotted live while training runs.
history = []
plotter = mdl.util.PeriodicPlotter(sec=2, xlabel='Iterations', ylabel='Loss')
if hasattr(tqdm, '_instances'):
  tqdm._instances.clear() # clear if it exists

# The loop variable is named `step` so the builtin iter() is not shadowed.
for step in tqdm(range(num_training_iterations)):

  # Grab a batch and propagate it through the network
  x_batch, y_batch = get_batch(vectorized_songs, seq_length, batch_size)
  loss = train_step(x_batch, y_batch)

  # Record the mean loss of this batch and refresh the plot
  history.append(loss.numpy().mean())
  plotter.plot(history)

  # Checkpoint the current weights every 100 iterations
  if step % 100 == 0:
    model.save_weights(checkpoint_prefix)

# Save the final weights (only weights are written, not the full model)
model.save_weights(checkpoint_prefix)


In [None]:
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1) # TODO
# model = build_model('''TODO''', '''TODO''', '''TODO''', batch_size=1)

# Restore the model weights for the last checkpoint after training
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
model.build(tf.TensorShape([1, None]))

model.summary()

In [None]:
### Prediction of a generated song ###

def generate_text(model, start_string, generation_length=1000):
  """Generate ABC-notation text by sampling one character at a time.

  Args:
    model: callable model that takes a [1, sequence] batch of character
      indices and returns per-position scores over the vocabulary; its
      reset_states() method is called once before generation starts.
    start_string: seed text; every character must be a key of `char2idx`.
    generation_length: number of characters to sample.

  Returns:
    `start_string` followed by the `generation_length` sampled characters.

  Raises:
    KeyError: if `start_string` contains a character missing from `char2idx`.
  """
  # Convert the start string to numbers (vectorize) and add a batch dimension.
  input_eval = [char2idx[s] for s in start_string]
  input_eval = tf.expand_dims(input_eval, 0)

  # Characters sampled so far.
  text_generated = []

  # Here batch size == 1
  model.reset_states()
  # Guarded like the training cell: only clear tqdm's instance registry if
  # this tqdm version exposes one.
  if hasattr(tqdm, '_instances'):
    tqdm._instances.clear()

  for _ in tqdm(range(generation_length)):
    # Evaluate the inputs and generate the next-character predictions.
    predictions = model(input_eval)

    # Remove the batch dimension
    predictions = tf.squeeze(predictions, 0)

    # Sample from the categorical distribution defined by the predictions and
    # keep the sample for the last position in the sequence.
    predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()

    # Pass the prediction along with the previous hidden state
    #   as the next inputs to the model
    input_eval = tf.expand_dims([predicted_id], 0)

    # The prediction is an index; store the character it maps to.
    text_generated.append(idx2char[predicted_id])

  return (start_string + ''.join(text_generated))

In [None]:
'''TODO: Use the model and the function defined above to generate ABC format text of length 1000!
    As you may notice, ABC files start with "X" - this may be a good start string.'''
generated_text = generate_text(model, start_string="X", generation_length=1000) # TODO
# generated_text = generate_text('''TODO''', start_string="X", generation_length=1000)

In [None]:
print(generated_text)

In [None]:
### Play back generated songs ###

generated_songs = mdl.lab1.extract_song_snippet(generated_text)

for i, song in enumerate(generated_songs): 
  # Synthesize the waveform from a song
  waveform = mdl.lab1.play_song(song)

  # If its a valid song (correct syntax), lets play it! 
  if waveform:
    print("Generated song", i)
    ipythondisplay.display(waveform)

**Export to GitHub**

In [None]:
# NOTE(review): this puts $password directly in the remote URL, which git
# records in the repository configuration — consider a credential helper or
# token-based auth instead; confirm before running in a shared environment.
!git remote add origin https://$uname:$password@github.com/$uname/6S191.git