diff --git a/_shared_utils/setup.py b/_shared_utils/setup.py index 96ac7b7102..7a3800d9f2 100644 --- a/_shared_utils/setup.py +++ b/_shared_utils/setup.py @@ -3,7 +3,7 @@ setup( name="shared_utils", packages=find_packages(), - version="2.2.1", + version="2.2.2", description="Shared utility functions for data analyses", author="Cal-ITP", license="Apache", diff --git a/_shared_utils/shared_utils/__init__.py b/_shared_utils/shared_utils/__init__.py index 4545e76502..ac36a8ec16 100644 --- a/_shared_utils/shared_utils/__init__.py +++ b/_shared_utils/shared_utils/__init__.py @@ -7,10 +7,10 @@ rt_dates, rt_utils, schedule_rt_utils, + utils_to_add, ) __all__ = [ - # "calitp_color_palette", "dask_utils", "geog_utils_to_add", "gtfs_utils", @@ -19,6 +19,5 @@ "schedule_rt_utils", "rt_dates", "rt_utils", - # "styleguide", - # "utils", + "utils_to_add", ] diff --git a/_shared_utils/shared_utils/utils_to_add.py b/_shared_utils/shared_utils/utils_to_add.py new file mode 100644 index 0000000000..dde5388726 --- /dev/null +++ b/_shared_utils/shared_utils/utils_to_add.py @@ -0,0 +1,54 @@ +import os +from pathlib import Path +from typing import Union + +import dask_geopandas as dg +import geopandas as gpd +from calitp_data_analysis import get_fs, utils + +fs = get_fs() + + +def parse_file_directory(file_name: str) -> str: + """ + Grab the directory of the filename. + For GCS bucket, we do not want '.' as the parent + directory, we want to parse and put together the + GCS filepath correctly. + """ + if str(Path(file_name).parent) != ".": + return str(Path(file_name).parent) + else: + return "" + + +def geoparquet_gcs_export(gdf: Union[gpd.GeoDataFrame, dg.GeoDataFrame], gcs_file_path: str, file_name: str, **kwargs): + """ + Save geodataframe as parquet locally, + then move to GCS bucket and delete local file. + + gdf: geopandas.GeoDataFrame + gcs_file_path: str + Ex: gs://calitp-analytics-data/data-analyses/my-folder/ + file_name: str + Filename, with or without .parquet. 
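+        May include a sub-directory, ex: condensed/vp_condensed
+        (illustrative name). The parent directory is parsed off and
+        joined onto gcs_file_path, so that export would land under
+        gs://calitp-analytics-data/data-analyses/my-folder/condensed/.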
+ """ + # Parse out file_name into stem (file_name_sanitized) + # and parent (file_directory_sanitized) + file_name_sanitized = Path(utils.sanitize_file_path(file_name)) + file_directory_sanitized = parse_file_directory(file_name) + + # Make sure GCS path includes the directory we want the file to go to + expanded_gcs = f"{Path(gcs_file_path).joinpath(file_directory_sanitized)}/" + expanded_gcs = str(expanded_gcs).replace("gs:/", "gs://") + + if isinstance(gdf, dg.GeoDataFrame): + gdf.to_parquet(f"{expanded_gcs}{file_name_sanitized}", overwrite=True, **kwargs) + + else: + gdf.to_parquet(f"{file_name_sanitized}.parquet", **kwargs) + fs.put( + f"{file_name_sanitized}.parquet", + f"{str(expanded_gcs)}{file_name_sanitized}.parquet", + ) + os.remove(f"{file_name_sanitized}.parquet", **kwargs) diff --git a/gtfs_funnel/config.yml b/gtfs_funnel/config.yml index a7e1d14020..ff71de2a41 100644 --- a/gtfs_funnel/config.yml +++ b/gtfs_funnel/config.yml @@ -1,9 +1,10 @@ raw_vp_file: "vp" usable_vp_file: "vp_usable" -vp_condensed_line_file: "vp_condensed" +vp_condensed_line_file: "condensed/vp_condensed" +vp_nearest_neighbor_file: "condensed/vp_nearest_neighbor" timestamp_col: "location_timestamp_local" time_min_cutoff: 10 stop_times_direction_file: "stop_times_direction" -trip_metrics_file: "schedule_trip_metrics" -route_direction_metrics_file: "schedule_route_direction_metrics" +trip_metrics_file: "schedule_trip/schedule_trip_metrics" +route_direction_metrics_file: "schedule_route_dir/schedule_route_direction_metrics" route_identification_file: "standardized_route_ids" \ No newline at end of file diff --git a/gtfs_funnel/route_typologies.py b/gtfs_funnel/route_typologies.py index d5af0744b1..d7765c0575 100644 --- a/gtfs_funnel/route_typologies.py +++ b/gtfs_funnel/route_typologies.py @@ -7,20 +7,27 @@ import pandas as pd from calitp_data_analysis.geography_utils import WGS84 -from calitp_data_analysis import utils +#from calitp_data_analysis import utils +from shared_utils import utils_to_add from segment_speed_utils import helpers, gtfs_schedule_wrangling from segment_speed_utils.project_vars import RT_SCHED_GCS, PROJECT_CRS catalog = intake.open_catalog( "../_shared_utils/shared_utils/shared_data_catalog.yml") -def assemble_scheduled_trip_metrics(analysis_date: str) -> pd.DataFrame: + +def assemble_scheduled_trip_metrics( + analysis_date: str, + dict_inputs: dict +) -> pd.DataFrame: """ Get GTFS schedule trip metrics including time-of-day buckets, scheduled service minutes, and median stop spacing. 
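+
+    dict_inputs: dict read from gtfs_funnel/config.yml; its
+    "stop_times_direction_file" key names the stop_times_direction
+    export read from RT_SCHED_GCS.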
""" + STOP_TIMES_FILE = dict_inputs["stop_times_direction_file"] + df = gpd.read_parquet( - f"{RT_SCHED_GCS}stop_times_direction_{analysis_date}.parquet" + f"{RT_SCHED_GCS}{STOP_TIMES_FILE}_{analysis_date}.parquet" ) trips_to_route = helpers.import_scheduled_trips( @@ -173,7 +180,7 @@ def pop_density_by_shape(shape_df: gpd.GeoDataFrame): ROUTE_DIR_EXPORT = CONFIG_DICT["route_direction_metrics_file"] for date in analysis_date_list: - trip_metrics = assemble_scheduled_trip_metrics(date) + trip_metrics = assemble_scheduled_trip_metrics(date, CONFIG_DICT) trip_metrics.to_parquet( f"{RT_SCHED_GCS}{TRIP_EXPORT}_{date}.parquet") @@ -196,7 +203,7 @@ def pop_density_by_shape(shape_df: gpd.GeoDataFrame): how = "left" ) - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( route_dir_metrics2, RT_SCHED_GCS, f"{ROUTE_DIR_EXPORT}_{date}" diff --git a/gtfs_funnel/vp_condenser.py b/gtfs_funnel/vp_condenser.py index 9b3a4892e7..b39d5f2992 100644 --- a/gtfs_funnel/vp_condenser.py +++ b/gtfs_funnel/vp_condenser.py @@ -11,7 +11,8 @@ from loguru import logger from calitp_data_analysis.geography_utils import WGS84 -from calitp_data_analysis import utils +#from calitp_data_analysis import utils +from shared_utils import utils_to_add from segment_speed_utils import vp_transform, wrangle_shapes from segment_speed_utils.project_vars import SEGMENT_GCS @@ -65,9 +66,9 @@ def condense_vp_to_linestring( align_dataframes = False ).compute().set_geometry("geometry").set_crs(WGS84) - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( vp_condensed, - f"{SEGMENT_GCS}condensed/", + SEGMENT_GCS, f"{EXPORT_FILE}_{analysis_date}" ) @@ -76,7 +77,10 @@ def condense_vp_to_linestring( return -def prepare_vp_for_all_directions(analysis_date: str) -> gpd.GeoDataFrame: +def prepare_vp_for_all_directions( + analysis_date: str, + dict_inputs: dict +) -> gpd.GeoDataFrame: """ For each direction, exclude one the opposite direction and save out the arrays of valid indices. @@ -86,8 +90,11 @@ def prepare_vp_for_all_directions(analysis_date: str) -> gpd.GeoDataFrame: Subset vp_idx, location_timestamp_local and coordinate arrays to exclude southbound. 
""" + INPUT_FILE = dict_inputs["vp_condensed_line_file"] + EXPORT_FILE = dict_inputs["vp_nearest_neighbor_file"] + vp = delayed(gpd.read_parquet)( - f"{SEGMENT_GCS}condensed/vp_condensed_{analysis_date}.parquet", + f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}.parquet", ) dfs = [ @@ -105,10 +112,10 @@ def prepare_vp_for_all_directions(analysis_date: str) -> gpd.GeoDataFrame: del results - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( gdf, - f"{SEGMENT_GCS}condensed/", - f"vp_nearest_neighbor_{analysis_date}" + SEGMENT_GCS, + f"{EXPORT_FILE}_{analysis_date}" ) del gdf @@ -138,7 +145,7 @@ def prepare_vp_for_all_directions(analysis_date: str) -> gpd.GeoDataFrame: f"{time1 - start}" ) - prepare_vp_for_all_directions(analysis_date) + prepare_vp_for_all_directions(analysis_date, CONFIG_DICT) end = datetime.datetime.now() logger.info( diff --git a/rt_scheduled_v_ran/scripts/Makefile b/rt_scheduled_v_ran/scripts/Makefile index 42d37f3295..247da7f018 100644 --- a/rt_scheduled_v_ran/scripts/Makefile +++ b/rt_scheduled_v_ran/scripts/Makefile @@ -1,2 +1,2 @@ -spatial_accuracy: - python vp_spatial_accuracy.py +rt_sched_pipeline: + python rt_v_scheduled_trip.py diff --git a/rt_scheduled_v_ran/scripts/config.yml b/rt_scheduled_v_ran/scripts/config.yml new file mode 100644 index 0000000000..caa0b07af4 --- /dev/null +++ b/rt_scheduled_v_ran/scripts/config.yml @@ -0,0 +1,2 @@ +trip_metrics: "vp_trip/trip_metrics" +route_direction_metrics: "vp_route_dir/route_direction_metrics" \ No newline at end of file diff --git a/rt_scheduled_v_ran/scripts/route_aggregation.ipynb b/rt_scheduled_v_ran/scripts/route_aggregation.ipynb new file mode 100644 index 0000000000..8d38285955 --- /dev/null +++ b/rt_scheduled_v_ran/scripts/route_aggregation.ipynb @@ -0,0 +1,646 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "20e9270f-eef9-4e33-8e2b-260de6589a96", + "metadata": {}, + "source": [ + "## RT vs Schedule Route Aggregation Issues\n", + "\n", + "#### 1. Keep schedule and vp apart \n", + "For metrics derived from `vp_usable`, wherever we can merge with schedule info, do it. These metrics will include `vp_only` and `vp_and_schedule`, but **not** `schedule_only`. Schedule stuff `schedule_only` and `vp_and_schedule` will be done in `route_typologies`.\n", + "\n", + "Merge RT and schedule stuff **after** route-direction-time period aggregation.\n", + "Trips that are in vp will not have a route_id or direction_id, so our aggregation will wrap up all those into \"Unknown\" routes.\n", + "\n", + "For RT stuff, keep speed separate from other metrics for now.\n", + "\n", + "#### 2. Add columns for trip table \n", + "* Need `schedule_gtfs_dataset_key` from `vp_usable` and also `helpers.import_scheduled_trips` to get route-direction info.\n", + "* Add `time_of_day`, `peak_offpeak` column with `gtfs_schedule_wrangling`\n", + "\n", + "#### 3. Get metrics\n", + "Set up a function to do the division for certain percentages or other normalized metrics.\n", + "\n", + "This can be used for trip-level table, but will also need to be used after route-direction aggregation.\n", + "\n", + "#### 4. Set up for weighted metrics \n", + "Set up a function to help with weighted averages or percents. This should include all the columns we need to sum for a given grouping. \n", + "\n", + "For trips, this won't do anything, and it can be passed onto the metrics function in 3.\n", + "For route-direction-time_period, this will do something, and it will be passed onto the metrics function in 3.\n", + "\n", + "#### 5. 
Are functions generalizable?\n", + "For these functions for aggregation, put it separately in a script / `segment_speed_utils`. Leave this until it's obvious what can be used.\n", + "\n", + "#### 6. References to review while making changes\n", + "* how to set up speed-trip tables [add natural identifiers where necessary](https://github.com/cal-itp/data-analyses/blob/main/rt_segment_speeds/scripts/stop_arrivals_to_speed.py)\n", + "* [averaging of speeds](https://github.com/cal-itp/data-analyses/blob/main/rt_segment_speeds/scripts/average_speeds.py)\n", + "* crosswalk of operator identifiers [created here](https://github.com/cal-itp/data-analyses/blob/main/gtfs_funnel/crosswalk_gtfs_dataset_key_to_organization.py) and there is a [helper function](https://github.com/cal-itp/data-analyses/blob/main/rt_segment_speeds/segment_speed_utils/helpers.py#L169)...so use this! \n", + "* [segment_calcs](https://github.com/cal-itp/data-analyses/blob/main/rt_segment_speeds/segment_speed_utils/segment_calcs.py) for some aggregation\n", + "* [time helpers](https://github.com/cal-itp/data-analyses/blob/main/rt_segment_speeds/segment_speed_utils/time_helpers.py)" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "f8329ebd-74be-46d4-93bf-6031064b8ec5", + "metadata": {}, + "outputs": [], + "source": [ + "import dask.dataframe as dd\n", + "import pandas as pd\n", + "import yaml\n", + "\n", + "from shared_utils import rt_dates \n", + "from segment_speed_utils import helpers, gtfs_schedule_wrangling\n", + "from segment_speed_utils.project_vars import RT_SCHED_GCS, SEGMENT_GCS" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "b7d1538a-9a40-40a4-bbe4-524e9c7b1477", + "metadata": {}, + "outputs": [], + "source": [ + "analysis_date = rt_dates.DATES[\"dec2023\"]" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "41c09c7d-99f9-413d-86ba-73005d100e12", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'trip_metrics': 'vp_trip/trip_metrics',\n", + " 'route_direction_metrics': 'vp_route_dir/route_direction_metrics'}" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "with open(\"config.yml\") as f:\n", + " config_dict = yaml.safe_load(f)\n", + " \n", + "config_dict" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "123a5f17-5e31-4fb4-a90a-3c1e1ebecdf2", + "metadata": {}, + "outputs": [], + "source": [ + "EXPORT_FILE = config_dict[\"trip_metrics\"]" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "b0395091-31ab-4b8c-b041-61328448c2be", + "metadata": {}, + "outputs": [], + "source": [ + "df = pd.read_parquet(\n", + " f\"{RT_SCHED_GCS}trip_level_metrics/{analysis_date}_metrics.parquet\"\n", + ")\n", + "df.to_parquet(f\"{RT_SCHED_GCS}{EXPORT_FILE}_{analysis_date}.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "1d9e1279-c066-4a41-aebe-3f6c2c477423", + "metadata": {}, + "outputs": [], + "source": [ + "gtfs_key = dd.read_parquet(\n", + " f\"{SEGMENT_GCS}vp_usable_{analysis_date}\",\n", + " columns = [\"schedule_gtfs_dataset_key\", \"trip_instance_key\"]\n", + ").drop_duplicates().compute()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "0b543be9-1af5-4c3b-b00e-a0e279349c82", + "metadata": {}, + "outputs": [], + "source": [ + "df2 = pd.merge(\n", + " gtfs_key,\n", + " df,\n", + " on = \"trip_instance_key\",\n", + " how = \"inner\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + 
"id": "a83d3abf-0f21-4b40-b67a-550077e42036", + "metadata": {}, + "outputs": [], + "source": [ + "trip_to_route = helpers.import_scheduled_trips(\n", + " analysis_date,\n", + " columns = [\"trip_instance_key\", \"route_id\", \"direction_id\"],\n", + " get_pandas = True\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "cda3e9f8-f0e3-4b1b-977a-51f2af4f9eae", + "metadata": {}, + "outputs": [], + "source": [ + "# The left only merges are in vp, but not in schedule\n", + "# Fill in route_id and direction_id with missing\n", + "df3 = pd.merge(\n", + " df2,\n", + " trip_to_route,\n", + " on = \"trip_instance_key\",\n", + " how = \"left\",\n", + " indicator = \"sched_rt_category\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "7958387d-6111-4bea-9e85-ea5236966144", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "both 77977\n", + "left_only 8151\n", + "right_only 0\n", + "Name: sched_rt_category, dtype: int64" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df3.sched_rt_category.value_counts()" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "7c10477b-69aa-49bd-97ec-79ac3463fd19", + "metadata": {}, + "outputs": [], + "source": [ + "df3 = df3.assign(\n", + " route_id = df3.route_id.fillna(\"Unknown\"),\n", + " direction_id = df3.direction_id.astype(\"Int64\"),\n", + " sched_rt_category = df3.apply(\n", + " lambda x: \"vp_only\" if x.sched_rt_category==\"left_only\"\n", + " else \"vp_sched\",\n", + " axis=1)\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "792b172e-3027-416f-b702-9c9ff9d0c89e", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "vp_sched 77977\n", + "vp_only 8151\n", + "Name: sched_rt_category, dtype: int64" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df3.sched_rt_category.value_counts()" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "21df331d-397c-4247-ac2a-0a579df02698", + "metadata": {}, + "outputs": [], + "source": [ + "time_of_day = gtfs_schedule_wrangling.get_trip_time_buckets(\n", + " analysis_date\n", + ").pipe(\n", + " gtfs_schedule_wrangling.add_peak_offpeak_column\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "b7335363-a3be-456e-a3ba-f5d56265ba01", + "metadata": {}, + "outputs": [], + "source": [ + "df4 = pd.merge(\n", + " df3.drop(columns = \"service_minutes\"),\n", + " time_of_day,\n", + " on = \"trip_instance_key\",\n", + " how = \"inner\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "0dc44202-cb24-4829-8d8f-a26de4ef32a9", + "metadata": {}, + "source": [ + "Base off of this:\n", + "https://github.com/cal-itp/data-analyses/blob/main/rt_segment_speeds/segment_speed_utils/segment_calcs.py#L89-L132" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "088b4d11-d8c1-41c5-94c5-1d5f6fd9dee3", + "metadata": {}, + "outputs": [], + "source": [ + "def weighted_average_function(\n", + " df: pd.DataFrame, \n", + " group_cols: list\n", + "):\n", + " df2 = (df.groupby(group_cols + [\"peak_offpeak\"])\n", + " .agg({\n", + " \"trip_instance_key\": \"count\",\n", + " #\"rt_service_min\": \"mean\", # can't use this twice...\n", + " # only if we move this to portfolio_utils.aggregate()\n", + "\n", + " # weighted average for trip updates\n", + " \"total_min_w_gtfs\": \"sum\",\n", + " \"rt_service_min\": \"sum\",\n", + "\n", + 
" # weighted average of pings per min\n", + " \"total_pings_for_trip\": \"sum\",\n", + " \"service_minutes\": \"sum\", # is it this one or rt_service_min?\n", + "\n", + " # weighted spatial accuracy \n", + " \"total_vp\": \"sum\",\n", + " \"vp_in_shape\": \"sum\",\n", + " }).reset_index()\n", + " )\n", + "\n", + " return df2" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "81618bff-930e-4c5b-8147-7b8e65b403a8", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
[HTML rendering of the aggregated DataFrame omitted: 5625 rows × 12 columns, with the same table shown in the text/plain output below.]
" + ], + "text/plain": [ + " schedule_gtfs_dataset_key route_id direction_id \\\n", + "0 015d67d5b75b5cf2b710bbadadfb75f5 17 0 \n", + "1 015d67d5b75b5cf2b710bbadadfb75f5 17 0 \n", + "2 015d67d5b75b5cf2b710bbadadfb75f5 17 1 \n", + "3 015d67d5b75b5cf2b710bbadadfb75f5 17 1 \n", + "4 015d67d5b75b5cf2b710bbadadfb75f5 219 0 \n", + "... ... ... ... \n", + "5620 ff1bc5dde661d62c877165421e9ca257 ROUTEA 0 \n", + "5621 ff1bc5dde661d62c877165421e9ca257 ROUTEA 1 \n", + "5622 ff1bc5dde661d62c877165421e9ca257 ROUTEA 1 \n", + "5623 ff1bc5dde661d62c877165421e9ca257 ROUTEB 1 \n", + "5624 ff1bc5dde661d62c877165421e9ca257 ROUTEB 1 \n", + "\n", + " sched_rt_category peak_offpeak trip_instance_key total_min_w_gtfs \\\n", + "0 vp_sched offpeak 10 729 \n", + "1 vp_sched peak 12 808 \n", + "2 vp_sched offpeak 11 675 \n", + "3 vp_sched peak 11 596 \n", + "4 vp_sched offpeak 9 183 \n", + "... ... ... ... ... \n", + "5620 vp_sched peak 3 113 \n", + "5621 vp_sched offpeak 8 409 \n", + "5622 vp_sched peak 8 337 \n", + "5623 vp_sched offpeak 3 159 \n", + "5624 vp_sched peak 5 260 \n", + "\n", + " rt_service_min total_pings_for_trip service_minutes total_vp \\\n", + "0 732.983333 2153 567.0 2153.0 \n", + "1 839.933333 2388 690.0 2388.0 \n", + "2 697.966667 1992 569.0 1992.0 \n", + "3 618.783333 1757 595.0 1757.0 \n", + "4 242.500000 533 152.0 533.0 \n", + "... ... ... ... ... \n", + "5620 114.033333 340 90.0 340.0 \n", + "5621 408.433333 1252 222.0 1252.0 \n", + "5622 336.850000 1038 239.0 1038.0 \n", + "5623 159.416667 467 141.0 467.0 \n", + "5624 259.800000 773 235.0 773.0 \n", + "\n", + " vp_in_shape \n", + "0 1708.0 \n", + "1 2195.0 \n", + "2 1705.0 \n", + "3 1693.0 \n", + "4 428.0 \n", + "... ... \n", + "5620 155.0 \n", + "5621 481.0 \n", + "5622 522.0 \n", + "5623 335.0 \n", + "5624 601.0 \n", + "\n", + "[5625 rows x 12 columns]" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "weighted_average_function(df4, [\"schedule_gtfs_dataset_key\", \n", + " \"route_id\", \"direction_id\", \n", + " \"sched_rt_category\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "813b4dee-5fce-4e23-9774-06cb05ddf272", + "metadata": {}, + "outputs": [], + "source": [ + "def calculate_percent_normalized_metrics(df: pd.DataFrame):\n", + " # metrics like pings per minute, percent of trip with RT\n", + " # should be calculated after aggregation\n", + " # can be done at trip-level, can be done after sums are taken for route-direction\n", + " # do not do simple averages in aggregation\n", + " return" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "145e5312-c4ab-433b-9ad1-53a6b924122b", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c72e75f0-52c2-413d-81b7-397301886f2d", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/rt_scheduled_v_ran/scripts/vp_spatial_accuracy.py b/rt_scheduled_v_ran/scripts/vp_spatial_accuracy.py deleted file mode 100644 index 874a068e0c..0000000000 --- 
a/rt_scheduled_v_ran/scripts/vp_spatial_accuracy.py +++ /dev/null @@ -1,229 +0,0 @@ -""" -Calculate a trip-level metric of spatial accuracy. - -Newmark's GTFS RT spatial accuracy metric is simply -how many vehicle positions correctly join onto a buffered -shape. -""" -import dask.dataframe as dd -import dask_geopandas as dg -import datetime -import geopandas as gpd -import pandas as pd -import sys - -from loguru import logger - -from segment_speed_utils import helpers -from segment_speed_utils.project_vars import (SEGMENT_GCS, - COMPILED_CACHED_VIEWS, - RT_SCHED_GCS, - PROJECT_CRS - ) - -def grab_shape_keys_in_vp(analysis_date: str) -> pd.DataFrame: - """ - Subset raw vp and find unique trip_instance_keys. - Create crosswalk to link trip_instance_key to shape_array_key. - """ - vp_trip_df = (pd.read_parquet( - f"{SEGMENT_GCS}vp_{analysis_date}.parquet", - columns=["trip_instance_key"]) - .drop_duplicates() - .dropna(subset="trip_instance_key") - ) - - # Make sure we have a shape geometry too - # otherwise map_partitions will throw error - shapes = pd.read_parquet( - f"{COMPILED_CACHED_VIEWS}routelines_{analysis_date}.parquet", - columns = ["shape_array_key"], - ).dropna().drop_duplicates() - - trips_with_shape = helpers.import_scheduled_trips( - analysis_date, - columns = ["trip_instance_key", "shape_array_key"], - get_pandas = True - ).merge( - shapes, - on = "shape_array_key", - how = "inner" - ).merge( - vp_trip_df, - on = "trip_instance_key", - how = "inner" - ).drop_duplicates().dropna().reset_index(drop=True) - - return trips_with_shape - - -def buffer_shapes( - analysis_date: str, - trips_with_shape_subset: pd.DataFrame, - buffer_meters: int = 35, - **kwargs -) -> gpd.GeoDataFrame: - """ - Filter scheduled shapes down to the shapes that appear in vp. - Buffer these. - - Attach the shape geometry for a subset of shapes or trips. - """ - shapes_subset = trips_with_shape_subset.shape_array_key.unique().tolist() - - shapes = helpers.import_scheduled_shapes( - analysis_date, - columns = ["shape_array_key", "geometry"], - filters = [[("shape_array_key", "in", shapes_subset)]], - crs = PROJECT_CRS, - get_pandas = True - ) - - # to_crs takes awhile, so do a filtering on only shapes we need - shapes = shapes.assign( - geometry = shapes.geometry.buffer(buffer_meters) - ) - - trips_with_shape_geom = pd.merge( - shapes, - trips_with_shape_subset, - on = "shape_array_key", - how = "inner" - ) - - return trips_with_shape_geom - - -def merge_vp_with_shape_and_count( - vp: dd.DataFrame, - trips_with_shape_geom: gpd.GeoDataFrame -) -> gpd.GeoDataFrame: - """ - Merge vp with crosswalk and buffered shapes. - Get vp count totals and vp within shape. 
- """ - vp_gdf = gpd.GeoDataFrame( - vp, - geometry = gpd.points_from_xy(vp.x, vp.y), - crs = PROJECT_CRS - ) - - vp2 = pd.merge( - vp_gdf, - trips_with_shape_geom, - on = "trip_instance_key", - how = "inner" - ).reset_index(drop=True) - - total_vp = total_vp_counts_by_trip(vp2) - - vp2 = vp2.assign( - is_within = vp2.geometry_x.within(vp2.geometry_y) - ).query('is_within==True') - - vps_in_shape = (vp2.groupby("trip_instance_key", - observed = True, group_keys = False) - .agg({"location_timestamp_local": "count"}) - .reset_index() - .rename(columns = {"location_timestamp_local": "vp_in_shape"}) - ) - - count_df = pd.merge( - total_vp, - vps_in_shape, - on = "trip_instance_key", - how = "left" - ) - - count_df = count_df.assign( - vp_in_shape = count_df.vp_in_shape.fillna(0).astype("int32"), - total_vp = count_df.total_vp.fillna(0).astype("int32") - ) - - return count_df - - -def total_vp_counts_by_trip(vp: gpd.GeoDataFrame) -> pd.DataFrame: - """ - Get a count of vp for each trip, whether or not those fall - within buffered shape or not - """ - count_vp = ( - vp.groupby("trip_instance_key", - observed=True, group_keys=False) - .agg({"location_timestamp_local": "count"}) - .reset_index() - .rename(columns={"location_timestamp_local": "total_vp"}) - ) - - return count_vp - - -if __name__=="__main__": - from update_vars import analysis_date_list - #from dask.distributed import LocalCluster, Client - - #client = Client("dask-scheduler.dask.svc.cluster.local:8786") - #cluster = LocalCluster(n_workers = 2) - - LOG_FILE = "../logs/spatial_accuracy.log" - logger.add(LOG_FILE, retention="3 months") - logger.add(sys.stderr, - format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", - level="INFO") - - for analysis_date in analysis_date_list: - - logger.info(f"Analysis date: {analysis_date}") - - start = datetime.datetime.now() - - # Create crosswalk of trip_instance_keys to shape_array_key - trips_with_shape = grab_shape_keys_in_vp(analysis_date) - - trips_with_shape_geom = buffer_shapes( - analysis_date, - trips_with_shape, - buffer_meters = 35 - ) - - # Import vp and partition it - vp = dg.read_parquet( - f"{SEGMENT_GCS}vp_{analysis_date}.parquet", - ).to_crs(PROJECT_CRS) - - vp = vp.assign( - x = vp.geometry.x, - y = vp.geometry.y - ).drop(columns = "geometry") - - vp = vp.repartition(npartitions = 100).persist() - - results = vp.map_partitions( - merge_vp_with_shape_and_count, - trips_with_shape_geom, - meta = { - "trip_instance_key": "str", - "total_vp": "int32", - "vp_in_shape": "int32" - }, - align_dataframes = False - ) - - time1 = datetime.datetime.now() - logger.info(f"use map partitions to find within shape vp: {time1 - start}") - - results = results.compute() - - time2 = datetime.datetime.now() - logger.info(f"compute results: {time2 - time1}") - - results.to_parquet( - f"{RT_SCHED_GCS}vp_spatial_accuracy_{analysis_date}.parquet") - - end = datetime.datetime.now() - logger.info(f"export: {end - time2}") - logger.info(f"execution time: {end - start}") - - #client.close() - #cluster.close() \ No newline at end of file diff --git a/rt_segment_speeds/scripts/average_speeds.py b/rt_segment_speeds/scripts/average_speeds.py index 100beffc8d..cda502b922 100644 --- a/rt_segment_speeds/scripts/average_speeds.py +++ b/rt_segment_speeds/scripts/average_speeds.py @@ -12,6 +12,7 @@ from typing import Literal from calitp_data_analysis.geography_utils import WGS84 +from shared_utils import utils_to_add from calitp_data_analysis import utils from segment_speed_utils import (gtfs_schedule_wrangling, 
helpers, segment_calcs, time_helpers) @@ -152,10 +153,10 @@ def single_day_averages(analysis_date: str, dict_inputs: dict): Start from single day segment-trip speeds and aggregate by peak_offpeak, weekday_weekend. """ - SHAPE_SEG_FILE = Path(dict_inputs["shape_stop_single_segment"]) - ROUTE_SEG_FILE = Path(dict_inputs["route_dir_single_segment"]) - TRIP_FILE = Path(dict_inputs["trip_speeds_single_summary"]) - ROUTE_DIR_FILE = Path(dict_inputs["route_dir_single_summary"]) + SHAPE_SEG_FILE = dict_inputs["shape_stop_single_segment"] + ROUTE_SEG_FILE = dict_inputs["route_dir_single_segment"] + TRIP_FILE = dict_inputs["trip_speeds_single_summary"] + ROUTE_DIR_FILE = dict_inputs["route_dir_single_summary"] start = datetime.datetime.now() @@ -182,10 +183,10 @@ def single_day_averages(analysis_date: str, dict_inputs: dict): columns = col_order + ["geometry"] ) - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( shape_stop_segments, - f"{SEGMENT_GCS}{str(SHAPE_SEG_FILE.parent)}/", - f"{SHAPE_SEG_FILE.stem}_{analysis_date}" + SEGMENT_GCS, + f"{SHAPE_SEG_FILE}_{analysis_date}" ) del shape_stop_segments, segment_geom @@ -215,10 +216,10 @@ def single_day_averages(analysis_date: str, dict_inputs: dict): columns = col_order + ["geometry"] ) - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( route_dir_segments, - f"{SEGMENT_GCS}{str(ROUTE_SEG_FILE.parent)}/", - f"{ROUTE_SEG_FILE.stem}_{analysis_date}" + SEGMENT_GCS, + f"{ROUTE_SEG_FILE}_{analysis_date}" ) del route_dir_segments, segment_geom @@ -264,10 +265,10 @@ def single_day_averages(analysis_date: str, dict_inputs: dict): columns = col_order + ["route_name", "geometry"] ) - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( route_dir_avg, - f"{SEGMENT_GCS}{str(ROUTE_DIR_FILE.parent)}/", - f"{ROUTE_DIR_FILE.stem}_{analysis_date}" + SEGMENT_GCS, + f"{ROUTE_DIR_FILE}_{analysis_date}" ) del route_dir_avg, common_shape_geom @@ -287,8 +288,8 @@ def multi_day_averages(analysis_date_list: list, dict_inputs: dict): the seven days is concatenated first before averaging, so that we get weighted averages. 
""" - ROUTE_SEG_FILE = Path(dict_inputs["route_dir_multi_segment"]) - ROUTE_DIR_FILE = Path(dict_inputs["route_dir_multi_summary"]) + ROUTE_SEG_FILE = dict_inputs["route_dir_multi_segment"] + ROUTE_DIR_FILE = dict_inputs["route_dir_multi_summary"] df = delayed(concatenate_trip_segment_speeds)( analysis_date_list, dict_inputs) @@ -327,10 +328,10 @@ def multi_day_averages(analysis_date_list: list, dict_inputs: dict): columns = col_order + ["geometry"] ) - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( route_dir_segments, - f"{SEGMENT_GCS}{str(ROUTE_SEG_FILE.parent)}/", - f"{ROUTE_SEG_FILE.stem}_{time_span_str}" + SEGMENT_GCS, + f"{ROUTE_SEG_FILE}_{time_span_str}" ) del route_dir_segments @@ -366,10 +367,10 @@ def multi_day_averages(analysis_date_list: list, dict_inputs: dict): columns = col_order + ["route_name", "geometry"] ) - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( route_dir_avg, - f"{SEGMENT_GCS}{str(ROUTE_DIR_FILE.parent)}/", - f"{ROUTE_DIR_FILE.stem}_{time_span_str}" + SEGMENT_GCS, + f"{ROUTE_DIR_FILE}_{time_span_str}" ) del route_dir_avg diff --git a/rt_segment_speeds/scripts/cut_stop_segments.py b/rt_segment_speeds/scripts/cut_stop_segments.py index 6f78ec0167..a5dfb464ce 100644 --- a/rt_segment_speeds/scripts/cut_stop_segments.py +++ b/rt_segment_speeds/scripts/cut_stop_segments.py @@ -13,9 +13,9 @@ import sys from loguru import logger -from pathlib import Path -from calitp_data_analysis import utils +#from calitp_data_analysis import utils +from shared_utils import utils_to_add from calitp_data_analysis.geography_utils import WGS84 from segment_speed_utils import gtfs_schedule_wrangling, helpers from segment_speed_utils.project_vars import (SEGMENT_GCS, @@ -128,7 +128,7 @@ def cut_stop_segments(analysis_date: str) -> gpd.GeoDataFrame: for analysis_date in analysis_date_list: start = datetime.datetime.now() - SEGMENT_FILE = Path(RT_DICT["segments_file"]) + SEGMENT_FILE = RT_DICT["segments_file"] segments = cut_stop_segments(analysis_date) @@ -145,10 +145,10 @@ def cut_stop_segments(analysis_date: str) -> gpd.GeoDataFrame: how = "inner" ) - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( segments, - f"{SEGMENT_GCS}{str(SEGMENT_FILE.parent)}/", - f"{SEGMENT_FILE.stem}_{analysis_date}" + SEGMENT_GCS, + f"{SEGMENT_FILE}_{analysis_date}" ) del segments, shape_to_route diff --git a/rt_segment_speeds/scripts/nearest_vp_to_stop.py b/rt_segment_speeds/scripts/nearest_vp_to_stop.py index 455afe38a8..8e3973e068 100644 --- a/rt_segment_speeds/scripts/nearest_vp_to_stop.py +++ b/rt_segment_speeds/scripts/nearest_vp_to_stop.py @@ -10,10 +10,10 @@ import sys from loguru import logger -from pathlib import Path from calitp_data_analysis.geography_utils import WGS84 -from calitp_data_analysis import utils +#from calitp_data_analysis import utils +from shared_utils import utils_to_add from segment_speed_utils import helpers, neighbor from segment_speed_utils.project_vars import SEGMENT_GCS @@ -126,10 +126,10 @@ def nearest_neighbor_rt_stop_times( results = add_nearest_neighbor_result(gdf, analysis_date) - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( results, - f"{SEGMENT_GCS}{str(EXPORT_FILE.parent)}/", - f"{EXPORT_FILE.stem}_{analysis_date}", + SEGMENT_GCS, + f"{EXPORT_FILE}_{analysis_date}", ) end = datetime.datetime.now() @@ -152,7 +152,7 @@ def nearest_neighbor_shape_segments( """ start = datetime.datetime.now() - EXPORT_FILE = Path(f'{dict_inputs["stage2"]}') + EXPORT_FILE = dict_inputs["stage2"] SEGMENT_FILE 
= dict_inputs["segments_file"] subset_trips = pd.read_parquet( @@ -194,10 +194,10 @@ def nearest_neighbor_shape_segments( del gdf - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( results, - f"{SEGMENT_GCS}{str(EXPORT_FILE.parent)}/", - f"{EXPORT_FILE.stem}_{analysis_date}", + SEGMENT_GCS, + f"{EXPORT_FILE}_{analysis_date}", ) end = datetime.datetime.now() diff --git a/rt_segment_speeds/scripts/select_stop_segments.py b/rt_segment_speeds/scripts/select_stop_segments.py index 4a1791cfd0..4ad6071974 100644 --- a/rt_segment_speeds/scripts/select_stop_segments.py +++ b/rt_segment_speeds/scripts/select_stop_segments.py @@ -13,9 +13,8 @@ import geopandas as gpd import pandas as pd -from pathlib import Path - -from calitp_data_analysis import utils +#from calitp_data_analysis import utils +from shared_utils import utils_to_add from shared_utils import rt_dates from segment_speed_utils import helpers from segment_speed_utils.project_vars import SEGMENT_GCS @@ -70,14 +69,14 @@ def select_one_trip_per_shape(analysis_date: str): for analysis_date in analysis_date_list: start = datetime.datetime.now() - SEGMENT_FILE = Path(STOP_SEG_DICT["segments_file"]) + SEGMENT_FILE = STOP_SEG_DICT["segments_file"] segments = select_one_trip_per_shape(analysis_date) - utils.geoparquet_gcs_export( + utils_to_add.geoparquet_gcs_export( segments, - f"{SEGMENT_GCS}{str(SEGMENT_FILE.parent)}/", - f"{SEGMENT_FILE.stem}_{analysis_date}" + SEGMENT_GCS, + f"{SEGMENT_FILE}_{analysis_date}" ) end = datetime.datetime.now()
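
Usage sketch (not part of the patch) for the new shared_utils.utils_to_add.geoparquet_gcs_export
helper used throughout the hunks above: pass the bucket root as gcs_file_path and carry any
sub-directory inside file_name, mirroring the config.yml entries such as "condensed/vp_condensed".
The GeoDataFrame contents and the date below are illustrative placeholders.

    import geopandas as gpd
    from shapely.geometry import Point

    from shared_utils import utils_to_add
    from segment_speed_utils.project_vars import SEGMENT_GCS

    # tiny placeholder GeoDataFrame; any geopandas or dask_geopandas GeoDataFrame works
    gdf = gpd.GeoDataFrame(
        {"trip_instance_key": ["abc123"]},
        geometry=[Point(0, 0)],
        crs="EPSG:4326",
    )

    # "condensed/" is parsed off file_name and joined onto SEGMENT_GCS,
    # so the parquet lands in the bucket's condensed/ sub-folder
    utils_to_add.geoparquet_gcs_export(
        gdf,
        SEGMENT_GCS,
        "condensed/vp_condensed_2023-12-13",
    )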