# `penquins`: a `python` client library for `kowalski` 

---

Run this notebook in the browser in `Google Colab`: 

[![Open In Google Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dmitryduev/kowalski/blob/master/nb/penquins-tutorial.ipynb)

<b>Tip:</b> if you are running this notebook in `Google Colab` and run out of disk space with the default runtime, try changing it to a GPU-accelerated one in `Runtime` -> `Change runtime type` -> `Hardware accelerator` -> `GPU`.

---

`penquins` - <u>p</u>rocessing <u>en</u>ormous <u>q</u>ueries of ztf <u>u</u>sers <u>ins</u>tantaneously 

## Zero to Hero

In this tutorial, we will explore the details of how to (efficiently) programmatically query `kowalski` with `python >3.6`. We will also cover the best practices and pro tips.

<div style="color: #155724;
    background-color: #d4edda;
    border-color: #c3e6cb;
    padding: .75rem 1.25rem;
    margin-bottom: 1rem;
    border: 1px solid transparent;
    border-radius: .25rem;">
    <b>Note:</b> it is strongly recommended to thoroughly study this notebook if you want to efficiently use <tt>kowalski</tt>. <tt>Kowalski</tt> gives a lot of power to its users, so it is expected that it is used responsibly.
</div>

## Installation

[`penquins`](https://github.com/dmitryduev/kowalski/blob/master/penquins.py) is very lightweight and only depends on `pymongo` and `requests`. Use `pip` to install it into your environment:

In [1]:
!pip install git+https://github.com/dmitryduev/kowalski.git

## Quick start

Import the necessities:

In [2]:
from IPython.display import display, HTML, JSON
import json

from penquins import Kowalski

For security, let us store the access credentials in a `json` file `secrets_penquins.json`:

In [3]:
secrets = {
    "kowalski": {
        "username": "YOUR_USERNAME",
        "password": "YOUR_PASSWORD"
    }
}

# with open('secrets_penquins.json', 'w') as f:
#     json.dump(secrets, f)

Load the credentials and initialize a `Kowalski` object:

In [17]:
with open('secrets_penquins.json', 'r') as f:
    secrets = json.load(f)

k = Kowalski(username=secrets['kowalski']['username'], password=secrets['kowalski']['password'])

By default, the `Kowalski` object will try to connect to the `kowalski` instance running at Caltech using the following parameters:
```python
protocol='https', host='kowalski.caltech.edu', port=443
```

You can explicitly set those if you are connecting to another instance of `kowalski`.

Set `verbose=True` if you want more feedback from `Kowalski`.

Let us check that the connection is healthy:

In [18]:
connection_ok = k.check_connection()
print(f'Connection OK: {connection_ok}')

Connection OK: True


Now let us construct a simple query that should return the `candid` of a ZTF alert from the `ZTF_alerts` collection and run it. 

_Please see below for more info on the available query types and how to (efficiently) construct them_.

In [20]:
q = {"query_type": "find",
     "query": {
         "catalog": "ZTF_alerts",
         "filter": {"candid": 935175894815015009},
#          "filter": {"candid": 714287740515015072},
#          "filter": {"objectId": "ZTF18acrkaks"},
         "projection": {"_id": 0, "candid": 1},
     }
     }
r = k.query(query=q)
display(JSON(r, expanded=True))

<IPython.core.display.JSON object>

This query blocks the execution of your program until it receives the result or hits the default timeout of _5 minutes_. You can set the timeout manually, in _milliseconds_; once it elapses, the query is killed on the server:

In [7]:
q['kwargs'] = {'max_time_ms': 10}
r = k.query(query=q)
r

{'user': 'admin',
 'kwargs': {'max_time_ms': 10},
 'status': 'done',
 'result_data': {'query_result': [{'candid': 714287740515015072}]}}

Starting from `penquins` version `1.0.0`, queries are no longer registered in the database and saved to disk _by default_,
which provides significant execution speed improvement.

You can enqueue a query on the server, which may be useful for long-running queries. Remember to set the `max_time_ms` parameter if you are expecting the query to run for more than five minutes.

In [21]:
q['kwargs'] = {'enqueue_only': True, 'max_time_ms': 600000}
r = k.query(query=q)
display(r)

{'status': 'enqueued', 'query_id': 'aa2f6227ea40f5d318726c1c49d88c7e'}

Executing this query returns a query `id` that can then be used to retrieve the query result:

In [22]:
qid = r['query_id']
result = k.get_query(query_id=qid, part='result')
display(result)

{'task_id': 'aa2f6227ea40f5d318726c1c49d88c7e',
 'result': '{"query_result": [{"candid": 935175894815015009}]}'}
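Note that the `result` field in the output above is a JSON-encoded string, not a dictionary. A minimal sketch of decoding it with the standard `json` module, using a copy of the response shown above (the variable names `payload` and `candids` are illustrative):

```python
import json

# copy of the response returned by k.get_query(query_id=qid, part='result') above
result = {
    'task_id': 'aa2f6227ea40f5d318726c1c49d88c7e',
    'result': '{"query_result": [{"candid": 935175894815015009}]}',
}

# the 'result' value is a JSON string, so decode it before use
payload = json.loads(result['result'])
candids = [doc['candid'] for doc in payload['query_result']]
print(candids)
```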

You can also retrieve the original query:

In [23]:
result = k.get_query(query_id=qid, part='task')
display(JSON(result, expanded=True))

<IPython.core.display.JSON object>

Or delete the query from Kowalski:

In [24]:
result = k.delete_query(query_id=qid)
display(result)

{'message': 'success'}

By default, the queries/results stored on `kowalski` are deleted after five days. 
To override this, set a manual expiration interval in days:

In [12]:
q["kwargs"] = {"query_expiration_interval": 30}

## Error management

<div style="color: #0c5460;
    background-color: #d1ecf1;
    border-color: #bee5eb;
    padding: .75rem 1.25rem;
    margin-bottom: 1rem;
    border: 1px solid transparent;
    border-radius: .25rem;">
    <b>Note:</b> <tt>kowalski</tt> will refuse connection if your installed version of <tt>penquins</tt> is outdated.
</div>

In case a query fails, the result will contain the traceback error message. Using our running example query:

In [25]:
q = {"query_type": "find",
     "query": {
         "catalog": "ZTF_alerts",
         "filter": {"candid": 714287740515015072},
#          "filter": {"objectId": "ZTF18acrkaks"},
         "projection": {"_id": 0, "candid": 1, "objectId": 0},
     }
     }

r = k.query(query=q)
# r
print(r['msg'])

Traceback (most recent call last):
  File "/app/server.py", line 1027, in execute_query
    query_result['query_result'] = await _select.to_list(length=None)
  File "/usr/local/lib/python3.6/site-packages/motor/core.py", line 1133, in _to_list
    result = get_more_result.result()
  File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.6/site-packages/pymongo/cursor.py", line 1140, in _refresh
    self.__send_message(q)
  File "/usr/local/lib/python3.6/site-packages/pymongo/cursor.py", line 1010, in __send_message
    helpers._check_command_response(first)
  File "/usr/local/lib/python3.6/site-packages/pymongo/helpers.py", line 155, in _check_command_response
    raise OperationFailure(msg % errmsg, code, response)
pymongo.errors.OperationFailure: Projection cannot have a mix of inclusion and exclusion.
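Judging by the responses shown in this notebook, a successful response carries `result_data`, while a failed one carries the traceback under `msg`. Under that assumption, a small helper can turn failures into exceptions. This is only a sketch: `unwrap` is a hypothetical name and not part of `penquins`, and the dictionaries below are stand-ins for what `k.query` returned.

```python
def unwrap(response: dict):
    """Return the query result, or raise if the response looks like a failure.

    Assumes the layout seen in this notebook: successful responses contain
    response['result_data']['query_result'], failed ones contain response['msg'].
    """
    if 'result_data' in response:
        return response['result_data']['query_result']
    raise RuntimeError(response.get('msg', 'query failed without an error message'))


# stand-ins for responses returned by k.query(query=q)
ok = {'status': 'done',
      'result_data': {'query_result': [{'candid': 714287740515015072}]}}
failed = {'msg': 'pymongo.errors.OperationFailure: '
                 'Projection cannot have a mix of inclusion and exclusion.'}

print(unwrap(ok))
try:
    unwrap(failed)
except RuntimeError as e:
    print(f'Query failed: {e}')
```

Wrapping every call this way keeps a failed query from surfacing later as a confusing `KeyError` on `result_data`.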



## Querying `kowalski`

`Kowalski`'s API supports several types of queries: `info`, `cone_search`, `general_search`, and a number of frequently-used sub-types of general search: `find`, `find_one`, `count_documents`, and `aggregate`.

`Kowalski` uses a [`MongoDB` `NoSQL` database](https://mongodb.com) on the backend. The query syntax is therefore based on the `MongoDB` query language, `MQL`. It might look unusual if you are experienced in `SQL`, but feels quite natural if you are using `python`.

### General advice, recommendations, and best practices

<div style="color: #155724;
    background-color: #d4edda;
    border-color: #c3e6cb;
    padding: .75rem 1.25rem;
    margin-bottom: 1rem;
    border: 1px solid transparent;
    border-radius: .25rem;">
    <b>Note:</b> <tt>Kowalski</tt> gives a lot of power to its users, so it is expected that it is used responsibly.
</div>

#### Query types

- The `info` query type is used to get the available catalog names and information about the available indexes.

- The `cone_search` query type is used to positionally cross-match one or more `ICRS` points ($R.A.$, $Decl.$) with one or more catalogs. Additional filtering, result projection/size restriction/index "hints" may be applied as well.

- The `find` query type is used to query a catalog using a filter expression. The users have control over the result projection/size restriction and index "hints".

- The `find_one` query type is similar to `find`, but returns the first filter expression match and does not allow result projection.

- The `count_documents` query type is used to count the number of documents in a catalog that match a particular filter expression. 

- The `aggregate` query type is used to execute aggregation pipelines on a catalog, potentially involving complicated computations and/or (left outer) joins with other catalogs.

- The `general_search` query type provides the lowest level access to the database. It comes with no default "noob-protection", so the users are encouraged to use it only when absolutely necessary.

<div style="color: #0c5460;
    background-color: #d1ecf1;
    border-color: #bee5eb;
    padding: .75rem 1.25rem;
    margin-bottom: 1rem;
    border: 1px solid transparent;
    border-radius: .25rem;">
    <b>Note:</b> Use <tt>general_search</tt> only when absolutely necessary!<br>
    In many cases, the new general search query sub-types should be preferred over the <tt>general_search</tt> type as they provide better readability and control over the query contents and come with some "noob protection": by default, such queries will time out after 5 minutes. The same also applies to the <tt>cone_search</tt> query type.<br>

</div>

#### Gotchas

- Check available catalogs:

In [28]:
q = {"query_type": "info",
     "query": {
         "command": "catalog_names"
     }
     }
r = k.query(query=q)
catalog_names = r['result_data']['query_result']
display(catalog_names)

['sdss_ellipticals',
 'mzls_ellipticals',
 'milliquas_v6',
 'legacysurveys_photoz_DR7',
 'legacysurveys_photoz_DR6',
 'cfht_w3_photozs',
 'ZTF_alerts',
 'TNS',
 'RFC_2019a',
 'Known_lenses_20180901',
 'IPHAS_DR2',
 'Gaia_DR2_light_curves',
 'Gaia_DR2_WD',
 'Gaia_DR2',
 'CLU_20190625',
 'CLU_20190406',
 'CLU_20181213V2',
 'AllWISE',
 '2MASS_XSC',
 '2MASS_PSC']

- To check the available "fields" (think "columns" in SQL/table-speak) for a particular catalog, explore the last ingested document.
    - `MongoDB` does not enforce any schema by default meaning that the contents of individual documents in a collection/catalog may differ from one another.
    - <b>Make sure the field you are querying actually exists!</b> For example, if you make a typo and query for a non-existent field, the database will have to perform a full collection scan looking for entries that have the mis-typed field.
    
For example, get the last document ingested into the `ZTF_alerts` collection:

In [29]:
q = {"query_type": "general_search",
     "query": "db['ZTF_alerts'].find().sort([('_id', -1)]).limit(1)"
     }
r = k.query(query=q)
alert = r['result_data']['query_result'][0]
display(JSON(alert))

<IPython.core.display.JSON object>

If you are now planning to query for e.g. `objectID` or `obsjd`, or `candidate.objectId`, check that you will not trigger a full database scan (<b>spoiler:</b> you will since none of these exists in any alert):

In [14]:
'objectID' in alert

False

In [15]:
'obsjd' in alert

False

In [16]:
('candidate' in alert) and ('objectId' in alert['candidate'])

False
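The three checks above can be folded into one helper that understands dotted names such as `candidate.jd`. A minimal sketch, assuming the alert is a plain nested dictionary; `has_field` is a hypothetical helper and the values in `alert_stub` are placeholders, not real alert contents:

```python
def has_field(document: dict, dotted_name: str) -> bool:
    """Check whether a (possibly nested) field such as 'candidate.jd' exists."""
    node = document
    for key in dotted_name.split('.'):
        if not isinstance(node, dict) or key not in node:
            return False
        node = node[key]
    return True


# a heavily truncated stand-in for the alert fetched above (placeholder values)
alert_stub = {'objectId': 'ZTF18acrkaks', 'candidate': {'jd': 2458697.5}}

for name in ('objectId', 'objectID', 'candidate.jd', 'candidate.objectId'):
    print(name, has_field(alert_stub, name))
```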

- Before querying a catalog, explore its stats and <b>available indexes</b>. 

    - You want to minimize the required I/O operations. Indexes on "fields" (think "columns" in `SQL`/table-speak) provide a fast way to find the location of a "document" (think catalog entry) on disk for further retrieval, or even fetching the field value if no other data is needed (so-called "covered queries"). 
    - Indexes may be compound to speed up multi-field queries and to enable multi-field covered queries (see below for more details). 
    - Whenever possible, construct your queries to use indexes. A query that cannot use an index will initiate a full collection (catalog) scan, which for large catalogs may be very costly in terms of I/O = time.

To illustrate the advice given above, let us assume we want to query the `ZTF_alerts` catalog. 

In [10]:
# print large numbers in human-readable format
!pip install humanize
import humanize



In [30]:
q = {"query_type": "info",
     "query": {
         "command": "catalog_info",
         "catalog": "ZTF_alerts"
     }
     }
r = k.query(query=q)
size = r['result_data']['query_result']['size']
count = r['result_data']['query_result']['count']
avg_doc_size = r['result_data']['query_result']['avgObjSize']

print(f'Catalog size: {humanize.naturalsize(size)}')
print(f'Number of entries: {humanize.intword(count)}')
print(f'Average entry size: {humanize.naturalsize(avg_doc_size)}')

Catalog size: 1.9 TB
Number of entries: 19.5 million
Average entry size: 99.0 kB


That's a lot of data! You definitely want to minimize I/O for your queries! Let us explore the available indexes:

In [31]:
q = {"query_type": "info",
     "query": {
         "command": "index_info",
         "catalog": "ZTF_alerts"
     }
     }
r = k.query(query=q)

# indexes = [v['key'] for k, v in r['result_data']['query_result'].items()]
# for ii, ind in enumerate(indexes):
#     print(f'index #{ii+1}: {ind}')

indexes = r['result_data']['query_result']
for ii, (kk, vv) in enumerate(indexes.items()):
    print(f'index #{ii+1}: "{kk}"\n{vv["key"]}\n')

index #1: "_id_"
[['_id', 1]]

index #2: "coordinates.radec_geojson_2dsphere_candid_-1"
[['coordinates.radec_geojson', '2dsphere'], ['candid', -1]]

index #3: "coordinates.radec_geojson_2dsphere_objectId_-1"
[['coordinates.radec_geojson', '2dsphere'], ['objectId', -1]]

index #4: "objectId_1"
[['objectId', 1]]

index #5: "candid_1"
[['candid', 1]]

index #6: "candidate.pid_1"
[['candidate.pid', 1]]

index #7: "objectId_-1_candidate.pid_1"
[['objectId', -1], ['candidate.pid', 1]]

index #8: "candidate.pdiffimfilename_1"
[['candidate.pdiffimfilename', 1]]

index #9: "candidate.jd_-1_classifications.braai_-1_candid_-1"
[['candidate.jd', -1], ['classifications.braai', -1], ['candid', -1]]

index #10: "jd_field_rb_drb_braai_ndethhist_magpsf_isdiffpos"
[['candidate.jd', 1], ['candidate.field', 1], ['candidate.rb', 1], ['candidate.drb', 1], ['classifications.braai', 1], ['candidate.ndethist', 1], ['candidate.magpsf', 1], ['candidate.isdiffpos', 1], ['objectId', 1]]

index #11: "candidate.jd_1

The database wants to minimize I/O, too, so when you make a query, it will first try to find an index to speed up your query. You can explicitly provide it with a `hint` on what index to try and use (see below).
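When choosing an index name to pass as a `hint`, it can help to list the indexes whose leading key is the field you filter on. A minimal sketch, assuming the `index_info` result has the layout printed above; `indexes_starting_with` is a hypothetical helper, and the dictionary below reproduces only a few of the indexes listed:

```python
# a subset of the index_info output shown above, in the same layout
indexes = {
    'objectId_1': {'key': [['objectId', 1]]},
    'candid_1': {'key': [['candid', 1]]},
    'objectId_-1_candidate.pid_1': {'key': [['objectId', -1], ['candidate.pid', 1]]},
    'candidate.jd_-1_classifications.braai_-1_candid_-1': {
        'key': [['candidate.jd', -1], ['classifications.braai', -1], ['candid', -1]]
    },
}


def indexes_starting_with(indexes: dict, field: str) -> list:
    """Names of indexes whose first (leading) key is `field`."""
    return [name for name, info in indexes.items() if info['key'][0][0] == field]


print(indexes_starting_with(indexes, 'objectId'))
print(indexes_starting_with(indexes, 'candid'))
```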

<div style="color: #721c24;
    background-color: #f8d7da;
    border-color: #f5c6cb;
    padding: .75rem 1.25rem;
    margin-bottom: 1rem;
    border: 1px solid transparent;
    border-radius: .25rem;">
    If the database fails to find such an index, it will have to look at the individual documents on disk.
    <br><br>
    The same applies if you make a typo: if you query the <tt>ZTF_alerts</tt> catalog for, say, "objectID" or "candidate.objectId" (neither exists) instead of "objectId", the database will have to look at <b>all</b> entries in the catalog.
</div>

If you now want to, for example, find the `objectId`'s of ZTF alerts fainter than 20.5 mag detected on August 2, 2019 in field 650 with a [`drb` score](https://arxiv.org/pdf/1907.11259.pdf) higher than 0.99, construct the query so that it uses index #10. You may even pass that index as a `hint` to `kowalski`:

In [48]:
q = {"query_type": "find",
     "query": {
         "catalog": "ZTF_alerts",
         "filter": {'candidate.jd': {'$gt': 2458697.5, '$lt': 2458698.5},
                    'candidate.field': 650,
                    'candidate.drb': {'$gt': 0.99},
                    'candidate.magpsf': {'$gt': 20.5}
                   },
         "projection": {'objectId': 1, '_id': 0}
     },
     "kwargs": {
         "hint": "jd_field_rb_drb_braai_ndethhist_magpsf_isdiffpos",
         "max_time_ms": 10000
     }
     }
r = k.query(query=q)
# display(JSON(r, expanded=True))
oids = [d['objectId'] for d in r['result_data']['query_result']]
print(oids)

['ZTF19ablycmh', 'ZTF19ablxvoh', 'ZTF19ablxvoj', 'ZTF19ablxvoi', 'ZTF19ablxyvs', 'ZTF19abdyeno', 'ZTF19abdyzog', 'ZTF19ablxxlc', 'ZTF19ablxyqz', 'ZTF19abjajrj', 'ZTF18abunvqa', 'ZTF19ablxyne', 'ZTF19abbymtm', 'ZTF19abctuku', 'ZTF19ablxwkp', 'ZTF19abeyyro', 'ZTF19ablxxlc', 'ZTF19abeyxxv']


- If possible, construct your queries in such a way that the result is small(er) in size. Think of a way to slice/split your query into smaller chunks; this is usually good for performance.
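One way to slice a query is to split its time range into consecutive windows and issue one query per window. The sketch below only builds the query dictionaries; `jd_windows` is a hypothetical helper, and the half-day step and the filter are illustrative choices rather than recommendations from `kowalski`:

```python
def jd_windows(jd_start: float, jd_end: float, step: float) -> list:
    """Split [jd_start, jd_end) into consecutive windows of at most `step` days."""
    if step <= 0:
        raise ValueError('step must be positive')
    windows = []
    lo = jd_start
    while lo < jd_end:
        hi = min(lo + step, jd_end)
        windows.append((lo, hi))
        lo = hi
    return windows


queries = [
    {"query_type": "find",
     "query": {
         "catalog": "ZTF_alerts",
         # $gte/$lt so that neighbouring windows neither overlap nor leave gaps
         "filter": {'candidate.jd': {'$gte': lo, '$lt': hi},
                    'candidate.field': 650},
         "projection": {'objectId': 1, '_id': 0}
     },
     "kwargs": {"max_time_ms": 10000}}
    for lo, hi in jd_windows(2458697.5, 2458699.5, step=0.5)
]
print(len(queries))
# each element can then be sent with k.query(query=q)
```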

#### Compound indexes and prefixes

See a detailed discussion of compound indexes in `MongoDB` [here](https://docs.mongodb.com/manual/core/index-compound/).

`MongoDB` supports compound indexes, where a single index structure holds references to multiple fields within a collection's documents. Compound indexes can support queries that match on multiple fields.

##### Prefixes

Index prefixes are the beginning subsets of indexed fields. For example, consider the compound index #9 above:

```python
[['candidate.jd', -1], ['classifications.braai', -1], ['candid', -1]]
```

The index has the following index prefixes:

```python
{'candidate.jd': -1}
{'candidate.jd': -1, 'classifications.braai': -1}
```

For a compound index, `MongoDB` can use the index to support queries on the index prefixes. As such, `MongoDB` can use the index for queries on the following fields:

- the `candidate.jd` field,
- the `candidate.jd` field and the `classifications.braai` field,
- the `candidate.jd` field and the `classifications.braai` field and the `candid` field.

`MongoDB` can also use the index to support a query on `candidate.jd` and `candid` fields since `candidate.jd` field corresponds to a prefix. However, the index would not be as efficient in supporting the query as would be an index on only `candidate.jd` and `candid`.

However, `MongoDB` cannot use the index to support queries on the following fields, since without the `candidate.jd` field none of them corresponds to an index prefix:

- the `classifications.braai` field,
- the `candid` field, or
- the `classifications.braai` and `candid` fields.
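The prefix rule can be written down in a few lines. This is a simplified sketch of the rule of thumb described above, not of how the `MongoDB` query planner actually selects indexes; `index_prefixes` and `leading_field_in_query` are hypothetical helper names:

```python
def index_prefixes(index_keys: list) -> list:
    """Proper prefixes of a compound index given as [[field, direction], ...]."""
    return [{field: direction for field, direction in index_keys[:n]}
            for n in range(1, len(index_keys))]


def leading_field_in_query(index_keys: list, query_fields) -> bool:
    """Rule of thumb: the index can help only if the query includes its leading field."""
    return index_keys[0][0] in query_fields


# compound index #9 from the listing above
index_9 = [['candidate.jd', -1], ['classifications.braai', -1], ['candid', -1]]

for prefix in index_prefixes(index_9):
    print(prefix)

print(leading_field_in_query(index_9, {'candidate.jd', 'candid'}))
print(leading_field_in_query(index_9, {'classifications.braai', 'candid'}))
```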

### Query examples

#### `cone_search`

The user can run the cone search around an arbitrary number of sky positions.

The object coordinates ($R.A.$, $Decl.$) must be written as a valid `python` expression (they are passed to the API as a string), namely a list/tuple of comma-separated coordinate pairs in one of the three supported formats:
- degrees (expressed as `float` or `int`)
- HH:MM:SS, DD:MM:SS (expressed as `string`) 
- HHhMMmSSs, DDdMMmSSs (expressed as `string`)

Examples:

```python
[(0.0, 0.0)]
[(0.0, 0.0), (1.0, 1.0)]
[('08:55:29.205', '-34:02:36.8944')]
```

Another option is to use a dictionary `{'object_id': (ra, dec), ...}`, for example:

```python
{'object1': ('08h55m29.205s', '-34d02m36.8944s'), 
 'object2': ('06h55m29.205s', '32d02m36.1944s')}
```

The coordinates could also be passed as a string:

```python
"[('08:55:29.205', '-34:02:36.8944')]"
```

For a single source, either two numbers [deg deg] or two strings [HH:MM:SS DD:MM:SS] or [HHhMMmSSs DDdMMmSSs] could be passed:

```python
"0.0 0.0"
"18:43:58.5333 -20:41:41.513"
```
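For reference, the colon-separated sexagesimal strings map to decimal degrees as sketched below. The conversion is not needed to run a cone search, since the API accepts all of the formats listed above; `hms_to_deg` and `dms_to_deg` are hypothetical helpers written only to illustrate the relation between the formats.

```python
def hms_to_deg(hms: str) -> float:
    """Right ascension 'HH:MM:SS' -> degrees (1 hour = 15 degrees)."""
    h, m, s = (float(x) for x in hms.strip().split(':'))
    return 15.0 * (h + m / 60.0 + s / 3600.0)


def dms_to_deg(dms: str) -> float:
    """Declination 'DD:MM:SS' -> degrees; the sign is taken from the string itself."""
    dms = dms.strip()
    sign = -1.0 if dms.startswith('-') else 1.0
    d, m, s = (float(x) for x in dms.lstrip('+-').split(':'))
    return sign * (d + m / 60.0 + s / 3600.0)


print(hms_to_deg('08:55:29.205'), dms_to_deg('-34:02:36.8944'))
```

Reading the sign from the string rather than from the degrees value keeps declinations such as `-00:30:00` negative.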

The cone search radius can be specified in `arcsec`, `arcmin`, `deg`, or `rad`.

When specifying which catalogs to search, use the `"filter"` key to define the constraints, and the
`"projection"` key to specify which catalog fields to retain in the query results (see examples below).

The optional `kwargs` dictionary may be used to pass additional parameters:
- `enqueue_only`: boolean. When set to `True`, enqueue on the server and return `query_id`
- `limit`: integer (>=1). Maximum number of matched documents to return (per catalog)
- `skip`: integer (>=0). Number of matched documents to skip (per catalog)
- `hint`: string. Index name to use.
- `max_time_ms`: integer (>0). Specifies a time limit in milliseconds for a query operation. If the specified time is exceeded, the operation will be aborted. Defaults to 300000 (5 minutes).

Additionally, `kwargs` may be used to store auxiliary data with the query (for example, for personal book-keeping).

##### Examples

<b>#1:</b> Get at most three documents (alerts) with `candidate.drb` > 0.9 from the `ZTF_alerts` catalog within `8` arcseconds of two sky positions, `("14:35:02.5510", "14:46:36.864")` and `(20.739842, 29.685781)`; return their `objectId`'s, `candidate.rcid`'s, and `candidate.drb`'s, but not their `_id`'s (which are returned by default). Additionally, search the `Gaia_DR2` catalog and return the `_id`'s of the matched sources.

In [65]:
q = {"query_type": "cone_search",
     "object_coordinates": {
         "radec": '[("14:35:02.5510", "14:46:36.864"), (20.739842, 29.685781)]', 
         "cone_search_radius": "8",
         "cone_search_unit": "arcsec"
     },
     "catalogs": {
         "ZTF_alerts": {
             "filter": {"candidate.drb": {"$gt": 0.9}},
             "projection": {
                 "objectId": 1,
                 "candidate.rcid": 1,
                 "candidate.drb": 1,
                 "_id": 0
             }
         },
         "Gaia_DR2": {
             "filter": {},
             "projection": {
                 "_id": 1
             }
         }
     },
     "kwargs": {
         "limit": 3
     }
     }
r = k.query(query=q)
data = r['result_data']
display(JSON(data, expanded=True))

<IPython.core.display.JSON object>

#### `count_documents`

The `count_documents` query type is used to count the number of documents in a catalog that match a particular filter expression.

##### Examples

<b>#1:</b> Count the sources in the `RFC_2019a` catalog

In [120]:
q = {"query_type": "count_documents", 
     "query": {
         "catalog": "RFC_2019a",
         "filter": {}
     }
     }
r = k.query(query=q)

data = r['result_data']['query_result']
display(data)

15740

---
<b>#2</b>: Count the number of `ZTF` alerts detected in the `TESS` northern fields on August 3, 2019

In [121]:
q = {"query_type": "count_documents", 
     "query": {
         "catalog": "ZTF_alerts",
         "filter": {'candidate.jd': {'$gt': 2458698.5, '$lt': 2458699.5},
                    'candidate.programpi': 'TESS'}
     }
     }
r = k.query(query=q)

data = r['result_data']['query_result']
display(data)

310452

#### `find`

The `find` query type is used to query a `catalog` using a `filter` expression. The
`projection` key is used to specify which fields ("columns" in SQL/table-speak) to retain in the query results.

The optional `kwargs` dictionary may be used to pass additional parameters:
- `enqueue_only`: boolean. When set to `True`, enqueue on the server and return `query_id`
- `limit`: integer (>=1). Maximum number of matched documents to return (per catalog)
- `skip`: integer (>=0). Number of matched documents to skip (per catalog)
- `hint`: string. Index name to use.
- `max_time_ms`: integer (>0). Specifies a time limit in milliseconds for a query operation. If the specified time is exceeded, the operation will be aborted. Defaults to 300000 (5 minutes).

Additionally, `kwargs` may be used to store auxiliary data with the query (for example, for personal book-keeping).

##### Examples

*<b>#1</b>: Find all ZTF alerts with a given objectId and return the full contents of the alert packets (including image cutouts)*

In [82]:
q = {"query_type": "find", 
     "query": {
         "catalog": "ZTF_alerts",
         "filter": {'objectId': 'ZTF18acxdguz'},
         "projection": {}
     }
     }
r = k.query(query=q)

# display the first match:
data = r['result_data']['query_result'][0]
display(JSON(data))

<IPython.core.display.JSON object>

---
*<b>#2</b>: Get time-stamped (difference image) magnitude measurements and drb scores for all detections of object `ZTF19abitskw` with drb score of >= 0.9 and sort them by observation Julian date in descending order*

In [109]:
q = {"query_type": "find", 
     "query": {
         "catalog": "ZTF_alerts",
         "filter": {'objectId': 'ZTF19abitskw', 'candidate.drb': {'$gte': 0.9}},
         "projection": {'candidate.jd': 1, 'candidate.magpsf': 1, 'candidate.drb': 1}
     },
     "kwargs": {
         "sort": [('candidate.jd', -1)]
     }
     }
     
r = k.query(query=q)

data = r['result_data']['query_result']
display(JSON(data, expanded=True))

<IPython.core.display.JSON object>

---
<i><b>#3:</b> Find all Gaia\_DR2 objects whose absolute G-band magnitude is below an empirical line in the HR diagram (that is, above the line when the magnitude axis is plotted from low to high):
 $G - 5 \log_{10}(1000 / \mathrm{parallax}) + 5 > 2.5 \times (BP - RP) + 7.5$, where $G$ is phot\_g\_mean\_mag and the left-hand side is the absolute magnitude $M_G$; return source \_id's, phot\_g\_mean\_mag, bp-rp, and positions</i>

In [115]:
q = {"query_type": "find", 
     "query": {
         "catalog": "Gaia_DR2",
         "filter": {'parallax': {'$gt': 0}, 
                    '$expr': {'$gt': 
                              [{'$add': 
                                [{'$subtract': 
                                  ['$phot_g_mean_mag', 
                                   {'$multiply': [5, {'$log10': {'$divide': [1000.0, '$parallax']}}]}]}, 5]}, 
                               {'$add': [{'$multiply': [2.5, '$bp_rp']}, 7.5]}]}},
         "projection": {'_id': 1, 'phot_g_mean_mag': 1, 'bp_rp': 1, 'coordinates': 1}
     },
     "kwargs": {
         "limit": 1,  # comment out to get all such objects, not just one
         "max_time_ms": 1000  # increase if necessary
     }
     }
     
r = k.query(query=q)

data = r['result_data']['query_result']
display(JSON(data, expanded=True))

<IPython.core.display.JSON object>

---
*<b>#4:</b> Get Gaia DR2 G-band light-curve for Gaia DR2 source with id `1796422062134294656`*

Note that in the `Gaia_DR2_light_curves` catalog, the `source_id` field is of type `int64`.

In [118]:
q = {"query_type": "find", 
     "query": {
         "catalog": "Gaia_DR2_light_curves",
         "filter": {'source_id': 1796422062134294656, 'band': 'G'},
         "projection": {'_id': 0}
     },
     "kwargs": {
         "max_time_ms": 5000  # increase if necessary
     }
     }
     
r = k.query(query=q)

data = r['result_data']['query_result']
display(JSON(data, expanded=False))

<IPython.core.display.JSON object>

---
*<b>#5:</b> Iterate over `_id`'s of all sources in the `RFC_2019a` catalog in batches of 100*

In [131]:
import tqdm
import numpy as np

In [132]:
# count the documents first
q = {"query_type": "count_documents", "query": {"catalog": "RFC_2019a", "filter": {}}}
r = k.query(query=q)
n = r['result_data']['query_result']
display(n)

batch_size = 100
num_batches = int(np.ceil(n / batch_size))

for nb in tqdm.tqdm(range(num_batches)):
    q = {"query_type": "find", 
         "query": {"catalog": "RFC_2019a", "filter": {}, "projection": {"_id": 1}},
         "kwargs": {"skip": nb*batch_size, "limit": batch_size}}
    r = k.query(query=q)
    data = r['result_data']['query_result']
    # do stuff with data
    # print(data)

15740

100%|██████████| 158/158 [00:02<00:00, 57.56it/s]
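The batching arithmetic used above can be factored into a small generator that needs only the standard library. A minimal sketch: `batch_kwargs` is a hypothetical helper, and the document count is the one printed above. Since `MongoDB` does not guarantee the order of returned documents unless a sort is specified, it may be worth adding a `sort` on `_id` to the `kwargs` when paging with `skip`/`limit`; whether `kowalski` accepts `sort` together with `skip` and `limit` is an assumption to verify.

```python
import math


def batch_kwargs(n_docs: int, batch_size: int):
    """Yield the skip/limit kwargs for each batch needed to cover n_docs documents."""
    for nb in range(math.ceil(n_docs / batch_size)):
        yield {"skip": nb * batch_size, "limit": batch_size}


batches = list(batch_kwargs(15740, 100))
print(len(batches), batches[0], batches[-1])
```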


---
*<b>#6:</b> Find all `PanSTARRS1 DR1` objects with $g-r<0$, return source _id's, colors, and positions*

In [None]:
q = {"query_type": "find", 
     "query": {
         "catalog": "PanSTARRS1",
         "filter": {'$expr': {'$lt': ['$gMeanPSFMag', '$rMeanPSFMag']}},
         "projection": {'_id': 1, 'gMeanPSFMag': 1, 'rMeanPSFMag': 1, 'raMean': 1, 'decMean': 1}
     },
     "kwargs": {
         "limit": 1,  # comment out to get all such objects, not just one
         "max_time_ms": 5000  # increase if necessary
     }
     }
     
r = k.query(query=q)

data = r['result_data']['query_result']
display(JSON(data, expanded=False))

---

*<b>#7:</b> Find all PanSTARRS objects with $g-r < -0.1$, return source _id's, colors, and positions*

In [None]:
q = {"query_type": "find", 
     "query": {
         "catalog": "PanSTARRS1",
         "filter": {'gMeanPSFMag': {'$ne': -999}, 'rMeanPSFMag': {'$ne': -999}, 
                    # g - r < -0.1, as stated in the description above
                    '$expr': {'$lt': [{'$subtract': ['$gMeanPSFMag', '$rMeanPSFMag']}, -0.1]}},
         "projection": {'_id': 1, 'gMeanPSFMag': 1, 'rMeanPSFMag': 1, 'raMean': 1, 'decMean': 1}
     },
     "kwargs": {
         "limit": 1,  # comment out to get all such objects, not just one
         "max_time_ms": 5000  # increase if necessary
     }
     }
     
r = k.query(query=q)

data = r['result_data']['query_result']
display(JSON(data, expanded=False))

#### `aggregate`

The `aggregate` query type is used to execute aggregation pipelines on a catalog, potentially involving complicated computations and/or (left outer) joins with other catalogs.

##### Examples

*<b>#1</b>: Get all ZTF transient `objectId`'s detected more than 40 times from August 2-4, 2019*

In [134]:
from astropy.time import Time
import datetime

jd_start = Time(datetime.datetime(2019, 8, 2), scale='utc').jd
jd_end = Time(datetime.datetime(2019, 8, 4), scale='utc').jd
# jd_start, jd_end = 2458697.5, 2458699.5

q = {"query_type": "aggregate", 
     "query": {
         "catalog": "ZTF_alerts",
         "pipeline": [{'$match': {'candidate.jd': {'$gt': jd_start, '$lt': jd_end}}}, 
                      {'$group' : { '_id': '$objectId', 'count': { '$sum': 1 } } }, 
                      {'$match': {'count' : {'$gt': 40} } }, 
                      {'$project': {'objectId' : '$_id', '_id' : 0} }]
     },
     "kwargs": {
         "max_time_ms": 20000  # increase if necessary
     }
     }

r = k.query(query=q)

data = r['result_data']['query_result']
# display(JSON(data, expanded=True))
oids = [o['objectId'] for o in data]
display(oids)

['ZTF18aavedij',
 'ZTF18aaxpntb',
 'ZTF18aaxythd',
 'ZTF18aaydzzi',
 'ZTF18aayhgbi',
 'ZTF18aazmbgp',
 'ZTF18abahirx',
 'ZTF18abcmsqc',
 'ZTF18abcqfcq',
 'ZTF18abeaynz',
 'ZTF18abjflti']
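If `astropy` is not available, the Julian dates used in the `$match` stage can also be computed with the standard library, since the Unix epoch (1970-01-01T00:00:00 UTC) corresponds to JD 2440587.5. A minimal sketch that ignores leap seconds; `datetime_to_jd` is a hypothetical helper:

```python
import datetime

UNIX_EPOCH_JD = 2440587.5  # Julian date of 1970-01-01T00:00:00 UTC


def datetime_to_jd(dt: datetime.datetime) -> float:
    """Convert a timezone-aware datetime to a Julian date (leap seconds ignored)."""
    return dt.timestamp() / 86400.0 + UNIX_EPOCH_JD


utc = datetime.timezone.utc
jd_start = datetime_to_jd(datetime.datetime(2019, 8, 2, tzinfo=utc))
jd_end = datetime_to_jd(datetime.datetime(2019, 8, 4, tzinfo=utc))
print(jd_start, jd_end)
```

The resulting values agree with the commented-out `jd_start, jd_end` line in the cell above.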

#### `general_search`

The `general_search` query type provides a general interface that allows the user to execute queries of arbitrary complexity. The following `pymongo/mongodb` operations are supported: `aggregate`, `map_reduce`, `distinct`, `count_documents`, `find_one`, `find`.

<div style="color: #721c24;
    background-color: #f8d7da;
    border-color: #f5c6cb;
    padding: .75rem 1.25rem;
    margin-bottom: 1rem;
    border: 1px solid transparent;
    border-radius: .25rem;">
    Use with caution and only when absolutely necessary! Does not come with any default noob/accident protection, although such protection may, of course, be specified in the query.
</div>

##### Recommendations

Kowalski gives a lot of power to the users, so it is expected that it is used responsibly.

<span class="badge badge-warning">Tip</span> It is always a good idea to test a query before running it "full steam" by limiting the number of returned documents and setting a time-out limit. For example, the following query will return at most 3 matches and will expire after 10 seconds:

In [76]:
q = {"query_type": "general_search", 
     "query": "db['ZTF_alerts'].find({'candidate.drb': {'$gt': 0.9}}, {'_id': 1}, "
              "limit=3, max_time_ms=10000)" 
     }
r = k.query(query=q)

data = r['result_data']['query_result']
display(JSON(data, expanded=True))

<IPython.core.display.JSON object>

Without this limitation, the output will be many GB in size.

If you are unsure about the size of the query result, you can try counting the number of returned documents first:

In [None]:
q = {"query_type": "general_search", 
     "query": "db['ZTF_alerts'].count_documents({'candidate.rb': {'$gt': 0.99}}, max_time_ms=10000)" 
     }

r = k.query(query=q)

data = r['result_data']['query_result']
display(JSON(data, expanded=True))

Use projections to ditch the fields that you do not need in the result and reduce its size:

In [79]:
q = {"query_type": "general_search", 
     "query": "db['ZTF_alerts'].find({'candidate.rb': {'$gt': 0.99}}, "
              "{'candid': 1, 'objectId': 1, 'candidate.rb': 1}, limit=3, max_time_ms=10000)" 
     }

r = k.query(query=q)

data = r['result_data']['query_result']
display(JSON(data, expanded=True))

<IPython.core.display.JSON object>

You can either specify which fields to return by saying `{'FIELD_1': 1, 'FIELD_2': 1, ...}`,
or which ones to leave out keeping all the rest: `{'FIELD_1': 0, 'FIELD_2': 0, ...}`.
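
Since the query is passed as a string, one way to avoid quoting mistakes is to write the filter and projection as ordinary Python dictionaries and let `repr` produce the string. This is only a sketch: `build_find_query` is a hypothetical helper, not part of `penquins`, and it always attaches a `limit` and a `max_time_ms`, following the recommendation above.

```python
def build_find_query(collection, filt, projection=None, limit=3, max_time_ms=10000):
    """Assemble a general_search `find` query from Python dictionaries."""
    args = [repr(filt)]
    if projection is not None:
        args.append(repr(projection))
    # Always cap the number of documents and the execution time.
    args.append(f"limit={int(limit)}")
    args.append(f"max_time_ms={int(max_time_ms)}")
    return {
        "query_type": "general_search",
        "query": f"db[{collection!r}].find({', '.join(args)})",
    }


q = build_find_query(
    "ZTF_alerts",
    {"candidate.rb": {"$gt": 0.99}},
    {"candid": 1, "objectId": 1, "candidate.rb": 1},
)
print(q["query"])
```

The resulting `q` can be passed to `k.query(query=q)` in the same way as the hand-written queries.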

Queries run fastest when the field(s) you filter on are indexed. In that case, the database only needs to perform a fast search in the index, which in most cases fits into Kowalski's memory. If no index is available, the database has to load all of the actual documents into memory, which may take a long time. You can check the available indexes like this:

In [81]:
q = {"query_type": "general_search", 
     "query": "db['ZTF_alerts'].index_information()" 
     }
r = k.query(query=q)
data = r['result_data']['query_result']
display(JSON(data, expanded=True))

Contact Dima if you need a field indexed.
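
To check programmatically whether a field is indexed, the index information can be reduced to a list of field names per index. The sketch below assumes that the result has the shape of `pymongo`'s `index_information()` (a dictionary of index names, each with a `key` list of `[field, direction]` pairs). The helper names and the sample dictionary are made up for illustration and do not reflect the actual indexes on Kowalski.

```python
def indexed_fields(index_info):
    """Map each index name to the list of field names it covers."""
    return {
        name: [field for field, _direction in spec["key"]]
        for name, spec in index_info.items()
    }


def is_indexed(index_info, field):
    """True if `field` is the leading key of at least one index."""
    return any(spec["key"][0][0] == field for spec in index_info.values())


# Made-up sample in the pymongo index_information() layout (illustration only).
sample = {
    "_id_": {"v": 2, "key": [["_id", 1]]},
    "objectId_1": {"v": 2, "key": [["objectId", 1]]},
    "radec": {"v": 2, "key": [["coordinates.radec_geojson", "2dsphere"]]},
}

print(indexed_fields(sample))
print(is_indexed(sample, "objectId"), is_indexed(sample, "candidate.rb"))
```

Only the leading key is checked because a compound index speeds up queries on its first field(s), not on a field that appears later in the key list.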

When running aggregation pipelines, use `allowDiskUse=True` if you get the "exceeded memory limit" error.

##### Examples

<span class="badge badge-success">Note</span> In the web interface, you should only type in the "de-stringed" `q['query']` value. You can think of it as the result of `literal_eval(q['query'])`.

*Find all ZTF alerts with a given `objectId` and return their `candid`'s*

In [84]:
q = {"query_type": "general_search", 
     "query": "db['ZTF_alerts'].find({'objectId': 'ZTF19abkofjp'}, {'candid': 1, '_id': 0},  max_time_ms=10000)" 
     }
r = k.query(query=q)

data = r['result_data']['query_result']
display(JSON(data, expanded=True))

<IPython.core.display.JSON object>

<span class="badge badge-success">Note</span> In the web interface, you should only type:

```python
db['ZTF_alerts'].find({'objectId': 'ZTF19abkofjp'}, {'candid': 1, '_id': 0},  max_time_ms=10000)
```

---

*Find all ZTF alerts with given `objectId`'s*

In [88]:
q = {"query_type": "general_search", 
     "query": "db['ZTF_alerts'].find({'objectId': {'$in': ['ZTF19abitskw', 'ZTF19abhcefa']}},  max_time_ms=10000)" 
     }
r = k.query(query=q)

data = r['result_data']['query_result']
display(JSON(data, expanded=False))

<IPython.core.display.JSON object>

---

*Get all alert `objectId`'s for transients with more than 40 detections within a given JD range, each with a drb score of >= 0.5*

In [102]:
q = { "query_type": "general_search", 
"query": "db['ZTF_alerts'].aggregate([{'$match': {'candidate.jd': {'$gt': 2458697.5, '$lt': 2458699.5}}},"
                                      "{'$group' : { '_id': '$objectId', "
                                                    "'count': { '$sum': {'$cond': [ { '$gte': [ '$candidate.drb', 0.5 ] }, 1, 0]} } } }, "
                                      "{'$match': {'count' : {'$gt': 40} } }," 
                                      "{'$project': {'objectId' : '$_id', '_id' : 0} }], allowDiskUse=True)" }
r = k.query(query=q)
data = r['result_data']['query_result']
# display(JSON(data, expanded=True))
oids = [o['objectId'] for o in data]
display(oids)

['ZTF18aaxpntb',
 'ZTF18aaxythd',
 'ZTF18aaydzzi',
 'ZTF18aayhgbi',
 'ZTF18aazmbgp',
 'ZTF18abahirx',
 'ZTF18abcmsqc',
 'ZTF18abeaynz',
 'ZTF18abjflti']

---


*Get all alert `objectId`'s for transients with more than 20 detections in R and i bands, each with an rb score of > 0.5*

```python
# Adjacent string literals are concatenated, so the query stays a single valid Python string.
q = {"query_type": "general_search",
     "query": "db['ZTF_alerts'].aggregate(["
              "{'$group': {'_id': '$objectId', "
              "'count': {'$sum': {'$cond': [{'$and': [{'$in': ['$candidate.fid', [2, 3]]}, "
              "{'$gt': ['$candidate.rb', 0.5]}]}, 1, 0]}}}}, "
              "{'$match': {'count': {'$gt': 20}}}, "
              "{'$project': {'objectId': '$_id', '_id': 0}}], allowDiskUse=True)"}
```

---

*Get all alert `objectId`'s, `candidate.ra`'s, and `candidate.dec`'s for transients with more than 10 detections in g, R and i bands, each with an rb score of > 0.5, within a box on the sky*

```python
# Adjacent string literals are concatenated, so the query stays a single valid Python string.
q = {"query_type": "general_search",
     "query": "db['ZTF_alerts'].aggregate(["
              "{'$match': {'coordinates.radec_geojson': {'$geoWithin': {'$box': [[70.0 - 180.0, 48.0], [70.0005 - 180.0, 48.0005]]}}}}, "
              "{'$group': {'_id': {'objectId': '$objectId', 'ra': '$candidate.ra', 'dec': '$candidate.dec'}, "
              "'count': {'$sum': {'$cond': [{'$and': [{'$in': ['$candidate.fid', [1, 2, 3]]}, "
              "{'$gt': ['$candidate.rb', 0.5]}]}, 1, 0]}}}}, "
              "{'$match': {'count': {'$gt': 10}}}, "
              "{'$project': {'objectId': '$_id.objectId', '_id': 0, 'ra': '$_id.ra', 'dec': '$_id.dec'}}], allowDiskUse=True)"}
```
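
Long pipelines like the ones above are easy to break when written by hand as one string. A possible alternative, sketched here, is to write the pipeline as a normal Python list and convert it with `repr`. `build_aggregate_query` is a hypothetical helper, not part of `penquins`. Note that `repr` switches to double quotes for strings that themselves contain a single quote, so this sketch assumes plain field names and values.

```python
def build_aggregate_query(collection, pipeline, allow_disk_use=True):
    """Assemble a general_search `aggregate` query from a Python pipeline list."""
    return {
        "query_type": "general_search",
        "query": f"db[{collection!r}].aggregate({pipeline!r}, "
                 f"allowDiskUse={bool(allow_disk_use)})",
    }


# The same stages as in the "more than 40 detections" example above.
pipeline = [
    {"$match": {"candidate.jd": {"$gt": 2458697.5, "$lt": 2458699.5}}},
    {"$group": {"_id": "$objectId",
                "count": {"$sum": {"$cond": [{"$gte": ["$candidate.drb", 0.5]}, 1, 0]}}}},
    {"$match": {"count": {"$gt": 40}}},
    {"$project": {"objectId": "$_id", "_id": 0}},
]
q = build_aggregate_query("ZTF_alerts", pipeline)
print(q["query"])
```

Writing the stages as a list lets the Python interpreter catch unbalanced brackets before the query is ever sent.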

##### Spatial (positional) queries

It is possible to use the general search interface to execute "spatial" queries, including (but not limited to)
cone, box, and polygon searches.

The `coordinates.radec_geojson` field, defined for every object in the database (for all catalogs), has an
associated spherical 2D index, which allows for fast positional queries. `MongoDB` supports many
geospatial query operators; see [here](https://docs.mongodb.com/manual/reference/operator/query-geospatial/) 
for more details. One caveat to keep in mind: `MongoDB` uses `GeoJSON` objects to represent `2D`
positions on the sphere. Both the longitude (RA) and latitude (Dec) must be expressed in decimal degrees, and
valid longitude values lie between `-180` and `180`, both inclusive, so you must subtract 180.0 degrees from your RA value.
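
As an illustration of the RA shift, here is a minimal sketch of a cone search built on `MongoDB`'s `$geoWithin` / `$centerSphere` operators, where the radius is given in radians. The helper names `radec_to_geojson` and `cone_search_query` are hypothetical and not part of `penquins`. The sketch assumes RA in the range 0 to 360 degrees.

```python
import math


def radec_to_geojson(ra_deg, dec_deg):
    """Convert RA/Dec in decimal degrees to a GeoJSON [longitude, latitude] pair."""
    if not 0.0 <= ra_deg <= 360.0 or not -90.0 <= dec_deg <= 90.0:
        raise ValueError("expected 0 <= RA <= 360 and -90 <= Dec <= 90 (degrees)")
    # Shift RA from [0, 360] to the GeoJSON longitude range [-180, 180].
    return [ra_deg - 180.0, dec_deg]


def cone_search_query(collection, ra_deg, dec_deg, radius_arcsec, max_time_ms=10000):
    """Assemble a general_search cone search around (RA, Dec)."""
    radius_rad = math.radians(radius_arcsec / 3600.0)  # $centerSphere expects radians
    filt = {
        "coordinates.radec_geojson": {
            "$geoWithin": {
                "$centerSphere": [radec_to_geojson(ra_deg, dec_deg), radius_rad]
            }
        }
    }
    return {
        "query_type": "general_search",
        "query": f"db[{collection!r}].find({filt!r}, max_time_ms={int(max_time_ms)})",
    }


q = cone_search_query("ZTF_alerts", 70.0, 48.0, radius_arcsec=2.0)
print(q["query"])
```

Keeping the coordinate conversion in a single function makes it harder to forget the 180-degree shift in one query and apply it in another.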

---

*Run a complicated query, remove the 'coordinates' field from the result, and select a random sample from it*

```python
# Adjacent string literals are concatenated, so the query stays a single valid Python string.
q = {"query_type": "general_search",
     "query": "db['ZTF_alerts'].aggregate(["
              "{'$match': {'candidate.jd': {'$gt': 2458322.500000}, 'candidate.isdiffpos': 't', 'candidate.programid': 1, "
              "'candidate.rb': {'$gt': 0.3, '$lt': 0.65}, '$expr': {'$gt': [{'$abs': '$candidate.ssdistnr'}, 8]}}}, "
              "{'$project': {'coordinates': 0}}, {'$sample': {'size': 100}}], allowDiskUse=True)"
     }
```

##### Misc

<a href="https://docs.mongodb.com/manual/reference/sql-aggregation-comparison/" target="_blank">SQL to MongoDB Aggregation Mapping Chart <i class="fa fa-external-link" aria-hidden="true"></i></a>