# Interpolate elevation data from the spline fit

In [32]:
from shapely.ops import nearest_points, LineString, Point
from pathlib import Path
import geopandas as gpd
import rasterio

import matplotlib.pyplot as plt
import numpy as np

In [33]:
import json
config = json.load((Path.cwd().parent / 'config.json').open('rb'))
network_fp = Path(config['project_directory']) / 'OSM_Download'
export_fp = Path(config['project_directory']) / 'Network'
if network_fp.exists() == False:
    network_fp.mkdir()
config

{'code_directory': 'C:/Users/tpassmore6/Documents/GitHub/BikewaySimDev',
 'project_directory': 'D:/PROJECTS/GDOT',
 'studyarea': 'D:/RAW/Study_Areas/itp.gpkg',
 'geofabrik_fp': 'D:/RAW/OSM/Geofabrik_GA_Extracts',
 'here_fp': 'D:/RAW/HERE/Streets.shp',
 'abm_fp': 'D:/RAW/ARC/rtp_amd6_2030_network.gdb',
 'projected_crs_epsg': 'epsg:2240',
 'geofabrik_year': '2023',
 'usgs': 'D:/RAW/USGS',
 'maptilerapikey': 'rJ9yamTFh9tXOj4PAxqv'}

In [34]:
#for storing the interpolated points with sampled elevation data
import pickle
with (export_fp/'spline_fit_elevation.pkl').open('rb') as fh:
    interpolated_points_dict = pickle.load(fh)

In [35]:
raw_links = gpd.read_file(network_fp / f"osm_{config['geofabrik_year']}.gpkg",layer="raw")
#set the osmid as the index
raw_links.set_index('osmid',inplace=True)
#raw_links = raw_links[['oneway','geometry']]

In [36]:
links = gpd.read_file(export_fp/'networks.gpkg',layer='osm_links')

In [37]:
prev_crs = links.crs

tiff_links = list((Path(config['usgs']) / 'dem_files').glob('*.tif'))

#open the first one to just get the crs
src = rasterio.open(tiff_links[0])
dem_crs = src.crs
src.close()

links.to_crs(dem_crs,inplace=True)
#nodes.to_crs(dem_crs,inplace=True)
raw_links.to_crs(dem_crs,inplace=True)

Example

In [38]:
# #select a link and try it
# linkid = 637636161
# link = links[links['osmid']==linkid].iloc[[0],:]

# #get osm line
# line = raw_links.loc[linkid,'geometry']#interpolated_points_dict[linkid]['geometry']
# line = np.array(line.coords)

# #get geo of start and end
# #or just use the included line to reduce memory?
# pointA = nodes[nodes['osm_N']==link['osm_A'].item()]
# pointB = nodes[nodes['osm_N']==link['osm_B'].item()]
# print(line)

In [39]:
# # Define the coordinates of two additional points
# point1 = (pointA.geometry.item().x,pointA.geometry.item().y)
# point2 = (pointB.geometry.item().x,pointB.geometry.item().y)
# print(point1,point2)

In [40]:
# # Plot the GeoDataFrame and the additional points
# fig, ax = plt.subplots()

# # plot the full link
# ax.plot(line[:,0],line[:,1], color='gray', label='full osm')

# link.plot(ax=ax, color='blue', label='osm segemnt')
# ax.plot(point1[0], point1[1], marker='o', color='red', markersize=10, label='Point 1')
# ax.plot(point2[0], point2[1], marker='o', color='green', markersize=10, label='Point 2')

# # Add labels to the additional points
# #ax.text(point1[0], point1[1], 'Point 1', fontsize=12, ha='right')
# #ax.text(point2[0], point2[1], 'Point 2', fontsize=12, ha='right')

# # Add legend and labels
# ax.legend()
# ax.set_xlabel('Longitude')
# ax.set_ylabel('Latitude')
# ax.set_title('GeoDataFrame with Additional Points')

# # Manually set limits to create a square aspect ratio
# min_x, max_x = ax.get_xlim()
# min_y, max_y = ax.get_ylim()
# width = max(max_x - min_x, max_y - min_y)
# center_x = (min_x + max_x) / 2
# center_y = (min_y + max_y) / 2
# ax.set_xlim(center_x - width / 2, center_x + width / 2)
# ax.set_ylim(center_y - width / 2, center_y + width / 2)

# plt.show()


In [41]:
# point1_geo = Point(point1)
# point2_geo = Point(point2)
# line_geo = LineString(line)

Find the distance of the shapepoint on each line

In [42]:
# from shapely import line_locate_point, equals_exact

# point1_dist = line_locate_point(LineString(line),Point(point1))
# point2_dist = line_locate_point(LineString(line),Point(point2))

# #scenario 1: last point intersects with early point on a line (line loops into itself)
# #so trim off the points before point 1
# if point1_dist >= point2_dist:
#     for first_i, point in enumerate(line):
#         if equals_exact(Point(point),Point(point1),tolerance=1):
#             break
#     new_line = line[first_i+1:]
#     point2_dist = line_locate_point(LineString(new_line),Point(point1))

# #scenario 2: first point intersect with last point on a line
# #so trim off the point at the end of the line
# if point1_dist >= point2_dist:
#     new_line = line[0:-1]
#     point1_dist = line_locate_point(LineString(new_line),Point(point1))
#     point2_dist = line_locate_point(LineString(line),Point(point1))
    
# if point1_dist >= point2_dist:
#     print('error')
# else:
#     print(np.round(point1_dist),np.round(point2_dist))

# Interpolate distance on line
For each link, we need the distance along the line of the start and end point to properly input into the fitted spline

For polygons where a way intersects with itself, we may need some more advanced logic utilizing the node sequence. 

If we have the node sequence, start with the first node given, and see if the second node can be found after that. If it can't then we know we need to reverse the direction, but can we think of an example where 

In [43]:
from shapely import line_locate_point, equals_exact
from shapely.ops import polygonize, unary_union
from itertools import product

interpolated_distance_dict = {}

for idx, row in links.iterrows():
    
    #network link attributes
    osmid = row['osmid']
    a = row['osm_A']
    b = row['osm_B']
    network_line = np.array(row['geometry'].coords)
    point1 = network_line[0]
    point2 = network_line[-1]

    #get raw osm attributes
    line = np.array(raw_links.loc[osmid,'geometry'].coords)
    points = [Point(x) for x in line]
    points = gpd.GeoSeries(points)
    distances = points.distance(points.shift(1)).cumsum().tolist()
    distances[0] = 0
    node_list = json.loads(raw_links.loc[osmid,'all_tags'])['@way_nodes']
    a_idx = node_list.index(a)
    b_idx = node_list.index(b)

    #polygon check
    poly_check = len(list(polygonize(unary_union(LineString(line))))) #> 0
    poly_check_network = len(list(polygonize(unary_union(LineString(network_line))))) #> 0

    method = 'none'
    reverse_geometry = False
    
    # scenario 1: simple line segment no loops
    if (poly_check == 0) & (poly_check_network == 0) & (a != b):
        method = 'simple'
        # step 1: check if network line is in the same direction as loop
        if (a_idx > b_idx):    
            reverse_geometry = True
            a = row['osm_B']
            b = row['osm_A']
            a_idx = node_list.index(a)
            b_idx = node_list.index(b)

        # step 2: get distance of start and end point along the line
        #point1_dist = line_locate_point(LineString(line),Point(point1))
        #point2_dist = line_locate_point(LineString(line),Point(point2))
        network_dist = distances[a_idx:b_idx+1]
        point1_dist = network_dist[0]
        point2_dist = network_dist[-1]

        # step 3: check to see if distance matches the network geometry
        difference = point2_dist - point1_dist
        distance_check = np.abs(difference - LineString(network_line).length) < 0.02
        if distance_check == False:
            print('error')

        segment = [point1_dist,point2_dist]
    
    # scenario 2: network segment is not a loop but the line is
    elif (poly_check > 0) & (poly_check_network == 0) & (a != b):
        method = 'simple loop'

        a_s = [idx for idx, val in enumerate(node_list) if val == a]
        b_s = [idx for idx, val in enumerate(node_list) if val == b]

        #get all possible combinations of A/B
        combinations = list(product(a_s,b_s))
        for a_idx, b_idx in combinations:
            # step 1: see if network line direction matches line
            if (a_idx > b_idx):
                reverse_geometry = True
                # a = row['osm_B']
                # b = row['osm_A']
                # a_idx = node_list.index(a)
                # b_idx = node_list.index(b)
                a_idx, b_idx = b_idx, a_idx

            # step 2: get distance of start and end point along the line
            # start = node_list.index(a)
            # end = node_list[start+1:].index(b) + start + 1
            network_dist = distances[a_idx:b_idx+1]
            point1_dist = network_dist[0]
            point2_dist = network_dist[-1]

            # step 3: check to see if distance matches the network geometry
            difference = point2_dist - point1_dist
            distance_check = np.abs(difference - LineString(network_line).length) < 0.02
            
            segment = [point1_dist,point2_dist]

            if distance_check == True:
                break

        # step 4: if it doesn't then it is likely that the network segment crosses the loop
        # we'll need multiple segments in that case
        if distance_check == False:
            method = 'complex loop'

            #reset the reversed geometry bit
            reverse_geometry = False
            a = row['osm_A']
            b = row['osm_B']
            a_idx = node_list.index(a)
            b_idx = node_list.index(b)

            # if the network line crosses the loop then b_idx will be larger
            if a_idx < b_idx:
                reverse_geometry = True
                a = row['osm_B']
                b = row['osm_A']
                a_idx = node_list.index(a)
                b_idx = node_list.index(b)

            # step 5: get the two segments
            first_segment = [0,distances[b_idx]]
            second_segment = [distances[a_idx],distances[-1]]

            # step 6: check distance
            difference = second_segment[-1] - second_segment[0] + first_segment[1]
            distance_check = np.abs(difference - LineString(network_line).length) < 0.02

            # if distance_check == False:
            #     print('error')
            segment = [first_segment,second_segment]

    else:
        method = 'none'
        segment = 999
        distance_check = False
    
    #finaly assemble the dict
    interpolated_distance_dict[idx] = {
    'method': method,
    'segment': segment,
    'reverse_geometry': reverse_geometry,
    'distance_check': distance_check,
    'poly_check': poly_check
    }

In [44]:
import pandas as pd
df = pd.DataFrame.from_dict(interpolated_distance_dict,orient='index')
interpolated = pd.merge(links,df,left_index=True,right_index=True)

#drop loops
interpolated = interpolated[interpolated['method']!='none']

In [45]:
#todo create a checkpoint here

In [46]:
#export['segment'] = export['segment'].astype(str)
#export['length_m'] = export.length
#export.to_file(Path.home()/'Downloads/scratch.gpkg',layer='all')

# Spline fit
We need elevation data between the two points to interpolate an elevation profile for the smaller links

In [47]:
from scipy.interpolate import splrep, splev, BSpline
import src.elevation_tools as elevation_tools

interpolate_dist_m = 1
new_elevations_dict = {}
spline_or_nah = []

for idx, row in interpolated.iterrows():
    #get ids
    osm_linkid = row['osm_linkid']
    osmid = row['osmid']

    #get osm elevation
    item = interpolated_points_dict[osmid]

    #retrieve the fitted spline if it exists
    spline = item.get('spline',0)
    
    #get segments
    segment = row['segment']
    
    #if no spline, do linear interpolatation
    if spline == 0:
        new_xs = item['distances']
        new_elevations = item['elevations']
        spline_or_nah.append(idx)

    elif isinstance(segment[0],list):
        #get x sequence
        new_xs_segment1 = np.arange(int(segment[0][0]),int(segment[0][1])+interpolate_dist_m,interpolate_dist_m)
        new_xs_segment2 = np.arange(int(segment[1][0]),int(segment[1][1])+interpolate_dist_m,interpolate_dist_m)

        #get new elevations
        new_elevations_segment1 = splev(new_xs_segment1,spline)
        new_elevations_segment2 = splev(new_xs_segment2,spline)

        #combine
        new_xs = np.arange(0,len(new_xs_segment1)+len(new_xs_segment2))
        new_elevations = np.hstack([new_elevations_segment2,new_elevations_segment1])

    else:
        #get new elevation values
        new_xs = np.arange(int(segment[0]),int(segment[1])+interpolate_dist_m,interpolate_dist_m)
        new_elevations = splev(new_xs, spline)
        #recalculate elevations stats
        #TODO if wanting to seperate grades by section use the elevation_stats function
    
    new_elevations_dict[idx] = elevation_tools.simple_elevation_stats(new_xs, new_elevations)


  ascent_grade = np.round(ascent / total_distance * 100,2)
  descent_grade = np.round(descent / total_distance * 100,2)


In [48]:
interpolated.loc[spline_or_nah,'spline_fit'] = False

In [49]:
df = pd.DataFrame.from_dict(new_elevations_dict,orient='index')
elevations_added = pd.merge(interpolated,df,left_index=True,right_index=True)

In [50]:
elevations_added['segment'] = elevations_added['segment'].astype(str)
elevations_added.to_file(export_fp/'elevation.gpkg')

In [51]:
# from scipy.interpolate import splrep, splev, BSpline

# # case 1: only one segment to deal with
# osm_linkid = 1125605250
# osmid = 751119047

# row = export[export['osm_linkid']==osm_linkid].squeeze()

# #get osm elevation
# item = interpolated_points_dict[osmid]

# #retrieve the fitted spline
# spline = item['spline']

# #get segments
# segment = row['segment']

# #get x sequence
# new_xs_segment1 = np.arange(int(segment[0][0]),int(segment[0][1])+interpolate_dist_m,interpolate_dist_m)
# new_xs_segment2 = np.arange(int(segment[1][0]),int(segment[1][1])+interpolate_dist_m,interpolate_dist_m)

# #get new elevations
# new_elevations_segment1 = splev(new_xs_segment1,spline)
# new_elevations_segment2 = splev(new_xs_segment2,spline)

# #combine
# new_xs = np.arange(0,len(new_xs_segment1)+len(new_xs_segment2))
# new_elevations = np.hstack([new_elevations_segment2,new_elevations_segment1])

# fig, ax = plt.subplots()
# ax.plot(item['distances'],item['elevations'],'-')
# ax.plot(new_xs_segment1,new_elevations_segment1,'-.')
# ax.plot(new_xs_segment2,new_elevations_segment2,'-.')

In [52]:
# # use fitted spline to from dict to create new values

# item = interpolated_points_dict[osmid]
# spline = interpolated_points_dict[osmid]['spline']

# interpolate_dist_m = 1
# new_xs = np.arange(int(point1_dist),int(point2_dist)+interpolate_dist_m,interpolate_dist_m)
# new_elevations = splev(new_xs, spline)

#new_grades = pd.Series(new_elevations).diff() #/ interpolate_dist_m * 100

In [53]:
# from importlib import reload
# import src.elevation_tools as elevation_tools
# reload(elevation_tools)
# elevation_tools.simple_elevation_stats(new_xs, new_elevations)
# fig, ax = plt.subplots()
# ax.plot(item['distances'],item['elevations'],'-')
# ax.plot(new_xs,new_elevations,'-.')

In [54]:
# linkid
# item = interpolated_points_dict[linkid]
# import numpy as np
# from scipy.interpolate import splrep, splev, BSpline

# spline = interpolated_points_dict[linkid]['spline']

# new_xs = np.arange(int(point1_dist),int(point2_dist)+10,10)
# new_ys = splev(new_xs, spline)
# new_ys
# fig, ax = plt.subplots()
# ax.plot(item['distances'],item['elevations'],'-')
# ax.plot(new_xs,new_ys,'-.')

Add elevation data to links (deprecated)

In [55]:
# grade_cats = {}

# for linkid, item in tqdm(interpolated_points_dict.items()):     

#     outputs = elevation_tools.elevation_stats(item['distances'],item['smoothed'],grade_threshold)

#     df = pd.DataFrame({
#         'distance_deltas':outputs['distance_deltas'],
#         'segment_grades':outputs['segment_grades']
#         })
#     #create bins (broach 2012 ones)
#     #'(-inf,-6]','(-6,-4]','(-4,-2]','(-2,0]',
#     #-np.inf,-6,-4,-2,
#     bins = [-1,2,4,6,10,15,np.inf]
#     names = ['(0,2]','(2,4]','(4,6]','(6,10]','(10,15]','(15,inf]']
#     df['grade_category'] = pd.cut(df['segment_grades'].abs(), bins, labels = names)

#     #determine if up or down (doesn't matter if flat)
#     df.loc[df['segment_grades'] >= 0,'ascent_or_descent'] = 'ascent'
#     df.loc[df['segment_grades'] < 0,'ascent_or_descent'] = 'descent'

#     # how many meters at each grade category?
#     test = df.groupby(['grade_category','ascent_or_descent'],observed=False)['distance_deltas'].sum().round(0)
#     test = pd.DataFrame(test).transpose()
#     test.index = [linkid]
#     grade_cats[linkid] = {
#         'ascent_m': outputs['ascent'],
#         'descent_m': outputs['descent'],
#         'ascent_grade': outputs['ascent_grade'],
#         'descent_grade': outputs['descent_grade']
#     }
#     grade_cats[linkid].update(
#         test.to_dict(orient='records')[0]
#     )
    
# #.items())#.reseorient='t_index()#.pivot(
#     # columns=['grade_category','up'],
#     # values='distance_deltas'
#     # )
# #grade_cats