# Sentiment Analysis
Run sentiment analysis on a CSV file of Reddit comments and sort each comment into a positive, negative, or neutral text file that can later be used for text generation.

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from scipy.special import softmax
import pandas as pd

# initializing model and tokenizer
# Both objects are loaded once at module level from the same checkpoint name
# and are read as globals by polarity_scores_roberta.
# NOTE(review): the text-generation section later rebinds the global name
# `model`; if both sections share one namespace, the sentiment functions must
# not be re-run after that point - confirm.
roberta = "cardiffnlp/twitter-roberta-base-sentiment"
model = AutoModelForSequenceClassification.from_pretrained(roberta)
tokenizer = AutoTokenizer.from_pretrained(roberta)


# preprocessing step
def read_csv(file_name):
    """Load the comment CSV and remove the columns the sentiment step ignores.

    Args:
        file_name: Location of the CSV file, passed straight to
            ``pandas.read_csv``.

    Returns:
        A DataFrame without the columns listed in ``unused_columns`` and with
        every missing value replaced by a single space.

    Raises:
        KeyError: If any of the columns in ``unused_columns`` is not present
            in the file.
    """
    unused_columns = [
        'bs1', 'bs2', 'bs3', 'bs4', 'bs5', 'bs6', 'bs7',
        'main thread', 'Subreddit', 'num',
    ]

    # data from csv file is read and saved as a pandas dataframe
    ds = pd.read_csv(file_name)

    # drops unnecessary columns in one call instead of building ten
    # intermediate dataframes
    ds = ds.drop(columns=unused_columns)

    # missing cells become " " so that later string handling never sees NaN
    return ds.fillna(" ")

# replaces user mentions and link-like words with fixed placeholder tokens
def _mask_handles_and_links(sentence):
    """Return ``sentence`` with @mentions and http/www words normalised.

    Words are obtained by splitting on single spaces, so runs of spaces and
    all other whitespace are preserved as they are.
    """
    masked = []
    for word in sentence.split(' '):
        # a lone "@" is left untouched; only "@something" counts as a mention
        if word.startswith('@') and len(word) > 1:
            word = '@user'
        elif word.startswith('http'):
            word = 'http'
        elif word.startswith('www'):
            word = 'www'
        masked.append(word)
    return ' '.join(masked)


# runs given sentence through the roberta model to get sentiment scores
def polarity_scores_roberta(sentence):
    """Score one comment and append it to the text file of its main sentiment.

    The masked comment is appended (one per line) to ``negative_text.txt``,
    ``neutral_text.txt`` or ``positive_text.txt`` in the current directory,
    depending on which score is highest. On a tie the earlier label in the
    order negative, neutral, positive wins.

    Args:
        sentence: The comment text.

    Returns:
        A dict with the keys ``'negative'``, ``'neutral'`` and ``'positive'``
        holding the softmax scores.

    Raises:
        RuntimeError: Propagated from the model call; the caller treats this
            as "comment is too long".
    """
    labels = ('negative', 'neutral', 'positive')

    # filtering out nonsensical data
    processed_data = _mask_handles_and_links(sentence)

    # tokenizing data and converting into a tensor for model
    encoded_text = tokenizer(processed_data, return_tensors='pt')
    # calling model on text
    output = model(**encoded_text)
    # first output, first (only) item of the batch, as a numpy array of raw scores
    scores = output[0][0].detach().numpy()
    # taking the softmax of the scores returned by the model
    scores = softmax(scores)
    scores_dict = {label: scores[position] for position, label in enumerate(labels)}

    # determining the main sentiment of the text; argmax returns the first
    # maximum, which keeps the negative > neutral > positive tie order
    main_sentiment = labels[int(scores[:3].argmax())]

    # filtering the text to the appropriate txt file; the context manager
    # closes the file even when the write fails
    with open(f'{main_sentiment}_text.txt', 'a') as sentiment_file:
        sentiment_file.write(processed_data + "\n")

    return scores_dict

# iterates through processed pandas data frame and runs the model on the entire dataset
def SentimentAnalysis_on_data(data):
    """Run the sentiment model on every row and collect the scores.

    Args:
        data: DataFrame whose rows unpack into exactly two values: the comment
            text first, then the identifier used as the result key.

    Returns:
        A dict mapping each identifier to the score dict returned by
        ``polarity_scores_roberta``. Rows that raise ``RuntimeError`` are
        reported on stdout and left out.
    """
    res = {}
    for text, myid in data.itertuples(index=False):
        try:
            res[myid] = polarity_scores_roberta(text)
        # sometimes the text can be too long for the roberta model, resulting in a runtime error
        # the loop needs to continue running despite runtime errors
        except RuntimeError:
            print("Comment is too long")

    # previously the collected scores were dropped and callers received None
    return res


if __name__ == "__main__":
    # reads in csv file about comments on motorcycle subreddit
    # (read_csv drops the unused columns and fills missing values with " ")
    ds = read_csv('lifestyle_motorcycles.csv')
    # scores every row; as a side effect each comment is appended to the
    # negative/neutral/positive text file matching its main sentiment
    data = SentimentAnalysis_on_data(ds)

# CNN-LSTM and LSTM text generation

In [None]:
import torch
import torch.nn as nn
from torch import optim as optim
import matplotlib.pyplot as plt
from datasets import load_metric

# sets the device to use GPU in order to decrease runtime
# (falls back to the CPU when CUDA is not available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# bare expression: only has a visible effect when the cell output is displayed
device

# BLEU metric object used as a global by bleu_score_analysis
# NOTE(review): `load_metric` is reported as deprecated in newer releases of
# the `datasets` library - confirm against the installed version
bleu = load_metric("bleu")

# reads the whole file and returns its contents as one string
def read_data(file_name):
    """Return the complete text stored in ``file_name``.

    The file is opened in text mode with the platform default encoding.
    """
    with open(file_name) as source:
        return source.read()

# builds the char -> index and index -> char lookup tables used for encoding
def char_tokenization(text):
    """Number the distinct characters of ``text`` in order of first appearance.

    Returns:
        A tuple ``(char_to_index, index_to_char)`` of two dicts that are
        inverses of each other; indices start at 0.
    """
    # dict.fromkeys keeps one entry per character, in first-seen order
    distinct_chars = dict.fromkeys(text)
    char_to_index = {char: position for position, char in enumerate(distinct_chars)}
    index_to_char = {position: char for char, position in char_to_index.items()}
    return char_to_index, index_to_char

# encodes the text with the character to index dictionary
# and returns the encoded text as a column tensor on the selected device
def convert_text_to_tokenized_tensor(text, char_to_index):
    """Translate every character of ``text`` to its index and wrap the result in a tensor.

    Args:
        text: The characters to encode.
        char_to_index: Lookup table from character to integer index.

    Returns:
        A ``LongTensor`` with one row per character and a trailing dimension
        of size 1, moved to the module-level ``device``.

    Raises:
        KeyError: If ``text`` contains a character missing from ``char_to_index``.
    """
    encoded = [char_to_index[char] for char in text]
    column = torch.LongTensor(encoded).unsqueeze(dim=1)

    # move tensor to the globally selected device (GPU when available)
    return column.to(device)

# define LSTM model
class LSTM(nn.Module):
    """Embedding -> multi-layer LSTM -> linear decoder.

    The embedding dimension equals ``input_size``, so each index is embedded
    into a vector as wide as the vocabulary before entering the LSTM.
    """

    def __init__(self, input_size, output_size, hidden_size, num_layers):
        super().__init__()
        self.embedder = nn.Embedding(input_size, input_size)
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
        )
        self.decoder = nn.Linear(hidden_size, output_size)

    def forward(self, input_sequence, hidden_state):
        """Return the decoder output and the detached ``(h, c)`` LSTM state.

        ``hidden_state`` may be ``None`` to start from the LSTM's default
        initial state. The returned state is detached so that gradients do
        not flow back into earlier calls.
        """
        embedded = self.embedder(input_sequence)

        # recurrent layer
        lstm_output, (h_state, c_state) = self.lstm(embedded, hidden_state)

        # fully connected layer mapping hidden features to output scores
        decoded = self.decoder(lstm_output)
        return decoded, (h_state.detach(), c_state.detach())

# define CNN LSTM model
class CNN_LSTM(nn.Module):
    """Embedding -> 1-D convolution -> multi-layer LSTM -> linear decoder.

    The convolution treats one whole embedded sequence as its channel axis,
    so it has ``input_size * sequence_length`` input and output channels and
    the model only accepts sequences of exactly ``sequence_length`` indices
    with a batch dimension of 1.

    Args:
        input_size: Vocabulary size; also used as the embedding dimension.
        output_size: Number of output scores per position.
        hidden_size: Hidden size of the LSTM.
        num_layers: Number of stacked LSTM layers.
        kernel_size: Kernel size of the max-pooling layer.
        sequence_length: Number of indices per input sequence. Defaults to
            100, the value that used to be hard-coded.
    """

    def __init__(self, input_size, output_size, hidden_size, num_layers, kernel_size,
                 sequence_length=100):
        super(CNN_LSTM, self).__init__()
        conv_channels = input_size * sequence_length
        self.embedder = nn.Embedding(input_size, input_size)
        self.cnn1d = nn.Conv1d(in_channels=conv_channels, out_channels=conv_channels, kernel_size=3, stride=1,
                               padding=1)
        # NOTE: the pooling layer is constructed but not applied in forward()
        self.pool = nn.MaxPool1d(kernel_size=kernel_size, stride=1)
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)
        self.decoder = nn.Linear(hidden_size, output_size)

    def forward(self, input_sequence, hidden_state):
        """Return the decoder output and the detached ``(h, c)`` LSTM state.

        Raises:
            RuntimeError: If the embedded input does not contain exactly
                ``input_size * sequence_length`` values.
        """
        embedded = self.embedder(input_sequence)

        # shapes are derived from the layers instead of the former literals
        # 7800 / 100 / 78, which only fitted a 78-character vocabulary
        conv_channels = self.cnn1d.in_channels
        embedding_dim = self.embedder.embedding_dim
        sequence_length = conv_channels // embedding_dim

        # reshapes embedded input for CNN layer: (channels, length=1), unbatched
        conv_input = torch.flatten(embedded).reshape(conv_channels, 1)

        # sends data to the CNN layer
        conv_output = torch.relu(self.cnn1d(conv_input))

        # reshapes tensor to (sequence, batch=1, features) for the LSTM layer
        lstm_input = conv_output.reshape(sequence_length, 1, embedding_dim)

        # sends data to the LSTM layer
        output, hidden_state = self.lstm(lstm_input, hidden_state)
        # sends data to the fully connected linear layer
        output = self.decoder(output)
        return output, (hidden_state[0].detach(), hidden_state[1].detach())

# trains model over training data
def train_model(char_to_index, num_of_epochs, input_sequence_len, text,
                text_tensor, model):
    """Train ``model`` to predict the next index of ``text_tensor``.

    Every epoch walks the text in consecutive windows of
    ``input_sequence_len`` positions, starting from a random offset below
    ``input_sequence_len``. The target of each window is the same window
    shifted by one position. The hidden state is carried from window to
    window and reset at the start of each epoch.

    Args:
        char_to_index: Not used by this function.
        num_of_epochs: Number of passes over the text.
        input_sequence_len: Length of each training window.
        text: The raw text; only its length is used.
        text_tensor: Encoded text the windows are sliced from.
        model: Module called as ``model(window, hidden_state)`` and returning
            ``(prediction, hidden_state)``.

    Returns:
        A tuple of the trained model (moved to the module-level ``device``)
        and the list of average losses, one entry per epoch.

    Raises:
        ZeroDivisionError: If an epoch yields no window because the text is
            too short for the drawn offset.
    """
    # set model to use the globally selected device
    model = model.to(device)

    # loss function and optimizer for backpropagation
    loss_function = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    training_loss_per_epoch = []

    # training loop
    for epoch_number in range(1, num_of_epochs + 1):
        print(f"Epoch number: {epoch_number}\n")

        # random offset of the first window, then fixed-size steps
        first_start = torch.randint(high=input_sequence_len, size=(1,)).item()
        window_starts = range(first_start, len(text) - input_sequence_len, input_sequence_len)

        # fresh hidden state and loss accumulator for this epoch
        hidden_state = None
        epoch_loss = 0

        for window_start in window_starts:
            window_end = window_start + input_sequence_len
            # inputs and the same window shifted one position as targets
            inputs = text_tensor[window_start:window_end]
            targets = text_tensor[window_start + 1:window_end + 1]

            # forward pass and loss
            prediction, hidden_state = model(inputs, hidden_state)
            loss = loss_function(torch.squeeze(prediction), torch.squeeze(targets))

            # backpropagation step to learn the weights
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        training_loss_per_epoch.append(epoch_loss / len(window_starts))

    return model, training_loss_per_epoch

# generates text using trained model
def generate_text(model, training_loss_per_epoch, text_tensor, num_of_epochs, text, num_of_chars_to_generate, index_to_char):
    """Sample text from ``model`` and append it to ``generated_text_file.txt``.

    For each of ``num_of_epochs`` rounds a random position of ``text_tensor``
    seeds the model, then ``num_of_chars_to_generate`` characters are sampled
    one at a time from the softmax of the model output. Each sampled index is
    fed back as the next input. The characters are printed and written as one
    line per round. Finally the whole model is saved with ``torch.save`` to
    ``generated_text.txt``.

    Args:
        model: Module called as ``model(input, hidden_state)`` and returning
            ``(output, hidden_state)``.
        training_loss_per_epoch: Losses printed alongside each round; must
            hold at least ``num_of_epochs`` entries.
        text_tensor: Encoded text the seed is taken from. It is not modified.
        num_of_epochs: Number of lines to generate.
        text: The raw text; only its length is used.
        num_of_chars_to_generate: Characters sampled per line.
        index_to_char: Lookup table from sampled index to character.

    Raises:
        IndexError: If ``training_loss_per_epoch`` is shorter than
            ``num_of_epochs``.
    """
    # file for saving generated text; the context manager guarantees it is
    # flushed and closed even if sampling fails part-way
    with open('generated_text_file.txt', 'a') as generated_text:
        # number of epochs to generate text
        for epoch in range(num_of_epochs):
            print(f"\nEpoch number: {epoch + 1}\n")

            # initializing the hidden state and the random seed input character
            hidden_state = None
            starting_rand_index = torch.randint(high=len(text) - 1, size=(1,)).item()
            # clone: a plain slice is a view, and the in-place update below
            # would otherwise overwrite the caller's text_tensor
            input_char = text_tensor[starting_rand_index:starting_rand_index + 1].clone()

            # print out the average training loss for the epoch
            print(f"Average Training Loss: {training_loss_per_epoch[epoch]}\n")

            print("\nGenerating Text:\n")
            # no gradients are needed while sampling
            with torch.no_grad():
                for _ in range(num_of_chars_to_generate):
                    # output of the model given input text and hidden state
                    output, hidden_state = model(input_char, hidden_state)
                    # take the softmax to compute the probability distribution
                    # for the next letter and sample from the distribution
                    probability = nn.functional.softmax(torch.squeeze(output), dim=0)
                    distribution = torch.distributions.Categorical(probability)
                    predicted_index = distribution.sample().item()
                    predicted_char = index_to_char[predicted_index]

                    # print out and store the predicted char
                    print(predicted_char, end='')
                    generated_text.write(predicted_char)

                    # set the next input for the model
                    input_char[0][0] = predicted_index
            generated_text.write("\n")

    # NOTE(review): this stores the pickled model, not text, despite the
    # ".txt" file name - path kept unchanged for existing consumers
    torch.save(model, "generated_text.txt")

# graph the training loss
def plot_training_loss(training_loss_per_epoch, num_of_epochs):
    """Show a scatter plot of the average training loss of each epoch.

    Args:
        training_loss_per_epoch: Loss values indexed by epoch; must hold at
            least ``num_of_epochs`` entries.
        num_of_epochs: Number of epochs to plot; also shown in the title.

    Raises:
        IndexError: If ``training_loss_per_epoch`` is shorter than
            ``num_of_epochs``.
    """
    # one plot call per point, x = zero-based epoch index
    for i in range(num_of_epochs):
        plt.plot(i, training_loss_per_epoch[i], 'o')

    # the title reports the actual epoch count instead of a fixed "25"
    plt.title(f"Training Loss per Epoch graphed over {num_of_epochs} epochs for LSTM model")
    plt.xlabel("Number of Epochs")
    plt.ylabel("Training Loss Per Epoch")
    plt.show()

def bleu_score_analysis(reference_file, prediction_file):
    """Compute and print the BLEU score of one text file against another.

    Both files are read completely and split on whitespace into tokens. The
    prediction file is scored as a single prediction with the reference file
    as its single reference.

    Args:
        reference_file: Path of the reference text.
        prediction_file: Path of the generated text.

    Returns:
        The result returned by the module-level ``bleu`` metric's ``compute``.
    """
    with open(reference_file) as reference_:
        reference = reference_.read()

    with open(prediction_file) as prediction_:
        prediction = prediction_.read()

    # the metric expects tokenized input: a list of predictions, and for each
    # prediction a list of references - raw strings do not match that format
    results = bleu.compute(predictions=[prediction.split()],
                           references=[[reference.split()]])
    print(results)
    return results



if __name__ == "__main__":
    # model hyperparameters
    # NOTE: despite its name, num_of_hidden_layers is passed as the LSTM hidden_size
    num_of_hidden_layers = 512
    num_of_layers = 3
    # only needed by the commented-out CNN_LSTM alternative below
    kernel_size = 3

    # run settings, defined once: generate_text indexes the training losses by
    # epoch, so training, generation and plotting must share one epoch count
    num_of_epochs = 25
    input_sequence_len = 100
    num_of_chars_to_generate = 200

    text = read_data("hamlet.txt")

    char_to_index, index_to_char = char_tokenization(text)

    text_tensor = convert_text_to_tokenized_tensor(text, char_to_index)

    # define the LSTM model
    model = LSTM(input_size=len(char_to_index), output_size=len(char_to_index), hidden_size=num_of_hidden_layers,
                      num_layers=num_of_layers)

    # cnn_lstm_model = CNN_LSTM(input_size=len(char_to_index), output_size=len(char_to_index),
    #                           hidden_size=num_of_hidden_layers, num_layers=num_of_layers,
    #                           kernel_size=kernel_size)

    trained_model, training_loss_per_epoch = train_model(char_to_index, num_of_epochs=num_of_epochs,
                                                         input_sequence_len=input_sequence_len, text=text,
                                                         text_tensor=text_tensor, model=model)

    generate_text(model=trained_model, training_loss_per_epoch=training_loss_per_epoch,
                  text_tensor=text_tensor, num_of_epochs=num_of_epochs, text=text,
                  num_of_chars_to_generate=num_of_chars_to_generate, index_to_char=index_to_char)

    plot_training_loss(training_loss_per_epoch=training_loss_per_epoch, num_of_epochs=num_of_epochs)
    # compares the generated lines against the training text
    bleu_score_analysis("hamlet.txt", "generated_text_file.txt")