# 4. Экспертные темы

## 4.1. Delta Lake

Delta Lake — это открытый формат хранения данных, который добавляет ACID-транзакции, управление метаданными и другие enterprise-функции к вашим данным в Data Lakes.

| Функции | Преимущество |
| - | - |
| ACID-транзакции | Гарантированная целостность данных при параллельныхзапросах |
| Time Travel | Доступ к историческим данным |
| Schema Enforcement | Контроль схема при записи |
| Upsert/Delete | Поддержка операций MERGE, UPDATE, DELETE |
| Оптимизация файлов | Автоматическая компактификация маленьких файлов |

Установка `pip install delta-spark`

Инициализация Spark с Delta
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaLake") \
    .config("spark.jars", "/path/to/delta-core_2.12.jar") \
    .getOrCreate()
```

Создание Delta-таблицы
```python
df.write.format("delta").save("/data/delta/events")

# С явным указанием схемы
spark.sql("""
  CREATE TABLE events (
    id LONG,
    date STRING,
    event_type STRING
  ) USING DELTA
  LOCATION '/data/delta/events'
""")
```

Чтение данных
```python
delta_df = spark.read.format("delta").load("/data/delta/events")

# Чтение конкретной версии
spark.read.format("delta") \
  .option("versionAsOf", 5) \
  .load("/data/delta/events")
```

#### Time Travel — доступ к истории

Просмотр истории изменений
```python
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/data/delta/events")
history = delta_table.history()
history.show()
```

Восстановление данных
```python
# Восстановление до версии 2
delta_table.restoreToVersion(2)

# Восстановление до timestamp
delta_table.restoreToTimestamp("2023-01-01 00:00:00")
```

#### Операции Upsert и Delete

MERGE (Upsert)
```python
updates_df = spark.createDataFrame([(1, "new_data")], ["id", "data"])

delta_table.alias("target").merge(
    updates_df.alias("updates"),
    "target.id = updates.id"
).whenMatchedUpdate(set={"data": "updates.data"}) \
 .whenNotMatchedInsert(values={
    "id": "updates.id",
    "data": "updates.data"
}).execute()
```

Удаление данных
```python
delta_table.delete("date < '2023-01-01'")  # Удалить старые записи
```

#### Оптимизация Delta-таблиц

Компактификация файлов
```python
delta_table.optimize().executeCompaction()
```

Z-Ordering (кластеризация)
```python
delta_table.optimize().executeZOrderBy("event_type")
```

Вакуумирование (удаление старых версий)
```python
delta_table.vacuum()                  # Удалить версии старше 7 дней
delta_table.vacuum(48)                # Удалить версии старше 48 часов
```

#### Практический кейс: CDC (Change Data Capture)

Задача: Реализовать обработку изменений из источника в Delta Lake.
```python
# 1. Создаём стриминг из Kafka
changes = spark.readStream \
  .format("kafka") \
  .option("subscribe", "updates") \
  .load()

# 2. Парсим JSON-сообщения
parsed = changes.select(
  from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# 3. Записываем изменения в Delta с MERGE
def upsert_to_delta(microBatchDF, batchId):
    delta_table = DeltaTable.forPath(spark, "/data/delta/target")
    delta_table.alias("t").merge(
        microBatchDF.alias("s"),
        "t.id = s.id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()

parsed.writeStream \
  .foreachBatch(upsert_to_delta) \
  .option("checkpointLocation", "/checkpoints/delta_cdc") \
  .start()
```

#### Мониторинг и обслуживание

Анализ метаданных
```python
spark.sql("ANALYZE TABLE events COMPUTE STATISTICS")
spark.sql("DESCRIBE DETAIL events").show()
```

Размеры файлов
```python
spark.sql("""
  SELECT file_size, num_records 
  FROM delta.`/data/delta/events`.files
""").show()
```

## 4.2. MLlib (машинное обучение)

MLlib — это масштабируемая библиотека машинного обучения Spark, которая предоставляет:
- Алгоритмы для классификации, регрессии, кластеризации
- Инструменты для обработки признаков (feature engineering)
- Конвейеры (Pipelines) для построения ML-пайплайнов
- Интеграцию с Python-библиотеками (sklearn, TensorFlow)

---

#### **Загрузка данных (на примере Titanic)**
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MLlib").getOrCreate()
df = spark.read.csv("titanic.csv", header=True, inferSchema=True)

# Выборка признаков и целевой переменной
data = df.select(
    "Survived",  # Целевая переменная
    "Pclass", 
    "Sex", 
    "Age", 
    "Fare"
).na.drop()  # Удаление пропусков
```

#### **Обработка категориальных признаков**
```python
from pyspark.ml.feature import (
    StringIndexer, 
    OneHotEncoder,
    VectorAssembler
)

# Преобразование строк в числовые индексы
sex_indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
pclass_indexer = StringIndexer(inputCol="Pclass", outputCol="PclassIndex")

# One-Hot Encoding
encoder = OneHotEncoder(
    inputCols=["SexIndex", "PclassIndex"],
    outputCols=["SexVec", "PclassVec"]
)

# Объединение всех признаков в один вектор
assembler = VectorAssembler(
    inputCols=["PclassVec", "SexVec", "Age", "Fare"],
    outputCol="features"
)
```

---

#### **Создание конвейера преобразований**
```python
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    sex_indexer,
    pclass_indexer,
    encoder,
    assembler
])

# Применение пайплайна к данным
processed_data = pipeline.fit(data).transform(data)
processed_data.show(3)
```

#### **Разделение на train/test**
```python
train_data, test_data = processed_data.randomSplit([0.7, 0.3], seed=42)
```

---

#### **Логистическая регрессия**
```python
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="features",
    labelCol="Survived",
    maxIter=10
)

model = lr.fit(train_data)
predictions = model.transform(test_data)
predictions.select("Survived", "prediction").show(5)
```

#### **Случайный лес**
```python
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    labelCol="Survived",
    featuresCol="features",
    numTrees=100
)

rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)
```

---

#### **Метрики качества**
```python
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator
)

# AUC-ROC
evaluator = BinaryClassificationEvaluator(
    labelCol="Survived",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.3f}")

# Accuracy
acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="Survived",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = acc_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:.3f}")
```

#### **Матрица ошибок**
```python
from pyspark.mllib.evaluation import MulticlassMetrics

# Конвертация в RDD
predictionAndLabels = predictions.select("prediction", "Survived").rdd

# Расчёт метрик
metrics = MulticlassMetrics(predictionAndLabels)
print("Confusion Matrix:")
print(metrics.confusionMatrix().toArray())
```

---

#### **Cross-Validator**
```python
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Сетка параметров
paramGrid = (ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build())

# Кросс-валидация
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

cv_model = cv.fit(train_data)
best_model = cv_model.bestModel
```

---

#### **Сохранение и загрузка моделей**
```python
# Сохранение
best_model.write().overwrite().save("models/best_lr_model")

# Загрузка
from pyspark.ml.classification import LogisticRegressionModel
loaded_model = LogisticRegressionModel.load("models/best_lr_model")
```

---

#### **Практическое задание**
**Задача**: Постройте модель для предсказания цены домов (регрессия)  
**Датасет**: [California Housing](https://www.kaggle.com/datasets/camnugent/california-housing-prices)

1. Загрузите данные и обработайте категориальные признаки
2. Постройте пайплайн с RandomForestRegressor
3. Оцените модель с помощью RMSE и R2
4. Подберите оптимальные гиперпараметры

```python
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor(
    labelCol="median_house_value",
    featuresCol="features"
)

# Решение...
```

---

#### **Использование sklearn внутри Spark**
```python
from sklearn.ensemble import GradientBoostingClassifier
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def sklearn_predict(features: pd.Series) -> pd.Series:
    model = GradientBoostingClassifier()
    model.fit(train_pd[features], train_pd[label])
    return pd.Series(model.predict(features))
```

#### **Использование TensorFlow/PyTorch**
```python
from pyspark.ml.torch.distributor import TorchDistributor

def train_fn():
    import torch
    # Код обучения модели
    
distributor = TorchDistributor(num_processes=2, local_mode=True)
distributor.run(train_fn)
```

## 4.3. Распределенная обработка (Glue, EMR)

#### **AWS Glue: Серверный ETL-сервис. Ключевые возможности**
- **Автоматическое обнаружение схемы** (Schema Discovery)
- **Встроенные преобразования** (Transformations)
- **Интеграция с каталогом данных** (Glue Data Catalog)
- **Серверная архитектура** (нет управления инфраструктурой)

#### **Создание Glue Job для обработки данных**
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Чтение данных из S3
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="your-database",
    table_name="raw_data"
)

# Преобразование данных
def transform(record):
    record["new_column"] = record["existing_column"] * 2
    return record

transformed = Map.apply(frame=datasource, f=transform)

# Запись результатов в Parquet
glueContext.write_dynamic_frame.from_options(
    frame=transformed,
    connection_type="s3",
    connection_options={"path": "s3://output-bucket/processed/"},
    format="parquet"
)
```

#### **Запуск Glue Job через AWS CLI**
```bash
aws glue start-job-run --job-name "your-glue-job" --arguments='--extra-py-files="s3://your-bucket/libs.zip"'
```

---

#### **Amazon EMR: Управляемый Hadoop-кластер. Создание кластера EMR**
```bash
aws emr create-cluster \
--name "PySpark Cluster" \
--release-label emr-6.15.0 \
--applications Name=Spark \
--ec2-attributes KeyName=your-key-pair \
--instance-type m5.xlarge \
--instance-count 3 \
--log-uri s3://your-logs-bucket/ \
--steps Type=Spark,Name="Spark Job",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--master,yarn,s3://your-bucket/your_script.py] \
--use-default-roles
```

#### **Пример PySpark-скрипта для EMR**
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EMR Processing") \
    .config("spark.sql.shuffle.partitions", "1000") \
    .getOrCreate()

# Чтение данных из S3
df = spark.read.parquet("s3://input-bucket/data/")

# Обработка данных
result = df.groupBy("category").count()

# Запись результатов
result.write.parquet("s3://output-bucket/results/")
```

#### **Мониторинг EMR**
- **EMR Console**: Просмотр состояния кластера
- **YARN ResourceManager**: `http://<master-public-dns>:8088`
- **Spark History Server**: `http://<master-public-dns>:18080`

---

#### **Оптимизация производительности. Настройки Glue**
```python
glueContext = GlueContext(SparkContext.getOrCreate())
glueContext.setConf("spark.sql.shuffle.partitions", "1000")
glueContext.setConf("spark.default.parallelism", "1000")
```

#### **Выбор инстансов EMR**
| **Тип задачи**       | **Рекомендуемые инстансы**      | **Конфигурация**                  |
|----------------------|--------------------------------|----------------------------------|
| Память-интенсивная   | r5.2xlarge, r6g.xlarge         | --executor-memory 20G            |
| CPU-интенсивная      | c5.2xlarge, c6g.xlarge         | --executor-cores 4               |
| Сбалансированная     | m5.2xlarge, m6g.xlarge         | --num-executors 10               |

#### **араметры Spark Submit для EMR**
```bash
spark-submit \
--deploy-mode cluster \
--master yarn \
--executor-memory 20G \
--executor-cores 4 \
--num-executors 10 \
your_script.py
```

---

#### **Интеграция с другими сервисами AWS. Чтение/запись в DynamoDB**
```python
# Чтение
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "your-table",
        "dynamodb.throughput.read.percent": "0.5"
    }
)

# Запись
glueContext.write_dynamic_frame_from_options(
    frame=dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "output-table",
        "dynamodb.throughput.write.percent": "0.5"
    }
)
```

#### **Обработка потоковых данных (Kinesis)**
```python
streaming_df = spark.readStream \
    .format("kinesis") \
    .option("streamName", "your-stream") \
    .option("region", "us-east-1") \
    .load()

query = streaming_df.writeStream \
    .format("parquet") \
    .option("path", "s3://output-bucket/streaming/") \
    .option("checkpointLocation", "s3://checkpoint-bucket/") \
    .start()
```

---

#### **Практическое задание**
**Задача**: Разверните ETL-пайплайн для обработки логов веб-сайта  
**Данные**: Логи в S3 в формате JSON (1+ TB данных)  
**Требования**:
1. Используйте AWS Glue для очистки и трансформации данных
2. Разверните EMR-кластер для агрегации данных
3. Сохраните результаты в Parquet-формате
4. Настройте автоматическое масштабирование кластера

**Решение**:
```python
# Glue Job (preprocessing.py)
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())
logs = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": ["s3://input-logs/"]},
    format="json"
)

# Фильтрация и преобразование
cleaned = logs.filter(lambda r: r["status"] == 200)
transformed = cleaned.apply_mapping([
    ("timestamp", "string", "event_time", "timestamp"),
    ("user_id", "string", "user_id", "string")
])

# Запись промежуточных данных
glueContext.write_dynamic_frame.from_options(
    frame=transformed,
    connection_type="s3",
    connection_options={"path": "s3://processed-logs/"},
    format="parquet"
)
```

---

#### **Лучшие практики**
1. **Glue**:
   - Используйте **закладки (bookmarks)** для инкрементальной обработки
   - Оптимизируйте **DPU (Data Processing Units)** для стоимости

2. **EMR**:
   - Включайте **EMR Managed Scaling** для автоматического масштабирования
   - Используйте **Spot Instances** для экономии (до 90%)

3. **Данные**:
   - Разделяйте данные по **партициям** (дата, регион)
   - Используйте **колоночные форматы** (Parquet, ORC)

## 4.4. Отладка и логирование  

#### **Настройка логирования в PySpark. Базовая конфигурация**
```python
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# Настройка уровня логирования
sc = SparkContext()
sc.setLogLevel("WARN")  # Уровни: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN

spark = SparkSession.builder.getOrCreate()
```

#### **Кастомизация формата логов**
Добавьте в `$SPARK_HOME/conf/log4j2.properties`:
```properties
name=SparkConfig
appender.console.type=Console
appender.console.name=STDOUT
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

logger.spark.name=org.apache.spark
logger.spark.level=warn

rootLogger.level=warn
rootLogger.appenderRefs=stdout
rootLogger.appenderRef.stdout.ref=STDOUT
```

---

#### **Инструменты отладки. Spark UI**
Доступен по адресу `http://<driver-node>:4040`:
- **Jobs**: Статус и длительность заданий
- **Stages**: Детализация по стадиям выполнения
- **Storage**: Кэшированные RDD/DataFrame
- **Environment**: Конфигурация Spark

#### **Анализ плана выполнения**
```python
# Объяснение физического плана
df.explain(mode="extended")

# Сохранение плана выполнения в файл
with open("query_plan.txt", "w") as f:
    f.write(df._jdf.queryExecution().toString())
```

#### **Мониторинг через API**
```python
# Получение информации о заданиях
status_tracker = sc.statusTracker()
for job_id in status_tracker.getJobIdsForGroup():
    job_info = status_tracker.getJobInfo(job_id)
    print(f"Job {job_id}: {job_info.status}")

# Метрики выполнения
print(sc.uiWebUrl)
```

---

#### **Отладка распределённых приложений. Логирование внутри преобразований**
```python
def debug_transform(partition):
    import logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    for row in partition:
        logger.info(f"Processing row: {row}")
        yield row

rdd.mapPartitions(debug_transform).count()
```

#### **Отлов исключений**
```python
def safe_transform(row):
    try:
        # Ваша логика обработки
        return processed_row
    except Exception as e:
        import sys
        print(f"Error processing row {row}: {e}", file=sys.stderr)
        return None  # или специальное значение для обработки ошибок

clean_rdd = rdd.map(safe_transform).filter(lambda x: x is not None)
```

---

#### **Анализ распространённых ошибок. OOM (Out of Memory)**
**Симптомы**:
- `java.lang.OutOfMemoryError`
- Зависание задач

**Решение**:
```python
spark = SparkSession.builder \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.memory.fraction", "0.8") \
    .getOrCreate()
```

#### **Data Skew (Перекос данных)**
**Симптомы**:
- Несколько задач выполняются значительно дольше других
- Медленные JOIN-операции

**Решение**:
```python
# Сольтинг (salting)
from pyspark.sql.functions import rand

df = df.withColumn("salt", (rand() * 100).cast("int"))
df.groupBy("key", "salt").count()
```

#### **Проблемы с сериализацией**
**Симптомы**:
- `PicklingError` или `SerializationException`

**Решение**:
```python
# Убедитесь, что все функции и объекты сериализуемы
def external_function(x):
    return x * 2

# Неправильно:
rdd.map(lambda x: some_external_function(x))

# Правильно:
rdd.map(external_function)
```

---

#### **Продвинутые техники. Логирование в распределённом режиме**
```python
def log_partition(iterator):
    import socket
    host = socket.gethostname()
    print(f"Processing on {host}")
    yield sum(1 for _ in iterator)

count = rdd.mapPartitions(log_partition).sum()
```

#### **Интеграция с Sentry/ELK**
```python
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()

def log_to_elk(message):
    elk_logger = gateway.jvm.org.apache.log4j.Logger.getLogger("ELK")
    elk_logger.info(message)

rdd.foreach(lambda x: log_to_elk(f"Processed: {x}"))
```

---

#### **Практическое задание**
**Задача**: Реализовать отладочный пайплайн для ETL-процесса  
**Требования**:
1. Логировать количество обработанных строк в каждом партицировании
2. Собирать метрики времени выполнения операций
3. Реализовать обработку ошибок с сохранением "битых" записей
4. Генерировать отчёт о качестве данных

**Решение**:
```python
from datetime import datetime
import json

def debug_pipeline(partition):
    metrics = {
        "start_time": datetime.now().isoformat(),
        "processed": 0,
        "errors": 0,
        "bad_rows": []
    }
    
    for row in partition:
        try:
            # Ваша обработка
            processed_row = transform(row)
            metrics["processed"] += 1
            yield processed_row
        except Exception as e:
            metrics["errors"] += 1
            metrics["bad_rows"].append({
                "row": str(row),
                "error": str(e)
            })
    
    metrics["end_time"] = datetime.now().isoformat()
    with open(f"/tmp/metrics_{datetime.now().timestamp()}.json", "w") as f:
        json.dump(metrics, f)

# Применение
result_rdd = input_rdd.mapPartitions(debug_pipeline)
```

---

#### **Лучшие практики**
1. **Логируйте контекст**: Добавляйте ID задания, время, хост
2. **Разделяйте логи**: Отдельные логи для ошибок, предупреждений, информации
3. **Используйте структурированный формат**: JSON для логов
4. **Мониторьте ключевые метрики**:
   - Время выполнения задач
   - Использование памяти
   - Количество ошибок
