## Map Partitions Test - Update Completeness
* https://github.com/cal-itp/data-analyses/blob/main/rt_segment_speeds/scripts/nearest_vp_to_stop.py
* The functions should all start from `vp_usable`
* cd rt_segment_speeds && pip install -r requirements.txt && cd

In [1]:
import datetime
import dask.dataframe as dd
import dask_geopandas as dg
import dask
import geopandas as gpd
import pandas as pd
from scripts import vp_spatial_accuracy
from segment_speed_utils import helpers
from calitp_data_analysis.geography_utils import WGS84
from segment_speed_utils.project_vars import (
    PROJECT_CRS,
    SEGMENT_GCS,
    analysis_date,
    GCS_FILE_PATH,
    COMPILED_CACHED_VIEWS,
    RT_SCHED_GCS,
    CONFIG_PATH
)

from typing import Literal
import numpy as np

from shared_utils.rt_utils import MPH_PER_MPS
from calitp_data_analysis import utils

# cd rt_segment_speeds && pip install -r requirements.txt && cd
from shared_utils import portfolio_utils, schedule_rt_utils
from segment_speed_utils import helpers, sched_rt_utils, wrangle_shapes, segment_calcs

In [2]:
# Times
import datetime
from loguru import logger

In [3]:
pd.options.display.max_columns = 100
pd.options.display.float_format = "{:.2f}".format
pd.set_option("display.max_rows", None)
pd.set_option("display.max_colwidth", None)

In [4]:
# 14,514,960 rows
vp_usable= dd.read_parquet(
      f"{SEGMENT_GCS}vp_usable_{analysis_date}"
)

In [5]:
gtfs_keys = ["7cc0cb1871dfd558f11a2885c145d144",
             "d2b09fbd392b28d767c28ea26529b0cd"]

In [None]:
# Test a subset
# vp_usable_subset = vp_usable.loc[vp_usable.schedule_gtfs_dataset_key.isin(gtfs_keys)]

### % of total trip time with 2 pings per minute
* Takes 1:23 secs
* Counting how many rows appear per minute by `trip instance key` to figure out how many gtfs pings occur.

In [None]:
def two_pings_per_min(vp_usable_df:pd.DataFrame) -> pd.DataFrame:
    
    # Find number of pings each minute
    df = (
        vp_usable_df.groupby(
            ["trip_instance_key",
                pd.Grouper(key="location_timestamp_local", freq="1Min"),
            ]
        )
        .vp_idx.count()
        .reset_index()
        .rename(columns={"vp_idx": "number_of_pings_per_minute"})
    )
    
    # Determine which rows have 2+ pings per minute
    df = df.assign(
        minutes_w_atleast2_trip_updates= df.apply(
            lambda x: 1 if x.number_of_pings_per_minute >= 2 else 0, axis=1
        )
    )
    
    # Create max time col
    df["max_time"] = df.location_timestamp_local
    
    # Find the min time for each trip and sum up total min with at least 2 pings per min
    df = (
        df.groupby(["trip_instance_key"])
        .agg(
            {
                "location_timestamp_local": "min",
                "max_time": "max",
                "minutes_w_atleast2_trip_updates": "sum",
                "number_of_pings_per_minute":"count"
            }
        )
        .reset_index()
        .rename(columns={"location_timestamp_local": "min_time",
                         "number_of_pings_per_minute":"total_minute_w_gtfs"})
    )
    
    # Find total trip time and add an extra minute
    df["total_trip_time"] = (df.max_time - df.min_time) / pd.Timedelta(minutes=1) + 1
    
    df = df.drop(columns = ['min_time','max_time'])
    return df

In [None]:
# df1 = two_pings_per_min(vp_usable_pd)

In [None]:
# df1.info()

In [None]:
len(vp_usable)

In [None]:
start = datetime.datetime.now()
print(start)
partitions_test1 = vp_usable.map_partitions(
       two_pings_per_min,
        meta = {'trip_instance_key':'object', 
                'minutes_w_atleast2_trip_updates':'int64', 
                'total_minute_w_gtfs':'int64',
                'total_trip_time':'float64',},
        align_dataframes = False
    ).persist()

end = datetime.datetime.now()
logger.info(f"execution time: {end-start}")

In [None]:
type(partitions_test1)

#### Look at one trip

In [None]:
# df1.loc[df1.trip_instance_key == "00068c2e2316950af50ffaa9584c7a46"]

In [None]:
# df2.loc[df2.trip_instance_key ==  "00068c2e2316950af50ffaa9584c7a46"]

### Density: on average, how many pings occur per minute
* Takes 34 secs
* Double check this

In [None]:
def density_pings_5_min(vp_usable_df:pd.DataFrame) -> pd.DataFrame:
    
    # Count number of pings per 5 minutes
    df = (
        vp_usable_df.groupby(
            [
                *["trip_instance_key"],
                pd.Grouper(key="location_timestamp_local", freq="5Min"),
            ]
        )
        .vp_idx.count()
        .reset_index()
    )
    
    # Find median of pings per 5 minutes for each trip
    df = (
        df.groupby(["trip_instance_key"])
        .agg({"vp_idx": "median"})
        .reset_index()
        .rename(columns = {'vp_idx':'median_pings_per_5_min'})
    )
    
    # Divide by 5
    # df.median_pings_per_5_min = df.median_pings_per_5_min/5
    
    return df

In [None]:
# df2 = density_pings_5_min(vp_usable_pd)

In [None]:
# df2.info()

In [None]:
start = datetime.datetime.now()
print(start)
partitions_test2 = vp_usable.map_partitions(
       density_pings_5_min,
        meta = {'trip_instance_key':'object', 
                'median_pings_per_5_min':'float64'},
        align_dataframes = False
    ).persist()

end = datetime.datetime.now()
logger.info(f"execution time: {end-start}")

In [None]:
# len(partitions_test2)

In [None]:
# len(partitions_test1)

In [None]:
partitions_test1 = partitions_test1.compute()

In [None]:
partitions_test1.sample(3)

In [None]:
partitions_test2 = partitions_test2.compute()

In [None]:
partitions_test2.head(1)

In [None]:
# len(update_completeness)

In [None]:
# update_completeness.trip_instance_key.nunique().compute()

### Spatial Accuracy
* Do I use shapes or trips_with_shape?

#### Test to see difference between `shapes` and `trips_with_shape`

In [6]:
 shapes = (
        pd.read_parquet(
            f"{COMPILED_CACHED_VIEWS}routelines_{analysis_date}.parquet",
            columns=["shape_array_key"],
        )
        .dropna()
        .drop_duplicates()
    )


In [7]:
trips_with_shape = (
        helpers.import_scheduled_trips(
            analysis_date,
            columns=["trip_instance_key", "shape_array_key"],
            get_pandas=True,
        ))

In [8]:
trips_with_shape_shapes = set(trips_with_shape.shape_array_key.unique().tolist())
shapes_shapes = set(shapes.shape_array_key.unique().tolist())
trips_with_shape_shapes - shapes_shapes

{None}

In [9]:
shapes_shapes - trips_with_shape_shapes 

set()

In [10]:
def grab_shape_keys_in_vp(vp_usable: dd.DataFrame, analysis_date: str) -> pd.DataFrame:
    """
    Subset raw vp and find unique trip_instance_keys.
    Create crosswalk to link trip_instance_key to shape_array_key.
    """
    vp_usable = (vp_usable[['trip_instance_key']]
                 .drop_duplicates()
                 .reset_index(drop=True)
                )

    trips_with_shape = (
        helpers.import_scheduled_trips(
            analysis_date,
            columns=["trip_instance_key", "shape_array_key"],
            get_pandas=False,
        )
    )
    
    # Only one row per trip/shape
    # trip_instance_key and shape_array_key are the only 2 cols left
    m1 = dd.merge(vp_usable, trips_with_shape, on = "trip_instance_key", how = "inner")
    m1 = m1.compute() 
    
    return m1

In [11]:
# 1 minute
start = datetime.datetime.now()
print(start)
spatial_df1 = grab_shape_keys_in_vp(vp_usable, analysis_date)
end = datetime.datetime.now()
logger.info(f"execution time: {end-start}")

2023-12-05 11:01:58.031496


2023-12-05 11:02:52.197 | INFO     | __main__:<module>:5 - execution time: 0:00:54.165745


In [12]:
type(spatial_df1)

pandas.core.frame.DataFrame

In [13]:
len(spatial_df1)

77091

In [14]:
# spatial_df1_pd = spatial_df1.compute()

In [15]:
# spatial_df1_pd.sample()

In [16]:
# len(spatial_df1_pd), spatial_df1_pd.trip_instance_key.nunique()

In [17]:
def buffer_shapes2(
    trips_with_shape: pd.DataFrame,
    analysis_date: str,
    buffer_meters: int = 35,
):
    """
    Filter scheduled shapes down to the shapes that appear in vp.
    Buffer these.
    
    Attach the shape geometry for a subset of shapes or trips.
    """
    subset = trips_with_shape.shape_array_key.unique().tolist()
    
    shapes = helpers.import_scheduled_shapes(
        analysis_date,
        columns = ["shape_array_key", "geometry"],
        filters = [[("shape_array_key", "in", subset)]],
        crs = PROJECT_CRS,
        get_pandas = True
    )
    
    # to_crs takes awhile, so do a filtering on only shapes we need
    shapes = shapes.assign(
        geometry = shapes.geometry.buffer(buffer_meters)
    )
    
    trips_with_shape_geom = pd.merge(
        shapes,
        trips_with_shape,
        on = "shape_array_key",
        how = "inner"
    )
    
    return trips_with_shape_geom


In [18]:
start = datetime.datetime.now()
print(start)
spatial_df2 = buffer_shapes2(spatial_df1,
       analysis_date,
       35) 
end = datetime.datetime.now()
logger.info(f"execution time: {end-start}")

2023-12-05 11:02:52.511491


2023-12-05 11:03:10.236 | INFO     | __main__:<module>:7 - execution time: 0:00:17.724454


In [19]:
type(spatial_df2)

geopandas.geodataframe.GeoDataFrame

In [20]:
len(spatial_df2)

77091

In [21]:
spatial_df2.trip_instance_key.nunique()

77091

#### Redo `merge_vp_with_shape_and_count` because it takes super long

In [22]:
type(vp_usable)

dask.dataframe.core.DataFrame

In [23]:
#vp_gdf = gpd.GeoDataFrame(
#            vp_usable_subset, geometry=gpd.points_from_xy(vp_usable_subset.x, vp_usable_subset.y), crs=WGS84
#    ).to_crs(PROJECT_CRS)

In [24]:
def vp_usable_to_gdf(vp_usable: dd.DataFrame)-> gpd.GeoDataFrame:
    
    keep = ['trip_instance_key','x','y','location_timestamp_local']
    vp_usable = vp_usable[keep]
    
    vp_gdf = gpd.GeoDataFrame(
        vp_usable, geometry=gpd.points_from_xy(vp_usable.x, vp_usable.y), crs=WGS84
    ).to_crs(PROJECT_CRS)
    
    vp_gdf = vp_gdf.rename(
        columns={
            0: "trip_instance_key",
            3: "location_timestamp_local",
          }
    )
    
    vp_gdf = vp_gdf[["trip_instance_key","location_timestamp_local",'geometry']]
  
    return vp_gdf

In [25]:
# test = vp_usable_to_gdf(vp_usable_subset, spatial_df2)

In [26]:
start = datetime.datetime.now()
print(start)
spatial_df3 = vp_usable.map_partitions(
        vp_usable_to_gdf,
        meta = {'trip_instance_key':'object', 
                'location_timestamp_local':'datetime64[ns]', 
                'geometry':'geometry'},
        align_dataframes = False
    ).persist()
end = datetime.datetime.now()
logger.info(f"execution time: {end-start}")

2023-12-05 11:03:10.447693


2023-12-05 11:04:04.064 | INFO     | __main__:<module>:11 - execution time: 0:00:53.617093


In [27]:
type(spatial_df3)

dask.dataframe.core.DataFrame

In [35]:
type(spatial_df2)

geopandas.geodataframe.GeoDataFrame

In [36]:
def merge_vp_shapes(vp_gdf:dd.DataFrame, buffered_gdf:gpd.GeoDataFrame):
    vp_gdf = vp_gdf.compute()
    buffered_gdf = buffered_gdf.set_geometry('geometry')
    buffered_gdf = buffered_gdf.set_crs(PROJECT_CRS)
    
    m1 = pd.merge(
        vp_gdf, buffered_gdf, on="trip_instance_key", how="inner"
    )
    
    
    return m1

In [37]:
start = datetime.datetime.now()
print(start)
spatial_df4 = merge_vp_shapes(spatial_df3, spatial_df2)
end = datetime.datetime.now()
logger.info(f"execution time: {end-start}")

2023-12-05 11:05:35.158774


2023-12-05 11:05:41.275 | INFO     | __main__:<module>:5 - execution time: 0:00:06.117071


In [38]:
type(spatial_df4)

geopandas.geodataframe.GeoDataFrame

In [77]:
def total_counts(gdf: gpd.GeoDataFrame) -> pd.DataFrame:
    
    # Count vps in the shape
    vp2 = gdf.assign(is_within=gdf.geometry_x.within(gdf.geometry_y)).query(
        "is_within==True"
    )
   
    vps_in_shape = (
        vp2.groupby("trip_instance_key", observed=True, group_keys=False)
        .agg({"location_timestamp_local": "count"})
        .reset_index()
        .rename(columns={"location_timestamp_local": "vp_in_shape"})
    )
    
    
    # Count total vps for the trip 
    total_vp = vp_spatial_accuracy.total_vp_counts_by_trip(gdf)
    
    count_df = pd.merge(total_vp, vps_in_shape, on="trip_instance_key", how="left")
    
    count_df = count_df.assign(
        vp_in_shape=count_df.vp_in_shape.fillna(0).astype("int32"),
        total_vp=count_df.total_vp.fillna(0).astype("int32"),
    )

    return vp2

In [94]:
type(vp_usable)

dask.dataframe.core.DataFrame

In [95]:
vp_usable_subset = vp_usable.head(1000)

In [None]:
spatial_df4_subset = spatial_df4.head()

In [84]:

start = datetime.datetime.now()
print(start)
spatial_df5 = total_counts(spatial_df4_subset)
end = datetime.datetime.now()
logger.info(f"execution time: {end-start}")

2023-12-05 11:42:30.775 | INFO     | __main__:<module>:5 - execution time: 0:00:00.024276


2023-12-05 11:42:30.751499


In [85]:
spatial_df5.head()

Unnamed: 0,trip_instance_key,location_timestamp_local,geometry_x,shape_array_key,geometry_y,is_within


In [78]:
"""
start = datetime.datetime.now()
print(start)
spatial_df5 = vp_usable_subset.map_partitions(
        total_counts,
        meta = {'trip_instance_key':'object', 
                'total_vp':'int32', 
                'vp_in_shape':'int32'},
        align_dataframes = False
    ).persist()
end = datetime.datetime.now()
logger.info(f"execution time: {end-start}")
"""

'\nstart = datetime.datetime.now()\nprint(start)\nspatial_df5 = vp_usable_subset.map_partitions(\n        total_counts,\n        meta = {\'trip_instance_key\':\'object\', \n                \'total_vp\':\'int32\', \n                \'vp_in_shape\':\'int32\'},\n        align_dataframes = False\n    ).persist()\nend = datetime.datetime.now()\nlogger.info(f"execution time: {end-start}")\n'

In [71]:
spatial_df4_dask = dd.from_pandas(spatial_df4, npartitions=1)

In [72]:
len(spatial_df4_dask)

13068735

In [102]:
vp_usable2 = (vp_usable[['trip_instance_key']]
                 .drop_duplicates()
                 .reset_index(drop=True)
                )

In [104]:
trips_with_shape = (
        helpers.import_scheduled_trips(
            analysis_date,
            columns=["trip_instance_key", "shape_array_key"],
            get_pandas=False,
        )
    )

In [105]:
m1 = dd.merge(vp_usable2, trips_with_shape, on = "trip_instance_key", how = "inner")

In [107]:
type(m1)

dask.dataframe.core.DataFrame

In [108]:
subset = m1.shape_array_key.unique()

In [109]:
type(subset)

dask.dataframe.core.Series

In [110]:
subset = subset.compute().tolist()

In [112]:
shapes = helpers.import_scheduled_shapes(
        analysis_date,
        columns = ["shape_array_key", "geometry"],
        filters = [[("shape_array_key", "in", subset)]],
        crs = PROJECT_CRS,
        get_pandas = True
    )

In [113]:
def spatial_accuracy_test(vp_usable: dd.DataFrame, analysis_date: str, buffer_meters:int = 35):
    vp_usable2 = (vp_usable[['trip_instance_key']]
                 .drop_duplicates()
                 .reset_index(drop=True)
                )

    trips_with_shape = (
        helpers.import_scheduled_trips(
            analysis_date,
            columns=["trip_instance_key", "shape_array_key"],
            get_pandas=False,
        )
    )
    
    # Only one row per trip/shape
    # trip_instance_key and shape_array_key are the only 2 cols left
    m1 = dd.merge(vp_usable2, trips_with_shape, on = "trip_instance_key", how = "inner")
    
    subset = m1.shape_array_key.unique()
    subset = subset.compute().tolist()
    
    shapes = helpers.import_scheduled_shapes(
        analysis_date,
        columns = ["shape_array_key", "geometry"],
        filters = [[("shape_array_key", "in", subset)]],
        crs = PROJECT_CRS,
        get_pandas = True
    )
    
    # to_crs takes awhile, so do a filtering on only shapes we need
    shapes = shapes.assign(
        geometry = shapes.geometry.buffer(buffer_meters)
    )
    
    trips_with_shape_geom = pd.merge(
        shapes,
        m1,
        on = "shape_array_key",
        how = "inner"
    )
    
    keep = ['trip_instance_key','x','y','location_timestamp_local']
    vp_usable3 = vp_usable[keep]
    
    vp_gdf = gpd.GeoDataFrame(
        vp_usable3, geometry=gpd.points_from_xy(vp_usable3.x, vp_usable3.y), crs=WGS84
    ).to_crs(PROJECT_CRS)
    
    vp_gdf = vp_gdf.rename(
        columns={
            0: "trip_instance_key",
            3: "location_timestamp_local",
          }
    )
    
    vp_gdf = vp_gdf[["trip_instance_key","location_timestamp_local",'geometry']]
    
    trips_with_shape_geom = trips_with_shape_geom.set_geometry('geometry')
    trips_with_shape_geom = trips_with_shape_geom.set_crs(PROJECT_CRS)
    
    m2 = pd.merge(
        vp_gdf, buffered_gdf, on="trip_instance_key", how="inner"
    )
    
    vp2 = m2.assign(is_within=m2.geometry_x.within(m2.geometry_y)).query(
        "is_within==True"
    )
    
    vps_in_shape = (
        vp2.groupby("trip_instance_key", observed=True, group_keys=False)
        .agg({"location_timestamp_local": "count"})
        .reset_index()
        .rename(columns={"location_timestamp_local": "vp_in_shape"})
    )
    
    
    # Count total vps for the trip 
    total_vp = vp_spatial_accuracy.total_vp_counts_by_trip(m2)
    
    count_df = pd.merge(total_vp, vps_in_shape, on="trip_instance_key", how="left")
    
    count_df = count_df.assign(
        vp_in_shape=count_df.vp_in_shape.fillna(0).astype("int32"),
        total_vp=count_df.total_vp.fillna(0).astype("int32"),
    )

    return count_df

In [114]:
start = datetime.datetime.now()
print(start)
test = vp_usable.map_partitions(
        spatial_accuracy_test,
        analysis_date, 
        35,
        meta = {'trip_instance_key':'object', 
                'total_vp':'int32', 
                'vp_in_shape':'int32'},
        align_dataframes = False
    ).persist()
end = datetime.datetime.now()
logger.info(f"execution time: {end-start}")

2023-12-05 12:02:12.920052


ArrowTypeError: Array type doesn't match type of values set: string vs null