<a href="https://colab.research.google.com/github/IlyaDenisov88/dataenj/blob/main/PySpark/Broadcast%26OptimizationTask.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Broadcast

Давайте также рассмотрим, что такое функция `broadcast` и что она делает.

- Broadcast переменные используются для эффективного распределения больших объектов или небольших наборов данных по всем узлам кластера.
- позволяет избежать многократной передачи одних и тех же данных при выполнении распределенных вычислений.
- Broadcast переменные полезны, когда у вас есть неизменяемые данные, которые нужно использовать в нескольких местах в вашем приложении Spark, и эти данные достаточно велики, чтобы их передача в каждый узел была неэффективной.

Ничего не понятно, но очень интересно... Давайте разбираться.

Я предвкушаю Ваши вопросы по поводу того, а в чем разница в сравнении с `cache` и с `persist` соответственно? В ходе объяснения постараюсь рассказать Вам эту разницу.

- Broadcast чаще всего используется в случае, когда у Вас есть какой-то справочник с данными. То есть он не будет уже изменяться. И, поскольку, он хранится на всех узлах одновременно, то у нас нет необходимости каждый раз вызывать этап `shuffle` для перемещения ключей. Приведу простой пример, когда это может использоваться.

Например, если у вас есть большой DataFrame и маленький DataFrame, и вы хотите выполнить join, оптимальным решением будет использовать broadcast переменную для маленького DataFrame.
 - Это позволяет *избежать дорогих shuffle операций*, так как маленький DataFrame будет распространяться по всем узлам и join будет выполняться локально на каждом узле.



Теперь взглянем на код -



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast


spark = SparkSession.builder \
    .appName("Broadcast Join Example") \
    .getOrCreate()

# Пример большого DataFrame
large_data = [(i, f"Name_{i % 5}") for i in range(1000000)]
large_df = spark.createDataFrame(large_data, ["id", "name"])

# Пример маленького DataFrame
small_data = [(1, "HR"), (2, "Finance"), (3, "Engineering")]
small_df = spark.createDataFrame(small_data, ["id", "department"])

# Применение broadcast к маленькому DataFrame
broadcast_small_df = broadcast(small_df)

# Выполнение join операции
joined_df = large_df.join(broadcast_small_df, "id")


joined_df.show()


spark.stop()


- Мы создаем большой DataFrame large_df, который содержит миллион записей.
- Мы создаем маленький DataFrame small_df, который содержит несколько записей.
- Мы используем функцию broadcast для маленького DataFrame. Он распространит копию маленького DataFrame на все узлы кластера.
- Мы выполняем join между большим DataFrame и broadcast-версией маленького DataFrame. Благодаря broadcast переменной, join выполняется локально на каждом узле без необходимости shuffle операций.


И, на всякий случай, сравним его с cache и persist.



- **Broadcast**: Распространяет неизменяемые данные по всему кластеру для эффективного использования.
- **Cache/Persist**: Кэширует промежуточные результаты вычислений для многократного использования.
- **Broadcast**: Используется для небольших объектов, которые должны быть доступны на всех узлах.
- **Cache/Persist**: Используется для больших наборов данных, которые обрабатываются многократно.
- **Broadcast**: Отправляет копию данных на каждый узел, чтобы избежать многократной передачи.
- **Cache/Persist**: Хранит данные в памяти или на диске для последующего использования без повторных вычислений.


# Оптимизировать данный код (запуск как будто на кластере)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast


spark = SparkSession.builder \
    .appName("Broadcast Join Example") \
    .getOrCreate()

# Пример большого DataFrame
large_data = [(i, f"Name_{i % 5}") for i in range(1000000)]
large_df = spark.createDataFrame(large_data, ["id", "name"])

# Пример маленького DataFrame
small_data = [(1, "HR"), (2, "Finance"), (3, "Engineering")]
small_df = spark.createDataFrame(small_data, ["id", "department"])

# Применение broadcast к маленькому DataFrame
broadcast_small_df = broadcast(small_df)

# Выполнение join операции
joined_df = large_df.join(broadcast_small_df, "id")


joined_df.show()




МОИ идеи что сделать:
1. Catalyst Optimizer и Tungsten Execution Engine
2. Разделить большой дф на бакеты для более эффективного соединения

# Решение:

Если ваш большой DataFrame не равномерно партиционирован, это может привести к дисбалансу нагрузки. Перераспределение данных с использованием метода repartition может помочь.



In [None]:
# Перераспределение большого DataFrame на большее количество партиций для улучшения параллелизма
large_df = large_df.repartition(200)  # Выберите разумное количество партиций в зависимости от вашего кластера


Если большой DataFrame используется в нескольких местах, его кэширование может улучшить производительность.



In [None]:
# Кэширование большого DataFrame
large_df.cache()


Тюнинг конфигурации Spark.



In [None]:
spark.conf.set("spark.sql.shuffle.partitions", "200")  # Количество партиций для операций shuffle
spark.conf.set("spark.executor.memory", "4g")  # Размер памяти для каждого исполнителя
spark.conf.set("spark.driver.memory", "4g")  # Размер памяти для драйвера


spark.stop()

В целом идеи были в правильном направлении -
 - переразделить данные для равномерности
 - про кеш не подумал (многократное использование вроде не подразумевалось, а так конечно дада)
 - Ну и размер памяти для исполнителей и драйвера (SparkSession) весьма большой (на кластере ж все-таки), об этом не подумал.