In [1]:
from pyspark.sql.functions import to_timestamp

In [2]:
# Load the raw green-taxi trips for September 2013, normalise the column names to
# snake_case, parse the pickup/dropoff strings into timestamps, and persist the
# cleaned frame as parquet for the feature-engineering step that follows.
df = spark.read.csv(
    'green_tripdata_2013-09.csv',
    header=True,
    inferSchema=True,
    ignoreTrailingWhiteSpace=True,
)

# Pattern used to parse the two datetime columns.
timestamp_pattern = 'yyyy-MM-dd HH:mm:ss'

# (CSV header name, output column name) for the columns copied through as-is.
# The list order is the output column order after vendor_id and the two timestamps.
column_renames = [
    ('Store_and_fwd_flag', 'store_and_fwd_flag'),
    ('RateCodeID', 'rate_code_id'),
    ('Pickup_longitude', 'pickup_longitude'),
    ('Pickup_latitude', 'pickup_latitude'),
    ('Dropoff_longitude', 'dropoff_longitude'),
    ('Dropoff_latitude', 'dropoff_latitude'),
    ('Passenger_count', 'passenger_count'),
    ('Trip_distance', 'trip_distance'),
    ('Fare_amount', 'fare_amount'),
    ('Extra', 'extra'),
    ('MTA_tax', 'mta_tax'),
    ('Tip_amount', 'tip_amount'),
    ('Tolls_amount', 'tolls_amount'),
    ('Ehail_fee', 'ehail_fee'),
    ('Total_amount', 'total_amount'),
    ('Payment_type', 'payment_type'),
    ('Trip_type', 'trip_type'),
]

df_clean = df.select(
    df['VendorID'].alias('vendor_id'),
    # Note the source header capitalises the dropoff column differently from pickup.
    to_timestamp(df['lpep_pickup_datetime'], timestamp_pattern).alias('lpep_pickup_datetime'),
    to_timestamp(df['Lpep_dropoff_datetime'], timestamp_pattern).alias('lpep_dropoff_datetime'),
    *[df[source].alias(target) for source, target in column_renames],
)
df_clean.write.parquet("green_trip_data.parquet", mode="overwrite")
df_clean.count()

49647

In [3]:
# Feature engineering on the cleaned trips:
#   * one-hot flags for the pickup hour (o_pickup_0 .. o_pickup_23),
#   * one-hot flags for the pickup weekday (o_pickup_sunday .. o_pickup_saturday),
#   * trip_duration in seconds,
#   * at_jfk_airport = 1 when the pickup or dropoff lies within AIRPORT_RADIUS_KM of JFK.
# The result is written to parquet and its row count is displayed.

# Airport proximity parameters (previously inline magic numbers in the SQL).
# NOTE(review): reference point assumed to be JFK per the `at_jfk_airport` name; confirm source.
JFK_LATITUDE = 40.6413111
JFK_LONGITUDE = -73.7781391
EARTH_RADIUS_KM = 6371   # Earth radius used to convert the central angle to km
AIRPORT_RADIUS_KM = 5    # flag trips whose pickup or dropoff is within this many km

# Columns carried through unchanged from the cleaned parquet, in output order.
PASSTHROUGH_COLUMNS = [
    "vendor_id",
    "lpep_pickup_datetime",
    "lpep_dropoff_datetime",
    "store_and_fwd_flag",
    "rate_code_id",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "ehail_fee",
    "total_amount",
    "payment_type",
    "trip_type",
]

# Spark's DAYOFWEEK numbers days 1 = Sunday ... 7 = Saturday; index + 1 is the day number.
WEEKDAY_NAMES = ["sunday", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday"]


def _distance_to_jfk_km_sql(lat_col: str, lon_col: str) -> str:
    """Return a Spark SQL expression for the great-circle distance (km) from a point to JFK.

    Args:
        lat_col: SQL column/expression holding latitude in degrees.
        lon_col: SQL column/expression holding longitude in degrees.

    Uses the haversine formula. The previous spherical-law-of-cosines form fed
    ACOS an argument that rounds to (or just past) 1.0 for nearby points; past
    1.0 ACOS returns NaN, which Spark orders above every number, so `<= radius`
    was false and such trips were silently not flagged. NULL inputs still yield
    NULL, as before.
    """
    d_lat = f"(RADIANS({lat_col}) - RADIANS({JFK_LATITUDE}))"
    d_lon = f"(RADIANS({lon_col}) - RADIANS({JFK_LONGITUDE}))"
    haversine_a = (
        f"POW(SIN({d_lat} / 2), 2)"
        f" + COS(RADIANS({JFK_LATITUDE})) * COS(RADIANS({lat_col})) * POW(SIN({d_lon} / 2), 2)"
    )
    return f"2 * {EARTH_RADIUS_KM} * ASIN(SQRT({haversine_a}))"


green_trip_data = spark.read.parquet("green_trip_data.parquet")
green_trip_data.createOrReplaceTempView("green_trip_data")

# IF(...) maps a NULL condition (e.g. NULL pickup time) to 0, matching the original flags.
hour_flags = [
    f"IF (HOUR(lpep_pickup_datetime) == {hour}, 1, 0) AS o_pickup_{hour}"
    for hour in range(24)
]
weekday_flags = [
    f"IF (DAYOFWEEK(lpep_pickup_datetime) == {day}, 1, 0) AS o_pickup_{name}"
    for day, name in enumerate(WEEKDAY_NAMES, start=1)
]
# Casting a timestamp to BIGINT yields seconds since the epoch, so this is seconds.
trip_duration = "BIGINT(lpep_dropoff_datetime) - BIGINT(lpep_pickup_datetime) AS trip_duration"

pickup_km = _distance_to_jfk_km_sql("pickup_latitude", "pickup_longitude")
dropoff_km = _distance_to_jfk_km_sql("dropoff_latitude", "dropoff_longitude")
at_jfk_airport = (
    f"IF ({pickup_km} <= {AIRPORT_RADIUS_KM} OR {dropoff_km} <= {AIRPORT_RADIUS_KM}, 1, 0)"
    " AS at_jfk_airport"
)

# Same output columns, in the same order, as the previous hand-written query.
select_list = ",\n        ".join(
    PASSTHROUGH_COLUMNS + hour_flags + weekday_flags + [trip_duration, at_jfk_airport]
)
query = f"""
    SELECT
        {select_list}
    FROM
        green_trip_data
"""
green_trip_data_prep = spark.sql(query)
green_trip_data_prep.write.mode("overwrite").parquet("green_trip_data_prep.parquet")
green_trip_data_prep.count()

49647