Решил сначала посмотреть варианты и решить задание при помощи Pandas т.к. не особо знаком с PySpark. Когда разобрался что нужно сделать и разработал рабочее решение, то посмотрел как работать с Spark и реализовал код на нем.  

# Вариант с pandas

In [4]:
import pandas as pd

products = pd.DataFrame({
    'product_id': [1, 2, 3, 4, 5, 6],
    'product_name': ['iPhone', 'MacBook', 'AirPods', 'iPad', 'Watch', 'Stylus']
})

categories = pd.DataFrame({
    'category_id': [1, 2, 3, 4, 5],
    'category_name': ['Phones', 'Laptops', 'Accessories', 'Tablets', 'Earphones']
})

relations = pd.DataFrame({
    'product_id': [1, 2, 3, 3, 6],
    'category_id': [1, 2, 3, 5, 3]
})

print("Продукты:")
products

Продукты:


Unnamed: 0,product_id,product_name
0,1,iPhone
1,2,MacBook
2,3,AirPods
3,4,iPad
4,5,Watch
5,6,Stylus


In [3]:
print("Категории:")
categories

Категории:


Unnamed: 0,category_id,category_name
0,1,Phones
1,2,Laptops
2,3,Accessories
3,4,Tablets
4,5,Earphones


In [4]:
print("Связи:")
relations

Связи:


Unnamed: 0,product_id,category_id
0,1,1
1,2,2
2,3,3
3,3,5
4,6,3


In [None]:
# собираем датасет по связям
from typing import Tuple, List
def get_pairs(
        products_df: pd.DataFrame,
    categories_df: pd.DataFrame, 
    relations_df: pd.DataFrame
) -> Tuple[pd.DataFrame, List[str], List[str]]:
    """
    Возвращает все пары продукт-категория, включая продукты без категорий и категории без продуктов.
    
    Args:
        products_df: pandas DataFrame с колонками ['product_id', 'product_name']
        categories_df: pandas DataFrame с колонками ['category_id', 'category_name']
        relations_df: pandas DataFrame с колонками ['product_id', 'category_id']
    
    Returns:
        Tuple containing:
        - pandas DataFrame с колонками ['product_name', 'category_name']
        - List продуктов без категорий
        - List категорий без продуктов
    
    Raises:
        ValueError: Если таблица связей пуста или отсутствуют обязательные колонки
    """
    
    if relations_df.empty:
        raise ValueError("Таблица связей пуста")
    
    for df_cols, cols, name in [
        (products_df.columns, ['product_id', 'product_name'], "products_df"),
        (categories_df.columns, ['category_id', 'category_name'], "categories_df"), 
        (relations_df.columns, ['product_id', 'category_id'], "relations_df")
    ]:
        missing_cols = set(cols) - set(df_cols)
        if missing_cols:
            raise ValueError(f"В {name} отсутствуют колонки: {missing_cols}")

    # Использовал множества для ускорения поиска
    products_in_relations = set(relations_df['product_id'])
    categories_in_relations = set(relations_df['category_id'])
    
    # Продукты без категорий
    products_without_categories = products_df[
        ~products_df['product_id'].isin(products_in_relations)
    ]['product_name'].tolist()
    
    # Категории без продуктов
    categories_without_products = categories_df[
        ~categories_df['category_id'].isin(categories_in_relations)
    ]['category_name'].tolist()
    
    result_parts = []
    
    try:
        existing_pairs = (
            relations_df
            .merge(products_df, on='product_id', how='inner')
            .merge(categories_df, on='category_id', how='inner')
            [['product_name', 'category_name']]
        )
        result_parts.append(existing_pairs)
    except Exception as e:
        raise ValueError(f"Ошибка при объединении таблиц: {e}")
    
    if products_without_categories:
        result_parts.append(pd.DataFrame({
            'product_name': products_without_categories,
            'category_name': [None] * len(products_without_categories)
        }))
    
    if categories_without_products:
        result_parts.append(pd.DataFrame({
            'product_name': [None] * len(categories_without_products),
            'category_name': categories_without_products
        }))
    
    return pd.concat(result_parts, ignore_index=True), products_without_categories, categories_without_products

In [19]:
result, prod_wo_cat, cat_wo_prod = get_pairs(products, categories, relations)

print("Все пары продукт-категория:")
result

Все пары продукт-категория:


Unnamed: 0,product_name,category_name
0,iPhone,Phones
1,MacBook,Laptops
2,AirPods,Accessories
3,AirPods,Earphones
4,Stylus,Accessories
5,iPad,
6,Watch,
7,,Tablets


In [20]:
print(f"Продукты без категорий: {list(prod_wo_cat)}")

print(f"Категории без продуктов: {list(cat_wo_prod)}")

Продукты без категорий: ['iPad', 'Watch']
Категории без продуктов: ['Tablets']


In [21]:
print(f"Всего продуктов: {len(products)}")
print(f"Продуктов с категориями: {len(result[result['category_name'].notna()]['product_name'].unique())}")
print(f"Продуктов без категорий: {len(prod_wo_cat)}")
print(f"Всего категорий: {len(categories)}")
print(f"Категорий с продуктами: {len(result[result['product_name'].notna()]['category_name'].unique())}")
print(f"Категорий без продуктов: {len(cat_wo_prod)}")

Всего продуктов: 6
Продуктов с категориями: 5
Продуктов без категорий: 2
Всего категорий: 5
Категорий с продуктами: 5
Категорий без продуктов: 1


In [18]:
print("=== ТЕСТ 1: Пустая таблица связей ===")
try:
    empty_relations = pd.DataFrame({'product_id': [], 'category_id': []})
    result, no_cat, no_prod = get_pairs(products, categories, empty_relations)
    print("❌ Тест не прошел: Ожидалась ошибка, но функция выполнилась")
except ValueError as e:
    print(f"✅ Тест прошел: {e}")

print("\n")

print("=== ТЕСТ 2: Отсутствует колонка product_name ===")
try:
    invalid_products = pd.DataFrame({
        'product_id': [1, 2],  # есть только product_id, нет product_name
        # 'product_name': ['A', 'B']
    })
    result, no_cat, no_prod = get_pairs(invalid_products, categories, relations)
    print("❌ Тест не прошел: Ожидалась ошибка, но функция выполнилась")
except ValueError as e:
    print(f"✅ Тест прошел: {e}")

print("\n")

print("=== ТЕСТ 3: Корректные данные ===")
try:
    result, no_cat, no_prod = get_pairs(products, categories, relations)
    print("✅ Тест прошел: Функция выполнилась успешно")
    print(f"   Результат: {len(result)} строк")
    print(f"   Продукты без категорий: {no_cat}")
    print(f"   Категории без продуктов: {no_prod}")
except Exception as e:
    print(f"❌ Тест не прошел: {e}")

=== ТЕСТ 1: Пустая таблица связей ===
✅ Тест прошел: Таблица связей пуста


=== ТЕСТ 2: Отсутствует колонка product_name ===
✅ Тест прошел: В products_df отсутствуют колонки: {'product_name'}


=== ТЕСТ 3: Корректные данные ===
✅ Тест прошел: Функция выполнилась успешно
   Результат: 8 строк
   Продукты без категорий: ['iPad', 'Watch']
   Категории без продуктов: ['Tablets']


# Вариант с PySpark

## Проверки путей для Java и PySpark

### Проверка что Java для PySpark установлена (для pyspark 4.0.1 требуется java JDK 17 или 21)

Пояснение: у меня была установлена JDK 24 и spark JVM не запустилась. Пришлось устанавливать версию 21 и прописывать путь для JAVA_HOME в переменную среды Windows для пользователя. Т.е. через Win+R -> systempropertiesadvanced -> Переменные среды -> переменные среды пользователя для %Username% а так же изменил настройки в VS Code, сделав поиск в настройках VS Code: Ctrl + , -> terminal integrated env -> и прописал следующие настройки: 

```json
"jdk.jdkhome": "C:\\Program Files\\Java\\jdk-21",
"java.jdt.ls.java.home": "C:\\Program Files\\Java\\jdk-21",
"editor.unicodeHighlight.includeComments": false,
"editor.unicodeHighlight.nonBasicASCII": false,
"java.import.gradle.java.home": "C:\\Program Files\\Java\\jdk-21",
"terminal.integrated.env.windows": {
    "JAVA_HOME": "C:\\Program Files\\Java\\jdk-21",
    "PATH": "${env:PATH};C:\\Program Files\\Java\\jdk-21"
}
```

In [1]:
import os
import subprocess
import sys

def check_java_installation():
    """Проверка установки Java"""
    print("=== ПРОВЕРКА JAVA ===")
    
    # Проверяем JAVA_HOME
    java_home = os.environ.get('JAVA_HOME')
    print(f"JAVA_HOME: {java_home}")
    
    # Проверяем версию Java
    try:
        result = subprocess.run(['java', '-version'], capture_output=True, text=True)
        print("Java version check:")
        print(result.stderr.split('\n')[0])
    except FileNotFoundError:
        print("❌ Java не найдена в PATH")
        return False
    
    # Проверяем путь
    if java_home and os.path.exists(java_home):
        print(f"✅ JAVA_HOME корректен: {java_home}")
    else:
        print("❌ JAVA_HOME неверный или не установлен")
        return False
        
    return True

# Проверяем Java
if check_java_installation():
    print("\n✅ Java настроена правильно!")
else:
    print("\n❌ Проблемы с настройкой Java")
    sys.exit(1)

=== ПРОВЕРКА JAVA ===
JAVA_HOME: C:\Program Files\Java\jdk-21
Java version check:
java version "21.0.8" 2025-07-15 LTS
✅ JAVA_HOME корректен: C:\Program Files\Java\jdk-21

✅ Java настроена правильно!


Если путь к jdk есть в PATH то java подтянется автоматически при запуске кода с инициализацией pyspark сессии, т.е. все будет работать. JAVA_HOME проверяю на всякий случай

### Проверка что прописаны пути для PySpark для Windows переменных окружения

Пояснение: была проблема, что spark выводил ошибку *Python worker failed to connect back*. Исправил это добавив переменные в "переменные среды" а так же в VS Code в часть "terminal.integrated.env.windows": 
```json
{
"PYSPARK_PYTHON": "C:\\Users\\Alex\\AppData\\Local\\Programs\\Python\\Python311\\python.exe",
"PYSPARK_DRIVER_PYTHON": "C:\\Users\\Alex\\AppData\\Local\\Programs\\Python\\Python311\\python.exe",
"SPARK_HOME": "C:\\Users\\Alex\\AppData\\Local\\Programs\\Python\\Python311\\Lib\\site-packages\\pyspark"
}
```

In [1]:
import os
import subprocess
import sys

variables_to_check = [
    'SPARK_HOME', 
    'PYSPARK_PYTHON',
    'PYSPARK_DRIVER_PYTHON',
    'PATH'
]

for var in variables_to_check:
    value = os.environ.get(var)
    if value:
        print(f"✅ {var}: {value}")
    else:
        print(f"❌ {var}: НЕ УСТАНОВЛЕН")

print("\n=== ПРОВЕРКА КОМАНД ===")

# Проверяем доступность команд
commands_to_check = [
    'java -version',
    'python --version',
    'where python' if os.name == 'nt' else 'which python'
]

for cmd in commands_to_check:
    try:
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        if result.returncode == 0:
            print(f"✅ {cmd}: РАБОТАЕТ")
            # Выводим первую строку вывода для версий
            if 'version' in cmd:
                output_line = result.stderr.split('\n')[0] if result.stderr else result.stdout.split('\n')[0]
                print(f"   {output_line}")
        else:
            print(f"❌ {cmd}: НЕ РАБОТАЕТ (код возврата: {result.returncode})")
    except Exception as e:
        print(f"❌ {cmd}: ОШИБКА - {e}")

print("\n=== ПРОВЕРКА PYTHON ПУТЕЙ ===")
print(f"sys.executable: {sys.executable}")
print(f"sys.version: {sys.version}")

# Проверяем, можем ли импортировать pyspark
try:
    from pyspark.sql import SparkSession
    print("✅ PySpark импортируется успешно")
except ImportError as e:
    print(f"❌ Ошибка импорта PySpark: {e}")

✅ SPARK_HOME: C:\Users\Alex\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark
✅ PYSPARK_PYTHON: C:\Users\Alex\AppData\Local\Programs\Python\Python311\python.exe
✅ PYSPARK_DRIVER_PYTHON: C:\Users\Alex\AppData\Local\Programs\Python\Python311\python.exe
✅ PATH: c:\Users\Alex\AppData\Local\Programs\Python\Python311;c:\Users\Alex\AppData\Roaming\Python\Python311\Scripts;C:\Program Files\Common Files\Oracle\Java\javapath;C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v12.8\bin;C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v12.8\libnvvp;C:\Program Files\NVIDIA\CUDNN\v9.4\bin;C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v12.6\bin;C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v12.6\libnvvp;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\System32\Wbem;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;C:\WINDOWS\System32\OpenSSH\;C:\Program Files (x86)\NVIDIA Corporation\PhysX\Common;C:\Program Files\Microsoft VS Code\bin;C:\Program Files\dotnet\;C:\Program Files\Graphviz\

## Решение задания

In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from typing import Tuple
import pandas as pd

In [3]:
# Создаем Spark сессию
spark = SparkSession.builder \
    .appName("ProductCategoryAnalysis") \
    .getOrCreate()

In [4]:
def create_test_data(from_pd: bool = False,
                    products_df: pd.DataFrame = pd.DataFrame(),
                    categories_df: pd.DataFrame = pd.DataFrame(), 
                    relations_df: pd.DataFrame = pd.DataFrame()) -> Tuple[DataFrame, DataFrame, DataFrame]:
    """Создает тестовые данные в PySpark DataFrame"""
    if from_pd:
        products = spark.createDataFrame(products_df)
        categories= spark.createDataFrame(categories_df)
        relations= spark.createDataFrame(relations_df)
    else:
        products_data = [
            (1, 'iPhone'), (2, 'MacBook'), (3, 'AirPods'), 
            (4, 'iPad'), (5, 'Watch'), (6, 'Stylus')
        ]
        products_schema = StructType([
            StructField("product_id", IntegerType(), True),
            StructField("product_name", StringType(), True)
        ])
        products = spark.createDataFrame(products_data, products_schema)
        
        categories_data = [
            (1, 'Phones'), (2, 'Laptops'), (3, 'Accessories'), 
            (4, 'Tablets'), (5, 'Earphones')
        ]
        categories_schema = StructType([
            StructField("category_id", IntegerType(), True),
            StructField("category_name", StringType(), True)
        ])
        categories = spark.createDataFrame(categories_data, categories_schema)
        
        relations_data = [
            (1, 1), (2, 2), (3, 3), (3, 5), (6, 3)
        ]
        relations_schema = StructType([
            StructField("product_id", IntegerType(), True),
            StructField("category_id", IntegerType(), True)
        ])
        relations = spark.createDataFrame(relations_data, relations_schema)
    
    return products, categories, relations

In [18]:
# Создаем тестовые данные
products, categories, relations = create_test_data()

print("=== ПРОДУКТЫ ===")
products.show()

print("=== КАТЕГОРИИ ===")
categories.show()

print("=== СВЯЗИ ===")
relations.show()

=== ПРОДУКТЫ ===
+----------+------------+
|product_id|product_name|
+----------+------------+
|         1|      iPhone|
|         2|     MacBook|
|         3|     AirPods|
|         4|        iPad|
|         5|       Watch|
|         6|      Stylus|
+----------+------------+

=== КАТЕГОРИИ ===
+-----------+-------------+
|category_id|category_name|
+-----------+-------------+
|          1|       Phones|
|          2|      Laptops|
|          3|  Accessories|
|          4|      Tablets|
|          5|    Earphones|
+-----------+-------------+

=== СВЯЗИ ===
+----------+-----------+
|product_id|category_id|
+----------+-----------+
|         1|          1|
|         2|          2|
|         3|          3|
|         3|          5|
|         6|          3|
+----------+-----------+



In [23]:
def get_pairs_spark(
    products_df: DataFrame,
    categories_df: DataFrame, 
    relations_df: DataFrame
) -> Tuple[DataFrame, List[str], List[str]]:
    """
    PySpark версия функции для получения всех пар продукт-категория
    
    Args:
        products_df: pyspark DataFrame с колонками ['product_id', 'product_name']
        categories_df: pyspark DataFrame с колонками ['category_id', 'category_name']
        relations_df: pyspark DataFrame с колонками ['product_id', 'category_id']

    Returns:
        Tuple containing:
        - pyspark DataFrame с колонками ['product_name', 'category_name']
        - List продуктов без категорий
        - List категорий без продуктов
    
    Raises:
        ValueError: Если таблица связей пуста или отсутствуют обязательные колонки
    """
    
    if relations_df.count() == 0:
        raise ValueError("Таблица связей пуста")
    
    for df_cols, cols, name in [
        (products_df.columns, ['product_id', 'product_name'], "products_df"),
        (categories_df.columns, ['category_id', 'category_name'], "categories_df"), 
        (relations_df.columns, ['product_id', 'category_id'], "relations_df")
    ]:
        missing_cols = set(cols) - set(df_cols)
        if missing_cols:
            raise ValueError(f"В {name} отсутствуют колонки: {missing_cols}")

    result = (
        relations_df
        .join(products_df, 'product_id')
        .join(categories_df, 'category_id')
        .select('product_name', 'category_name')
    )
    
    # Продукты без категорий и Категории без продуктов через анти-джоин (left_anti)
    products_without_categories = [row['product_name'] for row in 
                                   products_df.join(relations_df, on='product_id', how='left_anti')
                                   .select('product_name').collect()]

    categories_without_products = [row['category_name'] for row in 
                                   categories_df.join(relations_df, on='category_id', how='left_anti')
                                   .select('category_name').collect()]

    if products_without_categories:
        extra_products = products_df.sparkSession.createDataFrame(
            [(p, None) for p in products_without_categories],
            schema=StructType([
                StructField("product_name", StringType(), True),
                StructField("category_name", StringType(), True)
            ]) #без schema с указанием типа ругается на None: 
                # (PySparkValueError: [CANNOT_DETERMINE_TYPE] Some of types cannot be determined after inferring.)
        )
        result = result.unionByName(extra_products)
    
    if categories_without_products:
        extra_categories = categories_df.sparkSession.createDataFrame(
            [(None, c) for c in categories_without_products],
            schema=StructType([
                StructField("product_name", StringType(), True),
                StructField("category_name", StringType(), True)
            ]) #тут тоже schema нужна соответственно 
        )
        result = result.unionByName(extra_categories)

    return result, products_without_categories, categories_without_products

In [24]:
print("=== РЕЗУЛЬТАТ ===")
result, prod_wo_cat, cat_wo_prod = get_pairs_spark(products, categories, relations)
result.show()

print(f"Продукты без категорий: {prod_wo_cat}")
print(f"Категории без продуктов: {cat_wo_prod}")

=== РЕЗУЛЬТАТ ===
+------------+-------------+
|product_name|category_name|
+------------+-------------+
|      iPhone|       Phones|
|     MacBook|      Laptops|
|      Stylus|  Accessories|
|     AirPods|  Accessories|
|     AirPods|    Earphones|
|        iPad|         NULL|
|       Watch|         NULL|
|        NULL|      Tablets|
+------------+-------------+

Продукты без категорий: ['iPad', 'Watch']
Категории без продуктов: ['Tablets']


In [28]:
print(f"Всего продуктов: {products.count()}")
print(f"Продуктов с категориями: {result.filter(F.col('category_name').isNotNull()).select('product_name').distinct().count()}")

print(f"Продуктов без категорий: {len(prod_wo_cat)}")
print(f"Всего категорий: {categories.count()}")
print(f"Категорий с продуктами: {result.filter(F.col('product_name').isNotNull()).select('category_name').distinct().count()}")

print(f"Категорий без продуктов: {len(cat_wo_prod)}")

Всего продуктов: 6
Продуктов с категориями: 5
Продуктов без категорий: 2
Всего категорий: 5
Категорий с продуктами: 5
Категорий без продуктов: 1


In [29]:
print("=== ТЕСТ 1: Пустая таблица связей ===")
try:
    empty_relations = spark.createDataFrame([], schema="product_id INT, category_id INT")
    result, no_cat, no_prod = get_pairs_spark(products, categories, empty_relations)
    print("❌ Тест не прошел: Ожидалась ошибка, но функция выполнилась")
except ValueError as e:
    print(f"✅ Тест прошел: {e}")

print("\n")

print("=== ТЕСТ 2: Отсутствует колонка product_name ===")
try:
    invalid_products = spark.createDataFrame([(1,), (2,)], ["product_id"])  # без product_name
    result, no_cat, no_prod = get_pairs_spark(invalid_products, categories, relations)
    print("❌ Тест не прошел: Ожидалась ошибка, но функция выполнилась")
except ValueError as e:
    print(f"✅ Тест прошел: {e}")

print("\n")

print("=== ТЕСТ 3: Корректные данные ===")
try:
    result, no_cat, no_prod = get_pairs_spark(products, categories, relations)
    print("✅ Тест прошел: Функция выполнилась успешно")
    print(f"   Результат: {result.count()} строк")
    print(f"   Продукты без категорий: {no_cat}")
    print(f"   Категории без продуктов: {no_prod}")
except Exception as e:
    print(f"❌ Тест не прошел: {e}")


=== ТЕСТ 1: Пустая таблица связей ===
✅ Тест прошел: Таблица связей пуста


=== ТЕСТ 2: Отсутствует колонка product_name ===
✅ Тест прошел: В products_df отсутствуют колонки: {'product_name'}


=== ТЕСТ 3: Корректные данные ===
✅ Тест прошел: Функция выполнилась успешно
   Результат: 8 строк
   Продукты без категорий: ['iPad', 'Watch']
   Категории без продуктов: ['Tablets']


In [None]:
# !pip freeze > requirements.txt