In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import astropy.units as u
%matplotlib inline
%config InlineBackend.figure_format = "retina"
from matplotlib import rcParams
rcParams['savefig.dpi'] = 550
rcParams['font.size'] = 20
plt.rc('font', family='serif')
from tqdm import tqdm
import lsdb
from dask.distributed import Client
import dask
import dask.dataframe as dd
from tape import Ensemble, ColumnMapper
from tqdm import tqdm


## For batch analysis ##
from astropy import stats as astro_stats
import astropy.units as u
from astropy.modeling.models import Gaussian1D
from astropy.modeling import fitting
from scipy import stats
from scipy.optimize import curve_fit
from scipy import interpolate

from astropy.io import ascii
from scipy.signal import find_peaks
import scipy.integrate as integrate
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter

dask.config.set({"temporary-directory" :'/epyc/ssd/users/atzanida/tmp'})
dask.config.set({"dataframe.shuffle-compression": 'Snappy'})
dask.config.set({"dataframe.convert-string": False})



<dask.config.set at 0x7f4b40155060>

## Custom Functions

In [2]:
def prepare_lc(time, mag, mag_err, flag, band, band_of_study='r', flag_good=0, q=None, custom_q=False):
    """
    Prepare the light curve for analysis - specifically for the ZTF data.
    
    Parameters:
    -----------
    time (array-like): Input time values.
    mag (array-like): Input magnitude values.
    mag_err (array-like): Input magnitude error values.
    flag (array-like): Input flag values.
    band (array-like): Input band values.
    band_of_study (str): Band to study. Default is 'r' band
    flag_good (int): Flag value for good detections. Default is 0 (see ZTF documentation)

    Returns:
    --------
    time (array-like): Output time values.
    mag (array-like): Output magnitude values.
    mag_err (array-like): Output magnitude error values.
    """

    # Selection and preparation of the light curve (default selection on )
    rmv = (flag == flag_good) & (mag_err>0) & (band==band_of_study) & (~np.isnan(time)) & (~np.isnan(mag)) & (~np.isnan(mag_err)) # remove nans!

    time, mag, mag_err = time[rmv], mag[rmv], mag_err[rmv]

    # sort time
    srt = time.argsort()

    time, mag, mag_err = time[srt], mag[srt], mag_err[srt]
    ts = abs(time - np.roll(time, 1)) > 1e-5

    time, mag, mag_err = time[ts], mag[ts], mag_err[ts]

    # Remove observations that are <0.5 day apart
    cut_close_time = np.where(np.diff(time) < 0.5)[0] + 1
    time, mag, mag_err  = np.delete(time, cut_close_time), np.delete(mag, cut_close_time), np.delete(mag_err, cut_close_time)

    return time, mag, mag_err


def best_peak_detector(peak_dictionary, min_in_dip=1):
    """Chose the best peak from the peak detector with a minimum number of detections threshold. 
    
    Parameters:
    -----------
    peak_dictionary (dict): Dictionary of the peaks.
    min_in_dip (int): Minimum number of detections in the dip. Default is 3 detections.

    Returns:
    --------
    pd.DataFrame: Table of the best dip properties.
    """
    # unpack dictionary
    N_peaks, dict_summary = peak_dictionary
    
    summary_matrix = np.zeros(shape=(N_peaks, 9)) # TODO: add more columns to this matrix
    for i, info in enumerate(dict_summary.keys()):
        summary_matrix[i,:] = np.array(list(dict_summary[f'{info}'].values()))

    dip_table = pd.DataFrame(summary_matrix, columns=['peak_loc', 'window_start', 'window_end', 'N_1sig_in_dip', 'N_in_dip', 'loc_forward_dur', 'loc_backward_dur', 'dip_power', 'average_dt_dif'])

    return dip_table
    
def deviation(mag, mag_err, R, S):
    """Calculate the running deviation of a light curve for outburst or dip detection.
    
    d >> 0 will be dimming
    d << 0 (or negative) will be brightenning
    
    
    Parameters:
    -----------
    mag (array-like): Magnitude values of the light curve.
    mag_err (array-like): Magnitude errors of the light curve.
    R (float): Biweight location of the light curve (global).
    S (float): Biweight scale of the light curve (global).

    Returns:
    --------
    dev (array-like): Deviation values of the light curve.
    """
    # Calculate biweight estimators
    return (mag - R) / np.sqrt(mag_err**2 + S**2) 


def gaus(x, a, x0, sigma, ofs):
    """"Calculate a simple Gaussian function with a term offset"""
    return a*np.exp(-(x-x0)**2/(2*sigma**2)) + ofs

def auto_fit(x, y, loc, base, return_model=False):
    """Perform a Gaussian function auto fitting with some lose priors."""
    try:
        popt, pcov = curve_fit(gaus, 
                                x,
                                y,
                                p0=[1, loc, 1, base],
                            bounds=((0.1, loc-5, 0.1, base-2),
                                    (np.inf, loc+5, np.inf, base+2)))
    except: # if fails return zeros...
        popt, pcov = [0, 0, 0, 0], [0, 0, 0, 0]
    
    if return_model:
        return gaus(x, *popt)
    else:
        return popt

def fwhm_calc(pop):
    """Add parameters recovered"""
    return 2.355 * pop[2] # fwhm 

def calc_sum_score(xdat, ydat, peak_dict, base, rms):
    """Calculate score"""
    score_term = 0
    for i in range(peak_dict[0]):
        event = peak_dict[1][f'dip_{i}']
        loc = event['peak_loc']
        powr = event['dip_power']
        Ndet = event['N_1sig_in_dip']
        
        fit_temrs = auto_fit(xdat, ydat,
                             loc, base, return_model=False)
        fwhm = fwhm_calc(fit_temrs)
        
        score_term += fwhm * powr * Ndet
        
    return (1/(peak_dict[0])) * (1/rms) * score_term


def detect_bursts_edges(time, mag, center_time, baseline_mean, baseline_std, burst_threshold=3.0, expansion_indices=1):
    """
    Detect bursts in a time series using linear interpolation. powered by GPT. 

    Parameters:
    -----------
    time (array-like): Time values of the light curve.
    mag (array-like): Magnitude values of the light curve.
    center_time (float): Center time of the burst.
    baseline_mean (float): Mean of the baseline.
    baseline_std (float): Standard deviation of the baseline.
    burst_threshold (float): Threshold for burst detection. Default is 3.0.
    expansion_indices (int): Number of indices to expand the burst region. Default is 1.

    Returns:
    --------
    burst_start (float): Start time of the burst.
    burst_end (float): End time of the burst.
    """


    # Initialize burst_start and burst_end
    burst_start = burst_end = np.searchsorted(time, center_time)

    # Find burst start
    while burst_start > 0:
        burst_start -= 1
        if mag[burst_start] < baseline_mean + burst_threshold * baseline_std:
            break

    # Find burst end
    while burst_end < len(time) - 1:
        burst_end += 1
        if mag[burst_end] < baseline_mean + burst_threshold * baseline_std:
            break

    # Expand burst region towards the beginning
    burst_start = max(0, burst_start - expansion_indices)

    # Expand burst region towards the end
    burst_end = min(len(time) - 1, burst_end + expansion_indices)

    # Final start and end
    t_start, t_end = time[burst_start], time[burst_end]

    # How many detections above 2std above the mean?
    N_thresh_1 = len((mag[(time>t_start) & (time<t_end)]>baseline_mean + 2*baseline_std))

    return t_start, t_end, abs(t_start-center_time), abs(t_end-center_time), N_thresh_1, 0, 0

def peak_detector(times, dips, power_thresh=3, peak_close_rmv=15, pk_2_pk_cut=30):
    """
    Run and compute dip detection algorithm on a light curve.
    
    Parameters:
    -----------
    times (array-like): Time values of the light curve.
    dips (array-like): Deviation values of the light curve.
    power_thresh (float): Threshold for the peak detection. Default is 3.
    peak_close_rmv (float): Tolerance for removing peaks that are too close to each other. Default is 15.
    pk_2_pk_cut (float): Minimum peak to peak separation. Default is 30 days.

    Returns:
    --------
    N_peaks (int): Number of peaks detected.
    dip_summary (dict): Summary of the dip. Including the peak location, the window start and end, the number of 1 sigma detections in the dip, the number of detections in the dip, the forward and backward duration of the dip, and the dip power.
    """
    try:
        if len(dips)==0:
            return 0, 0

        #TODO: add smoothing savgol_filter again...
        yht = dips

        # Scipy peak finding algorithm
        pks, _ = find_peaks(yht, height=power_thresh, distance=pk_2_pk_cut) #TODO: is 100 days peak separation too aggresive?

        # Reverse sort the peak values
        pks = np.sort(pks)[::-1]
        
        # Time of peaks and dev of peaks
        t_pks, p_pks = times[pks], dips[pks]
        
        # Number of peaks
        N_peaks = len(t_pks)
        
        dip_summary = {}
        for i, (time_ppk, ppk) in enumerate(zip(t_pks, p_pks)):
            #TODO: old version
            #_edges = calc_dip_edges(times, dips, time_ppk, atol=0.2)
            _edges = detect_bursts_edges(times, dips, time_ppk, np.nanmean(dips), np.nanstd(dips), burst_threshold=3.0, expansion_indices=1)
            # t_start, t_end, abs(t_start-center_time), abs(t_end-center_time), N_thresh_1, 0, 0 : above. #TODO: remove this!
            
            dip_summary[f'dip_{i}'] = {
                "peak_loc": time_ppk,
                'window_start': _edges[0],
                'window_end': _edges[1],
                "N_1sig_in_dip": _edges[-3], # number of 1 sigma detections in the dip
                "N_in_dip": _edges[-3], # number of detections in the dip
                'loc_forward_dur': _edges[2],
                "loc_backward_dur": _edges[3],
                "dip_power":ppk,
                "average_dt_dif": _edges[-1]
            }
                    
        return N_peaks, dip_summary
    except:
        return 0, 0


def full_evaluate(time_cat, mag_cat, mag_err_cat, flag_cat, band_cat):
    
    # Digest my light curve. Select band, good detections & sort
    time, mag, mag_err = prepare_lc(time_cat, mag_cat, mag_err_cat, flag_cat, band_cat, 
                                    band_of_study='r', flag_good=0, q=None, custom_q=False)
    
    # Digest my light curve. Select band, good detections & sort
    time_g, mag_g, mag_err_g = prepare_lc(time_cat, mag_cat, mag_err_cat, flag_cat, band_cat, 
                                    band_of_study='g', flag_good=0, q=None, custom_q=False)
    
    if len(time)>10 and len(time_g)>10:
        
        # Evaluate biweight location and scale & other obvious statistics
        R, S = astro_stats.biweight.biweight_location(mag), astro_stats.biweight.biweight_scale(mag)

        # Running deviation
        running_deviation = deviation(mag, mag_err, R, S)

        # Peak detection summary per light curve
        peak_detections = peak_detector(time, running_deviation, power_thresh=4, peak_close_rmv=20, pk_2_pk_cut=20)
        
        # Find the best peak
        bp = best_peak_detector(peak_detections, min_in_dip=3)
        
        if peak_detections[0]>0:
            
            # Investigate the g-band data and ensure we see a ~significant~ event 
            g_validate, out_g = False, 0
                
            Rg, Sg = astro_stats.biweight.biweight_location(mag_g), astro_stats.biweight.biweight_scale(mag_g)
            
            running_deviation_g = deviation(mag_g, mag_err_g, Rg, Sg)

            best_peak_time = bp['peak_loc'].values[0]
            sel_g = np.where((time_g > best_peak_time-3) & (time_g < best_peak_time+3)) # peak within +/- 3 days
            xg, yg, yerrg = time_g[sel_g], mag_g[sel_g], mag_err_g[sel_g]

            Rg_mod, Sg_mod = astro_stats.biweight.biweight_location(yg), astro_stats.biweight.biweight_scale(yg)

            yg_dev = deviation(yg, yerrg, Rg, Sg)
            
            if (len(xg) == 0) or (g_validate==False): # reject if there's no detections...
                g_validate = False
            else:
                g_validate = True
                # Calculate the significance of this g-band bump...
                out_g = (np.nanmean(yg_dev)-np.nanmean(running_deviation_g))/(np.nanstd(running_deviation_g))
                
            if g_validate and out_g >1.5: # both r-band and g-band data show similar peaks...
                _score_ = calc_sum_score(time, mag, peak_detections, R, S)
                
                return _score_, peak_detections[0]/(time[-1]-time[0]), peak_detections[0], bp['N_in_dip'].values[0]
            else:
                return 0, 0, 0, 0
        else:
            return 0, 0, 0, 0
    else:
        return 0, 0, 0, 0 
    
def eval_prelim(time_cat, mag_cat, mag_err_cat, flag_cat, band_cat):
    
    # Digest my light curve. Select band, good detections & sort
    time, mag, mag_err = prepare_lc(time_cat, mag_cat, mag_err_cat, flag_cat, band_cat, 
                                    band_of_study='r', flag_good=0, q=None, custom_q=False)
    
    # Digest my light curve. Select band, good detections & sort
    time_g, mag_g, mag_err_g = prepare_lc(time_cat, mag_cat, mag_err_cat, flag_cat, band_cat, 
                                    band_of_study='g', flag_good=0, q=None, custom_q=False)
    
    if len(time)>10 and len(time_g)>10: 
        # Evaluate biweight location and scale & other obvious statistics
        R, S = astro_stats.biweight.biweight_location(mag), astro_stats.biweight.biweight_scale(mag)

        # Running deviation
        running_deviation = deviation(mag, mag_err, R, S)

        # Peak detection summary per light curve
        peak_detections = peak_detector(time, running_deviation, power_thresh=4, peak_close_rmv=20, pk_2_pk_cut=20)
        
        
        if peak_detections[0]>0:
            del time, mag, mag_err, time_g, mag_g, mag_err_g, R, S, running_deviation
            return peak_detections[0]          
    else:
        del time, mag, mag_err, time_g, mag_g, mag_err_g
        return 0   

In [3]:
client = Client(n_workers=25, memory_limit="auto")

In [4]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 25
Total threads: 100,Total memory: 1.02 TiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:34099,Workers: 25
Dashboard: http://127.0.0.1:8787/status,Total threads: 100
Started: Just now,Total memory: 1.02 TiB

0,1
Comm: tcp://127.0.0.1:35324,Total threads: 4
Dashboard: http://127.0.0.1:39287/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:33597,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-4ritbl9c,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-4ritbl9c

0,1
Comm: tcp://127.0.0.1:39255,Total threads: 4
Dashboard: http://127.0.0.1:39914/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:38381,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-ysm5ou7s,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-ysm5ou7s

0,1
Comm: tcp://127.0.0.1:34236,Total threads: 4
Dashboard: http://127.0.0.1:46323/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:44341,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-orqnkfte,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-orqnkfte

0,1
Comm: tcp://127.0.0.1:46463,Total threads: 4
Dashboard: http://127.0.0.1:44686/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:32999,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-zd9_ux86,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-zd9_ux86

0,1
Comm: tcp://127.0.0.1:36372,Total threads: 4
Dashboard: http://127.0.0.1:38189/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:39415,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-w38l397q,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-w38l397q

0,1
Comm: tcp://127.0.0.1:44892,Total threads: 4
Dashboard: http://127.0.0.1:33512/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:37916,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-n3rvmezp,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-n3rvmezp

0,1
Comm: tcp://127.0.0.1:42551,Total threads: 4
Dashboard: http://127.0.0.1:43606/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:43718,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-8hxzl8dw,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-8hxzl8dw

0,1
Comm: tcp://127.0.0.1:46353,Total threads: 4
Dashboard: http://127.0.0.1:37639/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:40340,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-gxo706qd,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-gxo706qd

0,1
Comm: tcp://127.0.0.1:38552,Total threads: 4
Dashboard: http://127.0.0.1:34491/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:42713,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-w48s1g_j,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-w48s1g_j

0,1
Comm: tcp://127.0.0.1:35597,Total threads: 4
Dashboard: http://127.0.0.1:33854/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:42007,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-8vu8ix0g,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-8vu8ix0g

0,1
Comm: tcp://127.0.0.1:46016,Total threads: 4
Dashboard: http://127.0.0.1:36228/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:36309,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-thiwm3jv,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-thiwm3jv

0,1
Comm: tcp://127.0.0.1:42650,Total threads: 4
Dashboard: http://127.0.0.1:41792/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:37388,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-2x6yjdjf,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-2x6yjdjf

0,1
Comm: tcp://127.0.0.1:44216,Total threads: 4
Dashboard: http://127.0.0.1:37687/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:45416,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-io07zw6c,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-io07zw6c

0,1
Comm: tcp://127.0.0.1:37313,Total threads: 4
Dashboard: http://127.0.0.1:33414/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:33966,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-nucfl31t,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-nucfl31t

0,1
Comm: tcp://127.0.0.1:33874,Total threads: 4
Dashboard: http://127.0.0.1:33712/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:46020,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-as3v0gpp,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-as3v0gpp

0,1
Comm: tcp://127.0.0.1:35719,Total threads: 4
Dashboard: http://127.0.0.1:42271/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:37618,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-15t8zwq3,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-15t8zwq3

0,1
Comm: tcp://127.0.0.1:38145,Total threads: 4
Dashboard: http://127.0.0.1:41316/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:34646,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-8z010ctb,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-8z010ctb

0,1
Comm: tcp://127.0.0.1:39574,Total threads: 4
Dashboard: http://127.0.0.1:33910/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:44027,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-eba2g1_z,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-eba2g1_z

0,1
Comm: tcp://127.0.0.1:36356,Total threads: 4
Dashboard: http://127.0.0.1:43015/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:35802,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-jn_kzl0i,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-jn_kzl0i

0,1
Comm: tcp://127.0.0.1:36734,Total threads: 4
Dashboard: http://127.0.0.1:40397/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:35162,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-_fs1n_jb,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-_fs1n_jb

0,1
Comm: tcp://127.0.0.1:44990,Total threads: 4
Dashboard: http://127.0.0.1:46301/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:42594,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-svlte0ld,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-svlte0ld

0,1
Comm: tcp://127.0.0.1:38254,Total threads: 4
Dashboard: http://127.0.0.1:38901/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:33424,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-yixffeq4,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-yixffeq4

0,1
Comm: tcp://127.0.0.1:34149,Total threads: 4
Dashboard: http://127.0.0.1:45773/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:40247,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-v8vfg1x4,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-v8vfg1x4

0,1
Comm: tcp://127.0.0.1:36738,Total threads: 4
Dashboard: http://127.0.0.1:42163/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:34514,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-xyy3my03,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-xyy3my03

0,1
Comm: tcp://127.0.0.1:33093,Total threads: 4
Dashboard: http://127.0.0.1:36165/status,Memory: 41.98 GiB
Nanny: tcp://127.0.0.1:44934,
Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-2x2fz33b,Local directory: /epyc/ssd/users/atzanida/tmp/dask-scratch-space/worker-2x2fz33b


In [5]:
#client.close()

In [6]:
%%time
# Load ZTF DR14 sources
ztf_sources = lsdb.read_hipscat("/epyc/data3/hipscat/catalogs/ztf_axs/ztf_zource")

CPU times: user 58 s, sys: 20.4 s, total: 1min 18s
Wall time: 59.2 s


In [7]:
%%time
# Load FGK objects...
fgk_object = dask.dataframe.read_parquet("/nvme/users/atzanida/tmp/starhorse_new_april21.parquet").compute()

CPU times: user 13.7 s, sys: 49.5 s, total: 1min 3s
Wall time: 1min 4s


In [8]:
# Select only relevant columns...
final_selected_fgk = fgk_object[['RA_ICRS_StarHorse', 'DE_ICRS_StarHorse', 'ps1_objid_ztf_dr14']]

In [9]:
final_selected_fgk.head(1)

Unnamed: 0_level_0,RA_ICRS_StarHorse,DE_ICRS_StarHorse,ps1_objid_ztf_dr14
_hipscat_index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
12482248704,44.996155,0.005615,108000449961107213


In [10]:
steps_in_dec = np.arange(-26, 90, step=0.5)

In [None]:
for i in tqdm(range(10, len(steps_in_dec)-1)):
    _start, _end = steps_in_dec[i], steps_in_dec[i+1]
    
    print (f"#### Starting {_start}, {_end} ####")
    
    q = (final_selected_fgk['DE_ICRS_StarHorse'] > _start) &\
                (final_selected_fgk['DE_ICRS_StarHorse'] < _end)
    
    final_selected_fgk_q = final_selected_fgk[q]
    
    print (f"Batch sky size: {len(final_selected_fgk_q)}")
    
    if len(final_selected_fgk_q)==0:
        continue
        
    print ("Hipscatting table")
    hips_fgk = lsdb.from_dataframe(final_selected_fgk_q,
                                    ra_column="RA_ICRS_StarHorse", 
                                    dec_column="DE_ICRS_StarHorse",
                                        lowest_order=5,
                                        highest_order=8)

    print ("Joining Sources with Final")
    _sources = hips_fgk.join(
        ztf_sources, left_on="ps1_objid_ztf_dr14", right_on="ps1_objid")
    
    print ("Initializing TAPE ENSANBLE")
    ens = Ensemble(client=client)
    
    # ColumnMapper Establishes which table columns map to timeseries quantities
    colmap = ColumnMapper(
            id_col='_hipscat_index',
            time_col='mjd',
            flux_col='mag',
            err_col='magerr',
            band_col='band',
          )

    ens.from_dask_dataframe(
        source_frame=_sources._ddf,
        object_frame=hips_fgk._ddf,
        column_mapper=colmap,
        sync_tables=False, # Avoid doing an initial sync
        sorted=True, # If the input data is already sorted by the chosen index
        sort=False,
    )
    
    print ("Applying batch")
    batch_cal = ens.batch(
        eval_prelim,
        'mjd_ztf_zource', 'mag_ztf_zource', 
        'magerr_ztf_zource', 'catflags_ztf_zource',
        'band_ztf_zource')
    
    print ("Computing...")
    batch_cal_comp = batch_cal.compute()
    
    print ("Storing...")
    batch_cal_comp.to_parquet(f"/epyc/ssd/users/atzanida/tmp/starH24/StarHorse_Full_Comp_Aprilv{i+2}.parquet",
                      engine='pyarrow')

    del batch_cal_comp






  0%|          | 0/221 [00:00<?, ?it/s]

#### Starting -21.0, -20.5 ####
Batch sky size: 339414
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
2024-04-23 16:41:27,110 - distributed.spill - ERROR - Failed to pickle ('read_parquet_file_to_pandas-112a222c332190c53ffa2f928b77ea66', 23357)
Traceback (most recent call last):
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/spill.py", line 326, in __setitem__
    pickled = self.dump(value)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 669, in serialize_bytelist
    header["compression"], frames = zip(
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 670, in <genexpr>
    *(maybe_compress(frame, compression=compression) for frame in frames)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/e

Storing...


  0%|          | 1/221 [45:23<166:25:29, 2723.32s/it]

#### Starting -20.5, -20.0 ####
Batch sky size: 398778
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  self.client.close()
  self.client.close()




  1%|          | 2/221 [1:32:18<168:57:31, 2777.40s/it]

Storing...
#### Starting -20.0, -19.5 ####
Batch sky size: 375849
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  self.client.close()
  self.client.close()








  1%|▏         | 3/221 [2:17:20<166:05:13, 2742.72s/it]

Storing...
#### Starting -19.5, -19.0 ####
Batch sky size: 360773
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  self.client.close()
  self.client.close()


2024-04-23 18:59:15,336 - distributed.spill - ERROR - Failed to pickle ('read_parquet_file_to_pandas-112a222c332190c53ffa2f928b77ea66', 41289)
Traceback (most recent call last):
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/spill.py", line 326, in __setitem__
    pickled = self.dump(value)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 669, in serialize_bytelist
    header["compression"], frames = zip(
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 670, in <genexpr>
    *(maybe_compress(frame, compression=compression) for frame in frames)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distribute









  2%|▏         | 4/221 [3:03:55<166:35:26, 2763.72s/it]

Storing...
#### Starting -19.0, -18.5 ####
Batch sky size: 273208
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  self.client.close()
  self.client.close()


2024-04-23 19:42:58,863 - distributed.spill - ERROR - Failed to pickle ('read_parquet_file_to_pandas-112a222c332190c53ffa2f928b77ea66', 41289)
Traceback (most recent call last):
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/spill.py", line 326, in __setitem__
    pickled = self.dump(value)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 669, in serialize_bytelist
    header["compression"], frames = zip(
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 670, in <genexpr>
    *(maybe_compress(frame, compression=compression) for frame in frames)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distribute





  2%|▏         | 5/221 [3:42:00<155:27:46, 2591.05s/it]

Storing...
#### Starting -18.5, -18.0 ####
Batch sky size: 398492
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  self.client.close()
  self.client.close()




2024-04-23 20:24:13,422 - distributed.spill - ERROR - Failed to pickle ('read_parquet_file_to_pandas-112a222c332190c53ffa2f928b77ea66', 23357)
Traceback (most recent call last):
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/spill.py", line 326, in __setitem__
    pickled = self.dump(value)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 669, in serialize_bytelist
    header["compression"], frames = zip(
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 670, in <genexpr>
    *(maybe_compress(frame, compression=compression) for frame in frames)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distribute







  3%|▎         | 6/221 [4:27:31<157:34:49, 2638.56s/it]

Storing...
#### Starting -18.0, -17.5 ####
Batch sky size: 386888
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  self.client.close()
  self.client.close()








  3%|▎         | 7/221 [5:08:59<153:55:09, 2589.30s/it]

Storing...
#### Starting -17.5, -17.0 ####
Batch sky size: 293031
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  self.client.close()
  self.client.close()








  4%|▎         | 8/221 [5:45:33<145:45:09, 2463.42s/it]

Storing...
#### Starting -17.0, -16.5 ####
Batch sky size: 341384
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  self.client.close()
  self.client.close()










  4%|▍         | 9/221 [6:24:00<142:11:00, 2414.44s/it]

Storing...
#### Starting -16.5, -16.0 ####
Batch sky size: 381778
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


  self.client.close()
  self.client.close()


2024-04-23 23:06:36,449 - distributed.spill - ERROR - Failed to pickle ('read_parquet_file_to_pandas-112a222c332190c53ffa2f928b77ea66', 41300)
Traceback (most recent call last):
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/spill.py", line 326, in __setitem__
    pickled = self.dump(value)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 669, in serialize_bytelist
    header["compression"], frames = zip(
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 670, in <genexpr>
    *(maybe_compress(frame, compression=compression) for frame in frames)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distribute





  5%|▍         | 10/221 [7:04:12<141:28:11, 2413.70s/it]

Storing...
#### Starting -16.0, -15.5 ####
Batch sky size: 399609
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  self.client.close()
  self.client.close()






2024-04-23 23:48:08,106 - distributed.spill - ERROR - Failed to pickle ('read_parquet_file_to_pandas-112a222c332190c53ffa2f928b77ea66', 24533)
Traceback (most recent call last):
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/spill.py", line 326, in __setitem__
    pickled = self.dump(value)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 669, in serialize_bytelist
    header["compression"], frames = zip(
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 670, in <genexpr>
    *(maybe_compress(frame, compression=compression) for frame in frames)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distribute





  5%|▍         | 11/221 [7:48:37<145:17:36, 2490.75s/it]

Storing...
#### Starting -15.5, -15.0 ####
Batch sky size: 235810
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  self.client.close()
  self.client.close()


2024-04-24 00:25:13,678 - distributed.spill - ERROR - Failed to pickle ('read_parquet_file_to_pandas-112a222c332190c53ffa2f928b77ea66', 23482)
Traceback (most recent call last):
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/spill.py", line 326, in __setitem__
    pickled = self.dump(value)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 669, in serialize_bytelist
    header["compression"], frames = zip(
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 670, in <genexpr>
    *(maybe_compress(frame, compression=compression) for frame in frames)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/contextlib.py", line 79, in inner
    return func(*args, **kwds)
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distribute





  5%|▌         | 12/221 [8:22:45<136:46:38, 2355.98s/it]

Storing...
#### Starting -15.0, -14.5 ####
Batch sky size: 395132
Hipscatting table


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Joining Sources with Final




Initializing TAPE ENSANBLE
Applying batch
Using generated label, result_1, for a batch result.
Computing...


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
2024-04-24 01:03:00,647 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fa5231e1ba0>>, <Task finished name='Task-9037768' coro=<SpecCluster._correct_state_internal() done, defined at /epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/distributed/deploy/spec.py:346> exception=TimeoutError()>)
Traceback (most recent call last):
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/epyc/users/atzanida/anaconda3/envs/lsdb_demo_true/lib/python3.10/site-packages/tornado/ioloop.py", line 738, in _run_callback
    ret = callb