# Работа с PySpark DataFrames

## Сравнение RDD и DataFrames

### Ограничения RDD
1. Данные рассматриваются как текстовые файлы или бинарные файлы, разделённые по строкам
   * Каждая задача начинается с парсинга структуры
   * Неудобно работать со столбцами
2. Отсутствие встроенной оптимизации запросов
3. Неудобный API по сравнению с SQL/pandas

### Преимущества DataFrames
* Структурированные данные со схемой
* Оптимизация запросов через Catalyst Optimizer
* Удобный API, похожий на pandas и SQL
* Автоматическая оптимизация выполнения через Tungsten

### Инициализация SparkSession

`.getOrCreate()` создаёт или возвращает существующую сессию (Singleton pattern)

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import Row

# Создание SparkSession с оптимизациями
spark = SparkSession.builder \
    .appName('Spark DF Practice') \
    .master('yarn') \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .config("spark.sql.repl.eagerEval.maxNumRows", 10) \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.adaptive.enabled", True) \
    .config("spark.sql.adaptive.coalescePartitions.enabled", True) \
    .getOrCreate()

# Установка уровня логирования
spark.sparkContext.setLogLevel("WARN")

### Проверка SparkSession

SparkSession - основной вход для работы с DataFrames

In [2]:
spark

### SparkContext для работы с RDD

SparkContext находится внутри SparkSession и используется для работы с RDD

In [3]:
sc = spark.sparkContext
sc

## Создание DataFrame

### Загрузка данных из JSON

In [4]:
%%time
# Оптимизированная загрузка с кэшированием и партиционированием
df = (spark.read.format("json")
      .load("/data/yelp/review")
      .repartition(200)  # Оптимальное количество партиций
      .cache())  # Кэширование для повторного использования

# Триггер вычисления для кэширования
df.count()

CPU times: user 8.85 ms, sys: 4.78 ms, total: 13.6 ms
Wall time: 22.1 s


### Структура данных

In [5]:
df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [6]:
df.show(5, truncate=True)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow! Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute interi

### Создание DataFrame из RDD

In [7]:
# Создание RDD с правильной структурой
import json

# Используем Row для создания структурированных данных
rdd = sc.textFile("/data/yelp/review")\
        .map(lambda x: Row(**json.loads(x)))

In [8]:
%%time
# Создание DataFrame из RDD с явной схемой
schema = StructType([
    StructField("business_id", StringType(), True),
    StructField("cool", LongType(), True),
    StructField("date", StringType(), True),
    StructField("funny", LongType(), True),
    StructField("review_id", StringType(), True),
    StructField("stars", DoubleType(), True),
    StructField("text", StringType(), True),
    StructField("useful", LongType(), True),
    StructField("user_id", StringType(), True)
])

df_from_rdd = spark.createDataFrame(rdd, schema=schema)

CPU times: user 14.3 ms, sys: 4.72 ms, total: 19 ms
Wall time: 1.21 s


In [9]:
df_from_rdd.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow! Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute interi

## Операции над DataFrames

### Проекция (SELECT)

In [10]:
# Проекция столбцов
df.select("business_id", "stars", "text").show(5)

+--------------------+-----+--------------------+
|         business_id|stars|                text|
+--------------------+-----+--------------------+
|XQfwVwDr-v0ZS3_Cb...|  3.0|If you decide to ...|
|7ATYjTIgM3jUlt4UM...|  5.0|I've taken a lot ...|
|YjUWPpI6HXG530lwP...|  3.0|Family diner. Had...|
|kxX2SOes4o-D3ZQBk...|  5.0|Wow! Yummy, diff...|
|e4Vwtrqf-wpJfwesg...|  4.0|Cute interior and...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [11]:
# Переименование столбцов
df.select(df.business_id, F.col("text").alias("review")).show(5)

+--------------------+--------------------+
|         business_id|             review|
+--------------------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|If you decide to ...|
|7ATYjTIgM3jUlt4UM...|I've taken a lot ...|
|YjUWPpI6HXG530lwP...|Family diner. Had...|
|kxX2SOes4o-D3ZQBk...|Wow! Yummy, diff...|
|e4Vwtrqf-wpJfwesg...|Cute interior and...|
+--------------------+--------------------+
only showing top 5 rows



### Фильтрация (WHERE)

In [12]:
# Фильтрация с помощью SQL-синтаксиса
df.where("stars > 4").show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow! Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|gmjsEdUsKpj9Xxu6p...|   0|2015-01-03 23:21:18|    2|6AxgBCNX_PNTOxmbR...|  5.0|Loved this tour! ...|     0|r3zeYsv1XFBRA4dJp...|
|LHSTtnW3YHCeUkRDG...|   0|2015-08-07 02:29:16|    0|_ZeMknuYdlQcUqng_...|  5.0|Amazingly amazing...|     2|yfFzsLmaWF2d4Sr0U...|
|uMvVYRgGNXf5boolA...|   0|2015-06-21 14:48:06|    0|rGQRf8UafX7OTlMNN...|  5.0|My experien

In [13]:
# Комбинированная фильтрация
df.filter((F.col("stars") > 4) & (F.col("text").like("%amazing%"))).show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|LHSTtnW3YHCeUkRDG...|   0|2015-08-07 02:29:16|    0|_ZeMknuYdlQcUqng_...|  5.0|Amazingly amazing...|     2|yfFzsLmaWF2d4Sr0U...|
|SZU9c8V2GuREDN5Kg...|   0|2016-05-31 02:14:54|    0|4zopEEPqfwm-c_FNp...|  5.0|We were a bit wea...|     0|JYYYKt6TdVA4ng9lL...|
|5Ce3lZksYVkCbrihq...|   0|2014-07-25 17:56:26|    0|ymhbOMW63B_vGaRFR...|  5.0|I just started go...|     0|yZdAhQ_KmKuCZmbBy...|
|zqNPRRk3q4fGr5DnG...|   0|2017-07-09 16:28:13|    0|GqobN9vLqJmWwKEx...|  5.0|I spent a week he...|     0|C1Tb3JC0PRjkbnAWw...|
|qm4W6GgvroT7THRXL...|   0|2011-12-06 20:47:42|    0|T6Z3vPCgMwt8_6tdK...|  5.0|I don't thi

### Агрегация и статистика

In [14]:
# Базовая статистика
df.summary().show(truncate=False)

+-------+--------------------+------------------+-------------------+-------------------+--------------------+------------------+----------------------+------------------+--------------------+
|summary|         business_id|              cool|               date|              funny|           review_id|             stars|                  text|            useful|             user_id|
+-------+--------------------+------------------+-------------------+-------------------+--------------------+------------------+----------------------+------------------+--------------------+
|  count|             6990280|           6990280|            6990280|            6990280|             6990280|           6990280|               6990280|           6990280|             6990280|
|   mean|                null|0.4986175088837643|               null|0.32655959417934616|                null| 3.74858374771826|                  null|1.1846089140921394|                null|
| stddev|                null| 2.172

In [15]:
# Агрегация
df.select(
    F.avg("stars").alias("avg_rating"),
    F.min("stars").alias("min_rating"),
    F.max("stars").alias("max_rating")
).show()

+------------------+------------------+------------------+
|        avg(stars)|        min(stars)|        max(stars)|
+------------------+------------------+------------------+
|3.7485837477182883|               1.0|               5.0|
+------------------+------------------+------------------+



### Использование SQL

In [16]:
# Регистрация DataFrame как временной таблицы
df.createOrReplaceTempView("reviews")

In [17]:
# SQL запрос
query = """
SELECT 
    COUNT(*) as cnt, 
    business_id 
FROM reviews 
GROUP BY business_id 
ORDER BY cnt DESC
"""

spark.sql(query).show(5)

+---+--------------------+
|cnt|         business_id|
+---+--------------------+
|111|2y_CdkxEOJEJGyJAp...|
| 33|8PNKnlnJg6snf-HUg...|
| 14|wS-SWAa_yaJAw6fJm...|
| 38|skW4boArIApRw9DXK...|
| 43|KBvdN8Apn4DIxuNW3...|
+---+--------------------+
only showing top 5 rows



## Объединение таблиц (JOIN)

In [18]:
# Загрузка данных о бизнесах
business = spark.read.json('/data/yelp/business').cache()
business.count()  # Триггер кэширования

In [19]:
business.show(5)

+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|         city|               hours|is_open|  latitude|   longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|1616 Chapala St, ...|[,,,,,,,,,,, True...|Pns2l4eNsfO8kk83d...|Doctors, Traditio...|Santa Barbara|                null|      0|34.4266787|-119.7111968|Abby Rappoport, L...|      93101|           7|  5.0|   CA|
|87 Grasso Plaza S...|[,,,,,,,,, True,,...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|       Affton|{[8:0-18:30, 0:0-0...|      1|  38.551126|  -90.335695|  

In [20]:
%%time
# Оптимизированный JOIN с broadcast для маленькой таблицы
from pyspark.sql.functions import broadcast

# Business таблица относительно небольшая (150k строк), используем broadcast
business_review = df.join(broadcast(business.select("business_id", "name", "city", "state")), 
                          on='business_id', 
                          how='inner')

CPU times: user 7.41 ms, sys: 2.9 ms, total: 10.3 ms
Wall time: 36.7 ms


In [21]:
business_review.select("business_id", "stars", "name", "city", "state").show(5)

+--------------------+-----+--------------------+----------+-----+
|         business_id|stars|                name|      city|state|
+--------------------+-----+--------------------+----------+-----+
|7ATYjTIgM3jUlt4UM...|  5.0|          Body Cycle|New York  |   NY|
|kxX2SOes4o-D3ZQBk...|  5.0|      Taj Mahal Grill|     Ocala|   FL|
|gmjsEdUsKpj9Xxu6p...|  5.0|     Southern Roots |Charleston|   SC|
|LHSTtnW3YHCeUkRDG...|  5.0|          Amazing VR|   Chicago|   IL|
|uMvVYRgGNXf5boolA...|  5.0|Bella Bella Cupca...|  Antioch|   CA|
+--------------------+-----+--------------------+----------+-----+
only showing top 5 rows



## Анализ плана выполнения

In [22]:
# План выполнения JOIN запроса
business_review.select("business_id", "stars", "name", "city", "state").explain()

== Physical Plan ==
*(2) Project [business_id#783, stars#788, name#1190, city#1185, state#1194]
+- *(2) BroadcastHashJoin [business_id#783], [business_id#1183], Inner, BuildRight
   :- *(2) Filter isnotnull(business_id#783)
   :  +- InMemoryTableScan [business_id#783, stars#788], [isnotnull(business_id#783)]
   :        +- InMemoryRelation [business_id#783, cool#784L, date#785, funny#786L, review_id#787, stars#788, text#789, useful#790L, user_id#791], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- *(1) Scan ExistingRDD[business_id#783,cool#784L,date#785,funny#786L,review_id#787,stars#788,text#789,useful#790L,user_id#791]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])), [id=#1274]
      +- *(1) Project [business_id#1183, name#1190, city#1185, state#1194]
         +- *(1) Filter isnotnull(business_id#1183)
            +- *(1) Scan ExistingRDD[business_id#1183,name#1190,city#1185,state#1194]


## Оптимизации и лучшие практики

In [23]:
# 1. Использование партиционирования
df_partitioned = df.repartition(100, "business_id")  # Партиционирование по ключу JOIN

# 2. Создание индексов (в Spark 3.x доступны индексы)
# df.createIndex("business_id")  # Только в Spark 3.0+

# 3. Управление памятью
df.persist()  # Альтернатива cache() с возможностью выбора уровня хранения

# 4. Оптимизация формата хранения
df.write.parquet("/tmp/yelp_reviews_optimized.parquet", mode="overwrite")

## Пример сложного анализа

In [24]:
# Анализ зависимости оценок от полезности отзывов
analysis = df.groupBy("stars").agg(
    F.avg("cool").alias("avg_cool"),
    F.avg("useful").alias("avg_useful"),
    F.avg("funny").alias("avg_funny")
).orderBy("stars")

analysis.show()

+-----+-------------------+------------------+-------------------+
|stars|           avg_cool|        avg_useful|         avg_funny|
+-----+-------------------+------------------+-------------------+
|  1.0| 0.1452255588498393| 1.483388704318936|0.2940540518089477|
|  2.0|0.22927308379704914|1.1568619205414952|0.2417171637250118|
|  3.0|0.34169988573424386| 1.077341333796138|0.2344559838861801|
|  4.0|0.6116828254847645|0.9679012864491215|0.3360749453376191|
|  5.0| 0.548668893099766|0.8899206193181195| 0.383724926918821|
+-----+-------------------+------------------+-------------------+



## Заключение

### Ключевые моменты:
1. DataFrames предоставляют структурированный API поверх RDD
2. Оптимизация Catalyst обеспечивает эффективное выполнение запросов
3. Используйте кэширование для повторно используемых данных
4. Broadcast join для маленьких таблиц
5. Правильное партиционирование улучшает производительность

In [25]:
# Очистка ресурсов
spark.stop()