In [1]:
import os
import fastavro
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from correction.compute import *

In [2]:
from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pd.concat(ret_list)

In [3]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

# Extract SNe

In [4]:
#!pip install psycopg2-binary
import psycopg2

In [5]:
import json
credentials_file = "../../usecases/alercereaduser_v2.json"
with open(credentials_file) as jsonfile:
    params = json.load(jsonfile)["params"]

In [6]:
conn = psycopg2.connect(dbname=params['dbname'], user=params['user'], host=params['host'], password=params['password'])

In [7]:
query = "select * from class"
classes = pd.read_sql_query(query, conn)
classmapper = dict(zip(classes.name.tolist(), classes.id.tolist()))
classmapper_invert = dict(zip(classes.id.tolist(), classes.name.tolist()))
classmapper

{'Other': 0,
 'Ceph': 1,
 'LPV': 4,
 'RRL': 5,
 'EB': 3,
 'SNe': 6,
 'DSCT': 2,
 'Blazar': 8,
 'CV/Nova': 9,
 'SLSN': 14,
 'AGN': 18,
 'SN': 19,
 'Variable Star': 20,
 'Asteroid': 21,
 'Bogus': 22,
 'SNIa': 10,
 'SNIbc': 11,
 'SNII': 12,
 'SNIIn': 13,
 'EBSD/D': 15,
 'EBC': 16,
 'Periodic-Other': 17,
 'AGN-I': 7,
 'RS-CVn': 23,
 'QSO-I': 24}

In [8]:
query='''
select oid
from objects
where classrf in ('%i', '%i', '%i', '%i')
''' % (classmapper["SNIa"], classmapper["SNIbc"], classmapper["SNII"], classmapper["SLSN"])
sn = pd.read_sql_query(query, conn)
display(sn.head())
sn.shape

Unnamed: 0,oid
0,ZTF19aattvhh
1,ZTF19abvbraf
2,ZTF20aafujbt
3,ZTF19adazxnz
4,ZTF18ablvirg


(7587, 1)

In [9]:
#data = pd.read_csv("../data_examples/csv/raw_detections.csv", index_col=False)
#'ZTF19aazzpje' #'ZTF20aaelulu' #'ZTF19aazzpje' # SN
#'ZTF18aaiscil' # weird
#'ZTF18abdgukn' # AGN
#'ZTF17aaaedmd' # LPV
#"ZTF19abaejrh"
#"ZTF20aaelulu"

# Work with test parquets

In [10]:
det = pd.read_parquet("./det_sample.parquet")
det["mjd"] = det.jd - 2400000.5

In [11]:
non_det = pd.read_parquet("./non_det_sample.parquet")
non_det["mjd"] = non_det.jd - 2400000.5

In [12]:
# check for oid in non_det missing from det
len(set(non_det.objectId.unique()).difference(set(det.objectId.unique())))

0

In [13]:
# count number of detections
ndet = det.groupby("objectId").apply(len)

In [14]:
# select a given number of oids
seloid = ndet[ndet > 5].index.values
#seloid = sn.oid.values[:1000]

In [15]:
# filter detections
det = det.loc[det.objectId.isin(seloid)].copy()
non_det = non_det.loc[non_det.objectId.isin(seloid)][["objectId", "fid", "mjd", "diffmaglim"]].copy()

In [16]:
det.head()

Unnamed: 0,objectId,jd,fid,pid,diffmaglim,pdiffimfilename,programpi,programid,candid,isdiffpos,...,szmag3,sgscore3,distpsnr3,nmtchps,rfid,jdstartref,jdendref,nframesref,parent_candid,mjd
0,ZTF17aaaaact,2458426.0,1,671340022015,20.808331,ztf_20181103339826_000652_zg_c06_o_q1_scimrefd...,Kulkarni,1,671340022015010000,f,...,20.863701,0.0035,22.340849,5,652120120,2458292.0,2458348.0,15,0,58425.340023
1,ZTF17aaaaact,2458429.0,1,674220892015,20.655697,ztf_20181106220880_000652_zg_c06_o_q1_scimrefd...,Kulkarni,1,674220892015010000,f,...,20.863701,0.0035,22.497215,5,652120120,2458292.0,2458348.0,15,0,58428.220891
2,ZTF17aaaaact,2458432.0,1,677242062015,20.205976,ztf_20181109242049_000652_zg_c06_o_q1_scimrefd...,Kulkarni,1,677242062015010000,f,...,20.863701,0.0035,22.616156,5,652120120,2458292.0,2458348.0,15,0,58431.24206
3,ZTF17aaaaact,2458472.0,1,717098632015,18.827154,ztf_20181219098542_000652_zg_c06_o_q1_scimrefd...,Kulkarni,1,717098632015010024,f,...,20.863701,0.0035,22.495527,5,652120120,2458292.0,2458348.0,15,0,58471.098634
4,ZTF17aaaaact,2458737.0,1,982462462015,20.833609,ztf_20190910462419_000652_zg_c06_o_q1_scimrefd...,Kulkarni,1,982462462015010000,f,...,20.863701,0.0035,22.405315,5,652120120,2458292.0,2458348.0,15,0,58736.462465


In [17]:
det.shape

(599951, 85)

In [18]:
det.objectId.unique()

array(['ZTF17aaaaact', 'ZTF17aaaaaso', 'ZTF17aaaaaue', ...,
       'ZTF20abcramc', 'ZTF20abcrptz', 'ZTF20abcxoqp'], dtype=object)

In [19]:
non_det.head()

Unnamed: 0,objectId,fid,mjd,diffmaglim
0,ZTF17aaabwoc,2,58883.182292,19.106199
1,ZTF17aaabwoc,2,58804.244595,19.5737
2,ZTF17aaabwoc,2,58715.455243,20.1231
3,ZTF17aaabwoc,2,58856.181933,19.7777
4,ZTF17aaabwoc,2,58690.431516,20.1749


In [20]:
non_det.shape

(2347098, 4)

In [21]:
non_det.objectId.unique()

array(['ZTF17aaabwoc', 'ZTF17aabdsmw', 'ZTF17aabqioz', ...,
       'ZTF20aajfrrc', 'ZTF20aamifit', 'ZTF20aavzlmg'], dtype=object)

In [22]:
# test c2
#data.at[180, "isdiffpos"] = 'f'
# test c3
#data.at[150, "distnr"] = 0.1

In [23]:
pd.options.display.max_rows = 999
display(det[["objectId", "fid", "magpsf", "distnr", "chinr", "sharpnr"]])

Unnamed: 0,objectId,fid,magpsf,distnr,chinr,sharpnr
0,ZTF17aaaaact,1,18.929613,0.057899,2.057,-0.199
1,ZTF17aaaaact,1,19.072699,0.185630,2.057,-0.199
2,ZTF17aaaaact,1,19.020626,0.301870,2.057,-0.199
3,ZTF17aaaaact,1,18.776876,0.304684,2.057,-0.199
4,ZTF17aaaaact,1,19.452328,0.116335,2.057,-0.199
5,ZTF17aaaaact,1,19.310141,0.155953,2.057,-0.199
6,ZTF17aaaaact,1,18.725660,0.215732,2.057,-0.199
7,ZTF17aaaaact,1,18.713564,0.231983,2.057,-0.199
8,ZTF17aaaaact,1,18.805702,0.217483,2.057,-0.199
9,ZTF17aaaaact,1,18.960356,0.171950,2.057,-0.199


In [24]:
def apply_correction_df(data):
    
    # create copy of dataframe
    df = data.copy()
    df.set_index("candid", inplace=True)
    
    # map isdiffpos
    df['isdiffpos'] = df['isdiffpos'].map({'t': 1., '1': 1, '0': -1, 'f': -1.})

    # apply correction where distnr < 1.4
    df["corrected"] = df["distnr"] < 1.4

    # correct
    correction_results = df.apply(lambda x: correction(x.magnr, x.magpsf, x.sigmagnr, x.sigmapsf, x.isdiffpos, oid=df.objectId.iloc[0], candid=x.name) if x["corrected"] else (np.nan, np.nan, np.nan), axis=1, result_type="expand")
    try:
        df["magpsf_corr"], df["sigmapsf_corr"], df["sigmapsf_corr_ext"] = correction_results[0], correction_results[1], correction_results[2]
    except:
        display(correction_results)
        print(correction_results)
    
    # check if suspicious behavior in the light curve (c1 | c2 | c3)
    corr_magstats = df.loc[df.index.min()]["corrected"]
    mask = ((df["corrected"] == False) & (df.isdiffpos == -1)) | (corr_magstats & (df["corrected"] == False)) | ((corr_magstats == False) & df["corrected"])
    df["dubious"] = mask
    
    return df

In [25]:
def apply_mag_stats(df):
    
    response = {}
    
    # minimum and maximum candid
    idxmin = df.index.min()
    idxmax = df.index.max()

    # corrected at the first detection?
    response['corrected'] = df.loc[idxmin]["corrected"]
    
    # stellar_object at first detection?
    nearZTF = (df.loc[idxmin].distnr >= 0) & (df.loc[idxmin].distnr < 1.4) # near a ZTF object
    nearPS1 = (df.loc[idxmin].distpsnr1 >= 0) & (df.loc[idxmin].distpsnr1 < 1.4) # near a ZTF object
    stellarPS1 = (df.loc[idxmin].sgscore1 > 0.4)  # nearest object in PS1 is stelar
    stellarZTF = (df.loc[idxmin].chinr < 2) & (df.loc[idxmin].sharpnr > -0.13) & (df.loc[idxmin].sharpnr < 0.1) # nearest object in ZT is stellar
    response["nearZTF"] = nearZTF
    response["nearPS1"] = nearPS1
    response["stellarZTF"] = nearZTF
    response["stellarPS1"] = nearPS1
    response["stellar"] = (nearZTF & nearPS1 & stellarPS1) | (nearZTF & ~nearPS1 & stellarZTF)
    
    # number of detections and dubious detections
    response["ndet"] = df.shape[0]
    response["ndubious"] = df.dubious.sum()

    # reference id
    response["nrfid"] = len(df.rfid.dropna().unique())

    # psf magnitude statatistics
    response["magpsf_mean"] = df.magpsf.mean()
    response["magpsf_median"] = df.magpsf.median()
    response["magpsf_max"] = df.magpsf.max()
    response["magpsf_min"] = df.magpsf.min()
    response["magpsf_first"] = df.loc[idxmin].magpsf
    response["sigmapsf_first"] = df.loc[idxmin].sigmapsf
    response["magpsf_last"] = df.loc[idxmax].magpsf
    
    # psf corrected magnitude statatistics
    response["magpsf_corr_mean"] = df.magpsf_corr.mean()
    response["magpsf_corr_median"] = df.magpsf_corr.median()
    response["magpsf_corr_max"] = df.magpsf_corr.max()
    response["magpsf_corr_min"] = df.magpsf_corr.min()
    response["magpsf_corr_first"] = df.loc[idxmin].magpsf_corr
    response["magpsf_corr_last"] = df.loc[idxmax].magpsf_corr
    
    # corrected psf magnitude statistics
    response["magap_mean"] = df.magap.mean()
    response["magap_median"] = df.magap.median()
    response["magap_max"] = df.magap.max()
    response["magap_min"] = df.magap.min()
    response["magap_first"] = df.loc[idxmin].magap
    response["magap_last"] = df.loc[idxmax].magap
           
    # time statistics
    response["first_mjd"] = df.loc[idxmin].mjd
    response["last_mjd"] = df.loc[idxmax].mjd
       
    return pd.Series(response)

In [26]:
def apply_object_stats(df):
    
    response = {}

    response["nearZTF"] = df.nearZTF.all()
    response["nearPS1"] = df.nearPS1.all()
    response["stellar"] = df.stellar.all()
    response["corrected"] = df.corrected.all()
    response["ndet"] = df.ndet.sum() # sum of detections in all bands
    response["ndubious"] = df.ndubious.sum() # sum of dubious corrections in all bands
    
    return pd.Series(response)

In [27]:
def do_dmdt(nd):
    
    response = {}
    objectId = nd.objectId.iloc[0]
    fid = nd.fid.iloc[0]
    if fid in magstats.loc[objectId].index:
        mjd_first = magstats.loc[objectId].loc[fid].first_mjd
        if nd.mjd.min() < mjd_first:
            mask = nd.mjd < mjd_first
            magpsf_first = magstats.loc[objectId].loc[fid].magpsf_first
            sigmapsf_first = magstats.loc[objectId].loc[fid].sigmapsf_first
            dm = magpsf_first - nd.loc[mask].diffmaglim
            dm_sigma = magpsf_first + sigmapsf_first - nd.loc[mask].diffmaglim
            dt = mjd_first - nd.loc[mask].mjd
            dmsigdt = (dm_sigma / dt)
            idxmin = dmsigdt.idxmin()
            response["dmdt_first"] = dmsigdt.loc[idxmin]
            response["dm_first"] = magpsf_first - nd.diffmaglim.loc[idxmin]
            response["sigmadm_first"] = sigmapsf_first - nd.diffmaglim.loc[idxmin]
            response["dt_first"] = dt.loc[idxmin]
        else:
            response["dmdt_first"] = np.nan
            response["dm_first"] = np.nan
            response["sigmadm_first"] = np.nan
            response["dt_first"] = np.nan
    else:
        response["dmdt_first"] = np.nan
        response["dm_first"] = np.nan
        response["sigmadm_first"] = np.nan
        response["dt_first"] = np.nan

    return pd.Series(response)

# Do the magic

In [28]:
if len(non_det.index.names) == 2:
    non_det.reset_index(level=[0, 1], inplace=True)
non_det.head()

Unnamed: 0,objectId,fid,mjd,diffmaglim
0,ZTF17aaabwoc,2,58883.182292,19.106199
1,ZTF17aaabwoc,2,58804.244595,19.5737
2,ZTF17aaabwoc,2,58715.455243,20.1231
3,ZTF17aaabwoc,2,58856.181933,19.7777
4,ZTF17aaabwoc,2,58690.431516,20.1749


In [None]:
dflarge = applyParallel(det.groupby(["objectId", "fid"]), apply_correction_df)
dflarge.head(2)

In [None]:
magstats = dflarge.groupby(["objectId", "fid"]).apply(apply_mag_stats)
magstats.head(2)

In [None]:
objstats = magstats.groupby(["objectId"]).apply(apply_object_stats)
objstats.head(2)

In [None]:
dmdt = pd.DataFrame(non_det.groupby(["objectId", "fid"]).apply(do_dmdt))
magstats = magstats.join(dmdt)
magstats

In [None]:
non_det.set_index(["objectId", "fid"], inplace=True)

# Plot some examples

In [None]:
#for oid in dflarge.objectId.unique():

def plot_object(oid):
    
    colors = {1: 'g', 2: 'r'}
    period = {}
    period['ZTF18aazxcwf'] = 0.614515 # RRL

        
    fig, ax = plt.subplots(ncols = 3, sharey = False, figsize=(30, 6))
    maskoid = dflarge.objectId == oid
    display(pd.DataFrame(objstats.loc[oid][["corrected", "nearZTF", "stellar"]]))
    display(magstats.loc[oid][["nearZTF", "nearPS1", "stellarZTF", "stellarPS1", "corrected", "nearZTF", "nearPS1", "stellar", "dmdt_first"]])
    #display(dflarge.loc[maskoid][["rfid", "fid", "mjd", "magpsf", "magnr", "isdiffpos", "magpsf_corr", "corrected", "dubious"]])
    
    for fid in dflarge.loc[maskoid].fid.unique():
        mask = maskoid & (dflarge.fid == fid)

        color = colors[fid]
        
        # non-detections
        if oid in non_det.index.levels[0]:
            if fid in non_det.loc[oid].index:
                ax[0].scatter(non_det.loc[oid].loc[fid].mjd, non_det.loc[oid].loc[fid].diffmaglim, marker = 'v', color=color, alpha=0.2)

        for pos in dflarge.loc[mask].isdiffpos.unique():
            
            mask = mask & (dflarge.isdiffpos == pos)
            mask_diff = mask & (dflarge.sigmapsf < 2)
            mask_corr = mask & (dflarge.sigmapsf_corr < 2)
            mask_corr_ext = mask & (dflarge.sigmapsf_corr_ext < 2)
                
            marker = 'o' if pos == 1 else '*'
            neg = '' if pos == 1 else ' (negative)'

            if oid not in period.keys():
                ax[0].errorbar(dflarge.loc[mask_diff].mjd, dflarge.loc[mask_diff].magpsf, yerr=dflarge.loc[mask_diff].sigmapsf, c=color, marker=marker, lw=0, elinewidth=1, alpha=1, label="magpsf%s" % neg)
                ax[1].errorbar(dflarge.loc[mask_corr].jd, dflarge.loc[mask_corr].magpsf_corr, yerr=dflarge.loc[mask_corr].sigmapsf_corr, c=color, marker=marker, lw=0, elinewidth=1, alpha=1, label="magpsf_corr%s" % neg)
                ax[2].errorbar(dflarge.loc[mask_corr_ext].jd, dflarge.loc[mask_corr_ext].magpsf_corr, yerr=dflarge.loc[mask_corr_ext].sigmapsf_corr_ext, c=color, marker=marker, lw=0, elinewidth=1, alpha=1, label="magpsf_corr_ext%s" % neg)
            else:
                ax[0].errorbar(np.mod(dflarge.loc[mask_diff].mjd, period[oid]), dflarge.loc[mask_diff].magpsf, yerr=dflarge.loc[mask_diff].sigmapsf, c=color, marker=marker, lw=0, elinewidth=1, alpha=1, label="magpsf%s" % neg)
                ax[1].errorbar(np.mod(dflarge.loc[mask_corr].mjd, period[oid]), dflarge.loc[mask_corr].magpsf_corr, yerr=dflarge.loc[mask_corr].sigmapsf_corr, c=color, marker=marker, lw=0, elinewidth=1, alpha=1, label="magpsf_corr%s" % neg)
                ax[2].errorbar(np.mod(dflarge.loc[mask_corr_ext].mjd, period[oid]), dflarge.loc[mask_corr_ext].magpsf_corr, yerr=dflarge.loc[mask_corr_ext].sigmapsf_corr_ext, c=color, marker=marker, lw=0, elinewidth=1, alpha=1, label="magpsf_corr_ext%s" % neg)
    
    for i in range(3):
        ax[i].set_title("oid: %s, corrected: %s, dmdt: %s" % (oid, objstats.loc[oid].corrected, magstats.loc[oid].dmdt_first.values))
        ax[i].set_ylim(ax[i].get_ylim()[::-1])
        ax[i].legend()
        if oid not in period.keys():
            ax[i].set_xlabel("jd")
        else:
            ax[i].set_xlabel("phase")

In [None]:
for idx, oid in enumerate(dflarge.objectId.unique()):#sn.loc[sn.oid.isin(seloid)].oid):
    if idx < 50: # and oid == "ZTF19abdkgmf":
        print(oid)
        plot_object(oid)

In [None]:
dflarge.memory_usage(index=True).sum()

In [None]:
non_det.memory_usage(index=True).sum()

In [None]:
df.oid.isin(seloid).sum()

In [None]:
corr_f = []
corr_ext_f = []
masksigmapsf = dflarge.sigmapsf < 2
masksigmapsf_corr = dflarge.sigmapsf_corr < 2
masksigmapsf_corr_ext = dflarge.sigmapsf_corr_ext < 2
for oid in dflarge.objectId.unique():
    maskoid = dflarge.objectId == oid
    for fid in dflarge.loc[maskoid].fid.unique():
        mask = maskoid & (dflarge.fid == fid)
        #print(oid, fid, dflarge.loc[mask].isdiffpos.unique())
        corr_f.append(np.sum(mask & masksigmapsf_corr) / np.sum(mask & masksigmapsf))
        corr_ext_f.append(np.sum(mask & masksigmapsf_corr_ext) / np.sum(mask & masksigmapsf))
        #log["stellar%i" % fid] = dflarge.loc[mask].stellar_magstats.unique()
fig, ax = plt.subplots(ncols=2, figsize=(15, 6))
ax[0].hist(corr_f, bins=100);
ax[1].hist(corr_ext_f, bins=100);