## PySpark Broadcast

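1. Broadcasting a Python object

- Use this when mapping column values through a dictionary.
- A small piece of reference data is sent to the executors (the workers) through sparkContext.

occupation_dict = spark.sparkContext.broadcast(meta) -> the data is shipped to each executor once, not with every task

Here occupation_dict is not the data itself but a Broadcast object, so the data must be read through .value.

F.udf: UDF stands for User Defined Function.
Spark runs on the JVM and cannot execute Python code directly, so a UDF acts as a bridge: to process this column, Spark hands the values to a Python worker and reads the results back.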

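2. Join

- Use this when the reference data is another DataFrame rather than a dictionary.


>A broadcast loads the data into memory in its entirety.
>If the reference data is too large, the driver or the executors can run out of memory.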
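
The two points above (the data is shipped once per executor, and the whole object has to fit in memory) can be put into rough numbers. The following is a back-of-the-envelope sketch in plain Python, not a measurement of Spark itself. The helper `shipped_bytes`, the values of `num_tasks` and `num_executors`, and the simplified cost model (one copy per task without broadcast, one copy per executor with it) are all assumptions made for illustration.

```python
import pickle

# Same reference dictionary as in the notebook.
meta = {
    "1100": "engineer",
    "2030": "developer",
    "3801": "painter",
    "3021": "chemistry teacher",
}


def shipped_bytes(obj, num_tasks, num_executors):
    """Estimate bytes sent under a simplified model (an assumption):
    without broadcast the object travels with every task,
    with broadcast it travels once per executor."""
    payload = len(pickle.dumps(obj))
    return {
        "payload": payload,
        "per_task": payload * num_tasks,
        "per_executor": payload * num_executors,
    }


# Hypothetical cluster: 200 tasks spread over 4 executors.
estimate = shipped_bytes(meta, num_tasks=200, num_executors=4)
print(estimate)
```

The same `payload` figure is also a quick sanity check on whether an object is small enough to broadcast at all.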

In [1]:
from pyspark.sql import (
    Row,
    SparkSession)
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

In [2]:
spark=(
    SparkSession
    .builder
    .appName("broadcast_study")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/30 07:53:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Only contains the occupation_id column
interviewer_count = spark.createDataFrame(
    [
        ("1100",),
        ("2030",),
        ("1100",),
        ("3801",),
        ("9999",),
        ("2030",),
    ],
    ["occupation_id"]
)

interviewer_count.show()

                                                                                

+-------------+
|occupation_id|
+-------------+
|         1100|
|         2030|
|         1100|
|         3801|
|         9999|
|         2030|
+-------------+



In [4]:
# Small reference data (Python dict)
meta = {
    "1100": "engineer",
    "2030": "developer",
    "3801": "painter",
    "3021": "chemistry teacher",
}

In [7]:
# Without broadcast

In [5]:
def lookup_no_bc(occupation_id):
    return meta.get(occupation_id,"Unknown")

lookup_no_bc_udf=F.udf(lookup_no_bc)

print(lookup_no_bc_udf)

<function lookup_no_bc at 0xffff60e7ef70>


In [6]:
df_no_bc=interviewer_count.withColumn(
    "occupation_name",
    lookup_no_bc_udf(F.col("occupation_id"))
)
df_no_bc.show()

+-------------+---------------+
|occupation_id|occupation_name|
+-------------+---------------+
|         1100|       engineer|
|         2030|      developer|
|         1100|       engineer|
|         3801|        painter|
|         9999|        Unknown|
|         2030|      developer|
+-------------+---------------+



In [7]:
# Without broadcast: 97 ms (Spark UI)

In [8]:
# With broadcast

In [9]:
# Send the small data to every executor
# occupation_bc is a Broadcast object, not a dictionary!
occupation_bc=spark.sparkContext.broadcast(meta)
print(occupation_bc)
print(occupation_bc.value)

<pyspark.broadcast.Broadcast object at 0xffff60e76fa0>
{'1100': 'engineer', '2030': 'developer', '3801': 'painter', '3021': 'chemistry teacher'}


In [10]:
def get_occupation_name(occupation_id:str)->str:
    return occupation_bc.value.get(occupation_id,"Unknown")

In [11]:
# Register the UDF
occupation_lookup_udf=F.udf(get_occupation_name,StringType())

In [12]:
df_result=interviewer_count.withColumn(
    "occupation_name",
    occupation_lookup_udf(F.col("occupation_id"))
)
df_result.show()

+-------------+---------------+
|occupation_id|occupation_name|
+-------------+---------------+
|         1100|       engineer|
|         2030|      developer|
|         1100|       engineer|
|         3801|        painter|
|         9999|        Unknown|
|         2030|      developer|
+-------------+---------------+



In [13]:
# With broadcast: 81 ms (Spark UI)

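### Time difference with and without Broadcast

 Checked in the Spark UI:
 
 Without Broadcast: 97 ms, and the result was noticeably sluggish to appear
 
 With Broadcast: 81 ms, and the result appeared quickly

 >These are single runs on only 6 rows, so the 16 ms gap is suggestive rather than conclusive. The saving grows with the size of the lookup object and the number of tasks, so on **production-scale data** the difference can become **far more significant**.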
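
Because a single run at this scale is easily dominated by noise, it helps to repeat the measurement. Below is a minimal timing sketch in plain Python. The helper `time_action` and its `repeats` and `warmup` parameters are hypothetical names introduced here, and the numbers it produces are driver-side wall-clock times, which will not match the job durations shown in the Spark UI exactly.

```python
import statistics
import time


def time_action(action, repeats=5, warmup=1):
    """Run `action` several times and return per-run wall-clock times in ms.
    Warm-up runs are executed but not recorded."""
    for _ in range(warmup):
        action()
    timings_ms = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        timings_ms.append((time.perf_counter() - start) * 1000)
    return timings_ms


# Stand-in workload so the sketch runs without a cluster; in the notebook
# this would be something like `lambda: df_result.collect()`.
timings = time_action(lambda: sum(range(10_000)), repeats=5)
print(f"median {statistics.median(timings):.3f} ms over {len(timings)} runs")
```

Comparing medians over several runs for `df_no_bc` and `df_result` gives a fairer picture than one 97 ms vs 81 ms reading.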



### Join pattern

In [14]:
big_df = spark.createDataFrame(
    [
        (1, "click"),
        (2, "view"),
        (3, "click"),
        (1, "purchase"),
        (2, "click"),
    ],
    ["user_id", "event"]
)

big_df.show()

+-------+--------+
|user_id|   event|
+-------+--------+
|      1|   click|
|      2|    view|
|      3|   click|
|      1|purchase|
|      2|   click|
+-------+--------+



In [15]:
small_df = spark.createDataFrame(
    [
        (1, "Alice"),
        (2, "Bob"),
        (3, "Charlie"),
    ],
    ["user_id", "user_name"]
)

small_df.show()

+-------+---------+
|user_id|user_name|
+-------+---------+
|      1|    Alice|
|      2|      Bob|
|      3|  Charlie|
+-------+---------+



In [17]:
joined_normal=big_df.join(small_df,"user_id")
joined_normal.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#35L, event#36, user_name#49]
   +- SortMergeJoin [user_id#35L], [user_id#48L], Inner
      :- Sort [user_id#35L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(user_id#35L, 200), ENSURE_REQUIREMENTS, [plan_id=109]
      :     +- Filter isnotnull(user_id#35L)
      :        +- Scan ExistingRDD[user_id#35L,event#36]
      +- Sort [user_id#48L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(user_id#48L, 200), ENSURE_REQUIREMENTS, [plan_id=110]
            +- Filter isnotnull(user_id#48L)
               +- Scan ExistingRDD[user_id#48L,user_name#49]




In [19]:
# SortMergeJoin
# Exchange -> data moves across the network; this is where rows are repartitioned
# A shuffle occurs
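
To make the `Exchange hashpartitioning(user_id, 200)` step in the plan above more concrete, here is a rough plain-Python sketch of what hash partitioning does: both sides are redistributed so that rows with the same key land in the same partition. The helper `hash_partition` is a hypothetical name, and Python's built-in `hash` merely stands in for Spark's own hash function.

```python
from collections import defaultdict


def hash_partition(rows, key_index, num_partitions):
    """Group rows by partition id. Python's built-in hash stands in for
    Spark's own hash function (an assumption, for illustration only)."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[hash(row[key_index]) % num_partitions].append(row)
    return dict(partitions)


big_rows = [(1, "click"), (2, "view"), (3, "click"), (1, "purchase"), (2, "click")]
small_rows = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]

# The plan above uses 200 partitions; 2 keeps the output readable.
big_parts = hash_partition(big_rows, key_index=0, num_partitions=2)
small_parts = hash_partition(small_rows, key_index=0, num_partitions=2)

# Matching keys from both sides now sit in the same partition id.
for pid in sorted(big_parts):
    print(pid, big_parts[pid], small_parts.get(pid, []))
```

In a real cluster, moving rows to their target partition means sending them over the network, which is the cost the shuffle adds on both sides of the join.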

In [21]:
# Broadcast Join
joined_bc=big_df.join(
    F.broadcast(small_df),
    "user_id"
)
joined_bc.show()
joined_bc.explain()

+-------+--------+---------+
|user_id|   event|user_name|
+-------+--------+---------+
|      1|   click|    Alice|
|      2|    view|      Bob|
|      3|   click|  Charlie|
|      1|purchase|    Alice|
|      2|   click|      Bob|
+-------+--------+---------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#35L, event#36, user_name#49]
   +- BroadcastHashJoin [user_id#35L], [user_id#48L], Inner, BuildRight, false
      :- Filter isnotnull(user_id#35L)
      :  +- Scan ExistingRDD[user_id#35L,event#36]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=265]
         +- Filter isnotnull(user_id#48L)
            +- Scan ExistingRDD[user_id#48L,user_name#49]




In [23]:
# BroadcastExchange HashedRelationBroadcastMode
# BroadcastExchange copies the smaller side to every executor, so no shuffle occurs
# Check the SQL / DataFrame tab in the Spark UI!
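
The idea behind `BroadcastHashJoin` can be sketched in plain Python: build a hash table from the small side once, then stream the big side and probe the table, so the big side never has to move. This is a conceptual illustration only, not Spark's implementation, and `broadcast_hash_join` is a hypothetical helper name.

```python
def broadcast_hash_join(big_rows, small_rows):
    """Inner join on the first field: build a hash table from the small
    side, then stream the big side and probe it."""
    lookup = {}
    for key, *rest in small_rows:
        lookup.setdefault(key, []).append(tuple(rest))

    joined = []
    for key, *rest in big_rows:
        for match in lookup.get(key, []):
            joined.append((key, *rest, *match))
    return joined


big_rows = [(1, "click"), (2, "view"), (3, "click"), (1, "purchase"), (2, "click")]
small_rows = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]

result = broadcast_hash_join(big_rows, small_rows)
for row in result:
    print(row)
```

The rows come out in the order of the big side, matching the `joined_bc.show()` output above, because only the small side was copied around.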