<a href="https://colab.research.google.com/github/EdoardoMaines/NLU-Project/blob/main/NLU_MyProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install d2l==0.17.0

import collections
import re
from d2l import torch as d2l
import nltk
from nltk.corpus.reader import ConllCorpusReader
from nltk.lm.vocabulary import Vocabulary


In [3]:
#OPEN FILES
def readFile(path):
  corpus = []
  with open(path) as file:
    for line in file:
      corpus.append(line)

    return corpus

In [4]:
#SPLIT IN WORDS
def tokenize(lines, token='word'):
  if token == 'word':
    return [line.split() for line in lines]
  if token == 'char':
    return [list(line) for line in lines]

In [5]:
#CLASS VOCABOLARY
class Vocab: 
  """Vocabulary for text."""
  def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
    if tokens is None:
      tokens = []
    if reserved_tokens is None:
      reserved_tokens = []

  # Sort according to frequencies
    counter = count_corpus(tokens)
    self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
    reverse=True)
    # The index for the unknown token is 0
    self.idx_to_token = ['<unk>'] + reserved_tokens
    self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)}
    for token, freq in self._token_freqs:
      if freq < min_freq:
        break
      if token not in self.token_to_idx:
        self.idx_to_token.append(token)
        self.token_to_idx[token] = len(self.idx_to_token) - 1
  def __len__(self):
    return len(self.idx_to_token)

  def __getitem__(self, tokens):
    if not isinstance(tokens, (list, tuple)):
      return self.token_to_idx.get(tokens, self.unk)
    return [self.__getitem__(token) for token in tokens]
  
  def to_tokens(self, indices):
    if not isinstance(indices, (list, tuple)):
      return self.idx_to_token[indices]
    return [self.idx_to_token[index] for index in indices]


  @property
  def unk(self): # Index for the unknown token
    return 0
  @property
  def token_freqs(self): # Index for the unknown token
    return self._token_freqs


def count_corpus(tokens):
  """Count token frequencies."""
  # Here `tokens` is a 1D list or 2D list
  if len(tokens) == 0 or isinstance(tokens[0], list):
  # Flatten a list of token lists into a list of tokens
    tokens = [token for line in tokens for token in line]
  return collections.Counter(tokens)

In [8]:
test_corpus = readFile('/content/ptb.test.txt')
train_corpus = readFile('/content/ptb.train.txt')
valid_corpus = readFile('/content/ptb.valid.txt')

#print(len(test_corpus))
tokens = tokenize(test_corpus)

# for i in range(20):
#   print(tokens[i])

In [None]:
vocab = Vocab(tokens)
vocab2 = d2l.Vocab(tokens)
print(list(vocab.token_to_idx.items())[:10]) ## in the dataset N = instead of numbers
print(list(vocab2.token_to_idx.items())[:10])

In [None]:
#convert each text line into a list of numerical indices

for i in [0, 10]:
  print('words:', tokens[i])
  print('indices:', vocab[tokens[i]])



In [None]:
#unigram
tokens = d2l.tokenize(test_corpus)
corpus = [token for line in tokens for token in line]
vocab = d2l.Vocab(corpus)
vocab.token_freqs[:10]

In [None]:
#bigram
bigram_token = [pair for pair in zip(corpus[:-1], corpus[1:])]
bigram_vocab = d2l.Vocab(bigram_token)
bigram_vocab.token_freqs[:10]

In [None]:
#trigram
trigram_token = [pair for pair in zip(corpus[:-2], corpus[1:-1], corpus[2:])]
trigram_vocab = d2l.Vocab(trigram_token)
trigram_vocab.token_freqs[:10]

In [25]:
import random
import torch

#get the data randomly
def seq_data_iter_random(corpus, batch_size, num_steps):
  """Generate a minibatch of subsequences using random sampling."""
  # Start with a random offset (inclusive of `num_steps - 1`) to partition a
  # sequence
  corpus = corpus[random.randint(0, num_steps - 1):]
  # Subtract 1 since we need to account for labels
  num_subseqs = (len(corpus) - 1) // num_steps
  # The starting indices for subsequences of length `num_steps`
  initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
  # In random sampling, the subsequences from two adjacent random
  # minibatches during iteration are not necessarily adjacent on the
  # original sequence
  random.shuffle(initial_indices)

  def data(pos):
  # Return a sequence of length `num_steps` starting from `pos`
    return corpus[pos: pos + num_steps]



  num_batches = num_subseqs // batch_size

  for i in range(0, batch_size * num_batches, batch_size):
    # Here, `initial_indices` contains randomized starting indices for
    # subsequences
    initial_indices_per_batch = initial_indices[i: i + batch_size]
    X = [data(j) for j in initial_indices_per_batch]
    Y = [data(j + 1) for j in initial_indices_per_batch]
    yield torch.tensor(X), torch.tensor(Y)


#get the data sequentially
def seq_data_iter_sequential(corpus, batch_size, num_steps):
  """Generate a minibatch of subsequences using sequential partitioning."""
  # Start with a random offset to partition a sequence
  offset = random.randint(0, num_steps)
  num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
  Xs = torch.tensor(corpus[offset: offset + num_tokens])
  Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
  Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
  num_batches = Xs.shape[1] // num_steps
  for i in range(0, num_steps * num_batches, num_steps):
    X = Xs[:, i: i + num_steps]
    Y = Ys[:, i: i + num_steps]
    yield X, Y

In [29]:
class SeqDataLoader:
  """An iterator to load sequence data."""
  def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
    if use_random_iter:
      self.data_iter_fn = seq_data_iter_random
    else:
      self.data_iter_fn = seq_data_iter_sequential
      self.corpus = tokens[:max_tokens]
      self.vocab = vocab[:max_tokens]
      self.batch_size, self.num_steps = batch_size, num_steps

  def __iter__(self):
    return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)

In [30]:
def load_data(batch_size, num_steps, use_random_iter=False, max_tokens=10000):
  """Return the iterator and the vocabulary of the time machine dataset."""
  data_iter = SeqDataLoader(
  batch_size, num_steps, use_random_iter, max_tokens)
  return data_iter, data_iter.vocab

In [31]:
#CONCISE IMPLEMENTATION OF CNN

import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = load_data(batch_size, num_steps)

TypeError: ignored