In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
import pickle
from typing import Dict, List, Set, Union
import numpy as np
import pandas as pd
from tqdm import tqdm
from sentence_transformers import SentenceTransformer, util
import catboost as cb
from sklearn.metrics import classification_report

In [2]:
# CSV with the tracks to score; an 'isCover' column, if present, is split off as the label.
# NOTE(review): the file is called train.csv but is loaded into X_test — confirm
# this is the intended evaluation set.
data_path = './train.csv'
# Pickled KNNOnEmbeddings index, read back with KNNOnEmbeddings.load.
knn_weights_path = './knn_weights'
# Saved CatBoost model, read back with CatBoostClassifier.load_model.
catboost_weights_path = './catboost_weights'

In [3]:
# Load the tracks to score.
X_test = pd.read_csv(data_path)

# Split off the ground-truth label when the file provides one. y_test is always
# defined (None when there is no 'isCover' column) so that cells referencing it
# do not fail with a NameError on unlabeled data. DataFrame.pop removes the column
# from X_test and returns it, replacing the separate select + in-place drop.
y_test = X_test.pop('isCover') if 'isCover' in X_test.columns else None

In [4]:
class KNNOnEmbeddings:
    """Nearest-neighbour index over per-line embeddings of track texts.

    Every appended track has its text split into unique non-empty lines; each line
    is embedded with ``model.encode`` and stored in one flat list together with the
    position of the track it came from. A query embeds its own lines, finds the most
    similar stored lines by cosine similarity and aggregates the hits per track.

    Instances are persisted with pickle (see ``dump`` / ``load``), so the instance
    attribute names set in ``__init__`` are part of the on-disk format.
    """

    def __init__(self, model, ):
        super().__init__()
        # Encoder exposing ``encode(str)``; presumably a SentenceTransformer — confirm.
        self.model = model
        # Flat list of line embeddings across all appended tracks.
        self.embeddings = []
        # embedding_id_to_track_id[i] is the index in ``track_info`` of the track
        # that embedding ``i`` belongs to.
        self.embedding_id_to_track_id = []
        # The appended track records, in insertion order.
        self.track_info = []
        # Key of the track record that holds the text to embed.
        self.text_field = 'text'

    def _parse_track(self, track_text: str) -> Set[str]:
        """Return the set of distinct non-empty lines of ``track_text``."""
        return {line for line in track_text.split('\n') if line}

    def _get_line_embedding(self, line: str) -> np.ndarray:
        """Embed a single line with the wrapped model."""
        return self.model.encode(line)

    def __len__(self):
        """Number of tracks (not embeddings) stored in the index."""
        return len(self.track_info)

    def _get_track_embeddings(self, track_text: str):
        """Return one embedding per distinct non-empty line of ``track_text``.

        The result is an empty list when the text has no non-empty lines.
        """
        return [self._get_line_embedding(line) for line in self._parse_track(track_text)]

    def append(self, track: Dict[str, Union[str, int, bool, None]]) -> None:
        """Add a track record to the index; ``track[self.text_field]`` must be a str."""
        self.track_info.append(track)
        embeddings = self._get_track_embeddings(track[self.text_field])
        self.embeddings += embeddings
        # Every new embedding points back at the track that was just appended.
        self.embedding_id_to_track_id += [len(self.track_info) - 1, ] * len(embeddings)

    def make_from_dataframe(self, tracks: pd.DataFrame) -> None:
        """Append every row of ``tracks`` (as a dict) to the index."""
        for _, track in tqdm(tracks.iterrows(), total=len(tracks)):
            self.append(dict(track))

    def dump(self, file_path: str) -> None:
        """Pickle the whole index, including the wrapped model, to ``file_path``."""
        with open(file_path, 'wb') as file:
            pickle.dump(self, file)

    @staticmethod
    def load(file_path: str) -> 'KNNOnEmbeddings':
        """Unpickle and return an index previously written by ``dump``.

        SECURITY: ``pickle.load`` executes arbitrary code embedded in the file;
        only load index files from a trusted source.
        """
        with open(file_path, 'rb') as file:
            obj = pickle.load(file)

        return obj

    def _from_embedding_id_to_track_id(self, embedding_ids: List[int]) -> List[int]:
        """Map embedding positions to the ``track_info`` positions of their tracks."""
        return [self.embedding_id_to_track_id[embedding_id] for embedding_id in embedding_ids]

    def _find_k_nearest_neigbors_by_score(self, scores: List[float], k: int = 3):
        """Return ``(ids, scores[ids])`` for the (at most) ``k`` largest scores.

        ``k`` is clamped to the number of available scores, so fewer than ``k``
        entries come back when there are not enough candidates (the unclamped
        ``np.argpartition`` call would raise instead). The selected entries are
        not sorted among themselves; only the first returned entry is guaranteed
        to be the smallest of the selection.
        """
        scores = np.asarray(scores)
        k = min(k, len(scores))
        if k <= 0:
            return [], scores[:0]
        ids = np.argpartition(scores, -k)[-k:]
        return list(ids), scores[ids]

    def _find_k_nearest_neigbors_by_embeddings(self, embeddings: np.ndarray, k: int = 3):
        """For each query embedding, return ``(embedding_ids, cosine_scores)`` of its nearest stored lines.

        Returns an empty list when there is nothing to compare (no query
        embeddings or an empty index).
        """
        if len(embeddings) == 0 or len(self.embeddings) == 0:
            return []
        # Compare against this instance's own embeddings rather than a notebook global.
        cos_sims = util.cos_sim(embeddings, self.embeddings).numpy()
        return [
            self._find_k_nearest_neigbors_by_score(line_cos_sim, k=k)
            for line_cos_sim in cos_sims
        ]

    def find_k_nearest_neighbors(self, track: Dict[str, Union[str, int, bool, None]], k: int = 3):
        """Return an array of ``[track_index, aggregated_score]`` rows for the best-matching tracks.

        Per query line, the scores of its nearest stored lines are normalised to
        sum to one and then summed per track over all query lines. At most ``k``
        rows are returned; the result has shape ``(0, 2)`` when the text yields no
        embeddings or the index is empty. Track indices are stored as floats
        because both columns share one array.
        """
        embeddings = self._get_track_embeddings(track[self.text_field])
        ids_scores = self._find_k_nearest_neigbors_by_embeddings(embeddings, k=k)
        if not ids_scores:
            return np.empty((0, 2))

        ids = []
        scores = []
        for ids_, scores_ in ids_scores:
            # Normalise so every query line contributes a total weight of one.
            scores_ = scores_ / scores_.sum()
            ids += self._from_embedding_id_to_track_id(ids_)
            scores += list(scores_)

        # Sorting groups equal track indices together so they can be summed in one pass.
        hits = sorted(zip(ids, scores))
        ids_agg_scores = []
        current_id, agg_score = hits[0][0], 0.
        for idx, score in hits:
            if idx == current_id:
                agg_score += score
            else:
                ids_agg_scores.append((current_id, agg_score))
                current_id, agg_score = idx, score
        ids_agg_scores.append((current_id, agg_score))
        ids_agg_scores = np.array(ids_agg_scores)

        top_k_ids, _ = self._find_k_nearest_neigbors_by_score(ids_agg_scores[:, 1], k=k)
        # NOTE(review): argpartition does not sort the selected rows, so after the
        # reversal only the last row (the weakest of the selection) has a guaranteed
        # position. The order is kept as-is because the saved classifier presumably
        # saw features built with the same ordering — confirm before sorting here.
        return ids_agg_scores[top_k_ids, :][::-1]

    def find_k_nearest_tracks(self, track: Dict[str, Union[str, int, bool, None]], fields: List[str], k: int = 3):
        """Return a flat feature row describing the nearest tracks of ``track``.

        For each neighbour the row holds the requested ``fields`` of the stored
        track record followed by its aggregated similarity. The row always has
        exactly ``(len(fields) + 1) * k`` entries: it is all ``None`` when the
        query text is not a string, and padded with ``None`` when fewer than ``k``
        neighbours are found.
        """
        row_width = (len(fields) + 1) * k
        if not isinstance(track.get(self.text_field, 0.), str):
            return [None, ] * row_width

        tracks = []
        for neighbor in self.find_k_nearest_neighbors(track, k=k):
            track_info = self.track_info[int(neighbor[0])]
            tracks.extend(track_info.get(field, None) for field in fields)
            tracks.append(neighbor[1])

        # Keep the row length fixed so rows can be stacked into a DataFrame.
        tracks += [None, ] * (row_width - len(tracks))
        return tracks

In [5]:
# Restore the pre-built nearest-neighbour index from its pickle file.
# NOTE: unpickling runs code from the file, so knn_weights_path must be trusted.
knn = KNNOnEmbeddings.load(knn_weights_path)

In [6]:
# Attributes copied from every neighbouring track into the feature row. The order
# here is the column order inside each neighbour's group, followed by its similarity.
fields = [
    'track_id',
    'artistsName',
    'trackTitle',
    'isCover',
    'rating',
    'title_regex_flag',
    'other_titles_regex_flag',
    'version_regex_flag',
]

In [None]:
# One flat neighbour-feature row per input track, in the row order of X_test.
neighbors_info_test = [
    knn.find_k_nearest_tracks(dict(track), fields)
    for _, track in tqdm(X_test.iterrows())
]

In [9]:
# Column names for the neighbour rows: for each of the three neighbours, every
# requested field followed by 'similarity', all suffixed with '_top<rank>'.
additional_features_names = [
    f'{field}_top{rank}'
    for rank in range(1, 4)
    for field in fields + ['similarity']
]

In [10]:
# Stack the per-track neighbour rows into a frame; each row must have exactly
# len(additional_features_names) entries.
test_additional_features = pd.DataFrame(neighbors_info_test, columns=additional_features_names)

In [11]:
# Attach the neighbour features column-wise; the index is reset first so both
# frames align on the same 0..n-1 positions.
# NOTE: this reassigns X_test, so re-running the cell appends the columns again.
X_test = pd.concat([X_test.reset_index(drop=True), test_additional_features], axis=1, ignore_index=False)

In [12]:
# Tracks without a neighbour have no similarity; use 0 instead of a missing value.
# The filled column is assigned back rather than calling fillna(inplace=True) on
# X_test[...]: that form is chained assignment, which emits a FutureWarning on
# pandas 2.x and leaves X_test unchanged once Copy-on-Write is active.
for i in range(1, 4):
    similarity_column = f'similarity_top{i}'
    X_test[similarity_column] = X_test[similarity_column].fillna(0.)

In [13]:
# Name stems shared by the track's own columns and the per-neighbour columns.
_REGEX_FLAG_COLUMNS = (
    'title_regex_flag',
    'other_titles_regex_flag',
    'version_regex_flag',
)
_NEIGHBOR_RANKS = (1, 2, 3)

# Columns handed to CatBoost as categorical: the track's own attributes first,
# then the same group of attributes for each neighbour rank.
categorical_features = ['track_id', 'rating', 'rating_nan', *_REGEX_FLAG_COLUMNS] + [
    f'{stem}_top{rank}'
    for rank in _NEIGHBOR_RANKS
    for stem in ('track_id', 'isCover', 'rating', *_REGEX_FLAG_COLUMNS)
]

# Free-text columns: the track's own, then artist and title of each neighbour rank.
text_features = ['artistsName', 'trackTitle', 'text'] + [
    f'{stem}_top{rank}'
    for rank in _NEIGHBOR_RANKS
    for stem in ('artistsName', 'trackTitle')
]

In [17]:
# Model input: drop the free-text columns, replace missing values with a -1
# sentinel, cast every remaining column to int and finally turn the sentinel into
# the string 'NaN'.
# NOTE(review): the int cast also applies to the similarity_top* columns and any
# genuine -1 value becomes 'NaN' as well — confirm this matches how the saved
# model was trained.
test_features = (
    X_test
    .drop(columns=text_features)
    .fillna(-1)
    .astype(int)
    .replace(-1, 'NaN')
)

test_dataset = cb.Pool(
    data=test_features,
    # uncomment this if you have ground truth labels
    # label=y_test,
    cat_features=categorical_features,
)

In [23]:
# Create an empty classifier and restore the saved model into it.
catboost_classifier = cb.CatBoostClassifier()
# The trailing semicolon suppresses the cell's output.
catboost_classifier.load_model(catboost_weights_path);

In [19]:
# Predict a class label for every row of test_dataset; the array is the cell output.
catboost_classifier.predict(test_dataset)

array([1, 0, 1, ..., 0, 0, 0], dtype=int64)

In [None]:
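# Uncomment to evaluate the predictions; this requires ground-truth labels to have
# been passed to cb.Pool when test_dataset was built.
# print(classification_report(test_dataset.get_label(), catboost_classifier.predict(test_dataset)))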