In [None]:
### USE ONLY THESE PACKAGES ###
import os
import csv
import json
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tqdm.notebook import tqdm
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

### Upload lyrics_dataset.zip to the Colab working directory first!

In [None]:
!unzip lyrics_dataset.zip

### Do not change this seed number

In [None]:
# One fixed seed value is handed to both NumPy's and PyTorch's global generators.
SEED = 1
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
def read_unique_lowercase_lines(txt_path):
    """Read one lyrics file and return its distinct lines, lower-cased.

    Args:
        txt_path: path of a UTF-8 text file holding one lyric line per row.

    Returns:
        list[str]: the lower-cased lines (trailing newline kept), duplicates
        removed, in order of first appearance.
    """
    # `with` closes the handle even on error; the explicit encoding keeps the
    # result independent of the platform's default locale.
    with open(txt_path, 'r', encoding='UTF-8') as lyric_file:
        lowered_lines = [line.lower() for line in lyric_file]
    # dict.fromkeys drops duplicates while keeping first-seen order.
    # list(set(...)) would order the lines differently for every string-hash
    # seed, so the sample order would change between kernel restarts even
    # though the NumPy / PyTorch seeds are fixed.
    return list(dict.fromkeys(lowered_lines))


## Use below code to train your model with all lyrics
#lyrics = list()
#for txt_file in sorted(os.listdir('./lyrics_dataset')):
#    if txt_file[0] != '.':
#        target_txt = os.path.join('./lyrics_dataset', txt_file)
#        print(target_txt)
#        lyrics += read_unique_lowercase_lines(target_txt)
#print(len(lyrics))

## Use below code to train your model with specific artist's lyrics
target_txt = './lyrics_dataset/bruce-springsteen.txt'
lyrics = read_unique_lowercase_lines(target_txt)

#"lyrics" is a list of strings.
#for l in lyrics:
#     print(l)

In [None]:
import re # This module provides regular expression matching operations
import collections #This module is used to count the frequency of vocabularies
# both modules are part of the Python standard library

In [None]:
class lyricsDataset(Dataset): # 12 points
    """Dataset of (target index, context index) pairs drawn from lyric lines.

    Attributes:
        lyrics: the raw list of strings passed in.
        window_len: how many words on each side of the target count as context.
        trimmed_lyrics: list of word lists, one per non-empty cleaned line.
        vocabulary: sorted list of the unique words in trimmed_lyrics.
        frequency: collections.Counter mapping each word to its occurrence count.
        word_to_index: dict mapping each word to its position in vocabulary.
    """
    def __init__(self, lyrics, window_len):
        """Clean the lines, then build the vocabulary and the word counts.

        Args:
            lyrics: list of strings, one lyric line each.
            window_len: context radius used by __getitem__.
        """
        self.lyrics = lyrics
        self.window_len = window_len
        # window len means, that, if i-th word is target
        # you would consider i-window_len ~ i+window_len words as context words

        ##--------------write below-------------## <--- do not erase this afterwards
        self.trimmed_lyrics = list()
        self.vocabulary = list()
        self.frequency = dict()
        # Every run of characters that is not an ASCII letter or digit becomes a
        # single space. Upper-case letters are kept as well, so input that was
        # not lower-cased beforehand does not silently lose characters.
        non_alphanumeric = re.compile("[^0-9a-zA-Z]+")
        for line in lyrics:
            words = non_alphanumeric.sub(" ", line).split()
            if words:  # lines with no word left are dropped
                self.trimmed_lyrics.append(words)

        # count the frequency of each vocabulary
        self.frequency = collections.Counter(
            word for words in self.trimmed_lyrics for word in words)

        # Sorting gives every word the same index on every run; the iteration
        # order of a set of strings changes with the interpreter's hash seed.
        self.vocabulary = sorted(self.frequency)

        # O(1) word -> index lookup for __getitem__ (list.index scans the list).
        self.word_to_index = {word: index for index, word in enumerate(self.vocabulary)}
        ##--------------write above-------------## <--- do not erase this

        ### MUST TO-DO 1. (+3) -> self.trimmed_lyrics
        # for strings in self.lyrics,
        # (+2) write a code to eliminate every character except for alphabet, number, and space (" ")
        ### for example, - + ? ! ' " [ ] ( ) <- char like this should be excluded.
        # (+1) after that, split each string with respect to the space.
        ### If proprocessed string is "hello hello hello",
        ### a list ['hello', 'hello', 'hello'] should be generated.
        ### Then, put that list into the self.trimmed_lyrics.
        ### self.trimmed_lyrics needs to be a LIST which has LIST as element.

        ### MUST TO-DO 2. (+2) -> self.vocabulary
        # (+2) Put all words(string) in self.trimmed_lyrics to the self.vocabulary.
        ### In self.vocabulary, each word needs to be unique.
        ### which means, this list (self.vocabulary) should not have duplicated elements.
        ### self.vocabulary needs to be a LIST which contains unique words in self.trimmed_lyrics
        ### If your code contains duplicated words, you would not get the point.
        ### If there is a word that are neglected from self.trimmed_lyrics, you would not get the point.

        ### MUST TO-DO 3. (+2) -> self.frequency
        # (+2) In self.frequency, Record how many times each word in self.vocabulary
        ###                              appears in self.trimmed_lyrics.
        ### For example, if "love" appears 100 times in self.trimmed_lyrics,
        ### it should be: self.frequency["love"] = 100

    def __len__(self):
        # DO NOT TOUCH BELOW. JUST USE BELOW CODE FOR YOUR __len__
        return len(self.trimmed_lyrics)
        # DO NOT TOUCH ABOVE. JUST USE ABOVE CODE FOR YOUR __len__

    def __getitem__(self, idx):
        """Pick a random target word in line `idx` and pair it with its context.

        Args:
            idx: index into self.trimmed_lyrics.

        Returns:
            dict: {'pairs': [(target_index, context_index), ...]} with context
            words taken left to right inside the window. When the target has no
            neighbour inside the window (one-word line, or window_len < 1) the
            list holds the single pair (target_index, target_index), so it is
            never empty.
        """
        ### MUST TO-DO 4. (+5) --> sample
        ##--------------write below-------------##
        sample = dict()
        words = self.trimmed_lyrics[idx]

        # generate random number as the index of target_word
        target_pos = np.random.randint(0, len(words))
        target_index = self.word_to_index[words[target_pos]]

        # Clamp the window to the line so every position visited is valid.
        first_pos = max(0, target_pos - self.window_len)
        last_pos = min(len(words) - 1, target_pos + self.window_len)
        sample['pairs'] = [
            (target_index, self.word_to_index[words[context_pos]])
            for context_pos in range(first_pos, last_pos + 1)
            if context_pos != target_pos
        ]

        # No neighbour available: fall back to pairing the word with itself, so
        # the training step always receives at least one pair.
        if not sample['pairs']:
            sample['pairs'].append((target_index, target_index))

        return sample
        ##--------------write above-------------##
        # Bring out one list, from self.trimmed_lyrics. (i.e., self.trimmed_lyrics[idx])
        # Then, the list should contain the splited sentence in each lyric.
        ### i.e., self.trimmed_lyrics[idx] would look like ['hey', 'nice', 'to', 'meet', 'you']
        # (+1) randomly select the target word from self.trimmed_lyrics[idx]
        # (+4) based on selected target word, and self.window_len,
        #      generate a tuple of (target word index, context word index),
        #      and add that tuple into sample['pairs']
        ### if 'nice' is randomly chosen as the target element, and self.window_len = 1
        ### if self.vocabulary[100] == 'hey',
        ###    self.vocabulary[500] == 'nice',
        ###    self.vocabulary[300] == 'to'
        ### sample['pairs'] should be [(500, 300), (500, 100)]

In [None]:
### DO NOT TOUCH BELOW. JUST USE THESE LINES.
### PENALTY (-5) CAN BE APPLIED IF YOUR CODE DOES NOT WORK FOR VARIOUS VALUES OF WINDOW_LEN
# Build the dataset with window_len = 2, then wrap it in a loader that visits the
# lyric lines in shuffled order, one sample dict per step (batch_size=1).
dataset = lyricsDataset(lyrics, 2)
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
### DO NOT TOUCH ABOVE. JUST USE THESE LINES.

In [None]:
class Word2Vec(nn.Module): # 15 points
    """Maps word indices to dense embeddings through one bias-free linear layer.

    The only parameter is `self.linear.weight`, of shape (embed_dim, num_vocabs);
    column i of that matrix is the embedding of word index i.
    """
    def __init__(self, num_vocabs, embed_dim = 300): ## do not change 'embed_dim' value.
        """
        Args:
            num_vocabs: number of distinct words (valid indices are 0..num_vocabs-1).
            embed_dim: length of each word embedding.
        """
        ### MUST TO-DO 5 : (+5)
        ##--------------write below-------------##
        super(Word2Vec, self).__init__()
        self.num_vocabs = num_vocabs
        self.embed_dim = embed_dim
        self.linear = torch.nn.Linear(in_features = self.num_vocabs, out_features = self.embed_dim, bias=False)
        ##--------------write above-------------##
        # Define a model which can map word's integer index value to word embedding.

    def forward(self, pairs):
        '''
        input : pairs - one tuple of (target word index, context word index)

        Each index may be a Python int, a 0-dim integer tensor (-> embedding of
        shape (embed_dim,)) or a 1-D integer tensor of length B (-> embeddings of
        shape (B, embed_dim)).

        output : (target_embed, context_embed)
        '''
        ### MUST TO DO 6 : (+10)
        ##--------------write below-------------##
        # Feeding a one-hot vector through a bias-free linear layer returns one
        # column of the weight matrix. Reading that column directly gives the same
        # values and gradients without building a num_vocabs-long vector and
        # running a dense matmul for every lookup.
        embedding_table = self.linear.weight.t()  # view of shape (num_vocabs, embed_dim)
        # as_tensor also accepts plain ints / non-int64 tensors and places the
        # index on the same device as the weights.
        target_index = torch.as_tensor(pairs[0], dtype=torch.long, device=embedding_table.device)
        context_index = torch.as_tensor(pairs[1], dtype=torch.long, device=embedding_table.device)
        target_embed = embedding_table[target_index]
        context_embed = embedding_table[context_index]
        ##--------------write above-------------##
        # return the embedding of target word and context word.
        # i.e., return target_embed, context_embed
        return target_embed, context_embed

In [None]:
### DO NOT TOUCH BELOW. JUST USE THESE LINES. MAKE YOUR CODE WORK WITH THESE LINES
# One embedding per vocabulary word (embed_dim keeps its default value).
model = Word2Vec(len(dataset.vocabulary))
# AdamW with its default hyper-parameters over every model parameter.
optimizer = torch.optim.AdamW(model.parameters())
# Device name read by train(), which moves the model and its tensors there.
# NOTE(review): hard-coded to 'cuda', so the notebook presumably needs a GPU runtime — confirm.
device = 'cuda'
### DO NOT TOUCH ABOVE. JUST USE THESE LINES. MAKE YOUR CODE WORK WITH THESE LINES
### USE THIS GIVEN OPTIMIZER.

In [None]:
def _pairs_as_tensor(pairs):
    """Turn a list of (target, context) index pairs into a (num_pairs, 2) int64 tensor on `device`."""
    # int() accepts both plain Python ints and the one-element tensors that the
    # DataLoader's default collation produces for batch_size=1.
    rows = [[int(target), int(context)] for target, context in pairs]
    if not rows:
        return torch.zeros((0, 2), dtype=torch.long, device=device)
    return torch.tensor(rows, dtype=torch.long, device=device)


def train(model, optimizer, sample_1, sample_2): # 13 points
    """Run one negative-sampling optimisation step and return its loss.

    Positive tuples (label 1) are the pairs stored in each sample. Negative
    tuples (label 0) combine the target word of one sample with every context
    word of the other: (target_1, context_2) and (target_2, context_1). The
    target word of a sample is read from its first pair.

    Args:
        model: module whose forward takes a tuple (target_indices, context_indices)
            of 1-D int64 tensors and returns the two embedding tensors, one row
            per index.
        optimizer: optimizer built over model.parameters().
        sample_1: dict with key 'pairs' -> list of (target index, context index).
        sample_2: another dict of the same form.

    Returns:
        0-dim tensor on `device`: the mean binary cross-entropy over all positive
        and negative tuples, detached from the autograd graph. If either sample
        has no pairs, no update is made and a zero tensor is returned.
    """
    ### MUST TO DO 6 : (+12)
    ##--------------write below-------------##
    model.train()
    # Module.to() moves the existing parameters in place by default, so the
    # optimizer created before this call keeps referring to the same parameters.
    model = model.to(device)

    pairs_1 = _pairs_as_tensor(sample_1['pairs'])
    pairs_2 = _pairs_as_tensor(sample_2['pairs'])
    num_1, num_2 = pairs_1.shape[0], pairs_2.shape[0]

    # Negative tuples need a target word from both samples; skip the step rather
    # than fail in the middle of an epoch when one of them is empty.
    if num_1 == 0 or num_2 == 0:
        return torch.zeros((), device=device)

    optimizer.zero_grad()

    # Row order: positives of sample 1, negatives (target_1, context_2),
    #            positives of sample 2, negatives (target_2, context_1).
    targets = torch.cat([
        pairs_1[:, 0],
        pairs_1[0, 0].expand(num_2),
        pairs_2[:, 0],
        pairs_2[0, 0].expand(num_1),
    ])
    contexts = torch.cat([pairs_1[:, 1], pairs_2[:, 1], pairs_2[:, 1], pairs_1[:, 1]])
    labels = torch.cat([
        torch.ones(num_1, device=device),
        torch.zeros(num_2, device=device),
        torch.ones(num_2, device=device),
        torch.zeros(num_1, device=device),
    ])

    # One batched forward pass instead of one model call per tuple.
    target_embed, context_embed = model((targets, contexts))
    # Row-wise dot product between each target and its context embedding.
    logits = (target_embed * context_embed).sum(dim=-1)

    # Sigmoid and BCE fused in a single call (numerically safer than applying
    # sigmoid and then BCELoss); the default 'mean' reduction averages over all
    # 2 * (num_1 + num_2) tuples.
    total_loss = nn.functional.binary_cross_entropy_with_logits(logits, labels)

    total_loss.backward()
    optimizer.step()
    # Detached so a caller that accumulates the returned losses does not keep
    # every step's autograd graph alive.
    return total_loss.detach()
    # return the current loss value
    ##--------------write above-------------##

    ## sample_1 will contain positive (target_1, context_1) tuples
    ## sample_2 will contain another positive (target_2, context_2) tuples
    ## But for the negative sampling, we need negative (target, context) tuples.
    ## NEGATIVE TUPLES can be generated by (target_1, context_2), (target_2, context_1)
    ## (+5) Generate and give positive & negative tuples for model's input
    ##### Then, you would get : target_embed, context_embed = model((target, context)).
    ## (+4) calculate the distance between target_embed and context_embed by DOT PRODUCT
    ## (+4) calculate the loss based on that distance, and optimize the model
    ####    Label positive tuples as class '1', otherwise as class '0'
    ####    You can also use sigmoid function rather than the softmax function.
    ####    At the end, you must return the current loss value.

In [None]:
### DO NOT TOUCH BELOW. JUST USE THESE LINES. MAKE YOUR CODE WORK THESE CODES.
### PENALTY (-5) WILL BE GIVEN WHEN YOUR CODE RAISES AN ERROR DURING EPOCH.
### YOUR TRAINING NEEDS TO BE PERFORMED WITH VARIOUS .TXT FILES.
# Each step trains on the current sample together with the one drawn just before
# it; the first sample of an epoch is only stored, so train() runs
# len(dataloader) - 1 times per epoch.
max_epoch = 5
for epoch in range(max_epoch):
    total_loss = 0.0
    cnt = 0
    for sample in tqdm(dataloader):
        if cnt > 0:
            curr_loss = train(model, optimizer, sample, prev_sample)
            # running sum of the step losses, each divided by len(dataloader)
            total_loss += curr_loss / len(dataloader)    
        prev_sample = sample
        cnt += 1
        # progress report: loss of the most recent step, every 200 samples
        if cnt % 200 == 0:
            print('[EPOCH {}] SAMPLED TRAIN LOSS : {}'.format(epoch, curr_loss))
    print('[EPOCH {}] TOTAL LOSS : {}'.format(epoch, total_loss))

In [None]:
### MUST TO DO 7 : (+10)
##--------------write below-------------##
# The embeddings are the weight of model.linear. nn.Linear stores its weight as
# (out_features, in_features) = (D, N), so the transpose gives the N-by-D layout
# required below. The layer is named explicitly instead of taking whichever
# parameter happens to be iterated last.
embedding_tensor = model.linear.weight
embedding = embedding_tensor.detach().cpu().numpy().T
# Guard the layout the following cells rely on: row i <-> dataset.vocabulary[i].
assert embedding.shape == (len(dataset.vocabulary), model.embed_dim)
##--------------write above-------------##
## (+10) bring your word embedding as a numpy array "embedding"
#### "embedding" should be N by D array,
#### where N is the number of vocabularies, and D is the dimension of the word embedding.
#### embedding[i, :] should be word embedding of dataset.vocabulary[i]

In [None]:
# Show the shape of the extracted matrix; expected to be
# (number of vocabulary words, embedding dimension).
embedding.shape

In [None]:
### JUST USE THESE LINES. MAKE YOUR CODE WORK THESE CODES.
# Project every word embedding (one row of `embedding`) onto 2 components so the
# words can be drawn on a plane.
reducer = PCA(n_components=2)
# or try use
# reducer = TSNE(n_components=2, verbose=1)
reduce_results = reducer.fit_transform(embedding)

In [None]:
### DO NOT TOUCH BELOW. JUST USE THESE LINES. MAKE YOUR CODE WORK THESE CODES.
# Pick the top_k words with the highest counts in dataset.frequency.
top_k = 50
# argsort is ascending, so the reversed order lists the largest counts first
sort_idx = np.argsort(list(dataset.frequency.values()))[::-1]
sort_idx = sort_idx[:top_k]
frequent_vocabs = [list(dataset.frequency.keys())[si] for si in sort_idx]
plt.figure(figsize=(10, 6))

# Row idx of reduce_results is used as the 2-D point of dataset.vocabulary[idx];
# each frequent word is drawn as a dot labelled with its text.
for idx, vocab in enumerate(dataset.vocabulary):
    if vocab in frequent_vocabs:
        plt.plot(reduce_results[idx, 0], reduce_results[idx, 1], '.')
        plt.text(reduce_results[idx, 0], reduce_results[idx, 1], vocab)

In [None]:
### DO NOT TOUCH BELOW. JUST USE THESE LINES. MAKE YOUR CODE WORK THESE CODES.
# Linear scan for the vocabulary word whose embedding has the smallest Euclidean
# distance to the embedding of target_word.
min_dist = 10000000  # starting value for the running minimum
target_word = 'i'
for idx, vocab in enumerate(dataset.vocabulary):
    if vocab != target_word:
        distance = np.linalg.norm(embedding[dataset.vocabulary.index(target_word)] - embedding[idx])
        min_dist = min(distance, min_dist)
        # true when this word set (or tied) the running minimum, so on a tie the
        # later word replaces the earlier one
        if min_dist == distance:
            nearest_to_target = vocab
print('"{}" is nearest to "{}"'.format(target_word, nearest_to_target))