## Molecular Detection Pipeline for Cold Core Data

This routine takes ASCII files and a Splatalogue txt file to produce Gaussian fits and classification for each line above 3 sigmas of noise. The scatter (or sigma/RMS) of noise is assigned based on 8 different boxes of frequencies. It also produces an Excel sheet with all the molecular information necessary to determine detection results and saves PNGs for each fit.

*need to download a splatalogue file of all the energetically favorable lines that you're interested in

In [None]:
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
import numpy as np
import pyspeckit
from pyspeckit.spectrum.parinfo import ParinfoList, Parinfo
from astropy.io import fits
from scipy.signal import find_peaks
import pandas as pd
import os
import math
%matplotlib notebook

In [None]:
## CHANGE THESE VALUES

# load ASCII/txt files with spectral data
core_num = '746'  # set core number/ID
file_name = 'PERS746-all-r-a'  # enter file name here
data = np.loadtxt(f"{file_name}.spec")

# enter frequency range (MHz)
beg_freq = 30000
end_freq = 50000

# enter any bad channel ranges after manually inspecting data (MHz)
mask_ranges = [(45739.00, 45759.00)]

# enter expected velocity range of molecular line center (in km/s)
v_low = 6.0
v_up = 10.0

# enter your chosen minimum FWHM/linewidth (km/s)
FWHM_min = 0.28 

In [None]:
# relabel x (frequency) data into an array
xarr = data[:,0]
yarr = data[:,1]

# plot spectral data (frequency space) 
fig=plt.figure(figsize=(8,6))
plt.title(f"PERS{core_num} Frequency vs. Intensity")
plt.xlabel('Frequency [MHz]')
plt.ylabel('Intensity T$_{mb}$ [K]')
plt.ylim([-10,10])
plt.step(xarr, yarr, color='b', label=f'core {core_num}')
plt.legend(fontsize=16)
plt.show()

In [None]:
# create "boxes" to define RMS baseline
def calculate_baseline_and_detect_lines(xarr, yarr, beg_freq, end_freq, num_boxes=8, sigma_threshold=3, mask_ranges=None):
    box_size = (end_freq - beg_freq) / num_boxes
    box_size = (end_freq - beg_freq) / num_boxes

    scatter_list = []
    lines = []
    low_boxes = []
    up_boxes = []


    for i in range(num_boxes):
        low_box = beg_freq + i * box_size
        up_box = beg_freq + (i + 1) * box_size
        low_boxes.append(low_box)
        up_boxes.append(up_box)

        box_data = yarr[(xarr >= low_box) & (xarr <= up_box)]

        # apply user-defined mask ranges before calculating scatter
        if mask_ranges:
            mask = np.ones_like(box_data, dtype=bool)
            for (lower, upper) in mask_ranges:
                mask &= ~((xarr[(xarr >= low_box) & (xarr <= up_box)] >= lower) & (xarr[(xarr >= low_box) & (xarr <= up_box)] <= upper))
            box_data = box_data[mask]

        # calculating scatter
        median_val = np.nanmedian(box_data)
        mad = np.nanmedian(np.abs(box_data - median_val))
        box_data = box_data[box_data < median_val * mad]  # exclude high values

        scatter = np.nanstd(box_data)
        scatter_list.append(scatter)

        start = np.searchsorted(xarr, low_box)
        end = np.searchsorted(xarr, up_box)
        
        # if calculated scatter/RMS values look too high, use manually calculated RMS values:
        #scatter_list = [0.00266051130910693, 0.00296250878471746, 0.002984108161473998, 0.0034771669622014776, 0.003321972228537444, 0.005681522691149946, 0.006064507543807183, 0.007319156110294874]
        scatter = scatter_list[i]
        
        box_lines = [xarr[j] for j in range(start, end) if yarr[j] >= sigma_threshold * scatter]

        # mask user-defined bad channels for the final line detection
        if mask_ranges:
            for (lower, upper) in mask_ranges:
                box_lines = [x for x in box_lines if x < lower or x > upper]

        box_lines = [item for item in box_lines if not (isinstance(item, float) and np.isnan(item))]
        lines.extend(box_lines)

    return scatter_list, lines, low_boxes, up_boxes

# shows RMS values, 3sigma lines, and amount of 3sigma lines
scatter_list, lines, low_boxes, up_boxes = calculate_baseline_and_detect_lines(xarr, yarr, beg_freq, end_freq, mask_ranges=mask_ranges)

# remove lines that have value np.nan
lines = [item for item in lines if item >= 30000 and not np.isnan(item)]

print(scatter_list)
print(lines)
print(len(lines))

In [None]:
# load splatalogue csv/xlsx file
df = pd.read_csv('splat6.csv', delimiter = ':')

# remove last row (null row)
df.drop(df.tail(1).index, inplace = True)

# remove rows with any NaN values
df.dropna(axis=0, how='any', inplace=True)

# these are now panda series, or 1D arrays # convert series into lists
mol_linename = df["Chemical Name"].tolist()
transition = df["Resolved QNs"]
rf = df["Frequency in MHz (Err) (rest frame, redshifted)"] 
mf = df["Meas Frequency in MHz (Err) (rest frame, redshifted)"] 
sym = df["Species"]
A = df["aij"].tolist() #Einstein A 
E_up = df["E_U (K)"].tolist() #upper energy limit
g = df["Upper State Degeneracy"].tolist() #upper state degeneracy

# merge rf and mf into one column
rf_merged = pd.Series(np.where(rf == ' ', mf, rf))

# measured frequency column into list
mf = df["Meas Frequency in MHz (Err) (rest frame, redshifted)"].tolist()

# removes parentheses from rf_merged and creates new list with error margins
err = []
for index, item in enumerate(rf_merged):
    try:
        parentheses = item.split("(")[1].split(")")[0]
        err.append(parentheses)
        rf_merged[index] = item.split("(")[0].strip()
        
    except IndexError:
        err.append('0')
    
# remove extra numbers in some items in list (rf_merged)
rf_clean = [item.split(',')[0] for item in rf_merged]

# converting rf and error from str to float
rf_str = []
for i in range(0,len(rf_clean)):
    rf_str.append(float(rf_clean[i]))
    
# clean html tags from molecule name
def clean_html_tags(text):
    
    # convert to string if the input is not already a string
    if not isinstance(text, str):
        text = str(text)
                   
    # remove <sub>, </sub>, and <sup tags
    text = text.replace('<sub>', '').replace('</sub>','').replace('<sup>','').replace('</sup>','')
    text = text.replace('<i>','').replace('</i>','').replace('<font color ="red">','')
    text = text.replace('<font color="red">','').replace('</font>','').replace('<font color=red>','')
    text = text.replace('3&Sigma;','').replace('2&Sigma;&mu;','').replace('&','').replace(';','')
    text = text.replace('<font color=>','').replace('red','').replace('<font color=','')
    
    return text

def clean_list_sym(data_list):
    sym_clean = []
    for item in data_list:
        cleaned_item = clean_html_tags(item)
        sym_clean.append(cleaned_item)
        
    return sym_clean

def clean_list_tr(data_list):
    transition_clean = []
    for item in data_list:
        cleaned_item = clean_html_tags(item)
        transition_clean.append(cleaned_item)
        
    return transition_clean


# clean symbol list
sym_clean = clean_list_sym(sym)

# clean transition list
transition_clean = clean_list_tr(transition)

# replace NaN values from transition etc. lists with 0.0001
def replace_nan_values(data_list, replacement_value=0.0001):

    clean_list_nan = []
    for item in data_list:
        if (isinstance(item, float) and math.isnan(item)) or \
           (isinstance(item, np.float64) and np.isnan(item)) or \
           (isinstance(item, pd._libs.tslibs.nattype.NaTType)) or \
           (item is None):
            clean_list_nan.append(replacement_value)
        else:
            clean_list_nan.append(item)
    return clean_list_nan

transition_clean_nan = replace_nan_values(transition_clean)

# define functions that compile Splatalogue information with matched data lines
def splat_details(line_data):
    index = rf_str.index(line_data)
    mol_name = mol_linename[index]
    trans = transition_clean[index]
    symbol = sym_clean[index]
    energy_up = E_up[index]
    A_ij = A[index]
    g_u = g[index]
    
    line_details = [line_data, mol_name, trans, symbol, energy_up, A_ij, g_u]
    return line_details

# function that compares peak values to Splatalogue rest frequencies
def match_lines1(value, splat_lst):
    matches = []
    for item in splat_lst:
        lower_bound = (item) - 1.5
        upper_bound = (item) + 1.5
        if lower_bound <= value <= upper_bound:
            #print(f"{value} is within 1.5 MHz of {item} in the splatalogue.")
            matches.append(item)
    return matches


# new list that stores all the match details
matches_details = []

# match peak frequencies from Yebes data with calculated rest frequencies from splatalogue
for line in lines:
    matches = match_lines1(line, rf_str)
    for match in matches:
        splat_details_lst = splat_details(match)
        splat_details_lst.insert(0, line)
        matches_details.append(splat_details_lst)


# create df from the matches details
columns = ['Line','Rest Frequency', 'Molecule', 'Transition', 'Symbol', 'Upper Energy', 'A', 'Upper State Degeneracy']
matches_df = pd.DataFrame(matches_details, columns=columns)

# save df to a CSV file
# matches_df.to_csv('matches_details.csv', index=False)

line_lst = matches_df["Line"].tolist()
rf_matched = matches_df["Rest Frequency"].tolist()
mol_linename_lst = matches_df["Molecule"].tolist()
transition_lst = matches_df["Transition"].tolist()
symbol_lst = matches_df["Symbol"].tolist()
A_lst = matches_df["A"].tolist()
E_up_lst = matches_df["Upper Energy"].tolist()
g_u_lst = matches_df["Upper State Degeneracy"].tolist()

print(matches_df)

In [None]:
# define frequency ranges and corresponding scatter, 3sigma, and 5sigma values
scatter_ranges = {
        (low_boxes[0], up_boxes[0]): (scatter_list[0], 3*scatter_list[0], 5*scatter_list[0]),
        (low_boxes[1], up_boxes[1]): (scatter_list[1], 3*scatter_list[1], 5*scatter_list[1]),
        (low_boxes[2], up_boxes[2]): (scatter_list[2], 3*scatter_list[2], 5*scatter_list[2]),
        (low_boxes[3], up_boxes[3]): (scatter_list[3], 3*scatter_list[3], 5*scatter_list[3]),
        (low_boxes[4], up_boxes[4]): (scatter_list[4], 3*scatter_list[4], 5*scatter_list[4]),
        (low_boxes[5], up_boxes[5]): (scatter_list[5], 3*scatter_list[5], 5*scatter_list[5]),
        (low_boxes[6], up_boxes[6]): (scatter_list[6], 3*scatter_list[6], 5*scatter_list[6]),
        (low_boxes[7], up_boxes[7]): (scatter_list[7], 3*scatter_list[7], 5*scatter_list[7])
    }

def assign_values(peak_frq, intensity):
    for (low, high), (scatter, three_sigma, five_sigma) in scatter_ranges.items():
        if low <= peak_frq < high:
            if intensity >= five_sigma:
                compiled = ["5 sigma"]
            elif intensity >= three_sigma:
                compiled = ["3 sigma"]
            else:
                compiled = [None]  
            return scatter, three_sigma, five_sigma, compiled
    return None, None, None, [None]  

In [None]:
# defines function that searches for 3sigma lines & Gaussian fits them
parameters_dict = {
    'molecule name': [],
    'species symbol': [],
    'line [MHz]': [],
    'rest frequency [MHz]': [],
    'transition': [],
    'amplitude [mK]': [],
    'amplitude ERROR': [],
    'shift [km/s]': [],
    'shift ERROR': [],
    'width [km/s]': [],
    'width ERROR': [],
    'peak [mK]': [],
    'rms calculated [mK]': [],
    'upper energy limit [K]':[],
    'log10 A(ij)': [],
    'upper state degeneracy': [],
    'sigma detection level': []}

def search_lines(x_frq, y, smoothnum, peak_frq, line, species, xlim1, xlim2, linename, transition, A, E_up, g_u):
    
    # create a new folder for the core number
    new_folder = f'core {core_num}'
    if not os.path.exists(new_folder):
        os.makedirs(new_folder)
    
    # convert x data from MHz to km/s
    c = 299792.458 #km/s
    tel_src = 0 #km/s
    x_vel = x_vel = c * ((peak_frq - x_frq) / peak_frq) + tel_src
    
    # plotting spectra
    sp = pyspeckit.Spectrum(data=y*1000.0, 
            error=None, 
            xarr=x_vel, 
            yarrkwargs={'unit':'mK'}, 
            xarrkwargs={'unit':'km/s'},
            header={})
    
    # smooth radio noise
    if smoothnum >= 2:
        sp.smooth(smoothnum, smoothtype='hanning')
    else:
        print('no smooth')
                        
    # defines units
    sp.xarr.velocity_convention = 'radio'
    sp.xarr.xtype = 'frequency'
    sp.unit='Intensity Temperature [mK]'

    # plots gaussian fit
    fit = sp.specfit(fittype='gaussian', components=True, xmin=v_low, xmax=v_up, add_baseline=False,
                color='r', plot=False, annotate=False)
   
    # guesses=(amplitude, center/shift?, width)
    
    # prints rms
    sp.error[:] = sp.stats((v_low,v_up))['std']
    rms = np.mean(sp.error[:])
    print('rms = ', rms, 'mK')
    
    # prints parameters (amplitude, shift, width)
    parameters = sp.specfit.parinfo
    peak = sp.stats((v_low,v_up))['max']
    print('peak value = ', peak,'mK')
    print(parameters) 
    print(peak_frq)
    
    # correction factor that converts pyspeckit "linewidth" to actual FWHM
    corr = 2*np.sqrt(2*np.log(2))
    
    # check for None before adding correction factor
    if parameters[2].error is not None:
        param2_error_corr = parameters[2].error * corr
    else:
        param2_error_corr = None
    
    # only prints fits for positive lines that are within source velocity range and have a minimum linewidth (FWHM) of your choice
    if (v_low <= parameters[1].value <= v_up) and (parameters[2].value*corr >= FWHM_min) and (parameters[0].value >= 0):
        print('REAL!') 
            
        # plotting axes, labels
        sp.plotter(axis=sp.plotter.axis, 
                   xmin=xlim1,
                   xmax=xlim2, 
                   clear=True, 
                   label=str(linename)+' '+str(transition)+' '+str(species), 
                    color='b')
        
        sp.plotter.figsize = (22,10)
        sp.plotter.axis.tick_params(axis='both', labelsize=18)
        sp.plotter.axis.legend(handlelength=0, handletextpad=0, fontsize=15)
        
        plt.ylabel('Intensity Temperature [mK]', size=18)
        plt.xlabel('Velocity [km/s]', size=18)
        
        # plot gaussian fit
        sp.specfit.plot_fit(annotate = False)
        plt.tight_layout()
        
        # add custom legend entries
        custom_lines = [Line2D([0],[0], color='none', marker=None, linestyle='None', label=f'{linename}'),
                       Line2D([0],[0], color='none', marker=None, linestyle='None', label=f'{species}'),
                       Line2D([0],[0], color='none', marker=None, linestyle='None', label=f'{transition}')]
        plt.legend(handles=custom_lines, fontsize=15)
        
        # save figures as png's
        file_path = os.path.join(new_folder, f'{line}_{peak_frq}_{linename}_FIT.png')
        plt.savefig(file_path)
        
        # assign scatter, 3sigma, 5sigma, and compiled sigma values for the current peak_frq
        scatter, three_sigma, five_sigma, compiled = assign_values(peak_frq, parameters[0].value)

        # update dictionary for parameters (amplitude, shift, width) 
        parameters_dict['molecule name'].append(linename)
        parameters_dict['species symbol'].append(species)
        parameters_dict['line [MHz]'].append(line)
        parameters_dict['rest frequency [MHz]'].append(peak_frq)
        parameters_dict['transition'].append(transition)
        parameters_dict['amplitude [mK]'].append(parameters[0].value)
        parameters_dict['amplitude ERROR'].append(parameters[0].error)
        parameters_dict['shift [km/s]'].append(parameters[1].value)
        parameters_dict['shift ERROR'].append(parameters[1].error)
        parameters_dict['width [km/s]'].append(parameters[2].value*corr)
        parameters_dict['width ERROR'].append(parameters[2].error*corr)
        parameters_dict['peak [mK]'].append(peak)
        parameters_dict['rms calculated [mK]'].append(rms)
        parameters_dict['upper energy limit [K]'].append(E_up)
        parameters_dict['log10 A(ij)'].append(A)
        parameters_dict['upper state degeneracy'].append(g_u)
        parameters_dict['sigma detection level'].append(compiled[0] if isinstance(compiled, list) else compiled)
    
    return parameters[0].value, parameters[1].value, parameters[2].value*corr, parameters[0].error, parameters[1].error, param2_error_corr, peak, rms

# set xlim1 and xlim2 as the range you want for your diagnostic plots

In [None]:
# 1/4 detections
for i in range(0, len(line_lst)// 4):
    search_lines(xarr, yarr, 1, rf_matched[i], line_lst[i], symbol_lst[i], -2, 13, mol_linename_lst[i], transition_lst[i], A_lst[i], E_up_lst[i], g_u_lst[i])

In [None]:
# 2/4 detections
for i in range(len(line_lst)// 4, len(line_lst)// 2):
    search_lines(xarr, yarr, 1, rf_matched[i], line_lst[i], symbol_lst[i], -2, 13, mol_linename_lst[i], transition_lst[i], A_lst[i], E_up_lst[i], g_u_lst[i])

In [None]:
# 3/4 detections
for i in range(len(line_lst) // 2, 3 * len(line_lst) // 4):
    search_lines(xarr, yarr, 1, rf_matched[i], line_lst[i], symbol_lst[i], -2, 13, mol_linename_lst[i], transition_lst[i], A_lst[i], E_up_lst[i], g_u_lst[i])

In [None]:
# 4/4 detections
for i in range(3 * len(line_lst) // 4, len(line_lst)):
    search_lines(xarr, yarr, 1, rf_matched[i], line_lst[i], symbol_lst[i], -2, 13, mol_linename_lst[i], transition_lst[i], A_lst[i], E_up_lst[i], g_u_lst[i])

In [None]:
# make dataframe with parameters
parameters_df = pd.DataFrame.from_dict(parameters_dict)
print(parameters_df)

# remove duplicate results
parameters_df = parameters_df.drop_duplicates()

# export dataframe to excel
parameters_df.to_excel(f'Detection_Parameters_Core_{core_num}.xlsx', index=False)

