In [None]:
import os
from typing import Dict, Any, Union
from pathlib import Path
from io import BytesIO
from exif import Image as ExifImage

import requests
import json

import folium

from tqdm import tqdm
import numpy as np
from PIL import Image

In [None]:
secrets = {
    "access_token": "MLY|6148807671914225|994d45073bcccc0eca929741524e65f6"
}

# Get locations from exif

In [None]:
def decimal_coords(coords, ref):
 decimal_degrees = coords[0] + coords[1] / 60 + coords[2] / 3600
 if ref in ["S", "W"]:
  decimal_degrees = -decimal_degrees

 return decimal_degrees

def image_coordinates(image_path):
    try:
        with open(image_path, 'rb') as src:
            img = ExifImage(src)

        if not img.has_exif:
            return None

        img.gps_longitude
        coords = (
            decimal_coords(img.gps_latitude, img.gps_latitude_ref),
            decimal_coords(img.gps_longitude, img.gps_longitude_ref)
        )
    except:
        coords = None

    return coords

In [None]:
scene = '0025'
# img_dir = f'/Volumes/Extreme_SSD/MegaDepth/scenes/{scene}/images/'
img_dir = f'../../data/scenes/{scene}/images/'

img_list = os.listdir(img_dir)
img_list = [os.path.join(img_dir, img) for img in img_list]

# get the coordinates of the images
coords = [image_coordinates(img_path) for img_path in img_list]
coords = [coord for coord in coords if coord is not None]
coords = np.array(coords)


center = np.median(coords, axis=0)
std = np.std(coords, axis=0)

print(f"Number of coordinates: {len(coords)}")
print(f"Center: {center}")
print(f"Std:    {std}")

In [None]:
# get bbox such that 90% of the area is covered
def dist(p1, p2):
    return ((p1[0]-p2[0])**2 + (p1[1]-p2[1])**2)**0.5

# sort points by distance
def sort_points(center, points):
    return sorted(points, key=lambda p: dist(p, center))

def get_bbox(points, ratio=0.8):
    points = np.array(points)
    points = points[:int(ratio*len(points))]
    x1, y1 = points.min(axis=0)
    x2, y2 = points.max(axis=0)
    return x1, y1, x2, y2

In [None]:
xmin, ymin, xmax, ymax = get_bbox(sort_points(center, coords), 0.5)

In [None]:
bbox = [
    [xmin, ymin],
    [xmax, ymin],
    [xmax, ymax],
    [xmin, ymax],
    [xmin, ymin]
]

map = folium.Map(location=center, zoom_start=20)

folium.PolyLine(bbox, color="red", weight=5).add_to(map)


for coord in coords:
    folium.CircleMarker(
        location=coord,
        radius=1,
        color='blue',
        fill=True,
        fill_color='blue'
    ).add_to(map)

folium.CircleMarker(
    location=center,
    radius=10,
    color='red',
    fill=True,
    fill_color='red'
).add_to(map)


map

# Get close images from Mapillary

In [None]:
import requests
import json
from vt2geojson.tools import vt_bytes_to_geojson

import math
def deg2num(lat_deg, lon_deg, zoom):
  lat_rad = math.radians(lat_deg)
  n = 2.0 ** zoom
  xtile = int((lon_deg + 180.0) / 360.0 * n)
  ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
  return (xtile, ytile)

MAP_ACCESS_TOKEN = secrets["access_token"]

#set zoom levels and corner coordinates
z = 14
ll_lat = xmin 
ur_lat = xmax
ll_lon = ymin
ur_lon = ymax
llx, lly = deg2num(ll_lat, ll_lon, z)
urx, ury = deg2num(ur_lat, ur_lon, z)

print(llx, lly, urx, ury)

#uncomment the one layer you wish to use
# type="mly1_computed_public"
# type="mly_map_feature_point"
# type="mly_map_feature_traffic_sign"
# type="mly1_computed_public"
# type="mly1_public"

types = ["mly1_computed_public","mly_map_feature_point","mly_map_feature_traffic_sign","mly1_computed_public","mly1_public"]

for type in types:
    output = {"type":"FeatureCollection","features":[]}
    for x in range(min(llx,urx),max(llx,urx)+1,1):
        for y in range(min(lly,ury),max(lly,ury)+1,1):
            print (type,x,y)
            url = f"https://tiles.mapillary.com/maps/vtp/{type}/2/{z}/{x}/{y}?access_token={MAP_ACCESS_TOKEN}"
            r = requests.get(url)
            if r.status_code != 200:
                print("error", r.status_code, url)
                continue
            vt_content = r.content
            features = vt_bytes_to_geojson(vt_content, x, y, z)
            for f in features["features"]:
                output['features'].append(f)

In [None]:
output = [out for out in output["features"] if out["geometry"]["type"] == "Point"]

coords = [out["geometry"]["coordinates"][::-1] for out in output]

for idx, coord in enumerate(coords):
    folium.CircleMarker(
        location=coord,
        radius=1,
        color='black',
        fill=True,
        fill_color='black',
        popup=f"Point {idx}"
    ).add_to(map)

map

In [None]:
# Only keep coords inside bbox
def inside_bbox(coordinate: list, bbox: list):
    return bbox[0] <= coordinate[0] <= bbox[1] and bbox[2] <= coordinate[1] <= bbox[3]

len_old = len(coords)
coords = [coord for coord in coords if inside_bbox(coord, [xmin, xmax, ymin, ymax])]
print(f"{len_old} -> {len(coords)}")

In [None]:
for idx, coord in enumerate(coords):
    folium.CircleMarker(
        location=coord,
        radius=1,
        color='green',
        fill=True,
        fill_color='green',
        popup=f"Point {idx}"
    ).add_to(map)

map

# Download images from bbox

In [None]:
def get_altitude(lat: float, lon: float):
    url = f"https://api.open-elevation.com/api/v1/lookup?locations={lat},{lon}"
    try:
        r = requests.get(url).json()
        return r["results"][0]["elevation"]
    except:
        return -1

def get_image_coordinates(id: Union[str, int], secrets: Dict[str, Any]):
    url = f"https://graph.mapillary.com/{id}?access_token={secrets['access_token']}"
    headers = {"Authorization": f"OAuth {secrets['access_token']}"}
    response = requests.get(url, headers)
    data = response.json()
    return data["geometry"]["coordinates"][::-1]

def get_image_infos(id: Union[str, int], secrets: Dict[str, Any]):
    url = f"https://graph.mapillary.com/{id}?access_token={secrets['access_token']}"
    fields = "&fields=id,captured_at,compass_angle,computed_compass_angle,sequence,geometry,computed_geometry"
    url += fields

    headers = {"Authorization": f"OAuth {secrets['access_token']}"}
    response = requests.get(url, headers)
    data = response.json()
    lat, lon = data["geometry"]["coordinates"][::-1]
    alt = get_altitude(lat, lon)
    return {
        "id": data["id"],
        "captured_at": data["captured_at"],
        "compass_angle": data["compass_angle"],
        "computed_compass_angle": data["computed_compass_angle"],
        "sequence": data["sequence"],
        "coordinates": [lat, lon, alt],
        "computed_coordinates": data["computed_geometry"]["coordinates"][::-1],
    }

def download_image(id: Union[str, int], secrets: Dict[str, Any]):
    # get the image url
    url = f"https://graph.mapillary.com/{id}?fields=thumb_2048_url&access_token={secrets['access_token']}"
    headers = {"Authorization": f"OAuth {secrets['access_token']}"}
    response = requests.get(url, headers)
    data = response.json()

    # download the image
    img_url = data["thumb_2048_url"]
    response = requests.get(img_url)
    img = Image.open(BytesIO(response.content))

    return img, get_image_infos(id, secrets)

def get_sequence_ids(id: str, secrets: Dict[str, Any]):
    url = f"https://graph.mapillary.com/image_ids?sequence_id={id}&access_token={secrets['access_token']}"

    headers = {"Authorization": f"OAuth {secrets['access_token']}"}
    response = requests.get(url, headers)
    data = response.json()
    return [x["id"] for x in data["data"]]

In [None]:
out_dir = Path("/Users/alexanderveicht/Desktop/out")

folder = out_dir / scene

if not folder.exists():
    folder.mkdir()

In [None]:
bbox_images = [out for out in output if inside_bbox(out["geometry"]["coordinates"][::-1], [xmin, xmax, ymin, ymax])]

ids = [out["properties"]["id"] for out in bbox_images]

infos = {}
for id in tqdm(ids, total=len(ids), desc="Downloading images"):
    img, info = download_image(id, secrets)
    img_fn = folder / f"{id}.jpg"
    img.save(img_fn)
    infos[id] = info

with open(folder / "infos.json", "w") as f:
    json.dump(infos, f)

# Create locations.txt

In [None]:
with open(folder / "infos.json", "r") as f:
    infos = json.load(f)

In [None]:
locs = []
locs_comp = []

for id, info in infos.items():
    long, lat, alt = info["coordinates"]
    line = f"{id}.jpg {long} {lat} {alt}\n"
    locs.append(line)
    long, lat = info["computed_coordinates"]
    line = f"{id}.jpg {long} {lat} {alt}\n"
    locs_comp.append(line)

with open(folder / "locs.txt", "w") as f:
    f.writelines(locs)

with open(folder / "locs_comp.txt", "w") as f:
    f.writelines(locs_comp)

# Get closest sequence to center

In [None]:
# dists = [dist(center, coords[i]) for i in range(len(coords))]
# closest_to_center = np.argmin(dists)
# closest_to_center_distance = dists[closest_to_center]


# print(f"Closest to center: {closest_to_center} ({coords[closest_to_center]} <-> {center})))")
# print(f"Distance: {closest_to_center_distance}")

# seq_id = output[idx]["properties"]["sequence_id"]
# print(f"Sequence ID: {seq_id}")

# Get Sequence from Mapillary

In [None]:
# out_dir = Path("/Users/alexanderveicht/Desktop/out")

# seq_id = "75v6cy432dhdi8c7luvdic"

# seq_folder = out_dir / seq_id

# if not seq_folder.exists():
#     seq_folder.mkdir()

In [None]:
# infos = {}

# seq_img_ids = get_sequence_ids(seq_id, secrets)
# for idx, img_id in tqdm(enumerate(seq_img_ids), total = len(seq_img_ids)):
#     fname = f"{seq_id}_{idx:04d}.jpg"
#     # img, info = download_image(img_id, secrets)
#     info = get_image_infos(img_id, secrets)
#     # img.save(seq_folder / fname)
#     infos[fname] = info

# # save infos
# with open(seq_folder / "info.json", "w") as f:
#     json.dump(infos, f)

# Plot on Map

In [None]:
import pyproj

def gps_to_ecef_pyproj(coords):
    transformer = pyproj.Transformer.from_crs(
        {"proj":'latlong', "ellps":'WGS84', "datum":'WGS84'},
        {"proj":'geocent', "ellps":'WGS84', "datum":'WGS84'},
    )
    x, y, z = transformer.transform(coords[0], coords[1], coords[2], radians=False)

    return x, y, z

def ecef_to_lla(coord):
    transformer = pyproj.Transformer.from_crs(
        {"proj":'geocent', "ellps":'WGS84', "datum":'WGS84'},
        {"proj":'latlong', "ellps":'WGS84', "datum":'WGS84'},
    )
    lon, lat, alt = transformer.transform(coord[0], coord[1], coord[2], radians=False)
    return lon, lat, alt

# plot location on map
def plot_map(poses: list, points: list, color='red'):
    map = folium.Map(location=poses[0], zoom_start=30)
    
    for coord in poses:
        folium.CircleMarker(coord, radius=1, color=color).add_to(map)

    for coord in points:
        folium.CircleMarker(coord, radius=0.1, color='blue').add_to(map)


    return map

In [None]:
import pycolmap
import numpy as np

from tqdm import tqdm

from hloc.visualization import plot_images
from hloc.utils.io import read_image

from megadepth.utils.projections import backward_project

geo_model = pycolmap.Reconstruction("../../data/test-mapillary/sparse/geo-model")

p3d = geo_model.points3D
p3d = [p3d[p].xyz for p in p3d if p3d[p].track.length() > 20]
idx = np.random.choice(len(p3d), 5000)
p3d = [p3d[i] for i in idx]
p3d = [ecef_to_lla(p)[:2][::-1] for p in tqdm(p3d)]

In [None]:
with open("../../data/test-mapillary/coords-comp.txt", "r") as f:
    lines = f.read().split("\n")

lines = [c.split(" ") for c in lines[:-1]]
lines = [[str(c[0]), float(c[1]), float(c[2])] for c in lines]

coords = {c[0]: {"gt": [c[1], c[2]]} for c in lines}

In [None]:
images = geo_model.images
cameras = geo_model.cameras

poses = {}
for i, k1 in enumerate(images.keys()):
    image_1 = images[k1]
    camera_1 = cameras[image_1.camera_id]
    poses[image_1.name] = backward_project(
        points_2d=np.array([[0, 0]]),
        image=image_1,
        camera=camera_1,
        depth=0,
    )

for k in coords.keys():
    if k in poses.keys():
        coords[k]["pose"] = ecef_to_lla(poses[k][0])[:2][::-1]

In [None]:
gt_poses = [p["gt"] for p in coords.values() if "pose" in p.keys()]
poses = [p["pose"] for p in coords.values() if "pose" in p.keys()]
gt_poses = np.array(gt_poses)
poses = np.array(poses)

In [None]:
N = 5

img_fn = [f"../../data/test-mapillary/images/{fname}" for fname in coords.keys()][:N]

plot_images([read_image(fn) for fn in img_fn])

In [None]:
map = folium.Map(location=gt_poses[0], zoom_start=15)

for i, pose in enumerate(gt_poses):
    folium.Circle(pose, popup=f'GT Pose {i}', color="black", radius=1).add_to(map)

for i, pose in enumerate(poses):
    folium.Circle(pose, popup=f'Pose {i}', color="red", radius=1).add_to(map)

for i in range(len(poses)):
    folium.PolyLine([poses[i], gt_poses[i]], color="green", weight=2.5, opacity=1).add_to(map)

for coord in p3d:
    folium.CircleMarker(coord, radius=0.1, color='blue').add_to(map)

map