In [5]:
!gdown --id 1JQPzJGIERV998BjtEcSSua3rMNhFpZkl
!gdown --id 1hCtv60Vog1f6VWj45DCyVgmQ0_jn98IA
!gdown --id 1qqL1uBarF3T1VcF0aN6jFO7tINgv1mrI
!git clone https://github.com/BrainTankDeepLearning/Week6.git

Downloading...
From: https://drive.google.com/uc?id=1JQPzJGIERV998BjtEcSSua3rMNhFpZkl
To: /content/Twitter_Data.csv
100% 20.9M/20.9M [00:00<00:00, 93.7MB/s]
Downloading...
From: https://drive.google.com/uc?id=1hCtv60Vog1f6VWj45DCyVgmQ0_jn98IA
To: /content/glove.6B.50d.txt
100% 171M/171M [00:01<00:00, 123MB/s]
Downloading...
From: https://drive.google.com/uc?id=1qqL1uBarF3T1VcF0aN6jFO7tINgv1mrI
To: /content/Reddit_Data.csv
100% 6.89M/6.89M [00:00<00:00, 60.6MB/s]
fatal: destination path 'Week6' already exists and is not an empty directory.


In [24]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import os
import string
import re

import matplotlib.pyplot as plt

from PIL import Image
import numpy as np

import pandas as pd

from torch.utils.data import Dataset, DataLoader

word2index = dict()
index2word = dict()

class GloveEmbeddings():
  def __init__(self):
    self.emb = dict()
    with open('glove.6B.50d.txt','rt') as fi:
        full_content = fi.read().strip().split('\n')
    for i in range(len(full_content)):
        i_word = full_content[i].split(' ')[0]
        i_embeddings = np.array([float(val) for val in full_content[i].split(' ')[1:]], dtype = np.float16)
        self.emb[i_word] = i_embeddings

  def get_emb(self, word):
    if word not in self.emb:
      self.emb[word] = np.random.uniform(0,1,50)
    return self.emb[word]

class TextDataset(Dataset):
  def __init__(self, url = "Reddit_Data.csv"):
    self.data = pd.read_csv(url, header = 0, names = ["Text", "Label"], skip_blank_lines = True, nrows = 5500)
    self.data.dropna(subset = ["Text", "Label"], inplace=True)
    self.data.astype({'Label': 'int8'}).dtypes
    self.data = self.data.to_numpy()

    self.emb = GloveEmbeddings()

    word2index["[EMPTY]"] = 0
    index2word[0] = "[EMPTY]"

  def __len__(self):
    return len(self.data)

  def __getitem__(self, idx):
    data = self.data[idx]
    text = data[0]
    label = int(data[1])

    sentence = re.sub(r'[^\w\s]', '', text)
    sentence = sentence.replace("\n", "")
    sentence = sentence.replace("\t", "")
    sentence = sentence.lower()

    words = sentence.split(" ")

    curr_one_hot = np.zeros([30, 50], dtype = np.float32)
    curr_tokens = np.zeros(30, dtype = "int32")
    
    for word_num, word in enumerate(words):
      if word_num >= 30:
        break
      curr_one_hot[word_num] = self.emb.get_emb(word)
      if word not in word2index:
        idx = len(word2index.keys())
        word2index[word] = idx
        index2word[idx] = word
      idx = word2index[word]
      curr_tokens[word_num] = idx

    return {"text": curr_tokens, "text_embedded": curr_one_hot, "label": label}

def get_dataloader(url = "Reddit_Data.csv"):
  ds = TextDataset(url)
    
  train_ds, test_ds = torch.utils.data.random_split(ds, (5000, 490))

  train_loader = DataLoader(train_ds, batch_size = 32, drop_last = True, shuffle = True)
  test_loader = DataLoader(test_ds, batch_size = 32, drop_last = True, shuffle = True)

  return train_loader, test_loader

def tokens_to_text(tokens):
  if len(tokens.shape) == 1:
    tokens = tokens.unsqueeze(0)

  out_list = []
  for token_list in tokens:
    word_list = ""
    for token in token_list:
      token = token.item()
      if token == 0:
        break
      word = index2word[token]
      word_list += word + " "
    out_list.append(word_list)
  
  return out_list

def softmax_loss(prediction, one_hot):
  prediction = torch.log(prediction)

  target = torch.empty(size = (len(one_hot), ), dtype = torch.long)
  for i, row in enumerate(one_hot):
    index = (row == 1).nonzero(as_tuple=True)[0]
    target[i] = index.item()

  loss = torch.nn.functional.nll_loss(prediction, target)

  return loss

def create_ground_truth(label):
  out = torch.zeros((32, 3))

  for i in range(len(label)):
    idx = label[i].item()

    out[i][idx + 1] = 1.0

  return out

In [28]:
class RNN_Cell(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(RNN_Cell, self).__init__()
        self.hidden_size = hidden_size
        self.in2hidden = nn.Linear(input_size + hidden_size, hidden_size)
        self.in2output = nn.Linear(input_size + hidden_size, output_size)
    
    def forward(self, x, hidden_state):
        if len(x.shape) == 1:
          x = x.unsqueeze(0)
        combined = torch.cat((x, hidden_state), 1)
        hidden = torch.sigmoid(self.in2hidden(combined))
        output = self.in2output(combined)
        return output, hidden
    
    def init_hidden(self):
        return nn.init.kaiming_uniform_(torch.empty(1, self.hidden_size))

class RNN(nn.Module):
  def __init__(self):
    super(RNN, self).__init__()
    pass

  def forward(self, embedded_sentence, sentence):
    pass

def train(model, train_dataset, optim):
  pass

def test(model, test_dataset):
  total_correct = 0
  for data in test_dataset:
    text = data["text"]
    text_emb = data["text_embedded"]
    label = data["label"]

    prediction = model(text_emb, text)

    highest_prediction = torch.argmax(prediction, dim = 1) - 1

    n_correct = (highest_prediction == label).sum()

    total_correct += n_correct.item()

  print(f"{total_correct / (len(test_dataset) * 32)}")

if __name__ == "__main__":
  train_ds, test_ds = get_dataloader("Reddit_Data.csv")

  model = RNN()
  #model.load_state_dict(torch.load("Week6/model_pretrained_rnn.state", map_location=torch.device("cpu")))

  optim = torch.optim.Adam(model.parameters())

  train(model, train_ds, optim)

  test(model, test_ds)

ValueError: ignored