<h3>Apache Spark: управление памятью</h3>

<img src="static/hierarchy.JPG" alt="hierarchy" width="500"/>

Есть несколько уровней управления памятью (Spark, Yarn, JVM, ОС). При запуске Spark приложения выделяется память для драйвера, представлена JVM, и память под исполнителей (внутреннее управление Spark-а). YARN выделяет контейнеры под исполнение для работы на различных узловых машинах (node).

<img src="static/node_mng.JPG" alt="node_mng" width="500"/>

ResourceManager работает с запросами памяти, выделяя исполнителям памяти вплоть до некоторой границы, заданной yarn.scheduler.maximum-allocation-mb. Запросить больше не получится. На одной ноде, эта работа будет сделана NodeManager-ом, ресурсы которого ограничены физически, в YARN соответствующая настройка yarn.nodemanager.resource.memory-mb (физический объем памяти на NodeManager в Mb, который может быть ведлене под yarn контейнер). Один ExecutorContainer предствален JVM, вся памяти которой поделена на три секции

<ul>
    <li><b>Heap memory</b> Память задается свойством spark.executor.memory и определяет максимальную память JVM heap memory.</li>
    <li><b>Off-Heap memory</b> Отличается от предыдущей отсутсвием сборщика мусора</li>
<li><b>Overhead memory</b></li> экстраресурсы для особенно тяжелых задач
</ul>

Спарк предоставляет единый интерфейс управления памятью через UnifiedMemoryManager.

<img src="static/memory_parts.JPG" alt="memory_parts" width="500"/>

<h4><b>Зарезервированная память</b></h4>

Спарк преберегает память для хранения внутренних объектов. Это гарантирует наличие ресурсов даже для маленькой JVM heap. Устанавливается объемом 300 Мб

<h4><b>Storage memory</b></h4>

Нужна для кеширования и broadcast операций. Объем определяется 
$Storage Memory = usableMemory * spark.memory.fraction * spark.memory.storageFraction$
По умолчанию занимает треть всей памяти

<h4><b>Execution memory</b></h4>

Память под хранение временных данных операций join, shuffle, sort и другие.
$Execution Memory = usableMemory * spark.memory.fraction * (1 - spark.memory.storageFraction)$

<h4><b>User memory</b></h4>

Обычно использзуется для хранения родословни нашего DF. Имеющееся пространство можно использзовать на усмотрение пользователя.
$User Memory = usableMemory * (1 - spark.memory.fraction)$

<h4><b>Dynamic occupancy mechanism</b></h4>

<img src="static/dynamic_occup.JPG" alt="dynamic_occup" width="500"/>

Общая память делится на память хранения и память исполнения. Общая память хранения может наполняться до порога onHeapStorageRegionSize, эта часть памяти используется, если она свободна от файлов исполнения. В противном случае хранение ждет освобождение ресурсов от данных исполнения. По умолчанию граница устанавливается на весь объем общей памяти.

Данные исполнения имеют приоритет над хранением и последним придется уйти, особождая ресурсы. В случае заполнения заданных в настройках пропорций, в ход идет алгоритм LRU.

### Оптимизация

<h4><b>Виды партиций</b></h4>

<ul>
    <li><b>Входные</b> Можем управлять размером spark.sql.files.MaxPartitionBytes, обычно довольствуемся стандартными 128MB</li>
    <li><b>Shuffle</b> Управляем количеством, ззадем spark.sql.shuffle.partitions</li>
<li><b>Выходые</b></li> задаем количество coalesce, repartition, option("maxRecordesPerFile")
</ul>

<h4><b>Persistence/Broadcast</b></h4>

<img src="static/persist.JPG" alt="persist" width="500"/>
<img src="static/broadcast.JPG" alt="broacast" width="500"/>

<h4><b>Rules</b></h4>

<img src="static/plan.JPG" alt="broacast" width="500"/>

Каждый оператор физического плана определяется выходными партициями (outputPartitioning), выходным порядком (outputOrdering), requiredChildDistribution (требование на выходные партиции дочернего оператора), requiredChildOrdering. Если требования между операторами не согласуются, вызывается перетасовка или сортировка

<img src="static/req_1.JPG" alt="req_1" width="500"/>
<img src="static/req_2.JPG" alt="req_2" width="500"/>

Можно опустить дорогую операцию обмена и сортировки перед SMJ, проведя предварительно repartition() и сортировку внутри партиций. Выигрыш будет получен, если последующие операции не сокращают количество значений поля, по которому было проведено переразбиение.

<img src="static/avoid_shuffle.JPG" alt="avoid_shuffle" width="500"/>

Глобально можно поделить техники оптимизации на переиспользование обмена (нельзя управлять напрямую, определяется оптимизатором), переиспользование вычислений (кеширование и удержание наборов данных), оптимизация способов разбиения на блоки