In [1]:
# from dem_stitcher.stitcher import stitch_dem
# import rasterio
# from rasterio import plot
import matplotlib.pyplot as plt
from shapely.geometry import Point, Polygon
import asf_search as asf
from datetime import datetime, date, timedelta
from typing import List
from pystac_client import Client, ItemSearch

In [2]:
# for formatting the datetime object to asfsearch syntax
def datetime2asfsearch(entered_date: datetime) -> str:
    return datetime.strftime(entered_date,'%Y') + '-' + datetime.strftime(entered_date,'%m') + '-' + datetime.strftime(entered_date,'%d') + 'T' + datetime.strftime(entered_date,'%H') + ':' + datetime.strftime(entered_date,'%M') + ':' + datetime.strftime(entered_date,'%S') + 'Z'

In [3]:
# for searching for landsat8 data
def hls_search(aoi: Point, date: List[datetime] = None):
    STAC_URL = 'https://cmr.earthdata.nasa.gov/stac'
    api = Client.open(f'{STAC_URL}/LPCLOUD/')
    hls_collections = ['HLSL30.v2.0', 'HLSS30.v2.0']
    
    if date == None:
        search_datetime = [datetime.combine(date.today(), datetime.min.time()), datetime.now()]
    else:
        search_datetime = date
    
    search_params = {
        "collections": hls_collections,
        "bbox": [aoi.x,aoi.y,aoi.x + 0.01,aoi.y + 0.01], # list of xmin, ymin, xmax, ymax
        "datetime": search_datetime,
        "max_items": 500
    }
    search_hls = api.search(**search_params)
    hls_collection = search_hls.get_all_items()
    d = list(hls_collection)
    
    return d

In [4]:
# for searching sentinel1 data
def asf_search(aoi: Point, date: List[datetime] = None):
    if date == None:
        today = date.today()
        start = str(today) + 'T00:00:00Z'
        end = str(today) + 'T23:59:59Z'
    else:
        start = datetime2asfsearch(date[0])
        end = datetime2asfsearch(date[1])

    opts = {
        'platform': asf.PLATFORM.SENTINEL1,
        'maxResults': 500,
        'start': start,
        'end': end
    }
    results = asf.search(**opts)
    
    return results

In [5]:
# find next acquisition date
def acq_search(sensor: str, aoi: Point, date):
    rep = 0
    acq = False
    
    while acq == False or rep < 10:
        
        if 'landsat8' in sensor.lower():
            results = hls_search(aoi, [date + timedelta(days = 100 * rep), date + timedelta(days = 100 * (rep + 1))])
        elif 'sentinel1' in sensor.lower():
            results = asf_search(aoi, [date + timedelta(days = 100 * rep), date + timedelta(days = 100 * (rep + 1))])
        
        coords = []
        for i in range(len(results)):
            coords.append(results[i].geometry['coordinates'][0])

        for i in range(len(results)):
            poly = Polygon(coords[i])
            if aoi.within(poly):
                acq = True
                break
            
        if acq:
            if 'landsat8' in sensor.lower():
                next_acq = results[i].properties['start_datetime']
            elif 'sentinel1' in sensor.lower():
                next_acq = results[i].properties['startTime']
            break
        
        next_acq = 'Search yielded no results'
        rep += 1
    return next_acq

In [6]:
def get_coverage(sensor: List[str], aoi: Point, date: List[datetime] = None) -> List[dict]:
    """
    Sensor: choose sentinel1 and/or landsat8
    AOI: enter coordinates as Point object
    date: leave as none if searching today, else enter time range as datetime tuple: datetime(YYYY,MM,DD)
    """
    freq = {}
    next_acq = {}
    area = {}
    
    for sensor_name in sensor:
        freq[sensor_name] = ''
        next_acq[sensor_name] = ''
        area[sensor_name] = ''
        
        if 'landsat8' in sensor_name.lower():
            results = hls_search(aoi, date)
        elif 'sentinel1' in sensor_name.lower():
            results = asf_search(aoi, date)

        coords = []
        for i in range(len(results)):
            coords.append(results[i].geometry['coordinates'][0])

        # find all intersecting polygons and save as list
        counter = 0
        polygon_list = []
        for i in range(len(results)):
            poly = Polygon(coords[i])
            if aoi.within(poly):
                counter += 1
                polygon_list.append(Polygon(coords[i]))
        
        # calculate frequency
        if counter == 0:
            freq[sensor_name] = 'There is no coverage during this time'
            area[sensor_name] = 0
        else:
            delta = date[1].date() - date[0].date()
            freq[sensor_name] = str(delta / counter)
        
        # find next acquisition time, if search time is today then returns 'N/A'
        if date == None:
            next_acq[sensor_name] = 'N/A'
            
        else:
            
            # put arbitrary hard stop at 10 searches (1000 days)
            if 'sentinel1' in sensor_name.lower():
                next_acq[sensor_name] = acq_search('sentinel1', aoi, date[1])
                    
            elif 'landsat8' in sensor_name.lower():
                next_acq[sensor_name] = acq_search('landsat8', aoi, date[1])
                
        if counter != 0:
            
            area[sensor_name] = polygon_list[0]
            if counter != 1:
                
                for i in range(len(polygon_list) - 1):
                    area[sensor_name] = area[sensor_name].intersection(polygon_list[i + 1])
         
    return freq, next_acq, area

In [None]:
freq, next_acq, area = get_coverage(['landsat8','sentinel1'],Point(105, 74),[datetime(2015,1,1), datetime(2017,1,1)])

In [None]:
print(freq['landsat8'])
print(next_acq['landsat8'])
print(freq['sentinel1'])
print(next_acq['sentinel1'])

In [None]:
print(area['landsat8'])
print(area['sentinel1'])