In [1]:
import nltk   
import spacy            
import re     
import string            
import numpy as np
import pandas as pd
import math
import random
import matplotlib.pyplot as plt
from nltk.corpus import twitter_samples    # Corpus Twitter
from nltk.tokenize import word_tokenize 
import nltk
from nltk.stem import PorterStemmer
from nltk.tokenize import TweetTokenizer
# Fetch the NLTK resources 'punkt' and 'stopwords'. The stopword lists are read
# later through nltk.corpus.stopwords; 'punkt' is not obviously used by the code
# visible in this notebook (word_tokenize is imported but never called).
# NOTE(review): the captured output shows these calls evaluating to False after
# an SSL error, i.e. the download did not happen here -- presumably the data was
# already installed locally; confirm before relying on a fresh environment.
nltk.download('punkt')
nltk.download('stopwords')

  from .autonotebook import tqdm as notebook_tqdm
[nltk_data] Error loading punkt: <urlopen error [SSL:
[nltk_data]     CERTIFICATE_VERIFY_FAILED] certificate verify failed:
[nltk_data]     unable to get local issuer certificate (_ssl.c:1123)>
[nltk_data] Error loading stopwords: <urlopen error [SSL:
[nltk_data]     CERTIFICATE_VERIFY_FAILED] certificate verify failed:
[nltk_data]     unable to get local issuer certificate (_ssl.c:1123)>


False

Lectura de Corpus

In [2]:
# Fetch the 'twitter_samples' corpus whose positive/negative tweet files are
# read in the following cell.
nltk.download('twitter_samples')

[nltk_data] Error loading twitter_samples: <urlopen error [SSL:
[nltk_data]     CERTIFICATE_VERIFY_FAILED] certificate verify failed:
[nltk_data]     unable to get local issuer certificate (_ssl.c:1123)>


False

In [3]:
# Raw tweet texts, one string per tweet, split by sentiment.
pos_tweets = twitter_samples.strings(
    'positive_tweets.json'
)  # positive tweets
neg_tweets = twitter_samples.strings(
    'negative_tweets.json'
)  # negative tweets

# Report the size of each class.
n_pos_loaded = len(pos_tweets)
n_neg_loaded = len(neg_tweets)
print("Positive tweets: ", n_pos_loaded)
print("Negative tweets: ", n_neg_loaded)

Positive tweets:  5000
Negative tweets:  5000


Procesamiento


1. LowerCase
2. Lematización / Stemming
3. Remover stopwords
4. Remover signos de puntuación
5. Remover urls y manejadores





In [4]:
def custom_tokenizer(nlp):
    """Build a spaCy ``Tokenizer`` with tweet-oriented rules.

    The tokenizer extends the pipeline's default prefix and suffix patterns
    and is created without an infix pattern.

    Args:
        nlp: A loaded spaCy pipeline; its ``vocab`` and ``Defaults`` prefix and
            suffix patterns are reused.

    Returns:
        A ``spacy.tokenizer.Tokenizer`` bound to ``nlp.vocab``.
    """
    # The emoticons ":)" and ":(" are registered as special cases so each one
    # is emitted as a single token.
    emoticon_rules = {
        ":)": [{"ORTH": ":)"}],
        ":(": [{"ORTH": ":("}],
    }

    # Strings starting with http:// or https:// are matched as URLs.
    url_pattern = re.compile(r'''^https?://''')

    # Extra patterns appended to the pipeline defaults: trailing runs of
    # hyphens as a suffix, and a single leading punctuation/symbol character
    # as a prefix.
    extra_suffixes = [r'''-+$''',]
    extra_prefixes = [r'^[\-\—\–\+\+\.\!\/\,\"\(\)\[\]\{\}\:\;\<\>\?\¿\¡\|\&\#\@\$\%\^\*\_\\\'\`\~]']

    suffix_re = spacy.util.compile_suffix_regex(nlp.Defaults.suffixes + extra_suffixes)
    prefix_re = spacy.util.compile_prefix_regex(nlp.Defaults.prefixes + extra_prefixes)

    return spacy.tokenizer.Tokenizer(
        nlp.vocab,
        rules=emoticon_rules,
        suffix_search=suffix_re.search,
        prefix_search=prefix_re.search,
        url_match=url_pattern.match,
    )

# Load the small English spaCy pipeline and replace its tokenizer with the
# tweet-oriented one built by custom_tokenizer; the rest of the pipeline
# (used below for lemmas) is left as loaded.
nlp = spacy.load("en_core_web_sm")
nlp.tokenizer = custom_tokenizer(nlp)


In [5]:
def normalization(data, regularization="lemma", language='english'):
    """Clean and tokenize a collection of tweets.

    For each tweet a leading "RT" marker, URLs, '#' symbols, @handles and
    digits are removed and runs of spaces are collapsed. The text is then
    tokenized and stopwords, single-space tokens and punctuation are dropped.

    Args:
        data: Iterable of tweet strings.
        regularization: "stem" to tokenize with ``TweetTokenizer`` and apply
            the Porter stemmer, "lemma" (default) to use spaCy lemmas; any
            other value keeps the raw spaCy token text.
        language: Name of the NLTK stopword list to use.

    Returns:
        A list with one list of tokens per input tweet, in input order.
    """
    # A set gives O(1) membership tests; the list is consulted once per token.
    stopwords = set(nltk.corpus.stopwords.words(language))
    ps = PorterStemmer()
    # Created once instead of once per tweet.
    tweet_tokenizer = TweetTokenizer()
    normalized_data = []

    # NOTE(review): the processing list in this notebook mentions lower-casing,
    # but no explicit lower-casing is applied here, so the stopword filter is
    # case-sensitive. Left unchanged to keep the reported results comparable.
    for tweet in data:
        tweet = re.sub(r'^RT[\s]+', '', tweet)  # drop a leading retweet marker
        tweet = re.sub(r'https?://[^\s\n\r]+', '', tweet)  # remove links
        tweet = re.sub(r'#', '', tweet)  # remove the hash symbol, keep the word
        tweet = re.sub(r'@\w+', '', tweet)  # remove @handles
        tweet = re.sub(r'\d+', '', tweet)  # remove numbers
        tweet = re.sub(' +', ' ', tweet)  # collapse repeated spaces

        # The branches are mutually exclusive. Previously the second test was a
        # plain `if`, so for "stem" the stemmed tokens were overwritten by the
        # raw-text `else` branch.
        if regularization == "stem":
            tokens = [ps.stem(w) for w in tweet_tokenizer.tokenize(tweet)]
        elif regularization == "lemma":
            tokens = [token.lemma_ for token in nlp(tweet)]
        else:
            tokens = [token.text for token in nlp(tweet)]

        # `w not in string.punctuation` is a substring test against the
        # punctuation string, kept exactly as before.
        normalized_tweets = [
            w for w in tokens
            if w not in stopwords and not w == ' ' and w not in string.punctuation
        ]
        normalized_data.append(normalized_tweets)
    return normalized_data

In [6]:
# Normalize each class with the default ("lemma") regularization.
norm_pos = normalization(data=pos_tweets)
norm_neg = normalization(data=neg_tweets)

# Positives first, then negatives; the labels are built in the same order
# (1 = positive, 0 = negative).
all_tweets = [*norm_pos, *norm_neg]
tags = [1 for _ in norm_pos] + [0 for _ in norm_neg]

Shuffle samples

In [8]:
"""Shuffle samples"""
# Fixed seed so the shuffle (and therefore the train/test split) is repeatable.
random.seed(30)

# Pair every tweet with its label so both are shuffled together.
tuple_list = [(tweet, tag) for tweet, tag in zip(all_tweets, tags)]
random.shuffle(tuple_list)

# Unzip back into parallel sequences and materialize them as lists.
X_, y_ = zip(*tuple_list)
X, y = list(X_), list(y_)

Split Corpus in test and train

In [9]:
# Fraction of the shuffled samples held out for testing.
test_percentage = 0.2
split = int(len(X) * test_percentage)

# The first `split` samples form the test set, the remainder the training set.
X_test, X_train = X[:split], X[split:]
y_test, y_train = y[:split], y[split:]

# Number of examples in each partition.
m_train, m_test = len(X_train), len(X_test)

Calcular freq(w,class_of_sentiment)

In [10]:
def build_frequencies(normalized_tweets, y):
    """Count word occurrences separately for each sentiment class.

    Args:
        normalized_tweets: Sequence of token lists, one per tweet.
        y: Labels aligned with ``normalized_tweets``; 1 means positive, any
            other value is counted as negative.

    Returns:
        A pair ``(pos_counts, neg_counts)`` of dictionaries keyed by
        ``(word, 1)`` and ``(word, 0)`` respectively, mapping to the number of
        occurrences of the word in tweets of that class.
    """
    positive_counts = {}
    negative_counts = {}
    for idx, tokens in enumerate(normalized_tweets):
        for token in tokens:
            # Pick the target dictionary and key label from the tweet's class.
            if y[idx] == 1:
                counts, label = positive_counts, 1
            else:
                counts, label = negative_counts, 0
            key = (token, label)
            counts[key] = counts.get(key, 0) + 1
    return positive_counts, negative_counts

In [None]:
# Preview only the first few normalized tweets instead of dumping the whole
# training set into the cell output.
X_train[:5]

In [13]:
# Per-class word counts computed from the training split only.
pos_freqs_dict, neg_freqs_dict = build_frequencies(X_train, y_train)

In [None]:
# Preview the first few (word, label) -> count entries instead of dumping the
# whole dictionary into the cell output.
dict(list(pos_freqs_dict.items())[:10])

Calculate Vocabulary

In [15]:
# Flatten the training tweets into a single list of tokens.
at = [token for tokens in X_train for token in tokens]
# Frequency distribution over all training tokens.
fd = nltk.FreqDist(at)
# The vocabulary is the sorted list of distinct training tokens.
vocabulary = sorted(fd.keys())

Calcular N_class y tamaño de vocabulario

In [16]:
# Total number of word occurrences observed in each class.
N_pos_class = sum(pos_freqs_dict.values())
N_neg_class = sum(neg_freqs_dict.values())
# Number of distinct words in the training vocabulary.
len_voc = len(vocabulary)

print("Number of words of the positive class: ", N_pos_class)
print("Number of words of the negative class: ", N_neg_class)
print("Vocabulary size: ", len_voc)


Number of words of the positive class:  27931
Number of words of the negative class:  29009
Vocabulary size:  10739


Calcular probabilidad condicional P(w|class)

Probabilidad de que una palabra se encuentre en una clase (sentimiento positivo o negativo)

Se hace uso de suavizado de Laplace (Laplacian smoothing):

$$P(w_i \mid class) = \frac{freq(w_i, class) + 1}{N_{class} + V}$$

Donde $N_{class}$ es `N_pos_class` o `N_neg_class` según la clase, y $V$ es el tamaño del vocabulario (`len_voc`).

In [19]:
def P__w_i_given_class__(freqs_dict, N_class, vocabulary, len_voc, sentiment=None):
    """Compute Laplace-smoothed conditional probabilities P(w | class).

    Each word in ``vocabulary`` gets
    ``(freq(word, class) + 1) / (N_class + len_voc)``.

    Args:
        freqs_dict: Mapping ``(word, sentiment) -> count`` for a single class.
        N_class: Total number of word occurrences in that class.
        vocabulary: Iterable of words to compute probabilities for.
        len_voc: Vocabulary size used in the smoothing denominator.
        sentiment: Class label used in the keys (1 = positive, 0 = negative).
            When omitted it is read from the first key of ``freqs_dict``, as
            before. Pass it explicitly to support a class with no counts.

    Returns:
        Dictionary ``(word, sentiment) -> probability`` covering every word in
        ``vocabulary``.

    Raises:
        IndexError: If ``sentiment`` is omitted and ``freqs_dict`` is empty, so
            the label cannot be inferred (same exception type as before, with
            an explicit message).
    """
    if sentiment is None:
        if not freqs_dict:
            raise IndexError(
                "freqs_dict is empty, so the class label cannot be inferred; "
                "pass sentiment=0 or sentiment=1 explicitly"
            )
        # Read the label from the first key without copying every key.
        sentiment = next(iter(freqs_dict))[1]

    # The smoothing denominator does not depend on the word.
    denominator = N_class + len_voc
    return {
        (word, sentiment): (freqs_dict.get((word, sentiment), 0) + 1) / denominator
        for word in vocabulary
    }

In [20]:
# Smoothed P(w | class) for every vocabulary word, one dictionary per class.
# Both calls share the same vocabulary, so every word has an entry in each.
pos_prob_dict = P__w_i_given_class__(pos_freqs_dict, N_pos_class, vocabulary, len_voc)
neg_prob_dict = P__w_i_given_class__(neg_freqs_dict, N_neg_class, vocabulary, len_voc)

Obtener lambda expresado como logaritmo

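Calcular el log-likelihood ratio de cada palabra, $\lambda(w) = \log \frac{P(w \mid pos)}{P(w \mid neg)}$. Regla de inferencia para clasificación binaria: un tweet se clasifica como positivo si $\text{log prior} + \sum_{w} \lambda(w) > 0$, y como negativo en caso contrario.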

In [30]:
def lambda_ratio(vocabulary, pos_prob_dict, neg_prob_dict):
    """Compute the log-likelihood ratio of every vocabulary word.

    Args:
        vocabulary: Iterable of words.
        pos_prob_dict: Mapping ``(word, 1) -> P(word | positive)``.
        neg_prob_dict: Mapping ``(word, 0) -> P(word | negative)``.

    Returns:
        Dictionary ``word -> log(P(word | positive) / P(word | negative))``.

    Raises:
        KeyError: If a vocabulary word is missing from either dictionary.
    """
    # Keys use 1 for the positive class and 0 for the negative class.
    return {
        term: math.log(pos_prob_dict[(term, 1)] / neg_prob_dict[(term, 0)])
        for term in vocabulary
    }

In [31]:
# Per-word log-likelihood ratios: positive values favour the positive class,
# negative values the negative class.
lambda_dict = lambda_ratio(vocabulary, pos_prob_dict, neg_prob_dict)

Obtener log prior

In [28]:
# Estimate the class prior from the training labels only. Using the sizes of
# the full corpus (train + test) would leak information about the held-out
# test set into the model.
n_pos_train = sum(1 for label in y_train if label == 1)
n_neg_train = len(y_train) - n_pos_train
log_prior = math.log(n_pos_train / n_neg_train)

In [27]:
def calculate_likelihood_per_tweet(tweet, lambda_dict, log_prior):
    """Score a tweet as the log prior plus the sum of its word log-ratios.

    Args:
        tweet: Iterable of normalized tokens.
        lambda_dict: Mapping ``word -> log-likelihood ratio``.
        log_prior: Log ratio of the class priors.

    Returns:
        ``log_prior`` plus the ratio of every token; tokens missing from
        ``lambda_dict`` contribute 0.
    """
    score = 0
    # Accumulate in the same order as summing [log_prior, ratio_1, ratio_2, ...].
    for term in (log_prior, *(lambda_dict.get(word, 0) for word in tweet)):
        score = score + term
    return score

Evaluación

In [179]:
def naive_bayes_evaluation(X, lambda_dict, log_prior):
    """Predict a sentiment label for every tweet.

    Args:
        X: Iterable of normalized tweets (token lists).
        lambda_dict: Mapping ``word -> log-likelihood ratio``.
        log_prior: Log ratio of the class priors.

    Returns:
        List of predictions, 1 (positive) when the tweet score is strictly
        greater than 0 and 0 (negative) otherwise.
    """
    return [
        int(calculate_likelihood_per_tweet(tweet, lambda_dict, log_prior) > 0)
        for tweet in X
    ]

In [171]:
def confusion_matrix_values(y_pred, y_label):
    """Count the four cells of a binary confusion matrix.

    Args:
        y_pred: Sequence of predicted labels (1 = positive, 0 = negative).
        y_label: Sequence of true labels, indexable by position.

    Returns:
        Tuple ``(TP, FN, FP, TN)``. Any pair that is not a true positive,
        false positive or false negative is counted as a true negative.
    """
    tp = fn = fp = tn = 0
    for idx, predicted in enumerate(y_pred):
        if predicted == 1:
            if y_label[idx] == 1:
                tp += 1
            elif y_label[idx] == 0:
                fp += 1
            else:
                tn += 1
        elif predicted == 0:
            if y_label[idx] == 1:
                fn += 1
            else:
                tn += 1
        else:
            # Predictions other than 0/1 fall into the catch-all bucket.
            tn += 1
    return tp, fn, fp, tn

In [172]:
"""Measures"""

def measures(TP, FN, FP, TN):
    """Print classification metrics derived from confusion-matrix counts.

    Prints precision, recall, F, F1, fallout and accuracy, one per line.
    A metric whose denominator is zero (for example precision when nothing
    was predicted positive) is reported as 0.0 instead of raising
    ``ZeroDivisionError``.

    Args:
        TP: Number of true positives.
        FN: Number of false negatives.
        FP: Number of false positives.
        TN: Number of true negatives.

    Returns:
        None. The metrics are only printed.
    """
    def _ratio(numerator, denominator):
        # An undefined ratio is reported as 0.0.
        return numerator / denominator if denominator else 0.0

    accuracy = _ratio(TP + TN, TP + TN + FP + FN)
    fallout = _ratio(FP, FP + TN)
    precision = _ratio(TP, TP + FP)
    recall = _ratio(TP, TP + FN)
    # NOTE(review): as written, F is precision*recall/(precision+recall), i.e.
    # exactly half of F1. Kept unchanged so the printed values stay comparable;
    # confirm whether a different F measure was intended.
    F = _ratio(precision * recall, precision + recall)
    F1 = _ratio(2 * precision * recall, precision + recall)
    print("Precision: ", precision)
    print("Recall: ", recall)
    print("F: ", F)
    print("F1: ", F1)
    print("Fallout: ", fallout)
    print("Accuracy: ", accuracy)

Evaluación para conjunto de entrenamiento

In [177]:
"""Evaluation for Train"""
# Predict on the training tweets and tally the confusion-matrix cells.
y_train_pred = naive_bayes_evaluation(
    X=X_train, lambda_dict=lambda_dict, log_prior=log_prior
)
TP, FN, FP, TN = confusion_matrix_values(y_pred=y_train_pred, y_label=y_train)

print("EVALUATION FOR TRAIN SET: ")
print(TP, FN, FP, TN)
measures(TP=TP, FN=FN, FP=FP, TN=TN)

EVALUATION FOR TRAIN SET: 
3825 160 44 3971
Presicion:  0.9886275523391057
Recall:  0.9598494353826851
F:  0.487012987012987
F1:  0.974025974025974
Fallout:  0.010958904109589041
Accuracy:  0.9745


Evaluación para conjunto de prueba

In [178]:
"""Evaluation for Test"""
# Predict on the held-out tweets and tally the confusion-matrix cells.
y_test_pred = naive_bayes_evaluation(
    X=X_test, lambda_dict=lambda_dict, log_prior=log_prior
)
TP, FN, FP, TN = confusion_matrix_values(y_pred=y_test_pred, y_label=y_test)

print("EVALUATION FOR TEST SET: ")
print(TP, FN, FP, TN)
measures(TP=TP, FN=FN, FP=FP, TN=TN)

EVALUATION FOR TEST SET: 
927 88 28 957
Presicion:  0.9706806282722513
Recall:  0.9133004926108375
F:  0.4705583756345178
F1:  0.9411167512690356
Fallout:  0.028426395939086295
Accuracy:  0.942
