# PySpark - Pandas UDF. Групповые типы функций 
`(применение через .applyInPandas, pandas_udf для GROUPED_AGG) + скалярные mapInPandas (+ .cogroup)`  

В данном Notebook рассматриваются типы функций: 
 - применение через .applyInPandas (ранняя реализация GROUPED_MAP)
 - GROUPED_AGG  **.mapInPandas**, **.cogroup.applyInPandas**

| Тип  | Скалярные/Групповые | Возвращаемое значение | Трансформация | Реализация |
|:-----|:-------|:--------|:---------------|:---------------|
| **GROUPED_MAP** (устаревшее) | Групповые | DataFrame | Сложная обработка групп | .applyInPandas(func, schema)
| **GROUPED_AGG** | Групповые | Scalar | Агрегация групп |  @pandas_udf(type) |
| Нет | Скалярные             | Iterator | Обработка партиций | .mapInPandas(func, schema)  |
| Нет | Скалярные             | Scalar | Обработка двух групп | .cogroup().applyInPandas() |


### Описание  
```python
pyspark.sql.functions.pandas_udf (f=None, returnType=None, functionType=None)
#    f=None,            - Функция для преобразования в UDF
#    returnType=None,   - Тип возвращаемого значения
#    functionType=None  - Тип UDF (depricted)
```
**Тип функций:** *GROUPED_MAP*, *GROUPED_AGG*

### Параметр `returnType` (Строковые обозначения):
```python
# для .applyInPandas(func, schema)
pandas.DataFrame # согласно схеме вывода pyspark.sql.types.StructType

# для GROUPED_AGG (в т.ч с .cogroup())
@pandas_udf(<"int", "long", "float", "double", "string", "boolean", "date", "timestamp">)

# для .mapInPandas(func, schema)
pandas.DataFrame # согласно схеме вывода pyspark.sql.types.StructType
```
### Возвращаемое значение  
```python
pandas.DataFrame # согласно схеме вывода pyspark.sql.types.StructType
```

Параметр, устанавливающий максимальное количество записей в одном **Arrow batch** при передаче данных между Spark и Python процессами:
```python
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "<Количество строк в батче>")
```
см. https://spark.apache.org/docs/3.5.7/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html

In [1]:
import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "python")
os.environ["SPARK_LOCAL_IP"]='localhost'
from pyspark import SparkContext, SparkConf#, HiveContext
conf = SparkConf()\
             .setAppName("Example Spark")\
             .setMaster("local[3]")\
             .setAppName("CountingSheep")\
             .set("spark.sql.catalogImplementation", "hive")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/11 14:20:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.7
      /_/

Using Python version 3.13.7 (main, Nov 24 2025 20:51:28)
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[3], app id = local-1768130438853).
SparkSession available as 'spark'.


In [3]:
spark

In [4]:
from pyspark.sql.types import *
import pyspark.sql.functions as f_
from pyspark.sql import Window
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import udf, pandas_udf, rand
from datetime import datetime, date
from pyspark.sql import Row
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

## Предварительная подготовка

### DataSet Titanic
| Колонка  | Тип | Описание| Комментарий |
|:-----|:-------|:--------|:---------------|
| PassengerId | integer | Уникальный идентификатор пассажира | Уникальный ключ | 
| Survived | integer | Выжившие | 1 - пассажир выжил, 0 - погиб (целевая перемнная) | 
| Pclass | string | Класс билета пассажира | Значения 1 - 3   | 
| Name | string | Фамилия и имя пассажира | Пример значения: Braund, Mr. Owen Harris  | 
| Sex | string | Пол пассажира | male - мужской, female - женский  | 
| Age | double | Возраст пассажира | Измеряется в годах  | 
| SibSp | integer | Количество братьев, сестер, сводных братьев, сводных сестер, супругов на борту | Число от 0 до 8 | 
| Parch | integer | Количество родителей, детей (в том числе приемных) на борту | Число от 0 до 9 | 
| Ticket | string | Номер билета пассажира| Пример значения: STON/O2. 3101282 | 
| Fare | double | Плата за проезд| Пример значения: 133.65 | 
| Cabin | string | Каюта| Пример значения: C123 | 
| Embarked| string |  Код порта посадки пассажира | S — Саутгемптон; C — Шербур; Q — Квинстаун. (NULL означает Саутгемптон) | 


In [5]:
# train и test
trainfile = "./Data/train.csv"
testfile  = "./Data/test.csv"

titanic_train_csv = spark.read.option("header", "true").option("inferSchema", "true").csv(trainfile)
titanic_test_csv = spark.read.option("header", "true").option("inferSchema", "true").csv(testfile)

### Тестовый random DataSet

In [6]:
data = []
nRow = 0
for product in ["String_A", "String_B"]:
    for i in range(15):
        nRow += 1
        date = datetime(2014, 5, 2) + timedelta(days=i*3)
        value1 = round(.55 + i * 1.08 + (0.68 if product == "String_A" else .23),6)
        value2 = 110.0 + i * 5 + (10 if product == "String_A" else 20)
        timestamp_val = datetime(2007, 5, 8, 16, 35) + timedelta(days=i*5*.05 + i*9)
        data.append((nRow, product, date, value1,value2, timestamp_val))

dfData = spark.createDataFrame(data, ["rowid","str_value", "date_val", "first_double_val", "second_double_val","timestamp_val"])

In [7]:
dfData.show(8,False)

+-----+---------+-------------------+----------------+-----------------+-------------------+
|rowid|str_value|date_val           |first_double_val|second_double_val|timestamp_val      |
+-----+---------+-------------------+----------------+-----------------+-------------------+
|1    |String_A |2014-05-02 00:00:00|1.23            |120.0            |2007-05-08 16:35:00|
|2    |String_A |2014-05-05 00:00:00|2.31            |125.0            |2007-05-17 22:35:00|
|3    |String_A |2014-05-08 00:00:00|3.39            |130.0            |2007-05-27 04:35:00|
|4    |String_A |2014-05-11 00:00:00|4.47            |135.0            |2007-06-05 10:35:00|
|5    |String_A |2014-05-14 00:00:00|5.55            |140.0            |2007-06-14 16:35:00|
|6    |String_A |2014-05-17 00:00:00|6.63            |145.0            |2007-06-23 22:35:00|
|7    |String_A |2014-05-20 00:00:00|7.71            |150.0            |2007-07-03 04:35:00|
|8    |String_A |2014-05-23 00:00:00|8.79            |155.0           

In [8]:
print("Arrow batch size:", spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch"))

Arrow batch size: 10000


## Example .applyInPandas ( устар. PANDAS_UDF.GROUPED_MAP)  

Использование **applyInPandas** в PySpark имеет смысл в сценариях применения пользовательских функций к сгруппированным данным с использованием возможностей Pandas

**Особенности применения applyInPandas:**

1. **Работа с Pandas DataFrame** - группа данных передается в функцию как Pandas DataFrame, что позволяет использовать возможности Pandas для обработки данных
2. **Схема вывода** - необходимо явно определять схему вывода с помощью StructType
3. **Высокая производительность** - Pandas UDF может повысить производительность по сравнению с классическими UDF (данные обрабатываются в пакетах и используют возможности векторизованных операций Pandas)
4. **Обработка по группам** - данные автоматически группируются по указанному столбцу, и функция применяется отдельно к каждой группе
5. **Векторизация операций** - операции производятся над целыми колонками данных, что значительно ускоряет их выполнение по сравнению с классическими циклическими подходами
   
**Пример определения:**
```python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pandas as pd
# устаревший синтаксис @pandas_udf(IntegerType(), PandasUDFType.GROUPED_MAP)

# Определяется схема вывода
output_schema = StructType([
    StructField("gruoped_key", StringType(), True),
    StructField("agg_value", DoubleType(), True)
                    ])

# Определяется функция для применения
def calculate_fuction(pdf: pd.DataFrame):
    return pd.DataFrame({
        "key": [pdf['date_value'].iloc[0]],
        "value1_avg": [pdf['double_value1'].mean()]               
                       })

# Прменение средствами .applyInPandas
dfData.groupby("gruoped_key").applyInPandas(calculate_fuction, schema=output_schema).show()

``` 

**applyInPandas** используется, если:  
- необходимо применять сложные функции к каждой группе данных (например пользовательские агрегаты, фильтрации или трансформации, трудно реализуемыми стандартными функциями Spark) 
- логику обработки данных эффективнее реализовать с использованием Pandas
- требуется воспользоваться векторизованными операциями Pandas, которые могут быть более быстрыми чем операции в Spark


In [9]:
import pyarrow as pa
print(f"PyArrow version: {pa.__version__}")

PyArrow version: 22.0.0


### 1. Простая функция pandas_udf с applyInPandas

**Базовый пример**  
Простой пример для иллюстрации обработки и применения applyInPandas    
*Подсчет avg, max, min по группе*

In [10]:
# Определение схемы вывода*
output_schema = StructType([
    StructField("key", StringType(), True),
    StructField("value_avg", DoubleType(), True),
    StructField("value_min", DoubleType(), True),
    StructField("value_max", DoubleType(), True)
])

# Определение функции для применения
def calculate_agg(pdf: pd.DataFrame):
    """Среднее, минимальное и максимальное внутри группы"""
    return pd.DataFrame({
        "key": [pdf['date_val'].iloc[0]],
        "value_avg": [pdf['value_for_avg'].mean().round(6)],
        "value_min": [pdf['value_for_minmax'].min()],
        "value_max": [pdf['value_for_minmax'].max()]
                       })
    
# Применение функции с помощью applyInPandas
dfData.select(f_.col("date_val").cast("String")
             ,f_.col("first_double_val").alias("value_for_avg")
             ,f_.col("second_double_val").alias("value_for_minmax")) \
       .groupby("date_val") \
       .applyInPandas(calculate_agg, schema=output_schema) \
       .show(10)    

[Stage 7:>                                                          (0 + 1) / 1]

+-------------------+---------+---------+---------+
|                key|value_avg|value_min|value_max|
+-------------------+---------+---------+---------+
|2014-05-02 00:00:00|    1.005|    120.0|    130.0|
|2014-05-05 00:00:00|    2.085|    125.0|    135.0|
|2014-05-08 00:00:00|    3.165|    130.0|    140.0|
|2014-05-11 00:00:00|    4.245|    135.0|    145.0|
|2014-05-14 00:00:00|    5.325|    140.0|    150.0|
|2014-05-17 00:00:00|    6.405|    145.0|    155.0|
|2014-05-20 00:00:00|    7.485|    150.0|    160.0|
|2014-05-23 00:00:00|    8.565|    155.0|    165.0|
|2014-05-26 00:00:00|    9.645|    160.0|    170.0|
|2014-05-29 00:00:00|   10.725|    165.0|    175.0|
+-------------------+---------+---------+---------+
only showing top 10 rows



                                                                                

**Нормализация и ранжирование**
Еще один пример реализации внутренней обработки с вызовом методов **pandas**

In [11]:
# Определение схемы вывода
output_schema = StructType([
    StructField("key", StringType()),
    StructField("date_val", StringType()),
    StructField("value", IntegerType()),
    StructField("normalized_val", DoubleType()),
    StructField("rank", IntegerType())
])

# Функция обработки группы
def normalize_and_rank(pdf: pd.DataFrame):
    """Нормализация и ранжирование внутри группы"""
    # Нормализация (0-1)
    min_val = pdf['value'].min()
    max_val = pdf['value'].max()
    
    if max_val > min_val:
        pdf['normalized_val'] = round((pdf['value'] - min_val) / (max_val - min_val),6)
    else:
        pdf['normalized_val'] = 0.0
    
    # Ранжирование
    pdf['rank'] = pdf['value'].rank(ascending=False, method='dense').astype(int)
    
    return pdf

# Применение функции с помощью applyInPandas
dfData.select(f_.col("str_value").alias("key")
             ,f_.col("date_val").cast("String")
             ,f_.col("second_double_val").alias("value")) \
      .groupBy("key").applyInPandas(normalize_and_rank, schema=output_schema)\
      .orderBy("rank")\
      .show(10)

+--------+-------------------+-----+--------------+----+
|     key|           date_val|value|normalized_val|rank|
+--------+-------------------+-----+--------------+----+
|String_A|2014-06-13 00:00:00|  190|           1.0|   1|
|String_B|2014-06-13 00:00:00|  200|           1.0|   1|
|String_A|2014-06-10 00:00:00|  185|      0.928571|   2|
|String_B|2014-06-10 00:00:00|  195|      0.928571|   2|
|String_A|2014-06-07 00:00:00|  180|      0.857143|   3|
|String_B|2014-06-07 00:00:00|  190|      0.857143|   3|
|String_A|2014-06-04 00:00:00|  175|      0.785714|   4|
|String_B|2014-06-04 00:00:00|  185|      0.785714|   4|
|String_A|2014-06-01 00:00:00|  170|      0.714286|   5|
|String_B|2014-06-01 00:00:00|  180|      0.714286|   5|
+--------+-------------------+-----+--------------+----+
only showing top 10 rows



### 2. Интерполяция пропущенных значений

**Исходные данные**

In [12]:
df_interp = (titanic_train_csv
                .select( f_.col("Sex").cast("string").alias("series")
                        ,f_.col("PassengerId").alias("index")
                        ,f_.col("Age").alias("value") 
                       ))   

print("Age is null:", titanic_train_csv.where("Age is null").count())

Age is null: 177


**Функция pandas_udf для применения**

In [13]:
# Исходные данные
df_interp = (titanic_train_csv
                .select( f_.col("Sex").cast("string").alias("series")
                        ,f_.col("PassengerId").alias("index")
                        ,f_.col("Age").alias("value") 
                       ))   

titanic_train_csv.where("Age is null").count()

# Определение схемы вывода
schema_interp = StructType([
    StructField("series", StringType()),
    StructField("index", IntegerType()),
    StructField("value", DoubleType()),
    StructField("value_linear", DoubleType()),
    StructField("value_ffill", DoubleType()),
    StructField("value_bfill", DoubleType())
])

# Функция обработки
def interpolate_values(pdf):
    """Различные методы интерполяции пропущенных значений"""
    pdf = pdf.sort_values('index').reset_index(drop=True)
    # Линейная интерполяция
    pdf['value_linear'] = pdf['value'].interpolate(method='linear')
    # Forward fill
    pdf['value_ffill'] = pdf['value'].ffill()
    # Backward fill
    pdf['value_bfill'] = pdf['value'].bfill()
    
    return pdf

# Применение функции
result_interp = df_interp.groupBy("series").applyInPandas(interpolate_values, schema=schema_interp)

print("Age is null:", result_interp.where("value_linear is null").count())

result_interp.orderBy("series", "index").show(10)

Age is null: 0
+------+-----+-----+------------+-----------+-----------+
|series|index|value|value_linear|value_ffill|value_bfill|
+------+-----+-----+------------+-----------+-----------+
|female|    2| 38.0|        38.0|       38.0|       38.0|
|female|    3| 26.0|        26.0|       26.0|       26.0|
|female|    4| 35.0|        35.0|       35.0|       35.0|
|female|    9| 27.0|        27.0|       27.0|       27.0|
|female|   10| 14.0|        14.0|       14.0|       14.0|
|female|   11|  4.0|         4.0|        4.0|        4.0|
|female|   12| 58.0|        58.0|       58.0|       58.0|
|female|   15| 14.0|        14.0|       14.0|       14.0|
|female|   16| 55.0|        55.0|       55.0|       55.0|
|female|   19| 31.0|        31.0|       31.0|       31.0|
+------+-----+-----+------------+-----------+-----------+
only showing top 10 rows



### 3. Машинное обучение по группам
На примере DataSet "Titanic" (Логистическая регрессия - задача классификации):

**Задача:** Классификация (**Модель**:Логистическая регрессия)

**Dataset:** Titanic (tran: train.csv, test: test.csv)
  
**Таргет:** "Survived" (1 - выжившие)

### Целевая переменная

In [14]:
label_col = "Survived"

### DataSet Titanic - очистка данных и создание дополнительных фичей

**Функция подсчета простых статистик по DataSet**

In [15]:
def simpleDataStat(full: DataFrame, train: DataFrame, test: DataFrame):
    """ 
    Функция простой статистики DataSet для train, test и полного DataSet.
    Выводит на экран рассчитанные показатели count с учетом целевой переменной
    """
    
    full_cnt = full.count(); train_cnt = train.count(); test_cnt = test.count()
    
    print("Count data: FULL: ", full_cnt, f"({full_cnt/full_cnt*100}%)"
                      ,"TRAIN:", train_cnt, f"({round(train_cnt/full_cnt*100,2)}%)"
                      ,"TEST:", test_cnt, f"({round(test_cnt/full_cnt*100,2)}%)")    
    full.groupBy(f_.lit("full").alias("dataset"),"label").count()\
        .unionByName(train.groupBy("label",f_.lit("train").alias("dataset")).count())\
        .unionByName(test.groupBy("label",f_.lit("test").alias("dataset")).count())\
        .groupBy("label").pivot("dataset").max("count")\
        .show()

**Очистка и фичи**

In [16]:
# mapping титулов в поле name
titul_mapping = {
   "Mlle": "Miss", "Ms":  "Miss", "Dona":  "Mrs", "Mme":  "Mrs", "Lady": "Mrs", "Countess": "Mrs"
  ,"Rev":  "Mr",   "Col": "Mr",   "Major": "Mr",  'Capt': "Mr"
  ,"Jonkheer": "Sir", "Don":  "Sir"
  ,"Dr":   "Rare"}

# дополнительные фичи
ft_extra = [f_.when(f_.col("Age")<=15.0,15.0)\
              .when(f_.col("Age").isNull(),28)\
              .when(f_.col("Age") < 25,25)\
              .when(f_.col("Age") <= 34,34)\
              .when(f_.col("Age") <= 44,44).otherwise(55).alias("ft_cat_age")
      ,f_.regexp_extract(f_.col("Name"), r" ([A-Za-z]+)\.", 1).alias("ft_titul")
      ,(f_.col("SibSp") +f_.col("Parch") + 1).alias("ft_family_size")
      ,f_.substring(f_.col("Cabin"),1,1).alias("ft_class_cabin") ]

# Преобразование DataSet
traindf = titanic_train_csv.select(*titanic_train_csv.columns, *ft_extra).withColumnRenamed(label_col,"label")
testdf = titanic_test_csv.select(*titanic_test_csv.columns, *ft_extra).withColumnRenamed(label_col,"label")
titanicdf = traindf.unionByName(testdf)

# Статистика
simpleDataStat(full=titanicdf, train=traindf, test=testdf)

Count data: FULL:  1309 (100.0%) TRAIN: 891 (68.07%) TEST: 418 (31.93%)
+-----+----+----+-----+
|label|full|test|train|
+-----+----+----+-----+
|    1| 494| 152|  342|
|    0| 815| 266|  549|
+-----+----+----+-----+



### Обучение модели

In [17]:
from sklearn.linear_model import LogisticRegression
import pickle
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer

**Метрики ACCURACY, PRECISION, RECALL, F1:**  
Расчетные значения:
- *TP* $(True Positive)$ — верно предсказанные положительные
- *TN* $(True Negative)$ — верно предсказанные отрицательные
- *FP* $(False Positive)$ — ложные срабатывания
- *FN* $(False Negative)$ — пропуски предсказания

Метрики:  

$ Accuracy = \frac{TP + TN}{TP + TN + FP + FN} $  

$ Precision = \frac{TP}{TP + FP} $  

$ Recall = \frac{TP}{TP + FN} $

$ F1 = \frac{2TP}{2TP + FP + FN} $ или ($ F1 = 2 \cdot \frac{Precision \cdot Recall}{Precision + Recall} $)

In [18]:
def calc_metric(df: DataFrame, group_column = f_.lit(1), info = None):
    """ 
    Функция расчета метрик ACCURACY, PRECISION, RECALL, F1.
    Возвращает DataFrame
    """    
    return df.groupBy(f_.lit(info).alias("info"), group_column)\
             .agg(
                 f_.sum(f_.col("prediction") * f_.col("label")).alias("TP")
                ,f_.sum(((f_.col("prediction")==0)&(f_.col("label")==0)).cast("integer")).alias("TN") 
                ,f_.sum(((f_.col("prediction")==1)&(f_.col("label")==0)).cast("integer")).alias("FP") 
                ,f_.sum(((f_.col("prediction")==0)&(f_.col("label")==1)).cast("integer")).alias("FN") 
                )\
            .select("info","TP", "TN", "FP", "FN" 
                   ,f_.expr("(TP+TN)/(TP+TN+FP+FN)").alias("ACCURACY")
                   ,f_.expr("TP/(TP+FP)").alias("PRECISION")        
                   ,f_.expr("TP/(TP+FN)").alias("RECALL")
                   ,f_.expr("2*TP/(2*TP+FP+FN)").alias("F1")
                   )

**Метрики ROC AUC и GINI** 

Реализованы средствами **pyspark.ml**  

$ ROC AUC $ расчитывается через BinaryClassificationEvaluator  

$ GINI = 2 \cdot roc_auc - 1 $  

**Пример:**
```python
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# DataSet после применения модели
predictions_df = df_result_train.select("label","probability")

# Оценьщик
evaluator = BinaryClassificationEvaluator(
    labelCol="label", 
    rawPredictionCol="rawPrediction",  # или probability, в зависимости от модели
    metricName="areaUnderROC"
)

roc_auc = evaluator.evaluate(predictions_df)
print(roc_auc,  2*roc_auc -1)
```

Где `predictions_df` — DataFrame с колонками:
   - `label` — истинный класс (0/1),
   - `rawPrediction` — выход модели (`probability`).

**evaluator для ROC AUC**

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="label", 
    rawPredictionCol="probability", 
    metricName="areaUnderROC"
)

**pandas_udf функция обучения модели логистической регрессии** (для применение через applyInPandas)

In [20]:
# Выходная схема модели обучения
modelschema = StructType([StructField("group_key", StringType(), False), StructField("model_bytes", BinaryType(), False)])

# Функция обучения
def train_model(pdf: pd.DataFrame) -> pd.DataFrame:
    """
    Функция (pandas_udf) тренировки модели LogisticRegression. см.*
    Колонки 
        label - целевая переменная (с классми 1 или 0)
        group_key - колонка группировки. см.** 
    
    Внешние переменные:
        numfeatures  - числовые фичи
        catfeatures  - категориальные фичи
       
    Параметры:
        - pdf – тренировочный DataSet (train как pandas.DataFrame)
        
    Return: DataFrame с бинарными строками сериализованной модели.
            Согласно схеме: StructField("group_key", StringType()), StructField("model_bytes", BinaryType())

    *вызов через .applyInPandas
    **в случае нескольких групп - будут обучены несколько моделей (одна на каждую группу)       
    """
    # Если нет колонки group_key - обучение по всему DataSet
    pdf['group_key'] = pdf.get('group_key', "NO GROUP KEY")
    # Все фичи
    feature_cols =  numfeatures + catfeatures
    # Конвейер: импьютация чисел + OHE категорий + LogisticRegression
    pre = ColumnTransformer(
        transformers=[
            ("num", SimpleImputer(strategy="median"), numfeatures),
            ("cat", Pipeline([
                ("impute", SimpleImputer(strategy="most_frequent")),
                ("ohe", OneHotEncoder(handle_unknown="ignore"))
            ]), catfeatures)
        ],
        remainder="drop"
    )

    pipe = Pipeline([
        ("pre", pre),
        ("clf", LogisticRegression(max_iter=400, solver="liblinear"))
    ])

    # Фильтр строк без таргета
    pdf = pdf.dropna(subset=["label"])

    X = pdf[feature_cols]
    y = pdf["label"].astype(int)
    
    # Обучение
    pipe.fit(X, y)

    #Группа
    print("GROUP:", [pdf['group_key'].iloc[0]])
    # Коэффициенты
    print("coef:", pipe[-1].coef_)
    print("bias:", pipe[-1].intercept_)
    print("=================================================")    
    
    # Модель
    return pd.DataFrame({
        "group_key": [pdf['group_key'].iloc[0]]
       ,"model_bytes": [pickle.dumps(pipe)]
    })

**Функция применения модели**

In [21]:
# Output Schena
pred_schema = StructType([
    StructField("PassengerId", IntegerType(), False),
    StructField("probability", DoubleType(), False),
    StructField("prediction", IntegerType(), False), ])

# Функция применения модели
def inference_model(pdf: pd.DataFrame) -> pd.DataFrame:
    """
    Функция (pandas_udf) применения модели
    Колонка probability - колонка с рассчитанной вероятностью
    Колонка prediction - колонка предсказания (1,0 согласно trashhold (переменная pred) )

    Внешние переменные:
        numfeatures  - числовые фичи
        catfeatures  - категориальные фичи
     
    Параметры:
        - pdf – DataSet для inference
        
    Return: DataFrame с бинарными строками сериализованной модели.
            Согласно схеме:     
                StructField("PassengerId", IntegerType()) - идентификатор строки
                StructField("probability", DoubleType()) - вероятность
                StructField("prediction", IntegerType()) - предсказание
    """    
    pipe = pickle.loads(pdf["model_bytes"].iloc[0])
    X = pdf[list(numfeatures + catfeatures)]
    proba = pipe.predict_proba(X)[:, 1]
    pred = (proba >= 0.50).astype(int)

    return pd.DataFrame({
        "PassengerId": pdf["PassengerId"].astype(int).values,
        "probability": proba,
        "prediction": pred
    })

**Отобранные фичи**

In [22]:
numfeatures = ["ft_family_size","Fare","Pclass"] # Числовые фичи
catfeatures = ["SibSp","Parch", "Sex","ft_titul","ft_class_cabin","ft_cat_age" ] # Категориальные фичи

**Тренировка модели**

In [23]:
# Train
modeldf = (
    traindf
        .select(*(numfeatures + catfeatures), "label")
        .groupBy(f_.lit("1").alias("dummy"))      # одна группа = весь датасет
        .applyInPandas(train_model, schema=modelschema)
).cache()
# modeldf содержит строку с бинарной моделью
modeldf.show(truncate=55)

coef: [[-0.43665192  0.00563984 -0.63743176  0.47946149  0.69793652  0.97560474
  -0.42705458 -0.19187886 -0.42390711 -0.10807391  0.10920399  0.23404936
   0.49504337  0.72617025 -0.48881061  0.06107622 -0.13464429  1.43087475
  -0.42878646 -0.31381195  0.14790219  0.0433201  -0.37754905 -0.22544841
  -0.34812029  0.15898499  0.10493882  2.03784885  0.09183242  0.14790572
   0.0651606  -0.95787665  0.64695358  0.16676653 -0.83626821  0.44954904
   0.13145218  0.1618445  -0.12132718  0.80073114  0.98443457  0.2261824
  -0.59978954 -0.18985501 -0.39158477  0.85598794  0.09643507  0.14657049
   0.45380304  0.03227922 -0.58298746]]
bias: [1.00208829]


+------------+-------------------------------------------------------+
|   group_key|                                            model_bytes|
+------------+-------------------------------------------------------+
|NO GROUP KEY|[80 04 95 7B 0D 00 00 00 00 00 00 8C 10 73 6B 6C 65 ...|
+------------+-------------------------------------------------------+



                                                                                

### Применение модели

**Применение модели к train DataSet через applyInPandas**

In [24]:
train_with_model = traindf.crossJoin(modeldf)
pred_train_df = (train_with_model
                    .groupBy(f_.lit(1).alias("dummy"))  # одной группой, как и при обучении
                    .applyInPandas(inference_model, schema=pred_schema)
                )
df_result_train = pred_train_df.join(train_with_model, ["PassengerId"])

df_result_train.show(5, truncate=15)

+-----------+---------------+----------+-----+------+---------------+------+----+-----+-----+----------+-------+-----+--------+----------+--------+--------------+--------------+------------+---------------+
|PassengerId|    probability|prediction|label|Pclass|           Name|   Sex| Age|SibSp|Parch|    Ticket|   Fare|Cabin|Embarked|ft_cat_age|ft_titul|ft_family_size|ft_class_cabin|   group_key|    model_bytes|
+-----------+---------------+----------+-----+------+---------------+------+----+-----+-----+----------+-------+-----+--------+----------+--------+--------------+--------------+------------+---------------+
|        148|0.6360269871...|         1|    0|     3|"Ford, Miss....|female| 9.0|    2|    2|W./C. 6608| 34.375| NULL|       S|      15.0|    Miss|             5|          NULL|NO GROUP KEY|[80 04 95 7B...|
|        463|0.4374759515...|         0|    0|     1|Gee, Mr. Art...|  male|47.0|    0|    0|    111320|   38.5|  E63|       S|      55.0|      Mr|             1|          

**Применение модели к test DataSet через applyInPandas**

In [25]:
# Для каждой строки необходимо добавить колонку с моделью
test_with_model = testdf.crossJoin(modeldf)

pred_test_df = (
    test_with_model
        .groupBy(f_.lit(1).alias("dummy"))  # одной группой, как и при обучении
        .applyInPandas(inference_model, schema=pred_schema)
)
df_result_test = pred_test_df.join(test_with_model, ["PassengerId"])

df_result_test.show(5, truncate=15)

+-----------+---------------+----------+------+---------------+------+----+-----+-----+-------+-------+-----+--------+-----+----------+--------+--------------+--------------+------------+---------------+
|PassengerId|    probability|prediction|Pclass|           Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|label|ft_cat_age|ft_titul|ft_family_size|ft_class_cabin|   group_key|    model_bytes|
+-----------+---------------+----------+------+---------------+------+----+-----+-----+-------+-------+-----+--------+-----+----------+--------+--------------+--------------+------------+---------------+
|        892|0.0787098527...|         0|     3|Kelly, Mr. J...|  male|34.5|    0|    0| 330911| 7.8292| NULL|       Q|    0|      44.0|      Mr|             1|          NULL|NO GROUP KEY|[80 04 95 7B...|
|        893|0.5414954950...|         1|     3|Wilkes, Mrs....|female|47.0|    1|    0| 363272|    7.0| NULL|       S|    1|      55.0|     Mrs|             2|          NULL|NO GROUP K

### Расчет метрик

In [26]:
calc_metric(df=df_result_train, info="TRAN").unionByName(calc_metric(df=df_result_test,info = "TEST")).show()

[Stage 109:>                (0 + 1) / 1][Stage 111:>                (0 + 1) / 1]

+----+---+---+---+---+------------------+------------------+------------------+------------------+
|info| TP| TN| FP| FN|          ACCURACY|         PRECISION|            RECALL|                F1|
+----+---+---+---+---+------------------+------------------+------------------+------------------+
|TRAN|265|485| 64| 77|0.8417508417508418|0.8054711246200608|0.7748538011695907| 0.789865871833085|
|TEST|147|241| 25|  5|0.9282296650717703|0.8546511627906976|0.9671052631578947|0.9074074074074074|
+----+---+---+---+---+------------------+------------------+------------------+------------------+



                                                                                

In [27]:
roc_auc_train = evaluator.evaluate(df_result_train.select("label","probability"))
roc_auc_test = evaluator.evaluate( df_result_test.select("label","probability"))

print("train: ROC AUC:",roc_auc_train, "GINI:", 2* roc_auc_train -1)
print("test: ROC AUC:",roc_auc_test, "GINI:", 2* roc_auc_test -1)

train: ROC AUC: 0.8844656419433531 GINI: 0.7689312838867062
test: ROC AUC: 0.9469232291254451 GINI: 0.8938464582508903


### Групповая модель (одна модель на группу)

**Разбиение DataSet на train и test с учетом группировки по PClass**  

Класс каюты:  
 - 1 — Первый;
 - 2 — Второй;
 - 3 — Третий

Строится 3 модели выживаемости в каждом классе каюты, для каждого класса  по одной


In [28]:
df_with_strata = titanicdf.where("pclass is not null") \
                          .withColumn("strata",f_.concat_ws("_", f_.col("pclass"), f_.col("label").cast("string")))

test_fraction = 0.25

# одна и та же доля для всех страт
strata_values = [row["strata"] for row in df_with_strata.select("strata").distinct().collect()]
fractions = {s: test_fraction for s in strata_values}

group_testdf = df_with_strata.stat.sampleBy("strata", fractions, seed=42).drop("strata")

group_traindf = df_with_strata.join(group_testdf.select("PassengerId"), on="PassengerId", how="left_anti").drop("strata")

**Статистика**

In [29]:
for pclass in ["1", "2", "3"]: 
    print("PClass:", pclass)
    simpleDataStat(full=titanicdf.where(f"PClass = '{pclass}'")
                  ,train=group_traindf.where(f"PClass = '{pclass}'")
                  ,test=group_testdf.where(f"PClass = '{pclass}'"))

PClass: 1
Count data: FULL:  323 (100.0%) TRAIN: 236 (73.07%) TEST: 87 (26.93%)
+-----+----+----+-----+
|label|full|test|train|
+-----+----+----+-----+
|    1| 186|  52|  134|
|    0| 137|  35|  102|
+-----+----+----+-----+

PClass: 2
Count data: FULL:  277 (100.0%) TRAIN: 213 (76.9%) TEST: 64 (23.1%)
+-----+----+----+-----+
|label|full|test|train|
+-----+----+----+-----+
|    1| 117|  34|   83|
|    0| 160|  30|  130|
+-----+----+----+-----+

PClass: 3
Count data: FULL:  709 (100.0%) TRAIN: 526 (74.19%) TEST: 183 (25.81%)
+-----+----+----+-----+
|label|full|test|train|
+-----+----+----+-----+
|    1| 191|  55|  136|
|    0| 518| 128|  390|
+-----+----+----+-----+



**Отобранные фичи**

In [30]:
numfeatures = ["Age", "ft_family_size", "Fare"] # Числовые Фичи
catfeatures = ["Sex","SibSp","Parch"] # Категориальные Фичи

**Тренировка модели**

In [31]:
group_modelschema = StructType([StructField("group_key", IntegerType(), False), StructField("model_bytes", BinaryType(), False)])
# Train
group_modeldf = (
    group_traindf.where("pclass is not null")
                 .withColumn("group_key",f_.col("pclass"))
                 .select(*(numfeatures+catfeatures), "label","group_key")
                 .groupBy(f_.col("group_key").alias("group_key"))      # по группам pclass
                 .applyInPandas(train_model, schema=group_modelschema)).cache()

# modeldf содержит строку с бинарной моделью
group_modeldf.show(truncate=55)

GROUP: [np.int32(2)]
coef: [[-0.04671291  0.54301482 -0.01535855  2.33338555 -2.00327483  0.58300092
  -0.11560798 -0.18177719  0.04449498 -0.19973869  0.51365234  0.00366198
   0.01253509]]
bias: [0.33011072]
GROUP: [np.int32(3)]
coef: [[-0.02745026 -0.33495111  0.01365803  1.60498287 -1.34907181  0.13533635
   0.40602761  0.7768091  -1.04849935  0.23602821  0.14073219 -0.39052305
  -0.35157364  0.19066159 -0.04132843 -0.05619206  0.35764523  0.19209625
  -0.01983033 -0.01556757]]
bias: [0.25591106]
coef: [[-0.02644635  0.33416936 -0.00265923  2.38591004 -1.80073118  0.21935206
  -0.01591867  0.35516746  0.02657801  0.8357008   0.11601491 -0.10303421
  -0.11890431 -0.14459834]]
bias: [0.58517886]


+---------+-------------------------------------------------------+
|group_key|                                            model_bytes|
+---------+-------------------------------------------------------+
|        1|[80 04 95 A8 0A 00 00 00 00 00 00 8C 10 73 6B 6C 65 ...|
|        3|[80 04 95 F3 0A 00 00 00 00 00 00 8C 10 73 6B 6C 65 ...|
|        2|[80 04 95 AD 0A 00 00 00 00 00 00 8C 10 73 6B 6C 65 ...|
+---------+-------------------------------------------------------+



                                                                                

### Применение модели

**Применение модели к train DataSet через applyInPandas**

In [32]:
train_w_group_model = group_traindf.join(group_modeldf,[f_.col("group_key")==f_.col("pclass")])

pred_group_train_df = (
    train_w_group_model
        .groupBy(f_.col("Sex").alias("group_key"))  # по группам pclass, как и при обучении
        .applyInPandas(inference_model, schema=pred_schema))

df_result_group_train = pred_group_train_df.join(train_w_group_model, ["PassengerId"])

df_result_group_train.show(5, truncate=15)

+-----------+---------------+----------+-----+------+---------------+------+----+-----+-----+-----------+------+-----+--------+----------+--------+--------------+--------------+---------+---------------+
|PassengerId|    probability|prediction|label|Pclass|           Name|   Sex| Age|SibSp|Parch|     Ticket|  Fare|Cabin|Embarked|ft_cat_age|ft_titul|ft_family_size|ft_class_cabin|group_key|    model_bytes|
+-----------+---------------+----------+-----+------+---------------+------+----+-----+-----+-----------+------+-----+--------+----------+--------+--------------+--------------+---------+---------------+
|        463|0.0825133947...|         0|    0|     1|Gee, Mr. Art...|  male|47.0|    0|    0|     111320|  38.5|  E63|       S|      55.0|      Mr|             1|             E|        1|[80 04 95 A8...|
|        833|0.0993543809...|         0|    0|     3| Saad, Mr. Amin|  male|NULL|    0|    0|       2671|7.2292| NULL|       C|      28.0|      Mr|             1|          NULL|       

**Применение модели к test DataSet через applyInPandas**

In [33]:
test_w_group_model = group_testdf.join(group_modeldf,[f_.col("group_key")==f_.col("pclass")])

pred_group_test_df = (
    test_w_group_model
        .groupBy(f_.col("Sex").alias("group_key"))  # одной группой, как и при обучении
        .applyInPandas(inference_model, schema=pred_schema))

df_result_group_test = pred_group_test_df.join(test_w_group_model, ["PassengerId"])

df_result_group_test.show(5, truncate=15)

+-----------+---------------+----------+-----+------+---------------+------+----+-----+-----+----------+-------+-----+--------+----------+--------+--------------+--------------+---------+---------------+
|PassengerId|    probability|prediction|label|Pclass|           Name|   Sex| Age|SibSp|Parch|    Ticket|   Fare|Cabin|Embarked|ft_cat_age|ft_titul|ft_family_size|ft_class_cabin|group_key|    model_bytes|
+-----------+---------------+----------+-----+------+---------------+------+----+-----+-----+----------+-------+-----+--------+----------+--------+--------------+--------------+---------+---------------+
|        148|0.7584245442...|         1|    0|     3|"Ford, Miss....|female| 9.0|    2|    2|W./C. 6608| 34.375| NULL|       S|      15.0|    Miss|             5|          NULL|        3|[80 04 95 F3...|
|        471|0.0993798048...|         0|    0|     3|Keefe, Mr. A...|  male|NULL|    0|    0|    323592|   7.25| NULL|       S|      28.0|      Mr|             1|          NULL|       

### Расчет метрик

In [34]:
for pclass in ["1", "2", "3"]:
    print("pclass:", pclass)
    calc_metric(df=df_result_group_train.where(f"Pclass = '{pclass}'"), info="TRAN")\
        .unionByName(calc_metric(df=df_result_group_test.where(f"Pclass = '{pclass}'"),info = "TEST")).show()

pclass: 1
+----+---+---+---+---+------------------+------------------+------------------+------------------+
|info| TP| TN| FP| FN|          ACCURACY|         PRECISION|            RECALL|                F1|
+----+---+---+---+---+------------------+------------------+------------------+------------------+
|TRAN|102| 91| 11| 32|0.8177966101694916|0.9026548672566371|0.7611940298507462|0.8259109311740891|
|TEST| 41| 35|  0| 11|0.8735632183908046|               1.0|0.7884615384615384|0.8817204301075269|
+----+---+---+---+---+------------------+------------------+------------------+------------------+

pclass: 2
+----+---+---+---+---+------------------+---------+------------------+------------------+
|info| TP| TN| FP| FN|          ACCURACY|PRECISION|            RECALL|                F1|
+----+---+---+---+---+------------------+---------+------------------+------------------+
|TRAN| 69|124|  6| 14|0.9061032863849765|     0.92|0.8313253012048193|0.8734177215189873|
|TEST| 30| 30|  0|  4|   

In [35]:
for pclass in ["1", "2", "3"]:
    predictions_train = df_result_train.where(f"Pclass = '{pclass}'").select("label","probability")
    predictions_test = df_result_test.where(f"Pclass = '{pclass}'").select("label","probability")

    roc_auc_train = evaluator.evaluate(predictions_train)
    roc_auc_test = evaluator.evaluate(predictions_test)
    print(f"Pclass = {pclass}")
    print(f"train: ROC AUC: {roc_auc_train}", "GINI:", 2* roc_auc_train -1)
    print(f"test:  ROC AUC:",roc_auc_test,  "GINI:", 2* roc_auc_test -1)

Pclass = 1
train: ROC AUC: 0.8798713235294117 GINI: 0.7597426470588233
test:  ROC AUC: 0.9666666666666667 GINI: 0.9333333333333333
Pclass = 2
train: ROC AUC: 0.9262945846664299 GINI: 0.8525891693328598
test:  ROC AUC: 0.9814814814814815 GINI: 0.962962962962963
Pclass = 3
train: ROC AUC: 0.8048816300713839 GINI: 0.6097632601427678
test:  ROC AUC: 0.9083904109589045 GINI: 0.816780821917809


### 4. Создание фичей для временных рядов

**Исходные данные**

In [36]:
np.random.seed(42)
# Данные продаж по часам
hourly_data = []
for store in ["Store_A", "Store_B"]:
    base_sales = 100 if store == "Store_A" else 150
    for hour in range(0, 24 * 7, 2):  # Неделя данных
        timestamp = datetime(2024, 1, 1) + timedelta(hours=hour)
        
        # Имитируем паттерны: пик днем, спад ночью
        hour_of_day = timestamp.hour
        day_pattern = 1.5 if 9 <= hour_of_day <= 18 else 0.7
        
        # Выходные имеют другой паттерн
        weekend_pattern = 1.2 if timestamp.weekday() >= 5 else 1.0
        
        sales = round(base_sales * day_pattern * weekend_pattern + np.random.randn() * 10,4)
        hourly_data.append((store, timestamp, float(sales)))

df_hourly = spark.createDataFrame(hourly_data, ["store", "timestamp", "sales"])

**Схема результата**

In [37]:
schema_features = StructType([
    StructField("store", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("sales", DoubleType()),
    # Временные признаки
    StructField("hour", IntegerType()),
    StructField("weekday", IntegerType()),
    StructField("is_weekend", IntegerType()),
    StructField("mon_day", IntegerType()),
    # Лаговые признаки
    StructField("lag_24h", DoubleType()),
    # Скользящие статистики
    StructField("roll_mean_24h", DoubleType()),
    StructField("roll_std_24h", DoubleType()),
    StructField("roll_min_24h", DoubleType()),
    StructField("roll_max_24h", DoubleType()),
    # Экспоненциальное скользящее среднее
    StructField("ema_12h", DoubleType()),
    StructField("ema_24h", DoubleType()),
    # Изменения
    StructField("diff_2h", DoubleType()),
    StructField("pct_chng_1h", DoubleType()),
    StructField("pct_chng_24h", DoubleType()),
    # Кумулятивные признаки
    StructField("cumsum", DoubleType()),
    StructField("cummax", DoubleType()),
    StructField("cummin", DoubleType())
])

**Функция для применения через .applyInPandas**

In [38]:
def create_timeseries_features(pdf):
    """Создание признаков для временных рядов"""
    # Сортируем по времени
    pdf = pdf.sort_values('timestamp').reset_index(drop=True)
    
    # ===== ВРЕМЕННЫЕ ПРИЗНАКИ =====
    pdf['hour'] = pdf['timestamp'].dt.hour
    pdf['weekday'] = pdf['timestamp'].dt.dayofweek
    pdf['is_weekend'] = (pdf['weekday'] >= 5).astype(int)
    pdf['mon_day'] = pdf['timestamp'].dt.day
    
    # ===== ЛАГОВЫЕ ПРИЗНАКИ =====
    pdf['lag_24h'] = pdf['sales'].shift(24)
    
    # ===== СКОЛЬЗЯЩИЕ СТАТИСТИКИ =====
    # Скользящее среднее
    pdf['roll_mean_24h'] = round(pdf['sales'].rolling(window=24, min_periods=1).mean(),10)
    
    # Скользящее стандартное отклонение
    pdf['roll_std_24h'] = round(pdf['sales'].rolling(window=24, min_periods=1).std(),10)
    
    # Скользящие min/max
    pdf['roll_min_24h'] = round(pdf['sales'].rolling(window=24, min_periods=1).min(),10)
    pdf['roll_max_24h'] = round(pdf['sales'].rolling(window=24, min_periods=1).max(),10)
    
    # ===== ЭКСПОНЕНЦИАЛЬНОЕ СКОЛЬЗЯЩЕЕ СРЕДНЕЕ =====
    pdf['ema_12h'] = round(pdf['sales'].ewm(span=12, adjust=False).mean(),10)
    pdf['ema_24h'] = round(pdf['sales'].ewm(span=24, adjust=False).mean(),10)
    
    # ===== ИЗМЕНЕНИЯ =====
    # Абсолютное изменение
    pdf['diff_2h'] = round(pdf['sales'].diff(2),3)
    
    # Процентное изменение
    pdf['pct_chng_1h'] = round(pdf['sales'].pct_change(1) * 100,3)
    pdf['pct_chng_24h'] = round(pdf['sales'].pct_change(24) * 100,3)
    
    # ===== КУМУЛЯТИВНЫЕ ПРИЗНАКИ =====
    pdf['cumsum'] = round(pdf['sales'].cumsum(),3)
    pdf['cummax'] = round(pdf['sales'].cummax(),3)
    pdf['cummin'] = round(pdf['sales'].cummin(),3)
    
    return pdf

**Применение через .applyInPandas**

In [39]:
result_features = df_hourly.groupBy("store").applyInPandas(
    create_timeseries_features, 
    schema=schema_features
)

result_features.orderBy("store", "timestamp") \
.withColumn("timestamp", f_.col("timestamp").cast("date"))\
.show(10, truncate=15, vertical = False)

+-------+----------+--------+----+-------+----------+-------+-------+--------------+-------------+------------+------------+--------------+--------------+-------+-----------+------------+--------+-------+------+
|  store| timestamp|   sales|hour|weekday|is_weekend|mon_day|lag_24h| roll_mean_24h| roll_std_24h|roll_min_24h|roll_max_24h|       ema_12h|       ema_24h|diff_2h|pct_chng_1h|pct_chng_24h|  cumsum| cummax|cummin|
+-------+----------+--------+----+-------+----------+-------+-------+--------------+-------------+------------+------------+--------------+--------------+-------+-----------+------------+--------+-------+------+
|Store_A|2024-01-01| 74.9671|   0|      0|         0|      1|   NULL|       74.9671|         NULL|     74.9671|     74.9671|       74.9671|       74.9671|   NULL|       NULL|        NULL|  74.967| 74.967|74.967|
|Store_A|2024-01-01| 68.6174|   2|      0|         0|      1|   NULL|      71.79225| 4.4899159285|     68.6174|     74.9671| 73.9902230769|     74.45912

**Статистика по признакам**

In [40]:
result_features.select(
    "store", "sales", "roll_mean_24h", "ema_24h", "pct_chng_24h"
).describe().show()

+-------+-------+-----------------+------------------+------------------+------------------+
|summary|  store|            sales|     roll_mean_24h|           ema_24h|      pct_chng_24h|
+-------+-------+-----------------+------------------+------------------+------------------+
|  count|    168|              168|               168|               168|               120|
|   mean|   NULL|136.1564898809523|128.90968077511852|126.31511784454047|10.186474999999998|
| stddev|   NULL|63.21476668527007|29.668561025060946|30.947864059345694|19.182998324611788|
|    min|Store_A|          50.4033|          71.79225|         74.459124|           -27.747|
|    max|Store_B|          281.586|          188.1172|    199.9060811578|            90.275|
+-------+-------+-----------------+------------------+------------------+------------------+



## 5. Вызов pandas_udf с дополнительными параметрами (на примере расчета PSI)

**PSI - Population Stability Index**  
Метрика для определения изменения структуры (распределения) признака или скоринговой модели между двумя выборками/периодами.

$ PSI = \sum_{i=1}^{n} (actual_i - expected_i) \times \ln\left(\frac{actual_i}{expected_i}\right) $

где:  
-  $actual_i$ - доля наблюдений в $i$-й группе в (бине) текущей выборке
-  $expected_i$ - доля наблюдений в $i$-й группе (бине) в базовой выборке
-  $n$ - количество бинов

**Интерпретация:**  

- **PSI < 0.1** – данные стабильны
- **0.1 ≤ PSI < 0.25** – умеренный сдвиг, нужен мониторинг или настройка
- **PSI ≥ 0.25** – сильный сдвиг, требуется разбор причин, возможено переобучение модели

**Исходные данные**

In [41]:
data = [
    (0, 0.43, 0.38, 0.43), (0, 0.36, 0.43, 0.52), (0, 0.53, 0.56, 0.65), (0, 0.56, 0.60, 0.55), (0, 0.59, 0.62, 0.56),
    (1, 0.42, 0.30, 0.38), (1, 0.35, 0.37, 0.47), (1, 0.36, 0.45, 0.56), (1, 0.51, 0.55, 0.61), (1, 0.52, 0.56, 0.61),
    (2, 0.36, 0.47, 0.59), (2, 0.34, 0.33, 0.65), (2, 0.35, 0.43, 0.68), (2, 0.36, 0.45, 0.69),
    (3, 0.72, 0.71, 0.89), (3, 0.56, 0.63, 0.87), (3, 0.15, 0.28, 0.63)
]
columns = ["period_no", "value_1", "value_2", "value_3"]
df_for_psi = spark.createDataFrame(data, columns)

**Функция для применения через applyInPandas**

In [42]:
def calculate_psi(pdf: pd.DataFrame, period_col='period_no', actual_period_value=0, bins=9) -> pd.DataFrame:
    """
    Рассчитывает PSI для каждой переменной (value_1, value_2, value_3)
    относительно актуального периода (period_no =  0)
    """
    
    def compute_psi(actual, expected):
        """
        Вычисляет PSI между двумя распределениями
        """
        # Удаляем пропуски
        actual_clean = actual.dropna()
        expected_clean = expected.dropna()
        
        if len(actual_clean) == 0 or len(expected_clean) == 0:
            return np.nan
        
        # Определяем границы бинов на основе актуального периода
        _, bin_edges = np.histogram(actual_clean, bins=bins)
        
        # Распределяем данные по бинам
        actual_counts, _ = np.histogram(actual_clean, bins=bin_edges)
        expected_counts, _ = np.histogram(expected_clean, bins=bin_edges)
        
        # Вычисляем пропорции (добавляем малое значение для избежания деления на 0)
        actual_pct = (actual_counts + 0.0001) / len(actual_clean)
        expected_pct = (expected_counts + 0.0001) / len(expected_clean)
        
        # Формула PSI: sum((actual% - expected%) * ln(actual% / expected%))
        psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
        
        return psi
    
    # Получаем данные актуального периода
    actual_period = pdf[pdf[period_col] == actual_period_value]
    
    if actual_period.empty:
        return pd.DataFrame(columns=[period_col, 'variable', 'psi'])
    
    results = []
    
    # Для каждого expected периода
    for period in pdf[period_col].unique():
        if period == actual_period_value:
            continue
            
        expected_period = pdf[pdf[period_col] == period]
        
        # Рассчитываем PSI для каждой переменной
        for col in ['value_1', 'value_2', 'value_3']:
            psi_value = compute_psi(
                actual_period[col], 
                expected_period[col]
            )
            
            results.append({
                period_col: str(period),
                'variable': col,
                'psi': psi_value
            })
    
    return pd.DataFrame(results)

**Применение функции**

In [43]:
result_df = df_for_psi.groupby().applyInPandas(lambda pdf: calculate_psi(pdf, bins = 1), schema="period_no string, variable string, psi double")

result_df.show()

+---------+--------+-------------------+
|period_no|variable|                psi|
+---------+--------+-------------------+
|        1| value_1|0.04462771028534144|
|        1| value_2| 0.2043249163152812|
|        1| value_3|0.04462771028534144|
|        2| value_1| 0.3465551252190449|
|        2| value_2|0.07191574652480093|
|        2| value_3| 0.3465551252190449|
|        3| value_1| 0.7323402152146065|
|        3| value_2|  10.30883520747549|
|        3| value_3| 0.7323402152146065|
+---------+--------+-------------------+



## PANDAS_UDF.GROUPED_AGG Example

PANDAS_UDF.GROUPED_AGG - тип pandas UDF в PySpark, который используется для агрегаций по группам (аналог groupBy().agg(...)), но с возможностью писать агрегирующую функцию на pandas, а не на встроенных функциях Spark.


**Особенности GROUPED_MAP UDF:**

1. **Работа с Pandas DataFrame** - группа данных передается в функцию как Pandas DataFrame, что позволяет использовать возможности Pandas для обработки данных
2. **Обработка по группам** - возможность использования нестандартной агрегирующей функции, которой нет в Spark (медиана, percentiles по особым правилам, кастомный скор, PSI и т.п.).
   
**Пример определения:**
```python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pandas as pd

@pandas_udf("double")
# Определяется функция для применения
def calculate_fuction(pdf: pd.DataFrame) -> float:
    return s.mean()

# Применение средствами .applyInPandas
dfData.groupby("grp").agg(calculate_fuction(F.col("value")).alias("mean_value"))

``` 

## 1. Расчет взвешенного среднего

In [44]:
@pandas_udf("double")
def weighted_average(values: pd.Series, weights: pd.Series) -> float:
    return (values * weights).sum() / weights.sum()

# Использование
df = spark.createDataFrame([
    ("A", 100, 0.3),
    ("A", 200, 0.7),
    ("B", 150, 0.5),
    ("B", 250, 0.5)
], ["category", "value", "weight"])

df.groupBy("category").agg(weighted_average(df.value, df.weight).alias("weighted_avg")).show()

+--------+------------+
|category|weighted_avg|
+--------+------------+
|       A|       170.0|
|       B|       200.0|
+--------+------------+



## 2. Расчет перцентилей

In [45]:
@pandas_udf("double")
def percentile_95(values: pd.Series) -> float:
    return values.quantile(0.95)

# Использование
df = spark.createDataFrame([
    ("Product1", 100),
    ("Product1", 200),
    ("Product1", 300),
    ("Product2", 150),
    ("Product2", 250)
], ["product", "price"])

df.groupBy("product").agg(percentile_95(df.price).alias("p95_price")).show()

+--------+---------+
| product|p95_price|
+--------+---------+
|Product1|    290.0|
|Product2|    245.0|
+--------+---------+



## 3. Коэффициент вариации (CV)

In [46]:
@pandas_udf("double")
def coefficient_of_variation(values: pd.Series) -> float:
    return (values.std() / values.mean()) * 100 if values.mean() != 0 else 0.0

# Использование
df = spark.createDataFrame([
    ("Store1", 100),
    ("Store1", 110),
    ("Store1", 105),
    ("Store2", 200),
    ("Store2", 300)
], ["store", "sales"])

df.groupBy("store").agg(coefficient_of_variation(df.sales).alias("cv_percent")).show()

+------+------------------+
| store|        cv_percent|
+------+------------------+
|Store1| 4.761904761904762|
|Store2|28.284271247461902|
+------+------------------+



## Example cogroup c PANDAS_UDF

**cogroup** позволяет обрабатывать две группы данных (два Spark DataFrame) одновременно средствами .applyInPandas


### 1. Сравнение данных двух периодов  
Например, подсчитет процентного изменения среднего значения фичей между текуцим годом и предыдущим (текуцим годом - 1)

$$
growth\_pct = \frac{\text{avg}(prev\_value) - \text{avg}(last\_value)}{\text{avg}(prev\_value)} 
$$


**Исходные данные**

In [47]:
# Данные за два периода
df_pr = spark.createDataFrame([
    ("feature1", "2024-01", 0.7), ("feature1", "2024-02", 3.25), ("feature1", "2024-03", 0.4),
    ("feature2", "2024-01", 160.0), ("feature2", "2024-02", 170.0),
], ["feature", "period", "value"]) # Значение фичей за предыдущий год

df_cr = spark.createDataFrame([
    ("feature1", "2024-01", 1.0), ("feature1", "2024-02", 2.05),
    ("feature2", "2024-01", 195.0),
], ["feature", "period", "value"]) # Значение фичей за текущий год

**Схема результата**

In [48]:
# Схема результата
schema = StructType([
    StructField("feature", StringType(), True),
    StructField("avg_prev_year", DoubleType(), True),
    StructField("avg_last_year", DoubleType(), True),
    StructField("growth_pct", DoubleType(), True)
])

**Функция pandas_udf для применения через cogroup**

In [49]:
def compare_periods(pdf_prev: pd.DataFrame, pdf2: pd.DataFrame) -> pd.DataFrame:
    """ Функция сравнения данных двух DataFrame. Определяет процент изменения среднего значения зв период """        
    if pdf_prev.empty or pdf2.empty:
        return pd.DataFrame(columns=["feature", "avg_prev_year", "avg_last_year", "growth_pct"])
    
    feature = pdf_prev['feature'].iloc[0]
    avg_prev_year = pdf_prev['value'].mean()
    avg_last_year = pdf2['value'].mean()
    growth = ((avg_last_year - avg_prev_year) / avg_prev_year) * 100 if avg_prev_year != 0 else 0
    
    return pd.DataFrame([[feature, round(avg_prev_year,6), round(avg_last_year,6), round(growth,2)]], 
                        columns=["feature", "avg_prev_year", "avg_last_year", "growth_pct"])

**Применение функции**

In [50]:
df_pr.groupBy("feature").cogroup(df_cr.groupBy("feature")).applyInPandas(compare_periods, schema=schema).show()

+--------+-------------+-------------+----------+
| feature|avg_prev_year|avg_last_year|growth_pct|
+--------+-------------+-------------+----------+
|feature1|         1.45|        1.525|      5.17|
|feature2|        165.0|        195.0|     18.18|
+--------+-------------+-------------+----------+



## 2. Расчёт корреляции между данными двух DataFrame 

**Исходные данные**

In [51]:
# Данные: клики и покупки
clicks_df = spark.createDataFrame([
    ("User1", "2024-01-01", 10),
    ("User1", "2024-01-02", 15),
    ("User2", "2024-01-01", 5),
    ("User2", "2024-01-02", 15),
    ("User3", "2024-01-01", 2),
], ["user_id", "date", "clicks"])

purchases_df = spark.createDataFrame([
    ("User1", "2024-01-01", 2),
    ("User1", "2024-01-02", 3),
    ("User2", "2024-01-01", 1),
    ("User2", "2024-01-02", 6),
], ["user_id", "date", "purchases"])


**Схема результата**

In [52]:
schema = StructType([
    StructField("user_id", StringType()),
    StructField("correlation", DoubleType())
])

**Функция для применения через .mapInPandas**

In [53]:
def calculate_correlation(clicks_pdf: pd.DataFrame, purchases_pdf: pd.DataFrame) -> pd.DataFrame:
    """ Нормализация данных по партициям (MinMax Scaling) """        
    if not clicks_pdf.empty:
        user_id = clicks_pdf["user_id"].iloc[0]
    elif not purchases_pdf.empty:
        user_id = purchases_pdf["user_id"].iloc[0]
    else:
        return pd.DataFrame(columns=["user_id", "correlation"])
    
    # Объединяем по дате
    merged = clicks_pdf.merge(purchases_pdf, on="date", suffixes=("_c", "_p"))
    
    if len(merged) < 2:
        return pd.DataFrame([[user_id, np.nan]], columns=["user_id", "correlation"])
    
    user_id = clicks_pdf['user_id'].iloc[0]
    corr = merged['clicks'].corr(merged['purchases'])
    
    return pd.DataFrame([[user_id, corr]], columns=["user_id", "correlation"])

**Применение функции**

In [54]:
result = clicks_df.groupBy("user_id").cogroup( purchases_df.groupBy("user_id")).applyInPandas(calculate_correlation, schema=schema)

result.show()

+-------+------------------+
|user_id|       correlation|
+-------+------------------+
|  User1|0.9999999999999999|
|  User2|0.9999999999999999|
|  User3|              NULL|
+-------+------------------+



## 3. Синхронизация временных рядов (заполнение пропусков значением 0)

**Исходные данные**

In [55]:
# Плановые и фактические данные
planned_df = spark.createDataFrame([
    ("Task1", "2024-01-01", 1000),
    ("Task1", "2024-01-03", 1200),
], ["task", "date", "planned_value"])

actual_df = spark.createDataFrame([
    ("Task1", "2024-01-01", 950),
    ("Task1", "2024-01-02", 1050),
], ["task", "date", "actual_value"])


**Схема результата**

In [56]:
schema = StructType([
    StructField("task", StringType()),
    StructField("date", StringType()),
    StructField("planned_value", DoubleType()),
    StructField("actual_value", DoubleType()),
    StructField("variance", DoubleType())
])


**Функция для применения**

In [57]:
def merge_and_fill(planned_pdf: pd.DataFrame, actual_pdf: pd.DataFrame) -> pd.DataFrame:
    if planned_pdf.empty and actual_pdf.empty:
        return pd.DataFrame(columns=schema.fieldNames())
    
    task = planned_pdf['task'].iloc[0] if not planned_pdf.empty else actual_pdf['task'].iloc[0]
    
    # Объединяем с заполнением пропусков
    merged = pd.merge(planned_pdf, actual_pdf, on="date", how="outer", suffixes=("_p", "_a"))
    merged['task'] = task
    merged['planned_value'] = merged['planned_value'].fillna(0)
    merged['actual_value'] = merged['actual_value'].fillna(0)
    merged['variance'] = merged['actual_value'] - merged['planned_value']
    
    return merged[['task', 'date', 'planned_value', 'actual_value', 'variance']]

**Применение через applyInPandas**

In [58]:
result = planned_df.groupBy("task").cogroup(actual_df.groupBy("task")).applyInPandas(merge_and_fill, schema=schema)

result.orderBy("date").show()

+-----+----------+-------------+------------+--------+
| task|      date|planned_value|actual_value|variance|
+-----+----------+-------------+------------+--------+
|Task1|2024-01-01|       1000.0|       950.0|   -50.0|
|Task1|2024-01-02|          0.0|      1050.0|  1050.0|
|Task1|2024-01-03|       1200.0|         0.0| -1200.0|
+-----+----------+-------------+------------+--------+



## 4. A/B тест: сравнение контрольной и тестовой групп

In [59]:
# Контрольная группа
control_df = spark.createDataFrame([
    ("Campaign1", 1, 100),
    ("Campaign1", 2, 110),
    ("Campaign1", 3, 105),
], ["campaign", "user_id", "revenue"])

# Тестовая группа
test_df = spark.createDataFrame([
    ("Campaign1", 4, 120),
    ("Campaign1", 5, 130),
    ("Campaign1", 6, 125),
], ["campaign", "user_id", "revenue"])

schema = StructType([
    StructField("campaign", StringType()),
    StructField("control_avg", DoubleType()),
    StructField("test_avg", DoubleType()),
    StructField("uplift_pct", DoubleType()),
    StructField("p_value", DoubleType())
])

def ab_test_analysis(control_pdf: pd.DataFrame, test_pdf: pd.DataFrame) -> pd.DataFrame:
    from scipy import stats
    
    if control_pdf.empty or test_pdf.empty:
        return pd.DataFrame(columns=schema.fieldNames())
    
    campaign = control_pdf['campaign'].iloc[0]
    control_avg = control_pdf['revenue'].mean()
    test_avg = test_pdf['revenue'].mean()
    uplift = ((test_avg - control_avg) / control_avg) * 100 if control_avg != 0 else 0
    
    # T-test
    t_stat, p_value = stats.ttest_ind(control_pdf['revenue'], test_pdf['revenue'])
    
    return pd.DataFrame([[campaign, control_avg, test_avg, uplift, p_value]], 
                        columns=schema.fieldNames())

result = control_df.groupBy("campaign").cogroup(
    test_df.groupBy("campaign")
).applyInPandas(ab_test_analysis, schema=schema)

result.show()

+---------+-----------+--------+------------------+--------------------+
| campaign|control_avg|test_avg|        uplift_pct|             p_value|
+---------+-----------+--------+------------------+--------------------+
|Campaign1|      105.0|   125.0|19.047619047619047|0.008049893100837717|
+---------+-----------+--------+------------------+--------------------+



## Example PANDAS_UDF с применением срествами .mapInPandas

mapInPandas - это метод Spark DataFrame, позволящий обрабатывать данные партициями в виде pandas.DataFrame и возвращать pandas.DataFrame. (pandas‑аналог mapPartitions)

**Особенности**

1. **Обработка по партициям Spark** - Итериратор по партициям Spark DataFrame и yield‑итератор pandas.DataFrame как возвращаемое значение
2. **Схема вывода** - необходимо явно определять схему вывода с помощью StructType
3. **Векторизовация операций** - операции производятся над целыми колонками данных, что значительно ускоряет их выполнение по сравнению с классическими циклическими подходами

```python
DataFrame.mapInPandas(func: PandasMapIterFunction
                     ,schema: Union[pyspark.sql.types.StructType, str]
                    , barrier: bool = False) 
return DataFrame
```   

**Пример определения:**
```python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pandas as pd

# Определяется схема вывода
output_schema = StructType([
    StructField("id", StringType(), True),
    StructField("value", IntegerType(), True)
                    ])

# Определяется функция для применения
def calculate_fuction(pdf: pd.DataFrame):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

# Прменение средствами .applyInPandas
df.mapInPandas(calculate_fuction, schema=output_schema).show()

### 1. Нормализация данных по партициям (MinMax Scaling)

Min–Max нормализация - масштабирование в диапазон (0...1)

Для каждого признака расчитывается внутри группы:

$ 
x_{\text{norm}} = \frac{x - x_{\min}}{x_{\max} - x_{\min}} $

**Исходные данные**

In [60]:
df = spark.createDataFrame([
    ("Product1", 50),  ("Product1", 100), ("Product1", 150), ("Product1", 200),
    ("Product2", 150), ("Product2", 160), ("Product2", 300),
    ("Product3", 60),
], ["product", "sales"])

**Схема результата**

In [61]:
schema = StructType([
    StructField("product", StringType()),
    StructField("sales", DoubleType()),
    StructField("normalized_sales", DoubleType())
])

**Функция для применения через .mapInPandas**

In [62]:
def normalize_partition(iterator):
    """ Нормализация данных по партициям (MinMax Scaling) """    
    for pdf in iterator:
        if pdf.empty:
            yield pdf
            continue
        
        # MinMax нормализация внутри партиции
        min_val = pdf['sales'].min()
        max_val = pdf['sales'].max()
        
        if max_val - min_val != 0:
            pdf['normalized_sales'] = (pdf['sales'] - min_val) / (max_val - min_val)
        else:
            pdf['normalized_sales'] = 0.0
        
        yield pdf

**Применение функции**

In [63]:
df.repartition(1).mapInPandas(normalize_partition, schema=schema).show()

+--------+-----+----------------+
| product|sales|normalized_sales|
+--------+-----+----------------+
|Product1| 50.0|             0.0|
|Product1|100.0|             0.2|
|Product1|150.0|             0.4|
|Product1|200.0|             0.6|
|Product2|150.0|             0.4|
|Product2|160.0|            0.44|
|Product2|300.0|             1.0|
|Product3| 60.0|            0.04|
+--------+-----+----------------+



### 2. Обработка временных рядов с интерполяцией

Интерполяция пропущенных значений — это способ восстановления недостающих данных по уже известным соседним точкам. Предполагается что значение меняется плавно.


**Исходные данные с пропусками**

In [64]:
df = spark.createDataFrame([
    ("Sensor1", "2024-01-01", 20.5),
    ("Sensor1", "2024-01-03", 22.0),  # Пропущен 2024-01-02
    ("Sensor1", "2024-01-04", 21.5),
    ("Sensor2", "2024-01-01", 18.0),
    ("Sensor2", "2024-01-04", 19.5),  # Пропущены 02 и 03
], ["sensor_id", "date", "temperature"])

**Схема результата**

In [65]:
schema = StructType([
    StructField("sensor_id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("is_interpolated", StringType(), True)
])

**Функция для применения через .mapInPandas**

In [66]:
# Линейная интерполяция
def interpolate_timeseries(iterator):
    """
    Линейная интерполяция пропущенных значений в временных рядах.
    Обрабатывает каждую партицию отдельно.
    """
    for pdf in iterator:
        if pdf.empty:
            yield pdf
            continue
        
        # Преобразуем строки в datetime
        pdf['date'] = pd.to_datetime(pdf['date'])
        pdf = pdf.sort_values(['sensor_id', 'date'])
        
        result_dfs = []
        
        # Для каждого сенсора отдельно создаётся полный диапазон дат
        for sensor_id, group in pdf.groupby('sensor_id'):
            date_range = pd.date_range(start=group['date'].min(), end=group['date'].max(), freq='D')
            
            # DataFrame с полным диапазоном дат
            full_dates = pd.DataFrame({'date': date_range,'sensor_id': sensor_id})
            
            # Объединение с исходными данными
            merged = full_dates.merge(group[['date', 'temperature']], on='date', how='left')
            
            # Метка интерполированных значений (Yes)
            merged['is_interpolated'] = merged['temperature'].isna().map({True: 'Yes', False: 'No'})
            
            # Применение линейной интерполяции
            merged['temperature'] = merged['temperature'].interpolate(method='linear')
            
            result_dfs.append(merged)
        
        # Объединяем все сенсоры
        if result_dfs:
            result = pd.concat(result_dfs, ignore_index=True)
            result['date'] = result['date'].dt.strftime('%Y-%m-%d')
            
            # Итератор в правильном порядке колонок
            yield result[['sensor_id', 'date', 'temperature', 'is_interpolated']]

**Применение функции**

In [67]:
df.mapInPandas(interpolate_timeseries, schema=schema).orderBy("sensor_id", "date").show()

+---------+----------+-----------+---------------+
|sensor_id|      date|temperature|is_interpolated|
+---------+----------+-----------+---------------+
|  Sensor1|2024-01-01|       20.5|             No|
|  Sensor1|2024-01-03|       22.0|             No|
|  Sensor1|2024-01-04|       21.5|             No|
|  Sensor2|2024-01-01|       18.0|             No|
|  Sensor2|2024-01-02|       18.5|            Yes|
|  Sensor2|2024-01-03|       19.0|            Yes|
|  Sensor2|2024-01-04|       19.5|             No|
+---------+----------+-----------+---------------+



# Stop Spark Session

In [68]:
spark.stop()