In [1]:
from functools import cache

import pyspark
from pyspark.sql import SparkSession

In [2]:
# pyspark.__version__
# pyspark.__file__
# pwd

In [3]:
jdbc_driver_path = "./python_app/jars/postgresql-42.5.4.jar"

spark = SparkSession.builder \
    .appName("testCitiBikeStationStatusTransform") \
    .config("spark.jars", jdbc_driver_path) \
    .config("spark.driver.memory", "8g") \
    .config("spark.memory.fraction", "0.8") \
    .master("local[4]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("INFO")

25/01/02 15:52:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# POSTGRESQL connection parameters
# POSTGRESQL_HOST = "postgres"
POSTGRESQL_PORT = "5432"
POSTGRES_DB = "citibike_db"
POSTGRES_USER = "citibike_user"
POSTGRES_PASSWORD = "citibike_pass"

# JDBC URL
# jdbc_url = f"jdbc:postgresql://{POSTGRESQL_HOST}:{POSTGRESQL_PORT}/{POSTGRES_DB}"

jdbc_url = f"jdbc:postgresql://localhost:{POSTGRESQL_PORT}/{POSTGRES_DB}"

# Connection properties
connection_properties = {
    "user": POSTGRES_USER,
    "password": POSTGRES_PASSWORD,
    "driver": "org.postgresql.Driver"
}

In [5]:
df_station_status = spark.read \
    .jdbc(url=jdbc_url,
          table="station_status",
          properties=connection_properties
          )
df_station_status.printSchema()

25/01/02 15:52:58 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
25/01/02 15:52:58 INFO SharedState: Warehouse path is 'file:/Users/tobi/Documents/projects/citibike-stream/spark-warehouse'.


root
 |-- id: integer (nullable = true)
 |-- station_id: string (nullable = true)
 |-- num_bikes_available: integer (nullable = true)
 |-- num_docks_available: integer (nullable = true)
 |-- is_installed: boolean (nullable = true)
 |-- is_renting: boolean (nullable = true)
 |-- is_returning: boolean (nullable = true)
 |-- last_reported: timestamp (nullable = true)
 |-- inserted_at: timestamp (nullable = true)



In [6]:
df_station_info = spark.read \
    .jdbc(url=jdbc_url,
            table="station_information",
            properties=connection_properties
            )
df_station_info.printSchema()

root
 |-- id: integer (nullable = true)
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- region_id: string (nullable = true)
 |-- capacity: integer (nullable = true)
 |-- eightd_has_key_dispenser: boolean (nullable = true)
 |-- has_kiosk: boolean (nullable = true)
 |-- installed: boolean (nullable = true)
 |-- last_reported: timestamp (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)



In [7]:
df_tripdata = spark.read \
    .jdbc(url=jdbc_url,
            table="tripdata",
            properties=connection_properties
            )
df_tripdata.printSchema()

root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- start_lat: decimal(9,6) (nullable = true)
 |-- start_lng: decimal(9,6) (nullable = true)
 |-- end_lat: decimal(9,6) (nullable = true)
 |-- end_lng: decimal(9,6) (nullable = true)
 |-- member_casual: string (nullable = true)
 |-- month: date (nullable = true)



In [8]:
df_station_status.show(5)

25/01/02 15:57:39 INFO CodeGenerator: Code generated in 175.627416 ms
25/01/02 15:57:39 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
25/01/02 15:57:39 INFO DAGScheduler: Got job 0 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
25/01/02 15:57:39 INFO DAGScheduler: Final stage: ResultStage 0 (showString at NativeMethodAccessorImpl.java:0)
25/01/02 15:57:39 INFO DAGScheduler: Parents of final stage: List()
25/01/02 15:57:39 INFO DAGScheduler: Missing parents: List()
25/01/02 15:57:39 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
25/01/02 15:57:39 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 15.7 KiB, free 6.2 GiB)
25/01/02 15:57:40 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 7.3 KiB, free 6.2 GiB)
25/01/02 15:57:40 INFO BlockManagerInfo: Added broadcast_0_pie

+---+--------------------+-------------------+-------------------+------------+----------+------------+-------------------+--------------------+
| id|          station_id|num_bikes_available|num_docks_available|is_installed|is_renting|is_returning|      last_reported|         inserted_at|
+---+--------------------+-------------------+-------------------+------------+----------+------------+-------------------+--------------------+
|  1|816e50eb-dc4b-47d...|                  0|                  0|       false|     false|       false|2024-11-05 17:22:24|2024-12-27 20:00:...|
|  2|566a6389-5c22-49c...|                  0|                  0|       false|     false|       false|2024-11-11 15:41:13|2024-12-27 20:00:...|
|  3|64f0f28c-bedc-42d...|                  0|                102|       false|     false|       false|2024-12-27 16:05:08|2024-12-27 20:00:...|
|  4|66de85d2-0aca-11e...|                 27|                 17|        true|      true|        true|2024-12-27 19:57:08|2024-12

25/01/02 15:57:42 INFO CodeGenerator: Code generated in 19.80375 ms


In [9]:
df_station_info.show(5)

+----+--------------------+--------------------+----------+---------+----------+---------+--------+------------------------+---------+---------+--------------------+--------------------+--------------------+
|  id|          station_id|                name|short_name|      lat|       lon|region_id|capacity|eightd_has_key_dispenser|has_kiosk|installed|       last_reported|          created_at|          updated_at|
+----+--------------------+--------------------+----------+---------+----------+---------+--------+------------------------+---------+---------+--------------------+--------------------+--------------------+
| 643|1198e739-787c-4ce...|     7 Ave & W 55 St|   6847.05|40.764126|-73.980973|       71|      67|                   false|     true|    false|2024-12-30 14:21:...|2024-12-29 11:03:...|2024-12-30 14:21:...|
|1255| 1885442082749874736|Grand Concourse &...|   8340.01| 40.84761| -73.90745|       71|      22|                   false|     true|    false|2024-12-30 14:21:...|202

25/01/02 15:58:56 INFO CodeGenerator: Code generated in 19.840334 ms
25/01/02 15:58:56 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
25/01/02 15:58:56 INFO DAGScheduler: Got job 1 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
25/01/02 15:58:56 INFO DAGScheduler: Final stage: ResultStage 1 (showString at NativeMethodAccessorImpl.java:0)
25/01/02 15:58:56 INFO DAGScheduler: Parents of final stage: List()
25/01/02 15:58:56 INFO DAGScheduler: Missing parents: List()
25/01/02 15:58:56 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
25/01/02 15:58:56 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 18.0 KiB, free 6.2 GiB)
25/01/02 15:58:56 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 7.8 KiB, free 6.2 GiB)
25/01/02 15:58:56 INFO BlockManagerInfo: Added broadcast_1_piec

In [10]:
df_tripdata.show(5)

25/01/02 16:05:33 INFO CodeGenerator: Code generated in 16.3335 ms
25/01/02 16:05:33 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
25/01/02 16:05:33 INFO DAGScheduler: Got job 2 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
25/01/02 16:05:33 INFO DAGScheduler: Final stage: ResultStage 2 (showString at NativeMethodAccessorImpl.java:0)
25/01/02 16:05:33 INFO DAGScheduler: Parents of final stage: List()
25/01/02 16:05:33 INFO DAGScheduler: Missing parents: List()
25/01/02 16:05:33 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[8] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
25/01/02 16:05:33 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 18.2 KiB, free 6.2 GiB)
25/01/02 16:05:33 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 7.9 KiB, free 6.2 GiB)
25/01/02 16:05:33 INFO BlockManagerInfo: Added broadcast_2_piece0

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+----------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|start_lat| start_lng|  end_lat|   end_lng|member_casual|     month|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+----------+
|3F874FD7056276BA|electric_bike|2024-10-31 00:00:00|2024-11-01 00:00:00|    W 30 St & 10 Ave|         6459.07|Cleveland Pl & Sp...|       5492.05|40.752694|-74.002353|40.722104|-73.997249|       member|2024-10-01|
|E4FE320A5D6A8901| classic_bike|2024-10-31 00:00:00|2024-11-01 00:00:00|Sullivan St & Was...|         5721.01|Cleveland Pl & Sp...|       5492.0

25/01/02 16:05:37 INFO CodeGenerator: Code generated in 10.747166 ms
25/01/02 16:05:37 INFO JDBCRDD: closed connection
25/01/02 16:05:37 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 2430 bytes result sent to driver
25/01/02 16:05:37 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 4072 ms on macbookpro.fritz.box (executor driver) (1/1)
25/01/02 16:05:37 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
25/01/02 16:05:37 INFO DAGScheduler: ResultStage 2 (showString at NativeMethodAccessorImpl.java:0) finished in 4,081 s
25/01/02 16:05:37 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
25/01/02 16:05:37 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
25/01/02 16:05:37 INFO DAGScheduler: Job 2 finished: showString at NativeMethodAccessorImpl.java:0, took 4,084026 s
                                                                                

In [11]:
station_info_df = spark.read \
    .jdbc(url=jdbc_url, table="station_information", properties=connection_properties) \
    .select("station_id",
            "name",
            "short_name",
            "lat",
            "lon",
            "region_id",
            "capacity",
            "eightd_has_key_dispenser",
            "has_kiosk",
            "installed",
            "last_reported", "created_at", "updated_at"
            ) \
            .cache()

In [24]:
station_status_df = spark.read \
    .jdbc(url=jdbc_url, table="station_status", properties=connection_properties) \
    .select("station_id", "num_bikes_available", "num_docks_available", "is_installed", "is_renting", "is_returning", "last_reported", "inserted_at")
station_status_df = station_status_df.withColumnRenamed("last_reported", "status_last_reported")

In [25]:
joined_df = station_status_df.join(
    station_info_df,
    on="station_id",
    how="inner")

In [26]:
joined_df.show(5)

25/01/02 17:54:19 INFO DefaultCachedBatchSerializer: Predicate isnotnull(station_id#226) generates partition filter: ((station_id.count#2076 - station_id.nullCount#2075) > 0)
25/01/02 17:54:19 INFO SparkContext: Starting job: $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264
25/01/02 17:54:19 INFO DAGScheduler: Got job 11 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264) with 1 output partitions
25/01/02 17:54:19 INFO DAGScheduler: Final stage: ResultStage 12 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264)
25/01/02 17:54:19 INFO DAGScheduler: Parents of final stage: List()
25/01/02 17:54:19 INFO DAGScheduler: Missing parents: List()
25/01/02 17:54:19 INFO DAGScheduler: Submitting ResultStage 12 (MapPartitionsRDD[55] at $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264), which has no missing parents
25/01/02 17:54:19 INFO MemoryStore: Block broadcast_14 stored as values in memory (estimated size 35.6 KiB, free 6.0 GiB)
25/01/02 17:54:19 INFO MemoryS

+--------------------+-------------------+-------------------+------------+----------+------------+--------------------+--------------------+--------------------+----------+-----------------+------------------+---------+--------+------------------------+---------+---------+--------------------+--------------------+--------------------+
|          station_id|num_bikes_available|num_docks_available|is_installed|is_renting|is_returning|status_last_reported|         inserted_at|                name|short_name|              lat|               lon|region_id|capacity|eightd_has_key_dispenser|has_kiosk|installed|       last_reported|          created_at|          updated_at|
+--------------------+-------------------+-------------------+------------+----------+------------+--------------------+--------------------+--------------------+----------+-----------------+------------------+---------+--------+------------------------+---------+---------+--------------------+--------------------+--------

25/01/02 17:54:22 INFO CodeGenerator: Code generated in 7.949458 ms
25/01/02 17:54:22 INFO JDBCRDD: closed connection
25/01/02 17:54:22 INFO Executor: Finished task 0.0 in stage 13.0 (TID 12). 2980 bytes result sent to driver
25/01/02 17:54:22 INFO TaskSetManager: Finished task 0.0 in stage 13.0 (TID 12) in 2146 ms on macbookpro.fritz.box (executor driver) (1/1)
25/01/02 17:54:22 INFO TaskSchedulerImpl: Removed TaskSet 13.0, whose tasks have all completed, from pool 
25/01/02 17:54:22 INFO DAGScheduler: ResultStage 13 (showString at NativeMethodAccessorImpl.java:0) finished in 2,150 s
25/01/02 17:54:22 INFO DAGScheduler: Job 12 is finished. Cancelling potential speculative or zombie tasks for this job
25/01/02 17:54:22 INFO TaskSchedulerImpl: Killing all running tasks in stage 13: Stage finished
25/01/02 17:54:22 INFO DAGScheduler: Job 12 finished: showString at NativeMethodAccessorImpl.java:0, took 2,152380 s
                                                                            

In [35]:
from pyspark.sql.functions import avg, col, to_date, max as spark_max

aggregated_df = joined_df.groupBy("station_id", "name", "short_name", "region_id", "lat", "lon", "capacity", "has_kiosk", "installed", "is_installed", "is_returning", "is_renting", ) \
    .agg(
        avg("num_bikes_available").alias("avg_bikes_available"),
        avg("num_docks_available").alias("avg_docks_available"),
        spark_max("status_last_reported").alias("status_last_reported"),
        spark_max("last_reported").alias("info_last_reported")
)
aggregated_df = aggregated_df.withColumn(
    "aggregation_date", to_date(col("status_last_reported"))
)

In [36]:
aggregated_df.show(5)

25/01/02 18:16:28 INFO CodeGenerator: Code generated in 6.916125 ms
25/01/02 18:16:28 INFO DefaultCachedBatchSerializer: Predicate isnotnull(station_id#226) generates partition filter: ((station_id.count#3811 - station_id.nullCount#3810) > 0)
25/01/02 18:16:28 INFO SparkContext: Starting job: $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264
25/01/02 18:16:28 INFO DAGScheduler: Got job 23 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264) with 1 output partitions
25/01/02 18:16:28 INFO DAGScheduler: Final stage: ResultStage 29 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264)
25/01/02 18:16:28 INFO DAGScheduler: Parents of final stage: List()
25/01/02 18:16:28 INFO DAGScheduler: Missing parents: List()
25/01/02 18:16:28 INFO DAGScheduler: Submitting ResultStage 29 (MapPartitionsRDD[107] at $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264), which has no missing parents
25/01/02 18:16:28 INFO MemoryStore: Block broadcast_30 stored as values in memory (

+--------------------+--------------------+----------+---------+----------+------------+--------+---------+---------+------------+------------+----------+-------------------+-------------------+--------------------+--------------------+----------------+
|          station_id|                name|short_name|region_id|       lat|         lon|capacity|has_kiosk|installed|is_installed|is_returning|is_renting|avg_bikes_available|avg_docks_available|status_last_reported|  info_last_reported|aggregation_date|
+--------------------+--------------------+----------+---------+----------+------------+--------+---------+---------+------------+------------+----------+-------------------+-------------------+--------------------+--------------------+----------------+
|66dd4c5f-0aca-11e...|Columbus Ave & W ...|   7175.05|       71|40.7770575|-73.97898475|      67|     true|    false|        true|        true|      true| 39.724425887265134| 23.748434237995824| 2025-01-02 17:14:34|2024-12-30 14:21:...|  

25/01/02 18:16:32 INFO JDBCRDD: closed connection
25/01/02 18:16:32 INFO Executor: Finished task 0.0 in stage 30.0 (TID 24). 3301 bytes result sent to driver
25/01/02 18:16:32 INFO TaskSetManager: Finished task 0.0 in stage 30.0 (TID 24) in 3280 ms on macbookpro.fritz.box (executor driver) (1/1)
25/01/02 18:16:32 INFO TaskSchedulerImpl: Removed TaskSet 30.0, whose tasks have all completed, from pool 
25/01/02 18:16:32 INFO DAGScheduler: ShuffleMapStage 30 (showString at NativeMethodAccessorImpl.java:0) finished in 3,286 s
25/01/02 18:16:32 INFO DAGScheduler: looking for newly runnable stages
25/01/02 18:16:32 INFO DAGScheduler: running: Set()
25/01/02 18:16:32 INFO DAGScheduler: waiting: Set()
25/01/02 18:16:32 INFO DAGScheduler: failed: Set()
25/01/02 18:16:32 INFO ShufflePartitionsUtil: For shuffle(6), advisory target size: 67108864, actual target size 1048576, minimum partition size: 1048576
25/01/02 18:16:32 INFO HashAggregateExec: spark.sql.codegen.aggregate.map.twolevel.enabled i

In [37]:
aggregated_df.createOrReplaceTempView("station_status_transformed")

In [38]:
dim_station_status = spark.sql("""
                               select count(*) as total_records
                               from station_status_transformed
                               """)

In [39]:
dim_station_status.show()

25/01/02 19:30:28 INFO CodeGenerator: Code generated in 7.99125 ms
25/01/02 19:30:28 INFO DefaultCachedBatchSerializer: Predicate isnotnull(station_id#226) generates partition filter: ((station_id.count#4158 - station_id.nullCount#4157) > 0)
25/01/02 19:30:28 INFO SparkContext: Starting job: $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264
25/01/02 19:30:28 INFO DAGScheduler: Got job 26 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264) with 1 output partitions
25/01/02 19:30:28 INFO DAGScheduler: Final stage: ResultStage 33 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264)
25/01/02 19:30:28 INFO DAGScheduler: Parents of final stage: List()
25/01/02 19:30:28 INFO DAGScheduler: Missing parents: List()
25/01/02 19:30:28 INFO DAGScheduler: Submitting ResultStage 33 (MapPartitionsRDD[120] at $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264), which has no missing parents
25/01/02 19:30:28 INFO MemoryStore: Block broadcast_34 stored as values in memory (e

+-------------+
|total_records|
+-------------+
|         2372|
+-------------+



25/01/02 19:30:33 INFO CodeGenerator: Code generated in 29.357833 ms
25/01/02 19:30:33 INFO Executor: Finished task 0.0 in stage 36.0 (TID 28). 6217 bytes result sent to driver
25/01/02 19:30:33 INFO TaskSetManager: Finished task 0.0 in stage 36.0 (TID 28) in 64 ms on macbookpro.fritz.box (executor driver) (1/1)
25/01/02 19:30:33 INFO TaskSchedulerImpl: Removed TaskSet 36.0, whose tasks have all completed, from pool 
25/01/02 19:30:33 INFO DAGScheduler: ShuffleMapStage 36 (showString at NativeMethodAccessorImpl.java:0) finished in 0,069 s
25/01/02 19:30:33 INFO DAGScheduler: looking for newly runnable stages
25/01/02 19:30:33 INFO DAGScheduler: running: Set()
25/01/02 19:30:33 INFO DAGScheduler: waiting: Set()
25/01/02 19:30:33 INFO DAGScheduler: failed: Set()
25/01/02 19:30:33 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
25/01/02 19:30:33 INFO DAGScheduler: Got job 29 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
25/01/02 19

In [40]:
aggregated_df.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- region_id: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- capacity: integer (nullable = true)
 |-- has_kiosk: boolean (nullable = true)
 |-- installed: boolean (nullable = true)
 |-- is_installed: boolean (nullable = true)
 |-- is_returning: boolean (nullable = true)
 |-- is_renting: boolean (nullable = true)
 |-- avg_bikes_available: double (nullable = true)
 |-- avg_docks_available: double (nullable = true)
 |-- status_last_reported: timestamp (nullable = true)
 |-- info_last_reported: timestamp (nullable = true)
 |-- aggregation_date: date (nullable = true)



In [None]:
# station_status_transformed = \
# """CREATE TABLE IF NOT EXISTS station_status_transformed (
#     station_id VARCHAR(255),
#     name VARCHAR(255),
#     short_name VARCHAR(255),
#     region_id VARCHAR(255),
#     lat DECIMAL,
#     lon DECIMAL,
#     capacity INT,
#     has_kiosk BOOLEAN,
#     installed BOOLEAN,
#     is_installed BOOLEAN,
#     is_returning BOOLEAN,
#     is_renting BOOLEAN,
#     avg_bikes_available DECIMAL,
#     avg_docks_available DECIMAL,
#     status_last_reported TIMESTAMP,
#     info_last_reported TIMESTAMP,
#     aggregation_date DATE
# ) PARTITION BY RANGE (aggregation_date);
# """

In [42]:
aggregated_df.write \
    .jdbc(url=jdbc_url,
          table="station_status_transformed",
          mode="append",
          properties=connection_properties
          )

25/01/02 19:42:34 INFO DefaultCachedBatchSerializer: Predicate isnotnull(station_id#226) generates partition filter: ((station_id.count#4914 - station_id.nullCount#4913) > 0)
25/01/02 19:42:34 INFO SparkContext: Starting job: $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264
25/01/02 19:42:34 INFO DAGScheduler: Got job 33 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264) with 1 output partitions
25/01/02 19:42:34 INFO DAGScheduler: Final stage: ResultStage 44 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264)
25/01/02 19:42:34 INFO DAGScheduler: Parents of final stage: List()
25/01/02 19:42:34 INFO DAGScheduler: Missing parents: List()
25/01/02 19:42:34 INFO DAGScheduler: Submitting ResultStage 44 (MapPartitionsRDD[151] at $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264), which has no missing parents
25/01/02 19:42:34 INFO MemoryStore: Block broadcast_43 stored as values in memory (estimated size 34.6 KiB, free 6.1 GiB)
25/01/02 19:42:34 INFO Memory

In [43]:
df_tripdata.printSchema()

root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- start_lat: decimal(9,6) (nullable = true)
 |-- start_lng: decimal(9,6) (nullable = true)
 |-- end_lat: decimal(9,6) (nullable = true)
 |-- end_lng: decimal(9,6) (nullable = true)
 |-- member_casual: string (nullable = true)
 |-- month: date (nullable = true)



In [70]:
tripdata_df = spark.read \
    .jdbc(url=jdbc_url, table="tripdata", properties=connection_properties) \
    .select(
        "ride_id", "rideable_type", "started_at", "ended_at", "start_station_name", "start_station_id", "end_station_name", "end_station_id", "start_lat", "start_lng", "end_lat", "end_lng", "member_casual", "month"
            ) \
            .cache()

tripdata_df = tripdata_df.withColumnRenamed("start_station_name", "original_start_station_name") \
                         .withColumnRenamed("start_lat", "original_start_lat") \
                         .withColumnRenamed("end_station_name", "original_end_station_name") \
                         .withColumnRenamed("end_lat", "original_end_lat")

In [45]:
aggregated_df.printSchema(5)

root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- region_id: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- capacity: integer (nullable = true)
 |-- has_kiosk: boolean (nullable = true)
 |-- installed: boolean (nullable = true)
 |-- is_installed: boolean (nullable = true)
 |-- is_returning: boolean (nullable = true)
 |-- is_renting: boolean (nullable = true)
 |-- avg_bikes_available: double (nullable = true)
 |-- avg_docks_available: double (nullable = true)
 |-- status_last_reported: timestamp (nullable = true)
 |-- info_last_reported: timestamp (nullable = true)
 |-- aggregation_date: date (nullable = true)



In [71]:
transformed_station_status_df = spark.read \
    .jdbc(url=jdbc_url, table="station_status_transformed", properties=connection_properties) \
        .select(
            "station_id", "name", "short_name", "region_id", "lat", "lon", "capacity", "has_kiosk", "installed", "is_installed",           "is_returning", "is_renting", "avg_bikes_available", "avg_docks_available", "status_last_reported", "info_last_reported",      "aggregation_date")\
        .cache()

In [72]:
from pyspark.sql.functions import col, broadcast

# Rename columns for Start Station
start_station_df = transformed_station_status_df.select(
    col("short_name").alias("start_station_short_name"),
    col("name").alias("start_station_name"),
    col("lat").alias("start_lat"),
    col("lon").alias("start_lon"),
    col("capacity").alias("start_capacity"),
    col("avg_bikes_available").alias("start_avg_bikes_available"),
    col("avg_docks_available").alias("start_avg_docks_available"),
    col("status_last_reported").alias("start_status_last_reported"),
    col("info_last_reported").alias("start_info_last_reported")
)

# Rename columns for End Station
end_station_df = transformed_station_status_df.select(
    col("short_name").alias("end_station_short_name"),
    col("name").alias("end_station_name"),
    col("lat").alias("end_lat"),
    col("lon").alias("end_lon"),
    col("capacity").alias("end_capacity"),
    col("avg_bikes_available").alias("end_avg_bikes_available"),
    col("avg_docks_available").alias("end_avg_docks_available"),
    col("status_last_reported").alias("end_status_last_reported"),
    col("info_last_reported").alias("end_info_last_reported")
)


In [73]:
from pyspark.sql.functions import broadcast

# Broadcast Start and End Station DataFrames
start_station_broadcast = broadcast(start_station_df)
end_station_broadcast = broadcast(end_station_df)


In [74]:
# Join tripdata_df with Start Station Information
trip_with_start = tripdata_df.join(
    start_station_broadcast,
    tripdata_df.start_station_id == start_station_broadcast.start_station_short_name,
    how="left"
)

# Join the result with End Station Information
final_df = trip_with_start.join(
    end_station_broadcast,
    tripdata_df.end_station_id == end_station_broadcast.end_station_short_name,
    how="left"
)

In [50]:
trip_with_start.show(5)

25/01/02 20:01:22 INFO CodeGenerator: Code generated in 16.601916 ms
25/01/02 20:01:22 INFO DAGScheduler: Got job 36 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
25/01/02 20:01:22 INFO DAGScheduler: Final stage: ResultStage 48 (showString at NativeMethodAccessorImpl.java:0)
25/01/02 20:01:22 INFO DAGScheduler: Parents of final stage: List()
25/01/02 20:01:22 INFO DAGScheduler: Missing parents: List()
25/01/02 20:01:22 INFO DAGScheduler: Submitting ResultStage 48 (*(1) Scan JDBCRelation(tripdata) [numPartitions=1] [ride_id#5005,rideable_type#5006,started_at#5007,ended_at#5008,start_station_name#5009,start_station_id#5010,end_station_name#5011,end_station_id#5012,start_lat#5013,start_lng#5014,end_lat#5015,end_lng#5016,member_casual#5017,month#5018] PushedFilters: [], ReadSchema: struct<ride_id:string,rideable_type:string,started_at:timestamp,ended_at:timestamp,start_station_...
 MapPartitionsRDD[164] at showString at NativeMethodAccessorImpl.java:0), which has

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+----------+------------------------+--------------------+--------------------+--------------------+--------------+-------------------------+-------------------------+--------------------------+------------------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|start_lat| start_lng|  end_lat|   end_lng|member_casual|     month|start_station_short_name|  start_station_name|           start_lat|           start_lon|start_capacity|start_avg_bikes_available|start_avg_docks_available|start_status_last_reported|start_info_last_reported|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+--

25/01/02 20:01:42 INFO JDBCRDD: closed connection
25/01/02 20:01:42 INFO MemoryStore: Block rdd_164_0 stored as values in memory (estimated size 124.3 MiB, free 5.9 GiB)
25/01/02 20:01:42 INFO BlockManagerInfo: Added rdd_164_0 in memory on macbookpro.fritz.box:53671 (size: 124.3 MiB, free: 6.0 GiB)
25/01/02 20:01:42 INFO Executor: 1 block locks were not released by task 0.0 in stage 48.0 (TID 36)
[rdd_164_0]
25/01/02 20:01:42 INFO Executor: Finished task 0.0 in stage 48.0 (TID 36). 1402 bytes result sent to driver
25/01/02 20:01:42 INFO TaskSetManager: Finished task 0.0 in stage 48.0 (TID 36) in 19982 ms on macbookpro.fritz.box (executor driver) (1/1)
25/01/02 20:01:42 INFO TaskSchedulerImpl: Removed TaskSet 48.0, whose tasks have all completed, from pool 
25/01/02 20:01:42 INFO DAGScheduler: ResultStage 48 (showString at NativeMethodAccessorImpl.java:0) finished in 20,005 s
25/01/02 20:01:42 INFO DAGScheduler: Job 36 is finished. Cancelling potential speculative or zombie tasks for th

In [62]:
final_df.show(5)

25/01/02 20:15:56 INFO DefaultCachedBatchSerializer: Predicate isnotnull(short_name#5119) generates partition filter: ((short_name.count#8802 - short_name.nullCount#8801) > 0)
25/01/02 20:15:56 INFO DefaultCachedBatchSerializer: Predicate isnotnull(short_name#7691) generates partition filter: ((short_name.count#8887 - short_name.nullCount#8886) > 0)
25/01/02 20:15:56 INFO SparkContext: Starting job: $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264
25/01/02 20:15:56 INFO SparkContext: Starting job: $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264
25/01/02 20:15:56 INFO DAGScheduler: Got job 43 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264) with 1 output partitions
25/01/02 20:15:56 INFO DAGScheduler: Final stage: ResultStage 55 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264)
25/01/02 20:15:56 INFO DAGScheduler: Parents of final stage: List()
25/01/02 20:15:56 INFO DAGScheduler: Missing parents: List()
25/01/02 20:15:56 INFO DAGScheduler: Submit

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+----------+---------+----------+-------------+----------+------------------------+--------------------+--------------------+--------------------+--------------+-------------------------+-------------------------+--------------------------+------------------------+----------------------+--------------------+--------------------+--------------------+------------+-----------------------+-----------------------+------------------------+----------------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|start_lat| start_lng|  end_lat|   end_lng|member_casual|     month|start_station_short_name|  start_station_name|           start_lat|           start_lon|start_capacity|start_avg_bikes_available|start_avg_docks_available|start_status_las

25/01/02 20:15:56 INFO CodeGenerator: Code generated in 22.124709 ms
25/01/02 20:15:56 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
25/01/02 20:15:56 INFO DAGScheduler: Got job 45 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
25/01/02 20:15:56 INFO DAGScheduler: Final stage: ResultStage 57 (showString at NativeMethodAccessorImpl.java:0)
25/01/02 20:15:56 INFO DAGScheduler: Parents of final stage: List()
25/01/02 20:15:56 INFO DAGScheduler: Missing parents: List()
25/01/02 20:15:56 INFO DAGScheduler: Submitting ResultStage 57 (MapPartitionsRDD[207] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
25/01/02 20:15:56 INFO MemoryStore: Block broadcast_61 stored as values in memory (estimated size 44.6 KiB, free 5.9 GiB)
25/01/02 20:15:56 INFO MemoryStore: Block broadcast_61_piece0 stored as bytes in memory (estimated size 16.3 KiB, free 5.9 GiB)
25/01/02 20:15:56 INFO BlockManagerInfo: Added broadcas

In [75]:
final_df.printSchema()

root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- original_start_station_name: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- original_end_station_name: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- original_start_lat: decimal(9,6) (nullable = true)
 |-- start_lng: decimal(9,6) (nullable = true)
 |-- original_end_lat: decimal(9,6) (nullable = true)
 |-- end_lng: decimal(9,6) (nullable = true)
 |-- member_casual: string (nullable = true)
 |-- month: date (nullable = true)
 |-- start_station_short_name: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_lat: decimal(38,18) (nullable = true)
 |-- start_lon: decimal(38,18) (nullable = true)
 |-- start_capacity: integer (nullable = true)
 |-- start_avg_bikes_available: decimal(38,18) (nullable = true)
 |-- start_avg_

In [53]:
"""
CREATE TABLE agg_tripdata (
    ride_id VARCHAR PRIMARY KEY,
    rideable_type VARCHAR,
    started_at TIMESTAMP,
    ended_at TIMESTAMP,
    member_casual VARCHAR,
    month DATE,
    start_station_id VARCHAR,
    start_station_name_enriched VARCHAR,
    start_lat_enriched DECIMAL(38,18),
    start_lng DECIMAL(9,6),
    start_lon_enriched DECIMAL(38,18),
    start_capacity INTEGER,
    start_avg_bikes_available DECIMAL(38,18),
    start_avg_docks_available DECIMAL(38,18),
    start_status_last_reported TIMESTAMP,
    start_info_last_reported TIMESTAMP,
    end_station_id VARCHAR,
    end_station_name_enriched VARCHAR,
    end_lat_enriched DECIMAL(38,18),
    end_lng DECIMAL(9,6),
    end_lon_enriched DECIMAL(38,18),
    end_capacity INTEGER,
    end_avg_bikes_available DECIMAL(38,18),
    end_avg_docks_available DECIMAL(38,18),
    end_status_last_reported TIMESTAMP,
    end_info_last_reported TIMESTAMP,
    trip_duration_seconds BIGINT,
    aggregation_date DATE
);
"""

AnalysisException: [AMBIGUOUS_REFERENCE] Reference `start_station_name` is ambiguous, could be: [`start_station_name`, `start_station_name`].

In [76]:
final_df.write \
    .jdbc(url=jdbc_url,
          table="agg_tripdata",
          mode="append",
          properties=connection_properties
          )

AnalysisException: Column start_station_id not found in schema Some(StructType(StructField(ride_id,StringType,true),StructField(rideable_type,StringType,true),StructField(started_at,TimestampType,true),StructField(ended_at,TimestampType,true),StructField(original_start_station_name,StringType,true),StructField(original_start_lat,DecimalType(38,18),true),StructField(original_end_station_name,StringType,true),StructField(original_end_lat,DecimalType(38,18),true),StructField(member_casual,StringType,true),StructField(month,IntegerType,true),StructField(start_station_short_name,StringType,true),StructField(start_station_name,StringType,true),StructField(start_lat,DecimalType(38,18),true),StructField(start_lon,DecimalType(38,18),true),StructField(start_capacity,IntegerType,true),StructField(start_avg_bikes_available,DecimalType(38,18),true),StructField(start_avg_docks_available,DecimalType(38,18),true),StructField(start_status_last_reported,TimestampType,true),StructField(start_info_last_reported,TimestampType,true),StructField(end_station_short_name,StringType,true),StructField(end_station_name,StringType,true),StructField(end_lat,DecimalType(38,18),true),StructField(end_lon,DecimalType(38,18),true),StructField(end_capacity,IntegerType,true),StructField(end_avg_bikes_available,DecimalType(38,18),true),StructField(end_avg_docks_available,DecimalType(38,18),true),StructField(end_status_last_reported,TimestampType,true),StructField(end_info_last_reported,TimestampType,true))).