In [None]:
# PID 5122089
# Name Adam Farrow
# Collaboration:
# N/A


import numpy as np
from pandas import read_csv
from sklearn.preprocessing import MinMaxScaler

# Step 1. Read the training and test tables from their text files with pandas.
train, test = read_csv("train.txt"), read_csv("test.txt")

# Keep only the "Passengers" column of each table, as float32 arrays.
train_data = train["Passengers"].values.astype(np.float32)
test_data = test["Passengers"].values.astype(np.float32)


# Step 2. Normalize the data with a min-max scaler.
# The scaler is fitted on the training series only and then re-used for the
# test series, so no test statistics leak into the scaling.
scaler = MinMaxScaler()
scaler.fit(train_data[:, np.newaxis])
train_data_normalized = scaler.transform(train_data[:, np.newaxis]).flatten()
test_data_normalized = scaler.transform(test_data[:, np.newaxis]).flatten()

# Step 3. Create a training and test datasets.
WINDOW_SIZE = 12  # number of consecutive observations fed to the model per sample


def make_windows(series, window=WINDOW_SIZE):
    """Slice a 1-D series into (input window, next value) pairs.

    Args:
        series: 1-D sequence of observations.
        window: number of consecutive observations in each input row.

    Returns:
        A pair ``(inputs, targets)``. ``inputs`` has shape
        ``(len(series) - window, window)`` and ``targets[i]`` is the
        observation that immediately follows ``inputs[i]``. When the series
        has no more than ``window`` points both arrays are empty, and
        ``inputs`` still has ``window`` columns.
    """
    series = np.asarray(series)
    n_samples = max(len(series) - window, 0)
    inputs = np.array(
        [series[i:i + window] for i in range(n_samples)], dtype=series.dtype
    ).reshape(n_samples, window)
    targets = np.array(
        [series[i + window] for i in range(n_samples)], dtype=series.dtype
    )
    return inputs, targets


# The same windowing is applied to both splits.
trainX, trainY = make_windows(train_data_normalized)
testX, testY = make_windows(test_data_normalized)

# Step 4. Print out the shape of data.
for label, array in (
    ("trainX", trainX),
    ("trainY", trainY),
    ("testX", testX),
    ("testY", testY),
):
    print(f"{label}:", array.shape)


trainX: (101, 12)
trainY: (101,)
testX: (17, 12)
testY: (17,)


In [None]:
import torch.nn as nn
# Complete the model architecture 
class RNN(nn.Module):
    """Recurrent layer followed by a linear read-out of the last time step.

    Args:
        input_size: number of features per time step.
        hidden_size: width of the recurrent hidden state.
        output_size: number of values produced per input sequence.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        # batch_first=True: tensors are laid out as (batch, seq_len, features).
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the linear projection of the hidden state at the final step."""
        sequence_output, _final_hidden = self.rnn(x)
        last_step = sequence_output[:, -1, :]
        return self.fc(last_step)

# torch must be imported in this cell: torch.optim / torch.tensor are used
# below, and relying on an import from a later cell breaks Restart & Run All.
import torch

# Create an instance of model, optimizer and criterion.
model = RNN(1, 4, 1)
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
criterion = nn.MSELoss()


# Train the RNN model for 1000 epochs and print the training loss every 100 epochs.
for epoch in range(1000):
    # Draw a random mini-batch of 10 windows (np.random.randint samples
    # indices with replacement).
    indices = np.random.randint(0, len(trainX), 10)
    inputs = torch.tensor(trainX[indices], dtype=torch.float32)
    targets = torch.tensor(trainY[indices], dtype=torch.float32)

    # Forward pass: add a trailing feature axis so each time step has 1 feature.
    outputs = model(inputs.unsqueeze(-1))

    # Compute loss. Only the trailing output axis is squeezed so the batch
    # axis always survives and lines up with `targets`.
    loss = criterion(outputs.squeeze(-1), targets)

    # Backward and optimize
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Print the training loss every 100 epochs.
    if (epoch + 1) % 100 == 0:
        print(f'Epoch {epoch+1} Loss: {loss.item():.4f}')

Epoch 100 Loss: 0.0453
Epoch 200 Loss: 0.0039
Epoch 300 Loss: 0.0069
Epoch 400 Loss: 0.0092
Epoch 500 Loss: 0.0026
Epoch 600 Loss: 0.0039
Epoch 700 Loss: 0.0043
Epoch 800 Loss: 0.0068
Epoch 900 Loss: 0.0047
Epoch 1000 Loss: 0.0063


In [None]:
import torch
import random
import numpy as np

from keras.preprocessing import sequence
from keras.datasets import imdb

# Step 1. Load the IMDB dataset from keras, restricted with num_words=1000.
imdb_train, imdb_test = imdb.load_data(num_words=1000)
x_train, y_train = imdb_train
x_test, y_test = imdb_test

# Step 2. Preprocess both splits with pad_sequences (maxlen=100).
x_train, x_test = (
    sequence.pad_sequences(reviews, maxlen=100) for reviews in (x_train, x_test)
)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import numpy as np


# Define LSTM model architecture
class LSTMClassifier(nn.Module):
    """Binary sequence classifier: embedding -> LSTM -> linear -> sigmoid.

    Args:
        max_features: vocabulary size (number of rows in the embedding table).
        embedding_dim: size of each token embedding.
        hidden_dim: size of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
    """

    def __init__(self, max_features, embedding_dim, hidden_dim, num_layers):
        super().__init__()
        self.embeddings = nn.Embedding(max_features, embedding_dim)
        # num_layers configures the LSTM depth (it was previously passed as the
        # linear layer's output width and never reached the LSTM).
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers)
        # A single output unit: one probability per sequence.
        self.linear = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        """Map a (batch, seq_len) tensor of token ids to (batch,) probabilities."""
        # (batch, seq_len) -> (batch, seq_len, embedding_dim)
        embedded = self.embeddings(x)
        # The LSTM is not batch_first, so move the sequence axis to the front.
        embedded = embedded.permute(1, 0, 2)
        # No explicit (h_0, c_0): nn.LSTM defaults them to zeros of the right
        # size, which avoids the previous hard-coded hidden size of 8.
        lstm_out, _ = self.lstm(embedded)
        # Output of the top LSTM layer at the last time step.
        last_step = lstm_out[-1, :, :]
        logits = self.linear(last_step)
        # Squeeze only the feature axis so a batch of one stays 1-D.
        return torch.sigmoid(logits).squeeze(-1)

# Create the DataLoaders from tensor datasets.
train_loader = DataLoader(TensorDataset(torch.LongTensor(x_train), torch.FloatTensor(y_train)), batch_size=64, shuffle=True)
# Held-out split: the accuracy reported below is measured on this loader, so
# the "validation accuracy" label is accurate (it used to be training accuracy).
val_loader = DataLoader(TensorDataset(torch.LongTensor(x_test), torch.FloatTensor(y_test)), batch_size=256)

# Create an instance of LSTM model, an Adam optimizer and BCE loss
model = LSTMClassifier(1000, 8, 8, 1)
optimizer = optim.Adam(model.parameters())
criterion = nn.BCELoss()


best_accuracy = 0
# Training loop
for epoch in range(10):
    # --- training pass ---
    model.train()
    for inputs, labels in train_loader:
        # reset gradients accumulated by the previous step
        optimizer.zero_grad()
        # forward pass
        outputs = model(inputs)
        # binary cross-entropy against the 0/1 labels
        loss = criterion(outputs, labels)

        # backpropagation
        loss.backward()
        # optimization step
        optimizer.step()

    # --- validation pass (no gradient tracking) ---
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            # round probabilities to 0/1 so they can be compared with the labels
            predicted = torch.round(model(inputs))
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
    accuracy = correct / total

    # keep the best validation accuracy seen so far
    best_accuracy = max(best_accuracy, accuracy)

# Report the best epoch, not merely the last one.
print(f"Best validation accuracy: {best_accuracy:.4f}")

Best validation accuracy: 0.8167


In [None]:
#import packages here
import numpy as np
import pandas as pd
import time

N_STATES = 6   # width of the 1-D world: update_env draws N_STATES - 1 track cells plus the goal 'T'
ACTIONS = ['left', 'right']     # the available actions; also the Q-table column names
EPSILON = 0.9   # the degree of greediness (0 < epsilon < 1); choose_action compares a uniform draw against it
ALPHA = 0.1     # learning rate (0 < alpha <= 1) used in the Q-table update
GAMMA = 0.9    # discount factor (0 < gamma < 1) applied to the next state's best Q-value
MAX_EPOCHES = 13   # number of episodes run by reinforce_learning
FRESH_TIME = 0.3    # pause passed to time.sleep between rendered frames

In [None]:
#define the function here
def build_q_table(n_states, actions):
    """Create an all-zero Q-table with one row per state and one column per action.

    Args:
        n_states: number of rows (states) in the table.
        actions: sequence of action names, used as the column labels.

    Returns:
        A pandas DataFrame of zeros with shape (n_states, len(actions)).
    """
    initial_values = np.zeros((n_states, len(actions)))
    table = pd.DataFrame(initial_values, columns=actions)
    return table

# Build the initial Q-table for the configured world and show it.
q_table = build_q_table(n_states=N_STATES, actions=ACTIONS)
print(q_table)

   left  right
0   0.0    0.0
1   0.0    0.0
2   0.0    0.0
3   0.0    0.0
4   0.0    0.0
5   0.0    0.0


In [None]:
#define the function here
# Given state and Q-table, choose action
def choose_action(state, q_table):
    """Pick an action for `state` with an epsilon-greedy policy.

    EPSILON is the degree of greediness: with probability EPSILON the action
    with the largest Q-value is taken, otherwise a random action from ACTIONS.
    A state whose Q-values are all still zero has nothing to exploit, so it
    always gets a random action.

    Args:
        state: integer row position of the current state in `q_table`.
        q_table: DataFrame of Q-values with one column per action.

    Returns:
        The name of the chosen action.
    """
    # Q-values of every action available in this state.
    state_actions = q_table.iloc[state, :]
    # `(state_actions == 0).all()` is "every value is zero"; the previous
    # `state_actions.all() == 0` meant "at least one value is zero".
    unexplored = (state_actions == 0).all()
    if np.random.uniform() > EPSILON or unexplored:
        # explore: pick a random action
        action_name = np.random.choice(ACTIONS)
    else:
        # exploit: pick the action with the largest Q-value
        action_name = state_actions.idxmax()
    return action_name

# Sample one action for the start state from the current Q-table.
sample_action = choose_action(state=0, q_table=q_table)
print(sample_action)

left


In [None]:
#define the function here
def get_env_feedback(S_current, A):
    """Apply action `A` in state `S_current` and return the environment's response.

    Args:
        S_current: integer index of the current state.
        A: action name; 'right' moves right, anything else is treated as a
            move to the left.

    Returns:
        A tuple (S_next, R). S_next is the next state index, or the string
        'terminal' when moving right from state N_STATES - 2. R is 1 for
        reaching the terminal state and 0 otherwise.
    """
    if A != 'right':
        # Moving left never earns a reward; at state 0 the agent hits the
        # wall and stays where it is.
        S_next = S_current if S_current == 0 else S_current - 1
        return S_next, 0

    if S_current == N_STATES - 2:
        # One step right from here is the goal.
        return 'terminal', 1

    # Ordinary step to the right, no reward yet.
    return S_current + 1, 0

# Example: step left from state 4 and show the (next state, reward) pair.
sample_action, S_current = 'left', 4
sample_feedback = get_env_feedback(S_current=S_current, A=sample_action)
print(sample_feedback)

(3, 0)


In [None]:
def update_env(S, episode, step_counter):
    """Render the current state of the 1-D world to stdout.

    Args:
        S: current state index, or the string 'terminal'.
        episode: zero-based episode number (printed one-based).
        step_counter: number of steps taken so far in this episode.
    """
    if S == 'terminal':
        # Episode finished: print the summary line and pause for 2 seconds.
        interaction = '  Episode %s: total_steps = %s' % (episode+1, step_counter)
        print('{}\n'.format(interaction), end='')
        time.sleep(2)
        return

    # Draw N_STATES - 1 track cells followed by the goal 'T', with the agent
    # marked as 'o'.
    track = ['-'] * (N_STATES - 1)
    track.append('T')
    track[S] = 'o'
    # The leading '\r' rewrites the same console line on every frame.
    print('\r{}'.format(''.join(track)), end='')
    time.sleep(FRESH_TIME)

In [None]:
def reinforce_learning():
    """Run tabular Q-learning for MAX_EPOCHES episodes and return the Q-table.

    Every episode starts in state 0 and ends when get_env_feedback reports
    the 'terminal' state. The world is re-rendered with update_env after
    every step.

    Returns:
        The learned Q-table (a DataFrame built by build_q_table).
    """
    q_table = build_q_table(N_STATES, ACTIONS)

    for episode in range(MAX_EPOCHES):
        steps = 0
        state = 0
        update_env(state, episode, steps)

        while state != 'terminal':
            action = choose_action(state, q_table)
            next_state, reward = get_env_feedback(state, action)

            if next_state == 'terminal':
                # No future value beyond the goal: the target is the reward itself.
                q_target = reward
            else:
                # Reward plus the discounted best Q-value of the next state.
                q_target = reward + GAMMA * q_table.loc[next_state, :].max()

            # Move the current estimate a fraction ALPHA towards the target.
            q_predict = q_table.loc[state, action]
            q_table.loc[state, action] = q_predict + ALPHA * (q_target - q_predict)

            state = next_state
            steps += 1
            update_env(state, episode, steps)

    return q_table

In [None]:
# Entry point: train the agent, then print the learned Q-table.
if __name__ == "__main__":
    q_table = reinforce_learning()
    # sep='\n' reproduces the line break that separated the two print calls.
    print('\r\nQ-table:\n', q_table, sep='\n')

----oT  Episode 1: total_steps = 25
----oT  Episode 2: total_steps = 12
----oT  Episode 3: total_steps = 54
----oT  Episode 4: total_steps = 17
----oT  Episode 5: total_steps = 9
----oT  Episode 6: total_steps = 25
----oT  Episode 7: total_steps = 9
----oT  Episode 8: total_steps = 12
----oT  Episode 9: total_steps = 9
----oT  Episode 10: total_steps = 20
----oT  Episode 11: total_steps = 11
----oT  Episode 12: total_steps = 11
----oT  Episode 13: total_steps = 29

Q-table:

       left     right
0  0.011891  0.033624
1  0.014986  0.074258
2  0.013679  0.213437
3  0.049346  0.421909
4  0.088946  0.745813
5  0.000000  0.000000
