In [4]:
from skilift import GTFS, TransitGraph, OpenStreetMap, get_elevations_for_nodes

In [5]:
import pandas as pd
from zipfile import ZipFile
from collections import defaultdict
from typing import Dict, Set
import numpy as np
pd.set_option('display.max_columns', 100)
pd.set_option('display.max_rows', 100)

In [6]:
fn = "/home/bmander/skilift_data/transit/redding.zip"

graph = TransitGraph.load(fn)

In [8]:
# get epoch time on Aptil 19, 2023 at 1:00pm pacific time
t0 = pd.Timestamp("2023-04-19 13:00:00", tz="America/Los_Angeles")
t0

Timestamp('2023-04-19 13:00:00-0700', tz='America/Los_Angeles')

In [9]:
node0 = graph.get_stop_node("Downtown Transit Center", t0)
node0

AtStopNode(stop_id:2000, datetime:2023-04-19 13:00:00-07:00)

In [10]:
node0.outgoing[0].node

DepartureNode(pattern_id:5, service_id:c_1658_b_18260_d_31, row:1, col:0, datetime:2023-04-19 14:25:00-07:00)

In [11]:
node0.outgoing[0].node.outgoing[0].node

ArrivalNode(pattern_id:5, service_id:c_1658_b_18260_d_31, row:1, col:1, datetime:2023-04-19 14:35:00-07:00)

In [12]:
node0.outgoing[0].node.outgoing[0].node.outgoing

[Edge(node=DepartureNode(pattern_id:5, service_id:c_1658_b_18260_d_31, row:1, col:1, datetime:2023-04-19 14:35:00-07:00), weight=0.0),
 Edge(node=AtStopNode(stop_id:8003, datetime:2023-04-19 14:35:00-07:00), weight=60.0)]

In [173]:
import osmium



In [174]:
import typing
class Way(typing.NamedTuple):
    nds: typing.List[int]
    tags: typing.Dict[str, str]

def read_osm(filename):
    """
    Read an OSM file and return a dictionary of nodes and ways.
    """
    nodes = {}
    ways = {}

    class NodeHandler(osmium.SimpleHandler):
        def __init__(self):
            super(NodeHandler, self).__init__()

        def node(self, n):
            nodes[n.id] = (n.location.lat, n.location.lon)

    class HighwayHandler(osmium.SimpleHandler):
        def __init__(self):
            super(HighwayHandler, self).__init__()

        def way(self, w):
            if 'highway' not in w.tags:
                return

            nds = [n.ref for n in w.nodes]
            tags = dict(w.tags)

            ways[w.id] = Way(nds, tags)

    h = HighwayHandler()
    h.apply_file(filename)

    n = NodeHandler()
    n.apply_file(filename)

    return nodes, ways

        

In [175]:
osm_filename = "/home/bmander/skilift_data/street/redding.pbf"

In [176]:
nodes, ways = read_osm(osm_filename)

In [177]:
from collections import Counter
def get_graph_nodes(ways: Dict[int, Dict]) -> Set[int]:
    """Get all the graph nodes from the ways. The graph nodes are the nodes
    that are used more than once, or they are the start or end node of a street.

    Args:
        ways (Dict): A dictionary of ways.

    Returns:
        Set: A set of graph nodes.
    """

    graph_nodes = set()

    node_count = Counter()
    for way in ways.values():
        if len(way.nds) < 2:
            continue

        # add the start and end nodes
        graph_nodes.add(way.nds[0])
        graph_nodes.add(way.nds[-1])

        # count the number of times a node appears
        node_count.update(way.nds)

    # get all nodes that appear more than once
    intersection_nodes = set(node for node, count in node_count.items() if count > 1)
    graph_nodes = graph_nodes.union(intersection_nodes)

    return graph_nodes

    


In [178]:
graph_nodes = get_graph_nodes(ways)

In [256]:
from math import floor
import rasterio as rio

def interpolate(A, i, j):
    """Bilinear interpolation.

    Args:
        A (np.ndarray): A 2x2 array of values.
        i (float): The fractional row.
        j (float): The fractional column.
    
    Returns:
        float: The interpolated value."""
    
    baseline = A[0]*(1-i) + A[1]*i
    return baseline[0]*(1-j) + baseline[1]*j

node_elevs = {}
with rio.open("/home/bmander/skilift_data/elevation/USGS_13_n41w123_20210624.tif") as src:

    print("Reading raster file...", end="")
    elevs = src.read(1) # read the first band
    print("done.")

    for node, (lat,lon) in nodes.items():
        # get the elevation at the node
        # apply the transform to get the fractional row and column
        row, col = src.index(lon, lat, op=lambda x:x)

        if row < 0 or col < 0 or row >= src.height or col >= src.width:
            node_elevs[node] = None
            continue
        
        # read a small window around the point
        row_floor = floor(row)
        col_floor = floor(col)
        elevs_window = elevs[row_floor:row_floor+2, col_floor:col_floor+2]

        # interpolate the elevation
        row_frac = row - row_floor
        col_frac = col - col_floor
        elev = interpolate(elevs_window, row_frac, col_frac)

        node_elevs[node] = elev


Reading raster file...done.
