In [1]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"


# Spark 데이터 종류

- RDD : Low-level API에 해당 하는 데이터, 스파크에서 동작하는 모든 데이터는 결국 RDD형태로 연산된다  
- DataFrame : 열과 행으로 이루어진 테이블 형태의 데이터
- DataSet : 한가지 타입의 값만 저장할 수 있는 DataFrame. PySpark에서는 사용하지 않음

## RDD 

### Resillient Distributed Data

Resillient : 회복력 있는  
Distributed : 분산  
Data : 데이터  
 * Resilient : 장애가 발생할 경우 자동으로 데이터를 복구
 * Distributed : 데이터를 읽고 쓸 때 데이터를 파티셔닝 하여 병렬로 읽고 쓴다. 직렬로 모두 읽을 때 보다 속도가 빠르다
 
 
### RDD의 특징

 - Read Only : 변경불가능 객체
 - Lazy Evaluation : 늦은 수행


### RDD 파티셔닝

- 커다란 RDD를 처음 부터 끝까지 직렬로 읽고 쓰면 시간이 오래 걸린다.
- RDD를 작은 부분(Partition) 으로 쪼개 각 Patition 을 병렬로 읽고 써 실행속도를 향상시킬 수 있다.
- Patition을 나눠줄때는 스파크 클러스터 환경의 cpu core 숫자에 맞춰주는 것이 속도 측면에서 유리하다.


![rdd_partitioning](./img/rdd-partitioning.png)

#### * cpu 코어가 3개일때 파티셔닝 예시

![partioning](./img/partioning-optimizer.jpg)

## RDD - 생성

https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

In [2]:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

distData.collect()



[1, 2, 3, 4, 5]

In [3]:
# range메서드로 RDD생성
# 1부터 10까지 2씩 증가하는 값, 파티션 3개
rdd1 = sc.range(1, 10, 2, 3)

rdd1.collect()
rdd1.getNumPartitions()


[1, 3, 5, 7, 9]

3

In [5]:

score_rdd = sc.textFile("/rdd/score.txt",20)
score_rdd.collect()
score_rdd.getNumPartitions()

2022-09-05 13:47:27,995 INFO memory.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 307.6 KiB, free 433.7 MiB)
2022-09-05 13:47:28,230 INFO memory.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 51.6 KiB, free 433.7 MiB)
2022-09-05 13:47:28,238 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:45529 (size: 51.6 KiB, free: 434.3 MiB)
2022-09-05 13:47:28,253 INFO spark.SparkContext: Created broadcast 4 from textFile at NativeMethodAccessorImpl.java:0
2022-09-05 13:47:28,423 INFO mapred.FileInputFormat: Total input files to process : 1
2022-09-05 13:47:28,490 INFO spark.SparkContext: Starting job: collect at /tmp/ipykernel_2410/3349058622.py:2
2022-09-05 13:47:28,497 INFO scheduler.DAGScheduler: Got job 3 (collect at /tmp/ipykernel_2410/3349058622.py:2) with 22 output partitions
2022-09-05 13:47:28,497 INFO scheduler.DAGScheduler: Final stage: ResultStage 3 (collect at /tmp/ipykernel_2410/3349058622.py:

2022-09-05 13:47:31,440 INFO scheduler.TaskSetManager: Finished task 19.0 in stage 3.0 (TID 49) in 151 ms on localhost (executor 1) (20/22)
2022-09-05 13:47:31,453 INFO scheduler.TaskSetManager: Finished task 20.0 in stage 3.0 (TID 50) in 133 ms on localhost (executor 2) (21/22)
2022-09-05 13:47:31,521 INFO scheduler.TaskSetManager: Finished task 21.0 in stage 3.0 (TID 51) in 85 ms on localhost (executor 1) (22/22)
2022-09-05 13:47:31,522 INFO cluster.YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool 
2022-09-05 13:47:31,526 INFO scheduler.DAGScheduler: ResultStage 3 (collect at /tmp/ipykernel_2410/3349058622.py:2) finished in 2.996 s
2022-09-05 13:47:31,528 INFO scheduler.DAGScheduler: Job 3 is finished. Cancelling potential speculative or zombie tasks for this job
2022-09-05 13:47:31,529 INFO cluster.YarnScheduler: Killing all running tasks in stage 3: Stage finished
2022-09-05 13:47:31,532 INFO scheduler.DAGScheduler: Job 3 finished: collect at /tmp/ipyke

['하명도 스파크 50',
 '홍길동 스파크 80',
 '임꺽정 스파크 60',
 '임요환 텐서플로우 100',
 '홍진호 텐서플로우 22',
 '홍진호 텐서플로우 22',
 '이윤열 텐서플로우 90']

22

2022-09-05 14:00:55,394 INFO storage.BlockManagerInfo: Removed broadcast_5_piece0 on localhost:33887 in memory (size: 2.8 KiB, free: 434.3 MiB)
2022-09-05 14:00:55,413 INFO storage.BlockManagerInfo: Removed broadcast_5_piece0 on localhost:32873 in memory (size: 2.8 KiB, free: 434.3 MiB)
2022-09-05 14:00:55,419 INFO storage.BlockManagerInfo: Removed broadcast_5_piece0 on localhost:45529 in memory (size: 2.8 KiB, free: 434.3 MiB)
2022-09-05 14:00:55,495 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on localhost:45529 in memory (size: 2.8 KiB, free: 434.3 MiB)
2022-09-05 14:00:55,513 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on localhost:32873 in memory (size: 2.8 KiB, free: 434.3 MiB)
2022-09-05 14:00:55,516 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on localhost:33887 in memory (size: 2.8 KiB, free: 434.3 MiB)
