Data sourced from https://www.kaggle.com/datasets/utkarshxy/stock-markettweets-lexicon-data  
Some data processing abridged from https://www.kaggle.com/code/juniorbueno/stock-market-sentimen-bert-tokenizer  
Various code snippets are paraphrased from a COMP9444 assignment.

You will need to install (via pip3): torch, matplotlib, numpy, pandas, nltk.  
You will also need to run (with python3 in terminal)  
`>>>import nltk`  
`>>>nltk.download('stopwords')`  
`>>>nltk.download('wordnet')`  
`>>>nltk.download('omw-1.4')`

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import re
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from collections import Counter
from random import shuffle

# Load the labelled tweets, discard incomplete rows, and report the class balance.
with open('stock_data.csv', encoding='utf8') as source:
    df = pd.read_csv(source, sep=',')

df = df.dropna(axis=0, how='any')  # keep only rows without any missing value
print(df['Sentiment'].value_counts())

In [None]:
# Hyperparameters
word_frequency_requirement = 4 # Minimum number of times a word has to appear to be given
# its own encoding. All words under this limit are encoded as the same 'unknown' word.
train_proportion = 0.8 # Fraction of the shuffled dataset used for training; the remainder is the test set.
hidden_layer_size = 70 # Number of hidden units handed to SRN_model.
learning_rate = 0.001 # Learning rate handed to the Adam optimizer.
#batch_size = 32 # Batch size 1 only for now.
epochs = 10 # Number of passes over the training set.

In [None]:
# Regex removal of various undesirable parts of a tweet
def clean_tweet(tweet):
    """Normalise a raw tweet into lowercase text with space-separated tokens.

    Twitter handles and URLs are removed, apostrophes are deleted (so
    "don't" becomes "dont"), every character that is neither an ASCII
    letter nor one of ``. ! ?`` is turned into a space, and each ``.``,
    ``!`` or ``?`` is surrounded by spaces so that it becomes a token of
    its own.

    Args:
        tweet: Raw tweet text.

    Returns:
        The cleaned, lowercased text. It may start or end with a single
        space; callers are expected to strip or split it.
    """
    # Handles may contain underscores; without "_" in the class the tail of
    # "@user_name" would survive as the bogus word "name".
    tweet = re.sub(r"@[A-Za-z0-9_]+", ' ', tweet)
    # A URL runs until the next whitespace. A narrower character class stops
    # at "-", "_", "?", "=" etc. and leaks the rest of the URL as tokens.
    tweet = re.sub(r"https?://\S+", ' ', tweet)
    tweet = re.sub(r"[']", "", tweet)  # Apostrophe removal
    # Remove symbols that are not alphabetic or sentence endings
    tweet = re.sub(r"[^a-zA-Z.!?]", ' ', tweet)
    # Only letters, spaces and . ! ? remain here, so padding just the
    # sentence endings is enough to make each of them its own word.
    tweet = re.sub(r"([.!?])", r" \1 ", tweet)
    tweet = re.sub(r" +", ' ', tweet)  # Excess whitespace removal
    return tweet.lower()

In [None]:
# Lemmatizer and English stopword set shared by every tokenize() call
lemmatizer = WordNetLemmatizer()
stops = set(stopwords.words('english'))

def tokenize(tweet):
    """Clean a tweet and return its lemmatized tokens with stopwords removed."""
    words = clean_tweet(tweet).split()
    return [lemmatizer.lemmatize(word) for word in words if word not in stops]

In [None]:
# Tokenise every tweet and encode its sentiment as a two-element one-hot
# tensor: [1, 0] when Sentiment == 1, [0, 1] for any other value.
san_df = pd.DataFrame([
    df['Text'].map(tokenize),
    df['Sentiment'].map(lambda x: torch.tensor([1,0]) if (x==1) else torch.tensor([0,1]))
    ]).T

# Discard tweets with 5 or fewer tokens. A boolean mask is used rather than
# drop() with enumerate() positions: drop() works on index labels, and the
# labels are no longer contiguous once dropna() has removed rows from df.
long_enough = san_df['Text'].map(len) > 5
san_df = san_df[long_enough].reset_index(drop=True)

print(san_df.Text[0])
san_df

In [None]:
# Tally how often each token occurs across all sanitised tweets
word_count = Counter(token for tweet in san_df['Text'] for token in tweet)

# vocab holds every token that meets the word frequency requirement
vocab = [token for token, freq in word_count.items() if freq >= word_frequency_requirement]

# dictionary maps each vocab token to its one-hot vector index, counting from 1
dictionary = {token: index for index, token in enumerate(vocab, start=1)}

# Index 0 is the shared slot for all tokens below the word frequency requirement
dictionary[None] = 0

word_count

In [None]:
# Length of the longest tokenised tweet; every encoded tweet is padded to it.
max_tweet_length = max(len(x) for x in san_df['Text'])

# Replace each token by its dictionary index; tokens without an entry map to 0.
encoded_tweets = [[dictionary.get(word, 0) for word in tweet] for tweet in san_df['Text']]
encoded_df = pd.DataFrame([encoded_tweets]).T

# Right-pad every index list with 0 up to max_tweet_length.
# NOTE: 0 is also the 'unknown word' index, so padding positions are
# indistinguishable from unknown words once encoded.
encoded_df[0] = encoded_df[0].map( lambda x: x + [0] * (max_tweet_length - len(x)) )

# One-hot encode each padded tweet. Indices span 0 .. len(dictionary) - 1
# (the dictionary includes the reserved 0 entry), so the one-hot width is
# exactly len(dictionary). This must equal the input size the model is built
# with; a wider encoding makes the input/weight matrix product fail.
onehot_df = pd.DataFrame([
    [F.one_hot(torch.LongTensor(enc_tweet), len(dictionary)) for enc_tweet in encoded_df[0]],
    san_df['Sentiment']
    ]).T

In [None]:
# Shuffle data and split into training and testing data.
# Columns of onehot_df are read by position: first the one-hot encoded
# tweets, then the sentiment labels.
full_dataset = list(zip(onehot_df.iloc[:, 0], onehot_df.iloc[:, 1]))

shuffle(full_dataset)

train_size = int(train_proportion * len(full_dataset))

train_dataset = full_dataset[:train_size]
test_dataset = full_dataset[train_size:]

# Transpose [(tweet, label), ...] into [(tweets...), (labels...)]
train_dataset = list(zip(*train_dataset))
test_dataset = list(zip(*test_dataset))

# Examples are kept as sequences of per-tweet tensors rather than one big
# tensor, because the model consumes a single tweet at a time.
# F.one_hot produces integer tensors; they are cast with .float() because the
# torch.FloatTensor constructor rejects tensors of another dtype and the
# model's weights are floating point.
tr_data = tuple(tweet.float() for tweet in train_dataset[0])
tr_label = train_dataset[1]

te_data_tensor = [tweet.float() for tweet in test_dataset[0]]
# Stack the per-example label tensors into one (num_test, num_classes) tensor.
te_label_tensor = torch.stack(test_dataset[1]).float()

In [None]:
class SRN_model(nn.Module):
    """Simple recurrent network that reads a sequence one row at a time.

    For every row x_t of the input the hidden state is updated as
    h_t = tanh(x_t @ W + h_{t-1} @ U + hid_bias); after the last row the
    output is h_T @ V + out_bias. The initial state is tanh(H0), where H0
    is itself a learnable parameter.
    """

    def __init__(self, num_input, num_hid, num_out):
        """Create and initialise the parameters.

        Args:
            num_input: Width of each input row.
            num_hid: Number of hidden units.
            num_out: Number of output values.
        """
        super().__init__()
        self.num_hid = num_hid
        self.batch_size = 1

        # Assignment order fixes the order in which parameters are registered.
        self.H0 = nn.Parameter(torch.empty(num_hid))
        self.W = nn.Parameter(torch.empty(num_input, num_hid))
        self.U = nn.Parameter(torch.empty(num_hid, num_hid))
        self.hid_bias = nn.Parameter(torch.empty(num_hid))
        self.V = nn.Parameter(torch.empty(num_hid, num_out))
        self.out_bias = nn.Parameter(torch.empty(num_out))

        # Weight matrices get Xavier-normal values (drawn in the order W, U, V);
        # the initial state and both biases start at zero.
        for weight in (self.W, self.U, self.V):
            nn.init.xavier_normal_(weight)
        for offset in (self.H0, self.hid_bias, self.out_bias):
            nn.init.zeros_(offset)

    def init_hidden(self):
        """Return the initial hidden state, tanh(H0), with shape (1, num_hid)."""
        return torch.tanh(self.H0).unsqueeze(0)

    def forward(self, seq):
        """Run the recurrence over a 2-D ``seq`` and return a (1, num_out) output."""
        _steps, _width = seq.size()  # unpacking rejects anything that is not 2-D
        state = self.init_hidden().to(seq.device)
        for x_t in seq:
            pre_activation = x_t @ self.W + state @ self.U + self.hid_bias
            state = torch.tanh(pre_activation)
        return state @ self.V + self.out_bias

In [None]:
def train(net, criterion, optimizer, data, label):
    """Run one optimisation step on a single example and return its loss.

    Args:
        net: Model called as ``net(data)``. Its output is log-softmaxed
            along dim 1 and only the first row is scored.
        criterion: Loss callable invoked as ``criterion(log_prob_row, label)``,
            where ``log_prob_row`` is a 1-D vector of log-probabilities.
        optimizer: Optimizer that owns the parameters of ``net``.
        data: Input passed to ``net`` unchanged.
        label: Target passed to ``criterion`` unchanged.

    Returns:
        The loss for this example as a Python float.
    """
    # Gradients accumulate across backward() calls, so clear whatever the
    # previous step left behind before computing new ones.
    optimizer.zero_grad()

    # Forward (the model sets up its own initial hidden state)
    output = net(data)

    # Apply output nonlinearity. Log_softmax chosen as it is suited for classification tasks
    log_prob = F.log_softmax(output, dim=1)

    loss = criterion(log_prob[0], label)

    loss.backward()

    optimizer.step()

    return loss.item()

In [None]:
# Counts the number of correct predictions the model can perform on the testing set
def predict(net, test_data, test_label):
    """Return how many examples ``net`` classifies correctly.

    Args:
        net: Model called as ``net(example)``; the index of its largest
            output value is taken as the predicted class.
        test_data: Iterable of model inputs.
        test_label: Iterable of label vectors, aligned with ``test_data``,
            holding 1 at the index of the true class.

    Returns:
        The number of examples whose label holds 1 at the predicted index.
    """
    correct = 0
    # Evaluation needs no gradients; skipping autograd avoids building a
    # computation graph for every test sequence.
    with torch.no_grad():
        for example, label in zip(test_data, test_label):
            predicted_class = torch.argmax(net(example))
            if label[predicted_class] == 1:
                correct += 1
    return correct

In [None]:
# Input width is the one-hot vocabulary size. There are two output units, one
# per entry of the two-element one-hot sentiment labels; a third output would
# let argmax pick an index that does not exist in a label.
net = SRN_model(len(dictionary),hidden_layer_size,2)

# Negative log likelihood loss. Suited for classification tasks.
criterion = F.nll_loss

optimizer = optim.Adam(net.parameters(), lr=learning_rate, weight_decay=0.00001)

In [None]:
plot_loss = []
plot_correct = []

num_examples = len(tr_data)
num_batches = num_examples

for e in range(epochs):
    loss = 0.

    # Visit the training examples in a fresh random order every epoch.
    # Shuffling indices avoids re-zipping and copying the whole dataset.
    order = list(range(num_examples))
    shuffle(order)

    # Trains on every training data item individually each epoch
    for i in order:
        # Convert one example at a time: as_tensor accepts tensors of any
        # dtype as well as nested lists, and the model needs float input.
        sequence = torch.as_tensor(tr_data[i], dtype=torch.float)

        # train() scores a 1-D vector of log-probabilities, for which
        # F.nll_loss expects a scalar class index. Labels stored as one-hot
        # vectors are reduced to the index of their 1 entry.
        target = torch.as_tensor(tr_label[i])
        if target.dim() > 0:
            target = target.argmax()

        loss += train(net, criterion, optimizer, sequence, target.long())

    # Evaluate proportion of the test set correctly predicted.
    correct = predict(net, te_data_tensor, te_label_tensor)/len(te_data_tensor)*100

    # Append loss and accuracy results to lists for later plotting.
    plot_loss.append(loss/num_batches)
    plot_correct.append(correct)

    # Print loss and accuracy every epoch.
    print("Epoch %02d, loss = %f, accuracy = %.2f%%" % (e+1, loss / num_batches, correct))

In [None]:
# Plot results: one figure per tracked metric, shown one after the other
for series, y_label in (
    (plot_loss, 'Avg. Loss per Epoch (on Training Set)'),
    (plot_correct, 'Accuracy per Epoch (on Test Set)'),
):
    plt.plot(series)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.show()