# Import

In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import col, lit, sum
import os
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType
from time import time

# Spark Conf

In [2]:
# Определяем имя приложения и URL мастера
appName = "task_10"
master = "spark://thunderobot:7077"

# Set Hadoop related configurations
hadoop_conf = {"fs.defaultFS": "hdfs://localhost:9000"}
# Configure Spark with SparkConf

# Настройка SparkConf
conf = SparkConf().setAppName(appName).setMaster(master)
for key, value in hadoop_conf.items():
    conf.set("spark.hadoop." + key, value)
# Создание SparkSession с использованием SparkConf
spark = SparkSession.builder.config(conf=conf).getOrCreate()

24/06/20 00:12:53 WARN Utils: Your hostname, thunderobot resolves to a loopback address: 127.0.1.1; using 192.168.1.203 instead (on interface enp4s0)
24/06/20 00:12:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/20 00:12:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

# 1. Загрузка и анализ данных из CSV-файла

## Test Read

In [4]:
# Вывод тестого DF для проверки чтения
df = spark.read.csv(("sales_data/Sales_February_2019.csv"), header = True, inferSchema = True)
df.show()

                                                                                

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  150502|              iPhone|               1|     700.0|02/18/19 01:35|866 Spruce St, Po...|
|  150503|AA Batteries (4-p...|               1|      3.84|02/13/19 07:24|18 13th St, San F...|
|  150504|27in 4K Gaming Mo...|               1|    389.99|02/18/19 09:46|52 6th St, New Yo...|
|  150505|Lightning Chargin...|               1|     14.95|02/02/19 16:47|129 Cherry St, At...|
|  150506|AA Batteries (4-p...|               2|      3.84|02/28/19 20:32|548 Lincoln St, S...|
|  150507|Lightning Chargin...|               1|     14.95|02/24/19 18:50|387 12th St, Aust...|
|  150508|AA Batteries (4-p...|               1|      3.84|02/21/19 19:26|622 Center St, Sa...|
|  150509|Apple Airpods Hea...|         

## Read folder

In [5]:
# Путь к папке с данными
data_path = "sales_data"

# Список всех файлов CSV в папке
files = [f for f in os.listdir(data_path) if f.endswith('.csv')]

# Создаем пустой DataFrame для объединения данных
sales_df = None

for file in files:
    # Извлекаем месяц из имени файла
    month = file.split('_')[1].split('.')[0]
    
    # Загружаем данные из CSV файла
    df = spark.read.csv(os.path.join(data_path, file), header=True, inferSchema=True)
    
    # Добавляем колонку с месяцем
    df = df.withColumn('Month', lit(month))
    
    # Объединяем данные
    if sales_df is None:
        sales_df = df
    else:
        sales_df = sales_df.union(df)

# Кэшируем итоговый DF
sales_df.cache()


DataFrame[Order ID: int, Product: string, Quantity Ordered: int, Price Each: double, Order Date: string, Purchase Address: string, Month: string]

## Check

In [6]:
# Записываем в дф значения из колонки 'Month' для проверки 
unique_months = sales_df.select("Month").distinct().collect()

# Вывод значений
print(unique_months)



[Row(Month='September'), Row(Month='October'), Row(Month='August'), Row(Month='November'), Row(Month='March'), Row(Month='June'), Row(Month='July'), Row(Month='December'), Row(Month='February'), Row(Month='May'), Row(Month='January'), Row(Month='April')]


                                                                                

## Schema and stats

In [7]:
# Вывод схемы DataFrame
sales_df.printSchema()

# Вывод общей статистики
sales_df.describe().show()


24/06/20 00:13:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


root
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)
 |-- Month: string (nullable = false)





+-------+-----------------+------------+-------------------+------------------+--------------+--------------------+---------+
|summary|         Order ID|     Product|   Quantity Ordered|        Price Each|    Order Date|    Purchase Address|    Month|
+-------+-----------------+------------+-------------------+------------------+--------------+--------------------+---------+
|  count|           185950|      186305|             185950|            185950|        186305|              186305|   186850|
|   mean|230417.5693788653|        NULL| 1.1243828986286637|184.39973476743924|          NULL|                NULL|     NULL|
| stddev|51512.73710999595|        NULL|0.44279262402866953| 332.7313298843437|          NULL|                NULL|     NULL|
|    min|           141234|20in Monitor|                  1|              2.99|01/01/19 03:07|1 11th St, Atlant...|    April|
|    max|           319670|      iPhone|                  9|            1700.0|    Order Date|    Purchase Address|Sep

                                                                                

## Grouping

In [8]:
# Группируем данные по товару и считаем количество продаж для каждого товара, называем значение Total_Sales
top_products = sales_df.groupBy("Product").agg(sum("Quantity Ordered").alias("Total_Sales")).orderBy(col("Total_Sales").desc()).limit(10)

# Выводим Топ-10 товаров
top_products.show()

+--------------------+-----------+
|             Product|Total_Sales|
+--------------------+-----------+
|AAA Batteries (4-...|      31017|
|AA Batteries (4-p...|      27635|
|USB-C Charging Cable|      23975|
|Lightning Chargin...|      23217|
|    Wired Headphones|      20557|
|Apple Airpods Hea...|      15661|
|Bose SoundSport H...|      13457|
|    27in FHD Monitor|       7550|
|              iPhone|       6849|
|27in 4K Gaming Mo...|       6244|
+--------------------+-----------+



# 2. Партиционирование данных и оптимизация производительности

In [9]:
sales_df.rdd.getNumPartitions()

12

In [10]:
sales_df.count()

186850

## With partitions

In [11]:
# Кол-во партиций
num_partitions = 12

# Добавление партиционирования в датафрейм
partitioned_df = sales_df.repartition(num_partitions, "Month")

# Оценка производительности
partitioned_df.createOrReplaceTempView("partitioned_sales")
%timeit spark.sql("SELECT Month, SUM(`Quantity Ordered`) as total_sales FROM partitioned_sales GROUP BY Month").show()


+---------+-----------+
|    Month|total_sales|
+---------+-----------+
|     July|      16069|
|      May|      18657|
| December|      28121|
|  January|      10868|
| February|      13461|
|    April|      20568|
|  October|      22705|
| November|      19808|
|    March|      17006|
|   August|      13442|
|     June|      15255|
|September|      13119|
+---------+-----------+

+---------+-----------+
|    Month|total_sales|
+---------+-----------+
|     July|      16069|
|      May|      18657|
| December|      28121|
|  January|      10868|
| February|      13461|
|    April|      20568|
|  October|      22705|
| November|      19808|
|    March|      17006|
|   August|      13442|
|     June|      15255|
|September|      13119|
+---------+-----------+

+---------+-----------+
|    Month|total_sales|
+---------+-----------+
|     July|      16069|
|      May|      18657|
| December|      28121|
|  January|      10868|
| February|      13461|
|    April|      20568|
|  October|   

In [12]:
partitioned_df.rdd.getNumPartitions()

12

## Without partitions

In [13]:
# Create a temporary view for SQL queries
sales_df = sales_df.coalesce(1)
sales_df.createOrReplaceTempView("sales")

# Measure execution time after repartitioning
%timeit spark.sql("SELECT Month, SUM(`Quantity Ordered`) as total_sales FROM sales GROUP BY Month").show()


+---------+-----------+
|    Month|total_sales|
+---------+-----------+
|September|      13119|
|  October|      22705|
|   August|      13442|
| November|      19808|
|    March|      17006|
|     June|      15255|
|     July|      16069|
| December|      28121|
| February|      13461|
|      May|      18657|
|  January|      10868|
|    April|      20568|
+---------+-----------+

+---------+-----------+
|    Month|total_sales|
+---------+-----------+
|September|      13119|
|  October|      22705|
|   August|      13442|
| November|      19808|
|    March|      17006|
|     June|      15255|
|     July|      16069|
| December|      28121|
| February|      13461|
|      May|      18657|
|  January|      10868|
|    April|      20568|
+---------+-----------+

+---------+-----------+
|    Month|total_sales|
+---------+-----------+
|September|      13119|
|  October|      22705|
|   August|      13442|
| November|      19808|
|    March|      17006|
|     June|      15255|
|     July|   

In [14]:
sales_df.rdd.getNumPartitions()

1

## Изменил количество партиций на одну и записал файл обратно на диск.

In [15]:
# Измененил количество партиций
sales_df = partitioned_df.repartition(1) 

# Сохраняем, с функцией перезаписи
%timeit sales_df.write.mode("overwrite").format("csv").partitionBy("Month").option("header", True).save("hdfs://localhost:9000/user/aldiyar/sales_data/sales")


[Stage 201:>                                                        (0 + 1) / 1]

3.46 s ± 860 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


                                                                                

In [16]:
sales_df.rdd.getNumPartitions()

1

# 3. Работа с различными форматами данных

## JSON

In [17]:
%timeit sales_df.write.mode("overwrite").format("json").partitionBy("Month").option("header", True).save("hdfs://localhost:9000/user/aldiyar/sales_data/sales_json")


[Stage 226:>                                                        (0 + 1) / 1]

4.55 s ± 470 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


                                                                                

## Parquet

In [18]:
%timeit sales_df.write.mode("overwrite").format("parquet").partitionBy("Month").option("header", True).save("hdfs://localhost:9000/user/aldiyar/sales_data/sales_parquet")


[Stage 250:>                                                        (0 + 1) / 1]

4.9 s ± 375 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


                                                                                

## ORC

In [19]:
%timeit sales_df.write.mode("overwrite").format("orc").partitionBy("Month").option("header", True).save("hdfs://localhost:9000/user/aldiyar/sales_data/sales_orc")


[Stage 274:>                                                        (0 + 1) / 1]

5.25 s ± 178 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


                                                                                

## Выводы

# 4. Работа с UDF и кешированием

## Как выглядит колонка

In [20]:
sales_df.select("Purchase Address").show(truncate=False)

+----------------------------------------+
|Purchase Address                        |
+----------------------------------------+
|389 South St, Atlanta, GA 30301         |
|590 4th St, Seattle, WA 98101           |
|861 Hill St, Atlanta, GA 30301          |
|190 Ridge St, Atlanta, GA 30301         |
|824 Forest St, Seattle, WA 98101        |
|899 Elm St, San Francisco, CA 94016     |
|745 Chestnut St, New York City, NY 10001|
|490 Adams St, New York City, NY 10001   |
|207 1st St, Los Angeles, CA 90001       |
|29 Jefferson St, Dallas, TX 75001       |
|590 6th St, San Francisco, CA 94016     |
|960 Willow St, San Francisco, CA 94016  |
|616 Chestnut St, Dallas, TX 75001       |
|105 Johnson St, Los Angeles, CA 90001   |
|444 Spruce St, San Francisco, CA 94016  |
|764 Adams St, San Francisco, CA 94016   |
|577 Lakeview St, Los Angeles, CA 90001  |
|244 Washington St, Los Angeles, CA 90001|
|524 Cherry St, San Francisco, CA 94016  |
|882 Hickory St, Boston, MA 02215        |
+----------

In [21]:
sales_df.count()

186850

## Фильтрация на пропуски

In [22]:
df_filtered = sales_df.filter(col("Purchase Address").isNull() | (col("Purchase Address").rlike(r"^\s*\S{3,}")))
df_filtered.count()

168270

## Функция для парсинга

In [23]:
#  Функция для разбора адреса
def parse_address(address):
    if address:
        parts = address.split(", ")
        if len(parts) >= 3:
            street_address = parts[0]
            city = parts[1].rsplit(", ", 1)[0]
            state_zip = parts[2]
            state, postal_code = state_zip.split(" ")
            return (street_address, city, state, postal_code)
    # В случае некорректного адреса или пустой строки возвращаем None
    return (None, None, None, None)

## UDF

In [24]:
# Сформировал UDF
parse_address_udf = udf(parse_address, 
                        StructType([
                            StructField("Street address", StringType(), True),
                            StructField("City", StringType(), True),
                            StructField("State", StringType(), True),
                            StructField("Postal Code", StringType(), True)
                        ]))

# Применил к DF
df_parsed = df_filtered.withColumn("Parsed Address", parse_address_udf(df_filtered["Purchase Address"]))

# Разделил на колонки
df_parsed = df_parsed.withColumn("Street address", df_parsed["Parsed Address"]["Street address"]) \
                     .withColumn("City", df_parsed["Parsed Address"]["City"]) \
                     .withColumn("State", df_parsed["Parsed Address"]["State"]) \
                     .withColumn("Postal Code", df_parsed["Parsed Address"]["Postal Code"]) \
                     .drop("Parsed Address")

# Отобразил преобразованный DF
df_parsed.show(truncate=False)

+--------+------------------------+----------------+----------+--------------+---------------------------------------+---------+--------------+-------------+-----+-----------+
|Order ID|Product                 |Quantity Ordered|Price Each|Order Date    |Purchase Address                       |Month    |Street address|City         |State|Postal Code|
+--------+------------------------+----------------+----------+--------------+---------------------------------------+---------+--------------+-------------+-----+-----------+
|248151  |AA Batteries (4-pack)   |4               |3.84      |09/17/19 14:44|380 North St, Los Angeles, CA 90001    |September|380 North St  |Los Angeles  |CA   |90001      |
|248152  |USB-C Charging Cable    |2               |11.95     |09/29/19 10:19|511 8th St, Austin, TX 73301           |September|511 8th St    |Austin       |TX   |73301      |
|248153  |USB-C Charging Cable    |1               |11.95     |09/16/19 17:48|151 Johnson St, Los Angeles, CA 90001  |Se

## Cache

In [25]:
# Кэширование
df_parsed.cache()

DataFrame[Order ID: int, Product: string, Quantity Ordered: int, Price Each: double, Order Date: string, Purchase Address: string, Month: string, Street address: string, City: string, State: string, Postal Code: string]

## Total Price

In [26]:
# Подсчет Total Price
df_total_price = df_parsed.withColumn("Total Price", df_parsed["Quantity Ordered"] * df_parsed["Price Each"])

# Отобразил преобразованный DF
df_total_price.show()

[Stage 298:>                                                        (0 + 1) / 1]

+--------+--------------------+----------------+----------+--------------+--------------------+---------+--------------+-------------+-----+-----------+-----------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|    Month|Street address|         City|State|Postal Code|Total Price|
+--------+--------------------+----------------+----------+--------------+--------------------+---------+--------------+-------------+-----+-----------+-----------+
|  248151|AA Batteries (4-p...|               4|      3.84|09/17/19 14:44|380 North St, Los...|September|  380 North St|  Los Angeles|   CA|      90001|      15.36|
|  248152|USB-C Charging Cable|               2|     11.95|09/29/19 10:19|511 8th St, Austi...|September|    511 8th St|       Austin|   TX|      73301|       23.9|
|  248153|USB-C Charging Cable|               1|     11.95|09/16/19 17:48|151 Johnson St, L...|September|151 Johnson St|  Los Angeles|   CA|      90001|      11.95|
|  248154|

                                                                                

## Оценка времени

In [27]:
# Без кеширования
start_time = time()
df_total_price.count()
end_time = time()
print(f"Без кэширования: {end_time - start_time} seconds")



Без кэширования: 0.11428356170654297 seconds


In [28]:
# С кешированием
df_parsed.cache()
start_time_cached = time()
df_total_price.count()
end_time_cached = time()
print(f"С кэшированием: {end_time_cached - start_time_cached} seconds")


24/06/20 00:15:38 WARN CacheManager: Asked to cache already cached data.


С кэшированием: 0.09195351600646973 seconds
