<a href="https://colab.research.google.com/github/PedroTechy/CarrisInsight/blob/streaming_development/spark_jobs/extract_carris_vehicles.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Step 1: Authenticate with Google Cloud


In [None]:
# Interactive login for Application Default Credentials; the Spark session cell below
# points the GCS connector at ~/.config/gcloud/application_default_credentials.json.
!gcloud auth application-default login

# Step 2: Install Spark

In [None]:
# Use the %pip magic so the package is installed into the environment of the running kernel.
# NOTE(review): consider pinning the PySpark version so the notebook stays reproducible.
%pip install pyspark

# Step 3: Setup Spark Env and Imports

In [2]:
import os

import requests
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F
# NOTE: `min` and `max` below are Spark column aggregates and shadow the Python builtins
# for the rest of the notebook.
from pyspark.sql.functions import min, max, first, last, col, window, from_unixtime, to_timestamp, count, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, LongType

In [None]:
# Input location read by the streaming job in Step 5.
gs_input_path = "gs://edit-de-project-streaming-data/carris-vehicles"

# Home directory of the current user; used to locate the gcloud credentials file.
home_directory = os.getenv("HOME")

# Build (or reuse) the Spark session with the GCS connector jar on the classpath.
spark = (
    SparkSession.builder
    .appName('pyspark-run-with-gcp-bucket')
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

# Tell the Hadoop GCS connector which credentials file to authenticate with.
spark._jsc.hadoopConfiguration().set(
    "google.cloud.auth.service.account.json.keyfile",
    f"{home_directory}/.config/gcloud/application_default_credentials.json",
)

In [7]:
# Remove any previous output and checkpoint data so the streaming job starts from a clean directory
!rm -rf content/lake/

# Step 4: Define Schema and User Defined Functions

In [109]:
# Schema applied to the vehicle JSON files read by the stream.
# Every field is nullable; field order matches the original definition.
schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ("bearing", FloatType),
        ("block_id", StringType),
        ("current_status", StringType),
        ("id", StringType),
        ("lat", FloatType),
        ("line_id", StringType),
        ("lon", FloatType),
        ("pattern_id", StringType),
        ("route_id", StringType),
        ("schedule_relationship", StringType),
        ("shift_id", StringType),
        ("speed", FloatType),
        ("stop_id", StringType),
        ("timestamp", LongType),
        ("trip_id", StringType),
    )
])

In [110]:
import math
def haversine(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in kilometers between two points.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        float: Distance in kilometers, using an Earth radius of 6371.0 km.
        ``0.0`` is returned when any coordinate is missing (``None``) or not
        numeric, so one bad record does not fail the whole job.
    """
    # Earth radius in kilometers
    R = 6371.0

    try:
        # Convert latitude and longitude from degrees to radians
        lat1_rad, lon1_rad = math.radians(lat1), math.radians(lon1)
        lat2_rad, lon2_rad = math.radians(lat2), math.radians(lon2)
    except (TypeError, ValueError):
        # Always return a float: this function is wrapped in a UDF declared as FloatType,
        # and a Python int return value does not match that declared type.
        return 0.0

    # Differences
    delta_lat = lat2_rad - lat1_rad
    delta_lon = lon2_rad - lon1_rad

    # Haversine formula
    a = math.sin(delta_lat / 2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2)**2

    # Guard against floating-point drift pushing `a` just above 1, which would make
    # sqrt(1 - a) fail. Plain comparisons are used on purpose: the notebook imports
    # Spark's `min`/`max`, which shadow the Python builtins.
    if a > 1.0:
        a = 1.0

    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    # Distance
    return R * c

# Wrap the pure-Python haversine function as a Spark UDF with a declared FloatType
# return type, so it can be applied to DataFrame columns.
haversine_udf = udf(haversine, FloatType())



In [171]:
def get_stops(spark, url="https://api.carrismetropolitana.pt/stops", timeout=30):
    """Download the stop list and return it as a Spark DataFrame.

    Args:
        spark: Active SparkSession used to build the DataFrame.
        url: Endpoint returning a JSON array of stops; each element must expose
            the keys ``stop_id``, ``lat`` and ``lon``.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        DataFrame with columns ``stop_id`` (string), ``stop_lat`` (float) and
        ``stop_lon`` (float).

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.Timeout: If the endpoint does not answer within ``timeout``.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page as stop data.
    response.raise_for_status()

    filtered = [
        {
            'stop_id': stop['stop_id'],
            'stop_lat': float(stop['lat']),
            'stop_lon': float(stop['lon']),
        }
        for stop in response.json()
    ]

    # Named distinctly so it is not confused with the notebook-level vehicle `schema`.
    stops_schema = StructType([
        StructField("stop_id", StringType(), True),
        StructField("stop_lat", FloatType(), True),
        StructField("stop_lon", FloatType(), True),
    ])

    return spark.createDataFrame(filtered, schema=stops_schema)
# Fetch the stop coordinates once; `stops` is joined against the results in Step 6.
stops = get_stops(spark)


In [133]:
def aggregate_data(df):
    """Aggregate vehicle records per vehicle and trip over sliding event-time windows.

    Records are grouped by ``id``, ``trip_id`` and a 2-minute window sliding every
    10 seconds, with a 3-minute watermark on ``timestamp``.

    Args:
        df: DataFrame with at least the columns ``id``, ``trip_id``, ``timestamp``
            (timestamp type), ``current_status``, ``route_id``, ``stop_id``,
            ``lat`` and ``lon``.

    Returns:
        DataFrame with one row per (id, trip_id, window) holding the first/last
        timestamp in the window and the coordinates recorded at those timestamps.
    """
    window_spec = window("timestamp", "2 minutes", "10 seconds")

    return (
        df.withWatermark("timestamp", "3 minutes")
        .groupBy("id", "trip_id", window_spec)
        .agg(
            # NOTE(review): max() picks the greatest value in the window, not the
            # most recent one - confirm this is intended for status and stop.
            max("current_status").alias("current_status"),
            max("route_id").alias("route_id"),
            max("stop_id").alias("stop_id"),
            min("timestamp").alias("first_timestamp"),
            max("timestamp").alias("last_timestamp"),
            # first()/last() depend on row order, which is not guaranteed after the
            # shuffle; select the coordinates by event time instead.
            F.min_by("lat", "timestamp").alias("first_lat"),
            F.min_by("lon", "timestamp").alias("first_lon"),
            F.max_by("lat", "timestamp").alias("last_lat"),
            F.max_by("lon", "timestamp").alias("last_lon"),
        )
    )

In [166]:
def calculate_distances_without_udf(df):
    """Add distance, elapsed time and average speed using native Spark column functions.

    Intended as a drop-in alternative to ``calculate_distances``: it reads the same
    input columns and produces the same output columns, but evaluates the haversine
    formula with Spark SQL functions instead of a Python UDF.

    Args:
        df: DataFrame with the columns ``first_lat``, ``first_lon``, ``last_lat``,
            ``last_lon``, ``first_timestamp`` and ``last_timestamp``.

    Returns:
        The input DataFrame with three extra columns:
        ``distance`` (km between the first and last position, Earth radius 6371.0 km),
        ``time_delta`` (seconds between the first and last timestamp) and
        ``average_speed`` (km/h; null when ``time_delta`` is not positive).
        Null coordinates yield a null ``distance``.
    """
    print("started transform")

    # Python's math.* functions only accept plain numbers, so the formula is built
    # from Spark column functions.
    lat1_rad = F.radians(col("first_lat"))
    lat2_rad = F.radians(col("last_lat"))
    delta_lat = lat2_rad - lat1_rad
    delta_lon = F.radians(col("last_lon")) - F.radians(col("first_lon"))

    # Haversine formula
    a = (
        F.pow(F.sin(delta_lat / 2), 2)
        + F.cos(lat1_rad) * F.cos(lat2_rad) * F.pow(F.sin(delta_lon / 2), 2)
    )
    # Clamp rounding drift above 1 so sqrt(1 - a) stays defined; a null `a` stays null.
    a = F.when(a > 1.0, F.lit(1.0)).otherwise(a)
    distance_km = 6371.0 * 2 * F.atan2(F.sqrt(a), F.sqrt(1 - a))

    dist_df = (
        df
        .withColumn("distance", distance_km)
        .withColumn(
            "time_delta",
            col("last_timestamp").cast("long") - col("first_timestamp").cast("long"),
        )
        .withColumn(
            "average_speed",
            # Guard against windows holding a single record (time_delta == 0).
            F.when(col("time_delta") > 0, col("distance") / (col("time_delta") / 3600)),
        )
    )
    print("ended transform")

    return dist_df

In [168]:
def calculate_distances(df):
    """Add distance, elapsed time and average speed to the aggregated records.

    Args:
        df: DataFrame with the columns ``first_lat``, ``first_lon``, ``last_lat``,
            ``last_lon``, ``first_timestamp`` and ``last_timestamp``.

    Returns:
        The input DataFrame with three extra columns:
        ``distance`` (km between the first and last position, via ``haversine_udf``),
        ``time_delta`` (seconds between the first and last timestamp) and
        ``average_speed`` (km/h; null when ``time_delta`` is not positive).
    """
    print("started transform")
    dist_df = (
        df
        .withColumn(
            "distance",
            haversine_udf(col("first_lat"), col("first_lon"), col("last_lat"), col("last_lon")),
        )
        .withColumn(
            "time_delta",
            col("last_timestamp").cast("long") - col("first_timestamp").cast("long"),
        )
        .withColumn(
            "average_speed",
            # A window holding a single record has time_delta == 0; dividing by it
            # fails under ANSI SQL mode, so emit null instead.
            F.when(col("time_delta") > 0, col("distance") / (col("time_delta") / 3600)),
        )
    )
    print("ended transform")

    return dist_df



# Step 5: Start streaming and transformation process

In [None]:
# Read the vehicle JSON files as a stream, one file per micro-batch.
df = (
    spark.readStream.option("maxFilesPerTrigger", 1)
    .format("json")
    .schema(schema)
    .load(gs_input_path)
)
print("will start streaming")

# Convert the numeric "timestamp" column (interpreted by from_unixtime as seconds since
# the epoch) into a timestamp so it can drive the watermark and the windows.
transformed_df = df.withColumn("timestamp", to_timestamp(from_unixtime("timestamp")))
print("Transformed data")

# Group by vehicle ID and window, take the first and last timestamps and lat/lon values,
# then derive distance and average speed.
result_df = transformed_df.transform(aggregate_data).transform(calculate_distances)

# Write as Parquet explicitly, because Step 6 reads this directory back as Parquet.
query = (
    result_df.writeStream
    .format('parquet')
    .outputMode('append')
    .option('checkpointLocation', 'content/lake/processing/vehicles_checkpoint')
    .trigger(processingTime='10 seconds')
    .start('content/lake/processing/vehicles/data')
)

try:
    # Let the query run for up to 30 seconds.
    query.awaitTermination(30)
finally:
    # Always stop the query, even if awaitTermination raised because the stream failed.
    query.stop()


# Step 6: Read and verify results

In [None]:
# Load the streaming output directory as a static (batch) Parquet DataFrame.
# Note: this rebinds `df`, which referred to the streaming source DataFrame in Step 5.
df = spark.read.format("parquet").load("content/lake/processing/vehicles/data")


In [None]:
# Number of rows in the loaded output.
df.count()

In [None]:
# Rank each vehicle's rows from newest to oldest. orderBy() followed by dropDuplicates()
# does not guarantee which duplicate is kept, so the latest row is selected explicitly.
# Ties on last_timestamp are broken by the earliest first_timestamp to keep the
# selection deterministic (NOTE(review): confirm this tie-break is the desired one).
latest_per_vehicle = Window.partitionBy("id").orderBy(
    col("last_timestamp").desc(), col("first_timestamp").asc()
)

view = (
    df
    .withColumn("_recency_rank", F.row_number().over(latest_per_vehicle))
    .filter(col("_recency_rank") == 1)
    .drop("_recency_rank")
    .join(stops, how='left', on='stop_id')
    # Distance in km from the last known position to the stop; null when the stop
    # has no known coordinates (no match in the left join).
    .withColumn(
        "distance_till_stop",
        F.when(
            col("stop_lat").isNotNull() & col("stop_lon").isNotNull(),
            haversine_udf(col("last_lat"), col('last_lon'), col('stop_lat'), col('stop_lon')),
        ),
    )
    # km / (km/h) * 60 -> minutes; null when the vehicle has no positive average speed.
    .withColumn(
        "stop_eta",
        F.when(col("average_speed") > 0, col("distance_till_stop") / col("average_speed") * 60),
    )
)


In [None]:
# Print a sample of the per-vehicle view.
view.show()