In [6]:
import numpy as np
from astropy.io import ascii
import os 
from tqdm import tqdm
from astropy.time import Time
import pandas as pd
import warnings
import time
import random
from gatspy import periodic, datasets
import matplotlib.pyplot as plt
from astropy.table import Table
from gatspy import datasets, periodic

import scipy.stats as sci_stat
import sys

%matplotlib inline
%config InlineBackend.figure_format = "retina"
from matplotlib import rcParams
rcParams['savefig.dpi'] = 250
rcParams['font.size'] = 20

global data_path
data_path = '../data/plasticc/data/'

In [7]:
def generate_toi_table(data, meta_info, meta_theta_EB, meta_theta_RRL):
    """
    Generate table that contains the light curve ID and transient type. 

    Input
    -----
    data: Head data table that contains photometry
    meta_info: Table that contains the meta-data (i.e classification name)
    meta_theta_<TYPE>: Table that contains metadata information (i.e Period)
    
    """
    id_av_rrl, id_av_eb = [], []

    for uid in tqdm(np.unique(data['object_id'])):
        ww = np.where(meta_theta_EB['object_id'] == uid)
        if np.shape(ww)[-1]==1:
            id_av_eb.append(uid)

    for uid in tqdm(np.unique(data['object_id'])):
        ww = np.where(meta_theta_RRL['object_id'] == uid)
        if np.shape(ww)[-1]==1:
            id_av_rrl.append(uid)

    id_av_rrl, id_av_eb = np.array(id_av_rrl), np.array(id_av_eb)

    _id1 = np.array(['rrl' for _ in range(len(id_av_rrl))])
    _id2 = np.array(['eb' for _ in range(len(id_av_eb))])

    # All ID's and & ID tags
    all_id = np.concatenate([id_av_rrl, id_av_eb])
    _id_all = np.concatenate([_id1, _id2])

    # Final TOI table
    toi_table = Table([all_id, _id_all], names=('obj_id', 'type'))

    return toi_table

In [8]:
# Read ALL PlastiCC data & meta
data = pd.read_csv(data_path + "plasticc_train_lightcurves.csv.gz",
                  compression='gzip',
                  error_bad_lines=False)

meta_info = ascii.read(data_path + "plasticc_train_metadata.csv") # ascii meta since it's smaller
meta_theta_EB = ascii.read(data_path + 'plasticc_modelpar/' + 'plasticc_modelpar_016_EB.csv')
meta_theta_RRL = ascii.read(data_path + 'plasticc_modelpar/' + 'plasticc_modelpar_092_RRL.csv')

# Fetch all toi's
toi_table = generate_toi_table(data, meta_info, meta_theta_EB, meta_theta_RRL)

# Helper functions to read and digest plasticc data

100%|██████████| 7848/7848 [00:00<00:00, 22266.86it/s]
100%|██████████| 7848/7848 [00:00<00:00, 13941.64it/s]


In [42]:
x = 0
assert x>0, ("egg")

AssertionError: egg

In [24]:
# Helper functions to read and digest plasticc data

def generate_lc(obj_id, band='all', data_table=data, det=1):
    """Unpack and return PlastiCC data in numpy array format.
    
    Input:
    ------
    obj_id: Object ID
    band: Photometric bandpass filter. 'all' includes ugrizy, or 'ugrizy'
    data_table: Pandas data table containing the light curves
    det: Detection from the image subtraction algorithm. ==1 detection, ==0 not detection (i.e upper limit)
    """
    
    data_table_mod = data_table[data_table['detected_bool']==det]
    # Select light curve based on the ID 
    lc = data_table_mod[data_table_mod['object_id']==obj_id]
    
    lsst_bands = list('ugrizy') # lsst photomeric bands
    
    lc_array = lc.to_numpy()

    # Capture empty light curve
    assert len(lc_array[:,1])==0, ("Sorry, it seems like your obj_id query was wrong!")

    mjd, flux, flux_err = lc_array[:,1], lc_array[:,3], lc_array[:,4]
    flt = lc_array[:,2].astype(int).astype(str)    
    
    for j in range(6):
        flt[flt==str(j)] = lsst_bands[j]
    
    if band=='all':
        return mjd, flux, flux_err, flt
    else:
        return mjd[flt==band], flux[flt==band], flux_err[flt==band], flt[flt==band]
     

def fetch_type(lid, table=toi_table):
    """Fetch the classification type of transient given light curve table and original head TOI table."""
    return table[table['obj_id']==lid]


def fetch_meta_info(lc_id, lc_type):
    """Fetch metadata for transient type.
    
    Input
    -----
    lc_id: Light curve ID 
    lc_type: classification type (i.e rrl, eb)

    Output
    ------
    meta_<type>_table: Table that contains metadata (i.e period and other physical properties)
    """
    if lc_type=='rrl':
        # crossmatch to approprirate table
        xm_ = np.where(meta_theta_RRL['object_id']==lc_id)
        return meta_theta_RRL[xm_]
    elif lc_type=='eb':
        # crossmatch to approprirate table
        xm_ = np.where(meta_theta_EB['object_id']==lc_id)
        return meta_theta_EB[xm_]

# Write a function that will generate N random from each class (equal)
def draw_rand_trans(table, N=10, class_type='rrl'):
    """Given N this function will draw an equal number of trnasinets.
       Note: It will not draw the same transiennt
    """
    # isolate each unique class
    req_tab = table[table['type']==class_type]  
    
    # Random number generator w/o repeat
    rng = np.random.default_rng()
    rn = rng.choice(len(req_tab), size=N, replace=False)
    
    return req_tab[rn]  

def run_multi_lsp(x, y, err, fts, fmin=0.1, fmax=150, k=1, mode='fast', dt_cut=365, k_term_base=0):
    """Run all methods of multiband gatspy Lomb-Scargle Periodogram. 

        Input
        ------
        x, y, err, fts: phase, magnitudes/flux, error, filter list
        fmin, fmax: minimum and maximum search period in the Lomb-Scargle
        k (int): Number of Fourier components
        mode (str): LSP method. Currently supports 'fast' and "general"
        dt_cut (int): Maximum baseline time. Default is 1 year worth of photometry.

        Output
        ------
        best_period: Best period found from the highest peak in the LSP
        TODO: False Alarm Probability, TOP N peaks?!
    """
    
    try:
        # Pre-processing to photometry
        dt = x-x[0] # calculate baseline
        x, y, err, fts = x[dt<=dt_cut], y[dt<=dt_cut], err[dt<=dt_cut], fts[dt<=dt_cut]
        y += -1*min(y) # TODO: PLASTICC light curves can be negative. For now normalize such that they're at least positive
        dt = x-x[0] # evaluate baseline again!
        # Check fmax limit
        if max(dt)<fmax:
            fmax = max(dt)-3
    except:
        return np.nan
    
    if mode=='fast':
        try:
            model = periodic.LombScargleMultibandFast(fit_period=True,optimizer_kwds={"quiet": True},
                                  Nterms=k)
            model.optimizer.set(period_range=(fmin, fmax))
            model = model.fit(x, y, dy=err, filts=fts)
            return model.best_period
        except:
            return np.nan
    elif mode=='general':
        try:
            model = periodic.LombScargleMultiband(fit_period=True,optimizer_kwds={"quiet": True},
                      Nterms_base=k_term_base, Nterms_band=k)
            model.optimizer.set(period_range=(fmin, fmax))
            model = model.fit(x, y, dy=err, filts=fts)
            return model.best_period
        except:
            return np.nan
        

def run_single_lsp(x, y, err, fts, band='u', fmin=0.1, fmax=150, k=1, mode='fast', dt_cut=365):
    """Run all methods of single-band gatspy Lomb-Scargle Periodogram. 

        Input
        ------
        x, y, err, fts: phase, magnitudes/flux, error, filter list
        band (str): Photometric band you want to run LSP. Currently supports 'ugrizy'.
        fmin, fmax: minimum and maximum search period in the Lomb-Scargle
        k (int): Number of Fourier components
        mode (str): LSP method. Currently supports 'fast' and "general"
        dt_cut (int): Maximum baseline time. Default is 1 year worth of photometry.

        Output
        ------
        best_period: Best period found from the highest peak in the LSP
        TODO: False Alarm Probability, TOP N peaks?!
    """
    
    try:
        # Pre-processing to photometry
        dt = x-x[0] # calculate transient duration
        x, y, err, fts = x[dt<=dt_cut], y[dt<=dt_cut], err[dt<=dt_cut], fts[dt<=dt_cut]
        y += -1*min(y)
        dt = x-x[0] # updated dt
        
        # isolate photometric band
        x, y, err = x[fts==band], y[fts==band], err[fts==band]
        
        # Check fmax limit
        if max(dt)<fmax:
            fmax = max(dt)-5 
        
    except:
        return np.nan
    
    if mode=='fast':
        try:
            model = periodic.LombScargleFast(fit_period=True,optimizer_kwds={"quiet": True},
                                  Nterms=1)
            model.optimizer.set(period_range=(fmin, fmax))
            model = model.fit(x, y, dy=err)
            return model.best_period
        except:
            return np.nan
    elif mode=='general':
        try:
            model = periodic.LombScargleMultiband(fit_period=True,optimizer_kwds={"quiet": True},
                      Nterms_base=k)
            model.optimizer.set(period_range=(fmin, fmax))
            model = model.fit(x, y, dy=err)
            return model.best_period
        except:
            return np.nan
  

def generate_tags(kmax):
    """Generate titles for master table on LSP analysis"""
    # Create data table
    m_lsp_name_fast_list = []
    m_lsp_name_gen_list = []
    for i in range(kmax):
        m_lsp_name_fast_list.append('multi_lsp_f'+f'{i+1}')
        m_lsp_name_gen_list.append('multi_lsp_g'+f'{i+1}')
        
    s_lsp_gen_list = []
    for iii in range(kmax):
        for jj, band_name in enumerate(list('ugrizy')):
            s_lsp_gen_list.append('s_lsp_g'+f'{iii+1}'+f'_{band_name}')
            
    s_lsp_fast_list = [] 
    for band_name in list('ugrizy'):
        s_lsp_fast_list.append(f's_lsp_f_{band_name}')
        
    master_names = np.concatenate([['id'], ['ndet'], ['ptrue'], m_lsp_name_fast_list, m_lsp_name_gen_list, s_lsp_gen_list, s_lsp_fast_list])
    
    return master_names

In [25]:
draw_rand_trans(toi_table, N=10, class_type='eb')

obj_id,type
int64,str3
60478784,eb
15700,eb
98154518,eb
60376,eb
84871886,eb
36512644,eb
118970396,eb
37856107,eb
27214925,eb
30673,eb


In [26]:
lc = generate_lc(61510669, band='all')

[61510669. 61510669. 61510669. 61510669. 61510669. 61510669. 61510669.
 61510669. 61510669. 61510669. 61510669.] 11


AssertionError: Sorry, it seems like your obj_id query was wrong!

In [23]:
lc[0]

array([59887.3397, 59907.2138, 59957.2404, 59972.0645, 59985.1984,
       60330.0709, 60357.0909, 60385.0897, 60458.9793, 60622.251 ,
       60640.3122])