In [1]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("spark_sql_basic")
sc = SparkContext(conf=conf)
sc

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/28 09:43:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
movies_rdd = sc.parallelize([
    (1, ("어벤져스", "마블")),
    (2, ("슈퍼맨", "DC")),
    (3, ("배트맨", "DC")),
    (4, ("겨울왕국", "디즈니")),
    (5, ("아이언맨", "마블"))
])

attendances_rdd = sc.parallelize([
    (1, (13934592, "KR")),
    (2, (2182227,"KR")),
    (3, (4226242, "KR")),
    (4, (10303058, "KR")),
    (5, (4300365, "KR"))
])

In [3]:
# 마블 영화 중 관객수가 500만 이상인 영화 가져오기

In [5]:
movies = movies_rdd.join(attendances_rdd)
movies.filter(lambda x : x[1][0][1]=='마블' and x[1][1][0] >= 5000000).collect()


[(1, (('어벤져스', '마블'), (13934592, 'KR')))]

In [6]:
# movies2 = movies_rdd.filter(lambda x : x[1][1] == "마블")
# filter

In [7]:
sc.stop()

# SparkSession 생성
- SparkContext에 해당하며, 새로운 스파크 어플리에키션을 만들어준다.

In [10]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("spark-sql").getOrCreate()
spark

In [11]:
movies = [
    (1, "어벤져스", "마블", 2012, 4, 26),
    (2, "슈퍼맨", "DC", 2013, 6, 13),
    (3, "배트맨", "DC", 2008, 8, 6),
    (4, "겨울왕국", "디즈니", 2014, 1, 16),
    (5, "아이언맨", "마블", 2008, 4, 30)
]

In [12]:
movie_schema = ["id","name","company","year","month","day"]

# 2. 데이터 프레임 생성

In [16]:
movie_sdf = spark.createDataFrame(data=movies, schema = movie_schema)
movie_sdf 
# 판다스와 스파크 데이터 프레임의 차이점은, 이렇게 정의한다고 해서 만들어지는 것이 아님
# 앞서 RDD에서 그래왔듯이 action을 해야 만들어짐

DataFrame[id: bigint, name: string, company: string, year: bigint, month: bigint, day: bigint]

In [19]:
# 스키마 정보 확인
movie_sdf.dtypes # 컬럼의 자료형 출력

[('id', 'bigint'),
 ('name', 'string'),
 ('company', 'string'),
 ('year', 'bigint'),
 ('month', 'bigint'),
 ('day', 'bigint')]

In [23]:
# collect()에 해당하는 show() -> show 는 action이다!
movie_sdf.show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  4|겨울왕국| 디즈니|2014|    1| 16|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



# 3. Spark SQL 사용하기
- `createOrReplaceTempView(view의 이름 기입)` 함수를 이용해서 데이터프레임에 SQL을 사용할 수 있는 View를 만들어 준다.

In [25]:
movie_sdf.createOrReplaceTempView("movies")

In [43]:
query = """
SELECT name
FROM movies
"""

# 쿼리 실행하기 (Transformations)
result = spark.sql(query)

+--------+
|    name|
+--------+
|어벤져스|
|  슈퍼맨|
|  배트맨|
|겨울왕국|
|아이언맨|
+--------+



In [44]:
result.show()

+--------+
|    name|
+--------+
|어벤져스|
|  슈퍼맨|
|  배트맨|
|겨울왕국|
|아이언맨|
+--------+



In [53]:
# 영화 이름, 개봉 연도 가져오기
query1 = """
SELECT name,year,month,day
FROM movies
"""
spark.sql(query1).show()

+--------+----+-----+---+
|    name|year|month|day|
+--------+----+-----+---+
|어벤져스|2012|    4| 26|
|  슈퍼맨|2013|    6| 13|
|  배트맨|2008|    8|  6|
|겨울왕국|2014|    1| 16|
|아이언맨|2008|    4| 30|
+--------+----+-----+---+



In [56]:
# 2010년도 이후에 개봉한 영화의 모든 정보
query2 = """
SELECT *
FROM MOVIES
WHERE YEAR > 2010
"""
spark.sql(query2).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  4|겨울왕국| 디즈니|2014|    1| 16|
+---+--------+-------+----+-----+---+



In [58]:
# 2010년도 이후에 개봉한 마블 영화의 모든정보
query3 = """
SELECT *
FROM MOVIES
WHERE COMPANY = "마블"
    AND YEAR > 2010
"""

spark.sql(query3).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
+---+--------+-------+----+-----+---+



In [60]:
# ~맨으로 끝나는 영화의 모든 정보
query4 = """
SELECT *
FROM MOVIES
WHERE SUBSTR(NAME,-1,1) = "맨"
"""
spark.sql(query4).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



In [71]:
# id가 3번인 영화보다 늦게 개봉한 마블영화의 모든 정보(연도만 고려할 것)

query5 = """
SELECT *
FROM MOVIES
WHERE COMPANY = "마블"
    AND YEAR > (SELECT YEAR
                FROM MOVIES
                WHERE ID = 3)
"""
spark.sql(query5).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
+---+--------+-------+----+-----+---+



# JOIN 구현

In [72]:
attendances = [
    (1, 13934592., "KR"),
    (2, 2182227.,"KR"),
    (3, 4226242., "KR"),
    (4, 10303058., "KR"),
    (5, 4300365., "KR")
]

In [76]:
# 자료형 타입 불러오기
from pyspark.sql.types import StringType, FloatType, IntegerType  # << 컬럼의 데이터 타입임

# 구조를 만들기 위한 타입 불러오기(필수), 칼럼 순서 등
from pyspark.sql.types import StructField, StructType # StructField는 칼럼을 의미하고, StructType은 데이터프레임을 의미

In [78]:
att_schema = StructType([
    StructField("id",IntegerType(),True),
    StructField("attendance",FloatType(),True),
    StructField("country",StringType(),True)
])

In [80]:
att_df = spark.createDataFrame(data=attendances, schema=att_schema)
att_df

DataFrame[id: int, attendance: float, country: string]

In [82]:
att_df.show()

+---+-----------+-------+
| id| attendance|country|
+---+-----------+-------+
|  1|1.3934592E7|     KR|
|  2|  2182227.0|     KR|
|  3|  4226242.0|     KR|
|  4|1.0303058E7|     KR|
|  5|  4300365.0|     KR|
+---+-----------+-------+



In [83]:
att_df.createOrReplaceTempView("att")

In [89]:
# movies, att id를 기반으로 join

query = """
SELECT *
FROM att a, movies b
WHERE a.id = b.id
ORDER BY a.id 
"""
spark.sql(query).show()

+---+-----------+-------+---+--------+-------+----+-----+---+
| id| attendance|country| id|    name|company|year|month|day|
+---+-----------+-------+---+--------+-------+----+-----+---+
|  1|1.3934592E7|     KR|  1|어벤져스|   마블|2012|    4| 26|
|  2|  2182227.0|     KR|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  4226242.0|     KR|  3|  배트맨|     DC|2008|    8|  6|
|  4|1.0303058E7|     KR|  4|겨울왕국| 디즈니|2014|    1| 16|
|  5|  4300365.0|     KR|  5|아이언맨|   마블|2008|    4| 30|
+---+-----------+-------+---+--------+-------+----+-----+---+



In [90]:
spark.stop()