## ECE495 Final Project: Deep Neural Networks Text Generator

Import the required libraries

In [None]:
from urllib.request import urlretrieve

import csv
import re
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout
from sklearn.model_selection import train_test_split

import nengo
import nengo_dl
import pandas as pd
import matplotlib.pyplot as plt
from dataclasses import dataclass

## FINAL SUBMISSION

In [None]:
# Function to process tweet text
def process_tweet_text(text):
    """Clean a single raw text string.

    Removes URLs and @-mentions, trims leading/trailing spaces, collapses
    runs of spaces, and decodes the ``&lt;``, ``&gt;`` and ``&amp;`` HTML
    entities. Returns the cleaned string.
    """
    # Drop links and mentions first; each removal can leave a gap behind.
    without_urls = re.sub(r"http\S+", "", text)
    without_mentions = re.sub(r"@[a-zA-Z0-9_]+", "", without_urls)

    # Trim the edges, then squeeze the remaining gaps to single spaces.
    cleaned = re.sub(r" +", " ", without_mentions.strip(" "))

    # "&amp;" is decoded last so "&amp;lt;" ends up as "&lt;", not "<".
    for pattern, replacement in (
        (r"&lt;", "<"),
        (r"&gt;", ">"),
        (r"&amp;", "&"),
    ):
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned


# Function to load tweets from dataset
def load_tweets_from_dataset(dataset_file):
    """Read the dataset CSV and return its text, date and stock columns.

    Args:
        dataset_file: Path to a UTF-8 CSV file with a header row containing
            the ``title``, ``date`` and ``stock`` columns.

    Returns:
        A ``(tweets, dates, stocks)`` tuple of three parallel lists. Each
        entry of ``tweets`` is the row's ``title`` passed through
        ``process_tweet_text``; ``dates`` and ``stocks`` hold the raw values.

    Raises:
        KeyError: If a required column is missing from the header.
    """
    tweets = []
    dates = []
    stocks = []
    # newline="" lets the csv module do its own newline handling; without it,
    # newlines embedded in quoted fields are not interpreted correctly.
    with open(dataset_file, "r", encoding="utf-8", newline="") as csvfile:
        for row in csv.DictReader(csvfile):
            tweets.append(process_tweet_text(row["title"]))
            dates.append(row["date"])
            stocks.append(row["stock"])
    return tweets, dates, stocks


def preprocess_tweets(processed_texts):
    """Tokenize texts and expand them into padded prefix sequences.

    A fresh ``Tokenizer`` is fitted on ``processed_texts``. Every tokenized
    text then contributes each of its prefixes of length two or more, and all
    prefixes are left-padded to the length of the longest tokenized text.

    Returns:
        ``(sequences_padded, tokenizer)``: the padded prefix matrix and the
        fitted tokenizer.
    """
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(processed_texts)
    token_ids = tokenizer.texts_to_sequences(processed_texts)

    longest = max(len(ids) for ids in token_ids)

    # Prefixes ids[:2], ids[:3], ..., ids[:len(ids)] for every text.
    prefixes = [
        ids[:end] for ids in token_ids for end in range(2, len(ids) + 1)
    ]

    padded = pad_sequences(prefixes, maxlen=longest, padding="pre")
    return padded, tokenizer


# Load the cleaned text column together with the date and stock columns.
# This runs as soon as the cell executes; the relative path is resolved
# against the current working directory.
dataset_file = "analyst_ratings_processed.csv"
tweets, dates, stocks = load_tweets_from_dataset(dataset_file)


def create_and_train_model(
    X,
    Y,
    vocab_size,
    embedding_dim=512,
    lstm_units=512,
    dropout_rate=0.5,
    epochs=150,
    batch_size=512,
):
    """Build and fit a two-layer LSTM next-word classifier.

    Args:
        X: 2-D array of token indices; ``X.shape[1]`` is used as the
            embedding layer's input length.
        Y: One-hot targets with ``vocab_size`` classes.
        vocab_size: Number of output classes and embedding rows.
        embedding_dim: Width of the embedding vectors.
        lstm_units: Units in each of the two LSTM layers.
        dropout_rate: Dropout rate applied after each LSTM layer.
        epochs: Number of training epochs.
        batch_size: Training batch size.

    Returns:
        The compiled and fitted ``Sequential`` model.

    The defaults reproduce the previously hard-coded configuration, so
    existing three-argument calls behave as before.
    """
    model = Sequential(
        [
            Embedding(vocab_size, embedding_dim, input_length=X.shape[1]),
            # The first LSTM returns the full sequence so the second LSTM
            # receives one vector per timestep.
            LSTM(lstm_units, return_sequences=True),
            Dropout(dropout_rate),
            LSTM(lstm_units),
            Dropout(dropout_rate),
            Dense(vocab_size, activation="softmax"),
        ]
    )

    model.compile(
        loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )
    model.fit(X, Y, epochs=epochs, batch_size=batch_size)
    return model


def generate_tweet(
    model, tokenizer, prompt, max_sequence_length, num_words=20, temperature=0.5
):
    """Extend ``prompt`` by sampling words from the model one at a time.

    Args:
        model: Object whose ``predict`` returns one probability row per input
            sequence.
        tokenizer: Fitted tokenizer providing ``texts_to_sequences`` and
            ``index_word``.
        prompt: Seed text the generated words are appended to.
        max_sequence_length: Length the token sequence is left-padded or
            truncated to before prediction.
        num_words: Number of sampling steps to run.
        temperature: Sampling temperature. Lower values sharpen the
            distribution; values ``<= 0`` select the most likely word
            instead of sampling.

    Returns:
        The prompt followed by the generated words, separated by spaces.
    """
    tweet = prompt
    for _ in range(num_words):
        sequence = tokenizer.texts_to_sequences([tweet])[0]
        sequence_padded = pad_sequences(
            [sequence], maxlen=max_sequence_length, padding="pre"
        )
        # verbose=0 keeps predict() from printing a progress bar per word.
        # float64 avoids precision loss in the rescaling below.
        probabilities = np.asarray(
            model.predict(sequence_padded, verbose=0)[0], dtype=np.float64
        )

        if temperature <= 0:
            # Dividing by a non-positive temperature is undefined; treat it
            # as the greedy limit.
            next_word_index = int(np.argmax(probabilities))
        else:
            # log(0) = -inf is fine here: exp(-inf) contributes zero weight.
            with np.errstate(divide="ignore"):
                scaled = np.log(probabilities) / temperature
            # Shift by the maximum so exp() cannot underflow every entry to
            # zero at low temperatures, which would yield NaN probabilities.
            scaled -= np.max(scaled)
            weights = np.exp(scaled)
            next_word_index = np.random.choice(
                len(weights), p=weights / np.sum(weights)
            )

        next_word = tokenizer.index_word.get(next_word_index, "")
        # An index with no word (e.g. the padding slot) is skipped rather
        # than appended as an empty word, which would leave doubled spaces.
        if next_word:
            tweet += " " + next_word
    return tweet


# Preprocess tweets: the last token of every padded prefix is the target
# and the tokens before it are the model input.
X, tokenizer = preprocess_tweets(tweets)
vocab_size = len(tokenizer.word_index) + 1  # Add 1 for the padding token
Y = to_categorical(X[:, -1], num_classes=vocab_size)
X = X[:, :-1]

# Create and train model
model = create_and_train_model(X, Y, vocab_size)

# Check model architecture
print("\nModel Summary:")
# summary() prints the table itself and returns None, so wrapping it in
# print() would add a stray "None" line.
model.summary()

# Generate one tweet per seed prompt
for prompt in ("The price of gold", "Barrick Gold Announces Deal"):
    generated_tweet = generate_tweet(
        model, tokenizer, prompt, X.shape[1], num_words=20
    )
    print("Generated Tweet:", generated_tweet)

## CHECKPOINT 2

The model appears to work, but there is not enough memory to complete the process.

The code from EI will be in the 495_EI_Ver


In [None]:
# Function to process tweet text
def process_tweet_text(text):
    """Return ``text`` with URLs and @-mentions removed and spacing tidied.

    After the removals, leading/trailing spaces are stripped, repeated spaces
    are collapsed, and the ``&lt;``, ``&gt;`` and ``&amp;`` entities are
    replaced by their literal characters.
    """
    # Removal passes: links first, then user mentions.
    stripped = re.sub(r"@[a-zA-Z0-9_]+", "", re.sub(r"http\S+", "", text))

    # Only spaces are trimmed and collapsed; other whitespace is untouched.
    result = re.sub(r" +", " ", stripped.strip(" "))

    # Entity decoding, with the ampersand handled after the angle brackets.
    entity_rules = [(r"&lt;", "<"), (r"&gt;", ">"), (r"&amp;", "&")]
    for entity, char in entity_rules:
        result = re.sub(entity, char, result)
    return result


# Function to load tweets from dataset
def load_tweets_from_dataset(dataset_file):
    """Read the ``text`` column of a CSV file and return the cleaned entries.

    Args:
        dataset_file: Path to a UTF-8 CSV file whose header row contains a
            ``text`` column.

    Returns:
        A list with one ``process_tweet_text``-cleaned string per data row.

    Raises:
        KeyError: If the ``text`` column is missing from the header.
    """
    # newline="" lets the csv module do its own newline handling; without it,
    # newlines embedded in quoted fields are not interpreted correctly.
    with open(dataset_file, "r", encoding="utf-8", newline="") as csvfile:
        return [process_tweet_text(row["text"]) for row in csv.DictReader(csvfile)]

In [None]:
def preprocess_tweets(processed_texts, max_sequence_length):
    """Tokenize texts and right-pad them to a fixed length.

    Returns:
        ``(sequences_padded, vocab_size, tokenizer)``, where ``vocab_size``
        is the number of distinct words seen by the tokenizer plus one.
    """
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(processed_texts)

    # Encode every text, then pad (or cut) each to max_sequence_length with
    # padding placed after the tokens.
    padded = pad_sequences(
        tokenizer.texts_to_sequences(processed_texts),
        maxlen=max_sequence_length,
        padding="post",
    )

    # One extra slot beyond the word index accounts for the padding token.
    n_words = len(tokenizer.word_index)
    return padded, n_words + 1, tokenizer

In [None]:
# Load tweets from dataset
dataset_file = "stockerbot-export.csv"
tweets = load_tweets_from_dataset(dataset_file)

# Preprocess tweets, padding/truncating each one to 200 entries. The
# sequences hold tokenizer indices, so the limit counts tokens, not
# characters.
X, vocab_size, tokenizer = preprocess_tweets(tweets, max_sequence_length=200)

# Split the data into training and testing sets (80% training, 20% testing)
X_train, X_test = train_test_split(X, test_size=0.2, random_state=42)

print("X_train", X_train.shape)
print("vocab_size", vocab_size)

# One-hot encode the token matrices themselves to use as targets; each
# target gains a trailing axis of size vocab_size.
# NOTE(review): this dense encoding scales with samples * 200 * vocab_size
# and is a likely cause of the out-of-memory failure — confirm.
Y_train = to_categorical(X_train, num_classes=vocab_size)
Y_test = to_categorical(X_test, num_classes=vocab_size)

In [None]:
# Define Model
# Builds a Nengo network that chains Embedding -> LIF -> LSTM -> LIF -> Dense
# via nengo_dl.Layer wrappers and probes the final softmax output. The
# print() calls are debugging aids that report each stage's size_in/size_out.
with nengo.Network(seed=0) as net:
    neuron_type = nengo.LIF(amplitude=0.01)  # Neuron model reused after each layer

    # Define input node
    # Ensure the input node is correctly defined (flatten if necessary)
    # The node outputs a constant zero vector with one entry per column of
    # X_train; it fixes the input dimensionality of the network.
    input_node = nengo.Node(np.zeros(X_train.shape[1]))
    print(input_node.size_in)
    print(input_node.size_out)

    # Embedding layer
    # Apply neuron type after embedding layer if desired
    # NOTE(review): nengo_dl.Layer may hand Keras layers a flattened tensor
    # unless shape_in is given — confirm the Embedding/LSTM layers see the
    # sequence axis they expect.
    emb = nengo_dl.Layer(tf.keras.layers.Embedding(vocab_size, 64))(input_node)
    emb = nengo_dl.Layer(neuron_type)(emb)
    print(emb.size_in)
    print(emb.size_out)
    # LSTM layer
    # LSTM processing with neuron dynamics applied afterwards
    lstm = nengo_dl.Layer(tf.keras.layers.LSTM(4))(emb)
    lstm = nengo_dl.Layer(neuron_type)(lstm)
    print(lstm.size_in)
    print(lstm.size_out)
    # Output layer
    # It is common to not apply spiking neurons right before the output layer in classification tasks
    out = nengo_dl.Layer(tf.keras.layers.Dense(vocab_size, activation="softmax"))(lstm)

    # Probes
    # The labelled probe records the output layer.
    output_p = nengo.Probe(out, label="out_probe")

    ###############################################

    # TODO: add the batch dimension to the output here, and only then call
    # sim = nengo_dl.Simulator(net)

    ###############################################

    print(output_p)

In [None]:
# Simulator
# Build a NengoDL simulator for `net` that processes 256 samples per
# minibatch.
# NOTE(review): the simulator is never closed in the visible code (no `with`
# block or sim.close()) — confirm whether resources are released elsewhere.
minibatch_size = 256
sim = nengo_dl.Simulator(net, minibatch_size=minibatch_size)

# Configure training with the Adam optimizer, categorical cross-entropy
# loss, and accuracy reporting.
sim.compile(
    optimizer=tf.optimizers.Adam(),
    loss=tf.losses.CategoricalCrossentropy(),
    metrics=["accuracy"],
)

In [None]:
# Fit model
# Train for 5 epochs, evaluating on the held-out split after each epoch.
# NOTE(review): NengoDL documents training data as (batch, n_steps, size);
# X_train is 2-D here — confirm the arrays have the expected time axis.
sim.fit(X_train, Y_train, epochs=5, validation_data=(X_test, Y_test))

In [None]:
# Function to generate text
def generate_text(seed_text, num_words, model, tokenizer, max_sequence_length):
    """Greedily append ``num_words`` predicted words to ``seed_text``.

    On each step the current text is tokenized, right-padded to
    ``max_sequence_length``, and passed to ``model.predict``; the word whose
    index has the highest score is appended after a space.

    Returns:
        The seed text followed by the predicted words.
    """
    text = seed_text
    for _step in range(num_words):
        encoded = tokenizer.texts_to_sequences([text])
        model_input = pad_sequences(
            [encoded[0]], maxlen=max_sequence_length, padding="post"
        )
        scores = model.predict(model_input, verbose=0)
        best = np.argmax(scores, axis=-1)
        text = text + " " + tokenizer.index_word[best[0]]
    return text


# Generate text
# Extend the seed by 10 words using the NengoDL simulator as the model; 200
# matches the max_sequence_length used when the tweets were preprocessed.
# NOTE(review): generate_text applies np.argmax directly to model.predict's
# return value — confirm sim.predict returns a plain array here.
seed_text = "The stock market"
generated_text = generate_text(seed_text, 10, sim, tokenizer, 200)
print("Generated text:", generated_text)

### CHECKPOINT #1
TF.KERAS IMPLEMENTATION

In [None]:
# Function to process tweet text
def process_tweet_text(text):
    """Normalize one tweet: drop links and mentions, fix spacing, decode entities.

    Returns the cleaned string; only ``&lt;``, ``&gt;`` and ``&amp;`` are
    decoded.
    """

    def _sub(pattern, replacement, value):
        # Thin wrapper so each cleaning step reads as one line below.
        return re.sub(pattern, replacement, value)

    # Step 1: remove URLs, then @-mentions.
    body = _sub(r"@[a-zA-Z0-9_]+", "", _sub(r"http\S+", "", text))

    # Step 2: trim spaces at both ends before collapsing interior runs.
    body = body.strip(" ")
    body = _sub(r" +", " ", body)

    # Step 3: decode entities; "&amp;" goes last to avoid double decoding.
    body = _sub(r"&lt;", "<", body)
    body = _sub(r"&gt;", ">", body)
    return _sub(r"&amp;", "&", body)


# Function to load tweets from dataset
def load_tweets_from_dataset(dataset_file):
    """Load and clean the ``text`` column of a tweet CSV file.

    Args:
        dataset_file: Path to a UTF-8 CSV file whose header row contains a
            ``text`` column.

    Returns:
        A list of strings, one per data row, each passed through
        ``process_tweet_text``.

    Raises:
        KeyError: If the ``text`` column is missing from the header.
    """
    tweets = []
    # The csv module expects files opened with newline="" so that newlines
    # inside quoted fields are parsed correctly.
    with open(dataset_file, "r", encoding="utf-8", newline="") as csvfile:
        for row in csv.DictReader(csvfile):
            tweets.append(process_tweet_text(row["text"]))
    return tweets


# Function to preprocess tweet data
def preprocess_tweets(processed_texts):
    """Tokenize texts and left-pad them to the longest sequence length.

    Returns:
        ``(sequences_padded, tokenizer)``: the padded index matrix and the
        fitted ``Tokenizer`` object (not just its word index).
    """
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(processed_texts)
    encoded = tokenizer.texts_to_sequences(processed_texts)

    # Every row is padded at the front up to the longest encoded text.
    widest = max(len(row) for row in encoded)
    padded = pad_sequences(encoded, maxlen=widest, padding="pre")
    return padded, tokenizer


# Function to create and train the tweet generation model
def create_and_train_model(X, Y, vocab_size):
    """Build, compile and fit the single-LSTM tweet generation model.

    The network is Embedding(256) -> LSTM(256) -> softmax Dense over
    ``vocab_size`` classes, trained for 10 epochs with batches of 128.

    Returns:
        The fitted ``Sequential`` model.
    """
    layers = [
        Embedding(vocab_size, 256, input_length=X.shape[1]),
        LSTM(256),
        Dense(vocab_size, activation="softmax"),
    ]
    model = Sequential(layers)

    # Compiling and fitting are both required before the model can generate.
    model.compile(
        optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"]
    )
    model.fit(X, Y, batch_size=128, epochs=10)

    return model


# Function to generate tweets based on a prompt
def generate_tweet(model, tokenizer, prompt, max_sequence_length, num_words):
    """Greedily extend ``prompt`` with up to ``num_words`` predicted words.

    Args:
        model: Object whose ``predict`` returns one score row per input
            sequence.
        tokenizer: Fitted tokenizer providing ``texts_to_sequences`` and
            ``index_word``.
        prompt: Seed text the generated words are appended to.
        max_sequence_length: Length the token sequence is left-padded or
            truncated to before prediction.
        num_words: Maximum number of words to append.

    Returns:
        The prompt followed by the generated words, separated by spaces.
        Generation stops early if the model predicts an index that has no
        word.
    """
    tweet = prompt
    for _ in range(num_words):
        sequence = tokenizer.texts_to_sequences([tweet])[0]
        sequence_padded = pad_sequences(
            [sequence], maxlen=max_sequence_length, padding="pre"
        )
        # verbose=0 keeps predict() from printing a progress bar per word.
        scores = model.predict(sequence_padded, verbose=0)
        next_word_index = int(np.argmax(scores, axis=-1)[0])
        next_word = tokenizer.index_word.get(next_word_index)
        if next_word is None:
            # The padding index 0 has no entry in index_word. Greedy decoding
            # is deterministic, so the same index would be predicted on every
            # remaining step; stop instead of raising KeyError.
            break
        tweet += " " + next_word
    return tweet


# Load tweets from dataset
dataset_file = "stockerbot-export.csv"
tweets = load_tweets_from_dataset(dataset_file)

# Preprocess tweets: the last column of the padded matrix is the target word
# and the remaining columns are the model input.
X, tokenizer = preprocess_tweets(tweets)
vocab_size = len(tokenizer.word_index) + 1  # Add 1 for the padding token
Y = to_categorical(X[:, -1], num_classes=vocab_size)
X = X[:, :-1]

## Check preprocessed data
# print("Sample processed tweet:", tweets[0])
# print("\nTokenizer word index:")
# for word, index in tokenizer.word_index.items():
#     print(f"{word}: {index}")


# Create and train model. Training happens exactly once: a second call would
# discard this model and retrain from scratch for no benefit.
model = create_and_train_model(X, Y, vocab_size)

# Check model architecture
print("\nModel Summary:")
# summary() prints the table itself and returns None, so wrapping it in
# print() would add a stray "None" line.
model.summary()

# Generate tweet
prompt = "The stock market"
generated_tweet = generate_tweet(model, tokenizer, prompt, X.shape[1], num_words=20)
print("Generated Tweet:", generated_tweet)

### CHECKPOINT #1
NENGO IMPLEMENTATION

In [None]:
#  # Function to process tweet text
# def process_tweet_text(text):
#     text = re.sub(r"http\S+", "", text)  # Remove URLs
#     text = re.sub(r"@[a-zA-Z0-9_]+", "", text)  # Remove @ mentions
#     text = text.strip(" ")  # Remove whitespace resulting from above
#     text = re.sub(r" +", " ", text)  # Remove redundant spaces

#     text = re.sub(r"&lt;", "<", text)
#     text = re.sub(r"&gt;", ">", text)
#     text = re.sub(r"&amp;", "&", text)
#     return text


# # Function to load tweets from dataset
# def load_tweets_from_dataset(dataset_file):
#     tweets = []
#     with open(dataset_file, "r", encoding="utf-8") as csvfile:
#         reader = csv.DictReader(csvfile)
#         for row in reader:
#             text = row["text"]  # Assuming 'text' is the column name for tweet text
#             processed_text = process_tweet_text(text)
#             tweets.append(processed_text)
#     return tweets


# # Function to preprocess tweets
# def preprocess_tweets(processed_texts):
#     tokenizer = Tokenizer()
#     tokenizer.fit_on_texts(processed_texts)
#     sequences = tokenizer.texts_to_sequences(processed_texts)

#     max_sequence_length = max([len(seq) for seq in sequences])
#     sequences_padded = pad_sequences(
#         sequences, maxlen=max_sequence_length, padding="pre"
#     )

#     vocab_size = len(tokenizer.word_index) + 1  # Add 1 for the padding token

#     return sequences_padded, vocab_size


# # Load tweets from dataset
# dataset_file = "stockerbot-export.csv"
# tweets = load_tweets_from_dataset(dataset_file)

# # Preprocess tweets
# X, vocab_size = preprocess_tweets(tweets)

# # Reshape input data to include batch dimension and set number of steps to 1
# X_reshaped = X[:, np.newaxis, :]

# # Define the NengoDL model
# with nengo.Network() as net:

#     # Define input node
#     input_node = nengo.Node([0] * X.shape[1])  # Adjusted to match the input shape

#     # Ensemble representing the embedding layer
#     embed_ens = nengo.Ensemble(n_neurons=64, dimensions=50)  # Assuming 50 dimensions for the embedding

#     # Connect input to embedding ensemble
#     nengo.Connection(input_node, embed_ens, transform=np.random.randn(X.shape[1], 50).T)

#     # Output node with dimensions equal to the vocabulary size
#     output_node = nengo.Node(size_in=vocab_size)

#     # Connect embedding ensemble to output node with a transform matrix of appropriate dimensions
#     nengo.Connection(embed_ens, output_node, transform=np.random.randn(50, vocab_size).T)

#     # Probe the output node
#     output_p = nengo.Probe(output_node)

# # Print architecture
# print("\nArchitecture:")
# print(net)

# # Compile the model
# try:
#     with nengo_dl.Simulator(net) as sim:
#         sim.compile(
#             optimizer=tf.optimizers.Adam(),
#             loss=tf.losses.CategoricalCrossentropy(),
#             metrics=["accuracy"],
#         )

#         # Check if the model compiles without errors
#         print("\nModel compiled successfully without errors!")
# except Exception as e:
#     print(f"\nError during model compilation: {e}")


# def generate_tweet(model, tokenizer, seed_text, max_length=50):
#     for _ in range(max_length):
#         token_list = tokenizer.texts_to_sequences([seed_text])[0]
#         token_list = pad_sequences([token_list], maxlen=max_length-1, padding='pre')
#         predicted = model.predict(token_list, verbose=0)
#         next_token = np.argmax(predicted)
#         next_word = tokenizer.index_word[next_token]
#         seed_text += " " + next_word
#         if next_word == 'endseq':
#             break
#     return seed_text

## CHECKPOINT 2