In [1]:
import os
import random
import time
import csv
import requests
import zipfile
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from collections import defaultdict
from shapely.geometry import Point
import geopandas as gpd
import osmnx as ox
from PIL import Image
from shapely.geometry import LineString
import math

Countries from which we will gather additional data from google static streetview api

In [5]:
COUNTRIES = ['Dominican Republic',
 'Greenland',
 'Luxembourg',
 'Montenegro',
 'North Macedonia',
 'Albania',
 'Palestine',
 'Serbia',
 'Lesotho',
 'Kyrgyzstan',
 'Guatemala',
 'Jordan',
 'Ecuador',
 'Uruguay',
 'Sri Lanka',
 'Senegal',
 'Mongolia']

COUNTRY_ISO_MAP = {
    'Dominican Republic': 'DO',
    'Greenland': 'GL',
    'Luxembourg': 'LU',
    'Montenegro': 'ME',
    'North Macedonia': 'MK',
    'Albania': 'AL',
    'Palestine': 'PS',
    'Serbia': 'RS',
    'Lesotho': 'LS',
    'Kyrgyzstan': 'KG',
    'Guatemala': 'GT',
    'Jordan': 'JO',
    'Ecuador': 'EC',
    'Uruguay': 'UY',
    'Sri Lanka': 'LK',
    'Senegal': 'SN',
    'Mongolia': 'MN'
}

country_name_fix = {
		'Dominican Republic': 'Dominican Rep.',
	}


Saving 10 biggest cities for each country. We will try to gather images close to big cities so that we will have more chances of getting coordinates with streetview photos available

In [3]:
df = pd.read_csv("geonames/worldcities.csv")

# Group by country and get top 10 most populous cities per country
top_cities = (
    df[df['population'].notnull()]  # exclude missing population
    .sort_values(['country', 'population'], ascending=[True, False])
    .groupby('country')
    .head(10)
    .reset_index(drop=True)
)

top_cities = top_cities.rename(columns={
    'country': 'country_full',
    'iso2': 'country'
})

# Save to CSV
top_cities[['country_full', 'city', 'lat', 'lng', 'population', 'country']].to_csv("geonames/top_10_cities_per_country.csv", index=False)

print("Saved top 10 cities per country to top_10_cities_per_country.csv")


Saved top 10 cities per country to top_10_cities_per_country.csv


In [None]:
# --- Configuration ---		
API_KEY = "" 
MAX_TRIES = 500
METADATA_RADIUS = 100  # meters

# ----------------------------------------------------------------------------------------------
# ---------------------------------------HELPER FUNCTIONS---------------------------------------
# ----------------------------------------------------------------------------------------------


# --- Download Natural Earth country borders ---
def download_natural_earth(data_dir="./data"):
    url = "https://naturalearth.s3.amazonaws.com/110m_cultural/ne_110m_admin_0_countries.zip"
    zip_path = os.path.join(data_dir, "countries.zip")
    shp_path = os.path.join(data_dir, "ne_110m_admin_0_countries.shp")

    if os.path.exists(shp_path):
        return shp_path

    os.makedirs(data_dir, exist_ok=True)
    print("Downloading shapefile...")

    headers = {
        "User-Agent": "Mozilla/5.0"
    }

    r = requests.get(url, headers=headers)
    if r.status_code != 200:
        raise Exception(f"Failed to download data. Status code: {r.status_code}")

    with open(zip_path, "wb") as f:
        f.write(r.content)

    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(data_dir)
    except zipfile.BadZipFile:
        os.remove(zip_path)
        raise Exception("Downloaded file is not a valid ZIP archive.")

    os.remove(zip_path)
    return shp_path


def download_and_extract_cities500(data_dir="./geonames"):
    os.makedirs(data_dir, exist_ok=True)
    zip_url = "http://download.geonames.org/export/dump/cities500.zip"
    zip_path = os.path.join(data_dir, "cities500.zip")
    extracted_path = os.path.join(data_dir, "cities500.txt")

    if not os.path.exists(extracted_path):
        print("Downloading cities500.zip...")
        r = requests.get(zip_url, stream=True)
        with open(zip_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
        print("Downloaded. Extracting...")

        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(data_dir)

        os.remove(zip_path)
        print(f"Extracted to {extracted_path}")
    else:
        print("cities500.txt already present.")

    return extracted_path


def load_cities_for_country(country_code, cities_file="geonames/cities500.txt"):
    cols = [1, 2, 3, 5]
    sep = ','
    if cities_file == "geonames/cities500.txt":
        cols = [1, 4, 5, 8]
        sep = '\t'
    
    df = pd.read_csv(cities_file, sep=sep, header=None, usecols=cols, names=["name", "lat", "lon", "country"])
    if country_code == "all":
        return df
    return df[df['country'] == country_code].reset_index(drop=True)


# --- Generate random point inside a country ---
def get_random_point_near_city_in_country(country_name, gdf, cities_df, max_km=2):
    
    row = gdf[gdf['NAME'] == country_name]
        
    if country_name in country_name_fix:
        normalized_name = country_name_fix.get(country_name, country_name)
        row = gdf[gdf['NAME'] == normalized_name]
    
    if row.empty:
        return None
    
    geom = row.iloc[0].geometry
    iso_code = COUNTRY_ISO_MAP.get(country_name)
    if not iso_code:
        return None
    
    country_cities = cities_df[cities_df["country"] == iso_code]
    if country_cities.empty:
        return None

    for _ in range(100):  # max tries
        city = country_cities.sample(1).iloc[0]
        dx = random.normalvariate(0, 1) * max_km / 111
        dy = random.normalvariate(0, 1) * max_km / 111
        lon, lat = float(city["lon"]) + dx, float(city["lat"]) + dy
        point = Point(lon, lat)
        if geom.contains(point):
            return point
    return None


# --- Query Street View metadata API ---
def get_valid_streetview_metadata(lat, lon, api_key, radius=100, retries=5):
    url = "https://maps.googleapis.com/maps/api/streetview/metadata"
    params = {
        "location": f"{lat},{lon}",
        "radius": radius,
        "key": api_key,
        "source": "outdoor"
    }

    backoff = 0.2
    for i in range(retries):
        try:
            r = requests.get(url, params=params, timeout=10)
            r.raise_for_status()
            result = r.json()
            if result.get("status") == "OK":
                return result
            return None
        except requests.exceptions.RequestException as e:
            print(f"Metadata API error on attempt {i+1}: {e}")
            time.sleep(backoff)
    return None


def compute_heading_from_road(lat, lon, distance=300, fallback_distance=600):
    try:
        G = ox.graph_from_point((lat, lon), dist=distance, network_type='drive')
        edges = ox.graph_to_gdfs(G, nodes=False)
        if edges.empty:
            # Retry with larger distance
            G = ox.graph_from_point((lat, lon), dist=fallback_distance, network_type='drive')
            edges = ox.graph_to_gdfs(G, nodes=False)
        if edges.empty:
            raise ValueError("No roads found even in fallback distance")

        point = Point(lon, lat)
        edges_proj = edges.to_crs(epsg=3857)
        point_proj = gpd.GeoSeries([point], crs="EPSG:4326").to_crs(epsg=3857).iloc[0]
        nearest_edge = edges_proj.distance(point_proj).sort_values().index[0]
        road = edges.loc[nearest_edge].geometry

        if isinstance(road, LineString):
            coords = list(road.coords)
        elif road.geom_type == "MultiLineString":
            coords = list(road.geoms[0].coords)
        else:
            return None

        if len(coords) < 2:
            return None
        x1, y1 = coords[0]
        x2, y2 = coords[-1]
        
        dx = x2 - x1
        dy = y2 - y1
        angle_rad = math.atan2(dx, dy)
        heading = (math.degrees(angle_rad) + 360) % 360
        return heading

    except Exception as e:
        print("Heading computation error:", e)
        return None


# --- Download actual Street View image by pano_id ---
def download_streetview_by_pano(pano_id, api_key, filename="streetview.jpg", heading=random.randint(0, 360), fov=120, pitch=0):
    url = "https://maps.googleapis.com/maps/api/streetview"
    params = {
        "size": "640x640",
        "pano": pano_id,
        "heading": heading,
        "fov": fov,
        "pitch": pitch,
        "key": api_key,
        "source": "outdoor"
    }
    r = requests.get(url, params=params)
    if r.status_code == 200:
        with open(filename, "wb") as f:
            f.write(r.content)
        return filename
    return None


# --- Detect if downloaded image is a blank placeholder ---
def is_placeholder_image(path):
    try:
        img = Image.open(path).convert("L")  # grayscale
        arr = np.array(img)
        std = arr.std()
        return std < 2  # low variation = likely placeholder
    except:
        return True


# --- Find a valid Street View point inside the country ---
def find_valid_streetview_point(country_name, gdf, cities_df,api_key, max_tries=1000):
    for i in range(max_tries):
        point = get_random_point_near_city_in_country(country_name, gdf, cities_df)
        if point is None:
            continue
        lat, lon = point.y, point.x
        metadata = get_valid_streetview_metadata(lat, lon, api_key, radius=METADATA_RADIUS)
        if metadata:
            pano_id = metadata.get("pano_id")
            if pano_id:
                print(f"Valid image after {i+1} tries: {country_name}")
                return point
        time.sleep(0.1)  # rate limit safety
    print("No valid image found.")
    return None


# --- Plot the result ---
def plot_map_with_point(country_name, gdf, point):
    fig, ax = plt.subplots(figsize=(10, 8))
    gdf.boundary.plot(ax=ax, color='gray')
    
    if country_name in country_name_fix:
        country_name = country_name_fix.get(country_name, country_name)

    gdf[gdf['NAME'] == country_name].plot(ax=ax, color='lightblue', edgecolor='blue')
    if point:
        ax.plot(point.x, point.y, 'ro', markersize=8)
        ax.text(point.x, point.y, f"({point.y:.4f}, {point.x:.4f})", fontsize=8)
    ax.set_title(f"Street View Point in {country_name}")
    plt.tight_layout()
    plt.show()


We will now extract all the images we need

In [None]:
TARGET_IMAGE_COUNT = 250
IMAGE_DIR = "additional_images"

os.makedirs(IMAGE_DIR, exist_ok=True)

# --- Loop across countries and collect images ---
shp_path = download_natural_earth()
cities_txt_path = download_and_extract_cities500()

cities_df = load_cities_for_country("all", "geonames/top_10_cities_per_country.csv")
world = gpd.read_file(shp_path)

MAX_IMAGES_PER_COUNTRY = 250
country_image_count = defaultdict(int)
for n in COUNTRIES:
    country_image_count[n] = 0
collected = 0

print(f"Collecting {TARGET_IMAGE_COUNT} total images...")

# Shuffle countries to start randomly
country_pool = COUNTRIES.copy()
random.shuffle(country_pool)
limit = TARGET_IMAGE_COUNT * len(COUNTRIES) + 1


while collected < limit:
    for country in country_pool:
        if country_image_count[country] >= MAX_IMAGES_PER_COUNTRY:
            continue

        print(f"Searching in: {country}")

        point = find_valid_streetview_point(country, world, cities_df, API_KEY, max_tries=200)

        if point:
            lat, lon = point.y, point.x
            metadata = get_valid_streetview_metadata(lat, lon, API_KEY, radius=METADATA_RADIUS)
            pano_id = metadata.get("pano_id")
            
            heading = compute_heading_from_road(lat, lon)
            if heading is None:
                  heading = 90

            # Folder + filename
            country_folder = os.path.join(IMAGE_DIR, country.replace(" ", "_"))
            os.makedirs(country_folder, exist_ok=True)
            filename = os.path.join(country_folder, f"{country.replace(' ', '_')}_{country_image_count[country]:03d}.jpg")

            saved = download_streetview_by_pano(pano_id, API_KEY, filename=filename, heading=heading)
            if saved and not is_placeholder_image(filename):
                print(f"Saved #{collected+1}: {filename}")
                collected += 1
                country_image_count[country] += 1
            else:
                print("Placeholder detected — skipping")
        else:
            print("No valid location found in this round")

        time.sleep(0.3)

    # Reshuffle for next full round to ensure variety
    random.shuffle(country_pool)

print("Done! All images and metadata saved.")

