In [1]:
from pathlib import Path
from datetime import datetime
from functools import partial
from PIL import Image
import json

from multiprocessing.pool import ThreadPool

import ee
from affine import Affine
import geopandas as gpd
from shapely.geometry import box
import numpy as np

from pyproj import CRS

from gdstools import (
    split_bbox,
    GEEImageLoader,
    ConfigLoader,
    save_cog,
    image_collection,
    multithreaded_execution,
)


In [19]:
# %%
import time

def timeit(method):
    """Decorator that times the execution of a method and prints the time taken."""
    def timed(*args, **kwargs):
        start_time = time.time()
        result = method(*args, **kwargs)
        end_time = time.time()
        print(f"{method.__name__} took {end_time - start_time:.2f} seconds to run.")
        return result
    return timed

# %%
def naip_from_gee(
    bbox,
    year,
    dim=1,
    num_threads=8,
    epsg=4326,
    scale=1
):
    """Fetch NAIP image url from Google Earth Engine (GEE) using a bounding box.

    Parameters
    ----------
    bbox : list
        Bounding box in the form [xmin, ymin, xmax, ymax].
    year : int
        Year (e.g. 2019)
    epsg : int, optional
        EPSG code for coordinate reference system. Default is 4326.
    scale : int, optional
        Resolution in meters of the image to fetch. Default is 1.

    Returns
    -------
    numpy array, dict
        Returns a tuple containing the image as a numpy array and its metadata as a dictionary.
    """
    start_date = f"{year}-01-01"
    end_date = f"{year}-12-31"

    # get the naip image collection for our aoi and timeframe
    eebbox = ee.Geometry.BBox(*bbox)
    collection = (
        ee.ImageCollection("USDA/NAIP/DOQQ")
        .filterDate(start_date, end_date)
        .filterBounds(eebbox)
    )

    date_range = collection.reduceColumns(ee.Reducer.minMax(), ['system:time_start'])
    ts_end, ts_start = date_range.getInfo().values()

    image = GEEImageLoader(collection.median().clip(eebbox))
    imarray, profile = quad_fetch(
        collection, 
        bbox, 
        dim=dim, 
        num_threads=num_threads, 
        epsg=epsg, 
        scale=scale
    )

    image.metadata_from_collection(collection)
    image.set_property("system:time_start", ts_start)# * 1000)
    image.set_property("system:time_end", ts_end)# * 1000)
    image.set_params("scale", scale)
    image.set_params("crs", f"EPSG:{epsg}")
    image.set_params("region", bbox)
    image.set_viz_params("min", 0)
    image.set_viz_params("max", 255)
    image.set_viz_params("bands", ["R", "G", "B"])
    image.set_property("profile", profile)

    return imarray, image.metadata


def quad_fetch(collection, bbox, dim=1, num_threads=None, **kwargs):
    """Breaks user-provided bounding box into quadrants and retrieves data
    using `fetcher` for each quadrant in parallel using a ThreadPool.

    Parameters
    ----------
    bbox : 4-tuple or list
        Coordinates of x_min, y_min, x_max, and y_max for bounding box of tile.
    dim : int, optional
        Dimension of the quadrants to split the bounding box into. Default is 1.
    num_threads : int, optional
        Number of threads to use for parallel executing of data requests. Default is None.
    **kwargs
        Additional keyword arguments that will be passed to `naip_from_gee`.

    Returns
    -------
    numpy array, dict
        Returns a tuple containing the image as a numpy array and its metadata as a dictionary.
    """
    def clip_image(bbox, scale, epsg):
        ee_bbox = ee.Geometry.BBox(*bbox)
        image = GEEImageLoader(collection.median().clip(ee_bbox))
        image.set_params("scale", scale)
        image.set_params("crs", f"EPSG:{epsg}")
        return image.to_array()

    if dim > 1:
        if num_threads is None:
            num_threads = dim**2

        bboxes = split_bbox(dim, bbox)

        get_quads = partial(clip_image, **kwargs)
        with ThreadPool(num_threads) as p:
            quads = p.map(get_quads, bboxes)

        # Split quads list in tuples of size dim
        images = [x[0] for x in quads]
        quad_list = [images[x:x + dim] for x in range(0, len(images), dim)]

        # Reverse order of rows to match rasterio's convention
        [x.reverse() for x in quad_list]
        image = np.concatenate(
            [
                np.hstack(quad_list[x]) for x in range(0, len(quad_list))
            ],
            2
        )

        profile = quads[0][1]
        first = quads[0][1]['transform']
        last = quads[-1][1]['transform']
        profile['transform'] = Affine(
            first.a,
            first.b,
            first.c,
            first.d,
            first.e,
            last.f
        )
        h, w = image.shape[1:]
        profile.update(width=w, height=h)

        return image, profile

    else:
        return naip_from_gee(bbox, **kwargs)


@timeit
def get_naip(bbox, year, outpath, dim=3, overwrite=False, num_threads=None):
    """
    Downloads a NAIP image from Google Earth Engine and saves it as a Cloud-Optimized GeoTIFF (COG) file.

    Parameters
    ----------
    bbox : list-like
        list of bounding box coordinates (minx, miny, maxx, maxy)
    year : int
        year of the NAIP image to download
    outpath : str or Path
        path to save the downloaded image
    dim : int, optional
        dimension of the image to download (default is 3)
    overwrite : bool, optional
        whether to overwrite an existing file with the same name (default is False)
    num_threads : int, optional
        number of threads to use for downloading (default is None)

    Returns
    -------
    None
    """
    outpath = Path(outpath)
    try:
        image, metadata = naip_from_gee(bbox, dim=dim, year=year, num_threads=num_threads)
    except Exception as e:
        print(f"Failed to fetch {outpath.name}: {e}")
        return

    preview = Image.fromarray(
        np.moveaxis(image[:3], 0, -1).astype(np.uint8)
    ).convert('RGB')
    h, w = preview.size
    preview = preview.resize((w//30, h//30))
    preview.save(outpath.parent / outpath.name.replace('-cog.tif',
                 '-preview.png'), optimize=True)
    profile = metadata['properties']['profile']
    metadata['properties'].pop('profile')

    with open(outpath.parent / outpath.name.replace('-cog.tif', '-metadata.json'), 'w') as f:
        json.dump(metadata, f, indent=2)
    save_cog(image, profile, outpath, overwrite=overwrite)

    return


In [15]:
conf = ConfigLoader('../fbstacs').load()
api_url = conf['items']['naip']['api']

GRID = conf.DEV_GRID
PROJDATADIR = conf.DEV_PROJDATADIR
WORKERS = 3
year = 2020

ee.Initialize(opt_url=api_url)

qq_shp = gpd.read_file(GRID)
qq_shp['STATE'] = qq_shp.PRIMARY_STATE.apply(lambda x: str(x)[:2])

outpath = Path(PROJDATADIR) / 'naip' / str(year)
outpath.mkdir(exist_ok=True, parents=True)

In [16]:
qq_shp['STATE'] = qq_shp.PRIMARY_STATE.apply(lambda x: str(x)[:2])

outpath = Path(PROJDATADIR) / 'naip' / str(year)
outpath.mkdir(exist_ok=True, parents=True)

In [17]:
for row in qq_shp.itertuples():
    bbox = row.geometry.bounds
    dim = 6
    year = year
    outpath = outpath / f"{row.CELL_ID}_{year}_{row.STATE.upper()}_NAIP_DOQQ-cog.tif"
    num_threads = 5
    overwrite = False
    break

In [20]:
get_naip(
    bbox=bbox,
    dim=dim,
    year=year,
    outpath=outpath,
    num_threads=20,
    overwrite=False
)

Failed to fetch 108243_2020_OR_NAIP_DOQQ-cog.tif: 0
