In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import List, Optional
import geopandas as gpd
import matplotlib.pyplot as plt
from shapely.geometry import Point
import contextily as ctx
import pydantic

import polars

In [None]:
class Location(pydantic.BaseModel):
    latitude: float
    longitude: float
    type: Optional[str] = None
    subtype: Optional[str] = None
    continent: Optional[str] = None
    country: Optional[str] = None
    region: Optional[str] = None
    subregion: Optional[str] = None


def parse_kml_element(elem, ns, parent_names=None) -> List[Location]:
    if parent_names is None:
        parent_names = []

    locations = []

    name_elem = elem.find("kml:name", ns)
    current_names = parent_names.copy()
    if name_elem is not None:
        current_names.append(name_elem.text.strip())

    if elem.tag == f"{{{ns['kml']}}}Placemark":
        coords_elem = elem.find(".//kml:coordinates", ns)
        if coords_elem is not None:
            coord_text = coords_elem.text.strip()
            first_coord = coord_text.split()[0]
            coord_parts = first_coord.strip("> \n\t").split(",")

            if len(coord_parts) >= 2:
                try:
                    lon = float(coord_parts[0])
                    lat = float(coord_parts[1])

                    hierarchy = current_names[1:8] + [None] * (
                        6 - len(current_names[1:8])
                    )

                    loc = Location(
                        latitude=lat,
                        longitude=lon,
                        type=hierarchy[0],
                        subtype=hierarchy[1],
                        continent=hierarchy[2],
                        country=hierarchy[3],
                        region=hierarchy[4],
                        subregion=hierarchy[5],
                    )
                    locations.append(loc)
                except ValueError:
                    pass

    for child in elem:
        locations.extend(parse_kml_element(child, ns, current_names))

    return locations


def extract_locations_from_kml(file_path: str) -> List[Location]:
    tree = ET.parse(file_path)
    root = tree.getroot()
    ns = {"kml": "http://www.opengis.net/kml/2.2"}
    return parse_kml_element(root, ns)


locations = extract_locations_from_kml("archaeogeodesy.kml")

In [None]:
df = polars.DataFrame(data=[l.model_dump() for l in locations])

In [None]:
for col in df.columns:
    n_unique = df[col].n_unique()
    print(f"Column: {col} | Unique count: {n_unique}")
    if n_unique < 100:
        print(df[col].unique())

In [None]:
import plotly.express as px
import pandas as pd

# Load the car share dataset
# df = px.data.carshare()
# print(df.head())

# Create the scatter map box
fig = px.scatter_map(
    df,
    lat="latitude",
    lon="longitude",
    color="subtype",
    color_continuous_scale=px.colors.cyclical.IceFire,
    size_max=15,
    zoom=20,
    map_style="satellite",
)

# Update the layout with a title and margins
fig.update_layout(
    title="Montreal Car Share Data",
    margin={"r": 0, "t": 0, "l": 0, "b": 0},
    height=600,
    width=1000,
)
fig.show(render="notebook")

In [None]:
from typing import List, Optional
from dataclasses import dataclass
from shapely.geometry import Point, Polygon
import geopandas as gpd
import matplotlib.pyplot as plt
import contextily as ctx
from geopy.distance import distance as geopy_distance
from pyproj import Transformer
from math import radians, cos


# Define ZoomCircle
@dataclass
class ZoomCircle:
    center_lon: float  # Longitude of center
    center_lat: float  # Latitude of center
    radius_km: float  # Radius in kilometers


def plot_locations(locations: List[Location], zoom_circle: Optional[ZoomCircle] = None):
    # Convert location data into a GeoDataFrame
    gdf = gpd.GeoDataFrame(
        locations,
        geometry=[Point(loc.longitude, loc.latitude) for loc in locations],
        crs="EPSG:4326",  # WGS84
    )

    # Filter points by zoom_circle if provided
    if zoom_circle:

        def within_radius(row):
            return (
                geopy_distance(
                    (row.geometry.y, row.geometry.x),
                    (zoom_circle.center_lat, zoom_circle.center_lon),
                ).km
                <= zoom_circle.radius_km
            )

        gdf = gdf[gdf.apply(within_radius, axis=1)]

    # Define Amazon bounding box (WGS84)
    amazon_bbox = {"minx": -80, "maxx": -45, "miny": -20, "maxy": 7}

    # Create a Polygon for the Amazon bounding box
    amazon_polygon = Polygon(
        [
            (amazon_bbox["minx"], amazon_bbox["miny"]),
            (amazon_bbox["minx"], amazon_bbox["maxy"]),
            (amazon_bbox["maxx"], amazon_bbox["maxy"]),
            (amazon_bbox["maxx"], amazon_bbox["miny"]),
            (amazon_bbox["minx"], amazon_bbox["miny"]),
        ]
    )

    amazon_gdf = gpd.GeoDataFrame({"geometry": [amazon_polygon]}, crs="EPSG:4326")

    # Convert to Web Mercator for basemap
    gdf_mercator = gdf.to_crs(epsg=3857)
    amazon_gdf_mercator = amazon_gdf.to_crs(epsg=3857)

    # Plot locations
    fig, ax = plt.subplots(figsize=(15, 10))
    gdf_mercator.plot(ax=ax, alpha=0.7, edgecolor="k", color="red", label="Locations")
    amazon_gdf_mercator.boundary.plot(
        ax=ax, color="green", linewidth=2, label="Amazon Region"
    )

    # Add labels (subtype)
    if not gdf.empty and "subtype" in gdf.columns:
        for x, y, label in zip(
            gdf_mercator.geometry.x, gdf_mercator.geometry.y, gdf["subtype"]
        ):
            if label:
                ax.text(x, y, label, fontsize=9, ha="center", va="bottom")

    # Add OSM basemap
    ctx.add_basemap(ax, source=ctx.providers.OpenStreetMap.Mapnik)

    # Add longitude/latitude ticks
    transformer = Transformer.from_crs("EPSG:3857", "EPSG:4326", always_xy=True)

    def format_ticks(tick_values, axis="x"):
        lons_or_lats = []
        for t in tick_values:
            lon, lat = (
                transformer.transform(t, 0)
                if axis == "x"
                else transformer.transform(0, t)
            )
            lons_or_lats.append(round(lon, 2) if axis == "x" else round(lat, 2))
        return lons_or_lats

    x_ticks = ax.get_xticks()
    y_ticks = ax.get_yticks()
    ax.set_xticklabels(format_ticks(x_ticks, axis="x"))
    ax.set_yticklabels(format_ticks(y_ticks, axis="y"))
    ax.set_xlabel("Longitude")
    ax.set_ylabel("Latitude")

    # Set zoom extents if provided
    if zoom_circle:
        transformer_to_merc = Transformer.from_crs(
            "EPSG:4326", "EPSG:3857", always_xy=True
        )
        deg_per_km_lat = 1 / 110.574  # Approximate
        deg_per_km_lon = 1 / (111.320 * cos(radians(zoom_circle.center_lat)))
        delta_lat = zoom_circle.radius_km * deg_per_km_lat
        delta_lon = zoom_circle.radius_km * deg_per_km_lon
        minx = zoom_circle.center_lon - delta_lon
        maxx = zoom_circle.center_lon + delta_lon
        miny = zoom_circle.center_lat - delta_lat
        maxy = zoom_circle.center_lat + delta_lat
        minx_m, miny_m = transformer_to_merc.transform(minx, miny)
        maxx_m, maxy_m = transformer_to_merc.transform(maxx, maxy)
        ax.set_xlim(minx_m, maxx_m)
        ax.set_ylim(miny_m, maxy_m)

    # Final plot adjustments
    plt.title("Locations by Subtype on OpenStreetMap")
    plt.legend()
    plt.show()

In [None]:
plot_locations(
    locations=locations,
    zoom_circle=ZoomCircle(center_lat=-10, center_lon=-80, radius_km=1000),
)

In [None]:
import boto3
import re
from botocore import UNSIGNED
from botocore.config import Config
import os
import tqdm

# Set up S3 client (public, unsigned)
s3 = boto3.client(
    "s3",
    endpoint_url="https://opentopography.s3.sdsc.edu",
    config=Config(signature_version=UNSIGNED),
)

bucket = "raster"
prefix = "AW3D30/"

# Regex pattern to capture coordinates like W045 and S020
pattern = re.compile(r"W(\d{3})|S(\d{3})")

# Coordinate bounds
lon_min = 45
lon_max = 80
lat_min = 7
lat_max = 20

# List and filter files
paginator = s3.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

matching_files = []

for page in pages:
    for obj in page.get("Contents", []):
        key = obj["Key"]
        matches = pattern.findall(key)
        if not matches:
            continue

        # Extract longitude and latitude from filename
        lon_match = next((int(lon) for lon, lat in matches if lon), None)
        lat_match = next((int(lat) for lon, lat in matches if lat), None)

        # Filter by ranges
        if lon_match is not None and lat_match is not None:
            if lon_min <= lon_match <= lon_max and lat_min <= lat_match <= lat_max:
                matching_files.append(key)

# Print results
print(f"Found {len(matching_files)} matching files:")
for file in tqdm.tqdm(matching_files):
    filename = os.path.join("./AW3D30", os.path.basename(file))
    # print(f"Downloading {file} -> {filename}")
    s3.download_file(bucket, file, filename)