In [1]:
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.metrics import accuracy_score
from sklearn import neighbors

import numpy as np
import zipfile
import json
import pickle

In [2]:
# Mount Google Drive inside the Colab VM; the dataset archive is read from
# /content/drive below. `google.colab` is only importable in a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Zipped dataset on the mounted Drive and the directory it is unpacked into.
dataset_path = '/content/drive/MyDrive/texts-dev.zip'
extract_dir = '/content'

# Extract into an explicit directory instead of the current working directory,
# so the JSON path below is valid wherever the kernel was started; the context
# manager closes the archive handle deterministically.
with zipfile.ZipFile(dataset_path, 'r') as archive:
  archive.extractall(extract_dir)

session_path = f'{extract_dir}/dev-dataset-task2022-04.json'

# Read the JSON as UTF-8 explicitly instead of relying on the platform default
# encoding.
with open(session_path, encoding='utf-8') as f:
  raw_data = json.load(f)

In [4]:
# Every record of raw_data is read by position: element 0 is the sample
# (later fed to the text vectorizers), element 1 is its label, cast to int.
X_data = [record[0] for record in raw_data]
y_data = [int(record[1]) for record in raw_data]

In [5]:
# Sanity check: there is exactly one label per sample.
assert len(X_data) == len(y_data)

In [6]:
# Seeded shuffle-split that holds out 0.1% of the pairs. Each of the four
# resulting lists is turned into a numpy array so that it can later be indexed
# with integer index arrays (as knn_cv_score does with the fold indexes).
split_parts = train_test_split(X_data, y_data, test_size=0.001, random_state=48151623)
X_train, X_test, y_train, y_test = (np.array(part) for part in split_parts)

In [7]:
# Number of elements in the training part of the split.
X_train.size

3219

In [8]:
def kfold_split(num_objects, num_folds):
    """
    Split [0, 1, ..., num_objects - 1] into num_folds contiguous folds of equal size (the last fold takes the
       remainder of the division, so it can be longer) and return num_folds train-val pairs of indexes.

    Parameters:
    num_objects (int): number of objects in train set
    num_folds (int): number of folds for cross-validation split, 1 <= num_folds <= num_objects;
                     with num_folds == 1 the single validation fold holds every index and the train part is empty

    Returns:
    list((tuple(np.array, np.array))): list of length num_folds, where i-th element of list contains tuple of 2 numpy arrays,
                                       the 1st numpy array contains all indexes without i-th fold while the 2nd one contains
                                       i-th fold

    Raises:
    ValueError: if num_folds < 1, or if num_folds > num_objects (at least one fold would be empty)
    """
    if num_folds < 1:
        raise ValueError(f"num_folds must be at least 1, got {num_folds}")
    if num_folds > num_objects:
        raise ValueError(f"cannot split {num_objects} objects into {num_folds} non-empty folds")

    fold_size = num_objects // num_folds

    ret = []
    for fold in range(num_folds):
        start = fold * fold_size
        # The last fold runs to the end so the remainder of the division is not dropped.
        stop = num_objects if fold == num_folds - 1 else start + fold_size
        train_indexes = np.concatenate((np.arange(0, start), np.arange(stop, num_objects)))
        val_indexes = np.arange(start, stop)
        ret.append((train_indexes, val_indexes))
    return ret

In [9]:
def knn_cv_score(x, y, parameters, score_function, folds, knn_class):
    """
    Cross-validate a kNN model over every combination of the supplied hyper-parameters.

    Parameters:
    x (np.array): train objects, indexed with the index arrays stored in folds
    y (1d np.array): train labels, indexed the same way
    parameters (dict): dict with keys 'n_neighbors', 'metrics', 'weights', 'normalizers', values of type list;
                       parameters['normalizers'] contains tuples (normalizer, normalizer_name), where normalizer
                       is either None or an object with fit and transform methods
    score_function (callable): called as score_function(y_true, y_predict), returns the score for one fold
    folds (list): train/validation index pairs, e.g. the output of kfold_split
    knn_class (obj): class of knn model, instantiated as knn_class(n_neighbors=..., weights=..., metric=...)

    Returns:
    dict: key - tuple of (normalizer_name, n_neighbors, metric, weight), value - mean score over all folds
    """
    fold_count = len(folds)
    mean_scores = {}
    for fold in folds:
        train_idx, val_idx = fold[0], fold[1]
        for normalizers in parameters['normalizers']:
            normalizer = normalizers[0]
            fit_part, val_part = x[train_idx], x[val_idx]
            if normalizer is not None:
                # Fit on the training part of the fold only, then apply to both parts.
                normalizer.fit(fit_part)
                fit_part = normalizer.transform(fit_part)
                val_part = normalizer.transform(val_part)
            for n_neighbors in parameters['n_neighbors']:
                for metrics in parameters['metrics']:
                    for weights in parameters['weights']:
                        print(f"Training: {normalizers[1]} {weights} {metrics} {n_neighbors}")
                        model = knn_class(n_neighbors=n_neighbors, weights=weights, metric=metrics)
                        model.fit(fit_part, y[train_idx])
                        # Each fold contributes score / fold_count, so the sum over folds is the mean.
                        fold_share = score_function(y[val_idx], model.predict(val_part)) / fold_count
                        key = (normalizers[1], n_neighbors, metrics, weights)
                        if key in mean_scores:
                            mean_scores[key] += fold_share
                        else:
                            mean_scores[key] = fold_share
    return mean_scores

In [6]:
# Download the NLTK stop-word corpus used by the TF-IDF vectorizer.
# NOTE(review): this import sits mid-notebook; consider moving it to the
# imports cell at the top.
import nltk
nltk.download("stopwords")

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [7]:
from nltk.corpus import stopwords

# Bag-of-words counts: ignore terms that occur in more than 80% of the
# documents or in fewer than 10 documents.
count_vec = CountVectorizer(
    max_df=0.8,
    min_df=10,
)

# TF-IDF weights: ignore terms that occur in fewer than 3 documents, and drop
# the Russian stop words from NLTK.
russian_stop_words = stopwords.words('russian')
tf_idf = TfidfVectorizer(min_df=3, stop_words=russian_stop_words)

In [12]:
# Grid in the format expected by knn_cv_score: k from 1 to 10, two distance
# metrics, two neighbour-weighting schemes and two text vectorizers, each given
# as an (object, display name) pair.
parameters = {
    'n_neighbors': list(range(1, 11)),
    'metrics': ['euclidean', 'cosine'],
    'weights': ['uniform', 'distance'],
    'normalizers': [
        (count_vec, 'CountVectorizer'),
        (tf_idf, 'TfidfVectorizer'),
    ],
}

In [13]:
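# Grid search over `parameters` with 4-fold cross-validation. Left commented out, so `score_dict` is not defined on a fresh run; uncomment to recompute it.
#score_dict = knn_cv_score(X_train, y_train, parameters, accuracy_score, kfold_split(X_train.shape[0], 4), neighbors.KNeighborsClassifier)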

In [14]:
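# Parameter combination with the highest mean cross-validation score (needs `score_dict` from the grid-search cell).
#max(score_dict, key=score_dict.get)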

In [8]:
# Rebind X_train / y_train to the complete dataset for the final fit.
# NOTE(review): this overwrites the arrays produced by train_test_split, and
# the new arrays also contain the samples held out as X_test / y_test.
X_train = np.array(X_data)
y_train = np.array(y_data)

In [9]:
# Number of elements now that X_train holds the complete dataset.
X_train.size

3223

In [10]:
# Learn the TF-IDF vocabulary and IDF weights on the training texts and
# vectorize them in the same pass. This is equivalent to fit followed by
# transform, but the corpus is tokenized once instead of twice.
X_train_vec = tf_idf.fit_transform(X_train)

In [11]:
# 1-nearest-neighbour classifier with cosine distance, fitted on the TF-IDF
# matrix of the training texts.
clf = neighbors.KNeighborsClassifier(
    n_neighbors=1,
    metric='cosine',
)
clf.fit(X_train_vec, y_train)

KNeighborsClassifier(metric='cosine', n_neighbors=1)

In [12]:
# Serialize the fitted classifier to a pickle file in the working directory.
saved_file = "KNN_model.pkl"
with open(saved_file, "wb") as model_out:
  pickle.dump(clf, model_out)

In [20]:
# Read the classifier back from its pickle file to check that it round-trips.
with open(saved_file, "rb") as model_in:
  model = pickle.load(model_in)

In [13]:
# Serialize the fitted TF-IDF vectorizer; it is needed together with the model
# to turn raw texts into features at prediction time.
saved_vectorizer = "vectorizer.pkl"
with open(saved_vectorizer, "wb") as vectorizer_out:
  pickle.dump(tf_idf, vectorizer_out)

In [22]:
# Read the vectorizer back from its pickle file to check that it round-trips.
with open(saved_vectorizer, "rb") as vectorizer_in:
  vectorizer = pickle.load(vectorizer_in)

In [23]:
# Vectorize the held-out texts with the reloaded vectorizer and score the
# reloaded model on them.
# NOTE(review): the classifier was fitted on the complete dataset, which
# includes X_test, so this checks that the pickled artefacts work end to end
# rather than measuring accuracy on unseen data.
X_test_vec = vectorizer.transform(X_test)
score = accuracy_score(y_test, model.predict(X_test_vec))

In [24]:
# Accuracy on X_test; these samples were part of the data the model was fitted on.
score

1.0