In [2]:
# Import dependencies:

import numpy as np
import pandas as pd
import string
import re
import gensim.downloader as api
from collections import Counter, defaultdict

# NLP Libraries
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import cmudict
from gensim.downloader import load as gensim_load
# import spacy  # Uncomment if spaCy is used

# Feature Engineering
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.preprocessing import MinMaxScaler,LabelEncoder
from sklearn.decomposition import TruncatedSVD

# Sparse Matrix Operations
from scipy.sparse import csr_matrix, hstack, vstack

# Machine Learning
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from joblib import dump, load


## Dataset Construction
The dataset originates from the Hugging Face genius-song-lyrics repository, which serves as the initial source of song lyrics. To further enhance the dataset and ensure a more comprehensive collection, additional lyrics were scraped directly from Genius.com, a widely recognized platform that hosts lyrics across various genres.
In final version, the dataset encompasses a diverse range of musical styles, including rock, rap, pop, country, and R&B. 

In [31]:
df = pd.read_csv('./data/uni.csv')
df=df.drop_duplicates(subset='lyrics', keep='first', ignore_index=False)
df.groupby('tag').size()

tag
country    24766
pop        24789
rap        24862
rb         24828
rock       24846
dtype: int64

In [3]:
 # Training dataset 10k songs for each genre
df = pd.read_csv('./data/uniform.csv') 
df=df.drop_duplicates(subset='lyrics', keep='first', ignore_index=False)
df.groupby('tag').size()

tag
country    10000
pop        10000
rap        10000
rb         10000
rock       10000
dtype: int64

In [3]:
df = pd.read_csv('./data/train_samp.csv')
#split lyrics into chorus and non-chorus subsets
df=df.drop_duplicates(subset='lyrics', keep='first', ignore_index=False)
chorus_data = []
non_chorus = []
for idx, row in df.iterrows():
    rlyr = row['lyrics']
    genre = row['tag']
    sections = re.split(r'\[(.*?)\]', rlyr)
    if sections[0].strip():
        content = sections[0].strip()
        data = {"tag": genre, "lyrics": content}
        chorus_data.append(data)  #unlabed as chorus
        
    for i in range(2, len(sections), 2):
        if sections[i-1] == '' or sections[i] =='':
            continue
        section_type = sections[i-1].strip().lower()
        content = sections[i].strip()
        if content:
            data = {"tag":genre,"lyrics":content}
            # Handle chorus sections
            if 'chor' in section_type:
                chorus_data.append(data)
                
            else:
                non_chorus.append(data)
        

chorus_df = pd.DataFrame(chorus_data)
non_chorus_df = pd.DataFrame(non_chorus)


## PREPROCESS

In [7]:
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import contractions

class LyricsPreprocessor:
    def __init__(self, custom_stopwords=None):
        """
        Initialize the preprocessor with optional custom stopwords.
        
        Args:
            custom_stopwords (list): Additional stopwords specific to lyrics
        """
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english'))
        if custom_stopwords:
            # Add custom stopwords for lyrics
            self.custom_stopwords = set(custom_stopwords or [])
            self.stop_words.update(self.custom_stopwords)
    
    def expand_contractions(self, text):
        """Expand contractions like "I'm" to "I am"."""
        return contractions.fix(text)
    
    def only_remove_non_alphanum(self, text):
        text = re.sub(r'[^a-zA-Z0-9\s\']', ' ', text)
        # Remove standalone apostrophes
        text = re.sub(r'\s\'|\'\s', ' ', text)
        return self.remove_extra_whitespace(text)
    
    def remove_special_chars(self, text):
        """Remove special characters and digits."""
        # Keep apostrophes for contractions but remove other special chars
        text = re.sub(r'[^a-zA-Z\s\']', ' ', text)
        # Remove standalone apostrophes
        text = re.sub(r'\s\'|\'\s', ' ', text)
        return text
    
    def remove_extra_whitespace(self, text):
        """Remove extra whitespace and newlines."""
        return ' '.join(text.split())
    
    def lemmatize_text(self, text):
        """Lemmatize words to their root form."""
        words = word_tokenize(text)
        return ' '.join([self.lemmatizer.lemmatize(word) for word in words])
    
    def remove_stopwords(self, text):
        """Remove common stopwords."""
        words = text.split()
        return ' '.join([word for word in words if word.lower() not in self.stop_words])
    
    def preprocess_text(self, text, steps=None):
        if steps is None:
            steps = ['expand_contractions', 'remove_special_chars', 
                    'remove_extra_whitespace', 'lemmatize_text', 'remove_stopwords']
        
        # Convert to lowercase first
        text = str(text).lower()
        
        for step in steps:
            if hasattr(self, step):
                text = getattr(self, step)(text)
        
        return text
    
    def preprocess_dataframe(self, df, column='lyrics', steps=None, inplace=False):

        if not inplace:
            df = df.copy()
        
        # Add preprocessed column
        preprocessed_column = f'preprocessed_{column}'
        df[preprocessed_column] = df[column].apply(lambda x: self.preprocess_text(x, steps))
        
        return df
    
def preprocess_df(df):
    return LyricsPreprocessor().preprocess_dataframe(chorus_df)
ch_processed = preprocess_df(chorus_df)

## Train Test single subset df

In [3]:
from scipy.sparse import vstack
def test(model, X_test, y_test):
    # Step 6: Evaluate the model
    y_pred = model.predict(X_test)
    print("Accuracy:", round(accuracy_score(y_test, y_pred)*100, 2))
    print("Classification Report:\n", classification_report(y_test, y_pred))
   
def tfidf_weighted_combined(df1, df2, text_column='lyrics', weight1=1.0, weight2=0.4):
    # Fill missing values with empty strings and ensure all data is string
    df1[text_column] = df1[text_column].fillna('').astype(str)
    df2[text_column] = df2[text_column].fillna('').astype(str)

    # Initialize TF-IDF vectorizer
    tfidf = TfidfVectorizer(max_features=3000, ngram_range=(1, 3), dtype=np.float32)

    # Transform chorus and non-chorus parts separately
    chorus_features = tfidf.fit_transform(df1[text_column]) * weight1
    non_chorus_features = tfidf.transform(df2[text_column]) * weight2

    # Combine the features
    combined_features = vstack([chorus_features, non_chorus_features])

    # Combine target labels
    combined_labels = np.concatenate([df1['tag'].values, df2['tag'].values])

    return combined_features, combined_labels, tfidf



def extract_and_reduce_features(df1, df2, weight1=1.0, weight2=1.0):
    # Get preprocessed features
    X_combined, y_combined, tfidf = tfidf_weighted_combined(
        df1, df2, weight1=weight1, weight2=weight2
    )
    
    # Dimensionality reduction
    svd = TruncatedSVD(n_components=400, random_state=42)
    X_combined = svd.fit_transform(X_combined)
    return X_combined, y_combined, tfidf, svd

def train_weighted_pipeline(df1, df2, model, weight1=1.0, weight2=0.8, model_name='weighted_model'):
    # Get preprocessed features
    
    X_reduced , y_combined, tfidf,svd = extract_and_reduce_features(df1,df2,weight1,weight2)
    
    # Split into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X_reduced, y_combined, test_size=0.1, random_state=42
    )
    
    pipeline = Pipeline([
        ('classifier', model)  # Classifier only
    ])

    # Train the classifier
    pipeline.named_steps['classifier'].fit(X_train, y_train)

    # Save the pipeline with tfidf and svd as additional components
    dump({'pipeline': pipeline, 'tfidf': tfidf, 'svd': svd}, model_name + '_pipeline.joblib')


    # Evaluate the model
    y_pred = pipeline.predict(X_test)
    
    print("Accuracy:", accuracy_score(y_test, y_pred))
    print("Classification Report:\n", classification_report(y_test, y_pred))
    
# X_reduced, y_combined, tfidf, svd = extract_features(chorus_df, non_chorus_df,weight1=0.9, weight2=0.6)
# # Split into training and test sets
# X_train, X_test, y_train, y_test = train_test_split(
#     X_reduced, y_combined, test_size=0.1, random_state=42
# )

# test(load("w-rf-pip_pipeline.joblib"),X_test, y_test)
train_weighted_pipeline(chorus_df, non_chorus_df, RandomForestClassifier( n_jobs=-1), 
                     weight1=1.5, weight2=0.9, model_name="4k-svd4-w")

Accuracy: 0.614614191000141
Classification Report:
               precision    recall  f1-score   support

     country       0.61      0.53      0.57      1316
         pop       0.75      0.29      0.42       914
         rap       0.68      0.79      0.74      1901
          rb       0.53      0.75      0.62      1831
        rock       0.64      0.46      0.54      1127

    accuracy                           0.61      7089
   macro avg       0.64      0.56      0.57      7089
weighted avg       0.63      0.61      0.60      7089



## Custom Feature Extractor

In [4]:
from collections import Counter, defaultdict
import numpy as np
import string
import gensim.downloader as api

# NLP Libraries
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import cmudict
from gensim.downloader import load as gensim_load
# import spacy  # Uncomment if spaCy is used

# Feature Engineering
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.preprocessing import MinMaxScaler,LabelEncoder
from sklearn.decomposition import TruncatedSVD

# Sparse Matrix Operations
from scipy.sparse import csr_matrix, hstack, vstack

# Machine Learning
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from joblib import dump, load

class FeatureExtractor:
    def __init__(self):
        """
        Initialize the FeatureExtractor with an optional word embedding model.
        """
        try:
            self.word_embedding_model = api.load('word2vec-google-news-300')
        except:
            print("Memory error loading word2vec")
            self.word_embedding_model = None
    def _count_repeated_lines(self, text):
        """
        Count repeated lines in the text.
        """
        lines = text.split("\n")
        return len(lines) - len(set(lines))

    def get_stylistic_features(self, text):
        """
        Generate stylistic features from the text.
        """
        exclamation_count = text.count("!")
        question_count = text.count("?")
        uppercase_ratio = sum(1 for c in text if c.isupper()) / len(text) if text else 0
        digit_ratio = sum(1 for c in text if c.isdigit()) / len(text) if text else 0
        punctuation_ratio = (
            sum(1 for c in text if c in string.punctuation) / len(text) if text else 0
        )
        repeated_lines = self._count_repeated_lines(text)
        return [
            exclamation_count,
            question_count,
            uppercase_ratio,
            digit_ratio,
            punctuation_ratio,
            repeated_lines,
        ]

    def get_statistical_features(self, text):
        """
        Extract statistical features from text.
        """
        words = text.split()
        word_count = len(words)
        unique_word_count = len(set(words))
        avg_word_length = np.mean([len(word) for word in words]) if words else 0
        return [word_count, unique_word_count, avg_word_length]

    def get_embedding_features(self, text, weight=1.0):
        """
        Compute average word embedding for the text.
        """
        if not self.word_embedding_model:
            return [0] * self.word_embedding_model.vector_size  # Placeholder if no model
        words = [word for word in text.split() if word in self.word_embedding_model]
        if not words:
            return [0] * self.word_embedding_model.vector_size
        return np.mean([self.word_embedding_model[word] for word in words], axis=0) * weight

    def combine_statistical_and_stylistic_features(self, chorus_texts, non_chorus_texts, weight1=1.5, weight2=1.0):
        """
        Combine statistical and stylistic features with weighted chorus features.
        """
        def extract_features(texts):
            stats = [self.get_statistical_features(text) for text in texts]
            styles = [self.get_stylistic_features(text) for text in texts]
            return np.hstack((stats, styles))

        # Extract and weight features
        chorus_features = extract_features(chorus_texts) * weight1
        non_chorus_features = extract_features(non_chorus_texts) * weight2

        # Combine and normalize features
        combined_features = np.vstack((chorus_features, non_chorus_features))
        normalized_features = MinMaxScaler().fit_transform(combined_features)
        if chorus_features.size == 0 or non_chorus_features.size == 0:
            raise ValueError("Features for chorus or non-chorus texts are missing.")

        return normalized_features

    def prepare_combined_features(self, chorus_texts, non_chorus_texts, weight1=1.5, weight2=1.0):
        """
        Combine TF-IDF, statistical, stylistic, and word embedding features.
        """
        # Combine statistical and stylistic features
        stats_styles_features = self.combine_statistical_and_stylistic_features(
            chorus_texts, non_chorus_texts, weight1, weight2
        )
        stats_styles_sparse = csr_matrix(stats_styles_features)

        # Compute word embeddings
        embeddings_chorus = np.array(
            [self.get_embedding_features(text, weight1) for text in chorus_texts]
        )
        embeddings_non_chorus = np.array(
            [self.get_embedding_features(text, weight2) for text in non_chorus_texts]
        )
        combined_embeddings = np.vstack((embeddings_chorus, embeddings_non_chorus))
        embeddings_sparse = csr_matrix(combined_embeddings)

        # Combine all features
        return stats_styles_sparse, embeddings_sparse

        
def tfidf_weighted_combined(df1, df2, text_column='lyrics', weight1=1.0, weight2=0.4):
    # Fill missing values with empty strings and ensure all data is string
    df1[text_column] = df1[text_column].fillna('').astype(str)
    df2[text_column] = df2[text_column].fillna('').astype(str)

    # Initialize TF-IDF vectorizer
    tfidf = TfidfVectorizer(max_features=3000, ngram_range=(1, 3), dtype=np.float32)

    # Transform chorus and non-chorus parts separately
    chorus_features = tfidf.fit_transform(df1[text_column]) * weight1
    non_chorus_features = tfidf.transform(df2[text_column]) * weight2

    # Combine the features
    combined_features = vstack([chorus_features, non_chorus_features])

    # Combine target labels
    combined_labels = np.concatenate([df1['tag'].values, df2['tag'].values])

    return combined_features, combined_labels, tfidf

def custom_weighted_final(df1, df2, model, weight1=1.0, weight2=1.0, model_name="c-w"):
    # Example TF-IDF features (replace with actual computation)
    X_tfidf, y_combined, tfidf = tfidf_weighted_combined(
            df1, df2, weight1=weight1, weight2=weight2)

    # Initialize feature extractor with a word embedding model
    feature_extractor = FeatureExtractor()
    # Prepare combined features
    f1,f2 = feature_extractor.prepare_combined_features(
        df1['lyrics'], df2['lyrics'], weight1=weight1,weight2=weight2
    )
    F_unified = hstack([X_tfidf, f1,f2])
    
    # Shape of combined features
    print("Combined feature shape:", F_unified.shape)
    svd = TruncatedSVD(n_components=400, random_state=42)
    F_unified = svd.fit_transform(F_unified)
    
    # Train-test split
    X_train, X_test, y_train, y_test = train_test_split(
        F_unified, y_combined, test_size=0.2, random_state=42
    )

    pipeline = Pipeline([
        ('classifier', model)  # Classifier only
    ])

    # Train the classifier
    pipeline.named_steps['classifier'].fit(X_train, y_train)

    # Save the pipeline with tfidf and svd as additional components
    dump({'pipeline': pipeline, 'tfidf': tfidf, 'svd': svd, 
        'feature_extractor': feature_extractor,
          }, model_name + '_pipeline.joblib')


    # Evaluate the model
    y_pred = pipeline.predict(X_test)
    print("Accuracy:", accuracy_score(y_test, y_pred))
    return y_test, y_pred

y_test, y_pred=custom_weighted_final(chorus_df,non_chorus_df, RandomForestClassifier(n_jobs=-1), 
                      weight1=0.9, weight2=0.6, model_name="custom-svd4-10k")
print("Classification Report:\n")
print(classification_report(y_test, y_pred))


Memory error loading word2vec


AttributeError: 'NoneType' object has no attribute 'vector_size'