
# Apache Spark?
- 빅데이터 처리를 위한 통합 컴퓨팅 엔진, 클러스터 환경에서 데이터를 병렬로 처리하는 라이브러리 집합
- -> 사전에, 실제 연산을 수행하는 컴퓨팅 노드를 추가/삭제를 쉽게 할 수 있음

여기서 `통합`, `컴퓨팅 엔진`과 `라이브러리 집합`이 언급됐는데, 이들은 무엇인가?

## Aparch spark concept

### 통합
- 통합은 우리가 무슨 작업(e.g. read, sql연산, ML, stream 처리)을 하든, 동일한 연산엔진과 일관성있는 api를 제공한다는 것을 의미
- -> 따라서, sql 연산을 하든, stream 처리를 하든 우리가 사용하는 api는 크게 다르지 않음
- spark가 나오기 전에는 시스템에서 제공해주는 라이브러리와 다른 소프트웨어에서 제공하는 api를 결합해서 사용해서 데이터를 병렬 처리 하였음

### 컴퓨팅 엔진
- spark는 데이터에 대한 연산만 수행 할 뿐이다.
- -> read/write를 하기 위해서, 우리는 별도의 저장소를 따로 준비해야한다.
- 대신, spark는 여러 저장소에서 읽고 쓸 수 있는 것을 지원한다.(e.g. monogodb connector for spark, spark-es)
- -> 해당 저장소가 spark에서 읽고 쓸 수 있는지, 확인하기 위해서는 해당 저장소가 spark를 지원하는지 찾아봐야함
- 사용하는 저장소에 상관없이, 우리는 일관성 있는 api를 사용하기 때문에, `SparkConf`, `format`, `option` 부분의 인수만 저장소 스펙에 맞게 변경하면 된다.

**spark read from es**
```java
SparkConf conf = new SparkConf();
conf.set("spark.es.nodes", "your host name");
conf.set("spark.es.port", "your port number");

SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
DataSet<Row> dataset = sparkSession.read().format("org.elasticsearch.spark.sql").option("es.resource", "your index with type").load()
```

**spark read from mongodb**
```java
SparkConf conf = new SparkConf();
conf.set("spark.mongodb.read.connection.uri", "your endpoint");
SparkSession sparkSession = SparkSession().builder().config(conf).getOrCreate();
DataSet<Row> dataset = sparkSession().read().foramt("mongodb").option("spark.mongodb.read.database", "your db").option("spark.mongodb.read.collection", "your collection").load()
```

더 나아가, spark는 한 저장소에서 읽어서 다른 저장소에 저장하는 것을 지원한다.(e.g. mongodb로부터 읽어서, es에 write하기)

### 라이브러리
- spark는 표준 라이브러이와, 외부 라이브러리를 제공한다.
- -> 위의 언급된 mongodb connector for spark, spark-es는 spark에서 만든 라이브러리는 아니며, mongodb, es에서 만들었음
- spark에서 제공하는 라이브러리로는 spark sql, graphx가 있음

## Spark가 세상에 나온 이유
- CPU 발전의 한계로 인해, 병렬 컴퓨팅의 중요성
  - 특히 데이터 수집 비용이 저렴해지면서, 데이터를 많이 얻을 수 있지만, 이를 처리하는 환경은 여전히 부족했다.

## Spark history
기존에는 데이터를 병렬처리하기 위해 hadoop MR을 사용했었는데, 단점으로는 다음과 같았음
1. 각 단계별로 MR job을 개발해서 각 job을 클러스터에서 개별적으로 실행해야하는 비효율성
- iterative processing에 적합하지 않음
2. map과 reduce phase 사이에서, disk에 read/wrtie 연산 발생

이 문제를 해결하기 위해 
- spark는 여러 단계로 이루어진 애플리케이션을 쉽게 개발 할 수 있도록 함수형 기반의 api로 설계
- 연산 단계 사이에 데이터를 메모리에 저장함으로써 효율성 증가

초기에는 함수형 기반의 api에 중점을 뒀으나, 이후 버전 1.0 이후 구조화된 데이터를 기반으로 동작하는 신규 spark sql 추가

## Spark 실습
- 데이터 브릭스에서 제공하는 커뮤니티 사용했음.
- -> jupyter notebook에서 테스트하면서 배운 내용을 정리하기 위해

In [0]:
from pyspark.sql import SparkSession

In [0]:
def init_spark():
    spark = SparkSession.builder.appName("Hello world").getOrCreate()
    sc = spark.sparkContext
    return sc

def main():
    sc = init_spark()
    nums = sc.parallelize([1,2,3,4])
    print(nums.map(lambda x: x**2).collect())

main()

[1, 4, 9, 16]
