In [25]:
from pyspark.sql import SparkSession
import os

In [2]:
ss = SparkSession.builder.appName('YellowTaxi').getOrCreate()
ss

24/12/10 15:47:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [47]:
trip_files = '/trips/*.csv'
zone_file = 'trips/code/lookup.csv'
directory = os.path.join(os.getcwd(), 'data')

In [48]:
trips_df = ss.read.csv(f'file:///{directory}/{trip_files}', inferSchema = True , header = True)
trips_df.show(3)
trips_df.printSchema()



+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       2| 2021-03-01 00:22:02|  2021-03-01 00:23:22|              1|          0.0|         1|                 N|         264|         264|           2|        3.0|  0.5|    0.5|       0.0|         0.0|                  0.3

                                                                                

In [49]:
zone_df = ss.read.csv(f'file:///{directory}/{zone_file}', inferSchema = True, header = True)
zone_df.show(3)

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+
only showing top 3 rows



In [51]:
trips_df.createOrReplaceTempView('trips')
zone_df.createOrReplaceTempView('zone')

In [79]:
comb_df = ss.sql('''
select

    T.VendorID
    , Date(T.tpep_pickup_datetime) as pickup_date
    , Date(T.tpep_dropoff_datetime) as dropoff_date
    , HOUR(T.tpep_pickup_datetime) as pickup_time
    , HOUR(T.tpep_dropoff_datetime) as dropoff_time
    , T.passenger_count
    , T.tip_amount
    , T.total_amount
    , T.payment_type
    -- , Z.Zone as pickup_zone
    -- , Z.Zone as Dropoff_zone
    , PZ.Zone as pickup_zone
    , DZ.Zone as Dropoff_zone
from trips as T

--left join zone as Z
--on T.PULocationID = Z.LocationID
--    and T.DOLocationID = Z.LocationID

left join zone PZ on T.PULocationID = PZ.LocationID
left join zone DZ on T.DOLocationID = DZ.LocationID

''')

In [78]:
ss.sql('''
select

    T.VendorID
    , Date(T.tpep_pickup_datetime) as pickup_date
    , Date(T.tpep_dropoff_datetime) as dropoff_date
    , HOUR(T.tpep_pickup_datetime) as pickup_time
    , HOUR(T.tpep_dropoff_datetime) as dropoff_time
    , T.passenger_count
    , T.tip_amount
    , T.total_amount
    , T.payment_type
    , Z.Zone as pickup_zone
    , Z.Zone as Dropoff_zone
    -- , PZ.Zone as pickup_zone
    -- , DZ.Zone as Dropoff_zone
from trips as T

left join zone as Z
on T.PULocationID = Z.LocationID
    and T.DOLocationID = Z.LocationID

--left join zone PZ on T.PULocationID = PZ.LocationID
--left join zone DZ on T.DOLocationID = DZ.LocationID

''').count()

                                                                                

15000700

In [80]:
comb_df.count()

                                                                                

15000700

In [81]:
comb_df.createOrReplaceTempView('comb_view')

In [85]:
ss.sql('''
select distinct pickup_time
from comb_view
''').show()

                                                                                

+-----------+
|pickup_time|
+-----------+
|         12|
|         22|
|          1|
|         13|
|          6|
|         16|
|          3|
|         20|
|          5|
|         19|
|         15|
|          9|
|         17|
|          4|
|          8|
|         23|
|          7|
|         10|
|         21|
|         11|
+-----------+
only showing top 20 rows



In [87]:
ss.sql('''
select distinct pickup_date
from comb_view
order by pickup_date
limit 10
''').show()

ss.sql('''
select distinct pickup_date
from comb_view
order by pickup_date desc
limit 10
''').show()

                                                                                

+-----------+
|pickup_date|
+-----------+
| 2002-12-31|
| 2003-01-05|
| 2004-04-04|
| 2008-12-31|
| 2009-01-01|
| 2020-12-31|
| 2021-01-01|
| 2021-01-02|
| 2021-01-03|
| 2021-01-04|
+-----------+





+-----------+
|pickup_date|
+-----------+
| 2029-05-05|
| 2021-12-15|
| 2021-11-24|
| 2021-11-03|
| 2021-10-12|
| 2021-09-21|
| 2021-08-31|
| 2021-08-09|
| 2021-08-02|
| 2021-08-01|
+-----------+



                                                                                

In [88]:
comb_df.describe()

                                                                                

DataFrame[summary: string, VendorID: string, pickup_time: string, dropoff_time: string, passenger_count: string, tip_amount: string, total_amount: string, payment_type: string, pickup_zone: string, Dropoff_zone: string]

In [97]:
#실행계획, 실행결과 비교하기
q1 = '''
select pickup_date, pickup_time
from comb_view
'''

q2 = '''
select pickup_date, pickup_time
from comb_view
where pickup_time > 0 and pickup_time <=12
'''

q3 = '''
select pickup_date, max(pickup_time)
from comb_view
where pickup_time > 0 and pickup_time <=12
group by pickup_date
order by pickup_date
'''

ss.sql(q1).explain()
ss.sql(q2).explain()
ss.sql(q3).explain()

ss.sql(q1).show()
ss.sql(q2).show()
ss.sql(q3).show()

#브로드캐스트는 최소화하는 것이 성능에 좋음

== Physical Plan ==
*(3) Project [cast(tpep_pickup_datetime#1510 as date) AS pickup_date#2654, hour(cast(tpep_pickup_datetime#1510 as timestamp), Some(Asia/Seoul)) AS pickup_time#2656]
+- *(3) BroadcastHashJoin [DOLocationID#1517], [LocationID#2660], LeftOuter, BuildRight, false
   :- *(3) Project [tpep_pickup_datetime#1510, DOLocationID#1517]
   :  +- *(3) BroadcastHashJoin [PULocationID#1516], [LocationID#1652], LeftOuter, BuildRight, false
   :     :- FileScan csv [tpep_pickup_datetime#1510,PULocationID#1516,DOLocationID#1517] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/lab17/git/src/data/trips/yellow_tripdata_2021-01.csv, file:/home/lab..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<tpep_pickup_datetime:string,PULocationID:int,DOLocationID:int>
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#2610]
   :        +- *(1) Filter isnotnull(LocationID#1652)
   :     



+-----------+----------------+
|pickup_date|max(pickup_time)|
+-----------+----------------+
| 2003-01-05|               7|
| 2004-04-04|               4|
| 2009-01-01|              12|
| 2021-01-01|              12|
| 2021-01-02|              12|
| 2021-01-03|              12|
| 2021-01-04|              12|
| 2021-01-05|              12|
| 2021-01-06|              12|
| 2021-01-07|              12|
| 2021-01-08|              12|
| 2021-01-09|              12|
| 2021-01-10|              12|
| 2021-01-11|              12|
| 2021-01-12|              12|
| 2021-01-13|              12|
| 2021-01-14|              12|
| 2021-01-15|              12|
| 2021-01-16|              12|
| 2021-01-17|              12|
+-----------+----------------+
only showing top 20 rows



                                                                                

In [98]:
ss.stop()