# ETL Pipeline с использованием Delta Lake

В этом ноутбуке реализован ETL-пайплайн для обработки данных с использованием Delta Lake. Процесс включает следующие этапы:

1. **Загрузка исходных данных в формат Delta Table (bronze слой)**
2. **Очистка и трансформация данных**
3. **Сохранение обработанных данных в silver слой**

## Требования к проекту:
- Сформировать корректный датасет для предсказания возраста по устройству
- Объединить данные из target_train.feather и dataset_full.feather по ID
- Удалить дубликаты записей по устройству для одного пользователя


In [None]:
!pip install --upgrade pyspark==3.5.0 delta-spark

In [13]:
# Импорт необходимых библиотек
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import *
import pyarrow.dataset as ds

print("Импорт библиотек завершен успешно")

Импорт библиотек завершен успешно


In [14]:
# Создание директорий для bronze и silver слоев
bronze_path = "/opt/data/bronze"
silver_path = "/opt/data/silver"

# Создаем папки, если они не существуют
os.makedirs(bronze_path, exist_ok=True)
os.makedirs(silver_path, exist_ok=True)

print(f"Директории созданы:")
print(f"Bronze: {bronze_path}")
print(f"Silver: {silver_path}")

Директории созданы:
Bronze: /opt/data/bronze
Silver: /opt/data/silver


In [5]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
import os

# На всякий случай сбросим старые настройки spark-submit
os.environ.pop("PYSPARK_SUBMIT_ARGS", None)

# Собираем builder сразу с настройками кластера и Delta Lake
builder = (
    SparkSession.builder
        .appName("Age Predictor ETL Pipeline")
        .master("spark://spark-standalone:7077")
        .config("spark.driver.host", "jupyter-spark")
        .config("spark.driver.bindAddress", "0.0.0.0")
        .config("spark.driver.port", "7078")
        .config("spark.blockManager.port", "7079")

        # Delta Lake extensions
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

        # Общие оптимизации
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
)

# Оборачиваем builder, чтобы подтянулись все JAR’ы Delta
spark = configure_spark_with_delta_pip(builder).getOrCreate()

print("Connected to:", spark.sparkContext.master)
print("Spark version:", spark.version)
print("Delta Lake support enabled")
print("Available cores:", spark.sparkContext.defaultParallelism)

Connected to: spark://spark-standalone:7077
Spark version: 3.5.0
Delta Lake support enabled
Available cores: 2


## Этап 1: Загрузка данных в Bronze слой

Загрузим исходные данные из feather файлов и сохраним их в формате Delta Table в папку bronze.


In [6]:
# Загрузка данных из feather файлов
print("Загрузка target_train.feather...")
target_train_pandas = pd.read_feather("../data/target_train.feather")
print(f"Размер target_train: {target_train_pandas.shape}")
print(f"Колонки target_train: {list(target_train_pandas.columns)}")

# Просмотр первых строк
print("\nПример данных target_train:")
print(target_train_pandas.head())

Загрузка target_train.feather...
Размер target_train: (270000, 3)
Колонки target_train: ['age', 'is_male', 'user_id']

Пример данных target_train:
    age is_male  user_id
0  31.0       1   350459
1  35.0       1   188276
2  41.0       0    99002
3  33.0       0   155506
4  54.0       0   213873


In [7]:
# Конвертация pandas DataFrame в Spark DataFrame
print("Конвертация в Spark DataFrame...")
target_train_spark = spark.createDataFrame(target_train_pandas)

print("Схема target_train:")
target_train_spark.printSchema()

Конвертация в Spark DataFrame...
Схема target_train:
root
 |-- age: double (nullable = true)
 |-- is_male: string (nullable = true)
 |-- user_id: long (nullable = true)



In [16]:
# Абсолютный путь до целевой папки
target_train_path = bronze_path + "/target_train"

# Пишем в Delta
target_train_spark.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(target_train_path)

#### Загружаем основной датасет частями

In [26]:
target_path = bronze_path + "/dataset_full"

# Загружаем Feather через PyArrow Dataset и пишем батчами
print("Загрузка dataset_full.feather по батчам и сохранение в Bronze слой…")
dataset = ds.dataset("../data/dataset_full.feather", format="feather")

MAX_ROWS = 20_000_000
batch_size = 200_000
total_rows = 0

for i, batch in enumerate(dataset.to_batches(batch_size=batch_size)):
    # если уже накопили нужное — выходим
    if total_rows >= MAX_ROWS:
        break
        
    # Конвертация Arrow batch → Pandas
    pdf = batch.to_pandas()
    if i == 0:
        print(f"Первый батч размером: {pdf.shape}")
        print(f"Колонки: {list(pdf.columns)}")

    total_rows += len(pdf)

    # Конвертация в Spark DataFrame
    sdf = spark.createDataFrame(pdf)
    if i == 0:
        print("\nСхема dataset_full:")
        sdf.printSchema()

    # Запись в Delta в режиме append + mergeSchema
    sdf.write \
       .format("delta") \
       .mode("append") \
       .option("mergeSchema", "true") \
       .save(target_path)

    if i % 25 == 0:
        print(f"Батч {i+1}: записано {len(pdf)} строк (итого {total_rows})")

print(f"\nВсего строк записано: {total_rows}")
print("Данные успешно сохранены в bronze слой!")

Загрузка dataset_full.feather по батчам и сохранение в Bronze слой…
Первый батч размером: (65536, 12)
Колонки: ['region_name', 'city_name', 'cpe_manufacturer_name', 'cpe_model_name', 'url_host', 'cpe_type_cd', 'cpe_model_os_type', 'price', 'date', 'part_of_day', 'request_cnt', 'user_id']

Схема dataset_full:
root
 |-- region_name: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- cpe_manufacturer_name: string (nullable = true)
 |-- cpe_model_name: string (nullable = true)
 |-- url_host: string (nullable = true)
 |-- cpe_type_cd: string (nullable = true)
 |-- cpe_model_os_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- part_of_day: string (nullable = true)
 |-- request_cnt: long (nullable = true)
 |-- user_id: long (nullable = true)

Батч 1: записано 65536 строк (итого 65536)
Батч 26: записано 65536 строк (итого 1703936)
Батч 51: записано 65536 строк (итого 3342336)
Батч 76: записано 65536 строк (итого 

## Этап 2: Загрузка данных из Bronze слоя

In [27]:
# Загрузка данных из bronze слоя
print("Загрузка данных из bronze слоя...")
target_train_bronze = spark.read.format("delta").load(f"{bronze_path}/target_train")
dataset_full_bronze = spark.read.format("delta").load(f"{bronze_path}/dataset_full")

print(f"Количество записей в target_train: {target_train_bronze.count()}")
print(f"Количество записей в dataset_full: {dataset_full_bronze.count()}")

# Проверка уникальных ID в каждом датасете
print(f"Уникальных ID в target_train: {target_train_bronze.select('user_id').distinct().count()}")
print(f"Уникальных ID в dataset_full: {dataset_full_bronze.select('user_id').distinct().count()}")

Загрузка данных из bronze слоя...
Количество записей в target_train: 270000
Количество записей в dataset_full: 20054016
Уникальных ID в target_train: 270000
Уникальных ID в dataset_full: 29884


## Этап 3: Очистка и трансформация данных

Здесь мы выполним следующие операции:
1. Объединим данные по ID пользователя
2. Удалим дубликаты записей по устройству для одного пользователя
3. Подготовим финальный датасет для предсказания возраста по устройству


In [29]:
# Объединение данных по ID пользователя
print("Объединение данных по ID пользователя...")
joined_df = dataset_full_bronze.join(target_train_bronze, on="user_id", how="inner")

print(f"Количество записей после объединения: {joined_df.count()}")
print(f"Количество уникальных пользователей: {joined_df.select('user_id').distinct().count()}")

# Проверим схему объединенных данных
print("\nСхема объединенных данных:")
joined_df.printSchema()

Объединение данных по ID пользователя...
Количество записей после объединения: 13109396
Количество уникальных пользователей: 19482

Схема объединенных данных:
root
 |-- user_id: long (nullable = true)
 |-- region_name: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- cpe_manufacturer_name: string (nullable = true)
 |-- cpe_model_name: string (nullable = true)
 |-- url_host: string (nullable = true)
 |-- cpe_type_cd: string (nullable = true)
 |-- cpe_model_os_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- part_of_day: string (nullable = true)
 |-- request_cnt: long (nullable = true)
 |-- age: double (nullable = true)
 |-- is_male: string (nullable = true)



In [30]:
# Анализ дубликатов
print("Анализ дубликатов по устройству для пользователей...")

# Подсчет записей на пользователя и устройство
user_device_counts = joined_df.groupBy("user_id", "cpe_model_name").count()
duplicates = user_device_counts.filter(col("count") > 1)

print(f"Количество пар (пользователь, устройство) с дубликатами: {duplicates.count()}")

# Показать примеры дубликатов
if duplicates.count() > 0:
    print("\nПримеры дубликатов:")
    duplicates.orderBy(desc("count")).show(10)
else:
    print("Дубликаты не найдены")

Анализ дубликатов по устройству для пользователей...
Количество пар (пользователь, устройство) с дубликатами: 19231

Примеры дубликатов:
+-------+----------------+-----+
|user_id|  cpe_model_name|count|
+-------+----------------+-----+
|  63338|       iPhone 11|15022|
| 162592|            1904|12286|
| 291085|       iPhone 11|11843|
| 160011|       iPhone XR|11661|
|  16802|Redmi Note 8 Pro|11607|
| 407449|       iPhone 11|11321|
| 112660|        iPhone 7|10202|
| 269822|  iPhone SE 2020| 9827|
| 160651|       iPhone XR| 9674|
| 217241|      Galaxy A31| 9494|
+-------+----------------+-----+
only showing top 10 rows



In [31]:
# Удаление дубликатов по устройству для каждого пользователя
print("Удаление дубликатов...")

# Определяем колонки для удаления дубликатов (id и device)
columns_to_check = joined_df.columns
print(f"Доступные колонки: {columns_to_check}")

# Ищем колонки, которые могут содержать информацию об устройстве
device_related_columns = [col for col in columns_to_check if 'cpe_model_name' in col.lower() or 'cpe_manufacturer_name' in col.lower()]
print(f"Колонки, связанные с устройством: {device_related_columns}")

# Если есть колонка device, используем её для удаления дубликатов
if 'cpe_model_name' in columns_to_check:
    cleaned_df = joined_df.dropDuplicates(["user_id", "cpe_model_name"])
    print("Используется колонка 'cpe_model_name' для удаления дубликатов")
elif device_related_columns:
    # Используем первую найденную колонку, связанную с устройством
    device_col = device_related_columns[0]
    cleaned_df = joined_df.dropDuplicates(["user_id", device_col])
    print(f"Используется колонка '{device_col}' для удаления дубликатов")
else:
    # Если колонки device нет, удаляем дубликаты по всем колонкам
    cleaned_df = joined_df.dropDuplicates()
    print("Удаляем полные дубликаты записей")

print(f"Количество записей до очистки: {joined_df.count()}")
print(f"Количество записей после очистки: {cleaned_df.count()}")
print(f"Удалено записей: {joined_df.count() - cleaned_df.count()}")

Количество записей после очистки: 19482
Удалено записей: 13089914


In [33]:
# Финальная проверка и статистика данных
print("Финальная статистика очищенных данных:")
print(f"Общее количество записей: {cleaned_df.count()}")
print(f"Количество уникальных пользователей: {cleaned_df.select('user_id').distinct().count()}")

# Статистика по возрасту
print("\nСтатистика по возрасту:")
age_stats = cleaned_df.select("age").describe()
age_stats.show()

# Показать примеры финальных данных
print("\nПример очищенных данных:")
cleaned_df.show(10, truncate=False)

Финальная статистика очищенных данных:
Общее количество записей: 19482
Количество уникальных пользователей: 19482

Статистика по возрасту:
+-------+-----+
|summary|  age|
+-------+-----+
|  count|19482|
|   mean|  NaN|
| stddev|  NaN|
|    min| 14.0|
|    max|  NaN|
+-------+-----+


Пример очищенных данных:
+-------+--------------------+--------------------+---------------------+-------------------+---------------------------+-----------+-----------------+-------+-------------------+-----------+-----------+----+-------+
|user_id|region_name         |city_name           |cpe_manufacturer_name|cpe_model_name     |url_host                   |cpe_type_cd|cpe_model_os_type|price  |date               |part_of_day|request_cnt|age |is_male|
+-------+--------------------+--------------------+---------------------+-------------------+---------------------------+-----------+-----------------+-------+-------------------+-----------+-----------+----+-------+
|16     |Челябинская область |Магнитого

## Этап 4: Сохранение в Silver слой

In [34]:
# Сохранение очищенных данных в silver слой
print("Сохранение очищенных данных в silver слой...")

cleaned_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(f"{silver_path}/cleaned_dataset")

print("Данные успешно сохранены в silver слой!")

# Добавим метаданные о процессе
metadata_df = spark.createDataFrame([
    ("bronze_to_silver_etl", "completed", 
     joined_df.count(), cleaned_df.count(), 
     joined_df.count() - cleaned_df.count())
], ["process", "status", "input_records", "output_records", "removed_duplicates"])

metadata_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(f"{silver_path}/process_metadata")

print("Метаданные процесса сохранены")

Сохранение очищенных данных в silver слой...
Данные успешно сохранены в silver слой!
Метаданные процесса сохранены


In [35]:
# Проверка сохраненных данных в silver слое
print("Проверка сохраненных данных в silver слое...")

silver_data = spark.read.format("delta").load(f"{silver_path}/cleaned_dataset")
print(f"Количество записей в silver слое: {silver_data.count()}")

print("\nСхема данных в silver слое:")
silver_data.printSchema()

print("\nПример данных из silver слоя:")
silver_data.show(5, truncate=False)

# Проверка метаданных
metadata = spark.read.format("delta").load(f"{silver_path}/process_metadata")
print("\nМетаданные процесса:")
metadata.show()

Проверка сохраненных данных в silver слое...
Количество записей в silver слое: 19482

Схема данных в silver слое:
root
 |-- user_id: long (nullable = true)
 |-- region_name: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- cpe_manufacturer_name: string (nullable = true)
 |-- cpe_model_name: string (nullable = true)
 |-- url_host: string (nullable = true)
 |-- cpe_type_cd: string (nullable = true)
 |-- cpe_model_os_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- part_of_day: string (nullable = true)
 |-- request_cnt: long (nullable = true)
 |-- age: double (nullable = true)
 |-- is_male: string (nullable = true)


Пример данных из silver слоя:
+-------+-------------------+---------------+---------------------+-------------------+----------------------+-----------+-----------------+-------+-------------------+-----------+-----------+----+-------+
|user_id|region_name        |city_name      |cpe_manufact

## Этап 5: Финальная валидация и сводка


In [36]:
# Финальная валидация ETL процесса
print("=== СВОДКА ETL ПРОЦЕССА ===")
print()

# Проверяем наличие данных в bronze и silver слоях
bronze_target = spark.read.format("delta").load(f"{bronze_path}/target_train")
bronze_dataset = spark.read.format("delta").load(f"{bronze_path}/dataset_full")
silver_cleaned = spark.read.format("delta").load(f"{silver_path}/cleaned_dataset")

print("СТАТИСТИКА ДАННЫХ:")
print(f"Bronze - target_train: {bronze_target.count():,} записей")
print(f"Bronze - dataset_full: {bronze_dataset.count():,} записей")
print(f"Silver - cleaned_dataset: {silver_cleaned.count():,} записей")
print()

print("ВЫПОЛНЕННЫЕ ЗАДАЧИ:")
print("1. Загружены исходные данные в формат Delta Table в папку ./data/bronze/")
print("2. Данные объединены по ID пользователя")
print("3. Удалены дубликаты записей по устройству для одного пользователя")
print("4. Обработанные данные сохранены в папку silver в формате Delta Table")
print()

print("СТРУКТУРА ДАННЫХ:")
print("../data/bronze/target_train/ - исходные данные о возрасте пользователей")
print("../data/bronze/dataset_full/ - исходные данные об устройствах пользователей") 
print("../data/silver/cleaned_dataset/ - очищенный датасет для ML модели")
print("../data/silver/process_metadata/ - метаданные ETL процесса")
print()

print("Данные готовы для предсказания возраста пользователей по их устройствам!")

=== СВОДКА ETL ПРОЦЕССА ===

СТАТИСТИКА ДАННЫХ:
Bronze - target_train: 270,000 записей
Bronze - dataset_full: 20,054,016 записей
Silver - cleaned_dataset: 19,482 записей

ВЫПОЛНЕННЫЕ ЗАДАЧИ:
1. Загружены исходные данные в формат Delta Table в папку ./data/bronze/
2. Данные объединены по ID пользователя
3. Удалены дубликаты записей по устройству для одного пользователя
4. Обработанные данные сохранены в папку silver в формате Delta Table

СТРУКТУРА ДАННЫХ:
../data/bronze/target_train/ - исходные данные о возрасте пользователей
../data/bronze/dataset_full/ - исходные данные об устройствах пользователей
../data/silver/cleaned_dataset/ - очищенный датасет для ML модели
../data/silver/process_metadata/ - метаданные ETL процесса

Данные готовы для предсказания возраста пользователей по их устройствам!


In [37]:
# Остановка Spark сессии
print("Остановка Spark сессии...")
spark.stop()
print("Spark сессия завершена")

Остановка Spark сессии...
Spark сессия завершена
