In [None]:
from pathlib import Path
from typing import List, Dict, Any, Optional
import numpy as np
import math
import csv
from statistics import mean
import folium
from tqdm import tqdm

import ais_query, ais_maps, ais_utils

parquet_folder_path = "ais-data/parquet/"  # Path to folder with Parquet files

In [None]:
def parse_dkcpc_routes(path: str | Path) -> List[Dict[str, Any]]:
    """
    Parse DKCPC_2018-style route files.

    Logic:
    - 'Rute ...' only sets a *pending* name.
    - 'Linjefarge ...' actually decides if we start a new route or
      keep appending to the current one.
    - Consecutive blocks with the same color (e.g. 'Lilla') are merged
      into a single route.
    """
    path = Path(path)
    routes: List[Dict[str, Any]] = []
    current_route: Optional[Dict[str, Any]] = None
    pending_name: Optional[str] = None

    def _start_new_route(name: Optional[str] = None):
        nonlocal current_route, routes
        # Close previous route if it has points
        if current_route is not None and current_route["points"]:
            routes.append(current_route)
        current_route = {
            "name": name or "uten navn",
            "color": None,
            "points": [],
            "metadata": {},
        }

    with path.open("r", encoding="utf-8", errors="ignore") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue

            # Header, can ignore
            if line.startswith("Ferdig forenklet"):
                continue

            # This *does not* start a new route directly,
            # we just remember the name for the next Linjefarge block.
            if line.startswith("Rute "):
                pending_name = line[len("Rute "):].strip() or "uten navn"
                continue

            # Here we decide whether to start a new route or
            # keep using the existing one based on color.
            if line.startswith("Linjefarge "):
                color = line[len("Linjefarge "):].strip()

                if current_route is None:
                    # First route
                    _start_new_route(pending_name)
                    current_route["color"] = color
                else:
                    if current_route["color"] == color:
                        # Same color as current route → same route
                        # Optionally update the name if it was generic
                        if current_route["name"] == "uten navn" and pending_name:
                            current_route["name"] = pending_name
                    else:
                        # Color changed → close route and start a new one
                        _start_new_route(pending_name)
                        current_route["color"] = color

                pending_name = None
                continue

            # Ignore "Plottsett ..." and other metadata lines
            if line.startswith("Plottsett"):
                continue

            # Coordinate lines: lat_minutes lon_minutes timestamp label
            parts = line.split()
            if len(parts) >= 2:
                try:
                    lat_min = float(parts[0])
                    lon_min = float(parts[1])
                except ValueError:
                    continue

                if current_route is None:
                    _start_new_route(pending_name)
                    pending_name = None

                current_route["points"].append((lat_min, lon_min))

    # Append last route if it has points
    if current_route is not None and current_route["points"]:
        routes.append(current_route)

    return routes

def export_bbox_points_to_txt(
    routes: List[Dict[str, Any]],
    bbox: list[float],
    out_path: str | Path,
) -> int:
    """
    Filter all points inside bbox, convert to degrees, and export to a TXT file.

    bbox format: [lat_max, lon_min, lat_min, lon_max]

    Output columns (comma-separated):
        lat_deg,lon_deg,route_name,color
    """
    out_path = Path(out_path)
    lat_max, lon_min, lat_min, lon_max = bbox

    rows = []

    for r in routes:
        name = r["name"]
        color = r["color"] if r["color"] is not None else ""

        for lat_min_val, lon_min_val in r["points"]:
            lat = lat_min_val / 60.0
            lon = lon_min_val / 60.0

            if not (lat_min <= lat <= lat_max and lon_min <= lon <= lon_max):
                continue

            rows.append((lat, lon, name, color))

    # Write to txt (CSV-style)
    with out_path.open("w", encoding="utf-8") as f:
        f.write("lat_deg,lon_deg,route_name,color\n")
        for lat, lon, name, color in rows:
            # escape commas in name if needed
            safe_name = name.replace(",", " ")
            safe_color = color.replace(",", " ")
            f.write(f"{lat:.8f},{lon:.8f},{safe_name},{safe_color}\n")

    return len(rows)


if __name__ == "__main__":
    txt_path = "ais-data/DKCPC_2018.txt"  # input file
    routes = parse_dkcpc_routes(txt_path)
    print(f"Loaded {len(routes)} routes.")

    bbox = [57.58, 10.5, 57.12, 11.92]  # [lat_max, lon_min, lat_min, lon_max]

    for r in routes:
        r["name"] = r["color"] if r.get("color") is not None else r.get("name", "uten navn")

    n_points = export_bbox_points_to_txt(
        routes,
        bbox,
        out_path="dkcpc_points_bbox_deg.txt",
    )
    print(f"Exported {n_points} points to dkcpc_points_bbox_deg.txt")



In [None]:
def plot_txt_points_folium(
    txt_path: str | Path,
    output_html: str = "dkcpc_points_map.html",
    tiles: str = "CartoDB positron",
    zoom_start: int = 9,
):
    """
    Plot points from a TXT file exported by export_bbox_points_to_txt()
    using Folium.

    Expected columns:
        lat_deg, lon_deg, route_name, color
    """
    txt_path = Path(txt_path)

    rows = []
    lats = []
    lons = []

    # Read file
    with txt_path.open("r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            lat = float(row["lat_deg"])
            lon = float(row["lon_deg"])
            rows.append(row)
            lats.append(lat)
            lons.append(lon)

    if not rows:
        raise ValueError("No rows found in TXT file.")

    # Map center = average of coordinates
    center_lat = mean(lats)
    center_lon = mean(lons)

    m = folium.Map(location=[center_lat, center_lon], tiles=tiles, zoom_start=zoom_start)

    # Plot points
    for row in rows:
        lat = float(row["lat_deg"])
        lon = float(row["lon_deg"])
        name = row["route_name"] or "unknown"
        color = row["color"] or "black"

        folium.CircleMarker(
            location=(lat, lon),
            radius=2,
            color=color,
            fill=True,
            fill_opacity=0.9,
            weight=1,
            tooltip=f"{name} ({lat:.5f}, {lon:.5f})",
        ).add_to(m)

    # Save map
    m.save(output_html)
    print(f"Map saved to {output_html}")

    return m

# Plot all exported points
m = plot_txt_points_folium(
    "dkcpc_points_bbox_deg.txt",
    output_html="dkcpc_bbox_points_map.html"
)

In [None]:
cable_CG2_points = [(57.30158, 10.53598), (57.30987, 10.56213), (57.32877, 10.62335), (57.44588,10.95990), (57.48487, 11.27270), (57.51543, 11.50590) ,(57.47733, 11.74353), (57.46635, 11.82507), (57.45608, 11.91480)]
cable_kattegat2A_points = [(57.23810, 10.54834), (57.25287, 10.56423), (57.26348,10.65152), (57.25885, 10.73908),(57.26067, 10.75360), (57.25628, 10.79103), (57.25233, 10.86909)]
cable_kattegat2B_points = [(57.30917, 11.19625), (57.39863, 11.48437), (57.44917,11.64585), (57.46658, 11.76620), (57.46273, 11.77677), (57.46470, 11.82323), (57.46743, 11.85143), (57.46640, 11.89138), (57.45608, 11.91480)]

cable_points = {
    "CG2": cable_CG2_points,
    "Kattegat2A": cable_kattegat2A_points,
    "Kattegat2B": cable_kattegat2B_points,
}
# Saving CSV
filename = "submarine_cables_divided.csv"

with open(filename, mode='w', newline='') as file:
    writer = csv.writer(file)
    # Header: cable name, order of the point, Latitude, Longitude
    writer.writerow(["cable_id", "point_order", "lat", "lon"])
    
    for cable_name, points in cable_points.items():
        for i, (lat, lon) in enumerate(points):
            writer.writerow([cable_name, i, lat, lon])

print(f"Data saved to '{filename}'")

## This is a generic function to build a polyline from the points

In [None]:
from shapely.geometry import LineString

def build_polyline(points_latlon):
    """
    points_latlon: lista di (lat, lon) IN GRADI, nell'ordine in cui
                   vuoi che vengano collegati.
    Ritorna: shapely.geometry.LineString
    """
    if len(points_latlon) < 2:
        raise ValueError("Servono almeno 2 punti per costruire una spezzata.")
    
    # Shapely vuole (x, y) = (lon, lat)
    coords = [(lon, lat) for (lat, lon) in points_latlon]
    return LineString(coords)

line_cg2 = build_polyline(cable_CG2_points)
line_kg2A = build_polyline(cable_kattegat2A_points)
line_kg2B = build_polyline(cable_kattegat2B_points)

### here a function to plot both points and the newly generated maps in folium

In [None]:

# --- 1. Define Cable Data (Hardcoded for safety) ---

# --- 2. Load Points from File ---
txt_points_path = Path("dkcpc_points_bbox_deg.txt")

rows = []
lats = []
lons = []

# Check if file exists to prevent crash
if txt_points_path.exists():
    with txt_points_path.open("r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            try:
                lat = float(row["lat_deg"])
                lon = float(row["lon_deg"])
                rows.append(row)
                lats.append(lat)
                lons.append(lon)
            except ValueError:
                continue
else:
    print(f"Warning: {txt_points_path} not found. Map will only show main cables.")
    # Default center to first cable if no points found
    lats = [p[0] for p in cable_CG2_points]
    lons = [p[1] for p in cable_CG2_points]

# --- 2. Load Points from File ---
txt_points_path = Path("dkcpc_points_bbox_deg.txt")

rows = []
lats = []
lons = []

# Check if file exists to prevent crash
if txt_points_path.exists():
    with txt_points_path.open("r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            try:
                lat = float(row["lat_deg"])
                lon = float(row["lon_deg"])
                rows.append(row)
                lats.append(lat)
                lons.append(lon)
            except ValueError:
                continue
else:
    print(f"Warning: {txt_points_path} not found. Map will only show main cables.")
    # Default center to first cable if no points found
    lats = [p[0] for p in cable_CG2_points]
    lons = [p[1] for p in cable_CG2_points]

    
# --- 3. Create Map ---
# Center the map
center_lat = np.mean(lats) if lats else 57.3
center_lon = np.mean(lons) if lons else 11.0

m_all = folium.Map(location=[center_lat, center_lon], tiles="CartoDB positron", zoom_start=9)

# --- 4. Add Main Cables (Fixed Colors) ---
# CG2 -> Green
folium.PolyLine(cable_CG2_points, color="green", weight=4, opacity=0.9, tooltip="Cable CG2").add_to(m_all)
# Kattegat2A -> Purple
folium.PolyLine(cable_kattegat2A_points, color="purple", weight=4, opacity=0.9, tooltip="Cable Kattegat2A").add_to(m_all)
# Kattegat2B -> Red
folium.PolyLine(cable_kattegat2B_points, color="red", weight=4, opacity=0.9, tooltip="Cable Kattegat2B").add_to(m_all)

# --- 5. Add Exported Points (Black Markers) ---
for row in rows:
    lat = float(row["lat_deg"])
    lon = float(row["lon_deg"])
    route_name = row.get("route_name", "Unknown")
    
    folium.CircleMarker(
        location=(lat, lon),
        radius=2,
        color="black",        # Outline color
        fill=True,
        fill_color="black",   # Fill color
        fill_opacity=0.6,
        weight=1,
        tooltip=f"{route_name}"
    ).add_to(m_all)

folium.LayerControl().add_to(m_all)

out_html = "all_cables_and_points_map.html"
m_all.save(out_html)
print(f"Map successfully saved to {out_html}")

In [None]:
# A hypothetical vessel position near the CG2 cable
ship_lat = 57.315
ship_lon = 10.580

distance, cable_name, point_coords = ais_utils.get_min_distance_to_cables(ship_lat, ship_lon, cable_points)

print(f"Vessel Position: ({ship_lat}, {ship_lon})")
print(f"Nearest Cable: {cable_name}")
print(f"Minimum Distance: {distance:.2f} meters")
print(f"Closest point on cable geometry: {point_coords}")

In [None]:
df = ais_query.query_ais_duckdb(parquet_folder_path, verbose=True)

In [None]:
df.head()

In [None]:
# --- EXAMPLE USAGE ---

# Assuming 'A' is Latitude and 'B' is Longitude based on your description
# If you aren't sure, print df.head() to check which column looks like 56.x or 57.x (Lat)
lat_column_name = 'Latitude' # Change this to 'A' or 'B' etc. if needed
lon_column_name = 'Longitude' # Change this to 'A' or 'B' etc. if needed

cables = ais_utils.cable_points
# Run the function

# df is the DataFrame you loaded from parquet earlier
results_df = ais_utils.analyze_cable_risks(df, cables, lat_column_name, lon_column_name)
# Inspect results
print(results_df.head())

# Save to CSV
results_df.to_csv("vessel_cable_distances.csv", index=False)
print("Saved distances to vessel_cable_distances.csv")

In [None]:
m = ais_maps.create_ship_path_html(df,209275000, out_html="ship_path_map.html")

In [None]:
m = ais_maps.create_cable_risk_heatmap(results_df, dist_threshold=1000)