In [1]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("spark_sql_basic")
sc = SparkContext(conf=conf)

In [2]:
movies_rdd = sc.parallelize([
    (1, ("어벤져스", "마블")),
    (2, ("슈퍼맨", "DC")),
    (3, ("배트맨", "DC")),
    (4, ("겨울왕국", "디즈니")),
    (5, ("아이언맨", "마블"))
])


attendances_rdd = sc.parallelize([
    (1, (13934592, "KR")),
    (2, (2182227,"KR")),
    (3, (4226242, "KR")),
    (4, (10303058, "KR")),
    (5, (4300365, "KR"))
])

마블 영화 중 관객 수가 500만 이상인 영화를 가져오려면 2가지 방법이 있습니다.
1. `Inner Join` -> `Filter By Movie`, `Filter By attendance`
2. `Filter By Movie`, `Filter By Attendance` -> `Inner Join`

In [3]:
# CASE 1 : join 먼저. filter 나중에
movie_attendance = movies_rdd.join(attendances_rdd)
movie_attendance.filter(lambda x: x[1][0][1] == "마블" and x[1][1][0] > 5000000).collect()

[(1, (('어벤져스', '마블'), (13934592, 'KR')))]

In [4]:
# CASE 2 : filter 먼저, join 나중에
filtered_movies = movies_rdd.filter(lambda x : x[1][1] == "마블")
filtered_attendances = attendances_rdd.filter(lambda x : x[1][0] > 5000000)

filtered_movies.join(filtered_attendances).collect()

[(1, (('어벤져스', '마블'), (13934592, 'KR')))]

동일한 결과지만 filter를 먼저 수행해서 가져올 데이터를 걸러 낸 다음 join을 하는 `CASE 2`가 훨씬 효율적 입니다.

**매번 이런 고민을 하기엔 너무 시간이 아깝고, 개발자 마다 성능 차이도 심합니다. Spark SQL을 이용해 보죠**

# 1. SparkSession 만들기
- `SparkContext`에 해당하며, 새로운 스파크 어플리케이션을 만들어 준다.

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("spark-sql").getOrCreate()

In [6]:
movies = [
    (1, "어벤져스", "마블", 2012, 4, 26),
    (2, "슈퍼맨", "DC", 2013, 6, 13),
    (3, "배트맨", "DC", 2008, 8, 6),
    (4, "겨울왕국", "디즈니", 2014, 1, 16),
    (5, "아이언맨", "마블", 2008, 4, 30)
]

In [7]:
movie_schema = ["id", "name", "company", "year", "month", "day"]

# 2. 데이터 프레임 만들기

In [8]:
# 스파크가 알아서 데이터 타입을 결정지어 줍니다. -> inferSchema=True
df = spark.createDataFrame(data=movies, schema=movie_schema)

스키마의 타입 확인
* `dtypes`

In [9]:
df.dtypes

[('id', 'bigint'),
 ('name', 'string'),
 ('company', 'string'),
 ('year', 'bigint'),
 ('month', 'bigint'),
 ('day', 'bigint')]

전체 데이터 프레임 내용 확인
* `show()`
* Pandas DataFrame 처럼 데이터의 내용을 출력

In [10]:
df.show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  4|겨울왕국| 디즈니|2014|    1| 16|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



In [11]:
df

DataFrame[id: bigint, name: string, company: string, year: bigint, month: bigint, day: bigint]

# 3. Spark SQL 사용하기
- `createOrReplaceTempView` 함수를 이용해서 DataFrame을 table 형식으로 등록한다.

In [12]:
df.createOrReplaceTempView("movies")

영화 이름만 가져오기

In [13]:
query = """
SELECT name
FROM movies
"""

# 쿼리 실행
spark.sql(query).show()

+--------+
|    name|
+--------+
|어벤져스|
|  슈퍼맨|
|  배트맨|
|겨울왕국|
|아이언맨|
+--------+



영화 이름, 개봉 연도 가져오기

In [14]:
query = """
SELECT name, year
FROM movies
"""

# 쿼리 실행
spark.sql(query).show()

+--------+----+
|    name|year|
+--------+----+
|어벤져스|2012|
|  슈퍼맨|2013|
|  배트맨|2008|
|겨울왕국|2014|
|아이언맨|2008|
+--------+----+



2010년도 이후에 개봉한 영화의 모든 정보

In [15]:
query = """
SELECT *
FROM movies
WHERE year > 2010
"""

# 쿼리 실행
spark.sql(query).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  4|겨울왕국| 디즈니|2014|    1| 16|
+---+--------+-------+----+-----+---+



마블 영화 중에 2010년도 이후에 개봉한 영화의 모든 정보

In [16]:
query = """
SELECT *
FROM movies
WHERE year > 2010 and company='마블'
"""

# 쿼리 실행
spark.sql(query).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
+---+--------+-------+----+-----+---+



영화의 이름이 `~맨`으로 끝나는 영화의 모든 정보

In [17]:
query = """
SELECT *
FROM movies
WHERE name like '%맨'
"""

# 쿼리 실행
spark.sql(query).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



개봉월이 4월 ~ 8월 사이인 영화의 모든 정보

In [18]:
query = """
SELECT *
FROM movies
WHERE month BETWEEN 4 AND 8
"""

# 쿼리 실행
spark.sql(query).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



`id`가 3번인 영화보다 늦게 개봉한 마블영화의 모든 정보 ( 개봉 연도만 고려 )

In [19]:
query = """
SELECT *
FROM movies
WHERE company='마블'
  AND year > ( SELECT year FROM movies WHERE id = 3)
"""

# 쿼리 실행
spark.sql(query).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
+---+--------+-------+----+-----+---+



연도 내림 차순으로 모든 데이터를 정렬

In [22]:
query = """
SELECT *
FROM movies
ORDER BY year ASC
"""

# 쿼리 실행
spark.sql(query).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  3|  배트맨|     DC|2008|    8|  6|
|  5|아이언맨|   마블|2008|    4| 30|
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  4|겨울왕국| 디즈니|2014|    1| 16|
+---+--------+-------+----+-----+---+



* `count` : 개수 세기
* `mean` : 평균 구하기
* `sum` : 총 합

DC 영화의 개수

In [23]:
query = """
SELECT count(*)
FROM movies
WHERE company='DC'
"""

# 쿼리 실행
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|       2|
+--------+



마블과 디즈니 영화의 개수

In [25]:
query = """
SELECT count(*)
FROM movies
WHERE company IN ('마블','디즈니')
"""

# 쿼리 실행
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



# Join 구현하기

In [26]:
attendances = [
    (1, 13934592., "KR"),
    (2, 2182227.,"KR"),
    (3, 4226242., "KR"),
    (4, 10303058., "KR"),
    (5, 4300365., "KR")
]

개발자가 직접 스키마 정의

In [28]:
# 스키마 타입 불러오기

# 자료형 타입 불러오기
from pyspark.sql.types import StringType, FloatType, IntegerType

# 구조를 만들기 위한 타입 불러오기(필수)
from pyspark.sql.types import StructType, StructField

In [31]:
# StructType : 스키마를 구성하는 모든 정보들을 묶어주는 역할
attendances_schema = StructType([
    StructField("id", IntegerType(), True), # StructField : 컬럼의 정보를 구조화 시켜주는 역할(컬럼명, 컬럼 타입, Null 허용 여부 등....)
    StructField("attendance", FloatType(), True),
    StructField("theater_country", StringType(), True)
])

In [33]:
att_df = spark.createDataFrame(data=attendances, schema=attendances_schema)
att_df.dtypes

[('id', 'int'), ('attendance', 'float'), ('theater_country', 'string')]

In [34]:
att_df.createOrReplaceTempView("att")

In [35]:
query = """
SELECT *
FROM movies
JOIN att ON movies.id = att.id
"""

spark.sql(query).show()

+---+--------+-------+----+-----+---+---+-----------+---------------+
| id|    name|company|year|month|day| id| attendance|theater_country|
+---+--------+-------+----+-----+---+---+-----------+---------------+
|  5|아이언맨|   마블|2008|    4| 30|  5|  4300365.0|             KR|
|  1|어벤져스|   마블|2012|    4| 26|  1|1.3934592E7|             KR|
|  3|  배트맨|     DC|2008|    8|  6|  3|  4226242.0|             KR|
|  2|  슈퍼맨|     DC|2013|    6| 13|  2|  2182227.0|             KR|
|  4|겨울왕국| 디즈니|2014|    1| 16|  4|1.0303058E7|             KR|
+---+--------+-------+----+-----+---+---+-----------+---------------+



In [36]:
spark.stop()
sc.stop()