In [None]:
import os
from du_astro_utils import calibration, photometry, utils
import matplotlib.pyplot as plt

plt.rcParams["text.usetex"] = True
import numpy as np
from astropy.io import fits
from astropy.stats import sigma_clipped_stats
from scipy.ndimage import median_filter
from tqdm import tqdm

## Reduce Images

In [None]:
# Calibration mode for the science frames: True calls calibration.dedark_sci_image,
# False calls calibration.reduce_sci_image. The same flag also selects the
# 'DEDARK' / 'REDUCED' / 'RED' tags used in file names in later cells.
dedark = False
# Master switch for this cell: the reduction below only runs when this is True.
run_reduction = False
if run_reduction:
    target_type = "galaxy_cluster"
    if target_type == "asteroid":
        data_dir = os.path.join(utils.C2PU_DATA_DIR, utils.DIR_PHOTOM, utils.DIR_ASTER)
    elif target_type == "exoplanet":
        data_dir = os.path.join(utils.C2PU_DATA_DIR, utils.DIR_PHOTOM, utils.DIR_EXPLTS)
    elif target_type == "variable_star":
        data_dir = os.path.join(utils.C2PU_DATA_DIR, utils.DIR_PHOTOM, utils.DIR_VARSTARS)
    elif target_type == "galaxy_cluster":
        data_dir = os.path.join(utils.C2PU_DATA_DIR, utils.DIR_PHOTOM, utils.DIR_GALCLUST)
    elif target_type == "cluster":
        data_dir = os.path.join(utils.C2PU_DATA_DIR, utils.DIR_PHOTOM, utils.DIR_CLUSTERS)
    else:
        # Without this branch data_dir would be undefined (or stale) further down.
        raise ValueError(f"Unknown target_type: {target_type!r}")

    # List the directory once so that the overview and the processing loop see the same entries.
    dir_entries = os.listdir(data_dir)
    print(dir_entries)

    # Overview: number of FITS files found in each sub-directory.
    for iddir, ddir in enumerate(dir_entries):
        subdata_dir = os.path.join(data_dir, ddir)
        if os.path.isdir(subdata_dir):
            list_fits = sorted(im for im in os.listdir(subdata_dir) if ".fits" in im)
            print(f"{iddir} - {subdata_dir} : {len(list_fits)} files")
        else:
            print(f"{subdata_dir} : not a directory.")

    # Process at most the first three entries; slicing avoids an IndexError when fewer exist.
    for achoice in dir_entries[:3]:
        reduced = False
        aligned = False
        subdata_dir = os.path.join(data_dir, achoice)
        if reduced:
            subdata_dir = os.path.join(subdata_dir, "REDUCED")
        if aligned:
            subdata_dir = os.path.join(subdata_dir, "aligned")
        if not os.path.isdir(subdata_dir):
            # A stray file next to the target folders would make os.listdir() raise.
            print(f"{subdata_dir} : not a directory.")
            continue
        list_fits = sorted(im for im in os.listdir(subdata_dir) if ".fits" in im)

        rerun = True
        test_mode = False  # Only runs on a few images, prints the tables at each step and does not write files.
        dict_of_dict = {}  # calibration result per science image, keyed by file name without extension

        if rerun or test_mode:
            if test_mode:
                list_fits = list_fits[:10]
            for loc, scimage in enumerate(tqdm(list_fits)):
                fits_sci_image = os.path.abspath(os.path.join(subdata_dir, scimage))
                if os.path.isfile(fits_sci_image):
                    # File name without directory and extension, used as dictionary key
                    sc_im_name = os.path.splitext(os.path.basename(fits_sci_image))[0]

                    bias_dir, darks_dir, flats_dir = utils.get_calib_dirs_photometry(fits_sci_image)

                    # Get information from FITS header (the values are not used further in this cell)
                    sc_date, sc_scope, sc_cam, sc_filter, sc_focus, sc_expos, sc_x, sc_y = calibration.get_infos_from_image(fits_sci_image, verbose=False)

                    # Run calibration; overwrite_calibs is only True for the first image of the directory
                    if dedark:
                        dico_calib = calibration.dedark_sci_image(fits_sci_image, override_date_check=True, max_days=7, overwrite=rerun, verbose=False, write_tmp=test_mode, overwrite_calibs=(loc == 0))
                    else:
                        dico_calib = calibration.reduce_sci_image(fits_sci_image, darks_dir, flats_dir, bias_dir, override_date_check=True, max_days=7, overwrite=rerun, verbose=False, write_tmp=test_mode, overwrite_calibs=(loc == 0))
                    dict_of_dict.update({sc_im_name: dico_calib})

__If not done, plate-solve and align within AIJ.__

## Stacking images

In [None]:
# Point data_dir at the galaxy-cluster folder under utils.C2PU_RES_DIR
# (the reduction cell reads its inputs from utils.C2PU_DATA_DIR instead).
data_dir = os.path.join(utils.C2PU_RES_DIR, utils.DIR_PHOTOM, utils.DIR_GALCLUST)

In [None]:
# Overview of the stacking inputs: for every entry of data_dir, count the frames
# whose name contains the expected suffix in the (optionally) REDUCED/aligned sub-folder.
reduced = True
aligned = True
extension = f"{'DEDARK' if dedark else 'REDUCED'}.fits"
for iddir, ddir in enumerate(os.listdir(data_dir)):
    path_parts = [data_dir, ddir]
    if reduced:
        path_parts.append("REDUCED")
    if aligned:
        path_parts.append("aligned")
    subdata_dir = os.path.join(*path_parts)
    if not os.path.isdir(subdata_dir):
        print(f"{subdata_dir} : not a directory.")
        continue
    list_fits = sorted(im for im in os.listdir(subdata_dir) if extension in im)
    print(f"{iddir} - {subdata_dir} : {len(list_fits)} files")

In [None]:
# Pick the target to work on: the second entry of the data directory listing.
# NOTE(review): os.listdir() returns entries in arbitrary order, so index 1 may not
# designate the same target on every machine — confirm against the listing printed above.
dirchoice = os.listdir(data_dir)[1]
subdata_dir = os.path.join(data_dir, dirchoice)
if reduced:
    subdata_dir = os.path.join(subdata_dir, "REDUCED")
if aligned:
    subdata_dir = os.path.join(subdata_dir, "aligned")
if os.path.isdir(subdata_dir):
    # Frames of the chosen target whose name contains the expected suffix, in sorted order.
    list_fits = sorted(im for im in os.listdir(subdata_dir) if extension in im)
else:
    # Do not silently keep a list_fits left over from an earlier cell: it would
    # describe a different directory than subdata_dir.
    print(f"{subdata_dir} : not a directory.")
    list_fits = []

In [None]:
# Display the file names selected in the chosen directory.
list_fits

In [None]:
do_stack = True
manual_stack = False
# NOTE: when the SWarp branch below runs, its loop reassigns combtype (all three types are produced).
combtype = "MEDIAN"  # SUM, AVERAGE or MEDIAN

# Co-add the frames of each filter with SWarp, once per combination type, then display the result.
if do_stack and not manual_stack:
    center_type = "MOST"
    # Loop-invariant: SWarp receives the input directory relative to the current working directory.
    rel_data_dir = os.path.relpath(subdata_dir)
    for combtype in ["SUM", "MEDIAN", "AVERAGE"]:
        for filt in tqdm(["g", "r", "i"]):
            outname = f"coadd_{dirchoice}_SDSS{filt}p_{'DEDARK' if dedark else 'RED'}_SW_{combtype}_{center_type}.fits"
            # Existing co-adds are reused; delete the file to force a new SWarp run.
            if not os.path.isfile(outname):
                swarp_cmd = f"swarp {rel_data_dir}/*SDSS{filt}*{extension} -c default.swarp -IMAGEOUT_NAME {outname} -COMBINE_TYPE {combtype} -CENTER_TYPE {center_type}"
                status = os.system(swarp_cmd)
                if status != 0:
                    # Report the failure instead of silently skipping the display below.
                    print(f"SWarp exited with status {status} for: {swarp_cmd}")
            if os.path.isfile(outname):
                with fits.open(outname) as hdul:
                    img_data = hdul[0].data
                # Display range: median +/- 5 sigma of the sigma-clipped pixel distribution.
                mean, med, sigma = sigma_clipped_stats(img_data, sigma=3)
                plt.imshow(img_data, cmap="gray", vmin=med - 5 * sigma, vmax=med + 5 * sigma)
                plt.colorbar()
                plt.show()

In [None]:
# NumPy-based alternative to SWarp: load the frames of each filter into one cube
# and combine them pixel by pixel, once per combination type.
manual_stack = True
if do_stack and manual_stack:
    # AVERAGE and SUM are explicit; any other value falls back to the median.
    combiners = {"AVERAGE": np.mean, "SUM": np.sum}
    for combtype in ["SUM", "MEDIAN", "AVERAGE"]:
        for filt in tqdm(["g", "r", "i"]):
            outname = f"coadd_{dirchoice}_SDSS{filt}p_{'DEDARK' if dedark else 'RED'}_np{combtype}.fits"
            band_tag = f"SDSS{filt}"
            SDSS_imgs = [os.path.join(subdata_dir, im) for im in list_fits if band_tag in im]
            if not SDSS_imgs:
                continue
            if not os.path.isfile(outname):
                # The first frame provides the output HDU (header included) and the frame shape.
                with fits.open(SDSS_imgs[0]) as hdul:
                    ref_hdu = hdul[0].copy()
                    frame_shape = hdul[0].data.shape
                SDSS_stack = np.empty((len(SDSS_imgs), *frame_shape))
                for loc, fits_img in enumerate(tqdm(SDSS_imgs)):
                    with fits.open(fits_img) as hdul:
                        SDSS_stack[loc, :, :] = hdul[0].data
                combine = combiners.get(combtype, np.median)
                ref_hdu.data = combine(SDSS_stack, axis=0)
                ref_hdu.writeto(outname, overwrite=True)
            if os.path.isfile(outname):
                # Read the co-add back from disk and show it with a median +/- 5 sigma stretch.
                with fits.open(outname) as hdul:
                    coadd_im = hdul[0].data
                mean, med, sigma = sigma_clipped_stats(coadd_im, sigma=3)
                plt.imshow(coadd_im, cmap="gray", vmin=med - 5 * sigma, vmax=med + 5 * sigma)
                plt.colorbar()
                plt.show()

## References in PANSTARRS

In [None]:
from du_astro_utils import query_panstarrs

In [None]:
# Choose which g-band co-add is passed to query_panstarrs: the settings below
# must match the file names produced by the stacking cells.
manual_stack = False
combtype = "SUM"  # SUM, AVERAGE or MEDIAN
dedark = False
center_type = "MOST"
if manual_stack:
    refname = f"coadd_{dirchoice}_SDSSgp_{'DEDARK'if dedark else 'RED'}_np{combtype}.fits"
else:
    refname = f"coadd_{dirchoice}_SDSSgp_{'DEDARK'if dedark else 'RED'}_SW_{combtype}_{center_type}.fits"
# The result is used below as a table with RAJ2000 / DEJ2000 / gmag / rmag columns.
ref_ps = query_panstarrs(refname)

In [None]:
# Display the reference table returned by query_panstarrs.
ref_ps

In [None]:
from astropy.wcs import WCS
from astropy.wcs.utils import skycoord_to_pixel
from astropy.coordinates import Angle, SkyCoord

# Sky positions of the reference sources, built from the RAJ2000 / DEJ2000 columns of ref_ps.
# NOTE(review): no unit is passed, so this presumably relies on the columns carrying units — confirm.
coord_panstarrs = SkyCoord(ref_ps["RAJ2000"], ref_ps["DEJ2000"])

## Photometry

### G-band

In [None]:
from astropy.table import Table, vstack

from astropy.time import Time
import warnings
from astropy.utils.exceptions import AstropyWarning
from astropy.coordinates.name_resolve import NameResolveError

# True: detect and measure sources with source-extractor; False: use the
# du_astro_utils.photometry helpers (detection + aperture photometry).
use_sextractor = True

# g-band co-add to analyse; the name must match what the stacking cells wrote.
red_sci_image = f"coadd_{dirchoice}_SDSSgp_{'DEDARK'if dedark else 'RED'}_np{combtype}.fits" if manual_stack else f"coadd_{dirchoice}_SDSSgp_{'DEDARK'if dedark else 'RED'}_SW_{combtype}_{center_type}.fits"
# red_sci_image = os.path.relpath(os.path.join(subdata_dir, 'aligned_NGC-7317_20211007T221309617_SC_SDSSgp+_0045s000_000000_REDUCED.fits'))
im_dir = os.path.abspath(os.path.dirname(red_sci_image))
im_name, im_ext = os.path.splitext(os.path.basename(red_sci_image))
with fits.open(red_sci_image) as hdul:
    hdu = hdul[0]
    wcs = WCS(hdu.header)
    epoch = Time(hdu.header.get("MJD-OBS"), format="mjd", scale="utc")
    if use_sextractor:
        sex_cmd = f"source-extractor -c default.sex {red_sci_image} -CATALOG_NAME {im_name}.cat -CATALOG_TYPE FITS_1.0 -VERBOSE_TYPE QUIET -FILTER_NAME gauss_5.0_9x9.conv"
        status = os.system(sex_cmd)
        if status != 0:
            # Otherwise a catalogue left over from a previous run could be read without notice.
            raise RuntimeError(f"source-extractor exited with status {status} for: {sex_cmd}")
        cat_tab_g = Table.read(f"{im_name}.cat")
        # Expose the pixel positions under the xcentroid / ycentroid names used later for plotting.
        cat_tab_g.rename_column("X_IMAGE", "xcentroid")
        cat_tab_g.rename_column("Y_IMAGE", "ycentroid")
        # Sigma-clipped median FWHM of the detections (FWHM_IMAGE column).
        _, fwhm, _ = sigma_clipped_stats(cat_tab_g["FWHM_IMAGE"], sigma=3)
        source_coords_g = SkyCoord(ra=cat_tab_g["ALPHA_J2000"], dec=cat_tab_g["DELTA_J2000"], unit="deg", obstime=epoch)
    else:
        sources_g = photometry.detect_sources(red_sci_image, detection_fwhm=10, verbose=False)
        try:
            # Fixed: the detections are stored in sources_g ("sources" was never defined).
            fwhm = photometry.get_fwhm(red_sci_image, sources_g)
        except RuntimeError:
            # Fall back to the FWHM used for the detection.
            fwhm = 10
        cat_tab_g = photometry.apert_photometry(red_sci_image, sources_g, fwhm)
        # Fixed: the second argument is the y pixel coordinate (xcenter was passed twice).
        # NOTE(review): assumes the aperture-photometry table has a "ycenter" column next to "xcenter" — confirm.
        source_coords_g = SkyCoord.from_pixel(cat_tab_g["xcenter"], cat_tab_g["ycenter"], wcs)

In [None]:
# Display the g-band source catalogue.
cat_tab_g

In [None]:
# Star/galaxy diagnostic for the g-band catalogue: ELONGATION versus CLASS_STAR.
class_star_g = cat_tab_g["CLASS_STAR"]
elongation_g = cat_tab_g["ELONGATION"]
plt.scatter(class_star_g, elongation_g)
plt.xlabel("CLASS_STAR")
plt.ylabel("ELONGATION")
plt.ylim(0.0, 5.0)
plt.grid()

In [None]:
# Distribution of the CLASS_STAR values in the g-band catalogue.
class_star_g = cat_tab_g["CLASS_STAR"]
plt.hist(class_star_g, bins=50)
plt.xlabel("CLASS_STAR")

In [None]:
# Distribution of ELONGATION in the g-band catalogue, restricted to the 1-5 range.
elongation_g = cat_tab_g["ELONGATION"]
plt.hist(elongation_g, bins=50, range=(1, 5))
plt.xlabel("ELONGATION")

In [None]:
# Star candidates for the photometric calibration: sources with ELONGATION below 1.2.
sel_stars = cat_tab_g["ELONGATION"] < 1.2
cat_stars_g, star_coords_g = cat_tab_g[sel_stars], source_coords_g[sel_stars]

In [None]:
# Display the g-band star candidates.
cat_stars_g

In [None]:
import astropy.units as u

# Nearest reference neighbour of every g-band star candidate.
xm_id, xm_ang_distance, _ = star_coords_g.match_to_catalog_sky(coord_panstarrs, nthneighbor=1)

# Matching radius: 2.5 arcsec, enlarged to PIXSCALX * fwhm for manual stacks when that is larger.
# NOTE(review): presumably PIXSCALX is in arcsec/pixel and fwhm in pixels — confirm.
max_sep = 2.5 * u.arcsec
if manual_stack:
    sep_from_fwhm = hdu.header.get("PIXSCALX") * fwhm
    print(sep_from_fwhm)
    if not (sep_from_fwhm < 2.5):
        max_sep = sep_from_fwhm * u.arcsec

# Keep only the pairs closer than the matching radius.
sep_constraint = xm_ang_distance < max_sep
matched_ref_idx = xm_id[sep_constraint]
coord_matches_g = star_coords_g[sep_constraint]
catalog_matches_g = ref_ps[matched_ref_idx]
coord_catalog_matches_g = coord_panstarrs[matched_ref_idx]

In [None]:
# Instrumental and catalogue g magnitudes (with errors) of the matched stars.
# The catalogue side is the same for both extraction paths.
cat_mag_g = ref_ps["gmag"][xm_id[sep_constraint]]
cat_err_g = ref_ps["e_gmag"][xm_id[sep_constraint]]
if use_sextractor:
    # source-extractor already provides magnitudes and their errors.
    ins_mag_g = cat_stars_g[sep_constraint]["MAG_AUTO"]
    ins_err_g = cat_stars_g[sep_constraint]["MAGERR_AUTO"]
else:
    # Aperture photometry: convert the background-subtracted flux per unit exposure time to a magnitude.
    exptime = hdu.header.get("EXPTIME")
    matched_stars_g = cat_stars_g[sep_constraint]
    ins_mag_g = -2.5 * np.log10(matched_stars_g["aper_sum_bkgsub"] / exptime)
    # Error taken as the magnitude shift obtained when the noise is added to the flux.
    ins_err_g = ins_mag_g - -2.5 * np.log10((matched_stars_g["aper_sum_bkgsub"] + matched_stars_g["noise"]) / exptime)

    # 99.0 (a float) marks stars without a match; an integer 99 would create an
    # integer column and truncate the magnitudes written on the next line.
    cat_stars_g["ins_mag"] = 99.0
    cat_stars_g["ins_mag"][sep_constraint] = ins_mag_g

In [None]:
# Catalogue versus instrumental g magnitude, skipping instrumental magnitudes of 99 and above.
sel = ins_mag_g < 99
valid_ins_g = ins_mag_g[sel]
valid_cat_g = cat_mag_g[sel]
plt.scatter(valid_ins_g, valid_cat_g)
plt.grid()

In [None]:
# Offset (instrumental - catalogue) as a function of the catalogue g magnitude.
offset_g = ins_mag_g[sel] - cat_mag_g[sel]
plt.scatter(cat_mag_g[sel], offset_g)
plt.grid()

In [None]:
from sklearn import linear_model

# Selection from magnitude range
mag_min, mag_max = 14, 18
# Keep matched stars inside the catalogue magnitude window that have a usable value on
# both sides: unmasked catalogue magnitude, non-NaN instrumental magnitude below 99.
cond = (cat_mag_g > mag_min) & (cat_mag_g < mag_max) & (~cat_mag_g.mask) & (~np.isnan(ins_mag_g)) & (ins_mag_g < 99)

# Column vectors for the regression: X = instrumental magnitudes, y = catalogue magnitudes
X = ins_mag_g[cond].reshape(-1, 1)
y = cat_mag_g[cond].reshape(-1, 1)


# Simple linear regression
linear = linear_model.LinearRegression()
linear.fit(X, y)


# Robust scatter of (instrumental - catalogue), used to choose the RANSAC inlier threshold
from scipy import stats

MAD = stats.median_abs_deviation(X - y)
_, _, sig = sigma_clipped_stats(X - y)

print(MAD, sig)

# RANSAC linear regression; points further than 3 MAD from the model are outliers.
# RANSAC draws random subsets, so random_state is fixed to make the fit reproducible.
ransac = linear_model.RANSACRegressor(residual_threshold=3 * MAD[0], random_state=0)
ransac.fit(X, y)

# Results
print("Photometric calibration:")
print(f"  Linear Slope: {linear.coef_[0][0]:.3f}")
print(f"  Linear ZP   : {linear.intercept_[0]:.3f}\n")
print(f"  RANSAC Slope: {ransac.estimator_.coef_[0][0]:.3f}")
print(f"  RANSAC ZP   : {ransac.estimator_.intercept_[0]:.3f}")

In [None]:
# Plot the g-band calibration: regression lines (left) and zero-point residuals (right).
# Outliers and valid points as classified by RANSAC
inlier_mask = ransac.inlier_mask_
outlier_mask = np.logical_not(inlier_mask)

# Linear regressions (simple and RANSAC) evaluated over the fitted magnitude range
line_X = np.arange(X.min(), X.max() + 1)[:, np.newaxis]
line_y_simple = linear.predict(line_X)
line_y_ransac = ransac.predict(line_X)

fig, ax = plt.subplots(1, 2, figsize=(15, 5))

# Plot data
ax[0].scatter(X[inlier_mask], y[inlier_mask], color="yellowgreen", marker=".", label="Inliers")
ax[0].scatter(X[outlier_mask], y[outlier_mask], color="gray", marker=".", label="Outliers")

# Plot regressions
ax[0].plot(line_X, line_y_simple, color="cornflowerblue", label="Linear regressor")
ax[0].plot(line_X, line_y_ransac, color="navy", label="RANSAC regressor")

# Axes
ax[0].legend(loc="lower right")
ax[0].set_xlabel("Instrument magnitude")
ax[0].set_ylabel("Catalog magnitude")
ax[0].set_aspect("equal")

# Sigma-clipped median and scatter of the zero point (catalogue - instrumental)
_, zp_median, zp_sigma = sigma_clipped_stats(y - X, sigma=3)
ax[1].scatter(y[inlier_mask], y[inlier_mask] - X[inlier_mask], color="yellowgreen", marker=".", label="Inliers")
ax[1].scatter(y[outlier_mask], y[outlier_mask] - X[outlier_mask], color="gray", marker=".", label="Outliers")
ax[1].set_xlabel("Catalog magnitude")

ax[1].axhline(zp_median, label="Median")
ax[1].axhline(zp_median + zp_sigma, linestyle="--", label="Standard deviation")
ax[1].axhline(zp_median - zp_sigma, linestyle="--")
print(f"  sigma  ZP   : {zp_sigma:.3f}")

# Fixed label: the plotted quantity is y - X, i.e. catalogue minus instrumental.
ax[1].set_ylabel("Catalog - Instrument magnitude")
ax[1].legend(loc="best")

In [None]:
# Calibrated g magnitudes: apply the RANSAC model to every source with a positive flux.
# Sources without a positive flux keep the 99.0 placeholder.
cat_tab_g["AB_MAG"] = 99.0

if use_sextractor:
    positive = np.where(cat_tab_g["FLUX_AUTO"] > 0)
    ins_mag_all_g = cat_tab_g[positive]["MAG_AUTO"]
else:
    # exptime comes from the instrumental-magnitude cell of the aperture-photometry path.
    positive = np.where(cat_tab_g["aper_sum_bkgsub"] > 0)
    ins_mag_all_g = -2.5 * np.log10(cat_tab_g[positive]["aper_sum_bkgsub"] / exptime)
cat_tab_g["AB_MAG"][positive] = ransac.predict(ins_mag_all_g.data.reshape(-1, 1)).flatten()
cat_tab_g

### R-band

In [None]:
# r-band co-add to analyse; the name must match what the stacking cells wrote.
red_sci_image = f"coadd_{dirchoice}_SDSSrp_{'DEDARK'if dedark else 'RED'}_np{combtype}.fits" if manual_stack else f"coadd_{dirchoice}_SDSSrp_{'DEDARK'if dedark else 'RED'}_SW_{combtype}_{center_type}.fits"
# red_sci_image = os.path.relpath(os.path.join(subdata_dir, 'aligned_NGC-7317_20211007T215225971_SC_SDSSrp+_0060s000_000000_REDUCED.fits'))
im_dir = os.path.abspath(os.path.dirname(red_sci_image))
im_name, im_ext = os.path.splitext(os.path.basename(red_sci_image))
with fits.open(red_sci_image) as hdul:
    hdu = hdul[0]
    wcs = WCS(hdu.header)
    # Scale given explicitly, as in the g-band cell.
    epoch = Time(hdu.header.get("MJD-OBS"), format="mjd", scale="utc")
    if use_sextractor:
        sex_cmd = f"source-extractor -c default.sex {red_sci_image} -CATALOG_NAME {im_name}.cat -CATALOG_TYPE FITS_1.0 -VERBOSE_TYPE QUIET -FILTER_NAME gauss_5.0_9x9.conv"
        status = os.system(sex_cmd)
        if status != 0:
            # Otherwise a catalogue left over from a previous run could be read without notice.
            raise RuntimeError(f"source-extractor exited with status {status} for: {sex_cmd}")
        cat_tab_r = Table.read(f"{im_name}.cat")
        # Expose the pixel positions under the xcentroid / ycentroid names used later for plotting.
        cat_tab_r.rename_column("X_IMAGE", "xcentroid")
        cat_tab_r.rename_column("Y_IMAGE", "ycentroid")
        # Sigma-clipped median FWHM of the detections (FWHM_IMAGE column).
        _, fwhm, _ = sigma_clipped_stats(cat_tab_r["FWHM_IMAGE"], sigma=3)
        source_coords_r = SkyCoord(ra=cat_tab_r["ALPHA_J2000"], dec=cat_tab_r["DELTA_J2000"], unit="deg", obstime=epoch)
    else:
        sources_r = photometry.detect_sources(red_sci_image, detection_fwhm=10, verbose=False)
        try:
            fwhm = photometry.get_fwhm(red_sci_image, sources_r)
        except RuntimeError:
            # Fall back to the FWHM used for the detection.
            fwhm = 10
        cat_tab_r = photometry.apert_photometry(red_sci_image, sources_r, fwhm)
        # Fixed: the second argument is the y pixel coordinate (xcenter was passed twice).
        # NOTE(review): assumes the aperture-photometry table has a "ycenter" column next to "xcenter" — confirm.
        source_coords_r = SkyCoord.from_pixel(cat_tab_r["xcenter"], cat_tab_r["ycenter"], wcs)

In [None]:
# Star/galaxy diagnostic for the r-band catalogue: ELONGATION versus CLASS_STAR.
class_star_r = cat_tab_r["CLASS_STAR"]
elongation_r = cat_tab_r["ELONGATION"]
plt.scatter(class_star_r, elongation_r)
plt.grid()
plt.xlabel("CLASS_STAR")
plt.ylabel("ELONGATION")
plt.ylim(0.0, 5.0)

In [None]:
# Distribution of the CLASS_STAR values in the r-band catalogue.
class_star_r = cat_tab_r["CLASS_STAR"]
plt.hist(class_star_r, bins=50)
plt.xlabel("CLASS_STAR")

In [None]:
# Distribution of ELONGATION in the r-band catalogue, restricted to the 1-5 range.
elongation_r = cat_tab_r["ELONGATION"]
plt.hist(elongation_r, bins=50, range=(1, 5))
plt.xlabel("ELONGATION")

In [None]:
# Star candidates for the photometric calibration: sources with ELONGATION below 1.2.
sel_stars = cat_tab_r["ELONGATION"] < 1.2
cat_stars_r, star_coords_r = cat_tab_r[sel_stars], source_coords_r[sel_stars]

In [None]:
# Nearest reference neighbour of every r-band star candidate.
xm_id, xm_ang_distance, _ = star_coords_r.match_to_catalog_sky(coord_panstarrs, nthneighbor=1)
if manual_stack:
    # Matching radius derived from the measured FWHM.
    # NOTE(review): presumably PIXSCALX is in arcsec/pixel and fwhm in pixels — confirm.
    print(hdu.header.get("PIXSCALX") * fwhm)
    max_sep = hdu.header.get("PIXSCALX") * fwhm * u.arcsec
    # Same 2.5 arcsec floor as in the g-band matching, so both bands are matched consistently.
    if max_sep.value < 2.5:
        max_sep = 2.5 * u.arcsec
else:
    max_sep = 2.5 * u.arcsec
# Keep only the pairs closer than the matching radius.
sep_constraint = xm_ang_distance < max_sep
coord_matches_r = star_coords_r[sep_constraint]
catalog_matches_r = ref_ps[xm_id[sep_constraint]]
coord_catalog_matches_r = coord_panstarrs[xm_id[sep_constraint]]

In [None]:
# Instrumental and catalogue r magnitudes (with errors) of the matched stars, plus a quick-look plot.
f, a = plt.subplots(1, 1)
# The catalogue side is the same for both extraction paths.
cat_mag_r = ref_ps["rmag"][xm_id[sep_constraint]]
cat_err_r = ref_ps["e_rmag"][xm_id[sep_constraint]]
if use_sextractor:
    # source-extractor already provides magnitudes and their errors.
    ins_mag_r = cat_stars_r[sep_constraint]["MAG_AUTO"]
    ins_err_r = cat_stars_r[sep_constraint]["MAGERR_AUTO"]
else:
    # Aperture photometry: convert the background-subtracted flux per unit exposure time to a magnitude.
    exptime = hdu.header.get("EXPTIME")
    matched_stars_r = cat_stars_r[sep_constraint]
    ins_mag_r = -2.5 * np.log10(matched_stars_r["aper_sum_bkgsub"] / exptime)
    # Error taken as the magnitude shift obtained when the noise is added to the flux.
    ins_err_r = ins_mag_r - -2.5 * np.log10((matched_stars_r["aper_sum_bkgsub"] + matched_stars_r["noise"]) / exptime)

    # 99.0 (a float) marks stars without a match; an integer 99 would create an
    # integer column and truncate the magnitudes written on the next line.
    cat_stars_r["ins_mag"] = 99.0
    cat_stars_r["ins_mag"][sep_constraint] = ins_mag_r

# Catalogue versus instrumental magnitude, skipping instrumental magnitudes of 99 and above.
sel = ins_mag_r < 99
a.scatter(ins_mag_r[sel], cat_mag_r[sel])
a.grid()
a.set_xlabel("Instrument magnitude")
a.set_ylabel("Catalog magnitude")
a.set_aspect("equal")
a.set_ylim((12, 20))

In [None]:
# Selection from magnitude range
mag_min, mag_max = 13, 18
# Keep matched stars inside the catalogue magnitude window that have a usable value on
# both sides: unmasked catalogue magnitude, non-NaN instrumental magnitude below 99.
cond = (cat_mag_r > mag_min) & (cat_mag_r < mag_max) & (~cat_mag_r.mask) & (~np.isnan(ins_mag_r)) & (ins_mag_r < 99)

# Column vectors for the regression: X = instrumental magnitudes, y = catalogue magnitudes
X = ins_mag_r[cond].reshape(-1, 1)
y = cat_mag_r[cond].reshape(-1, 1)


# Simple linear regression
linear = linear_model.LinearRegression()
linear.fit(X, y)


# Robust scatter of (instrumental - catalogue), used to choose the RANSAC inlier threshold
from scipy import stats

MAD = stats.median_abs_deviation(X - y)
_, _, sig = sigma_clipped_stats(X - y)

print(MAD, sig)

# RANSAC linear regression; points further than 3 MAD from the model are outliers.
# RANSAC draws random subsets, so random_state is fixed to make the fit reproducible.
ransac = linear_model.RANSACRegressor(residual_threshold=3 * MAD[0], random_state=0)
ransac.fit(X, y)

# Results
print("Photometric calibration:")
print(f"  Linear Slope: {linear.coef_[0][0]:.3f}")
print(f"  Linear ZP   : {linear.intercept_[0]:.3f}\n")
print(f"  RANSAC Slope: {ransac.estimator_.coef_[0][0]:.3f}")
print(f"  RANSAC ZP   : {ransac.estimator_.intercept_[0]:.3f}")

# Plotting regression
# Outliers and valid points as classified by RANSAC
inlier_mask = ransac.inlier_mask_
outlier_mask = np.logical_not(inlier_mask)

# Linear regressions (simple and RANSAC) evaluated over the fitted magnitude range
line_X = np.arange(X.min(), X.max() + 1)[:, np.newaxis]
line_y_simple = linear.predict(line_X)
line_y_ransac = ransac.predict(line_X)

fig, ax = plt.subplots(1, 2, figsize=(15, 5))

# Plot data
ax[0].scatter(X[inlier_mask], y[inlier_mask], color="yellowgreen", marker=".", label="Inliers")
ax[0].scatter(X[outlier_mask], y[outlier_mask], color="gray", marker=".", label="Outliers")

# Plot regressions
ax[0].plot(line_X, line_y_simple, color="cornflowerblue", label="Linear regressor")
ax[0].plot(line_X, line_y_ransac, color="navy", label="RANSAC regressor")

# Axes
ax[0].legend(loc="lower right")
ax[0].set_xlabel("Instrument magnitude")
ax[0].set_ylabel("Catalog magnitude")
ax[0].set_aspect("equal")

# Sigma-clipped median and scatter of the zero point (catalogue - instrumental)
_, zp_median, zp_sigma = sigma_clipped_stats(y - X, sigma=3)
ax[1].scatter(y[inlier_mask], y[inlier_mask] - X[inlier_mask], color="yellowgreen", marker=".", label="Inliers")
ax[1].scatter(y[outlier_mask], y[outlier_mask] - X[outlier_mask], color="gray", marker=".", label="Outliers")
ax[1].set_xlabel("Catalog magnitude")

ax[1].axhline(zp_median, label="Median")
ax[1].axhline(zp_median + zp_sigma, linestyle="--", label="Standard deviation")
ax[1].axhline(zp_median - zp_sigma, linestyle="--")
print(f"  sigma  ZP   : {zp_sigma:.3f}")

# Fixed label: the plotted quantity is y - X, i.e. catalogue minus instrumental.
ax[1].set_ylabel("Catalog - Instrument magnitude")
ax[1].legend(loc="best")

In [None]:
# Calibrated r magnitudes: apply the RANSAC model to every source with a positive flux.
# Sources without a positive flux keep the 99.0 placeholder.
cat_tab_r["AB_MAG"] = 99.0

if use_sextractor:
    positive = np.where(cat_tab_r["FLUX_AUTO"] > 0)
    ins_mag_all_r = cat_tab_r[positive]["MAG_AUTO"]
else:
    # exptime comes from the instrumental-magnitude cell of the aperture-photometry path.
    positive = np.where(cat_tab_r["aper_sum_bkgsub"] > 0)
    ins_mag_all_r = -2.5 * np.log10(cat_tab_r[positive]["aper_sum_bkgsub"] / exptime)
cat_tab_r["AB_MAG"][positive] = ransac.predict(ins_mag_all_r.data.reshape(-1, 1)).flatten()
cat_tab_r

## Cross-match R and G

In [None]:
# For every r-band source, find the nearest g-band source.
xm_id, xm_ang_distance, _ = source_coords_r.match_to_catalog_sky(source_coords_g, nthneighbor=1)
if manual_stack:
    # hdu and fwhm are the values left by the r-band extraction cell.
    # NOTE(review): presumably PIXSCALX is in arcsec/pixel and fwhm in pixels — confirm.
    print(hdu.header.get("PIXSCALX") * fwhm)
    max_sep = hdu.header.get("PIXSCALX") * fwhm * u.arcsec
    # Same 2.5 arcsec floor as in the g-band / reference matching, for consistency.
    if max_sep.value < 2.5:
        max_sep = 2.5 * u.arcsec
else:
    max_sep = 2.5 * u.arcsec
# Keep only the pairs closer than the matching radius.
sep_constraint = xm_ang_distance < max_sep
coord_matches = source_coords_r[sep_constraint]
catalog_matches = cat_tab_g[xm_id[sep_constraint]]
coord_catalog_matches = source_coords_g[xm_id[sep_constraint]]

In [None]:
# Row-aligned g and r catalogues of the cross-matched sources.
g_cat = catalog_matches
r_cat = cat_tab_r[sep_constraint]

# Colour-magnitude diagram (g - r versus g) for sources calibrated in both bands.
sel = np.logical_and(g_cat["AB_MAG"] < 99.0, r_cat["AB_MAG"] < 99.0)
g_mag_cal = g_cat[sel]["AB_MAG"]
r_mag_cal = r_cat[sel]["AB_MAG"]
plt.scatter(g_mag_cal - r_mag_cal, g_mag_cal, marker=".")

At this point the sample is a mix of stars and galaxies. The galaxy catalogue has to be built for the rest of the analysis.

In [None]:
# ELONGATION versus CLASS_STAR for the matched sources: g band as dots, r band as crosses.
band_styles = [(g_cat, {"marker": "."}), (r_cat, {"marker": "x", "alpha": 0.5})]
for band_cat, style in band_styles:
    plt.scatter(band_cat["CLASS_STAR"], band_cat["ELONGATION"], **style)
plt.xlabel("CLASS_STAR")
plt.ylabel("ELONGATION")
plt.ylim(0.0, 5.0)
plt.grid()

In [None]:
# CLASS_STAR distributions of the matched sources, g band first, then r band.
for band_cat in (g_cat, r_cat):
    plt.hist(band_cat["CLASS_STAR"], bins=50, alpha=0.5)
plt.xlabel("CLASS_STAR")

In [None]:
# ELONGATION distributions (1-5 range) of the matched sources, g band first, then r band.
for band_cat in (g_cat, r_cat):
    plt.hist(band_cat["ELONGATION"], range=(1, 5), bins=50, alpha=0.5)
plt.xlabel("ELONGATION")

In [None]:
# Start the joined g+r catalogue from a copy of the matched g-band rows, so g_cat itself is not modified.
joined_cat = g_cat.copy()

In [None]:
# Inspect the available columns before renaming the band-specific ones.
joined_cat.columns

In [None]:
# Tag the g-band photometry columns with a _G suffix so the r-band ones can be added alongside.
# Not re-runnable as is: a second run fails because the original names are gone.
g_band_renames = [
    ("FLUX_AUTO", "FLUX_AUTO_G"),
    ("FLUXERR_AUTO", "FLUXERR_AUTO_G"),
    ("MAG_AUTO", "MAG_AUTO_G"),
    ("MAGERR_AUTO", "MAGERR_AUTO_G"),
    ("AB_MAG", "AB_MAG_G"),
]
for old_name, new_name in g_band_renames:
    joined_cat.rename_column(old_name, new_name)

In [None]:
# Copy the r-band photometry of the matched sources into the joined catalogue (_R suffix).
r_band_columns = [
    ("FLUX_AUTO", "FLUX_AUTO_R"),
    ("FLUXERR_AUTO", "FLUXERR_AUTO_R"),
    ("MAG_AUTO", "MAG_AUTO_R"),
    ("MAGERR_AUTO", "MAGERR_AUTO_R"),
]
for r_name, joined_name in r_band_columns:
    joined_cat[joined_name] = r_cat[r_name]

In [None]:
# Add the calibrated r-band magnitude of the matched sources.
joined_cat["AB_MAG_R"] = r_cat["AB_MAG"]

In [None]:
# Display the joined g+r catalogue.
joined_cat

In [None]:
# Galaxy candidates: CLASS_STAR of at most 0.6 and ELONGATION above 1.15 ...
sel_gal = (joined_cat["CLASS_STAR"] <= 0.6) & (joined_cat["ELONGATION"] > 1.15)
gal_cat = joined_cat[sel_gal]
# ... restricted to sources with a calibrated magnitude (below the 99.0 placeholder) in both bands.
sel = (gal_cat["AB_MAG_G"] < 99.0) & (gal_cat["AB_MAG_R"] < 99.0)
gal_cat = gal_cat[sel]

In [None]:
# Display the galaxy-candidate catalogue.
gal_cat

In [None]:
# Colour-magnitude diagram of the galaxy candidates.
g_minus_r = gal_cat["AB_MAG_G"] - gal_cat["AB_MAG_R"]
plt.scatter(g_minus_r, gal_cat["AB_MAG_G"])
plt.xlabel("G-R")
plt.ylabel("G")

In [None]:
# Distribution of the G-R colour of the galaxy candidates.
g_minus_r = gal_cat["AB_MAG_G"] - gal_cat["AB_MAG_R"]
plt.hist(g_minus_r)
plt.xlabel("G-R")

In [None]:
# Save the galaxy catalogue; the file name encodes the same stacking options as the co-add names.
if manual_stack:
    cat_name = f"galaxies_{dirchoice}_G_R_{'DEDARK'if dedark else 'RED'}_np{combtype}.fits"
else:
    cat_name = f"galaxies_{dirchoice}_G_R_{'DEDARK'if dedark else 'RED'}_SW_{combtype}_{center_type}.fits"
gal_cat.write(cat_name, overwrite=True, format="fits")

In [None]:
# Overlay the galaxy candidates on the g-band co-add.
gimage = f"coadd_{dirchoice}_SDSSgp_{'DEDARK'if dedark else 'RED'}_np{combtype}.fits" if manual_stack else f"coadd_{dirchoice}_SDSSgp_{'DEDARK'if dedark else 'RED'}_SW_{combtype}_{center_type}.fits"
with fits.open(gimage) as stack_gp:
    hdr = stack_gp[0].header
    data = stack_gp[0].data
# Display range: median +/- 5 sigma of the sigma-clipped pixel distribution.
mean, med, sigma = sigma_clipped_stats(data, sigma=3)
plt.imshow(data, cmap="gray", vmin=med - 5 * sigma, vmax=med + 5 * sigma)
plt.colorbar()
# xcentroid / ycentroid are the source-extractor X_IMAGE / Y_IMAGE columns, which are
# 1-based (first pixel centred on 1), while imshow centres the first pixel on 0:
# shift by one pixel so the markers sit on the sources.
plt.scatter(gal_cat["xcentroid"] - 1, gal_cat["ycentroid"] - 1, color="y", alpha=0.3)