In [1]:
import os
import sys
import requests
import time
import json
import getpass

## Input user credentials and configuration

In [2]:
# Prompt for the username without echoing it into the notebook output.
# An explicit prompt is given because getpass() otherwise asks for "Password: ".
username = getpass.getpass('Username: ')

········


In [3]:
# Prompt for the password; the input is not echoed.
password = getpass.getpass(prompt='Password: ')

········


In [4]:
# Shared settings read (and, for 'auth_token', updated) by the API wrapper functions.
config = dict(
    # Filled in by login() and refreshed after authenticated requests
    auth_token='',
    # Base URL of the job API
    apiBaseUrl='https://des.ncsa.illinois.edu/desaccess/api',
    # Base URL under which job output files are downloaded
    filesBaseUrl='https://des.ncsa.illinois.edu/files-desaccess',
    # Credentials entered in the cells above
    username=username,
    password=password,
    # Database and release sent with job requests
    database='desdr',
    release='DR1',
)

## Define API wrapper functions

### Define login function

In [5]:
def login():
    """Log in with the configured credentials and store the returned auth token.

    Reads 'apiBaseUrl', 'username', 'password' and 'database' from the
    notebook-level ``config`` dict and POSTs the credentials as form data to
    ``<apiBaseUrl>/login``.

    Returns:
        The value of the 'token' field of the JSON response, which is also
        saved in ``config['auth_token']``.
    """
    login_url = '{}/login'.format(config['apiBaseUrl'])
    # Form fields expected by the login endpoint
    credentials = {field: config[field] for field in ('username', 'password', 'database')}
    reply = requests.post(login_url, data=credentials)
    # Keep the token for subsequent authenticated calls
    token = reply.json()['token']
    config['auth_token'] = token
    return token


### Define cutout job submission function

In [6]:
def submit_cutout_job(positions=None):
    """Submits a cutout job and returns the complete server response which includes the job ID.

    Args:
        positions: Value sent as the 'positions' field of the request. When
            omitted, the notebook-level variable ``positions`` is used, so
            existing calls without arguments keep working.

    Returns:
        dict: The decoded JSON response. On success ('status' == 'ok') it
        contains 'jobid' and 'new_token'.

    Raises:
        NameError: If ``positions`` is not passed and no notebook-level
            ``positions`` variable exists.

    Side effects:
        Prints a success or error message. On success, replaces
        ``config['auth_token']`` with the response's 'new_token'.
    """
    if positions is None:
        # Fall back to the notebook-level variable the original code relied on
        try:
            positions = globals()['positions']
        except KeyError:
            raise NameError(
                "name 'positions' is not defined; pass positions=... "
                "or define a notebook-level 'positions' variable"
            ) from None

    # Specify API request parameters
    data = {
        'db': config['database'],
        'release': config['release'],
        'xsize': 1.5,
        'ysize': 0.5,
        'make_fits': True,
        'make_rgb_lupton': True,
        'make_rgb_stiff': True,
        'return_list': False,
        'colors_fits': 'y',
        'rgb_stiff_colors': 'riz',
        'rgb_lupton_colors': 'izy',
        'positions': positions,
    }

    # Submit job
    r = requests.put(
        '{}/job/cutout'.format(config['apiBaseUrl']),
        data=data,
        headers={'Authorization': 'Bearer {}'.format(config['auth_token'])}
    )
    response = r.json()

    if response['status'] == 'ok':
        print('Job "{}" submitted.'.format(response['jobid']))
        # Refresh auth token
        config['auth_token'] = response['new_token']
    else:
        # The placeholder was missing before, so the server's message was never shown
        print('Error submitting job: {}'.format(response.get('message')))

    return response


### Define query job submission function

In [7]:
def submit_query_job(query=None, filename='results.csv'):
    """Submits a query job and returns the complete server response which includes the job ID.

    Args:
        query: SQL text sent as the 'query' field of the request. When
            omitted, the notebook's example query against DR1_MAIN is used.
        filename: Value sent as the 'filename' field of the request.

    Returns:
        dict: The decoded JSON response. On success ('status' == 'ok') it
        contains 'jobid' and 'new_token'.

    Side effects:
        Prints a success or error message. On success, replaces
        ``config['auth_token']`` with the response's 'new_token'.
    """
    if query is None:
        # Default example query (unchanged from the original notebook)
        query = '''
            SELECT
            COADD_OBJECT_ID,RA,DEC,
            MAG_AUTO_G G,
            MAG_AUTO_R R,
            WAVG_MAG_PSF_G G_PSF,
            WAVG_MAG_PSF_R R_PSF
            FROM DR1_MAIN
            WHERE
            RA between 323.36-0.12 and 323.36+0.12 and
            DEC between -0.82-0.12 and -0.82+0.12 and
            WAVG_SPREAD_MODEL_I + 3.0*WAVG_SPREADERR_MODEL_I < 0.005 and
            WAVG_SPREAD_MODEL_I > -1 and
            IMAFLAGS_ISO_G = 0 and
            IMAFLAGS_ISO_R = 0 and
            FLAGS_G < 4 and
            FLAGS_R < 4
        '''

    # Specify API request parameters
    data = {
        'username': config['username'],
        'db': config['database'],
        'filename': filename,
        'query': query,
    }

    # Submit job
    r = requests.put(
        '{}/job/query'.format(config['apiBaseUrl']),
        data=data,
        headers={'Authorization': 'Bearer {}'.format(config['auth_token'])}
    )
    response = r.json()

    if response['status'] == 'ok':
        print('Job "{}" submitted.'.format(response['jobid']))
        # Refresh auth token
        config['auth_token'] = response['new_token']
    else:
        # The placeholder was missing before, so the server's message was never shown
        print('Error submitting job: {}'.format(response.get('message')))

    return response

### Define job status function

In [8]:
def get_job_status(job_id):
    """Returns the current status of the job identified by the unique job_id.

    POSTs the job ID as the 'job-id' form field to ``<apiBaseUrl>/job/status``
    using the current auth token.

    Args:
        job_id: Identifier returned when the job was submitted.

    Returns:
        dict: The decoded JSON response.

    Side effects:
        If the response contains 'new_token', ``config['auth_token']`` is
        replaced with it.
    """

    r = requests.post(
        '{}/job/status'.format(config['apiBaseUrl']),
        data={
            'job-id': job_id
        },
        headers={'Authorization': 'Bearer {}'.format(config['auth_token'])}
    )
    response = r.json()
    # Refresh auth token only when one was supplied, so a response without it
    # is returned to the caller (who checks 'status') instead of raising KeyError
    if 'new_token' in response:
        config['auth_token'] = response['new_token']
    return response

### Define job file downloader function

In [9]:
def download_job_files(url, outdir):
    """Recursively download the files listed under ``url`` into ``outdir``.

    The listing is fetched from ``<url>/json`` and is expected to be a list of
    entries with 'name' and 'type' keys. Entries of type 'directory' are
    descended into; entries of type 'file' are fetched from ``<url>/<name>``
    and written to ``<outdir>/<name>``. Other types are ignored.

    Args:
        url: Remote directory URL (without a trailing '/json').
        outdir: Local directory to write into; created if missing.

    Returns:
        The decoded listing for ``url`` (the top-level directory only).
    """
    os.makedirs(outdir, exist_ok=True)
    # Parse the listing once and reuse it for the loop and the return value
    listing = requests.get('{}/json'.format(url)).json()
    for item in listing:
        name = item['name']
        # Names come from the server and become local paths: refuse anything
        # that is not a plain single path component (e.g. '..' or 'a/b')
        if name in ('', '.', '..') or os.path.basename(name) != name:
            print('Skipping entry with unsafe name: {!r}'.format(name))
            continue
        if item['type'] == 'directory':
            suburl = '{}/{}'.format(url, name)
            subdir = os.path.join(outdir, name)
            download_job_files(suburl, subdir)
        elif item['type'] == 'file':
            data = requests.get('{}/{}'.format(url, name))
            with open(os.path.join(outdir, name), "wb") as file:
                file.write(data.content)

    return listing

### Define downloaded file lister

In [10]:
def list_downloaded_files(download_dir):
    """Print the path of every file found under ``download_dir``, one per line.

    The directory tree is traversed with ``os.walk``; directories themselves
    are not printed.
    """
    for root, _subdirs, names in os.walk(download_dir):
        paths_here = [os.path.join(root, name) for name in names]
        for path in paths_here:
            print(path)


## Obtain authentication token

In [11]:
 # Authenticate and store the auth token for subsequent API calls
try:
    print('Logging in as user "{}" ("{}") and storing auth token...'.format(config['username'], config['database']))
    login()
except Exception as err:
    # Catch ordinary errors only, so a kernel interrupt is not reported as a
    # login failure, and show what actually went wrong
    print('Login failed.')
    print('Reason: {!r}'.format(err))
    sys.exit(1)

Logging in as user "jtest" ("dessci") and storing auth token...


## Submit jobs

### Submit a query job to retrieve data from the DESSCI database

In [12]:
job_type = 'query'

print('Submitting query job...')
response = submit_query_job()

# Only a successful response is read for 'jobid' below; stop here with the
# server's message instead of failing with an opaque KeyError
if response.get('status') != 'ok':
    raise RuntimeError('Query job submission failed: {}'.format(response.get('message')))

# Store the unique job ID for the new job
job_id = response['jobid']
print('New job submitted: {}'.format(job_id))


Submitting query job...
Job "634b41cd9d104c6b849e3b0b852e57ee" submitted.
New job submitted: 634b41cd9d104c6b849e3b0b852e57ee


### Poll the status of the job until it is complete

In [13]:
print('Polling status of job "{}"...'.format(job_id), end='')
job_status = ''
# Poll until the job reports one of its two terminal states
while job_status not in ('success', 'failure'):
    # Fetch the current job status
    response = get_job_status(job_id)
    # Quit polling if there is an error getting a status update, and say so
    if response['status'] != 'ok':
        print('\nError fetching job status: {}'.format(response.get('message')))
        break
    job_status = response['jobs'][0]['job_status']
    if job_status in ('success', 'failure'):
        print('\nJob completed with status: {}'.format(job_status))
    else:
        # Display another dot to indicate that polling is still active
        print('.', end='', sep='', flush=True)
        # Wait before asking again
        time.sleep(3)

# print('Complete job status details:')
# print(json.dumps(response, indent=2))


Polling status of job "634b41cd9d104c6b849e3b0b852e57ee"...
Job completed with status: success


### Download the job output files

In [14]:
# Fetch and list the job's output files only when the job succeeded
if job_status != 'success':
    print('The job "{}" failed.'.format(job_id))
else:
    # Remote location of the job files: <filesBaseUrl>/<username>/<job_type>/<job_id>
    job_url = '{}/{}/{}/{}'.format(
        config['filesBaseUrl'],
        config['username'],
        job_type,
        job_id,
    )
    # Files are saved to a subfolder of the working directory named after the job
    download_dir = './{}'.format(job_id)
    download_job_files(job_url, download_dir)
    print('Files for job "{}" downloaded to "{}"'.format(job_id, download_dir))
    list_downloaded_files(download_dir)

Files for job "634b41cd9d104c6b849e3b0b852e57ee" downloaded to "./634b41cd9d104c6b849e3b0b852e57ee"
./634b41cd9d104c6b849e3b0b852e57ee/meta.json
./634b41cd9d104c6b849e3b0b852e57ee/results.csv


### Submit a cutout job to retrieve data from the DESSCI database

In [15]:
job_type = 'cutout'

print('Submitting cutout job...')
response = submit_cutout_job()

# Only a successful response is read for 'jobid' below; stop here with the
# server's message instead of failing with an opaque KeyError
if response.get('status') != 'ok':
    raise RuntimeError('Cutout job submission failed: {}'.format(response.get('message')))

# Store the unique job ID for the new job
job_id = response['jobid']
print('New job submitted: {}'.format(job_id))

Submitting cutout job...
Job "534a3fbe88444be2a052bfb783b1b6f1" submitted.
New job submitted: 534a3fbe88444be2a052bfb783b1b6f1


### Poll the status of the job until it is complete

In [16]:
print('Polling status of job "{}"...'.format(job_id), end='')
job_status = ''
# Poll until the job reports one of its two terminal states
while job_status not in ('success', 'failure'):
    # Fetch the current job status
    response = get_job_status(job_id)
    # Quit polling if there is an error getting a status update, and say so
    if response['status'] != 'ok':
        print('\nError fetching job status: {}'.format(response.get('message')))
        break
    job_status = response['jobs'][0]['job_status']
    if job_status in ('success', 'failure'):
        print('\nJob completed with status: {}'.format(job_status))
    else:
        # Display another dot to indicate that polling is still active
        print('.', end='', sep='', flush=True)
        # Wait before asking again
        time.sleep(3)

# print('Complete job status details:')
# print(json.dumps(response, indent=2))


Polling status of job "534a3fbe88444be2a052bfb783b1b6f1"..............
Job completed with status: success


### Download the job output files

In [18]:
# If successful, display job results
if job_status == 'success':

    # Download single compressed archive file containing all job files
    job_archive_filename = '{}.tar.gz'.format(job_id)
    # Use job_type (set when the job was submitted) so this URL agrees with job_url below
    job_archive_file_url = '{}/{}/{}/{}'.format(config['filesBaseUrl'], config['username'], job_type, job_archive_filename)
    data = requests.get(job_archive_file_url)
    # Fail on an HTTP error instead of saving an error page as a .tar.gz
    data.raise_for_status()
    with open('./{}'.format(job_archive_filename), "wb") as file:
        file.write(data.content)
    print('Job archive file "{}" downloaded.'.format(job_archive_filename))

    # Construct the job file download URL
    job_url = '{}/{}/{}/{}'.format(config['filesBaseUrl'], config['username'], job_type, job_id)
    download_dir = './{}'.format(job_id)
    # Download each raw job file sequentially to a subfolder of the working directory
    download_job_files(job_url, download_dir)
    print('Files for job "{}" downloaded to "{}"'.format(job_id, download_dir))
    list_downloaded_files(download_dir)
else:
    print('The job "{}" failed.'.format(job_id))