-- Ride details together with the vehicle that was ridden and the user
-- who owns the subscription the ride was booked on.
-- Join path: rides -> locks (vehicleid) -> vehicles (lockid),
--            rides -> subscriptions (subscriptionid) -> velo_users (userid).
select
    r.rideid,
    r.starttime,
    r.endtime,
    r.startpoint,
    r.endpoint,
    v.vehicleid,
    v.serialnumber,
    v.position,
    u.userid,
    u.name,
    u.email,
    u.country_code,
    u.city,
    u.zipcode,
    u.street,
    u.number
from rides as r
inner join locks as l
    on l.vehicleid = r.vehicleid
inner join vehicles as v
    on v.lockid = l.lockid
inner join subscriptions as s
    on s.subscriptionid = r.subscriptionid
inner join velo_users as u
    on u.userid = s.userid

In [12]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
import ConnectionConfig as cc
# Prepare the local environment through the project's ConnectionConfig helper.
# NOTE(review): what setupEnvironment() configures is not visible here — confirm in ConnectionConfig.
cc.setupEnvironment()
# Start the cluster/session object used for all reads and SQL below.
# The argument is presumably the Spark application name — TODO confirm.
spark = cc.startLocalCluster("VELO_JSON_DATA_GENERATION")
# Bare expression: the result is not stored; as the last line of a notebook cell
# it only serves to echo the active session in the cell output.
spark.getActiveSession()

In [13]:
#Extract
cc.set_connectionProfile("VeloDB")


def read_jdbc_table(table, partition_column, upper_bound, lower_bound=0, num_partitions=4):
    """Read one source table over JDBC as a partitioned DataFrame.

    Driver, URL and credentials are looked up through ``cc`` on every call,
    so each table read resolves its connection settings the same way.

    Args:
        table: Value passed to the JDBC ``dbtable`` option.
        partition_column: Column Spark uses to split the read into ranges.
        upper_bound: Upper bound used to compute the partition stride.
        lower_bound: Lower bound used to compute the partition stride.
        num_partitions: Number of JDBC partitions to read in parallel.

    Returns:
        The DataFrame returned by ``spark.read ... .load()``.
    """
    # Per the Spark JDBC data source docs, lowerBound/upperBound only decide
    # the partition stride; they do not filter rows out of the table.
    return (
        spark.read
        .format("jdbc")
        .option("driver", cc.get_Property("driver"))
        .option("url", cc.create_jdbc())
        .option("dbtable", table)
        .option("user", cc.get_Property("username"))
        .option("password", cc.get_Property("password"))
        .option("partitionColumn", partition_column)
        .option("numPartitions", num_partitions)
        .option("lowerBound", lower_bound)
        .option("upperBound", upper_bound)
        .load()
    )


# One read per source table: (table name, partition column, upper bound).
df_ride = read_jdbc_table("rides", "rideid", 4500000)
df_lock = read_jdbc_table("locks", "lockid", 7600)
df_vehicle = read_jdbc_table("vehicles", "vehicleid", 7000)
# NOTE(review): subscriptions are partitioned on userid, not on their own key —
# presumably intentional; confirm against the table's key distribution.
df_subscription = read_jdbc_table("Subscriptions", "userid", 80000)
df_user = read_jdbc_table("velo_users", "userid", 60000)

# Quick sanity check: preview the first rows of every extracted table.
df_ride.show(5)
df_lock.show(5)
df_vehicle.show(5)
df_user.show(5)
df_subscription.show(5)



+------+-----------------+-----------------+-------------------+-------------------+---------+--------------+-----------+---------+
|rideid|       startpoint|         endpoint|          starttime|            endtime|vehicleid|subscriptionid|startlockid|endlockid|
+------+-----------------+-----------------+-------------------+-------------------+---------+--------------+-----------+---------+
|     1|(51.2083,4.44595)|(51.1938,4.40228)|2015-09-22 00:00:00|2012-09-22 00:00:00|      844|         13296|       4849|     3188|
|     2|(51.2174,4.41597)|(51.2188,4.40935)|2015-09-22 00:00:00|2012-09-22 00:00:00|     4545|         45924|       NULL|     NULL|
|     3|(51.2088,4.40834)|(51.2077,4.39846)|2015-09-22 00:00:00|2012-09-22 00:00:00|     3419|         25722|       2046|     1951|
|     4|(51.2023,4.41208)|(51.2119,4.39894)|2015-09-22 00:00:00|2012-09-22 00:00:00|     1208|         31000|       1821|     2186|
|     5|(51.1888,4.45039)|(51.2221,4.40467)|2015-09-22 00:00:00|2012-09-22 0

In [14]:
#Transform
# Expose each extracted DataFrame as a temp view so it can be joined in Spark SQL.
df_ride.createOrReplaceTempView("source_Ride")
df_lock.createOrReplaceTempView("source_Lock")
df_vehicle.createOrReplaceTempView("source_Vehicle")
df_subscription.createOrReplaceTempView("source_Subscription")
df_user.createOrReplaceTempView("source_User")

# Ride details joined with the ridden vehicle (via its lock) and the user who
# owns the subscription. The query is a plain triple-quoted string: it has no
# placeholders, so an f-string prefix would only make a literal "{" a hazard.
df_dim_ride = spark.sql("""
    select
        r.rideid, r.starttime, r.endtime, r.startpoint, r.endpoint,
        v.vehicleid, v.serialnumber, v.position,
        u.userid, u.name, u.email, u.country_code, u.city, u.zipcode, u.street, u.number
    from source_Ride as r
    join source_Lock as l on l.vehicleid = r.vehicleid
    join source_Vehicle as v on v.lockid = l.lockid
    join source_Subscription as s on s.subscriptionid = r.subscriptionid
    join source_User as u on u.userid = s.userid
""")

# Preview the joined result.
df_dim_ride.show()

+-------+-------------------+-------------------+-----------------+-----------------+---------+------------+-----------------+------+------------+--------------------+------------+--------------------+-------+-----------+-------+
| rideid|          starttime|            endtime|       startpoint|         endpoint|vehicleid|serialnumber|         position|userid|        name|               email|country_code|                city|zipcode|     street| number|
+-------+-------------------+-------------------+-----------------+-----------------+---------+------------+-----------------+------+------------+--------------------+------------+--------------------+-------+-----------+-------+
| 920223|2020-08-08 22:08:16|2020-08-08 22:10:01|(51.2282,4.41337)|(51.2219,4.41289)|     3455|     3455000| (51.1975,4.4596)|    12|Simons Thijs|Thijs.Simons@outl...|          BE|Antwerpen/Berendr...|   2040|Bergenhoeve|81 0302|
|1007930|2020-09-08 10:31:05|2020-09-08 10:50:26|(51.2158,4.40314)| (51.216,4.44

In [16]:
# Load: persist the joined ride data as JSON, replacing any previous output.
json_writer = df_dim_ride.write.mode("overwrite")
json_writer.json("./Opdracht2_DataGeneratieJson")