## Processing ZTF alerts stored as avro files

### Import python libraries

In [1]:
import matplotlib.pyplot as plt
from matplotlib import rc
rc('text', usetex=True)
from mpl_toolkits.axes_grid1 import make_axes_locatable

import os
import io
import gzip
import tarfile
import warnings

import numpy as np
import pandas as pd
from tqdm import tqdm

from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import fastavro

from astropy.time import Time
from astropy.io import fits
import astropy.units as u
import aplpy
%matplotlib inline

### Extract compressed alerts archive

In [2]:
'''
date = ["20210831", "20210910", "20210914", "20210915", "20210916", "20210917", "20210918", "20210919", "20210920",
        "20210921", "20210923", "20210926", "20210927", "20210928", "20210929", "20210930", "20211001", "20211002",
        "20211003", "20211004", "20211007", "20211009", "20211010", "20211011", "20211013", "20211014", "20211015"]
'''
date = ["20211001", "20211002",
        "20211003", "20211004", "20211007", "20211009", "20211010", "20211011", "20211013", "20211014", "20211015"]

output_dir = []
for d in date:
    output_dir.append("/Volumes/doogesh_HDPro/ZTF/ztf_public_"+d)

In [3]:
output_dir

['/Volumes/doogesh_HDPro/ZTF/ztf_public_20211001',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211002',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211003',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211004',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211007',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211009',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211010',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211011',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211013',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211014',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211015']

In [3]:
def find_files(root_dir):
    for dir_name, subdir_list, file_list in os.walk(root_dir, followlinks=True):
        for fname in file_list:
            if fname.endswith('.avro'):
                yield dir_name+'/'+fname

In [4]:
def open_avro(fname):
    with open(fname,'rb') as f:          # modes read and binary     and f is a file
        freader = fastavro.reader(f)     # use to read avro file which are coded in binary
        # in principle there can be multiple packets per file
        for packet in freader:
            yield packet

In [5]:
def generate_dictionaries(root_dir):
    for fname in find_files(root_dir):
        for packet in open_avro(fname):
            yield packet

### Apply some filters/cuts to obtain high purity sample

From ZTF Avro documentation: ZTF alert streams contain an nearly entirely unfiltered stream of all 5-sigma (only the most obvious artefacts are rejected). Depending on your science case, you may wish to improve the purity of your sample by filtering the data on the included attributes.

Based on tests done at IPAC (F. Masci, priv. comm), the following filter delivers a relatively pure sample:

* `rb >= 0.65`
* `nbad = 0`
* `fwhm <= 5`
* `elong <= 1.2`
* `abs(magdiff) <= 0.1`

In [6]:
def is_alert_pure(packet):
    pure = True
    # &= to have the intersection between pure and the other 'object'
    # use here to select objects satisfying all the previous conditions 
    pure &= packet['candidate']['rb'] >= 0.65      
    pure &= packet['candidate']['nbad'] == 0
    pure &= packet['candidate']['fwhm'] <= 5
    pure &= packet['candidate']['elong'] <= 1.2
    pure &= np.abs(packet['candidate']['magdiff']) <= 0.1
    return pure

In [7]:
# Maybe we do not require this function
def generate_programs(output_dir_):

    print('{} has {} avro files'.format(output_dir_, len(list(find_files(output_dir_)))))

    from collections import defaultdict
    programs = defaultdict(int)
    for packet in generate_dictionaries(output_dir_):
        programs[packet['candidate']['programid']] += 1
    print(programs)

    return programs

In [8]:
# Maybe we do not require this function
def filter_programs(output_dir_):

    programs = defaultdict(int)
    for packet in filter(is_alert_pure,generate_dictionaries(output_dir_)):
        programs[packet['candidate']['programid']] += 1
    print(programs)

    return programs

#### For applications using lightcurves it's useful to have the data in a dataframe, but this construction is somewhat slower, so let's apply it after our initial filter.

In [9]:
def make_dataframe(packet):
    dfc = pd.DataFrame(packet['candidate'], index=[0])
    df_prv = pd.DataFrame(packet['prv_candidates'])
    dflc = pd.concat([dfc,df_prv], ignore_index=True)
    # we'll attach some metadata--not this may not be preserved after all operations
    # https://stackoverflow.com/questions/14688306/adding-meta-information-metadata-to-pandas-dataframe
    dflc.objectId = packet['objectId']
    dflc.candid = packet['candid']
    return dflc

Let's use the following cuts to select likely extragalactic transients:
* the difference image detection should be positive;
* if there is a PS1 source within 1.5" of the source, it should have a star-galaxy score of < 0.5 (galaxy-like);
* there should be at least two detections separated by more than 30 minutes;
* there should be no known solar system object within 5".

In [10]:
def is_transient(dflc):
    
    candidate = dflc.loc[0]
    
    is_positive_sub = candidate['isdiffpos'] == 't'
    
    if (candidate['distpsnr1'] is None) or (candidate['distpsnr1'] > 1.5):
        no_pointsource_counterpart = True
    else:
        if candidate['sgscore1'] < 0.5:
            no_pointsource_counterpart = True
        else:
            no_pointsource_counterpart = False
            
    where_detected = (dflc['isdiffpos'] == 't') # nondetections will be None
    if np.sum(where_detected) >= 2:
        detection_times = dflc.loc[where_detected,'jd'].values
        dt = np.diff(detection_times)
        not_moving = np.max(dt) >= (30*u.minute).to(u.day).value
    else:
        not_moving = False
    
    no_ssobject = (candidate['ssdistnr'] is None) or (candidate['ssdistnr'] < 0) or (candidate['ssdistnr'] > 5)
    
    return is_positive_sub and no_pointsource_counterpart and not_moving and no_ssobject

In [11]:
transient_selection_list = np.load("data/ZTF_selection_temp.npz")['transient_selection_list'].tolist()

FileNotFoundError: [Errno 2] No such file or directory: 'data/ZTF_selection_temp.npz'

In [33]:
output_dir

['/Volumes/doogesh_HDPro/ZTF/ztf_public_20211001',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211002',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211003',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211004',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211007',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211009',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211010',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211011',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211013',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211014',
 '/Volumes/doogesh_HDPro/ZTF/ztf_public_20211015']

In [None]:
#transient_selection_list = []

pbar = tqdm(total=len(output_dir))

for out_dir in output_dir:
      # pq il n'y a pas d'argument pour la fonction is_alert_pure ? 
    for packet in filter(is_alert_pure,generate_dictionaries(out_dir)): 
        dflc = make_dataframe(packet)
        if is_transient(dflc):
            #print(packet['objectId'])
            if packet['objectId'] not in transient_selection_list:
                transient_selection_list.append(packet['objectId'])
    pbar.update(1)

In [None]:
len(transient_selection_list)

In [None]:
np.savez("data/ZTF_selection.npz", transient_selection_list=transient_selection_list)