In [26]:
import os
import glob
import findspark
import pandas as pd
findspark.init()
findspark.find()

os.environ['HADOOP_HOME'] = 'C:\\apps\\opt\\spark-3.2.4-bin-hadoop2.7'
os.environ['SPARK_HOME'] = 'C:\\apps\\opt\\spark-3.2.4-bin-hadoop2.7'

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql import types as T

!python --version

Python 3.8.17


- **Широковещательный хеш (Broadcast Hash)**, где один из двух входных наборов данных транслируется всем исполнителям, для каждого из которых строится хеш-таблица. Далее каждый раздел не транслируемого входного набора данных присоединяется независимо к другому набору данных, доступному в виде локальной хеш-таблицы. Это не требует перетасовки и является наиболее эффективным, однако исполнители должны иметь достаточно памяти для размещения транслируемого набора данных. Поэтому на практике Apache Spark избегает этого механизма, если оба набора входных данных превышают настраиваемый порог.
- **Перемешанный хеш (Shuffle Hash)**, где два набора входных данных выравниваются по выбранной схеме разделения (партиционирования). Если один или оба набора входных данных не соответствуют выбранной схеме разделения, операция перемешивания выполняется перед фактическим соединением для достижения соответствия. После того, как оба набора входных данных соответствуют выбранному выходному разделению, Shuffle Hash выполняет соединение для каждого выходного раздела, используя стандартный подход Hash Join. При этом для каждого выходного раздела хеш-таблица сначала создается из меньшего входного набора данных, а затем соответствующий раздел большего входного набора данных присоединяется к построенной хеш-таблице. *По сравнению с Broadcast Hash Join, требования к памяти исполнителей меньше в случае Shuffle Hash Join*, т.к. хеш-таблица строится только на определенном разделе меньшего набора входных данных. Поэтому при большом количестве выходных разделов и исполнителей с достаточным объемом памяти, Shuffle Hash Join будет весьма эффективным. Однако, если Spark потребуется выполнить дополнительную операцию перемешивания для одного или обоих входных наборов для соответствия разделению выходных данных, предпочтительнее ранее рассмотренный Broadcast Hash Join.
- **Сортировка через слияние (Sort Merge)**, которая похожа на Shuffle Hash Join. Здесь также два набора входных данных выравниваются по выбранной схеме разделения выходных данных. Если один из них или оба не соответствуют выбранной схеме разделения, операция перемешивания выполняется до фактического соединения, чтобы обеспечить нужное соответствие. После того, как соответствие выбранному выходному разделению для обоих входных наборов данных достигнуто, Sort Merge выполняет операцию соединения для каждого выходного раздела, используя стандартный подход Sort Merge Join. Он вычислительно менее эффективен по сравнению с Shuffle Hash Join и Broadcast Hash Join однако требования к памяти исполнителей значительно ниже. Как и Shuffle Hash Join, если входные наборы данных не соответствуют желаемому разделению выходных данных, то операция перемешивания входов увеличивает накладные расходы.
- **Декартово соединение (Cartesian)**, которое используется исключительно для выполнения перекрестного соединения между двумя наборами входных данных. Количество выходных разделов всегда равно произведению количества разделов входного набора данных. Каждый выходной раздел сопоставляется с уникальной парой разделов, каждая пара состоит из одного раздела одного и другого раздела второго набора данных. Для каждого из выходных разделов результат вычисляется как декартово произведение данных из двух входных разделов, сопоставленных с выходным разделом. Недостаток этого механизма в том, что Cartesian Join увеличивает количество выходных разделов. Однако, он является единственным, если требуется Cross- соединение.
- **Широковещательное соединение вложенного цикла (Broadcast Nested Loop)**, где один из входных наборов данных транслируется всем исполнителям. После этого каждый раздел не транслируемого набора входных данных присоединяется к транслируемому набору с использованием стандартной Join-процедуры с вложенным циклом, чтобы произвести объединенные выходные данные. С вычислительной точки зрения Broadcast Nested Loop Join наименее эффективен, поскольку для сравнения двух наборов данных выполняется вложенный цикл. Кроме того, это требует большого объема памяти, т.к. один из наборов входных данных должен транслироваться всем исполнителям.



https://bigdataschool.ru/blog/broadcast-join-in-spark-sql-and-other-hints-example.html

In [3]:
spark = SparkSession.builder.appName('APP').config('hive.exec.dynamic.partition.mode', 'nonstrict').enableHiveSupport().getOrCreate()

In [44]:
df = spark.read.option("header", True).csv("Pokemon_full.csv")

In [45]:
df.repartition(10).rdd.getNumPartitions()

10

In [65]:
df_agg = df.groupby('type').count()

In [66]:
columns = ['hp', 'attack', 'defense', 'sp atk', 'sp def','speed']

for col in columns:
    df = df.withColumn(col, F.col(col)+1)

In [74]:
df.explain(True)

== Parsed Logical Plan ==
'Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, sp def#792, ('speed + 1) AS speed#805]
+- Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, (cast(sp def#449 as double) + cast(1 as double)) AS sp def#792, speed#450]
   +- Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, (cast(sp atk#448 as double) + cast(1 as double)) AS sp atk#779, sp def#449, speed#450]
      +- Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, (cast(defense#447 as double) + cast(1 as double)) AS defense#766, sp atk#448, sp def#449, speed#450]
         +- Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, (cast(attack#446 as double) + cast(1 as doubl

In [70]:
dfc = df.localCheckpoint()

In [72]:
dfc.explain(True)

== Parsed Logical Plan ==
LogicalRDD [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, sp def#792, speed#805], false

== Analyzed Logical Plan ==
name: string, pokedex id: string, height: string, weight: string, type: string, secundary type: string, hp: double, attack: double, defense: double, sp atk: double, sp def: double, speed: double
LogicalRDD [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, sp def#792, speed#805], false

== Optimized Logical Plan ==
LogicalRDD [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, sp def#792, speed#805], false

== Physical Plan ==
*(1) Scan ExistingRDD[name#439,pokedex id#440,height#441,weight#442,type#443,secundary type#444,hp#740,attack#753,defense#766,sp atk#779,sp def#792,speed#805]



In [86]:
windowType = Window.partitionBy("type")
#windowType = Window.partitionBy()

result = df.withColumn("type_count", F.count("*").over(windowType))
result_from_cp = dfc.withColumn("type_count", F.count("*").over(windowType))

In [87]:
result.explain(True)

== Parsed Logical Plan ==
'Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, sp def#792, speed#805, count(1) windowspecdefinition('type, unspecifiedframe$()) AS type_count#968]
+- Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, sp def#792, (cast(speed#450 as double) + cast(1 as double)) AS speed#805]
   +- Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, (cast(sp def#449 as double) + cast(1 as double)) AS sp def#792, speed#450]
      +- Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, (cast(sp atk#448 as double) + cast(1 as double)) AS sp atk#779, sp def#449, speed#450]
         +- Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type

In [88]:
result_from_cp.explain(True)

== Parsed Logical Plan ==
'Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, sp def#792, speed#805, count(1) windowspecdefinition('type, unspecifiedframe$()) AS type_count#983]
+- LogicalRDD [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, sp def#792, speed#805], false

== Analyzed Logical Plan ==
name: string, pokedex id: string, height: string, weight: string, type: string, secundary type: string, hp: double, attack: double, defense: double, sp atk: double, sp def: double, speed: double, type_count: bigint
Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, sp def#792, speed#805, type_count#983L]
+- Project [name#439, pokedex id#440, height#441, weight#442, type#443, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, sp def#792, sp

In [96]:
result_from_cp.join(df_agg, on = 'type').explain(False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [type#443, name#439, pokedex id#440, height#441, weight#442, secundary type#444, hp#740, attack#753, defense#766, sp atk#779, sp def#792, speed#805, type_count#983L, count#737L]
   +- BroadcastHashJoin [type#443], [type#1314], Inner, BuildRight, false
      :- Window [count(1) windowspecdefinition(type#443, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS type_count#983L], [type#443]
      :  +- Sort [type#443 ASC NULLS FIRST], false, 0
      :     +- Exchange hashpartitioning(type#443, 200), ENSURE_REQUIREMENTS, [plan_id=1475]
      :        +- Filter isnotnull(type#443)
      :           +- Scan ExistingRDD[name#439,pokedex id#440,height#441,weight#442,type#443,secundary type#444,hp#740,attack#753,defense#766,sp atk#779,sp def#792,speed#805]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [plan_id=1481]
         +- HashAggregate(keys=[type#131

In [97]:
result_from_cp.join(df_agg).explain(False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildRight, Inner
   :- Window [count(1) windowspecdefinition(type#443, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS type_count#983L], [type#443]
   :  +- Sort [type#443 ASC NULLS FIRST], false, 0
   :     +- Exchange hashpartitioning(type#443, 200), ENSURE_REQUIREMENTS, [plan_id=1507]
   :        +- Scan ExistingRDD[name#439,pokedex id#440,height#441,weight#442,type#443,secundary type#444,hp#740,attack#753,defense#766,sp atk#779,sp def#792,speed#805]
   +- BroadcastExchange IdentityBroadcastMode, [plan_id=1513]
      +- HashAggregate(keys=[type#1340], functions=[count(1)])
         +- Exchange hashpartitioning(type#1340, 200), ENSURE_REQUIREMENTS, [plan_id=1510]
            +- HashAggregate(keys=[type#1340], functions=[partial_count(1)])
               +- FileScan csv [type#1340] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/

In [100]:
result_from_cp.join(df_agg.filter('type = "bug"')).explain(False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildRight, Inner
   :- Window [count(1) windowspecdefinition(type#443, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS type_count#983L], [type#443]
   :  +- Sort [type#443 ASC NULLS FIRST], false, 0
   :     +- Exchange hashpartitioning(type#443, 200), ENSURE_REQUIREMENTS, [plan_id=1593]
   :        +- Scan ExistingRDD[name#439,pokedex id#440,height#441,weight#442,type#443,secundary type#444,hp#740,attack#753,defense#766,sp atk#779,sp def#792,speed#805]
   +- BroadcastExchange IdentityBroadcastMode, [plan_id=1599]
      +- HashAggregate(keys=[type#1393], functions=[count(1)])
         +- Exchange hashpartitioning(type#1393, 200), ENSURE_REQUIREMENTS, [plan_id=1596]
            +- HashAggregate(keys=[type#1393], functions=[partial_count(1)])
               +- Filter (isnotnull(type#1393) AND (type#1393 = bug))
                  +- FileScan csv [type#1393] Batched: false, 

In [93]:
result_from_cp.join(df_agg).show()

+--------+----------+------+------+----+--------------+----+------+-------+------+------+-----+----------+--------+-----+
|    name|pokedex id|height|weight|type|secundary type|  hp|attack|defense|sp atk|sp def|speed|type_count|    type|count|
+--------+----------+------+------+----+--------------+----+------+-------+------+------+-----+----------+--------+-----+
|caterpie|        10|     3|    29| bug|          None|46.0|  31.0|   36.0|  21.0|  21.0| 46.0|        75|     bug|   75|
|caterpie|        10|     3|    29| bug|          None|46.0|  31.0|   36.0|  21.0|  21.0| 46.0|        75|  normal|  109|
|caterpie|        10|     3|    29| bug|          None|46.0|  31.0|   36.0|  21.0|  21.0| 46.0|        75|   ghost|   30|
|caterpie|        10|     3|    29| bug|          None|46.0|  31.0|   36.0|  21.0|  21.0| 46.0|        75|   grass|   86|
|caterpie|        10|     3|    29| bug|          None|46.0|  31.0|   36.0|  21.0|  21.0| 46.0|        75|   steel|   30|
|caterpie|        10|   

In [43]:
df.explain(True)

== Parsed Logical Plan ==
'Project [name#56, pokedex id#57, height#58, weight#59, type#60, secundary type#61, hp#345, attack#358, defense#371, sp atk#384, sp def#397, ('speed + 1) AS speed#410]
+- Project [name#56, pokedex id#57, height#58, weight#59, type#60, secundary type#61, hp#345, attack#358, defense#371, sp atk#384, (cast(sp def#66 as double) + cast(1 as double)) AS sp def#397, speed#67]
   +- Project [name#56, pokedex id#57, height#58, weight#59, type#60, secundary type#61, hp#345, attack#358, defense#371, (cast(sp atk#65 as double) + cast(1 as double)) AS sp atk#384, sp def#66, speed#67]
      +- Project [name#56, pokedex id#57, height#58, weight#59, type#60, secundary type#61, hp#345, attack#358, (cast(defense#64 as double) + cast(1 as double)) AS defense#371, sp atk#65, sp def#66, speed#67]
         +- Project [name#56, pokedex id#57, height#58, weight#59, type#60, secundary type#61, hp#345, (cast(attack#63 as double) + cast(1 as double)) AS attack#358, defense#64, sp atk#65

In [109]:
os.listdir('spark-warehouse/pokemon')



['._SUCCESS.crc',
 'type=bug',
 'type=dark',
 'type=dragon',
 'type=dragon_new',
 'type=electric',
 'type=fairy',
 'type=fairy_new',
 'type=fighting',
 'type=fire',
 'type=fire_new',
 'type=flying',
 'type=ghost',
 'type=grass',
 'type=grass_new',
 'type=ground',
 'type=ice',
 'type=normal',
 'type=poison',
 'type=psychic',
 'type=rock',
 'type=steel',
 'type=water',
 '_SUCCESS']

In [136]:
result_from_cp.repartition(10, F.col("type")).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(type#443, 10), REPARTITION_BY_NUM, [plan_id=2934]
   +- Window [count(1) windowspecdefinition(type#443, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS type_count#983L], [type#443]
      +- Sort [type#443 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(type#443, 200), ENSURE_REQUIREMENTS, [plan_id=2931]
            +- Scan ExistingRDD[name#439,pokedex id#440,height#441,weight#442,type#443,secundary type#444,hp#740,attack#753,defense#766,sp atk#779,sp def#792,speed#805]




In [None]:
result_from_cp.repartition(10, F.col("type")).explain()

In [132]:
result_hp = result_from_cp.groupBy('type').agg(F.sum('hp').alias('sum_hp')).repartition(10).orderBy('sum_hp')
result_hp.write.mode("ignore").parquet("spark-warehouse/out_1")
print('\n'.join(os.listdir("spark-warehouse/out_1")), "\n\n")

result_hp.explain()

.part-00000-0037ab89-ef82-4a60-935e-b0ff108a2e71-c000.snappy.parquet.crc
._SUCCESS.crc
part-00000-0037ab89-ef82-4a60-935e-b0ff108a2e71-c000.snappy.parquet
_SUCCESS 


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [sum_hp#1912 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(sum_hp#1912 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=2758]
      +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=2756]
         +- HashAggregate(keys=[type#443], functions=[sum(hp#740)])
            +- Exchange hashpartitioning(type#443, 200), ENSURE_REQUIREMENTS, [plan_id=2754]
               +- HashAggregate(keys=[type#443], functions=[partial_sum(hp#740)])
                  +- Project [type#443, hp#740]
                     +- Scan ExistingRDD[name#439,pokedex id#440,height#441,weight#442,type#443,secundary type#444,hp#740,attack#753,defense#766,sp atk#779,sp def#792,speed#805]




In [133]:
result_hp = result_from_cp.groupBy('type').agg(F.sum('hp').alias('sum_hp')).repartition(10)#.orderBy('sum_hp')
result_hp.write.mode("ignore").parquet("spark-warehouse/out_2")
print('\n'.join(os.listdir("spark-warehouse/out_2")), "\n\n")

result_hp.explain()

.part-00000-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet.crc
.part-00001-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet.crc
.part-00002-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet.crc
.part-00003-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet.crc
.part-00004-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet.crc
.part-00005-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet.crc
.part-00006-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet.crc
.part-00007-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet.crc
.part-00008-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet.crc
.part-00009-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet.crc
._SUCCESS.crc
part-00000-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet
part-00001-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet
part-00002-e603b258-462e-47ef-864b-0378ec3043e2-c000.snappy.parquet
part-00003-e603b258-462e-47ef-864b-0378ec3043e2-c000

In [134]:
result_hp = result_from_cp.groupBy('type').agg(F.sum('hp').alias('sum_hp')).repartition(10).sortWithinPartitions('sum_hp')
result_hp.write.mode("overwrite").parquet("spark-warehouse/out_3")
print('\n'.join(os.listdir("spark-warehouse/out_3")), "\n\n")

result_hp.explain()

.part-00000-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet.crc
.part-00001-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet.crc
.part-00002-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet.crc
.part-00003-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet.crc
.part-00004-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet.crc
.part-00005-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet.crc
.part-00006-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet.crc
.part-00007-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet.crc
.part-00008-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet.crc
.part-00009-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet.crc
._SUCCESS.crc
part-00000-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet
part-00001-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet
part-00002-8fde3ba0-d535-4b60-ada0-b134a8745500-c000.snappy.parquet
part-00003-8fde3ba0-d535-4b60-ada0-b134a8745500-c000

In [162]:
result_hp = result_from_cp.withColumn('hp100', (F.col('hp')/100).cast('int')).repartition(3,'hp')


In [163]:
from pyspark.sql.functions  import spark_partition_id
result_hp.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()

+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0|  460|
|          1|  270|
|          2|  160|
+-----------+-----+



In [164]:
result_rep = result_hp.repartition(5)
result_rep.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()

+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0|  178|
|          1|  178|
|          2|  178|
|          3|  178|
|          4|  178|
+-----------+-----+



In [166]:
result_rep = result_hp.repartition(8, 'type')
result_rep.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()

+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0|   28|
|          1|  219|
|          2|   34|
|          3|   27|
|          4|  151|
|          5|  158|
|          6|  185|
|          7|   88|
+-----------+-----+

