# Как работает логистическая регрессия в Spark: особенности прогноза
`Логистическая регрессия` – это статистическая модель, которая используется в машинном обучении для прогнозирования вероятности возникновения некоторого события путем построения логистической функции и сравнения этого события с кривой этой функции. В результате формируется ответ в виде вероятности бинарного события: `0` и `1`, где `0` – событие не произошло, `1` – событие произошло.

# Работа с логистической регрессией в Spark

Для того, чтобы начать работу по прогнозу данных, необходимо настроить базовую конфигурацию, импортировав некоторые классы библиотек `Spark MLlib` и `Spark SQL`:

In [1]:
!pip install pyspark



In [2]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

ДАТАСЕТ С ДОМАМИ НА ПРОДАЖУ

В качестве примера мы будем использовать датасет Kaggle, который содержит данные о домах на продажу в Бруклине с 2003 по 2017 года и доступен для скачивания. Он содержит 111 атрибутов (столбцов) и 390883 записей (строк). В атрибуты включены: дата продажи, дата постройки, цена на дом, налоговый класс, соседние регионы, долгота, ширина и др.

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
import os

In [7]:
os.chdir("/content/drive/My Drive/Colab Notebooks/Var11")
os.listdir()

['delivery.csv']

Теперь необходимо импортировать входные данные, создав на их основе набор RDD (Resilient Distributed Dataset).

In [10]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
data = spark.read.csv(
    '/content/drive/My Drive/Colab Notebooks/Var11/delivery.csv',
    inferSchema=True, header=True)

# ГОТОВИМ АТРИБУТ ДЛЯ ПОСЛЕДУЮЩЕЙ БИНАРНОЙ КЛАССИФИКАЦИИ

Допустим, требуется классифицировать налоговый класс на дом (tax_class). Всего имеется 10 таких классов. Поскольку данные распределены неравномерно (например, в классе 1 имеется 198969 записей, а в 3-м — только 18), мы разделим их на 2 категории: те, которые принадлежат классу 1, и остальные. В Python это делается очень просто, нужно просто вызвать метод replace:

In [13]:
from pyspark.sql import functions as F

# Бинаризация статуса
data = data.withColumn(
    "status",
    F.when(F.col("status") == "delayed", 1).otherwise(0)
)

Кроме того, алгоритмы Machine Learning в PySpark работают с числовым значениями, а не со строками. Поэтому преобразуем значения столбца tax_class в тип int:

# ПОДБОР ПРИЗНАКОВ И ПРЕОБРАЗОВАНИЕ КАТЕГОРИЙ

In [14]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Индексация для origin
indexer_origin = StringIndexer(inputCol="origin", outputCol="origin_index")
data = indexer_origin.fit(data).transform(data)

# One-Hot Encoding для origin
encoder_origin = OneHotEncoder(inputCol="origin_index", outputCol="origin_vec")
data = encoder_origin.fit(data).transform(data)

# Аналогично для destination
indexer_dest = StringIndexer(inputCol="destination", outputCol="dest_index")
data = indexer_dest.fit(data).transform(data)

encoder_dest = OneHotEncoder(inputCol="dest_index", outputCol="dest_vec")
data = encoder_dest.fit(data).transform(data)

Обработка пропусков

In [15]:
# Проверка пропусков
from pyspark.sql.functions import col, sum as spark_sum

data.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show()

# Импутация (пример для числовых признаков)
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=["distance_km", "delivery_time_min"],
    outputCols=["distance_km_imp", "delivery_time_min_imp"]
)
data = imputer.fit(data).transform(data)

+-----------+----+------+-----------+-----------+-----------------+------+------------+----------+----------+--------+
|delivery_id|date|origin|destination|distance_km|delivery_time_min|status|origin_index|origin_vec|dest_index|dest_vec|
+-----------+----+------+-----------+-----------+-----------------+------+------------+----------+----------+--------+
|          0|   0|     0|          0|          0|                0|     0|           0|         0|         0|       0|
+-----------+----+------+-----------+-----------+-----------------+------+------------+----------+----------+--------+



# ВЕКТОРИЗАЦИЯ ПРИЗНАКОВ

In [16]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Собираем все признаки
assembler = VectorAssembler(
    inputCols=["distance_km_imp", "delivery_time_min_imp",
              "origin_vec", "dest_vec"],
    outputCol="raw_features"
)
data = assembler.transform(data)

# Масштабирование
scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="features",
    withStd=True,
    withMean=True
)
data = scaler.fit(data).transform(data)

In [None]:
Стратифицирование и разделение

In [18]:
# Ручная стратификация
stratified_data = data.stat.sampleBy(
    "status",
    fractions={0: 0.8, 1: 0.8},  # 80% для каждого класса
    seed=42
)

# Разделение
train = stratified_data
test = data.subtract(train)

Обучение модели с учетом дисбаланса

In [20]:
from pyspark.sql import functions as F
from pyspark.ml.classification import LogisticRegression

# 6. Обучение модели с учетом дисбаланса классов
# ------------------------------------------------------------
# Добавляем веса классов ПЕРЕД разделением на train/test
class_weights = {0: 1.0, 1: 5.0}  # Вес для класса 1 (delayed) увеличен в 5 раз

data = data.withColumn(
    "class_weight",
    F.when(F.col("status") == 1, class_weights[1]).otherwise(class_weights[0])
)

# Повторяем стратифицированное разделение после добавления class_weight
stratified_data = data.stat.sampleBy(
    "status",
    fractions={0: 0.8, 1: 0.8},
    seed=42
)
train = stratified_data
test = data.subtract(train)

# Теперь столбец class_weight существует в train
lr = LogisticRegression(
    featuresCol="features",
    labelCol="status",
    weightCol="class_weight",  # Убедитесь, что имя столбца совпадает
    maxIter=10
)

model = lr.fit(train)

# ДОБАВЛЕНИЕ ПРИЗНАКОВ

Векторизуем также год постройки (year_of_sale) и соседние регионы (neighborhood_id). Для этого нужно только в VectorAssembler указать выбранные признаки:

In [21]:
# Отношение времени доставки к расстоянию
data = data.withColumn(
    "delivery_speed",
    F.col("delivery_time_min_imp") / F.col("distance_km_imp")
)

# Категоризация расстояния
data = data.withColumn(
    "distance_category",
    F.when(F.col("distance_km_imp") < 500, "short")
     .when(F.col("distance_km_imp").between(500, 1000), "medium")
     .otherwise("long")
)

# Индексация новой категории
indexer_dist = StringIndexer(inputCol="distance_category", outputCol="dist_cat_index")
data = indexer_dist.fit(data).transform(data)

In [None]:
Оценка модели

In [24]:
# Посмотрим на распределение предсказаний
predictions.groupBy("prediction").count().show()

# Выведем несколько примеров с реальными метками
predictions.select("status", "probability", "prediction").filter(F.col("status") == 1).show(10)

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|   17|
+----------+-----+

+------+-----------+----------+
|status|probability|prediction|
+------+-----------+----------+
+------+-----------+----------+

