# Config stuff

In [1]:
from xmlrpc.client import DateTime

import ConnectionConfig as cc
from delta import DeltaTable
cc.setupEnvironment()

# Start local cluster

In [2]:
spark = cc.startLocalCluster("FACT_RIDES")
spark.getActiveSession()

# Create facts table: rides

## Read from sources

### Read from VeloDB database

In [34]:
#EXTRACT

cc.set_connectionProfile("VeloDB")

# Read rides table from source VeloDB database
# Only rows after 2019-01-01 will be read because older rows are corrupt
rides_source_df = spark.read \
    .format("jdbc") \
    .option("url", cc.create_jdbc()) \
    .option("driver" , cc.get_Property("driver")) \
    .option("dbtable", "(select rideid, starttime, endtime, subscriptionid, startlockid, endlockid from rides where starttime > '2019-01-01') as subq") \
    .option("user", cc.get_Property("username")) \
    .option("password", cc.get_Property("password")) \
    .option("partitionColumn", "rideid") \
    .option("numPartitions", 4) \
    .option("lowerBound", 0) \
    .option("upperBound", 4140000) \
    .load()

# Next read operation performs a join between the locks and stations source tables. The goal is to retrieve a zipcode for each lockid. This zipcode is used to link the weather dimension with the facts table 
locks_with_zip = spark.read \
    .format("jdbc") \
    .option("url", cc.create_jdbc()) \
    .option("driver" , cc.get_Property("driver")) \
    .option("dbtable", "(select l.lockid, s.zipcode from locks l \
    left outer join stations s on l.stationid = s.stationid) as subq") \
    .option("user", cc.get_Property("username")) \
    .option("password", cc.get_Property("password")) \
    .option("partitionColumn", "lockid") \
    .option("numPartitions", 4) \
    .option("lowerBound", 0) \
    .option("upperBound", 8000) \
    .load()

locks_with_zip.show(10)

+------+-------+
|lockid|zipcode|
+------+-------+
|     1|   2000|
|     2|   2000|
|     3|   2000|
|     4|   2000|
|     5|   2000|
|     6|   2000|
|     7|   2000|
|     8|   2000|
|     9|   2000|
|    10|   2000|
+------+-------+
only showing top 10 rows



In [35]:
rides_source_df.show(10)

+------+-------------------+-------------------+--------------+-----------+---------+
|rideid|          starttime|            endtime|subscriptionid|startlockid|endlockid|
+------+-------------------+-------------------+--------------+-----------+---------+
|    15|2019-09-22 08:46:43|2019-09-22 09:01:36|         13296|       4849|     3188|
|    16|2019-09-22 08:19:51|2019-09-22 08:21:55|         45924|       NULL|     NULL|
|    17|2019-09-22 08:27:38|2019-09-22 08:30:25|         25722|       2046|     1951|
|    18|2019-09-22 08:41:48|2019-09-22 08:46:52|         31000|       1821|     2186|
|    19|2019-09-22 08:50:08|2019-09-22 09:09:02|         59732|       6382|     2700|
|    20|2019-09-22 08:29:42|2019-09-22 08:31:40|          NULL|       NULL|     NULL|
|    21|2019-09-22 08:05:17|2019-09-22 08:14:44|         31055|       1388|     3401|
|    22|2019-09-22 08:39:11|2019-09-22 08:44:46|         65164|       2572|       13|
|    23|2019-09-22 08:23:27|2019-09-22 08:30:02|      

### Read from deltatables

In [5]:
#EXTRACT
# Dimension date
dim_date = spark.read.format("delta").load("spark-warehouse/dimdate")

# Dimension weather
dim_weather = spark.read.format("delta").load("spark-warehouse/dimweather")

# Dimension customer
dim_customer = spark.read.format("delta").load("spark-warehouse/dimuser")

# Dimension lock
dim_lock = spark.read.format("delta").load("spark-warehouse/dimlock")



### Read from weather data source

In [43]:
#EXTRACT
weather_responses = spark.read.format("json").option("multiLine",True).load("weather")
weather_responses.show(10)

+--------+------+---+--------------+----------+-------+--------------------+-----+------+--------------------+--------+----------+--------------------+-----------------+-------+
|    base|clouds|cod|         coord|        dt|     id|                main| name|  rain|                 sys|timezone|visibility|             weather|             wind|zipCode|
+--------+------+---+--------------+----------+-------+--------------------+-----+------+--------------------+--------+----------+--------------------+-----------------+-------+
|stations| {100}|200|{44.34, 10.99}|1583593967|3163858|{298.74, 933, 64,...|Zocca|{3.16}|{IT, 2075663, 166...|    7200|     10000|[{overcast clouds...|{349, 1.18, 0.62}|   2000|
|stations| {100}|200|{44.34, 10.99}|1583132121|3163858|{298.74, 933, 64,...|Zocca|{3.16}|{IT, 2075663, 166...|    7200|     10000|[{overcast clouds...|{349, 1.18, 0.62}|   2018|
|stations| {100}|200|{44.34, 10.99}|1583134645|3163858|{298.74, 933, 64,...|Zocca|{3.16}|{IT, 2075663, 166...|

## Create tempviews

In [44]:
# Rides source table
rides_source_df.createOrReplaceTempView("ridesSource")

# Table with lockid's and zipcodes
locks_with_zip.createOrReplaceTempView("locksZip")

# Dimension date
dim_date.createOrReplaceTempView("dimDate")

# Dimension weather
dim_weather.createOrReplaceTempView("dimWeather")

# Weather responses
weather_responses.createOrReplaceTempView("weatherResponses")

# Dimension customer
dim_customer.createOrReplaceTempView("dimCustomer")

# Dimension lock
dim_lock.createOrReplaceTempView("dimLock")

## Add zipcodes to rides table

Join ridesSource with locksZip on lockid so we can add the zipcode to the rides table

In [8]:
# TRANSFORM
rides_with_zipcodes = spark.sql("select src.rideid, src.starttime, src.endtime, src.subscriptionid, src.startlockid, \
                                 src.endlockid, lz.zipcode as startlockZipcode \
                                 from ridesSource as src \
                                 left outer join locksZip as lz on src.startlockid = lz.lockid")
rides_with_zipcodes.show(10)

+------+-------------------+-------------------+--------------+-----------+---------+----------------+
|rideid|          starttime|            endtime|subscriptionid|startlockid|endlockid|startlockZipcode|
+------+-------------------+-------------------+--------------+-----------+---------+----------------+
|    18|2019-09-22 08:41:48|2019-09-22 08:46:52|         31000|       1821|     2186|            2018|
|    23|2019-09-22 08:23:27|2019-09-22 08:30:02|         71164|         50|     2067|            2000|
|    15|2019-09-22 08:46:43|2019-09-22 09:01:36|         13296|       4849|     3188|            2140|
|    19|2019-09-22 08:50:08|2019-09-22 09:09:02|         59732|       6382|     2700|            2660|
|    25|2019-09-22 08:48:14|2019-09-22 08:52:45|           999|        985|     2148|            2018|
|    16|2019-09-22 08:19:51|2019-09-22 08:21:55|         45924|       NULL|     NULL|            NULL|
|    17|2019-09-22 08:27:38|2019-09-22 08:30:25|         25722|       204

In [9]:
rides_with_zipcodes.createOrReplaceTempView("ridesWithZip")

## Transform weather responses table

Add weather_ID based on the weather type (condition_id)

Condition id:
- < 800: All codes with a number smaller than 800 means rain in some form. (=onaangenaam code 2)
- = 800: This code means clear sky and sunshine (=aangenaam code 1 if temperature is higher than 15 degrees Celsius)
- \> 800: All other weather conditions (Neutraal code 3)

In [45]:
#TRANSFORM
short_weather_responses = spark.sql("select zipCode as zip_code, dt as timestamp, weather.id[0] as condition_id, main.temp as temperature, \
                                    case \
                                        when condition_id < 800 then 2 \
                                        when condition_id = 800 and main.temp > (273 + 15) then 1 \
                                        when condition_id = 800 and main.temp < (273 + 15) then 3 \
                                        when condition_id > 800 then 3 \
                                    else 4 \
                                    end as weather_ID \
                                    from weatherResponses")
short_weather_responses.show(10)

+--------+----------+------------+-----------+----------+
|zip_code| timestamp|condition_id|temperature|weather_ID|
+--------+----------+------------+-----------+----------+
|    2000|1583593967|         804|     298.48|         3|
|    2018|1583132121|         804|     298.48|         3|
|    2020|1583134645|         804|     298.48|         3|
|    2030|1585973548|         804|     298.48|         3|
|    2050|1569163778|         804|     298.48|         3|
|    2060|1569968446|         804|     298.48|         3|
|    2100|1575264035|         804|     298.48|         3|
|    2140|1619936979|         804|     298.48|         3|
|    2170|1656838375|         804|     298.48|         3|
|    2600|1659778730|         804|     298.48|         3|
+--------+----------+------------+-----------+----------+
only showing top 10 rows



In [46]:
short_weather_responses.createOrReplaceTempView("shortWeatherResponses")

## Build facts table

In [47]:
#TRANSFORM
rides_fact_df = spark.sql("select src.rideid as ride_ID, dd.date_SK, \
                          coalesce(dw.weather_SK, 3) as weather_SK, \
                          1 as count_MV, \
                          (unix_timestamp(endtime) - unix_timestamp(starttime)) as rideDuration_MV, \
                          md5(concat(src.rideid, dd.date_SK, coalesce(dw.weather_SK, 3), 1, rideDuration_MV)) as md5 \
                          from ridesWithZip as src \
                          left outer join dimDate as dd \
                          on cast(src.starttime as DATE) = cast(dd.CalendarDate as DATE) \
                          left outer join shortWeatherResponses wr \
                          on src.startlockZipcode = wr.zip_code \
                          and date_format(src.starttime, 'yyyy-MM-dd-HH') = date_format(from_unixtime(wr.timestamp),'yyyy-MM-dd-HH') \
                          left outer join dimWeather dw on dw.weather_id = wr.weather_ID \
                          where src.subscriptionid is not null ")

rides_fact_df.show(10)

+-------+-------+----------+--------+---------------+--------------------+
|ride_ID|date_SK|weather_SK|count_MV|rideDuration_MV|                 md5|
+-------+-------+----------+--------+---------------+--------------------+
|     18|     21|         3|       1|            304|8ae4eeec24c367d5d...|
|     23|     21|         3|       1|            395|d1a51dbb9eb899c4f...|
|     26|     21|         3|       1|            641|0495f90b50d12567e...|
|     27|     21|         3|       1|           1176|7d493b85be00b2a6d...|
|     34|     21|         3|       1|            677|9e6c18e335cb603d9...|
|     36|     21|         3|       1|           1066|3994164be2545f135...|
|     42|     21|         3|       1|             25|e90776aa1f7fadaf2...|
|     43|     21|         3|       1|            391|8c6a7f5b38fdecf27...|
|     45|     21|         3|       1|            269|2297c7674e590f594...|
|     50|     21|         3|       1|            314|f4d745d86eefd588c...|
+-------+-------+--------

### Create temview from facts table

In [48]:
rides_fact_df.createOrReplaceTempView("factRides_new")

## Write facts table to delta table: Initial load

In [50]:
# LOAD
rides_fact_df.write.format("delta").mode("overwrite").saveAsTable("factRides")

## Incremental load

In [51]:
#LOAD
dt_factRides = DeltaTable.forPath(spark,".\spark-warehouse\\factrides")
dt_factRides.toDF().createOrReplaceTempView("factRides_current")

result = spark.sql("merge into factRides_current as target \
                   using factRides_new as source on target.ride_ID = source.ride_ID \
                   when matched and target.md5 <> source.md5 then update set * \
                   when not matched then insert *")

result.show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                0|               0|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [52]:
spark.stop()