In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Генерируем данные
df = spark.range(0, 10_000_000) \
    .withColumn("category", (rand() * 1000).cast("int")) \
    .withColumn("amount", (rand() * 1000).cast("int"))


print("Сохранение сгенерироованных значений")

# Записываем в директорию retail.csv
df.write.csv("./retail.csv", header=True, mode="overwrite")

print("Файл сохранен")

spark.stop()

Сохранение сгенерироованных значений
Файл сохранен


In [16]:
from pyspark.sql import SparkSession
from time import time


# Запускаем с адаптивным механизмом выбора партиций после шаффла для 8 партиций
spark = SparkSession.builder \
    .appName("Test_8") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Читаем данные
data = spark.read.csv("./retail.csv", header=True, inferSchema=True)

# Замер времени
start = time()
result = data.groupBy("category").sum("amount")
result.count()  # обязательно, происходит shuffle
duration = time() - start

# Вывод результатов
print("Время выполнения:", f"{duration:.2f} сек")
print("Финальных партиций:", result.rdd.getNumPartitions())
print("Уникальных категорий:", result.count())
print("shuffle.partitions:", spark.conf.get("spark.sql.shuffle.partitions"))

spark.stop()

Время выполнения: 1.80 сек
Финальных партиций: 1
Уникальных категорий: 1000
shuffle.partitions: 8


### Адаптивное выполнение запросов (AQE) — механизм оптимизации в Spark 3.0+, который динамически меняет план выполнения на основе реальных данных, измеренных во время выполнения (а не на этапе планирования). Включение через - .config("spark.sql.adaptive.enabled", "true") 

In [20]:
from pyspark.sql import SparkSession
from time import time


# Запускаем без адаптивного механизма выбора партиций после шаффла для 8 партиций
spark = SparkSession.builder \
    .appName("Test_8") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.sql.adaptive.enabled", "false") \
    .getOrCreate()

# Читаем данные
data = spark.read.csv("./retail.csv", header=True, inferSchema=True)

# Замер времени
start = time()
result = data.groupBy("category").sum("amount")
result.count()  # обязательно, происходит shuffle
duration = time() - start

# Вывод результатов
print("Время выполнения:", f"{duration:.2f} сек")
print("Финальных партиций:", result.rdd.getNumPartitions())
print("Уникальных категорий:", result.count())
print("shuffle.partitions:", spark.conf.get("spark.sql.shuffle.partitions"))

spark.stop()

Время выполнения: 2.28 сек
Финальных партиций: 8
Уникальных категорий: 1000
shuffle.partitions: 8


In [22]:
from pyspark.sql import SparkSession
from time import time

# Запускаем без адаптивного механизма выбора партиций после шаффла для 2000 партиций
spark = SparkSession.builder \
    .appName("Test_8") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "2000") \
    .config("spark.sql.adaptive.enabled", "false") \
    .getOrCreate()

# Читаем данные
data = spark.read.csv("./retail.csv", header=True, inferSchema=True)

# Замер времени
start = time()
result = data.groupBy("category").sum("amount")
result.count()  # обязательно, происходит shuffle
duration = time() - start

# Вывод результатов
print("Время выполнения:", f"{duration:.2f} сек")
print("Финальных партиций:", result.rdd.getNumPartitions())
print("Уникальных категорий:", result.count())
print("shuffle.partitions:", spark.conf.get("spark.sql.shuffle.partitions"))

spark.stop()

Время выполнения: 4.75 сек
Финальных партиций: 2000
Уникальных категорий: 1000
shuffle.partitions: 2000
