In [None]:
# from sklearnex import patch_sklearn
# patch_sklearn()
!pip install wget

import os
import re
import pandas as pd
import numpy as np
import sklearn.model_selection
import wget
import gensim
from zipfile import ZipFile
from gensim.models import Word2Vec
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score


In [None]:
class Classifier:
    def __init__(self, extractor: str = 'self-trained w2v') -> None:
        self.svm_cls = SVC()
        self.extractor = extractor
        
        self.labels_to_id = {
            'Экономика': 0, 
            'Спорт': 1, 
            'Культура': 2, 
            'Наука и техника': 3
        }
        self.id_to_labels = {
            0: 'Экономика', 
            1: 'Спорт', 
            2: 'Культура', 
            3: 'Наука и техника'
        }
        self.random_state = 42
        
        if self.extractor == 'pretrained ft' and not os.path.exists('FastText'):
            wget.download('http://vectors.nlpl.eu/repository/20/214.zip')
            with ZipFile('214.zip') as zpfile:
                zpfile.extractall('FastText')
            os.remove('214.zip')
        
        
    def read_file(self, filename: str = None) -> tuple[pd.DataFrame, list[str], list[int]|None]:
        extension = os.path.splitext(filename)[1]
        df, texts, labels = None, None, None
        if extension == '.json':
            df = pd.read_json(filename)
            texts = []
            labels = []
            for k, v in self.labels_to_id.items():
                for text in df[k]['texts']:
                    texts.append( self.preprocessing(text) )
                    labels.append(v)
            
        elif extension == '.csv':
            df = pd.read_csv(filename)
            texts = [self.preprocessing(text) for text in df['text'] ]
        else:
            raise Exception('Unsupported extension')
        return (df, texts, labels)
        
    def preprocessing(self, text: str = None) -> str:
        s = text.lower()
        s = re.sub("[^а-яА-Яa-zA-Z0-9]", " ", s)
        s = re.sub("\s+", " ", s)
        s = s.strip()
        return s
    
    def get_features(self, texts: list[str], stage: str = 'train') -> np.array:
        features = []
        if self.extractor == 'self-trained w2v':
            if stage == 'train':
                tokens = [text.split() for text in texts]
                self.w2v_model = Word2Vec(sentences=tokens,
                                          vector_size=300,
                                          window=5,
                                          min_count=1,
                                          workers=os.cpu_count(),
                                          seed=self.random_state
                                         )
                self.w2v_model.build_vocab(tokens)
                self.w2v_model.train(tokens,
                                     total_examples=self.w2v_model.corpus_count, 
                                     epochs=100, 
                                     report_delay=1
                                    )

            for text in texts:
                vectors = []

                for word in text.split():
                    if word in self.w2v_model.wv:
                        vector = self.w2v_model.wv[word]
                        vectors.append(vector)

                vectors = np.array(vectors)
                feature = np.average(vectors, axis=0)
                features.append(feature)

            features = np.array(features)
        elif self.extractor == 'pretrained ft':
            self.ft_model = gensim.models.KeyedVectors.load('FastText/model.model')
            self.ft_model.fill_norms(force=True)
            
            for text in texts:
                vectors = []
                
                vectors.append(np.zeros(self.ft_model.vector_size))

                for word in text.split():
                    if word in self.ft_model.key_to_index:
                        vector = self.ft_model[self.ft_model.key_to_index[word]]
                        vectors.append(vector)

                vectors = np.array(vectors)
                
                feature = np.average(vectors, axis=0)
                features.append(feature)

            features = np.array(features)
        else:
            raise Exception(f'Unsupported {self.extractor} extractor')
        return features
    
    def fit(self, filename: str = None) -> float:
        df, texts, labels = self.read_file(filename=filename)
        
        features = self.get_features(texts, stage='train')
        
        train_features, val_features, train_labels, val_labels = sklearn.model_selection.train_test_split(features, labels, test_size=0.2, stratify=labels, shuffle=True, random_state=self.random_state)
        
        self.svm_cls.fit(train_features, train_labels)
        
        preds = self.svm_cls.predict(val_features)
        return accuracy_score(val_labels, preds)
        
    
    def predict(self, filename: str = None, submit: bool = False) -> np.array:
        df, texts, _ = self.read_file(filename=filename)
        
        features = self.get_features(texts, stage = 'test')
        
        predictions = self.svm_cls.predict(features)
        
        if submit:
            submission = pd.DataFrame()
            
            ids = []
            labels = []
            for i, v in enumerate(predictions):
                ids.append(i)
                labels.append(''.join(self.id_to_labels[v].strip().split()))
                
            submission['Id'] = ids
            submission['Category'] = labels

            submission.to_csv('submission.csv', index=False)
            
        return predictions

In [None]:
# Train with the pretrained FastText extractor on the labelled JSON data and
# print the accuracy measured on the validation split.
train_path = '/kaggle/input/nlp-itmo-exercise-1/archive/train_10000.json'
cls = Classifier(extractor='pretrained ft')
val_accuracy = cls.fit(filename=train_path)
print(val_accuracy)

In [None]:
# Classify the test CSV; submit=True also writes submission.csv. The call is
# the last expression so the predictions are shown as the cell output.
test_path = '/kaggle/input/nlp-itmo-exercise-1/archive/test.csv'
cls.predict(filename=test_path, submit=True)