In [1]:
import datetime
import _speed_utils as speed_utils
import _threshold_utils as threshold_utils
import altair as alt
import dask.dataframe as dd
import geopandas as gpd
import pandas as pd
from segment_speed_utils import gtfs_schedule_wrangling, helpers, segment_calcs
from segment_speed_utils.project_vars import (
    COMPILED_CACHED_VIEWS,
    PROJECT_CRS,
    SEGMENT_GCS,
    analysis_date,
    CONFIG_PATH
)
from scripts import A1_sjoin_vp_segments
from shared_utils import calitp_color_palette as cp


import os
os.environ['USE_PYGEOS'] = '0'
import geopandas

In a future release, GeoPandas will switch to using Shapely by default. If you are using PyGEOS directly (calling PyGEOS functions on geometries from GeoPandas), this will then stop working and you are encouraged to migrate from PyGEOS to Shapely 2.0 (https://shapely.readthedocs.io/en/latest/migration_pygeos.html).
  import geopandas as gpd


In [2]:
pd.options.display.max_columns = 100
pd.options.display.float_format = "{:.2f}".format
pd.set_option("display.max_rows", None)
pd.set_option("display.max_colwidth", None)

In [3]:
# alt.data_transformers.disable_max_rows()

### Merging

In [4]:
def merge_all_speeds(analysis_date:str) -> pd.DataFrame:
    """
    Merge avg_speeds_stop_segments and
    speed_stops parquets.
    
    Args:
        date: analysis date
    """
    # Open up avg speeds
    avg_speeds = pd.read_parquet(f"{SEGMENT_GCS}avg_speeds_stop_segments_{analysis_date}.parquet")
    avg_speeds = avg_speeds.drop(columns=["geometry", "district", "district_name"])
    # Filter  for all day flags
    avg_speeds = avg_speeds[avg_speeds.time_of_day == 'all_day'].reset_index(drop = True)
    
    # Open up speeds
    speeds = pd.read_parquet(f"{SEGMENT_GCS}speeds_stop_segments_{analysis_date}")
    
    merge_cols = ['gtfs_dataset_key','shape_array_key', 'stop_sequence']
    m1 = pd.merge(avg_speeds, speeds, on = merge_cols, how = 'inner')
    
    m1 = m1.drop_duplicates().reset_index(drop = True)
    
    return m1

In [5]:
m1 = merge_all_speeds(analysis_date)

In [6]:
# m1.shape

In [7]:

# Picked 4 random routes
sample_0_keys = [
    "0fb4f3627996269dc7075276d3b69e36",
    "07c9a47264a43d8d0d16ef7109e8fd68",
    "106d979b9a9e6338827a8e1c145e69fd",
    "000624bd8453dbe4f2eb2765b04bcb98",
]

### Categorize

In [8]:
def categorize_by_percentile_pandas(
    df: pd.DataFrame, column_percentile: str, column_str: str
) -> pd.DataFrame:

    # Find percentiles
    p5 = df[column_percentile].quantile(0.05).astype(float)
    p95 = df[column_percentile].quantile(0.95).astype(float)
    
    def rate(row):
        if ((row[column_percentile] >= 0) and (row[column_percentile] <= p5)):
            return f"{column_str} is low"
        elif (row[column_percentile] >= p95):
               return f"{column_str} is high"
        else:
            return f"{column_str} is avg"
    
    # Apply flags
    df[f"{column_str}cat"] = df.apply(lambda x: rate(x), axis=1)
    
    # Clean
    df[f"{column_str}cat"] = df[f"{column_str}cat"].str.replace("_", "")

    print(f"Done with {column_str}")
    
    return df  

In [9]:
# df1 = categorize_by_percentile_pandas(subset, "meters_elapsed", "meters_")

In [10]:
# df1.head()

In [11]:
# df2 = categorize_by_percentile_pandas(df1, "sec_elapsed", "sec_")

In [12]:
# df2.head()

In [13]:
def categorize_meters_speeds_pandas(df)-> pd.DataFrame:
    start = datetime.datetime.now()
    print(start)
    
    #df = merge_all_speeds(analysis_date)
    
    # Categorize
    df1 = categorize_by_percentile_pandas(df, "meters_elapsed", "meters_")
    df2 = categorize_by_percentile_pandas(df1, "sec_elapsed", "sec_")
  
    # Find size of categories
    print(df2.groupby(['sec_cat','meters_cat']).size())

    # Filter out for only meters that are low or seconds that are high
    df2 = df2[(df2.meters_cat == 'meters is low') | (df2.sec_cat == 'sec is high')].reset_index(drop = True)
    print(f"{len(df2)} rows left after filtering for rows with either high seconds OR low meters") 
    
    def flag_round(row):
        if (row["meters_elapsed"] == 0) & (row["sec_elapsed"] == 0):
            return "division by 0"
        elif row["meters_cat"] == "meters is low":
            return "meters too low"
        elif row["sec_cat"] == "sec is high":
            return "seconds too high"
        else:
            return "ok"
        
    df2["flag"] = df2.apply(lambda x: flag_round(x), axis=1)
    print(df2.flag.value_counts())
    
    # Filter out for only division by 0 
    df3 = df2[(df2.flag == 'division by 0')].reset_index(drop = True)
    
    end = datetime.datetime.now()
    print(f"Took {end-start}")
    return df3

In [14]:
subset = m1[m1.shape_array_key.isin(sample_0_keys)].reset_index()

In [15]:
m2 = categorize_meters_speeds_pandas(subset)

2023-06-29 11:20:08.772300
Done with meters_
Done with sec_
sec_cat      meters_cat    
sec is avg   meters is avg     1829
             meters is high     110
             meters is low       22
sec is high  meters is avg       63
             meters is high      40
             meters is low       47
sec is low   meters is low      850
dtype: int64
1022 rows left after filtering for rows with either high seconds OR low meters
division by 0       850
seconds too high    103
meters too low       69
Name: flag, dtype: int64
Took 0:00:00.217325


In [16]:
m2.flag.value_counts()

division by 0    850
Name: flag, dtype: int64

In [17]:
len(m1)-len(m2)

3075512

In [18]:
len(m2)

850

In [19]:
m2.trip_id.nunique(), m1.trip_id.nunique()

(73, 72067)

In [20]:
m2.shape_array_key.nunique(), m1.shape_array_key.nunique()

(4, 4837)

In [21]:
m2._gtfs_dataset_name.nunique(), m1._gtfs_dataset_name.nunique()

(3, 76)

In [22]:
m2.groupby(["loop_or_inlining"]).agg({"shape_array_key": "nunique"})

Unnamed: 0_level_0,shape_array_key
loop_or_inlining,Unnamed: 1_level_1
0,4


#### See how many trips for a shape ID have problematic rows


In [23]:
# Number of trips that have at least one row that was divided by 0 
# for this shape array key
df1 = m2.groupby(['shape_array_key']).agg({'trip_id':'nunique'}).rename(columns = {'trip_id':'trips_with_zero'}).reset_index()

In [24]:
# Original number of trips
df2 = m1.groupby(['shape_array_key']).agg({'trip_id':'nunique'}).rename(columns = {'trip_id':'all_trips'}).reset_index()

In [25]:
df3 = pd.merge(df1, df2, how = "inner", on = 'shape_array_key')

In [26]:
df3['percent_of_trips_with_problematic_rows'] = df3.trips_with_zero/df3.all_trips * 100

In [27]:
df3['percent_of_trips_with_problematic_rows'].describe()

count     4.00
mean    100.00
std       0.00
min     100.00
25%     100.00
50%     100.00
75%     100.00
max     100.00
Name: percent_of_trips_with_problematic_rows, dtype: float64

In [28]:
# df3.sample(5)

### Investigate 
#### Stage3: "vp_pared_stops"/A3_loop_inlining
* Rewrite this part to filter read_parquet with the shape array and whatnot

In [29]:
def load_vp_stage3(flagged_df:pd.DataFrame, date:str) -> pd.DataFrame:
    
    # Subset the dataframe and use it to filter out for only the values of interest
    shape_array_keys = flagged_df.shape_array_key.unique().tolist()
    stop_seq = flagged_df.stop_sequence.unique().tolist() 
    trip_id = flagged_df.trip_id.unique().tolist() 
    gtfs_dataset_key = flagged_df.gtfs_dataset_key.unique().tolist() 
    
    #flagged_df = flagged_df[['gtfs_dataset_key', 'trip_id','stop_sequence','shape_array_key']]
    vp = pd.read_parquet(f"{SEGMENT_GCS}vp_pared_stops_{date}",
        filters = [[('shape_array_key', "in", shape_array_keys),
                   ('stop_sequence', 'in', stop_seq), 
                   ('trip_id', 'in', trip_id), 
                   ('gtfs_dataset_key', 'in', gtfs_dataset_key)]],)
    
    # Merge to filter
    vp2 = pd.merge(flagged_df, vp, how = "inner", on = ['gtfs_dataset_key', 'trip_id','stop_sequence','shape_array_key'])
    
    return vp2

In [30]:
vp2 = load_vp_stage3(subset, analysis_date)

In [31]:
# vp = pd.read_parquet(f"{SEGMENT_GCS}vp_pared_stops_{analysis_date}")

In [32]:
# Check out stop sequences for the trip below that have division by 0
# subset[subset.trip_id == "1088383"].stop_sequence.unique()

In [33]:
# Stop sequences that were flagged as division by 0
# vp2[vp2.trip_id == "1088383"].sort_values(['trip_id', 'stop_sequence','location_timestamp_local'])

In [34]:
# All the stop sequences for this trip, even those that are ok
# vp_pared[vp_pared.trip_id == "1088383"].sort_values(['trip_id', 'stop_sequence','location_timestamp_local'])

In [35]:
# All the stop sequences for this trip, even those that are ok
# vp_pared[vp_pared.trip_id == "1088383"].sort_values(['location_timestamp_local','stop_sequence',])

In [36]:
def stage3_repeated_timestamps(stage3_df:pd.DataFrame)-> pd.DataFrame:
    """
    Look at how many times a time stamp is repeated a route-trip-location.
    Each of these 3 combos should have a different time for each 
    stop sequence or else the vehicle is not changing locations.
    """
    agg = (stage3_df
     .groupby(['shape_array_key','trip_id', 'location_timestamp_local'])
     .agg({'stop_sequence':'nunique'})
     .reset_index()
     .rename(columns = {'stop_sequence':'number_of_repeated_timestamps'})
    )
    
    # Only keep timestamps that are repeated more than once
    agg = (agg[agg.number_of_repeated_timestamps > 1]).reset_index(drop = True)

    return agg

In [37]:
def stage3_repeated_locations(stage3_df:pd.DataFrame):
    """
    Look at how many times a time stamp is repeated for a stop-trip-route combo.
    Each of these 3 combos should have a different location for each 
    stop sequence or else the vehicle is not changing locations.
    """
    # Concat x and y into a string
    stage3_df['pair'] = stage3_df.x.astype(str) + '/' + vp2.y.astype(str)
    
    # Count number of different stops that reference the same location
    agg = (stage3_df
     .groupby(['shape_array_key','trip_id','pair'])
     .agg({'stop_sequence':'nunique'})
     .reset_index()
     .sort_values('stop_sequence', ascending = False)
     .rename(columns = {'stop_sequence':'number_of_repeated_locs'})               
    )

    # Only keep locations that are repeated more than once
    agg = agg[agg.number_of_repeated_locs != 1].reset_index(drop = True)
    
    return agg

In [38]:
def flag_stage3(flagged_df:pd.DataFrame, date:str) -> pd.DataFrame:
    """
    Flag the errors in stage3
    """
    start = datetime.datetime.now()
    print(start)
    
    # Relevant rows from Vehicle Positions
    vp = load_vp_stage3(flagged_df, date)
    
    # Find repeated timestamps.
    multi_timestamps = stage3_repeated_timestamps(vp)
    
    # Find repeated locations
    multi_locs = stage3_repeated_locations(vp)
    
    # Merge
    timestamps_merge_cols = ['shape_array_key','trip_id','location_timestamp_local']
    loc_merge_cols =  ['shape_array_key','trip_id','pair']
    
    # Want everything found in vehicle positions, so do left merges
    m1 = (vp
          .merge(multi_timestamps, how="left", on= timestamps_merge_cols)
          .merge(multi_locs, how="left", on=loc_merge_cols)
         )
    
    drop_cols = ['vp_idx','x','y','hour','activity_date',]
    m1 = m1.drop(columns = drop_cols)
    
    # Flag
    def flag(row):
        if (row["number_of_repeated_timestamps"] > 1) & (row["number_of_repeated_locs"] > 1):
            return "repeated timestamps & locations"
        elif (row["number_of_repeated_timestamps"] > 1):
            return "repeated timestamps"
        elif (row["number_of_repeated_locs"] > 1):
            return "repeated locations"
        else:
            return "check in stage 2"
        
    m1["stage3_flag"] = m1.apply(lambda x: flag(x), axis=1)
    
    print(m1.stage3_flag.value_counts())
    
    check_in_stage2 = m1[m1.stage3_flag == "check in stage 2"]
    print(f"Have to check {len(check_in_stage2)/len(m1) * 100} % of rows in stage 2")
    
    end = datetime.datetime.now()
    print(f"Took {end-start}")
    return m1

In [39]:
m3 = flag_stage3(m2, analysis_date)

2023-06-29 11:20:18.037861
check in stage 2                   1503
repeated timestamps                 149
repeated timestamps & locations      27
repeated locations                   21
Name: stage3_flag, dtype: int64
Have to check 88.41176470588236 % of rows in stage 2
Took 0:00:05.668398


In [40]:
m3 = m3[m3.stage3_flag == "check in stage 2"]

In [41]:
m3.shape

(1503, 30)

In [42]:
sort_cols = ['trip_id', 'shape_array_key', 'stop_sequence']

#### Stage2: "vp_stop_segment"/A1_sjoin_vp_segments


In [43]:
# Select one route to look at
test_route = "106d979b9a9e6338827a8e1c145e69fd"

In [44]:
test_sequence = 39

In [45]:
test_gtfs_key = "db56b50ab86b5f7a4ae2fc2dd9889bbe"

In [46]:
test_trip = '1088405'

#### Look at export  file

In [47]:
def import_stage_2(date:str, route:str, stop_sequence:str):
    df = pd.read_parquet(
            f"{SEGMENT_GCS}vp_sjoin/vp_stop_segment_{date}",
            filters = [[('shape_array_key', "==", route),
                       ('stop_sequence', "==", stop_sequence)]],
        )
    return df

In [48]:
# stg2 = import_stage_2(analysis_date, test_route, test_sequence)

#### Look at vp trips -> import unique trips

In [49]:
def import_unique_trips(gtfs_key:str, trip: str, route:str):
    vp_trips = A1_sjoin_vp_segments.add_grouping_col_to_vp(
        f"vp_usable_{analysis_date}",
        analysis_date,
       ["shape_array_key"]
    )
    
    # Filter to just one trip/route/operator
    df = vp_trips[(vp_trips.gtfs_dataset_key == gtfs_key)
                    & (vp_trips.shape_array_key == route)
                    & (vp_trips.trip_id == trip)].reset_index(drop = True)
    return df


In [50]:
# unique_trips = import_unique_trips(test_gtfs_key, test_trip, test_route)

#### Look at vehicle positions

In [51]:
def import_vehicle_positions(unique_trips:pd.DataFrame, gtfs_key:str, trip_id:str)-> gpd.GeoDataFrame:
    vp = helpers.import_vehicle_positions(
            SEGMENT_GCS,
            f"vp_usable_{analysis_date}/",
            filters = [[("gtfs_dataset_key", "==", gtfs_key),
                      ('trip_id', '==', trip_id)]],
            columns = ["gtfs_dataset_key", "trip_id", 
                       "vp_idx", "x", "y"],
            partitioned = True
        )
    
    vp = vp.compute()
    vp = vp.merge(unique_trips, on = ["gtfs_dataset_key", "trip_id"],
            how = "inner"
        )
    
    vp_gdf = gpd.GeoDataFrame(
        vp, 
        geometry = gpd.points_from_xy(vp.x, vp.y, crs = "EPSG:4326")
    ).to_crs(PROJECT_CRS).drop(columns = ["x", "y"])
    
    return vp_gdf

In [52]:
#vehicle_positions = import_vehicle_positions(unique_trips, test_gtfs_key, test_trip)

In [53]:
#len(vehicle_positions)

#### Look at segments

In [54]:
def import_segments(flagged_df: pd.DataFrame, route:str, gtfs_key:str) -> gpd.GeoDataFrame:
    
    # Load in ALL segments, flag them.
    gdf = gpd.read_parquet(f"{SEGMENT_GCS}stop_segments_{analysis_date}.parquet",
                           filters = [[("shape_array_key", "==", route),
                                      ("gtfs_dataset_key", "==", gtfs_key),
                                     ]]).to_crs(PROJECT_CRS)
    
    gdf["geometry_buffered"] = gdf.geometry.buffer(35)
    gdf = gdf.set_geometry('geometry_buffered')
    
    # Distinguish between "correct" and "incorrect" seq
    # A sequence can be incorrect even if just one row is "divided by 0"
    incorrect_segments = flagged_df[(flagged_df.shape_array_key == route) & (flagged_df.gtfs_dataset_key == gtfs_key)]
    incorrect_segments_list = incorrect_segments.stop_sequence.unique().tolist()
    incorrect_segments_filtered = gdf[gdf.stop_sequence.isin(incorrect_segments_list)].reset_index(drop = True)
    incorrect_segments_filtered['flag'] = 'contains 0m/0sec'
    
    # Filter for correct segments
    correct_segments = flagged_df[~flagged_df.stop_sequence.isin(incorrect_segments_list)]
    correct_segments_list = correct_segments.stop_sequence.unique().tolist()
    correct_segments_filtered = gdf[gdf.stop_sequence.isin(correct_segments_list)].reset_index(drop = True)
    correct_segments_filtered['flag'] = 'does not contain 0m/0sec'
    
    final = pd.concat([correct_segments_filtered, incorrect_segments_filtered])
    
    return final

In [55]:
flagged_segments = import_segments(m3, test_route, test_gtfs_key)

In [56]:
#segments = A1_sjoin_vp_segments.import_segments_and_buffer(
 #   f"stop_segments_{analysis_date}",
#    35,
   # ["shape_array_key", "stop_sequence"]+ ["seg_idx", "geometry"]
#)

In [57]:
# segments = segments.compute()

#### Stops kept: last and first

In [58]:
def find_first_last_points(route:str, trip:str, gtfs_key:str):
    df = pd.read_parquet(f"{SEGMENT_GCS}vp_pared_stops_{analysis_date}",
        filters = [[('shape_array_key', "==", route),
                  
                   ('trip_id', "==", trip), 
                   ('gtfs_dataset_key', '==', gtfs_key)]],)
    
    gdf =  gpd.GeoDataFrame(
        df, 
        geometry = gpd.points_from_xy(df.x, df.y, crs = "EPSG:4326")
    ).to_crs(PROJECT_CRS).drop(columns = ["x", "y"])
    
    gdf = gdf[['geometry','stop_sequence']]
    
    return gdf

In [59]:
# first_last = find_first_last_points(test_route, test_trip, test_gtfs_key)

In [60]:
# len(first_last)

#### Sjoin 

In [61]:
def sjoin_vp_segments(segments: gpd.GeoDataFrame, vp_gdf: gpd.GeoDataFrame):
    vp_in_seg = gpd.sjoin(
        vp_gdf,
        segments,
        how = "inner",
        predicate = "within"
    )
    
    
    return vp_in_seg

#### Mapping

In [62]:
def display_maps(all_points: gpd.GeoDataFrame, 
                 first_last_points: gpd.GeoDataFrame,
                 segments: gpd.GeoDataFrame,
                 sjoin_results: gpd.GeoDataFrame):
    
    base1 = segments.explore('flag', cmap= 'tab10', height = 400, width = 600, name = 'segments')
    all_points_map = all_points.explore(m = base1, color = 'red',style_kwds = {'weight':6}, name= 'points')
    
    print('ALL POINTS')
    display(all_points_map) 
    
     
    # Right left geo
    sjoin_points = sjoin_results.set_geometry('geometry_left')
    sjoin_segments = sjoin_results.set_geometry('geometry_right')
    sjoin_segments.geometry_right = sjoin_segments.geometry_right.buffer(35)
    base3 = sjoin_segments.explore('flag', cmap= 'tab10', height = 400, width = 600, name = 'segments')
    sjoin_map = sjoin_points.explore(m = base3, color = 'orange',style_kwds = {'weight':6},  name= 'points')
    
    print('SJOIN')
    display(sjoin_map)
    
    base2 = segments.explore('flag', cmap= 'tab10', height = 400, width = 600, name = 'segments')
    first_last_map = first_last_points.explore(m = base2, color = 'pink',style_kwds = {'weight':6},height = 400, width = 600,)
    
    print('ALL FIRST AND LAST')
    display(first_last_map)
   

In [63]:
# display_maps(vehicle_positions,first_last,flagged_segments)

#### Function

In [64]:
def stage2_trouble_shooting(flagged_df:pd.DataFrame,
                            date:str, 
                            route:str, 
                            trip:str, 
                            gtfs_key:str):
    unique_trips = import_unique_trips(gtfs_key, trip, route)
    
    # Find all recorded vps
    vehicle_positions = import_vehicle_positions(unique_trips, gtfs_key, trip)
    
    # Flag segments, whether one row contains 1+ 0/0 division or not
    flagged_segments = import_segments(flagged_df, route, gtfs_key)
    
    # Find first and last pt kept
    first_last = find_first_last_points(route, trip, gtfs_key)
    
    # Sjoin 
    sjoin_results = sjoin_vp_segments(flagged_segments,vehicle_positions)
    
    # Display maps
    display_maps(vehicle_positions,first_last,flagged_segments,sjoin_results)
    

#### Example Trip 1

In [65]:
# subset[(subset.stop_sequence == test_sequence) & (subset.shape_array_key == test_route)]

In [66]:
stage2_trouble_shooting(flagged_df= m3,
                        date = analysis_date,
                        route = test_route,
                        trip = test_trip,
                        gtfs_key = test_gtfs_key)

ALL POINTS


SJOIN


ALL FIRST AND LAST


#### Example Trip 2

In [67]:
test_route2 = "0fb4f3627996269dc7075276d3b69e36"
test_gtfs_key2 = "a4f6fd5552107e05fe9743ac7cce2c55"
test_trip2 = "16939095"

In [68]:
#unique_trips = import_unique_trips(gtfs_key, trip, route)
    
#vehicle_positions = import_vehicle_positions(unique_trips, gtfs_key, trip)

In [69]:
stage2_trouble_shooting(flagged_df= m3,
                        date = analysis_date,
                        route = test_route2,
                        trip = test_trip2,
                        gtfs_key = test_gtfs_key2)

ALL POINTS


SJOIN


ALL FIRST AND LAST


#### Example Trip 3

In [70]:
test_route3 = "07c9a47264a43d8d0d16ef7109e8fd68"
test_gtfs_key3 = "db56b50ab86b5f7a4ae2fc2dd9889bbe"
test_trip3 = "1089348"

In [71]:
# subset[(subset.stop_sequence == 34) & (subset.shape_array_key == test_route3)]

In [81]:
stage2_trouble_shooting(flagged_df= m3,
                        date = analysis_date,
                        route = test_route3,
                        trip = test_trip3,
                        gtfs_key = test_gtfs_key3)

ALL POINTS


SJOIN


ALL FIRST AND LAST


ALL POINTS


SJOIN


ALL FIRST AND LAST


### Stage1: "vp_usable"

In [None]:
# What's the diff between stop segments normal/special/and without any notation?
usable = pd.read_parquet(f"{SEGMENT_GCS}vp_usable_{analysis_date}")

In [None]:
usable.sample()

In [None]:
subset_for_merge2 = subset_for_merge.drop(columns = ['stop_sequence','stop_id','meters_elapsed','sec_elapsed'])

In [None]:
m_cols2 = ['gtfs_dataset_key',
 'trip_id']

In [None]:
subset_for_merge2.head()

In [None]:
# m2[m2.trip_id == '1350']