In [1]:
import getpass
import logging
import pathlib
import pandas as pd
import geopandas as gpd
from mtcpy.miscio import log_or_print
from mtcpy.geospatial import google_geocode_batch

# Login name of the current OS user; interpolated into the per-user Box sync
# path when work_dir is built.
user = getpass.getuser()

Info: Found credentials at: /Users/jcroff/Library/CloudStorage/Box-Box/dvutils-creds-jcroff.json


In [2]:
def setup_logger(logger_name, output_dir):
    """Return an INFO-level logger that writes to ``<output_dir>/<logger_name>.log``.

    The output directory is created when it does not exist, and the file
    handler is attached only once per log file. ``logging.getLogger`` hands
    back the same object for the same name, so without that guard every re-run
    of the calling notebook cell would add another handler and duplicate each
    log line.

    Args:
        logger_name (str): Name of the logger; also used as the log file stem.
        output_dir (str | pathlib.Path): Directory that holds the log file.

    Returns:
        logging.Logger: The configured logger.
    """
    import os  # local import: not part of the notebook's import cell

    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)

    # FileHandler does not create missing parent directories.
    log_dir = pathlib.Path(output_dir)
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / f"{logger_name}.log"

    # FileHandler stores its target as an absolute path in ``baseFilename``,
    # so compare against the same normalisation.
    target = os.path.abspath(log_file)
    already_attached = any(
        isinstance(handler, logging.FileHandler) and handler.baseFilename == target
        for handler in logger.handlers
    )

    if not already_attached:
        file_handler = logging.FileHandler(log_file)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return logger

In [3]:
# Root of the FasTrak project inside the current user's synced Box folder.
work_dir = pathlib.Path(
    f"/Users/{user}/Library/CloudStorage/Box-Box/DataViz Projects/Data Services/FasTrak Data"
)

# Shared sub-folders, named once so every path below is a single join.
_cleaned_dir = work_dir / "Fastrak Accounts Cleaned"
_results_dir = _cleaned_dir / "Final Geocode Results"

# Inputs: raw account export and the geocode result file.
ft_data = _results_dir / "account_complete_9_12_24.csv"
gc_data = _cleaned_dir / "bay_area_fastrak_accounts_geocoded.csv"

# Outputs: record-level results, summary workbook, and tract-level aggregates.
final_gc_data_csv = _results_dir / "bay_area_fastrak_accounts_geocoded_final.csv"
final_gc_data_geojson = _results_dir / "bay_area_fastrak_accounts_geocoded_final.geojson"
summary_data_xlsx = _results_dir / "bay_area_fastrak_accounts_geocoded_summary.xlsx"
final_aggregated_data_csv = _results_dir / "bay_area_fastrak_accounts_geocoded_final_aggregated.csv"
final_aggregated_data_geojson = _results_dir / "bay_area_fastrak_accounts_geocoded_final_aggregated.geojson"

# ArcGIS feature-service queries that return each full layer as GeoJSON.
epc_data = (
    "https://services3.arcgis.com/i2dkYWmb4wHvYPda/arcgis/rest/services/"
    "draft_equity_priority_communities_pba2050plus_acs2022a/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson"
)
tract_data = (
    "https://services3.arcgis.com/i2dkYWmb4wHvYPda/arcgis/rest/services/"
    "region_2020_censustract/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson"
)

In [4]:
# Logger shared by every function in this notebook; its file handler targets
# Logs/fastrak_geocoding.log, relative to the current working directory.
LOGGER = setup_logger(logger_name="fastrak_geocoding", output_dir="Logs")

In [5]:
def read_fastrak_data(file_path):
    """Load the Fastrak account CSV and discard columns with empty headers.

    Args:
        file_path: Location of the CSV; handed straight to ``pd.read_csv``.

    Returns:
        pd.DataFrame: The table without any column whose name is empty or
        consists only of whitespace.
    """
    log_or_print(f"Reading Fastrak data from {file_path}", LOGGER)
    accounts = pd.read_csv(file_path)
    log_or_print(f"Read {len(accounts)} records from Fastrak data", LOGGER)

    # "Blank" here refers to the column header, not to the column's values.
    log_or_print("Dropping columns that are blank", LOGGER)
    blank_headers = [name for name in accounts.columns if not name.strip()]
    return accounts.drop(columns=blank_headers)

In [6]:
def read_epc_data(file_path):
    """Load the EPC layer with geopandas.

    Args:
        file_path: Path or URL accepted by ``gpd.read_file``.

    Returns:
        The object returned by ``gpd.read_file`` for ``file_path``.
    """
    log_or_print(f"Reading EPC data from {file_path}", LOGGER)
    epc_layer = gpd.read_file(file_path)
    log_or_print(f"Read {len(epc_layer)} records from {file_path}", LOGGER)
    return epc_layer

In [7]:
def read_tract_data(file_path):
    """Load the census tract layer with geopandas.

    Args:
        file_path: Path or URL accepted by ``gpd.read_file``.

    Returns:
        The object returned by ``gpd.read_file`` for ``file_path``.
    """
    log_or_print(f"Reading tract data from {file_path}", LOGGER)
    tract_layer = gpd.read_file(file_path)
    log_or_print(f"Read {len(tract_layer)} records from {file_path}", LOGGER)
    return tract_layer

In [8]:
def write_geocoded_data(df, file_path, mode='w', header=True):
    """Persist geocoded records as CSV, without the index.

    Args:
        df (pd.DataFrame): Geocoded records to write.
        file_path (str): Destination CSV path.
        mode (str, optional): File mode forwarded to ``DataFrame.to_csv``;
            'w' overwrites, 'a' appends. Defaults to 'w'.
        header (bool, optional): Whether the header row is written.
            Defaults to True.
    """
    log_or_print(f"Writing geocoded data to {file_path} with mode={mode} and header={header}", LOGGER)
    csv_options = {"mode": mode, "header": header, "index": False}
    df.to_csv(file_path, **csv_options)
    log_or_print(f"Wrote {len(df)} records to {file_path}", LOGGER)

In [9]:
def sjoin_geocoded_data(ft_gdf, tracts_gdf):
    """Attach tract attributes to each geocoded point with a left spatial join.

    Args:
        ft_gdf: GeoDataFrame of geocoded Fastrak records (left side).
        tracts_gdf: GeoDataFrame of tract polygons (right side). It is
            reprojected to the CRS of ``ft_gdf`` when the two differ.

    Returns:
        The result of ``gpd.sjoin`` with ``how="left"`` and
        ``predicate="intersects"``; every row of ``ft_gdf`` is retained.
    """
    log_or_print(
        f"Spatially joining geocoded Fastrak data with tract data. Fastrack gdf len: {len(ft_gdf)} Tract gdf len: {len(tracts_gdf)}",
        LOGGER,
    )

    # Both layers must share a CRS for the join; the point layer's CRS wins.
    point_crs = ft_gdf.crs
    if point_crs != tracts_gdf.crs:
        log_or_print("CRS do not match. Reprojecting tracts data to match Fastrak data", LOGGER)
        tracts_gdf = tracts_gdf.to_crs(point_crs)

    join_options = {"how": "left", "predicate": "intersects"}
    joined_gdf = gpd.sjoin(ft_gdf, tracts_gdf, **join_options)
    log_or_print(f"Joined {len(joined_gdf)} records", LOGGER)
    return joined_gdf

In [10]:
def read_geocoded_data(file_path):
    """Read previously geocoded records from a CSV into a GeoDataFrame.

    Args:
        file_path: Path of the CSV to read. It must contain a ``geometry``
            column holding WKT strings.

    Returns:
        gpd.GeoDataFrame: The CSV rows with ``geometry`` parsed from WKT and
        the CRS set to EPSG:4326.
    """
    log_or_print(f"Reading geocoded data from {file_path}", LOGGER)
    # Read the file the caller asked for (previously the module-level gc_data
    # path was read regardless of the argument).
    df = pd.read_csv(file_path)
    # The CSV stores geometries as WKT text; parse them back into shapes.
    geometry = gpd.GeoSeries.from_wkt(df["geometry"])
    gdf = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:4326")
    log_or_print(f"Read {len(df)} records from geocoded data", LOGGER)
    return gdf

In [11]:
def create_required_cols(df):
    """Return a copy of ``df`` with a ``FULL_ADDRESS`` column for geocoding.

    ``FULL_ADDRESS`` is built as ``"<ADDR>, <CITY>, <STATE> <ZIP_CODE>"``.

    Args:
        df (pd.DataFrame): Must contain ADDR, CITY, STATE and ZIP_CODE.

    Returns:
        pd.DataFrame: Copy of the input with ``FULL_ADDRESS`` added.

    Raises:
        ValueError: If any of the four source columns is missing.
    """
    log_or_print("Creating required columns for geocoding", LOGGER)

    # Fail early, naming every absent column, before touching the data.
    absent = [
        name for name in ("ADDR", "CITY", "STATE", "ZIP_CODE") if name not in df.columns
    ]
    if absent:
        error_message = f"Missing required columns: {', '.join(absent)}"
        log_or_print(error_message, LOGGER)
        raise ValueError(error_message)

    street_city_state = df["ADDR"] + ", " + df["CITY"] + ", " + df["STATE"]
    zip_text = df["ZIP_CODE"].astype(str)
    with_address = df.assign(FULL_ADDRESS=street_city_state + " " + zip_text)

    log_or_print("Created FULL_ADDRESS column", LOGGER)
    return with_address

In [12]:
def batch_geocode_addresses(df, out_file_path, overwrite_append=None):
    """Batch geocode addresses using the Google Maps Geocoding API.

    Wraps ``mtcpy.geospatial.google_geocode_batch`` and uses ``out_file_path``
    as an on-disk cache of results.

    Args:
        df (pd.DataFrame): DataFrame containing the addresses to geocode in a
            ``FULL_ADDRESS`` column. When a ``match`` column is present, only
            rows where it is null are geocoded.
        out_file_path (str): Path of the geocoded results CSV.
        overwrite_append (str, optional): How to treat ``out_file_path``.
            Defaults to None.

            - ``None`` (or any value other than the two below): if the file
              exists, return its contents without geocoding; otherwise
              geocode and write the results so later runs can reuse them.
            - ``"overwrite"``: geocode and replace the file.
            - ``"append"``: geocode and write existing plus new rows back to
              the file; the file is created when it does not exist.

    Returns:
        pd.DataFrame: DataFrame containing the geocoded addresses with the following columns:
            - address_orig: Original address
            - formatted_address: Formatted address
            - geometry_location_type: Location type of the geocoded address
            - types: Types of the geocoded address
            - partial_match: Whether the geocoded address is a partial match
            - geometry: Geometric information of the geocoded address

    Raises:
        Exception: Any error raised by ``google_geocode_batch`` or while
            reading the bad-addresses file is logged and re-raised.
    """

    log_or_print(f"Starting batch address geocoding on {len(df)} records", LOGGER)

    out_path = pathlib.Path(out_file_path)
    use_cache = overwrite_append not in ("overwrite", "append")

    # Cache hit: no overwrite/append requested and results already on disk.
    if use_cache and out_path.exists():
        log_or_print(
            f"Local geocoded data file exists at {out_file_path}. Reading local data file", LOGGER
        )
        results_df = read_geocoded_data(out_file_path)
        log_or_print(f"Read {len(results_df)} records from geocoded data", LOGGER)
        return results_df

    # check if fastrak data has a geocoded match already, if value is not null, skip geocoding
    if "match" in df.columns:
        df = df[df["match"].isnull()]
        log_or_print(f"Starting geocode on {len(df)} records that do not have a geocode match", LOGGER)

    out_cols = [
        "address_orig",
        "formatted_address",
        "geometry_location_type",
        "types",
        "partial_match",
        "geometry",
    ]

    try:
        results_df = google_geocode_batch(
            address_list=df["FULL_ADDRESS"].tolist(),
            include_details=True,
            allowed_location_types=["ROOFTOP", "RANGE_INTERPOLATED"],
        )
        log_or_print(f"Finished batch address geocoding. {len(results_df)} geocoded", LOGGER)
    except Exception as e:
        log_or_print(f"Error during batch geocoding: {e}", LOGGER)
        raise

    # check for bad results by checking if bad_addresses.txt file exists
    # NOTE(review): presumably written by google_geocode_batch into the current
    # working directory; a file left over from an earlier run is also reported.
    bad_address_file = "bad_addresses.txt"
    if pathlib.Path(bad_address_file).exists():
        log_or_print(f"Bad addresses file found at {bad_address_file}", LOGGER)

        try:
            with open(bad_address_file, "r") as f:
                bad_addresses = f.read().splitlines()
            log_or_print(f"Found {len(bad_addresses)} bad addresses", LOGGER)
        except Exception as e:
            log_or_print(f"Error reading bad addresses file: {e}", LOGGER)
            raise

    new_results = results_df[out_cols]

    # Append to an existing file: merge old and new rows and rewrite the file.
    if overwrite_append == "append" and out_path.exists():
        log_or_print(f"Appending data to existing file at {out_file_path}", LOGGER)
        existing_df = read_geocoded_data(out_file_path)
        combined_df = pd.concat([existing_df, new_results], ignore_index=True)
        write_geocoded_data(combined_df, out_file_path, mode="w", header=True)
        return combined_df

    # Every remaining case writes only the fresh results: an explicit
    # overwrite, an append with nothing to append to, or a cache miss.
    if overwrite_append == "overwrite":
        log_or_print(f"Overwriting existing file at {out_file_path}", LOGGER)
    else:
        log_or_print(f"File does not exist. Creating new file at {out_file_path}", LOGGER)
    write_geocoded_data(new_results, out_file_path, mode="w", header=True)

    return new_results

In [13]:
# join the EPC data
def join_epc_data(joined_gdf, epc_gdf, epc_cols=("geoid", "county_fip", "epc_2050p")):
    """Left-join EPC attributes onto the tract-joined geocoded data by ``geoid``.

    Args:
        joined_gdf: Geocoded records that already carry a ``geoid`` column.
        epc_gdf: EPC table; a ``tract_geoid`` column, if present, is renamed
            to ``geoid`` before joining.
        epc_cols: Columns of the (renamed) EPC table to bring over. Must
            include ``geoid``. Any iterable of column names is accepted.

    Returns:
        The merged frame; every row of ``joined_gdf`` is retained.
    """
    log_or_print(
        f"Joining the sjoin tract and Fastrak data to EPC data. Sjoin Fastrak data: {len(joined_gdf)} EPC data: {len(epc_gdf)}",
        LOGGER,
    )
    # rename columns to avoid conflicts
    epc_gdf = epc_gdf.rename(columns={"tract_geoid": "geoid"})
    # list(...) because indexing a DataFrame with a tuple is treated as a
    # single key, not as a column selection.
    joined_gdf = pd.merge(
        joined_gdf, epc_gdf[list(epc_cols)], on="geoid", how="left"
    )
    log_or_print(f"Joined {len(joined_gdf)} records", LOGGER)
    return joined_gdf

In [14]:
# join to the original ft data
def join_original_data(joined_gdf, ft_data):
    """Right-join the geocoded results onto the original Fastrak records.

    The original table is cut down to a fixed column list, its
    ``FULL_ADDRESS`` column is renamed to ``address_orig``, and that column is
    used as the join key. Because the join is a right join, every original
    record is kept even when it has no geocoded counterpart.

    Args:
        joined_gdf: Geocoded data keyed by ``address_orig``.
        ft_data: Original Fastrak table containing all columns listed in the
            body plus ``FULL_ADDRESS``.

    Returns:
        The merged frame.
    """
    # Columns of the original export that survive into the final output.
    req_cols = [
        "Is Duplicate Row?",
        "Table Names",
        "F4",
        "ACCTNO",
        "ACCTTYPE",
        "ACCTSTATUS",
        "ADDR",
        "CITY",
        "STATE",
        "ZIP_CODE",
        "ADDRESS",
        "TX_DATE",
        "PLAZA",
        "TOLLCNT",
        "VEHCOUNT",
        "TAGCOUNT",
        "Jurisdiction",
        "NAMELSAD20",
        "FULL_ADDRESS",
    ]

    log_or_print(f"Dropping columns that are not in the required columns list {req_cols}", LOGGER)
    ft_data = ft_data[req_cols].copy()

    log_or_print(
        f"Joining the original Fastrak data to the joined data. Joined data: {len(joined_gdf)} Original Fastrak data: {len(ft_data)}",
        LOGGER,
    )

    # Align the key name with the geocoder output before merging.
    originals = ft_data.rename(columns={"FULL_ADDRESS": "address_orig"})
    merged = pd.merge(joined_gdf, originals, on="address_orig", how="right")
    log_or_print(f"Joined {len(merged)} records", LOGGER)
    return merged

In [15]:
# final geocoding post processing

def geocode_post_processing(gdf):
    """Flag match quality and region membership, then prune unusable rows.

    Works on a copy of ``gdf`` and:

    1. adds ``match`` (1/0): 1 when ``geometry_location_type`` is ROOFTOP or
       RANGE_INTERPOLATED and ``partial_match`` is null;
    2. adds ``in_region`` (1/0): 1 when ``index_right`` is not null, i.e. the
       spatial join found a tract for the point;
    3. removes rows where both flags are 0;
    4. drops the helper columns ``geometry_location_type``, ``types``,
       ``partial_match`` and ``index_right``.

    Args:
        gdf: Spatially joined geocode results containing the columns above.

    Returns:
        The processed copy.
    """

    gdf = gdf.copy()
    # classify geocode accuracy
    log_or_print(
        "Flagging matches. Only a match if geometry_location_type in: [ROOFTOP, 'RANGE_INTERPOLATED] and partial_match = False",
        LOGGER,
    )
    # NOTE(review): a null partial_match is what counts as "not partial" here,
    # although the log text says "False" - confirm how the geocoder reports it.
    accurate_type = gdf["geometry_location_type"].isin(["ROOFTOP", "RANGE_INTERPOLATED"])
    gdf["match"] = (accurate_type & gdf["partial_match"].isnull()).astype(int)
    # log true/false counts
    log_or_print(f"Flagged {gdf['match'].value_counts().to_dict()} records as matches", LOGGER)

    # Flag data within the region
    log_or_print("Flagging data within the region", LOGGER)
    gdf["in_region"] = (gdf["index_right"].notnull()).astype(int)
    log_or_print(
        f"Flagged {gdf['in_region'].value_counts().to_dict()} records within the region", LOGGER
    )

    # drop records that are not matches and not in region
    # A boolean mask is used instead of dropping by index label: a left spatial
    # join can repeat index labels, and a label-based drop would then also
    # remove rows that merely share a label with a rejected one.
    drop_mask = (gdf["match"] == 0) & (gdf["in_region"] == 0)
    log_or_print(f"Dropping records that are not matches and not in region. Drop count: {int(drop_mask.sum())}", LOGGER)
    gdf = gdf[~drop_mask]

    # drop unnecessary columns
    drop_cols = ["geometry_location_type", "types", "partial_match", "index_right"]
    log_or_print(f"Dropping unnecessary columns: {drop_cols}", LOGGER)
    gdf = gdf.drop(columns=drop_cols)

    return gdf

In [16]:
def summarize_geocoding_results(gdf):
    """Build a one-row summary of EPC, region and match counts and shares.

    Counts come from the 0/1 flag columns ``epc_2050p``, ``in_region`` and
    ``match``. Each share is the "positive" count divided by the total number
    of rows in ``gdf``, rounded to three decimals. Rows whose flag is null or
    not 0/1 are left out of the counts but still count towards the total.

    A category with no rows is reported as 0 instead of raising ``KeyError``.

    Args:
        gdf: Frame containing the three flag columns. It is not modified.

    Returns:
        pandas Styler wrapping the one-row summary, with thousands separators
        on counts and percent formatting on shares.
    """
    log_or_print("Summarizing the results for epc_data", LOGGER)

    def _label_counts(column, labels):
        # Count rows per label and always emit both labels. Counts are kept as
        # floats to match what the earlier pivot-based summary produced.
        counts = gdf[column].map(labels).value_counts()
        return counts.reindex(list(labels.values()), fill_value=0).astype(float)

    all_counts = pd.concat(
        [
            _label_counts("epc_2050p", {0: "Outside EPC", 1: "Within EPC"}),
            _label_counts("in_region", {0: "Outside Region", 1: "Within Region"}),
            _label_counts("match", {0: "Not Matched", 1: "Matched"}),
        ]
    )
    summary_df = pd.DataFrame([all_counts.to_dict()])

    # Calculate within epc share, within region share, and matched share
    total_rows = gdf.shape[0]
    summary_df["Within EPC Share"] = (summary_df["Within EPC"] / total_rows).round(3)
    summary_df["Within Region Share"] = (summary_df["Within Region"] / total_rows).round(3)
    summary_df["Matched Share"] = (summary_df["Matched"] / total_rows).round(3)

    # Reorder columns
    col_order = [
        "Outside EPC",
        "Within EPC",
        "Within EPC Share",
        "Outside Region",
        "Within Region",
        "Within Region Share",
        "Not Matched",
        "Matched",
        "Matched Share",
    ]
    summary_df = summary_df[col_order]

    # Format the DataFrame to show thousands separator
    formatted_df = summary_df.style.format(
        {
            "Outside EPC": "{:,.0f}",
            "Within EPC": "{:,.0f}",
            "Within EPC Share": "{:.2%}",
            "Outside Region": "{:,.0f}",
            "Within Region": "{:,.0f}",
            "Within Region Share": "{:.2%}",
            "Not Matched": "{:,.0f}",
            "Matched": "{:,.0f}",
            "Matched Share": "{:.2%}",
        }
    )
    log_or_print("Summarized geocoding results", LOGGER)
    return formatted_df

In [17]:
# final post processing

def final_post_processing(gdf):
    """Final clean-up of the joined data before it is written out.

    Fills nulls in ``epc_2050p``, ``match`` and ``in_region`` with 0, casts
    them to int, moves ``geometry`` to the last column, and logs the value
    counts of the three flags.

    Args:
        gdf: Joined frame containing the three flag columns and ``geometry``.
            It is not modified; the work is done on a copy.

    Returns:
        The cleaned copy.
    """
    # Work on a copy so the caller's frame is left untouched, as the other
    # processing steps in this notebook do.
    gdf = gdf.copy()

    # update epc_2050p, match, and in_region columns so they are not null
    cols = ["epc_2050p", "match", "in_region"]
    log_or_print(f"Updating columns {cols} to not null", LOGGER)
    gdf[cols] = gdf[cols].fillna(0).astype(int)

    # move geometry column to the end
    log_or_print("Moving geometry column to the end", LOGGER)
    gdf = gdf[[col for col in gdf.columns if col != "geometry"] + ["geometry"]].copy()

    # provide log summary statistics
    log_or_print(f"Fastrak data geocoded results: {gdf['match'].value_counts().to_dict()}", LOGGER)
    log_or_print(f"Fastrak data in region results: {gdf['in_region'].value_counts().to_dict()}", LOGGER)
    log_or_print(f"Fastrak data EPC results: {gdf['epc_2050p'].value_counts().to_dict()}", LOGGER)

    return gdf

In [18]:
# write final data to geojson and csv

def write_final_data(gdf, out_path_csv, out_path_geojson):
    """Write the final record-level data as GeoJSON and as CSV.

    The GeoJSON receives only rows that have a geometry; the CSV receives
    every row, without the geometry column.

    Args:
        gdf: Final geocoded frame with a ``geometry`` column.
        out_path_csv: Destination of the tabular export.
        out_path_geojson: Destination of the spatial export.
    """
    # Spatial export: rows lacking a geometry are left out.
    log_or_print(f"Writing final geocoded data to {out_path_geojson} excluding records without geometry.", LOGGER)
    has_geometry = gdf["geometry"].notnull()
    geom_gdf = gdf[has_geometry].copy()
    geom_gdf.to_file(out_path_geojson, driver="GeoJSON")
    log_or_print(f"Wrote {len(geom_gdf)} records to {out_path_geojson}", LOGGER)

    # Tabular export: all rows, geometry column removed.
    log_or_print(f"Writing final geocoded data to {out_path_csv} including all records", LOGGER)
    log_or_print("Removing geometry column", LOGGER)
    gdf.drop(columns="geometry").copy().to_csv(out_path_csv, index=False)
    log_or_print(f"Wrote {len(gdf)} records to {out_path_csv}", LOGGER)

In [19]:
def write_summary_data(df, out_path):
    """Write the summary data to an Excel file at the specified path.

    Args:
        df: Object exposing ``to_excel`` (``main`` passes the Styler returned
            by ``summarize_geocoding_results``).
        out_path: Destination path of the workbook.
    """
    # NOTE(review): display formats set through ``Styler.format`` are reportedly
    # not carried into Excel output - confirm against the pandas Styler docs
    # before relying on the thousands/percent formatting appearing in the file.
    log_or_print(f"Writing summary to {out_path}", LOGGER)
    # index=False leaves the row index out of the sheet.
    df.to_excel(out_path, index=False)
    log_or_print(f"Wrote summary to {out_path}", LOGGER)


In [20]:
def write_aggregate_data(gdf, out_path_csv, out_path_geojson):
    """Write the tract-level aggregate as GeoJSON and as CSV.

    Args:
        gdf: Aggregated frame with a ``geometry`` column.
        out_path_csv: Destination of the tabular export (geometry removed).
        out_path_geojson: Destination of the spatial export.
    """
    # Spatial export first.
    log_or_print(f"Writing aggregated data to {out_path_geojson} of length {len(gdf)}", LOGGER)
    gdf.to_file(out_path_geojson, driver="GeoJSON")
    log_or_print(f"Wrote {len(gdf)} records to {out_path_geojson}", LOGGER)

    # Tabular export: same rows without the geometry column.
    log_or_print(f"Writing aggregated data to {out_path_csv} of length {len(gdf)}", LOGGER)
    without_geometry = gdf.drop(columns="geometry")
    without_geometry.to_csv(out_path_csv, index=False)
  

In [21]:
# create a function to aggregate the gdf to the tract level
def aggregate_to_tract(gdf, epc_gdf):
    """Count matched, in-region records per tract and attach tract geometry.

    Only rows with ``match == 1``, ``in_region == 1`` and a non-null geometry
    are counted. The counts are then left-joined onto the EPC layer, so the
    result has the EPC layer's geometry and keeps EPC rows that received no
    records (their count columns are null).

    Args:
        gdf: Final record-level frame with ``geoid``, ``epc_2050p``,
            ``match``, ``in_region`` and ``geometry`` columns.
        epc_gdf: EPC layer; ``tract_geoid`` is renamed to ``geoid`` if present.

    Returns:
        EPC ``geoid``, ``county_fip`` and ``geometry`` joined with the
        grouped counts.
    """
    log_or_print("Aggregating geocoded data to the tract level", LOGGER)

    log_or_print("Filtering out records with match = 0 and in_region = 0 and null geometry", LOGGER)
    usable = (gdf["match"] == 1) & (gdf["in_region"] == 1) & gdf["geometry"].notnull()
    usable_records = gdf[usable]

    # One row per distinct combination of the grouping keys, with its size.
    group_keys = ["geoid", "epc_2050p", "match", "in_region"]
    tract_counts = usable_records.groupby(group_keys).size().reset_index(name="count")
    log_or_print(f"Aggregated to {len(tract_counts)} records", LOGGER)

    # The EPC layer supplies the geometry; it is the left side of the join.
    epc_gdf = epc_gdf.rename(columns={"tract_geoid": "geoid"})
    log_or_print(
        f"Joining aggregated data to EPC geodata: {len(tract_counts)} EPC data: {len(epc_gdf)}", LOGGER
    )
    epc_shapes = epc_gdf[["geoid", "county_fip", "geometry"]]
    return pd.merge(epc_shapes, tract_counts, on="geoid", how="left")

In [22]:
def main():
    """Run the full Fastrak geocoding pipeline and write all outputs.

    Steps: read the account data, build and de-duplicate full addresses,
    geocode them (or load cached results), join tracts and EPC attributes,
    flag and prune results, join back to the original records, then write the
    record-level files, the summary workbook and the tract-level aggregate.

    Returns:
        The final record-level frame produced by ``final_post_processing``,
        so the caller can inspect it after the run.
    """
    # read in the data
    ft_df = read_fastrak_data(ft_data)

    # create the required columns
    ft_df = create_required_cols(ft_df)

    # drop duplicated addresses
    log_or_print(
        f"Dropping {ft_df.duplicated(subset=['FULL_ADDRESS']).sum()} duplicated addresses", LOGGER
    )
    ft_dedup_df = ft_df.drop_duplicates(subset=["FULL_ADDRESS"])

    # geocode the addresses
    results_gdf = batch_geocode_addresses(
        df=ft_dedup_df, out_file_path=gc_data, overwrite_append=None
    )

    # read the tract data
    tract_gdf = read_tract_data(tract_data)

    # read the EPC data
    epc_gdf = read_epc_data(epc_data)

    # spatially join the geocoded data with the tract data
    joined_gdf = sjoin_geocoded_data(results_gdf, tract_gdf[["geoid", "geometry"]])

    # join the EPC data
    joined_gdf = join_epc_data(joined_gdf, epc_gdf)

    # geocode post processing
    joined_gdf = geocode_post_processing(joined_gdf)

    # drop duplicates from joined_gdf
    log_or_print("Dropping duplicates from geocoded data", LOGGER)
    joined_gdf = joined_gdf.drop_duplicates(subset=["address_orig"], keep="first")

    # join to the original ft data
    final_gdf = join_original_data(joined_gdf, ft_df)

    # final post processing
    final_gdf = final_post_processing(final_gdf)

    # write the final data to geojson and csv
    write_final_data(final_gdf, final_gc_data_csv, final_gc_data_geojson)

    # summarize the geocoding results
    summary_df = summarize_geocoding_results(final_gdf)

    # write summary data
    write_summary_data(summary_df, summary_data_xlsx)

    # aggregate to the tract level (separate name: tract_gdf is the raw tract layer)
    tract_agg_gdf = aggregate_to_tract(final_gdf, epc_gdf)

    # write the aggregated data
    write_aggregate_data(tract_agg_gdf, final_aggregated_data_csv, final_aggregated_data_geojson)

    # The caller assigns the result of main(), so hand back the final frame.
    return final_gdf


# Entry point: run the whole pipeline and keep whatever main() returns in the
# notebook namespace under the name final_gdf.
if __name__ == "__main__":
    final_gdf = main()

  df = pd.read_csv(file_path)
