In [7]:
import os
import pandas as pd
import numpy as np
import re
import string
import time
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_classif, f_regression
from skopt import BayesSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
from sentence_transformers import SentenceTransformer


ANNOTATIONS_PATH = "/kaggle/input/datasets/rkhalm/pmemo2019/PMEmo2019/annotations/static_annotations.csv"
LYRICS_PATH = "/kaggle/input/datasets/rkhalm/pmemo2019/PMEmo2019/lyrics"
FEATURES_PATH = "/kaggle/input/datasets/rkhalm/pmemo2019/PMEmo2019/features/static_features.csv"


annotations = pd.read_csv(ANNOTATIONS_PATH)
features = pd.read_csv(FEATURES_PATH)

annotations = annotations.sort_values("musicId").reset_index(drop=True)
features = features.sort_values("musicId").reset_index(drop=True)

merged = pd.merge(features, annotations, on="musicId", how="inner")


def load_lyrics(music_id):
    path = os.path.join(LYRICS_PATH, f"{music_id}.lrc")
    try:
        with open(path, encoding="utf-8") as f:
            return f.read()
    except:
        return ""

merged["lyrics"] = merged["musicId"].apply(load_lyrics).fillna("")



#Text cleaning
def clean_text(text):
    text = text.lower()
    text = re.sub(r"\[.*?\]", "", text)
    text = re.sub(r"https?://\S+|www\.\S+", "", text)
    text = re.sub(r"\d+", "", text)
    text = re.sub(f"[{re.escape(string.punctuation)}]", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

merged["clean_lyrics"] = merged["lyrics"].apply(clean_text)


#BERT embeddings
encoder = SentenceTransformer("all-MiniLM-L6-v2")
X_text = encoder.encode(merged["clean_lyrics"].tolist(), show_progress_bar=True)


#Targets
y_val = merged["Valence(mean)"].values
y_aro = merged["Arousal(mean)"].values


#Audio features
X_audio_raw = merged.drop(columns=[
    "musicId",
    "Arousal(mean)",
    "Valence(mean)",
    "lyrics",
    "clean_lyrics"
]).values


#Split
idx = np.arange(len(merged))
idx_train, idx_test = train_test_split(idx, test_size=0.2, random_state=42)

X_audio_train = X_audio_raw[idx_train]
X_audio_test = X_audio_raw[idx_test]

X_text_train = X_text[idx_train]
X_text_test = X_text[idx_test]

y_val_train, y_val_test = y_val[idx_train], y_val[idx_test]
y_aro_train, y_aro_test = y_aro[idx_train], y_aro[idx_test]


#Scaling
scaler = StandardScaler()
X_audio_train_sc = scaler.fit_transform(X_audio_train)
X_audio_test_sc = scaler.transform(X_audio_test)

k = min(400, X_audio_train_sc.shape[1])


#Feature selection
selector_val = SelectKBest(f_regression, k=k)
X_audio_train_val = selector_val.fit_transform(X_audio_train_sc, y_val_train)
X_audio_test_val = selector_val.transform(X_audio_test_sc)

selector_aro = SelectKBest(f_regression, k=k)
X_audio_train_aro = selector_aro.fit_transform(X_audio_train_sc, y_aro_train)
X_audio_test_aro = selector_aro.transform(X_audio_test_sc)


#Early fusion
X_train_val = np.hstack([X_text_train, X_audio_train_val])
X_test_val  = np.hstack([X_text_test,  X_audio_test_val])

X_train_aro = np.hstack([X_text_train, X_audio_train_aro])
X_test_aro  = np.hstack([X_text_test,  X_audio_test_aro])


Batches:   0%|          | 0/24 [00:00<?, ?it/s]

In [8]:
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.linear_model import Ridge
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor
from xgboost import XGBRegressor
from skopt import BayesSearchCV

models = {
    'Ridge': (
        Ridge(),
        {'alpha': (1e-3, 1e+3, 'log-uniform')}
    ),

    'SVR': (
        SVR(),
        {
            'C': (1e-3, 1e+3, 'log-uniform'),
            'gamma': (1e-4, 1e-1, 'log-uniform'),
            'kernel': ['rbf']
        }
    ),

    'Random Forest': (
        RandomForestRegressor(),
        {
            'n_estimators': (100, 1000),
            'max_depth': (5, 50),
            'min_samples_split': (2, 20),
            'min_samples_leaf': (1, 10)
        }
    ),

    'XGBoost': (
        XGBRegressor(
            objective='reg:squarederror',
            tree_method='hist',
            device='cuda'
        ),
        {
            'n_estimators': (100, 1000),
            'max_depth': (3, 15),
            'learning_rate': (1e-2, 3e-1, 'log-uniform'),
            'subsample': (0.5, 1.0),
            'colsample_bytree': (0.5, 1.0)
        }
    ),

    'k-Nearest Neighbors': (
        KNeighborsRegressor(),
        {
            'n_neighbors': (1, 30),
            'weights': ['uniform', 'distance'],
            'p': [1, 2]
        }
    )
}


def reg_metrics(y_true, y_pred):
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    r2 = r2_score(y_true, y_pred)
    return mae, rmse, r2

results_val = []
trained_models_val = {}

# VALENCE REGRESSION

In [9]:
for name, (model, param_grid) in tqdm(models.items()):
    print(f"\nTraining {name}...")

    pipeline = Pipeline([('model', model)])

    opt = BayesSearchCV(
        estimator=pipeline,
        search_spaces={'model__' + k: v for k, v in param_grid.items()},
        n_iter=40,
        scoring='neg_root_mean_squared_error',  # оптимизируем RMSE
        cv=3,
        n_jobs=-1,
        verbose=0,
        random_state=42,
        refit=True,
    )

    opt.fit(X_train_val, y_val_train)

    y_pred = opt.predict(X_test_val)

    mae, rmse, r2 = reg_metrics(y_val_test, y_pred)

    print(f"Model: {name}, MAE: {mae:.4f}, RMSE: {rmse:.4f}, R2: {r2:.4f}")

    results_val.append({
        'Model': name,
        'Best Params': opt.best_params_,
        'MAE': float(mae),
        'RMSE': float(rmse),
        'R2': float(r2),
        'Best CV (neg_RMSE)': float(opt.best_score_),
    })

    trained_models_val[name] = opt

results_val_df = pd.DataFrame(results_val).sort_values(by='RMSE', ascending=True)
display(results_val_df)

  0%|          | 0/5 [00:00<?, ?it/s]


Training Ridge...


 20%|██        | 1/5 [00:26<01:47, 26.82s/it]

Model: Ridge, MAE: 0.0902, RMSE: 0.1149, R2: 0.4360

Training SVR...


 40%|████      | 2/5 [00:59<01:30, 30.27s/it]

Model: SVR, MAE: 0.0901, RMSE: 0.1135, R2: 0.4494

Training Random Forest...


 60%|██████    | 3/5 [1:17:00<1:09:57, 2098.93s/it]

Model: Random Forest, MAE: 0.0890, RMSE: 0.1121, R2: 0.4626

Training XGBoost...


Potential solutions:
- Use a data structure that matches the device ordinal in the booster.
- Set the device for booster before call to inplace_predict.


  return func(**kwargs)
Potential solutions:
- Use a data structure that matches the device ordinal in the booster.
- Set the device for booster before call to inplace_predict.


  return func(**kwargs)
Potential solutions:
- Use a data structure that matches the device ordinal in the booster.
- Set the device for booster before call to inplace_predict.


  return func(**kwargs)
Potential solutions:
- Use a data structure that matches the device ordinal in the booster.
- Set the device for booster before call to inplace_predict.


  return func(**kwargs)
Potential solutions:
- Use a data structure that matches the device ordinal in the booster.
- Set the device for booster before call to inplace_predict.


  return func(**kwargs)


Model: XGBoost, MAE: 0.0875, RMSE: 0.1089, R2: 0.4931

Training k-Nearest Neighbors...


100%|██████████| 5/5 [1:33:49<00:00, 1125.88s/it]

Model: k-Nearest Neighbors, MAE: 0.0914, RMSE: 0.1146, R2: 0.4388





Unnamed: 0,Model,Best Params,MAE,RMSE,R2,Best CV (neg_RMSE)
3,XGBoost,{'model__colsample_bytree': 0.6549662025178873...,0.087496,0.1089,0.493087,-0.112645
2,Random Forest,"{'model__max_depth': 9, 'model__min_samples_le...",0.088986,0.112127,0.462603,-0.116551
1,SVR,"{'model__C': 0.5130480864902878, 'model__gamma...",0.090135,0.113492,0.449439,-0.113329
4,k-Nearest Neighbors,"{'model__n_neighbors': 27, 'model__p': 2, 'mod...",0.091368,0.114584,0.43879,-0.119051
0,Ridge,{'model__alpha': 356.05207413580825},0.090221,0.114874,0.435952,-0.112594


In [10]:
results_val_df.to_csv('valence_regression_results.csv', index=False)

# AROUSAL REGRESSION

In [13]:
import ast
from sklearn.pipeline import Pipeline

def parse_best_params(x):
    if isinstance(x, dict):
        return x
    if isinstance(x, str):
        x = x.strip()
        if not x:
            return {}
        return ast.literal_eval(x)
    return {}

#Model -> Best Params (FROM VALENCE TUNING)
best_params_map = {
    row["Model"]: parse_best_params(row["Best Params"])
    for _, row in results_val_df.iterrows()
}

results_aro = []
trained_models_aro = {}

for name, (model, param_grid) in tqdm(models.items()):
    print(f"\nRefit {name} with params from results_val_df...")

    if name not in best_params_map:
        print(f"[SKIP] {name} not found in results_val_df")
        continue

    best_params = best_params_map[name]
    pipeline = Pipeline([('model', model)])

    if best_params:
        pipeline.set_params(**best_params)

    pipeline.fit(X_train_aro, y_aro_train)
    y_pred = pipeline.predict(X_test_aro)

    mae, rmse, r2 = reg_metrics(y_aro_test, y_pred)

    print(f"Model: {name}, MAE: {mae:.4f}, RMSE: {rmse:.4f}, R2: {r2:.4f}")

    results_aro.append({
        "Model": name,
        "Best Params": best_params,
        "MAE": float(mae),
        "RMSE": float(rmse),
        "R2": float(r2),
    })

    trained_models_aro[name] = pipeline

results_aro_df = pd.DataFrame(results_aro).sort_values(by="RMSE", ascending=True).reset_index(drop=True)
display(results_aro_df)

 20%|██        | 1/5 [00:00<00:00,  6.50it/s]


Refit Ridge with params from results_val_df...
Model: Ridge, MAE: 0.0732, RMSE: 0.0958, R2: 0.7293

Refit SVR with params from results_val_df...


 40%|████      | 2/5 [00:00<00:00,  7.13it/s]

Model: SVR, MAE: 0.0758, RMSE: 0.0998, R2: 0.7060

Refit Random Forest with params from results_val_df...


 60%|██████    | 3/5 [03:52<03:32, 106.22s/it]

Model: Random Forest, MAE: 0.0747, RMSE: 0.1022, R2: 0.6922

Refit XGBoost with params from results_val_df...


100%|██████████| 5/5 [03:55<00:00, 47.01s/it] 

Model: XGBoost, MAE: 0.0718, RMSE: 0.0986, R2: 0.7130

Refit k-Nearest Neighbors with params from results_val_df...
Model: k-Nearest Neighbors, MAE: 0.0784, RMSE: 0.1043, R2: 0.6790





Unnamed: 0,Model,Best Params,MAE,RMSE,R2
0,Ridge,{'model__alpha': 356.05207413580825},0.073234,0.095791,0.729315
1,XGBoost,{'model__colsample_bytree': 0.6549662025178873...,0.071819,0.098627,0.713046
2,SVR,"{'model__C': 0.5130480864902878, 'model__gamma...",0.075768,0.099834,0.705982
3,Random Forest,"{'model__max_depth': 9, 'model__min_samples_le...",0.074736,0.102153,0.692165
4,k-Nearest Neighbors,"{'model__n_neighbors': 27, 'model__p': 2, 'mod...",0.078388,0.104308,0.679038


In [14]:
results_aro_df.to_csv('arousal_regression_results.csv', index=False)

In [None]:
# import umap.umap_ as umap
# import pandas as pd

# # Преобразуем в DataFrame для удобства
# df_vis = pd.DataFrame(X_text)
# df_vis['label'] = y_quadrant

# # Возьмем равное число примеров из каждого класса
# sampled_df = df_vis.groupby('label').sample(n=25, random_state=42)
# X_sample = sampled_df.drop(columns='label').values
# y_sample = sampled_df['label'].values


# import matplotlib.pyplot as plt
# import umap.umap_ as umap

# # UMAP-проекция в 2D
# reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, metric='cosine', random_state=42)
# X_umap = reducer.fit_transform(X_sample)

# # Визуализация
# plt.figure(figsize=(10, 7))
# scatter = plt.scatter(
#     X_umap[:, 0], X_umap[:, 1],
#     c=y_sample,
#     cmap='tab10',    # 10 классов
#     alpha=0.8,
#     s=50             # размер точек
# )
# plt.colorbar(scatter, label='Emotion Quadrant')
# plt.title("UMAP Visualization")
# plt.xlabel("UMAP 1")
# plt.ylabel("UMAP 2")
# plt.grid(True)
# plt.tight_layout()
# plt.show()


In [17]:
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, precision_recall_fscore_support, confusion_matrix

#Russell model of emotion
def assign_quadrant(valence, arousal, thr=0.5):
    if valence <= thr and arousal <= thr:
        return 2
    elif valence > thr and arousal <= thr:
        return 3
    elif valence <= thr and arousal > thr:
        return 1
    else:
        return 0

def quadrant_vec(v, a, thr=0.5):
    v = np.asarray(v).ravel()
    a = np.asarray(a).ravel()
    return np.array([assign_quadrant(v[i], a[i], thr=thr) for i in range(len(v))], dtype=int)
    
y_true_quad = quadrant_vec(y_val_test, y_aro_test, thr=0.5)

In [20]:
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

def reg_metrics(y_true, y_pred):
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    r2 = r2_score(y_true, y_pred)
    return mae, rmse, r2

trained_models_val = dict(trained_models_val)
trained_models_aro = dict(trained_models_aro)

#Valence LinearRegression
lin_val = LinearRegression()
lin_val.fit(X_train_val, y_val_train)
v_pred = lin_val.predict(X_test_val)
mae_v, rmse_v, r2_v = reg_metrics(y_val_test, v_pred)
print(f"LinearReg (Valence): MAE={mae_v:.4f} RMSE={rmse_v:.4f} R2={r2_v:.4f}")

trained_models_val["Linear Regression"] = lin_val

#Arousal LinearRegression
lin_aro = LinearRegression()
lin_aro.fit(X_train_aro, y_aro_train)
a_pred = lin_aro.predict(X_test_aro)
mae_a, rmse_a, r2_a = reg_metrics(y_aro_test, a_pred)
print(f"LinearReg (Arousal): MAE={mae_a:.4f} RMSE={rmse_a:.4f} R2={r2_a:.4f}")

trained_models_aro["Linear Regression"] = lin_aro

LinearReg (Valence): MAE=0.2829 RMSE=0.3615 R2=-4.5849
LinearReg (Arousal): MAE=0.2956 RMSE=0.3784 R2=-3.2232


In [21]:
results_quad = []

common_models = sorted(set(trained_models_val.keys()) & set(trained_models_aro.keys()))

for name in common_models:
    model_val = trained_models_val[name]
    model_aro = trained_models_aro[name]

    v_pred = model_val.predict(X_test_val)
    a_pred = model_aro.predict(X_test_aro)

    y_pred_quad = quadrant_vec(v_pred, a_pred, thr=0.5)

    acc = accuracy_score(y_true_quad, y_pred_quad)

    # weighted metrics
    f1 = f1_score(y_true_quad, y_pred_quad, average="weighted")
    prec, rec, _, _ = precision_recall_fscore_support(
        y_true_quad,
        y_pred_quad,
        average="weighted",
        zero_division=0
    )

    results_quad.append({
        "Model": name,
        "Accuracy": float(acc),
        "Precision": float(prec),
        "Recall": float(rec),
        "F1": float(f1),
    })

results_quad_df = (
    pd.DataFrame(results_quad)
    .sort_values("F1", ascending=False)
    .reset_index(drop=True)
)

display(results_quad_df)

Unnamed: 0,Model,Accuracy,Precision,Recall,F1
0,k-Nearest Neighbors,0.733766,0.719599,0.733766,0.722466
1,SVR,0.707792,0.720119,0.707792,0.71137
2,XGBoost,0.714286,0.704842,0.714286,0.708026
3,Ridge,0.714286,0.702381,0.714286,0.704919
4,Random Forest,0.707792,0.705535,0.707792,0.703739
5,Linear Regression,0.454545,0.589375,0.454545,0.498079


In [23]:
results_quad_df.to_csv('PMEmo_FUSION_classification_from_regression_weighted_final.csv', index=False)