In [1]:
import os
import sys
# Put the parent directory on the import path so that modules living one
# level above this notebook can be imported by the cells below.
module_path = os.path.abspath(os.pardir)
already_importable = module_path in sys.path
if not already_importable:
    sys.path.append(module_path)

In [137]:
from typing import List

from pyspark.sql.functions import col, acos, cos, sin, lit, radians, lag, dense_rank, count_distinct, sum, first, collect_list, min, max, when, length, concat, unix_timestamp
from pyspark.sql.functions import floor, isnan, split
from pyspark.sql import SparkSession, DataFrame, Column
from pyspark.sql.types import StructType, TimestampType, IntegerType
from pyspark.sql.window import Window

In [3]:
from schemas import agency, calendar_dates, stop_times, routes, trips, stops

In [4]:
def read_file(path: str, spark: SparkSession, schema: StructType) -> DataFrame:
    """Load a CSV file that has a header row, applying an explicit schema.

    Args:
        path: Location of the CSV file to read.
        spark: Session whose reader performs the load.
        schema: Schema handed to the reader instead of inferring one.

    Returns:
        The DataFrame produced by the CSV reader.
    """
    reader = spark.read.schema(schema).option("header", True)
    return reader.csv(path)

In [5]:
# Local-mode Spark session shared by every cell in this notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Titsa Explorer")
    .getOrCreate()
)

In [6]:
# Service calendar exceptions, read with the calendar_dates schema.
calendar_df = read_file(
    path='../data/calendar_dates.txt',
    spark=spark,
    schema=calendar_dates.schema,
)

In [7]:
# Per-trip stop times, read with the stop_times schema.
stop_times_df = read_file(
    path='../data/stop_times.txt',
    spark=spark,
    schema=stop_times.schema,
)

In [8]:
# Stop catalogue, read with the stops schema.
stops_df = read_file(
    path='../data/stops.txt',
    spark=spark,
    schema=stops.schema,
)

In [9]:
# Trip definitions, read with the trips schema.
trips_df = read_file(
    path='../data/trips.txt',
    spark=spark,
    schema=trips.schema,
)

In [10]:
# Route definitions, read with the routes schema.
routes_df = read_file(
    path='../data/routes.txt',
    spark=spark,
    schema=routes.schema,
)

In [11]:
# Service ids whose calendar entry falls on the analysed date.
active_services_df = (
    calendar_df
    .where(col(calendar_dates.date) == "20220318")
    .select(calendar_dates.service_id)
)

In [163]:
# Distinct (trip, route short name, stop, stop sequence) combinations for
# the trips whose service appears in active_services_df.
routes_related_stops = (
    stop_times_df
    .join(trips_df, on=[trips.trip_id])
    .join(active_services_df, on=[trips.service_id])
    .join(routes_df, on=[routes.route_id])
    .select(
        stop_times.trip_id,
        routes.route_short_name,
        stop_times.stop_id,
        stop_times.stop_sequence,
    )
    .distinct()
)

In [13]:
def harvesine_distance(long_x, lat_x, long_y, lat_y):
    """Great-circle distance between two points, as a Spark column expression.

    The distance is computed with the spherical law of cosines and scaled by
    a sphere radius of 6371.0, so the result is in the unit of that radius
    (kilometres for a mean Earth radius). Coordinates are converted with
    ``radians`` and are therefore expected in degrees.

    Floating-point rounding can push the cosine of the central angle
    marginally outside ``[-1, 1]`` when the two points coincide or nearly
    coincide, which would make ``acos`` return NaN. The cosine is clamped to
    that interval before ``acos`` is applied. Null inputs still produce null
    and NaN inputs still produce NaN.

    Args:
        long_x: Longitude of the first point (column name or Column).
        lat_x: Latitude of the first point (column name or Column).
        long_y: Longitude of the second point (column name or Column).
        lat_y: Latitude of the second point (column name or Column).

    Returns:
        A Column holding the distance between the two points.
    """
    sphere_radius = lit(6371.0)

    cos_central_angle = (
        sin(radians(lat_x)) * sin(radians(lat_y))
        + cos(radians(lat_x)) * cos(radians(lat_y))
        * cos(radians(long_x) - radians(long_y))
    )

    # Spark orders NaN above every number, so NaN must be passed through
    # before the "> 1" test or it would be silently clamped to 1 (distance 0).
    # `when` (rather than least/greatest, which skip nulls) keeps null as null.
    clamped_cos = (
        when(isnan(cos_central_angle), cos_central_angle)
        .when(cos_central_angle > 1.0, lit(1.0))
        .when(cos_central_angle < -1.0, lit(-1.0))
        .otherwise(cos_central_angle)
    )

    return acos(clamped_cos) * sphere_radius

In [14]:
# Names of the derived columns: distance to the previous stop, and the
# previous stop's id and name.
harvesine_dist, prev_stop_id, prev_stop_name = (
    "harvesine_dist",
    "prev_stop_id",
    "prev_stop_name",
)

In [15]:
def calculate_distance_to_previous_stop(df: DataFrame) -> DataFrame:
    """Attach, to every row, the previous stop of the same trip and its distance.

    Rows are ordered by stop sequence within each trip. Three columns are
    appended, in this order: the distance between the row's coordinates and
    those of the preceding row, the preceding row's stop id, and the
    preceding row's stop name.

    Args:
        df: Rows carrying trip id, stop sequence, stop id, stop name and
            stop latitude/longitude columns.

    Returns:
        ``df`` with the three derived columns added.
    """
    trip_order = Window.partitionBy(stop_times.trip_id).orderBy(stop_times.stop_sequence)

    def previous(column_name):
        # Value of `column_name` on the preceding row of the same trip.
        return lag(column_name, 1).over(trip_order)

    distance_to_previous = harvesine_distance(
        stops.stop_lon,
        stops.stop_lat,
        previous(stops.stop_lon),
        previous(stops.stop_lat),
    )

    with_distance = df.withColumn(harvesine_dist, distance_to_previous)
    with_previous_id = with_distance.withColumn(prev_stop_id, previous(stops.stop_id))
    return with_previous_id.withColumn(prev_stop_name, previous(stops.stop_name))

In [16]:
# Pair each stop with the previous stop of its trip, keep only pairs whose
# id and name both differ, and rank the pairs over the whole dataset by
# ascending distance (dense rank, nulls last). The trip id is dropped so
# that identical pairs coming from different trips collapse into one row.
ranked_results = (
    calculate_distance_to_previous_stop(
        routes_related_stops.join(stops_df, on=[stops.stop_id])
    )
    .filter(
        (col(stops.stop_id) != col(prev_stop_id))
        & (col(stops.stop_name) != col(prev_stop_name))
    )
    .withColumn(
        "rank",
        dense_rank().over(
            Window.partitionBy().orderBy(col(harvesine_dist).asc_nulls_last())
        ),
    )
    .drop(trips.trip_id)
    .distinct()
    .orderBy("rank")
    .cache()
)

In [17]:
# Closest consecutive stop pairs on route "51".
is_route_51 = col(routes.route_short_name) == "51"
ranked_results.where(is_route_51).show()

+-------+----------------+-------------+--------------------+--------+--------+--------------------+-------------------+------------+--------------------+----+
|stop_id|route_short_name|stop_sequence|           stop_name|stop_lat|stop_lon|            stop_url|     harvesine_dist|prev_stop_id|      prev_stop_name|rank|
+-------+----------------+-------------+--------------------+--------+--------+--------------------+-------------------+------------+--------------------+----+
|   1268|              51|           55|PLAZA DE LA ESTAC...|  28.477|-16.4149|http://www.titsa....|0.09190668495685285|        1949|             CAPITOL|  66|
|   1933|              51|           38|             EL LOMO| 28.5209|-16.3843|http://www.titsa....|0.12539393267211138|        1932|                MOYA| 168|
|   1275|              51|           72|          EL PÚLPITO|   28.49|-16.3487|http://www.titsa....|0.16595716527513532|        2586|    TITSA LOS RODEOS| 439|
|   2156|              51|           14|

# Stops served by the most distinct routes

In [18]:
# Stop times enriched with their trip and route, limited to the trips whose
# service appears in active_services_df; cached because later cells reuse it.
stop_times_routes = (
    stop_times_df
    .join(trips_df, on=[trips.trip_id])
    .join(active_services_df, on=[trips.service_id])
    .join(routes_df, on=[routes.route_id])
    .cache()
)

In [19]:
# Name of the column holding the number of distinct routes per stop.
diff_routes = "diff_routes"

# Count, for every stop id, how many different routes appear at it.
distinct_route_count = count_distinct(routes.route_id).alias(diff_routes)
top_routes = stop_times_routes.groupBy(stops.stop_id).agg(distinct_route_count)

In [20]:
# Stops ordered by how many distinct routes serve them, busiest first.
(
    top_routes
    .join(stops_df, on=[stops.stop_id])
    .select(stops.stop_id, stops.stop_name, diff_routes)
    .orderBy(col(diff_routes).desc())
    .show(truncate=False)
)

+-------+-------------------------+-----------+
|stop_id|stop_name                |diff_routes|
+-------+-------------------------+-----------+
|9181   |INTERCAMBIADOR STA.CRUZ  |44         |
|2625   |INTERCAMBIADOR LAGUNA (T)|36         |
|9413   |MERIDIANO                |25         |
|9450   |INTERCAMBIADOR STA.CRUZ  |23         |
|2582   |COROMOTO (T)             |22         |
|2549   |LEOCADIO MACHADO         |22         |
|2692   |FRANCISCO SÁNCHEZ (T)    |21         |
|1290   |SAN BENITO               |19         |
|9213   |JOSÉ HERNÁNDEZ ALFONSO   |19         |
|9449   |INTERCAMBIADOR STA.CRUZ  |19         |
|9385   |EL CORTE INGLÉS          |18         |
|7140   |LOS CRISTIANOS  (T)      |18         |
|1723   |AVENIDA LA CANDELARIA    |18         |
|2409   |SAN ANTONIO              |18         |
|9387   |PARQUE DE BOMBEROS       |16         |
|9386   |TRES DE MAYO             |16         |
|7255   |GRANADILLA (T)           |15         |
|9296   |SAN SEBASTIÁN            |15   

In [21]:
# Stop names shared by several stop ids: list the ids under each name and
# order the names by how many distinct ids they cover, largest first.
(
    top_routes
    .join(stops_df, on=[stops.stop_id])
    .select(stops.stop_id, stops.stop_name, diff_routes)
    .groupBy(stops.stop_name)
    .agg(
        collect_list(stops.stop_id).alias(stops.stop_id),
        count_distinct(col(stops.stop_id)).alias("diff_stops"),
    )
    .orderBy(col("diff_stops").desc())
    .show(truncate=False)
)

+----------------+------------------------------------------------------------------------------------------------+----------+
|stop_name       |stop_id                                                                                         |diff_stops|
+----------------+------------------------------------------------------------------------------------------------+----------+
|CEMENTERIO      |[1137, 1141, 1204, 1225, 1376, 4074, 4124, 4926, 5027, 5029, 7076, 7095, 7256, 7362, 9105, 9106]|16        |
|CENTRO DE SALUD |[1219, 1883, 1924, 1928, 2587, 2789, 7257, 7361, 7364, 7382, 7455, 9409]                        |12        |
|EL PINO         |[1636, 1647, 2130, 2145, 2314, 2704, 4957, 7577, 7603, 7735, 7782]                              |11        |
|EL CALVARIO     |[1203, 1226, 1258, 1259, 4016, 4035, 4217, 4356, 4359, 4739]                                    |10        |
|EL MOLINO       |[1519, 1571, 1971, 1977, 2573, 2574, 4301, 4308, 4642]                                       

# Longest scheduled trip durations by route

In [149]:
def fix_gtfs_time(df: DataFrame, cols: List[str]) -> DataFrame:
    """Turn GTFS time strings into timestamps, handling hours of 24 and above.

    GTFS times may carry an hour field of 24 or more to describe stops made
    after midnight of the service day (for example ``25:10:00``), and the
    hour may be written with one digit (``9:05:00``). For each listed column
    the hour is parsed numerically; when it is 24 or more, the wall-clock
    part (hour modulo 24) is cast to a timestamp and one day is added per
    full 24 hours. Values with an hour below 24 are cast as they are.

    The date part of the result comes from Spark's cast of a time-only string
    (presumably the current date; confirm for the Spark version in use), so
    only differences between the produced timestamps are meaningful.

    Everything is computed in a single expression per column, so no temporary
    helper column is created that could collide with an existing one.

    Args:
        df: DataFrame holding the time columns as strings.
        cols: Names of the columns to convert in place.

    Returns:
        ``df`` with every column in ``cols`` replaced by a timestamp column.
        Values that cannot be parsed become null.
    """
    seconds_per_day = 24 * 60 * 60

    for col_name in cols:
        raw = col(col_name)

        # Parse the hour as a number: comparing the raw strings would treat
        # "9:05:00" as later than "23:59:59".
        hour_text = split(raw, ":").getItem(0)
        hours = hour_text.cast(IntegerType())
        past_midnight = hours >= 24

        # Everything after the hour digits, leading ":" included. The length
        # argument is deliberately generous; substr stops at the end of string.
        remainder = raw.substr(length(hour_text) + 1, length(raw))

        wall_clock = when(
            past_midnight,
            concat((hours % 24).cast("string"), remainder),
        ).otherwise(raw)

        # `otherwise(0)` also covers an unparsable hour, so values that Spark
        # can still cast on its own are not turned into null by the offset.
        extra_days = when(past_midnight, floor(hours / 24)).otherwise(0)

        df = df.withColumn(
            col_name,
            (
                unix_timestamp(wall_clock.cast(TimestampType()))
                + extra_days * seconds_per_day
            ).cast(TimestampType()),
        )
    return df

In [175]:
# Names of the derived columns used in this section.
elapsed = "elapsed"
rank = "rank"

# Per trip: span between the earliest arrival and the latest departure.
# The spans are then attached to the route short name, de-duplicated and
# dense-ranked from longest to shortest.
(
    fix_gtfs_time(stop_times_df, [stop_times.arrival_time, stop_times.departure_time])
    .groupBy(stop_times.trip_id)
    .agg(
        (max(stop_times.departure_time) - min(stop_times.arrival_time)).alias(elapsed)
    )
    .join(trips_df, on=[trips.trip_id])
    .join(routes_df, on=[routes.route_id])
    .select(routes.route_short_name, elapsed)
    .distinct()
    .withColumn(rank, dense_rank().over(Window.orderBy(col(elapsed).desc())))
    .show(500, truncate=False)
)

+----------------+-----------------------------------+----+
|route_short_name|elapsed                            |rank|
+----------------+-----------------------------------+----+
|330             |INTERVAL '0 02:47:22' DAY TO SECOND|1   |
|330             |INTERVAL '0 02:44:55' DAY TO SECOND|2   |
|325             |INTERVAL '0 02:36:24' DAY TO SECOND|3   |
|343             |INTERVAL '0 02:28:52' DAY TO SECOND|4   |
|342             |INTERVAL '0 02:14:25' DAY TO SECOND|5   |
|108             |INTERVAL '0 02:13:36' DAY TO SECOND|6   |
|342             |INTERVAL '0 01:59:51' DAY TO SECOND|7   |
|343             |INTERVAL '0 01:56:35' DAY TO SECOND|8   |
|325             |INTERVAL '0 01:53:27' DAY TO SECOND|9   |
|34              |INTERVAL '0 01:52:27' DAY TO SECOND|10  |
|275             |INTERVAL '0 01:52:10' DAY TO SECOND|11  |
|39              |INTERVAL '0 01:51:16' DAY TO SECOND|12  |
|325             |INTERVAL '0 01:51:02' DAY TO SECOND|13  |
|460             |INTERVAL '0 01:50:38' 