# `penquins`: a `python` client library for `kowalski` 

*`penquins` - Processing ENormous Queries of ztf Users INStantaneously - Zero to Hero*

In this tutorial, we will explore in detail how to programmatically query `kowalski` with `python >3.6`. We will also cover best practices and pro tips.

## Installation

[`penquins`](https://github.com/dmitryduev/kowalski/blob/master/penquins.py) is very lightweight and only depends on `pymongo` and `requests`. Use `pip` to install it into your environment:

In [1]:
#!pip install git+https://github.com/dmitryduev/kowalski.git

## Quick start

In [2]:
from IPython.display import display, HTML, JSON
import json

from penquins import Kowalski

For security, let us store the access credentials in a `json` file `secrets_penquins.json`:

In [3]:
secrets = {
    "kowalski": {
        "username": "YOUR_USERNAME",
        "password": "YOUR_PASSWORD"
    }
}

with open('secrets_penquins.json', 'w') as f:
    json.dump(secrets, f)

Load the credentials and initialize a `Kowalski` object:

In [4]:
with open('secrets_penquins.json', 'r') as f:
    secrets = json.load(f)

k = Kowalski(username=secrets['kowalski']['username'], password=secrets['kowalski']['password'])

By default, the `Kowalski` object will try to connect to the `kowalski` instance running at Caltech using the following parameters:
```python
protocol='https', host='kowalski.caltech.edu', port=443
```

You can explicitly set those if you are connecting to another instance of `kowalski`.

Set `verbose=True` if you want more feedback from `Kowalski`.
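
As a minimal sketch, the settings for connecting to a different instance could be collected as shown below. The host `localhost` and port `8000` are placeholders rather than a real deployment, and the sketch assumes that `verbose` is accepted by the `Kowalski` constructor alongside the connection parameters listed above.

```python
# Connection settings for a hypothetical local kowalski instance.
# 'localhost' and 8000 are placeholder values for illustration only.
connection = {
    "protocol": "http",
    "host": "localhost",
    "port": 8000,
    "verbose": True,  # ask Kowalski for more feedback
}

# With penquins installed and such an instance running, the settings
# would be passed on like this (left commented out so the cell runs offline):
# k = Kowalski(username="YOUR_USERNAME", password="YOUR_PASSWORD", **connection)
print(connection)
```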

Let us check that the connection is healthy:

In [5]:
connection_ok = k.check_connection()
print(f'Connection OK: {connection_ok}')

Connection OK: True


Now let us construct a simple query that should return the `candid` of a ZTF alert from the `ZTF_alerts` collection and run it. 

_Please see below for more info on the available query types and how to (efficiently) construct them_.

In [6]:
q = {"query_type": "find",
     "query": {
         "catalog": "ZTF_alerts",
         "filter": {"candid": 714287740515015072},
#          "filter": {"objectId": "ZTF18acrkaks"},
         "projection": {"_id": 0, "candid": 1},
     }
     }
r = k.query(query=q)
display(JSON(r, expanded=True))

<IPython.core.display.JSON object>

This query will block the execution of your program until it receives the result or hits the default timeout, which is set to _24 hours_. You can also set the query timeout manually, in _milliseconds_, after which the query will be killed on the server:

In [7]:
q['kwargs'] = {'max_time_ms': 10}
r = k.query(query=q)
r

{'user': 'admin',
 'kwargs': {'max_time_ms': 10},
 'status': 'done',
 'result_data': {'query_result': [{'candid': 714287740515015072}]}}
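
Because `max_time_ms` is expressed in milliseconds, a small helper can make timeouts given in seconds less error-prone. `with_timeout` below is a hypothetical convenience function, not part of `penquins`; it only builds the query dictionary and does not contact the server.

```python
import copy


def with_timeout(query, seconds):
    """Return a copy of `query` with kwargs['max_time_ms'] set from a timeout in seconds."""
    if seconds <= 0:
        raise ValueError("timeout must be positive")
    q = copy.deepcopy(query)             # leave the caller's query untouched
    kwargs = q.setdefault("kwargs", {})  # keep any kwargs that are already set
    kwargs["max_time_ms"] = int(seconds * 1000)
    return q


base = {"query_type": "find",
        "query": {"catalog": "ZTF_alerts",
                  "filter": {"candid": 714287740515015072},
                  "projection": {"_id": 0, "candid": 1}}}

q_5min = with_timeout(base, 5 * 60)
print(q_5min["kwargs"])  # {'max_time_ms': 300000}
```

The returned dictionary can then be passed to `k.query(query=...)` as in the cell above.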

Starting from `penquins` version `1.0.0`, queries are no longer registered in the database and saved to disk _by default_,
which significantly speeds up execution.

You can enqueue a query on the server. This may be useful for long-running queries:

In [8]:
q['kwargs'] = {'enqueue_only': True}
r = k.query(query=q)
display(r)

{'status': 'enqueued', 'query_id': '141475b6fc6b8ae75fdef56234daba7a'}

Executing this query returns a query `id` that can then be used to retrieve the query result:

In [9]:
qid = r['query_id']
result = k.get_query(query_id=qid, part='result')
display(result)

{'task_id': '141475b6fc6b8ae75fdef56234daba7a',
 'result': '{"query_result": [{"candid": 714287740515015072}]}'}
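
Note that in the output above the `result` field is a JSON-encoded string rather than a dictionary. A minimal sketch of decoding it, using the response shown above as sample input:

```python
import json

# Response copied from the output above
response = {
    "task_id": "141475b6fc6b8ae75fdef56234daba7a",
    "result": '{"query_result": [{"candid": 714287740515015072}]}',
}

# Decode the JSON string and pull out the values of interest
decoded = json.loads(response["result"])
candids = [entry["candid"] for entry in decoded["query_result"]]
print(candids)  # [714287740515015072]
```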

You can also retrieve the original query:

In [10]:
result = k.get_query(query_id=qid, part='task')
display(JSON(result, expanded=True))

<IPython.core.display.JSON object>

Or delete the query from Kowalski:

In [11]:
result = k.delete_query(query_id=qid)
display(result)

{'message': 'success'}

By default, the queries/results stored on `kowalski` are deleted after five days. 
To override this, set a manual expiration interval in days:

In [12]:
q["kwargs"] = {"query_expiration_interval": 30}

## Error management

Note: `kowalski` will refuse connection if your installed version of `penquins` is outdated.

If a query fails, the result will contain the error traceback in its `msg` field. Using our running example query, now with an invalid projection that mixes inclusion and exclusion:

In [13]:
q = {"query_type": "find",
     "query": {
         "catalog": "ZTF_alerts",
         "filter": {"candid": 714287740515015072},
#          "filter": {"objectId": "ZTF18acrkaks"},
         "projection": {"_id": 0, "candid": 1, "objectId": 0},
     }
     }

r = k.query(query=q)
# r
print(r['msg'])

Traceback (most recent call last):
  File "/app/server.py", line 957, in execute_query
    query_result['query_result'] = await _select.to_list(length=None)
  File "/usr/local/lib/python3.6/site-packages/motor/core.py", line 1133, in _to_list
    result = get_more_result.result()
  File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.6/site-packages/pymongo/cursor.py", line 1140, in _refresh
    self.__send_message(q)
  File "/usr/local/lib/python3.6/site-packages/pymongo/cursor.py", line 1010, in __send_message
    helpers._check_command_response(first)
  File "/usr/local/lib/python3.6/site-packages/pymongo/helpers.py", line 155, in _check_command_response
    raise OperationFailure(msg % errmsg, code, response)
pymongo.errors.OperationFailure: Projection cannot have a mix of inclusion and exclusion.
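
Errors like this one can be caught before the query ever reaches the server, and failed responses can be turned into exceptions. The two helpers below are hypothetical and not part of `penquins`. `check_projection` only looks at plain `0`/`1` flags and ignores `_id`, which `MongoDB` allows to be excluded from an inclusion projection. `unwrap` assumes that a failed response lacks `result_data` and carries the traceback in `msg`, as in the output above.

```python
def check_projection(projection):
    """Raise ValueError if a projection mixes inclusion (1) and exclusion (0)."""
    # '_id' is skipped: excluding it alongside included fields is allowed
    flags = {bool(v) for field, v in projection.items() if field != "_id"}
    if len(flags) > 1:
        raise ValueError("Projection cannot have a mix of inclusion and exclusion.")


def unwrap(response):
    """Return the query result, or raise if the response carries no result.

    Assumption: failed queries have no 'result_data' key and store the
    server-side traceback under 'msg'.
    """
    if "result_data" not in response:
        raise RuntimeError(response.get("msg", "query failed without a message"))
    return response["result_data"]["query_result"]


check_projection({"_id": 0, "candid": 1})  # valid: passes silently
try:
    check_projection({"_id": 0, "candid": 1, "objectId": 0})
except ValueError as e:
    print(e)

ok = {"status": "done",
      "result_data": {"query_result": [{"candid": 714287740515015072}]}}
print(unwrap(ok))
```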



## Querying `kowalski`

`Kowalski`'s API supports several types of queries: `info`, `cone_search`, `general_search`, and a number of frequently-used sub-types of general search (`find`, _todo:_ `find_one`, _todo:_ `aggregate`).

`Kowalski` uses a `MongoDB` `NoSQL` database on the backend. The query syntax is therefore based on the `MongoDB` query language, `MQL`. It might look unusual if you are experienced in `SQL`, but feels quite natural if you are using `python`.
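
For readers coming from `SQL`, a rough correspondence may help. The sketch below writes a `find` query whose filter plays the role of a `WHERE` clause. The thresholds are arbitrary illustrative values, `$gt` and `$lt` are standard `MQL` comparison operators, and multiple conditions in one filter are implicitly combined with a logical AND. The query is only constructed here, not run.

```python
# Roughly equivalent SQL, for orientation only:
#   SELECT objectId, candid FROM ZTF_alerts
#   WHERE candidate.rb > 0.9 AND candidate.magpsf < 18
q = {
    "query_type": "find",
    "query": {
        "catalog": "ZTF_alerts",
        "filter": {
            "candidate.rb": {"$gt": 0.9},     # WHERE candidate.rb > 0.9
            "candidate.magpsf": {"$lt": 18},  # AND candidate.magpsf < 18
        },
        # the projection plays the role of the SELECT column list
        "projection": {"_id": 0, "objectId": 1, "candid": 1},
    },
}
print(q["query"]["filter"])
```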

### General advice, recommendations, and best practices

`Kowalski` gives a lot of power to its users, so it is expected that it is used responsibly.

- Check available catalogs:

In [14]:
q = {"query_type": "info",
     "query": {
         "command": "catalog_names"
     }
     }
r = k.query(query=q)
display(r['result_data']['query_result'])

['sdss_ellipticals',
 'mzls_ellipticals',
 'milliquas_v6',
 'legacysurveys_photoz_DR7',
 'legacysurveys_photoz_DR6',
 'cfht_w3_photozs',
 'ZTF_sources_20190614',
 'ZTF_exposures_20190614',
 'ZTF_alerts',
 'ZTF_20181220',
 'ZTF_20180919',
 'TNS',
 'TIC_7',
 'TGSS_ADR1',
 'RFC_2018d',
 'RFC_2017c',
 'PanSTARRS1',
 'NVSS_41',
 'Known_lenses_20180901',
 'IPHAS_DR2',
 'Gaia_DR2_light_curves',
 'Gaia_DR2_WD',
 'Gaia_DR2_2MASS_best_neighbour',
 'Gaia_DR2',
 'FIRST_20141217',
 'CLU_20190625',
 'CLU_20190406',
 'CLU_20181213V2',
 'CLU_20180513',
 'CLU_20170106',
 'AllWISE',
 'AMSG_20180302',
 '2MASS_XSC',
 '2MASS_PSC']

- To check the available "fields" (think "columns" in SQL/table-speak) for a particular catalog, explore the last ingested document.
    - `MongoDB` does not enforce any schema by default, meaning that the contents of individual documents in a collection/catalog may differ from one another.
    - Make sure the field you are querying actually exists! For example, if you make a typo and query for a non-existent field, the database will have to perform a full collection scan looking for entries that have the mistyped field.

In [15]:
q = {"query_type": "general_search",
     "query": "db['ZTF_alerts'].find().sort([('_id', -1)]).limit(1)"
     }
r = k.query(query=q)
display(JSON(r['result_data']['query_result'][0]))

<IPython.core.display.JSON object>
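
Once a sample document is at hand, its field names can be listed in the dotted notation used in filters and projections, which makes typos easy to spot. The sketch below uses a hypothetical helper, `list_fields`, and a heavily abridged, made-up document; a real `ZTF_alerts` entry has many more fields.

```python
def list_fields(document, prefix=""):
    """Return dotted field names for all leaf fields of a nested document."""
    fields = []
    for key, value in document.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            # descend into sub-documents, e.g. 'candidate' -> 'candidate.rb'
            fields.extend(list_fields(value, prefix=f"{name}."))
        else:
            fields.append(name)
    return fields


# Made-up values, for illustration only
sample = {"_id": 1, "objectId": "ZTF18acrkaks", "candid": 714287740515015072,
          "candidate": {"rb": 0.95, "jd": 2458468.0}}

known = set(list_fields(sample))
for field in ("objectId", "objectID", "candidate.rb"):
    print(field, field in known)
```

Since documents in a collection may differ from one another, a field missing from one sample is a warning sign rather than proof that it does not exist anywhere.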

- Before querying a catalog, explore its stats and available indexes. 

    - You want to minimize the required I/O operations. Indexes on "fields" (think "columns" in `SQL`/table-speak) provide a fast way to find the location of a "document" (think catalog entry) on disk for further retrieval, or even to fetch the field value if no other data is needed (so-called "covered queries"). 
    - Indexes may be compound to speed up multi-field queries and enable multi-field covered queries.
    - Whenever possible, construct your queries to use indexes. A query that cannot use an index will initiate a full collection (catalog) scan, which for large catalogs may be very costly in terms of I/O, and therefore time.

- If possible, construct your queries in such a way that the result is small(er) in size. Think of a way to "slice" your query into smaller chunks; this is usually good for performance.
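
One possible way to slice a query is by time. The sketch below splits a Julian-date range into one-day chunks and builds one `find` query per chunk. It assumes the `candidate.jd` field that appears in the index listing for `ZTF_alerts` further down; `jd_chunks` and `chunked_queries` are hypothetical helpers, and the date range is arbitrary.

```python
def jd_chunks(jd_start, jd_end, step=1.0):
    """Yield (lo, hi) half-open Julian-date intervals covering [jd_start, jd_end)."""
    lo = jd_start
    while lo < jd_end:
        hi = min(lo + step, jd_end)
        yield lo, hi
        lo = hi


def chunked_queries(jd_start, jd_end, step=1.0):
    """Yield one find query per time chunk; only the query dicts are built here."""
    for lo, hi in jd_chunks(jd_start, jd_end, step):
        yield {"query_type": "find",
               "query": {"catalog": "ZTF_alerts",
                         "filter": {"candidate.jd": {"$gte": lo, "$lt": hi}},
                         "projection": {"_id": 0, "candid": 1}}}


queries = list(chunked_queries(2458468.5, 2458471.5))
print(len(queries))  # 3
```

Each query could then be submitted on its own, so that no single result grows too large.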

To illustrate the advice given above, let us assume we want to query the `ZTF_alerts` catalog. 

In [16]:
# print large numbers in human-readable format
!pip install humanize
import humanize



In [17]:
q = {"query_type": "info",
     "query": {
         "command": "catalog_info",
         "catalog": "ZTF_alerts"
     }
     }
r = k.query(query=q)
size = r['result_data']['query_result']['size']
count = r['result_data']['query_result']['count']
avg_doc_size = r['result_data']['query_result']['avgObjSize']

print(f'Catalog size: {humanize.naturalsize(size)}')
print(f'Number of entries: {humanize.intword(count)}')
print(f'Average entry size: {humanize.naturalsize(avg_doc_size)}')

Catalog size: 14.5 TB
Number of entries: 142.4 million
Average entry size: 101.5 kB


That's a lot of data! You definitely want to minimize I/O for your queries! Let us explore the available indexes:

In [18]:
q = {"query_type": "info",
     "query": {
         "command": "index_info",
         "catalog": "ZTF_alerts"
     }
     }
r = k.query(query=q)

# indexes = [v['key'] for k, v in r['result_data']['query_result'].items()]
# for ii, ind in enumerate(indexes):
#     print(f'index #{ii+1}: {ind}')

indexes = r['result_data']['query_result']
for ii, (kk, vv) in enumerate(indexes.items()):
    print(f'index #{ii+1}: "{kk}"\n{vv["key"]}\n')

index #1: "_id_"
[['_id', 1]]

index #2: "objectId_1"
[['objectId', 1]]

index #3: "candid_1"
[['candid', 1]]

index #4: "candidate.field_1"
[['candidate.field', 1]]

index #5: "candidate.fwhm_1"
[['candidate.fwhm', 1]]

index #6: "candidate.magpsf_1"
[['candidate.magpsf', 1]]

index #7: "candidate.rb_1"
[['candidate.rb', 1]]

index #8: "candidate.jd_1_candidate.programid_1"
[['candidate.jd', 1], ['candidate.programid', 1]]

index #9: "candidate.nid_1"
[['candidate.nid', 1]]

index #10: "coordinates.radec_geojson_2dsphere___id_1"
[['coordinates.radec_geojson', '2dsphere'], ['_id', 1]]

index #11: "coordinates.radec_geojson_2dsphere"
[['coordinates.radec_geojson', '2dsphere']]

index #12: "candidate_jd___id_1"
[['candidate.jd', 1], ['_id', 1]]

index #13: "candidate_jd_1__candidate_rb_1___id_1"
[['candidate.jd', 1], ['candidate.rb', 1], ['_id', 1]]

index #14: "candidate_jd_1__candidate_field_1__candidate_rb_1___id"
[['candidate.jd', 1], ['candidate.field', 1], ['candidate.rb', 1], ['_i

The database wants to minimize I/O, too, so when you make a query, it will first try to find an index to speed up your query. You can explicitly provide it with a `hint` on what index to try and use (see below).

If the database fails to find such an index, it will have to look at the individual documents on disk.

The same applies if you make a typo: if you query the `ZTF_alerts` catalog for, say, "objectID" or "candidate.objectId" (neither exists) instead of "objectId", the database will have to look at _all_ entries in the catalog.
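
A quick client-side check can flag filters that no index is likely to support. The sketch below applies a simplified rule of thumb: an index can help when the filter constrains the index's leading field. `usable_indexes` is a hypothetical helper, and the actual choice of index is up to the database's query planner.

```python
def usable_indexes(filter_fields, indexes):
    """Return names of indexes whose leading field appears in the filter."""
    fields = set(filter_fields)
    return [name for name, info in indexes.items()
            if info["key"][0][0] in fields]


# A subset of the index listing above, in the same format
indexes = {
    "objectId_1": {"key": [["objectId", 1]]},
    "candid_1": {"key": [["candid", 1]]},
    "candidate.jd_1_candidate.programid_1": {
        "key": [["candidate.jd", 1], ["candidate.programid", 1]]},
}

print(usable_indexes({"objectId": "ZTF18acrkaks"}, indexes))  # ['objectId_1']
print(usable_indexes({"objectID": "ZTF18acrkaks"}, indexes))  # []
```

An empty list, as for the mistyped "objectID", suggests that the query would fall back to a full collection scan.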

### Query examples

*Sample cone search 1*

Get all objects from the `ZTF_alerts` catalog within `8` arcseconds of two sky positions,
`(173.5155088, 33.0845502)` and `(172.1345, 30.5412)`,
and return their `_id`'s, `objectId`'s, `candidate.rcid`'s, and `candidate.rb`'s.

In [19]:
q = {"query_type": "cone_search",
     "object_coordinates": {
         "radec": "[(173.5155088, 33.0845502), (172.1345, 30.5412)]", 
         "cone_search_radius": "8",
         "cone_search_unit": "arcsec"
     },
     "catalogs": {
         "ZTF_alerts": {
             "filter": {},
             "projection": {
                 "objectId": 1,
                 "candidate.rcid": 1,
                 "candidate.rb": 1
             }
         }
     }
     }
r = k.query(query=q)
data = r['result_data']
display(JSON(data, expanded=True))

<IPython.core.display.JSON object>
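
When the sky positions come from a list rather than being typed by hand, the query dictionary can be assembled programmatically. `make_cone_search` below is a hypothetical helper, not part of `penquins`; it reproduces the format of the query above, in which `radec` and `cone_search_radius` are passed as strings, and assumes the coordinates are plain `python` floats.

```python
def make_cone_search(positions, radius, unit, catalog, projection, filter_=None):
    """Build a cone_search query dict in the format used above."""
    return {
        "query_type": "cone_search",
        "object_coordinates": {
            # the list of (ra, dec) tuples is passed as a string
            "radec": str([tuple(p) for p in positions]),
            "cone_search_radius": str(radius),
            "cone_search_unit": unit,
        },
        "catalogs": {
            catalog: {"filter": filter_ or {}, "projection": projection}
        },
    }


positions = [(173.5155088, 33.0845502), (172.1345, 30.5412)]
q = make_cone_search(
    positions, radius=8, unit="arcsec", catalog="ZTF_alerts",
    projection={"objectId": 1, "candidate.rcid": 1, "candidate.rb": 1},
)
print(q["object_coordinates"]["radec"])
```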