In [1]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("spark_sql_basic2")
sc = SparkContext(conf=conf)

In [2]:
# RDD만을 이용한 데이터 추출

In [5]:
movies_rdd = sc.parallelize([
    (1, ("어벤져스", "마블")),
    (2, ("슈퍼맨", "DC")),
    (3, ("배트맨", "DC")),
    (4, ("겨울왕국", "디즈니")),
    (5, ("아이언맨", "마블"))
])

attendances_rdd = sc.parallelize([
    (1, (13934592, "KR")),   # 관객수
    (2, (2182227,"KR")),
    (3, (4226242, "KR")),
    (4, (10303058, "KR")),
    (5, (4300365, "KR"))
])

In [6]:
# 마블 영화 중 관객 수가 500만 이상인 영화를 가져오기

In [7]:
# CASE1. join 먼저, filter 나중에
movie_att = movies_rdd.join(attendances_rdd)
movie_att.take(3)

[(2, (('슈퍼맨', 'DC'), (2182227, 'KR'))),
 (4, (('겨울왕국', '디즈니'), (10303058, 'KR'))),
 (1, (('어벤져스', '마블'), (13934592, 'KR')))]

In [8]:
movie_att.filter(
    lambda x : x[1][0][1] == "마블" and x[1][1][0] > 5000000
).collect()

[(1, (('어벤져스', '마블'), (13934592, 'KR')))]

In [9]:
# CASE 2. filter 먼저, join 나중에
filtered_movies = movies_rdd.filter(lambda x : x[1][1] == '마블')
filtered_att = attendances_rdd.filter(lambda x : x[1][0] > 5000000)

filtered_movies.join(filtered_att).collect()

[(1, (('어벤져스', '마블'), (13934592, 'KR')))]

In [10]:
sc.stop()

# 데이터 프레임 만들기

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("spark-sql").getOrCreate()

In [None]:
# 컬럼 추가
movies = [
    (1, "어벤져스", "마블", 2012, 4, 26),
    (2, "슈퍼맨", "DC", 2013, 6, 13),
    (3, "배트맨", "DC", 2008, 8, 6),
    (4, "겨울왕국", "디즈니", 2014, 1, 16),
    (5, "아이언맨", "마블", 2008, 4, 30)
]

In [None]:
#스키마를 알아야 한다.
movie_schema = ["id", "name", "company", "year", "month", "day"]

In [5]:
df = spark.createDataFrame(data=movies, schema=movie_schema)

In [6]:
df.dtypes

[('id', 'bigint'),
 ('name', 'string'),
 ('company', 'string'),
 ('year', 'bigint'),
 ('month', 'bigint'),
 ('day', 'bigint')]

In [23]:
df.show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  4|겨울왕국| 디즈니|2014|    1| 16|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



In [7]:
df.select("name").show()

+--------+
|    name|
+--------+
|어벤져스|
|  슈퍼맨|
|  배트맨|
|겨울왕국|
|아이언맨|
+--------+



In [8]:
df.filter(df.year >= 2010).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  4|겨울왕국| 디즈니|2014|    1| 16|
+---+--------+-------+----+-----+---+



In [9]:
# 년월일
df.select(['year', 'month', 'day']).show()

+----+-----+---+
|year|month|day|
+----+-----+---+
|2012|    4| 26|
|2013|    6| 13|
|2008|    8|  6|
|2014|    1| 16|
|2008|    4| 30|
+----+-----+---+



In [10]:
# 2013년 이후 영화
df.select('name').filter(df.year > 2013).show()

+--------+
|    name|
+--------+
|겨울왕국|
+--------+



In [11]:
# 마블영화, DC 같이 꺼내기
df.select(['name', 'company']).filter((df.company == '마블') | (df.company == 'DC')).show()

+--------+-------+
|    name|company|
+--------+-------+
|어벤져스|   마블|
|  슈퍼맨|     DC|
|  배트맨|     DC|
|아이언맨|   마블|
+--------+-------+



In [12]:
spark.stop()

# Spark SQL 사용해 보기

In [13]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("spark-sql").getOrCreate()

In [14]:
# 컬럼 추가
movies = [
    (1, "어벤져스", "마블", 2012, 4, 26),
    (2, "슈퍼맨", "DC", 2013, 6, 13),
    (3, "배트맨", "DC", 2008, 8, 6),
    (4, "겨울왕국", "디즈니", 2014, 1, 16),
    (5, "아이언맨", "마블", 2008, 4, 30)
]

In [17]:
#스키마를 알아야 한다.
movie_schema = ["id", "name", "company", "year", "month", "day"]

In [18]:
df = spark.createDataFrame(data=movies, schema=movie_schema)

In [19]:
df.createOrReplaceTempView("movies")  # 뷰의 이름

In [20]:
# 영화 이름만 가져오기

query = """

SELECT name
  FROM movies

"""
spark.sql(query).show()

+--------+
|    name|
+--------+
|어벤져스|
|  슈퍼맨|
|  배트맨|
|겨울왕국|
|아이언맨|
+--------+



In [21]:
# 2010년 이후에 개봉한 영화를 조회

query = """
    SELECT name
    FROM movies
    WHERE year > 2010
"""
spark.sql(query).show()

+--------+
|    name|
+--------+
|어벤져스|
|  슈퍼맨|
|겨울왕국|
+--------+



In [22]:
# 2012년도 이전에 개봉한 영화의 이름과 회사를 출력
query = """
    SELECT name, company
    FROM movies
    WHERE year < 2012
"""
spark.sql(query).show()

+--------+-------+
|    name|company|
+--------+-------+
|  배트맨|     DC|
|아이언맨|   마블|
+--------+-------+



In [23]:
# like 문자열 데이터에서 특정 단어나 문장을 포함한 데이터를 찾을 때
# % 기호를 사용해서 문장이 매칭되는지 확인 가능!
# 제목이 ~~맨으로 끝나는 데이터의 모든 정보를 조회
query = """
    SELECT * 
    FROM movies
    WHERE name LIKE '%맨'
"""
spark.sql(query).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



In [25]:
# BETWEEN 특정 데이터와 데이터 사이를 조회
# 개봉 월이 4 ~ 8월 사이. 4 <= 개봉월 <= 8
query = """
    SELECT * 
    FROM movies
    WHERE month BETWEEN 4 AND 8
"""
spark.sql(query).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



# join 구현하기

In [26]:
attendances = [
    (1, 13934592., "KR"),
    (2, 2182227.,"KR"),
    (3, 4226242., "KR"),
    (4, 10303058., "KR"),
    (5, 4300365., "KR")
]

In [27]:
# 직접 스키마 지정해 보기
from pyspark.sql.types import StringType, FloatType\
    , IntegerType\
    , StructType, StructField

In [28]:
att_schema = StructType([ # 모든 컬럼의 타입을 통칭 - 컬럼 데이터의 집합
    StructField("id", IntegerType(), True), # StructField : 컬럼
    StructField("att", FloatType(), True),
    StructField("theater_country", StringType(), True)
])

In [29]:
att_df = spark.createDataFrame(
    data=attendances,
    schema=att_schema
)

att_df.dtypes

[('id', 'int'), ('att', 'float'), ('theater_country', 'string')]

In [30]:
att_df.createOrReplaceTempView("att")

In [32]:
att_df.select('*').show()

+---+-----------+---------------+
| id|        att|theater_country|
+---+-----------+---------------+
|  1|1.3934592E7|             KR|
|  2|  2182227.0|             KR|
|  3|  4226242.0|             KR|
|  4|1.0303058E7|             KR|
|  5|  4300365.0|             KR|
+---+-----------+---------------+



In [34]:
# df와 join
query = '''
    select movies.id, movies.name, movies.company, att.att
    from movies
    join att on movies.id = att.id
'''

spark.sql(query).show()

+---+--------+-------+-----------+
| id|    name|company|        att|
+---+--------+-------+-----------+
|  1|어벤져스|   마블|1.3934592E7|
|  2|  슈퍼맨|     DC|  2182227.0|
|  3|  배트맨|     DC|  4226242.0|
|  4|겨울왕국| 디즈니|1.0303058E7|
|  5|아이언맨|   마블|  4300365.0|
+---+--------+-------+-----------+



In [35]:
spark.stop()

# SQL 최적화

In [36]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("trip_count_sql").getOrCreate()

In [37]:
trip_file = "learning_spark_data/fhvhv_tripdata_2020-03.csv"

In [40]:
# inferSchema : 자동으로 스키마 예측하게 하기
data = spark.read.csv(trip_file, inferSchema=True, header=True)

In [41]:
data.count()

13392904

In [39]:
data.createOrReplaceTempView("mobility_data")

In [43]:
query = '''
    select *
    from mobility_data
    limit 5
'''

spark.sql(query).show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   NULL|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   NULL|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
|           HV0003|              B02764|2020-03-01 00:18:42|2020-03-01 00:38:42|         209|          80|   NULL|
|           HV0003|              B02764|2020-03-01 00:44:24|2020-03-01 00:58:44|         256|         226|   NULL|
+-----------------+--------------------+-------------------+-------------------+

## 스파크 SQL을 사용하는 이유

In [44]:
query = """
select split(pickup_datetime, ' ')[0] as pickup_date, count(*) as trips
from mobility_data
group by pickup_date
"""

spark.sql(query).show()

+-----------+------+
|pickup_date| trips|
+-----------+------+
| 2020-03-03|697880|
| 2020-03-02|648986|
| 2020-03-01|784246|
| 2020-03-06|872012|
| 2020-03-05|731165|
| 2020-03-04|707879|
| 2020-03-09|628940|
| 2020-03-08|731222|
| 2020-03-07|886071|
| 2020-03-10|626474|
| 2020-03-12|643257|
| 2020-03-11|628601|
| 2020-03-16|391518|
| 2020-03-13|660914|
| 2020-03-15|448125|
| 2020-03-14|569397|
| 2020-03-26|141607|
| 2020-03-25|141088|
| 2020-03-20|261900|
| 2020-03-24|141686|
+-----------+------+
only showing top 20 rows



In [45]:
# 실행 계획 살펴보기
spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Aggregate ['pickup_date], ['split('pickup_datetime,  )[0] AS pickup_date#382, 'count(1) AS trips#383]
+- 'UnresolvedRelation [mobility_data], [], false

== Analyzed Logical Plan ==
pickup_date: string, trips: bigint
Aggregate [split(cast(pickup_datetime#266 as string),  , -1)[0]], [split(cast(pickup_datetime#266 as string),  , -1)[0] AS pickup_date#382, count(1) AS trips#383L]
+- SubqueryAlias mobility_data
   +- View (`mobility_data`, [hvfhs_license_num#264,dispatching_base_num#265,pickup_datetime#266,dropoff_datetime#267,PULocationID#268,DOLocationID#269,SR_Flag#270])
      +- Relation [hvfhs_license_num#264,dispatching_base_num#265,pickup_datetime#266,dropoff_datetime#267,PULocationID#268,DOLocationID#269,SR_Flag#270] csv

== Optimized Logical Plan ==
Aggregate [_groupingexpression#387], [_groupingexpression#387 AS pickup_date#382, count(1) AS trips#383L]
+- Project [split(cast(pickup_datetime#266 as string),  , -1)[0] AS _groupingexpression#387]
   +- Rel

In [46]:
# 두번째 쿼리
spark.sql("""select 
                pickup_date, 
                count(*) as trips
             from ( select
                          split(pickup_datetime, ' ')[0] as pickup_date
                          from mobility_data )
             group by pickup_date""").explain(True)

== Parsed Logical Plan ==
'Aggregate ['pickup_date], ['pickup_date, 'count(1) AS trips#391]
+- 'SubqueryAlias __auto_generated_subquery_name
   +- 'Project ['split('pickup_datetime,  )[0] AS pickup_date#390]
      +- 'UnresolvedRelation [mobility_data], [], false

== Analyzed Logical Plan ==
pickup_date: string, trips: bigint
Aggregate [pickup_date#390], [pickup_date#390, count(1) AS trips#391L]
+- SubqueryAlias __auto_generated_subquery_name
   +- Project [split(cast(pickup_datetime#266 as string),  , -1)[0] AS pickup_date#390]
      +- SubqueryAlias mobility_data
         +- View (`mobility_data`, [hvfhs_license_num#264,dispatching_base_num#265,pickup_datetime#266,dropoff_datetime#267,PULocationID#268,DOLocationID#269,SR_Flag#270])
            +- Relation [hvfhs_license_num#264,dispatching_base_num#265,pickup_datetime#266,dropoff_datetime#267,PULocationID#268,DOLocationID#269,SR_Flag#270] csv

== Optimized Logical Plan ==
Aggregate [pickup_date#390], [pickup_date#390, count(1) AS tri

In [47]:
spark.stop()

In [55]:
# 운행 데이터 프레임 생성, Zone 데이터프레임 생성

trip_file = "fhvhv_tripdata_2020-03.csv"   
zone_file = "taxi+_zone_lookup.csv"

In [49]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("trip_count_sql").getOrCreate()

In [56]:
trip_data = spark.read.format('csv')\
        .option('header','true')\
        .option('inferSchema', 'true')\
        .load("learning_spark_data/fhvhv_tripdata_2020-03.csv")      

zone_data = spark.read.format('csv')\
        .option('header','true')\
        .option('inferSchema', 'true')\
        .load("learning_spark_data/taxi+_zone_lookup.csv")      

In [57]:
trip_data.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)



In [62]:
zone_data.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [60]:
trip_data.createOrReplaceTempView("trips")
zone_data.createOrReplaceTempView("zones")

In [61]:
# 승차 Location(PULocationID)별 개수 세기
query = '''
    SELECT PULocationID, COUNT(*)
    FROM trips
    GROUP BY PULocationID
'''

spark.sql(query).show(5)

+------------+--------+
|PULocationID|count(1)|
+------------+--------+
|         148|  116205|
|         243|   87431|
|          31|    5285|
|         137|   85552|
|          85|   46120|
+------------+--------+
only showing top 5 rows



In [65]:
# 하차 Location(DOLocationID)별 개수 세기
query = '''
    SELECT DOLocationID, COUNT(*)
    FROM trips
    GROUP BY DOLocationID
'''

spark.sql(query).show(5)

+------------+--------+
|DOLocationID|count(1)|
+------------+--------+
|         148|   91601|
|         243|   86795|
|          31|    5526|
|          85|   44509|
|         137|   80098|
+------------+--------+
only showing top 5 rows



In [67]:
# HV0003 운송사업자의 승차 지역별 트립 건수를 집계하고, 
query = '''
    SELECT trips.PULocationID, zones.Zone, COUNT(*)
    FROM trips
    JOIN zones
    ON trips.PULocationID = zones.LocationID
    WHERE hvfhs_license_num = 'HV0003'
    GROUP BY trips.PULocationID, zones.Zone
'''

spark.sql(query).show(5)

+------------+--------------------+--------+
|PULocationID|                Zone|count(1)|
+------------+--------------------+--------+
|         181|          Park Slope|   79235|
|         182|         Parkchester|   33478|
|         166| Morningside Heights|   46653|
|         250|Westchester Villa...|   30788|
|          18|        Bedford Park|   70536|
+------------+--------------------+--------+
only showing top 5 rows



In [78]:
# 가장 많은 운송사업자순으로 정렬하는 분석 쿼리  hvfhs_license_num
query = '''
    SELECT hvfhs_license_num,COUNT(*)
    FROM trips
    GROUP BY hvfhs_license_num
    ORDER BY COUNT(*) DESC
'''

spark.sql(query).show(5)

+-----------------+--------+
|hvfhs_license_num|count(1)|
+-----------------+--------+
|           HV0003| 9836763|
|           HV0005| 3219535|
|           HV0004|  336606|
+-----------------+--------+



In [76]:
# 운송사별 운행 건수 비교
query = '''
    SELECT hvfhs_license_num, COUNT(*)
    FROM trips
    GROUP BY hvfhs_license_num
'''

spark.sql(query).show(5)

+-----------------+--------+
|hvfhs_license_num|count(1)|
+-----------------+--------+
|           HV0004|  336606|
|           HV0005| 3219535|
|           HV0003| 9836763|
+-----------------+--------+



In [79]:
# 승차 위치 Borough별 운행 건수
query = '''
    SELECT  zones.Borough, COUNT(*)
    FROM trips
    JOIN zones
    ON trips.PULocationID = zones.LocationID
    GROUP BY zones.Borough
    ORDER BY COUNT(*) DESC
'''

spark.sql(query).show(5)

+-------------+--------+
|      Borough|count(1)|
+-------------+--------+
|    Manhattan| 4953140|
|     Brooklyn| 3735764|
|       Queens| 2437383|
|        Bronx| 2086592|
|Staten Island|  178818|
+-------------+--------+
only showing top 5 rows



In [97]:
#서비스 존별 승차/하차 건수
from pyspark.sql.functions import sum

pickup_df = trip_data.groupBy("PULocationID").count().withColumnRenamed("count", "pickup_count")
drop_df = trip_data.groupBy("DOLocationID").count().withColumnRenamed("count", "drop_count")

pickup_joined = pickup_df.join(
    zone_data,
    pickup_df.PULocationID == zone_data.LocationID,
    how="left"
).groupBy("service_zone").agg(sum("pickup_count").alias("total_pickup"))

drop_joined = drop_df.join(
    zone_data,
    drop_df.DOLocationID == zone_data.LocationID,
    how="left"
).groupBy("service_zone").agg(sum("drop_count").alias("total_drop"))

result_df = pickup_joined.join(
    drop_joined,
    on="service_zone",
    how="outer"
)
result_df.show()

+------------+------------+----------+
|service_zone|total_pickup|total_drop|
+------------+------------+----------+
|    Airports|      319610|    411156|
|   Boro Zone|     9046897|   8885136|
|         EWR|         362|     65066|
|         N/A|         845|    387759|
| Yellow Zone|     4025190|   3643787|
+------------+------------+----------+



## 혼자 생각해보기

In [105]:
# 1. 요일별 승차 건수 분석
from pyspark.sql.functions import date_format, count

trip_weekday = trip_data.withColumn('weekday', date_format('pickup_datetime', 'EEEE'))

weekday_counts = trip_weekday.groupBy('weekday')\
    .agg(count('*').alias('trip_count'))\
    .orderBy('trip_count', ascending=False)

weekday_counts.show()

+---------+----------+
|  weekday|trip_count|
+---------+----------+
|   Sunday|   2241151|
|   Monday|   1964429|
|   Friday|   1954165|
|  Tuesday|   1907888|
| Saturday|   1809669|
| Thursday|   1768802|
|Wednesday|   1746800|
+---------+----------+



In [106]:
# 2. Zone + 요일 조합별 승차 건수 분석
pickup_joined = trip_weekday.join(
    zone_data,
    trip_weekday.PULocationID == zone_data.LocationID,
    how="left"
)

zone_weekday_counts = pickup_joined.groupBy('service_zone', 'weekday')\
    .agg(count("*").alias('trip_count'))\
    .orderBy('trip_count', ascending=False)

zone_weekday_counts.show(10)

+------------+---------+----------+
|service_zone|  weekday|trip_count|
+------------+---------+----------+
|   Boro Zone|   Sunday|   1528460|
|   Boro Zone|   Friday|   1333359|
|   Boro Zone|   Monday|   1326991|
|   Boro Zone|  Tuesday|   1274010|
|   Boro Zone| Saturday|   1265210|
|   Boro Zone| Thursday|   1170749|
|   Boro Zone|Wednesday|   1148118|
| Yellow Zone|   Sunday|    646283|
| Yellow Zone|  Tuesday|    588357|
| Yellow Zone|   Friday|    583137|
+------------+---------+----------+
only showing top 10 rows



In [107]:
# 3. 요일별 가장 붐비는 서비스 존
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy('weekday').orderBy(zone_weekday_counts['trip_count'].desc())
ranked_zone_by_weekday = zone_weekday_counts.withColumn('rank', row_number().over(window_spec))

top_zone_each_weekday = ranked_zone_by_weekday.filter("rank == 1").orderBy('weekday')
top_zone_each_weekday.show()

+------------+---------+----------+----+
|service_zone|  weekday|trip_count|rank|
+------------+---------+----------+----+
|   Boro Zone|   Friday|   1333359|   1|
|   Boro Zone|   Monday|   1326991|   1|
|   Boro Zone| Saturday|   1265210|   1|
|   Boro Zone|   Sunday|   1528460|   1|
|   Boro Zone| Thursday|   1170749|   1|
|   Boro Zone|  Tuesday|   1274010|   1|
|   Boro Zone|Wednesday|   1148118|   1|
+------------+---------+----------+----+

