In [105]:
import os
# Tell the tooling where the JDK and the unpacked Spark distribution live.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.1-bin-hadoop3.2",
)

In [106]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [107]:
import findspark

# findspark.init() is called before pyspark is imported
# (presumably it locates the Spark install through SPARK_HOME).
findspark.init()

from pyspark.sql import SparkSession

# Create (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("Squaregps")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "2")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)


In [108]:
#import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from datetime import datetime
import numpy as np
import pandas as pd
from time import sleep

In [109]:
from geopy.geocoders import Nominatim
from geopy.point import Point

# Shared Nominatim client used by reverse_geocoding below.
geolocator = Nominatim(user_agent="test")
# In-memory memo of already resolved addresses, keyed by (lat, lng).
geocode_cache = {}

def reverse_geocoding(lat, lng, retries=3):
    """Return a human-readable address for a coordinate pair.

    Successful lookups are memoised in the module-level ``geocode_cache``
    keyed by ``(lat, lng)``, so a repeated point is not sent to the geocoder
    again.

    Args:
        lat: Latitude handed to ``geolocator.reverse``.
        lng: Longitude handed to ``geolocator.reverse``.
        retries: Maximum number of lookup attempts before giving up.

    Returns:
        str: The resolved address, or the text ``"<lat>, <lng>"`` when the
        geocoder has no match or every attempt failed. The result is always
        a string: a tuple fallback would make ``np.vectorize`` infer two
        outputs and would mix types inside the resulting address column.
    """
    key = (lat, lng)
    if key in geocode_cache:
        return geocode_cache[key]

    fallback = f"{lat}, {lng}"
    for attempt in range(1, retries + 1):
        try:
            location = geolocator.reverse((lat, lng), timeout=60)
            address = location.address if location else fallback
            geocode_cache[key] = address
            return address
        except Exception as e:
            print(f"Geocoding error: {e}, retrying...")
            # Pause only when another attempt will actually follow.
            if attempt < retries:
                sleep(1)
    return fallback


In [110]:
def parse_timestamp(ts_str):
    """Parse a ``%Y-%m-%dT%H:%M:%S%z`` string into a datetime.

    Any failure (wrong layout, non-string input, ...) is reported on stdout
    and mapped to ``None`` instead of raising.
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%S%z'
    try:
        parsed = datetime.strptime(ts_str, timestamp_format)
    except Exception as e:
        print(f"Error parsing timestamp {ts_str}: {e}")
        return None
    return parsed

# Spark UDF wrapper around parse_timestamp, declared to return TimestampType.
parse_timestamp_udf = udf(f=parse_timestamp, returnType=TimestampType())



In [111]:
# Load the raw CSV: the first line is the header, column types are inferred.
df = (
    spark.read
    .options(inferSchema="true", header="true")
    .csv("/content/Untitled_raw_data.csv")
)

In [112]:
# Replace the textual `msg_time` column with its parsed timestamp value.
msg_time_parsed = parse_timestamp_udf(col('msg_time'))
df = df.withColumn('msg_time', msg_time_parsed)

In [113]:
# One global window over all rows in chronological order.
window_spec = Window.orderBy('msg_time')

# Whole seconds elapsed since the previous message.
current_ts = unix_timestamp(col('msg_time'))
previous_ts = unix_timestamp(lag('msg_time').over(window_spec))
df = df.withColumn('time_diff', (current_ts - previous_ts).cast('int'))

# below_5 is 1 for rows whose speed is under 5 km/h, otherwise 0.
is_slow = when(col('speed') < 5, 1).otherwise(0)
df = df.withColumn('below_5', is_slow)


In [114]:
# Flag the boundaries of every slow (< 5 km/h) stretch:
#   parking_start_flag - slow row whose previous row was not slow
#   parking_end_flag   - slow row whose next row is not slow
slow_now = col('below_5') == 1
previous_below_5 = lag('below_5').over(window_spec)
next_below_5 = lead('below_5').over(window_spec)

df = (
    df
    .withColumn('parking_start_flag', when(slow_now & (previous_below_5 == 0), 1).otherwise(0))
    .withColumn('parking_end_flag', when(slow_now & (next_below_5 == 0), 1).otherwise(0))
)

# Drop the interior rows of each slow stretch; only its boundary rows stay.
df = df.where("not (below_5 == 1 and parking_start_flag == 0 and parking_end_flag == 0)")

In [115]:
# Seconds between each remaining row and the row before it.
msg_ts = unix_timestamp(col('msg_time'))
prev_msg_ts = unix_timestamp(lag('msg_time').over(window_spec))
df = df.withColumn('parking_diff', msg_ts - prev_msg_ts)

# parking_diff_2 is the gap to the following row; next_parking_flag carries
# the previous row's parking_end_flag.
df = (
    df
    .withColumn('parking_diff_2', lead('parking_diff').over(window_spec))
    .withColumn('next_parking_flag', lag('parking_end_flag').over(window_spec))
)

# A gap of more than 180 seconds on either side is long enough to be parking.
df = df.withColumn('is_parking', expr('parking_diff > 180 or parking_diff_2 > 180'))

In [116]:
# Preview ten rows together with the derived flag columns.
df.limit(10).show()

+-------------------+-----------+------------+-----+----+---------+-------+------------------+----------------+------------+--------------+-----------------+----------+
|           msg_time|        lat|         lng|speed|fuel|time_diff|below_5|parking_start_flag|parking_end_flag|parking_diff|parking_diff_2|next_parking_flag|is_parking|
+-------------------+-----------+------------+-----+----+---------+-------+------------------+----------------+------------+--------------+-----------------+----------+
|2024-06-30 23:07:26|40.94801833|-74.23513833|    0|  \N|        5|      1|                 0|               1|        null|             1|             null|      null|
|2024-06-30 23:07:27|40.94801833|-74.23513833|   33|  \N|        1|      0|                 0|               0|           1|             8|                1|     false|
|2024-06-30 23:07:35|40.94766667|-74.23623667|   49|  \N|        8|      0|                 0|               0|           8|             5|                

In [117]:
# Identify trips: flag the rows that open and close a trip.
trip_start_sql = "case when next_parking_flag = 1 and not is_parking then 1 else 0 end"
trip_end_sql = "case when parking_start_flag = 1 and is_parking then 1 else 0 end"

df = (
    df
    .withColumn('trip_start_flag', expr(trip_start_sql))
    .withColumn('trip_end_flag', expr(trip_end_sql))
)

In [118]:
# Assign a trip ID: the running count of trip starts seen so far in time order.
running_trip_count = expr(
    'sum(trip_start_flag) over (order by msg_time rows between unbounded preceding and current row)'
)
df = df.withColumn('trip_id', running_trip_count)

In [119]:
# Per-trip aggregates: maximum speed, duration, distance and average speed.
per_trip = Window.partitionBy('trip_id')

# Rows flagged as trip end contribute nothing to duration or distance.
counted_seconds = expr('case when trip_end_flag =1 then 0 else time_diff end')
counted_distance = expr('case when trip_end_flag =1 then 0 else distance end')

df = df.withColumn('speed_max', max('speed').over(per_trip))
df = df.withColumn('trip_dur', sum(counted_seconds).over(per_trip))
# speed * elapsed seconds / 3600 (presumably km, given speed in km/h).
df = df.withColumn('distance', (col('speed') * col('time_diff')) / 3600)
df = df.withColumn('trip_distance', sum(counted_distance).over(per_trip))
# Average speed = distance / duration in hours, rounded to two decimals.
df = df.withColumn('avg_speed', round(col('trip_distance') / (col('trip_dur') / 3600), 2))

In [121]:
# Build one summary row per trip from the rows that mark a trip start or end.
trip_window = Window.partitionBy('trip_id')
trip_window_by_time = trip_window.orderBy('msg_time')

trips = (
    df.where("trip_start_flag = 1 or trip_end_flag = 1")
    .withColumn('trip_start_dttm', min('msg_time').over(trip_window))
    .withColumn('trip_end_dttm', max('msg_time').over(trip_window))
    # Coordinates of the next boundary row inside the same trip.
    .withColumn('end_lat', lead('lat').over(trip_window_by_time))
    .withColumn('end_lng', lead('lng').over(trip_window_by_time))
    # Rank rows by message time. The previous ordering key, trip_start_dttm,
    # is identical for every row of a trip, so the surviving row was arbitrary
    # and could be the trip-end row (wrong start point, null end point).
    .withColumn('rn', row_number().over(trip_window_by_time))
    # Collapse to one row per trip while `rn` is still part of the frame.
    .where("rn = 1")
    .select(
        'trip_id',
        'trip_start_dttm',
        'trip_end_dttm',
        col('lat').alias('start_lat'),
        col('lng').alias('start_lng'),
        'end_lat',
        'end_lng',
        'speed_max',
        'avg_speed',
    )
)

In [122]:
# Number of rows in `trips`.
trips.count()

487

In [123]:
# Print the first rows of `trips` for a visual check.
trips.show()

+-------+-------------------+-------------------+-----------+------------+-----------+------------+---------+---------+
|trip_id|    trip_start_dttm|      trip_end_dttm|  start_lat|   start_lng|    end_lat|     end_lng|speed_max|avg_speed|
+-------+-------------------+-------------------+-----------+------------+-----------+------------+---------+---------+
|      1|2024-06-30 23:07:27|2024-06-30 23:27:57|40.94801833|-74.23513833|40.97530167|-74.16303833|       59|    43.43|
|      2|2024-06-30 23:37:53|2024-07-01 00:03:28|40.97530167|-74.16303833|41.02764333|-74.03736333|       59|    43.79|
|      3|2024-07-01 00:13:24|2024-07-01 00:25:23|41.02764333|-74.03736333|40.96947333|-74.06209667|       59|    43.99|
|      4|2024-07-01 00:35:20|2024-07-01 00:48:34|40.96947333|-74.06209667|  40.915215|-74.01376167|       59|    44.23|
|      5|2024-07-01 00:58:30|2024-07-01 01:11:45|  40.915215|-74.01376167|40.99088833|   -73.97123|       59|    43.02|
|      6|2024-07-01 01:21:41|2024-07-01 

In [124]:
# Bring trip start points and end points into pandas. End points reuse the
# start_lat/start_lng column names so that both frames share one layout.
start_points = trips.select('trip_id', 'start_lat', 'start_lng')
end_points = trips.select(
    'trip_id',
    col('end_lat').alias('start_lat'),
    col('end_lng').alias('start_lng'),
)

trips_pd_start = start_points.toPandas()
trips_pd_end = end_points.toPandas()

In [125]:
# Stack start and end points, then discard repeated rows and rows that
# contain missing values.
frames = [trips_pd_start, trips_pd_end]
stacked_points = pd.concat(frames)
result = stacked_points.drop_duplicates().dropna()

In [126]:
# Resolve an address for every lat/lng point.
# otypes is given explicitly: without it np.vectorize raises on an empty frame
# and derives the number and type of outputs from the first call, which goes
# wrong if that call happens to return a non-string fallback.
geocode_points = np.vectorize(reverse_geocoding, otypes=[object])
result['address'] = geocode_points(result['start_lat'], result['start_lng'])

In [127]:
# Inspect the first geocoded points.
result.head()

Unnamed: 0,trip_id,start_lat,start_lng,address
0,1,40.948018,-74.235138,"Preakness HIlls Country Club, 1050, Ratzer Roa..."
1,2,40.975302,-74.163038,"34, Orchard Place, Hawthorne, Passaic County, ..."
2,3,41.027643,-74.037363,"88, Chadwick Court, Park Ridge, Bergen County,..."
3,4,40.969473,-74.062097,"740, Pascack Road, Paramus, Bergen County, New..."
4,5,40.915215,-74.013762,"158, Baker Avenue, Surrey Lane Estates, Bergen..."


In [128]:
from pyspark.sql import Row

# Turn every pandas record into a Spark Row and build a Spark DataFrame from them.
records = result.to_dict(orient='records')
rows = [Row(**record) for record in records]
trips_with_addresses = spark.createDataFrame(rows)


In [129]:
# Aliased handle on the address table, used for the start-point join.
start = trips_with_addresses.alias('start')

In [130]:
# Attach the start address to every trip: match on trip id plus the start
# coordinates, keeping trips that have no geocoded start point (left join).
start_point_match = (
    (trips.trip_id == start.trip_id)
    & (trips.start_lat == start.start_lat)
    & (trips.start_lng == start.start_lng)
)

trips_with_details = (
    trips
    .join(start, start_point_match, 'left')
    .select(
        trips.trip_id,
        trips.trip_start_dttm,
        trips.trip_end_dttm,
        trips.end_lat,
        trips.end_lng,
        trips.speed_max,
        trips.avg_speed,
        start.address.alias('Address Start'),
    )
)




In [135]:
# Attach the end address: match on trip id plus the end coordinates (left
# join), then keep the report columns under their display names.
end_point_match = (
    (trips_with_details.trip_id == trips_with_addresses.trip_id)
    & (trips_with_details.end_lat == trips_with_addresses.start_lat)
    & (trips_with_details.end_lng == trips_with_addresses.start_lng)
)

final_result = (
    trips_with_details
    .join(trips_with_addresses, end_point_match, 'left')
    .select(
        trips.trip_id,
        trips.trip_start_dttm,
        trips.trip_end_dttm,
        col('Address Start'),
        trips_with_addresses.address.alias('Address End'),
        trips.avg_speed.alias('Speed Average'),
        trips.speed_max.alias('Speed Max'),
    )
)


In [136]:

# Show the resulting DataFrame; truncate=False prints full cell contents.
final_result.show(truncate=False)

+-------+-------------------+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------+-------------+---------+
|trip_id|trip_start_dttm    |trip_end_dttm      |Address Start                                                                                                                                        |Address End                                                                                                                                     |Speed Average|Speed Max|
+-------+-------------------+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------

In [137]:
# Export the final result to CSV, ordered by trip start.
# coalesce(1) gathers the rows into a single partition before writing.
# header=True writes the column names, which were otherwise lost on export.
# mode='overwrite' lets the cell be re-run once the output path exists.
# NOTE(review): the path is relative, unlike the '/content/...' input path - confirm.
(
    final_result
    .orderBy(['trip_start_dttm', 'trip_id'])
    .coalesce(1)
    .write
    .csv('content/result.csv', header=True, mode='overwrite')
)