In [74]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, from_unixtime, to_timestamp, unix_timestamp,udf,when,ceil
from geopy.distance import geodesic
from pyspark.sql.types import FloatType
import pandas as pd

In [75]:
# Initialize Spark session
spark = SparkSession.builder.appName("ETL_Pipeline").getOrCreate()

In [76]:
# File paths
gps_data_path = "data/gps_data.json"
route_schedule_path = "data/route_schedule.csv"
output_file_path = "data/output.parquet"

In [77]:
# Define the schema for GPS data
# Load the GPS data (allow Spark to infer the schema)
gps_data_path = "data/gps_data.json"
gps_df = spark.read.option("multiline", "true").json(gps_data_path)

# Show the inferred schema and data
gps_df.printSchema()
gps_df.show(truncate=False)

root
 |-- bus_id: string (nullable = true)
 |-- current_time: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+------+-------------------+--------+---------+
|bus_id|current_time       |latitude|longitude|
+------+-------------------+--------+---------+
|B101  |2023-01-01 12:50:00|37.7749 |-122.4194|
|B102  |2023-01-01 12:55:00|36.8716 |-121.2727|
+------+-------------------+--------+---------+



In [79]:
 # Load Route Schedule data
 # Load Route Schedule data
routes_df = spark.read.csv(route_schedule_path, header=True, inferSchema=True) \
    .withColumnRenamed("latitude", "stop_latitude") \
    .withColumnRenamed("longitude", "stop_longitude") \
    .alias("routes")

# Show the schema and data
routes_df.printSchema()
routes_df.show(truncate=False)

root
 |-- bus_id: string (nullable = true)
 |-- route_id: integer (nullable = true)
 |-- stop_id: integer (nullable = true)
 |-- stop_name: string (nullable = true)
 |-- stop_latitude: double (nullable = true)
 |-- stop_longitude: double (nullable = true)
 |-- scheduled_time: timestamp (nullable = true)

+------+--------+-------+---------+-------------+--------------+-------------------+
|bus_id|route_id|stop_id|stop_name|stop_latitude|stop_longitude|scheduled_time     |
+------+--------+-------+---------+-------------+--------------+-------------------+
|B101  |1       |101    |Main St  |37.7749      |-122.4194     |2023-01-01 12:50:00|
|B102  |2       |102    |Market St|37.8716      |-122.2727     |2023-01-01 12:55:00|
+------+--------+-------+---------+-------------+--------------+-------------------+



In [80]:
gps_with_routes = gps_df.join(routes_df, on="bus_id")
gps_with_routes.printSchema()
gps_with_routes.show(truncate=False)


root
 |-- bus_id: string (nullable = true)
 |-- current_time: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- route_id: integer (nullable = true)
 |-- stop_id: integer (nullable = true)
 |-- stop_name: string (nullable = true)
 |-- stop_latitude: double (nullable = true)
 |-- stop_longitude: double (nullable = true)
 |-- scheduled_time: timestamp (nullable = true)

+------+-------------------+--------+---------+--------+-------+---------+-------------+--------------+-------------------+
|bus_id|current_time       |latitude|longitude|route_id|stop_id|stop_name|stop_latitude|stop_longitude|scheduled_time     |
+------+-------------------+--------+---------+--------+-------+---------+-------------+--------------+-------------------+
|B101  |2023-01-01 12:50:00|37.7749 |-122.4194|1       |101    |Main St  |37.7749      |-122.4194     |2023-01-01 12:50:00|
|B102  |2023-01-01 12:55:00|36.8716 |-121.2727|2       |102    |Market St

In [81]:
# Calculate the distance between the bus location and stop location
gps_with_routes = gps_with_routes.withColumn(
    "distance_km",
    expr("""
        6371 * acos(
            cos(radians(latitude)) *
            cos(radians(stop_latitude)) *
            cos(radians(stop_longitude) - radians(longitude)) +
            sin(radians(latitude)) *
            sin(radians(routes.stop_latitude))
        )
    """)
)
gps_with_routes.select("bus_id","route_id","stop_id","stop_name","scheduled_time","current_time","distance_km").show(truncate=False)

+------+--------+-------+---------+-------------------+-------------------+------------------+
|bus_id|route_id|stop_id|stop_name|scheduled_time     |current_time       |distance_km       |
+------+--------+-------+---------+-------------------+-------------------+------------------+
|B101  |1       |101    |Main St  |2023-01-01 12:50:00|2023-01-01 12:50:00|0.0               |
|B102  |2       |102    |Market St|2023-01-01 12:55:00|2023-01-01 12:55:00|142.03047987009157|
+------+--------+-------+---------+-------------------+-------------------+------------------+



In [82]:
# Define the conditions
df_with_times = gps_with_routes.withColumn(
    "actual_arrival_time", 
    when(gps_with_routes["distance_km"] <= 1, gps_with_routes["current_time"]).otherwise(None)
).withColumn(
    "time_taken_for_distance",
    ceil(3 * gps_with_routes["distance_km"]).cast("int")  # Rounding up the value
)
df_with_times.select("bus_id", "route_id", "stop_id", "stop_name", "scheduled_time", "current_time", "distance_km", "actual_arrival_time", "time_taken_for_distance").show(truncate=False)

+------+--------+-------+---------+-------------------+-------------------+------------------+-------------------+-----------------------+
|bus_id|route_id|stop_id|stop_name|scheduled_time     |current_time       |distance_km       |actual_arrival_time|time_taken_for_distance|
+------+--------+-------+---------+-------------------+-------------------+------------------+-------------------+-----------------------+
|B101  |1       |101    |Main St  |2023-01-01 12:50:00|2023-01-01 12:50:00|0.0               |2023-01-01 12:50:00|0                      |
|B102  |2       |102    |Market St|2023-01-01 12:55:00|2023-01-01 12:55:00|142.03047987009157|NULL               |427                    |
+------+--------+-------+---------+-------------------+-------------------+------------------+-------------------+-----------------------+



In [90]:
def add_minutes_to_timestamp(df, timestamp_col, minutes_col):
    return df.withColumn('approximate_arrival_time', 
        from_unixtime(
            unix_timestamp(col(timestamp_col)) + (col(minutes_col) * 60)
        )
    )


# Example usage
# Assuming df has 'current_time' and 'time_taken_for_distance' columns
result_df = add_minutes_to_timestamp(df_with_times, 'current_time', 'time_taken_for_distance')
result_df = result_df.withColumn('distance_km', ceil(col('distance_km')).cast('int'))

result_df.select("bus_id", "route_id", "stop_id", "stop_name", "scheduled_time", "actual_arrival_time", "approximate_arrival_time", "distance_km").show(truncate=False)

+------+--------+-------+---------+-------------------+-------------------+------------------------+-----------+
|bus_id|route_id|stop_id|stop_name|scheduled_time     |actual_arrival_time|approximate_arrival_time|distance_km|
+------+--------+-------+---------+-------------------+-------------------+------------------------+-----------+
|B101  |1       |101    |Main St  |2023-01-01 12:50:00|2023-01-01 12:50:00|2023-01-01 12:50:00     |0          |
|B102  |2       |102    |Market St|2023-01-01 12:55:00|NULL               |2023-01-01 20:02:00     |143        |
+------+--------+-------+---------+-------------------+-------------------+------------------------+-----------+



In [21]:
# Speed of the bus in km/h
BUS_SPEED_KMH = 20
# Calculate the delay and approximate arrival time based on the distance
def calculate_approx_arrival_time(row):
    if row['distance_km'] > 1:  # If bus is more than 1 km away from the stop
        travel_time_hours = row['distance_km'] / BUS_SPEED_KMH  # Time to travel the distance in hours
        travel_time_seconds = travel_time_hours * 3600  # Convert to seconds
        approx_arrival_time = row['current_time'] + travel_time_seconds  # Calculate the estimated time
        return approx_arrival_time
    return None

In [28]:
schema = ["bus_id", "route_id", "stop_id", "stop_name", "scheduled_time", "current_time", "distance_km"]

# Convert RDD to DataFrame
gps_with_routes_df = spark.createDataFrame(gps_with_routes, schema)

# Speed of the bus in km/h
BUS_SPEED_KMH = 20

# Calculate the delay and approximate arrival time based on the distance using pandas_udf
@udf(returnType=FloatType())
def calculate_approx_arrival_time(current_time: pd.Timestamp, distance_km: float):
    if distance_km > 1:  # If bus is more than 1 km away from the stop
        travel_time_hours = distance_km / BUS_SPEED_KMH  # Time to travel the distance in hours
        travel_time_seconds = travel_time_hours * 3600  # Convert to seconds
        approx_arrival_time = current_time + pd.Timedelta(seconds=travel_time_seconds)  # Estimated time
        return approx_arrival_time
    return None

# Convert current_time to timestamp if it's not already
from pyspark.sql.functions import to_timestamp
gps_with_routes_df = gps_with_routes_df.withColumn(
    "current_time", 
    to_timestamp(col("current_time"), "yyyy-MM-dd HH:mm:ss")
)

# Apply UDF to the DataFrame
gps_with_routes_df = gps_with_routes_df.withColumn(
    "bus_approx_arrival_time", 
    calculate_approx_arrival_time(col("current_time"), col("distance_km"))
)

# Show the result
gps_with_routes_df.show(truncate=False)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 22) (DESKTOP-CLKN115 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more
