In [11]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("24121003_yellowtaxi_trip_count").getOrCreate()

In [12]:
import os
trip_files = '/trips/*'
zone_file = 'taxi+_zone_lookup.csv'
directory = os.path.join(os.getcwd(), 'data')

In [13]:
trips_df = spark.read.csv(f'file:///{directory}/{trip_files}', inferSchema=True, header=True)

                                                                                

In [14]:
zone_df = spark.read.csv(f'file:///{directory}/{zone_file}', inferSchema=True, header=True)

In [44]:
zone_df.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [45]:
zone_df.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [46]:
trips_df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       2| 2021-03-01 00:22:02|  2021-03-01 00:23:22|              1|          0.0|         1|                 N|         264|         264|           2|        3.0|  0.5|    0.5|       0.0|         0.0|                  0.3

In [15]:
trips_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [16]:
trips_df.createOrReplaceTempView('trips')
zone_df.createOrReplaceTempView('zone')

t,pz,dz 별칭 설정
LEFT JOIN -  각 여행의 출발지와 목적지에 대한 상세 정보 얻기

In [27]:
query = '''
select
t.VendorID,
TO_DATE(t.tpep_pickup_datetime) as pickup_date,
TO_DATE(t.tpep_dropoff_datetime) as dropoff_date,
HOUR(t.tpep_pickup_datetime)  as pickup_time,
HOUR(t.tpep_dropoff_datetime) as dropoff_time,
t.passenger_count,
t.trip_distance,
t.tip_amount,
t.total_amount,
t.payment_type,
pz.Zone as pickup_zone,
dz.Zone as dropoff_zone
from trips t
LEFT JOIN zone pz ON t.PULocationID = pz.LocationID
LEFT JOIN zone dz ON t.DOLocationID = dz.LocationID
'''

In [28]:
comb_df = spark.sql(query)

In [29]:
comb_df.count()

                                                                                

15000700

In [30]:
comb_df.show(5)

+--------+-----------+------------+-----------+------------+---------------+-------------+----------+------------+------------+-----------------+--------------+
|VendorID|pickup_date|dropoff_date|pickup_time|dropoff_time|passenger_count|trip_distance|tip_amount|total_amount|payment_type|      pickup_zone|  dropoff_zone|
+--------+-----------+------------+-----------+------------+---------------+-------------+----------+------------+------------+-----------------+--------------+
|       2| 2021-03-01|  2021-03-01|          0|           0|              1|          0.0|       0.0|         4.3|           2|               NV|            NV|
|       2| 2021-03-01|  2021-03-01|          0|           0|              1|          0.0|       0.0|         3.8|           2|   Manhattanville|Manhattanville|
|       2| 2021-03-01|  2021-03-01|          0|           0|              1|          0.0|       0.0|         4.8|           2|   Manhattanville|Manhattanville|
|       1| 2021-03-01|  2021-03-01

In [31]:
query = '''
select pickup_date, pickup_time
from comb
where pickup_time>0

'''

In [32]:
spark.sql(query).show()

+-----------+-----------+
|pickup_date|pickup_time|
+-----------+-----------+
| 2021-02-28|         23|
| 2021-02-28|         23|
| 2021-02-28|         23|
| 2021-02-28|         23|
| 2021-02-28|         23|
| 2021-02-28|         23|
| 2021-02-28|         23|
| 2021-03-01|         22|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
+-----------+-----------+
only showing top 20 rows



In [33]:
query = '''
select pickup_date, pickup_time 
from comb 
where pickup_date<'2020-12-31'
'''
spark.sql(query).show()



+-----------+-----------+
|pickup_date|pickup_time|
+-----------+-----------+
| 2009-01-01|          0|
| 2008-12-31|         23|
| 2009-01-01|          0|
| 2009-01-01|          0|
| 2009-01-01|          0|
| 2009-01-01|          0|
| 2009-01-01|          0|
| 2009-01-01|          1|
| 2009-01-01|          0|
| 2008-12-31|         23|
| 2008-12-31|         23|
| 2008-12-31|         23|
| 2008-12-31|         23|
| 2009-01-01|          0|
| 2009-01-01|          0|
| 2009-01-01|          0|
| 2009-01-01|         16|
| 2009-01-01|         16|
| 2009-01-01|          0|
| 2009-01-01|          0|
+-----------+-----------+
only showing top 20 rows



                                                                                

In [34]:
comb_df.describe()

                                                                                

DataFrame[summary: string, VendorID: string, pickup_time: string, dropoff_time: string, passenger_count: string, trip_distance: string, tip_amount: string, total_amount: string, payment_type: string, pickup_zone: string, dropoff_zone: string]

In [35]:
spark.sql(query).explain()

== Physical Plan ==
*(3) Project [cast(tpep_pickup_datetime#133 as date) AS pickup_date#210, hour(cast(tpep_pickup_datetime#133 as timestamp), Some(Asia/Seoul)) AS pickup_time#212]
+- *(3) BroadcastHashJoin [DOLocationID#140], [LocationID#216], LeftOuter, BuildRight, false
   :- *(3) Project [tpep_pickup_datetime#133, DOLocationID#140]
   :  +- *(3) BroadcastHashJoin [PULocationID#139], [LocationID#184], LeftOuter, BuildRight, false
   :     :- *(3) Filter (isnotnull(tpep_pickup_datetime#133) AND (cast(tpep_pickup_datetime#133 as date) < 18627))
   :     :  +- FileScan csv [tpep_pickup_datetime#133,PULocationID#139,DOLocationID#140] Batched: false, DataFilters: [isnotnull(tpep_pickup_datetime#133), (cast(tpep_pickup_datetime#133 as date) < 18627)], Format: CSV, Location: InMemoryFileIndex[file:/home/lab15/src/data/trips/yellow_tripdata_2021-01.csv, file:/home/lab15/s..., PartitionFilters: [], PushedFilters: [IsNotNull(tpep_pickup_datetime)], ReadSchema: struct<tpep_pickup_datetime:stri

In [43]:
#실행계획, 실행결과(4040)

query2 = '''
select pickup_date, pickup_time 
from comb 
where pickup_time > 0 and pickup_time<=12
'''
spark.sql(query2).explain()

== Physical Plan ==
*(3) Project [cast(tpep_pickup_datetime#133 as date) AS pickup_date#210, hour(cast(tpep_pickup_datetime#133 as timestamp), Some(Asia/Seoul)) AS pickup_time#212]
+- *(3) BroadcastHashJoin [DOLocationID#140], [LocationID#216], LeftOuter, BuildRight, false
   :- *(3) Project [tpep_pickup_datetime#133, DOLocationID#140]
   :  +- *(3) BroadcastHashJoin [PULocationID#139], [LocationID#184], LeftOuter, BuildRight, false
   :     :- *(3) Filter ((isnotnull(tpep_pickup_datetime#133) AND (hour(cast(tpep_pickup_datetime#133 as timestamp), Some(Asia/Seoul)) > 0)) AND (hour(cast(tpep_pickup_datetime#133 as timestamp), Some(Asia/Seoul)) <= 12))
   :     :  +- FileScan csv [tpep_pickup_datetime#133,PULocationID#139,DOLocationID#140] Batched: false, DataFilters: [isnotnull(tpep_pickup_datetime#133), (hour(cast(tpep_pickup_datetime#133 as timestamp), Some(Asi..., Format: CSV, Location: InMemoryFileIndex[file:/home/lab15/src/data/trips/yellow_tripdata_2021-01.csv, file:/home/lab15/s.

In [42]:
#FileScan csv - Filter - Zone 데이터 읽기 - BroadcastHashJoin (첫 번째,두 번째 조인) 
# - Project(date타입 변환) - HashAggregate (부분 집계) - Exchange hashpartitioning(파티셔닝 재분배) - HashAggregate (최종 집계)
# - Exchange rangepartitioning(최종집계결과 pickup_date 기준 정렬 재분배) - Sort 오름차순 ASC

query3 = '''
select pickup_date , count(*) as trip_count
from comb 
where pickup_time > 0
group by pickup_date
order by pickup_date
'''
spark.sql(query3).explain()

== Physical Plan ==
*(5) Sort [pickup_date#210 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(pickup_date#210 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#1240]
   +- *(4) HashAggregate(keys=[pickup_date#210], functions=[count(1)])
      +- Exchange hashpartitioning(pickup_date#210, 200), ENSURE_REQUIREMENTS, [id=#1236]
         +- *(3) HashAggregate(keys=[pickup_date#210], functions=[partial_count(1)])
            +- *(3) Project [cast(tpep_pickup_datetime#133 as date) AS pickup_date#210]
               +- *(3) BroadcastHashJoin [DOLocationID#140], [LocationID#216], LeftOuter, BuildRight, false
                  :- *(3) Project [tpep_pickup_datetime#133, DOLocationID#140]
                  :  +- *(3) BroadcastHashJoin [PULocationID#139], [LocationID#184], LeftOuter, BuildRight, false
                  :     :- *(3) Filter (isnotnull(tpep_pickup_datetime#133) AND (hour(cast(tpep_pickup_datetime#133 as timestamp), Some(Asia/Seoul)) > 0))
                  :     :  +- FileSc