In [1]:
import pyspark

In [6]:
from pyspark.sql import SparkSession

In [7]:
spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

In [8]:
spark

In [9]:
# DataFrame 생성 - 대표적인 구조적 API
myRange = spark.range(1000).toDF("number")
myRange

DataFrame[number: bigint]

In [10]:
# 트랜스포메이션 - 아직 실행 X
disvisBy2 = myRange.where("number % 2 = 0")
disvisBy2

DataFrame[number: bigint]

In [15]:
# 액션 - 실제 실행
import time

start = time.time()
print(myRange.count())
print(f"{time.time()-start:.2f} sec")

start = time.time()
print(disvisBy2.count())
print(f"{time.time()-start:.2f} sec")

start = time.time()
print(disvisBy2.count())
print(f"{time.time()-start:.2f} sec")

1000
2.27 sec
500
0.40 sec
500
0.19 sec


## flightData2015

In [16]:
flightData2015 = spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("../data/flight-data/csv/2015-summary.csv")

In [17]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [19]:
type(flightData2015)

pyspark.sql.dataframe.DataFrame

In [21]:
flightData2015.__sizeof__()

32

In [49]:
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#40 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#40 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#141]
      +- FileScan csv [DEST_COUNTRY_NAME#38,ORIGIN_COUNTRY_NAME#39,count#40] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/hyeon/Desktop/side-project/SparkTheDefinitiveGuide/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [53]:
# 스파크는 셔플 수행 시 기본적으로 200개의 셔플 파티션을 생성 (넓은 트랜스포메이션)
# 이를 5로 설정하여 셔플의 출력 파티션 수를 줄인다.
spark.conf.set("spark.sql.suffle.partitions","5")
flightData2015.sort('count').take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [54]:
# DataFrame을 테이블이나 뷰로 만드는 메서드
flightData2015.createOrReplaceTempView("flight_data_2015")

In [55]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

In [56]:
dataFrameWay = flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .count()

In [57]:
sqlWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#38, 200), ENSURE_REQUIREMENTS, [id=#181]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#38] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/hyeon/Desktop/side-project/SparkTheDefinitiveGuide/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [58]:
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#38, 200), ENSURE_REQUIREMENTS, [id=#194]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#38] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/hyeon/Desktop/side-project/SparkTheDefinitiveGuide/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [59]:
# max 함수를 통해 특정 위치 왕래하는 최대 비행 횟수
# max는 트랜스포메이션이다.
from pyspark.sql.functions import max

flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [60]:
# SQL 구문
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [61]:
## DataFrame 구문
from pyspark.sql.functions import desc

flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)","destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [62]:
flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)","destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#134L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#38,destination_total#134L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[sum(count#40)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#38, 200), ENSURE_REQUIREMENTS, [id=#362]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[partial_sum(count#40)])
            +- FileScan csv [DEST_COUNTRY_NAME#38,count#40] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/hyeon/Desktop/side-project/SparkTheDefinitiveGuide/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>


