<a href="https://colab.research.google.com/github/Da-Pen/CS486-twitter-bot/blob/main/CS486_LSTM_word_level.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [33]:
import numpy as np
from collections import defaultdict

# CONSTANTS
# Paths to the raw tweet dumps; get_tweets_list() reads them as one tweet per line.
# NOTE(review): NEWS_ORGS_DATA_FILE_NAME is not read by any code visible in this notebook.
NEWS_ORGS_DATA_FILE_NAME = '/content/data/newsorgs_data'
TRUMP_DATA_FILE_NAME = '/content/data/donald_trump_data'
# Preprocessing switches consulted by get_tweets_list():
# SKIP_URLS      - remove every whitespace-separated token that contains 'http'.
# SKIP_ELLIPSES  - drop tweets containing the '…' character (marks a truncated tweet).
# SKIP_RETWEETS  - drop tweets whose first two characters are 'RT'.
SKIP_URLS = True
SKIP_ELLIPSES = True
SKIP_RETWEETS = True
# Trump often has tweets where he simply replies to another Twitter user or quotes them;
# they usually start with '@' or '"@'. If True, those tweets (and empty lines) are ignored.
SKIP_REPLIES = True
# After vocabulary filtering, only tweets strictly longer than this are kept.
MIN_TWEET_LENGTH = 50 # characters

def ignore_urls(s):
    """Return `s` with every whitespace-separated token containing 'http' removed.

    The surviving tokens are re-joined with single spaces, so runs of
    whitespace (including leading/trailing whitespace) are collapsed.
    """
    kept_tokens = []
    for token in s.split():
        if 'http' in token:
            continue  # looks like a URL; drop it
        kept_tokens.append(token)
    return ' '.join(kept_tokens)


def get_tweets_list(filename, upto=None):
    """Load tweets from `filename` (one tweet per line) and apply the configured filters.

    Args:
        filename: path of the text file to read.
        upto: if given, only the first `upto` lines of the file are considered
            (the limit is applied before any filtering).

    Returns:
        A numpy array of the remaining tweet strings.

    Filters are controlled by the module-level switches SKIP_ELLIPSES,
    SKIP_URLS, SKIP_RETWEETS and SKIP_REPLIES and are applied in that order.
    Literal 'NEWLINE' placeholders are NOT expanded here; that step is disabled.
    """
    # Read as UTF-8 explicitly: the '…' check below must not depend on the
    # platform's default encoding. The context manager closes the file even
    # if reading fails.
    with open(filename, 'r', encoding='utf-8') as f:
        lines = f.read().split('\n')[:upto]

    if SKIP_ELLIPSES:  # '…' indicates that the tweet has been truncated
        lines = [line for line in lines if '…' not in line]
    if SKIP_URLS:
        lines = [ignore_urls(line) for line in lines]
    if SKIP_RETWEETS:
        lines = [line for line in lines if not line.startswith('RT')]
    if SKIP_REPLIES:
        # Also drops empty lines, because the first character is inspected.
        lines = [line for line in lines
                 if len(line) > 0 and line[0] != '@' and line[:2] != '"@']
    return np.array(lines)

def get_words(tweets):
    """Count how often each word occurs across `tweets`.

    Words are obtained by splitting every tweet on single spaces. The result
    is a defaultdict mapping word -> occurrence count (missing words read as 0).
    """
    counts = defaultdict(lambda: 0)
    for text in tweets:
        for token in text.split(' '):
            counts[token] = counts[token] + 1
    return counts

def get_words_list(words_map, min_occurrence=5):
    """Return the words of `words_map` that occur often enough to keep.

    Args:
        words_map: mapping of word -> occurrence count.
        min_occurrence: threshold; a word is kept only if its count is
            STRICTLY GREATER than this value (so with the default of 5 a word
            must appear at least 6 times).

    Returns:
        A list of the kept words, in the iteration order of `words_map`.
    """
    return [word for word, count in words_map.items() if count > min_occurrence]


def filter_words(tweet, words_set):
    """Return `tweet` with every word that is not in `words_set` removed.

    The tweet is split on single spaces and the kept words are joined back
    with single spaces.
    """
    kept = []
    for token in tweet.split(' '):
        if token in words_set:
            kept.append(token)
    return ' '.join(kept)

# Load the corpus and derive the vocabulary from sufficiently frequent words.
trump_tweets = get_tweets_list(TRUMP_DATA_FILE_NAME)

words_list = get_words_list(get_words(trump_tweets))
# Two lookup tables between a word and its position in words_list.
word_to_index = {word: position for position, word in enumerate(words_list)}
index_to_word = {position: word for position, word in enumerate(words_list)}
print(len(words_list))
print(words_list)
words_set = set(words_list)

# Strip out-of-vocabulary words; a tweet survives only if the stripped text
# is still longer than 80% of the original text.
print("BEFORE", len(trump_tweets))
new_trump_tweets = []
for tweet in trump_tweets:
    filtered_tweet = filter_words(tweet, words_set)
    if len(filtered_tweet) <= 0.8*len(tweet):
        continue
    new_trump_tweets.append(filtered_tweet)

# Finally discard tweets that are too short (in characters).
trump_tweets = [kept for kept in new_trump_tweets if len(kept) > MIN_TWEET_LENGTH]

print("AFTER", len(trump_tweets))


def main():
    """Placeholder entry point for this cell; intentionally does nothing.

    Add calls here when something in this cell needs to be tested.
    """
    return None

if __name__ == '__main__':
    main()


5781
BEFORE 20002
AFTER 7126


Train Model

In [45]:
from keras.callbacks import LambdaCallback
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM, Bidirectional, BatchNormalization, Activation
from keras.layers import Dropout
from keras.optimizers import RMSprop
from keras.optimizers import Adam
from keras.utils.data_utils import get_file
import random
import io
from google.colab import files

INPUT_LENGTH = 5  # number of consecutive words given to the model to predict the next word
GENERATED_TWEET_LENGTH = 20 # words generated after the seed for each sample


def sample(preds, temperature=1.0):
    """Sample an index from a probability array after temperature scaling.

    Args:
        preds: array-like of probabilities (e.g. a softmax output).
        temperature: values below 1 sharpen the distribution, values above 1
            flatten it. Expected to be positive.

    Returns:
        The sampled index (as returned by np.argmax).
    """
    preds = np.asarray(preds).astype('float64')
    # Zero probabilities legitimately map to -inf logits; silence that warning.
    with np.errstate(divide='ignore'):
        logits = np.log(preds) / temperature
    # Shift by the maximum before exponentiating so that at least one term is
    # exp(0) == 1. Without this, a small temperature makes every term
    # underflow to 0 and the normalisation below becomes 0/0 (NaN).
    logits = logits - np.max(logits)
    exp_preds = np.exp(logits)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)


def on_epoch_end(epoch, _, data, model):
    """Print text generated by `model` at the end of a training epoch.

    Two tweets are drawn at random from `data`; the first INPUT_LENGTH words of
    each serve as the seed. For every diversity (sampling temperature) the
    model then generates GENERATED_TWEET_LENGTH further words, one at a time,
    feeding a sliding window of the most recent words back in.

    Args:
        epoch: index of the epoch that just finished (only printed).
        _: unused second callback argument.
        data: sequence of tweet strings to draw seeds from.
        model: object whose predict() returns next-word probabilities.
    """
    print()
    print('----- Generating text after Epoch: %d' % epoch)
    vocab_size = len(words_list)

    for _round in range(2):
        seed_tweet = np.random.choice(data)  # random tweet to seed from
        seed_words = seed_tweet.split(' ')[:INPUT_LENGTH]
        seed_text = ' '.join(seed_words)

        for diversity in (0.2, 0.4, 0.6, 1.0):
            print('----- diversity:', diversity)
            print('----- Generating with seed: "' + seed_text + '"')

            window = list(seed_words)   # words currently fed to the model
            pieces = [seed_text]        # seed followed by each generated word
            for _step in range(GENERATED_TWEET_LENGTH):
                # one-hot encode the current window
                x_pred = np.zeros((1, INPUT_LENGTH, vocab_size))
                for position, word in enumerate(window):
                    x_pred[0, position, word_to_index[word]] = 1.

                preds = model.predict(x_pred, verbose=0)[0]
                next_word = index_to_word[sample(preds, diversity)]
                pieces.append(next_word)
                # slide the window forward by one word
                window = window[1:] + [next_word]

            print(' '.join(pieces))
            print()


def train_from_data(data, train_limit=None):
    # convert the raw tweets list to input and output
    # input is equal to INPUT_LENGTH characters, output is a single character
    if train_limit:
        data = data[:train_limit]
    sentences = []
    next_words = []
    for tweet in data:
        tweet_words = tweet.split(' ')
        for i in range(0, len(tweet_words) - INPUT_LENGTH):
            sentences.append(tweet_words[i: i + INPUT_LENGTH])
            next_words.append(tweet_words[i + INPUT_LENGTH])
    print('# training samples:', len(sentences))
    # for i in range(10):
    #     print(sentences[i],'->',next_words[i])

    # vectorize the data
    print('Vectorization...')
    x = np.zeros((len(sentences), INPUT_LENGTH, len(words_list)), dtype=np.bool)
    y = np.zeros((len(sentences), len(words_list)), dtype=np.bool)
    for i, sentence in enumerate(sentences):
        for t, word in enumerate(sentence):
            x[i, t, word_to_index[word]] = 1
        y[i, word_to_index[next_words[i]]] = 1

    # build the model
    print('Build model...')
    model = Sequential()
    model.add(LSTM(128, input_shape=(INPUT_LENGTH, len(words_list))))
    # model.add(LSTM(len(VALID_CHARS) * 7, input_shape=(INPUT_LENGTH, len(VALID_CHARS))))
    
    # model.add(BatchNormalization())
    # model.add(Activation('selu'))

    # model.add(Dense(len(VALID_CHARS)*4))
    # model.add(Activation('selu'))

    # model.add(Dense(len(VALID_CHARS)*4))
    # model.add(BatchNormalization())
    # model.add(Activation('selu'))

    # model.add(Bidirectional(LSTM(128), input_shape=(INPUT_LENGTH, len(VALID_CHARS))))
    model.add(Dense(len(words_list), activation='softmax'))

    # optimizer = RMSprop(lr=0.01)
    optimizer = Adam()
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['categorical_crossentropy', 'accuracy'])

    epochs = 10
    
    print_callback = LambdaCallback(on_epoch_end=lambda a, b: on_epoch_end(a, b, data, model))

    # train the model
    model.fit(x, y,
            epochs=epochs,
            callbacks=[print_callback]
            )

    # save and download the model
    model.save('/content/model')
    !zip -r /content/model.zip /content/model
    files.download('/content/model.zip')

def main():
    """Train the word-level model on the preprocessed Trump tweets."""
    # TRAIN TRUMP
    # trump_data = get_tweets_list(TRUMP_DATA_FILE_NAME, 1000)  # TODO remove upto
    tweet_count = len(trump_tweets)
    print("number of trump tweets:", tweet_count)
    train_from_data(trump_tweets)
    # TRAIN NEWS ORGS
    # TODO


if __name__ == '__main__':
    main()

number of trump tweets: 7126
# training samples: 94025
Vectorization...
Build model...
Epoch 1/10
----- Generating text after Epoch: 0
----- diversity: 0.2
----- Generating with seed: "He makes a mistake every"
He makes a mistake every and the the and of the U.S. of the and of the of of the of and the the and

----- diversity: 0.4
----- Generating with seed: "He makes a mistake every"
He makes a mistake every the in the but of the election of the morning of the this very Chuck and the and for are

----- diversity: 0.6
----- Generating with seed: "He makes a mistake every"
He makes a mistake every of the to of the Show of the people. THE all and the being of the @nytimes and the nice

----- diversity: 1.0
----- Generating with seed: "He makes a mistake every"
He makes a mistake every interesting income has up big buying record where CHANGE U.S. on to parents on National offer of want bring Barack

----- diversity: 0.2
----- Generating with seed: "Entrepreneurs: Keep your momentum. Witho

KeyboardInterrupt: ignored