In [1]:
import findspark

findspark.init()

import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, transform

In [3]:
spark = SparkSession.builder.appName("MoviesAnalysis") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()

23/08/17 05:27:27 WARN Utils: Your hostname, SeongGils-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 192.168.219.132 instead (on interface en0)
23/08/17 05:27:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/17 05:27:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/17 05:27:30 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [7]:
movie_data = spark \
    .read \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .csv("hdfs://localhost:9000/data/movies/KC_KOBIS_BOX_OFFIC_MOVIE_INFO_202105.csv")


                                                                                

In [10]:
movie_data.head()

Row(SN=1, MOVIE_NM='분노의 질주: 더 얼티메이트', MNG_NM='저스틴 린', MAKR_NM=None, IMPORT_CMPNY_NM='유니버설픽쳐스인터내셔널 코리아(유)', DISTB_CMPNY_NM='유니버설픽쳐스인터내셔널 코리아(유)', OPEN_DE=20210519, MOVIE_TY_NM='개봉영화', MOVIE_STLE_NM='장편', NLTY_NM='미국', WNTY_SCREEN_CO=2296, WNTY_SELNG_AM=17268076580, WNTY_AUDE_CO=1790155, SU_SELNG_AM=4197011990, SU_AUDE_CO=423112, GENRE_NM='액션', GRAD_NM='12세이상관람가', MOVIE_SE='일반영화')

In [12]:
### 함수를 통한 실행계획과 SQL의 실행 계획 차이 비교

# csv -> View
movie_data.createOrReplaceTempView("movie_data")

In [16]:
# 함수를 통한 호출 실행계획
movie_data.groupBy("NLTY_NM") \
    .count() \
    .explain()

## daptiveSparkPlan isFinalPlan=false:
# 이 부분은 적응형 실행 계획의 시작
# 적응형 실행 계획은 실행 중에 최적의 처리 경로를 선택할 수 있는 기능을 제공합니다. isFinalPlan=false는 이 계획이 아직 최종적인 것이 아니라는 것

## HashAggregate(keys=[NLTY_NM#79], functions=[count(1)]):
# 데이터를 그룹화, 집계 함수를 적용 group by NLTY_NM, count() 중간 결과로 활용

## Exchange hashpartitioning(NLTY_NM#79, 200), ENSURE_REQUIREMENTS, [plan_id=86]:
# 데이터의 분산처리를 위해 데이터를 교환하는 단계,  'NLTY_NM' 열을 기준으로 해시 파티셔닝을 수행하며, 200개의 파티션으로 데이터를 분산
# ENSURE_REQUIREMENTS는 실행 환경의 요구 사항을 충족시키기 위한 추가 작업을 수행

## ashAggregate(keys=[NLTY_NM#79], functions=[partial_count(1)]):
# 이 단계는 이전 단계에서 분산된 데이터를 다시 그룹화하고 집계 함수를 적용
# partial_count(1) 집계 함수를 사용하여 각 그룹 내의 레코드 수를 세는 작업을 수행합니다. 'NLTY_NM' 열을 기준으로 그룹화


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[NLTY_NM#79], functions=[count(1)])
   +- Exchange hashpartitioning(NLTY_NM#79, 200), ENSURE_REQUIREMENTS, [plan_id=125]
      +- HashAggregate(keys=[NLTY_NM#79], functions=[partial_count(1)])
         +- FileScan csv [NLTY_NM#79] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/data/movies/KC_KOBIS_BOX_OFFIC_MOVIE_INFO_202105..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<NLTY_NM:string>


In [17]:
# Query를 통한 호출 실행계획
spark.sql("""
SELECT NLTY_NM, count(1)
from movie_data as t1 group by t1.NLTY_NM 
""").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[NLTY_NM#79], functions=[count(1)])
   +- Exchange hashpartitioning(NLTY_NM#79, 200), ENSURE_REQUIREMENTS, [plan_id=138]
      +- HashAggregate(keys=[NLTY_NM#79], functions=[partial_count(1)])
         +- FileScan csv [NLTY_NM#79] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/data/movies/KC_KOBIS_BOX_OFFIC_MOVIE_INFO_202105..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<NLTY_NM:string>


In [23]:
q = """
select t1.NLTY_NM,
sum(t1.WNTY_SCREEN_CO) as screen_total
from movie_data as t1
group by t1.NLTY_NM
order by screen_total desc
limit 5
"""
spark.sql(q).show()

+-------+------------+
|NLTY_NM|screen_total|
+-------+------------+
|   미국|       14242|
|   한국|       12075|
|   일본|        3169|
|   영국|        1003|
| 헝가리|         691|
+-------+------------+


                                                                                

In [26]:
spark.sql(q).explain()

### dataframe으로 sql 실행 시 실행 계획
# csv -(read)> dataframe -(group by)> dataframe -(sum)> dataframe -(change column name)> dataframe -(sort)> dataframe -(limit)> dataframe -(collect)> Array()  

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[screen_total#422L DESC NULLS LAST], output=[NLTY_NM#79,screen_total#422L])
   +- HashAggregate(keys=[NLTY_NM#79], functions=[sum(WNTY_SCREEN_CO#80)])
      +- Exchange hashpartitioning(NLTY_NM#79, 200), ENSURE_REQUIREMENTS, [plan_id=445]
         +- HashAggregate(keys=[NLTY_NM#79], functions=[partial_sum(WNTY_SCREEN_CO#80)])
            +- FileScan csv [NLTY_NM#79,WNTY_SCREEN_CO#80] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/data/movies/KC_KOBIS_BOX_OFFIC_MOVIE_INFO_202105..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<NLTY_NM:string,WNTY_SCREEN_CO:int>


In [31]:
type(spark.sql(q).collect())

## before show() : 이전까지 트렌스포메이션은 하나 또는 그룹의 데이터 프레임을 반환
## after show() : 이후에는 이제까지 나온 Dataframe을 사용하는 언어ㅔ 맞게 array or list로 변환

list