In this notebook i want to create some features
- trip duration
- date columns like year month etc ...


In [7]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .appName("NYC Taxi Analysis") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.ui.port", "4040") \
    .getOrCreate()



train = spark.read.parquet("/Users/anatolijperederij/PycharmProjects/nyc-taxi-ml-pipeline/data/processed/train")
eval = spark.read.parquet("/Users/anatolijperederij/PycharmProjects/nyc-taxi-ml-pipeline/data/processed/eval")
holdout = spark.read.parquet("/Users/anatolijperederij/PycharmProjects/nyc-taxi-ml-pipeline/data/processed/holdout")

print(f"train rows : {train.count()}")
print(f"eval rows : {eval.count()}")
print(f"holdout rows : {holdout.count()}")

train.printSchema()

train rows : 57892523
eval rows : 10036979
holdout rows : 10378129
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: void (nullable = true)
 |-- airport_fee: void (nullable = true)



In [9]:
import math
def add_date(df):
    df = df.withColumn("Year", F.year(F.col("tpep_pickup_datetime")))
    df = df.withColumn("Month", F.month(F.col("tpep_pickup_datetime")))
    df = df.withColumn("Week", F.weekofyear(F.col("tpep_pickup_datetime")))
    df = df.withColumn("Day", F.dayofmonth(F.col("tpep_pickup_datetime")))
    df = df.withColumn("DayOfWeek", F.dayofweek(F.col("tpep_pickup_datetime")))
    df = df.withColumn("Hour", F.hour(F.col("tpep_pickup_datetime")))
    df = df.withColumn("Hour" , F.sin(2 * F.pi() * F.col("hour") / 24))
    df = df.withColumn("Duration_minutes",
                       (F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime"))/60)
    df = df.drop("tpep_pickup_datetime")
    df = df.drop("tpep_dropoff_datetime")
    return df

train = add_date(train)
eval = add_date(eval)
holdout = add_date(holdout)

train.select("Hour").show(5)


+-------------------+
|               Hour|
+-------------------+
|-0.7071067811865471|
|-0.4999999999999997|
| 0.7071067811865476|
|0.49999999999999994|
|-0.4999999999999997|
+-------------------+
only showing top 5 rows


In [13]:
""" ALL COLUMNS without dates and trip distance we need it 100%
Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
       'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
       'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
       'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
       'total_amount', 'congestion_surcharge', 'airport_fee'],
      dtype='object')
"""

cols_to_drop = ["VendorID" , "fare_amount" , "mta_tax" , "extra" , "tip_amount" , "tolls_amount" , "improvement_surcharge" , "store_and_fwd_flag", "passenger_count"]

def drop_cols(df):
    df = df.drop(*cols_to_drop)
    return df

train = drop_cols(train)
eval = drop_cols(eval)
holdout = drop_cols(holdout)

train.limit(1).toPandas().head()

Unnamed: 0,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,total_amount,congestion_surcharge,airport_fee,Year,Month,Week,Day,DayOfWeek,Hour,Duration_minutes
0,1.0,1,230,162,1,11.15,,,2016,1,2,14,5,-0.707107,11.133333


In [14]:
train = train.repartition(4)  # 4 файла вместо сотен
train.write.mode("overwrite").parquet("../data/processed/train")
eval = eval.repartition(4)  # 4 файла вместо сотен
eval.write.mode("overwrite").parquet("../data/processed/eval")
holdout = holdout.repartition(4)  # 4 файла вместо сотен
holdout.write.mode("overwrite").parquet("../data/processed/holdout")
spark.stop()

                                                                                