In [None]:
import numpy as np
import matplotlib.pyplot as plt
from astropy.io import fits
from scipy.interpolate import interp1d
from scipy.ndimage import gaussian_filter1d
from astropy.stats import sigma_clip

In [None]:
def read_spectrum(filename, hdu_index=1, lambda_column='WAVELENGTH', flux_column='FLUX',err_column='ERROR'):
    
    if filename.lower().endswith('.fits'):
        
        with fits.open(filename) as hdul:
            hdu = hdul[hdu_index]
            data = hdu.data
            header = hdu.header

            # Table-like HDU
            
            if hasattr(data, 'columns') and lambda_column in data.columns.names:
                lambda_obs = data[lambda_column]
                flux = data[flux_column]
                flux_err = data[err_column] if err_column in data.columns.names else np.full_like(flux, np.nan)
            else:
                # Image-like HDU
                flux = np.asarray(data).squeeze()
                crval = header.get("CRVAL1")
                cdelt = header.get("CDELT1")
                if crval is None or cdelt is None:
                    raise KeyError("FITS header lacks CRVAL1/CDELT1 to compute wavelength array.")
                lambda_obs = crval + cdelt * np.arange(len(flux))
                flux_err = np.full_like(flux, np.nan)

    else:
        # ASCII file
        ascii_data = np.loadtxt(filename, unpack=True)
        if ascii_data.shape[0] == 3:
            lambda_obs, flux, flux_err = ascii_data
        elif ascii_data.shape[0] == 2:
            lambda_obs, flux = ascii_data
            flux_err = np.full_like(flux, np.nan)
        else:
            raise ValueError("ASCII file must have 2 or 3 columns: wavelength, flux [, error]")

    lambda_obs = np.asarray(lambda_obs).astype(float).squeeze()
    flux = np.asarray(flux).astype(float).squeeze()
    flux_err = np.asarray(flux_err).astype(float).squeeze()

    return lambda_obs, flux, flux_err

In [None]:
def normalize_continuum(lambda_obs, flux, order=3):
    
    mask_selection = np.isfinite(flux)
    lambda_obs_fit, flux_fit = lambda_obs[mask_selection], flux[mask_selection]
    
    mask_clip = sigma_clip(flux_fit, sigma=3).mask
    
    if np.all(mask_clip):
        continuum_coeff = np.polyfit(lambda_obs_fit, flux_fit, order)
    else:
        continuum_coeff = np.polyfit(lambda_obs_fit[~mask_clip], flux_fit[~mask_clip], order)
        
    continuum = np.polyval(continuum_coeff, lambda_obs)
    
    flux_norm = flux/continuum
    
    return flux_norm, continuum

In [None]:
def correct_redshift(lambda_obs, z):
    
    lambda_rest = lambda_obs / (1+z)
    
    return lambda_rest

In [None]:
def measure_equivalent_width(lambda_obs, flux, flux_err, line_region, continuum_regions):
    
    mask_line = (lambda_obs >= line_region[0]) & (lambda_obs <= line_region[1])
    
    lambda_continuum_list = []
    flux_continuum_list = []
    
    for continuum_window in continuum_regions:
        
        mask_continuum = (lambda_obs >= continuum_window[0]) & (lambda_obs <= continuum_window[1])
        lambda_continuum_list.append(lambda_obs[mask_continuum])
        flux_continuum_list.append(flux[mask_continuum])
    
    lambda_continuum_all = np.concatenate(lambda_continuum_list)
    flux_continuum_all = np.concatenate(flux_continuum_list)
    
    continuum_coeff = np.polyfit(lambda_continuum_all, flux_continuum_all, 1)
    continuum_fit = np.polyval(continuum_coeff, lambda_obs[mask_line])
    
    eq_width = np.trapz(1 - flux[mask_line]/continuum_fit, lambda_obs[mask_line])
    
    if flux_err is not None and np.all(np.isfinite(flux_err)):
        flux_err_line = flux_err[mask_line]
        delta_lambda = np.mean(np.diff(lambda_obs[mask_line]))
        eq_width_err = np.sqrt(np.sum((flux_err_line/continuum_fit)**2)) * delta_lambda
    else:
        eq_width_err = np.nan
    
    return eq_width, eq_width_err

In [None]:
def match_resolution(flux, lambda_obs, fwhm_obs, fwhm_model):
    
    if fwhm_obs > fwhm_model:
        pixel_step = np.median(np.diff(lambda_obs))
        sigma_pixel = np.sqrt(fwhm_obs**2 - fwhm_model**2) / (2.355*pixel_step)
        flux_smoothed = gaussian_filter1d(flux, sigma_pixel)
        return flux_smoothed
    else:
        return flux

In [None]:
def chi2_fit(lamda_obs, flux, flux_err, lambda_model, flux_model):
    
    interp_model = interp1d(lambda_model, flux_model, bounds_error=False, fill_value='extrapolate')
    flux_model_interp = interp_model(lambda_obs)
    
    mask_valid = (np.isfinite(flux_err)) & (flux_err > 0)
    chi2 = np.sum(((flux[mask_valid] - flux_model_interp[mask_valid])/flux_err[mask_valid])**2) \
         / (np.sum(mask_valid) - 1)
   
    return chi2, flux_model_interp

In [None]:
def compare_with_models(lambda_obs, flux, flux_err, models, z, fwhm_obs=None, fwhm_model=None, lines=None):

    lambda_rest = correct_redshift(lambda_obs, z)
    flux_norm, _ = normalize_continuum(lambda_rest, flux)
    
    results = []
    
    for model in models:
        lambda_model = model['lambda']
        flux_model = model['flux']
        
        if fwhm_obs is not None and fwhm_model is not None:
            flux_model = match_resolution(flux_model, lambda_model, fwhm_obs, fwhm_model)
        
        flux_model_norm, _ = normalize_continuum(lambda_model, flux_model)

        chi2, flux_model_interp = chi2_fit(lambda_rest, flux_norm, flux_err, lambda_model, flux_model_norm)
        
        line_indices = {}
        if lines is not None:
            for line_name, (line_region, continuum_regions) in lines.items():
                eq_width, eq_width_err = measure_equivalent_width(lambda_rest, flux_norm, flux_err,
                                                                  line_region, continuum_regions)
                line_indices[line_name] = (eq_width, eq_width_err)
        
        results.append({'age': model.get('age', np.nan),
                        'Z': model.get('Z', np.nan),
                        'chi2': chi2,
                        'lines': line_indices,
                        'model_flux': flux_model_interp})
    
    dtype = [('age', float), ('Z', float), ('chi2', float), ('lines', object), ('model_flux', object)]
    results = np.array([tuple(d[k] for k in ['age','Z','chi2','lines','model_flux']) for d in results], dtype=dtype)
    
    best_model_idx = np.argmin(results['chi2'])
    best_model = results[best_model_idx]
    
    plt.figure(figsize=(10,5))
    plt.plot(lambda_rest, flux_norm, label='Observed spectrum')
    plt.plot(lambda_rest, best_model['model_flux'], 
             label='Best-fit model (age={}, Z={})'.format(best_model['age'], best_model['Z']), alpha=0.7)
    plt.xlabel(r'$\lambda\,[\mathrm{\AA}]$', fontsize=12)
    plt.ylabel('Normalized flux', fontsize=12)
    plt.title('Comparison between observed and synthetic spectrum', fontsize=13)
    plt.legend()
    plt.show()
    
    return results, best_model

In [None]:
if __name__ == '__main__':

    use_mock_data = True 

    if use_mock_data:
        lambda_obs = np.linspace(3800, 6250, 5000)
        flux = (np.ones_like(lambda_obs)
                - 0.3 * np.exp(-0.5* ((lambda_obs - 4861) / 1.5)**2)   # Hβ
                - 0.15 * np.exp(-0.5 * ((lambda_obs - 4339) / 1.2)**2)  # Hγ
                - 0.1 * np.exp(-0.5 * ((lambda_obs - 4102) / 1.0)**2)   # Hδ
                - 0.2 * np.exp(-0.5 * ((lambda_obs - 5175) / 2.0)**2)  # Mgb
                - 0.15 * np.exp(-0.5 * ((lambda_obs - 5270) / 1.5)**2)  # Fe5270
                - 0.1 * np.exp(-0.5 * ((lambda_obs - 5335) / 1.5)**2)) # Fe5335
        flux += 0.02 * np.random.randn(len(lambda_obs))
        flux[:600] = 1.0
        flux[-600:] = 1.0
        flux_err = 0.05 * np.ones_like(flux)
        z = 0.0
        print('Mock data')
    else:
        lambda_obs, flux, flux_err = read_spectrum('observed_spectrum.txt')
        z = 0.01
        print('Real data')

    flux_norm, _ = normalize_continuum(lambda_obs, flux)

    # Lines to be measured
    
    lines = {
        'Hβ':    ((4847, 4876), [(4827, 4847), (4876, 4891)]),
        'Hγ':    ((4335, 4360), [(4310, 4330), (4365, 4375)]),
        'Hδ':    ((4090, 4115), [(4070, 4090), (4120, 4140)]),
        'Mgb':   ((5160, 5192), [(5142, 5161), (5191, 5206)]),
        'Fe5270':((5245, 5285), [(5233, 5248), (5285, 5318)]),
        'Fe5335':((5312, 5352), [(5304, 5315), (5345, 5355)]),
        'NaD':   ((5880, 5905), [(5860, 5880), (5905, 5925)]),
        'Ca4227':((4215, 4230), [(4200, 4215), (4230, 4240)]),
        'G4300': ((4280, 4315), [(4260, 4280), (4315, 4330)]),
        'TiO':   ((6180, 6220), [(6160, 6180), (6220, 6240)])
    }

    # Equivalent width measurements
    
    for line_name, (line_region, continuum_regions) in lines.items():
        try:
            eq_width, eq_width_err = measure_equivalent_width(lambda_obs, flux_norm, flux_err,
                                                              line_region, continuum_regions)
            print(f'{line_name} = {eq_width:.2f} +/- {eq_width_err:.2f} Å')
        except Exception as e:
            print(f'{line_name}: could not measure EW ({e})')

    # Model grid generation
    model_grid = []
    for age in [1, 5, 10]:
        for Z in [0.004, 0.02]:
            l = lambda_obs.copy()  
            if use_mock_data:
                f = (np.ones_like(l)
                     - 0.3 * np.exp(-0.5 * ((l-4861) / 1.5)**2)
                     - 0.15 * np.exp(-0.5 * ((l-4339) / 1.2)**2)
                     - 0.1 * np.exp(-0.5 * ((l-4102) / 1.0)**2)
                     - 0.2 * np.exp(-0.5 * ((l-5175) / 2.0)**2)
                     - 0.15 * np.exp(-0.5 * ((l-5270) / 1.5)**2)
                     - 0.1 * np.exp(-0.5 * ((l-5335) / 1.5)**2))
                f += 0.02 * np.random.randn(len(l))
                f[:600] = 1.0
                f[-600:] = 1.0
            else:
                f = np.ones_like(l)
            model_grid.append({'lambda': l, 'flux': f, 'age': age, 'Z': Z})

    # Model comparison
    
    results, best_model = compare_with_models(lambda_obs, flux, flux_err, model_grid,
                                              z=z, fwhm_obs=3.0, fwhm_model=2.5, lines=lines)

    print('Best-fit model:', best_model)