# **Практическая работа №3. Применение методов глубокого обучения в среде Apache Spark**:

In [None]:
%%capture
!pip install elephas
!pip install findspark
!pip install numpy==1.26.0

# Перезапуск среды выполнения
import os
os.kill(os.getpid(), 9)

# После установки библиотек появится сообщение "Сеанс прекращен по неизвестной причине."
# Так и должно быть. Запускайте код в следующих далее ячейках.

In [1]:
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers.legacy import SGD
from tensorflow.keras.utils import to_categorical

from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import findspark

findspark.init()

In [2]:
conf = SparkConf().setMaster("local").setAppName("Data Analysis")
sc = SparkContext(conf = conf)
spark = SparkSession(sc)

In [3]:
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.utils import to_categorical
from tensorflow.keras import optimizers
from tensorflow.keras.optimizers.legacy import SGD

from elephas.ml_model import ElephasEstimator
from elephas.ml.adapter import to_data_frame

from pyspark import SparkContext, SparkConf
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml import Pipeline

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, FloatType

import numpy as np

### **Задание 1. Обучите модель классификации**

Ссылка на датасеты: https://www.kaggle.com/datasets?search=classification&tags=13302-Classification

Датасет: Drug Classification

Задача: построить модель, которая по данным о пациенте (возраст, пол, давление, холестерин, уровень натрия/калия) предсказывает, какое лекарство (Drug) нужно назначить этому пациенту.

In [4]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, VectorAssembler

In [6]:
df = spark.read.csv('/content/drug200.csv', header=True, inferSchema=True)
df.printSchema()
df.show(5)

root
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- BP: string (nullable = true)
 |-- Cholesterol: string (nullable = true)
 |-- Na_to_K: double (nullable = true)
 |-- Drug: string (nullable = true)

+---+---+------+-----------+-------+-----+
|Age|Sex|    BP|Cholesterol|Na_to_K| Drug|
+---+---+------+-----------+-------+-----+
| 23|  F|  HIGH|       HIGH| 25.355|DrugY|
| 47|  M|   LOW|       HIGH| 13.093|drugC|
| 47|  M|   LOW|       HIGH| 10.114|drugC|
| 28|  F|NORMAL|       HIGH|  7.798|drugX|
| 61|  F|   LOW|       HIGH| 18.043|DrugY|
+---+---+------+-----------+-------+-----+
only showing top 5 rows



In [7]:
from pyspark.ml.feature import StringIndexer
indexer_sex = StringIndexer(inputCol='Sex', outputCol='Sex_index')
indexer_bp = StringIndexer(inputCol='BP', outputCol='BP_index')
indexer_ch = StringIndexer(inputCol='Cholesterol', outputCol='Cholesterol_index')
indexer_drug = StringIndexer(inputCol='Drug', outputCol='label')

df = indexer_sex.fit(df).transform(df)
df = indexer_bp.fit(df).transform(df)
df = indexer_ch.fit(df).transform(df)
df = indexer_drug.fit(df).transform(df)

In [8]:
encoder = OneHotEncoder(
    inputCols=["Sex_index", "BP_index", "Cholesterol_index"],
    outputCols=["SexVec", "BPVec", "CholVec"]
)
df = encoder.fit(df).transform(df)

In [9]:
assembler = VectorAssembler(
    inputCols=['Age', 'Sex_index', 'BP_index', 'Cholesterol_index', 'Na_to_K'],
    outputCol='features'
)

In [10]:
output = assembler.transform(df)
output.select("features").show()

+--------------------+
|            features|
+--------------------+
|[23.0,1.0,0.0,0.0...|
|[47.0,0.0,1.0,0.0...|
|[47.0,0.0,1.0,0.0...|
|[28.0,1.0,2.0,0.0...|
|[61.0,1.0,1.0,0.0...|
|[22.0,1.0,2.0,0.0...|
|[49.0,1.0,2.0,0.0...|
|[41.0,0.0,1.0,0.0...|
|[60.0,0.0,2.0,0.0...|
|[43.0,0.0,1.0,1.0...|
|[47.0,1.0,1.0,0.0...|
|[34.0,1.0,0.0,1.0...|
|[43.0,0.0,1.0,0.0...|
|[74.0,1.0,1.0,0.0...|
|[50.0,1.0,2.0,0.0...|
|[16.0,1.0,0.0,1.0...|
|[69.0,0.0,1.0,1.0...|
|(5,[0,4],[43.0,13...|
|[23.0,0.0,1.0,0.0...|
|[32.0,1.0,0.0,1.0...|
+--------------------+
only showing top 20 rows



In [11]:
output.show()

+---+---+------+-----------+-------+-----+---------+--------+-----------------+-----+-------------+-------------+-------------+--------------------+
|Age|Sex|    BP|Cholesterol|Na_to_K| Drug|Sex_index|BP_index|Cholesterol_index|label|       SexVec|        BPVec|      CholVec|            features|
+---+---+------+-----------+-------+-----+---------+--------+-----------------+-----+-------------+-------------+-------------+--------------------+
| 23|  F|  HIGH|       HIGH| 25.355|DrugY|      1.0|     0.0|              0.0|  0.0|    (1,[],[])|(2,[0],[1.0])|(1,[0],[1.0])|[23.0,1.0,0.0,0.0...|
| 47|  M|   LOW|       HIGH| 13.093|drugC|      0.0|     1.0|              0.0|  4.0|(1,[0],[1.0])|(2,[1],[1.0])|(1,[0],[1.0])|[47.0,0.0,1.0,0.0...|
| 47|  M|   LOW|       HIGH| 10.114|drugC|      0.0|     1.0|              0.0|  4.0|(1,[0],[1.0])|(2,[1],[1.0])|(1,[0],[1.0])|[47.0,0.0,1.0,0.0...|
| 28|  F|NORMAL|       HIGH|  7.798|drugX|      1.0|     2.0|              0.0|  1.0|    (1,[],[])|    (2,

In [12]:
final_data = output.select("features",'label')

In [13]:
# Разделение на обучающую и тестовую выборку
train_data, test_data = final_data.randomSplit([0.7,0.3])

In [14]:
# Создание модели Keras
model = Sequential()
model.add(Dense(64, input_shape=(train_data.select('features').first()[0].size,)))
model.add(Activation('relu'))
model.add(Dense(64))
model.add(Activation('relu'))
model.add(Dense(5, activation='softmax'))  # 5 классов (типов лекарств)

In [19]:
# Компиляция модели

model.compile(optimizer=Adam(learning_rate=0.0001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

In [20]:
def to_data_frame(spark, x_data, y_data):
    return spark.createDataFrame(
        [(Vectors.dense(x), int(y)) for x, y in zip(x_data, y_data)],
        ['features', 'label']
    )

In [21]:
# Преобразуем данные для обучения и тестирования
from pyspark.ml.linalg import Vectors

train_features = [Vectors.dense(row[0].toArray()) for row in train_data.select('features').collect()]
train_labels = [row[0] for row in train_data.select('label').collect()]
df_train = spark.createDataFrame(zip(train_features, train_labels), ["features", "label"])

test_features = [Vectors.dense(row[0].toArray()) for row in test_data.select('features').collect()]
test_labels = [row[0] for row in test_data.select('label').collect()]
df_test = spark.createDataFrame(zip(test_features, test_labels), ["features", "label"])

In [22]:
# Создание и настройка ElephasEstimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_json())
estimator.set_optimizer_config({"class_name": "Adam", "config": {"learning_rate": 0.0001}})
estimator.set_mode("asynchronous")
estimator.set_loss("sparse_categorical_crossentropy")
estimator.set_metrics(['accuracy'])
estimator.set_epochs(30)
estimator.set_batch_size(2)
estimator.set_validation_split(0.1)
estimator.set_categorical_labels(False)

ElephasEstimator_68e56d8dc466

In [23]:
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 64)                384       
                                                                 
 activation (Activation)     (None, 64)                0         
                                                                 
 dense_1 (Dense)             (None, 64)                4160      
                                                                 
 activation_1 (Activation)   (None, 64)                0         
                                                                 
 dense_2 (Dense)             (None, 5)                 325       
                                                                 
Total params: 4869 (19.02 KB)
Trainable params: 4869 (19.02 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [24]:
from pyspark.ml import Pipeline
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.mllib.evaluation import MulticlassMetrics

In [25]:
# Обучаем модель через ElephasEstimator
pipeline = Pipeline(stages=[estimator])

# Обучение на тренировочном наборе
fitted_pipeline = pipeline.fit(df_train)

>>> Fit model
 * Serving Flask app 'elephas.parameter.server'
 * Debug mode: off


 * Running on http://172.28.0.12:4000
INFO:werkzeug:[33mPress CTRL+C to quit[0m


>>> Initialize workers
>>> Distribute load


INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:47] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:48] "POST /update HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:48] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:48] "POST /update HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:48] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:48] "POST /update HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:48] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:49] "POST /update HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:49] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:49] "POST /update HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:49] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 07:29:49] "POST /update HT

>>> Async training complete.


In [26]:
# Применение на тестовом наборе
predictions = fitted_pipeline.transform(df_test)

In [27]:
# UDF-функция для получения класса из вероятностей
argmax_udf = udf(lambda x: int(np.argmax(x)), IntegerType())

# Применим UDF
predictions = predictions.withColumn("prediction", argmax_udf(predictions["prediction"]))

In [28]:
# Отобразим первые 10 строк предсказаний
predictions.select("label", "prediction").show(10, truncate=False)

+-----+----------+
|label|prediction|
+-----+----------+
|2.0  |0         |
|2.0  |3         |
|2.0  |3         |
|3.0  |3         |
|3.0  |3         |
|3.0  |3         |
|0.0  |0         |
|1.0  |1         |
|1.0  |1         |
|1.0  |0         |
+-----+----------+
only showing top 10 rows



In [29]:
# Оценка качества
prediction_and_labels = predictions.select("prediction", "label").rdd.map(lambda row: (float(row.prediction), float(row.label)))

metrics = MulticlassMetrics(prediction_and_labels)

# Матрица ошибок
print("Матрица ошибок:")
print(metrics.confusionMatrix().toArray())

# Точность модели
accuracy = metrics.accuracy
print(f"\nТочность модели: {accuracy:.4f}")

# Precision и Recall по каждому классу
labels = prediction_and_labels.map(lambda x: x[1]).distinct().collect()
for label in labels:
    print(f"Класс {int(label)}:")
    print(f"  Precision: {metrics.precision(label):.4f}")
    print(f"  Recall: {metrics.recall(label):.4f}")
    print(f"  F1-score: {metrics.fMeasure(label):.4f}")



Матрица ошибок:
[[21.  5.  0.  1.  0.]
 [ 2. 13.  0.  0.  0.]
 [ 1.  3.  0.  3.  0.]
 [ 0.  1.  0.  3.  0.]
 [ 0.  5.  0.  0.  0.]]

Точность модели: 0.6379
Класс 2:
  Precision: 0.0000
  Recall: 0.0000
  F1-score: 0.0000
Класс 3:
  Precision: 0.4286
  Recall: 0.7500
  F1-score: 0.5455
Класс 0:
  Precision: 0.8750
  Recall: 0.7778
  F1-score: 0.8235
Класс 1:
  Precision: 0.4815
  Recall: 0.8667
  F1-score: 0.6190
Класс 4:
  Precision: 0.0000
  Recall: 0.0000
  F1-score: 0.0000


### **Задание 2. Обучите модель регрессии**

Ссылка на датасеты: https://www.kaggle.com/datasets?search=Regression&page=2

Датасет: House Sales in King County, USA

Задача: предсказать цену дома на основе его характеристик.

In [105]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors
from elephas.ml_model import ElephasEstimator
from elephas.ml.adapter import to_data_frame
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation
from tensorflow.keras.optimizers import SGD
import tensorflow.keras.optimizers as optimizers

In [154]:
df_2 = spark.read.csv('/content/kc_house_data.csv', header=True, inferSchema=True)
df_2.printSchema()
df_2.show(5)

root
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- price: double (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- sqft_living15: integer (nullable = true)
 |-- sqft_lot15: integer (nullable = true)

+----------+---------------+--------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------

In [155]:
# Преобразуем данные в нужный формат
feature_columns = ['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors',
                   'waterfront', 'view', 'condition', 'grade', 'sqft_above',
                   'sqft_basement', 'yr_built', 'yr_renovated', 'zipcode', 'lat', 'long']

In [156]:
# Создаем VectorAssembler
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [157]:
df_transformed = assembler.transform(df_2)

In [158]:
# Разделяем данные на обучающую и тестовую выборки
train_df, test_df = df_transformed.randomSplit([0.8, 0.2], seed=1234)

In [159]:
# Преобразуем RDD в два отдельных списка: features и labels
train_features = train_df.select("features").rdd.map(lambda row: row["features"].toArray()).collect()
train_labels = train_df.select("price").rdd.map(lambda row: row["price"]).collect()

test_features = test_df.select("features").rdd.map(lambda row: row["features"].toArray()).collect()
test_labels = test_df.select("price").rdd.map(lambda row: row["price"]).collect()

# Преобразуем в формат для Elephas
train_data = to_data_frame(spark.sparkContext, train_features, train_labels)
test_data = to_data_frame(spark.sparkContext, test_features, test_labels)

In [160]:
model = Sequential()
model.add(Dense(128, input_shape=(len(feature_columns),), activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(1))  # для регрессии


In [161]:
adam = Adam(learning_rate=0.001)
adam_conf = {
    "class_name": "Adam",
    "config": adam.get_config()
}

In [162]:
batch_size = 32
epochs = 50

In [163]:
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_json())
estimator.set_optimizer_config(adam_conf)
estimator.set_mode("synchronous")
estimator.set_loss("mae")
estimator.set_metrics(['mse'])
estimator.set_epochs(epochs)
estimator.set_batch_size(batch_size)
estimator.set_validation_split(0.1)
estimator.set_categorical_labels(False)

ElephasEstimator_bfff8b5cbea4

In [164]:
# Обучение модели через Pipeline
pipeline = Pipeline(stages=[estimator])
fitted_pipeline = pipeline.fit(train_data)

>>> Fit model
>>> Synchronous training complete.


In [165]:
prediction = fitted_pipeline.transform(test_data)
pnl = prediction.select("label", "prediction")
pnl.show(10)

+--------+------------+
|   label|  prediction|
+--------+------------+
|300000.0|    604571.0|
|235000.0|459364.84375|
|617000.0|  542613.125|
|843000.0|  829666.875|
|837700.0| 778822.3125|
|715000.0|  817345.875|
|795000.0|   853241.75|
|319500.0| 672385.4375|
|570000.0|  560088.875|
|550120.0|  694958.875|
+--------+------------+
only showing top 10 rows



In [166]:
from pyspark.mllib.evaluation import RegressionMetrics

prediction_and_label = pnl.rdd.map(lambda row: (row.label, row.prediction))
metrics = RegressionMetrics(prediction_and_label)

print("MAE:", metrics.meanAbsoluteError)

MAE: 177375.1860320981


### **Задание 3. Обучите модель текстовой классификации**

Датасет: Coronavirus tweets NLP - Text Classification

Задача: Классификация характера твита (положительный, отрицательный или нейтральный)

Ссылка на датасеты: https://www.kaggle.com/datasets?search=text+classification

In [16]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation
from tensorflow.keras.optimizers.legacy import Adam
from elephas.spark_model import SparkModel
from pyspark.mllib.evaluation import MulticlassMetrics
import numpy as np

In [30]:
df = spark.read.csv('/content/Corona_NLP_train.csv', header=True, inferSchema=True)

In [31]:
df = df.select("OriginalTweet", "Sentiment").na.drop()

In [32]:
from pyspark.sql.functions import lower

df = df.withColumn("Sentiment", lower(col("Sentiment")))
df = df.filter(col("Sentiment").isin("positive", "negative", "neutral"))

In [33]:
classes = ['positive', 'negative', 'neutral']
balanced_df = None
for cls in classes:
    cls_df = df.filter(col("Sentiment") == cls).limit(250)
    balanced_df = cls_df if balanced_df is None else balanced_df.union(cls_df)


In [34]:
indexer = StringIndexer(inputCol="Sentiment", outputCol="indexedLabel")
indexer_model = indexer.fit(balanced_df)
train_df = indexer_model.transform(balanced_df)

In [35]:
tokenizer = Tokenizer(inputCol="OriginalTweet", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=2000)
idf = IDF(inputCol="raw_features", outputCol="features")

In [36]:
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])
pipeline_model = pipeline.fit(train_df)
train_data = pipeline_model.transform(train_df)

In [37]:
train_data, test_data = train_data.randomSplit([0.7, 0.3], seed=42)
train_data.cache()
test_data.cache()

print(f"Train count: {train_data.count()}")
print(f"Test count: {test_data.count()}")

Train count: 525
Test count: 225


In [45]:
num_classes = 3
model = Sequential()
model.add(Dense(128, input_shape=(2000,)))
model.add(Activation('relu'))
model.add(Dense(64))
model.add(Activation('relu'))
model.add(Dense(num_classes))
model.add(Activation('softmax'))

In [46]:
model.compile(optimizer=Adam(lr=0.001), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

In [40]:
train_rdd = train_data.rdd.map(lambda row: (row['features'].toArray(), row['indexedLabel']))

In [42]:
from tensorflow.keras.callbacks import EarlyStopping

spark_model = SparkModel(
    model,
    frequency='batch',  # Более частая синхронизация (чем 'epoch') может улучшить сходимость
    mode='asynchronous',  # Асинхронный режим часто работает быстрее для больших кластеров
    port=4002,
    num_workers=4,  # Оптимальное количество воркеров для вашего кластера
    elastic_learning_rate=0.001,  # Адаптация learning rate между воркерами
    dynamic_loss_scale=True  # Автоматическое масштабирование loss для стабильности
)
history = spark_model.fit(
    train_rdd,
    epochs=50,  # Больше эпох для сложных моделей
    batch_size=128,  # Увеличенный batch для распределённого обучения
    validation_split=0.1,  # Меньше validation данных для большего обучающего набора
    verbose=1,
    shuffle=True,  # Перемешивание данных между эпохами
    learning_rate=0.001,  # Стандартный learning rate для Adam
    metrics=['accuracy'],  # Контроль метрик
    callbacks=[EarlyStopping(monitor='val_loss', patience=3)]  # Ранняя остановка
)

>>> Fit model
 * Serving Flask app 'elephas.parameter.server'
 * Debug mode: off


 * Running on http://172.28.0.12:4002
INFO:werkzeug:[33mPress CTRL+C to quit[0m


>>> Initialize workers
>>> Distribute load


INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:00] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:01] "POST /update HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:01] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:01] "POST /update HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:01] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:01] "POST /update HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:01] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:01] "POST /update HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:01] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:01] "POST /update HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:01] "GET /parameters HTTP/1.1" 200 -
INFO:werkzeug:172.28.0.12 - - [10/Jun/2025 08:39:01] "POST /update HT

>>> Async training complete.


In [43]:
test_np_x = np.array(test_data.rdd.map(lambda row: row['features'].toArray()).collect())
test_np_y = np.array(test_data.rdd.map(lambda row: row['indexedLabel']).collect())
predictions = spark_model.predict(test_np_x)
predictions = np.argmax(predictions, axis=1)

In [44]:
# Метрики
print(f"test_np_x shape: {test_np_x.shape}")
print(f"test_np_y shape: {test_np_y.shape}")
print(f"predictions shape: {predictions.shape}")

prediction_and_label = spark.sparkContext.parallelize([(float(y), float(p)) for y, p in zip(test_np_y, predictions)])
metrics = MulticlassMetrics(prediction_and_label)
print(f"Accuracy: {metrics.accuracy}")
print(f"Precision: {metrics.weightedPrecision}")
print(f"Recall: {metrics.weightedRecall}")
print(f"F1 Score: {metrics.weightedFMeasure()}")

print(f"Sample preds: {predictions[:10]}")
print(f"Sample actual: {test_np_y[:10]}")

test_np_x shape: (225, 2000)
test_np_y shape: (225,)
predictions shape: (225,)




Accuracy: 0.40444444444444444
Precision: 0.43727870959449905
Recall: 0.40444444444444444
F1 Score: 0.41131310456434594
Sample preds: [2 2 2 2 2 1 2 0 2 0]
Sample actual: [2. 2. 2. 2. 2. 2. 2. 2. 2. 2.]
