Skip to content

Commit

Permalink
Merge pull request #382 from AstarVienna/fh/somerefac
Browse files Browse the repository at this point in the history
Refactor some rarely-used utils functions
  • Loading branch information
teutoburg committed Mar 1, 2024
2 parents 53ed24c + d20a45d commit 2d57b6f
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 186 deletions.
22 changes: 12 additions & 10 deletions scopesim/optics/surface.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@
from synphot.models import Empirical1D

from ..effects import ter_curves_utils as ter_utils
from .surface_utils import make_emission_from_emissivity, \
make_emission_from_array
from ..utils import (get_meta_quantity, quantify, extract_type_from_unit,
from_currsys, convert_table_comments_to_dict, find_file,
get_logger)
from .surface_utils import (make_emission_from_emissivity,
make_emission_from_array, extract_type_from_unit)
from ..utils import (get_meta_quantity, quantify, from_currsys,
convert_table_comments_to_dict, find_file, get_logger)

logger = get_logger(__name__)


@dataclass
class PoorMansSurface:
"""Solely used by SurfaceList."""

# FIXME: Use correct types instead of Any
emission: Any
throughput: Any
Expand All @@ -32,7 +32,8 @@ class PoorMansSurface:

class SpectralSurface:
"""
Initialised by a file containing one or more of the following columns:
Initialised by a file containing one or more of the following columns.
transmission, emissivity, reflection. The column wavelength must be given.
Alternatively kwargs for the above mentioned quantities can be passed as
arrays. If they are not Quantities, then a unit should also be passed with
Expand Down Expand Up @@ -114,6 +115,9 @@ def emission(self):
``PHOTLAM arcsec^-2``, even though ``arcsec^-2`` is not given.
"""
flux = self._get_array("emission")
if flux is None and "temperature" not in self.meta:
return None

if flux is not None:
wave = self._get_array("wavelength")
flux = make_emission_from_array(flux, wave, meta=self.meta)
Expand All @@ -124,17 +128,15 @@ def emission(self):
temp = quantify(temp, u.deg_C)
temp = temp.to(u.Kelvin, equivalencies=u.temperature())
flux = make_emission_from_emissivity(temp, emiss)
flux.meta["temperature"] = temp
else:
flux = None

has_solid_angle = extract_type_from_unit(flux.meta["solid_angle"],
"solid angle")[1] != u.Unit("")
if flux is not None and has_solid_angle:
conversion_factor = flux.meta["solid_angle"].to(u.arcsec ** -2)
flux = flux * conversion_factor
flux.meta["solid_angle"] = u.arcsec ** -2
flux.meta["history"].append(f"Converted to arcsec-2: {conversion_factor}")
flux.meta["history"].append(
f"Converted to arcsec-2: {conversion_factor}")

if flux is not None and "rescale_emission" in self.meta:
dic = from_currsys(self.meta["rescale_emission"], self.cmds)
Expand Down
148 changes: 98 additions & 50 deletions scopesim/optics/surface_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
from astropy import units as u
from synphot import SourceSpectrum, BlackBody1D, Empirical1D

from ..utils import (quantify, extract_type_from_unit, extract_base_from_unit,
get_logger)
from ..utils import quantify, get_logger


logger = get_logger(__name__)


def make_emission_from_emissivity(temp, emiss_src_spec):
def make_emission_from_emissivity(temp: u.Quantity[u.K],
emiss_src_spec) -> SourceSpectrum:
"""
Create an emission SourceSpectrum using blackbody and emissivity curves.
Parameters
----------
temp : float, Quantity
[Kelvin] If float, then must be in Kelvin
temp : Quantity[Kelvin]
Blackbody temperature.
emiss_src_spec : synphot.SpectralElement
An emissivity response curve in the range [0..1]
Expand All @@ -26,23 +26,29 @@ def make_emission_from_emissivity(temp, emiss_src_spec):
flux : synphot.SourceSpectrum
"""
if isinstance(temp, u.Quantity):
temp = temp.to(u.Kelvin, equivalencies=u.temperature()).value

if emiss_src_spec is None:
logger.warning("Either emission or emissivity must be set")
flux = None
else:
flux = SourceSpectrum(BlackBody1D, temperature=temp)
flux.meta["solid_angle"] = u.sr**-1
flux = flux * emiss_src_spec
flux.meta["history"] = ["Created from Blackbody curve. Units are to be"
" understood as per steradian"]
return None

# This line is redundant for all the places we actually call this function.
# But the tests want this to work, so I'll include it. Ultimately, this is
# an internal utils function, so it should be fine to just to static type
# checking to ensure temp is always in K.
with u.set_enabled_equivalencies(u.temperature()):
temp <<= u.K

flux = SourceSpectrum(BlackBody1D, temperature=temp.value)
flux *= emiss_src_spec
flux.meta["temperature"] = temp
flux.meta["solid_angle"] = u.sr**-1
flux.meta["history"] = [
"Created from Blackbody curve. Units are per steradian",
]

return flux


def make_emission_from_array(flux, wave, meta):
def make_emission_from_array(flux, wave, meta) -> SourceSpectrum:
"""
Create an emission SourceSpectrum using an array.
Expand All @@ -63,35 +69,35 @@ def make_emission_from_array(flux, wave, meta):
"""
if not isinstance(flux, u.Quantity):
if "emission_unit" in meta:
try:
flux = quantify(flux, meta["emission_unit"])
else:
except KeyError:
logger.warning("emission_unit must be set in self.meta, "
"or emission must be an astropy.Quantity")
flux = None

if isinstance(wave, u.Quantity) and isinstance(flux, u.Quantity):
flux_unit, angle = extract_type_from_unit(flux.unit, "solid angle")
flux = flux / angle

if is_flux_binned(flux.unit):
flux = normalise_binned_flux(flux, wave)

orig_unit = flux.unit
flux = SourceSpectrum(Empirical1D, points=wave,
lookup_table=flux)
flux.meta["solid_angle"] = angle
flux.meta["history"] = [("Created from emission array with units "
f"{orig_unit}")]
else:
"or emission must be an astropy.Quantity object")
return None

if not isinstance(wave, u.Quantity):
logger.warning("wavelength and emission must be "
"astropy.Quantity py_objects")
flux = None
"astropy.Quantity objects")
return None

flux_unit, angle = extract_type_from_unit(flux.unit, "solid angle")
flux /= angle

flux = normalise_flux_if_binned(flux, wave)

orig_unit = flux.unit
flux = SourceSpectrum(Empirical1D, points=wave,
lookup_table=flux)
flux.meta["solid_angle"] = angle
flux.meta["history"] = [
f"Created from emission array with units {orig_unit}",
]

return flux


def normalise_binned_flux(flux, wave):
def normalise_flux_if_binned(flux, wave):
"""
Convert a binned flux Quantity array back into flux density.
Expand All @@ -109,32 +115,74 @@ def normalise_binned_flux(flux, wave):
flux : array-like Quantity
"""
bins = np.zeros(len(wave)) * wave.unit
if (u.bin not in flux.unit.bases and
"flux density" in str(flux.unit.physical_type)):
# not binned, return as-is
return flux

bins = np.zeros_like(wave)
# edge bins only have half the flux of other bins
bins[:-1] = 0.5 * np.diff(wave)
bins[1:] += 0.5 * np.diff(wave)
# bins[0] *= 2. # edge bins only have half the flux of other bins
# bins[-1] *= 2.

bin_unit = extract_base_from_unit(flux.unit, u.bin)[1]
flux = flux / bins / bin_unit
# TODO: Why not just do flux /= (bins / u.bin)
flux /= (bins * bin_unit)

return flux


def is_flux_binned(unit):
# moved these two here from utils, because they weren't used anywhere else
def extract_type_from_unit(unit, unit_type):
"""
Check if the (flux) unit is a binned unit.
Extract ``astropy`` physical type from a compound unit.
Parameters
----------
unit : Unit
unit : astropy.Unit
unit_type : str
The physical type of the unit as given by ``astropy``
Returns
-------
flag : bool
new_unit : Unit
The input unit minus any base units corresponding to `unit_type`.
extracted_units : Unit
Any base units corresponding to `unit_type`.
"""
extracted_units = u.Unit("")
for base, power in zip(unit.bases, unit.powers):
if unit_type == (base ** abs(power)).physical_type:
extracted_units *= base ** power

new_unit = unit / extracted_units

return new_unit, extracted_units


def extract_base_from_unit(unit, base_unit):
"""
unit = unit**1
# unit.physical_type is a string in astropy<=4.2 and a PhysicalType
# class in astropy==4.3 and thus has to be cast to a string first.
return (u.bin in unit._bases or "flux density" not in str(unit.physical_type))
Extract ``astropy`` base unit from a compound unit.
Parameters
----------
unit : astropy.Unit
base_unit : Unit, str
Returns
-------
new_unit : Unit
The input unit minus any base units corresponding to `base_unit`.
extracted_units : Unit
Any base units corresponding to `base_unit`.
"""
extracted_units = u.Unit("")
for base, power in zip(unit.bases, unit.powers):
if base == base_unit:
extracted_units *= base ** power

new_unit = unit * extracted_units ** -1

return new_unit, extracted_units
58 changes: 0 additions & 58 deletions scopesim/source/source_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,61 +172,3 @@ def make_img_wcs_header(pixel_scale, image_size):
imgwcs.wcs.cunit = [u.deg, u.deg]

return imgwcs.to_header()

# unit = extract_unit_from_imagehdu(imagehdu)
#
# per_unit_area = False
# if area is None:
# per_unit_area = True
# area = 1*u.m**2
#
# unit, sa_unit = utils.extract_type_from_unit(unit, "solid angle")
# unit = convert_flux(waverange, 1 * unit, "count", area=area)
# # [ct] comes out of convert_flux
# unit *= u.s
#
# if sa_unit != "":
# cunit1 = u.deg
# cunit2 = u.deg
# if "CUNIT1" in imagehdu.header and "CUNIT2" in imagehdu.header:
# cunit1 = u.Unit(imagehdu.header["CUNIT1"])
# cunit2 = u.Unit(imagehdu.header["CUNIT2"])
# cdelt1 = imagehdu.header["CDELT1"] * cunit1
# cdelt2 = imagehdu.header["CDELT2"] * cunit2
#
# pix_angle_area = cdelt1 * cdelt2
# unit *= (pix_angle_area * sa_unit).si.value
#
# if per_unit_area is True:
# unit *= u.m**-2
#
# zero = 0 * u.Unit(unit)
# scale = 1 * u.Unit(unit)
#
# if "BSCALE" in imagehdu.header:
# scale *= imagehdu.header["BSCALE"]
# imagehdu.header["BSCALE"] = 1
# if "BZERO" in imagehdu.header:
# zero = imagehdu.header["BZERO"]
# imagehdu.header["BZERO"] = 0
#
# imagehdu.data = imagehdu * scale + zero
# imagehdu.header["BUNIT"] = str(imagehdu.data.unit)
# imagehdu.header["FLUXUNIT"] = str(imagehdu.data.unit)
#
# return imagehdu
#
# def extract_unit_from_imagehdu(imagehdu):
# if "BUNIT" in imagehdu.header:
# unit = u.Unit(imagehdu.header["BUNIT"])
# elif "FLUXUNIT" in imagehdu.header:
# unit = u.Unit(imagehdu.header["FLUXUNIT"])
# elif isinstance(imagehdu.data, u.Quantity):
# unit = imagehdu.data.unit
# imagehdu.data = imagehdu.data.value
# else:
# unit = ""
# logger.warning("No flux unit found on ImageHDU. Please add BUNIT or "
# "FLUXUNIT to the header.")
#
# return unit
Loading

0 comments on commit 2d57b6f

Please sign in to comment.