In [None]:
import numpy as np
import pandas as pd
import re
import torch
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
import time
torch.cuda.is_available()
from tqdm import tqdm

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install langdetect

Collecting langdetect
  Downloading langdetect-1.0.9.tar.gz (981 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m981.5/981.5 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: langdetect
  Building wheel for langdetect (setup.py) ... [?25l[?25hdone
  Created wheel for langdetect: filename=langdetect-1.0.9-py3-none-any.whl size=993227 sha256=22c00e286b38d73ce931b43d735d6f61f2dde35005834af618ca85b6a64615e7
  Stored in directory: /root/.cache/pip/wheels/95/03/7d/59ea870c70ce4e5a370638b5462a7711ab78fba2f655d05106
Successfully built langdetect
Installing collected packages: langdetect
Successfully installed langdetect-1.0.9


In [None]:
from langdetect import detect

## Load the Data

In [None]:
def filter_english_songs(chunk):
    english_songs = []
    for index, row in chunk.iterrows():
        try:
            if detect(row['lyrics']) == 'en':
                english_songs.append(row)
        except:
            pass
    return pd.DataFrame(english_songs)

In [None]:
# Load the dataset in chunks
chunk_size = 10000  # Adjust chunk size based on available memory
songs_number = 5000

def sample_songs_from_chunks(data_chunks, genre, songs_number=songs_number):

    sampled_songs = pd.DataFrame()

    # Filter English songs and sample specified number of songs
    for chunk in data_chunks:
        data_by_genre = chunk[['lyrics', 'tag']][chunk['tag'] == genre]
        data_by_genre_english = filter_english_songs(data_by_genre)
        sampled_songs = pd.concat([sampled_songs, data_by_genre_english])
        if len(sampled_songs) >= songs_number:
            break

    # Sample specified number of songs
    sampled_songs = sampled_songs.sample(n=songs_number, random_state=42)
    return sampled_songs

# Read dataset in chunks
file_path = '/content/drive/MyDrive/Final_Project/Data_set/ds2.csv'

data_chunks = pd.read_csv(file_path, chunksize=chunk_size)
rap_songs = sample_songs_from_chunks(data_chunks, 'rap')

# Reset data chunks to read from the beginning
data_chunks = pd.read_csv(file_path, chunksize=chunk_size)
pop_songs = sample_songs_from_chunks(data_chunks, 'pop')


## PreProcessing

In [None]:
stopChars = [',','(',')','.','-','[',']','"', '{', '}']

def preprocessText(text):
    processedText = text.lower()
    processedText = re.sub(r'[^a-zA-Z\s\.,;!?"\'\[\]]', '', processedText)
    processedText = re.sub(r'\([^()]*\)|\[[^\[\]]*\]|\{[^{}]*\}', '', processedText)
    for char in stopChars:
        processedText = processedText.replace(char,'')

    return processedText

In [None]:
rap_songs['lyrics']= rap_songs['lyrics'].apply(preprocessText)
pop_songs['lyrics']= pop_songs['lyrics'].apply(preprocessText)
print(rap_songs.shape)
print(pop_songs.shape)

(5000, 2)
(5000, 2)


In [None]:
def clean_songs(songs_data):
    # Perform substitution
    lyrics_corpus = re.sub(r'\n|!|\?', lambda x: ' ' + x.group(0) + ' ', songs_data['lyrics'].str.cat(sep='\n').lower())

    # Split the processed text into words and newlines
    lyrics_corpus = re.findall(r'\S+|\n', lyrics_corpus)
    garbage_words = ["verse", "intro", "chorus", "bridge", "hook", "interlude", "outro", "prechorus", "postchorus", "instrumental"]

    # Remove garbage_words from the lyrics_corpus
    lyrics_corpus = [word for word in lyrics_corpus if word not in garbage_words]


    # Remove lines with two or fewer non-newline words
    filtered_corpus = []
    line = []
    word_count = 0
    for word in lyrics_corpus:
        if word == '\n':
            if word_count > 2:
                filtered_corpus.extend(line)
                filtered_corpus.append('\n')  # Keep the newline
            if word_count == 0:
                filtered_corpus.append('\n')  # Keep the newline
            line = []
            word_count = 0
        else:
            line.append(word)
            word_count += 1

    # If the last line has more than 2 words, add it to the filtered corpus
    if word_count > 2:
        filtered_corpus.extend(line)
        filtered_corpus.append('\n')  # Keep the newline

    return filtered_corpus

cleaned_rap_songs = clean_songs(rap_songs)
cleaned_pop_songs = clean_songs(pop_songs)

print('rap songs length:', len(cleaned_rap_songs))
print('pop songs length:', len(cleaned_pop_songs))

# Counting characters appeared in all lyrics
words_rap = sorted(list(set(cleaned_rap_songs)))
words_pop = sorted(list(set(cleaned_pop_songs)))
print('Rap Total Words:', len(words_rap))
print('Pop Total Words:', len(words_pop))

rap songs length: 3333625
pop songs length: 1903963
Rap Total Words: 64869
Pop Total Words: 25767


In [None]:
words = words_rap + words_pop
print('Total Words:', len(words))

word_to_int = dict((w, i) for i, w in enumerate(words))
int_to_word = dict((i, w) for i, w in enumerate(words))

Total Words: 90636


In [None]:
def create_sentences(lyrics, cleaned_corpus, seq_length, step):
    sentences = []
    next_words = []
    seq_length = 10 # The sentence window size
    step = 1 # The steps between the windows
    sentences = []
    next_words = []

    # Create Target and sentences window
    for i in range(0, len(cleaned_corpus) - seq_length, step):
        # range from current index to sequence length words
        sentences.append(cleaned_corpus[i: i + seq_length])
        next_words.append(cleaned_corpus[i + seq_length]) # the next character

    return sentences, next_words

rap_sentences, rap_next_words = create_sentences(cleaned_rap_songs, cleaned_rap_songs, seq_length=10, step=1)
pop_sentences, pop_next_words = create_sentences(cleaned_pop_songs, cleaned_pop_songs, seq_length=10, step=1)

rap_sentences = np.array(rap_sentences)
rap_next_words = np.array(rap_next_words)

pop_sentences = np.array(pop_sentences)
pop_next_words = np.array(pop_next_words)

# Print sentence window and next words
print('Sentence Window:')
print (rap_sentences[:5])
print('Target Words:')
print (rap_next_words[:5])
print('Number of sequences:', len(rap_sentences))

Sentence Window:
[['\n' 'show' 'a' 'big' 'timer' 'love' 'when' 'you' 'see' 'me']
 ['show' 'a' 'big' 'timer' 'love' 'when' 'you' 'see' 'me' 'riding']
 ['a' 'big' 'timer' 'love' 'when' 'you' 'see' 'me' 'riding' 'dubs']
 ['big' 'timer' 'love' 'when' 'you' 'see' 'me' 'riding' 'dubs' '\n']
 ['timer' 'love' 'when' 'you' 'see' 'me' 'riding' 'dubs' '\n' 'platinum']]
Target Words:
['riding' 'dubs' '\n' 'platinum' 'chain']
Number of sequences: 3333615


In [None]:
# transferring the word to index
def getdata(sentences, genre, next_words, seq_len=10):
    x = np.zeros((len(sentences),seq_len))
    g = np.zeros((len(sentences),seq_len))
    y = np.zeros((len(sentences)))
    length = len(sentences)
    index = 0
    for i in range(len(sentences)):
        sentence = sentences[i]
        for t, word in enumerate(sentence):
            x[i, t] = word_to_int[word]
            g[i, t] = genre
        y[i] = word_to_int[next_words[i]]
    return x, g, y

In [None]:
def encode_genre(genre):
    return 0 if genre == 'rap' else 1  # Encode 'rap' as 0 and 'pop' as 1

In [None]:
train_x_RAP, train_g_RAP, train_y_RAP = getdata(rap_sentences, 0, rap_next_words)
train_x_POP, train_g_POP, train_y_POP = getdata(pop_sentences, 1, pop_next_words)

train_x = np.concatenate([train_x_RAP, train_x_POP])
train_y = np.concatenate([train_y_RAP, train_y_POP])
train_g = np.concatenate([train_g_RAP, train_g_POP])

print('Tensors:')
print('train_x:', train_x)
print('train_y:', train_y)
print('train_g:', train_g)

print('\nTensors Shapes:')
print('Shape of train_x:', train_x.shape)
print('Shape of train_g:', train_g.shape)
print('Shape of train_y:', train_y.shape)

Tensors:
train_x: [[64869. 84979. 65040. ... 90486. 84461. 78726.]
 [84979. 65040. 67095. ... 84461. 78726. 83537.]
 [65040. 67095. 58030. ... 78726. 83537. 71704.]
 ...
 [90486. 68228. 84461. ... 65008. 87482. 70482.]
 [68228. 84461. 64869. ... 87482. 70482. 90486.]
 [84461. 64869. 90490. ... 70482. 90486. 71005.]]
train_y: [83537. 71704. 64869. ... 90486. 71005. 64869.]
train_g: [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]
 [1. 1. 1. ... 1. 1. 1.]]

Tensors Shapes:
Shape of train_x: (5237568, 10)
Shape of train_g: (5237568, 10)
Shape of train_y: (5237568,)


## LSTM Network Definition

In [None]:
class Word_LSTM(nn.Module):
    def __init__(self, n_vocab, hidden_dim, embedding_dim, num_genres=2, genre_embedding_dim=128, dropout=0.4):
        super(Word_LSTM, self).__init__()

        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim
        self.genre_embedding_dim = genre_embedding_dim
        self.genre_embedding = nn.Embedding(num_genres, genre_embedding_dim)
        self.word_embeddings = nn.Embedding(n_vocab, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim + genre_embedding_dim, hidden_dim, dropout=dropout, num_layers=2)
        self.fc = nn.Linear(hidden_dim, n_vocab)

    def forward(self, seq_in, genre_in):
        # Embedding layer for words
        embedded_words = self.word_embeddings(seq_in.t())

        # Embedding layer for genre
        genre_embedded = self.genre_embedding(genre_in.t())


        # Concatenate word embeddings with genre embeddings along the feature dimension
        concatenated = torch.cat((embedded_words, genre_embedded), dim=2)

        # LSTM layer
        lstm_out, _ = self.lstm(concatenated)

        # Last hidden state
        ht = lstm_out[-1]

        # Fully connected layer
        out = self.fc(ht)

        return out


In [None]:
# Convert numpy arrays to PyTorch tensors
X_train_tensor = torch.tensor(train_x, dtype=torch.long)
Y_train_tensor = torch.tensor(train_y, dtype=torch.long)
G_train_tensor = torch.tensor(train_g, dtype=torch.long)  # Genre tensor

print('Shape of training_x:', X_train_tensor.shape)
print('Shape of training_y:', Y_train_tensor.shape)
print('Shape of training_genre:', G_train_tensor.shape)

Shape of training_x: torch.Size([5237568, 10])
Shape of training_y: torch.Size([5237568])
Shape of training_genre: torch.Size([5237568, 10])


In [None]:
from sklearn.model_selection import train_test_split

batch_size = 2048

# Split data into training and validation sets
X_train, X_val, Y_train, Y_val, G_train, G_val = train_test_split(train_x, train_y, train_g, test_size=0.3, random_state=42)

# Convert to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.long)
Y_train_tensor = torch.tensor(Y_train, dtype=torch.long)
G_train_tensor = torch.tensor(G_train, dtype=torch.long)
X_val_tensor = torch.tensor(X_val, dtype=torch.long)
Y_val_tensor = torch.tensor(Y_val, dtype=torch.long)
G_val_tensor = torch.tensor(G_val, dtype=torch.long)

# Create PyTorch datasets
train_dataset = torch.utils.data.TensorDataset(X_train_tensor, Y_train_tensor, G_train_tensor)
val_dataset = torch.utils.data.TensorDataset(X_val_tensor, Y_val_tensor, G_val_tensor)

# Data loaders for training and validation sets
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
# Define the model, optimizer, and criterion
model = Word_LSTM(n_vocab=len(words), hidden_dim=256, embedding_dim=256)
optimizer = torch.optim.Adam(model.parameters(), lr=2e-4)  # Using Adam optimizer
criterion = nn.CrossEntropyLoss()


## Training Loop

In [None]:
n_epochs = 20
avg_losses_train = []
avg_losses_val = []

for epoch in range(n_epochs):
    # Training
    model.train()
    avg_loss_train = 0.

    for x_batch, y_batch, g_batch in tqdm(train_loader):
        y_pred = model(x_batch, g_batch)
        loss = criterion(y_pred, y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        avg_loss_train += loss.item() / len(train_loader)

    avg_losses_train.append(avg_loss_train)

    # Validation
    model.eval()
    avg_loss_val = 0.

    with torch.no_grad():
        for x_batch, y_batch, g_batch in tqdm(val_loader):
            y_pred = model(x_batch, g_batch)
            loss = criterion(y_pred, y_batch)

            avg_loss_val += loss.item() / len(val_loader)

    avg_losses_val.append(avg_loss_val)

    print(f'Epoch {epoch + 1}/{n_epochs}, Train Loss: {avg_loss_train:.4f}, Val Loss: {avg_loss_val:.4f}')


 85%|████████▌ | 1531/1791 [24:50<04:13,  1.03it/s]


KeyboardInterrupt: 

## Train Loss plot

In [None]:
import matplotlib.pyplot as plt

# Plotting
plt.plot(range(1, n_epochs + 1), avg_losses_train, label='Train Loss')
plt.plot(range(1, n_epochs + 1), avg_losses_val, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.show()

In [None]:
# Save model and dictionaries
torch.save({
    'model_state_dict': model.state_dict(),
    'word_to_int': word_to_int,
    'int_to_word': int_to_word
}, '/content/drive/MyDrive/Final_Project/Models/model.pt')
