In [19]:
import os
import json
import time
import requests
import importlib
from planetscopedownload import planetscopedownload
importlib.reload(planetscopedownload)

data_dir = 'data'
api_path = os.path.join(data_dir, 'sites', "PlanetScope_API_key.txt")
PLANET_API_KEY = planetscopedownload.load_api_key(api_path)

# testing new Planetscope API key

In [5]:
cloud_filter = {
    "type": "RangeFilter",
    "field_name": "cloud_cover",
    "config": {
        'lt': 0.1
    }
}

In [6]:
and_filter = {
    "type": 'AndFilter',
    "config": [geometry, data_filter, cloud_filter]
}

# p(and_filter)

# Find out which ones we have permissions for

# Donwload with orders API

In [7]:
DATA_URL =  'https://api.planet.com/data/v1'
DATA_QUICK_SEARCH_URL = f'{DATA_URL}/quick-search'
ORDERS_URL = 'https://api.planet.com/compute/ops/orders/v2'
ITEM_TYPE = "PSScene"



def planet_auth(planet_api, data_url='https://api.planet.com/data/v1', orders_url='https://api.planet.com/compute/ops/orders/v2'):
    auth = requests.auth.HTTPBasicAuth(planet_api, '')
    data_response = requests.get(data_url, auth=auth)
    orders_response = requests.get(orders_url, auth=auth)
    if not data_response.status_code in [200, 201, 202]: raise RuntimeError(f"Authentification failed for data api: {json.dumps(data_response.json(), indent=2)}")
    if not orders_response.status_code in [200, 201, 202]: raise RuntimeError(f"Authentification failed for orders api: {json.dumps(orders_response.json(), indent=2)}")
    print('Planets data and orders API authentification successful')
    return auth

auth = planet_auth(PLANET_API_KEY)
    

Planets data and orders API authentification successful


In [8]:
# get the ids that we are interested in

def get_item_ids(and_filter:dict, auth_or_api_key:requests.auth.HTTPBasicAuth, data_quick_search_url='https://api.planet.com/data/v1/quick-search', item_type='PSScene'):

    if isinstance(auth_or_api_key, str):
        auth = planet_auth(PLANET_API_KEY) 
    elif isinstance(auth_or_api_key, requests.auth.HTTPBasicAuth):
        auth = auth_or_api_key

    desired_products = [
        'assets.ortho_analytic_4b_sr:download', # toar image
        'assets.ortho_udm2:download' # udm file
    ]
    

    search_request = {
        "item_types": [item_type],
        "filter": and_filter
    }


    search_result = requests.post(
        data_quick_search_url,
        auth = auth,
        json=search_request
    )

    if not search_result.status_code in (200, 201, 202):
        print("❌ Failed to place order")
        print(f"Status code: {search_result.status_code}")
        try:
            print("Error details:", json.dumps(search_result.json(), indent=2))
        except Exception:
            print("Raw response:", search_result.text)
        raise RuntimeError('See above issue in data API')

    # print(feature['id'])
    # p(feature['_permissions']) # NOTE maybe can tell us if we have access
    features = search_result.json()['features']
    image_ids = []
    for feature in features:
        valid = True
        for product_type in desired_products:
            if not product_type in feature['_permissions']:
                # print(f'{product_type} missing permissions for {feature["id"]}')
                valid=False # NOTE if there inst permission to all the data we need skip this id
        if valid: image_ids.append(feature['id'])
    
    return(image_ids)

image_ids = get_item_ids(and_filter=and_filter, auth_or_api_key=PLANET_API_KEY)

Planets data and orders API authentification successful


In [11]:
sitename = 'australianarrabeen'

products = [
    {
        'item_ids': image_ids,
        'item_type': "PSScene",
        "product_bundle":"analytic_udm2"
    }
]

request = {
    "name": sitename,
    "products":products,
    "delivery": {"single_archive": True, "archive_type": 'zip'}
}

In [12]:
headers = {'content-type': 'application/json'}

def place_order(request, auth, orders_url='https://api.planet.com/compute/ops/orders/v2'):
    response = requests.post(orders_url, data=json.dumps(request), auth=auth, headers=headers)

    if response.status_code in (200, 201, 202):
        print("✅ Order placed successfully")
        order_id = response.json()['id']
        print(f"Order ID: {order_id}")
        order_url = orders_url + '/' + order_id
        return order_url
    else:
        print("❌ Failed to place order")
        print(f"Status code: {response.status_code}")
        try:
            print("Error details:", json.dumps(response.json(), indent=2))
        except Exception:
            print("Raw response:", response.text)
        return None

In [13]:
order_url = place_order(request, session.auth)

✅ Order placed successfully
Order ID: 9ef82dd0-c3f5-430d-849c-a7564cf735b4


In [14]:
def poll_for_success(order_url, auth, num_loops=30):
    count = 0
    while(count < num_loops):
        count += 1
        r = requests.get(order_url, auth=session.auth)
        response = r.json()
        state = response['state']
        print(state)
        end_states = ['success', 'failed', 'partial']
        if state in end_states:
            print(state)
            break
        time.sleep(10)
        
poll_for_success(order_url, session.auth)

running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running
running


In [15]:
r = requests.get(order_url, auth=session.auth)
response = r.json()
results = response['_links']['results']

In [16]:
[r['name'] for r in results]

['9ef82dd0-c3f5-430d-849c-a7564cf735b4/output.zip']

# Download

In [None]:
import pathlib
def download_results(results, overwrite=False):
    results_urls = [r['location'] for r in results]
    results_names = [r['name'] for r in results]
    print('{} items to download'.format(len(results_urls)))
    
    for url, name in zip(results_urls, results_names):
        path = pathlib.Path(os.path.join('data', name))
        
        if overwrite or not path.exists():
            print('downloading {} to {}'.format(name, path))
            r = requests.get(url, allow_redirects=True)
            path.parent.mkdir(parents=True, exist_ok=True)
            open(path, 'wb').write(r.content)
        else:
            print('{} already exists, skipping {}'.format(path, name))

In [3]:
def write_api_key_file(api_key:str, overwrite:bool=False, data_dir:str=os.path.join(os.getcwd(), 'data')):
    sites_dir = os.path.join(data_dir, 'sites')
    if not os.path.exists(sites_dir): os.mkdir(sites_dir)
    
    file_path = os.path.join(sites_dir, 'PlanetScope_API_key.txt')
    if overwrite or not os.path.exists(file_path):
        # if we want to overwrite or if it doesnt exsist we will need to make it
        with open(file_path, "w") as file:
            file.write(api_key)