In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Создаем SparkSession и настраиваем количество партиций для shuffle
spark = SparkSession.builder \
    .appName("Shuffle Example") \
    .config("spark.sql.shuffle.partitions", "50") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/23 16:12:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Проверяем текущую настройку количества партиций для shuffle
print(f"Current shuffle partitions setting: {spark.conf.get('spark.sql.shuffle.partitions')}")

Current shuffle partitions setting: 50


25/04/23 16:13:13 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
# Пример данных с большим объемом
data1 = [(i, f"Name_{i % 5}") for i in range(1000000)]
data2 = [(i, f"Category_{i % 3}") for i in range(1000000)]

df1 = spark.createDataFrame(data1, ["id", "name"])
df2 = spark.createDataFrame(data2, ["id", "category"])

In [6]:
df1.show(5)

25/04/23 16:16:04 WARN TaskSetManager: Stage 4 contains a task of very large size (7701 KiB). The maximum recommended task size is 1000 KiB.
Exception ignored in: <_io.BufferedWriter name=5>
Traceback (most recent call last):
  File "/home/ilya/github/DE_course_repo/python_projects/py_venv/lib/python3.13/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 193, in manager
BrokenPipeError: [Errno 32] Broken pipe


+---+------+
| id|  name|
+---+------+
|  0|Name_0|
|  1|Name_1|
|  2|Name_2|
|  3|Name_3|
|  4|Name_4|
+---+------+
only showing top 5 rows



In [7]:
# Проверяем начальное количество партиций
initial_partitions_df1 = df1.rdd.getNumPartitions()
initial_partitions_df2 = df2.rdd.getNumPartitions()
print(f"Initial number of partitions in df1: {initial_partitions_df1}")
print(f"Initial number of partitions in df2: {initial_partitions_df2}")

Initial number of partitions in df1: 2
Initial number of partitions in df2: 2


In [8]:
# Принудительно устанавливаем количество партиций перед join
df1_repartitioned = df1.repartition(50)
df2_repartitioned = df2.repartition(50)

In [9]:
# Проверяем количество партиций после repartition
repartitioned_partitions_df1 = df1_repartitioned.rdd.getNumPartitions()
repartitioned_partitions_df2 = df2_repartitioned.rdd.getNumPartitions()
print(f"Number of partitions in df1 after repartition: {repartitioned_partitions_df1}")
print(f"Number of partitions in df2 after repartition: {repartitioned_partitions_df2}")

25/04/23 16:16:36 WARN TaskSetManager: Stage 5 contains a task of very large size (7701 KiB). The maximum recommended task size is 1000 KiB.
25/04/23 16:16:43 WARN TaskSetManager: Stage 6 contains a task of very large size (9653 KiB). The maximum recommended task size is 1000 KiB.

Number of partitions in df1 after repartition: 50
Number of partitions in df2 after repartition: 50


In [10]:
# Выполняем операцию join, требующую shuffle
joined_df = df1_repartitioned.join(df2_repartitioned, "id")

In [11]:
# Проверяем количество партиций после join
joined_partitions = joined_df.rdd.getNumPartitions()
print(f"Number of partitions after join: {joined_partitions}")

25/04/23 16:17:07 WARN TaskSetManager: Stage 7 contains a task of very large size (7701 KiB). The maximum recommended task size is 1000 KiB.
25/04/23 16:17:10 WARN TaskSetManager: Stage 8 contains a task of very large size (9653 KiB). The maximum recommended task size is 1000 KiB.

Number of partitions after join: 2


In [12]:

# Останавливаем SparkSession
spark.stop()