## 2.5 Spark Session
- A Spark application is controlled through a driver process called the SparkSession.

In [0]:
spark

## 2.6 Spark DataFrame
- The most common Structured API
- Represents a table of data as rows and columns

In [0]:
%scala
val myRange = spark.range(1000).toDF("number")

In [0]:
# A DataFrame with one column and 1,000 rows
myRange = spark.range(1000).toDF("number")

In [0]:
myRange.display()

number
0
1
2
3
4
5
6
7
8
9


In [0]:
%scala
myRange.show()

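## 2.7 Transformations
- Narrow dependency: each input partition contributes to only one output partition, so no shuffle is needed.
- Wide dependency: an input partition contributes to many output partitions, so Spark must shuffle data across the cluster.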

In [0]:
divisBy2 = myRange.where("number % 2 = 0")
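
The difference between the two dependency types can be sketched in plain Python, without Spark. This is only a toy model under stated assumptions: a dataset is a list of partitions, and the helper names `narrow_filter` and `wide_group_by` are hypothetical, not Spark APIs. A filter such as the `where` call above behaves like the narrow case.

```python
from collections import defaultdict

# Toy model: a "dataset" is a list of partitions, each partition a list of values.
partitions = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]


def narrow_filter(parts, predicate):
    """Narrow dependency: each output partition is built from exactly one input partition."""
    return [[x for x in part if predicate(x)] for part in parts]


def wide_group_by(parts, key_fn, num_out):
    """Wide dependency: any input partition may send rows to any output partition (a shuffle)."""
    out = [defaultdict(list) for _ in range(num_out)]
    for part in parts:
        for x in part:
            k = key_fn(x)
            # Rows with the same key must meet in the same output partition.
            out[k % num_out][k].append(x)
    return [dict(p) for p in out]


evens = narrow_filter(partitions, lambda x: x % 2 == 0)
grouped = wide_group_by(partitions, lambda x: x % 3, num_out=2)

print(evens)    # partition boundaries are unchanged
print(grouped)  # rows from all three input partitions are mixed together
```

In the narrow case the three partitions can be processed independently. In the wide case every output partition has to wait for rows from all inputs, which is why a shuffle is expensive.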

## 2.8 Actions
- Transformations are lazy; to trigger the actual computation, you must run an action.
- An action instructs Spark to compute a result from a series of transformations.

In [0]:
divisBy2.count()

Out[18]: 500
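
Lazy evaluation can be illustrated with a small pure-Python stand-in. This is a sketch, not how Spark is implemented: the class `LazyRange` and the helper `is_even` are hypothetical names introduced here. Calling `where` only records the predicate; nothing is evaluated until `count` is called.

```python
class LazyRange:
    """Toy stand-in for a DataFrame: records transformations, runs them only on an action."""

    def __init__(self, n, filters=()):
        self.n = n
        self.filters = tuple(filters)

    def where(self, predicate):
        # Transformation: returns a new plan and computes nothing.
        return LazyRange(self.n, self.filters + (predicate,))

    def count(self):
        # Action: executes the recorded plan over the whole range.
        return sum(1 for x in range(self.n) if all(f(x) for f in self.filters))


evaluated = []  # records every value the predicate is actually applied to


def is_even(x):
    evaluated.append(x)
    return x % 2 == 0


plan = LazyRange(1000).where(is_even)
calls_before_action = len(evaluated)  # the predicate has not run yet
result = plan.count()

print(calls_before_action, result, len(evaluated))  # prints "0 500 1000"
```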

## 2.10 An End-to-End Example
- Read the data with the DataFrameReader that the SparkSession exposes as spark.read.

In [0]:
flightData2015 = spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("dbfs:/FileStore/shared_uploads/mjwoo0@naver.com/2015_summary.csv")

In [0]:
flightData2015.display()

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
flightData2015.take(3)

Out[21]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

- Inspect Spark's execution plan with explain().

In [0]:
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#169 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#169 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=275]
      +- FileScan csv [DEST_COUNTRY_NAME#167,ORIGIN_COUNTRY_NAME#168,count#169] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/shared_uploads/mjwoo0@naver.com/2015_summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




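- By default, Spark creates 200 shuffle partitions when it performs a shuffle.
- The code below sets the number of shuffle partitions to 5, which reduces the number of output partitions produced by the shuffle.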

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "5")
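
Why a smaller setting suits a small dataset can be sketched with a toy hash partitioner in plain Python. The assumptions are loud ones: `shuffle_by_key` is a hypothetical helper, it uses CRC32 rather than Spark's own hash function, and the rows are the ten sample rows displayed earlier in this notebook. With only a handful of distinct keys, most of 200 partitions stay empty.

```python
import zlib


def shuffle_by_key(rows, key_index, num_partitions):
    """Assign each row to a partition by hashing its key (toy hash, not Spark's)."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        key = str(row[key_index]).encode("utf-8")
        parts[zlib.crc32(key) % num_partitions].append(row)
    return parts


# The ten sample rows shown by flightData2015.display() above.
rows = [
    ("United States", "Romania", 15),
    ("United States", "Croatia", 1),
    ("United States", "Ireland", 344),
    ("Egypt", "United States", 15),
    ("United States", "India", 62),
    ("United States", "Singapore", 1),
    ("United States", "Grenada", 62),
    ("Costa Rica", "United States", 588),
    ("Senegal", "United States", 40),
    ("Moldova", "United States", 1),
]

for n in (200, 5):
    parts = shuffle_by_key(rows, key_index=0, num_partitions=n)
    non_empty = sum(1 for p in parts if p)
    print(f"{n} shuffle partitions -> {non_empty} non-empty")
```

Each partition becomes a task, so scheduling 200 tasks for data that fills at most five of them is mostly overhead.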

- Sort by count.

In [0]:
flightData2015.sort("count").take(2)

Out[24]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

## Using SQL
- Register a DataFrame as a table or view so that it can be queried with SQL.

In [0]:
flightData2015.createOrReplaceTempView("flight_data_2015")

- Spark compiles a SQL query to the same underlying execution plan as the equivalent DataFrame code, so there is no performance difference between the two approaches.

In [0]:
sqlWay = spark.sql("""
    SELECT DEST_COUNTRY_NAME, count(1)
    FROM flight_data_2015     
    GROUP BY DEST_COUNTRY_NAME              
""")

dataFrameWay = flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .count()

In [0]:
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#167], functions=[finalmerge_count(merge count#324L) AS count(1)#312L])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 0, Statistics(sizeInBytes=4.6 KiB, rowCount=132, isRuntime=true)
         +- Exchange hashpartitioning(DEST_COUNTRY_NAME#167, 5), ENSURE_REQUIREMENTS, [plan_id=404]
            +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#167], functions=[partial_count(1) AS count#324L])
               +- FileScan csv [DEST_COUNTRY_NAME#167] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/shared_uploads/mjwoo0@naver.com/2015_summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
+- == Initial Plan ==
   HashAggregate(keys=[DEST_COUNTRY_NAME#167], functions=[finalmerge_count(merge count#324L) AS count(1)#312L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#16
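
The plan above runs the grouped count in two stages: a partial count inside each input partition, an Exchange that hash-partitions the partial results by key, and a final merge. A minimal pure-Python sketch of that shape follows. The input partitions, the byte-sum hash, and the function names are all assumptions made for illustration, not Spark internals.

```python
from collections import Counter

# Hypothetical input partitions holding only the grouping column.
input_partitions = [
    ["United States", "United States", "Egypt"],
    ["United States", "Costa Rica", "Egypt"],
]


def partial_count(partition):
    """Stage 1: count keys locally inside one partition (no data movement)."""
    return Counter(partition)


def exchange(partials, num_partitions):
    """Shuffle: route each (key, partial count) pair to a partition chosen by key."""
    buckets = [[] for _ in range(num_partitions)]
    for partial in partials:
        for key, n in partial.items():
            # Toy hash: sum of the key's bytes. Spark uses a different function.
            buckets[sum(key.encode("utf-8")) % num_partitions].append((key, n))
    return buckets


def final_merge(bucket):
    """Stage 2: add up the partial counts that arrived for each key."""
    merged = Counter()
    for key, n in bucket:
        merged[key] += n
    return merged


partials = [partial_count(p) for p in input_partitions]
buckets = exchange(partials, num_partitions=5)

totals = Counter()
for bucket in buckets:
    totals.update(final_merge(bucket))

print(dict(totals))
```

Counting locally first means only one small record per key and partition crosses the shuffle, instead of every input row.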

In [0]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

Out[33]: [Row(max(count)=370002)]

In [0]:
# Import the module under an alias so that Python's built-in max is not shadowed.
from pyspark.sql import functions as F

flightData2015.select(F.max("count")).take(1)

Out[34]: [Row(max(count)=370002)]

In [0]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME 
ORDER BY sum(count) DESC 
LIMIT 5
""")

maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [0]:
from pyspark.sql.functions import desc 

flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [0]:
flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#431L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#167,destination_total#431L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#167], functions=[finalmerge_sum(merge sum#435L) AS sum(count#169)#427L])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#167, 5), ENSURE_REQUIREMENTS, [plan_id=618]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#167], functions=[partial_sum(count#169) AS sum#435L])
            +- FileScan csv [DEST_COUNTRY_NAME#167,count#169] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/shared_uploads/mjwoo0@naver.com/2015_summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>


