In [1]:
import subprocess
import sys
import os
import time
import zipfile


# Функция для установки пакета
def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])


# Проверка и установка пакетов
required_packages = {
    "pyspark": "pyspark",
}

for package, pip_name in required_packages.items():
    try:
        __import__(package)
    except ImportError:
        print(f"{package} не установлен. Устанавливаем...")
        install(pip_name)
    else:
        print(f"{package} уже установлен.")

pyspark не установлен. Устанавливаем...
Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark
  Using cached pyspark-3.5.1-py2.py3-none-any.whl
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


You should consider upgrading via the '/Applications/Xcode.app/Contents/Developer/usr/bin/python3 -m pip install --upgrade pip' command.


In [2]:
from pyspark.sql import SparkSession  # type: ignore
from pyspark.sql.functions import lit, col, sum, udf  # type: ignore
from pyspark.sql.types import StringType, StructField, StructType

In [3]:
# 1: Загрузка и анализ данных из CSV-файла

# Путь, куда будут извлечены файлы
data_folder = "data/sales_data"

# Создаем папку, если она не существует
os.makedirs(data_folder, exist_ok=True)

# Разархивирование файла
with zipfile.ZipFile("data/sales_data.zip", "r") as zip_ref:
    zip_ref.extractall(data_folder)
print("Файлы успешно разархивированы")


# Установка переменной среды PYSPARK_SUBMIT_ARGS
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-avro_2.12:3.5.1 pyspark-shell"
)

# Создание сессии Spark
spark = SparkSession.builder.appName("Sales Data Analysis").getOrCreate()


# Получение списка файлов в директории
data_files = [f for f in os.listdir(data_folder) if f.endswith(".csv")]

# Инициализация пустого DataFrame
full_df = None

# Чтение и объединение всех файлов
for file in data_files:
    file_path = os.path.join(data_folder, file)
    month = file.split("_")[1]  # Извлечение месяца из имени файла
    month_df = spark.read.csv(file_path, header=True, inferSchema=True)
    month_df = month_df.withColumn("Month", lit(month))

    if full_df is None:
        full_df = month_df
    else:
        full_df = full_df.union(month_df)

# Вывод схемы DataFrame
full_df.printSchema()

# Вывод общей статистики по данным
full_df.describe().show()

# Анализ самых популярных товаров
top_products = full_df.groupBy("Product").count().orderBy(col("count").desc()).limit(10)

# Вывод Топ-10 самых популярных товаров
top_products.show()

Файлы успешно разархивированы


24/05/06 03:32:26 WARN Utils: Your hostname, MacBook-Pro-Andrej.local resolves to a loopback address: 127.0.0.1; using 192.168.1.3 instead (on interface en0)
24/05/06 03:32:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/andrejizvarin/.ivy2/cache
The jars for the packages stored in: /Users/andrejizvarin/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-18de887c-9bf3-49f2-96c9-ae72875a475a;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/andrejizvarin/Library/Python/3.9/lib/python/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-avro_2.12;3.5.1 in central
	found org.tukaani#xz;1.9 in central
:: resolution report :: resolve 113ms :: artifacts dl 2ms
	:: modules in use:
	org.apache.spark#spark-avro_2.12;3.5.1 from central in [default]
	org.tukaani#xz;1.9 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-18de887c-9bf3-49f2-96c9-ae72875a475a
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/3ms)
24/05/06 03:32:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes wher

root
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)
 |-- Month: string (nullable = false)



24/05/06 03:32:32 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+-----------------+------------+-------------------+------------------+--------------+--------------------+---------+
|summary|         Order ID|     Product|   Quantity Ordered|        Price Each|    Order Date|    Purchase Address|    Month|
+-------+-----------------+------------+-------------------+------------------+--------------+--------------------+---------+
|  count|           185950|      186305|             185950|            185950|        186305|              186305|   186850|
|   mean|230417.5693788653|        NULL| 1.1243828986286637|184.39973476743927|          NULL|                NULL|     NULL|
| stddev|51512.73710999595|        NULL|0.44279262402866953| 332.7313298843437|          NULL|                NULL|     NULL|
|    min|           141234|20in Monitor|                  1|              2.99|01/01/19 03:07|1 11th St, Atlant...|    April|
|    max|           319670|      iPhone|                  9|            1700.0|    Order Date|    Purchase Address|Sep

In [4]:
# 2: Партиционирование данных и оптимизация производительности

# Партиционирование DataFrame по столбцу 'Month'
partitioned_df = full_df.repartition("Month")

# Выполнение агрегации: подсчет общего количества проданных товаров по месяцам
aggregated_df = partitioned_df.groupBy("Month").agg(
    sum("Quantity Ordered").alias("Total Quantity Sold")
)


# Оценка производительности
# Функция для замера времени выполнения
def measure_performance(df):
    start_time = time.time()
    df.groupBy("Month").agg(
        sum("Quantity Ordered").alias("Total Quantity Sold")
    ).collect()
    return time.time() - start_time


# Измерение времени для непартиционированного DataFrame
unpartitioned_time = measure_performance(full_df)
print(
    f"Время выполнения для непартиционированного DataFrame: {unpartitioned_time:.2f} секунд"
)

# Измерение времени для партиционированного DataFrame
partitioned_time = measure_performance(partitioned_df)
print(
    f"Время выполнения для партиционированного DataFrame: {partitioned_time:.2f} секунд"
)

# Сравнение результатов
print("Улучшение производительности:", unpartitioned_time / partitioned_time)

# Изменение количества партиций на одну и сохранение на диск
single_partition_df = partitioned_df.coalesce(1)
single_partition_df.write.format("csv").mode("overwrite").save(
    "data/single_partitioned_data.csv"
)

CodeCache: size=131072Kb used=39728Kb max_used=39741Kb free=91343Kb
 bounds [0x00000001071e8000, 0x00000001098f8000, 0x000000010f1e8000]
 total_blobs=13804 nmethods=12836 adapters=880
 compilation: disabled (not enough contiguous free space left)




Время выполнения для непартиционированного DataFrame: 0.49 секунд
Время выполнения для партиционированного DataFrame: 0.39 секунд
Улучшение производительности: 1.2386262352257313


                                                                                

In [5]:
# 3: Работа с различными форматами данных

# Путь для сохранения файлов
output_path = "data/output"

# Удаление пробелов из названий столбцов для совместимости с форматом Avro
fixed_df = full_df.select([col(c).alias(c.replace(" ", "_")) for c in full_df.columns])

# Сохранение в формате JSON
fixed_df.write.mode("overwrite").json(f"{output_path}/data.json")

# Сохранение в формате Parquet
fixed_df.write.mode("overwrite").parquet(f"{output_path}/data.parquet")

# Сохранение в формате ORC
fixed_df.write.mode("overwrite").orc(f"{output_path}/data.orc")

# Сохранение в формате Avro
fixed_df.write.format("avro").mode("overwrite").save(f"{output_path}/data.avro")


# Функция для замера времени записи DataFrame в определенный формат
def measure_write_performance(df, format, path):
    start_time = time.time()
    if format == "json":
        df.write.mode("overwrite").json(path)
    elif format == "parquet":
        df.write.mode("overwrite").parquet(path)
    elif format == "orc":
        df.write.mode("overwrite").orc(path)
    elif format == "avro":
        df.write.format("avro").mode("overwrite").save(path)
    else:
        raise ValueError("Unsupported format")
    elapsed_time = time.time() - start_time
    print(f"Время записи в формат {format}: {elapsed_time:.2f} секунд")


# Замер производительности для каждого формата
measure_write_performance(fixed_df, "json", f"{output_path}/data.json")
measure_write_performance(fixed_df, "parquet", f"{output_path}/data.parquet")
measure_write_performance(fixed_df, "orc", f"{output_path}/data.orc")
measure_write_performance(fixed_df, "avro", f"{output_path}/data.avro")

24/05/06 03:32:38 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 96,54% for 7 writers
24/05/06 03:32:38 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 84,47% for 8 writers
24/05/06 03:32:38 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 75,08% for 9 writers
24/05/06 03:32:38 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 67,58% for 10 writers
24/05/06 03:32:38 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 75,08% for 9 writers
24/05/06 03:32:38 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 84,47% for 8 writers
24/05/06 03:32:38 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 01

Время записи в формат json: 0.38 секунд


24/05/06 03:32:45 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 96,54% for 7 writers
24/05/06 03:32:45 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 84,47% for 8 writers
24/05/06 03:32:45 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 75,08% for 9 writers
24/05/06 03:32:45 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 67,58% for 10 writers
24/05/06 03:32:45 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 75,08% for 9 writers
24/05/06 03:32:45 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 014 bytes) of heap memory
Scaling row group sizes to 84,47% for 8 writers
24/05/06 03:32:45 WARN MemoryManager: Total allocation exceeds 95,00% (906 992 01

Время записи в формат parquet: 0.41 секунд


                                                                                

Время записи в формат orc: 0.74 секунд
Время записи в формат avro: 0.33 секунд


In [6]:
# 4: Работа с UDF и кешированием


def parse_address(address):
    if address is None:
        return (
            None,
            None,
            None,
            None,
        )  # Возвращаем кортеж с None для каждой части адреса
    parts = address.split(",")
    if len(parts) < 3:
        return (None, None, None, None)  # Не достаточно данных для разбиения адреса
    street = parts[0].strip()
    city = parts[1].strip()
    state_postal = parts[2].strip().split(" ")
    state = state_postal[0] if len(state_postal) > 1 else None
    postal_code = state_postal[1] if len(state_postal) > 1 else None
    return (street, city, state, postal_code)


schema = StructType(
    [
        StructField("Street", StringType(), True),
        StructField("City", StringType(), True),
        StructField("State", StringType(), True),
        StructField("Postal_Code", StringType(), True),
    ]
)

parse_address_udf = udf(parse_address, schema)

# Применение UDF
address_df = full_df.withColumn(
    "Parsed_Address", parse_address_udf(col("Purchase Address"))
)
address_df = address_df.select("*", "Parsed_Address.*").drop("Parsed_Address")

# Кеширование для улучшения производительности
address_df.cache()

# Добавление столбца с общей ценой
address_df = address_df.withColumn(
    "Total Price", col("Quantity Ordered") * col("Price Each")
)

# Запуск действий для активации кеширования и оценки результатов
address_df.show()


def measure_performance(operation_description, func):
    start_time = time.time()
    func()
    elapsed_time = time.time() - start_time
    print(f"{operation_description}: {elapsed_time:.2f} секунд")


measure_performance("Выполнение с кешированием", lambda: address_df.collect())
address_df.unpersist()  # Очистка кеша
measure_performance("Выполнение без кеширования", lambda: address_df.collect())

                                                                                

+--------+--------------------+----------------+----------+--------------+--------------------+--------+----------------+-------------+-----+-----------+-----------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|   Month|          Street|         City|State|Postal_Code|Total Price|
+--------+--------------------+----------------+----------+--------------+--------------------+--------+----------------+-------------+-----+-----------+-----------+
|  295665|  Macbook Pro Laptop|               1|    1700.0|12/30/19 00:01|136 Church St, Ne...|December|   136 Church St|New York City|   NY|      10001|     1700.0|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|December|      562 2nd St|New York City|   NY|      10001|      600.0|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|December|     277 Main St|New York City|   NY|      10001|      11.95|
|  2

                                                                                

Выполнение с кешированием: 2.41 секунд
Выполнение без кеширования: 1.14 секунд


### Пример, где кеширование полезно
Кеширование часто используется в веб-разработке для ускорения загрузки страниц, сохраняя данные, часто запрашиваемые пользователями, в памяти.
Например, на новостных сайтах, где статьи обновляются нечасто, но читаются многими пользователями, кеширование позволяет сократить нагрузку на сервер и ускорить отображение страниц для конечного пользователя.

In [7]:
# Завершение сессии Spark
spark.stop()