In [1]:
import ConnectionConfig as cc
from delta import DeltaTable
from pyspark.sql.functions import col, hour, explode,monotonically_increasing_id,from_unixtime, unix_timestamp
# Project helper from ConnectionConfig, called before the Spark session is started
# in the next cell. NOTE(review): its exact effect (env vars, jars, ...) is not
# visible from this notebook — confirm in ConnectionConfig.
cc.setupEnvironment()

In [2]:
# Start a local Spark session through the project helper; "factRide" is presumably
# the application name — TODO confirm in ConnectionConfig.startLocalCluster.
spark = cc.startLocalCluster("factRide")
# Bare last expression: displays the active session as this cell's output.
spark.getActiveSession()

In [3]:
# Point the ConnectionConfig helpers at the operational VeloDB profile.
cc.set_connectionProfile("VeloDB")


def read_operational_table(table_name, partition_column, num_partitions=4,
                           lower_bound=0, upper_bound=1000):
    """Read one table from the operational database over JDBC.

    Args:
        table_name: Source table, passed to the JDBC reader as ``dbtable``.
        partition_column: Numeric column Spark uses to split the read into
            ``num_partitions`` parallel queries.
        num_partitions: Number of parallel JDBC partitions.
        lower_bound: Lower end of the range used to compute the partition stride.
        upper_bound: Upper end of the range used to compute the partition stride.
            The bounds do NOT filter rows; values outside the range still land
            in the first/last partition.

    Returns:
        A Spark DataFrame backed by the table.
    """
    return (
        spark.read
        .format("jdbc")
        .option("url", cc.create_jdbc())
        .option("driver", cc.get_Property("driver"))
        .option("dbtable", table_name)
        .option("user", cc.get_Property("username"))
        .option("password", cc.get_Property("password"))
        .option("partitionColumn", partition_column)
        .option("numPartitions", num_partitions)
        .option("lowerBound", lower_bound)
        .option("upperBound", upper_bound)
        .load()
    )


# Extract the operational rides, locks and stations tables.
df_operational_rides = read_operational_table("rides", "rideId")
df_operational_locks = read_operational_table("locks", "lockId")
df_operational_stations = read_operational_table("stations", "stationId")

# Register temp views so the join below can be written in SQL.
df_operational_rides.createOrReplaceTempView("rides")
df_operational_stations.createOrReplaceTempView("stations")
df_operational_locks.createOrReplaceTempView("locks")

# Join each ride to its start lock, and that lock to its station, to pick up the
# station's zipcode. LEFT JOINs keep rides whose lock or station is missing
# (their zipcode is NULL).
df_rides_with_zip = spark.sql("""
SELECT r.*, s.zipcode
    FROM rides r
    LEFT JOIN locks l ON r.startlockId = l.lockId
    LEFT JOIN stations s ON l.stationId = s.stationId
""")

df_rides_with_zip.createOrReplaceTempView("rides_with_zip")

spark.sql("SELECT * FROM rides_with_zip").show()


+------+-----------------+-----------------+-------------------+-------------------+---------+--------------+-----------+---------+-------+
|rideid|       startpoint|         endpoint|          starttime|            endtime|vehicleid|subscriptionid|startlockid|endlockid|zipcode|
+------+-----------------+-----------------+-------------------+-------------------+---------+--------------+-----------+---------+-------+
|     4|(51.2023,4.41208)|(51.2119,4.39894)|2015-09-22 00:00:00|2012-09-22 00:00:00|     1208|         31000|       1821|     2186|   2018|
|     9|(51.2182,4.41238)| (51.209,4.43266)|2015-09-22 00:00:00|2012-09-22 00:00:00|     5298|         71164|         50|     2067|   2000|
|    12|(51.2088,4.40834)|(51.2145,4.44373)|2015-09-22 00:00:00|2012-09-22 00:00:00|      957|         59847|       2039|     3038|   2000|
|    13|(51.1744,4.40334)|(51.2228,4.42426)|2015-09-22 00:00:00|2012-09-22 00:00:00|     5413|          5045|       5619|     2717|   2610|
|    18|(51.2023,4.4

In [4]:
# Load the previously built weather and date dimensions from the Delta warehouse
# and expose them as SQL temp views for the surrogate-key lookups further down.
dim_Weather = spark.read.load("spark-warehouse/dimWeather", format="delta")
dim_Date = spark.read.load("spark-warehouse/dimDate", format="delta")

for view_name, dimension in (("dimWeather", dim_Weather), ("dimDate", dim_Date)):
    dimension.createOrReplaceTempView(view_name)

In [5]:
# Read the raw weather responses (multi-line JSON documents) and turn the epoch
# `dt` field into a readable timestamp string.
df_weather = (
    spark.read.json("./weather/", multiLine=True)
    .withColumn("weather_time", from_unixtime(col("dt")))
)

# One output row per element of the `weather` array, keeping only the fields the
# fact table needs.
# NOTE(review): explode() drops responses whose `weather` array is empty/null;
# switch to explode_outer if those readings must be kept.
df_weather_flat = (
    df_weather
    .withColumn("weather", explode(col("weather")))
    .select(
        col("weather.id").alias("weather_id"),
        col("zipCode"),
        col("weather.main").alias("weather_main"),
        col("weather_time"),
    )
)

df_weather_flat.createOrReplaceTempView("weatherResponses")
weather_data_responses = spark.sql("SELECT * FROM weatherResponses")
weather_data_responses.show()

+----------+-------+------------+-------------------+
|weather_id|zipCode|weather_main|       weather_time|
+----------+-------+------------+-------------------+
|       801|   2140|      Clouds|2024-10-07 20:58:17|
|       800|   2140|       Clear|2024-10-07 20:58:17|
|       500|   2140|        Rain|2024-10-07 10:00:00|
|       801|   2170|      Clouds|2024-10-07 20:58:17|
|       800|   2170|       Clear|2024-10-07 20:58:17|
|       500|   2170|        Rain|2024-10-07 20:58:17|
|       801|   2600|      Clouds|2024-10-07 20:58:17|
|       800|   2600|       Clear|2024-10-07 20:58:17|
|       500|   2600|        Rain|2024-10-07 20:58:17|
|       801|   2000|      Clouds|2023-10-07 00:20:00|
|       801|   2018|      Clouds|2023-10-30 09:00:00|
|       801|   2020|      Clouds|2024-10-07 20:58:17|
|       801|   2030|      Clouds|2024-10-07 20:58:17|
|       801|   2050|      Clouds|2024-10-07 20:58:17|
|       801|   2060|      Clouds|2024-10-07 20:58:17|
|       801|   2100|      Cl

In [6]:
# Hour-of-day join keys for the ride/weather join.
# NOTE(review): weather_time comes from from_unixtime(), i.e. rendered in the Spark
# session time zone — confirm it matches the time zone of rides.starttime.
ride_start_hour = hour(col("starttime"))
weather_reading_hour = hour(col("weather_time"))

df_rides_with_zip = df_rides_with_zip.withColumn("ride_hour", ride_start_hour)
df_weather_flat = df_weather_flat.withColumn("weather_hour", weather_reading_hour)

# Re-register both views so the SQL cells below see the new hour columns.
for view_name, frame in (("rides_with_zip", df_rides_with_zip),
                         ("weatherResponses", df_weather_flat)):
    frame.createOrReplaceTempView(view_name)

In [7]:
# Attach weather to each ride by zipcode and hour of day.
#
# weatherResponses can contain several readings for the same zipcode and hour
# (e.g. zipcode 2140 has both a Clouds and a Clear reading at 20:58:17). A plain
# LEFT JOIN would then emit the ride once per reading and break the
# one-row-per-ride grain of the fact table. Keep exactly one reading per
# (zipCode, weather_hour): the most recent weather_time, ties broken by the lowest
# weather_id. weather_time is a 'yyyy-MM-dd HH:mm:ss' string, so ordering it as
# text is chronological.
#
# Only the hour is matched, not the date. The weather data covers only a few hours
# of the day, so most rides find no reading and keep NULL weather columns. Dropping
# the hour condition would remove those NULLs only by pairing rides with readings
# from an unrelated time of day.
df_rides_weather_joined = spark.sql("""
    WITH weather_per_zip_hour AS (
        SELECT weather_id, zipCode, weather_main, weather_hour
        FROM (
            SELECT wr.*,
                   ROW_NUMBER() OVER (
                       PARTITION BY wr.zipCode, wr.weather_hour
                       ORDER BY wr.weather_time DESC, wr.weather_id
                   ) AS reading_rank
            FROM weatherResponses wr
        ) ranked
        WHERE reading_rank = 1
    )
    SELECT rwz.*, w.weather_id, w.zipCode AS weather_zipcode,
           w.weather_main, w.weather_hour
    FROM rides_with_zip rwz
    LEFT JOIN weather_per_zip_hour w
      ON rwz.zipcode = w.zipCode
     AND rwz.ride_hour = w.weather_hour
""")
df_rides_weather_joined.createOrReplaceTempView("rides_weather")

df_rides_weather_joined.show(50)



+------+-----------------+-----------------+-------------------+-------------------+---------+--------------+-----------+---------+-------+---------+----------+---------------+------------+------------+
|rideid|       startpoint|         endpoint|          starttime|            endtime|vehicleid|subscriptionid|startlockid|endlockid|zipcode|ride_hour|weather_id|weather_zipcode|weather_main|weather_hour|
+------+-----------------+-----------------+-------------------+-------------------+---------+--------------+-----------+---------+-------+---------+----------+---------------+------------+------------+
|     4|(51.2023,4.41208)|(51.2119,4.39894)|2015-09-22 00:00:00|2012-09-22 00:00:00|     1208|         31000|       1821|     2186|   2018|        0|       800|           2018|       Clear|           0|
|     9|(51.2182,4.41238)| (51.209,4.43266)|2015-09-22 00:00:00|2012-09-22 00:00:00|     5298|         71164|         50|     2067|   2000|        0|       801|           2000|      Clouds

In [8]:
# Resolve surrogate keys for each ride:
#   * map the raw weather_main already carried by rides_weather onto the dimWeather
#     vocabulary (Clouds->Cloudy, Clear->Sunny, Rain->Rainy, else 'unknown');
#   * look up weatherSK via that mapped condition, and the start/end dateSK via
#     dimDate.calendarDate.
# rides_weather already holds the weather columns, so weatherResponses is NOT
# joined again here. Re-joining it on the same keys multiplied each ride's rows
# and could map a condition that disagrees with the row's own weather_main.
#
# Notes:
#   * 4 is the fallback weatherSK when no dimWeather row matches.
#     NOTE(review): presumably the 'unknown' member of dimWeather — confirm.
#   * NOTE(review): if dimWeather has several rows per weather_condition (it also
#     has a temperature_condition column), this join fans out rides — confirm.
#   * ride_duration is in seconds and is negative when endtime precedes starttime
#     (e.g. rides with starttime 2015-09-22 and endtime 2012-09-22 in the source).
#   * monotonically_increasing_id() is unique but not consecutive across
#     partitions, and not stable across re-runs.
df_final_rides_with_weather = spark.sql("""
    SELECT monotonically_increasing_id() AS rideSK,
           rwz.rideid AS ride_id,
           rwz.startpoint, rwz.endpoint,
           rwz.starttime, dd.dateSK,
           rwz.endtime, dd_end.dateSK AS enddateSK,
           rwz.vehicleid, rwz.subscriptionid, rwz.startlockid, rwz.endlockid,
           rwz.zipcode, rwz.ride_hour,
           rwz.weather_id,
           COALESCE(rwz.weather_main, 'unknown') AS weather_main,
           rwz.weather_condition_mapped,
           COALESCE(dw.weatherSK, 4) AS weatherSK,
           COALESCE(dw.weather_category, 'unknown') AS weather_category,
           unix_timestamp(rwz.endtime) - unix_timestamp(rwz.starttime) AS ride_duration
    FROM (
        SELECT rw.*,
               CASE rw.weather_main
                   WHEN 'Clouds' THEN 'Cloudy'
                   WHEN 'Clear'  THEN 'Sunny'
                   WHEN 'Rain'   THEN 'Rainy'
                   ELSE 'unknown'
               END AS weather_condition_mapped
        FROM rides_weather rw
    ) rwz
    LEFT JOIN dimWeather dw
      ON rwz.weather_condition_mapped = dw.weather_condition
    LEFT JOIN dimDate dd
      ON to_date(rwz.starttime) = dd.calendarDate
    LEFT JOIN dimDate dd_end
      ON to_date(rwz.endtime) = dd_end.calendarDate
""")

df_final_rides_with_weather.createOrReplaceTempView("finalRides")
df_final_rides_with_weather.show(500)




+-----------+-------+-----------------+-----------------+-------------------+------+-------------------+---------+---------+--------------+-----------+---------+-------+---------+----------+------------+------------------------+---------+----------------+-------------+
|     rideSK|ride_id|       startpoint|         endpoint|          starttime|dateSK|            endtime|enddateSK|vehicleid|subscriptionid|startlockid|endlockid|zipcode|ride_hour|weather_id|weather_main|weather_condition_mapped|weatherSK|weather_category|ride_duration|
+-----------+-------+-----------------+-----------------+-------------------+------+-------------------+---------+---------+--------------+-----------+---------+-------+---------+----------+------------+------------------------+---------+----------------+-------------+
|          0|      4|(51.2023,4.41208)|(51.2119,4.39894)|2015-09-22 00:00:00|  2455|2012-09-22 00:00:00|     1360|     1208|         31000|       1821|     2186|   2018|        0|       800|

In [9]:
# Persist the ride fact table (surrogate keys plus the duration measure) to the
# Delta warehouse, replacing any previous version at that path.
fact_ride_path = "spark-warehouse/factRideS1"

fact_ride = spark.table("finalRides").select("rideSK", "weatherSK", "dateSK", "ride_duration")
fact_ride.show(n=20)

fact_ride.write.save(fact_ride_path, format="delta", mode="overwrite")

+------+---------+------+-------------+
|rideSK|weatherSK|dateSK|ride_duration|
+------+---------+------+-------------+
|     0|        1|  2455|    -94608000|
|     1|        3|  2455|    -94608000|
|     2|        3|  2455|    -94608000|
|     3|        4|  2455|    -94608000|
|     4|        4|  3916|          304|
|     5|        4|  3916|          395|
|     6|        4|  3916|          641|
|     7|        4|  3916|         1176|
|     8|        4|  3916|          677|
|     9|        4|  3916|         1066|
|    10|        4|  3916|           25|
|    11|        4|  3916|          391|
|    12|        4|  3916|          269|
|    13|        4|  3916|          314|
|    14|        4|  3916|          815|
|    15|        4|  3916|         1143|
|    16|        4|  3916|          451|
|    17|        4|  3916|          289|
|    18|        4|  3916|          488|
|    19|        4|  3916|          647|
+------+---------+------+-------------+
only showing top 20 rows



In [10]:
# Diagnostic: list the distinct hours of day on each side of the weather join.
# Rides span most hours while weather readings cover only a few, which explains
# why most rides have NULL weather after the zipcode + hour join.
for frame, hour_column in ((df_rides_with_zip, "ride_hour"),
                           (df_weather_flat, "weather_hour")):
    frame.select(hour_column).distinct().show()

+---------+
|ride_hour|
+---------+
|        0|
|        8|
|        9|
|       10|
|       12|
|       11|
|       13|
|       14|
|       18|
|       15|
|       16|
|       17|
|       19|
|       20|
|       23|
|        6|
|       22|
|        2|
|        4|
|        5|
+---------+
only showing top 20 rows

+------------+
|weather_hour|
+------------+
|          20|
|          10|
|           0|
|           9|
|          23|
+------------+



In [11]:
# Shut down the Spark session started at the top of the notebook.
spark.stop()