# Summary

* [Installation of pyvo](#Installation-of-pyvo)
* [Importing PyVo and checking the version](#Importing-PyVo-and-checking-the-version)
* [Authentication](#Authentication)
* [Short (synchronous) queries](#Short-queries)
* [Asynchronous jobs](#Asynchronous-jobs)
    * [The 60 seconds queue](#The-60-seconds-queue)
    * [The 30 minutes queue](#The-30-minutes-queue)
* [Submitting multiple queries](#Submitting-multiple-queries)
    * [Your query is too long? Chunk it!](#Your-query-is-too-long?-Chunk-it!)
    * [List of file queries](#List-of-file-queries)
* [Convert to various python types](#Convert-to-various-python-types)
* [Automatically downloading files from path results](#Automatically-downloading-files-from-path-results)
* [Archiving your jobs](#Archiving-your-jobs)
    * [Archiving all COMPLETED jobs](#Archiving-all-COMPLETED-jobs)
    * [Rerunning ARCHIVED jobs](#Rerunning-ARCHIVED-jobs)


# Installation of pyvo
---

In order to interact with the TAP interface of `rave-survey.org` you only require
`python 3+` and `pyvo 1+`.

```bash
pip install pyvo>=1.0
```

# Importing PyVo and checking the version
---

It is useful to always print the version of pyvo you are using. Most of non-working scripts fail because of an old version of `pyvo`.



In [None]:
from pkg_resources import parse_version
import pyvo

#
# Verify the version of pyvo
#
if parse_version(vo.__version__) < parse_version('1.0'):
    raise ImportError('pyvo version must be at least than 1.0')

print('\npyvo version {} \n'.format(vo.__version__))

# Authentication
---

After registration you can access your API Token by clicking on your user name in the right side of the menu bar. Then select `API Token`. 

![api-token](files/API_rave_Token_screenshot.png)

You will see a long alphanumerical word. Just copy it where ever you see `<your-token>` ; in the following examples. 

![api-token-blured](files/API_rave_Token_blured.png)

> **The `API Token` identifies you and provides access to the results tables of your queries.**

The connection to the TAP service can be done that way:

In [None]:
import requests
import pyvo

#
# Setup tap_service
#
service_name = 'rave-survey.org'
url = "https://www.rave-survey.org/tap"
token = 'Token <your-token>'

print('TAP service {} \n'.format(service_name))

# Setup authorization
tap_session = requests.Session()
tap_session.headers['Authorization'] = token

tap_service = vo.dal.TAPService(url, session=tap_session)

# Short queries
---

Many queries last less than a few seconds, we call them **short queries**. The latter can be executed with **synchronized** jobs. You will retrieve the results interactively.

In [None]:
lang = 'PostgreSQL'

query = '''
SELECT raveid, radeg, dedeg
FROM "ravedr4"."rave_dr4"
  LIMIT 100;
'''

tap_result = tap_service.run_sync(query, language=lang)

> **Remark**:
> the `lang` parameter can take two values either `PostgreSQL` or `ADQL`
> this allows to access some featured present in the one or the other language 
> for more details about the difference ebtween both please refer : [FAQ](https://gaia.aip.de/cms/documentation/faq/snagsftu1/), [Blog](https://gaia.aip.de/cms/adql-queries-in-the-interface/), [Documentation](http://tapvizier.u-strasbg.fr/adql/help.html) or to [IOVA docs](http://www.ivoa.net/documents/latest/ADQL.html)


The result `tap_result` is a so called [`TAPResults`](https://pyvo.readthedocs.io/en/latest/api/pyvo.dal.TAPResults.html#pyvo.dal.TAPResults) that is essentially a wrapper around an Astropy `votable.Table`. For standard conversion see [Convert to various python types](#convertion).

The entire script can be found on github: [rave-tutorial-sync-job.py](https://github.com/agy-why/rave-tap-pyvo-tutorials/blob/master/rave-tutorial-sync-job.py).

# Asynchronous jobs
---

For slightly longer queries, typically counting or larger selections (>10000 objects) a synchronized job will fail because of timeouts (from http protocol or server settings). This is why we provide the possibility to submit **asynchronous** jobs. These type of jobs will run on the server side, store the results such that you can retrieve them at a later time. They come in two flavors:
* 60 seconds queue
* 30 minutes queue


## The 60 seconds queue

Most of the asynchronous queries will require less than 60 seconds, basically all queries without `JOIN`, or `CONE SEARCH`. Therefore this queue is the default and should be preferred.


In [None]:
#
# Submit the query as an async job
#
query_name = "multi_dr3"
lang = 'PostgreSQL'

query = '''
-- Multiple observations in DR3

SELECT F.raveid, F.radeg, F.dedeg, T.N 
FROM 
    ( SELECT raveid, count(*) AS N 
      FROM ravedr3.rave_dr3a 
          GROUP BY raveid
          HAVING count(*) > 1 
          ORDER BY N DESC ) T JOIN ravedr3.rave_dr3a F 
    ON F.raveid = T.raveid
'''

job = tap_service.submit_job(query, language=lang, runid=query_name, queue="60s")
job.run()

#
# Wait to be completed (or an error occurs)
#
job.wait(phases=["COMPLETED", "ERROR", "ABORTED"], timeout=60.)
print('JOB {name}: {status}'.format(name=job.job.runid , status=job.phase))

#
# Fetch the results
#
job.raise_if_error()
print('\nfetching the results...')
tap_results = job.fetch_result()
print('...DONE\n')

As for sync jobs, the result is a [`TAPResults`](https://pyvo.readthedocs.io/en/latest/api/pyvo.dal.TAPResults.html#pyvo.dal.TAPResults) object.

The entire script can be found on gihub: [rave-tutorial-async-60s.py](https://github.com/agy-why/rave-tap-pyvo-tutorials/blob/master/rave-tutorial-async-60s.py)


## The 30 minutes queue

Some complex queries like Cross-Matching or geometric search may take more than 60 seconds. For this purpose we provide the **30 minutes queue**. If you need longer queue please contact us. 

When running a long query, you surely don't want to block CPU ressources for a python process that just wait for two hours, for the queue to finish. Therefore long queries are typically done in two parts (= two scripts), one that submits the request, another one that retrieve the results.

### Submitting a job and store `job_urls` for later retrieval

We first submit the query as an async job to the `30m` queue, and store the job (the url) of the newly created job into a file `job_url.txt`. With this url we are able to retrieve the results (after it's finished) at any later time.

In [None]:
#
# Submit the query as an async job
#
query_name = "multi_dr3"
lang = 'PostgreSQL'

query = '''
-- Multiple observations in DR3

SELECT F.raveid, F.radeg, F.dedeg, T.N 
FROM 
    ( SELECT raveid, count(*) AS N 
      FROM ravedr3.rave_dr3a 
          GROUP BY raveid
          HAVING count(*) > 1 
          ORDER BY N DESC ) T JOIN ravedr3.rave_dr3a F 
    ON F.raveid = T.raveid
'''

job = tap_service.submit_job(query, language=lang, runid=query_name, queue="30m")
job.run()

print('JOB {name}: SUBMITTED'.format(name=job.job.runid))
print('JOB {name}: {status}'.format(name=job.job.runid , status=job.phase))

#
# Save the job's url in a file to later retrieve results.
#
print('URL: {}'.format(job.url))

with open('job_url.txt', 'w') as fd:
    fd.write(job.url)

### Retrieve the results at a later time

In order to retrieve the results, we will first recreate the job from the `job_url` stored in the `job_url.txt` file and verify that the job is finished, by asking for its current phase. In case the job is finished we will retrieve the results as usual.

In [None]:
#
# Recreate the job from url and session (token)
#

# read the url
with open('job_url.txt', 'r') as fd:
    job_url = fd.readline()

# recreate the job 
job = AsyncTAPJob(job_url, session=tap_session)

#
# Check the job status
#
print('JOB {name}: {status}'.format(name=job.job.runid , status=job.phase))

# if still running --> exit
if job.phase not in ("COMPLETED", "ERROR", "ABORTED"):
    exit(0)

#
# Fetch the results
#
job.raise_if_error()
print('\nfetching the results...')
tap_results = job.fetch_result()
print('\n...DONE\n')

Thanks to this method you can submit a job, go for a coffee, write a paper and retrieve the results 
when it suits you. The job and its results are stored on the server side under your user account.

The entire scripts can be found on github: [rave-tutorial-submit-30m.py](https://github.com/agy-why/rave-tap-pyvo-tutorials/blob/master/rave-tutorial-submit-30m.py) and [rave-tutorial-retrieve-30m.py](https://github.com/agy-why/rave-tap-pyvo-tutorials/blob/master/rave-tutorial-retrieve-30m.py)

# Submitting multiple queries

Some time it is needed to submit several queries at one time. Either because the entire query may last longer than 30 minutes and you need to cut it in smaller parts, or because you need non `JOIN`-able information from various tables.

## Your query is too long? Chunk it!

Before contacting us and ask for longer queue time: You may try to cut the long query into chunks, and execute your long query as a list of shorter queries. 

There are two typical way to do it:
* cut it with `LIMIT` and `OFFSET`
* filter it with `WHERE`

Let us concider this query:

In [None]:
base_query = '''
-- Get objects with radial velocites within a range (DR4) 

SELECT * 
FROM ravedr4.rave_dr4
  WHERE hrv
    BETWEEN 5.0 AND 25.0 
  ORDER BY hrv DESC
  '''

We can add `LIMIT` and `OFFSET` in order to cut the request in shorter parts:

In [None]:
base_query = '''
-- Get objects with radial velocites within a range (DR4) 

SELECT * 
FROM ravedr4.rave_dr4
  WHERE hrv
    BETWEEN 5.0 AND 25.0 
  ORDER BY hrv DESC
  
  LIMIT {limit} OFFSET {offset}
'''

and use it the following way:

In [None]:
# list of failed jobs
failed = []

#
# Submit queries
#
base_name = 'hrv_{index}'
lang = 'PostgreSQL'

base_query = '''
-- Get objects with radial velocites within a range (DR4) 

SELECT * 
FROM ravedr4.rave_dr4
  WHERE hrv
    BETWEEN 5.0 AND 25.0 
  ORDER BY hrv DESC
  
  LIMIT {limit:d} OFFSET {offset:d}
'''

limit = 1000
total = 10000
index = 0

# open the file to store the jobs
fd = open('jobs_url.txt', 'w')

# header 
print('          {: ^{name_width}} -- limit : offset'.format('name', name_width=len(base_name)-6))

for offset in range(0, total, limit):

    query = base_query.format(limit=limit, offset=offset)
    name = base_name.format(index=index)

    print('> Query : {name} -- {limit}:{offset}'.format(name=name, limit=limit, offset=offset))

    # Create the async job
    try:
        job = tap_service.submit_job(query, language=lang, runid=name, queue="30m")
    except Exception as e:
        print('ERROR could not create the job.')
        print(e)
        failed.append(runid)
        continue

    # Run the run
    try:
        job.run()
    except Exception as e:
        print('Error: could not run the job. Are you sure about your SQL query? Wrong language?')
        print(e)
        failed.append(name)
        continue

    # Save the submitted jobs into a file
    fd.write(job.url + '\n')
    index = index + 1

# Verify that all jobs have been submitted
try:
    assert(failed == [])
except AssertionError:
    print("The following jobs had failed: {jobs}".format(failed))

fd.close()

Retrieving the results is similar to the single query version above.

In [None]:
partial_results = []
running_job_names = []

#
# Recreate the job from url and session (token)
#

# read the url
with open('jobs_url.txt', 'r') as fd:
    job_urls = fd.readlines()

# reopen the file to store the non finished jobs
fd = open('jobs_url.txt', 'w')

for job_url in job_urls:

    # recreate the job 
    job = AsyncTAPJob(job_url, session=tap_session)

    #
    # Check the job status
    #
    print('JOB {name}: {status}'.format(name=job.job.runid , status=job.phase))

    # if still running --> exit
    if job.phase not in ("COMPLETED", "ERROR", "ABORTED"):
        running_job_names.append(job.job.runid)
        fd.write(job_url)
        continue

    #
    # Fetch the results
    #
    job.raise_if_error() # This need to be caught!!
    print('fetching the results...\n')
    partial_results.append(job.fetch_result())
    
print('...DONE\n')

try:
    assert(running_job_names == [])
except AssertionError:
    print("The following jobs are still executing: {}".format(running_job_names))

fd.close()

The entire scripts can be found on gihub: [rave-tutorial-submit-multi.py](https://github.com/agy-why/rave-tap-pyvo-tutorials/blob/master/rave-tutorial-submit-multi.py) and [rave-tutorial-retrieve-multi.py](https://github.com/agy-why/rave-tap-pyvo-tutorials/blob/master/rave-tutorial-retrieve-multi.py)

## List of file queries

Sometimes it is useful to just send all `.sql` queries present in a directory. For such purpose you can use comments to provide the proper parameters.

Let us consider the file `rave-example03.sql`

```sql
-- Count the number of objects in the RAVE DR4 database

-- LANGUAGE = PostgreSQL
-- QUEUE = 60s

SELECT count(*)
FROM "ravedr4"."rave_dr4";
```

The `language` and `queue` are prescibed as comments. The query can then be submitted in a script like the following:

In [None]:
import glob

#
# Submit the query as an Asynchrone job
#

# find all .sql files in current directory
queries_filename = sorted(glob.glob('./*.sql'))
print('Sending {n} examples'.format(n=len(queries_filename)))

# initialize test results
jobs = []
failed =  []

# send all queries
for query_filename in queries_filename:

    # read the .SQL file
    with open(query_filename, 'r') as fd:
        query = ' '.join(fd.readlines())

    # Set language from comments (default: PostgreSQL)
    if 'LANGUAGE = ADQL' in query:
        lang = 'ADQL'
    else:
        lang = 'PostgreSQL'

    # Set queue from comments (default: 60s)
    if 'QUEUE = 30m' in query:
        queue = "30m"
    else:
        queue = "60s"

    # Set the runid from sql filename
    base = os.path.basename(query_filename)
    runid = os.path.splitext(base)[0]
    
    print('\n> Query : {name}\n{query}\n'.format(name=runid, query=query))

The rest of the submission process and retrieval can be done in any manner. An example 
can be found on github: [rave-tutorial-from-files.py](https://github.com/agy-why/rave-tap-pyvo-tutorials/blob/master/rave-tutorial-from-files.py)

# Convert result to various python types

The results obtained via the `fetch_results()` method returns a so called `TAPResults` object. The latter is essencially a `votable`. In case you are not familiar with `votables` here is a few tricks to get back to some more general pythonic types.

* Print the data:
  ```python
  tap_results.to_table().pprint(max_lines=10)
  ```
  It is important to notice the `max_lines` keyword, printing too many lines may crash a low-memory machine.
  
* Show as html (in a browser):
  ```python
  tap_results.to_table().show_in_browser(max_lines=10)
  ```
  It is important to notice the `max_lines` keyword, printing too many lines may crash a low-memory machine.
  
* Show in a notebook (ipython, jupyter or jupyterlab):
  ```python
  tap_results.to_table().show_in_notebook(display_length=10)
  ```
  It is important to notice the `display_length` keyword, printing too many lines may crash a low-memory machine.
  
* Get a numpy array:
  ```python
  np_array = tap_results.to_table().as_array()
  ```
  
* Get a Panda's DataFrame
    ```python
    df = tap_results.to_table().to_pandas()
    ```
    
    * Get the header of DataFrame:
        ```python
        df.head()
        ```

# Automatically downloading files from path results

Some queries do not return `data` but the `urls` of the files where the data are stored ; examples are spectras, or images. When your query returns a few file-paths it is possible to download them by hand, however it is usually more practical to download them automaticaly. Here is an example using pure python code:

In [None]:
#
# Automatically downloading files
#

#
# Query the fits files
#
lang = "PostgreSQL"

query = """
SELECT spectrum_fits 
FROM ravedr6.dr6_spectra 
  WHERE rave_obs_id LIKE '200304%' 
"""

# Submit the query as Synchronous job
tap_result = tap_service.run_sync(query, language=lang)
path_to_fits = tap_result.to_table()

#
# Download the fits files into local directory
#
target_directory = './fits/'
fit_file_base_url = 'https://www.rave-survey.org/files/'

for fit_file in path_to_fits:
    
    # extract name of the fits
    fit_name = os.path.basename(fit_file[0])
    
    # set the target local file
    fit_file_name = os.path.join(target_directory, fit_name)
    
    # build the url pointing to the fit file
    fit_file_url = os.path.join(fit_file_base_url, fit_file[0])
    
    # download and save into target file
    print("Downloaded {fitfile} into {target}".format(fitfile=fit_name, target=fit_file_name))
    urllib.request.urlretrieve(fit_file_url, fit_file_name)
    
print('\nDone')

# Archiving your jobs

If you submit several large queries you may go over quota: set to 10 GB. In order to avoid to get over quota you may consider archiving your jobs. Archiving removes the data from the server side but keeps the SQL query. This allows to resubmit a query at a later time.

Deleting (archiving) a job with pyvo can be simply done that way:

In [None]:
job.delete()

## Archiving all COMPLETED jobs

A nice feature of the TAP service is to retrieve all jobs that are marked as completed and archive them at ones. This can be done as follows:

In [None]:
#
# Archiving all COMPLETED jobs
#

# obtain the list of completed job_descriptions
completed_job_descriptions = tap_service.get_job_list(phases='COMPLETED')

# Archiving each of them
for job_description in completed_job_descriptions:
    
    # get the jobid
    jobid = job_description.jobid
    
    # recreate the url by hand
    job_url = tap_service.baseurl + '/async/' + jobid
    
    # recreate the job
    job = AsyncTAPJob(job_url, session=tap_session)
    
    print('Archiving: {url}'.format(url=job_url))
    job.delete() # archive job

An example can be found on github: [rave-tutorial-archive-jobs.py](https://github.com/agy-why/rave-tap-pyvo-tutorials/blob/master/rave-tutorial-archive-jobs.py)

## Rerunning ARCHIVED jobs

Rerunning and retrieving results from a job that have been archived previously, can be achieved that way:

In [None]:
#
# Rerunning Archived jobs
#

# obtain the list of the two last ARCHIVED job_descriptions
archived_job_descriptions = tap_service.get_job_list(phases='ARCHIVED', last=2)

# rerunning the two last Archived jobs
for job_description in archived_job_descriptions:
    
    # get jobid
    jobid = job_description.jobid
    
    # recreate the url by hand
    job_url = tap_service.baseurl + '/async/' + jobid
    
    # recreate the archived job
    archived_job = AsyncTAPJob(job_url, session=tap_session)
    
    # extract the query
    query = archived_job.query
    
    # resubmit the query with corresponding parameters
    job = tap_service.submit_job(query, language='PostgreSQL', runid='rerun', queue='60s')
    print('{url}:\n{query}\n'.format(url=job_url, query=query))
    
    # start the job
    job.run()

Retrieving the results is done alike explained above.

If you prefer you can also filter for a given `runid`.

In [None]:
#
# Filtering by runid
#

target_runid = 'rave-example03'

# obtain the list of completed job_descriptions
archived_job_descriptions = tap_service.get_job_list(phases='ARCHIVED')

for job_description in archived_job_descriptions:
    
    # select the job with runid fitting target_runid
    if job_description.runid == target_runid:
        
        # get jobid
        jobid = job_description.jobid
    
        # recreate the url by hand
        job_url = tap_service.baseurl + '/async/' + jobid
    
        # recreate the archived job
        archived_job = AsyncTAPJob(job_url, session=tap_session)
    
        # extract the query
        query = archived_job.query
    
        # resubmit the query with corresponding parameters
        job = tap_service.submit_job(query, language='PostgreSQL', runid='rerun', queue='60s')
        print('{url}:\n{query}\n'.format(url=job_url, query=query))
    
        # start the job
        job.run()

An example can be found on github: [rave-tutorial-rerunning-archived-jobs.py](https://github.com/agy-why/rave-tap-pyvo-tutorials/blob/master/rave-tutorial-rerunning-archived-jobs.py)