In [18]:
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"

In [19]:
a = 10
def afunc():
    global a # 공유변수로 지정,, 이거 안해주면 인식 못함
    a+=1 # 참조해서 값을 읽어 다시 참조변수에 저장할때는 지역변수에서만 찾는다.
    print(a) # a는 afunc() 입장에서는 전역 변수
afunc()

11


# Shared Variables

- 모든 노드에서 사용하기 위한 공유변수


- 공유변수로 지정한 값은 모든 노드에 중복되어 캐시된다.


- 반복적으로 사용해야하는 변수라면,  
  스파크의 노드는 네트워크를 통해 통신 하기 때문에 모든 노드에 중복 캐시하는 시스템적 비용보다  
  네트워크 과정에서 발생하는 오버헤드 비용이 더 많이 발생하게 된다.

## Broadcast Variables

- 각 노드에 공유되는 읽기 전용 변수

In [31]:
# 학생별 수업카테고리코드로 지정되어있는 값을 카테고리 전체이름으로 변경한다고 가정 해보자

data = [("홍길동","DE"),
    ("이제동","DS"),
    ("김철수","DE"),
    ("변현재","WD")]

code_desc = {"DE":"Data Engineer", "DS":"Data Science", "WD":"Web Developer"}    

In [32]:
student_rdd = sc.parallelize(data, 3)
student_rdd.collect()

[('홍길동', 'DE'), ('이제동', 'DS'), ('김철수', 'DE'), ('변현재', 'WD')]

In [33]:
# student_rdd.mapValues(lambda x : code_desc[x]).collect()

### 학생 전공명 변경
- 학생 data는 여러 rdd 객체로 구성되어 있음
- 변경할 전공명은 code_desc변수로 구성해놓았음
- code_desc는 모든 rdd가 접근 가능해야하고, 내용이 변경되면 안됨
    - 공유변수로 등록, 변경 불가능한 readonly로 등록해서 사용할 필요가 있음 : Broadcast_var로 생성해서 사용
### Broadcast var
- sc(spark_context)가 아닌 spark(spark_session) 객체 변수로 제공
- spark_session, spark_context, broadcast --> 변수로 사용할 값이나 변수

In [37]:
# Broadcast_variables 사용하기
code_desc = {"DE":"Data Engineer", "DS":"Data Science", "WD":"Web Developer"} # 등록할 변수    
broadcast_S = spark.sparkContext.broadcast(code_desc) # 세션에 변수 등록하면 객체 반환
# 읽기전용 변수, 수정을 하면
code_desc["DS"] = "PROCESS"
broadcast_S.value["DS"] = "PROCESS"
# code_desc
# broadcast_S.value

del(broadcast_S.value["DE"])# 삭제를 해도
broadcast_S.value # action 연산이므로 rdd 내부에는 반영되지 않음

student_rdd.mapValues(lambda e :code_desc[e]).collect()
student_rdd.mapValues(lambda e :broadcast_S.value[e]).collect() # 사용 시점에는 삭제되지 않는다.

# 아무일도 발생하지 않았다.
# broadcast 함수를 사용해 생성하는 시점에 이미 SparkContext에 등록



{'DS': 'PROCESS', 'WD': 'Web Developer'}

[('홍길동', 'Data Engineer'),
 ('이제동', 'PROCESS'),
 ('김철수', 'Data Engineer'),
 ('변현재', 'Web Developer')]

[('홍길동', 'Data Engineer'),
 ('이제동', 'Data Science'),
 ('김철수', 'Data Engineer'),
 ('변현재', 'Web Developer')]

## Accumulator

- 각 노드에 공유되는 누산기 함수
- 저장 속성과 add() 메소드 갖고 있는 특수한 형태의 클래스
- sc.accumulator(수치형(정수형) // 기본값 :0)
    - 각 노드에서 필요하다면 + 변경은 가능함

In [45]:
accum = sc.accumulator(0) # accum.value == 0으로 초기화
rdd = sc.parallelize([1, 2, 3, 4, 5])

# accum 누산기 클래스 활용해서 rdd data를 더하는 작업
# foreach(f) : 전달되는 rdd 요소 각각에 대하여 f를 실행해주는 함수, f는 return값이 없어야함
    # 주로 accumulator에 누적 저장하거나 외부 시스템에 출력용도로 사용함
rdd.foreach(lambda x : accum.add(x))
# rdd.foreach(lambda x : accum.add(x)).collect() # --> error가 발생 : 반환값이 없어서
accum.value # 여기다가 저장해버림

15

### 잘못된 데이터 수 counting
- 누적연산 필요
- 누산기(accum) 활용

In [48]:
# 정상 데이터 : key:value
# 비정상 데이터 수 확인
accum1 = sc.accumulator(0) # 초기값 0으로 초기화

rdd = sc.parallelize(["A1:V1","A2:V2","A3","A4:V4","A5;V5","A6::A6"])

In [49]:
def f(x) :
    global accum1
    if len(x.split(":")) != 2:
        accum1.add(1) # accum1.value += 1 // 특정 조건일 경우 누산기에 누적 연산을 진행하는 함수
rdd.foreach(f)

print("잘못된 데이터 수 : " + str(accum1.value))

잘못된 데이터 수 : 3


In [3]:
# # accumulator를 사용하지 않는다면?
# a = 0

# # 모든 노드에서 발생하는 데이터 횟수를 확인해보자
# def change_cate(e):
#      a = a + 1
#      return broadcastStates.value[e]
    
# students_rdd.mapValues(lambda e : change_cate(e)).collect()

# # 횟수 확인
# # local variable 'a' referenced before assignment 발생
# a




In [4]:


# 모든 노드에서 발생하는 데이터 횟수를 확인해보자


# 횟수 확인
# local variable 'a' referenced before assignment 발생
