### Classifier
- similarity score 1: diary textual embedding - lyrics embedding
- similarity score 2: lyrics emotion score - diary emotion score
- supplementary score: music feature (0, 1) vs. diary emotion score (mapped → 0, 1); if they are the same ⇒ bonus, otherwise ⇒ penalty (−)
- total score = similarity score 1 + similarity score 2 - supplementary score

In [None]:
import pinecone, torch, torch.nn as nn
from transformers import RobertaTokenizer, RobertaModel, RobertaConfig, pipeline

# Redefine classifier layer
# source code: https://github.com/huggingface/transformers/blob/84ea427f460ffc8d2ddc08a341ccda076c24fc1f/src/transformers/models/roberta/modeling_roberta.py#L1443
# config: https://huggingface.co/michellejieli/emotion_text_classifier/blob/main/config.json

class RobertaClassificationHead(nn.Module):
    """Head for sentence-level classification tasks.

    Applies ``dropout -> dense -> tanh -> dropout -> out_proj`` to the
    features it receives. Unlike the upstream HuggingFace head, it does not
    slice out the ``<s>`` token: ``forward`` uses ``features`` as given, so
    callers pass an already-pooled vector whose last dimension is
    ``config.hidden_size``.

    Args:
        config: Object exposing ``hidden_size`` and ``hidden_dropout_prob``
            and, optionally, ``classifier_dropout``.
        num_labels: Width of the output projection. Defaults to 28, the value
            that used to be hard-coded, so existing callers are unaffected.
            ``config.num_labels`` is deliberately not read implicitly, because
            that would silently change the output size for existing callers.
    """

    def __init__(self, config, num_labels=28):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        # Fall back to the generic hidden dropout when no classifier-specific
        # dropout is configured (attribute missing or explicitly None).
        classifier_dropout = getattr(config, "classifier_dropout", None)
        if classifier_dropout is None:
            classifier_dropout = config.hidden_dropout_prob
        self.dropout = nn.Dropout(classifier_dropout)
        self.out_proj = nn.Linear(config.hidden_size, num_labels)

    def forward(self, features, **kwargs):
        """Return un-normalised class scores for ``features``.

        Args:
            features: Tensor whose last dimension equals ``config.hidden_size``.
            **kwargs: Accepted for signature compatibility and ignored.

        Returns:
            Tensor with the same leading dimensions as ``features`` and a last
            dimension of ``num_labels``.
        """
        x = self.dropout(features)
        x = torch.tanh(self.dense(x))
        x = self.dropout(x)
        return self.out_proj(x)

In [None]:
class Classifier:
    """Rank songs for a diary entry by combining three scores.

    * score 1: similarity between the diary embedding and stored lyrics
      embeddings, as returned by the Pinecone lyrics index;
    * score 2: dot product between the diary emotion scores and the emotion
      scores computed for each retrieved lyrics embedding;
    * supplementary score: plus or minus the diary's dominant polarity score,
      depending on whether the song's stored label has the same polarity.

    Usage: call the instance with the diary text, then ``compute_scores()``,
    then ``return_topk_music()``.

    NOTE(review): the classification head is freshly (randomly) initialised
    on every call rather than loaded from the checkpoint, so the emotion
    scores are not those of the fine-tuned model -- confirm whether that is
    intended.
    NOTE(review): the three scores are combined by multiplication here, while
    the notebook header describes an additive formula -- confirm which one is
    wanted.
    """

    _MODEL_NAME = "j-hartmann/emotion-english-distilroberta-base"
    # Emotion ids folded into coarse polarity classes (0: negative,
    # 1: positive, 2: everything else).
    _NEGATIVE_IDS = (0, 1, 2, 5)  # anger, disgust, fear, sadness
    _POSITIVE_IDS = (3,)  # joy

    def __init__(self, lyrics_db_index_name: str, features_db_index_name):
        """Connect to the two Pinecone indexes.

        Args:
            lyrics_db_index_name: Name of the index holding lyrics embeddings.
            features_db_index_name: Name of the index holding per-song
                feature vectors (first component is read as the label).
        """
        import os

        # SECURITY: credentials should come from the environment. The literal
        # fallback is kept only so existing notebooks keep working; this key
        # has been committed to source and should be rotated and removed.
        api_key = os.environ.get(
            "PINECONE_API_KEY", "12dfbe87-05b3-4243-bb22-e69d329f18ed"
        )
        environment = os.environ.get("PINECONE_ENVIRONMENT", "gcp-starter")
        pinecone.init(api_key=api_key, environment=environment)
        self.pinecone_db_lyrics = pinecone.Index(lyrics_db_index_name)
        self.pinecone_db_features = pinecone.Index(features_db_index_name)

        # vars
        self.total_score = 0.0

    def __call__(self, diary_text: str):
        """Embed ``diary_text`` and compute its emotion scores.

        Sets ``self.diary_embedding`` (pooled encoder output),
        ``self.classifier`` (the head, in eval mode) and
        ``self.diary_emotion_scores`` (head output for the diary embedding).
        """
        # LM
        config = RobertaConfig.from_pretrained(self._MODEL_NAME)
        config.output_hidden_states = True
        config.num_labels = 7
        tok = RobertaTokenizer.from_pretrained(self._MODEL_NAME)
        model = RobertaModel.from_pretrained(self._MODEL_NAME, config=config)
        model.eval()

        # Truncate so long diaries cannot exceed the position-embedding table
        # (RoBERTa reserves two positions for the padding offset).
        token_ids = torch.tensor(
            [
                tok.encode(
                    diary_text,
                    truncation=True,
                    max_length=config.max_position_embeddings - 2,
                )
            ]
        )

        # eval(): a fresh nn.Module is in train mode, where dropout would make
        # the scores differ between the diary and every lyrics vector.
        self.classifier = RobertaClassificationHead(config)
        self.classifier.eval()

        with torch.no_grad():
            # extract embedding
            self.diary_embedding = model(token_ids).pooler_output.squeeze()
            # get emotion score
            self.diary_emotion_scores = self.classifier(self.diary_embedding)

    def compute_scores(self, max_k=100, include_values=True):
        """Return ``(total_score, selected_ids)`` for the retrieved songs.

        ``total_score`` is a 1-D tensor aligned with ``selected_ids``: the
        element-wise product of score 1, score 2 and the supplementary score.
        ``include_values`` must be true, because score 2 is computed from the
        retrieved lyrics vectors.
        """
        s1_vecs, s1_scores, selected_ids = self.compute_s1(max_k, include_values)
        _, s2_scores = self.compute_s2(s1_vecs)
        ss = self.compute_ss(selected_ids, include_values)

        total_score = s1_scores * s2_scores * ss

        return total_score, selected_ids

    def compute_s1(self, max_k=100, include_values=True):
        '''
        compute similarity score (1) between diary textual embedding and lyrics embedding.
        The dimension of both embeddings is 768.

        Returns ``(s1_vecs, s1_scores, selected_ids)``: a dict mapping id to
        lyrics vector, a 1-D tensor of similarity scores, and the list of ids,
        all in the order returned by the index.
        '''
        # Retrieve. The index expects a plain list of floats, not a tensor.
        matches = self.pinecone_db_lyrics.query(
            vector=self.diary_embedding.detach().reshape(-1).tolist(),
            top_k=max_k,
            include_values=include_values,
        )['matches']  # [{id, score, values}, ...]

        s1_vecs = {}  # {id1: values, id2: values, ...}
        raw_scores = []
        selected_ids = []

        for match in matches:
            s1_vecs[match['id']] = torch.tensor(match['values'])
            raw_scores.append(float(match['score']))
            selected_ids.append(match['id'])

        s1_scores = torch.tensor(raw_scores, dtype=torch.float32)  # (k,)

        return s1_vecs, s1_scores, selected_ids

    def compute_s2(self, s1_vecs):
        '''
        compute similarity score (2) between diary emotion score and lyrics emotion score.

        Returns ``(lyrics_emotion_scores, s2_scores)`` with shapes
        ``(k, n_labels)`` and ``(k,)`` for ``k`` entries in ``s1_vecs``.
        '''
        diary_scores = self.diary_emotion_scores.reshape(-1)

        if not s1_vecs:
            return torch.empty((0, diary_scores.numel())), torch.empty(0)

        with torch.no_grad():
            lyrics_emotion_scores = torch.stack(
                [self.classifier(vec) for vec in s1_vecs.values()], dim=0
            )
            # (k, n_labels) @ (n_labels,) => (k,)
            s2_scores = torch.matmul(lyrics_emotion_scores, diary_scores)

        return lyrics_emotion_scores, s2_scores

    def compute_ss(self, selected_ids, include_values):
        '''
        project the music and lyrics features to coarse emotion classes and compute supplementary scores

        Returns a 1-D tensor aligned with ``selected_ids``: ``+diary_score``
        when the song's polarity equals the diary's, ``-diary_score``
        otherwise. ``include_values`` is accepted for backward compatibility;
        the stored vectors are always fetched because the label is read from
        them.

        Raises:
            KeyError: if an id is missing from the features index.
        '''
        # Fold the per-emotion diary scores into the three polarity buckets.
        polarity_scores = [0.0, 0.0, 0.0]
        for emotion_id, score in enumerate(
            self.diary_emotion_scores.detach().reshape(-1).tolist()
        ):
            polarity_scores[self._polarity(emotion_id)] += score

        diary_label = max(range(3), key=polarity_scores.__getitem__)
        diary_score = polarity_scores[diary_label]

        if not selected_ids:
            return torch.empty(0)

        # Retrieve all feature vectors in one request.
        fetched = self.pinecone_db_features.fetch(ids=list(selected_ids))['vectors']

        ss = []
        for song_id in selected_ids:
            if song_id not in fetched:
                raise KeyError(f"id {song_id!r} not found in the features index")

            raw_label = fetched[song_id]['values'][0]
            # NOTE(review): an earlier comment described the stored label as
            # already being 0/1/2 (-/+/.); if so this remap sends every song
            # to class 0 -- confirm the label encoding of the features index.
            sign = 1 if diary_label == self._polarity(raw_label) else -1
            ss.append(sign * diary_score)

        return torch.tensor(ss, dtype=torch.float32)

    def _polarity(self, label):
        """Map an emotion id to 0 (negative), 1 (positive) or 2 (other)."""
        if label in self._NEGATIVE_IDS:
            return 0
        if label in self._POSITIVE_IDS:
            return 1
        return 2  # neutral, surprise

    def return_topk_music(self, scores, selected_ids, top_k=5):
        """Return the ids of the ``top_k`` highest-scoring songs, best first.

        ``scores`` is flattened, so both ``(k,)`` and ``(1, k)`` tensors work.
        Fewer than ``top_k`` ids are returned when fewer are available.
        """
        flat_scores = torch.as_tensor(scores).reshape(-1)
        k = min(top_k, flat_scores.numel(), len(selected_ids))
        if k <= 0:
            return []

        top_indices = torch.topk(flat_scores, k).indices.tolist()
        topk_music_ids = [selected_ids[i] for i in top_indices]

        return topk_music_ids

In [None]:
# Usage example: recommend songs for a single diary entry.
# Replace the two placeholders with the names of your own Pinecone indexes.
lyrics_db_index_name = 'your lyrics db index name'
features_db_index_name = 'your features db index name'

classifier = Classifier(
    lyrics_db_index_name=lyrics_db_index_name,
    features_db_index_name=features_db_index_name,
)

# Step 1: embed the diary entry and score its emotions.
classifier(
    diary_text='I mean I just needed a little sympathy, but no one gave me a single comfort word.'
)

# Step 2: score the retrieved candidate songs.
scores, selected_ids = classifier.compute_scores()

# Step 3: keep the ten best candidates.
topk_music_ids = classifier.return_topk_music(
    scores=scores,
    selected_ids=selected_ids,
    top_k=10,
)

# You can retrieve the music from the pinecone db by using index afterwards.