<a href="https://colab.research.google.com/github/CareOfLaeda/task_for_work/blob/main/task2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, lit

def get_products_with_categories(products_df: DataFrame,
                               categories_df: DataFrame,
                               product_category_df: DataFrame) -> DataFrame:

    product_category_pairs = (product_category_df
        .join(products_df, "product_id")
        .join(categories_df, "category_id")
        .select("product_name", "category_name"))

    products_without_categories = (products_df
        .join(product_category_df, "product_id", "left_anti")
        .select("product_name", lit(None).alias("category_name")))

    result_df = product_category_pairs.union(products_without_categories)

    return result_df



In [7]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, lit

spark = SparkSession.builder \
    .appName("ProductsCategoriesTest") \
    .master("local[*]") \
    .getOrCreate()


def get_products_with_categories(products_df: DataFrame,
                               categories_df: DataFrame,
                               product_category_df: DataFrame) -> DataFrame:

    product_category_pairs = (product_category_df
        .join(products_df, "product_id")
        .join(categories_df, "category_id")
        .select("product_name", "category_name"))

    products_without_categories = (products_df
        .join(product_category_df, "product_id", "left_anti")
        .select("product_name", lit(None).alias("category_name")))

    result_df = product_category_pairs.union(products_without_categories)

    return result_df

def test_basic_functionality():
    """Тест базовой функциональности"""
    print("ТЕСТ 1: Базовая функциональность")

    products_data = [
        ("p1", "Laptop"),
        ("p2", "Smartphone"),
        ("p3", "Book"),
        ("p4", "Headphones")
    ]
    products_df = spark.createDataFrame(products_data, ["product_id", "product_name"])

    categories_data = [
        ("c1", "Electronics"),
        ("c2", "Computers"),
        ("c3", "Books")
    ]
    categories_df = spark.createDataFrame(categories_data, ["category_id", "category_name"])

    product_category_data = [
        ("p1", "c1"),
        ("p1", "c2"),
        ("p2", "c1"),
        ("p3", "c3")
    ]
    product_category_df = spark.createDataFrame(product_category_data, ["product_id", "category_id"])

    result = get_products_with_categories(products_df, categories_df, product_category_df)

    print("Входные данные:")
    print("Продукты:")
    products_df.show()
    print("Категории:")
    categories_df.show()
    print("Связи:")
    product_category_df.show()

    print("Результат:")
    result.show()

    total_count = result.count()
    pairs_count = result.filter(col("category_name").isNotNull()).count()
    no_category_count = result.filter(col("category_name").isNull()).count()

    print(f"Статистика:")
    print(f"Всего записей: {total_count}")
    print(f"Пар продукт-категория: {pairs_count}")
    print(f"Продуктов без категорий: {no_category_count}")

    expected_results = [
        ("Laptop", "Electronics"),
        ("Laptop", "Computers"),
        ("Smartphone", "Electronics"),
        ("Book", "Books"),
        ("Headphones", None)
    ]

    actual_results = [(row.product_name, row.category_name) for row in result.collect()]

    print(f"Ожидалось записей: {len(expected_results)}, Получено: {total_count}")
    print(f"Ожидалось пар: 4, Получено: {pairs_count}")
    print(f"Ожидалось продуктов без категорий: 1, Получено: {no_category_count}")

    laptop_categories = [row.category_name for row in result.filter(col("product_name") == "Laptop").collect()]
    headphones_has_no_category = result.filter((col("product_name") == "Headphones") & (col("category_name").isNull())).count() == 1

    print(f"Laptop имеет 2 категории: {len(laptop_categories) == 2}")
    print(f"Headphones без категории: {headphones_has_no_category}")

    return result

def test_multiple_categories():
    print("\nТЕСТ 2: Продукты с несколькими категориями")

    products_data = [("p1", "SuperProduct")]
    products_df = spark.createDataFrame(products_data, ["product_id", "product_name"])

    categories_data = [("c1", "Cat1"), ("c2", "Cat2"), ("c3", "Cat3")]
    categories_df = spark.createDataFrame(categories_data, ["category_id", "category_name"])

    product_category_data = [("p1", "c1"), ("p1", "c2"), ("p1", "c3")]
    product_category_df = spark.createDataFrame(product_category_data, ["product_id", "category_id"])

    result = get_products_with_categories(products_df, categories_df, product_category_df)

    print("Продукт 'SuperProduct' должен быть в 3 категориях")
    result.show()

    product_categories_count = result.filter(col("product_name") == "SuperProduct").count()
    print(f"SuperProduct имеет {product_categories_count} категорий (ожидается: 3)")

    return result

def test_all_products_without_categories():
    """Тест когда у всех продуктов нет категорий"""
    print("\nТЕСТ 3: Все продукты без категорий")

    products_data = [("p1", "Product1"), ("p2", "Product2"), ("p3", "Product3")]
    products_df = spark.createDataFrame(products_data, ["product_id", "product_name"])

    categories_data = [("c1", "Category1")]
    categories_df = spark.createDataFrame(categories_data, ["category_id", "category_name"])

    product_category_data = []
    product_category_df = spark.createDataFrame(product_category_data, ["product_id", "category_id"])

    result = get_products_with_categories(products_df, categories_df, product_category_df)

    print("Все 3 продукта должны быть без категорий")
    result.show()

    total_count = result.count()
    no_category_count = result.filter(col("category_name").isNull()).count()

    print(f"Всего продуктов: {total_count} (ожидается: 3)")
    print(f"Продуктов без категорий: {no_category_count} (ожидается: 3)")

    return result

def test_empty_data():
    """Тест с пустыми входными данными"""
    print("\nТЕСТ 4: Пустые входные данные")

    empty_schema = StructType([
        StructField("product_id", StringType(), True),
        StructField("product_name", StringType(), True)
    ])

    products_df = spark.createDataFrame([], empty_schema)
    categories_df = spark.createDataFrame([], empty_schema)
    product_category_df = spark.createDataFrame([], empty_schema)

    result = get_products_with_categories(products_df, categories_df, product_category_df)

    print("При пустых входных данных результат должен быть пустым")
    result.show()

    total_count = result.count()
    print(f"Записей в результате: {total_count} (ожидается: 0)")

    return result

def test_categories_without_products():
    print("\nТЕСТ 5: Категории без продуктов")

    products_data = [("p1", "SingleProduct")]
    products_df = spark.createDataFrame(products_data, ["product_id", "product_name"])

    categories_data = [("c1", "UsedCategory"), ("c2", "UnusedCategory")]
    categories_df = spark.createDataFrame(categories_data, ["category_id", "category_name"])

    product_category_data = [("p1", "c1")]  # Только одна категория используется
    product_category_df = spark.createDataFrame(product_category_data, ["product_id", "category_id"])

    result = get_products_with_categories(products_df, categories_df, product_category_df)

    print("Категория 'UnusedCategory' не должна отображаться в результате")
    result.show()

    total_count = result.count()
    unused_category_exists = result.filter(col("category_name") == "UnusedCategory").count() > 0

    print(f"Всего записей: {total_count} (ожидается: 1 пара + 0 без категорий = 1)")
    print(f"Неиспользуемая категория не отображается: {not unused_category_exists}")

    return result

def run_all_tests():
    print("=" * 50)

    tests = [
        test_basic_functionality,
        test_multiple_categories,
        test_all_products_without_categories,
        test_empty_data,
        test_categories_without_products
    ]

    passed_tests = 0
    failed_tests = 0

    for i, test_func in enumerate(tests, 1):
        try:
            print(f"\n{'='*30}")
            print(f"ТЕСТ {i}/5")
            result = test_func()
            print(f"ТЕСТ {i} ПРОЙДЕН")
            passed_tests += 1
        except Exception as e:
            print(f"ТЕСТ {i} ПРОВАЛЕН: {e}")
            failed_tests += 1

    print("\n" + "="*50)
    print("ИТОГИ ТЕСТИРОВАНИЯ:")
    print(f"Пройдено: {passed_tests}")
    print(f"Провалено: {failed_tests}")
    print(f"Успешность: {passed_tests/len(tests)*100:.1f}%")

    if failed_tests == 0:
        print("\nВСЕ ТЕСТЫ УСПЕШНО ПРОЙДЕНЫ!")
    else:
        print(f"\n{failed_tests} тестов требуют внимания")

def visual_verification_test():
    print("\nТЕСТ ДЛЯ ВИЗУАЛЬНОЙ ПРОВЕРКИ")

    products_data = [
        ("1", "Яблоко"),
        ("2", "Банан"),
        ("3", "Апельсин")
    ]
    products_df = spark.createDataFrame(products_data, ["product_id", "product_name"])

    categories_data = [
        ("1", "Фрукты"),
        ("2", "Желтые"),
        ("3", "Круглые")
    ]
    categories_df = spark.createDataFrame(categories_data, ["category_id", "category_name"])

    product_category_data = [
        ("1", "1"),
        ("1", "3"),
        ("2", "1"),
        ("2", "2"),
    ]
    product_category_df = spark.createDataFrame(product_category_data, ["product_id", "category_id"])

    print("Тестовые данные:")
    print("Продукты:")
    products_df.show()
    print("Категории:")
    categories_df.show()
    print("Связи:")
    product_category_df.show()

    result = get_products_with_categories(products_df, categories_df, product_category_df)

    print("Ожидаемый результат:")
    print("   - Яблоко: Фрукты, Круглые")
    print("   - Банан: Фрукты, Желтые")
    print("   - Апельсин: без категорий")

    print("\nФактический результат:")
    result.show()

    print("Визуальная проверка завершена")

if __name__ == "__main__":
    run_all_tests()

    visual_verification_test()

    spark.stop()
    print("\nSpark сессия остановлена")


ТЕСТ 1/5
ТЕСТ 1: Базовая функциональность
Входные данные:
Продукты:
+----------+------------+
|product_id|product_name|
+----------+------------+
|        p1|      Laptop|
|        p2|  Smartphone|
|        p3|        Book|
|        p4|  Headphones|
+----------+------------+

Категории:
+-----------+-------------+
|category_id|category_name|
+-----------+-------------+
|         c1|  Electronics|
|         c2|    Computers|
|         c3|        Books|
+-----------+-------------+

Связи:
+----------+-----------+
|product_id|category_id|
+----------+-----------+
|        p1|         c1|
|        p1|         c2|
|        p2|         c1|
|        p3|         c3|
+----------+-----------+

Результат:
+------------+-------------+
|product_name|category_name|
+------------+-------------+
|  Smartphone|  Electronics|
|      Laptop|  Electronics|
|        Book|        Books|
|      Laptop|    Computers|
|  Headphones|         NULL|
+------------+-------------+

Статистика:
Всего записей: 5
Пар 