# Notebook cell marker (In [None]) -- kept as a comment so the module parses as Python.
# data_handler.py

import pandas as pd
import numpy as np
import re
import os
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from transformers import BertTokenizer, TFBertModel
from tqdm import tqdm
import json # For saving/loading tokenizer

def load_and_preprocess_data(file_path, text_column, label_columns):
    """
    Loads data from an Excel file and performs initial preprocessing.

    Rows whose text is missing or blank are dropped, the remaining text is
    passed through ``clean_text``, and every label column is coerced to
    integers (missing or unparseable values become 0).

    Args:
        file_path (str): Path to the Excel dataset.
        text_column (str): Name of the column containing the text data (lyrics).
        label_columns (list): List of column names containing the emotion labels.

    Returns:
        pandas.DataFrame: Processed DataFrame.
    Raises:
        FileNotFoundError: If the specified file does not exist.
        ValueError: If the file cannot be read, the text column or a label
            column is missing, or no rows with usable text remain.
    """
    print(f"Loading data from {file_path}...")
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Error: The file '{file_path}' was not found. Please check the path.")

    # Keep the try body to the one call that can fail on bad input, and chain
    # the original exception so the real cause stays visible in tracebacks.
    try:
        df = pd.read_excel(file_path)
    except Exception as e:
        raise ValueError(f"An error occurred during data loading: {e}") from e
    print("Data loaded successfully.")
    print(f"Dataset shape: {df.shape}")
    print("Dataset head:")
    print(df.head())

    # Fail with the documented ValueError instead of a KeyError from dropna().
    if text_column not in df.columns:
        raise ValueError(f"Error: Text column '{text_column}' not found in the dataset.")

    # Drop rows where lyrics are missing or empty
    df = df.dropna(subset=[text_column])
    has_text = df[text_column].astype(str).str.strip() != ''
    # .copy() gives an independent frame so the column assignments below do
    # not write into a slice of the original (SettingWithCopyWarning).
    df = df[has_text].copy()
    if df.empty:
        raise ValueError("Error: No valid lyrics found after dropping missing/empty rows.")

    print("\nStarting text preprocessing...")
    df[text_column] = df[text_column].apply(clean_text)
    print("Text cleaning complete.")

    # Ensure label columns exist and are numeric (0 or 1)
    print("\nPreparing labels...")
    for col in label_columns:
        if col not in df.columns:
            raise ValueError(f"Error: Label column '{col}' not found in the dataset. Please check LABEL_COLUMNS configuration.")
        # Non-numeric entries become NaN, NaN becomes 0, then cast to int.
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)
        # Only warn (do not fail) when values other than 0/1 are present.
        if not df[col].isin([0, 1]).all():
            print(f"Warning: Column '{col}' contains values other than 0 or 1 after conversion. This might affect multi-label binarization.")
    print("Labels prepared.")
    return df

def clean_text(text):
    """
    Normalizes a piece of text for downstream tokenization.

    The value is stringified and lowercased, every character that is not
    an ASCII letter, a digit, or whitespace is removed, and runs of
    whitespace are collapsed to a single space with the ends trimmed.

    Args:
        text (str): The input to clean (non-strings are converted with ``str``).

    Returns:
        str: The cleaned string.
    """
    lowered = str(text).lower()
    # Strip everything except a-z, 0-9 and whitespace.
    alnum_only = re.sub(r'[^a-z0-9\s]', '', lowered)
    # Collapse whitespace runs, then trim leading/trailing spaces.
    collapsed = re.sub(r'\s+', ' ', alnum_only)
    return collapsed.strip()

def tokenize_and_pad_sequences(texts, max_words, max_sequence_length, tokenizer=None):
    """
    Converts texts to integer sequences and pads them to a fixed length.

    Args:
        texts (pd.Series): Series of text data.
        max_words (int): Vocabulary size limit used when a new tokenizer is created.
        max_sequence_length (int): Length every sequence is padded/truncated to.
        tokenizer (tf.keras.preprocessing.text.Tokenizer, optional): Pre-fitted tokenizer.
            When omitted, a new tokenizer is created and fitted on ``texts``.

    Returns:
        tuple: (padded_sequences, tokenizer)
    """
    if tokenizer is not None:
        print("Using provided tokenizer.")
    else:
        # No tokenizer supplied: build one (with an out-of-vocabulary token) and fit it here.
        tokenizer = Tokenizer(num_words=max_words, oov_token="<unk>")
        tokenizer.fit_on_texts(texts)
        vocab_size = len(tokenizer.word_index)
        print(f"Found {vocab_size} unique tokens.")

    # Both padding and truncation use the 'post' setting.
    padded = pad_sequences(
        tokenizer.texts_to_sequences(texts),
        maxlen=max_sequence_length,
        padding='post',
        truncating='post',
    )
    print(f"Padded sequences shape: {padded.shape}")
    return padded, tokenizer

def get_bert_embeddings(texts, max_len=512, batch_size=32):
    """
    Generates BERT embeddings for a collection of texts.

    Each text is tokenized with the 'bert-base-uncased' tokenizer, padded or
    truncated to ``max_len``, and run through the model; the vector at
    position 0 of ``last_hidden_state`` (the [CLS] token) is kept per text.

    Args:
        texts (pd.Series | list | tuple | iterable of str): Texts to embed.
        max_len (int): Maximum sequence length for BERT tokenizer.
        batch_size (int): Batch size for processing texts for BERT embeddings.

    Returns:
        numpy.ndarray: Array of BERT embeddings, one row per input text.
            Empty input yields an array with zero rows.
    """
    # Materialize once as a plain list: Series/ndarray expose tolist(), while
    # lists, tuples and generators go through list(). This also makes the
    # batch slicing below purely positional, independent of any Series index.
    text_list = texts.tolist() if hasattr(texts, "tolist") else list(texts)

    print("\nInitializing BERT Tokenizer and Model...")
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    bert_model = TFBertModel.from_pretrained('bert-base-uncased')

    if not text_list:
        # np.concatenate([]) would raise; return a zero-row array instead.
        # NOTE(review): float32 is assumed to match the model's output dtype.
        return np.empty((0, bert_model.config.hidden_size), dtype=np.float32)

    print("Generating BERT embeddings...")
    embeddings = []
    for start in tqdm(range(0, len(text_list), batch_size)):
        batch = text_list[start:start + batch_size]
        tokens = tokenizer(batch, padding='max_length', truncation=True,
                           max_length=max_len, return_tensors='tf')
        outputs = bert_model(tokens)['last_hidden_state'][:, 0, :]  # CLS token
        embeddings.append(outputs.numpy())
    print("BERT embeddings generated successfully.")
    return np.concatenate(embeddings, axis=0)

def split_data(X, y, validation_split, random_state=42):
    """
    Partitions features and labels into a training part and a validation part.

    Args:
        X (numpy.ndarray): Features (padded sequences or embeddings).
        y (numpy.ndarray): Labels.
        validation_split (float): Proportion of data to use for validation.
        random_state (int): Seed passed to the splitter.

    Returns:
        tuple: (X_train, X_val, y_train, y_val)
    """
    print(f"\nSplitting data into {100*(1-validation_split)}% train and {100*validation_split}% validation...")

    # train_test_split returns the pieces in (X_train, X_val, y_train, y_val) order.
    parts = train_test_split(
        X,
        y,
        test_size=validation_split,
        random_state=random_state,
    )
    X_train, X_val, y_train, y_val = parts

    print(f"Train data shape: {X_train.shape}, Train labels shape: {y_train.shape}")
    print(f"Validation data shape: {X_val.shape}, Validation labels shape: {y_val.shape}")
    return X_train, X_val, y_train, y_val

def save_tokenizer(tokenizer, path):
    """Writes the tokenizer's ``to_json()`` output to ``path`` as a JSON-encoded value.

    The result of ``to_json()`` is itself passed through the JSON encoder, so
    reading the file back with ``json.load`` returns that same value.
    """
    serialized = tokenizer.to_json()
    with open(path, 'w', encoding='utf-8') as out_file:
        # ensure_ascii=False keeps non-ASCII characters readable in the file.
        json.dump(serialized, out_file, ensure_ascii=False)
    print(f"Tokenizer saved to {path}")

def load_tokenizer(path):
    """Loads a tokenizer from a JSON file such as the one written by ``save_tokenizer``.

    Args:
        path (str): Path to the tokenizer JSON file.

    Returns:
        tf.keras.preprocessing.text.Tokenizer: The reconstructed tokenizer.
    """
    # The Keras Tokenizer class has no ``from_json`` method; the loader is the
    # module-level ``tokenizer_from_json``. Imported locally so this function
    # does not depend on a change to the file's import block.
    from tensorflow.keras.preprocessing.text import tokenizer_from_json

    with open(path, 'r', encoding='utf-8') as f:
        payload = json.load(f)
    # save_tokenizer stores the to_json() string JSON-encoded, so json.load
    # normally returns a str. If the file holds the raw config object instead,
    # re-serialize it, because tokenizer_from_json expects a JSON string.
    if not isinstance(payload, str):
        payload = json.dumps(payload)
    tokenizer = tokenizer_from_json(payload)
    print(f"Tokenizer loaded from {path}")
    return tokenizer