### What is wrong with RDDs

1. Data is treated as text (or binary) files split into lines. Nothing is known about the structure of the data.
  * Every job starts with parsing the structure.
  * Sometimes you need to work with individual columns rather than keep entire rows.
2. It is inconvenient. You want something that looks like SQL and/or pandas.

### DataFrames
* Similar to DataFrames in pandas or tables in SQL.
* Built on top of RDDs.

##### Create a Spark session

`.getOrCreate()`: within an application the session (like SparkContext) exists as a singleton, so this call returns the existing session if there is one and creates it otherwise.

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Spark DF practice').master('yarn').getOrCreate()

When working with DataFrames, use SparkSession.

In [3]:
spark

If you need an RDD, use SparkContext. It is available inside SparkSession.

In [4]:
spark.sparkContext  # or simply `sc`, if it is predefined (as in the pyspark shell)

## Creating a DataFrame

### Reading data from a source

In [5]:
%%time
df = (spark.read.format("json")
    .load("/data/yelp/review")
)

CPU times: user 1.11 ms, sys: 3.73 ms, total: 4.85 ms
Wall time: 22.3 s


The reader accepts many `option`s. See [here](https://spark.apache.org/docs/2.4.4/api/java/org/apache/spark/sql/DataFrameReader.html#option-java.lang.String-java.lang.String-) for the options available for each format.

Why does this code take so long to run? And why, logically, should it not?
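A likely explanation, stated as an assumption about this run rather than a measurement: reading is lazy, so `load` should return almost at once, but no schema was supplied, so the JSON reader has to scan the files to infer column names and types. The plain-Python sketch below imitates that kind of inference over a few JSON lines. `infer_schema` and the sample records are hypothetical illustrations, not Spark's implementation.

```python
import json

def infer_schema(lines):
    """Scan every JSON line and collect a type name for each field."""
    type_names = {int: "long", float: "double", str: "string", bool: "boolean"}
    schema = {}
    for line in lines:  # a full pass over the data is unavoidable
        for key, value in json.loads(line).items():
            inferred = type_names.get(type(value), "string")
            previous = schema.get(key, inferred)
            if previous == inferred:
                schema[key] = inferred
            elif {previous, inferred} == {"long", "double"}:
                schema[key] = "double"   # widen integers to floating point
            else:
                schema[key] = "string"   # conflicting types fall back to string
    return dict(sorted(schema.items()))

sample = [
    '{"review_id": "r1", "stars": 3.0, "useful": 0}',
    '{"review_id": "r2", "stars": 5, "useful": 1}',
]
schema = infer_schema(sample)
print(schema)
```

Supplying a schema up front with `spark.read.schema(...)` removes the need for such a pass.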

In [6]:
df # the structure is visible

DataFrame[business_id: string, cool: bigint, date: string, funny: bigint, review_id: string, stars: double, text: string, useful: bigint, user_id: string]

In [7]:
df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



Check the data against the schema by looking at the first row

In [9]:
df.show(1)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
only showing top 1 row



A schema can be specified explicitly for text formats. We specified the json format, so the schema was inferred automatically.

In [10]:
# # For each field, give its name and type; nullability can also be specified
# from pyspark.sql.types import *
# schema = StructType(fields=[
#     StructField("user_id", IntegerType()),
#     StructField("age", IntegerType()),
#     StructField("gender", StringType()),
#     StructField("occupation", StringType()),
#     StructField("zip", IntegerType())
# ])

There is a `summary` method that, like `describe` in pandas, outputs statistics. It returns a DataFrame, so an action (`show`) is needed to see the result.

In [11]:
df.summary().show(10)

+-------+--------------------+------------------+-------------------+-------------------+--------------------+------------------+----------------------+------------------+--------------------+
|summary|         business_id|              cool|               date|              funny|           review_id|             stars|                  text|            useful|             user_id|
+-------+--------------------+------------------+-------------------+-------------------+--------------------+------------------+----------------------+------------------+--------------------+
|  count|             6990280|           6990280|            6990280|            6990280|             6990280|           6990280|               6990280|           6990280|             6990280|
|   mean|                null|0.4986175088837643|               null|0.32655959417934616|                null|  3.74858374771826|                  null|1.1846089140921394|                null|
| stddev|                null| 2.17

### Converting from an RDD

In [12]:
# An RDD does not see the fields inside a line, so each line has to be parsed

import json
sc = spark.sparkContext  # make sure `sc` is defined
rdd = sc.textFile("/data/yelp/review").map(json.loads)

In [13]:
rdd.take(5)

[{'review_id': 'KU_O5udG6zpxOg-VcAEodg',
  'user_id': 'mh_-eMZ6K5RLWhZyISBhwA',
  'business_id': 'XQfwVwDr-v0ZS3_CbbE5Xw',
  'stars': 3.0,
  'useful': 0,
  'funny': 0,
  'cool': 0,
  'text': "If you decide to eat here, just be aware it is going to take about 2 hours from beginning to end. We have tried it multiple times, because I want to like it! I have been to it's other locations in NJ and never had a bad experience. \n\nThe food is good, but it takes a very long time to come out. The waitstaff is very young, but usually pleasant. We have just had too many experiences where we spent way too long waiting. We usually opt for another diner or restaurant on the weekends, in order to be done quicker.",
  'date': '2018-07-07 22:09:11'},
 {'review_id': 'BiTunyQ73aT9WBnpR9DZGw',
  'user_id': 'OyoGAe7OKpv6SyGZT5g77Q',
  'business_id': '7ATYjTIgM3jUlt4UM3IypQ',
  'stars': 5.0,
  'useful': 1,
  'funny': 0,
  'cool': 1,
  'text': "I've taken a lot of spin classes over the years, and nothing com

In [14]:
%%time
# Convert to a DataFrame
df = spark.createDataFrame(rdd)

CPU times: user 204 ms, sys: 40.4 ms, total: 244 ms
Wall time: 1.15 s




In [16]:
df.show(10)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

In [17]:
df.schema

StructType(List(StructField(business_id,StringType,true),StructField(cool,LongType,true),StructField(date,StringType,true),StructField(funny,LongType,true),StructField(review_id,StringType,true),StructField(stars,DoubleType,true),StructField(text,StringType,true),StructField(useful,LongType,true),StructField(user_id,StringType,true)))

A more readable view of the rows: one field per line, without truncation

In [18]:
df.show(5, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 business_id | XQfwVwDr-v0ZS3_CbbE5Xw                                                                                                                     

### Simple operations on a DataFrame
Recall relational algebra:
 * Projection (`SELECT`): a subset of columns.
 * Filter (`WHERE`, `HAVING`): a subset of rows.
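To make the two operations concrete, here is a plain-Python sketch over a list of dictionaries. The helper names `project` and `select_rows` and the sample rows are hypothetical and only illustrate the semantics. In Spark the same roles are played by `select` and `where`/`filter`.

```python
reviews = [
    {"business_id": "b1", "stars": 3.0, "text": "Family diner."},
    {"business_id": "b2", "stars": 5.0, "text": "Amazingly amazing wings."},
    {"business_id": "b3", "stars": 5.0, "text": "Loved this tour!"},
]

def project(rows, columns):
    """SELECT: keep only the requested columns of every row."""
    return [{name: row[name] for name in columns} for row in rows]

def select_rows(rows, predicate):
    """WHERE: keep only the rows for which the predicate holds."""
    return [row for row in rows if predicate(row)]

# Projection changes the set of columns, not the number of rows
projected = project(reviews, ["business_id", "stars"])

# A filter changes the number of rows, not the set of columns
filtered = select_rows(
    reviews, lambda row: row["stars"] > 4 and "amazing" in row["text"]
)
```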

In [19]:
df.schema.fieldNames()

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id']

#### Projection

In [20]:
df.select(["business_id", "stars", "text"]).show(5)

+--------------------+-----+--------------------+
|         business_id|stars|                text|
+--------------------+-----+--------------------+
|XQfwVwDr-v0ZS3_Cb...|  3.0|If you decide to ...|
|7ATYjTIgM3jUlt4UM...|  5.0|I've taken a lot ...|
|YjUWPpI6HXG530lwP...|  3.0|Family diner. Had...|
|kxX2SOes4o-D3ZQBk...|  5.0|Wow!  Yummy, diff...|
|e4Vwtrqf-wpJfwesg...|  4.0|Cute interior and...|
+--------------------+-----+--------------------+
only showing top 5 rows



The analogue of `df[df.columns[:3]]` in pandas:

In [23]:
df.select(*df.schema.fieldNames()[:3]) # pandas: df[df.columns[:3]]

DataFrame[business_id: string, cool: bigint, date: string]

In [24]:
df.select("business_id", "stars", "text")

DataFrame[business_id: string, stars: double, text: string]

These are all transformations: each one returns a new DataFrame.

In [26]:
df.where("stars > 4").show(5) # SQL-like syntax

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|gmjsEdUsKpj9Xxu6p...|   0|2015-01-03 23:21:18|    2|6AxgBCNX_PNTOxmbR...|  5.0|Loved this tour! ...|     0|r3zeYsv1XFBRA4dJp...|
|LHSTtnW3YHCeUkRDG...|   0|2015-08-07 02:29:16|    0|_ZeMknuYdlQcUqng_...|  5.0|Amazingly amazing...|     2|yfFzsLmaWF2d4Sr0U...|
|uMvVYRgGNXf5boolA...|   0|2015-06-21 14:48:06|    0|rGQRf8UafX7OTlMNN...|  5.0|My experie

In [27]:
df.filter(df.stars > 4).show(5) # df[df.stars > 4]

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|gmjsEdUsKpj9Xxu6p...|   0|2015-01-03 23:21:18|    2|6AxgBCNX_PNTOxmbR...|  5.0|Loved this tour! ...|     0|r3zeYsv1XFBRA4dJp...|
|LHSTtnW3YHCeUkRDG...|   0|2015-08-07 02:29:16|    0|_ZeMknuYdlQcUqng_...|  5.0|Amazingly amazing...|     2|yfFzsLmaWF2d4Sr0U...|
|uMvVYRgGNXf5boolA...|   0|2015-06-21 14:48:06|    0|rGQRf8UafX7OTlMNN...|  5.0|My experie

In [28]:
df.filter("stars > 4 AND text LIKE '%amazing%'").show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|LHSTtnW3YHCeUkRDG...|   0|2015-08-07 02:29:16|    0|_ZeMknuYdlQcUqng_...|  5.0|Amazingly amazing...|     2|yfFzsLmaWF2d4Sr0U...|
|SZU9c8V2GuREDN5Kg...|   0|2016-05-31 02:14:54|    0|4zopEEPqfwm-c_FNp...|  5.0|We were a bit wea...|     0|JYYYKt6TdVA4ng9lL...|
|EpREWeEpmR8f1qLHz...|   0|2011-11-30 06:58:36|    0|-up4mW6WdqzGrRh7t...|  5.0|After living in t...|     0|xbybLiQockAzC4xAl...|
|5Ce3lZksYVkCbrihq...|   0|2014-07-25 17:56:26|    0|ymhbOMW63B_vGaRFR...|  5.0|I just sta

In [29]:
df.filter((df.stars > 4) & (df.text.like("%amazing%"))).show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|LHSTtnW3YHCeUkRDG...|   0|2015-08-07 02:29:16|    0|_ZeMknuYdlQcUqng_...|  5.0|Amazingly amazing...|     2|yfFzsLmaWF2d4Sr0U...|
|SZU9c8V2GuREDN5Kg...|   0|2016-05-31 02:14:54|    0|4zopEEPqfwm-c_FNp...|  5.0|We were a bit wea...|     0|JYYYKt6TdVA4ng9lL...|
|EpREWeEpmR8f1qLHz...|   0|2011-11-30 06:58:36|    0|-up4mW6WdqzGrRh7t...|  5.0|After living in t...|     0|xbybLiQockAzC4xAl...|
|5Ce3lZksYVkCbrihq...|   0|2014-07-25 17:56:26|    0|ymhbOMW63B_vGaRFR...|  5.0|I just sta

Rename some fields

In [30]:
df.select(df.business_id, df.text.alias("review")).show(5)

+--------------------+--------------------+
|         business_id|              review|
+--------------------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|If you decide to ...|
|7ATYjTIgM3jUlt4UM...|I've taken a lot ...|
|YjUWPpI6HXG530lwP...|Family diner. Had...|
|kxX2SOes4o-D3ZQBk...|Wow!  Yummy, diff...|
|e4Vwtrqf-wpJfwesg...|Cute interior and...|
+--------------------+--------------------+
only showing top 5 rows



A column can also be referenced by a string name via `f.col`. This is handy when the column name is complicated and cannot be written as an attribute like `df.text`.

In [31]:
import pyspark.sql.functions as f
df.select("business_id", f.col("text").alias("review")).show(5)

+--------------------+--------------------+
|         business_id|              review|
+--------------------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|If you decide to ...|
|7ATYjTIgM3jUlt4UM...|I've taken a lot ...|
|YjUWPpI6HXG530lwP...|Family diner. Had...|
|kxX2SOes4o-D3ZQBk...|Wow!  Yummy, diff...|
|e4Vwtrqf-wpJfwesg...|Cute interior and...|
+--------------------+--------------------+
only showing top 5 rows



In [32]:
df[df.business_id, "text"].show(5)

+--------------------+--------------------+
|         business_id|                text|
+--------------------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|If you decide to ...|
|7ATYjTIgM3jUlt4UM...|I've taken a lot ...|
|YjUWPpI6HXG530lwP...|Family diner. Had...|
|kxX2SOes4o-D3ZQBk...|Wow!  Yummy, diff...|
|e4Vwtrqf-wpJfwesg...|Cute interior and...|
+--------------------+--------------------+
only showing top 5 rows



Let's plug in SQL! To do that, we need to register the DataFrame as a table.

In [33]:
df.createOrReplaceTempView("df")  # registerTempTable is deprecated since Spark 2.0

In [36]:
query_str = """
SELECT count(*) as cnt, business_id FROM df
GROUP BY business_id
"""

In [37]:
spark.sql(query_str).show(5)

+---+--------------------+
|cnt|         business_id|
+---+--------------------+
|111|2y_CdkxEOJEJGyJAp...|
| 33|8PNKnlnJg6snf-HUg...|
| 14|wS-SWAa_yaJAw6fJm...|
| 38|skW4boArIApRw9DXK...|
| 43|KBvdN8Apn4DIxuNW3...|
+---+--------------------+
only showing top 5 rows



# Joining tables

In [38]:
business = spark.read.json('/data/yelp/business')

In [39]:
business.show(5)

+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|         city|               hours|is_open|  latitude|   longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|1616 Chapala St, ...|[,,,,,,,,,,, True...|Pns2l4eNsfO8kk83d...|Doctors, Traditio...|Santa Barbara|                null|      0|34.4266787|-119.7111968|Abby Rappoport, L...|      93101|           7|  5.0|   CA|
|87 Grasso Plaza S...|[,,,,,,,,, True,,...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|       Affton|[8:0-18:30, 0:0-0...|      1| 38.551126|  -90.335695|    

In [40]:
business.count()

150346

In [41]:
df.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

In [42]:
%%time
business_review = df.join(business, on='business_id', how='inner')

CPU times: user 0 ns, sys: 2.29 ms, total: 2.29 ms
Wall time: 35.2 ms


Fast? Why?

Because it is a transformation: nothing has been computed yet!
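The idea can be illustrated without Spark. In this toy sketch (`LazyFrame` is a hypothetical class, not part of PySpark) a transformation only appends a step to a plan, and the data is touched only when an action is called. This is the same reason a `join` can return in milliseconds.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: transformations only record a plan."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of (step name, function) pairs

    def where(self, predicate):
        # Transformation: returns a new object and touches no data
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def explain(self):
        return " -> ".join(["scan"] + [name for name, _ in self._plan])

    def collect(self):
        # Action: only now is the recorded plan executed
        rows = list(self._rows)
        for _, predicate in self._plan:
            rows = [row for row in rows if predicate(row)]
        return rows

calls = []

def is_top_rated(row):
    calls.append(row)  # lets us observe when the work really happens
    return row["stars"] > 4

reviews = [{"stars": 3.0}, {"stars": 5.0}, {"stars": 4.0}]
top = LazyFrame(reviews).where(is_top_rated)
calls_before_action = len(calls)  # nothing has run yet
result = top.collect()            # the predicate is evaluated here
```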

Let's look at the query plan!

In [44]:
business_review.explain()

== Physical Plan ==
*(5) Project [business_id#783, cool#784L, date#785, funny#786L, review_id#787, stars#788, text#789, useful#790L, user_id#791, address#1181, attributes#1182, categories#1184, city#1185, hours#1186, is_open#1187L, latitude#1188, longitude#1189, name#1190, postal_code#1191, review_count#1192L, stars#1193, state#1194]
+- *(5) SortMergeJoin [business_id#783], [business_id#1183], Inner
   :- *(2) Sort [business_id#783 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(business_id#783, 200), true, [id=#312]
   :     +- *(1) Filter isnotnull(business_id#783)
   :        +- *(1) Scan ExistingRDD[business_id#783,cool#784L,date#785,funny#786L,review_id#787,stars#788,text#789,useful#790L,user_id#791]
   +- *(4) Sort [business_id#1183 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(business_id#1183, 200), true, [id=#321]
         +- *(3) Project [address#1181, attributes#1182, business_id#1183, categories#1184, city#1185, hours#1186, is_open#1187L, latit

In [45]:
business_review.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+----+--------------------+-------+----------+-----------+--------------------+-----------+------------+-----+-----+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|             address|          attributes|          categories|city|               hours|is_open|  latitude|  longitude|                name|postal_code|review_count|stars|state|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+--------------------+--------------------+--------------------+----+--------------------+-------+----------+-----------+--------------------+-----------+------------+-----+-----+
|-0iIxySkp97WNlwK6...|   0|2018-03-23 18:26:15|    0|1eFR8S29TAYXPSW-q...|  

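Look at the number of partitions in the underlying RDD (under the hood, Spark SQL uses RDDs). Here it equals the shuffle partition count, 200, which is the default of `spark.sql.shuffle.partitions` and matches `hashpartitioning(..., 200)` in the plan.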

In [46]:
business_review.rdd.getNumPartitions()

200

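Switch the join strategy to BroadcastHashJoin. First drop `stars` from `business`: both tables have a column with this name, and after the join it would be ambiguous.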

In [63]:
business.drop('stars')

DataFrame[address: string, attributes: struct<AcceptsInsurance:string,AgesAllowed:string,Alcohol:string,Ambience:string,BYOB:string,BYOBCorkage:string,BestNights:string,BikeParking:string,BusinessAcceptsBitcoin:string,BusinessAcceptsCreditCards:string,BusinessParking:string,ByAppointmentOnly:string,Caters:string,CoatCheck:string,Corkage:string,DietaryRestrictions:string,DogsAllowed:string,DriveThru:string,GoodForDancing:string,GoodForKids:string,GoodForMeal:string,HairSpecializesIn:string,HappyHour:string,HasTV:string,Music:string,NoiseLevel:string,Open24Hours:string,OutdoorSeating:string,RestaurantsAttire:string,RestaurantsCounterService:string,RestaurantsDelivery:string,RestaurantsGoodForGroups:string,RestaurantsPriceRange2:string,RestaurantsReservations:string,RestaurantsTableService:string,RestaurantsTakeOut:string,Smoking:string,WheelchairAccessible:string,WiFi:string>, business_id: string, categories: string, city: string, hours: struct<Friday:string,Monday:string,Saturday:string

In [64]:
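# A threshold of 10 bytes effectively turns off automatic broadcasting,
spark.sql("SET spark.sql.autoBroadcastJoinThreshold = 10")
# so it is the explicit f.broadcast() hint that selects BroadcastHashJoin
business_review_new = df.join(f.broadcast(business.drop('stars')), on='business_id', how='inner')
business_review_new.explain()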

== Physical Plan ==
*(2) Project [business_id#783, cool#784L, date#785, funny#786L, review_id#787, stars#788, text#789, useful#790L, user_id#791, address#1181, attributes#1182, categories#1184, city#1185, hours#1186, is_open#1187L, latitude#1188, longitude#1189, name#1190, postal_code#1191, review_count#1192L, state#1194]
+- *(2) BroadcastHashJoin [business_id#783], [business_id#1183], Inner, BuildRight
   :- *(2) Filter isnotnull(business_id#783)
   :  +- *(2) Scan ExistingRDD[business_id#783,cool#784L,date#785,funny#786L,review_id#787,stars#788,text#789,useful#790L,user_id#791]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[2, string, true])), [id=#750]
      +- *(1) Project [address#1181, attributes#1182, business_id#1183, categories#1184, city#1185, hours#1186, is_open#1187L, latitude#1188, longitude#1189, name#1190, postal_code#1191, review_count#1192L, state#1194]
         +- *(1) Filter isnotnull(business_id#1183)
            +- FileScan json [address#1181,attrib

Check the number of partitions

In [65]:
business_review_new.rdd.getNumPartitions()

20

The number of partitions went down: a broadcast join needs no shuffle, so the result keeps the partitioning of the review table instead of the 200 shuffle partitions.
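Why the partitioning of the large table survives can be seen in a plain-Python sketch of a broadcast hash join. This is a conceptual illustration under simplifying assumptions, not Spark's implementation. The function `broadcast_hash_join` and the sample rows are hypothetical.

```python
def broadcast_hash_join(left_partitions, small_rows, key):
    """Inner-join each left partition against a hash table of the small side."""
    # Build side: the small table is hashed once and made available to every task
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    joined_partitions = []
    for partition in left_partitions:  # no shuffle: partitions stay as they are
        joined = [
            {**left_row, **right_row}
            for left_row in partition
            for right_row in lookup.get(left_row[key], [])
        ]
        joined_partitions.append(joined)
    return joined_partitions

reviews = [  # two partitions of the large table
    [{"business_id": "b1", "stars": 3.0}, {"business_id": "b2", "stars": 5.0}],
    [{"business_id": "b1", "stars": 4.0}, {"business_id": "b9", "stars": 1.0}],
]
business = [
    {"business_id": "b1", "city": "Affton"},
    {"business_id": "b2", "city": "Santa Barbara"},
]
result = broadcast_hash_join(reviews, business, "business_id")
```

The output has exactly as many partitions as the large input. A sort-merge join, by contrast, first redistributes both sides by key, which is where the shuffle partition count comes from.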

In [66]:
business_review_new.show(5, vertical=True)

-RECORD 0----------------------------
 business_id  | XQfwVwDr-v0ZS3_Cb... 
 cool         | 0                    
 date         | 2018-07-07 22:09:11  
 funny        | 0                    
 review_id    | KU_O5udG6zpxOg-Vc... 
 stars        | 3.0                  
 text         | If you decide to ... 
 useful       | 0                    
 user_id      | mh_-eMZ6K5RLWhZyI... 
 address      | 1460 Bethlehem Pike  
 attributes   | [,, 'none', {'tou... 
 categories   | Restaurants, Brea... 
 city         | North Wales          
 hours        | [7:30-15:0, 7:30-... 
 is_open      | 1                    
 latitude     | 40.2101961875        
 longitude    | -75.2236385919       
 name         | Turning Point of ... 
 postal_code  | 19454                
 review_count | 169                  
 state        | PA                   
-RECORD 1----------------------------
 business_id  | 7ATYjTIgM3jUlt4UM... 
 cool         | 1                    
 date         | 2012-01-03 15:28:18  
 funny      

Compute the average rating

In [67]:
business_review_new.groupby('business_id').agg(
    f.avg('stars')
).show(5)

+--------------------+------------------+
|         business_id|        avg(stars)|
+--------------------+------------------+
|SDhiF9UG0gsuaXgCn...|4.5588235294117645|
|eXMfOA7nAh3dlLgyw...| 4.228310502283105|
|h_6ioAoKNLi01kPho...|               2.0|
|eG-XKCjF8VwxM_SH9...|3.8333333333333335|
|bncTqUdA8ZPcUkDDm...| 3.747967479674797|
+--------------------+------------------+
only showing top 5 rows



Count the reviews per state

In [68]:
# distinct() is redundant here: groupby().count() already yields one row per state
business_review_new.groupby('state').count().distinct().show(5)

+-----+------+
|state| count|
+-----+------+
|   AZ|431708|
|   LA|761673|
|   NJ|260897|
|   MI|    11|
|   NV|430678|
+-----+------+
only showing top 5 rows



In [71]:
result = business_review_new.groupby('city').count().distinct()

In [72]:
result.show(10)

+-------------+-----+
|         city|count|
+-------------+-----+
| Harleysville| 2640|
|  Merion Park|    7|
|   Westampton|  822|
|    Worcester|  154|
|        Bucks|   79|
|St.Pete Beach|   10|
|      MARLTON|   12|
|  AB Edmonton|    6|
|     TAMPA AP|   18|
|  Springfield|11686|
+-------------+-----+
only showing top 10 rows



Write the results to a file

In [73]:
result.write.csv("business_review_counts.tsv", sep='\t', mode='overwrite')

Look at what was written

In [74]:
! hdfs dfs -ls business_review_counts.tsv

Found 201 items
-rw-r--r--   1 ubuntu ubuntu          0 2023-08-14 15:09 business_review_counts.tsv/_SUCCESS
-rw-r--r--   1 ubuntu ubuntu        124 2023-08-14 15:09 business_review_counts.tsv/part-00000-31501751-6d6d-471a-bfbb-b035ab0f32b6-c000.csv
-rw-r--r--   1 ubuntu ubuntu        125 2023-08-14 15:09 business_review_counts.tsv/part-00001-31501751-6d6d-471a-bfbb-b035ab0f32b6-c000.csv
-rw-r--r--   1 ubuntu ubuntu        141 2023-08-14 15:09 business_review_counts.tsv/part-00002-31501751-6d6d-471a-bfbb-b035ab0f32b6-c000.csv
-rw-r--r--   1 ubuntu ubuntu        112 2023-08-14 15:09 business_review_counts.tsv/part-00003-31501751-6d6d-471a-bfbb-b035ab0f32b6-c000.csv
-rw-r--r--   1 ubuntu ubuntu        104 2023-08-14 15:09 business_review_counts.tsv/part-00004-31501751-6d6d-471a-bfbb-b035ab0f32b6-c000.csv
-rw-r--r--   1 ubuntu ubuntu         84 2023-08-14 15:09 business_review_counts.tsv/part-00005-31501751-6d6d-471a-bfbb-b035ab0f32b6-c000.csv
-rw-r--r--   1 ubuntu ubuntu        1

This is a directory! Spark writes a DataFrame as a directory: one `part-*` file per partition plus a `_SUCCESS` marker, hence 201 items for 200 partitions.
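The layout can be imitated in plain Python. In the sketch below, `write_partitions` and the sample rows are hypothetical, and real part files also carry a job identifier in their names.

```python
import csv
import os
import tempfile

def write_partitions(partitions, path):
    """Write each partition to its own part file, then drop a _SUCCESS marker."""
    os.makedirs(path, exist_ok=True)
    for index, rows in enumerate(partitions):
        part_path = os.path.join(path, f"part-{index:05d}.csv")
        with open(part_path, "w", newline="") as out:
            csv.writer(out, delimiter="\t").writerows(rows)
    # An empty marker file signals that the whole write finished
    open(os.path.join(path, "_SUCCESS"), "w").close()

partitions = [
    [("Harleysville", 2640), ("Bucks", 79)],  # partition 0
    [("Springfield", 11686)],                 # partition 1
]
with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "business_review_counts.tsv")
    write_partitions(partitions, target)
    files = sorted(os.listdir(target))
print(files)
```

To get a single output file in Spark, reduce the DataFrame to one partition before writing, for example with `coalesce(1)`.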

# Window functions

In [None]:
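# The same in SQL; the FROM clause uses the `df` table registered earlier
spark.sql("""
SELECT business_id, user_id, useful,
LEAD(useful) OVER (PARTITION BY business_id ORDER BY useful DESC),
RANK() OVER (PARTITION BY business_id ORDER BY useful DESC)
FROM df
LIMIT 10
""").show()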

In [78]:
from pyspark.sql import Window

df.select("business_id", "user_id", f.rank().over(
    Window.partitionBy('business_id').orderBy(f.col('useful').desc())
).alias('rank')).show(5)

+--------------------+--------------------+----+
|         business_id|             user_id|rank|
+--------------------+--------------------+----+
|-0iIxySkp97WNlwK6...|gVO3VZwCu54Otc7bU...|   1|
|-0iIxySkp97WNlwK6...|vHc-UrI9yfL_pnnc6...|   2|
|-0iIxySkp97WNlwK6...|lRRVRehFcudfbjY6y...|   3|
|-0iIxySkp97WNlwK6...|DFTYFXMBLiuvxJFz7...|   4|
|-0iIxySkp97WNlwK6...|14WeUBavXkLZg-p_H...|   5|
+--------------------+--------------------+----+
only showing top 5 rows



In [80]:
df.select("business_id", "user_id", 'useful', f.rank().over(
    Window.partitionBy('business_id').orderBy(f.col('useful').desc())
).alias('rank')).where('rank <= 3').show(5)

+--------------------+--------------------+------+----+
|         business_id|             user_id|useful|rank|
+--------------------+--------------------+------+----+
|-0iIxySkp97WNlwK6...|gVO3VZwCu54Otc7bU...|    15|   1|
|-0iIxySkp97WNlwK6...|vHc-UrI9yfL_pnnc6...|    14|   2|
|-0iIxySkp97WNlwK6...|lRRVRehFcudfbjY6y...|    12|   3|
|-1dARtemb2Gy7Xlft...|mFpYLLmAKx8TOoLF2...|     1|   1|
|-1dARtemb2Gy7Xlft...|lENqldomRlPzN9qhm...|     0|   2|
+--------------------+--------------------+------+----+
only showing top 5 rows
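How `RANK()` numbers rows inside each partition, and what a filter such as `rank <= 3` then keeps, can be checked in plain Python. The function `rank_over` and the sample rows below are hypothetical. This is a sketch of the semantics, not of Spark's execution.

```python
from itertools import groupby

def rank_over(rows, partition_key, order_key):
    """Emulate RANK() OVER (PARTITION BY partition_key ORDER BY order_key DESC)."""
    ranked = []
    ordered = sorted(rows, key=lambda r: r[partition_key])
    for _, group in groupby(ordered, key=lambda r: r[partition_key]):
        group = sorted(group, key=lambda r: r[order_key], reverse=True)
        rank, previous = 0, None
        for position, row in enumerate(group, start=1):
            if row[order_key] != previous:
                # A new value takes its position as rank; ties share a rank
                rank, previous = position, row[order_key]
            ranked.append({**row, "rank": rank})
    return ranked

rows = [
    {"business_id": "a", "user_id": "u1", "useful": 15},
    {"business_id": "a", "user_id": "u2", "useful": 14},
    {"business_id": "a", "user_id": "u3", "useful": 14},
    {"business_id": "a", "user_id": "u4", "useful": 12},
    {"business_id": "b", "user_id": "u5", "useful": 1},
]
ranked = rank_over(rows, "business_id", "useful")
top = [row for row in ranked if row["rank"] <= 3]
```

Because tied rows share a rank and the next rank is skipped, `rank <= 3` does not always return exactly three rows per partition. When exactly N rows are needed, `row_number()` is the usual choice.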



# Summary

1. Spark SQL is accessed through the `SparkSession` wrapper.
2. Spark infers the schema automatically for JSON. For CSV, specify the schema explicitly or enable the `inferSchema` option; otherwise every column is read as a string.
3. Spark SQL works with DataFrames that can have an arbitrary number of columns.
4. Spark SQL makes it convenient to transform columns after a JOIN.
5. Spark SQL simplifies working with window functions!