In [2]:
import numpy as np
from matplotlib.pyplot import subplots, show
from spectrapepper import alsbaseline

In [9]:
# Relative paths of the text files that spectra_picker loads with np.loadtxt,
# one per object (Vega, Moon, Arcturus, Regulus).
# NOTE: despite the _DIR suffix, each constant names a single file, not a directory.
VEGA_DIR = './Final_data/corrected/Vega_response_corrected.txt'
MOON_DIR = './Final_data/corrected/Moon_response_corrected.txt'
ARCTURUS_DIR = './Final_data/corrected/Arcturus_response_corrected.txt'
REGULUS_DIR = './Final_data/corrected/Regulus_response_corrected.txt'

In [10]:
%matplotlib qt

In [3]:
def deriv(data: list, iterations: int = 1):
    """
    Numerically differentiate a curve sampled at unit spacing.

    Interior points get the mean of the forward and the backward difference;
    the first and last point get the one-sided difference to their only neighbour.

    :param data: sequence (list or numpy array) containing the curve to take the derivative from
    :param iterations: the amount of times the derivative will be taken; 0 returns a copy of the curve
    :return: numpy array with the iterations'th derivative, same length as data
    :raises ValueError: if iterations is not an integer or is negative
    :raises IndexError: if a derivative is requested of a curve with fewer than two points
    """
    if not isinstance(iterations, int):
        raise ValueError('iterations should be integer value')
    if iterations < 0:
        raise ValueError('iterations should not be negative')

    # Always work on a copy so the caller's data is never aliased or modified.
    curve = np.array(data)
    # Integer (especially unsigned) input would truncate or wrap around on
    # subtraction, so promote everything that is not float/complex to float.
    if not np.issubdtype(curve.dtype, np.inexact):
        curve = curve.astype(float)

    for _ in range(iterations):
        slope = np.empty_like(curve)
        # One-sided differences at both ends (IndexError here means < 2 points).
        slope[0] = curve[1] - curve[0]
        slope[-1] = curve[-1] - curve[-2]
        # Mean of forward and backward difference for every interior point.
        slope[1:-1] = ((curve[2:] - curve[1:-1]) + (curve[1:-1] - curve[:-2])) / 2
        curve = slope
    return curve


def get_peak_locations(data, edge_padding=1):
    """
    Find the extremes of every NaN-separated sector of a curve.

    The curve is split into sectors with get_peak_sectors. For each sector the
    lowest and the highest sample are reported, unless that sample lies within
    edge_padding datapoints of either sector boundary.

    :param data: Curve to take the peak locations from (excluded regions marked with NaN)
    :param edge_padding: How many datapoints at each edge of a sector should be ignored
    :return: list of (x, y) tuples where x is the index of the extreme and y is the value of the curve there;
             per sector the minimum (if accepted) precedes the maximum (if accepted)
    """
    array = np.asarray(data)
    peaks = []
    for left, right in get_peak_sectors(array):
        sector = array[left:right]
        # `right` is exclusive, so the sector covers indices left .. right - 1.
        # Ignore exactly `edge_padding` samples on each side.
        first_valid = left + edge_padding
        last_valid = right - 1 - edge_padding

        trough = left + sector.argmin()
        if first_valid <= trough <= last_valid:
            peaks.append((trough, sector[trough - left]))

        crest = left + sector.argmax()
        if first_valid <= crest <= last_valid:
            peaks.append((crest, sector[crest - left]))
    return peaks


def get_peak_sectors(array):
    """
    Locate the runs of numeric (non-NaN) values in a curve.

    :param array: Curve in which peak regions are separated from each other by stretches of numpy's NaN value
    :return: list of [start, stop] index pairs, one per numeric run; start is the first numeric index
             and stop is one past the last numeric index of that run
    """
    sectors = []
    open_at = None  # index where the current numeric run began, None while inside NaNs
    for idx, value in enumerate(array):
        if np.isnan(value):
            if open_at is not None:
                # A NaN closes the run that was in progress.
                sectors.append([open_at, idx])
                open_at = None
        elif open_at is None:
            # First numeric value after NaNs (or at the very start) opens a run.
            open_at = idx
    if open_at is not None:
        # The curve ended while a run was still open.
        sectors.append([open_at, len(array)])
    return sectors


def exclude(data, value, baseline=0, invert=False):
    """
    Replace the part of a curve inside (or outside) a band around a baseline with NaN.

    The band is centred on baseline and reaches abs(value) to either side.
    Points lying exactly on the band edge are kept in both modes.

    :param data: Curve from which to exclude sections (the input itself is not modified)
    :param value: Half-width of the band around the baseline; its sign is ignored
    :param baseline: Value around which to do the exclusion process
    :param invert: False replaces the points inside the band with NaN, True replaces the points outside the band
    :return: returns a new array with the excluded sections replaced with NaN
    """
    curve = np.asarray(data)
    half_width = np.abs(value)
    # Distance of every point from the baseline; comparing abs(curve) against
    # baseline + value would not centre the band on a non-zero baseline.
    deviation = np.abs(curve - baseline)
    if invert:
        rejected = deviation > half_width
    else:
        rejected = deviation < half_width
    return np.where(rejected, np.nan, curve)

In [4]:

def process_peaks(raw_data, smoothed=True, deriv_count=2, thres_percent=1.5, power=3, normalise=True, name=''):
    """
    Complete process of getting the peak locations, including a diagnostic plot.

    Steps: optional smoothing with alsbaseline, deriv_count derivatives raised to `power`,
    optional normalisation, discarding everything whose magnitude is below the cut-off
    (via exclude), and locating the extremes of the remaining sectors (via get_peak_locations).
    The input array is never modified.

    :param raw_data: Data to be processed for peaks (1d curve in np.array form)
    :param smoothed: Whether the processing should include smoothing or not (set False if your input is already smoothed)
    :param deriv_count: How many times the derivative should be taken, defaults to 2; with 0 or less the
                        curve itself is used and `power` is not applied
    :param thres_percent: The percentile of the processed curve whose magnitude is used as cut-off (defaults to 1.5)
    :param power: The exponent the derivative is raised to for separating noise from data
    :param normalise: Whether the processed curve should be divided by its extreme of largest magnitude
                      (so the dominant peak becomes +1); when False the curve is only sign-flipped if its
                      most negative value dominates
    :param name: Name of the object to be displayed in graphs
    :return: returns the peaks of the submitted curve as a list of (index, value) tuples
    """
    if smoothed:
        data = alsbaseline(raw_data, lam=100, remove=False, niter=2)
    else:
        data = raw_data
    data = np.asarray(data)

    if deriv_count > 0:
        derivative_adjusted = deriv(data, deriv_count) ** power
    else:
        # Float copy: the rescaling below must not write into the caller's array,
        # and must also work when the input holds integers.
        derivative_adjusted = np.array(data, dtype=float)

    highest = derivative_adjusted.max()
    lowest = derivative_adjusted.min()
    if normalise:
        # Dividing by a dominant negative minimum flips the sign so that the
        # strongest feature always ends up at +1.
        if np.abs(highest) > np.abs(lowest):
            derivative_adjusted = derivative_adjusted / highest
        else:
            derivative_adjusted = derivative_adjusted / lowest
    elif np.abs(highest) < np.abs(lowest):
        derivative_adjusted = -derivative_adjusted

    threshold = np.percentile(derivative_adjusted, thres_percent)
    excluded_data = exclude(derivative_adjusted, threshold)
    peaks = get_peak_locations(excluded_data)

    norm_curv = data / data.max()

    fig, ax = subplots()
    fig.suptitle(f'Peakfinding visualisation {name}')
    ax.set_xlabel('datapoint index')
    ax.set_ylabel('normalised "acceleration"')
    ax.plot(norm_curv, alpha=0.7, linestyle='--', color='navy', label=f'{name} (smooth)')
    if peaks:
        # zip(*peaks) yields nothing for an empty list, which would make scatter() fail.
        peak_x, peak_y = zip(*peaks)
        ax.scatter(peak_x, peak_y, c='r', s=5, label='peaks')
    ax.plot(derivative_adjusted, alpha=0.6, label='cubed derivative')
    # exclude() uses the magnitude of the threshold, hence both signs are drawn.
    ax.axhline(y=threshold, color='y', label='threshold')
    ax.axhline(y=-threshold, color='y')
    ax.legend(loc='upper right')
    ax.grid(alpha=0.6)
    show()
    return peaks

In [5]:
def spectra_picker(index):
    """
    Load the spectrum file that belongs to an object index.

    :param index: 0 (Vega), 1 (Arcturus), 2 (Regulus) or 3 (Moon), selecting the object whose data should be returned
    :return: The raw data of the selected object, as read by np.loadtxt
    :raises IndexError: if index is none of the four supported values
    """
    sources = (
        (0, VEGA_DIR),
        (1, ARCTURUS_DIR),
        (2, REGULUS_DIR),
        (3, MOON_DIR),
    )
    for key, path in sources:
        if index == key:
            return np.loadtxt(path)
    raise IndexError


def peak_functionality(name, data, smooth_data):
    """
    Plot a spectrum against its smoothed version, run the peak finder on the
    smoothed curve and plot the positive peaks on top of the raw data.

    :param name: Name of the object, used for plot labels and to look up its threshold
                 (one of 'Arcturus', 'Vega', 'Regulus', 'Moon')
    :param data: Curve to be processed
    :param smooth_data: Curve but smoothed
    :return: None
    :raises KeyError: if name has no entry in the threshold table
    """
    fig, ax = subplots()
    ax.set_title(f'{name} spectra raw vs smoothed')
    ax.plot(data, label=f'{name} data raw')
    ax.plot(smooth_data, label=f'{name} data smoothed')
    ax.set_xlabel('pixel index')
    ax.set_ylabel('Flux')
    ax.legend()
    ax.grid()
    show()

    # Per-object value passed to process_peaks as thres_percent.
    # arcturus: 10 (for most visible peaks)
    # vega: 4 (for 3 main peaks)
    # regulus: 2
    # moon: 8 -- NOTE(review): an earlier comment listed 2 for the Moon; confirm which is intended
    threshold = {'Arcturus': 10, "Vega": 4, "Regulus": 2, "Moon": 8}

    rough_peaks = process_peaks(smooth_data, thres_percent=threshold[name], smoothed=False, name=name)
    # Keep only the peaks whose processed value is positive.
    positive_peaks = [peak for peak in rough_peaks if peak[1] > 0]

    fig, ax = subplots()
    ax.set_title(f'{name} data with peaks')
    ax.plot(data, c='navy', alpha=0.6, label=f'{name} raw data')
    if positive_peaks:
        # One scatter call for all peaks: gives a single legend entry, draws no
        # point twice and does not fail when no peak was found.
        peak_indices = [x for x, _ in positive_peaks]
        peak_heights = [data[x] for x in peak_indices]
        ax.scatter(peak_indices, peak_heights, s=20, c='r', label='Peaks')
    ax.legend()
    ax.grid(alpha=0.6)
    ax.set_xlabel('datapoint index')
    ax.set_ylabel('intensity [counts]')
    show()

In [7]:
def run_peakfinding():
    """
    Runs the peakfinding algorithm for every object.

    For each object the raw spectrum is loaded, smoothed with alsbaseline
    and handed to peak_functionality together with its name.

    :return: None
    """
    # The order must follow the index mapping of spectra_picker
    # (0: Vega, 1: Arcturus, 2: Regulus, 3: Moon), otherwise spectra get the
    # wrong name and therefore the wrong threshold.
    objects = ['Vega', 'Arcturus', 'Regulus', 'Moon']
    for index, name in enumerate(objects):
        raw = spectra_picker(index)
        smooth = alsbaseline(raw, lam=50, niter=5, remove=False)
        # peak_functionality needs both the raw and the smoothed curve.
        peak_functionality(name, raw, smooth)