# Imports, Constants and Setup

In [1]:
import os
import cv2
import requests
import numpy as np
from tqdm import tqdm
import geopandas as gpd
import matplotlib.pyplot as plt
from osmtogeojson import osmtogeojson
import vec_geohash

ZOOM = 16
DISPLAY = False

DATASET_FOLDER = f'data'
train_img_path = f'{DATASET_FOLDER}/train_images/img'
train_masks_path = f'{DATASET_FOLDER}/train_masks/img'
val_img_path = f'{DATASET_FOLDER}/val_images/img'
val_masks_path = f'{DATASET_FOLDER}/val_masks/img'
for path in [train_img_path, train_masks_path, val_img_path, val_masks_path]:
    os.makedirs(path, exist_ok=True)

# Helper functions

In [2]:
def download_img_for_tile(tile_x, tile_y, zoom, source='arcgis'):
    """Downloads image for Tile coordinates and Zoom level, you can also pass source from where to download."""
    if source == 'google':
        url = f'https://mt1.google.com/vt/lyrs=s&x={tile_x}&y={tile_y}&z={zoom}'
    elif source == 'google_new':
        url = f'https://khms3.google.com/kh/v=932?x={tile_x}&y={tile_y}&z={zoom}'
    else:
        url = f'https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{zoom}/{tile_y}/{tile_x}'
                
    try:
        img = requests.get(url).content
    except:
        return None


    np_arr = np.frombuffer(img, np.uint8)
    rgb_img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
    return rgb_img


def query_osm_for_solar_panels_by_bbox(lat_min, lon_min, lat_max, lon_max):
    """Queries OSM for solar generators in bounding box."""
    osm_query = f"""
                    [out:json][timeout:1500];
                    way{lat_min, lon_min, lat_max, lon_max}["generator:source"="solar"];
                    (._;>;);
                    out; 
                """
    response = requests.get("http://overpass-api.de/api/interpreter", params={'data': osm_query})
    return response


def parse_response(response):
    """Parses the response if status is good else raises an error."""
    response.raise_for_status()
    data_json = response.json()
    osm_data = osmtogeojson.process_osm_json(data_json)
    return osm_data

def query_solar_powerplants_for_tile(tile_x, tile_y, zoom):
    lon_min, lat_min, lon_max, lat_max = vec_geohash.tile_to_lat_lon(tile_x, tile_y, zoom)[0]

    padding = 0.02
    lon_min, lat_min, lon_max, lat_max = lon_min-padding, lat_min-padding, lon_max+padding, lat_max+padding

    response = query_osm_for_solar_panels_by_bbox(lat_min, lon_min, lat_max, lon_max)
    data_json = parse_response(response)

    gdf = gpd.GeoDataFrame.from_features(data_json, crs='epsg:4326', columns=['@id', 'geometry'])
    gdf = gdf[gdf.geometry.type == 'Polygon']
    return gdf


def get_polygon_mask(polygon, zoom, offset_x=0, offset_y=0):
    """Creates mask for polygon."""
    lon, lat = np.array(polygon.exterior.coords).T
    pixel_x, pixel_y = vec_geohash.lat_lon_to_pixel(lat, lon, zoom)
    pixel_x, pixel_y = pixel_x-offset_x, pixel_y-offset_y
    return np.vstack((pixel_x, pixel_y)).T


def display_image(image):
    """Displays image."""
    plt.imshow(image)
    plt.axis('off')
    plt.show()

def display_images(img_list):
    f, axarr = plt.subplots(1, len(img_list), figsize=(13,13))
    for x, img in enumerate(img_list):
        axarr[x].imshow(img)
    plt.show()

# Downloading of satellite images

In [3]:
downloaded_set = set()
data = gpd.read_feather('osm_area_data_extracted/osm_data_Italia.geofeather')

source = 'google'
with tqdm(total=len(data)) as pbar:
    for row_num, row_data in data.iterrows():
        pbar.update(1)

        for x in range(row_data.min_tile_x, row_data.max_tile_x + 1):
            for y in range(row_data.min_tile_y, row_data.max_tile_y + 1):

                # skipping already downloaded img
                if (x, y, ZOOM) in downloaded_set:
                    continue

                img = download_img_for_tile(x, y, ZOOM, source)
                if img is None:
                    source = 'arcgis' if source == 'google' else 'google'
                    img = download_img_for_tile(x, y, ZOOM, source)

                if img is None:
                    print(f'Failed to download bot from google and arcgis: {(x, y, ZOOM)}')
                    continue

                downloaded_set.add((x, y, ZOOM))

                # extracting data form mask
                gdf = query_solar_powerplants_for_tile(x, y, ZOOM)
                masks = [get_polygon_mask(polygon, ZOOM, x*256, y*256) for polygon in gdf.geometry]
                masked_img = cv2.fillPoly(np.zeros((256,256)), masks, 255)

                if DISPLAY:
                    overlay_img = img.copy()
                    overlay_img = cv2.polylines(overlay_img, masks, True, (255,0,0), thickness=2)
                    display_images([img, overlay_img, masked_img])

                # skip where mask is less then 1%
                if masked_img.sum() < 165_000:
                    continue

                # training and validation set
                train_img_path, train_masks_path, val_img_path, val_masks_path
                if np.random.rand() > 0.2:
                    cv2.imwrite(f'{train_img_path}/img_{ZOOM}_{x}_{y}.png', img)
                    cv2.imwrite(f'{train_masks_path}/img_{ZOOM}_{x}_{y}.png', masked_img)
                else:
                    cv2.imwrite(f'{val_img_path}/img_{ZOOM}_{x}_{y}.png', img)
                    cv2.imwrite(f'{val_masks_path}/img_{ZOOM}_{x}_{y}.png', masked_img)





100%|██████████| 31042/31042 [1:45:42<00:00,  4.89it/s]  
