In [1]:
# 세션확인
spark

### 2.5 SparkSession

- spark 코드:
  <pre><code> val myRange = spark.range(1000).toDF("number")</code></pre>

- 1 x 1000 데이터 프레임 각각 값 0~999 생성. 
- spark의 DF는 단일 컴퓨터에 저장하기에 데이터가 크거나 계산에 오랜 시간이 걸려 여러 컴퓨터에 분산.
  - 스프레드시트와 R과 파이썬에서의 DF는 일반적으로 분산컴퓨터가 아닌 단일 컴퓨터에 존재.

- spark는 Dataset, Dataframe, SQL 테이블, RDD라는 핵심 추상화 개념을 가짐.
  - df가 가장 쉽고 효율적. 2부 마지막에 Dataset, 3부에서 RDD를 다룰 예정.

In [3]:
myRange = spark.range(1000).toDF("number")

### 2.7 트랜스포메이션

- 스파크의 핵심 데이터 구조는 한번 생성하면 변경할 수 없는 불변성(immutable)을 가짐.
  - 변경을 위해서 트랜스포메이션 명령
  
- 스칼라 코드
<pre><code> val divisBy2 = myRange.where("number % 2 = 0") </code></pre>

- transformation의 2가지유형
  - narrow dependency: 각 입력의 파티션이 하나의 출력 파티션에만 영향을 미침 (ex: where)
    - pipelining을 자동으로 수행, 모든 작업이 메모리에서 일어남.
  - wide dependency: 하나의 입력 파티션이 여러 출력 파티션에 영향 (ex: shuffle(클러스터에서 파티션을 교환))
    - 셔플의 결과를 디스크에 저장. 중요한 주제임.
    - 파티션: 클러스터의 물리적 머신에 존재하는 row의 집합

In [5]:
divisBy2 = myRange.where("number % 2 = 0")

- 실제 연산을 수행을 위해 액션 명령. (트랜스포메이션은 논리적 실행 계획)
  - 결과를 계산하도록 지시하는 명령.
  - 액션 지정할 때 job이 시작됨.
    - job은 필터(좁은 트렌스포메이션)을 수행한 후 파티션별로 레코드 수를 카운트(넓은 트랜스포메이션)한다.
  
### 2.8 액션  

- 액션의 유형:
  - 콘솔에서 데이터를 보는 액션
  - 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
  - 출력 데이터소스에 저장하는 액션

In [7]:
divisBy2.count()

### 2.10 종합

- inferSchema: DataFrame의 스키마 정보를 알아내는 스키마 추론 기능

In [9]:
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("/databricks-datasets/definitive-guide/data/flight-data/csv/2015-summary.csv")

In [10]:
# flightData2015.head(5)
flightData2015.take(5)

- read: 좁은 트랜스포메이션
- sort: 넓은 트랜스포메이션
  - df를 변경하지 않고 이전 df를 변환하여 새로운 df를 생성해 반환

- explain: df의 계보(lineage)나 스파크 쿼리 실행 계획을 확인 가능.
  - explain 결과의 sort exchange filescan을 주목.

In [12]:
flightData2015.sort("count").explain()

In [13]:
spark.conf.set("spark.sql.shuffle.partitions","5")

flightData2015.sort("count").take(2)

- 스파크는 셔플 수행 시 기본적으로 200개의 셔플 파티션 생성.
- 아래 코드를 통해 셔플 파티션 5개로 설정해 셔플 출력 파티션 줄이기 가능

<pre><code>spark.conf.set("spark.sql.shuffle.partitions","5")</code></pre>


- 스파크는 계보를 통해 입력 데이터에 수행한 연산을 전체 파티션에 어떻게 재연산하는지 알 수 있음.
  - 이 기능이 스파크 프로그래밍 모델인 함수형 프로그래밍의 핵심.
  - 함수형 프로그래밍은 데이터의 변환 규칙이 일정한 경우 같은 입력에 대해 항상 같은 출력 생성.
  
- 사용자는 물리적 데이터를 직접 다루지 않고, 셔플 파티션 파라미터와 같은 물리적 실행 특성을 제어.
  - 스파크 UI를 통해 잡의 실행 상태와 잡의 물리적, 논리적 실행 특성 확인 가능.

In [15]:
flightData2015.sort("count").explain()

### 2.10.1 DataFrame과 SQL

- 사용자가 SQL이나 R, 자바, 파이썬, 스칼라의 데이터프레임으로 비즈니스로직을 표현하면 스파크에서 실제 코드를 실행하기 전에 로직을 기본 실행계획으로 컴파일 한다.
  - explain으로 실행 계획 확인 가능
  
- 스파크 SQL로 DF를 테이블이나 뷰로 등록하면 SQL 사용가능.
  - 성능차이는 없음

In [17]:
flightData2015.createOrReplaceTempView("flight_data_2015")

- 두 실행 계획이 동일한 기본 실행 계획으로 컴파일 되는 것을 확인 가능.

In [19]:
# 파이썬 코드
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .count()

sqlWay.explain()
dataFrameWay.explain()


In [20]:
from pyspark.sql.functions import max

flightData2015.select(max("count")).take(1)

In [21]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

maxSql.show()


In [22]:
maxSql.explain()

- 실행계획이 총 7단계이다. 
  - csv파일을 read $\to$ df를 group by $\to$ df를 sum $\to$ df 컬럼명 변경 $\to$ df sort $\to$ df limit $\to$ df collect $\to$ array(...)

- explain에서 출력된 실행계획은 물리적 실행 시점에서 수행하는 최적화로 인해 다르지만, 모든 부분을 포함하고 있음.
  - partial_sum 함수를 호출할 때 집계가 2 단계로 나누어짐.
  - 2단계로 나누어지는 이유는 숫자 목록의 합을 구하는 연산이 가환성(commutative)을 갖고 있어 합계 연산시 파티션별 처리가 가능하기 때문.

In [24]:
from pyspark.sql.functions import desc

flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .show()

In [25]:
flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .explain()