Merged
2 changes: 1 addition & 1 deletion _shared_utils/setup.py
@@ -3,7 +3,7 @@
 setup(
     name="shared_utils",
     packages=find_packages(),
-    version="2.2.1",
+    version="2.2.2",
     description="Shared utility functions for data analyses",
     author="Cal-ITP",
     license="Apache",
5 changes: 2 additions & 3 deletions _shared_utils/shared_utils/__init__.py
@@ -7,10 +7,10 @@
     rt_dates,
     rt_utils,
     schedule_rt_utils,
+    utils_to_add,
 )

 __all__ = [
     # "calitp_color_palette",
     "dask_utils",
     "geog_utils_to_add",
     "gtfs_utils",
@@ -19,6 +19,5 @@
     "schedule_rt_utils",
     "rt_dates",
     "rt_utils",
-    # "styleguide",
-    # "utils",
+    "utils_to_add",
 ]
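With the module registered in __all__, downstream code can import it directly. A quick sanity check, assuming shared_utils 2.2.2 is installed in the environment:

# Assumes shared_utils 2.2.2 is installed
from shared_utils import utils_to_add

print(utils_to_add.parse_file_directory("condensed/vp_condensed"))  # prints "condensed"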
54 changes: 54 additions & 0 deletions _shared_utils/shared_utils/utils_to_add.py
@@ -0,0 +1,54 @@
import os
from pathlib import Path
from typing import Union

import dask_geopandas as dg
import geopandas as gpd
from calitp_data_analysis import get_fs, utils

fs = get_fs()


def parse_file_directory(file_name: str) -> str:
    """
    Grab the directory of the filename.
    For a GCS bucket, we do not want '.' as the parent
    directory; we want to parse and put together the
    GCS filepath correctly.
    """
    if str(Path(file_name).parent) != ".":
        return str(Path(file_name).parent)
    else:
        return ""


def geoparquet_gcs_export(gdf: Union[gpd.GeoDataFrame, dg.GeoDataFrame], gcs_file_path: str, file_name: str, **kwargs):
    """
    Save geodataframe as parquet locally,
    then move to GCS bucket and delete local file.

    gdf: geopandas.GeoDataFrame
    gcs_file_path: str
        Ex: gs://calitp-analytics-data/data-analyses/my-folder/
    file_name: str
        Filename, with or without .parquet.
    """
    # Parse out file_name into stem (file_name_sanitized)
    # and parent (file_directory_sanitized)
    file_name_sanitized = Path(utils.sanitize_file_path(file_name))
    file_directory_sanitized = parse_file_directory(file_name)

    # Make sure GCS path includes the directory we want the file to go to
    expanded_gcs = f"{Path(gcs_file_path).joinpath(file_directory_sanitized)}/"
    expanded_gcs = str(expanded_gcs).replace("gs:/", "gs://")

    if isinstance(gdf, dg.GeoDataFrame):
        gdf.to_parquet(f"{expanded_gcs}{file_name_sanitized}", overwrite=True, **kwargs)

    else:
        gdf.to_parquet(f"{file_name_sanitized}.parquet", **kwargs)
        fs.put(
            f"{file_name_sanitized}.parquet",
            f"{str(expanded_gcs)}{file_name_sanitized}.parquet",
        )
        os.remove(f"{file_name_sanitized}.parquet")
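For reviewers, a minimal usage sketch of the new helper. The bucket folder matches the docstring example; the GeoDataFrame and filename are hypothetical:

import geopandas as gpd
from shapely.geometry import Point
from shared_utils import utils_to_add

# Hypothetical toy GeoDataFrame; any gdf with a geometry column works.
gdf = gpd.GeoDataFrame(
    {"stop_id": ["a", "b"]},
    geometry=[Point(0, 0), Point(1, 1)],
    crs="EPSG:4326",
)

# Writes my-folder/example.parquet in the bucket, then deletes the local copy.
utils_to_add.geoparquet_gcs_export(
    gdf,
    "gs://calitp-analytics-data/data-analyses/my-folder/",
    "example",
)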
7 changes: 4 additions & 3 deletions gtfs_funnel/config.yml
@@ -1,9 +1,10 @@
 raw_vp_file: "vp"
 usable_vp_file: "vp_usable"
-vp_condensed_line_file: "vp_condensed"
+vp_condensed_line_file: "condensed/vp_condensed"
+vp_nearest_neighbor_file: "condensed/vp_nearest_neighbor"
 timestamp_col: "location_timestamp_local"
 time_min_cutoff: 10
 stop_times_direction_file: "stop_times_direction"
-trip_metrics_file: "schedule_trip_metrics"
-route_direction_metrics_file: "schedule_route_direction_metrics"
+trip_metrics_file: "schedule_trip/schedule_trip_metrics"
+route_direction_metrics_file: "schedule_route_dir/schedule_route_direction_metrics"
 route_identification_file: "standardized_route_ids"
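The scripts below consume these keys through a config dict. A minimal sketch of that pattern, assuming a plain yaml load rather than the repo's exact helper:

import yaml

# Assumed loader; the repo may wrap this differently.
with open("config.yml") as f:
    CONFIG_DICT = yaml.safe_load(f)

CONFIG_DICT["vp_condensed_line_file"]  # -> "condensed/vp_condensed"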
17 changes: 12 additions & 5 deletions gtfs_funnel/route_typologies.py
@@ -7,20 +7,27 @@
 import pandas as pd

 from calitp_data_analysis.geography_utils import WGS84
-from calitp_data_analysis import utils
+#from calitp_data_analysis import utils
+from shared_utils import utils_to_add
 from segment_speed_utils import helpers, gtfs_schedule_wrangling
 from segment_speed_utils.project_vars import RT_SCHED_GCS, PROJECT_CRS

 catalog = intake.open_catalog(
     "../_shared_utils/shared_utils/shared_data_catalog.yml")

-def assemble_scheduled_trip_metrics(analysis_date: str) -> pd.DataFrame:
+
+def assemble_scheduled_trip_metrics(
+    analysis_date: str,
+    dict_inputs: dict
+) -> pd.DataFrame:
     """
     Get GTFS schedule trip metrics including time-of-day buckets,
     scheduled service minutes, and median stop spacing.
     """
+    STOP_TIMES_FILE = dict_inputs["stop_times_direction_file"]
+
     df = gpd.read_parquet(
-        f"{RT_SCHED_GCS}stop_times_direction_{analysis_date}.parquet"
+        f"{RT_SCHED_GCS}{STOP_TIMES_FILE}_{analysis_date}.parquet"
     )

     trips_to_route = helpers.import_scheduled_trips(
@@ -173,7 +180,7 @@ def pop_density_by_shape(shape_df: gpd.GeoDataFrame):
     ROUTE_DIR_EXPORT = CONFIG_DICT["route_direction_metrics_file"]

     for date in analysis_date_list:
-        trip_metrics = assemble_scheduled_trip_metrics(date)
+        trip_metrics = assemble_scheduled_trip_metrics(date, CONFIG_DICT)

         trip_metrics.to_parquet(
             f"{RT_SCHED_GCS}{TRIP_EXPORT}_{date}.parquet")
@@ -196,7 +203,7 @@ def pop_density_by_shape(shape_df: gpd.GeoDataFrame):
         how = "left"
     )

-    utils.geoparquet_gcs_export(
+    utils_to_add.geoparquet_gcs_export(
         route_dir_metrics2,
         RT_SCHED_GCS,
         f"{ROUTE_DIR_EXPORT}_{date}"
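The refactor standardizes export paths as GCS prefix + config key + analysis date. A hedged illustration; the bucket value and date below are assumptions:

RT_SCHED_GCS = "gs://calitp-analytics-data/rt_vs_schedule/"  # assumed value
STOP_TIMES_FILE = "stop_times_direction"  # key from gtfs_funnel/config.yml
analysis_date = "2024-01-17"  # hypothetical

path = f"{RT_SCHED_GCS}{STOP_TIMES_FILE}_{analysis_date}.parquet"
# -> gs://calitp-analytics-data/rt_vs_schedule/stop_times_direction_2024-01-17.parquet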
25 changes: 16 additions & 9 deletions gtfs_funnel/vp_condenser.py
@@ -11,7 +11,8 @@
 from loguru import logger

 from calitp_data_analysis.geography_utils import WGS84
-from calitp_data_analysis import utils
+#from calitp_data_analysis import utils
+from shared_utils import utils_to_add
 from segment_speed_utils import vp_transform, wrangle_shapes
 from segment_speed_utils.project_vars import SEGMENT_GCS

@@ -65,9 +66,9 @@ def condense_vp_to_linestring(
         align_dataframes = False
     ).compute().set_geometry("geometry").set_crs(WGS84)

-    utils.geoparquet_gcs_export(
+    utils_to_add.geoparquet_gcs_export(
         vp_condensed,
-        f"{SEGMENT_GCS}condensed/",
+        SEGMENT_GCS,
         f"{EXPORT_FILE}_{analysis_date}"
     )

@@ -76,7 +77,10 @@
     return


-def prepare_vp_for_all_directions(analysis_date: str) -> gpd.GeoDataFrame:
+def prepare_vp_for_all_directions(
+    analysis_date: str,
+    dict_inputs: dict
+) -> gpd.GeoDataFrame:
     """
     For each direction, exclude the opposite direction and
     save out the arrays of valid indices.
@@ -86,8 +90,11 @@ def prepare_vp_for_all_directions(analysis_date: str) -> gpd.GeoDataFrame:
     Subset vp_idx, location_timestamp_local and coordinate arrays
     to exclude southbound.
     """
+    INPUT_FILE = dict_inputs["vp_condensed_line_file"]
+    EXPORT_FILE = dict_inputs["vp_nearest_neighbor_file"]
+
     vp = delayed(gpd.read_parquet)(
-        f"{SEGMENT_GCS}condensed/vp_condensed_{analysis_date}.parquet",
+        f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}.parquet",
     )

     dfs = [
@@ -105,10 +112,10 @@
     del results

-    utils.geoparquet_gcs_export(
+    utils_to_add.geoparquet_gcs_export(
         gdf,
-        f"{SEGMENT_GCS}condensed/",
-        f"vp_nearest_neighbor_{analysis_date}"
+        SEGMENT_GCS,
+        f"{EXPORT_FILE}_{analysis_date}"
     )

     del gdf
@@ -138,7 +145,7 @@ def prepare_vp_for_all_directions(analysis_date: str) -> gpd.GeoDataFrame:
         f"{time1 - start}"
     )

-    prepare_vp_for_all_directions(analysis_date)
+    prepare_vp_for_all_directions(analysis_date, CONFIG_DICT)

     end = datetime.datetime.now()
     logger.info(
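Note that the read in prepare_vp_for_all_directions is lazy. A minimal sketch of the delayed-read pattern this script relies on, with a hypothetical local path standing in for the GCS one:

import dask
import geopandas as gpd
from dask import delayed

# delayed() defers the read; dask.compute() triggers it and returns a tuple.
vp = delayed(gpd.read_parquet)("vp_condensed_2024-01-17.parquet")  # hypothetical path
(gdf,) = dask.compute(vp)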
4 changes: 2 additions & 2 deletions rt_scheduled_v_ran/scripts/Makefile
@@ -1,2 +1,2 @@
-spatial_accuracy:
-	python vp_spatial_accuracy.py
+rt_sched_pipeline:
+	python rt_v_scheduled_trip.py
2 changes: 2 additions & 0 deletions rt_scheduled_v_ran/scripts/config.yml
@@ -0,0 +1,2 @@
trip_metrics: "vp_trip/trip_metrics"
route_direction_metrics: "vp_route_dir/route_direction_metrics"