In [1]:
import glob
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import blimpy as bl
from scipy.stats import skew, kurtosis
from scipy import stats
from scipy.interpolate import interp1d
from astropy.stats import sigma_clip
import setigen as stg
from astropy import units as u
import h5py
import psutil
from numba import jit
import multiprocessing
import logging
from astropy.timeseries import LombScargle
%matplotlib inline

process = psutil.Process()

print(process.memory_info().rss)

271171584


In [2]:
# Carmen's data

csv_file = '/home/cgchoza/galaxies/complete_cadences_catalog.csv'

df = pd.read_csv(csv_file)

# Personal notes:
# The distribution of these cadences isn't necessarily intuitive;
# there are 459 of them total, but more than 459*6 files
# because some are spliced and some not,
# so some cadences have 6*7 or 6*8 files, 6 for each node,
# etc.

In [3]:
# grab only L-band spectra

dfl = df.iloc[np.where(df['Band'].values == 'L')[0]]

del df

In [4]:
dfl

Unnamed: 0,Target,Session,Band,Cadence ID,Frequency,.h5 path,.dat path,Time
0,DDO210,AGBT18A_999_103,L,24777,2251,/datag/pipeline/AGBT18A_999_103/collate/splice...,/home/obs/turboseti/AGBT18A_999_103/collate/sp...,2018-07-07 08:49:26
1,DDO210,AGBT18A_999_103,L,24777,2251,/datag/pipeline/AGBT18A_999_103/collate/splice...,/home/obs/turboseti/AGBT18A_999_103/collate/sp...,2018-07-07 08:49:26
2,DDO210,AGBT18A_999_103,L,24777,2251,/datag/pipeline/AGBT18A_999_103/collate/splice...,/home/obs/turboseti/AGBT18A_999_103/collate/sp...,2018-07-07 08:49:26
3,DDO210,AGBT18A_999_103,L,24777,2251,/datag/pipeline/AGBT18A_999_103/collate/splice...,/home/obs/turboseti/AGBT18A_999_103/collate/sp...,2018-07-07 08:49:26
4,DDO210,AGBT18A_999_103,L,24777,2251,/datag/pipeline/AGBT18A_999_103/collate/splice...,/home/obs/turboseti/AGBT18A_999_103/collate/sp...,2018-07-07 08:49:26
...,...,...,...,...,...,...,...,...
36553,NGC3226,AGBT22B_999_25,L,411390,1126,/datag/pipeline/AGBT22B_999_25/blc16_blp06/blc...,/home/obs/turboseti/AGBT22B_999_25/blc16_blp06...,2022-11-19 06:13:36
36554,NGC3226,AGBT22B_999_25,L,411390,1126,/datag/pipeline/AGBT22B_999_25/blc16_blp06/blc...,/home/obs/turboseti/AGBT22B_999_25/blc16_blp06...,2022-11-19 06:13:36
36555,NGC3226,AGBT22B_999_25,L,411390,1126,/datag/pipeline/AGBT22B_999_25/blc16_blp06/blc...,/home/obs/turboseti/AGBT22B_999_25/blc16_blp06...,2022-11-19 06:13:36
36556,NGC3226,AGBT22B_999_25,L,411390,1126,/datag/pipeline/AGBT22B_999_25/blc16_blp06/blc...,/home/obs/turboseti/AGBT22B_999_25/blc16_blp06...,2022-11-19 06:13:36


In [4]:
#@jit
def make_snippets(input_arr, freqs_list, half_window):
    snippet_array = [input_arr[:,:,ctrfreq-half_window:ctrfreq+half_window] for ctrfreq in freqs_list]
    return snippet_array

# helper function from B. Brzycki to calculate signal bandwidths

def threshold_baseline_bounds(spec, p=0.01):
    """
    Create bounds based on integrated intensity on either side of the central
    peak, as a fraction of the peak integrated intensity. Uses a 1D fit to the
    noise baseline.
    
    Parameters
    ----------
    spec : ndarray
        Intensity spectra
    p : float, optional
        Fraction of peak, used to set left and right bounds
        
    Returns
    -------
    l : int
        Left bound
    r : int
        Right bound
    metadata : dict
        Dictionary with metadata. Contains noise mean and spectra maximum,
        which are used to normalize spec to the spectra maximum.
    """
    noise_spec = sigma_clip(spec, masked=True)
    x = np.arange(len(spec))

    coeffs = np.polyfit(x[~noise_spec.mask], noise_spec[~noise_spec.mask], 1)
    poly = np.poly1d(coeffs)
    
    spec = spec - poly(x)
    norm_spec = (spec ) / (np.max(spec) )
    
    cutoffs = np.where(norm_spec < p)[0]
    
    peak = np.argmax(norm_spec)
    i = max([0, np.digitize(peak, cutoffs) - 1]) # my edit from Bryan's function
                                                 # failsafe when hit is too close to left edge of dedrifted frame
    #print(i, i+1, len(cutoffs), len(spec), cutoffs[-1])
    l = cutoffs[i] + 1
    if i+1 < len(cutoffs):  # my edit from Bryan's function, intended as a failsafe when bandwidths are too large
        r = cutoffs[i + 1]
    else:
        r = sigbw_wl_in_idxs
    
    metadata = {
        'noise_mean': np.mean(noise_spec),
        'spec_max': np.max(spec)
    }
    return l, r, metadata

### parameter pulling function

def get_parameters(inp_dat):

    input_arr = inp_dat[0]
    ctrfreq = inp_dat[1]
    drift = inp_dat[2]

    f_len = input_arr.shape[1]
    # power_spectrum = np.average(input_arr[:, f_len//2-caleb_wl_in_idxs//2:f_len//2+caleb_wl_in_idxs//2], axis=0)
    # power_spectrum = (power_spectrum-np.median(power_spectrum)) # subtract
    # power_spectrum = power_spectrum / power_spectrum.max()      # and divide
    # sk = skew(power_spectrum)
    # ku = kurtosis(power_spectrum, fisher=False)
    # sarle = (sk**2 + 1) / ku
    # fstd = np.std(power_spectrum)
    
    # time_series = np.average(input_arr, axis=1)
    # normalized_time_series = (time_series-np.median(time_series))/np.max(time_series-np.median(time_series))
    # tsk = skew(normalized_time_series)
    # tstd = np.std(normalized_time_series)

    # # calculate spectral kurtosis for varying spectral window size

    # window_lengths = np.logspace(np.log10(0.0002), np.log10(0.1000), 50) # from 2 kHz to 100 kHz
    # mini_kurts = []

    # full_power_spectrum = np.average(input_arr, axis=0)

    # for wl in window_lengths:
    #     wl_in_idxs = round(wl/np.abs(foff))
    #     #print(wl_in_idxs//2)
    #     ps_slice = full_power_spectrum[f_len//2-wl_in_idxs//2:f_len//2+wl_in_idxs//2]
    #     ps_slice = ps_slice / ps_slice.max()
    #     mk = kurtosis(ps_slice, fisher=False)
    #     mini_kurts.append(mk)

    # if (np.argmax(mini_kurts) == len(mini_kurts) - 1) or (np.argmax(mini_kurts) == 0):
    #     tbw = window_lengths[np.argmax(mini_kurts)]
    # else:
    #     idx_max = np.argmax(mini_kurts)
    #     xxx = np.log10(1e6*np.array(window_lengths))
    #     x_m = xxx[idx_max-1:idx_max+2]
    #     y_m = mini_kurts[idx_max-1:idx_max+2]

    #     # linear algebra to find best-fit parabola
    #     denom = (x_m[0] - x_m[1])*(x_m[0] - x_m[2])*(x_m[1] - x_m[2])
    #     a = (x_m[2] * (y_m[1] - y_m[0]) + x_m[1] * (y_m[0] - y_m[2]) + x_m[0] * (y_m[2] - y_m[1])) / denom
    #     b = (x_m[2]**2 * (y_m[0] - y_m[1]) + x_m[1]**2 * (y_m[2] - y_m[0]) + x_m[0]**2 * (y_m[1] - y_m[2])) / denom
    #     c = (x_m[1] * x_m[2] * (x_m[1] - x_m[2]) * y_m[0] + x_m[2] * x_m[0] * (x_m[2] - x_m[0]) * y_m[1] + x_m[0] * x_m[1] * (x_m[0] - x_m[1]) * y_m[2]) / denom

    #     # vertex coords
    #     x0 = -b / (2 * a)
    #     #y0 = a * x0**2 + b * x0 + c

    #     tbw = 10**x0 / 1e6
        
    # subset_kurtoses = np.array(mini_kurts)[np.where(window_lengths <= caleb_wl)[0]]
    # subset_windows = window_lengths[np.where(window_lengths <= caleb_wl)[0]]

    # corr = stats.pearsonr(np.log10(subset_windows), subset_kurtoses)[0]

    # sigbw_calc_ds = input_arr[:,f_len//2-sigbw_wl_in_idxs//2:f_len//2+sigbw_wl_in_idxs//2]
        
    # frame = stg.Frame.from_data(df=np.abs(foff)*u.MHz,
    #                         dt=18.253611008*u.s,
    #                         fch1=(ctrfreq-sigbw_calc_wl/2)*u.MHz,
    #                         ascending=True, # fch1 is a minimum
    #                         data=sigbw_calc_ds)
    
    # dd_fr = stg.dedrift(frame, -drift) # not sure why the negative is needed, but it works for me

    # spec = stg.integrate(dd_fr)

    # l, r, _ = threshold_baseline_bounds(spec)

    # sigbw = (float(r)-float(l))*np.abs(foff)

    ### calculate Lomb-Scargle power ratio

    ls_snippet = input_arr[:, f_len//2-ls_wl_in_idxs//2:f_len//2+ls_wl_in_idxs//2]
    spec = np.log10(np.average(ls_snippet, axis=0)**2)
    norm_spec = (spec-np.average(spec))/np.max(spec-np.average(spec))

    #freqs = np.arange(ctrfreq-0.005, ctrfreq+0.005, foff)
    freqs = np.linspace(ctrfreq-0.005, ctrfreq+0.005, len(norm_spec))

    f = freqs
    p = norm_spec

    ls_freqs, ls_power = LombScargle(f, p, normalization='standard').autopower()
    
    redness = np.average(ls_power[:len(ls_power)//10])/np.average(ls_power[len(ls_power)//10:len(ls_power)//5])

    #0: frequency [MHz]
    #1: drift rate [Hz/s]
    #2: SNR
    #3: spectral skewness
    #4: spectral kurtosis
    #5: Sarle's coefficient
    #6: correlation coefficient [kurtosis vs. log(bandwidth)]
    #7: turning-point bandwidth [Hz]
    #8: temporal skewness
    #9: time-series standard deviation
    #10: power-spectrum standard deviation
    #11: signal bandwidth
    #12: redness of periodogram

    #if len(np.array([sk, ku, sarle, corr, tbw, tsk, tstd, fstd, sigbw])) != 9:
    #    print(f'Error! Not all parameters calculated. Check hit at freq = {ctrfreq}.')

    #return np.array([sk, ku, sarle, corr, tbw, tsk, tstd, fstd, sigbw, redness])
    return redness

In [5]:
logger = logging.getLogger(__name__)

print(process.memory_info().rss)

caleb_wl = 0.002700 # 2.7 kHz
sigbw_calc_wl = 0.005000 # 5 kHz
full_wl = 0.100000 # 100 kHz
ls_wl = 0.010000 # 10 kHz for LS periodogram

foff = -2.7939677238464355e-06

caleb_wl_in_idxs = round(caleb_wl/np.abs(foff))
sigbw_wl_in_idxs = round(sigbw_calc_wl/np.abs(foff))
full_wl_in_idxs = round(full_wl/np.abs(foff))
ls_wl_in_idxs = round(ls_wl/np.abs(foff))

def process_files(xxx):

    code = xxx[0]
    df0 = xxx[1]

    # Remove all handlers associated with the root logger object.
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)

    #logging.basicConfig(filename=f'/datax/scratch/benjb/C23_L_M81_injections/{code}_dats.log', filemode="a", level=logging.DEBUG)
    logging.basicConfig(filename=f'/datax/scratch/benjb/{code}_rednesses.log', filemode="a", level=logging.DEBUG)

    param_array = []
    dat_array = np.empty((0, 2), dtype='object')

    for iii in range(len(df0)):

        h5_path = df0['.h5 path'].values[iii]
        dat_path = df0['.dat path'].values[iii]

        logger.info(f'File {iii+1} out of {len(df0)}')
        logger.info(h5_path)
        logger.info(dat_path)

        bytes_available = psutil.virtual_memory()[1]
        if bytes_available <= 32e9:
            logger.info(f'Memory dangerously low: {bytes_available} bytes remaining. Breaking ...')
            break

        f = h5py.File(df0['.h5 path'].values[iii], 'r')
        dset = f['data']

        fch1 = dset.attrs['fch1']
        foff = dset.attrs['foff']

        freqs = np.linspace(fch1, fch1+foff*dset.shape[2], dset.shape[2])

        df_dat = pd.read_table(dat_path, sep='\s+', names=['Top_Hit_#','Drift_Rate','SNR','Uncorrected_Frequency','Corrected_Frequency',
                                                    'Index', 'freq_start', 'freq_end', 'SEFD', 'SEFD_freq', 'Coarse_Channel_Number', 
                                                    'Full_number_of_hits'], skiprows=9)
            
        ctrfreqs = df_dat['Uncorrected_Frequency'].values
        drifts = df_dat['Drift_Rate'].values
        snrs = df_dat['SNR'].values

        if len(ctrfreqs) == 0:
            logger.info('No hits in this file.')
            continue

        ctrfreqs_in_idxs0 = (ctrfreqs - fch1)/foff

        logger.info(f'Left bound of non-chop range: {full_wl_in_idxs//2}')
        logger.info(f'Right bound of non-chop range: {dset.shape[2]-full_wl_in_idxs//2}')

        ctrfreqs_in_idxs = ctrfreqs_in_idxs0[np.where((
            ctrfreqs_in_idxs0 > full_wl_in_idxs//2) & (ctrfreqs_in_idxs0 < dset.shape[2]-full_wl_in_idxs//2))[0]]
        
        if len(ctrfreqs_in_idxs0) != len(ctrfreqs_in_idxs):
            logger.info(f'{len(ctrfreqs_in_idxs0)-len(ctrfreqs_in_idxs)} hit(s) removed for being too close to edge.')
            logger.info(f'Len ctrfreqs = {len(ctrfreqs)}.')
            ctrfreqs = ctrfreqs[np.where((
                ctrfreqs_in_idxs0 > full_wl_in_idxs//2) & (ctrfreqs_in_idxs0 < dset.shape[2]-full_wl_in_idxs//2))[0]]
            drifts = drifts[np.where((
                ctrfreqs_in_idxs0 > full_wl_in_idxs//2) & (ctrfreqs_in_idxs0 < dset.shape[2]-full_wl_in_idxs//2))[0]]
            snrs = snrs[np.where((
                ctrfreqs_in_idxs0 > full_wl_in_idxs//2) & (ctrfreqs_in_idxs0 < dset.shape[2]-full_wl_in_idxs//2))[0]]
            logger.info(f'Len ctrfreqs after chopping = {len(ctrfreqs)}.')
        
        snippets = make_snippets(dset, ctrfreqs_in_idxs.astype(int), full_wl_in_idxs//2)

        #logger.info([snippet.shape for snippet in snippets])

        snippets = np.squeeze(np.array(snippets), axis=2)

        #logger.info([snippet.shape for snippet in snippets])

        inputs = [[snippets[i], ctrfreqs[i], drifts[i]] for i in range(len(ctrfreqs_in_idxs))]
        vecs = map(get_parameters, inputs)
        params = np.array(list(vecs))
        #sigbw_vec = np.array(list(vecs))

        #logger.info(params.shape)

        dat_path_vec = np.full(len(ctrfreqs), dat_path, dtype='object')

        mini_dat_arr = np.transpose(np.array([ctrfreqs, dat_path_vec]))
        #logger.info(mini_dat_arr.shape)
        #logger.info(mini_dat_arr[0])
        #logger.info(mini_dat_arr[1])

        param_array.append(params)
        dat_array = np.concatenate((dat_array, mini_dat_arr))
        #triad_array = np.concatenate((triad_array, np.transpose(np.array([ctrfreqs, drifts, snrs]))))
        #sigbw_array = np.concatenate((sigbw_array, np.transpose(np.array([ctrfreqs, sigbw_vec]))))

        #logger.info(process.memory_info().rss)

    #np.save(f'/datax/scratch/benjb/C23_L_M81_injections/{code}_all_dat_paths.npy', dat_array)
    #np.save(f'/datax/scratch/benjb/C23_L_M81_injections/{code}_all_params.npy', param_array)
    np.save(f'/datax/scratch/benjb/{code}_rednesses.npy', param_array)

299728896


In [126]:
#idx = [343, 909+793, 909*2+464]
#idx = [0, 909]

#dfl.iloc[idx]

#process_files(['test_batch', dfl.iloc[idx]])

626 627 1238 1529 1528
713 714 1421 1617 1616
811 812 1448 1594 1593
618 619 1270 1782 1781
801 802 1542 1705 1704
882 883 1715 1738 1737
856 857 1588 1660 1659
646 647 1208 1709 1708
473 474 1065 1763 1762
640 641 1368 1782 1781
669 670 1235 1723 1722
622 623 1300 1779 1778
770 771 1229 1788 1787
556 557 1107 1785 1784
681 682 1327 1782 1781
647 648 1329 1782 1779
551 552 1107 1785 1783
456 457 1227 1788 1787
677 678 1299 1779 1778
565 566 1235 1723 1722
729 730 1369 1782 1781
591 592 1064 1763 1762
561 562 1209 1709 1705
733 734 1588 1660 1659
834 835 1715 1738 1737
741 742 1542 1705 1704
651 652 1270 1782 1781
637 638 1448 1594 1593
708 709 1421 1617 1616
612 613 1238 1529 1528
825 826 1640 1789 1788
798 799 1618 1775 1774
545 546 1102 1780 1779
756 757 1513 1784 1769
843 844 1681 1784 1783
888 889 1778 1789 1788
472 473 954 1789 1788
504 505 992 1789 1788
493 494 993 1789 1786
494 495 966 1789 1787
502 503 1015 1789 1787
507 508 1001 1789 1788
498 499 992 1789 1788
505 506 996 1789

In [127]:
#tb2 = np.load('/datax/scratch/benjb/test_batch_sigbws.npy', allow_pickle=True)
#plt.hist(tb2[:,1]*1e6, bins=50)
#plt.yscale('log')
#plt.show()

In [87]:
#ff = tb2[:2136,0]
#bw = tb2[:2136,1]
#
#fff = ff[np.where(bw < 0)[0]]
#bbb = bw[np.where(bw < 0)[0]]
#ddd = drifts[np.where(bw < 0)[0]]
#print(len(fff))

11


In [8]:
#dat_path = dfl['.dat path'].values[idx[1]]
#print(dat_path)

/home/obs/turboseti/AGBT21A_996_47/blc01/blc01_guppi_59405_77256_MESSIER87_0128.rawspec.0000/blc01_guppi_59405_77256_MESSIER87_0128.rawspec.0000.dat


In [11]:
#dats = glob.glob('/datax/scratch/benjb/injections/*_inserted.dat')
#h5s = glob.glob('/datax/scratch/benjb/injections/*_inserted.h5')
#
#df0 = pd.DataFrame(np.transpose([dats, h5s]), columns=['.dat path', '.h5 path'])

In [6]:
#dats = np.sort(glob.glob('/datax/scratch/benjb/C23_L_M81_injections/*.dat'))
#h5s = np.sort(glob.glob('/datax/scratch/benjb/C23_L_M81_injections/*.h5'))

#for i in range(12):
#    print(dats[i])
#    print(h5s[i])
    
#df0 = pd.DataFrame(np.transpose([dats, h5s]), columns=['.dat path', '.h5 path'])

In [7]:
#df0

Unnamed: 0,.dat path,.h5 path
0,/datax/scratch/benjb/C23_L_M81_injections/spli...,/datax/scratch/benjb/C23_L_M81_injections/spli...


In [8]:
#process_files(['M81_retrieval', df0])

In [18]:
if __name__ == "__main__":
    # Define n sets of files
    batch_size = len(dfl)//6
    file_sets = [
        ['batch_1', dfl.iloc[0:1*batch_size]],
        ['batch_2', dfl.iloc[1*batch_size:2*batch_size]],
        ['batch_3', dfl.iloc[2*batch_size:3*batch_size]],
        ['batch_4', dfl.iloc[3*batch_size:4*batch_size]],
        ['batch_5', dfl.iloc[4*batch_size:5*batch_size]],
        ['batch_6', dfl.iloc[5*batch_size:]]
    ]

    # Create a pool of n processes
    with multiprocessing.Pool(processes=6) as pool:
        # Map the process_files function to each file set
        pool.map(process_files, file_sets)

    print("All files processed.")

All files processed.
