In [1]:
import geopandas as gp
import shapely
from shapely.geometry import Point
from shapely.ops import transform
import pyproj
from collections import Counter
import arcpy

In [2]:
from lrs_tools import gp_lrs
from lrs_tools import arcpy_lrs

In [3]:
import logging
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG) # Set the debug level here
fileHandler = logging.FileHandler(f'test.log', mode='w')
log.addHandler(fileHandler)

In [4]:
import importlib
importlib.reload(gp_lrs)
importlib.reload(arcpy_lrs)

<module 'lrs_tools.arcpy_lrs' from 'c:\\Users\\daniel.fourquet\\Documents\\Tasks\\TMC Conflation 2025\\NPMRDS\\lrs_tools\\arcpy_lrs.py'>

In [5]:
# Load lrs to an LRS object
lrs_path = r'C:\Users\daniel.fourquet\Documents\Tasks\TMC Conflation 2025\Data\LRS_MASTER_RICHMOND.shp'

# Filter to only include NB and EB routes, excluding PA and Interstates.  Exclude ramps
lrs = gp_lrs.LRS(lrs_path, filter=True, ramps=0)

# Load lrs overlap to an LRS object
lrs_overlap_path = r'C:\Users\daniel.fourquet\Documents\Tasks\TMC Conflation 2025\Data\LRS_OVERLAP_RICHMOND.shp'

# Filter to only include NB and EB routes, excluding PA and Interstates.  Exclude ramps
lrs_overlap = gp_lrs.LRS(lrs_overlap_path, filter=True, ramps=0)


In [29]:
# Load TMCs to a GeoDataFrame
tmcs_path = r'C:\Users\daniel.fourquet\Documents\Tasks\TMC Conflation 2025\NPMRDS\data\NPMRDS_medium_test.shp'
tmcs = gp.read_file(tmcs_path)
tmcs = tmcs[['Tmc', 'RoadNumber', 'RoadName', 'TmcLinear', 'geometry']]
tmcs = tmcs.to_crs(epsg=3968)

# Calculate begin, end, and mid-points
tmcs['begin_point'] = shapely.get_point(tmcs['geometry'], 0)
tmcs['end_point'] = shapely.get_point(tmcs['geometry'], -1)
def get_midpoint(geom):
    return geom.interpolate(0.5, normalized=True)
tmcs['mid_point'] = tmcs.geometry.apply(get_midpoint)

# Get TMCs dissolved by lineartmc
linear_tmcs = tmcs.dissolve(['TmcLinear', 'RoadName']).reset_index()
linear_tmcs['geometry'] = shapely.line_merge(linear_tmcs.geometry)
linear_tmcs = linear_tmcs.explode()
def interpolate(geom, pct):
    return geom.interpolate(pct, normalized=True)
linear_tmcs['begin_point'] = linear_tmcs.geometry.apply(interpolate, pct=0)
linear_tmcs['mid_point'] = linear_tmcs.geometry.apply(interpolate, pct=0.5)
linear_tmcs['end_point'] = linear_tmcs.geometry.apply(interpolate, pct=1)


  if s.type.startswith("Multi") or s.type == "GeometryCollection":


In [30]:
# Get most common routes near linear tmcs.  Only return results with one match
def get_route_list(tmc, test_point_count=None, distance=15, point_threshold=3, lrs=lrs):
    log.debug(f'\n{tmc}')
    
    if not test_point_count:  # Use begin, mid, and end points if points not supplied
        points = (tmc['begin_point'], tmc['mid_point'], tmc['end_point'])
    else:
        points = [tmc['geometry'].interpolate(x/10, normalized=True) for x in range(test_point_count)]
    
    log.debug('points:')
    for point in points:
        log.debug(f'\t{point}')
    routes = Counter()
    for point in points:
        nearby_routes = gp_lrs.get_nearby_routes(point, distance, lrs)
        routes.update(nearby_routes)

    # Check for no matches
    if len(routes) == 0:
        log.debug('\tNo matches.  Returning None')
        return None

    log.debug(f'\tRoutes:  {routes}')

    # If the number of points that match with a single route is equal
    # to the point_threshold, it is likely a match.  Otherwise a  more
    # precise method is needed.
    most_common_count = routes.most_common(1)[0][1]
    if most_common_count < point_threshold:
        log.debug('\tMost common below threshold.  Returning None')
        return None
    
    most_common_routes = routes.most_common()

    # Find the maximum count
    max_count = most_common_routes[0][1]

    # Filter for elements with the maximum count
    result = [element for element, count in most_common_routes if count == max_count]

    if len(result) == 1:
        log.debug(f'Returning {result[0]}')
        return result[0]
    
    return None


In [31]:

# Get most common routes
log.debug(f'===\nLinear TMC IDs - get_route_list\n===')
linear_tmcs['rte_nm_match'] = linear_tmcs.apply(get_route_list, axis=1)

# Remove null values.  These will need to be matched at the tmc level
linear_tmcs = linear_tmcs.loc[linear_tmcs['rte_nm_match'].notnull()].copy()

# Reduce to only needed columns to join back to the tmc dataframe
linear_tmcs = linear_tmcs.reset_index()[['TmcLinear', 'RoadName', 'rte_nm_match']].drop_duplicates()

# Remove any linear tmc id that appears more than once
linear_tmcs = linear_tmcs[~linear_tmcs.duplicated(['TmcLinear', 'RoadName'], keep=False)]

In [32]:
# Join tmcs and linear_tmcs by TmcLinear
tmcs = tmcs.merge(linear_tmcs, how='left', on=['TmcLinear', 'RoadName'])

In [33]:
print(f"Remaining null values: {len(tmcs.loc[tmcs['rte_nm_match'].isnull()])}")

Remaining null values: 68


In [34]:
# Match by TMC, but only on tmcs where rte_nm_match is null
tmcs_no_match = tmcs.loc[tmcs['rte_nm_match'].isnull()].copy()

log.debug(f'===\TMCs - get_route_list\n===')
tmcs_no_match['rte_nm_match'] = tmcs_no_match.apply(get_route_list, axis=1)
tmcs_no_match = tmcs_no_match[['Tmc', 'rte_nm_match']]

# Join back to main tmcs
tmcs = tmcs.merge(tmcs_no_match, how='left', on='Tmc', suffixes=('', '_new')).fillna(tmcs_no_match)
tmcs.loc[tmcs['rte_nm_match'].isnull(), 'rte_nm_match'] = tmcs['rte_nm_match_new']
tmcs.drop(columns='rte_nm_match_new', axis=1, inplace=True)

print(f"Remaining null values: {len(tmcs.loc[tmcs['rte_nm_match'].isnull()])}")

Remaining null values: 21


In [35]:
# Match by TMC again with a lower search radius, but only on tmcs where rte_nm_match is null
tmcs_no_match = tmcs.loc[tmcs['rte_nm_match'].isnull()].copy()

log.debug(f'===\TMCs - get_route_list - low search radius\n===')
tmcs_no_match['rte_nm_match'] = tmcs_no_match.apply(get_route_list, distance=5, axis=1)
tmcs_no_match = tmcs_no_match[['Tmc', 'rte_nm_match']]

# Join back to main tmcs
tmcs = tmcs.merge(tmcs_no_match, how='left', on='Tmc', suffixes=('', '_new')).fillna(tmcs_no_match)
tmcs.loc[tmcs['rte_nm_match'].isnull(), 'rte_nm_match'] = tmcs['rte_nm_match_new']
tmcs.drop(columns='rte_nm_match_new', axis=1, inplace=True)

print(f"Remaining null values: {len(tmcs.loc[tmcs['rte_nm_match'].isnull()])}")

Remaining null values: 20


In [36]:
# Match by TMC again with a higher search radius, but only on tmcs where rte_nm_match is null
tmcs_no_match = tmcs.loc[tmcs['rte_nm_match'].isnull()].copy()

log.debug(f'===\TMCs - get_route_list - low search radius\n===')
tmcs_no_match['rte_nm_match'] = tmcs_no_match.apply(get_route_list, distance=25, axis=1)
tmcs_no_match = tmcs_no_match[['Tmc', 'rte_nm_match']]

# Join back to main tmcs
tmcs = tmcs.merge(tmcs_no_match, how='left', on='Tmc', suffixes=('', '_new')).fillna(tmcs_no_match)
tmcs.loc[tmcs['rte_nm_match'].isnull(), 'rte_nm_match'] = tmcs['rte_nm_match_new']
tmcs.drop(columns='rte_nm_match_new', axis=1, inplace=True)

print(f"Remaining null values: {len(tmcs.loc[tmcs['rte_nm_match'].isnull()])}")

Remaining null values: 14


In [37]:
# Match by TMC again, but using the overlap lrs instead
tmcs_no_match = tmcs.loc[tmcs['rte_nm_match'].isnull()].copy()

log.debug(f'===\TMCs - get_route_list - overlap LRS\n===')
tmcs_no_match['rte_nm_match'] = tmcs_no_match.apply(get_route_list, distance=25, lrs=lrs_overlap, axis=1)
tmcs_no_match = tmcs_no_match[['Tmc', 'rte_nm_match']]

# Join back to main tmcs
tmcs = tmcs.merge(tmcs_no_match, how='left', on='Tmc', suffixes=('', '_new')).fillna(tmcs_no_match)
tmcs.loc[tmcs['rte_nm_match'].isnull(), 'rte_nm_match'] = tmcs['rte_nm_match_new']
tmcs.drop(columns='rte_nm_match_new', axis=1, inplace=True)

print(f"Remaining null values: {len(tmcs.loc[tmcs['rte_nm_match'].isnull()])}")

Remaining null values: 11


In [38]:
# Match by TMC again, but using 5 points along the length of the tmc
# instead of begin, mid, and end points
tmcs_no_match = tmcs.loc[tmcs['rte_nm_match'].isnull()].copy()

log.debug(f'===\TMCs - get_route_list - 5 points\n===')
tmcs_no_match['rte_nm_match'] = tmcs_no_match.apply(get_route_list, test_point_count=5, point_threshold=4, axis=1)
tmcs_no_match = tmcs_no_match[['Tmc', 'rte_nm_match']]

# Join back to main tmcs
tmcs = tmcs.merge(tmcs_no_match, how='left', on='Tmc', suffixes=('', '_new')).fillna(tmcs_no_match)
tmcs.loc[tmcs['rte_nm_match'].isnull(), 'rte_nm_match'] = tmcs['rte_nm_match_new']
tmcs.drop(columns='rte_nm_match_new', axis=1, inplace=True)

print(f"Remaining null values: {len(tmcs.loc[tmcs['rte_nm_match'].isnull()])}")

Remaining null values: 3


In [39]:
tmcs.loc[tmcs['rte_nm_match'].isnull()]

Unnamed: 0,Tmc,RoadNumber,RoadName,TmcLinear,geometry,begin_point,end_point,mid_point,rte_nm_match
228,110N04846,195.0,I-195 S,124,"LINESTRING (178168.742 174152.284, 178184.931 ...",POINT (178168.742 174152.284),POINT (178424.828 173918.117),POINT (178287.166 174024.355),
270,110N18081,10.0,BROAD ROCK BLVD,1658,"LINESTRING (178747.817 166577.194, 178738.382 ...",POINT (178747.817 166577.194),POINT (178738.382 166563.669),POINT (178743.100 166570.431),
325,110P15764,,74A,15763,"LINESTRING (182913.002 172315.720, 182906.629 ...",POINT (182913.002 172315.720),POINT (182738.892 172156.430),POINT (182869.714 172179.768),


In [40]:
lrs.geodataframe.loc[lrs.geodataframe['RTE_NM'] == 'R-VA   SR00197EB']

Unnamed: 0,LRM_CURREN,RTE_NM,RTE_COMMON,RTE_CATEGO,RTE_JURIS_,RTE_JURIS1,RTE_NBR,RTE_TYPE_C,RTE_TYPE_N,RTE_TYPE_M,...,RTE_SOUR_1,RTE_RAMP_D,RTE_DSC,RTE_MEASUR,ROUTESYSID,CHANGE_STA,CHANGE_S_1,LEN,SHAPE_Leng,geometry
459,2022-11-18,R-VA SR00197EB,VA-197E,State Highway Primary,,,197,SR,State Route,,...,SR00197,,HTRIS Route ID: < SR00197 >,OSM,SR00197,CL,2023-01-27,0.0,7244.429378,"MULTILINESTRING ((177130.443 174935.448, 17721..."
