

# Set up

In [1]:
# Install necessary library

!pip install transformers &> /dev/null
!pip install gensim &> /dev/null

In [29]:
# Download the medium English spaCy model package (`en_core_web_md`) so it can be imported below.
!spacy download en_core_web_md

Collecting en_core_web_md==2.2.5
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_md-2.2.5/en_core_web_md-2.2.5.tar.gz (96.4 MB)
[K     |████████████████████████████████| 96.4 MB 1.0 MB/s 
[38;5;2m✔ Download and installation successful[0m
You can now load the model via spacy.load('en_core_web_md')


In [61]:
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel
import numpy as np
import json
import gensim.downloader as api
import gensim
import spacy
import en_core_web_md
import os

In [3]:
# Run on the GPU when CUDA is available, otherwise stay on the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Fetch the pretrained GPT-2 medium tokenizer and LM-head model, then place the
# model on the selected device.
tokenizer = GPT2Tokenizer.from_pretrained('gpt2-medium')
model = GPT2LMHeadModel.from_pretrained('gpt2-medium').to(device)


Downloading:   0%|          | 0.00/0.99M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/446k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/1.29M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/718 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/1.42G [00:00<?, ?B/s]



In [33]:
# Load the spaCy `en_core_web_md` pipeline.
# NOTE(review): `similarity_function` is not referenced by any other cell visible
# in this notebook — confirm it is still needed.
similarity_function = en_core_web_md.load()

In [14]:
# Load the 'glove-twitter-100' vectors through the gensim downloader
# (`api` is the alias under which `gensim.downloader` is imported above).
word2vectors = api.load('glove-twitter-100')



In [42]:
# Mount Google Drive at /content/drive (Colab only).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Model link:
# https://drive.google.com/drive/folders/1j6WISBalKI6icL_GmSlLSv-6CI71KE8X?usp=sharing
# To load the models, first add a shortcut to this folder in the root directory of your Google Drive.

In [5]:
# Folder under the mounted drive that is searched for model checkpoints.
file_path = '/content/drive/MyDrive/trained_models'

# Known Twitter handles, normalised to lower case.
person_list = [
    handle.lower()
    for handle in [
        'ArianaGrande', 'ddlovato', 'cnnbrk', 'katyperry', 'shakira', 'twitter',
        'cristiano', 'theellenshow', 'justinbieber', 'rihanna', 'YouTube',
        'instagram', 'selenagomez', 'BarackObama', 'jtimberlake', 'KimKardashian',
        'britneyspears', 'taylorswift', 'jimmyfallon', 'ladygaga', 'donaldtrump',
    ]
]

In [29]:
# Sanity check: query the loaded vectors for the similarity of two common words.
word2vectors.similarity('queen', 'woman')

0.7844197

# Function definition

In [72]:
# Select the word in word_list that is closest to the input word using word2vec
def return_closest_word(word, word_list):
    """Return the entry of ``word_list`` that best matches ``word``.

    Matching order:

    1. A case-insensitive exact match is returned directly, without consulting
       the embeddings (so a listed name is found even when the embedding
       vocabulary does not contain it).
    2. Otherwise the candidate with the highest ``word2vectors.similarity``
       score wins. Candidates for which the lookup raises ``KeyError`` are
       ranked below every candidate that could be scored.
    3. If no candidate could be scored at all, one is picked uniformly at
       random (best-effort fallback).

    Args:
        word: Query string.
        word_list: Non-empty list of candidate strings.

    Returns:
        One element of ``word_list``.

    Raises:
        ValueError: If ``word_list`` is empty.
    """
    if len(word_list) == 0:
        raise ValueError("word_list must contain at least one candidate")

    wanted = word.strip().lower()
    for candidate in word_list:
        if candidate.lower() == wanted:
            return candidate

    unknown = float('-inf')
    scores = []
    for candidate in word_list:
        try:
            scores.append(float(word2vectors.similarity(word, candidate)))
        except KeyError:
            # Unscorable candidates must never outrank a real similarity.
            scores.append(unknown)

    if all(score == unknown for score in scores):
        # Nothing could be scored: fall back to a random candidate.
        return word_list[np.random.randint(len(word_list))]
    return word_list[int(np.argmax(scores))]

In [47]:
def choose_from_top(probs, n=5):
    """Sample one index from the ``n`` highest-probability entries of ``probs``.

    The ``n`` largest values are renormalised to sum to one and a single index
    is drawn from that distribution.

    Args:
        probs: 1-D numpy array of probabilities.
        n: Number of top entries to sample from.

    Returns:
        The sampled position in ``probs`` as a Python ``int``.
    """
    # Positions of the n largest probabilities (in no particular order).
    top_indices = np.argpartition(probs, -n)[-n:]
    weights = probs[top_indices]
    weights = weights / np.sum(weights)
    drawn = np.random.choice(n, 1, p=weights)
    return int(top_indices[drawn][0])

In [48]:
import math


def _next_token_probs(model, ids):
    """Run ``model`` on ``ids`` and return the next-token probabilities on the CPU."""
    outputs = model(ids, labels=ids)
    logits = outputs[1]  # outputs[0] is the loss, which is not used here
    # First (and only) batch entry, distribution predicted after the last position.
    return torch.softmax(logits[0, -1], dim=0).to('cpu')


def beam_sampling_strategy(model, cur_ids, beam_width, end_of_text, max_steps=100):
    """Generate a continuation of ``cur_ids`` with a sampling-based beam search.

    At every step each live beam draws ``beam_width`` next tokens (with
    replacement) from its next-token distribution. Sequences that draw a token
    contained in ``end_of_text`` are set aside as finished; of the remaining
    ones, the ``beam_width`` highest-scoring are kept for the next step.

    Args:
        model: Callable as ``model(ids, labels=ids)`` whose result has the
            logits at index 1.
        cur_ids: Prompt token ids as a tensor with a leading dimension of 1.
        beam_width: Number of samples drawn per beam and number of beams kept.
        end_of_text: Collection of token ids that terminate a sequence.
        max_steps: Maximum number of tokens appended to the prompt.

    Returns:
        The token-id tensor of the highest-scoring finished sequence. If no
        sequence reached an end-of-text token within ``max_steps``, the
        highest-scoring unfinished sequence is returned instead (or ``cur_ids``
        itself when nothing was generated).
    """
    beams = [(cur_ids, _next_token_probs(model, cur_ids), 0)]
    finished = []

    for _ in range(max_steps):
        candidates = []
        for ids, probs, score in beams:
            # NOTE(review): len(ids) is the leading (batch) dimension, which is
            # always 1 because a (1, 1) token is concatenated along dim 1, so
            # this factor is the constant 1 + log(2). If a real length penalty
            # was intended it would need ids.shape[1]; kept as-is so the
            # ranking of beams does not change.
            length_factor = 1. + math.log(len(ids) + 1.)
            samples = torch.multinomial(probs, beam_width, replacement=True)
            for sample in samples:
                token = int(sample)
                # Build the new token on the same device as the sequence
                # instead of relying on a module-level `device`.
                next_token = torch.full((1, 1), token, dtype=torch.long, device=ids.device)
                extended = torch.cat([ids, next_token], dim=1)
                new_score = (score + torch.log(probs[sample])) * length_factor
                entry = (extended, None, new_score)
                if token in end_of_text:
                    finished.append(entry)
                else:
                    candidates.append(entry)

        candidates.sort(reverse=True, key=lambda entry: float(entry[2]))
        beams = [
            (ids, _next_token_probs(model, ids), score)
            for ids, _, score in candidates[:beam_width]
        ]
        if not beams:
            # Every sampled sequence has finished; nothing left to extend.
            break

    if finished:
        return max(finished, key=lambda entry: float(entry[2]))[0]
    if beams:
        # No sequence hit end-of-text in time: fall back to the best live beam
        # (beams are already sorted best-first) instead of raising IndexError.
        return beams[0][0]
    return cur_ids

In [80]:
def _ids_to_tweet(token_ids):
    """Decode a tensor of token ids into one line of text without the end-of-text marker."""
    text = tokenizer.decode(list(token_ids.squeeze().to('cpu').numpy()))
    return text.replace('<|endoftext|>', '').replace('\n', ' ')


def generate_text(username, model_choice, output_file=True, method=0, model=model, file_path=file_path,
                  num_tweets=10):
    """Generate tweets prompted with ``"<username>:"`` and print them.

    The weights stored in ``<file_path>/gpt2_medium_<model_choice>_final.pt``
    are loaded into ``model`` before generation starts.

    Args:
        username: Text used as the prompt prefix and in the output file name.
        model_choice: Name that selects the checkpoint file to load.
        output_file: When true, the generated tweets are also written, one per
            line, to ``generated_tweet_<username>.txt`` (the file is emptied at
            the start of the call).
        method: ``0`` samples each token from the most probable candidates
            (top 20 for the first three tokens, top 10 afterwards) and keeps
            only tweets that reach end-of-text within 100 tokens and are at
            least 50 characters long; any other value uses
            ``beam_sampling_strategy`` with a beam width of 15.
        model: Model to load the weights into and generate with.
        file_path: Directory that contains the checkpoint files.
        num_tweets: Number of tweets to produce.
    """
    model_path = os.path.join(file_path, f"gpt2_medium_{model_choice}_final.pt")
    # NOTE(review): torch.load unpickles the file — only load checkpoints from a trusted source.
    # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only runtime.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    output_file_path = f'generated_tweet_{username}.txt'
    if output_file:
        # Empty the file once; tweets are appended below so that all of them
        # are kept rather than each one overwriting the previous.
        with open(output_file_path, 'w', encoding='utf-8'):
            pass

    end_of_text = tokenizer.encode('<|endoftext|>')
    prompt_ids = torch.tensor(tokenizer.encode(f'{username}:')).unsqueeze(0).to(device)

    tweet_num = 0
    with torch.no_grad():
        while tweet_num < num_tweets:
            cur_ids = prompt_ids  # never modified in place; torch.cat returns new tensors

            if method == 0:
                tweet_finished = False
                for step in range(100):
                    outputs = model(cur_ids, labels=cur_ids)
                    logits = outputs[1]
                    # First (only) batch entry, prediction after the last token.
                    probs = torch.softmax(logits[0, -1], dim=0)
                    # Allow more variety for the first few tokens.
                    n = 20 if step < 3 else 10
                    next_token_id = choose_from_top(probs.to('cpu').numpy(), n=n)
                    next_token = torch.full((1, 1), next_token_id, dtype=torch.long, device=cur_ids.device)
                    cur_ids = torch.cat([cur_ids, next_token], dim=1)

                    if next_token_id in end_of_text:
                        tweet_finished = True
                        break

                if not tweet_finished:
                    # Ran out of tokens before end-of-text: discard and retry.
                    continue

                output_text = _ids_to_tweet(cur_ids)
                if len(output_text) < 50:
                    # Too short to keep: discard and retry.
                    continue

                tweet_num += 1
                print(f"{tweet_num}: {output_text} \n")
            else:
                cur_ids = beam_sampling_strategy(model, cur_ids, 15, end_of_text)
                tweet_num += 1
                output_text = _ids_to_tweet(cur_ids)
                print(f"{tweet_num}:{output_text} \n")

            if output_file:
                with open(output_file_path, 'a', encoding='utf-8') as f:
                    f.write(output_text + '\n')


# Start

In [None]:
# User input
user_input = input('Please enter the twitter account name: you can choose from the name in person_list or some general english word.\t').strip()

predicting_type = int(input('Please enter your prefer way of generating tweets: \n \t 0 is random sampling using distribution, 1 is beam search.\t'))

# bool() of any non-empty string is True, so answering "0" would still enable
# the output file; compare against "1" explicitly instead.
output_file = input('Do you want an output file?\n\t 1 is Yes, 0 is No.\t').strip() == '1'

generate_text(user_input, return_closest_word(user_input, person_list), 
              output_file=output_file, method=predicting_type)