# Training Data Loading

In [1]:
from wordcloud import WordCloud
from nltk.tokenize import RegexpTokenizer
import numpy as np
import pandas as pd
import regex as re
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="whitegrid")
import plotly.express as px
import plotly.graph_objects as go
import nltk
nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet
from sklearn.preprocessing import OrdinalEncoder
pd.set_option('display.max_columns', None)

import warnings
warnings.filterwarnings('ignore')
from sklearn.preprocessing import OrdinalEncoder

In [2]:
# Load the raw training tweets into `df`.
# NOTE(review): the columns are relabelled right after loading; if the CSV has no
# header row, read_csv (default header=0) consumes the first record as column
# names and that record is lost — confirm, and pass header=None if so.
df = pd.read_csv("twitter_training.csv")

In [3]:
# Assign the working column names, then discard the unused "place" column.
df.columns = ["tweet_id","place", "sentiments", "tweets"]
df = df.drop(columns=["place"])
df

# Data Cleaning and Preprocessing

In [4]:
# Print the column names, dtypes and non-null counts of the frame.
df.info()

In [5]:
# Identify inconsistencies: collect every row that exactly repeats an earlier row.
# `df.duplicated()` yields a boolean Series (True for each repeat after the first
# occurrence), which is then used as a row filter.
is_repeat = df.duplicated()
duplicate_rows = df[is_repeat]
duplicate_rows

In [6]:
# Identify missing values: flag every missing entry, then total the flags per column.
missing_values = df.isnull().sum()
missing_values

In [7]:
# Remove exact duplicate rows (a single call is sufficient).
df = df.drop_duplicates()

# Replace missing tweets with an empty string *before* casting to str:
# `astype(str)` on its own turns NaN into the literal text "nan", which would
# then be processed as if it were a real word of the tweet.
df['tweets'] = df['tweets'].fillna('').astype(str)

# English stop words to strip from every tweet.
stopw = set(stopwords.words("english"))

print(stopw)
# Lower-case each tweet once and keep only the words that are not stop words.
df["tweets"] = df['tweets'].apply(lambda x: ' '.join(
    [word for word in x.lower().split() if word not in stopw]))


def convert_list_to_str(l):
    """Join the string items of `l` with single spaces.

    Returns an empty string when `l` is empty; there is no trailing space.
    """
    return " ".join(l)


# Split every tweet into runs of word characters and immediately re-join the
# tokens with single spaces; the result is stored in the helper column "tweets_new".
tokenizer = RegexpTokenizer(r'\w+|\d+')
df["tweets_new"] = df["tweets"].apply(tokenizer.tokenize).apply(convert_list_to_str)


df

In [8]:

# Module-level WordNet lemmatizer; `lemmatize_text` below reads this instance.
lemmatizer = WordNetLemmatizer()


def get_wordnet_pos(treebank_tag):
    """Map a POS tag to a WordNet part-of-speech constant by its first letter.

    'J' -> adjective, 'V' -> verb, 'N' -> noun, 'R' -> adverb; any other tag
    (including an empty one) falls back to noun.
    """
    first_letter_to_pos = {
        'J': wordnet.ADJ,
        'V': wordnet.VERB,
        'N': wordnet.NOUN,
        'R': wordnet.ADV,
    }
    # Slicing (rather than indexing) keeps an empty tag on the default path.
    return first_letter_to_pos.get(treebank_tag[:1], wordnet.NOUN)


def lemmatize_sentence(sentence):
    """Lemmatize a whitespace-separated sentence using each word's POS tag.

    The words are tagged with `nltk.pos_tag`, every tag is translated by
    `get_wordnet_pos`, and the resulting lemmas are joined with single spaces.
    """
    tagged_words = nltk.pos_tag(sentence.split())
    wn_lemmatizer = WordNetLemmatizer()

    lemmas = []
    for token, tag in tagged_words:
        lemmas.append(wn_lemmatizer.lemmatize(token, get_wordnet_pos(tag)))
    return ' '.join(lemmas)


def lemmatize_text(text):
    """Lemmatize each whitespace-separated word of `text` with the module-level
    `lemmatizer` (no POS argument) and join the results with single spaces."""
    return ' '.join(map(lemmatizer.lemmatize, text.split()))

# Lemmatize the tokenized tweets (POS-aware) and preview the frame.
df["tweets_new"] = df["tweets_new"].apply(lemmatize_sentence)
print(df)

# Promote the lemmatized text to the main column and drop the helper column.
df["tweets"] = df["tweets_new"]
df = df.drop(columns=["tweets_new"])

# Fill any missing tweet. The fill value is an empty string rather than the
# integer 0, so the column holds only text for the tokenizer that consumes it later.
df['tweets'] = df['tweets'].fillna("")

# Save the dataframe
df.to_csv("tweet_clean.csv", index=False)
df


In [9]:
# Check for duplicates: number of rows that exactly repeat an earlier row.
df.duplicated().sum()

In [10]:
# Check for nulls: number of missing entries in the tweet column.
df['tweets'].isna().sum()

# Exploratory data analysis (EDA)

In [11]:
# Summary statistics from `describe()` (count, mean, standard deviation, min,
# quartiles, max), transposed so that the rows and columns of the summary swap.
df.describe().transpose()

In [12]:
# Class balance: number of tweets per sentiment label.
fig, ax = plt.subplots(figsize=(10, 6))
sns.countplot(x='sentiments', data=df, ax=ax)
for container in ax.containers:
    # Bar heights are counts, so label them as whole numbers instead of "%.2f".
    ax.bar_label(container, fmt='%d')
# Title and axis labels let the figure stand on its own.
ax.set(title='Tweets per sentiment', xlabel='Sentiment', ylabel='Number of tweets')
fig.tight_layout()
plt.show()


In [13]:


# Frame used for model training: only the target column is encoded, the tweet
# text is left untouched.
df_copy = df.copy()

ordinal_encoder = OrdinalEncoder()

object_cols = ['sentiments']
df_copy[object_cols] = ordinal_encoder.fit_transform(df_copy[object_cols].astype(str))

# Fully ordinal-encoded view (ids, labels and text). It is built on a separate
# copy instead of overwriting `df`: encoding `df` in place would replace the
# cleaned tweet text with numeric codes, so re-running this cell would make
# `df_copy` carry codes instead of text.
ordinal_encoder = OrdinalEncoder()

# Every single column with categorical values will be converted.
object_cols = ['tweet_id', 'sentiments', 'tweets']
df_encoded = df.copy()
df_encoded[object_cols] = ordinal_encoder.fit_transform(df_encoded[object_cols].astype(str))

df_encoded.head()


In [14]:
# Number of distinct values in the sentiment column.
print(df['sentiments'].nunique())

# Install Transformers and TensorFlow

In [15]:
!pip install transformers

In [16]:
!pip install tensorflow

# BERT IMPLEMENTATION AND TRAINING

In [17]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from transformers import BertTokenizer, TFBertForSequenceClassification
from transformers import InputExample, InputFeatures
import tensorflow as tf
from tensorflow.keras.metrics import Precision, Recall
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

# Hold out 20% of the label-encoded frame for evaluation (fixed seed).
train_df, test_df = train_test_split(df_copy, test_size=0.2, random_state=0)

# Sentiment codes of each partition as plain Python lists.
train_labels = train_df['sentiments'].tolist()
test_labels = test_df['sentiments'].tolist()

# One output class per distinct sentiment code seen in the training partition.
num_labels = len(np.unique(train_labels))

from transformers import BertConfig

# Pretrained checkpoint shared by the classifier, its tokenizer and its config.
model_name = "bert-base-uncased"

# Instantiate the BERT model and tokenizer
model = TFBertForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)
tokenizer = BertTokenizer.from_pretrained(model_name)

# Report two sizes read from the checkpoint's configuration.
config = BertConfig.from_pretrained(model_name)
max_seq_lengths = config.max_position_embeddings
embedding_dims = config.hidden_size

print("Max Sequence Length:", max_seq_lengths)
print("Embedding Dimension:", embedding_dims)


# Precision and recall metric objects, passed to `model.compile` in a later cell.
# NOTE(review): the loss in that cell is built with from_logits=True, so the
# model presumably outputs raw logits; these metrics are created with default
# settings — confirm their default decision threshold is meaningful for logits.
precision = Precision()
recall = Recall()

def _encode_tweets(bert_tokenizer, texts, max_length=128):
    """Tokenize `texts` into fixed-length model inputs.

    Args:
        bert_tokenizer: tokenizer exposing `encode_plus`.
        texts: iterable of tweet strings.
        max_length: length every sequence is padded or truncated to.

    Returns:
        Tuple `(input_ids, attention_masks)` of tensors built with
        `tf.convert_to_tensor`, one row per text.
    """
    input_ids, attention_masks = [], []
    for text in texts:
        # `padding='max_length'` replaces the deprecated `pad_to_max_length=True`;
        # `truncation=True` makes the cut-off at `max_length` explicit, so all
        # rows have the same length and can be stacked into one tensor.
        encoded = bert_tokenizer.encode_plus(text, add_special_tokens=True, max_length=max_length,
                                             padding='max_length', truncation=True,
                                             return_attention_mask=True, return_token_type_ids=True)
        input_ids.append(encoded['input_ids'])
        attention_masks.append(encoded['attention_mask'])
    return tf.convert_to_tensor(input_ids), tf.convert_to_tensor(attention_masks)


# Prepare the inputs for the training set
train_input_ids, train_attention_masks = _encode_tweets(tokenizer, train_df['tweets'])

# Map the sentiment codes to consecutive integer class ids, then one-hot encode.
# `num_classes` is fixed explicitly so the one-hot width always equals the
# number of labels the model was built with.
le = LabelEncoder()
train_encoded_labels = le.fit_transform(train_labels)
train_encoded_labels = to_categorical(train_encoded_labels, num_classes=num_labels)

# Prepare the inputs for the testing set
test_input_ids, test_attention_masks = _encode_tweets(tokenizer, test_df['tweets'])

# Reuse the encoder fitted on the training labels so class ids are consistent.
test_encoded_labels = le.transform(test_labels)

# Convert integer labels to one-hot encoded format; without `num_classes` the
# width would shrink if the highest class id were absent from this split.
test_encoded_labels = to_categorical(test_encoded_labels, num_classes=num_labels)

# Define the batch size (number of samples per gradient update in `model.fit`).
batch_size = 32

# Currently disabled prediction calls:
# train_embeddings = model.predict([train_input_ids, train_attention_masks])
# test_embeddings = model.predict([test_input_ids, test_attention_masks])




In [18]:
# Compile the model: Adam with gradient-norm clipping, categorical cross-entropy
# computed from logits, and precision / recall / accuracy as tracked metrics.
adam_optimizer = tf.keras.optimizers.Adam(learning_rate=3e-5, epsilon=1e-08, clipnorm=1.0)
logits_crossentropy = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
model.compile(optimizer=adam_optimizer,
              loss=logits_crossentropy,
              metrics=[precision, recall, 'accuracy'])



In [19]:
# Train the model for 10 epochs, holding out 20% of the training inputs for
# validation; the per-epoch metrics are kept in `history`.
history = model.fit(
    [train_input_ids, train_attention_masks],
    train_encoded_labels,
    epochs=10,
    validation_split=0.2,
    batch_size=batch_size,
)

In [20]:
# Print the history: the metric values recorded by `model.fit`.
print(history.history)

In [21]:
# `history` is the History object returned by model.fit(); read the accuracy
# recorded for the last training epoch.
history_dict = history.history
last_epoch_accuracy = history_dict['accuracy'][-1]

# Print the training accuracy
print("Accuracy: ", last_epoch_accuracy)


# Validation Accuracy

In [22]:
# Run the fine-tuned model on the held-out split.
test_embeddings = model.predict([test_input_ids, test_attention_masks])
from sklearn.metrics import accuracy_score

# Predicted class = index of the largest logit in each row;
# true class = index of the 1 in each one-hot encoded label row.
test_predicted_labels = np.argmax(test_embeddings.logits, axis=1)
test_true_labels = np.argmax(test_encoded_labels, axis=1)

# Fraction of rows whose predicted class equals the true class.
accuracy = accuracy_score(test_true_labels, test_predicted_labels)
print("Test Accuracy:", accuracy)

In [23]:
# model.save_weights('bert_model.weights')
# model.save('bert_model.json')
# print("Model Saved")

# Test data preprocessing

In [24]:
# Load the validation tweets and apply the same column labels as the training frame.
test_data = pd.read_csv("twitter_validation.csv")
test_data.columns = ["tweet_id","place", "sentiments", "tweets"]
test_data = test_data.drop("place", axis=1)

# Remove exact duplicate rows (a single call is sufficient).
test_data = test_data.drop_duplicates()

# Replace missing tweets with an empty string *before* casting to str:
# `astype(str)` on its own turns NaN into the literal text "nan", which would
# then be processed as if it were a real word of the tweet.
test_data['tweets'] = test_data['tweets'].fillna('').astype(str)

# English stop words to strip from every tweet.
stopw = set(stopwords.words("english"))

print(stopw)
# Lower-case each tweet once and keep only the words that are not stop words.
test_data["tweets"] = test_data['tweets'].apply(lambda x: ' '.join(
    [word for word in x.lower().split() if word not in stopw]))


def convert_list_to_str(l):
    """Return the string items of `l` separated by single spaces ("" for an empty `l`)."""
    separator = " "
    return separator.join(l)


# Split every validation tweet into runs of word characters and re-join the
# tokens with single spaces; the result is stored in the helper column "tweets_new".
tokenizer = RegexpTokenizer(r'\w+|\d+')
test_data["tweets_new"] = test_data["tweets"].apply(tokenizer.tokenize).apply(convert_list_to_str)


# Module-level WordNet lemmatizer; `lemmatize_text` below reads this instance.
lemmatizer = WordNetLemmatizer()


def get_wordnet_pos(treebank_tag):
    """Translate a POS tag into a WordNet part-of-speech constant.

    Tags starting with 'J', 'V', 'N' or 'R' map to adjective, verb, noun and
    adverb respectively; every other tag defaults to noun.
    """
    prefix_table = (
        ('J', wordnet.ADJ),
        ('V', wordnet.VERB),
        ('N', wordnet.NOUN),
        ('R', wordnet.ADV),
    )
    for prefix, wordnet_pos in prefix_table:
        if treebank_tag.startswith(prefix):
            return wordnet_pos
    # Default to noun if no mapping found
    return wordnet.NOUN


def lemmatize_sentence(sentence):
    """Return `sentence` with every whitespace-separated word replaced by its lemma.

    Words are tagged with `nltk.pos_tag`; each tag is converted by
    `get_wordnet_pos` and passed to a WordNet lemmatizer together with the word.
    """
    tagged = nltk.pos_tag(sentence.split())
    wn_lemmatizer = WordNetLemmatizer()
    return ' '.join(
        wn_lemmatizer.lemmatize(token, get_wordnet_pos(tag)) for token, tag in tagged
    )


def lemmatize_text(text):
    """Lemmatize `text` word by word with the module-level `lemmatizer`
    (no POS argument) and return the lemmas joined by single spaces."""
    lemmas = []
    for token in text.split():
        lemmas.append(lemmatizer.lemmatize(token))
    return ' '.join(lemmas)

# Lemmatize the tokenized validation tweets (POS-aware).
test_data["tweets_new"] = test_data["tweets_new"].apply(lemmatize_sentence)

# Promote the lemmatized text to the main column and drop the helper column.
test_data["tweets"] = test_data["tweets_new"]
test_data = test_data.drop(columns=["tweets_new"])

# Fill any missing tweet. The fill value is an empty string rather than the
# integer 0, so the column holds only text for the tokenizer that consumes it later.
test_data['tweets'] = test_data['tweets'].fillna("")

# Save the dataframe under its own file name: the cleaned *training* frame was
# already written to "tweet_clean.csv" earlier in the notebook, and reusing that
# path here would overwrite it.
test_data.to_csv("tweet_validation_clean.csv", index=False)


In [25]:
print(test_data.head())
td_copy = test_data.copy()

# Encode the sentiment strings of the validation frame as ordinal codes.
# NOTE(review): a fresh encoder is fitted on the validation labels here, so the
# codes agree with the training codes only if both frames contain the same set
# of sentiment strings — confirm.
ordinal_encoder = OrdinalEncoder()
object_cols = ['sentiments']
td_copy[object_cols] = ordinal_encoder.fit_transform(td_copy[object_cols].astype(str))

td_copy.head()

In [26]:
# Preview the first rows of the training partition.
print(train_df.head())

# Test Data Evaluation

In [27]:
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, TFBertForSequenceClassification
import tensorflow as tf
from tensorflow.keras.metrics import Accuracy
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from transformers import BertTokenizer, TFBertForSequenceClassification
from transformers import InputExample, InputFeatures
import tensorflow as tf
from tensorflow.keras.metrics import Precision, Recall
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

# Use the whole label-encoded training frame as the training side and the
# label-encoded validation file as the test side.
train_df = df_copy
test_df = td_copy

# Sentiment codes of each frame as plain Python lists.
train_labels = train_df['sentiments'].tolist()
test_labels = test_df['sentiments'].tolist()

# Number of distinct sentiment codes in the training frame.
num_labels = len(np.unique(train_labels))

from transformers import BertConfig

model_name = "bert-base-uncased"

# Only the tokenizer is re-created; the model instantiation stays disabled:
# model = TFBertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=num_labels)
tokenizer = BertTokenizer.from_pretrained(model_name)

# Report two sizes read from the checkpoint's configuration.
config = BertConfig.from_pretrained(model_name)
max_seq_lengths = config.max_position_embeddings
embedding_dims = config.hidden_size

print("Max Sequence Length:", max_seq_lengths)
print("Embedding Dimension:", embedding_dims)



# Fresh precision and recall metric objects for the `model.compile` call at the
# end of this cell.
# NOTE(review): that compile call uses a from_logits=True loss, so the model
# presumably outputs raw logits; these metrics are created with default settings
# — confirm their default decision threshold is meaningful for logits.
precision = Precision()
recall = Recall()

# Prepare the inputs for the testing set
test_input_ids, test_attention_masks = [], []

for tweet in test_df['tweets']:
    # `padding='max_length'` replaces the deprecated `pad_to_max_length=True`;
    # `truncation=True` makes the cut-off at 128 tokens explicit, so all rows
    # have the same length and can be stacked into one tensor.
    inputs = tokenizer.encode_plus(tweet, add_special_tokens=True, max_length=128,
                                   padding='max_length', truncation=True,
                                   return_attention_mask=True, return_token_type_ids=True)
    test_input_ids.append(inputs['input_ids'])
    test_attention_masks.append(inputs['attention_mask'])


test_input_ids = tf.convert_to_tensor(test_input_ids)
test_attention_masks = tf.convert_to_tensor(test_attention_masks)

# Convert integer labels to one-hot encoded format. `num_classes` is fixed to the
# number of training labels, so the one-hot width does not shrink if the highest
# class code is absent from this data.
test_encoded_labels = to_categorical(test_labels, num_classes=num_labels)

# Predict using the model on the test data
eval_inputs = [test_input_ids, test_attention_masks]
test_embeddings = model.predict(eval_inputs)

# Re-compile with the same optimizer and loss settings and the metric objects
# created above, then evaluate on the same inputs.
eval_optimizer = tf.keras.optimizers.Adam(learning_rate=3e-5, epsilon=1e-08, clipnorm=1.0)
eval_loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
model.compile(optimizer=eval_optimizer,
              loss=eval_loss,
              metrics=[precision, recall, 'accuracy'])

# Evaluate the model
evaluation_results = model.evaluate(eval_inputs, test_encoded_labels)
print(evaluation_results)



In [28]:
from sklearn.metrics import accuracy_score

# `test_encoded_labels` holds the one-hot encoded labels and `test_embeddings`
# the model predictions for the test data.
# Predicted class = index of the largest logit in each row;
# true class = index of the 1 in each one-hot encoded label row.
test_predicted_labels = np.argmax(test_embeddings.logits, axis=1)
test_true_labels = np.argmax(test_encoded_labels, axis=1)

# Calculate Test Data accuracy
accuracy = accuracy_score(test_true_labels, test_predicted_labels)
print("Test Accuracy:", accuracy)