# Workshop on Spark and Optimizations

Data in `retail_data.json` (sample record):

```json
{
   "InvoiceNo":"536365",
   "StockCode":"85123A",
   "Description":"WHITE HANGING HEART T-LIGHT HOLDER",
   "Quantity":"6",
   "InvoiceDate":"12/1/2010 8:26",
   "UnitPrice":"2.55",
   "CustomerID":"17850"
}
```

Data in `customer_data.json` (sample record):

```json
{
   "CustomerID":"12346",
   "Address":"Unit 1047 Box 4089\nDPO AA 57348",
   "Birthdate":"1994-02-20 00:46:27",
   "Country":"United Kingdom",
   "Email":"cooperalexis@hotmail.com",
   "Name":"Lindsay Cowan",
   "Username":"valenciajennifer"
}
```


## Task 0

By default, all logs are written to the console rather than to the notebook. To see them in the notebook:
 - In the shell of the Docker container running `pyspark`, run `ipython profile create`;
 - In `~/.ipython/profile_default/ipython_kernel_config.py`, uncomment the line `c.IPKernelApp.capture_fd_output = True`;
 - Restart the notebook kernel.
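If you prefer to do the second step from Python, here is a minimal sketch. The helper `enable_fd_capture` is hypothetical (it is not part of IPython); it assumes the generated config contains the setting in the usual commented-out form `# c.IPKernelApp.capture_fd_output = ...`, and it appends the line if it is missing.

```python
import re
from pathlib import Path

# The line that must be active in ipython_kernel_config.py
TARGET = "c.IPKernelApp.capture_fd_output = True"
# Assumed commented-out form, e.g. "# c.IPKernelApp.capture_fd_output = True"
COMMENTED = re.compile(r"^#\s*c\.IPKernelApp\.capture_fd_output\s*=.*$", re.MULTILINE)


def enable_fd_capture(config_path: Path) -> str:
    """Hypothetical helper: uncomment (or append) capture_fd_output, return the new text."""
    text = config_path.read_text() if config_path.exists() else ""
    if TARGET in text.splitlines():
        return text  # already active, leave the file untouched
    if COMMENTED.search(text):
        # Replace only the first commented-out occurrence
        new_text = COMMENTED.sub(TARGET, text, count=1)
    else:
        # No such line: append it at the end of the file
        new_text = text.rstrip("\n") + ("\n" if text else "") + TARGET + "\n"
    config_path.write_text(new_text)
    return new_text
```

Usage: `enable_fd_capture(Path.home() / '.ipython' / 'profile_default' / 'ipython_kernel_config.py')`, then restart the kernel as in the last step.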

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
```

```python
# Create a Spark session running locally on all available cores
spark = (
    SparkSession.builder
    .appName('yp-spark-workshop')
    .master('local[*]')
    .getOrCreate()
)
```

## Task 1: repartition

Repartition the `customer_data` data by country (the `Country` column).

```python
# Read the customer data; FAILFAST aborts the read on the first malformed record
df_customer = (
    spark.read
    .format('json')
    .option('mode', 'FAILFAST')
    .load('/home/jovyan/customer_data.json')
)
```
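`option('mode', 'FAILFAST')` selects one of the three parse modes of Spark's JSON and CSV readers: `PERMISSIVE` (the default) keeps malformed records and puts their raw text into a corrupt-record column (`_corrupt_record` by default), `DROPMALFORMED` silently drops them, and `FAILFAST` throws on the first one. The function below is a toy model of these modes over JSON lines, built on the standard `json` module rather than Spark's parser; in particular, real `PERMISSIVE` mode also keeps the schema's other columns, set to null.

```python
import json

VALID_MODES = {"PERMISSIVE", "DROPMALFORMED", "FAILFAST"}


def parse_json_lines(lines, mode="PERMISSIVE"):
    """Toy model of Spark's JSON read modes (not Spark's actual parser)."""
    if mode not in VALID_MODES:
        raise ValueError(f"Unknown mode: {mode}")
    rows = []
    for line in lines:
        try:
            record = json.loads(line)  # JSONDecodeError is a subclass of ValueError
            if not isinstance(record, dict):
                raise ValueError("top-level JSON value is not an object")
        except ValueError:
            if mode == "FAILFAST":
                # Abort the whole read on the first bad record
                raise ValueError(f"Malformed record: {line!r}")
            if mode == "DROPMALFORMED":
                continue  # skip the bad record
            # PERMISSIVE: keep a row that carries the raw text
            record = {"_corrupt_record": line}
        rows.append(record)
    return rows
```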
    

```python
df_customer.printSchema()
```

```
root
 |-- Address: string (nullable = true)
 |-- Birthdate: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Username: string (nullable = true)
```
```python
df_customer.show()
```

```
+--------------------+-------------------+--------------+----------+--------------------+-----------------+----------------+
|             Address|          Birthdate|       Country|CustomerID|               Email|             Name|        Username|
+--------------------+-------------------+--------------+----------+--------------------+-----------------+----------------+
|Unit 1047 Box 408...|1994-02-20 00:46:27|United Kingdom|     12346|cooperalexis@hotm...|    Lindsay Cowan|valenciajennifer|
|55711 Janet Plaza...|1988-06-21 00:15:34|       Iceland|     12347|timothy78@hotmail...|  Katherine David|      hillrachel|
|Unit 2676 Box 935...|1974-11-26 15:30:20|       Finland|     12348| tcrawford@gmail.com|  Leslie Martinez|    serranobrian|
|2765 Powers Meado...|1977-05-06 23:57:35|         Italy|     12349|  dustin37@yahoo.com|    Brad Cardenas|   charleshudson|
|17677 Mark Crest\...|1996-09-13 19:14:27|        Norway|     12350|amyholland@yahoo.com|     Natalie Ford| gregoryharrison|
```


```python
df_customer.rdd.getNumPartitions()
```

```
1
```
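Why a single partition? For splittable input (uncompressed, line-delimited JSON), Spark 3.x cuts files into splits of roughly `min(maxPartitionBytes, max(openCostInBytes, totalBytes / parallelism))`, where `spark.sql.files.maxPartitionBytes` defaults to 128 MB, `spark.sql.files.openCostInBytes` defaults to 4 MB, `totalBytes` includes one open cost per file, and the parallelism defaults to the number of cores under `local[*]`. A small file therefore fits in one split. The sketch below is a simplified model of that rule for a single file (it ignores how splits of many files are packed together); the size of `customer_data.json` is not shown in the notebook, so any concrete size you plug in is an assumption.

```python
import math

MB = 1024 * 1024


def max_split_bytes(total_file_bytes, num_files, parallelism,
                    max_partition_bytes=128 * MB, open_cost_bytes=4 * MB):
    """Simplified model of the Spark 3.x target split size for file sources."""
    # Each file is charged an extra "open cost" when estimating the total work
    total_bytes = total_file_bytes + num_files * open_cost_bytes
    bytes_per_core = total_bytes // parallelism
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))


def num_splits_single_file(file_bytes, parallelism, **conf):
    """Number of read partitions for ONE splittable file under this model."""
    split = max_split_bytes(file_bytes, 1, parallelism, **conf)
    return math.ceil(file_bytes / split)
```

Under this model, a hypothetical 200 KB file on 8 cores gives `num_splits_single_file(200 * 1024, 8) == 1` (the split size is the 4 MB open cost), while a 1 GB file gives 8 splits of 128 MB.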

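```python
# Shuffle into 5 partitions using hash partitioning on Country:
# all rows of a given country land in the same partition
newDf = df_customer.repartition(5, 'Country')
```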

```python
newDf.rdd.getNumPartitions()
```

```
5
```
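`repartition(5, 'Country')` shuffles the data with hash partitioning: each row goes to partition `hash(Country) mod 5`, so all rows of one country end up together, and several countries may share a partition. The sketch below models this in plain Python. Spark actually uses Murmur3 hashing with a non-negative modulo; `zlib.crc32` is only a deterministic stand-in (Python's built-in `hash()` of strings is randomized per process), so the concrete partition numbers will differ from Spark's.

```python
import zlib
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Assign each row to a partition by hashing its key column (simplified model)."""
    partitions = defaultdict(list)
    for row in rows:
        # crc32 is non-negative in Python 3, so a plain modulo is enough here
        pid = zlib.crc32(str(row[key]).encode("utf-8")) % num_partitions
        partitions[pid].append(row)
    # Return every partition, including empty ones
    return [partitions[i] for i in range(num_partitions)]
```

Because the key alone decides the partition, a country with many more rows than the others produces one oversized partition (data skew).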

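```python
# mode('overwrite') lets the cell be re-run; each partition that holds data becomes a part-* file
newDf.write.format('csv').mode('overwrite').save('/home/jovyan/partitionedData')
```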

## Task 2: broadcast join

Join the two DataFrames on the `CustomerID` key, broadcasting the smaller one.

```python
df_retail = (
    spark.read
    .format('json')
    .option('mode', 'FAILFAST')
    .load('/home/jovyan/retail_data.json')
)
```

```python
df_retail.count()
```

```
46077
```

```python
df_customer.count()
```

```
507
```
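The counts explain the `broadcast()` hint: `df_customer` (507 rows) is tiny next to `df_retail` (46 077 rows), so copying it to every executor is cheaper than shuffling both sides. Without a hint, Spark broadcasts a side automatically when its estimated size is at most `spark.sql.autoBroadcastJoinThreshold` (10 MB by default; `-1` disables automatic broadcasting). Below is a deliberately simplified model of that decision for an inner equi-join; the function name and return strings are illustrative only. The real planner works with estimated byte sizes from plan statistics rather than row counts, considers more strategies and join types, and adaptive query execution can revise the choice at runtime.

```python
MB = 1024 * 1024
DEFAULT_THRESHOLD = 10 * MB  # default value of spark.sql.autoBroadcastJoinThreshold


def choose_inner_join(left_bytes, right_bytes, threshold=DEFAULT_THRESHOLD,
                      hint_right=False):
    """Simplified model of Spark's strategy choice for an inner equi-join."""
    if hint_right:
        # An explicit broadcast() hint on the right side takes precedence
        return "BroadcastHashJoin BuildRight"
    if threshold >= 0 and min(left_bytes, right_bytes) <= threshold:
        # Broadcast (build the hash table from) the smaller side
        side = "BuildRight" if right_bytes <= left_bytes else "BuildLeft"
        return f"BroadcastHashJoin {side}"
    # Neither side is small enough: shuffle both sides and merge
    return "SortMergeJoin"
```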

```python
from pyspark.sql.functions import broadcast

# Inner join on CustomerID; the broadcast() hint asks Spark to send df_customer to every executor
df_joined = df_retail.join(broadcast(df_customer), 'CustomerID')
```

```python
df_joined.explain()
```

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [CustomerID#102, Description#103, InvoiceDate#104, InvoiceNo#105, Quantity#106, StockCode#107, UnitPrice#108, Address#8, Birthdate#9, Country#10, Email#12, Name#13, Username#14]
   +- BroadcastHashJoin [CustomerID#102], [CustomerID#11], Inner, BuildRight, false
      :- Filter isnotnull(CustomerID#102)
      :  +- FileScan json [CustomerID#102,Description#103,InvoiceDate#104,InvoiceNo#105,Quantity#106,StockCode#107,UnitPrice#108] Batched: false, DataFilters: [isnotnull(CustomerID#102)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/retail_data.json], PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema: struct<CustomerID:string,Description:string,InvoiceDate:string,InvoiceNo:string,Quantity:string,S...
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[3, string, false]),false), [plan_id=156]
         +- Filter isnotnull(CustomerID#11)
            +- FileScan jso
```
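In the plan, `BroadcastHashJoin ... Inner, BuildRight` means the right side (`df_customer`) is the build side: it is collected, turned into a hash table and shipped to every executor by `BroadcastExchange`, while `df_retail` is streamed through it without a shuffle. Both inputs are first filtered with `isnotnull(CustomerID)`, because a null key can never match in an inner join. A single-machine sketch of the same build/probe logic (a model for illustration, not Spark code):

```python
def broadcast_hash_join(big_rows, small_rows, key):
    """Inner equi-join: build a hash table from the small side, probe with the big side."""
    # Build phase: in Spark this table is what gets broadcast to the executors
    table = {}
    for row in small_rows:
        if row[key] is not None:  # mirrors Filter isnotnull(CustomerID)
            table.setdefault(row[key], []).append(row)
    # Probe phase: every big-side row is looked up locally, no shuffle needed
    joined = []
    for row in big_rows:
        k = row[key]
        if k is None:
            continue  # null keys never match in an inner join
        for match in table.get(k, []):
            joined.append({**row, **match})
    return joined
```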

```python
df_joined.show()
```

```
+----------+--------------------+---------------+---------+--------+---------+---------+--------------------+-------------------+--------------+--------------------+---------------+----------------+
|CustomerID|         Description|    InvoiceDate|InvoiceNo|Quantity|StockCode|UnitPrice|             Address|          Birthdate|       Country|               Email|           Name|        Username|
+----------+--------------------+---------------+---------+--------+---------+---------+--------------------+-------------------+--------------+--------------------+---------------+----------------+
|     12346|MEDIUM CERAMIC TO...|1/18/2011 10:01|   541431|   74215|    23166|     1.04|Unit 1047 Box 408...|1994-02-20 00:46:27|United Kingdom|cooperalexis@hotm...|  Lindsay Cowan|valenciajennifer|
|     12346|MEDIUM CERAMIC TO...|1/18/2011 10:17|  C541433|  -74215|    23166|     1.04|Unit 1047 Box 408...|1994-02-20 00:46:27|United Kingdom|cooperalexis@hotm...|  Lindsay Cowan|valenciajennifer|
|    
```

## Task 3: cache

```python
df_retail.cache()
```

```
DataFrame[CustomerID: string, Description: string, InvoiceDate: string, InvoiceNo: string, Quantity: string, StockCode: string, UnitPrice: string]
```

```python
df_retail.head()
```

```
Row(CustomerID='12346', Description='MEDIUM CERAMIC TOP STORAGE JAR', InvoiceDate='1/18/2011 10:01', InvoiceNo='541431', Quantity='74215', StockCode='23166', UnitPrice='1.04')
```
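`cache()` only marks `df_retail` for caching and returns the same DataFrame (hence the `DataFrame[...]` output); the data is stored when the first action computes it. The toy model below (a hypothetical `LazyFrame` class, not Spark code) shows the effect: with caching requested, the source is scanned once across several actions, and without it every action rescans. One difference from Spark: Spark caches per partition, and `head()` may compute only some partitions, so `count()` is the usual way to fill the cache completely.

```python
class LazyFrame:
    """Toy model of a DataFrame whose cache() is lazy and memoizes the first result."""

    def __init__(self, compute):
        self._compute = compute        # function that (re)reads the source data
        self._cache_requested = False
        self._cached = None
        self.computations = 0          # how many times the source was scanned

    def cache(self):
        self._cache_requested = True   # lazy: nothing is computed here
        return self

    def _rows(self):
        if self._cached is not None:
            return self._cached        # served from the cache, no rescan
        self.computations += 1
        rows = self._compute()
        if self._cache_requested:
            self._cached = rows        # materialized by the first action
        return rows

    def count(self):
        return len(self._rows())

    def head(self):
        rows = self._rows()
        return rows[0] if rows else None
```

In the notebook itself, `df_retail.unpersist()` releases the cached data once it is no longer needed.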