# fastFMのclassifierを使ってみる

In [1]:
from pathlib import Path
import numpy as np
import sys,os
sys.path.append(os.pardir)
from tools.preprocess.common import CommonPreprocessor
from tools.preprocess.frequency_vectorizer import FrequencyVectorizer
from tools.model.fm_bpr import FMBPR

In [2]:
# Relative path to the raw dataset file that the preprocessor reads below.
orignal_path = Path(
    "../data/original.txt"
)

In [3]:
# Configure the shared preprocessor: capos '-6' and '-7' are passed as the
# rare-capo list, with a 0.1 validation rate, a 0.2 test rate and split seed 0.
cp = CommonPreprocessor(
    rare_capo_list=['-6', '-7'],
    val_rate=0.1,
    test_rate=0.2,
    split_seed=0,
)
# Load the songs, drop the ones flagged by remove_rare_capo_song, then split
# what is left into train / validation / test.
songs = cp.get_song_list(orignal_path)
songs = cp.remove_rare_capo_song(songs)
songs_train, songs_valid, songs_test = cp.split_dataset(songs)

In [4]:
# Chord statistics are gathered from the training split only.
chord_stat = cp.retrieve_chord_stat(
    songs_train
)

In [5]:
# Pull the "chords" entry out of every song, one list per dataset split.
chords_list_train, chords_list_valid, chords_list_test = (
    [entry["chords"] for entry in split]
    for split in (songs_train, songs_valid, songs_test)
)

In [6]:
# Vectorizer built from the training chord statistics.
# NOTE(review): presumably chords below `threshold` occurrences are mapped to
# the "<UNK>" token -- confirm against FrequencyVectorizer.
fv = FrequencyVectorizer(
    chord_stat,
    threshold=5,
    token="<UNK>",
)

In [7]:
# Chord feature matrices, one per split, produced by the vectorizer above.
X_train, X_valid, X_test = (
    fv.get_chord_features(chords)
    for chords in (chords_list_train, chords_list_valid, chords_list_test)
)
# Targets: the "rec_capo" entry of every song, one list per split.
y_train, y_valid, y_test = (
    [entry["rec_capo"] for entry in split]
    for split in (songs_train, songs_valid, songs_test)
)

In [8]:
from sklearn.metrics import accuracy_score
from fastFM import als
from scipy import sparse
import numpy as np
from sklearn.preprocessing import LabelBinarizer
from numpy.random import randint


class FMClassifier:
    """Capo recommender built on fastFM's ALS factorization-machine classifier.

    Each song (one row of chord features) is expanded into one row per
    candidate capo by appending a one-hot capo block. The row carrying the
    recommended capo is labelled ``1`` and every other row ``-1``. For
    prediction, the candidate capos of a song are ordered by the model's
    score for the corresponding rows.
    """

    def __init__(self, n_iter, rank, dupulication_capo_order):
        """
        :param n_iter: forwarded to ``als.FMClassification``.
        :param rank: forwarded to ``als.FMClassification``.
        :param dupulication_capo_order: sequence of candidate capos (strings
            in this notebook). Its order fixes the order of the duplicated
            rows generated for every song.
        """
        self.model = als.FMClassification(n_iter=n_iter, rank=rank)
        # Stored as an array because the methods below index it with
        # integer index arrays.
        self.dupulication_capo_order = np.asarray(dupulication_capo_order)
        self.capo_encoder = None

    def fit(self, X, y):
        """Train the model on a balanced sample of (song, capo) rows.

        :param X: np.array holding chord features only (before duplication).
        :param y: recommended capo per song (strings). len(X) == len(y).
        :return: whatever ``self.model.fit`` returns.
        :raises ValueError: if X and y differ in length, or y contains a capo
            that is not in ``dupulication_capo_order``.
        """
        if len(X) != len(y):
            raise ValueError(
                "X and y must have the same length, got %d and %d"
                % (len(X), len(y))
            )
        print("Preprocessing features...")
        X_dup = self.duplicate_add_capo_features(X)
        print("Creating labels...")
        labels = self.create_labels(y)
        print("Sampling labels...")
        X_sampled, y_sampled = self.sample_dataset(X_dup, labels)
        print("sparse")
        X_sp = sparse.csr_matrix(X_sampled)
        print("Start training...")
        return self.model.fit(X_sp, y_sampled)

    def sample_dataset(self, X_dup, labels):
        """Keep every positive row and one random negative row per song.

        :param X_dup: duplicated feature matrix, one row per (song, capo).
        :param labels: array of 1 / -1 aligned with the rows of ``X_dup``.
        :return: shuffled ``(X_sampled, y_sampled)`` with as many negatives
            as positives.
        """
        # Every song contributes exactly one positive row, so its negatives
        # form a consecutive run of this length inside X_dup_neg.
        n_neg_per_song = len(self.dupulication_capo_order) - 1
        X_dup_pos = X_dup[labels == 1]
        X_dup_neg = X_dup[labels == -1]
        n_songs = len(X_dup_neg) // n_neg_per_song
        # randint's upper bound is exclusive: offsets are 0..n_neg_per_song-1.
        diff = randint(0, n_neg_per_song, n_songs)
        sampled_index = n_neg_per_song * np.arange(n_songs) + diff
        print("sampling")
        X_dup_neg_sampled = X_dup_neg[sampled_index]
        assert len(X_dup_neg_sampled) == len(X_dup_pos)
        print("concat")
        X_sampled = np.concatenate([X_dup_pos, X_dup_neg_sampled])
        y_sampled = np.concatenate(
            [np.ones(len(X_dup_pos)), -np.ones(len(X_dup_neg_sampled))]
        )
        assert len(X_sampled) == len(y_sampled)
        print(X_sampled.shape, y_sampled.shape)
        print("shuffle")
        return self.shuffle_samples(X_sampled, y_sampled)

    def shuffle_samples(self, X, y):
        """Return X and y reordered by one shared random permutation."""
        X = np.asarray(X)
        y = np.asarray(y)
        # A single index permutation keeps rows and labels aligned without
        # materialising a Python list of (row, label) tuples.
        permutation = np.random.permutation(len(X))
        return X[permutation], y[permutation]

    def predict(self, X):
        """Return the top-ranked capo for every song.

        :param X: np.array holding chord features only (before duplication).
        :return: e.g. np.array(["+1", "-4", "0", ...]), one capo per song.
        """
        rankings = self.predict_ranking(X)
        return np.array([ranking[0] for ranking in rankings])

    def predict_ranking(self, X):
        """Return the recommended-capo ranking for every input song.

        :param X: np.array holding chord features only (before duplication).
        :return: list with one array per song, best capo first,
            e.g. [np.array(["+1", "-4", "0", ...]), ...]
        """
        n_capos = len(self.dupulication_capo_order)
        X_dup = self.duplicate_add_capo_features(X)
        X_sp = sparse.csr_matrix(X_dup)
        # Rank on continuous scores: the classifier's predict() yields
        # thresholded class labels, which cannot order the candidates.
        scores = self.model.predict_proba(X_sp)
        per_song = np.reshape(scores, (-1, n_capos))
        return [self.return_sorted_capo(row) for row in per_song]

    def get_all_embeddings(self):
        """Return all learned embeddings at once (transpose of ``model.V_``)."""
        return self.model.V_.T

    def get_embeddings(self):
        """Return the learned embeddings split into chord and capo parts.

        The last ``len(dupulication_capo_order)`` rows are treated as the capo
        embeddings, matching the column layout produced by
        ``duplicate_add_capo_features`` (chord columns first, capo one-hot
        columns last).
        NOTE(review): the capo rows follow the one-hot encoder's column
        order, which may differ from ``dupulication_capo_order`` -- confirm
        before labelling them.

        :return: (chord_embeddings, capo_embeddings)
        """
        n_capos = len(self.dupulication_capo_order)
        embeddings = self.get_all_embeddings()
        chord_embeddings = embeddings[:-n_capos, :]
        capo_embeddings = embeddings[-n_capos:, :]
        return chord_embeddings, capo_embeddings

    def create_labels(self, y):
        """Build the 1 / -1 labels for the duplicated rows.

        :param y: recommended capo per song.
        :return: 1-D int array of length len(y) * number of capos, holding 1
            at each song's recommended capo and -1 elsewhere.
        :raises ValueError: if a value of y is not a known capo.
        """
        capo_order = np.asarray(self.dupulication_capo_order).tolist()
        position = {capo: j for j, capo in enumerate(capo_order)}
        labels = np.full((len(y), len(capo_order)), -1, dtype=int)
        for i, rec in enumerate(y):
            if rec not in position:
                raise ValueError(
                    "recommended capo %r is not in dupulication_capo_order"
                    % (rec,)
                )
            labels[i, position[rec]] = 1
        # Row-major flattening matches the song-major row order of the
        # duplicated feature matrix.
        return labels.ravel()

    def return_sorted_capo(self, pred):
        """Order the capos by predicted score, highest score first.

        :param pred: scores of one song, aligned with
            ``dupulication_capo_order``.
        :return: array of capos, best first; ties keep the original order.
        """
        scores = np.asarray(pred, dtype=float)
        # Negating the scores makes the ascending, stable argsort return a
        # descending ranking with deterministic tie-breaking.
        descending = np.argsort(-scores, kind="stable")
        return np.asarray(self.dupulication_capo_order)[descending]

    def evaluate_top1(self, X, y):
        """Return the accuracy of the top-1 prediction.

        :param X: np.array holding chord features only (before duplication).
        :param y: recommended capo per song (strings). len(X) == len(y).
        """
        pred = self.predict(X)
        return accuracy_score(y, pred)

    def capo_onehot_encode(self, duplicated_capos):
        """One-hot encode capo strings.

        The encoder is fitted on the first call and reused afterwards so the
        column layout stays the same between training and prediction.
        """
        if self.capo_encoder is None:
            self.capo_encoder = LabelBinarizer()
            return self.capo_encoder.fit_transform(duplicated_capos)
        return self.capo_encoder.transform(duplicated_capos)

    def duplicate_capos(self, n):
        """Repeat ``dupulication_capo_order`` n times, back to back."""
        return np.tile(self.dupulication_capo_order, n)

    def duplicate_add_capo_features(self, X):
        """Expand every song into one row per candidate capo.

        :param X: np.array holding chord features only (before duplication).
        :return: array whose rows are the chord features of a song followed
            by the one-hot encoding of one candidate capo; the rows of a
            song are consecutive and follow ``dupulication_capo_order``.
        """
        n_capos = len(self.dupulication_capo_order)
        expanded_chord_features = X.repeat(n_capos, axis=0)
        duplicated_capos = self.duplicate_capos(len(X))
        capo_features = self.capo_onehot_encode(duplicated_capos)
        return np.concatenate([expanded_chord_features, capo_features], 1)


In [None]:
# Candidate capos, in the order used when duplicating each song's row.
model = FMClassifier(
    n_iter=10000,
    rank=10,
    dupulication_capo_order=np.array(
        ['1', '0', '-1', '-2', '-3', '-4', '-5']
    ),
)

In [None]:
# Train on the training split; progress messages are printed by fit().
model.fit(
    X_train,
    y_train,
)

Preprocessing features...
Creating labels...
Sampling labels...
sampling
concat
(71990, 698) (71990,)
shuffle
sparse


In [None]:
# Capo ranking for every validation song; show the first ten.
rankings = model.predict_ranking(
    X_valid
)
rankings[:10]

In [None]:
# Top-ranked capo for every validation song; show the first ten.
pred_top1 = model.predict(
    X_valid
)
pred_top1[:10]

In [None]:
# Top-1 accuracy on the validation split.
model.evaluate_top1(
    X_valid,
    y_valid,
)

In [None]:
# Learned embeddings, split into the chord part and the capo part.
(chord_embeddings,
 capo_embeddings) = model.get_embeddings()

In [None]:
# Display both embedding shapes as a tuple.
(chord_embeddings.shape, capo_embeddings.shape)