In [None]:
#!pip install scikit-learn

import pandas as pd
from sklearn.preprocessing import RobustScaler, StandardScaler, MinMaxScaler
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import os
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer

In [None]:
# 가사 데이터 추가된 최종 데이터셋(tracks_with_lyrics)이 완성되면 바꾸기
#songs_df = pd.read_csv('raw_data_songs/tracks_with_lyrics.csv')
songs_df = pd.read_csv('raw_data_songs/tracks_raw.csv')

In [None]:
# 필요한 column 있으면 추가하기
# 가사 데이터 수집되면 'lyrics' 추가
# 'uri'는 웹 플레이어 구현할 떄 필요할 듯?
songs_df = songs_df[['track_name', 'artist_name', 'valence', 'tempo', 'energy', 'loudness', 'danceability', 'uri', 'track_id', 'lyrics']]
songs_df.head()

In [None]:
songs_df.count()

In [None]:
songs_df.dropna(subset=['lyrics'], inplace=True)
songs_df.dropna(subset=['valence'], inplace=True)
songs_df.dropna(subset=['uri'], inplace=True)
songs_df.count()

In [None]:
# 분포 그려보니까 바로 MinMax 쓰면 너무 한쪽으로 skewed됨
# 0~1로 정규화안되있는 'tempo', 'loudness' 이상치 처리
def remove_outliers(df, column):
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    return df[~((df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR)))]


songs_df = remove_outliers(songs_df, 'tempo')
songs_df = remove_outliers(songs_df, 'loudness')


In [None]:
#pre_scaler = StandardScaler()
#pre_scaler = RobustScaler()
scaler = MinMaxScaler(feature_range=(-1, 1))
songs_df[['valence', 'tempo', 'energy', 'loudness', 'danceability']] = scaler.fit_transform(songs_df[['valence', 'tempo', 'energy', 'loudness', 'danceability']])
songs_df.head()

In [None]:
# 분포 확인
columns_to_plot = ['valence', 'tempo', 'energy', 'loudness', 'danceability']

plt.figure(figsize=(20, 15))
for i, column in enumerate(columns_to_plot, 1):
    plt.subplot(3, 2, i)
    sns.histplot(songs_df[column], kde=True)
    plt.title(f'Distribution of {column.capitalize()}')
    plt.xlabel(column.capitalize())
    plt.ylabel('Frequency')

plt.tight_layout()
plt.show()

In [None]:
# 가중치 설정
weights = {
    'tempo': 0.60,
    'energy': 0.15,
    'loudness': 0.15,
    'danceability': 0.10
}

# Calculate 'arousal' value
songs_df['arousal'] = (
    songs_df['tempo'] * weights['tempo'] +
    songs_df['energy'] * weights['energy'] +
    songs_df['loudness'] * weights['loudness'] +
    songs_df['danceability'] * weights['danceability']
)

songs_df.head()


In [None]:
def classify_emotion(songs_df):
    def get_emotion(row):
        if row['valence'] > 0 and row['arousal'] >= 0:
            return '1'
        elif row['valence'] >= 0 and row['arousal'] < 0:
            return '4'
        elif row['valence'] <= 0 and row['arousal'] > 0:
            return '2'
        elif row['valence'] < 0 and row['arousal'] <= 0:
            return '3'
        else:
            return None

    songs_df['emotion'] = songs_df.apply(get_emotion, axis=1)
    return songs_df 


In [None]:
# Define the function to calculate the scalar value and categorize it
# 그냥 임의로 IQR로 설정했는데 나중에 바꾸던가 사용하지 않아도 될 듯
def classify_intensity(songs_df):
    songs_df['scalar_value'] = np.sqrt(songs_df['valence']**2 + songs_df['arousal']**2)
    
    Q1 = songs_df['scalar_value'].quantile(0.25)
    Q2 = songs_df['scalar_value'].quantile(0.50)
    Q3 = songs_df['scalar_value'].quantile(0.75)

    def get_intensity(scalar_value):
        if scalar_value <= Q1:
            return 'neutral'
        elif Q1 < scalar_value <= Q2:
            return 'low'
        elif Q2 < scalar_value <= Q3:
            return 'medium'
        else:
            return 'high'
        
    songs_df['intensity'] = songs_df['scalar_value'].apply(get_intensity)
    
    # Drop the temporary 'scalar_value' column if not needed
    songs_df = songs_df.drop(columns=['scalar_value'])
    
    return songs_df

# Apply the function to categorize intensity
songs_df = classify_intensity(songs_df)

In [None]:
# Apply the function to classify_emotion
songs_df = classify_emotion(songs_df)
# Apply the function to classify_intensity
songs_df = classify_intensity(songs_df)
songs_df.head()

In [None]:
# 'lylics' 추가
songs_df = songs_df[['track_name', 'artist_name', 'valence', 'arousal', 'emotion', 'intensity', 'lyrics', 'uri', 'track_id']]
songs_df.head()

In [None]:
songs_df.info()

In [None]:
import re

# lyrics 전처리 추가

# lyrics 정리하는 함수
def clean_lyrics(lyrics):
    # Remove sections like [verse], [bridge], etc.
    lyrics = re.sub(r'\[.*?\]', '', lyrics)
    # Remove contributor information and other extraneous text
    lyrics = re.sub(r'\d+ Contributors.*$', '', lyrics, flags=re.MULTILINE)
    # Remove leading and trailing whitespace
    lyrics = lyrics.strip()
    return lyrics

# 'lyrics' 전처리해서 데이터 교체
songs_df['lyrics'] = songs_df['lyrics'].apply(clean_lyrics)

# 'lyrics' 칼럼의 데이터를 모두 문자열로 변환
songs_df['lyrics'] = songs_df['lyrics'].astype(str)


# 가사 5000자 이상 노래 삭제 
songs_df = songs_df[songs_df['lyrics'].str.len() <= 5000]

In [None]:
songs_df.info()

In [None]:
songs_df.to_csv('tracks_preprocessed.csv')

In [None]:
# KeyBERT로 lyrics 키워드 뽑아서 새로운 칼럼으로 만들어놔야함

# 텍스트에서 키워드를 추출하는 함수
def extract_keywords(text, model):
    kw_model = KeyBERT(model)
    keywords = kw_model.extract_keywords(text, keyphrase_ngram_range=(1, 1), stop_words=None, top_n=10)
    return [kw[0] for kw in keywords]

def main(lyrics_path, output_path):
    # 데이터셋 로드
    lyrics_df = pd.read_csv(lyrics_path)

    # 'lyrics' 컬럼의 NaN 값을 빈 문자열로 대체
    lyrics_df['lyrics'] = lyrics_df['lyrics'].fillna('')

    # SBERT 모델 로드
    sbert_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    
    # 'lyrics'에서 키워드를 추출하여 'lyrics_keyword' 컬럼에 저장
    lyrics_df['lyrics_keyword'] = lyrics_df['lyrics'].apply(lambda x: ', '.join(extract_keywords(x, sbert_model)) if isinstance(x, str) else '')

    # 각 키워드의 임베딩 벡터 계산
    def compute_embedding(keywords, model):
        keywords_list = keywords.split(', ')
        embeddings = model.encode(keywords_list)
        return embeddings

    # 'keyword_embedding' 컬럼에 각 키워드의 임베딩 벡터 저장
    lyrics_df['keyword_embedding'] = lyrics_df['lyrics_keyword'].apply(lambda x: compute_embedding(x, sbert_model))
        
    # 수정된 데이터셋 저장
    lyrics_df.to_csv(output_path, index=False)

In [None]:
if __name__ == "__main__":
    lyrics_path = os.path.join('tracks_preprocessed.csv')  # 원본 데이터셋 경로 설정
    output_path = os.path.join('tracks_final.csv')  # 수정된 데이터셋 저장 경로 설정
    main(lyrics_path, output_path)

In [None]:
import plotly.express as px

fig = px.scatter(songs_df, x='valence', y='arousal', hover_data=['track_name', 'artist_name'],
                 title='Scatter Plot of Valence vs Arousal')

fig.update_layout(
    xaxis=dict(scaleanchor="y", scaleratio=1),
    yaxis=dict(scaleanchor="x", scaleratio=1),
    width=800,
    height=600
)

# Show the plot
fig.show()