<a href="https://colab.research.google.com/github/akiabe/coding-practice/blob/master/sentiment_with_DNNs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [66]:
# load data set

import numpy as np
import nltk
from nltk.corpus import twitter_samples
#nltk.download('twitter_samples')

all_positive_tweets = twitter_samples.strings('positive_tweets.json')
all_negative_tweets = twitter_samples.strings('negative_tweets.json')

#print(len(all_positive_tweets))
#print(len(all_negative_tweets))

train_pos = all_positive_tweets[:4000]
val_pos = all_positive_tweets[4000:]

train_neg = all_negative_tweets[:4000]
val_neg = all_negative_tweets[4000:]

#print(len(train_pos))
#print(len(val_pos))
#print(len(train_neg))
#print(len(val_neg))

train_x = train_pos + train_neg
val_x = val_pos + val_neg

#print(len(train_x))
#print(len(val_x))

train_y = np.append(np.ones(len(train_pos)), np.zeros(len(train_neg)))
val_y = np.append(np.ones(len(val_pos)), np.zeros(len(val_neg)))

#print(len(train_y))
#print(len(val_y))


In [67]:
# process tweet

def process_tweet(tweet):
  import re
  import string

  #nltk.download('stopwords')
  from nltk.corpus import stopwords
  from nltk.stem import PorterStemmer
  from nltk.tokenize import TweetTokenizer

  stemmer = PorterStemmer()
  stopwords_english = stopwords.words('english')

  tweet = re.sub(r'\$\w*', '', tweet)
  tweet = re.sub(r'^RT[\s]+', '', tweet)
  tweet = re.sub(r'https?:\/\/.*[\r\n]*', '', tweet)
  tweet = re.sub(r'#', '', tweet)

  tokenizer = TweetTokenizer(preserve_case=False, 
                             strip_handles=True, 
                             reduce_len=True)
  
  tweet_tokens = tokenizer.tokenize(tweet)
  
  tweets_clean = []
  for word in tweet_tokens:
    if (word not in stopwords_english and
        word not in string.punctuation):
      stem_word = stemmer.stem(word)
      tweets_clean.append(stem_word)
  
  return tweets_clean

# test case
#print(train_pos[0])
#process_tweet(train_pos[0])


In [68]:
# build the vocabulary

Vocab = {'__PAD__': 0, '__</e>__': 1, '__UNK__': 2}

for tweet in train_x:
  processed_tweet = process_tweet(tweet)
  for word in processed_tweet:
    if word not in Vocab:
      Vocab[word] = len(Vocab)

#print(len(Vocab))
#Vocab


In [69]:
# convert tweet to tensor

def tweet_to_tensor(tweet, vocab_dict, 
                    unk_token='__UNK__', verbose=False):
  
  word_l = process_tweet(tweet)

  if verbose:
    print("list of words from the processed tweet:")
    print(word_l)
  
  tensor_l = []
  unk_ID = vocab_dict[unk_token]

  if verbose:
    print(f"the unique integer ID for the unk_token is {unk_ID}")
  
  for word in word_l:
    word_ID = vocab_dict[word] if word in vocab_dict.keys() else unk_ID
    tensor_l.append(word_ID)
  
  return tensor_l

# test case
#print(val_pos[0])
#tweet_to_tensor(val_pos[0], vocab_dict=Vocab, verbose=True)


In [74]:
# create batch generator

def data_generator(data_pos, data_neg, batch_size, loop, vocab_dict,
                   shuffle=False):
  import random

  assert batch_size % 2 == 0
  
  n_to_take = batch_size // 2

  pos_index = 0
  neg_index = 0

  len_data_pos = len(data_pos)
  len_data_neg = len(data_neg)

  pos_index_lines = list(range(len_data_pos))
  neg_index_lines = list(range(len_data_neg))

  if shuffle:
    random.shuffle(pos_index_lines)
    random.shuffle(neg_index_lines)
  
  stop = False

  while not stop:
    batch = []
    
    for i in range(n_to_take):
      if pos_index >= len_data_pos:
        if not loop:
          stop = True;
          break;
        
        pos_index = 0

        if shuffle:
          random.shuffle(pos_index_lines)
      
      tweet = data_pos[pos_index_lines[pos_index]]
      tensor = tweet_to_tensor(tweet, vocab_dict)
      batch.append(tensor)
      pos_index = pos_index + 1
    
    for i in range(n_to_take):
      if neg_index >= len_data_neg:
        if not loop:
          stop = True;
          break;
        
        neg_index = 0

        if shuffle:
          random.shuffle(neg_index_lines)
      
      tweet = data_neg[neg_index_lines[neg_index]]
      tensor = tweet_to_tensor(tweet, vocab_dict)
      batch.append(tensor)
      neg_index = neg_index + 1
    
    if stop:
      break;
    
    pos_index += n_to_take
    neg_index += n_to_take

    max_len = max([len(t) for t in batch])

    tensor_pad_l = []

    for tensor in batch:
      n_pad = max_len - len(tensor)
      pad_l = [0] * n_pad
      tensor_pad = tensor + pad_l
      tensor_pad_l.append(tensor_pad)
    
    inputs = np.array(tensor_pad_l)

    target_pos = [1] * n_to_take
    target_neg = [0] * n_to_take
    target_l = target_pos + target_neg
    targets = np.array(target_l)

    example_weights = np.ones_like(targets)

    yield inputs, targets, example_weights


import random
random.seed(30)

def train_generator(batch_size, shuffle = False):
    return data_generator(train_pos, train_neg, batch_size, True, Vocab, shuffle)

def val_generator(batch_size, shuffle = False):
    return data_generator(val_pos, val_neg, batch_size, True, Vocab, shuffle)

def test_generator(batch_size, shuffle = False):
    return data_generator(val_pos, val_neg, batch_size, False, Vocab, shuffle)

inputs, targets, example_weights = next(train_generator(4, shuffle=True))
print(f"inputs : {inputs}")
print(f"targets : {targets}")
print(f"weight : {example_weights}")

inputs : [[2005 4451 3201    9    0    0    0    0    0    0    0]
 [4954  567 2000 1454 5174 3499  141 3499  130  459    9]
 [3761  109  136  583 2930 3969    0    0    0    0    0]
 [ 250 3761    0    0    0    0    0    0    0    0    0]]
targets : [1 1 0 0]
weight : [1 1 1 1]


In [None]:
# model
def classifier(vocab_size=len(vocab), embedding_dim=256, output_dim=2,
               model='train'):
  from trax import layers as tl

  embed_layer = tl.Embedding(
      vocab_size=vocab_size,
      d_feature=embedding_dim)
  
