## dataframe 과 sql

### csv로부터 dataframe을 만들고, 그룹핑 해보자

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("trip_count_sql").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/14 20:13:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [1]:
directory = "/Users/shinjiyoung/PycharmProjects/data-engineering/01-spark/data"
filename = "fhvhv_tripdata_2020-03.csv"

In [4]:
# inferSchema : 스키마 자동 예측
# header : csv에서 header 자동 인식
data = spark.read.csv(f"file:///{directory}/{filename}", inferSchema = True, header = True)

                                                                                

In [7]:
data.show(5)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   null|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   null|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
|           HV0003|              B02764|2020-03-01 00:18:42|2020-03-01 00:38:42|         209|          80|   null|
|           HV0003|              B02764|2020-03-01 00:44:24|2020-03-01 00:58:44|         256|         226|   null|
+-----------------+--------------------+-------------------+-------------------+

In [8]:
data.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)



In [9]:
data.createOrReplaceTempView("mb_data")

In [10]:
spark.sql("select * from mb_data limit 5").show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   null|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   null|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
|           HV0003|              B02764|2020-03-01 00:18:42|2020-03-01 00:38:42|         209|          80|   null|
|           HV0003|              B02764|2020-03-01 00:44:24|2020-03-01 00:58:44|         256|         226|   null|
+-----------------+--------------------+-------------------+-------------------+

In [18]:
# pickup_datetime의 날짜별 건수 그룹핑
# split
spark.sql("select pickup_date, count(*) as trips from (select split(pickup_datetime, ' ')[0] as pickup_date from mb_data) group by pickup_date order by pickup_date").show()

[Stage 23:>                                                       (0 + 16) / 16]

+-----------+------+
|pickup_date| trips|
+-----------+------+
| 2020-03-01|784246|
| 2020-03-02|648986|
| 2020-03-03|697880|
| 2020-03-04|707879|
| 2020-03-05|731165|
| 2020-03-06|872012|
| 2020-03-07|886071|
| 2020-03-08|731222|
| 2020-03-09|628940|
| 2020-03-10|626474|
| 2020-03-11|628601|
| 2020-03-12|643257|
| 2020-03-13|660914|
| 2020-03-14|569397|
| 2020-03-15|448125|
| 2020-03-16|391518|
| 2020-03-17|312298|
| 2020-03-18|269232|
| 2020-03-19|252773|
| 2020-03-20|261900|
+-----------+------+
only showing top 20 rows



                                                                                

### 사람들이 어디서 가장 많이 타고 내리는가를 집계해보자

In [21]:
directory = "/Users/shinjiyoung/PycharmProjects/data-engineering/01-spark/data"
filename_trip = "fhvhv_tripdata_2020-03.csv"
filename_zone = "taxi+_zone_lookup.csv"

In [23]:
trip = spark.read.csv(f"file:////{directory}/{filename_trip}", inferSchema=True, header=True)
zone = spark.read.csv(f"file:////{directory}/{filename_zone}", inferSchema=True, header=True)

                                                                                

In [27]:
trip.createOrReplaceTempView("trip")
zone.createOrReplaceTempView("zone")

In [29]:
spark.sql("select * from trip limit 3").show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   null|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   null|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+



In [30]:
spark.sql("select * from zone limit 3").show()

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+



In [39]:
# 가장 많은 승객이 승차한 지역은 어디일까?
spark.sql("select zone.Borough, count(*) as peoples \
from trip join zone on trip.PULocationID = zone.LocationID \
group by zone.Borough \
order by count(*)").show()

[Stage 70:>                                                       (0 + 16) / 16]

+-------------+-------+
|      Borough|peoples|
+-------------+-------+
|          EWR|    362|
|      Unknown|    845|
|Staten Island| 178818|
|        Bronx|2086592|
|       Queens|2437383|
|     Brooklyn|3735764|
|    Manhattan|4953140|
+-------------+-------+





In [40]:
# 가장 많은 승객이 하차한 지역은 어디일까?
spark.sql("select zone.Borough, count(*) as peoples \
from trip join zone on trip.DOLocationID = zone.LocationID \
group by zone.Borough \
order by count(*)").show()

[Stage 74:>                                                       (0 + 16) / 16]

+-------------+-------+
|      Borough|peoples|
+-------------+-------+
|          EWR|  65066|
|Staten Island| 177727|
|      Unknown| 387759|
|        Bronx|2043486|
|       Queens|2468408|
|     Brooklyn|3696682|
|    Manhattan|4553776|
+-------------+-------+



                                                                                