In [1]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame
from pyspark.sql import Window

from sklearn.neighbors import NearestNeighbors
import pandas as pd
import numpy as np

In [2]:
from sklearn.datasets import make_moons, make_classification, make_regression

In [3]:
from sklearn.model_selection import train_test_split

In [4]:
from connector import spark_connector

In [5]:
# Start the project Spark connector; later cells reach the session through con.spark.
# NOTE(review): the meaning and units of exec_inst / exec_cores / memory_exec / marshal_mode
# are defined in the project-local `connector` module — confirm there.
con = spark_connector(
    exec_inst=10,
    exec_cores=10,
    memory_exec=25,
    marshal_mode=True,
)

In [6]:
# Display the connector's session object (the same con.spark used below for createDataFrame).
con.spark

In [10]:
# Synthetic 5-class dataset (classification variant of the experiment).
# NOTE(review): in a top-to-bottom run the make_regression cell that follows rebinds
# X and y, so this data is only used when that cell is skipped.
classification_kwargs = {
    "n_samples": 10000,
    "n_features": 15,
    "n_informative": 15,
    "n_redundant": 0,
    "n_repeated": 0,
    "n_classes": 5,
    "n_clusters_per_class": 2,
    "weights": None,
    "flip_y": 0.04,
    "class_sep": 1.0,
    "hypercube": True,
    "shift": 0.0,
    "scale": 1.0,
    "shuffle": True,
    "random_state": 0,
}
X, y = make_classification(**classification_kwargs)

In [7]:
# Synthetic regression dataset: 10000 rows, 3 informative features, one target.
# The fixed random_state keeps the generated data reproducible between runs.
regression_kwargs = {
    "n_samples": 10000,
    "n_features": 3,
    "n_informative": 3,
    "n_targets": 1,
    "bias": 0.0,
    "effective_rank": None,
    "noise": 0.05,
    "shuffle": True,
    "coef": False,
    "random_state": 0,
}
X, y = make_regression(**regression_kwargs)

In [8]:
# Wrap the feature matrix in a DataFrame with columns x1..xN and attach the target as 'y'.
n_features = X.shape[1]
data = pd.DataFrame(X, columns=[f'x{i+1}' for i in range(n_features)]).assign(y=y)

In [9]:
# Hold out 10% of the rows for evaluation; the fixed seed keeps the split reproducible.
train, test = train_test_split(data, test_size=0.1, random_state=6)
# Echo both shapes as the cell output.
(train.shape, test.shape)

((9000, 4), (1000, 4))

In [10]:
from sklearn.metrics import classification_report, mean_absolute_percentage_error

In [11]:
from sklearn.neighbors import KNeighborsRegressor, KNeighborsClassifier

In [12]:
# scikit-learn reference model: brute-force, distance-weighted 15-NN with Euclidean metric,
# fitted on every column of `train` except the target 'y'.
nn = KNeighborsRegressor(
    n_neighbors=15,
    metric='euclidean',
    weights='distance',
    algorithm='brute',
    n_jobs=-1,
).fit(train.drop('y', axis=1), train['y'])
# Classification variant (use together with the make_classification dataset):
#nn = KNeighborsClassifier(n_neighbors=15,metric='euclidean', weights='distance', algorithm='brute', n_jobs=-1).fit(train.drop('y', axis=1), train['y'])

In [13]:
# Score the scikit-learn baseline on the hold-out rows, ordered by their original index.
# The ordered frame is built once and reused for both the targets and the features.
sk_eval = test[:].reset_index(names=['index_test']).sort_values(by='index_test')
sk_true = sk_eval['y'].to_numpy()
sk_pred = nn.predict(sk_eval.drop(['y', 'index_test'], axis=1))
# Classification variant:
#print(classification_report(sk_true, sk_pred))
mean_absolute_percentage_error(sk_true, sk_pred)

0.15269159404715185

In [14]:
# Drop the scikit-learn baseline model; it is not referenced by the cells visible below.
del nn

In [15]:
# Move the original pandas index into an explicit id column ('index_train' / 'index_test')
# so rows can be matched again after the round trip through Spark.
# Guarded so the cell is safe to re-run: calling reset_index(names=...) a second time on an
# already-converted frame raises ValueError because the id column already exists.
if 'index_train' not in train.columns:
    train = train.reset_index(names='index_train')
if 'index_test' not in test.columns:
    test = test.reset_index(names='index_test')

In [16]:
# Ship both pandas frames to Spark. The test frame is sent without its target column,
# so the Spark side only sees the id column and the features.
train_df = con.spark.createDataFrame(train)
test_df = con.spark.createDataFrame(test.drop(columns=['y']))

In [17]:
from norm import Normalizer

In [19]:
# Fit a z-score normalizer on the training features only
# (every column except the id column and the target).
feature_columns = train_df.drop('index_train', 'y').columns
normalizer = Normalizer(method='zscore', columns=feature_columns)
# Kept as the last expression so the fitted object is echoed as the cell output.
normalizer.fit(train_df)

<norm.Normalizer at 0x795d2f899990>

In [20]:
# Show the fitted per-column statistics.
# NOTE(review): for 'zscore' each 'params' tuple is presumably (mean, std) — confirm in norm.Normalizer.
print(normalizer.stats)

{'x1': {'method': 'zscore', 'params': (0.000835914876987771, 0.9950252125618987)}, 'x2': {'method': 'zscore', 'params': (-0.00959960069506814, 0.9940890678194557)}, 'x3': {'method': 'zscore', 'params': (0.001439387547684664, 0.9881289505938848)}}


In [21]:
# Apply the statistics fitted on the training data to both frames (train first, then test).
train_normalized, test_normalized = (
    normalizer.transform(frame) for frame in (train_df, test_df)
)

In [22]:
from KNN import spark_knn

In [23]:
# Build the Spark KNN helper on top of the normalized training frame.
knn = spark_knn(
    spark=con.spark,
    train_df=train_normalized,
)

In [24]:
# IPython help lookup (`?` suffix): prints the signature and docstring of knn.fit.
# Development aid only; it does not change any state.
knn.fit?

[0;31mSignature:[0m [0mknn[0m[0;34m.[0m[0mfit[0m[0;34m([0m[0mwindow_size[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0midcol[0m[0;34m:[0m [0mstr[0m [0;34m=[0m [0;34m'index_train'[0m[0;34m)[0m [0;34m->[0m [0;32mNone[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Метод для предобработки "обучающего" датасета.
window_size - количество строк в партиции. Если не задано, подсчет происходит автоматически.
normalize - метод нормализации данных. Есть minmax, zscore или None.
Нормализуются все столбцы кроме _idcol, _type_y. Следует передавать только фичи.
[0;31mFile:[0m      ~/work/Spark_conn/KNN.py
[0;31mType:[0m      method

In [25]:
# Preprocess the training frame held by knn, using 'index_train' as the row id column.
# window_size=None leaves the partition size to be computed automatically (per the fit docstring).
# NOTE(review): the signature shown by `knn.fit?` is annotated `-> None`, so train2 is
# presumably None and is not referenced in the cells below — confirm and drop the assignment.
train2 = knn.fit(window_size=None, idcol = 'index_train')

In [26]:
# IPython help lookup (`?` suffix): prints the signature and docstring of knn.predict.
# Development aid only; it does not change any state.
knn.predict?

[0;31mSignature:[0m
[0mknn[0m[0;34m.[0m[0mpredict[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mpred_df[0m[0;34m:[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mdataframe[0m[0;34m.[0m[0mDataFrame[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mpred_df_window_size[0m[0;34m:[0m [0mint[0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0midcol[0m[0;34m:[0m [0mstr[0m [0;34m=[0m [0;34m'index_test'[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mn_neighbors[0m[0;34m:[0m [0mint[0m [0;34m=[0m [0;36m10[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mmetric[0m[0;34m:[0m [0mstr[0m [0;34m=[0m [0;34m'euclidean'[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mweighted[0m[0;34m:[0m [0mbool[0m [0;34m=[0m [0;32mTrue[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m [0;34m->[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mdataframe[0m[0;34m.[0m[0mDataFrame[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Метод д

In [None]:
# Predict for the normalized hold-out frame with the same k (15) and metric as the
# scikit-learn baseline; `weighted` is left at the default shown in the predict signature.
result = knn.predict(
    test_normalized,
    pred_df_window_size=None,
    idcol='index_test',
    n_neighbors=15,
    metric='euclidean',
)

In [35]:
# Drop the 'metric' column from the Spark result and collect the rest into pandas.
pred_y = (
    result
    .drop('metric')
    .toPandas()
)

In [40]:
# Classification metrics for the Spark KNN predictions, with rows aligned by 'index_test'.
# classification_report only accepts discrete labels: with the continuous target produced by
# make_regression it raises ValueError and stops a top-to-bottom run. The report is therefore
# produced only when the hold-out target is a class label.
from sklearn.utils.multiclass import type_of_target

if type_of_target(test['y']) in ('binary', 'multiclass'):
    print(classification_report(
        test.sort_values(by='index_test')['y'].to_numpy(),
        pred_y.sort_values(by='index_test')['y'].to_numpy(),
    ))

              precision    recall  f1-score   support

           0       0.88      0.82      0.85       201
           1       0.85      0.83      0.84       184
           2       0.86      0.90      0.88       186
           3       0.87      0.93      0.90       215
           4       0.89      0.86      0.88       214

    accuracy                           0.87      1000
   macro avg       0.87      0.87      0.87      1000
weighted avg       0.87      0.87      0.87      1000



In [36]:
# MAPE of the Spark KNN predictions; both sides are ordered by 'index_test' so that
# targets and predictions line up row by row.
spark_true = test.sort_values(by='index_test')['y'].to_numpy()
spark_pred = pred_y.sort_values(by='index_test')['y'].to_numpy()
mean_absolute_percentage_error(spark_true, spark_pred)

0.1357402213993588

In [37]:
# Spark shutdown is left disabled here.
# NOTE(review): presumably con.stop_spark() releases the session opened by spark_connector —
# confirm, and run it once the notebook is finished.
#con.stop_spark()

time: 2025-02-22 17:23:55.330104
del
stop
