In [1]:
# Mount the Google Drive root at /content/gdrive.
# The later cells read the lyrics CSV and write the gold/pred files through
# 'gdrive/My Drive/...' paths, so this cell has to run before them.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [2]:
!pip install pytorch_pretrained_bert

Collecting pytorch_pretrained_bert
[?25l  Downloading https://files.pythonhosted.org/packages/d7/e0/c08d5553b89973d9a240605b9c12404bcf8227590de62bae27acbcfe076b/pytorch_pretrained_bert-0.6.2-py3-none-any.whl (123kB)
[K     |████████████████████████████████| 133kB 2.8MB/s 
Installing collected packages: pytorch-pretrained-bert
Successfully installed pytorch-pretrained-bert-0.6.2


In [0]:
import torch
from pytorch_pretrained_bert import BertTokenizer, BertForMaskedLM, BertForNextSentencePrediction
import csv
import random
import argparse
from nltk.translate.bleu_score import sentence_bleu
import warnings
import torch.nn.functional as F
from torch.distributions import Categorical

In [0]:
@torch.no_grad()
def predict_masked_word(tokenizer, model, device):
    """Demo: mask the token 'lady' in a fixed two-sentence input and print the model's guess.

    Args:
        tokenizer: object providing ``tokenize``, ``convert_tokens_to_ids`` and
            ``convert_ids_to_tokens``.
        model: callable taking ``(token_ids, segment_ids)`` tensors and returning
            scores indexed as ``[batch, position, vocab]``.
        device: torch device the input tensors are moved to.

    Returns:
        None. The predicted token is printed.

    Raises:
        ValueError: if the tokenizer does not produce the token ``'lady'`` for
            the second sentence (e.g. a tokenizer that does not lowercase).
    """
    first_sentence = "[CLS] And the riot squad they're restless, they need somewhere to go [SEP]"
    second_sentence = "As Lady and I look out tonight, from Desolation Row. [SEP]"

    first_tokens = tokenizer.tokenize(first_sentence)
    second_tokens = tokenizer.tokenize(second_sentence)

    # Position of the word to hide, expressed in the concatenated sequence.
    target_position = len(first_tokens) + second_tokens.index('lady')
    all_tokens = first_tokens + second_tokens
    all_tokens[target_position] = '[MASK]'

    token_ids = tokenizer.convert_tokens_to_ids(all_tokens)

    # Sentence A positions get segment 0, sentence B positions get segment 1.
    segment_ids = [0] * len(first_tokens) + [1] * len(second_tokens)

    # Batch of one, moved to the requested device.
    token_batch = torch.tensor([token_ids]).to(device)
    segment_batch = torch.tensor([segment_ids]).to(device)

    scores = model(token_batch, segment_batch)

    # Highest-scoring vocabulary entry at the masked position.
    best_id = torch.argmax(scores[0, target_position]).item()
    best_token = tokenizer.convert_ids_to_tokens([best_id])[0]
    print("\n","Predicted [MASK] = ",best_token,"\n")

In [0]:
def predict_next_sentence(sentenceA, sentenceBs, tokenizer, model, device):
    """Pick the candidate in ``sentenceBs`` that the model scores as the best follow-up to ``sentenceA``.

    Every candidate is encoded as ``[CLS] A [SEP] B [SEP]`` (the same layout
    ``predict_masked_word`` builds by hand), right-padded with id 0 to a common
    length, and scored in a single batch. The candidate with the largest value
    in column 0 of the returned logits is selected.

    Args:
        sentenceA: the first sentence (raw text).
        sentenceBs: non-empty indexable sequence of candidate next sentences.
        tokenizer: object providing ``tokenize`` and ``convert_tokens_to_ids``.
        model: callable taking ``(token_ids, token_type_ids, attention_mask)``
            LongTensors and returning logits indexed as ``[candidate, class]``.
        device: torch device the input tensors are moved to.

    Returns:
        The element of ``sentenceBs`` with the highest column-0 logit.

    Raises:
        ValueError: if ``sentenceBs`` is empty.
    """
    if not sentenceBs:
        raise ValueError("sentenceBs must contain at least one candidate sentence")

    def _encode(sentence, add_cls):
        # Add the special tokens only when the caller has not already written
        # them into the text, so pre-formatted input is not doubled up.
        toks = list(tokenizer.tokenize(sentence))
        if add_cls and (not toks or toks[0] != '[CLS]'):
            toks.insert(0, '[CLS]')
        if not toks or toks[-1] != '[SEP]':
            toks.append('[SEP]')
        return tokenizer.convert_tokens_to_ids(toks)

    sentenceA_ids = _encode(sentenceA, add_cls=True)
    sentenceA_types = [0] * len(sentenceA_ids)
    sentenceA_attention = [1] * len(sentenceA_ids)

    sentenceBs_ids = [_encode(sentenceB, add_cls=False) for sentenceB in sentenceBs]
    max_sentenceB_length = max(len(sentenceB_ids) for sentenceB_ids in sentenceBs_ids)

    tok_ids = []
    tok_types = []
    tok_attention = []
    for sentenceB_ids in sentenceBs_ids:
        padding_size = max_sentenceB_length - len(sentenceB_ids)
        tok_ids.append(sentenceA_ids + sentenceB_ids + [0] * padding_size)
        tok_types.append(sentenceA_types + [1] * max_sentenceB_length)
        # Padding positions are switched off in the attention mask.
        tok_attention.append(sentenceA_attention + [1] * len(sentenceB_ids) + [0] * padding_size)

    tok_ids_tensor = torch.LongTensor(tok_ids).to(device)
    tok_types_tensor = torch.LongTensor(tok_types).to(device)
    tok_attention_tensor = torch.LongTensor(tok_attention).to(device)

    # Inference only: no autograd graph is needed, also when called directly.
    with torch.no_grad():
        seq_relationship_logits = model(tok_ids_tensor, tok_types_tensor, tok_attention_tensor)

    best_candidate = seq_relationship_logits[:, 0].argmax().item()
    return sentenceBs[best_candidate]

In [0]:
@torch.no_grad()
def generate_predictions(args):
    """Sample lyric line pairs, let the NSP model pick the follow-up line, and write gold/pred files.

    Reads the ``lyrics`` column of the CSV at ``args.datafile`` and collects
    every pair of consecutive lines. Up to 100 pairs are sampled; for each, the
    true next line plus two random lines are offered to ``predict_next_sentence``.
    Predictions go to 'gdrive/My Drive/CIS530/predfile' and the true next lines
    to 'gdrive/My Drive/CIS530/goldfile', one ``line1<TAB>line2`` row each.

    Uses the notebook-level globals ``tokenizer``, ``next_sent_pred_model`` and
    ``device``, which must be defined before this function is called.

    Args:
        args: object with a ``datafile`` attribute (path to the lyrics CSV).

    Raises:
        ValueError: if the data contains no usable pair of consecutive lines.
    """
    def _usable(line):
        # Blank lines and lines containing a tab cannot be written as one
        # well-formed ``line1<TAB>line2`` row, so they are left out.
        return bool(line.strip()) and '\t' not in line

    all_lines = []
    all_pairs = []
    with open(args.datafile, encoding='utf8') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        for row in csv_reader:
            lines = row['lyrics'].split('\n')
            all_lines.extend(line for line in lines if _usable(line))
            all_pairs.extend(
                (current, following)
                for current, following in zip(lines, lines[1:])
                if _usable(current) and _usable(following)
            )

    if not all_pairs:
        raise ValueError(f'No usable consecutive lyric lines found in {args.datafile}')

    sampled_data_x = {}
    sampled_data_y = {}
    # random.sample raises if asked for more items than exist, so cap the size.
    correct_pairs = random.sample(all_pairs, min(100, len(all_pairs)))
    for line1, line2 in correct_pairs:
        sampled_data_y[line1] = line2
        # Both lines of any usable pair are in all_lines, so it has >= 2 entries.
        sampled_data_x[line1] = [line2] + random.sample(all_lines, 2)

    with open('gdrive/My Drive/CIS530/predfile', 'w') as file:
        for i, (line1, line2s) in enumerate(sampled_data_x.items()):
            line2 = predict_next_sentence(line1, line2s, tokenizer, next_sent_pred_model, device)
            file.write(f'{line1}\t{line2}\n')
            if (i + 1) % 10 == 0:
                print(f'Finished predicting {i + 1} lines...')
    with open('gdrive/My Drive/CIS530/goldfile', 'w') as file:
        for line1, line2 in sampled_data_y.items():
            file.write(f'{line1}\t{line2}\n')

In [0]:
# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Tokenizer shared by both models below.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Masked-language-model head: put in eval mode, then move to the device.
masked_model = BertForMaskedLM.from_pretrained('bert-base-uncased')
masked_model.eval()
masked_model = masked_model.to(device)

# Next-sentence-prediction head: same treatment.
next_sent_pred_model = BertForNextSentencePrediction.from_pretrained('bert-base-uncased')
next_sent_pred_model.eval()
next_sent_pred_model = next_sent_pred_model.to(device)

In [50]:
# Run the masked-word demo with the tokenizer, masked-LM model and device set up in the cell above.
predict_masked_word(tokenizer, masked_model, device)


 Predicted [MASK] =  dad 



In [51]:
# Build the argument namespace expected by generate_predictions.
parser = argparse.ArgumentParser()
parser.add_argument('--datafile', type=str, required=True)
# An explicit argument list is passed so argparse parses it instead of the process's own command line.
args = parser.parse_args(['--datafile', 'gdrive/My Drive/CIS530/Lyrics-Generation/data/test_rock.csv'])
# Samples lyric pairs and writes the predfile/goldfile consumed by the evaluation cell.
generate_predictions(args)

Finished predicting 10 lines...
Finished predicting 20 lines...
Finished predicting 30 lines...
Finished predicting 40 lines...
Finished predicting 50 lines...
Finished predicting 60 lines...
Finished predicting 70 lines...
Finished predicting 80 lines...
Finished predicting 90 lines...
Finished predicting 100 lines...


In [52]:
# Silence every UserWarning for the rest of the session.
# NOTE(review): presumably aimed at warnings emitted by sentence_bleu — confirm.
warnings.simplefilter("ignore", UserWarning)

def bleuScore(gold, pred):
    """Average sentence-level BLEU of the predicted lines against the gold lines.

    Both lines are split on single spaces and scored with ``sentence_bleu``
    using the weights (.334, 0.333, 0.333, 0).

    Args:
        gold: dict mapping a first line to its true next line.
        pred: dict mapping the same first lines to the predicted next line.

    Returns:
        The mean score over all entries of ``gold``; 0.0 when ``gold`` is empty.

    Raises:
        AssertionError: if a key of ``gold`` is missing from ``pred``.
    """
    totalSentences = len(gold)
    if totalSentences == 0:
        # Nothing to score; avoid dividing by zero below.
        return 0.0

    cumulativeBlue = 0
    for line in gold:
        assert line in pred
        reference = [gold[line].split(' ')]
        candidate = pred[line].split(' ')
        cumulativeBlue += sentence_bleu(reference, candidate, weights=(.334, 0.333, 0.333, 0))

    return cumulativeBlue / totalSentences

def accuracy(gold, pred):
    """Fraction of first lines whose predicted next line exactly equals the gold next line.

    Args:
        gold: dict mapping a first line to its true next line.
        pred: dict mapping the same first lines to the predicted next line.

    Returns:
        ``num_correct / len(gold)`` as a float; 0.0 when ``gold`` is empty.

    Raises:
        AssertionError: if a key of ``gold`` is missing from ``pred``.
    """
    num_total = len(gold)
    if num_total == 0:
        # Nothing to score; avoid dividing by zero below.
        return 0.0

    num_correct = 0
    for line1, expected in gold.items():
        assert line1 in pred
        if expected == pred[line1]:
            num_correct += 1

    return num_correct / num_total

def loadData(name):
    """Load a ``line1<TAB>line2`` file into a dict mapping line1 to line2.

    Only the first tab on a row separates the two fields, so a second field
    that is empty or itself contains tabs is accepted. As before, leading
    whitespace of the row and trailing whitespace of the row are dropped.
    Rows that are entirely whitespace are skipped.

    Args:
        name: path of the file to read.

    Returns:
        dict of ``{line1: line2}``; later rows overwrite earlier ones with the
        same ``line1``.

    Raises:
        ValueError: if a non-blank row contains no tab.
    """
    data = {}
    with open(name) as file:
        for line_number, line in enumerate(file, start=1):
            record = line.rstrip('\r\n')
            if '\t' not in record:
                if not record.strip():
                    continue  # blank row, e.g. a trailing newline at end of file
                raise ValueError(f'{name}:{line_number}: expected "line1<TAB>line2", got {record!r}')
            line1, line2 = record.split('\t', 1)
            data[line1.lstrip()] = line2.rstrip()

    return data

def main(args):
    """Load the gold and predicted files and print accuracy and BLEU.

    Args:
        args: object with ``goldfile`` and ``predfile`` path attributes.

    Raises:
        AssertionError: if the two files yield a different number of entries.
    """
    gold, pred = (loadData(path) for path in (args.goldfile, args.predfile))

    assert len(gold) == len(pred)

    acc = accuracy(gold, pred)
    print(f'Accuracy: {acc:.2f}')
    bleu = bleuScore(gold, pred)
    print(f'BLEU score: {bleu:.2f}')

# Script-style entry point for the evaluation; it ran in this notebook (see the printed scores).
if __name__ == '__main__':
    # Rebinds the notebook-level `parser` and `args` used by the earlier prediction cell.
    parser = argparse.ArgumentParser()

    parser.add_argument('--goldfile', type=str, required=True)
    parser.add_argument('--predfile', type=str, required=True)

    # Evaluate the two files written by generate_predictions.
    args = parser.parse_args(['--goldfile', 'gdrive/My Drive/CIS530/goldfile', '--predfile', 'gdrive/My Drive/CIS530/predfile'])
    main(args)

Accuracy: 0.47
BLEU score: 0.50


In [54]:
""" Try to generate from BERT """

# Literal mask token written into seed sentences and token lists.
MASK = "[MASK]"
# Token that get_seed_sent substitutes for MASK before tokenizing.
# NOTE(review): currently identical to MASK, so that substitution changes nothing.
MASK_ATOM = "[MASK]"

def preprocess(tokens, tokenizer, device):
    """Turn an already-tokenized sentence into a batch-of-one id tensor on ``device``.

    Args:
        tokens: list of token strings.
        tokenizer: object providing ``convert_tokens_to_ids``.
        device: torch device to place the tensor on.

    Returns:
        Tensor holding one row with the vocabulary id of each token.
    """
    vocabulary_ids = tokenizer.convert_tokens_to_ids(tokens)
    # Wrapping in a list adds the leading batch dimension of size one.
    return torch.tensor([vocabulary_ids]).to(device)


def get_mask_ids(masking):
    """Parse a comma-separated string of positions into a list of ints.

    Args:
        masking: string such as ``"1,4,7"``, or a falsy value (``None``, ``""``).

    Returns:
        List of the parsed integers; an empty list when ``masking`` is falsy.
    """
    if not masking:
        return []
    return [int(position) for position in masking.split(',')]

  
def get_seed_sent(seed_sentence, tokenizer, masking=None, n_append_mask=0):
    """Tokenize a seed sentence and work out which positions are to be decoded.

    Positions come from three sources: the explicit ``masking`` string, any
    MASK tokens already written in ``seed_sentence``, and ``n_append_mask``
    extra MASK tokens appended at the end.

    Segment ids follow the two-sentence layout used by ``predict_masked_word``:
    tokens up to and including the first ``[SEP]`` get 0, every later token
    gets 1. A seed with no ``[SEP]``, or only a final one, is therefore all
    zeros.

    Args:
        seed_sentence: raw text, optionally containing ``[MASK]`` markers.
        tokenizer: object providing ``tokenize``.
        masking: optional comma-separated string of token positions to mask.
        n_append_mask: number of MASK tokens to append to the token list.

    Returns:
        Tuple ``(toks, seg_tensor, mask_ids)``: the token list with masks
        applied, a ``(1, len(toks))`` tensor of segment ids, and the sorted,
        de-duplicated list of masked positions.
    """
    # Positions requested explicitly by the caller.
    mask_ids = get_mask_ids(masking)

    # Tokenize, respecting [MASK]
    seed_sentence = seed_sentence.replace(MASK, MASK_ATOM)
    toks = tokenizer.tokenize(seed_sentence)
    mask_ids.extend(i for i, tok in enumerate(toks) if tok == MASK_ATOM)

    # Mask the input
    for mask_id in mask_ids:
        toks[mask_id] = MASK

    # Append MASKs
    for _ in range(n_append_mask):
        mask_ids.append(len(toks))
        toks.append(MASK)
    mask_ids = sorted(set(mask_ids))

    # Sentence A (through the first [SEP]) is segment 0, the rest is segment 1.
    seg = []
    current_segment = 0
    for tok in toks:
        seg.append(current_segment)
        if tok == '[SEP]':
            current_segment = 1
    seg_tensor = torch.tensor([seg])
    return toks, seg_tensor, mask_ids

  
def load_model(version):
    """Load a pretrained ``BertForMaskedLM`` and switch it to eval mode.

    Args:
        version: value forwarded to ``BertForMaskedLM.from_pretrained``.

    Returns:
        The loaded model, after ``eval()`` has been called on it.
    """
    masked_lm = BertForMaskedLM.from_pretrained(version)
    masked_lm.eval()
    return masked_lm


def predict(model, tokenizer, tok_tensor, seg_tensor, how_select="argmax", k=3):
    """Run the model on one sequence and pick a token for every position.

    Args:
        model: callable taking ``(tok_tensor, seg_tensor)`` and returning scores
            indexed as ``[batch, position, vocab]``; only batch entry 0 is used.
        tokenizer: object providing ``convert_ids_to_tokens``.
        tok_tensor: token-id tensor passed to the model.
        seg_tensor: segment-id tensor passed to the model.
        how_select: ``"argmax"`` (highest score), ``"sample"`` (draw from the
            softmax distribution) or ``"topk"`` (draw among the ``k`` highest
            scoring entries, weighted by their scores).
        k: number of candidates kept for ``"topk"``; capped at the vocabulary
            size. Defaults to 3, the previously hard-coded value.

    Returns:
        List with one predicted token per position.

    Raises:
        NotImplementedError: if ``how_select`` is not a known strategy.
        ValueError: if ``how_select == "topk"`` and ``k`` is smaller than 1.
    """
    # Reject bad arguments before paying for a forward pass.
    if how_select not in ("sample", "topk", "argmax"):
        raise NotImplementedError("Prediction procedure %s not found!" % how_select)
    if how_select == "topk" and k < 1:
        raise ValueError("k must be a positive integer, got %r" % (k,))

    preds = model(tok_tensor, seg_tensor)

    if how_select == "argmax":
        pred_idxs = preds.argmax(dim=-1).tolist()[0]
    else:
        log_probs = F.log_softmax(preds[0], dim=-1)
        if how_select == "sample":
            pred_idxs = Categorical(logits=log_probs).sample().tolist()
        else:
            top_k = min(k, log_probs.size(-1))
            kth_vals, kth_idx = log_probs.topk(top_k, dim=-1)
            # Draw a rank among the kept candidates, then map it back to a vocabulary id.
            chosen_rank = Categorical(logits=kth_vals).sample().unsqueeze(-1)
            pred_idxs = kth_idx.gather(dim=-1, index=chosen_rank).squeeze(-1).tolist()

    pred_toks = tokenizer.convert_ids_to_tokens(pred_idxs)
    return pred_toks
  

def masked_decoding(toks, seg_tensor, masks, model, tokenizer, device, selection_strategy):
    """Fill the masked positions one at a time, re-running the model after each fill.

    Args:
        toks: list of token strings; modified in place.
        seg_tensor: segment-id tensor passed through to ``predict``.
        masks: iterable of positions in ``toks`` to fill, in the order given.
        model: model passed through to ``predict``.
        tokenizer: tokenizer passed through to ``preprocess`` and ``predict``.
        device: torch device passed through to ``preprocess``.
        selection_strategy: ``how_select`` value passed through to ``predict``.

    Returns:
        The same ``toks`` list, with each listed position replaced by the
        model's prediction for that position.
    """
    for position in masks:
        # Re-encode every round so earlier fills are visible to the model.
        current_ids = preprocess(toks, tokenizer, device)
        candidates = predict(model, tokenizer, current_ids, seg_tensor, selection_strategy)
        toks[position] = candidates[position]
    return toks

  
def detokenize(pred_toks):
    """Merge ``##``-prefixed continuation pieces back onto the preceding word.

    Args:
        pred_toks: iterable of token strings.

    Returns:
        List of words. A token starting with ``##`` is appended (without the
        prefix) to the previous word; if there is no previous word, it starts a
        new one with the prefix removed.
    """
    new_sent = []
    for tok in pred_toks:
        if not tok.startswith("##"):
            new_sent.append(tok)
        elif new_sent:
            new_sent[-1] += tok[2:]
        else:
            # Continuation piece with nothing before it: keep it as its own word
            # instead of indexing into an empty list.
            new_sent.append(tok[2:])
    return new_sent


@torch.no_grad()
def main():
    """Fill the masked second line of a fixed two-line seed lyric and print the result.

    Uses the notebook-level ``tokenizer``, ``masked_model`` and ``device``, and
    always decodes with the "argmax" strategy.
    """
    seed_lyric = "[CLS] Sing with me, Sing for the years [SEP] [MASK] [MASK] [MASK] [MASK] , [MASK] [MASK] [MASK] tears. [SEP]"

    seed_toks, segments, positions_to_fill = get_seed_sent(
        seed_lyric, tokenizer, masking=None, n_append_mask=0
    )
    segments = segments.to(device)

    filled_toks = masked_decoding(
        seed_toks, segments, positions_to_fill, masked_model, tokenizer, device, "argmax"
    )
    pred_lyric = detokenize(filled_toks)

    print("\nFinal: %s" % (" ".join(pred_lyric)),"\n")


# Run the generation demo. `main` here is the zero-argument function defined
# just above, which replaces the earlier evaluation `main(args)` of the same name.
if __name__ == '__main__':
    main()


Final: [CLS] sing with me , sing for the years [SEP] sing with me now , sing for the tears . [SEP] 

