In [2]:
# install deps
!pip install requests shapely geographiclib gpxpy

# # example: Helsinki → Tallinn
# python osm_ferry_path.py --from "Helsinki" --to "Tallinn" \
#   --geojson helsinki_tallinn.geojson --gpx helsinki_tallinn.gpx --print-info

Collecting gpxpy
  Downloading gpxpy-1.6.2-py3-none-any.whl.metadata (5.9 kB)
Downloading gpxpy-1.6.2-py3-none-any.whl (42 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.6/42.6 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: gpxpy
Successfully installed gpxpy-1.6.2


In [7]:
#!/usr/bin/env python3
"""
Fetch a reference vessel path (e.g., ferry lane) between two ports using OpenStreetMap.

Modified for Google Colab usage: instead of argparse, you directly set variables `PORT_FROM`, `PORT_TO`, etc.

"""
from __future__ import annotations
import json
import math
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import requests
from shapely.geometry import LineString, Point, mapping
from shapely.ops import linemerge
from geographiclib.geodesic import Geodesic
import gpxpy
import gpxpy.gpx

USER_AGENT = "OSM-Ferry-Path/1.0 (contact: your_email@example.com)"
NOMINATIM = "https://nominatim.openstreetmap.org/search"
OVERPASS = "https://overpass-api.de/api/interpreter"

@dataclass
class Geocode:
    name: str
    lat: float
    lon: float
    bbox: Tuple[float, float, float, float]  # south, north, west, east

def geocode_place(q: str) -> Geocode:
    params = {"q": q, "format": "jsonv2", "limit": 1}
    r = requests.get(NOMINATIM, params=params, headers={"User-Agent": USER_AGENT}, timeout=30)
    r.raise_for_status()
    arr = r.json()
    if not arr:
        raise ValueError(f"No geocoding result for: {q}")
    it = arr[0]
    bbox = tuple(map(float, it["boundingbox"]))
    return Geocode(
        name=it.get("display_name", q),
        lat=float(it["lat"]),
        lon=float(it["lon"]),
        bbox=(bbox[0], bbox[1], bbox[2], bbox[3]),
    )

def bbox_union(a, b, pad_km=10):
    south = min(float(a[0]), float(b[0]))
    north = max(float(a[1]), float(b[1]))
    west = min(float(a[2]), float(b[2]))
    east = max(float(a[3]), float(b[3]))
    pad_deg = pad_km / 111.0
    return (south - pad_deg, north + pad_deg, west - pad_deg, east + pad_deg)

def overpass_query(bbox):
    s, n, w, e = bbox
    q = f"""
    [out:json][timeout:120];
    ( relation["route"="ferry"]({s},{w},{n},{e}); );
    (._;>;);
    out geom;
    """
    r = requests.post(OVERPASS, data={"data": q}, headers={"User-Agent": USER_AGENT}, timeout=180)
    r.raise_for_status()
    return r.json()

def haversine_km(a, b):
    R = 6371.0088
    lat1, lon1 = math.radians(a[0]), math.radians(a[1])
    lat2, lon2 = math.radians(b[0]), math.radians(b[1])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    h = math.sin(dlat/2)**2 + math.cos(lat1)*math.cos(lat2)*math.sin(dlon/2)**2
    return 2 * R * math.asin(math.sqrt(h))

def collect_relations(osm):
    nodes, ways, rels = {}, {}, []
    for el in osm.get("elements", []):
        if el["type"] == "node":
            nodes[el["id"]] = el
        elif el["type"] == "way":
            ways[el["id"]] = el
        elif el["type"] == "relation" and el.get("tags", {}).get("route") == "ferry":
            rels.append(el)
    return rels, ways, nodes

def way_to_linestring(way):
    if "geometry" not in way:
        return None
    coords = [(pt["lon"], pt["lat"]) for pt in way["geometry"]]
    if len(coords) < 2:
        return None
    return LineString(coords)

def stitch_relation_geometry(rel, ways):
    lines = []
    for m in rel.get("members", []):
        if m.get("type") == "way" and m.get("ref") in ways:
            ls = way_to_linestring(ways[m["ref"]])
            if ls is not None:
                lines.append(ls)
    if not lines:
        return None
    merged = linemerge(lines)
    if isinstance(merged, LineString):
        return merged
    try:
        return max(list(merged), key=lambda l: l.length)
    except Exception:
        return None

def score_relation_for_ports(rel, geom, portA, portB):
    if geom is None:
        return -1e9, float("inf"), float("inf"), (False, False)
    tags = rel.get("tags", {})
    name = (tags.get("name") or "").lower()
    ref = (tags.get("ref") or "").lower()
    from_tag = (tags.get("from") or "").lower()
    to_tag = (tags.get("to") or "").lower()
    a = portA.name.lower().split(",")[0].strip()
    b = portB.name.lower().split(",")[0].strip()
    a_hit = a and (a in name or a in from_tag or a in to_tag or a in ref)
    b_hit = b and (b in name or b in from_tag or b in to_tag or b in ref)
    # endpoint heuristic distances
    start = (geom.coords[0][1], geom.coords[0][0])
    end = (geom.coords[-1][1], geom.coords[-1][0])
    d1 = min(haversine_km((portA.lat, portA.lon), start), haversine_km((portA.lat, portA.lon), end))
    d2 = min(haversine_km((portB.lat, portB.lon), start), haversine_km((portB.lat, portB.lon), end))
    score = - (d1 + d2) + (80 if (a_hit and b_hit) else 0) + (30 if (a_hit or b_hit) else 0)
    return score, d1, d2, (a_hit, b_hit)

def great_circle_polyline(a, b, npoints=200):
    geod = Geodesic.WGS84
    inv = geod.Inverse(a[0], a[1], b[0], b[1])
    l = geod.Line(a[0], a[1], inv["azi1"])
    s12 = inv["s12"]
    pts = []
    for i in range(npoints + 1):
        s = s12 * i / npoints
        pos = l.Position(s)
        pts.append((pos["lon2"], pos["lat2"]))
    return LineString(pts)

def fetch_reference_path(port_from, port_to, verbose=True):
    if verbose:
        print(f"Geocoding '{port_from}' and '{port_to}' ...")
    A = geocode_place(port_from)
    time.sleep(1)
    B = geocode_place(port_to)

    if verbose:
        print("Building search bbox and querying Overpass for ferry relations ...")
    bbox = bbox_union(A.bbox, B.bbox, pad_km=30)
    osm = overpass_query(bbox)
    rels, ways, nodes = collect_relations(osm)

    if verbose:
        print(f"Found {len(rels)} ferry relations in search area. Scoring...")

    candidates = []
    for rel in rels:
        geom = stitch_relation_geometry(rel, ways)
        sc, dA, dB, hits = score_relation_for_ports(rel, geom, A, B)
        candidates.append((sc, dA, dB, hits, rel, geom))

    # Prefer relations that pass near BOTH ports or whose tags explicitly hit both
    NEAR_THRESH_KM = 20.0
    strict = [c for c in candidates if c[5] is not None and ((c[3][0] and c[3][1]) or (c[1] <= NEAR_THRESH_KM and c[2] <= NEAR_THRESH_KM))]
    pool = strict if strict else [c for c in candidates if c[5] is not None]

    if not pool:
        if verbose:
            print("No ferry relations with geometry found. Falling back to great-circle path.")
        gc = great_circle_polyline((A.lat, A.lon), (B.lat, B.lon))
        meta = {"source": "great_circle_fallback", "from": A.__dict__, "to": B.__dict__}
        return gc, meta

    best = max(pool, key=lambda x: x[0])
    best_sc, dA, dB, hits, best_rel, best_geom = best

    # If still suspicious (far from one port), fall back
    if (dA > 50 and not hits[0]) or (dB > 50 and not hits[1]):
        if verbose:
            print("Top match seems unrelated to one port; using great-circle fallback.")
        gc = great_circle_polyline((A.lat, A.lon), (B.lat, B.lon))
        meta = {"source": "great_circle_fallback", "note": "no solid OSM relation matched both ports", "from": A.__dict__, "to": B.__dict__}
        return gc, meta

    if verbose:
        t = best_rel.get("tags", {})
        print("Selected relation:", t.get("name", f"relation/{best_rel['id']}"))
    meta = {
        "source": "osm_overpass",
        "relation_id": best_rel["id"],
        "relation_tags": best_rel.get("tags", {}),
        "from": A.__dict__,
        "to": B.__dict__,
        "near_port_km": {"from": round(dA,2), "to": round(dB,2)},
        "tag_hits": {"from": hits[0], "to": hits[1]},
    }
    return best_geom, meta

def to_geojson(linestring, meta):
    return {"type": "FeatureCollection", "features": [{"type": "Feature", "geometry": mapping(linestring), "properties": meta}]}

def to_gpx(linestring, name="Reference Route"):
    gpx = gpxpy.gpx.GPX()
    gpx_track = gpxpy.gpx.GPXTrack(name=name)
    gpx.tracks.append(gpx_track)
    seg = gpxpy.gpx.GPXTrackSegment()
    gpx_track.segments.append(seg)
    for x, y in linestring.coords:
        seg.points.append(gpxpy.gpx.GPXTrackPoint(latitude=y, longitude=x))
    return gpx

# --- Colab-friendly entry point ---

def run_colab_example(PORT_FROM="Helsinki", PORT_TO="Tallinn", SAVE_GEOJSON=None, SAVE_GPX=None, PRINT_INFO=True):
    route, meta = fetch_reference_path(PORT_FROM, PORT_TO, verbose=True)
    if SAVE_GEOJSON:
        with open(SAVE_GEOJSON, "w", encoding="utf-8") as f:
            json.dump(to_geojson(route, meta), f, ensure_ascii=False, indent=2)
        print(f"✓ Wrote GeoJSON to: {SAVE_GEOJSON}")
    if SAVE_GPX:
        gpx = to_gpx(route, name=f"{PORT_FROM} → {PORT_TO}")
        with open(SAVE_GPX, "w", encoding="utf-8") as f:
            f.write(gpx.to_xml())
        print(f"✓ Wrote GPX to: {SAVE_GPX}")
    if PRINT_INFO:
        print(json.dumps(meta, ensure_ascii=False, indent=2))


In [11]:
run_colab_example("Helsinki", "Göteborg",
                  SAVE_GEOJSON="helsinki_Göteborg.geojson",
                  SAVE_GPX="helsinki_Göteborg.gpx",
                  PRINT_INFO=True)

Geocoding 'Helsinki' and 'Göteborg' ...
Building search bbox and querying Overpass for ferry relations ...
Found 210 ferry relations in search area. Scoring...
Top match seems unrelated to one port; using great-circle fallback.
✓ Wrote GeoJSON to: helsinki_Göteborg.geojson
✓ Wrote GPX to: helsinki_Göteborg.gpx
{
  "source": "great_circle_fallback",
  "note": "no solid OSM relation matched both ports",
  "from": {
    "name": "Helsinki, Helsingin seutukunta, Uusimaa, Manner-Suomi, Suomi / Finland",
    "lat": 60.1698897,
    "lon": 24.9384719,
    "bbox": [
      59.922486,
      60.2978497,
      24.7828026,
      25.2545116
    ]
  },
  "to": {
    "name": "Göteborg, Göteborgs Stad, Västra Götalands län, 411 10, Sverige",
    "lat": 57.7072326,
    "lon": 11.9670171,
    "bbox": [
      57.5472326,
      57.8672326,
      11.8070171,
      12.1270171
    ]
  }
}


In [12]:
import folium
import gpxpy

# Load GPX
with open("helsinki_Göteborg.gpx", "r") as f:
    gpx = gpxpy.parse(f)

# Create a map centered on the first point
first_point = gpx.tracks[0].segments[0].points[0]
m = folium.Map(location=[first_point.latitude, first_point.longitude], zoom_start=7)

# Add the GPX line
coords = [(p.latitude, p.longitude) for p in gpx.tracks[0].segments[0].points]
folium.PolyLine(coords, color="blue", weight=3).add_to(m)

m


In [13]:
import folium
import json

# Load the GeoJSON
with open("helsinki_Göteborg.geojson", "r") as f:
    data = json.load(f)

# Center map on first coordinate
coords = data["features"][0]["geometry"]["coordinates"]
lon, lat = coords[0]  # GeoJSON stores [lon, lat]
m = folium.Map(location=[lat, lon], zoom_start=7)

# Add the route
folium.GeoJson(data, name="route").add_to(m)

m


In [16]:
%%writefile sea_only_path_colab.py
# (paste the full code I gave you here)

#!/usr/bin/env python3
"""
Sea‑only path between two ports (Google Colab ready)
---------------------------------------------------
This script computes a *water‑only* shortest path using a coarse hex/8-neighbor grid
and a land mask from Natural Earth "land" polygons (1:10m).

Why: Great‑circle lines can cut across land. This solver avoids land and
returns a navigable reference path over water (not official fairways).

Usage in Colab:
    !pip -q install geopandas shapely pyproj folium requests

    from sea_only_path_colab import run_colab_waterpath
    run_colab_waterpath("Helsinki", "Tallinn", grid_km=5,
                        SAVE_GEOJSON="helsinki_tallinn_water.geojson",
                        SHOW_MAP=True)

Notes:
- This is an *approximate* water path. For official routes (TSS/fairways), use ENCs/AIS.
- grid_km controls resolution (smaller = more accurate but slower).
"""
from __future__ import annotations
import math, json, io, zipfile, time
from dataclasses import dataclass
from typing import Tuple, List, Dict, Optional

import requests
import geopandas as gpd
from shapely.geometry import Point, LineString, Polygon
from shapely.strtree import STRtree
from shapely.ops import unary_union
from pyproj import Geod

USER_AGENT = "SeaOnlyPath/1.0 (contact: your_email@example.com)"
NOMINATIM = "https://nominatim.openstreetmap.org/search"
NE_LAND_ZIP = "https://naturalearth.s3.amazonaws.com/10m_physical/ne_10m_land.zip"

g = Geod(ellps="WGS84")

@dataclass
class Geocode:
    name: str
    lat: float
    lon: float
    bbox: Tuple[float, float, float, float]

def geocode_place(q: str) -> Geocode:
    r = requests.get(NOMINATIM, params={"q": q, "format": "jsonv2", "limit": 1}, headers={"User-Agent": USER_AGENT}, timeout=30)
    r.raise_for_status()
    arr = r.json()
    if not arr:
        raise ValueError(f"No geocoding result for: {q}")
    it = arr[0]
    bbox = tuple(map(float, it["boundingbox"]))
    return Geocode(it.get("display_name", q), float(it["lat"]), float(it["lon"]), (bbox[0], bbox[1], bbox[2], bbox[3]))

def download_land_polygons() -> gpd.GeoDataFrame:
    r = requests.get(NE_LAND_ZIP, headers={"User-Agent": USER_AGENT}, timeout=120)
    r.raise_for_status()
    z = zipfile.ZipFile(io.BytesIO(r.content))
    shp = [n for n in z.namelist() if n.endswith('.shp')][0]
    with z.open(shp) as f:
        # GeoPandas can read from a virtual file if we write all members; simpler: extract to memory FS
        z.extractall("./ne_land")
    gdf = gpd.read_file("./ne_land/" + shp.split('/')[-1])
    gdf = gdf.to_crs(4326)
    return gdf

def build_land_index(gdf: gpd.GeoDataFrame):
    geoms = list(gdf.geometry)
    # dissolve to fewer polygons to speed up
    merged = unary_union(geoms)
    if merged.geom_type == 'Polygon':
        geoms = [merged]
    elif merged.geom_type == 'MultiPolygon':
        geoms = list(merged.geoms)
    else:
        geoms = [merged]
    index = STRtree(geoms)
    return geoms, index

def hav_km(a: Tuple[float,float], b: Tuple[float,float]) -> float:
    az12, az21, dist = g.inv(a[1], a[0], b[1], b[0])
    return dist/1000.0

def bbox_union(a: Tuple[float,float,float,float], b: Tuple[float,float,float,float], pad_km=50) -> Tuple[float,float,float,float]:
    south = min(a[0], b[0]); north = max(a[1], b[1]); west = min(a[2], b[2]); east = max(a[3], b[3])
    pad_deg = pad_km / 111.0
    return (south - pad_deg, north + pad_deg, west - pad_deg, east + pad_deg)

def is_water(lat: float, lon: float, land_geoms, land_index) -> bool:
    pt = Point(lon, lat)
    # quick bbox hits
    for geom in land_index.query(pt):
        if geom.contains(pt):
            return False
    return True

def generate_grid(bbox, step_km=10.0) -> List[Tuple[float,float]]:
    s, n, w, e = bbox
    # approximate degrees per km at mid-lat
    midlat = (s+n)/2
    deg_lat = 1/110.574
    deg_lon = 1/(111.320*math.cos(math.radians(midlat)) + 1e-9)
    dlat = step_km*deg_lat
    dlon = step_km*deg_lon
    pts = []
    lat = s
    while lat <= n:
        lon = w
        while lon <= e:
            pts.append((lat, lon))
            lon += dlon
        lat += dlat
    return pts

from heapq import heappush, heappop

def astar(start: Tuple[float,float], goal: Tuple[float,float], nodes: List[Tuple[float,float]], is_blocked, step_km) -> List[Tuple[float,float]]:
    # make a dict index
    node_set = set(nodes)
    # 8-neighbor deltas in approx degree space scaled to step size
    # We'll reconstruct neighbors on the fly by snapping to grid
    def neighbors(lat, lon):
        # compute grid step in deg at local lat
        deg_lat = 1/110.574
        deg_lon = 1/(111.320*math.cos(math.radians(lat)) + 1e-9)
        dlat = step_km*deg_lat
        dlon = step_km*deg_lon
        for dy in (-dlat, 0, dlat):
            for dx in (-dlon, 0, dlon):
                if dx==0 and dy==0: continue
                yield (lat+dy, lon+dx)
    def h(a,b):
        return hav_km(a,b)
    # snap start/goal to nearest water node
    def nearest_water(pt):
        lat, lon = pt
        best = None; bestd = 1e18
        for q in nodes:
            d = hav_km(pt, q)
            if d < bestd and is_blocked(q)==False:
                best, bestd = q, d
        return best
    sN = nearest_water(start)
    gN = nearest_water(goal)
    if sN is None or gN is None:
        return []
    openh = []
    heappush(openh, (0+h(sN,gN), 0, sN, None))
    came = {}
    gscore = {sN: 0}
    closed = set()
    while openh:
        f, cost, curr, parent = heappop(openh)
        if curr in closed: continue
        came[curr] = parent
        if hav_km(curr, gN) <= step_km*1.5:
            # reached vicinity
            came[gN] = curr
            break
        closed.add(curr)
        lat, lon = curr
        for nb in neighbors(lat, lon):
            if is_blocked(nb):
                continue
            ng = cost + hav_km(curr, nb)
            if nb not in gscore or ng < gscore[nb]:
                gscore[nb] = ng
                heappush(openh, (ng + h(nb, gN), ng, nb, curr))
    # reconstruct
    if gN not in came:
        return []
    path = []
    node = gN
    while node is not None:
        path.append(node)
        node = came[node]
    path.reverse()
    return path

# Public entry point for Colab

def run_colab_waterpath(PORT_FROM: str, PORT_TO: str, grid_km: float = 10.0,
                        SAVE_GEOJSON: Optional[str] = None, SHOW_MAP: bool = True):
    print(f"Geocoding '{PORT_FROM}' and '{PORT_TO}' ...")
    A = geocode_place(PORT_FROM)
    time.sleep(1)
    B = geocode_place(PORT_TO)

    print("Downloading Natural Earth land polygons ...")
    land = download_land_polygons()
    geoms, land_index = build_land_index(land)

    bbox = bbox_union(A.bbox, B.bbox, pad_km=80)
    print("Building water grid ...")
    nodes = generate_grid(bbox, step_km=grid_km)
    def blocked(pt):
        return not is_water(pt[0], pt[1], geoms, land_index)

    print("Running water‑only A* pathfinder ...")
    path = astar((A.lat, A.lon), (B.lat, B.lon), nodes, blocked, grid_km)
    if not path:
        raise RuntimeError("Failed to compute a water‑only path. Try larger pad or coarser grid.")

    line = LineString([(lon, lat) for lat, lon in path])
    meta = {
        "source": "water_grid_astar",
        "grid_km": grid_km,
        "from": {"name": A.name, "lat": A.lat, "lon": A.lon},
        "to": {"name": B.name, "lat": B.lat, "lon": B.lon},
    }
    fc = {"type":"FeatureCollection","features":[{"type":"Feature","geometry": json.loads(gpd.GeoSeries([line]).to_json())['features'][0]['geometry'],"properties": meta}]}

    if SAVE_GEOJSON:
        with open(SAVE_GEOJSON, 'w', encoding='utf-8') as f:
            json.dump(fc, f, ensure_ascii=False, indent=2)
        print(f"✓ Wrote GeoJSON to: {SAVE_GEOJSON}")

    if SHOW_MAP:
        try:
            import folium
            m = folium.Map(location=[(A.lat+B.lat)/2, (A.lon+B.lon)/2], zoom_start=6)
            folium.GeoJson(fc, name="water_path").add_to(m)
            print("Rendered interactive map below.")
            return m
        except Exception as e:
            print("Folium not available, skipping map:", e)
    return fc


Writing sea_only_path_colab.py


In [17]:
!pip -q install geopandas shapely pyproj folium requests

from sea_only_path_colab import run_colab_waterpath

# Example: Helsinki → Tallinn (tight grid is fine here)
m = run_colab_waterpath("Helsinki", "Tallinn",
                        grid_km=5,
                        SAVE_GEOJSON="helsinki_tallinn_water.geojson",
                        SHOW_MAP=True)
m


Geocoding 'Helsinki' and 'Tallinn' ...
Downloading Natural Earth land polygons ...
Building water grid ...
Running water‑only A* pathfinder ...


AttributeError: 'numpy.int64' object has no attribute 'contains'