# Apache Spark (Overview) - p.127

## RDD (Resilient Distributed Dataset, 탄력적이면서 분산된 데이터셋)
- 오류 자동복구 기능이 포함된 가상의 '리스트'
    - 병렬처리 되는 리스트라고 생각하면 쉽다.
    - MapReduce를 대체하려고 RDD가 나옴.
    - ML이 유행되면서, ML에서 제공해주는 데이터는 table 형태의 데이터가 많다(traditional 형태, 딥러닝은 이미지, 영상 등으로 예외 케이스) -> rdd 보다는 dataframe 형태가 맞다고 발달됨
- 다양한 계산을 수행 가능, 메모리를 활용하여 높은 성능을 가짐

## Scala Interface
- 매우 간결한 표현이 가능한 모던 프로그래밍 언어
- Functional Programming이 가능해 데이터의 변환을 효과적으로 표현할 수 있음

## Apache Spark 확장 프로젝트
- Spark을 엔진으로 하는 확장 프로젝트들이 같이 제공됨

## Apache Spark Demo
- pyspark
    - sc
        - SparkContext
        - ex
        ```Python
            rdd = sc.parallelize([1, 2, 3, 4])
            rdd2 = rdd.map(lambda x: x*2)
            rdd2.collect() ... # rdd2에 있는 걸 다 뽑아서 보여줌 -> 이때 작업을 수행함
            
            rdd3 = rdd.filter(lambda x: x > 2)
            rdd3.collect() # -> 이때 작업을 수행함
        ```    
        - python array와 다를 건 없다.
            - python array는 기능이 별로 없다.
        
    - spark
        - SparkSession
        
        - DataFrame ex
        ```Python
        d = [{'name' : 'Alice', 'age': 1}]
        df = spark.createDataFrame(d)
        df.show()
        df.filter(df['age'] == 0).show()
        ```
        - Pandas DataFrame과 Spark DataFrame은 비슷하지만 Pandas는 컴퓨터 한 대의 용량으로 처리하기에 적합하지만, Spark는 데이터가 아무리 크더라도 빠르게 처리할 수 있다. 나머지는 거의 동일하다.


## Simple Operations
- Create RDD
- **Actions** : Count, Collect, Take
    - Count : 구하는 것
    - Collect : Array 형태로 바꾸기, 전체 보여주기
    - Take : collect와 비슷한 개념, 몇개만 보여주기 (head 개념)
- **Transformations** : Map, Filter, etc
    - 
- File Load, Save


## Spark Core Concept
### RDD (Resilient Distributed Dataset)
- 클러스터 전체에서 공유되는 리스트, 메모리상에 올라가있음(메모리 부족한 경우, 디스크에 spill)
    - 가상의 List, 메모리 상에 올라가 있으나 안 올라가 있어도 된다.
    - join이나 group operation을 하려면 메모리가 많이 필요하다.
- map, reduce, count, filter, join 등 다양한 작업 가능
- 작업을 병렬적으로 처리
- 여러 작업을 설정해두고, 결과를 얻을 때 **lazy하게** 계산
   
<img src = "../../images/dees_ksw_4_1.png">

#### Transformations
    - 데이터를 어떻게 구해낼지를 표현
#### Actions
    - 표현된 데이터를 가져옴
#### Lineage
    - 클러스터 중 일부의 고장 등으로 작업이 중간에 실패하더라도, Lineage를 통해 데이터를 복구
#### Lazy Execution
    - Transformation 시에는 계산을 수행하지 않고, Action이 수행되는 시점부터 데이터를 읽어들여서 계산을 시작
    
     - (MapReduce처럼 중간 단계마다 저장하지 않기 때문에) 성능(속도)적인 측면에서 효율적이다.
     
```Python
rdd = sc.parallelize([1, 2, 3, 4])
rdd2 = rdd.map(lambda x: x*2)
rdd3 = rdd.filter(lambda x: x > 2) # 이때까지 실제 작업 수행되지 않음
rdd3.collect # 이때 실제 작업을 수행함
```

### RDD Transformations
- map, flatmap, mapPartitions, sample, union, intersection, distinct, groupByKey, reduceByKey, join, repartition 등
- 최신의 자료는 [Spark Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html) 참고
- [Zhen He Spark](http://homepage.cs.latrobe.edu.au/zhe/)
- [The RDD API By Example](http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html)

### RDD Actions
- 여러가지 변환(Transformation)이 담긴 RDD의 정보를 통한 계산을 수행함
- reduce, collect, **count**, first, **take**, **saveAsTextFile**, countByKey, foreach 등

### RDD Caching
- 반복 계산에서의 성능 향상을 위해 RDD의 내용을 캐시 가능
- API
    - rdd.persist() or rdd.cache()
    - rdd.unpersist()
    - rdd.persist(MEMORY_ONLY)와 같이 캐시 옵션을 줄 수 있음
- Persistence Level
<img src = "../../images/dees_ksw_4_2.png">

### RDD 내부
- **Partition**
- **Dependency**
- Function
- Metadata

```Python
rdd1 = sc.parallelize([('a', 1), ('b', 2)]) # tuple (key, value 형태)
rdd2 = sc.parallelize([('a', 3), ('b', 2)])
rdd1.join(rdd2)
rdd.join(rdd2).collect()
rdd1.leftOuterJoin(rdd2).collect()
rdd1.fullOuterJoin(rdd2).collect()
```

### Spark Cluster

<img src = "../../images/dees_ksw_4_3.png">
- Hadoop 등에서 채용하고 있는 전형적인 master-slave 구조
- Master 서버
    - Diver program
- Slave (Worker)
    - Task들을 수행
    - 데이터들을 캐싱
    
- 마스터는 후진 컴퓨터를 써도 될 정도로 할 일이 없다. 워커들이 많을 것이다. 워커에 executor를 띄울 것이다. 워커, 마스터는 컴퓨터 한 대를 뜻한다. 

- [Garbage Collection (G.C)](https://ko.wikipedia.org/wiki/%EC%93%B0%EB%A0%88%EA%B8%B0_%EC%88%98%EC%A7%91_(%EC%BB%B4%ED%93%A8%ED%84%B0_%EA%B3%BC%ED%95%99))
    - 프로그램이 커지면 GC하는데 오래 걸린다(쓰레기를 많이 쌓아놨으면 청소하는데 오래 걸리는 개념)
- [Garbage Collection (G.C)](https://d2.naver.com/helloworld/1329)
    
### Spark Shell, Spark App

### Spark 소스코드

### flatmap
```Python
wc 
wc.sortBy
```

    


---
### Socar
- 회원 레코드 : 400만 (수백~수천 만건)
- 예약 레코드 : 수천만 건 (수십~수백억 건)
- 수십 억, 수천 억 데이터
- 남녀 비율을 알고 싶다면, 두 테이블의 user_id를 합쳐야함

### 회사 이야기
- happy한 결정
    - socar
        - 가격 전략 미팅
            - 차를 시간별로 빌려 쓰는 서비스
            - 30분 부터 사용이 가능
            - 30분 : 4000원, 1시간 : 8000원, 10시간~24시간 : 8만원(10시간 비용)
            - 바꿔야 함 ... 문제가 있기 때문에.
            - 하루 요금을 10% 정도 저렴하게 깎음 -> 7.2만원
            - 짧은 구간에서 손해(적자)가 많이 남, 손해나는 구간을 없애기 위해서 짧은 시간 구간을 늘려야 겠다고 생각함
            - 짧게 빌릴 수록 적자, 4시간은 괜찮음
            - 모든 구간에서, 10% 씩 오르는데 많이 쓸수록 적게 오르도록 (A/B/C test)
            - A는 평일, B는 주말에 괜찮음.
            - 소수의 사용자들에게만 A, B를 적용했기 때문에 결과를 얼만큼 믿을 수 있느냐... 이슈가 있음
            - 빅데이터 스케일로 하면 통계 분석이 필요없다. 큰 수의 법칙으로 평균으로만 비교해도 된다.
            - 데이터의 다양성(경향성)에 비해서 숫자가 적으므로, 통계 분석이 필요하다.
            
    - tada
        - 초기대비 20~30배 이상 성장
        - data science Task
            - 운영 효율 높이기
                - 적자 서비스
                - 차량 잘 배치하기
                - 과거 호출 데이터를 통해 예측 모델 만들기
                    - 잘 예측해서, 차들을 호출이 많을 지역으로 보내는 알고리즘 만들기
                - 타다 현재 1,000 대
                    - 순환 고리 매니징 : 머신러닝 프로젝트의 성패
            - 가격 전략
                - 
---
## Questions
- pandas에서 기본적으로 제공해주지 않는 함수
    - 개별적으로 함수를 만들어서 처리하는데 spark도 동일하게 하는지? 데이터 양
- 통계
    - 파생변수가 많으면 통계적으로 유의한 변수들만 추출
- union all 도 있는지?
- partition = partition by ?


- Mac이 훨씬 좋음
    - Server 환경 Linux
    - Mac은 Unix 기반
    - Native로 돌아감
    
- 딥러닝은 GPU 유무에 따라 성능 차이가 크므로, gpu 달린 윈도우를 쓰는 사람도 있음