<a href="https://colab.research.google.com/github/Yanina-Kutovaya/GNN/blob/main/notebooks/spark_stratified_split.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Функция stratified_split для работы с большими данными в PySpark для модели CatBoost

## Список тестов из класса ```TestStratifiedSplit```

Ниже приведен список всех тестовых методов из класса TestStratifiedSplit, с краткими описаниями их назначения и проверяемых сценариев.

1. Основные сценарии разбиения
  - ```test_stratified_split_normal```
  Проверяет нормальное разбиение на ```train/val/test``` с тремя классами и стандартными пропорциями (например, ```0.6:0.2:0.2```). Убеждается, что данные распределены пропорционально по классам и временные столбцы удалены.
  - ```test_stratified_split_no_val```
  Проверяет случай, когда ```val_ratio = 0```. Убеждается, что ```val_data``` — пустой DataFrame, а остальные выборки корректно заполнены.
  - ```test_stratified_split_no_test```
  Проверяет случай, когда ```test_ratio = 0```. Убеждается, что ```test_data``` — пустой DataFrame, а остальные выборки корректно заполнены.
2. Граничные случаи и округление
  - ```test_small_class_sizes```
  Проверяет корректность округления при малых размерах классов. Например, если класс содержит всего 1 элемент, а доля составляет 0.5, то он попадает в остаток (например, в ```test```).
3. Обработка ошибок
  - ```test_invalid_ratio_sum```
  Проверяет, что функция выбрасывает ошибку ```AssertionError```, если сумма долей - ```train_ratio + val_ratio + test_ratio ≠ 1.0```.
  - ```test_missing_label_column```
  Проверяет, что функция выбрасывает ошибку ```ValueError```, если указанный ```label_col``` отсутствует в DataFrame.
4. Воспроизводимость
  - ```test_reproducibility```
  Проверяет, что при одинаковом ```seed``` результаты разбиения воспроизводятся. Убеждается, что повторный запуск с тем же ```seed``` дает идентичные выборки.

## Что проверяют эти тесты?

- __Корректность разбиения:__ Пропорциональное распределение классов в ```train/val/test```.
- __Удаление временных столбцов:__ Отсутствие служебных столбцов (```__temp_rand__```, ```__temp_row_num__``` и т.д.) в результатах.
- __Граничные случаи:__ Обработка ```val_ratio=0```, ```test_ratio=0```, малых классов.
- __Исключения:__ Проверка выбросов ошибок при некорректных входных данных.
- __Воспроизводимость:__  Идентичные результаты при одинаковом ```seed```

Эти тесты полностью покрывают основные сценарии использования функции ```stratified_split```, включая стратификацию, обработку ошибок и воспроизводимость.

In [1]:
install = False
if install:
  # Установить Spark 3.4 с Scala 2.12
  !pip install -q pyspark==3.4.0 catboost==1.2.8

  # Перезапустить среду после установки
  import os
  os.kill(os.getpid(), 9)

In [2]:
import unittest
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.functions import col, rand, row_number, when
from typing import Tuple

def stratified_split(
    df: DataFrame,
    label_col: str,
    train_ratio: float = 0.6,
    val_ratio: float = 0.2,
    test_ratio: float = 0.2,
    seed: int = 42
) -> Tuple[DataFrame, DataFrame, DataFrame]:
    """
    Стратифицированное разделение данных на train/val/test с использованием оконных функций.

    Parameters:
        df (DataFrame): Исходный DataFrame.
        label_col (str): Название столбца с метками классов.
        train_ratio (float): Доля обучающей выборки.
        val_ratio (float): Доля валидационной выборки.
        test_ratio (float): Доля тестовой выборки.
        seed (int): Сид для генерации случайных чисел.

    Returns:
        tuple[DataFrame, DataFrame, DataFrame]: train_data, val_data, test_data.
    """
    # Проверка суммы долей
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-9, "Сумма долей должна быть равна 1.0"

    # Проверка наличия столбца меток
    if label_col not in df.columns:
        raise ValueError(f"Столбец '{label_col}' отсутствует в DataFrame.")

    # Добавляем случайное число для сортировки внутри групп
    df = df.withColumn("__temp_rand__", rand(seed))

    # Окно для нумерации строк внутри групп по случайному числу
    window_spec = Window.partitionBy(label_col).orderBy("__temp_rand__")

    # Добавляем номер строки в группе, начиная с 1
    df_ranked = df.withColumn("__temp_row_num__", row_number().over(window_spec))

    # Вычисляем общее количество строк в каждой группе
    group_counts = df_ranked.groupBy(label_col).count().withColumnRenamed("count", "__temp_total__")

    # Присоединяем количество строк к исходному DataFrame
    df_with_counts = df_ranked.join(group_counts, on=label_col)

    # Вычисляем пороги для train и val
    train_threshold = (col("__temp_total__") * train_ratio).cast("int")
    val_threshold = (col("__temp_total__") * (train_ratio + val_ratio)).cast("int")

    # Присваиваем метки частей
    df_partitioned = df_with_counts.withColumn(
        "__temp_partition__",
        when(col("__temp_row_num__") <= train_threshold, "train")
             .when(col("__temp_row_num__") <= val_threshold, "val")
             .otherwise("test")
    )

    # Фильтруем по меткам и убираем временные столбцы
    train_data = df_partitioned.filter(col("__temp_partition__") == "train").drop(
        "__temp_rand__", "__temp_row_num__", "__temp_total__", "__temp_partition__"
    )
    val_data = df_partitioned.filter(col("__temp_partition__") == "val").drop(
        "__temp_rand__", "__temp_row_num__", "__temp_total__", "__temp_partition__"
    )
    test_data = df_partitioned.filter(col("__temp_partition__") == "test").drop(
        "__temp_rand__", "__temp_row_num__", "__temp_total__", "__temp_partition__"
    )

    # Если val_ratio или test_ratio равны нулю, возвращаем пустой DataFrame
    if val_ratio == 0:
        val_data = df.sparkSession.createDataFrame(df.sparkSession.sparkContext.emptyRDD(), df.schema)
    if test_ratio == 0:
        test_data = df.sparkSession.createDataFrame(df.sparkSession.sparkContext.emptyRDD(), df.schema)

    return train_data, val_data, test_data


class TestStratifiedSplit(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder \
            .master("local[*]") \
            .appName("TestStratifiedSplit") \
            .getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def create_test_data(self, class_counts: dict = None) -> DataFrame:
        """
        Создаёт тестовый DataFrame с заданным количеством элементов в каждом классе.
        Пример: {"A": 10, "B": 15, "C": 5}
        """
        if class_counts is None:
            class_counts = {"A": 10, "B": 15}
        data = []
        for label, count in class_counts.items():
            data.extend([(label,) for _ in range(count)])
        return self.spark.createDataFrame(data, ["label"])

    def check_partition_sizes(self, df: DataFrame, expected_counts: dict):
        """
        Проверяет, что количество элементов по классам в выборке соответствует ожидаемому.
        """
        counts = df.groupBy("label").count().rdd.collectAsMap()
        for label, expected in expected_counts.items():
            self.assertEqual(counts.get(label, 0), expected)

    def test_stratified_split_normal(self):
        """
        Проверяет нормальное разбиение на train/val/test с тремя классами.
        """
        df = self.create_test_data({"A": 10, "B": 15, "C": 5})
        train_ratio, val_ratio, test_ratio = 0.6, 0.2, 0.2
        train_data, val_data, test_data = stratified_split(df, "label", train_ratio, val_ratio, test_ratio)

        # Проверка размеров
        self.check_partition_sizes(train_data, {"A": 6, "B": 9, "C": 3})
        self.check_partition_sizes(val_data, {"A": 2, "B": 3, "C": 1})
        self.check_partition_sizes(test_data, {"A": 2, "B": 3, "C": 1})

        # Проверка удаления временных столбцов
        temp_cols = ["__temp_rand__", "__temp_row_num__", "__temp_total__", "__temp_partition__"]
        for col_name in temp_cols:
            self.assertNotIn(col_name, train_data.columns)
            self.assertNotIn(col_name, val_data.columns)
            self.assertNotIn(col_name, test_data.columns)

    def test_stratified_split_no_val(self):
        """
        Проверяет случай, когда val_ratio = 0.
        """
        df = self.create_test_data({"A": 10, "B": 15})
        train_ratio, val_ratio, test_ratio = 0.8, 0.0, 0.2
        train_data, val_data, test_data = stratified_split(df, "label", train_ratio, val_ratio, test_ratio)

        self.assertEqual(val_data.count(), 0)
        self.check_partition_sizes(train_data, {"A": 8, "B": 12})
        self.check_partition_sizes(test_data, {"A": 2, "B": 3})

    def test_stratified_split_no_test(self):
        """
        Проверяет случай, когда test_ratio = 0.
        """
        df = self.create_test_data({"A": 10, "B": 15})
        train_ratio, val_ratio, test_ratio = 0.7, 0.3, 0.0
        train_data, val_data, test_data = stratified_split(df, "label", train_ratio, val_ratio, test_ratio)

        self.assertEqual(test_data.count(), 0)
        self.check_partition_sizes(train_data, {"A": 7, "B": 10})
        self.check_partition_sizes(val_data, {"A": 3, "B": 5})

    def test_invalid_ratio_sum(self):
        """
        Проверяет, что функция выбрасывает ошибку при некорректной сумме долей.
        """
        df = self.create_test_data()
        with self.assertRaises(AssertionError):
            stratified_split(df, "label", train_ratio=0.5, val_ratio=0.5, test_ratio=0.1)

    def test_missing_label_column(self):
        """
        Проверяет, что функция выбрасывает ошибку при отсутствии указания label_col.
        """
        df = self.spark.createDataFrame([(1,), (2,)], ["value"])
        with self.assertRaises(ValueError):
            stratified_split(df, "label", train_ratio=0.6)

    def test_small_class_sizes(self):
        """
        Проверяет корректность округления при малых размерах классов.
        """
        df = self.create_test_data({"A": 1, "B": 2})
        train_ratio, val_ratio, test_ratio = 0.5, 0.25, 0.25
        train_data, val_data, test_data = stratified_split(df, "label", train_ratio, val_ratio, test_ratio)

        self.check_partition_sizes(train_data, {"A": 0, "B": 1})  # 1 * 0.5 = 0.5 → 0
        self.check_partition_sizes(val_data, {"A": 0, "B": 0})    # 1 * 0.25 = 0.25 → 0
        self.check_partition_sizes(test_data, {"A": 1, "B": 1})  # Остаток

    def test_reproducibility(self):
        """
        Проверяет воспроизводимость при одинаковом seed.
        """
        df = self.create_test_data({"A": 10, "B": 15})
        train1, val1, test1 = stratified_split(df, "label", seed=42)
        train2, val2, test2 = stratified_split(df, "label", seed=42)

        self.assertEqual(train1.count(), train2.count())
        self.assertEqual(val1.count(), val2.count())
        self.assertEqual(test1.count(), test2.count())

        # Сравнение содержимого
        train1_sorted = train1.sort("label")
        train2_sorted = train2.sort("label")
        self.assertTrue(train1_sorted.exceptAll(train2_sorted).isEmpty())

        val1_sorted = val1.sort("label")
        val2_sorted = val2.sort("label")
        self.assertTrue(val1_sorted.exceptAll(val2_sorted).isEmpty())

        test1_sorted = test1.sort("label")
        test2_sorted = test2.sort("label")
        self.assertTrue(test1_sorted.exceptAll(test2_sorted).isEmpty())

In [3]:
if __name__ == '__main__':
    unittest.main(argv=[''], exit=False)

.......
----------------------------------------------------------------------
Ran 7 tests in 38.776s

OK
