In [1]:
#Imports
import numpy as np
from astropy.table import Table
from astropy import constants as const
import matplotlib.pyplot as plt
import matplotlib
from scipy.interpolate import interp1d
import os
import time
from mpl_toolkits.mplot3d import Axes3D

%matplotlib inline

In [2]:
dataset_dir = "C:\\Users\\nachi\\OneDrive\\Desktop\\Hawkins Lab\\datasets"
spec_data_path = os.path.join(dataset_dir,"HETVIPS_LAMOST_SPEC_UPDATED.fits")
spec_data = Table.read(spec_data_path)
#for name in sorted(spec_data.colnames):
#    print(name)



The formula for RV is:

$$
v = c * \frac{  {\lambda_s} - {\lambda_o}   } {\lambda_o}\
$$

Where\
$v$ - radial velocity\
$c$ - speed of light\
${\lambda_s}$ - the observed wavelength\
${\lambda_o}$ - rest wavelength

Since the tempelates are currently unavailable I will shift the spectrum by some RV v and pretend that it is the rest spectrum. To change the values of the wavelengths to the rest wavelengths we must apply:

$$
{\lambda_o} = \frac{c}{v+c} * {\lambda_s} ----- (1)
$$


Then for the CCF we must pretend we do not know the value of the RV. We will shift the rest spectrum according to the RV we will be testing with. To perform this shift we use:

$$
{\lambda_s} = \frac{v+c}{c} * {\lambda_o} ----- (2)
$$



In [34]:
#shifting specturm and pretending it is the rest spectrum
spec_access_idx = 75
spec = spec_data["spec_norm"][spec_access_idx]
observed_wavelengths = np.linspace(3500,5500,len(spec))
observed_spec = np.array(spec)

c = 299792.46
v = -9 #rv in  km/s
shift_const = c / (v+c) #formula (1)
rest_wavelengths = observed_wavelengths * shift_const
tmplt_spec = observed_spec #just a placehold

In [39]:
#CCF

def get_score(v,observed_spec,observed_wavelengths,tmplt_spec):
    f = interp1d(observed_wavelengths,observed_spec,fill_value="extrapolate",kind="linear",bounds_error=False)
    tmplt_shift_const = (v+c)/c #formula (2)
    new_wavelengths = rest_wavelengths * tmplt_shift_const
    interpolated_observed_spec = np.array(f(new_wavelengths))
    return np.dot(interpolated_observed_spec,tmplt_spec)

def ccf(observed_spec,observed_wavelengths,tmplt_spec):
    start = time.perf_counter()
    negative_rv_score = get_score(-2,observed_spec,observed_wavelengths,tmplt_spec)
    positive_rv_score = get_score(2,observed_spec,observed_wavelengths,tmplt_spec)

    if negative_rv_score > positive_rv_score:
        value_range = np.arange(-5,-11,-0.5)

    elif positive_rv_score > negative_rv_score:
        value_range = np.arange(0,600,0.5)

    
    current_max = 0
    predicted_rv = 0
    scores = []
    for rv in value_range:
        current_score = get_score(rv,observed_spec,observed_wavelengths,tmplt_spec)
        if  current_score > current_max:
            predicted_rv = rv
            current_max = current_score
        scores.append(current_score)
        
    finish = time.perf_counter()
    print(finish-start)
    return predicted_rv,value_range,scores


predicted_rv,value_range,scores = ccf(observed_spec,observed_wavelengths,tmplt_spec)
predicted_rv

0.005263600000034785


-9.0

In [79]:
#shifting specturm and pretending it is the rest spectrum
spec_access_idx = 75
spec = spec_data["spec_norm"][spec_access_idx]
observed_wavelengths = np.linspace(3500,5500,len(spec))
observed_spec = np.array(spec)

c = 299792.46 #c in km/s
v = 245 #rv in  km/s
shift_const = c / (v+c) #formula (2)
rest_wavelengths = observed_wavelengths * shift_const
tmplt_spec = observed_spec #just a placeholder for now

In [80]:
#CCF Improved
iters = 7
def get_score(v,f,tmplt_spec):
    
    tmplt_shift_const = (v+c)/c #formula (2)
    new_wavelengths = rest_wavelengths * tmplt_shift_const
    interpolated_observed_spec = np.array(f(new_wavelengths))
    return np.dot(interpolated_observed_spec,tmplt_spec)

def ccf(observed_spec,observed_wavelengths,tmplt_spec,high_range):
    start = time.perf_counter()
    negative = False
    f = interp1d(observed_wavelengths,observed_spec,fill_value="extrapolate",kind="linear",bounds_error=False)
    negative_rv_score = get_score(-2,f,tmplt_spec)
    positive_rv_score = get_score(2,f,tmplt_spec)

    if negative_rv_score > positive_rv_score:
        high = -high_range
        negative = True

    elif positive_rv_score > negative_rv_score:
        high = high_range

    low = 0
    mid = (low + high)/2

    counter = 0
    predicted_rv = None

    while counter <= iters:

        if not negative:
            mid_plus_score = get_score(mid+1,f,tmplt_spec)
            mid_minus_score = get_score(mid-1,f,tmplt_spec)
        else:
            mid_plus_score = get_score(mid-1,f,tmplt_spec)
            mid_minus_score = get_score(mid+1,f,tmplt_spec)

        score_delta = mid_plus_score - mid_minus_score

        if score_delta < 0:
            high = mid
            mid = (low + high)/2

        elif score_delta > 0:
            low = mid
            mid = (low + high)/2
        
        predicted_rv = mid
        counter += 1
        
    finish = time.perf_counter()
    print(finish-start)
    print(counter)
    return predicted_rv

predicted_rv = ccf(observed_spec,observed_wavelengths,tmplt_spec,600.5)
predicted_rv


0.003066499999476946
8


245.1259765625

In [81]:
#shifting specturm and pretending it is the rest spectrum
spec_access_idx = 75
spec = spec_data["spec_norm"][spec_access_idx]
observed_wavelengths = np.linspace(3500,5500,len(spec))
observed_spec = np.array(spec)

c = 299792.46 #c in km/s
v = 245 #rv in  km/s
shift_const = c / (v+c) #formula (2)
rest_wavelengths = observed_wavelengths * shift_const
tmplt_spec = observed_spec #just a placeholder for now

In [83]:
#CCF Improved Using Gradients
iters = 7
def get_score(v,f,tmplt_spec):
    
    tmplt_shift_const = (v+c)/c #formula (2)
    new_wavelengths = rest_wavelengths * tmplt_shift_const
    interpolated_observed_spec = np.array(f(new_wavelengths))
    return np.dot(interpolated_observed_spec,tmplt_spec)

def ccf(observed_spec,observed_wavelengths,tmplt_spec,high_range):
    start = time.perf_counter()
    negative = False
    f = interp1d(observed_wavelengths,observed_spec,fill_value="extrapolate",kind="linear",bounds_error=False)
    
    negative_rv_score = get_score(-2,f,tmplt_spec)
    positive_rv_score = get_score(2,f,tmplt_spec)

    if negative_rv_score > positive_rv_score:
        high = -high_range
        negative = True

    elif positive_rv_score > negative_rv_score:
        high = high_range

    low = 0
    mid = (low + high)/2

    predicted_rv = None

    mid_plus_score = get_score(mid+1,f,tmplt_spec)
    mid_minus_score = get_score(mid-1,f,tmplt_spec)
    score_delta = mid_plus_score - mid_minus_score

    prev_delta = score_delta
    prev_c = -((prev_delta/2)*(mid+1)) + (mid_plus_score)

    for i in range(iters):

        if not negative:
            mid_plus_score = get_score(mid+1,f,tmplt_spec)
            mid_minus_score = get_score(mid-1,f,tmplt_spec)
        else:
            mid_plus_score = get_score(mid-1,f,tmplt_spec)
            mid_minus_score = get_score(mid+1,f,tmplt_spec)

        score_delta = mid_plus_score - mid_minus_score

        if (score_delta > 0 and prev_delta > 0) or (score_delta < 0 and prev_delta < 0):
            prev_delta = score_delta
            prev_c =  (mid_plus_score)-((score_delta/2)*(mid+1))

        else:
            c = (mid_plus_score)-((score_delta/2)*(mid+1))
            x = (prev_c-c)/((score_delta/2)-(prev_delta/2))

            finish = time.perf_counter()
            print(i)
            print(finish-start)
            return x

        if score_delta < 0:
            high = mid
            mid = (low + high)/2

        elif score_delta > 0:
            low = mid
            mid = (low + high)/2

    
        predicted_rv = mid
        
        
    finish = time.perf_counter()
    print(finish-start)
    return predicted_rv

predicted_rv = ccf(observed_spec,observed_wavelengths,tmplt_spec,600.5)
predicted_rv


1
0.0028178000011394033


245.0000000002613

In [68]:
#Using formula (1) to bring the observed spectrum to the rest frame
def to_rest(observed_spec,observed_wavelengths,predicted_rv):
    shift_const = (predicted_rv+c)/c #formula (1)
    rest_wavelengths = observed_wavelengths * shift_const
    f = interp1d(rest_wavelengths,observed_spec,fill_value="extrapolate",kind="linear",bounds_error=False)
    rest_spec = np.array(f(observed_wavelengths))
    return rest_spec