Import necessary modules and define functions

In [1]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from transformers import AdamW, get_cosine_schedule_with_warmup, BertTokenizer, BertModel
from collections import Counter
from torch.nn.utils.rnn import pad_sequence
from transformers import AutoTokenizer
import re
import io 
from nltk.corpus import stopwords 
from nltk.tokenize import word_tokenize 

In [2]:
def del_bracket(s):
  pattern = r'\([^)]*\)'  # ()
  s = re.sub(pattern=pattern, repl='', string=s)

  pattern = r'\[[^)]*\]'  # []
  s = re.sub(pattern=pattern, repl='', string=s)

  pattern = r'\<[^)]*\>'  # <>
  s = re.sub(pattern=pattern, repl='', string=s)

  pattern = r'\{[^)]*\}'  # {}
  s = re.sub(pattern=pattern, repl='', string=s)

  return s

def del_special_num(s):
  pattern = r'[^a-zA-Z가-힣]'
  s = re.sub(pattern=pattern, repl=' ', string=s)

  return s

def del_unit(s):
  units = ['mm', 'cm', 'km', 'ml', 'kg', 'g']
  for unit in units:
    s = s.lower() # 대문자를 소문자로 변환
    s = s.replace(unit, '')
  return s

def del_whitespace(s):
  return " ".join(s.split())
  
def del_stopwords(s):
  stopwords = open("data/stopwords.txt", 'r', encoding="utf-8").read().split()
  #print(stopwords)
  s_o=s.split()
  s_f=[]
  for w in s_o:
    if w.strip() not in stopwords:
      s_f.append(w.strip())
  return " ".join(s_f)

In [3]:
# importing model
modelname = "klue/bert-base" 
model_path = 'models/fin_model_2.pt'  # replace with your actual path
max_length = 64
num_classes = 7

# Use cuda if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

#BERT 모델 불러오기
bertmodel = BertModel.from_pretrained(modelname)

class BERTClassifier(nn.Module):
    def __init__(self,
                 bert,
                 hidden_size = 768,
                 num_classes=num_classes,
                 dr_rate=None,
                 params=None):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        self.classifier = nn.Linear(hidden_size , num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)

    def gen_attention_mask(self, token_ids, valid_length):
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, token_ids, valid_length, segment_ids, attention_mask):
        outputs = self.bert(input_ids = token_ids, token_type_ids = segment_ids.long(), attention_mask = attention_mask.float().to(token_ids.device))
        pooler = outputs[1]
        if self.dr_rate:
            out = self.dropout(pooler)
        return self.classifier(out)

cpu


Some weights of the model checkpoint at klue/bert-base were not used when initializing BertModel: ['cls.predictions.decoder.weight', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight', 'cls.predictions.decoder.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [4]:
# assuming you have a function for tokenization
tokenizer = AutoTokenizer.from_pretrained(modelname)

# Define a function to get the valid_length, attention_mask and segment_ids
def get_inputs(tokens):
    tokens = ['[CLS]'] + tokens + ['[SEP]']
    valid_length = len(tokens)
    segment_ids = [0]*valid_length
    attention_mask = [1]*valid_length

    # Pad up to max length
    if valid_length < max_length:
        pad_length = max_length - valid_length
        tokens.extend(['[PAD]' for _ in range(pad_length)])
        attention_mask.extend([0]*pad_length)
        segment_ids.extend([0]*pad_length)

    # Convert tokens to IDs
    token_ids = tokenizer.convert_tokens_to_ids(tokens)

    return torch.tensor([token_ids], dtype=torch.long), torch.tensor([valid_length], dtype=torch.long), torch.tensor([segment_ids], dtype=torch.long), torch.tensor([attention_mask], dtype=torch.long)

# assuming you have a function for pre-processing
def preprocess(text):
    for t in text:
        t=del_bracket(t)
        t=del_special_num(t)
        t=del_whitespace(t)
        t=del_stopwords(t)
    return text.lower()

def temperature_scaled_softmax(output, temperature=1.0):
    # Apply temperature scaling on logits
    output = output / temperature

    # Then apply softmax to convert to probabilities
    probabilities = F.softmax(output, dim=-1)

    return probabilities

# Load the model
if torch.cuda.is_available():
    model = torch.load(model_path)
elif not torch.cuda.is_available():
    model = torch.load(model_path, map_location=torch.device('cpu'))

# Switch to eval mode
model.eval()

# Load the model and move to the GPU if available
model.to(device)

BERTClassifier(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(32000, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_af

## Extracting Sentiment Vector

Get input and compute sentiment vector

In [19]:
# get input text
text = input("Input the diary contents: ")

# Preprocess and tokenize the text
tokens = tokenizer.tokenize(preprocess(text))
if len(tokens) > max_length-2: # Account for [CLS] and [SEP]
    tokens = tokens[:max_length-2]

# Get inputs
token_ids, valid_length, segment_ids, attention_mask = get_inputs(tokens)

# Move all your tensors to the same device as your model
token_ids = token_ids.to(device)
valid_length = valid_length.to(device)
segment_ids = segment_ids.to(device)
attention_mask = attention_mask.to(device)

# Ensure no gradient is calculated
with torch.no_grad():
    sentiment_vector = model(token_ids, valid_length, segment_ids, attention_mask)

print("Request successful")
print(sentiment_vector)

# Get the output from your model
output = sentiment_vector

# Apply temperature-scaled softmax to convert output to probabilities
probabilities = temperature_scaled_softmax(output, temperature=5.0) # Increase temperature to make distribution more uniform

emotions = ['중립 ', '기쁨을', '불안을', '슬픔을', '분노를', '상처를', '혐오를'] 

# Get the indices that would sort the probability tensor
sorted_indices = torch.argsort(probabilities, dim=-1, descending=True)

primary_emotion_idx = None
secondary_emotion_idx = None
for idx in sorted_indices[0]:
    if emotions[idx.item()] != '중립 ':
        if primary_emotion_idx is None:
            primary_emotion_idx = idx.item()
        elif secondary_emotion_idx is None:
            secondary_emotion_idx = idx.item()
            break

primary_emotion = emotions[primary_emotion_idx]
primary_emotion_probability = probabilities[0][primary_emotion_idx].item()

secondary_emotion = emotions[secondary_emotion_idx]
secondary_emotion_probability = probabilities[0][secondary_emotion_idx].item()

print(f"당신은 지금 {primary_emotion} 느끼고 있네요.")
print(f"주 감정: {primary_emotion_probability * 100:.2f}% {primary_emotion[:-1]}, 부 감정: {secondary_emotion_probability * 100:.2f}% {secondary_emotion[:-1]}")

Request successful
tensor([[ 9.1251, -1.2937,  0.3767, -1.5946, -1.6430, -2.4702, -1.4495]],
       device='cuda:0')
당신은 지금 불안을 느끼고 있네요.
주 감정: 9.93% 불안, 부 감정: 7.11% 기쁨


## Make music library with lyrics

In [5]:
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import ast
import json

def process_lyrics(lyrics, tokenizer, model):
    # Preprocess and tokenize the lyrics
    tokens = tokenizer.tokenize(preprocess(lyrics))
    if len(tokens) > max_length-2: # Account for [CLS] and [SEP]
        tokens = tokens[:max_length-2]

    # Get inputs
    token_ids, valid_length, segment_ids, attention_mask = get_inputs(tokens)

    # Move all your tensors to the same device as your model
    token_ids = token_ids.to(device)
    valid_length = valid_length.to(device)
    segment_ids = segment_ids.to(device)
    attention_mask = attention_mask.to(device)

    # Ensure no gradient is calculated
    with torch.no_grad():
        sentiment_vector = model(token_ids, valid_length, segment_ids, attention_mask)

    return sentiment_vector.detach().cpu().numpy()[0]  # Return as a 1-D numpy array

# Function to compute cosine similarities and retrieve the top 10 songs
def recommend_songs(diary_text):
    # Process the diary text
    diary_vector = process_lyrics(diary_text, tokenizer, model)

    # Compute cosine similarities
    similarities = cosine_similarity([diary_vector], df_music['sentiment_vector'].to_list())

    # Get the top 10 song indices
    top_10_indices = similarities[0].argsort()[-10:][::-1]

    # Return the corresponding songs
    return df_music.iloc[top_10_indices]

# Function to print out the recommended songs
def print_recommended_songs(diary_text):
    recommended_songs = recommend_songs(diary_text)
    print("Top 10 similar songs:\n")
    print("Rank\tSimilarity\tSong Name - Artist")
    for i, song in enumerate(recommended_songs.iterrows(), start=1):
        index, data = song
        similarity = cosine_similarity([process_lyrics(diary_text, tokenizer, model)], [data['sentiment_vector']])[0][0]
        print(f"{i}st\t{similarity*100:.2f}% similar\t{data['title']} - {data['artist']}")

In [6]:
# Load the music data
df_music = pd.read_csv('library/music_library_model2.csv')

# Convert the strings back to arrays
df_music['sentiment_vector'] = df_music['sentiment_vector'].apply(lambda x: np.array(ast.literal_eval(x)))

diary_text = input("Input the diary contents: ")

print_recommended_songs(diary_text)

Top 10 similar songs:

Rank	Similarity	Song Name - Artist
1st	99.23% similar	이 또한 지나 가리라 - 임재범
2st	98.31% similar	멸망 - 천진우
3st	98.22% similar	상경 - 이솔로몬
4st	97.93% similar	늦은 산책 - 서이경
5st	97.91% similar	떠나 - 유다은
6st	97.52% similar	가족사진 - 김진호 (SG워너비)
7st	97.37% similar	빠이 - 오왠 (O.WHEN)
8st	97.32% similar	aya - 오왠 (O.WHEN)
9st	97.31% similar	G.O.A.T (Greatest Of All Time) - 키 (KEY)
10st	97.16% similar	그런 날 - 곽진언


Making library

In [21]:
# Load the music data
music_dir = "library/music_library.csv"
df_music = pd.read_csv(music_dir)

# Process all songs and add a new column for the sentiment vectors
df_music['sentiment_vector'] = df_music['lyrics'].apply(lambda x: process_lyrics(x, tokenizer, model))

# Convert the sentiment vectors to JSON strings before saving
df_music['sentiment_vector'] = df_music['sentiment_vector'].apply(lambda x: json.dumps([float(i) for i in x]))

# Save to a new CSV file
library_name = music_dir.split('/')[1][:-4]
df_music.to_csv(f"./library/{library_name}_model2.csv", index=False)

Token indices sequence length is longer than the specified maximum sequence length for this model (624 > 512). Running this sequence through the model will result in indexing errors
