In [1]:
import ConnectionConfig as cc
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

cc.setupEnvironment()
spark = cc.startLocalCluster("factRides")
spark.getActiveSession()

In [2]:
#EXTRACT
cc.set_connectionProfile("default")
ride_src_df = spark.read \
    .format("jdbc") \
    .option("url", cc.create_jdbc()) \
    .option("driver" , cc.get_Property("driver")) \
    .option("dbtable", "rides") \
    .option("user", cc.get_Property("username")) \
    .option("password", cc.get_Property("password")) \
    .option("partitionColumn", "rideid") \
    .option("numPartitions", 4) \
    .option("lowerBound", 0) \
    .option("upperBound", 5000) \
    .load()

ride_src_df.show()



+------+-----------------+-----------------+-------------------+-------------------+---------+--------------+-----------+---------+
|rideid|       startpoint|         endpoint|          starttime|            endtime|vehicleid|subscriptionid|startlockid|endlockid|
+------+-----------------+-----------------+-------------------+-------------------+---------+--------------+-----------+---------+
|     1|(51.2083,4.44595)|(51.1938,4.40228)|2015-09-22 00:00:00|2012-09-22 00:00:00|      844|         13296|       4849|     3188|
|     2|(51.2174,4.41597)|(51.2188,4.40935)|2015-09-22 00:00:00|2012-09-22 00:00:00|     4545|         45924|       NULL|     NULL|
|     3|(51.2088,4.40834)|(51.2077,4.39846)|2015-09-22 00:00:00|2012-09-22 00:00:00|     3419|         25722|       2046|     1951|
|     4|(51.2023,4.41208)|(51.2119,4.39894)|2015-09-22 00:00:00|2012-09-22 00:00:00|     1208|         31000|       1821|     2186|
|     5|(51.1888,4.45039)|(51.2221,4.40467)|2015-09-22 00:00:00|2012-09-22 0

In [3]:
dim_date = spark.read.format("delta").load("spark-warehouse/date_dim")
dim_date.show()

+--------+----------+----+----------+--------+---------+------+----------+-------+
| date_sk|      date|year|month_name|month_nr| day_name|day_nr|is_weekday|quarter|
+--------+----------+----+----------+--------+---------+------+----------+-------+
|20090101|2009-01-01|2009|   January|       1| Thursday|     5|         Y|      1|
|20090102|2009-01-02|2009|   January|       1|   Friday|     6|         Y|      1|
|20090103|2009-01-03|2009|   January|       1| Saturday|     7|         N|      1|
|20090104|2009-01-04|2009|   January|       1|   Sunday|     1|         N|      1|
|20090105|2009-01-05|2009|   January|       1|   Monday|     2|         Y|      1|
|20090106|2009-01-06|2009|   January|       1|  Tuesday|     3|         Y|      1|
|20090107|2009-01-07|2009|   January|       1|Wednesday|     4|         Y|      1|
|20090108|2009-01-08|2009|   January|       1| Thursday|     5|         Y|      1|
|20090109|2009-01-09|2009|   January|       1|   Friday|     6|         Y|      1|
|200

In [4]:
dim_lock = spark.read.format("delta").load("spark-warehouse/lock_dim")
dim_lock.show()

+----------+-------+----------+--------------------+------+-------+---------+-----------------+
|station_id|lock_id|station_nr|              street|number|zipcode| district|        gps_coord|
+----------+-------+----------+--------------------+------+-------+---------+-----------------+
|        12|    217|       120|Schijnpoortweg (2...| 27-29|   2060|ANTWERPEN|(51.2276,4.43923)|
|        12|    218|       120|Schijnpoortweg (2...| 27-29|   2060|ANTWERPEN|(51.2276,4.43923)|
|        12|    219|       120|Schijnpoortweg (2...| 27-29|   2060|ANTWERPEN|(51.2276,4.43923)|
|        12|    220|       120|Schijnpoortweg (2...| 27-29|   2060|ANTWERPEN|(51.2276,4.43923)|
|        12|    221|       120|Schijnpoortweg (2...| 27-29|   2060|ANTWERPEN|(51.2276,4.43923)|
|        12|    222|       120|Schijnpoortweg (2...| 27-29|   2060|ANTWERPEN|(51.2276,4.43923)|
|        12|    223|       120|Schijnpoortweg (2...| 27-29|   2060|ANTWERPEN|(51.2276,4.43923)|
|        12|    224|       120|Schijnpoo

In [5]:
dim_vehicle = spark.read.format("delta").load("spark-warehouse/vehicle_dim")
dim_vehicle.show()

+----------+-------------------+
|biketypeid|biketypedescription|
+----------+-------------------+
|         1|          Velo Bike|
|         2|        Velo E-Bike|
|         3|               Step|
|         4|            Scooter|
+----------+-------------------+



In [6]:
dim_weather = spark.read.format("delta").load("spark-warehouse/weather_dim")
dim_weather.show()

+----------+------------+
|weather_id|weather_type|
+----------+------------+
|         1|  Unpleasant|
|         2|    Pleasant|
|         4|     Unknown|
|         3|     Neutral|
+----------+------------+



In [7]:
dim_user = spark.read.format("delta").load("spark-warehouse/user_dim_current")
dim_user.show()

+------+--------------------+--------------------+--------------------+--------+-------+--------------------+------------+--------------------+-------------------+-------------------+--------------------+-------+
|userid|                name|               email|              street|  number|zipcode|                city|country_code|             user_sk|          scd_start|            scd_end|                 md5|current|
+------+--------------------+--------------------+--------------------+--------+-------+--------------------+------------+--------------------+-------------------+-------------------+--------------------+-------+
| 45000|          de Wit Sem|Sem.de.Wit@gmail.com|          Palmenlaan|    167 |   2020|           Antwerpen|          BE|c4b24408-7e39-429...|1990-01-01 00:00:00|2100-12-12 00:00:00|fcd00ed6ae31a8a79...|   true|
| 45001|          Hoek Johan|Johan.Hoek@outloo...|        Schildedreef|    431 |   2970|s Gravenwezel/Sch...|          BE|d98a741e-0de4-429...|1990-

In [8]:
#EXTRACT
ride_src_df.createOrReplaceTempView("rides_source")

vehicles_src_df = spark.read \
    .format("jdbc") \
    .option("driver", cc.get_Property("driver")) \
    .option("url", cc.create_jdbc()) \
    .option("dbtable", "vehicles") \
    .option("user", cc.get_Property("username")) \
    .option("password", cc.get_Property("password")) \
    .load()

bikelots_src_df = spark.read \
    .format("jdbc") \
    .option("driver", cc.get_Property("driver")) \
    .option("url", cc.create_jdbc()) \
    .option("dbtable", "bikelots") \
    .option("user", cc.get_Property("username")) \
    .option("password", cc.get_Property("password")) \
    .load()

vehicles_src_df.show()
bikelots_src_df.show()

vehicles_src_df.createOrReplaceTempView("vehicle_view")
bikelots_src_df.createOrReplaceTempView("bikelot_view")

+---------+------------+---------+-------------------+------+-----------------+
|vehicleid|serialnumber|bikelotid|  lastmaintenanceon|lockid|         position|
+---------+------------+---------+-------------------+------+-----------------+
|        1|        1000|        1|2020-01-19 02:14:57|  NULL|(51.1968,4.40579)|
|        2|        2000|        1|2020-03-08 01:49:24|  NULL|(51.2177,4.42075)|
|        3|        3000|        1|2020-06-01 12:37:26|  1568|(51.1926,4.42151)|
|        4|        4000|        1|2020-02-27 03:13:56|  NULL|(51.2311,4.41267)|
|        5|        5000|        1|2021-03-21 03:38:31|  NULL|(51.2177,4.42075)|
|        6|        6000|        1|2020-06-16 21:44:19|  NULL|(51.2195,4.41169)|
|        7|        7000|        1|2019-10-01 10:29:21|  1556| (51.2273,4.4307)|
|        8|        8000|        1|2019-12-06 17:09:49|  NULL|(51.2047,4.39625)|
|        9|        9000|        1|2020-01-01 10:06:51|  NULL|(51.2058,4.41837)|
|       10|       10000|        1|2019-1

In [9]:
subscriptions_src_df = spark.read \
    .format("jdbc") \
    .option("driver", cc.get_Property("driver")) \
    .option("url", cc.create_jdbc()) \
    .option("dbtable", "subscriptions") \
    .option("user", cc.get_Property("username")) \
    .option("password", cc.get_Property("password")) \
    .load()

subscriptions_src_df.show()
subscriptions_src_df.createOrReplaceTempView("subscriptions_view")

+--------------+----------+------------------+------+
|subscriptionid| validfrom|subscriptiontypeid|userid|
+--------------+----------+------------------+------+
|             1|2019-08-02|                 3|     1|
|             2|2019-11-12|                 1|     1|
|             3|2020-12-14|                 1|     1|
|             4|2021-10-05|                 2|     2|
|             5|2022-09-17|                 3|     3|
|             6|2019-04-08|                 1|     4|
|             7|2022-05-16|                 3|     5|
|             8|2019-06-29|                 3|     6|
|             9|2023-11-30|                 3|     6|
|            10|2023-12-31|                 3|     6|
|            11|2021-12-01|                 2|     7|
|            12|2019-02-01|                 2|     8|
|            13|2023-11-26|                 3|     8|
|            14|2021-05-08|                 3|     9|
|            15|2023-08-15|                 3|     9|
|            16|2023-11-29| 

In [10]:
weather_schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("temperature", StringType(), True),
    StructField("precipitation", StringType(), True),
    StructField("weather_code", StringType(), True),
    StructField("zipCode", IntegerType(), True),
    StructField("weather_condition", StringType(), True)
])

df_weather = spark.read \
    .option("multiLine", True) \
    .schema(weather_schema) \
    .json("weather/")

df_weather.show()
df_weather.createOrReplaceTempView("weather_view")

+-------------------+--------+---------+-----------+-------------+------------+-------+-----------------+
|          timestamp|latitude|longitude|temperature|precipitation|weather_code|zipCode|weather_condition|
+-------------------+--------+---------+-----------+-------------+------------+-------+-----------------+
|2021-09-05 12:04:36| 51.2195|  4.41164|       20.7|          0.0|           0|   2000|         Pleasant|
|2021-09-05 14:00:00| 51.2334|  4.38254|       23.2|          0.0|           0|   2050|         Pleasant|
|2021-09-05 14:32:45| 51.2512|  4.43727|       23.0|          0.0|           0|   2170|         Pleasant|
|2021-09-05 12:46:54| 51.1893|  4.43551|       20.8|          0.0|           0|   2600|         Pleasant|
|2021-09-05 13:42:03| 51.1951|  4.41756|       22.2|          0.0|           0|   2600|         Pleasant|
|2021-09-05 13:02:38| 51.1705|  4.39471|       22.2|          0.0|           0|   2610|         Pleasant|
|2021-09-05 12:53:08| 51.1735|  4.35216|      

In [11]:
dim_weather.createOrReplaceTempView("dimWeather")
dim_user.createOrReplaceTempView("dimUser")
dim_lock.createOrReplaceTempView("dimLock")
dim_vehicle.createOrReplaceTempView("dimVehicle")
dim_date.createOrReplaceTempView("dimDate")

In [12]:
#TRANSFORM
def haversine_km(lat1, lon1, lat2, lon2):
    if None in (lat1, lon1, lat2, lon2):
        return None

    R = 6371  # Earth radius in kilometers
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.asin(math.sqrt(a))
    return R * c

# Register it as a UDF
spark.udf.register("haversine_km", haversine_km, DoubleType())

<function __main__.haversine_km(lat1, lon1, lat2, lon2)>

In [13]:
ride_src_df.createOrReplaceTempView("rides_source")
ridesFactFromSource = spark.sql("select src.rideid as ride_id, du.user_sk, dd.date_sk, dl.lock_id as start_lock_id, dl2.lock_id as end_lock_id, dv.biketypeid as vehicle_id, coalesce(dw.weather_id, 4) as weather_id,\
                                haversine_km( \
                                    CAST(split_part(REPLACE(REPLACE(dl.gps_coord, '(', ''), ')', ''), ',', 1) AS DOUBLE), \
                                    CAST(split_part(REPLACE(REPLACE(dl.gps_coord, '(', ''), ')', ''), ',', 2) AS DOUBLE), \
                                    CAST(split_part(REPLACE(REPLACE(dl2.gps_coord, '(', ''), ')', ''), ',', 1) AS DOUBLE), \
                                    CAST(split_part(REPLACE(REPLACE(dl2.gps_coord, '(', ''), ')', ''), ',', 2) AS DOUBLE)\
                                    ) AS ride_distance, \
                                CASE \
                                    WHEN (unix_timestamp(src.endtime) - unix_timestamp(src.starttime)) / 60 < 0 \
                                        THEN NULL \
                                    ELSE round((unix_timestamp(src.endtime) - unix_timestamp(src.starttime)) / 60, 2) \
                                END AS ride_duration, \
                                md5(concat(ride_id, dd.date_sk)) as md5 \
                                from rides_source as src \
                                left outer join dimDate as dd on cast(src.starttime as DATE) = cast(dd.date as DATE) \
                                left outer join dimLock as dl on src.startlockid = dl.lock_id \
                                left outer join dimLock as dl2 on src.endlockid = dl2.lock_id \
                                left outer join vehicle_view as vv on src.vehicleid = vv.vehicleid \
                                left outer join bikelot_view as blv on vv.bikelotid = blv.bikelotid \
                                left outer join dimVehicle as dv on blv.biketypeid = dv.biketypeid \
                                left outer join subscriptions_view as sv on src.subscriptionid = sv.subscriptionid \
                                left outer join dimUser as du on sv.userid = du.userid \
                                left outer join weather_view as wv on cast(src.starttime as TIMESTAMP) = wv.timestamp \
                                and dl.zipcode = wv.zipcode \
                                left outer join dimWeather dw on UPPER(wv.weather_condition) = UPPER(dw.weather_type)\
                                ")
ridesFactFromSource.show(100)

+-------+--------------------+--------+-------------+-----------+----------+----------+------------------+-------------+--------------------+
|ride_id|             user_sk| date_sk|start_lock_id|end_lock_id|vehicle_id|weather_id|     ride_distance|ride_duration|                 md5|
+-------+--------------------+--------+-------------+-----------+----------+----------+------------------+-------------+--------------------+
|      1|1c121858-c0ed-49c...|20150922|         4849|       3188|         1|         4| 3.491018931475966|         NULL|bc8cab957d7e02da2...|
|      2|66766efd-d826-49d...|20150922|         NULL|       NULL|         3|         4|              NULL|         NULL|f70bc90e65cc45559...|
|      3|d88e41db-86b7-444...|20150922|         2046|       1951|         1|         4| 1.494908123910869|         NULL|fbdba2ebbd15003b5...|
|      4|ae57c91f-f235-419...|20150922|         1821|       2186|         1|         4|2.3114308809290756|         NULL|a21759453704a0ee8...|
|     

In [14]:
#LOAD
ridesFactFromSource.createOrReplaceTempView("factRides_new")

df_rides = spark.sql("select * from factRides_new")
#parquet file
df_rides.repartition(1).write.format("parquet").mode("overwrite").saveAsTable("factRides_pq")