import tensorflow as tf
import numpy as np
import os
import pickle
from tqdm import tqdm
# Load the training dataset of songs from a pickle file
songs = pickle.load(open("training_data/positive.pkl", "rb"))
# Join our list of song strings into a single string containing all songs
songs_joined = "\n\n".join(songs)
# Find all unique characters in the joined string
vocab = sorted(set(songs_joined))
char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)
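# Quick sanity check (optional): the two lookup tables should be exact
# inverses of each other over the vocabulary.
assert all(idx2char[char2idx[c]] == c for c in vocab)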
### Vectorize the songs string ###
def vectorize_string(string):
    vectorized_output = np.array([char2idx[char] for char in string])
    return vectorized_output
vectorized_songs = vectorize_string(songs_joined)
# check that vectorized_songs is a numpy array
assert isinstance(vectorized_songs, np.ndarray), "returned result should be a numpy array"
### Batch definition to create training examples ###
def get_batch(vectorized_songs, seq_length, batch_size):
    # Number of usable characters (the last character has no "next" target)
    n = vectorized_songs.shape[0] - 1
    # Randomly choose the starting indices for the examples in the training batch
    idx = np.random.choice(n - seq_length, batch_size)
    # Inputs are seq_length-long slices; targets are the same slices shifted right by one
    input_batch = [vectorized_songs[i : i + seq_length] for i in idx]
    output_batch = [vectorized_songs[i + 1 : i + seq_length + 1] for i in idx]
    # x_batch, y_batch provide the true inputs and targets for network training
    x_batch = np.reshape(input_batch, [batch_size, seq_length])
    y_batch = np.reshape(output_batch, [batch_size, seq_length])
    return x_batch, y_batch
x_batch, y_batch = get_batch(vectorized_songs, seq_length=5, batch_size=1)
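# Optional shape check: with seq_length=5 and batch_size=1 both batches should
# be [1, 5], and y_batch should be x_batch shifted right by one character.
assert x_batch.shape == (1, 5) and y_batch.shape == (1, 5)
assert np.array_equal(x_batch[0, 1:], y_batch[0, :-1])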
def LSTM(rnn_units):
    return tf.keras.layers.LSTM(
        rnn_units,
        return_sequences=True,
        recurrent_initializer='glorot_uniform',
        recurrent_activation='sigmoid',
        stateful=True,
    )
### Defining the RNN Model ###
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
    model = tf.keras.Sequential([
        # Layer 1: Embedding layer to transform indices into dense vectors of a fixed embedding size
        tf.keras.layers.Embedding(vocab_size, embedding_dim, batch_input_shape=[batch_size, None]),
        # Layer 2: LSTM with `rnn_units` number of units
        LSTM(rnn_units),
        # Layer 3: Dense (fully-connected) layer that transforms the LSTM output into the vocabulary size
        tf.keras.layers.Dense(vocab_size)
    ])
    return model
# Build a simple model with default hyperparameters. You will get the chance to change these later.
model = build_model(len(vocab), embedding_dim=256, rnn_units=1024, batch_size=32)
model.summary()
def compute_loss(labels, logits):
    return tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)
x, y = get_batch(vectorized_songs, seq_length=100, batch_size=32)
pred = model(x)
# Compute the loss using the true next characters from the example batch
# and the predictions of the untrained model
example_batch_loss = compute_loss(y, pred)
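# Optional sanity check: before any training, the mean per-character loss
# should be close to ln(vocab_size), the loss of uniform random guessing.
print("Untrained loss: {:.4f} (expected ~{:.4f})".format(
    example_batch_loss.numpy().mean(), np.log(len(vocab))))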
### Hyperparameter setting and optimization ###
# Optimization parameters:
num_training_iterations = 1500 # Increase this to train longer
batch_size = 75 # Experiment with different batch sizes
seq_length = 100 # Experiment between 50 and 500
learning_rate = 1e-3 # Experiment between 1e-5 and 1e-1
# Model parameters:
vocab_size = len(vocab)
embedding_dim = 256
rnn_units = 1024 # Experiment between 1 and 2048
# Checkpoint location:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "my_ckpt")
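# Create the checkpoint directory up front (a harmless safeguard; some TF
# versions create it automatically on save, but this does not rely on that).
os.makedirs(checkpoint_dir, exist_ok=True)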
### Define optimizer and training operation ###
# Instantiate a new model for training using the `build_model` function and the hyperparameters above
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size)
# Instantiate the optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate)
# tf.function compiles the training step into a graph for faster execution
@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        y_hat = model(x)
        loss = compute_loss(y, y_hat)
    # Compute the gradients of the loss with respect to all trainable variables
    grads = tape.gradient(loss, model.trainable_variables)
    # Apply the gradients to the optimizer so it can update the model accordingly
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss
# Begin training!
history = []
if hasattr(tqdm, '_instances'): tqdm._instances.clear() # clear any stale progress bars
for step in tqdm(range(num_training_iterations)):
    # Grab a batch and propagate it through the network
    x_batch, y_batch = get_batch(vectorized_songs, seq_length, batch_size)
    loss = train_step(x_batch, y_batch)
    # Record the loss for this step
    history.append(loss.numpy().mean())
    # Periodically save the model weights as a checkpoint
    if step % 100 == 0:
        model.save_weights(checkpoint_prefix)
# Save the trained model weights one final time
model.save_weights(checkpoint_prefix)
print("Final Loss:", history[-1])
# Rebuild the model with batch_size=1 so we can feed one character at a time during generation
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)
# Restore the model weights from the last checkpoint saved during training
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
model.build(tf.TensorShape([1, None]))
model.summary()
### Prediction of a generated song ###
def generate_text(model, start_string, generation_length=1000):
    # Evaluation step (generating ABC text using the learned RNN model)
    # Convert the start string to character indices and add a batch dimension
    input_eval = [char2idx[s] for s in start_string]
    input_eval = tf.expand_dims(input_eval, 0)
    # Empty list to store our results
    text_generated = []
    # Here batch size == 1; clear the LSTM state before generating
    model.reset_states()
    if hasattr(tqdm, '_instances'): tqdm._instances.clear()
    for i in tqdm(range(generation_length)):
        predictions = model(input_eval)
        # Remove the batch dimension
        predictions = tf.squeeze(predictions, 0)
        # Sample the next character id from the categorical (multinomial) distribution over the logits
        predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()
        # Pass the prediction along with the previous hidden state as the next input to the model
        input_eval = tf.expand_dims([predicted_id], 0)
        # Add the predicted character to the generated text
        text_generated.append(idx2char[predicted_id])
    return (start_string + ''.join(text_generated))
generated_text = generate_text(model, start_string="b", generation_length=1000)
print(generated_text)
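# Optional: persist the generated ABC text for later inspection or conversion;
# the output filename here is an assumption, not part of the original script.
with open("generated_songs.txt", "w") as f:
    f.write(generated_text)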