### Config stuff

In [4]:
import os
from xmlrpc.client import DateTime

import ConnectionConfig as cc
from delta import DeltaTable
cc.setupEnvironment()

Dynamically set JAVA_HOME: /Users/user/Library/Java/JavaVirtualMachines/temurin-21.0.2/Contents/Home


In [5]:
spark = cc.startLocalCluster("FACT_RIDE")
spark.getActiveSession()

:: loading settings :: url = jar:file:/Users/user/Desktop/spark_and_hadop/spark-3.5.4-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/user/.ivy2/cache
The jars for the packages stored in: /Users/user/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
org.elasticsearch#elasticsearch-spark-30_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8183ec6c-7e3e-42a4-8237-c625574535a3;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.9.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.

# Fact transformations
This notebooks creates the sales fact table from scratch based on the operational source table "sales"
When creating a fact table always follow the listed steps in order.


#### 1 READ NECESSARY SOURCE TABLE(S) AND PERFORM TRANSFORMATIONS
**When reading from the source table make sure you include all data necessary:**
- to calculate the measure values
- the source table keys that you have to use to lookup the correct surrogate keys in the dimension tables.

**If more than one table is needed to gather the necesary information you can opt for one of two strategies:**
- Use a select query when reading from the jdbc source with the spark.read operation. Avoid complex queries because the operational database needs a lot of resources to run those queries.
- Perform a spark.read operation for each table separately and join the tables within Spark. The joins will take place on the cluster instead of the database. You limit the database recources used, but there can be a significant overhead of unnecessary data tranferred to the cluster.


In this case we just rename Amount and create a default count_mv column.
The transformations are minimal. In reality, transformations can be far more complex. If so, it can be advisable to work out the transforms in more then one step.*



In [6]:
cc.set_connectionProfile("default")
ride_src_df = spark.read \
    .format("jdbc") \
    .option("url", cc.create_jdbc()) \
    .option("driver" , cc.get_Property("driver")) \
    .option("dbtable","rides").option("user", cc.get_Property("username")) \
    .option("password", cc.get_Property("password")) \
    .option("partitionColumn", "rideid") \
    .option("numPartitions", 4) \
    .option("lowerBound", 0) \
    .option("upperBound", 1000) \
    .load()

subscriptions_df = spark.read \
    .format("jdbc") \
    .option("url", cc.create_jdbc()) \
    .option("driver" , cc.get_Property("driver")) \
    .option("dbtable","subscriptions").option("user", cc.get_Property("username")) \
    .option("password", cc.get_Property("password")) \
    .option("partitionColumn", "subscriptionid") \
    .option("numPartitions", 4) \
    .option("lowerBound", 0) \
    .option("upperBound", 1000) \
    .load()

vehicles_df = spark.read \
    .format("jdbc") \
    .option("url", cc.create_jdbc()) \
    .option("driver" , cc.get_Property("driver")) \
    .option("dbtable","vehicles").option("user", cc.get_Property("username")) \
    .option("password", cc.get_Property("password")) \
    .option("partitionColumn", "vehicleid") \
    .option("numPartitions", 4) \
    .option("lowerBound", 0) \
    .option("upperBound", 1000) \
    .load()

bikelots_df = spark.read \
    .format("jdbc") \
    .option("url", cc.create_jdbc()) \
    .option("driver" , cc.get_Property("driver")) \
    .option("dbtable","bikelots").option("user", cc.get_Property("username")) \
    .option("password", cc.get_Property("password")) \
    .option("partitionColumn", "bikelotid") \
    .option("numPartitions", 4) \
    .option("lowerBound", 0) \
    .option("upperBound", 1000) \
    .load()




# weather_df = spark.read.json(r'C:\Users\kkiva\data4_project_group5\examples\weather\*.json')


In [25]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, ArrayType, DateType

# Define the schema for the JSON structure based on the provided response
weather_schema = StructType([
    StructField("zipCode", StringType(), True),
    StructField("coord", StructType([
        StructField("lon", FloatType(), True),
        StructField("lat", FloatType(), True)
    ]), True),
    StructField("weather", ArrayType(StructType([
        StructField("id", IntegerType(), True),
        StructField("main", StringType(), True),
        StructField("description", StringType(), True),
        StructField("icon", StringType(), True)
    ])), True),
    StructField("base", StringType(), True),
    StructField("main", StructType([
        StructField("temp", FloatType(), True),
        StructField("feels_like", FloatType(), True),
        StructField("temp_min", FloatType(), True),
        StructField("temp_max", FloatType(), True),
        StructField("pressure", IntegerType(), True),
        StructField("humidity", IntegerType(), True),
        StructField("sea_level", IntegerType(), True),
        StructField("grnd_level", IntegerType(), True)
    ]), True),
    StructField("visibility", IntegerType(), True),
    StructField("wind", StructType([
        StructField("speed", FloatType(), True),
        StructField("deg", IntegerType(), True),
        StructField("gust", FloatType(), True)
    ]), True),
    StructField("rain", StructType([
        StructField("1h", FloatType(), True)
    ]), True),
    StructField("clouds", StructType([
        StructField("all", IntegerType(), True)
    ]), True),
    StructField("dt", IntegerType(), True),
    StructField("api_current_time", StringType(), True),
    StructField("sys", StructType([
        StructField("type", IntegerType(), True),
        StructField("id", IntegerType(), True),
        StructField("country", StringType(), True),
        StructField("sunrise", IntegerType(), True),
        StructField("sunset", IntegerType(), True)
    ]), True),
    StructField("timezone", IntegerType(), True),
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("cod", IntegerType(), True)
])


AttributeError: 'StructType' object has no attribute 'createOrReplaceTempView'

In [26]:
weather_df = spark.read.option("multiline", "true").schema(weather_schema).json("weather/*.json")
weather_df.printSchema()
weather_df.show(5, truncate=False)


root
 |-- zipCode: string (nullable = true)
 |-- coord: struct (nullable = true)
 |    |-- lon: float (nullable = true)
 |    |-- lat: float (nullable = true)
 |-- weather: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- main: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- icon: string (nullable = true)
 |-- base: string (nullable = true)
 |-- main: struct (nullable = true)
 |    |-- temp: float (nullable = true)
 |    |-- feels_like: float (nullable = true)
 |    |-- temp_min: float (nullable = true)
 |    |-- temp_max: float (nullable = true)
 |    |-- pressure: integer (nullable = true)
 |    |-- humidity: integer (nullable = true)
 |    |-- sea_level: integer (nullable = true)
 |    |-- grnd_level: integer (nullable = true)
 |-- visibility: integer (nullable = true)
 |-- wind: struct (nullable = true)
 |    |-- speed: float (nullable = true)
 |    |-- de


#### 2 MAKE DIMENSION TABLES AVAILABLE AS VIEWS

In [27]:
dim_date = spark.read.format("delta").load("spark-warehouse/dimdate")
dim_vehicle = spark.read.format("delta").load("spark-warehouse/dimvehicle")
dim_user = spark.read.format("delta").load("spark-warehouse/dimuser")
dim_weather = spark.read.format("delta").load("spark-warehouse/dimweather")
dim_lock = spark.read.format("delta").load("spark-warehouse/dimlock")

dim_date.createOrReplaceTempView("dimDate")
dim_user.createOrReplaceTempView("dimUser")
dim_vehicle.createOrReplaceTempView("dimVehicle")
dim_weather.createOrReplaceTempView("dimWeather")
dim_lock.createOrReplaceTempView("dimLock")

In [41]:
xxd = spark.sql("""
SELECT *  FROM dimVehicle;
""")
xxd.show()

                                                                                

+---------+----------+-------------------+
|vehicleid|biketypeid|biketypedescription|
+---------+----------+-------------------+
|     NULL|         1|          Velo Bike|
|     NULL|         2|        Velo E-Bike|
|     NULL|         3|               Step|
|     NULL|         4|            Scooter|
+---------+----------+-------------------+




#### 3 Build the fact table

Within the creation of a fact table always perform these two tasks:
1.   Include the measures of the fact
2. Use the dimension tables to look up the surrogate keys that correspond with the natural key value. In case of SCD2 dimension use the scd_start en scd_end to find the correct version of the data in the dimension


In [28]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math

def haversine_km(lat1, lon1, lat2, lon2):
    if None in (lat1, lon1, lat2, lon2):  # Handle NULL values safely
        return None  
    R = 6371  # Radius of Earth in km
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

# Register the UDF in Spark SQL
haversine_udf = udf(haversine_km, DoubleType())
spark.udf.register("haversine_km", haversine_udf)


25/04/23 11:22:34 WARN SimpleFunctionRegistry: The function haversine_km replaced a previously registered function.


<pyspark.sql.udf.UserDefinedFunction at 0x1127b2f10>

In [34]:
ride_src_df.createOrReplaceTempView("rides_source")
subscriptions_df.createOrReplaceTempView("subscriptions_source")

# Create temp views for the required dataframes
weather_df.createOrReplaceTempView("weather_data")

vehicles_df.createOrReplaceTempView("vehicle_data")
bikelots_df.createOrReplaceTempView("bikelots_data")

ridesFactFromSource = spark.sql("""
    SELECT src.rideid AS ride_id, 
           du.userSK AS user_sk, 
           src.startlockid AS start_lock_id, 
           src.endlockid AS end_lock_id, 
           dd.date_sk AS date_sk, 
           dv.biketypeid AS vehicle_id, 
              CASE 
           WHEN unix_timestamp(src.endtime) > unix_timestamp(src.starttime) 
           THEN ROUND((unix_timestamp(src.endtime) - unix_timestamp(src.starttime)) / 60, 2)
           ELSE 0  
           END AS ride_duration, 
           Round(haversine_km(
               CAST(SPLIT(REPLACE(dl_start.gpscoord, '(', ''), ',')[0] AS DOUBLE),
               CAST(SPLIT(REPLACE(REPLACE(dl_start.gpscoord, ')', ''), '(', ''), ',')[1] AS DOUBLE),
               CAST(SPLIT(REPLACE(dl_end.gpscoord, '(', ''), ',')[0] AS DOUBLE),
               CAST(SPLIT(REPLACE(REPLACE(dl_end.gpscoord, ')', ''), '(', ''), ',')[1] AS DOUBLE)
           ),2) AS distance_km,
           MIN(dw.weather_id) AS weather_id,  
           md5(concat(src.rideid, du.userSK, src.startlockid, src.endlockid, dd.date_sk, dv.biketypeid)) AS md5 
    FROM rides_source AS src 
    LEFT OUTER JOIN subscriptions_source AS sub ON src.subscriptionid = sub.subscriptionid 
    LEFT OUTER JOIN dimUser AS du ON sub.userid = du.userid 
    LEFT OUTER JOIN dimLock AS dl_start ON src.startlockid = dl_start.lockid  
    LEFT OUTER JOIN dimLock AS dl_end ON src.endlockid = dl_end.lockid  
    LEFT OUTER JOIN dimDate AS dd ON DATE(src.starttime) = dd.date 
    LEFT OUTER JOIN vehicle_data AS vd ON src.vehicleid = vd.vehicleid
    LEFT OUTER JOIN bikelots_data AS bl ON vd.bikelotid = bl.bikelotid
    LEFT OUTER JOIN dimVehicle as dv ON bl.biketypeid = dv.biketypeid
    LEFT OUTER JOIN weather_data AS wd ON dl_start.zipcode = wd.zipcode
    LEFT OUTER JOIN dimWeather AS dw ON wd.weather[0].main = dw.weather_condition
    GROUP BY src.rideid, du.userSK, src.startlockid, src.endlockid, dd.date_sk, dv.biketypeid,
             src.starttime, src.endtime, dl_start.gpscoord, dl_end.gpscoord
""")


In [31]:
ride_src_df.createOrReplaceTempView("rides_source")
subscriptions_df.createOrReplaceTempView("subscriptions_source")

In [21]:
# trying to fix null subscription ids 
xd = spark.sql("""
SELECT * from rides_source


""")
xd.show()

+------+-----------------+-----------------+-------------------+-------------------+---------+--------------+-----------+---------+
|rideid|       startpoint|         endpoint|          starttime|            endtime|vehicleid|subscriptionid|startlockid|endlockid|
+------+-----------------+-----------------+-------------------+-------------------+---------+--------------+-----------+---------+
|     1|(51.2083,4.44595)|(51.1938,4.40228)|2015-09-22 00:00:00|2012-09-22 00:00:00|      844|         13296|       4849|     3188|
|     2|(51.2174,4.41597)|(51.2188,4.40935)|2015-09-22 00:00:00|2012-09-22 00:00:00|     4545|         45924|       NULL|     NULL|
|     3|(51.2088,4.40834)|(51.2077,4.39846)|2015-09-22 00:00:00|2012-09-22 00:00:00|     3419|         25722|       2046|     1951|
|     4|(51.2023,4.41208)|(51.2119,4.39894)|2015-09-22 00:00:00|2012-09-22 00:00:00|     1208|         31000|       1821|     2186|
|     5|(51.1888,4.45039)|(51.2221,4.40467)|2015-09-22 00:00:00|2012-09-22 0

In [35]:
ridesFactFromSource.printSchema()
ridesFactFromSource.show(50)

root
 |-- ride_id: integer (nullable = true)
 |-- user_sk: string (nullable = true)
 |-- start_lock_id: integer (nullable = true)
 |-- end_lock_id: integer (nullable = true)
 |-- date_sk: long (nullable = true)
 |-- vehicle_id: integer (nullable = true)
 |-- ride_duration: double (nullable = true)
 |-- distance_km: double (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- md5: string (nullable = true)



[Stage 184:>                                                        (0 + 1) / 1]

+-------+--------------------+-------------+-----------+-------+----------+-------------+-----------+----------+--------------------+
|ride_id|             user_sk|start_lock_id|end_lock_id|date_sk|vehicle_id|ride_duration|distance_km|weather_id|                 md5|
+-------+--------------------+-------------+-----------+-------+----------+-------------+-----------+----------+--------------------+
|     13|d8f685cc-3412-4e0...|         5619|       2717|   2455|         2|          0.0|       5.38|         3|ec253130345dc6c41...|
|     50|93e34254-7e69-47f...|         2962|       2973|   3916|         2|         5.23|       0.56|         3|e63cb075e0d01c00f...|
|     67|f018bf1b-7ae3-484...|         1494|       5591|   3916|         1|        17.13|       4.46|         3|83caf42c81687b0cc...|
|     79|b8f870fe-c89a-4fb...|         3260|       1411|   3916|         1|         13.2|       2.16|         3|3bc9fded18efd494f...|
|    108|ea4639ae-6955-40b...|         6314|       4117|   391

                                                                                

## Initial load
The first time loading the fact table perform a FULL load. All data is written to the Delta Table.
After initial load the code line has to be disabled

In [16]:

ridesFactFromSource.write.format("delta").mode("overwrite").saveAsTable("factRides")



NameError: name 'ridesFactFromSource' is not defined

## Incremental load
When previous runs where performend you can opt for a 'faster' incremental run that only writes away changes. UPDATES and INSERTS are performed in one run.
In our solution we use an md5 based on all fields in the source table to detect changes. This is not the most efficient way to detect changes. A better way is to use a timestamp field in the source table and use that to detect changes. This is not implemented in this example.

In [7]:
dt_factRides = DeltaTable.forPath(spark,".\spark-warehouse\\factrides")
dt_factRides.toDF().createOrReplaceTempView("factRides_current")
#Merge to perform updates (TODO: Implement md5 strategy)

result = spark.sql("MERGE INTO factRides_current AS target \
      using factRides_new AS source ON target.rideID = source.rideID \
      WHEN MATCHED and source.MD5<>target.MD5 THEN UPDATE SET * \
      WHEN NOT MATCHED THEN INSERT *")

result.show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                0|               0|               0|                0|
+-----------------+----------------+----------------+-----------------+


In [8]:
# IMPORTANT: ALWAYS TEST THE CREATED CODE.
# In this example I changed order 498 in the operational database and checked the change after the run.
# spark.sql("select * from factsales f join dimsalesrep ds on f.salesrepSK = ds.salesrepSK where OrderID = 192  ").show()
spark.sql("select count(*) from factrides").show()
spark.sql("select * from factrides where rideId=1").show()



+--------+
|count(1)|
+--------+
|     999|
+--------+

+-------+------+--------------------+--------+----------+--------------------+
|OrderID|dateSK|          salesrepSK|count_mv|revenue_mv|                 md5|
+-------+------+--------------------+--------+----------+--------------------+
|      1|   650|b65df3d9-20dc-42d...|       1| 851804379|a237b06f2932af7dd...|
+-------+------+--------------------+--------+----------+--------------------+


### Checking the history of your delta fact table

In [9]:
# The history information is derived from the delta table log files. They contain a lot of information of all the actions performed on the table. In this case it tells us something about de merge operations. You can find statistics about the update and insert counts in the document.

fact.history().show(10,False)

NameError: name 'fact' is not defined

In [13]:
spark.stop()