# First algorithmic question: how to identify high-cadence subsets within arbitrary sparse lightcurves?

Ideas:
1. select all observations for a given object (even with catflags $\neq 0$)
2. concatenate the mjd values from all filters and sort the resulting array by mjd
    * or do this separately for each filter?
    * or do this only for the r band?
        * from the high cadence survey paper (https://academic.oup.com/mnras/article/505/1/1254/6274690)
            * All observations of the high-cadence Galactic plane survey were obtained in ZTF-r band.
3. using the sorted array of mjd define an array of $\Delta t$
4. define max_dt between observations to be considered high-cadence
5. define min_len, the minimum number of consecutive observations for a run to be considered high-cadence
    * because one can have consecutive observations of 4 adjacent fields ($> 4$)
        * from the high cadence survey paper (https://academic.oup.com/mnras/article/505/1/1254/6274690) 
            * cadence of 40 sec
            * either observed one field or alternated between two adjacent fields continuously for ≈1.5–3 h on two to three consecutive nights in the ZTF-r band.
                * In June, we observed every field continuously for 1 h 15 min and in July for 1 h 25 min.
                * We alternated between two adjacent fields continuously for 2 h 40 min each night. Because more time was available each night most fields were observed for ≈3 h. The same fields were repeated the following night.
                * We lost only a total of five nights due to weather during June/July and August observations.
                
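Idea: treat a run as high cadence when consecutive observations are separated by at most $\Delta t_{max} = 4 \cdot 40\,\mathrm{s} = 160\,\mathrm{s}$ and the run lasts at least 1 hr (or 1 hr 15 min), i.e. $\approx 22$ to $28$ observations ($3600/160 = 22.5$, $4500/160 \approx 28.1$)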
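
Steps 3 to 5 above can be sketched compactly with NumPy. This is only an illustrative sketch, not the implementation used later in this notebook: `find_hc_windows` is a hypothetical helper, and the toy light curve is made up. It splits the sorted times wherever a gap exceeds `max_dt` and keeps the runs with at least `min_len` points.

```python
import numpy as np

SEC_TO_DAY = 1.0 / 86400.0

def find_hc_windows(mjd, max_dt=160 * SEC_TO_DAY, min_len=28):
    """Return (start, end) MJD pairs of runs with >= min_len observations
    whose consecutive gaps are all <= max_dt (hypothetical helper)."""
    mjd = np.sort(np.asarray(mjd, dtype=float))
    if mjd.size == 0:
        return []
    # Positions where a gap larger than max_dt separates two runs
    breaks = np.flatnonzero(np.diff(mjd) > max_dt) + 1
    runs = np.split(mjd, breaks)
    return [(run[0], run[-1]) for run in runs if run.size >= min_len]

# Toy light curve: 30 points every 40 s plus 5 sparse points days apart
dense = 59000.0 + np.arange(30) * 40 * SEC_TO_DAY
sparse = 59010.0 + np.arange(5) * 3.0
windows = find_hc_windows(np.concatenate([sparse, dense]))
print(windows)
```

Only the dense block should survive the `min_len` cut; each sparse point forms a run of length one.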

In [2]:
sec_to_day = 1 / (24 * 60 * 60)
max_dt_hc = 4 * 40 * sec_to_day

min_consec_obs = int((75 * 60) / (4 * 40))

In [3]:
min_consec_obs

28
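
As a quick check of the "22 to 28 observations" estimate, the same arithmetic can be repeated for both candidate durations. The variable names below are illustrative, and the factor of 4 follows the assumption above that up to four fields are interleaved.

```python
cadence_s = 40                  # cadence quoted in the survey paper
fields = 4                      # assumed number of interleaved fields
max_dt_s = fields * cadence_s   # 160 s between visits to the same field

# Minimum number of observations for a 1 h and a 1 h 15 min run
counts = {minutes: int(minutes * 60 / max_dt_s) for minutes in (60, 75)}
print(counts)
```

The 75 minute entry corresponds to the `min_consec_obs` value used in this notebook.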

In [4]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import astropy.units as u
%matplotlib inline
%config InlineBackend.figure_format = "retina"
from matplotlib import rcParams
rcParams['savefig.dpi'] = 550
rcParams['font.size'] = 20
plt.rc('font', family='serif')

import lsdb
#from lsdb import lsdb_client
#client = lsdb_client(dask_on_ray=False, num_workers=12)

import tape

import dask 

dask.config.set({"temporary-directory" :'/epyc/ssd/users/fbcb/tmp'})

from dask.distributed import Client
client = Client(n_workers=10, threads_per_worker=10,
                memory_limit="60G")



In [5]:
tape.__version__

'0.3.3'

In [6]:
# Load ZTF object table
ztf = lsdb.read_hipscat("/data3/epyc/data3/hipscat/catalogs/ztf_axs/ztf_dr14")

# Load ZTF DR14 source table
ztf_src = lsdb.read_hipscat("/data3/epyc/data3/hipscat/catalogs/ztf_axs/ztf_source")

In [7]:
ss = ztf.join(ztf_src, left_on="ps1_objid", right_on="ps1_objid")

# Functions

In [8]:
def identify_hc_objects_band(mjd, band, flag, mag, magerr, band_name, 
                             max_dt = max_dt_hc, min_len = min_consec_obs):
    
    if (band.size > 0):
        
#        if np.all(np.isnan(band)):
#            return 0, np.nan, np.nan
        
#        else:
        arr_band_name = np.array([band_name])
        bands_obs = np.unique(band)
    
        if np.isin(arr_band_name, bands_obs, invert=True)[0]:
            return 0, np.nan, np.nan

        band_mask = ((band == band_name) & (flag == 0) & (magerr>0)  
                    & (~np.isnan(mjd)) & (~np.isnan(mag)) & (~np.isnan(magerr))) # remove nans!
    
        if np.sum(band_mask) == 0:
            return 0, np.nan, np.nan
    
        else:
            # band_mask is aligned with the unsorted input arrays, so select
            # the band first and only then sort the selected times
            mjd_band = np.sort(mjd[band_mask])
    
            num_obs = len(mjd_band)
    
            if num_obs < min_len:
                return 0, np.nan, np.nan
    
            else:
                    # calculate the difference in mjd for the same band
                dt = np.diff(mjd_band)
                len_dt = num_obs - 1
        
                    # create an array of indexes of delta t
                idx_dt = np.indices((len_dt,))[0]
    
                    # create a mask indicating if dt > max_dt
                above_max_dt = (dt > max_dt)
        
                    # select indexes that satisfy dt > max_dt
                idx_mask = idx_dt[above_max_dt]
            
                    # all observations are hc observations
                if (len(idx_mask) == 0):
                    mjd_start_hc = np.min(mjd_band)
                    mjd_end_hc = np.max(mjd_band)
                    return 1, mjd_start_hc, mjd_end_hc
            
                else:
                        # edge cases
                    idx_0 = idx_mask[0]
                    last_idx = idx_mask[-1]
                
                    first_obs_hc = (idx_0 >= min_len)
                    last_obs_hc = (len_dt - last_idx >= min_len)
                
                        # center case
                    idx_mask_diff = np.diff(idx_mask)
                    high_cadence_mask = (idx_mask_diff >= (min_len + 1))
                    center_obs_hc = (np.sum(high_cadence_mask) >= 1)
                    
                    if center_obs_hc:
                        idx_start_hc = idx_mask[:-1][high_cadence_mask] + 1
                        idx_end_hc = idx_start_hc + idx_mask_diff[high_cadence_mask] - 1
                        
                        mjd_start_hc = mjd_band[idx_start_hc]
                        mjd_end_hc = mjd_band[idx_end_hc]
                    
                        if first_obs_hc:
                            mjd_1st_hc_start = np.min(mjd_band)
                            # dt[idx_0] is the first large gap, so observation idx_0 closes the first run
                            mjd_1st_hc_end = mjd_band[idx_0]
                        
                            mjd_s_hc = np.append(mjd_1st_hc_start, mjd_start_hc)
                            mjd_e_hc = np.append(mjd_1st_hc_end, mjd_end_hc)
                        
                            if last_obs_hc:
                                mjd_last_hc_start = mjd_band[last_idx + 1]
                                mjd_last_hc_end = mjd_band[-1]
                            
                                mjd_s_hc = np.append(mjd_s_hc, mjd_last_hc_start)
                                mjd_e_hc = np.append(mjd_e_hc, mjd_last_hc_end)
                            
                                return 1, mjd_s_hc, mjd_e_hc
                        
                            else:
                                return 1, mjd_s_hc, mjd_e_hc
                        
                        elif last_obs_hc:
                            
                            mjd_last_hc_start = mjd_band[last_idx + 1]
                            mjd_last_hc_end = mjd_band[-1]
                            
                            mjd_s_hc = np.append(mjd_start_hc, mjd_last_hc_start)
                            mjd_e_hc = np.append(mjd_end_hc, mjd_last_hc_end)
                        
                            return 1, mjd_s_hc, mjd_e_hc 
                        
                        else:
                            return 1, mjd_start_hc, mjd_end_hc
                    
                
                    elif first_obs_hc:
                        mjd_1st_hc_start = np.min(mjd_band)
                        # dt[idx_0] is the first large gap, so observation idx_0 closes the first run
                        mjd_1st_hc_end = mjd_band[idx_0]
                        
                        if last_obs_hc:
                            mjd_last_hc_start = mjd_band[last_idx + 1]
                            mjd_last_hc_end = mjd_band[-1]
                            
                            mjd_s_hc = np.append(mjd_1st_hc_start, mjd_last_hc_start)
                            mjd_e_hc = np.append(mjd_1st_hc_end, mjd_last_hc_end)
                            
                            return 1, mjd_s_hc, mjd_e_hc
                        
                        else:
                            return 1, mjd_1st_hc_start, mjd_1st_hc_end
                    
                    elif last_obs_hc:
                        mjd_last_hc_start = mjd_band[last_idx + 1]
                        mjd_last_hc_end = mjd_band[-1]
                            
                        return 1, mjd_last_hc_start, mjd_last_hc_end
                
                    else:
                        return 0, np.nan, np.nan

    else:
        return 0, np.nan, np.nan
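
One detail worth checking in a function like this is the order of masking and sorting: a boolean mask built from the unsorted arrays is only valid for arrays in that same order. The toy arrays below are made up purely to illustrate the difference.

```python
import numpy as np

mjd = np.array([3.0, 1.0, 2.0, 4.0])
band = np.array(["r", "g", "r", "g"])

mask = band == "r"            # aligned with the unsorted arrays

# Misaligned: sort first, then apply a mask built on the unsorted order
wrong = np.sort(mjd)[mask]    # picks sorted positions 0 and 2

# Aligned: apply the mask first, then sort the selected times
right = np.sort(mjd[mask])

print(wrong, right)
```

Here the r-band times are 3.0 and 2.0, so only the second variant returns the r-band epochs.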

In [9]:
# Define output columns
output_cols = ["high_cad_g", "mjd_start_high_cad_g", "mjd_end_high_cad_g", 
               "high_cad_r", "mjd_start_high_cad_r", "mjd_end_high_cad_r",
               "high_cad_i", "mjd_start_high_cad_i", "mjd_end_high_cad_i",]

# Define an empty DataFrame with the output columns, used as meta
my_meta = pd.DataFrame(columns=output_cols, dtype=float)

# ** kwargs
def determine_hc(mjd, band, flag, mag, magerr): #**kwargs
    # determine if a given object has high cadence observations in any band
    
    hc_g, mjd_s_g, mjd_e_g = identify_hc_objects_band(mjd, band, flag, mag, magerr, 'g')
    hc_r, mjd_s_r, mjd_e_r = identify_hc_objects_band(mjd, band, flag, mag, magerr, 'r')
    hc_i, mjd_s_i, mjd_e_i = identify_hc_objects_band(mjd, band, flag, mag, magerr, 'i')
    
    return pd.Series([hc_g, mjd_s_g, mjd_e_g, hc_r, mjd_s_r, mjd_e_r, hc_i, mjd_s_i, mjd_e_i],index=output_cols)
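
To see what a per-object function returning a `pd.Series` produces, the pattern can be tried on a small in-memory table with a plain pandas `groupby`. This is a sketch under stated assumptions: `toy_determine_hc`, the `obj_id` column, and the simplified criterion (all r-band gaps below `max_dt`) are hypothetical stand-ins, not the function defined above.

```python
import numpy as np
import pandas as pd

cols = ["high_cad_r", "mjd_start_high_cad_r", "mjd_end_high_cad_r"]

def toy_determine_hc(group, max_dt=160 / 86400, min_len=3):
    """Flag an object if all its r-band gaps are <= max_dt (toy criterion)."""
    t = np.sort(group.loc[group["band"] == "r", "mjd"].to_numpy())
    if t.size < min_len or np.any(np.diff(t) > max_dt):
        return pd.Series([0, np.nan, np.nan], index=cols)
    return pd.Series([1, t[0], t[-1]], index=cols)

src = pd.DataFrame({
    "obj_id": [1, 1, 1, 2, 2, 2],
    "band": ["r"] * 6,
    "mjd": [59000.0, 59000.0004, 59000.0008, 59000.0, 59003.0, 59006.0],
})

# One row per object, one column per entry of the returned Series
result = src.groupby("obj_id")[["band", "mjd"]].apply(toy_determine_hc)
print(result)
```

Each returned Series becomes one row of the result, which is why the meta DataFrame lists exactly the same column names.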

# Try to do this with tape

Note: the cell defining `output_cols` and `my_meta` needs to be run again before this section.

In [10]:
import dask.dataframe as dd
from tape import Ensemble, ColumnMapper

In [11]:
# Initialize an Ensemble
ens = Ensemble(client=client)
ens.client_info()

Connection method: Cluster object, Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
Scheduler comm: tcp://127.0.0.1:37136, Started: 2 minutes ago
Workers: 10, Total threads: 100, Total memory: 558.79 GiB
Status: running, Using processes: True
Each of the 10 workers: Total threads: 10, Memory: 55.88 GiB
Local directories: /epyc/ssd/users/fbcb/tmp/dask-scratch-space/worker-*
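
The memory figures reported above (55.88 GiB per worker, 558.79 GiB in total) can be reconciled with `memory_limit="60G"` by a unit conversion. The sketch below assumes that "60G" is interpreted as decimal gigabytes ($60 \times 10^9$ bytes) while the report uses binary GiB.

```python
GIB = 2**30
per_worker_bytes = 60 * 10**9   # "60G" taken as decimal gigabytes (assumption)
n_workers = 10

per_worker_gib = per_worker_bytes / GIB
total_gib = n_workers * per_worker_gib
print(f"{per_worker_gib:.2f} GiB per worker, {total_gib:.2f} GiB total")
```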


In [12]:
# ColumnMapper establishes which table columns map to time-series quantities
colmap = ColumnMapper(
        id_col='_hipscat_index',
        time_col='mjd',
        flux_col='mag',
        err_col='magerr',
        band_col='band',
      )

In [13]:
# Load the Ensemble from the Dask DataFrames underlying the LSDB catalogs
ens.from_dask_dataframe(
    source_frame=ss._ddf,
    object_frame=ztf._ddf,
    column_mapper=colmap,
    sync_tables=False, # Avoid doing an initial sync
    sorted=True, # If the input data is already sorted by the chosen index
    sort=False,
)

<tape.ensemble.Ensemble at 0x7f15faffbe80>

In [14]:
ens.source

Dask DataFrame structure (npartitions=311037; index divisions from 0 to 18446744073709551615). Columns and dtypes:
ztf_dr14 columns: ps1_objid_ztf_dr14 (int64); ra_ztf_dr14, dec_ztf_dr14, ps1_gMeanPSFMag_ztf_dr14, ps1_rMeanPSFMag_ztf_dr14, ps1_iMeanPSFMag_ztf_dr14 (float64); nobs_g_ztf_dr14, nobs_r_ztf_dr14, nobs_i_ztf_dr14 (int32); mean_mag_g_ztf_dr14, mean_mag_r_ztf_dr14, mean_mag_i_ztf_dr14 (float64); Norder_ztf_dr14, Dir_ztf_dr14, Npix_ztf_dr14 (int32)
ztf_source columns: index_ztf_source, ps1_objid_ztf_source (int64); ra_ztf_source, dec_ztf_source, ps1_gMeanPSFMag_ztf_source, ps1_rMeanPSFMag_ztf_source, ps1_iMeanPSFMag_ztf_source (float64); nobs_g_ztf_source, nobs_r_ztf_source, nobs_i_ztf_source (int32); mean_mag_g_ztf_source, mean_mag_r_ztf_source, mean_mag_i_ztf_source (float64); catflags_ztf_source, fieldID_ztf_source (int16); mag_ztf_source, magerr_ztf_source (float32); mjd_ztf_source (float64); rcID_ztf_source (int16); band_ztf_source (string); Norder_ztf_source, Dir_ztf_source, Npix_ztf_source (int32)


In [15]:
ens.source.query('~band_ztf_source.isnull()').update_ensemble()

<tape.ensemble.Ensemble at 0x7f15faffbe80>

In [16]:
ens.sample(frac=0.0001)

<tape.ensemble.Ensemble at 0x7f18aa562fe0>
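
The Ensemble returned by this cell has a different address from `ens`, which suggests that `sample` produces a new object instead of subsetting in place. Since the result is not assigned, the following cells would still operate on the full `ens`. The pandas analogy below (not TAPE code) illustrates the same pattern with a made-up table.

```python
import pandas as pd

df = pd.DataFrame({"obj_id": range(1000)})

# Result discarded: df itself is unchanged
df.sample(frac=0.01, random_state=0)

# Keep the sample by assigning it to a name
subset = df.sample(frac=0.01, random_state=0)
print(len(df), len(subset))
```

If the sample is meant to be used in the batch step, assigning it to a name (for example a hypothetical `ens_small`) and calling `batch` on that object would be one way to do so.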

In [17]:
calc_ = ens.batch(
    determine_hc,
    'mjd_ztf_source', 'band_ztf_source', 
    'catflags_ztf_source', 
    'mag_ztf_source', 'magerr_ztf_source', 
    meta=my_meta,
    use_map=True)

Using generated label, result_1, for a batch result.


In [18]:
ens.object.join(calc_).update_ensemble()

<tape.ensemble.Ensemble at 0x7f15faffbe80>

In [None]:
#ens.dropna(table="source", subset="band_ztf_source")

In [None]:
len(ens.object)

In [19]:
ens.object.compute()

This may cause some slowdown.
Consider scattering data ahead of time and using futures.
Process Dask Worker process (from Nanny):
2024-02-23 12:32:40,321 - distributed.nanny - ERROR - Worker process died unexpectedly
[the same two messages repeat for the other worker processes; output truncated]

In [None]:
ens.save_ensemble(
    ".",
    "ensemble",
    additional_frames=["result_1"],
)