In [1]:
import tensorflow as  tf #Required to fit Neural Networks
import os #Required to address wd
import pandas as pd #Required for data manipulation
import numpy as np #Required for mathematical operations
import re #Required for reshaping the tweets without punctuation signs
import string#Required for reshaping the tweets without punctuation signs
from tensorflow.keras.layers import TextVectorization #Required for text vectorisation
import sklearn as sk #Required for accuracy metrics
import torch
import torch.nn.functional as F
from datasets import Dataset


$$
\large\textbf{Twitter/X sentiment analysis}
$$


$$
\textbf{Problem definition:}
$$


$
\small\text{Companies are constantly launching and testing new products, usually teasing them on Twitter/X. }
$
$
\small\text{The amount of tweets that react to these company posts can be very large, making the overall sentiment difficult to measure. }
$
$
\small\text{Millions of tweets can react to a single post, so it is virtually impossible for a company to read through all of them.
}
$
$
\small \text{This ML/AI solution can help companies by quickly analyzing tweets and determining the overall positivity of the market towards a product.}
$
$
\small \text{Hence, a company can theoretically retrieve or download all the tweets with a certain hashtag, run them through the model, and average the results} 
$
$
\small \text{ to get a quick picture of the overall market sentiment towards the teased product.
}
$


$$
\textbf{Data and assumptions of the project:}
$$

$
\small\text{Data is a subset of the sentiment140 open dataset from the TensorFlow Datasets catalog.}
\
\href{https://www.tensorflow.org/datasets/catalog/sentiment140}{TFSentiment140}
$
$
\small\text{The data file format has 6 fields:}
\\
$
$
\small\text{ - The polarity of the tweet (0 = negative, 4 = positive)}  
$

$
\small\text{ - The id of the tweet (2087)}  
$

$
\small\text{ - The date of the tweet (Sat May 16 23:58:44 UTC 2009)}  
$

$
\small\text{ - The query (lyx). If there is no query, then this value is NO_QUERY.}  
$

$
\small\text{ - The user that tweeted}  
$

$
\small\text{ - The text of the tweet (Lyx is cool)}
$

$
\small\text{The dataset used can be retrieved directly from:}
\
\href{https://github.com/tensorflow/datasets/tree/master/tensorflow_datasets/datasets/sentiment140/dummy_data/training.1600000.processed.noemoticon.csv}{CSVLinkData}
$


$$
\textbf{Data Citation:}
$$

$
\small{Go, Alec, Bhayani, Richa, and Huang, Lei.} 
$
$
\small{\textit{"Twitter Sentiment Classification using Distant Supervision."(2009). } Available:} 
$
$
\href{http://help.sentiment140.com/home}{http://help.sentiment140.com/home}
$

$$
\textbf{Implementation}
$$

$$
\small{\text{- Uploading the data, resizing and splitting it -}}
$$

In [2]:
#Setting the working directory.
#The location can be overridden with the TWITTER_SENTIMENT_DIR environment variable
#so the notebook is not tied to a single machine; the original path is kept as the fallback.
directory=os.environ.get("TWITTER_SENTIMENT_DIR", "C:/Users/sergi/Documents/Py/twittersentiment")
os.chdir(directory)

In [3]:
#Loading the dataset
import warnings

#Warnings are silenced only while the workbook is parsed, instead of muting
#every warning for the remainder of the notebook.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    training_val=pd.read_excel("training.xlsx")
#NOTE(review): the summary cell reports 1,048,575 rows, which is exactly Excel's
#1,048,576-row sheet limit minus a header row, while the source file is named
#training.1600000... The workbook is therefore probably truncated (which would
#also explain the class imbalance). Consider reading the original CSV - TODO confirm.

In [4]:
#Keep only the six fields described in the data section, in that order, then preview the first rows
kept_columns=["sentiment","id","date","query","user","tweet"]
training_val=training_val.loc[:, kept_columns]
training_val.head()

Unnamed: 0,sentiment,id,date,query,user,tweet
0,0,1467810000.0,Mon Apr 06 22:19:45 PDT 2009,NO_QUERY,_TheSpecialOne_,"@switchfoot http://twitpic.com/2y1zl - Awww, t..."
1,0,1467811000.0,Mon Apr 06 22:19:49 PDT 2009,NO_QUERY,scotthamilton,is upset that he can't update his Facebook by ...
2,0,1467811000.0,Mon Apr 06 22:19:53 PDT 2009,NO_QUERY,mattycus,@Kenichan I dived many times for the ball. Man...
3,0,1467811000.0,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,ElleCTF,my whole body feels itchy and like its on fire
4,0,1467811000.0,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,Karoli,"@nationwideclass no, it's not behaving at all...."


In [5]:
#Building the modelling frame: tweet text as strings plus labels recoded from {0, 4} to {0, 1}
def _to_binary_label(raw_label):
    """Map polarity 0 -> 0 and 4 -> 1; any other value is passed through unchanged."""
    if raw_label == 0:
        return 0
    if raw_label == 4:
        return 1
    return raw_label

tweets_train=list(map(str, training_val["tweet"].tolist()))
outputs=list(map(_to_binary_label, training_val["sentiment"].tolist()))
Data=pd.DataFrame({"inputs":tweets_train, "outputs":outputs})


In [6]:
#Summarising data: number of tweets per binarised label
is_positive=Data["outputs"]==1
is_negative=Data["outputs"]==0
nr_positive=int(is_positive.sum())
nr_negative=int(is_negative.sum())
print(f"Nr of positive tweets: {nr_positive}, Nr of negative tweets:{nr_negative}, Total Nr of Tweets:{len(Data)}")

Nr of positive tweets: 249953, Nr of negative tweets:798622, Total Nr of Tweets:1048575


In [7]:
#Balancing the classes: each class is shuffled and both are cut to the size of the
#positive class. The first 90% of that size goes to training and the remaining 10%
#to testing, so both splits hold the same number of positive and negative tweets.
#Dropping the surplus negative tweets also keeps the training time down.
split_point=int(nr_positive*0.9)

def _shuffle_class(label):
    """Return all rows of `Data` with the given label, shuffled with a fixed seed."""
    return Data[Data["outputs"]==label].sample(frac=1, random_state=10).reset_index(drop=True)

def _merge_and_shuffle(positive_part, negative_part):
    """Stack a positive and a negative slice and shuffle the rows with a fixed seed."""
    stacked=pd.concat([positive_part,negative_part], axis=0)
    return stacked.sample(frac=1, random_state=10).reset_index(drop=True)

shuffled_positive=_shuffle_class(1)
shuffled_negative=_shuffle_class(0)

shuffled_positive_train=shuffled_positive.iloc[:split_point,:]
shuffled_negative_train=shuffled_negative.iloc[:split_point,:]

shuffled_positive_test=shuffled_positive.iloc[split_point:,:]
#The negative slice stops at the positive class size so the test split stays balanced
shuffled_negative_test=shuffled_negative.iloc[split_point:len(shuffled_positive),:]

train_data=_merge_and_shuffle(shuffled_positive_train,shuffled_negative_train)
test_data=_merge_and_shuffle(shuffled_positive_test,shuffled_negative_test)

print(f"The training data has been resized to have the same amount of positive and negative tweets: {len(shuffled_positive_train)} for positive, and {len(shuffled_negative_train)} for negative")
print(f"For the testing data we are using: {len(shuffled_positive_test)} for positive, and {len(shuffled_negative_test)} for negative")

train_data.head()


The training data has been resized to have the same amount of positive and negative tweets: 224957 for positive, and 224957 for negative
For the testing data we are using: 24996 for positive, and 24996 for negative


Unnamed: 0,inputs,outputs
0,@LaurenConrad I can't believe he lost... I'm s...,0
1,Larry's out for the night at a Halo party with...,0
2,"Chase has been home for an hour, and has alrea...",0
3,I sliced my thumb pretty bad today... but I do...,0
4,@megkautz ring tailed lemurs are my favorite n...,1


In [8]:
#Converting the pandas frame into a Hugging Face Dataset.
#The isinstance guard makes the cell safe to re-run: `train_data` is rebound to the
#Dataset here, so a second execution would otherwise pass a Dataset (not a DataFrame)
#to Dataset.from_pandas.
if isinstance(train_data, pd.DataFrame):
    train_data=Dataset.from_pandas(train_data)
train_data

Dataset({
    features: ['inputs', 'outputs'],
    num_rows: 449914
})

In [9]:
#BERT
from transformers import AutoTokenizer

model_cpkt="distilbert-base-uncased"

tokenizer=AutoTokenizer.from_pretrained(model_cpkt)

# ##Alternatively: 
# from transformers import DistilBertTokenizer
# distilbert_tokenizer=DistilBertTokenizer.from_pretrained(model_cpkt)

#Example
#Index the row first and the column second: train_data["inputs"][0] reads the whole
#column just to return its first element.
example_text=train_data[0]["inputs"]
encoded_text_example=tokenizer(example_text)
print(encoded_text_example)

#see where each id maps to what word - Essentially we see the tokens
tokens=tokenizer.convert_ids_to_tokens(encoded_text_example.input_ids)
print(tokens)

#Original text
print(example_text)

#Convert token to strings
print(tokenizer.convert_tokens_to_string(tokens))

{'input_ids': [101, 1030, 10294, 8663, 12173, 1045, 2064, 1005, 1056, 2903, 2002, 2439, 1012, 1012, 1012, 1045, 1005, 1049, 2061, 6517, 1012, 102], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}
['[CLS]', '@', 'lauren', '##con', '##rad', 'i', 'can', "'", 't', 'believe', 'he', 'lost', '.', '.', '.', 'i', "'", 'm', 'so', 'sad', '.', '[SEP]']
@LaurenConrad I can't believe he lost... I'm so sad. 
[CLS] @ laurenconrad i can ' t believe he lost... i ' m so sad. [SEP]


In [10]:
#Tokenising the whole training set
def tokenize(batch):
    """Tokenise the "inputs" entry of `batch` with the DistilBERT tokenizer.

    Parameters
    ----------
    batch : mapping
        Rows handed over by Dataset.map; batch["inputs"] holds the tweet text.

    Returns
    -------
    The tokenizer output for the batch. padding=True pads to the longest
    sequence in the call, truncation=True cuts over-long sequences.
    """
    return tokenizer(batch["inputs"], padding=True, truncation=True)

#batched=True hands lists of tweets to `tokenize`. Without it every call receives a
#single string, so padding=True has nothing to pad against and the tokenizer is
#invoked once per row. Padding is applied per mapped batch.
encoded_train= train_data.map(tokenize, batched=True)

Map:   0%|          | 0/449914 [00:00<?, ? examples/s]

In [11]:
#Inspect the tokenised dataset: the map call added the input_ids and attention_mask columns
encoded_train

Dataset({
    features: ['inputs', 'outputs', 'input_ids', 'attention_mask'],
    num_rows: 449914
})

Feeding our data into DistilBERT and obtaining the last hidden state of the model. 
This output will be fed into a dense network that returns a sigmoid probability indicating whether the input has a positive or a negative sentiment. 

Below I specify the code for tf and torch inputs. 

In [None]:
##Obtaining the hidden states for the model using BERT. In the sense of an encoder-decoder model this would be the equivalent of the encoder part.

##Pytorch

import torch
from transformers import AutoModel, AutoTokenizer

# Tokenizer and pretrained encoder come from the same checkpoint; the encoder
# is moved to the GPU when one is available, otherwise it stays on the CPU.
model_checkpoint = "distilbert-base-uncased"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
# NOTE: the name `model` is rebound to the Keras classifier in a later cell.
model = AutoModel.from_pretrained(model_checkpoint).to(device)


def extract_hidden_states(batch):
    """Return the [CLS] hidden state of every tweet in `batch`.

    Parameters
    ----------
    batch : mapping
        Rows handed over by Dataset.map(batched=True); batch["inputs"] holds the tweet text.

    Returns
    -------
    dict
        {"hidden_state": array}, the first-token slice of the encoder's last
        hidden state, copied to the CPU and converted to NumPy.
    """
    # Pad to the longest tweet of the batch, truncate at 512 tokens, return PyTorch tensors
    encoded = tokenizer(batch["inputs"], padding=True, truncation=True, return_tensors="pt", max_length=512)

    # Only these two tensors are forwarded, each on the same device as the model
    model_kwargs = {key: encoded[key].to(device) for key in ("input_ids", "attention_mask")}

    # Pure inference: no gradients are tracked
    with torch.no_grad():
        encoder_output = model(**model_kwargs)

    # Position 0 of every sequence is the [CLS] token
    cls_embeddings = encoder_output.last_hidden_state[:, 0]
    return {"hidden_state": cls_embeddings.cpu().numpy()}


# Run the extraction over the whole dataset, 32 tweets per forward pass
train_hidden = train_data.map(extract_hidden_states, batched=True, batch_size=32)


In [12]:
##Tensorflow

import tensorflow as tf
from transformers import TFAutoModel, AutoTokenizer

# Load tokenizer and model
model_checkpoint = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
modelbert = TFAutoModel.from_pretrained(model_checkpoint)


def extract_hidden_states_from_text(text):
    """Return the DistilBERT [CLS] hidden state(s) for `text` as a NumPy array.

    Parameters
    ----------
    text : str or list of str
        One tweet or a batch of tweets; it is passed straight to the tokenizer.

    Returns
    -------
    numpy.ndarray
        The first-token ([CLS]) slice of the model's last hidden state.
    """
    # Pad to the longest sequence in the call and truncate at 512 tokens
    inputs = tokenizer(text, padding=True, truncation=True, return_tensors="tf", max_length=512)

    # Element [0] of the model output is the last hidden state
    last_hidden_state = modelbert(
        input_ids=inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
    )[0]

    # Position 0 of every sequence is the [CLS] token
    return last_hidden_state[:, 0].numpy()


def extract_hidden_states(batch):
    """Dataset.map adapter: embed batch["inputs"] and return it as a new column.

    Delegates to extract_hidden_states_from_text so the batch path and the
    single-tweet path share one implementation.

    Returns
    -------
    dict
        {"hidden_state": array of [CLS] hidden states}.
    """
    return {"hidden_state": extract_hidden_states_from_text(batch["inputs"])}


# Embed the whole training set, 32 tweets per forward pass.
# (The stray leading space on this statement raised an IndentationError.)
train_hidden = train_data.map(extract_hidden_states, batched=True, batch_size=32)






Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFDistilBertModel: ['vocab_transform.weight', 'vocab_layer_norm.bias', 'vocab_layer_norm.weight', 'vocab_transform.bias', 'vocab_projector.bias']
- This IS expected if you are initializing TFDistilBertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFDistilBertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFDistilBertModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFDistilBertModel for predictions without further training.


Map:   0%|          | 0/449914 [00:00<?, ? examples/s]

In [69]:
#Inspect train_data: it holds only the 'inputs' and 'outputs' columns
train_data

Dataset({
    features: ['inputs', 'outputs'],
    num_rows: 449914
})

In [16]:
#Inspect train_hidden: the map call added a 'hidden_state' column
train_hidden

Dataset({
    features: ['inputs', 'outputs', 'hidden_state'],
    num_rows: 449914
})

In [17]:
#Wrapping the hidden states and their labels in a tf.data pipeline for Keras, 1000 rows per batch
cls_features = train_hidden["hidden_state"]
sentiment_labels = train_hidden["outputs"]
train_dataset = tf.data.Dataset.from_tensor_slices((cls_features, sentiment_labels)).batch(1000)

In [35]:
# Calculate sizes for the training and validation datasets.
# All sizes are counted in batches, not rows, because train_dataset is already batched.
# cardinality() reads the batch count from the pipeline definition, so the whole
# dataset no longer has to be iterated (and held in a list) just to count it.
dataset_size = int(train_dataset.cardinality().numpy())
if dataset_size < 0:
    # Negative values mean the cardinality is unknown or infinite: fall back to counting batches
    dataset_size = sum(1 for _ in train_dataset)
train_size = int(0.9 * dataset_size)
val_size = dataset_size - train_size

# Create training and validation datasets: the first 90% of the batches train, the rest validate
train_split = train_dataset.take(train_size)
val_split = train_dataset.skip(train_size)


In [36]:
# Peek at one validation batch to confirm the tensor shapes fed to the classifier
for hidden_batch, label_batch in val_split.take(1):
    print("Hidden States Shape:", hidden_batch.shape)  # expected (batch_size, 768)
    print("Outputs Shape:", label_batch.shape)         # expected (batch_size,)

Hidden States Shape: (1000, 768)
Outputs Shape: (1000,)


Decoder part of the model: 

Creating a Keras dense model where the last hidden output from the BERT model is passed

In [None]:
# Fix the Python, NumPy and TensorFlow seeds so that weight initialisation, and
# therefore the reported metrics, can be reproduced on a fresh run.
tf.keras.utils.set_random_seed(10)

# Dense classifier head on top of the precomputed [CLS] hidden states.
model = tf.keras.Sequential([
    tf.keras.Input(shape=(768,)),                   # hidden-state width, see the shape check above
    tf.keras.layers.Dense(500, activation='relu'),
    tf.keras.layers.Dense(200, activation='relu'),
    tf.keras.layers.Dense(100, activation='relu'),
    tf.keras.layers.Dense(50, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')  # Output layer with sigmoid for binary classification
])

model.compile(
    optimizer='adam',
    loss='binary_crossentropy',  # Use binary_crossentropy for binary classification
    metrics=['accuracy']
)

# Fit the model
model.fit(train_split, epochs=10, validation_data=val_split)


In [39]:
# Report loss and accuracy on both splits. The model is compiled with
# binary_crossentropy, so the loss is labelled as binary (it was labelled categorical).
train_loss, train_accuracy = model.evaluate(train_split)
print(f'Binary Cross entropy Loss Train set: {train_loss}')  # loss over the whole training split
print(f'Train set Accuracy: {train_accuracy}')

val_loss, val_accuracy = model.evaluate(val_split)
print(f'Binary Cross entropy Loss Validation Set: {val_loss}')  # loss over the whole validation split
print(f'Validation set Accuracy: {val_accuracy}')

[1m364/364[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 5ms/step - accuracy: 0.7869 - loss: 0.4455
Categorical Cross entropy Loss Train set: 0.44335004687309265
Train set Accuracy: 0.7881895899772644
[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.7719 - loss: 0.4731
Categorical Cross entropy Loss Validation Set: 0.4682205617427826
Validation set Accuracy: 0.7738292813301086


Defining a function that feeds the hidden state of the BERT model to our trained dense network.
The end product is a function that tells us if a tweet has positive or negative sentiment.

In [102]:
#Creating a function that tells me if a tweet is good or bad
def good_bad_twee(text):
    """Print whether `text` is classified as positive or negative.

    The tweet is embedded with extract_hidden_states_from_text and scored by
    the trained dense network `model`. A sigmoid score above 0.5 is reported
    as positive, anything else as negative.

    Parameters
    ----------
    text : str
        A single tweet.

    Returns
    -------
    None
        The verdict is printed as "<text>:is positive" or "<text>:is negative".
    """
    text_hid=extract_hidden_states_from_text(text)
    # Pass the embedding array itself (not wrapped in a list) and reduce the
    # prediction to a plain float so the threshold test is a scalar comparison.
    predictions=model.predict(text_hid)
    score=float(predictions[0][0])
    if score>0.5:
        print(text + ":is positive")
    else:
        # Same ":" separator as the positive branch (it was missing here)
        print(text + ":is negative")
    

In [103]:
#Spot check: a tweet expected to be classified as positive
tweet = "This is fantastic!!"
good_bad_twee(tweet)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
This is fantastic!!:is positive


In [104]:
#Spot check: a tweet expected to be classified as negative
tweet = "This is fuckedup!!"
good_bad_twee(tweet)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
This is fuckedup!!is negative


In [106]:
#Spot check: a second tweet expected to be classified as positive
tweet = "Great day for you"
good_bad_twee(tweet)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
Great day for you:is positive
