In [None]:
import json
import requests
from getpass import getpass
import sys
import time
import re
import threading
import datetime
import os
import pandas as pd
import geopandas as gpd
import geojson
import pprint
from geojson import Polygon, Feature, FeatureCollection, dump

import warnings
warnings.filterwarnings("ignore")

In [None]:
# ------------------------------------------------------------------
# User inputs -- edit these values before running the notebook.
# ------------------------------------------------------------------
tile_number = 'h3v9'                   # tile ID; must appear in the 'tile' column of tile_aoi.csv
datasetName = 'landsat_ard_tile_c2'    # M2M dataset searched and downloaded
# Landsat sensors to process; comment/uncomment entries as needed.
sensors = ['LC08', 'LC09', 'LE07', 'LT05']
cloudCoverFilter = dict(min=0, max=75)  # scene cloud-cover range passed to scene-search
fileType = 'band'                       # 'bundle', 'band', 'band_group'; any other value = all files
# ------------------------------------------------------------------
# End of user inputs
# ------------------------------------------------------------------

In [None]:
# Read the tile's bounding box from tile_aoi.csv (columns: tile, llx, lly, urx, ury)
# and turn it into an M2M minimum-bounding-rectangle spatial filter.
aoi_df = pd.read_csv('tile_aoi.csv')
aoi_df = aoi_df[aoi_df['tile'] == tile_number]
# Fail with a clear message instead of an opaque IndexError when the tile is not listed.
if aoi_df.empty:
    raise ValueError(f"Tile '{tile_number}' was not found in tile_aoi.csv")

# Corner coordinates: x = longitude, y = latitude (see the filter below).
# .to_list() yields plain Python numbers, which json.dumps can serialise.
llx, lly, urx, ury = (aoi_df[col].to_list()[0] for col in ('llx', 'lly', 'urx', 'ury'))

# create a spatial filter
spatialFilter = {
    "filterType": "mbr",
    "lowerLeft": {
        "latitude": lly,
        "longitude": llx
    },
    "upperRight": {
        "latitude": ury,
        "longitude": urx
    }
}

pprint.pprint(spatialFilter)
# the lookup table is no longer needed
del aoi_df

In [None]:
def sendRequest(url, data, apiKey=None, exitIfNoResponse=True, timeout=None):
    """
    Send a request to an M2M endpoint and return the ``data`` field of its JSON reply.

    Parameters:
    url (str): The URL of the M2M endpoint
    data (dict or None): The payload, sent JSON-encoded as the request body
    apiKey (str): Optional API key, sent as the ``X-Auth-Token`` header
    exitIfNoResponse (bool): Selects the failure sentinel. When True (default) a
        failed call returns None; when False it returns False, so callers can
        distinguish failure from a successful call whose ``data`` is None
        (e.g. ``result != False`` in the download-retrieve polling loop).
    timeout (float or None): Optional ``requests`` timeout in seconds
        (None = wait indefinitely, the previous behaviour)

    Returns:
    The ``data`` member of the response (may legitimately be None), or the
    failure sentinel above when the service reports an error (``errorCode``),
    returns an HTTP error status, or returns a non-JSON / non-object body.

    Raises:
    RuntimeError: if sending the request or reading the response raises.
    """
    failure = None if exitIfNoResponse else False
    headers = {'X-Auth-Token': apiKey} if apiKey is not None else None
    # Messages for the HTTP statuses the notebook has always reported explicitly.
    http_messages = {400: "400 Bad Request", 401: "401 Unauthorized", 404: "404 Not Found"}

    response = None
    try:
        response = requests.post(url, json.dumps(data), headers=headers, timeout=timeout)
        httpStatusCode = response.status_code

        try:
            output = json.loads(response.text)
        except ValueError:
            # e.g. an HTML error page from a gateway: report the status instead of crashing
            print(f"{httpStatusCode} - response body is not valid JSON")
            return failure

        # M2M reports API-level failures inside the JSON body.
        if isinstance(output, dict) and output.get('errorCode') is not None:
            print(f"{output['errorCode']} - {output.get('errorMessage')}")
            return failure

        if httpStatusCode >= 400:
            print(http_messages.get(httpStatusCode, f"{httpStatusCode} HTTP error"))
            return failure

        if not isinstance(output, dict):
            print("Unexpected response format from service.")
            return failure

        return output.get('data', None)

    except Exception as e:
        raise RuntimeError(f"Request failed: {e}") from e
    finally:
        if response is not None:
            response.close()

In [None]:
def downloadFile(url, max_retries=3, retry_wait=10):
    """
    Download one file from `url` into `data_dir`, retrying a bounded number of times.

    Concurrency is limited by the module-level semaphore `sema`, which is held for
    the whole call (including retries). The body is streamed to `<name>.part` and
    only renamed to its final name once complete, so a failed transfer never leaves
    a truncated file that looks finished.

    Parameters:
    url (str): Download URL returned by the M2M download-request / download-retrieve calls
    max_retries (int): Extra attempts after the first failure (default 3)
    retry_wait (float): Seconds to sleep between attempts (default 10)

    Returns:
    bool: True if the file was written, False if every attempt failed.
    """
    with sema:
        for attempt in range(max_retries + 1):
            part_path = None
            try:
                with requests.get(url, stream=True, timeout=120) as response:
                    response.raise_for_status()

                    # Parse the filename from content-disposition; stop at ';' so an
                    # unquoted name does not swallow following parameters.
                    disposition = response.headers.get('content-disposition', '')
                    matches = re.findall(r'filename="?([^";]+)"?', disposition)
                    # basename() drops any directory part a server-supplied name might carry.
                    filename = os.path.basename(matches[0].strip()) if matches else ''
                    if not filename:
                        # thread id avoids collisions between threads within the same second
                        filename = f"file_{int(time.time())}_{threading.get_ident()}.dat"

                    filepath = os.path.join(data_dir, filename)
                    part_path = filepath + '.part'

                    # Stream the file to disk to save memory
                    with open(part_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:  # filter out keep-alive chunks
                                f.write(chunk)

                os.replace(part_path, filepath)
                return True

            except Exception as e:
                print(f"\nFailed to download from {url} "
                      f"(attempt {attempt + 1}/{max_retries + 1}). Error: {e}")
                if part_path is not None:
                    try:
                        os.remove(part_path)
                    except OSError:
                        pass  # nothing was written, or it is already gone
                if attempt < max_retries:
                    time.sleep(retry_wait)
    return False


In [None]:
# downloading function
def runDownload(threads, url):
    """
    Start a background thread that downloads `url`, and record it in `threads`.

    The concurrency limit is enforced inside `downloadFile`, which holds the global
    semaphore `sema` for the whole download, so the thread targets it directly.
    Wrapping it in a second `with sema:` made every download need two permits,
    which halves the effective limit and can deadlock once every permit is held
    by a thread waiting for its second one.

    Parameters:
    threads (list): List that the new thread is appended to, so the caller can join it
    url (str): URL to download

    Returns:
    threading.Thread: the started thread
    """
    thread = threading.Thread(target=downloadFile, args=(url,))
    threads.append(thread)
    thread.start()
    return thread

In [None]:
# Working folders: downloadFile writes into `data_dir`; `utils_dir` is created alongside it.
data_dir = 'data_ls'
utils_dir = 'utils'
dirs = [data_dir, utils_dir]

# Make sure each folder exists (an existing folder is fine) and report the outcome.
for folder in dirs:
    try:
        os.makedirs(folder, exist_ok=True)
    except OSError as err:
        print(f"[Error] Could not create '{folder}': {err}")
    else:
        print(f"Directory '{folder}' is ready.")

In [None]:
# Concurrency setting: size of the semaphore shared by the download helpers.
MAX_THREADS = 50
sema = threading.Semaphore(MAX_THREADS)

# Session label (YYYYmmdd_HHMMSS) attached to the M2M download requests/retrievals.
label = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d_%H%M%S")

# Download threads started via runDownload are collected here so they can be joined.
threads = list()

In [None]:
def prompt_ERS_login(serviceURL):
    """
    Log in to the USGS M2M API with an ERS username and application token.

    Credentials are read from the ERS_USERNAME / ERS_TOKEN environment variables
    when set; otherwise the user is prompted (the token via getpass, so it is not
    echoed). No credentials are stored in the notebook.

    Parameters:
    serviceURL (str): M2M base URL ending in '/', e.g. ".../api/json/stable/"

    Returns:
    str or None: API key to send as the X-Auth-Token header, or None if login failed.
    """
    print("Logging in...\n")

    p = ['Enter EROS Registration System (ERS) Username: ', 'Enter ERS Account Token: ']
    username = os.environ.get('ERS_USERNAME') or input(p[0])
    token = os.environ.get('ERS_TOKEN') or getpass(p[1])

    response = requests.post(f"{serviceURL}login-token",
                             json={'username': username, 'token': token},
                             timeout=60)

    apiKey = None
    if response.status_code == 200:  # Check for successful response
        try:
            body = response.json()
        except ValueError:
            body = None
        # A 200 reply can still carry an errorCode with data=None; treat that as failure.
        if isinstance(body, dict):
            apiKey = body.get('data')

    if apiKey:
        print('\nLogin Successful, API Key Received!')
        return apiKey

    print("\nLogin was unsuccessful, please try again or create an account at: https://ers.cr.usgs.gov/register.")
    return None

In [None]:
# M2M JSON API base URL; endpoint names are appended to it (e.g. serviceUrl + "scene-search").
serviceUrl = "https://m2m.cr.usgs.gov/api/api/json/stable/"
apiKey = prompt_ERS_login(serviceUrl)
# Fail fast: every later request needs a valid key, and a None key would only
# surface as confusing errors deep inside the download loop.
if not apiKey:
    raise RuntimeError("ERS login failed; no API key was returned.")

In [None]:
# count time for image loading (wall-clock time is reported at the end of the run)
start_time = time.time()

# The sensors processed below are the ones listed in the user-input cell (`sensors`);
# edit that list to choose which missions are downloaded. A hard-coded list here
# would silently override that selection.
# Download loop: for every sensor, split its mission period into calendar-year windows
# (smaller M2M requests), search the AOI, queue bulk-downloadable scenes in a temporary
# scene list, request the selected files, and download them in background threads.

# Per-sensor band names (matched as substrings of each secondary download's displayId)
# and the acquisition period searched. Unlisted sensors fall back to the LC08 settings.
_LC_BANDS = {'SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B6', 'SR_B7', 'QA_PIXEL', 'ST_B10'}
_LT_LE_BANDS = {'SR_B1', 'SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B7', 'QA_PIXEL', 'ST_B6'}
_SENSOR_SETTINGS = {
    'LT05': (_LT_LE_BANDS, {'start': '1984-03-01', 'end': '2012-05-05'}),  # mission has no images after Nov 2011
    'LE07': (_LT_LE_BANDS, {'start': '1999-01-01', 'end': '2022-04-06'}),  # mission ended 2022-04-06
    'LC09': (_LC_BANDS, {'start': '2021-10-31', 'end': '2024-12-31'}),
    'LC08': (_LC_BANDS, {'start': '2013-01-01', 'end': '2024-12-31'}),
}


def _annual_windows(start, end):
    """
    Split the date range [start, end] into calendar-year acquisition windows.

    The first window starts exactly at `start` (even mid-year) and the last ends
    exactly at `end`; every other window runs Jan 1 - Dec 31. Consecutive windows
    therefore cover the whole range without sharing a day. (A plain
    pd.date_range(freq='YS') silently drops the partial first year.)

    Returns:
    list[dict]: [{'start': 'YYYY-MM-DD', 'end': 'YYYY-MM-DD'}, ...]
    """
    start_ts = pd.Timestamp(start)
    end_ts = pd.Timestamp(end)
    year_starts = [d for d in pd.date_range(start_ts, end_ts, freq='YS') if d > start_ts]
    boundaries = [start_ts] + year_starts
    windows = []
    for i, window_start in enumerate(boundaries):
        if i + 1 < len(boundaries):
            window_end = boundaries[i + 1] - pd.Timedelta(days=1)
        else:
            window_end = end_ts
        windows.append({'start': str(window_start.date()), 'end': str(window_end.date())})
    return windows


# Start each run with an empty log; every processed window appends one line below.
open(tile_number + "_logFile.txt", "w").close()

for sensor in sensors:
    print(sensor)
    bandNames, temporal_coverage = _SENSOR_SETTINGS.get(sensor, _SENSOR_SETTINGS['LC08'])
    temp_list = _annual_windows(temporal_coverage['start'], temporal_coverage['end'])

    for temporalFilter in temp_list:
        # search payload
        search_payload = {
            'datasetName': datasetName,
            'sceneFilter': {
                'spatialFilter': spatialFilter,
                'acquisitionFilter': temporalFilter,
                'cloudCoverFilter': cloudCoverFilter,
            },
        }
        scenes = sendRequest(serviceUrl + "scene-search", search_payload, apiKey)
        if not scenes:
            print(f"scene-search failed for {sensor} {temporalFilter}; skipping this window")
            continue

        idField = 'entityId'
        # Keep bulk-downloadable scenes of the current sensor only
        # (entity IDs start with the 4-character sensor code).
        entityIds = [
            result[idField]
            for result in scenes.get('results') or []
            if result['options']['bulk'] and result[idField][:4] == sensor
        ]
        if not entityIds:
            continue

        listId = f"temp_{datasetName}_list"  # customized list id
        scn_list_add_payload = {
            "listId": listId,
            'idField': idField,
            "entityIds": entityIds,
            "datasetName": datasetName,
        }
        # Empty the reusable list first, otherwise scenes from the previous window remain in it.
        sendRequest(serviceUrl + "scene-list-remove", {"listId": listId}, apiKey)
        count = sendRequest(serviceUrl + "scene-list-add", scn_list_add_payload, apiKey)
        print(f'number of images to download {count}')

        download_opt_payload = {"listId": listId, "datasetName": datasetName}
        products = sendRequest(serviceUrl + "download-options", download_opt_payload, apiKey)
        if not products:
            print("No download options returned; skipping this window")
            continue

        # file group id(s) used by the 'band_group' mode -- can change this
        fileGroupIds = {"ls_c2ard_sr"}

        # Select products
        print("Selecting products...")
        downloads = []
        if fileType == 'bundle':
            print("    Selecting bundle files...")
            for product in products:
                if product["bulkAvailable"] and product['downloadSystem'] != 'folder':
                    downloads.append({"entityId": product["entityId"], "productId": product["id"]})

        elif fileType == 'band':
            print("    Selecting band files...")
            for product in products:
                for secondaryDownload in product["secondaryDownloads"] or []:
                    # any(): add each file once even if several band names occur in its displayId
                    if secondaryDownload["bulkAvailable"] and any(
                            bandName in secondaryDownload['displayId'] for bandName in bandNames):
                        downloads.append({"entityId": secondaryDownload["entityId"],
                                          "productId": secondaryDownload["id"]})

        elif fileType == 'band_group':
            # Get secondary dataset ID and file group IDs with the scenes
            print("    Checking for scene band groups and get secondary dataset ID and file group IDs with the scenes...")
            sceneFileGroups = []
            secondaryEntityIds = []
            datasetId = None
            for product in products:
                for secondaryDownload in product["secondaryDownloads"] or []:
                    if secondaryDownload["bulkAvailable"] and secondaryDownload["fileGroups"] is not None:
                        if datasetId is None:
                            datasetId = secondaryDownload['datasetId']
                        for fg in secondaryDownload["fileGroups"]:
                            if fg not in sceneFileGroups:
                                sceneFileGroups.append(fg)
                            if secondaryDownload['entityId'] not in secondaryEntityIds:
                                secondaryEntityIds.append(secondaryDownload['entityId'])
            if datasetId is None:
                print('No file groups found')
                continue

            # Send dataset request to get the secondary dataset name by the dataset ID
            dataset_info = sendRequest(serviceUrl + "dataset", {"datasetId": datasetId}, apiKey)
            if not dataset_info:
                print("    Could not look up the secondary dataset; skipping this window")
                continue
            secondaryDatasetName = dataset_info['datasetAlias']

            # Add secondary scenes to a list (emptied first so earlier windows do not accumulate)
            secondaryListId = f"temp_{datasetName}_scecondary_list"  # customized list id
            sec_scn_add_payload = {
                "listId": secondaryListId,
                "entityIds": secondaryEntityIds,
                "datasetName": secondaryDatasetName,
            }
            sendRequest(serviceUrl + "scene-list-remove", {"listId": secondaryListId}, apiKey)
            print("    Adding secondary scenes to list...")
            secondary_count = sendRequest(serviceUrl + "scene-list-add", sec_scn_add_payload, apiKey)
            print("    Added", secondary_count, "secondary scenes\n")

            # Keep the requested file group IDs the scenes actually have
            # (all of the scenes' groups when no IDs are requested).
            if fileGroupIds:
                fileGroups = [fg.strip() for fg in fileGroupIds if fg.strip() in sceneFileGroups]
            else:
                fileGroups = sceneFileGroups

        else:
            # Select all available files
            for product in products:
                if product["bulkAvailable"]:
                    if product['downloadSystem'] != 'folder':
                        downloads.append({"entityId": product["entityId"], "productId": product["id"]})
                    for secondaryDownload in product["secondaryDownloads"] or []:
                        if secondaryDownload["bulkAvailable"]:
                            downloads.append({"entityId": secondaryDownload["entityId"],
                                              "productId": secondaryDownload["id"]})

        # Build the download request. Nothing to request -> move on to the next window
        # (sys.exit() here used to abort every remaining sensor and year).
        if fileType == 'band_group':
            if not fileGroups:
                print('No file groups found')
                continue
            download_req2_payload = {
                "dataGroups": [
                    {
                        "fileGroups": fileGroups,
                        "datasetName": secondaryDatasetName,
                        "listId": secondaryListId,
                    }
                ],
                "label": label,
            }
        else:
            if not downloads:
                print('No downloadable files matched; skipping this window')
                continue
            download_req2_payload = {"downloads": downloads, "label": label}

        print("Sending download request ...")
        download_request_results = sendRequest(serviceUrl + "download-request", download_req2_payload, apiKey)
        print("Done sending download request")
        if not download_request_results:
            print('download-request failed; skipping this window')
            continue

        if len(download_request_results['newRecords']) == 0 and len(download_request_results['duplicateProducts']) == 0:
            print('No records returned, please update your scenes or scene-search filter')
            continue

        # Start the downloads whose URLs are ready now
        for result in download_request_results['availableDownloads']:
            runDownload(threads, result['url'])

        # The rest are still being prepared: poll download-retrieve until every one has a URL.
        preparingDownloadIds = [result['downloadId'] for result in download_request_results['preparingDownloads']]
        if preparingDownloadIds:
            download_ret_payload = {"label": label}
            print("Retrieving download urls...\n")
            download_retrieve_results = sendRequest(serviceUrl + "download-retrieve", download_ret_payload, apiKey, False)
            # A falsy result (None or False) means the call failed; the wait loop below retries.
            if download_retrieve_results:
                print("    Retrieved: \n")
                for key in ('available', 'requested'):
                    for result in download_retrieve_results[key]:
                        if result['downloadId'] in preparingDownloadIds:
                            preparingDownloadIds.remove(result['downloadId'])
                            runDownload(threads, result['url'])
                            print(f"       {result['url']}\n")

            # Didn't get all download URLs, retrieve again after 30 seconds
            while preparingDownloadIds:
                print(f"{len(preparingDownloadIds)} downloads are not available yet. Waiting for 30s to retrieve again\n")
                time.sleep(30)
                download_retrieve_results = sendRequest(serviceUrl + "download-retrieve", download_ret_payload, apiKey, False)
                if download_retrieve_results:
                    for result in download_retrieve_results['available']:
                        if result['downloadId'] in preparingDownloadIds:
                            preparingDownloadIds.remove(result['downloadId'])
                            runDownload(threads, result['url'])

        print("\nDownloading files... Please do not close the program\n")
        # Drain the shared list: pop-and-join also catches threads appended while waiting,
        # and keeps the list from growing across windows.
        while threads:
            threads.pop(0).join()

        # Append (not overwrite) so every sensor/year window is recorded in the log.
        with open(tile_number + "_logFile.txt", "a") as file:
            start_year = temporalFilter['start'][:4]
            file.write(f"{sensor} Images available for year {start_year} is {count}\n")

# Rename the download folder to the tile ID. A failure (e.g. the target folder
# already exists from an earlier run) is reported instead of aborting the cell,
# so the scene-list cleanup and logout below still run.
try:
    os.rename(data_dir, tile_number)
except OSError as e:
    print(f"\nCould not rename '{data_dir}' to '{tile_number}': {e}")

# Remove the temporary scene list(s) while the API key is still valid
# (same list IDs as the download loop builds).
sendRequest(serviceUrl + "scene-list-remove", {"listId": f"temp_{datasetName}_list"}, apiKey)
if fileType == 'band_group':
    sendRequest(serviceUrl + "scene-list-remove", {"listId": f"temp_{datasetName}_scecondary_list"}, apiKey)

# logging out from the system: a successful logout returns no data (None);
# anything else, e.g. a False failure sentinel, is reported as a failed logout.
endpoint = "logout"
if sendRequest(serviceUrl + endpoint, None, apiKey, False) is None:
    print("\nLogged Out\n")
else:
    print("\nLogout Failed\n")

# count time taken for the process
end_time = time.time()
elapsed_time = end_time - start_time
print(f"\nTotal time taken: {elapsed_time / 60:.2f} minutes ({elapsed_time:.2f} seconds)")

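Run the following cell if you need to remove the temporary scene lists created on the M2M server. It needs a valid API key, so log in again first if the previous cell has already logged out.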

In [None]:
# Remove the temporary scene list(s) this notebook created on the M2M server.
# The IDs are rebuilt with the same f-strings the download loop uses, so this cell
# does not raise NameError when the loop never assigned `listId` / `secondaryListId`
# (e.g. no scenes matched). NOTE: the download cell logs out at its end; with an
# expired key these calls only print the service error -- log in again first if needed.
remove_scnlst_payload = {
    "listId": f"temp_{datasetName}_list"
}
sendRequest(serviceUrl + "scene-list-remove", remove_scnlst_payload, apiKey)

if fileType == 'band_group':
    # Remove the secondary scene list (ID spelled exactly as in the download loop)
    remove_scnlst2_payload = {
        "listId": f"temp_{datasetName}_scecondary_list"
    }
    sendRequest(serviceUrl + "scene-list-remove", remove_scnlst2_payload, apiKey)