In [1]:
import os

from pyspark.sql import SparkSession

In [8]:
# Obtain the Spark session for this analysis, labelled "taxi-analysis".
# getOrCreate() reuses an already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("taxi-analysis")
    .getOrCreate()
)

# 데이터 프레임 생성

In [11]:
# Root directory of the NYC taxi data. Set the NYC_TAXI_DATA_DIR environment
# variable to point elsewhere instead of editing a machine-specific path.
# abspath() keeps the value absolute, which the `file://` URIs built later rely on.
DATA_DIR = os.path.abspath(
    os.environ.get('NYC_TAXI_DATA_DIR', '/Users/dongwoo/new_york/data')
)

trip_files = os.path.join(DATA_DIR, 'trips', '*')  # glob: every file in trips/
zone_file = os.path.join(DATA_DIR, 'taxi+_zone_lookup.csv')  # location ID -> zone lookup

In [16]:
# The trip files are Parquet, which carries its own column types (see the
# printSchema output below), so the CSV reader options `inferSchema` / `header`
# do not apply here and are omitted.
# `file://` + an absolute path yields a well-formed `file:///Users/...` URI;
# the previous `file:///` prefix produced four slashes.
# NOTE(review): assumes trip_files / zone_file are absolute POSIX paths.
trips_df = spark.read.parquet(f"file://{trip_files}")

# The zone lookup is a CSV with a header row; let Spark infer column types.
zone_df = spark.read.csv(f"file://{zone_file}", inferSchema=True, header=True)

### 스키마 파싱

In [18]:
# Print the schema Spark derived for each input: trips first, then zones.
for source_df in (trips_df, zone_df):
    source_df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

### 쿼리

In [19]:
# Expose both DataFrames as temporary views so the SQL below can query them
# by name: trip records as `trips`, the zone lookup as `zone`.
temp_views = {'trips': trips_df, 'zone': zone_df}
for view_name, view_df in temp_views.items():
    view_df.createOrReplaceTempView(view_name)

In [34]:
# Build one row per trip with:
#   - pickup/drop-off calendar date (TO_DATE) and hour of day (HOUR),
#   - the fare-related amounts and payment type,
#   - human-readable pickup/drop-off zone names from the zone lookup.
# Both joins are LEFT JOINs, so a trip whose PULocationID / DOLocationID has no
# match in `zone` is kept with a NULL zone rather than dropped.
# `pz.zone` / `dz.zone` refer to the lookup's `Zone` column; this works because
# Spark SQL column resolution is case-insensitive by default.
# NOTE(review): `vender_id` looks like a typo for `vendor_id`; it is left as-is
# because the `comb` view (and possibly later cells) expose this column name.
query = '''
SELECT
    t.VendorID AS vender_id,
    TO_DATE(t.tpep_pickup_datetime) AS pickup_date,
    TO_DATE(t.tpep_dropoff_datetime) AS dropoff_date,
    HOUR(t.tpep_pickup_datetime) AS pickup_time,
    HOUR(t.tpep_dropoff_datetime) AS dropoff_time,
    t.trip_distance,
    t.fare_amount,
    t.tip_amount,
    t.tolls_amount,
    t.total_amount,
    t.payment_type,
    
    pz.zone AS pickup_zone,
    dz.zone AS dropoff_zone
FROM
    trips AS t
    LEFT JOIN
        zone AS pz
    ON
        t.PULocationID = pz.LocationID
    LEFT JOIN
        zone AS dz
    ON
        t.DOLocationID = dz.LocationID
'''
combination_df = spark.sql(query)
# Register the joined result as the `comb` view used by the cleaning queries below.
combination_df.createOrReplaceTempView('comb')

In [36]:
# Check the column names and types produced by the join query.
combination_df.printSchema()

root
 |-- vender_id: long (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- dropoff_date: date (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- dropoff_time: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- pickup_zone: string (nullable = true)
 |-- dropoff_zone: string (nullable = true)



```sql
SELECT
    t.VendorID AS vender_id, -- vender ID
    TO_DATE(t.tpep_pickup_datetime) AS pickup_date, -- 상차 날짜
    TO_DATE(t.tpep_dropoff_datetime) AS dropoff_date, -- 하차 날짜
    HOUR(t.tpep_pickup_datetime) AS pickup_time, -- 상차 시간
    HOUR(t.tpep_dropoff_datetime) AS dropoff_time, -- 하차 시간
    t.trip_distance, -- 운행 길이
    t.fare_amount, -- 운행 요금
    t.tip_amount,
    t.tolls_amount,
    t.total_amount, -- 전체 지불
    t.payment_type, -- 지불 타입
    
    pz.zone AS pickup_zone,
    dz.zone AS dropoff_zone
FROM
    trips AS t
    LEFT JOIN -- pickup
        zone AS pz
    ON
        t.PULocationID = pz.LocationID
        
    LEFT JOIN -- dropoff
        zone AS dz
    ON
        t.DOLocationID = dz.LocationID
```

## 데이터 클리닝

In [41]:
# Look for trips whose pickup date falls before the cutoff; the recorded output
# shows clearly invalid dates such as 2009-01-01 and 2003-01-09.
# NOTE(review): `< '2020-12-31'` excludes pickups on 2020-12-31 itself; confirm
# whether `< '2021-01-01'` was intended.
early_pickups_sql = "SELECT pickup_date, pickup_time FROM comb WHERE pickup_date < '2020-12-31' "
spark.sql(early_pickups_sql).show()

+-----------+-----------+
|pickup_date|pickup_time|
+-----------+-----------+
| 2009-01-01|          9|
| 2009-01-01|          8|
| 2009-01-01|          8|
| 2009-01-01|          8|
| 2009-01-01|          8|
| 2009-01-01|          8|
| 2009-01-01|          9|
| 2009-01-01|          9|
| 2009-01-01|         10|
| 2009-01-01|         11|
| 2009-01-01|          9|
| 2009-01-01|          8|
| 2009-01-01|         10|
| 2009-01-01|          7|
| 2009-01-01|          8|
| 2009-01-01|          8|
| 2009-01-01|          8|
| 2003-01-09|         16|
| 2009-01-01|          8|
| 2009-01-01|          8|
+-----------+-----------+
only showing top 20 rows



In [42]:
# Summary statistics for total_amount; the recorded output shows negative
# values (min -951.0) and extreme outliers (max 818286.74) that need cleaning.
combination_df.describe('total_amount').show()



+-------+------------------+
|summary|      total_amount|
+-------+------------------+
|  count|          30904308|
|   mean|19.696304829291634|
| stddev|179.19426093049208|
|    min|            -951.0|
|    max|         818286.74|
+-------+------------------+



                                                                                

In [43]:
# Summary statistics for trip_distance; the recorded output shows zero-length
# trips (min 0.0) and implausible maxima (351613.36) that need cleaning.
combination_df.describe('trip_distance').show()

+-------+-----------------+
|summary|    trip_distance|
+-------+-----------------+
|  count|         30904308|
|   mean|6.922688674342763|
| stddev| 698.379951118258|
|    min|              0.0|
|    max|        351613.36|
+-------+-----------------+



