# Labeling sources with the ZTF Variable Marshal

[![Open In Google Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dmitryduev/ztf-variable-marshal/blob/master/nb/label.ipynb)

This notebook details the data labeling workflow with the ZTF Variable Marshal (ZVM).

In this example, we will use a dataset containing a few recently discovered W UMa-type binary stars.
We will connect to the ZVM, create a dedicated program for the dataset, save the sources to the program, add periods, and set labels.

```python
%%capture
# install the ZVM client from GitHub
!pip install git+https://github.com/dmitryduev/ztf-variable-marshal.git
```

```python
from zvm import zvm
import json
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
from IPython.display import display, HTML, JSON
import multiprocessing as mp
from multiprocessing.pool import ThreadPool
import requests

# number of CPUs, used later to size the thread pool
n_cpu = mp.cpu_count()
```

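For security, let us keep the access credentials out of the notebook and store them in a local JSON file, `secrets_zvm_rico.json` (uncomment the last two lines of the cell below to create it once):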

```python
secrets = {
    "zvm": {
        "protocol": "https",
        "host": "rico.caltech.edu",
        "port": 443,
        "username": "<USERNAME>",
        "password": "<PASSWORD>"
    }
}

# run once to create the file, then remove the credentials from this cell
# with open('secrets_zvm_rico.json', 'w') as f:
#     json.dump(secrets, f)
```
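A file written with plain `open(..., 'w')` gets your default permissions, which on a shared machine typically lets other users read it. A minimal sketch of a stricter alternative, using only the standard library and not part of the `zvm` package: the hypothetical helper `write_secrets` makes the file readable and writable by its owner only (mode `0o600`). On Windows, these POSIX mode bits are largely ignored.

```python
import json
import os


def write_secrets(path, secrets):
    """Write credentials to `path`, readable and writable by the owner only (0o600)."""
    # os.open applies the mode only when it creates the file,
    # so chmod afterwards to also tighten a file that already existed
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        json.dump(secrets, f)
    os.chmod(path, 0o600)


def read_secrets(path):
    """Load the credentials dictionary back from `path`."""
    with open(path, 'r') as f:
        return json.load(f)
```

In this notebook, that would be `write_secrets('secrets_zvm_rico.json', secrets)` in place of the commented-out lines.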

Load the credentials and initialize a `zvm` client object:

```python
with open('secrets_zvm_rico.json', 'r') as f:
    secrets = json.load(f)

z = zvm(**secrets['zvm'], verbose=False)
print(f'Connection OK: {z.check_connection()}')
```

```
Connection OK: True
```


## Dataset: CMO RC600 W UMa-type binaries

The recently discovered W UMa-type binaries come from the new automated [60-cm telescope of the Caucasus Mountain Observatory](http://sai.msu.ru/new_vars/) (Sternberg Astronomical Institute, Lomonosov Moscow State University, Russia). The telescope is equipped with an Andor iKon-L (DZ936N-BV) 2048x2048 CCD camera and a set of filters (U, B, V, Rc, Ic, g2, r2, i2, Clear), and has a 22' × 22' field of view (FoV). The new variable stars were found with the [VaST](http://scan.sai.msu.ru/vast/) code.

```python
dataset = [{'ra': '19:11:20.631', 'dec': '+05:06:00.12', 'Gaia_DR2': 4293456085994508800, 'p': 0.31028, 'p_units': 'days'},
           {'ra': '19:12:00.636', 'dec': '+05:09:10.63', 'Gaia_DR2': 4293549651886492032, 'p': 0.26425, 'p_units': 'days'},
           {'ra': '19:12:05.254', 'dec': '+04:59:52.44', 'Gaia_DR2': 4293403760425690624, 'p': 0.30650, 'p_units': 'days'},
           {'ra': '00:38:09.817', 'dec': '+59:04:24.08', 'Gaia_DR2': 425417545760834176, 'p': 0.30650, 'p_units': 'days'},
           {'ra': '00:39:44.341', 'dec': '+59:08:48.87', 'Gaia_DR2': 425376966913716992, 'p': 0.29018, 'p_units': 'days'},
           {'ra': '00:39:44.487', 'dec': '+59:06:29.91', 'Gaia_DR2': 425375416425590016, 'p': 0.30193, 'p_units': 'days'},
           {'ra': '00:39:54.337', 'dec': '+59:01:00.88', 'Gaia_DR2': 425371675514064128, 'p': 0.30800, 'p_units': 'days'},
           {'ra': '00:40:23.234', 'dec': '+59:07:10.31', 'Gaia_DR2': 425374115055452416, 'p': 0.32955, 'p_units': 'days'},
           {'ra': '00:40:40.830', 'dec': '+59:06:52.52', 'Gaia_DR2': 425373423560788736, 'p': 0.32743, 'p_units': 'days'},
           {'ra': '00:40:41.791', 'dec': '+59:09:33.90', 'Gaia_DR2': 425467333025500032, 'p': 0.47833, 'p_units': 'days'}]
```

```python
df = pd.DataFrame.from_records(dataset, columns=['ra', 'dec', 'Gaia_DR2', 'p', 'p_units'])
df
```

| | ra | dec | Gaia_DR2 | p | p_units |
|---|---|---|---|---|---|
| 0 | 19:11:20.631 | +05:06:00.12 | 4293456085994508800 | 0.31028 | days |
| 1 | 19:12:00.636 | +05:09:10.63 | 4293549651886492032 | 0.26425 | days |
| 2 | 19:12:05.254 | +04:59:52.44 | 4293403760425690624 | 0.3065 | days |
| 3 | 00:38:09.817 | +59:04:24.08 | 425417545760834176 | 0.3065 | days |
| 4 | 00:39:44.341 | +59:08:48.87 | 425376966913716992 | 0.29018 | days |
| 5 | 00:39:44.487 | +59:06:29.91 | 425375416425590016 | 0.30193 | days |
| 6 | 00:39:54.337 | +59:01:00.88 | 425371675514064128 | 0.308 | days |
| 7 | 00:40:23.234 | +59:07:10.31 | 425374115055452416 | 0.32955 | days |
| 8 | 00:40:40.830 | +59:06:52.52 | 425373423560788736 | 0.32743 | days |
| 9 | 00:40:41.791 | +59:09:33.90 | 425467333025500032 | 0.47833 | days |
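The coordinates are sexagesimal strings (RA in hours, Dec in degrees), which the ZVM accepts directly. To sanity-check positions or cross-match them yourself, decimal degrees are often handier. A minimal pure-Python sketch: the helper `sexagesimal_to_deg` is hypothetical and assumes the `hh:mm:ss.s` / `±dd:mm:ss.s` format used in `dataset`. If `astropy` is installed, `SkyCoord(ra, dec, unit=(u.hourangle, u.deg))` does the same more robustly.

```python
def sexagesimal_to_deg(ra, dec):
    """Convert 'hh:mm:ss.s' RA and '±dd:mm:ss.s' Dec strings to decimal degrees."""
    h, m, s = (float(x) for x in ra.split(':'))
    ra_deg = 15.0 * (h + m / 60.0 + s / 3600.0)  # 1 hour of RA = 15 degrees

    # take the sign from the string itself so that e.g. '-00:30:00' stays negative
    sign = -1.0 if dec.strip().startswith('-') else 1.0
    d, dm, ds = (abs(float(x)) for x in dec.split(':'))
    dec_deg = sign * (d + dm / 60.0 + ds / 3600.0)
    return ra_deg, dec_deg
```

For example, `sexagesimal_to_deg(df.ra[0], df.dec[0])` gives approximately `(287.836, 5.100)` for the first source.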


## Labeling

Get programs:

```python
r = z.api(endpoint='programs', method='get', data={'format': 'json'})
display(JSON(r))
```


Create a new program:

```python
r = z.api(endpoint='programs', method='put', data={'program_name': 'W_Uma_CMO_RC600',
                                                   'program_description': '10 W Uma binaries discovered with CMO RC600'})
display(JSON(r))
```


We can now save the sources to the newly created program (here, `zvm_program_id=2`), which automatically pulls all available ZTF light curve data, and then add periods and labels.

Note that we use the "random" source naming scheme instead of the marshal's standard incremental alphanumeric scheme, because it is faster and better at avoiding source name collisions.
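To get a feel for how rarely random names collide, here is a birthday-bound estimate, $P \approx 1 - e^{-n(n-1)/(2N)}$ for $n$ names drawn from $N$ possibilities. It assumes, based only on the example id `ZTF4uo925lq` that appears in a query later in this notebook, that a random name is `ZTF` followed by 8 characters drawn uniformly from `[0-9a-z]`, i.e. $N = 36^8 \approx 2.8 \times 10^{12}$; the actual ZVM scheme may differ. The function `collision_probability` is a hypothetical illustration.

```python
import math


def collision_probability(n_sources, alphabet_size=36, length=8):
    """Birthday-bound estimate of P(at least one name collision) among n random names.

    Assumes each name is drawn independently and uniformly from alphabet_size**length strings.
    """
    n_names = alphabet_size ** length
    # -expm1(-x) == 1 - exp(-x), computed accurately even for very small x
    return -math.expm1(-n_sources * (n_sources - 1) / (2 * n_names))
```

Under these assumptions, the estimate stays below $10^{-4}$ for 10,000 sources but reaches roughly 16% for a million, so at that scale collisions cannot be ruled out and still have to be checked for on insertion.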

```python
# alternatively, take the id from the response of the program-creation call above:
# zvm_program_id = r.get('result', dict()).get('_id', 1)
zvm_program_id = 2
```

```python
for ir, row in tqdm(df.iterrows(), total=len(df)):
    # save source
    r = z.api(endpoint='sources', method='put', data={'ra': row.ra, 'dec': row.dec,
                                                      'prefix': 'ZTF',
                                                      'naming': 'random',
                                                      'zvm_program_id': zvm_program_id,
                                                      'automerge': True})
    # display(JSON(r))
    source_id = r['result']['_id']
    # set period
    r = z.api(endpoint=f'sources/{source_id}', method='post', data={'source_id': source_id,
                                                                    'action': 'add_period',
                                                                    'period': row.p,
                                                                    'period_unit': row.p_units.capitalize()})
    # display(JSON(r))
    # set labels
    r = z.api(endpoint=f'sources/{source_id}', method='post', data={'source_id': source_id,
                                                                    'action': 'set_labels',
                                                                    'labels': [{'type': 'phenomenological',
                                                                                'label': 'variable',
                                                                                'value': 1},
                                                                               {'type': 'phenomenological',
                                                                                'label': 'periodic',
                                                                                'value': 1},
                                                                               {'type': 'intrinsic',
                                                                                'label': 'binary star',
                                                                                'value': 1},
                                                                               {'type': 'intrinsic',
                                                                                'label': 'W Uma',
                                                                                'value': 1}]})
    # display(JSON(r))
    # break
```

### Working with a large number of sources

The loop above works fine for a relatively small number of sources, but it makes the API calls one source at a time. For a large number of sources, here is a parallel version that processes the rows in a pool of threads and retries each source up to three times:

```python
def save_source(irow):
    """Save one source to the program, then add its period and labels.

    Retries the whole sequence up to 3 times; returns the source id,
    or None if every attempt failed.
    """
    i, row = irow

    for _ in range(3):
        try:
            # save by position
            r = z.api(endpoint='sources', method='put', data={'ra': row.ra, 'dec': row.dec,
                                                              'prefix': 'ZTF',
                                                              'naming': 'random',
                                                              'zvm_program_id': zvm_program_id,
                                                              'automerge': True})
            source_id = r['result']['_id']
            # set period
            r = z.api(endpoint=f'sources/{source_id}', method='post', data={'source_id': source_id,
                                                                            'action': 'add_period',
                                                                            'period': row.p,
                                                                            'period_unit': row.p_units.capitalize()})
            # set labels
            r = z.api(endpoint=f'sources/{source_id}', method='post', data={'source_id': source_id,
                                                                            'action': 'set_labels',
                                                                            'labels': [{'type': 'phenomenological',
                                                                                        'label': 'variable',
                                                                                        'value': 1},
                                                                                       {'type': 'phenomenological',
                                                                                        'label': 'periodic',
                                                                                        'value': 1},
                                                                                       {'type': 'intrinsic',
                                                                                        'label': 'binary star',
                                                                                        'value': 1},
                                                                                       {'type': 'intrinsic',
                                                                                        'label': 'W Uma',
                                                                                        'value': 1}]})
            return source_id
        except Exception as e:
            # print(e)
            continue
    return None
```
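One caveat of `save_source`: if `add_period` or `set_labels` fails, the retry starts over with the `put` and may save the source a second time. A minimal sketch of an alternative is to retry each API call on its own. The helper `with_retries` below is hypothetical (not part of `zvm`) and waits a little longer after each failure (linear backoff); it works with any callable, including `z.api`.

```python
import time


def with_retries(fn, *args, attempts=3, delay=1.0, **kwargs):
    """Call fn(*args, **kwargs), retrying up to `attempts` times with linear backoff.

    Re-raises the last exception if every attempt fails.
    """
    for attempt in range(1, attempts + 1):
        try:
            return fn(*args, **kwargs)
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            time.sleep(delay * attempt)  # wait longer after each failure
```

Inside `save_source`, each call would then read, for example, `r = with_retries(z.api, endpoint='sources', method='put', data={...})`. Because `with_retries` consumes the `attempts` and `delay` keyword arguments itself, the wrapped function must not use those names.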

Note: the code below will save the same sources to the program again, assigning new unique identifiers.

```python
with ThreadPool(processes=min(4, n_cpu)) as p:
    r = list(tqdm(p.imap(save_source, df.iterrows()), total=len(df)))
```


Note for the initiated: do not use `mp.Pool` (a pool of processes) here. Python's OpenSSL bindings, which the `zvm` client uses internally to establish a secure connection with the server, have issues with it, and the first API calls in newly started processes fail most of the time. Since the work is I/O-bound (mostly waiting on HTTP responses), a pool of threads is sufficient anyway.
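If you also want to know which rows failed, the standard library's `concurrent.futures.ThreadPoolExecutor` (swapped in here for `multiprocessing.pool.ThreadPool`) makes that straightforward. A minimal sketch, with the hypothetical helper `run_in_threads` returning results and exceptions keyed by item index:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_in_threads(fn, items, max_workers=4):
    """Apply fn to each item in a thread pool; return (results, failures) keyed by item index."""
    results, failures = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map each future back to the position of its item
        futures = {executor.submit(fn, item): i for i, item in enumerate(items)}
        for future in as_completed(futures):
            i = futures[future]
            try:
                results[i] = future.result()
            except Exception as exc:  # keep going; report failures at the end
                failures[i] = exc
    return results, failures
```

With the notebook's objects, this would be `results, failures = run_in_threads(save_source, list(df.iterrows()), max_workers=min(4, n_cpu))`. Note that `save_source` catches its own exceptions, so it would have to re-raise after its last attempt for a row to end up in `failures`.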

### Available classes

The web labeling GUI takes care of label (naming) consistency for you; when setting labels programmatically, you have to ensure it yourself.

In particular, when setting subclasses of the `intrinsic` classification tree, do not forget to also mark the corresponding superclasses, e.g. `[{'type': 'intrinsic', 'label': 'W Uma', 'value': 1}, {'type': 'intrinsic', 'label': 'binary star', 'value': 1}]`. Label strings must match the names in the configuration exactly.

Here are the currently available labels that the ZVM understands:

```python
r = requests.get(url='https://raw.githubusercontent.com/dmitryduev/ztf-variable-marshal/'
                 'master/ztf-variable-marshal/config.json')
r.raise_for_status()  # fail loudly if the config could not be fetched
config = r.json()
display(config['classifications'])
```

```
{'phenomenological': ['variable',
  'periodic',
  'multi periodic',
  'long periodic',
  'stochastic',
  'eclipsing',
  'eruptive',
  'data artifacts'],
 'intrinsic': {'pulsator': ['high amplitude Delta Scu',
   'Gamma Dor',
   'Cepheid',
   {'Cepheid type-II': ['BL Her', 'W Virginis', 'RV Tau']},
   {'RR Lyrae': ['RR Lyrae ab',
     'RR Lyrae c',
     'RR Lyrae d',
     'RR Lyrae Blazhko']},
   {'WD pulsator': ['DAV (ZZ Ceti)', 'DBV', 'ELMV']},
   {'sdB': ['sdBV p-mode', 'sdBV g-mode', 'BLAP', 'high-g BLAP']}],
  'binary star': ['W Uma',
   'detached eclipsing MS-MS',
   'Beta Lyr',
   'RS CVn',
   {'compact binary': ['eclipsing dWD',
     'eclipsing WD+dM (NN Ser)',
     'eclipsing sdB+dM (HW Vir)',
     'Redback pulsar']}],
  'AGN': []}}
```
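Keeping superclasses in sync by hand is error-prone, so one option is to derive them from the classification tree above. A minimal sketch, assuming the nesting format of `config['classifications']['intrinsic']` (a dict mapping each superclass to a list whose items are label strings or one-key dicts opening a deeper level); the helpers `find_path` and `intrinsic_labels` are hypothetical. The code embeds a trimmed copy of the tree so it runs on its own; in the notebook, pass `config['classifications']['intrinsic']` instead.

```python
# trimmed copy of config['classifications']['intrinsic']
INTRINSIC = {
    'pulsator': ['Cepheid',
                 {'RR Lyrae': ['RR Lyrae ab', 'RR Lyrae c']}],
    'binary star': ['W Uma',
                    {'compact binary': ['eclipsing dWD']}],
    'AGN': [],
}


def find_path(node, target, path=()):
    """Return the list of class names from the top of the tree down to `target`, or None."""
    if isinstance(node, dict):
        for name, children in node.items():
            if name == target:
                return list(path) + [name]
            found = find_path(children, target, path + (name,))
            if found is not None:
                return found
    elif isinstance(node, list):
        for child in node:
            if isinstance(child, str):
                if child == target:
                    return list(path) + [child]
            else:  # a one-key dict opening a deeper level
                found = find_path(child, target, path)
                if found is not None:
                    return found
    return None


def intrinsic_labels(leaf, tree=INTRINSIC):
    """Build set_labels entries for `leaf` and all of its superclasses."""
    path = find_path(tree, leaf)
    if path is None:
        raise ValueError(f'unknown intrinsic label: {leaf!r}')
    return [{'type': 'intrinsic', 'label': name, 'value': 1} for name in path]
```

For instance, `intrinsic_labels('W Uma')` yields the `binary star` and `W Uma` entries used in the labeling loop, in superclass-to-leaf order, and a misspelled label raises an error instead of being stored silently.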

### Querying the ZVM for labels

Let us now query the marshal for the labeled data: grab the ids (both the marshal's `_id` and the ZTF light curve ids, `lc.id`, from `ZTF_sources`) of 5 sources in our `zvm_program_id` program that have at least one label.

```python
q = {"query_type": "find",
     "query": {
         "catalog": "sources",
         # sources in our program whose labels array has at least one element
         "filter": {'zvm_program_id': zvm_program_id, 'labels.0': {'$exists': True}},
         # alternatively, fetch a single source by its id:
         # "filter": {'_id': 'ZTF4uo925lq'},
         "projection": {'_id': 1, 'lc.id': 1, 'labels': 1}
     },
     "kwargs": {
         "limit": 5
     }
    }
r = z.query(query=q)
data = r['result']['result_data']['query_result']
# display(JSON(r, expanded=True))
display(JSON(data, expanded=False))
```
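To fetch only sources that carry a particular label, the filter can match inside the `labels` array. A minimal sketch, assuming the ZVM passes `filter` through to MongoDB unchanged (as the `$exists` operator above suggests) and that stored labels keep the `type`/`label`/`value` keys sent with `set_labels`; `label_query` and `flatten_labels` are hypothetical helpers. MongoDB's `$elemMatch` requires a single array element to satisfy all of its conditions at once.

```python
def label_query(program_id, label, limit=5):
    """Build a 'find' query for sources in a program that carry `label` with value 1."""
    return {
        "query_type": "find",
        "query": {
            "catalog": "sources",
            "filter": {"zvm_program_id": program_id,
                       "labels": {"$elemMatch": {"label": label, "value": 1}}},
            "projection": {"_id": 1, "lc.id": 1, "labels": 1},
        },
        "kwargs": {"limit": limit},
    }


def flatten_labels(docs):
    """Turn query results into one row per (source, label), e.g. for pd.DataFrame(rows)."""
    rows = []
    for doc in docs:
        for lab in doc.get("labels", []):
            rows.append({"_id": doc["_id"],
                         "type": lab.get("type"),
                         "label": lab.get("label"),
                         "value": lab.get("value")})
    return rows
```

In the notebook, `r = z.query(query=label_query(zvm_program_id, 'W Uma'))` followed by `pd.DataFrame(flatten_labels(r['result']['result_data']['query_result']))` would give one row per source and label.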


### Misc

- Notes on how things work internally
    - I deployed a dedicated "clean" instance of the variable marshal for the labeling work, so as not to flood the main ZVM instance on `skipper` with potentially millions of sources, which would likely confuse its users. The relevant labelers can nevertheless use the same access credentials as on the main instance. The code base is exactly the same, so you can also label sources on `skipper` if you want or need to.
    - We represent individual datasets as ZVM programs, so all sources to be labeled are simply assigned to a program when they are saved to the marshal. The labeling GUI displays a view of the same database entries for individual sources; all interaction goes through the API.