In [None]:
!pip install osmnx
!pip install mercantile

In [None]:
import osmnx as ox
import mercantile
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import requests
import shapely
from shapely.geometry import Point, LineString, Polygon
import pyproj
import os
from google.colab import drive
import pickle
import json
import time
import math

# Mount to Gdrive

In [None]:
# connect to drive
drive.mount('/gdrive')

Mounted at /gdrive


# Setup

In [None]:
path = '/gdrive/MyDrive/berlin_bike_CV/'
folder = 'metadata'
mapillary_folder = 'mapillary'

name = 'DSR'

# Load Data

In [None]:
# load original graph
graph_name = '_graph_origin.pkl'
graph_path = os.path.join(path, folder, name + graph_name)
with open(graph_path, 'rb') as pickle_file:
    G = pickle.load(pickle_file)

# load edges per tile
file_name = '_tiles_edges.pkl'
file_path = os.path.join(path, folder, name + file_name)
with open(file_path, 'rb') as pickle_file:
  tiles_data = pickle.load(pickle_file)

# Find images per edge

In [None]:
# defining coordinate to metre and metre to coordinate transformers
source_crs = pyproj.CRS("EPSG:4326")  # WGS84 geographic coordinate system
target_crs = pyproj.CRS(proj= 'utm', zone=33)  # Universal Transverse Mercator in meters

transformer_coord_2_metre = pyproj.Transformer.from_crs(source_crs, target_crs, always_xy=True)

transformer_metre_2_coord = pyproj.Transformer.from_crs(target_crs, source_crs, always_xy=True)

In [None]:
# defining constant values for offset and width of each polygon (bounding box)

# offset defines separation from street junction in metre
POLYGONOFFSET = 5

# width defines street width in metre
POLYGONWIDTH = 10

In [None]:
# defining point apart from a junction
# TODO: take into account when street is shorter than the offset

def find_point_on_line(point1, point2, distance=POLYGONOFFSET):

  direction_vector = (point2[0] - point1[0], point2[1] - point1[1])
  direction_length = math.sqrt(direction_vector[0]**2 + direction_vector[1]**2)
  normalized_direction = (direction_vector[0] / direction_length, direction_vector[1] / direction_length)
  point_on_line = (point1[0] + normalized_direction[0] * distance, point1[1] + normalized_direction[1] * distance)

  return point_on_line

In [None]:
# define polygon for street
def street_polygon(u, v, k):

  if 'geometry' in G.edges[(u, v, k)]:
    geometry = G.edges[(u, v, k)]['geometry']

  else:
    geometry = LineString([(G.nodes[u]['x'], G.nodes[u]['y']), (G.nodes[v]['x'], G.nodes[v]['y'])])

  list_geom = list(geometry.coords)
  list_lon = [coord[0] for coord in list_geom]
  list_lat = [coord[1] for coord in list_geom]

  list_x, list_y = transformer_coord_2_metre.transform(list_lon, list_lat)

  list_geom = list(zip(list_x, list_y))
  list_geom[0] = find_point_on_line(list_geom[0], list_geom[1])
  list_geom[-1] = find_point_on_line(list_geom[-1], list_geom[-2])
  list_geom = LineString(list_geom)

  transformed_polygon = list_geom.buffer(distance=POLYGONWIDTH, cap_style=3)

  list_geom = list(transformed_polygon.exterior.coords)
  list_x = [coord[0] for coord in list_geom]
  list_y = [coord[1] for coord in list_geom]

  list_lon, list_lat = transformer_metre_2_coord.transform(list_x, list_y)

  polygon = Polygon(list(zip(list_lon, list_lat)))

  return polygon

In [None]:
# point is within street polygon
def is_in_street_polygon(point, polygon):

  is_contained = polygon.contains(Point(point[0], point[1]))

  return is_contained

In [None]:
# find images per edges per tile
for tile_quadkey, edges in tiles_data.items():
  file_name = tile_quadkey + '.geojson'
  file_path = os.path.join(path, folder, mapillary_folder, file_name)
  with open(file_path, 'r') as f:
      data = json.load(f)

  for (u, v, k) in edges:
      polygon = street_polygon(u, v, k)
      image_list = []

      for feature in data['features']:

          # get coordinates
          lng = feature['geometry']['coordinates'][0]
          lat = feature['geometry']['coordinates'][1]

          # take image id, if image within street polygon
          if is_in_street_polygon((lng, lat), polygon):
              image_id = feature['properties']['id']
              image_prop = {image_id: {'coordinates': [lng, lat]}}
              image_list.append(image_prop)

              # stop after 3 images
              if len(image_list) >= 3:
                  break

      tiles_data[tile_quadkey][(u, v, k)] = image_list

In [None]:
# save dict of tiles with image data
file_name = '_tiles_edges_with_images.pkl'
file_path = os.path.join(path, folder, name + file_name)
with open(file_path, 'wb') as pickle_file:
    pickle.dump(tiles_data, pickle_file)

print('Done')

Done
