In [101]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.sql.window import Window
import numpy as np

spark = SparkSession.builder.getOrCreate()

In [126]:
cab_rides = spark.read.csv("../Uber_Lyft_Cab_prices/cab_rides.csv", header=True, inferSchema=True)
weather = spark.read.csv("../Uber_Lyft_Cab_prices/weather.csv", header=True, inferSchema=True)

                                                                                

# Feature Engineering

In [127]:
cab_rides = (cab_rides
            .withColumn("day", f.from_unixtime(f.col("time_stamp")/1000, "yyyy-MM-dd HH:mm"))
            .withColumn("time_hour", f.from_unixtime(f.col("time_stamp")/1000, "yyyy-MM-dd HH"))
            .withColumn("hour", f.from_unixtime(f.col("time_stamp")/1000, "HH").cast(IntegerType()))
            .withColumn("day_of_week", f.from_unixtime(f.col("time_stamp")/1000, "EEEE"))
            .withColumn("month", f.from_unixtime(f.col("time_stamp")/1000, "MM").cast(IntegerType()))
)
weather = (weather
            .withColumn("time_hour", f.from_unixtime(f.col("time_stamp"), "yyyy-MM-dd HH"))
)

In [128]:
print("cab_rides dim: ", (cab_rides.count(), len(cab_rides.columns)))
print("weather dim: ", (weather.count(), len(weather.columns)))

cab_rides dim:  (693071, 15)
weather dim:  (6276, 9)


In [129]:
df = cab_rides.join(weather, on=(cab_rides["time_hour"] == weather["time_hour"]) & (cab_rides["source"] == weather["location"])).drop("time_hour", "location", "time_stamp")

In [130]:
df.show(5)

+--------+--------+-------------+----------------+-----+----------------+--------------------+------------+------------+----------------+----+-----------+-----+-----+------+--------+------+--------+-----+
|distance|cab_type|  destination|          source|price|surge_multiplier|                  id|  product_id|        name|             day|hour|day_of_week|month| temp|clouds|pressure|  rain|humidity| wind|
+--------+--------+-------------+----------------+-----+----------------+--------------------+------------+------------+----------------+----+-----------+-----+-----+------+--------+------+--------+-----+
|    0.44|    Lyft|North Station|Haymarket Square|  5.0|             1.0|424553bb-7174-41e...|   lyft_line|      Shared|2018-12-16 07:30|   7|     Sunday|   12|38.46|  0.29| 1022.25|  null|    0.76| 7.68|
|    0.44|    Lyft|North Station|Haymarket Square| 11.0|             1.0|4bd23055-6827-41c...|lyft_premier|         Lux|2018-11-27 00:00|   0|    Tuesday|   11|43.82|  0.99| 1002.5

In [131]:
df.summary(["distance", "price", "surge_multiplier", "hour", "month", "temp", "clouds", "pressure", "rain", "humidity", "wind"]).show()

[Stage 257:>                                                        (0 + 8) / 8]

+-------+------------------+------------------+-------------------+------------------+------------------+----------------+------------------+------------------+-------------------+-------------------+-----------------+
|summary|          distance|             price|   surge_multiplier|              hour|             month|            temp|            clouds|          pressure|               rain|           humidity|             wind|
+-------+------------------+------------------+-------------------+------------------+------------------+----------------+------------------+------------------+-------------------+-------------------+-----------------+
|  count|           1265675|           1164996|            1265675|           1265675|           1265675|         1265675|           1265675|           1265675|             206947|            1265675|          1265675|
|   mean|2.1895901159455953|16.543447041878256| 1.0138785628222096|12.056116301578209|11.329610682047129|39.2568016354924|0.

                                                                                

In [132]:
check_price = df.filter(f.col("price").isNull())
print("check price isnull: ", check_price.count())

check price isnull:  100679


In [133]:
df = (df.withColumn("rain", f.when(f.col("rain").isNull(), 0).otherwise(f.col("rain"))))
df.count()

                                                                                

1265675

In [134]:
check_price.describe(["distance", "price", "surge_multiplier", "hour", "month", "temp", "clouds", "pressure", "rain", "humidity", "wind"]).show()



+-------+-----------------+-----+----------------+------------------+-------------------+-----------------+------------------+------------------+-------------------+-------------------+------------------+
|summary|         distance|price|surge_multiplier|              hour|              month|             temp|            clouds|          pressure|               rain|           humidity|              wind|
+-------+-----------------+-----+----------------+------------------+-------------------+-----------------+------------------+------------------+-------------------+-------------------+------------------+
|  count|           100679|    0|          100679|            100679|             100679|           100679|            100679|            100679|              16636|             100679|            100679|
|   mean|2.190055920301131| null|             1.0|12.002622195293954|  11.33063498842857|39.26622016507909|0.6822318457672294|1005.0268542595858|0.06785021038711223| 0.754784910457

                                                                                

In [135]:
df = df.where(f.col("price").isNotNull())

In [136]:
df.write.parquet("../Uber_Lyft_Cab_prices/df_final.parquet")

[Stage 273:>                                                        (0 + 8) / 8]

23/02/01 21:39:15 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/02/01 21:39:15 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/02/01 21:39:20 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                