In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('24121003_yellowtaxi_trip_count').getOrCreate()

24/12/12 09:26:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
import os
trip_files = '/trips/*'
zone_file = 'taxi+_zone_lookup.csv'
directory = os.path.join(os.getcwd(), 'data')

In [3]:
trips_df = spark.read.csv(f'file:///{directory}/{trip_files}', inferSchema = True, header = True)

                                                                                

In [4]:
zone_df = spark.read.csv(f'file:///{directory}/{zone_file}', inferSchema = True, header = True)

In [5]:
trips_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [6]:
zone_df.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [7]:
trips_df.createOrReplaceTempView('trips')
zone_df.createOrReplaceTempView('zone')

In [8]:
comb_df = spark.sql(
    '''
    select t.VendorID, 
        TO_DATE(t.tpep_pickup_datetime) as pickup_date,
        TO_DATE(t.tpep_dropoff_datetime) as dropoff_date,
        HOUR(t.tpep_pickup_datetime) as pickup_time,
        HOUR(t.tpep_dropoff_datetime) as dropoff_time,
        t.passenger_count,
        t.trip_distance,
        t.payment_type,
        t.tip_amount,
        t.total_amount,
        t.tolls_amount,
        pz.Zone as pickup_zone,
        dz.Zone as dropoff_zone
        
        
    from trips t
    LEFT JOIN zone pz ON t.PULocationID = pz.LocationID
    LEFT JOIN zone dz ON t.DOLocationID = dz.LocationID
    '''
)

In [9]:
comb_df.count()

                                                                                

15000700

In [44]:
comb_df.show(5)

+--------+-----------+------------+-----------+------------+---------------+-------------+------------+----------+------------+------------+-----------------+--------------+
|VendorID|pickup_date|dropoff_date|pickup_time|dropoff_time|passenger_count|trip_distance|payment_type|tip_amount|total_amount|tolls_amount|      pickup_zone|  dropoff_zone|
+--------+-----------+------------+-----------+------------+---------------+-------------+------------+----------+------------+------------+-----------------+--------------+
|       2| 2021-03-01|  2021-03-01|          0|           0|              1|          0.0|           2|       0.0|         4.3|         0.0|               NV|            NV|
|       2| 2021-03-01|  2021-03-01|          0|           0|              1|          0.0|           2|       0.0|         3.8|         0.0|   Manhattanville|Manhattanville|
|       2| 2021-03-01|  2021-03-01|          0|           0|              1|          0.0|           2|       0.0|         4.8|   

In [10]:
comb_df.createOrReplaceTempView('comb')

## 데이터 전처리

1. 2021년을 제외한 자료 제거
2. 이상치 값 제거

In [11]:
#2021년이 아닌 데이터
spark.sql(
    '''
    SELECT distinct pickup_date
    FROM comb 
    WHERE year(pickup_date) != '2021'
    ORDER BY pickup_date
    '''
).show()



+-----------+
|pickup_date|
+-----------+
| 2002-12-31|
| 2003-01-05|
| 2004-04-04|
| 2008-12-31|
| 2009-01-01|
| 2020-12-31|
| 2029-05-05|
+-----------+



                                                                                

In [12]:
# 이상치 값 찾기

for i in ['passenger_count', 'trip_distance','tip_amount', 'total_amount']:
    print(f'{i}의 describe -----------------------------------')
    comb_df.select(i).describe().show()

passenger_count의 describe -----------------------------------


                                                                                

+-------+------------------+
|summary|   passenger_count|
+-------+------------------+
|  count|          14166672|
|   mean|1.4253783104458126|
| stddev|  1.04432704905968|
|    min|                 0|
|    max|                 9|
+-------+------------------+

trip_distance의 describe -----------------------------------


                                                                                

+-------+-----------------+
|summary|    trip_distance|
+-------+-----------------+
|  count|         15000700|
|   mean|6.628629402627818|
| stddev|671.7293482115828|
|    min|              0.0|
|    max|        332541.19|
+-------+-----------------+

tip_amount의 describe -----------------------------------


                                                                                

+-------+-----------------+
|summary|       tip_amount|
+-------+-----------------+
|  count|         15000700|
|   mean|2.146797558780939|
| stddev|2.610914434555077|
|    min|          -333.32|
|    max|          1140.44|
+-------+-----------------+

total_amount의 describe -----------------------------------




+-------+-----------------+
|summary|     total_amount|
+-------+-----------------+
|  count|         15000700|
|   mean|18.75545205708744|
| stddev|145.7442452805979|
|    min|           -647.8|
|    max|         398469.2|
+-------+-----------------+



                                                                                

In [23]:
taxi = spark.sql(
    '''
    SELECT *
    FROM comb 
    WHERE year(pickup_date) == '2021'
        AND tip_amount > 0
        AND total_amount > 0
        AND trip_distance > 0
    '''
)
taxi.count()

                                                                                

10567904

In [29]:
spark.sql(
    '''
    SELECT trip_distance, tip_amount, total_amount, tolls_amount
    FROM comb 
    WHERE tip_amount = 0
    '''
).show(20)

+-------------+----------+------------+------------+
|trip_distance|tip_amount|total_amount|tolls_amount|
+-------------+----------+------------+------------+
|          0.0|       0.0|         4.3|         0.0|
|          0.0|       0.0|         3.8|         0.0|
|          0.0|       0.0|         4.8|         0.0|
|         12.4|       0.0|        43.8|         0.0|
|          9.7|       0.0|        32.3|         0.0|
|         16.2|       0.0|        45.3|         0.0|
|         2.57|       0.0|        12.8|         0.0|
|          0.4|       0.0|         5.3|         0.0|
|         3.26|       0.0|        17.3|         0.0|
|         2.35|       0.0|        12.8|         0.0|
|         18.3|       0.0|       61.42|        6.12|
|          2.0|       0.0|        11.8|         0.0|
|          1.3|       0.0|        16.8|         0.0|
|         4.19|       0.0|        17.8|         0.0|
|          3.5|       0.0|        16.3|         0.0|
|         4.89|       0.0|        18.3|       

In [31]:
taxi.createOrReplaceTempView("taxi")

In [37]:
comb_df.describe().show()



+-------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+--------------------+--------------------+
|summary|          VendorID|       pickup_time|      dropoff_time|   passenger_count|     trip_distance|       payment_type|        tip_amount|      total_amount|       payment_type|         pickup_zone|        dropoff_zone|
+-------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+--------------------+--------------------+
|  count|           8596840|           8921357|           8921357|           8596839|           8921357|            8596838|           8921355|           8921354|            8596838|             8921357|             8921356|
|   mean| 1.682631874037437|14.074268185882485|14.178853844768234|1.4186727237767276|3.7238418706928

                                                                                

In [38]:
# 실행계획, 실행결과(4040)
query2 = '''
select pickup_date, pickup_time
from comb
where pickup_time > 0 and pickup_time <= 12
'''
spark.sql(query2).show(5)

+-----------+-----------+
|pickup_date|pickup_time|
+-----------+-----------+
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
| 2021-03-01|          1|
+-----------+-----------+
only showing top 5 rows



In [39]:
spark.sql(query2).explain()

== Physical Plan ==
*(3) Project [cast(tpep_pickup_datetime#17 as date) AS pickup_date#419, hour(cast(tpep_pickup_datetime#17 as timestamp), Some(Asia/Seoul)) AS pickup_time#421]
+- *(3) BroadcastHashJoin [DOLocationID#24], [LocationID#425], LeftOuter, BuildRight, false
   :- *(3) Project [tpep_pickup_datetime#17, DOLocationID#24]
   :  +- *(3) BroadcastHashJoin [PULocationID#23], [LocationID#68], LeftOuter, BuildRight, false
   :     :- *(3) Filter ((isnotnull(tpep_pickup_datetime#17) AND (hour(cast(tpep_pickup_datetime#17 as timestamp), Some(Asia/Seoul)) > 0)) AND (hour(cast(tpep_pickup_datetime#17 as timestamp), Some(Asia/Seoul)) <= 12))
   :     :  +- FileScan csv [tpep_pickup_datetime#17,PULocationID#23,DOLocationID#24] Batched: false, DataFilters: [isnotnull(tpep_pickup_datetime#17), (hour(cast(tpep_pickup_datetime#17 as timestamp), Some(Asia/..., Format: CSV, Location: InMemoryFileIndex[file:/home/lab09/git/src/data/trips/yellow_tripdata_2021-01.csv, file:/home/lab..., Partition

In [40]:
# 실행계획, 실행결과(4040)
query3 = '''
select pickup_date, count(*) as trip_count
from comb
where pickup_time > 0
group by pickup_date
order by pickup_date
'''
spark.sql(query3).show(5)



+-----------+----------+
|pickup_date|trip_count|
+-----------+----------+
| 2002-12-31|         1|
| 2004-04-04|         1|
| 2008-12-31|        15|
| 2009-01-01|        23|
| 2020-12-31|        16|
+-----------+----------+
only showing top 5 rows



                                                                                

In [41]:
spark.sql(query3).explain()

== Physical Plan ==
*(5) Sort [pickup_date#419 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(pickup_date#419 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#1840]
   +- *(4) HashAggregate(keys=[pickup_date#419], functions=[count(1)])
      +- Exchange hashpartitioning(pickup_date#419, 200), ENSURE_REQUIREMENTS, [id=#1836]
         +- *(3) HashAggregate(keys=[pickup_date#419], functions=[partial_count(1)])
            +- *(3) Project [cast(tpep_pickup_datetime#17 as date) AS pickup_date#419]
               +- *(3) BroadcastHashJoin [DOLocationID#24], [LocationID#425], LeftOuter, BuildRight, false
                  :- *(3) Project [tpep_pickup_datetime#17, DOLocationID#24]
                  :  +- *(3) BroadcastHashJoin [PULocationID#23], [LocationID#68], LeftOuter, BuildRight, false
                  :     :- *(3) Filter (isnotnull(tpep_pickup_datetime#17) AND (hour(cast(tpep_pickup_datetime#17 as timestamp), Some(Asia/Seoul)) > 0))
                  :     :  +- FileScan csv [

In [12]:
comb_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- dropoff_date: date (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- dropoff_time: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- pickup_zone: string (nullable = true)
 |-- dropoff_zone: string (nullable = true)



In [None]:
# 1. 운행 거리와 요금의 상관관계 분석  > 쿼리, 데이터프레임으로 구현


In [34]:
spark.sql(
    '''
    SELECT payment_type, total_amount, tip_amount, trip_distance
    FROM taxi
    LIMIT 10 
    '''
).show()

+------------+------------+----------+-------------+
|payment_type|total_amount|tip_amount|trip_distance|
+------------+------------+----------+-------------+
|           1|       70.07|     11.65|         16.5|
|           1|       11.16|      1.86|         1.13|
|           1|       18.59|      4.29|         2.68|
|           1|       43.67|      7.25|          9.3|
|           1|        46.1|      7.68|         9.58|
|           1|        19.3|       2.0|         3.58|
|           1|        14.8|       5.0|         0.91|
|           1|       47.25|      9.45|        13.41|
|           1|       14.16|      2.36|         1.53|
|           1|       54.96|      9.16|         16.6|
+------------+------------+----------+-------------+



In [35]:
spark.sql(
    '''
    SELECT *
    FROM taxi
    LIMIT 10 
    '''
).show()

+--------+-----------+------------+-----------+------------+---------------+-------------+------------+----------+------------+------------+--------------------+--------------------+
|VendorID|pickup_date|dropoff_date|pickup_time|dropoff_time|passenger_count|trip_distance|payment_type|tip_amount|total_amount|tolls_amount|         pickup_zone|        dropoff_zone|
+--------+-----------+------------+-----------+------------+---------------+-------------+------------+----------+------------+------------+--------------------+--------------------+
|       1| 2021-03-01|  2021-03-01|          0|           0|              0|         16.5|           1|     11.65|       70.07|        6.12|   LaGuardia Airport|                  NA|
|       2| 2021-03-01|  2021-03-01|          0|           0|              1|         1.13|           1|      1.86|       11.16|         0.0|        East Chelsea|                  NV|
|       2| 2021-03-01|  2021-03-01|          0|           0|              1|         

In [33]:
spark.sql(
    '''
    SELECT pickup_time, count(VendorID) as using_c, 
    FROM taxi
    Group By pickup_time
    Order By using_c Desc 
    Limit 5
    '''
).show()

AnalysisException: cannot resolve '`pickup_time`' given input columns: []; line 4 pos 13;
'GlobalLimit 5
+- 'LocalLimit 5
   +- 'Sort ['using_c DESC NULLS LAST], true
      +- 'Aggregate ['pickup_time], ['pickup_time, 'count('VendorID) AS using_c#1016, 'FROM AS taxi#1017]
         +- OneRowRelation


In [None]:

spark.sql(
    '''
    SELECT pickup_time, count(VendorID)
    FROM comb
    Group By pickup_time
    '''
).show()

In [None]:
# 4. 승차지역/하차지역별 평균거리, 요금


In [None]:
# 5. 팁의 비율에 따른 거리, 여행 건수 서비스 관련 분석


In [44]:
spark.stop()