# Functions for evaluating signal-to-noise ratio (SNR)

Isaac Cheng - January 2022

Currently reproduces Table 3-6 of the CASTOR Science Maturation Study.

This is just a notebook to make and test the functions easily. I will create a proper
Python file to be used in a module later. This code is super messy... Sorry!

Resources:
- [CCD Signal/Noise](http://slittlefair.staff.shef.ac.uk/teaching/phy217/lectures/instruments/L14/index.html)


In [1]:
import numpy as np
import pandas as pd
import astropy.units as u
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d

# Import my own modules
import sys

sys.path.append("../src/")
import constants as const

## Python Methods


Load data functions


In [2]:
def load_passbands(filters="all", limits=None, resolution=None):
    """
    Loads the passband throughput curves.

    Parameters
    ----------
      filters :: 1D list of strings or "all"
        The list of filters to load; valid filters are:
          - "uv"
          - "u"
          - "g"
        If "all" is given, all filters are loaded.

      limits :: dict of lists or None
        Dictionary containing key-value pairs of the filter and the wavelength where the
        value is a list containing the lower and upper wavelength limits for that passband
        (inclusive). If the values are lists of floats, the units are assumed to be
        micrometres. For example:
        {"uv": [0.15, 0.30], "u": [0.30, 0.40], "g": [0.40, 0.55]} or
        {"uv": [150 * u.nm, 300 * u.nm], "g": [400 * u.nm, 550 * u.nm]}.
        If None, use the passband limits (from the constants file) for each of the
        specified filters.

      resolution :: scalar or `astropy.Quantity` or None
        The linear interpolation resolution of the passband curves. If a scalar, the unit
        is assumed to be in micrometres. If None, use the native resolution of the
        passband files.

    Returns
    -------
      passbands :: dict of `pandas.DataFrame`
        Dictionary containing the passband throughput curves. Each throughput curve is a
        pandas DataFrame and the wavelengths are in micrometres.

    Examples
    --------
    ```
    passbands = load_passbands(filters="all")  # load all filters
    print(passbands)  # view all data for all filters
    print(passbands["uv"])  # view all data for the "uv" filter
    print(passbands["u"]["wavelength"])  # view the wavelength values of the "u" filter
    print(passbands["g"]["throughput"])  # view the throughput values of the "g" filter
    print(passbands["uv"].iloc[0])  # view the 1st row's data for the "uv" filter
    print(passbands["uv"].iloc[0:3])  # view data from rows 0 to 2 for the "uv" filter
    ```
    """
    #
    # Check inputs
    #
    if type(filters) == str:
        filters = [filters]
    if np.ndim(filters) != 1:
        raise ValueError("filters must be a 1D list of strings or 'all'")
    if filters == ["all"]:
        filters = const.PASSBANDS
    else:
        if any(band not in const.PASSBANDS for band in filters):
            raise ValueError(f"Invalid filters. Valid filters are: {const.PASSBANDS}")
    #
    if limits is None:
        limits = {band: const.PASSBAND_LIMITS[band] for band in filters}
    else:
        for key in limits.keys():
            for i, lim in enumerate(limits[key]):
                if isinstance(lim, u.Quantity):
                    limits[key][i] = lim.to(u.um).value
    if any(band not in limits.keys() for band in filters) or any(
        key not in filters for key in limits.keys()
    ):
        raise ValueError("filters and limits must be consistent")
    #
    if isinstance(resolution, u.Quantity):
        resolution = resolution.to(u.um).value
    #
    # Load passband files
    #
    passbands = dict.fromkeys(filters)  # initialize empty dictionary
    for band in passbands:
        passbands[band] = pd.read_csv(
            const.DATAPATH + f"passbands/passband_castor.{band}",
            sep=" +",
            header=None,
            comment="#",
            engine="python",
        )  # sep=" +" is Python regex to match a variable number of spaces
    #
    # Trim passband data to given limits
    #
    if resolution is not None:
        # Interpolate passband data to given resolution
        for band in passbands:
            interp = interp1d(
                passbands[band][0].values,
                passbands[band][1].values,
                kind="linear",
                bounds_error=False,
                fill_value=np.nan,
            )
            wavelengths = np.arange(
                limits[band][0], limits[band][1] + 0.5 * resolution, resolution
            )
            passbands[band] = pd.DataFrame(
                {"wavelength": wavelengths, "throughput": interp(wavelengths)},
            )
    else:
        # Do not interpolate passband data
        for band in passbands:
            is_good = (passbands[band][0] >= limits[band][0]).values & (
                passbands[band][0] <= limits[band][1]
            ).values
            passbands[band] = pd.DataFrame(
                {
                    "wavelength": (passbands[band][0].values)[is_good],
                    "throughput": (passbands[band][1].values)[is_good],
                }
            )
    return passbands


In [3]:

def load_sky_background(resolution=None, limits=None):
    """
    Loads the sky background noise.

    Parameters
    ----------
      resolution :: float or `astropy.Quantity` or None
        The linear interpolation resolution of the data. If a float, value must be in
        angstroms. If None, use the native resolution of the sky background data.

      limits :: list of 2 floats or list of 2 `astropy.Quantity` or None
        If resolution is None, limit specifies the minimum and maximum wavelengths of the
        returned background data, inclusive. If resolution is not None, limit specifies
        the start and end wavelengths of the interpolated background data, inclusive. If
        the list elements are floats, the values are assumed to be in angstroms. If this
        parameter is None, use the min and max wavelengths from the background file as the
        interpolation limits.

    Returns
    -------
      background :: `pandas.DataFrame`
        DataFrame containing the sky background noise. Wavelengths are in angstroms and
        the background values are in erg/cm^2/s/A/arcsec^2.
    """
    #
    # Check inputs
    #
    if isinstance(resolution, u.Quantity):
        resolution = resolution.to(u.AA).value
    if limits is not None:
        for i, lim in enumerate(limits):
            if isinstance(lim, u.Quantity):
                limits[i] = lim.to(u.AA).value
    #
    # Load background data
    #
    background = pd.read_csv(
        const.DATAPATH + "background/high_sky_background.txt",
        sep=" ",
        header=0,
        comment="#",
    )
    #
    # Interpolate passband data to given resolution
    #
    if resolution is not None:
        if limits is None:
            limits = [
                np.nanmin(background["wavelength"]),
                np.nanmax(background["wavelength"]),
            ]
        wavelengths = np.arange(
            limits[0], limits[1] + 0.5 * resolution, resolution
        )
        interp_background = pd.DataFrame({"wavelength": wavelengths})
        for col in background.columns:
            if col == "wavelength":
                continue  # no need to interpolate the wavelength column
            interp = interp1d(
                background["wavelength"].values,
                background[col].values,
                kind="linear",
                bounds_error=False,
                fill_value=np.nan,
            )
            # Add column to DataFrame
            interp_background[col] = interp(wavelengths)
        background = interp_background
    elif limits is not None:
        background = background[
            (background["wavelength"] >= limits[0])
            & (background["wavelength"] <= limits[1])
        ]
    return background

Unit Conversion Functions


In [4]:
def convert_freq_wavelength(data, to="wavelength", output_unit=u.AA):
    """
    Converts between frequency and wavelength for light in a vacuum.

    Parameters
    ----------
      data :: scalar or `astropy.Quantity` or array
        The frequency of wavelength data to convert. If values are scalars, they are
        assumed to be in Hz for frequencies and in angstroms for wavelengths.

      to :: "wavelength" or "frequency"
        The quantity to convert the input data to.

      output_unit :: `astropy.Quantity`
        The unit of the returned, converted data.

    Returns
    -------
      converted_data :: scalar or array
        The converted data.
    """
    #
    # Check inputs
    #
    if to == "wavelength":
        unit = u.Hz  # data are frequencies
    elif to == "frequency":
        unit = u.AA  # data are wavelengths
    else:
        raise ValueError(f"to must be 'wavelength' or 'frequency'")
    #
    if isinstance(data, u.Quantity):
        data = data.to(unit)
    else:
        data = data * unit
    #
    # Convert
    #
    lightspeed = const.LIGHTSPEED * u.m / u.s
    converted_data = (lightspeed / data).to(output_unit).value
    return converted_data


def flam_to_photlam(flam, wavelength):
    """
    Converts from flam (erg/cm^2/s/A) to photlam (photon/cm^2/s/A).
    See <https://www.stsci.edu/~strolger/docs/UNITS.txt>.

    Parameters
    ----------
      flam :: array
        The flux in flam.

      wavelength :: array of scalars or `astropy.Quantity`
        The corresponding wavelengths of the flux. If values are scalars, they are assumed
        to be in angstroms.

    Returns
    -------
      photlam :: array
        The flux in photlam.
    """
    if isinstance(wavelength, u.Quantity):
        wavelength = wavelength.to(u.AA).value
    return const.FLAM_TO_PHOTLAM * flam * wavelength


def fnu_to_photlam(fnu, wavelength):
    """
    Converts from fnu (erg/cm^2/s/Hz) to photlam (photom/cm^2/s/A). See
    <https://hea-www.harvard.edu/~pgreen/figs/Conversions.pdf> ("F_\nu" to "f_\lambda").

    Parameters
    ----------
      fnu :: array
        The flux in fnu.

      wavelength :: array of scalars or `astropy.Quantity`
        The corresponding wavelengths of the flux. If values are scalars, they are assumed
        to be in angstroms.

    Returns
    -------
      photlam :: array
        The flux in photlam.
    """
    if isinstance(wavelength, u.Quantity):
        wavelength = wavelength.to(u.AA).value
    return const.FNU_TO_PHOTLAM * fnu / wavelength


def fnu_to_flam(fnu, wavelength):
    """
    Converts from fnu (erg/cm^2/s/Hz) to flam (erg/cm^2/s/A). See
    <https://hea-www.harvard.edu/~pgreen/figs/Conversions.pdf> ("F_\nu" to "F_\lambda").

    Parameters
    ----------
      fnu :: array
        The flux in fnu.

      wavelength :: array of scalars or `astropy.Quantity`
        The corresponding wavelengths of the flux. If values are scalars, they are assumed
        to be in angstroms.

    Returns
    -------
      photlam :: array
        The flux in photlam.
    """
    if isinstance(wavelength, u.Quantity):
        wavelength = wavelength.to(u.AA).value
    return const.FNU_TO_FLAM * fnu / (wavelength * wavelength)

In [5]:
def convert_rel_abs_mag(mag, dist, mag_err=0.0, dist_err=0.0, to="abs"):
    """
    Converts between relative and absolute magnitudes.

    Parameters
    ----------
      mag :: scalar or array
        The relative or absolute magnitudes.

      dist :: scalar or `astropy.Quantity` or array
        The distance to the source. If values are scalars, they are assumed to be in
        parsecs.

      mag_err :: scalar or array
        The uncertainty in the relative or absolute magnitudes.

      dist_err :: scalar or `astropy.Quantity` or array
        The uncertainty in the distance to the source. If values are scalars, they are
        assumed to be in parsecs.

      to :: "abs" or "rel"
        If "abs", convert input magnitudes to absolute magnitudes. If "rel", convert
        input magnitudes to relative magnitudes.

    Returns
    -------
      new_mag, rel_mag_err :: scalar or array
        The converted magnitudes and their uncertainties.
    """
    #
    # Check inputs
    #
    if isinstance(dist, u.Quantity):
        dist = dist.to(u.parsec).value
    if isinstance(dist_err, u.Quantity):
        dist_err = dist_err.to(u.parsec).value
    #
    # Convert magnitudes
    #
    if to == "abs":
        new_mag = mag - 5 * (np.log10(dist) - 1)
    elif to == "rel":
        new_mag = mag + 5 * (np.log10(dist) - 1)
    else:
        raise ValueError("to must be either 'abs' or 'rel'")
    new_mag_err = np.sqrt(mag_err ** 2 + (5 / np.log(10) * dist_err / dist) ** 2)
    #
    return new_mag, new_mag_err


def flux_to_mag(flux, flux_err=0.0, zpt=-48.60, calc_abs=False, dist=None, dist_err=0.0):
    """
    Calculates the relative or absolute magnitude of a source and its uncertainty given
    the monochromatic flux of the source.

    Parameters
    ----------
      flux, flux_err :: scalar or arrays
        The monochromatic flux and its uncertainty. The unit of the flux depends on the
        magnitude system.

      zpt :: scalar
        The zero point of the magnitude system.

      calc_abs :: bool
        If True, calculate the absolute magnitude; also requires the dist parameter to be
        provided. If False, calculate the relative magnitude.

      dist, dist_err :: scalar or `astropy.Quantity` or array
        The distance to the source and its uncertainty. If values are scalars, they are
        assumed to be in parsecs.

    Returns
    -------
      mag, mag_err :: float or array
        The relative or absolute magnitude and its uncertainty.
    """
    #
    # Calculate magnitude
    #
    rel_mag = -2.5 * np.log10(flux) + zpt
    rel_mag_err = 2.5 / np.log(10) * abs(flux_err / flux)
    #
    if calc_abs:
        if dist is None:
            raise ValueError("dist must be provided if calc_abs is True")
        abs_mag, abs_mag_err = convert_rel_abs_mag(
            rel_mag, dist, mag_err=rel_mag_err, dist_err=dist_err, to="abs"
        )
        return abs_mag, abs_mag_err
    #
    return rel_mag, rel_mag_err


def mag_to_flux(mag, mag_err=0.0, zpt=-48.60):
    """
    Converts magnitude to flux. The units of the flux will depend on the magnitude system.
    
    Parameters
    ----------
      mag, mag_err :: scalar or arrays
        The magnitudes and their uncertainties
      
      zpt :: scalar
        The zero point of the magnitude system. For example, zpt=-48.60 corresponds to the 
        AB magnitude system and the flux will be in units of erg/s/cm^2/Hz. Likewise,
        zpt=-21.1 corresponds to the ST magnitude system and the flux will be in units of
        erg/s/cm^2/A.
    """
    flux = 10 ** (-0.4 * (mag - zpt))
    flux_err = 0.4 * np.log(10) * abs(flux * mag_err)
    return flux, flux_err


In [6]:
def calc_photon_energy(
    wavelength=None, frequency=None, wavelength_err=0.0, frequency_err=0.0
):
    """
    Calculates the energy of a photon given its wavelength or frequency.

    Parameters
    ----------
      wavelength, frequency :: scalar or `astropy.Quantity` or array
        The wavelength and frequency of the photon; only one of these should be provided.
        If values are scalars, wavelength is assumed to be in angstrom and frequency in
        hertz.

      wavelength_err, frequency_err :: scalar or `astropy.Quantity` or array
        The uncertainty in the wavelength and uncertainty; only one of these should be
        provided. If values are scalars, wavelength_err is assumed to be in angstrom and
        frequency_err in hertz.

    Returns
    -------
      energy, energy_err :: scalar or array
        The energy of the photon and its uncertainty, in ergs (1 erg = 10^-7 joule).
    """
    if wavelength is not None and frequency is not None:
        raise ValueError("Only one of wavelength and frequency should be provided")
    elif wavelength is not None:
        if isinstance(wavelength, u.Quantity):
            wavelength = wavelength.to(u.AA).value
        if isinstance(wavelength_err, u.Quantity):
            wavelength_err = wavelength_err.to(u.AA).value
        energy = const.PLANCK_H * const.LIGHTSPEED / (wavelength * 1e-10)  # J
        energy_err = energy * (wavelength_err / wavelength)
    elif frequency is not None:
        if isinstance(frequency, u.Quantity):
            frequency = frequency.to(u.Hz).value
        if isinstance(frequency_err, u.Quantity):
            frequency_err = frequency_err.to(u.Hz).value
        energy = const.PLANCK_H * frequency  # J
        energy_err = energy * (frequency_err / frequency)
    else:
        raise ValueError("Either wavelength or frequency must be provided")
    return energy * 1e7, energy_err * 1e7  # erg


## Trying to find the time required to reach a given SNR...


In [7]:
#
# Set parameters
#
RESOLUTION = 1 * u.nm  # common resolution for interpolation
PASSBAND_TOT_LIMITS = [
    min(const.PASSBAND_LIMITS.values(), key=lambda x: x[0])[0] * u.um,
    max(const.PASSBAND_LIMITS.values(), key=lambda x: x[1])[1] * u.um,
]  # minimum and maximum wavelength covered by the telescope
PASSBANDS = const.PASSBANDS
NPIX = 1  # number of pixels of the source (1 => point source)


In [8]:
castor_passbands = load_passbands(filters=PASSBANDS, limits=None, resolution=RESOLUTION)

sky_background = load_sky_background(
    resolution=RESOLUTION, limits=PASSBAND_TOT_LIMITS
)  # background initially in units of erg/cm^2/s/A/arcsec^2
for col in sky_background.columns:
    if col == "wavelength":
        continue
    sky_flam = sky_background[col] * const.PX_AREA  # erg/cm^2/s/A
    sky_photlam = flam_to_photlam(sky_flam, sky_background["wavelength"] * u.AA)  # photons/s/cm^2/A
    sky_background[col] = sky_photlam * const.APER_AREA  # photons/s/A

# Assume geocoronal emission constant over entire aperture
geo_background = (
    const.GEOCORONAL_FLUX
    * const.PX_AREA
    * const.APER_AREA
    / calc_photon_energy(
        wavelength=const.GEOCORONAL_WAVELENGTH * u.AA, wavelength_err=0.0
    )[0]
)  # photon / s


In [9]:
castor_passbands

{'uv':      wavelength  throughput
 0         0.150    0.014345
 1         0.151    0.015924
 2         0.152    0.017648
 3         0.153    0.019521
 4         0.154    0.021553
 ..          ...         ...
 146       0.296    0.017400
 147       0.297    0.013037
 148       0.298    0.009245
 149       0.299    0.006053
 150       0.300    0.003508
 
 [151 rows x 2 columns],
 'u':      wavelength  throughput
 0         0.300    0.524463
 1         0.301    0.527992
 2         0.302    0.531407
 3         0.303    0.534705
 4         0.304    0.537884
 ..          ...         ...
 96        0.396    0.328489
 97        0.397    0.313956
 98        0.398    0.299435
 99        0.399    0.284926
 100       0.400    0.270427
 
 [101 rows x 2 columns],
 'g':      wavelength  throughput
 0         0.400    0.343162
 1         0.401    0.360624
 2         0.402    0.376599
 3         0.403    0.391541
 4         0.404    0.405954
 ..          ...         ...
 146       0.546    0.554785
 1

In [10]:
sky_background

Unnamed: 0,wavelength,earthshine,zodiacal_light,total_sky_background
0,1500.0,,,
1,1510.0,,,
2,1520.0,,,
3,1530.0,,,
4,1540.0,,,
...,...,...,...,...
396,5460.0,0.000055,0.000111,0.000167
397,5470.0,0.000055,0.000112,0.000167
398,5480.0,0.000055,0.000112,0.000167
399,5490.0,0.000055,0.000112,0.000168


In [11]:
#
# Set AB magnitudes of the point sources
#
ab_mags = np.arange(22.0, 29.0, 0.5)  # relative AB magnitudes
#
# Convert to flam (erg/s/cm^2/A) and photlam (photons/s/cm^2/A)
#
ab_fnu = mag_to_flux(ab_mags, mag_err=0.0, zpt=-48.60)[0]  # erg/s/cm^2/Hz
ab_flam = pd.DataFrame(index=ab_mags, columns=PASSBANDS)  # erg/s/cm^2/A
ab_phot_rate_per_aa = pd.DataFrame(index=ab_mags, columns=PASSBANDS)  # photons/s/A
for band in PASSBANDS:
    wavelengths = castor_passbands[band]["wavelength"].values * u.um
    for i, ab_mag in enumerate(ab_mags):
        ab_flam[band][ab_mag] = fnu_to_flam(ab_fnu[i], wavelengths)
        photlam = fnu_to_photlam(ab_fnu[i], wavelengths)
        ab_phot_rate_per_aa[band][ab_mag] = photlam * const.APER_AREA


In [12]:
#
# Integrate flux over passband---i.e., get all relevant quantites in units of electrons/s
#
ab_source_e_rate = pd.DataFrame(index=ab_mags, columns=PASSBANDS)  # electron/s
ab_sky_e_rate = dict.fromkeys(PASSBANDS)  # electron/s
for band in PASSBANDS:
    throughput = castor_passbands[band]["throughput"].values
    #
    # Calculate total background noise from sky and geocoronal emission. Assume background
    # is constant over wavelength resolution (e.g., 10 angstroms) and is uniform over the
    # whole aperture.
    #
    band_start = (
        (castor_passbands[band]["wavelength"].iloc[0] * u.um).to(u.AA).value
    )
    band_end = (castor_passbands[band]["wavelength"].iloc[-1] * u.um).to(u.AA).value
    is_in_band = (sky_background["wavelength"] >= band_start) & (
        sky_background["wavelength"] <= band_end
    )
    background = np.nansum(
        sky_background["total_sky_background"][is_in_band]
        * RESOLUTION.to(u.AA).value
        * throughput
    ) * NPIX
    # Add geocoronal emission line [O II] 2471A to the relevant passband
    if (const.GEOCORONAL_WAVELENGTH >= band_start) & (
        const.GEOCORONAL_WAVELENGTH <= band_end
    ):
        throughput_interp = interp1d(
            castor_passbands[band]["wavelength"].values,
            throughput,
            kind="linear",
            bounds_error=False,
            fill_value=np.nan,
        )
        geo_throughput = throughput_interp(
            (const.GEOCORONAL_WAVELENGTH * u.AA).to(u.um)
        )
        background += geo_background * geo_throughput * NPIX
    ab_sky_e_rate[band] = background
    #
    # Calculate total signal in band
    #
    for i, ab_mag in enumerate(ab_mags):
        # Assume flux is constant over wavelength resolution (e.g., 10 angstroms)
        e_rate = np.nansum(
            ab_phot_rate_per_aa[band][ab_mag]
            * throughput
            * RESOLUTION.to(u.AA).value
        )
        ab_source_e_rate[band][ab_mag] = e_rate
dark_current = const.DARK_CURRENT * NPIX  # electron/s


In [13]:
ab_source_e_rate

Unnamed: 0,uv,u,g
22.0,9.67137,10.1912,12.843
22.5,6.10222,6.43023,8.10338
23.0,3.85024,4.0572,5.11289
23.5,2.42934,2.55992,3.22601
24.0,1.53281,1.6152,2.03548
24.5,0.967137,1.01912,1.2843
25.0,0.610222,0.643023,0.810338
25.5,0.385024,0.40572,0.511289
26.0,0.242934,0.255992,0.322601
26.5,0.153281,0.16152,0.203548


In [14]:
ab_sky_e_rate

{'uv': 0.0055523860501595634,
 'u': 0.023971286002944743,
 'g': 0.12193540662452251}

In [15]:
dark_current

0.01

In [16]:
#
TARGET_SNR = 10
#
t_lim = pd.DataFrame(
    index=ab_mags, columns=PASSBANDS
)  # time to reach magnitude given target SNR (s)
read_noise = const.READ_NOISE ** 2 * NPIX  # electrons^2
for band in PASSBANDS:
    for ab_mag in ab_mags:
        t = 0  # second
        snr = 0  # signal to noise ratio
        while snr < TARGET_SNR:
            t += 1
            signal = ab_source_e_rate[band][ab_mag] * t  # electrons
            noise = np.sqrt(
                signal + ab_sky_e_rate[band] * t + dark_current * t + read_noise
            )
            snr = signal / noise
            if t > 20000:
                print(f"Exiting for {band} band at {ab_mag} mag")
                break
        t_lim[band][ab_mag] = t


In [17]:
t_lim

Unnamed: 0,uv,u,g
22.0,12,11,9
22.5,18,17,14
23.0,29,27,22
23.5,45,43,35
24.0,71,68,56
24.5,114,109,92
25.0,181,176,152
25.5,290,286,259
26.0,469,470,455
26.5,766,791,835
