# Практическая работа 12.5

**Задания:**
1.  **Загрузка и анализ данных из CSV-файла**
    1.  Разархивируйте sales_data.zip, в нем данные по продажам за 12 месяцев 2019 г. Считайте  в Spark DataFrame каждый из них и объедините в один большой датафрейм методом union, добавьте колонку month для обозначения месяца.
    2.  Выведите схему (schema) DataFrame и общую статистику по данным.
    3.  Выясните, какие товары пользуются наибольшим спросом (больше всего продаж), выведите Топ-10 товаров и кол-во их покупок.
2.  **Партиционирование данных и оптимизация производительности**
    1.  Добавьте партиционирование в датафрейм, используя выбранный столбец как ключ партиционирования, и объясните свой выбор.
    2.  Выполните операцию агрегации данных (например, суммирование кол-во проданных штук) по партициям.
    3.  Оцените производительность и сравните её с непартиционированной таблицей, например замерив все выполнения кода в ячейках Jupyter notebook с помощью %timeit. 
    4.  Измените количество партиций на одну и запишите файл обратно на диск.
3.  **Работа с различными форматами данных**
    1.  Сохраните датафрейм с данными, изначально хранящиеся в формате CSV, в других доступных форматах. Например, JSON, Parquet, Avro, ORC.
    2.  Проанализируйте различия в производительности (как быстро запись происходит) и объёме хранения для каждого формата на диске компьютера.
    3.  Сделайте выводы, оцените плюсы и минусы разных форматов.
4.  **Работа с UDF и кешированием**
    1.  Создайте UDF на Python, которая парсит адрес покупателя и разбивает колонку Purchase Address на Street address, City, State, Postal Code. 
    2.  Примените UDF к Spark DataFrame и убедитесь, что форматирование прошло успешно и добавились 4 новые колонки.
    3.  Сделайте ещё одно преобразование датафрейма - перемножьте проданные штуки на цену единицы товара, получите Total price, предварительно закешировав (cache) результат применения UDF.
    4.  Оцените скорость выполнения всей цепочки преобразований датафрейма (UDF и добавление Total price) без кеширования и с его наличием.
    5.  Придумайте и опишите пример из жизни, когда кеширование важно применять и почему это может быть полезно.

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pathlib import Path
import subprocess

### 1. Загрузка и анализ данных из CSV-файла

-  Разархивируйте sales_data.zip, в нем данные по продажам за 12 месяцев 2019 г. Считайте  в Spark DataFrame каждый из них и объедините в один большой датафрейм методом union, добавьте колонку month для обозначения месяца.

In [12]:
!ls 

12_5_Практическая_работа.ipynb	sales_data.zip


In [3]:
!unzip sales_data.zip -d ./sales_data

Archive:  sales_data.zip
  inflating: ./sales_data/Sales_April_2019.csv  
  inflating: ./sales_data/Sales_August_2019.csv  
  inflating: ./sales_data/Sales_December_2019.csv  
  inflating: ./sales_data/Sales_February_2019.csv  
  inflating: ./sales_data/Sales_January_2019.csv  
  inflating: ./sales_data/Sales_July_2019.csv  
  inflating: ./sales_data/Sales_June_2019.csv  
  inflating: ./sales_data/Sales_March_2019.csv  
  inflating: ./sales_data/Sales_May_2019.csv  
  inflating: ./sales_data/Sales_November_2019.csv  
  inflating: ./sales_data/Sales_October_2019.csv  
  inflating: ./sales_data/Sales_September_2019.csv  


In [14]:
!ls sales_data

Sales_April_2019.csv	 Sales_January_2019.csv  Sales_May_2019.csv
Sales_August_2019.csv	 Sales_July_2019.csv	 Sales_November_2019.csv
Sales_December_2019.csv  Sales_June_2019.csv	 Sales_October_2019.csv
Sales_February_2019.csv  Sales_March_2019.csv	 Sales_September_2019.csv


In [4]:
app_name = 'Practice_12_5'
master = "local[*]"

conf = SparkConf().setAppName(app_name).setMaster(master)

sc = SparkContext(conf=conf)
spark = SparkSession(sc)



In [16]:
!hdfs dfs -ls

Found 1 items
drwxr-xr-x   - jovyan supergroup          0 2024-06-07 23:12 top_30_words


In [5]:
sc

In [6]:
data_path = Path('./sales_data')
subprocess.call(['hdfs', 'dfs', '-mkdir', str(data_path)])

data_sdf = None

for file_path in data_path.glob('*.csv'):
    file_title = file_path.parts[-1].split('.', -1)[0]
    _, month, year = file_title.split('_')
    year = int(year)
    
    subprocess.call(['hdfs', 'dfs', '-copyFromLocal', str(file_path), str(file_path)])
    tmp_sdf = spark.read.csv(str(file_path), header=True, inferSchema=True)
    tmp_sdf = tmp_sdf.withColumn('Month', sf.lit(month))
    
    if data_sdf is None:
        data_sdf = tmp_sdf
    else:
        data_sdf = data_sdf.union(tmp_sdf)
    
    print(tmp_sdf.take(5))

[Row(Order ID=278797, Product='Wired Headphones', Quantity Ordered=1, Price Each=11.99, Order Date='11/21/19 09:54', Purchase Address='46 Park St, New York City, NY 10001', Month='November'), Row(Order ID=278798, Product='USB-C Charging Cable', Quantity Ordered=2, Price Each=11.95, Order Date='11/17/19 10:03', Purchase Address='962 Hickory St, Austin, TX 73301', Month='November'), Row(Order ID=278799, Product='Apple Airpods Headphones', Quantity Ordered=1, Price Each=150.0, Order Date='11/19/19 14:56', Purchase Address='464 Cherry St, Los Angeles, CA 90001', Month='November'), Row(Order ID=278800, Product='27in FHD Monitor', Quantity Ordered=1, Price Each=149.99, Order Date='11/25/19 22:24', Purchase Address='649 10th St, Seattle, WA 98101', Month='November'), Row(Order ID=278801, Product='Bose SoundSport Headphones', Quantity Ordered=1, Price Each=99.99, Order Date='11/09/19 13:56', Purchase Address='522 Hill St, Boston, MA 02215', Month='November')]
[Row(Order ID=162009, Product='iPh

[Row(Order ID=150502, Product='iPhone', Quantity Ordered=1, Price Each=700.0, Order Date='02/18/19 01:35', Purchase Address='866 Spruce St, Portland, ME 04101', Month='February'), Row(Order ID=150503, Product='AA Batteries (4-pack)', Quantity Ordered=1, Price Each=3.84, Order Date='02/13/19 07:24', Purchase Address='18 13th St, San Francisco, CA 94016', Month='February'), Row(Order ID=150504, Product='27in 4K Gaming Monitor', Quantity Ordered=1, Price Each=389.99, Order Date='02/18/19 09:46', Purchase Address='52 6th St, New York City, NY 10001', Month='February'), Row(Order ID=150505, Product='Lightning Charging Cable', Quantity Ordered=1, Price Each=14.95, Order Date='02/02/19 16:47', Purchase Address='129 Cherry St, Atlanta, GA 30301', Month='February'), Row(Order ID=150506, Product='AA Batteries (4-pack)', Quantity Ordered=2, Price Each=3.84, Order Date='02/28/19 20:32', Purchase Address='548 Lincoln St, Seattle, WA 98101', Month='February')]
[Row(Order ID=248151, Product='AA Batte

In [19]:
data_sdf.take(5)

[Row(Order ID=278797, Product='Wired Headphones', Quantity Ordered=1, Price Each=11.99, Order Date='11/21/19 09:54', Purchase Address='46 Park St, New York City, NY 10001', Month='November'),
 Row(Order ID=278798, Product='USB-C Charging Cable', Quantity Ordered=2, Price Each=11.95, Order Date='11/17/19 10:03', Purchase Address='962 Hickory St, Austin, TX 73301', Month='November'),
 Row(Order ID=278799, Product='Apple Airpods Headphones', Quantity Ordered=1, Price Each=150.0, Order Date='11/19/19 14:56', Purchase Address='464 Cherry St, Los Angeles, CA 90001', Month='November'),
 Row(Order ID=278800, Product='27in FHD Monitor', Quantity Ordered=1, Price Each=149.99, Order Date='11/25/19 22:24', Purchase Address='649 10th St, Seattle, WA 98101', Month='November'),
 Row(Order ID=278801, Product='Bose SoundSport Headphones', Quantity Ordered=1, Price Each=99.99, Order Date='11/09/19 13:56', Purchase Address='522 Hill St, Boston, MA 02215', Month='November')]

In [20]:
!hdfs dfs -ls

Found 2 items
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data
drwxr-xr-x   - jovyan supergroup          0 2024-06-07 23:12 top_30_words


In [21]:
!hdfs dfs -ls ./sales_data

Found 12 items
-rw-r--r--   1 jovyan supergroup    1595953 2024-06-09 20:26 sales_data/Sales_April_2019.csv
-rw-r--r--   1 jovyan supergroup    1043593 2024-06-09 20:26 sales_data/Sales_August_2019.csv
-rw-r--r--   1 jovyan supergroup    2181642 2024-06-09 20:26 sales_data/Sales_December_2019.csv
-rw-r--r--   1 jovyan supergroup    1046495 2024-06-09 20:26 sales_data/Sales_February_2019.csv
-rw-r--r--   1 jovyan supergroup     843098 2024-06-09 20:26 sales_data/Sales_January_2019.csv
-rw-r--r--   1 jovyan supergroup    1248753 2024-06-09 20:27 sales_data/Sales_July_2019.csv
-rw-r--r--   1 jovyan supergroup    1182508 2024-06-09 20:26 sales_data/Sales_June_2019.csv
-rw-r--r--   1 jovyan supergroup    1323497 2024-06-09 20:26 sales_data/Sales_March_2019.csv
-rw-r--r--   1 jovyan supergroup    1443965 2024-06-09 20:26 sales_data/Sales_May_2019.csv
-rw-r--r--   1 jovyan supergroup    1534677 2024-06-09 20:26 sales_data/Sales_November_2019.csv
-rw-r--r--   1 jovyan supergroup    

- Выведите схему (schema) DataFrame и общую статистику по данным.

In [22]:
data_sdf.printSchema()

root
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)
 |-- Month: string (nullable = false)



In [23]:
data_sdf.describe().show()

+-------+-----------------+------------+------------------+------------------+--------------+--------------------+---------+
|summary|         Order ID|     Product|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|    Month|
+-------+-----------------+------------+------------------+------------------+--------------+--------------------+---------+
|  count|           185950|      186305|            185950|            185950|        186305|              186305|   186850|
|   mean|230417.5693788653|        null|1.1243828986286637|184.39973476743927|          null|                null|     null|
| stddev|51512.73710999595|        null|0.4427926240286695| 332.7313298843437|          null|                null|     null|
|    min|           141234|20in Monitor|                 1|              2.99|01/01/19 03:07|1 11th St, Atlant...|    April|
|    max|           319670|      iPhone|                 9|            1700.0|    Order Date|    Purchase Address|September|


- Выясните, какие товары пользуются наибольшим спросом (больше всего продаж), выведите Топ-10 товаров и кол-во их покупок.

In [7]:
total_quantity_ordered_sdf = data_sdf.groupBy("Product") \
    .agg(sf.sum("Quantity Ordered")) \
    .select(
        sf.col("Product"), 
        sf.col("sum(Quantity Ordered)").alias("Total Quantity Ordered")
    ) \
    .orderBy("Total Quantity Ordered", ascending=False)

total_quantity_ordered_sdf.limit(10).show()

+--------------------+----------------------+
|             Product|Total Quantity Ordered|
+--------------------+----------------------+
|AAA Batteries (4-...|                 31017|
|AA Batteries (4-p...|                 27635|
|USB-C Charging Cable|                 23975|
|Lightning Chargin...|                 23217|
|    Wired Headphones|                 20557|
|Apple Airpods Hea...|                 15661|
|Bose SoundSport H...|                 13457|
|    27in FHD Monitor|                  7550|
|              iPhone|                  6849|
|27in 4K Gaming Mo...|                  6244|
+--------------------+----------------------+



### 2. Партиционирование данных и оптимизация производительности

- Добавьте партиционирование в датафрейм, используя выбранный столбец как ключ партиционирования, и объясните свой выбор.

In [25]:
data_sdf.count()

186850

In [26]:
data_sdf.limit(5).show()

+--------+--------------------+----------------+----------+--------------+--------------------+--------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|   Month|
+--------+--------------------+----------------+----------+--------------+--------------------+--------+
|  278797|    Wired Headphones|               1|     11.99|11/21/19 09:54|46 Park St, New Y...|November|
|  278798|USB-C Charging Cable|               2|     11.95|11/17/19 10:03|962 Hickory St, A...|November|
|  278799|Apple Airpods Hea...|               1|     150.0|11/19/19 14:56|464 Cherry St, Lo...|November|
|  278800|    27in FHD Monitor|               1|    149.99|11/25/19 22:24|649 10th St, Seat...|November|
|  278801|Bose SoundSport H...|               1|     99.99|11/09/19 13:56|522 Hill St, Bost...|November|
+--------+--------------------+----------------+----------+--------------+--------------------+--------+



In [27]:
data_sdf.select("Product").distinct().count()

21

In [28]:
data_sdf.rdd.getNumPartitions()

12

In [8]:
# Делаю партиционирование по клонке "Month" или "Product" т.к
# они являются категориальными
data_sdf.write.format('csv').partitionBy('Month').option('header', True).save('sales_data_prt')

In [30]:
!hdfs dfs -ls sales_data_prt

Found 13 items
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data_prt/Month=April
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data_prt/Month=August
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data_prt/Month=December
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data_prt/Month=February
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data_prt/Month=January
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data_prt/Month=July
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data_prt/Month=June
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data_prt/Month=March
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data_prt/Month=May
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data_prt/Month=November
drwxr-xr-x   - jovyan supergroup          0 2024-06-09 20:27 sales_data_prt/Month=Octo

In [9]:
prt_data_sdf = spark.read.csv('./sales_data_prt', header=True, inferSchema=True)

- Выполните операцию агрегации данных (например, суммирование кол-во проданных штук) по партициям.

In [32]:
data_sdf.groupBy("Month") \
    .agg(sf.sum("Quantity Ordered")) \
    .collect();

- Оцените производительность и сравните её с непартиционированной таблицей, например замерив все выполнения кода в ячейках Jupyter notebook с помощью %timeit.

In [33]:
%%timeit -n 10

data_sdf.groupBy("Month") \
    .agg(sf.sum("Quantity Ordered")) \
    .collect();

599 ms ± 53.2 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [34]:
%%timeit -n 10

prt_data_sdf.groupBy("Month") \
    .agg(sf.sum("Quantity Ordered")) \
    .collect();

368 ms ± 75.1 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [35]:
# Скорость выполнения запроса увеличилась

- Измените количество партиций на одну и запишите файл обратно на диск.

In [10]:
prt_data_sdf.coalesce(1).write.format('csv').option('header', True).save('./sales_data_coalesce_1')

In [37]:
!hdfs dfs -ls sales_data_coalesce_1

Found 2 items
-rw-r--r--   1 jovyan supergroup          0 2024-06-09 20:28 sales_data_coalesce_1/_SUCCESS
-rw-r--r--   1 jovyan supergroup   17638412 2024-06-09 20:28 sales_data_coalesce_1/part-00000-b363ec96-3206-4279-b93c-e61a83de417c-c000.csv


### 3. Работа с различными форматами данных

- Сохраните датафрейм с данными, изначально хранящиеся в формате CSV, в других доступных форматах. Например, JSON, Parquet, Avro, ORC.

- Проанализируйте различия в производительности (как быстро запись происходит) и объёме хранения для каждого формата на диске компьютера.

In [38]:
def get_folder_size(path):
    path = Path(path)
    return sum(f.stat().st_size for f in path.glob('**/*') if f.is_file())

get_folder_size('sales_data')

16229477

In [39]:
def hdfs_du(path, in_bytes=False):
    cmd = ['hdfs', 'dfs', '-du', '-s']
    
    if not in_bytes:
        cmd.append('-h')
        
    cmd.append(path)
    out = subprocess.check_output(cmd)
    size = out.split()[0].decode('utf-8')
    mesure = out.split()[1].decode('utf-8')
    
    if in_bytes:
        return int(size)
    else:
        return float(size), mesure
    
# hdfs_du('sales_data_tmp')

In [40]:
import time

In [41]:
def test_data_format(df, d_format, iterations):
    
    subprocess.call(['hdfs', 'dfs', '-rm', '-r', './sales_data_tmp'])
    data_sdf.write.format(d_format).save('./sales_data_tmp')
    data_size = hdfs_du('./sales_data_tmp')
    subprocess.call(['hdfs', 'dfs', '-rm', '-r', './sales_data_tmp'])
    
    total_write_time = 0
    for i in range(iterations):
        start_time = time.time()
        data_sdf.write.format(d_format).save('./sales_data_tmp')
        total_write_time += time.time() - start_time
        
        subprocess.call(['hdfs', 'dfs', '-rm', '-r', './sales_data_tmp'])
        
    return {
        'data_size': data_size,
        'total_write_time': total_write_time,
        'avg_write_time': total_write_time/iterations,
    }

# test_data_format(data_sdf, 'csv', 10)

In [42]:
formats = [
    'json',
    'parquet',
    'orc',
    'avro'
]

for frm in formats:
    print('Testing format:', frm, '...')
    try:
        results = test_data_format(data_sdf, frm, 30)
    except:
        results = 'error'
        
    print(f'\t{frm}:', results)

Testing format: json ...
	json: {'data_size': (34.8, 'M'), 'total_write_time': 36.860700845718384, 'avg_write_time': 1.2286900281906128}
Testing format: parquet ...
	parquet: {'data_size': (4.3, 'M'), 'total_write_time': 49.05225968360901, 'avg_write_time': 1.635075322786967}
Testing format: orc ...
	orc: {'data_size': (3.4, 'M'), 'total_write_time': 47.510725021362305, 'avg_write_time': 1.5836908340454101}
Testing format: avro ...
	avro: error


- Сделайте выводы, оцените плюсы и минусы разных форматов.

In [43]:
# json: 
#       - занимает больше всего места на диске
#       + самая быстрая скорость записи
# parquet:
#      -  самая медленная скорость записи
#     ++- занимает в 8 раз меньше места, чем json
# orc:
#      +- скорсть записи больше, чем parquet, но меньше чем json
#      +  занимает меньше всего меньше места на диске (в 10.2 реза меньше места, чем json)

### 4. Работа с UDF и кешированием

- Создайте UDF на Python, которая парсит адрес покупателя и разбивает колонку Purchase Address на Street address, City, State, Postal Code.

In [44]:
data_sdf.limit(5).show()

+--------+--------------------+----------------+----------+--------------+--------------------+--------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|   Month|
+--------+--------------------+----------------+----------+--------------+--------------------+--------+
|  278797|    Wired Headphones|               1|     11.99|11/21/19 09:54|46 Park St, New Y...|November|
|  278798|USB-C Charging Cable|               2|     11.95|11/17/19 10:03|962 Hickory St, A...|November|
|  278799|Apple Airpods Hea...|               1|     150.0|11/19/19 14:56|464 Cherry St, Lo...|November|
|  278800|    27in FHD Monitor|               1|    149.99|11/25/19 22:24|649 10th St, Seat...|November|
|  278801|Bose SoundSport H...|               1|     99.99|11/09/19 13:56|522 Hill St, Bost...|November|
+--------+--------------------+----------------+----------+--------------+--------------------+--------+



In [80]:
data_sdf.select('Purchase Address').take(5)

[Row(Purchase Address='46 Park St, New York City, NY 10001'),
 Row(Purchase Address='962 Hickory St, Austin, TX 73301'),
 Row(Purchase Address='464 Cherry St, Los Angeles, CA 90001'),
 Row(Purchase Address='649 10th St, Seattle, WA 98101'),
 Row(Purchase Address='522 Hill St, Boston, MA 02215')]

In [11]:
def split_address(address):
    if address is None:
        return (None, None, None, None)
    
    try:
        street, city, state_and_postal_code = address.split(",")
        state, postal_code = state_and_postal_code.strip().split(' ')
    except ValueError:
        return (None, None, None, None)
    
    street = street.strip()
    city = city.strip()
    state = state.strip()
    postal_code = postal_code.strip()
    
    return street, city, state, postal_code

split_address('46 Park St, New York City, NY 10001')

('46 Park St', 'New York City', 'NY', '10001')

In [12]:
split_address_udf_return_type = StructType([
        StructField("Street", StringType()),
        StructField("City", StringType()),
        StructField("State", StringType()),
        StructField("Postal Code", StringType()),
    ])

split_address_udf = sf.udf(
    f=split_address,
    returnType=split_address_udf_return_type
)

- Примените UDF к Spark DataFrame и убедитесь, что форматирование прошло успешно и добавились 4 новые колонки.

In [13]:
data_sdf = data_sdf\
    .withColumn(
        "Split Purchase Address", 
        split_address_udf(sf.col("Purchase Address"))
    )\
    .withColumn("Street", sf.col("Split Purchase Address")["Street"])\
    .withColumn("City", sf.col("Split Purchase Address")["City"])\
    .withColumn("State", sf.col("Split Purchase Address")["State"])\
    .withColumn("Postal Code", sf.col("Split Purchase Address")["Postal Code"])

In [84]:
data_sdf.columns

['Order ID',
 'Product',
 'Quantity Ordered',
 'Price Each',
 'Order Date',
 'Purchase Address',
 'Month',
 'Split Purchase Address',
 'Street',
 'City',
 'State',
 'Postal Code',
 'Total Price']

In [85]:
data_sdf.select("Order ID", "Street", "City", "State", "Postal Code").limit(5).show()

+--------+--------------+-------------+-----+-----------+
|Order ID|        Street|         City|State|Postal Code|
+--------+--------------+-------------+-----+-----------+
|  278797|    46 Park St|New York City|   NY|      10001|
|  278798|962 Hickory St|       Austin|   TX|      73301|
|  278799| 464 Cherry St|  Los Angeles|   CA|      90001|
|  278800|   649 10th St|      Seattle|   WA|      98101|
|  278801|   522 Hill St|       Boston|   MA|      02215|
+--------+--------------+-------------+-----+-----------+



- Сделайте ещё одно преобразование датафрейма - перемножьте проданные штуки на цену единицы товара, получите Total price, предварительно закешировав (cache) результат применения UDF.

In [14]:
data_sdf = data_sdf.drop("Total Price")

In [15]:
def total_price(price_each, quantity_ordered):
    if (price_each is None) or (quantity_ordered is None):
        return None
    else:
        return price_each*quantity_ordered

total_price_udf = sf.udf(
    f=total_price,
    returnType=FloatType()
)

In [21]:
data_sdf.cache?

In [16]:
data_sdf = data_sdf.withColumn("Total Price", total_price_udf(sf.col("Price Each"), sf.col("Quantity Ordered"))).cache()
data_sdf.select("Order ID", "Total Price", "Quantity Ordered", "Price Each").limit(10).show()

+--------+-----------+----------------+----------+
|Order ID|Total Price|Quantity Ordered|Price Each|
+--------+-----------+----------------+----------+
|  278797|      11.99|               1|     11.99|
|  278798|       23.9|               2|     11.95|
|  278799|      150.0|               1|     150.0|
|  278800|     149.99|               1|    149.99|
|  278801|      99.99|               1|     99.99|
|  278802|      11.95|               1|     11.95|
|  278803|      14.95|               1|     14.95|
|  278804|      99.99|               1|     99.99|
|  278805|      14.95|               1|     14.95|
|  278806|      99.99|               1|     99.99|
+--------+-----------+----------------+----------+



- Оцените скорость выполнения всей цепочки преобразований датафрейма (UDF и добавление Total price) без кеширования и с его наличием.

In [76]:
data_sdf = data_sdf.drop("Total Price")

In [87]:
def calculations():
    global data_sdf

    for i in range(8):
        data_sdf.collect()

In [88]:
%%capture
# кэширование
data_sdf = data_sdf.withColumn(
    "Total Price", 
    total_price_udf(sf.col("Price Each"), sf.col("Quantity Ordered"))
).cache()
data_sdf.collect()

In [89]:
%%time
# вычисления с кэшированием
calculations()

CPU times: user 23.3 s, sys: 783 ms, total: 24.1 s
Wall time: 29.3 s


In [90]:
# очистка кэша
data_sdf.unpersist();

In [91]:
%%time
# вычисления без кэширования
calculations()

CPU times: user 22.9 s, sys: 772 ms, total: 23.7 s
Wall time: 37.1 s


- Придумайте и опишите пример из жизни, когда кеширование важно применять и почему это может быть полезно.

In [75]:
# Например кэширование промежуточных результатов при вычислении нескольких
# метрик, которые вычисляютя на основе одинаковых данных