In [1]:
"""
Identifier:     csst_mci_astrometry/astrometry.py
Name:           astrometry.py
Description:    astrometry code for MCI
Author:         Xiyan Peng
Created:        2023-11-25
Modified-History:
    2023-11-14, Xiyan Peng, created
    2023-12-29, Xiyan Peng, modified
    2023-12-11, Xiyan Peng, add the docstring
"""
import os
import numpy as np

from astropy import units as u
from astropy.table import Table, Column, MaskedColumn
from astropy.time import Time, TimeDelta
from astropy.coordinates import SkyCoord, Distance, GCRS, CartesianRepresentation
from astropy.wcs import WCS
from astropy.io import fits, ascii
from astropy.stats import sigma_clip
from astroquery.vizier import Vizier

from csst_mci_common.data_manager import CsstMCIDataManager
from csst_mci_common.logger import get_logger

import logging
import warnings
from multiprocessing import Process, Manager, Pool
import joblib

In [2]:
from csst_mci_common.common import CsstMCIInit
from csst_mci_common.status import CsstStatus, CsstResult
csst_mci = CsstMCIInit()
dm = csst_mci.dm

In [3]:
l0_sci_c1_path = '/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L0/sim_ver20230725/20100000001/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L0_V01.fits'
csst_mci.config.set_mci_config("sci_c1_path", l0_sci_c1_path)
csst_mci.config.get_mci_config('sci_c1_path')

from csst_mci_common.config_dfs import env_dfs_set_ins_naoc
from csst_mci_common.config_dfs import env_dfs_set_ast_naoc


env_dfs_set_ins_naoc()
env_dfs_set_ast_naoc()

print(csst_mci.config.get_mci_config('obsid'))
print(csst_mci.config.get_mci_config('img_path'))
print(csst_mci.config.get_mci_config('wht_path'))
print(csst_mci.config.get_mci_config('head_path'))
print(csst_mci.config.get_mci_config('flag_path'))

20100000001
/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_img.fits
/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_wht.fits
/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_img.head
/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_flg.fits


### 函数定义

In [4]:
def write_wcs_head(image_path: str, head_output: str):
    """
    Write the WCS head.

    Write the new WCS head from *img.fits.

    Parameters
    ----------
    image_path : str
        The image to be processed, *img.fits.
    head_output : str
        A new head.
    Returns
    -------
    None

    Examples
    --------
    >>> write_wcs_head(fp_img, fp_head_ast)
    """

    if os.path.splitext(image_path)[-1] == ".fits":
        header = fits.getheader(image_path, 1)
        newheader = fits.Header()

        #        del newheader[3:22]
        newheader["STA_AST"] = (1, "Completion degree of MCI astrometric solution")
        newheader["VER_AST"] = ("v2023.01", "Version of MCI Astrometry soft")
        newheader["STM_AST"] = ("", "Time of last MCI Astrometry")

        newheader["EQUINOX"] = (2000.00000000, "Mean equinox")
        newheader["RADESYS"] = ("ICRS    ", "Astrometric system")
        newheader["CTYPE1"] = ("RA---TPV", "WCS projection type for this axis")
        newheader["CTYPE2"] = ("DEC--TPV", "WCS projection type for this axis")
        newheader["CUNIT1"] = ("deg     ", "Axis unit")
        newheader["CUNIT2"] = ("deg     ", "Axis unit")
        newheader["CRVAL1"] = (header["CRVAL1"], "World coordinate on this axis")
        newheader["CRVAL2"] = (header["CRVAL2"], "World coordinate on this axis")
        newheader["CRPIX1"] = (header["CRPIX1"], "Reference pixel on this axis")
        newheader["CRPIX2"] = (header["CRPIX2"], "Reference pixel on this axis")

        newheader["CD1_1"] = (header["CD1_1"], "Linear projection matrix")
        newheader["CD1_2"] = (header["CD1_2"], "Linear projection matrix")
        newheader["CD2_1"] = (header["CD2_1"], "Linear projection matrix")
        newheader["CD2_2"] = (header["CD2_2"], "Linear projection matrix")
        newheader["PV1_0"] = (0, "Projection distortion parameter")
        newheader["PV1_1"] = (1, "Projection distortion parameter")
        newheader["PV1_2"] = (0, "Projection distortion parameter")
        newheader["PV1_4"] = (0, "Projection distortion parameter")
        newheader["PV1_5"] = (0, "Projection distortion parameter")
        newheader["PV1_6"] = (0, "Projection distortion parameter")
        newheader["PV1_7"] = (0, "Projection distortion parameter")
        newheader["PV1_8"] = (0, "Projection distortion parameter")
        newheader["PV1_9"] = (0, "Projection distortion parameter")
        newheader["PV1_10"] = (0, "Projection distortion parameter")
        newheader["PV1_12"] = (0, "Projection distortion parameter")
        newheader["PV1_13"] = (0, "Projection distortion parameter")
        newheader["PV1_14"] = (0, "Projection distortion parameter")
        newheader["PV1_15"] = (0, "Projection distortion parameter")
        newheader["PV1_16"] = (0, "Projection distortion parameter")
        newheader["PV2_0"] = (0, "Projection distortion parameter")
        newheader["PV2_1"] = (1, "Projection distortion parameter")
        newheader["PV2_2"] = (0, "Projection distortion parameter")
        newheader["PV2_4"] = (0, "Projection distortion parameter")
        newheader["PV2_5"] = (0, "Projection distortion parameter")
        newheader["PV2_6"] = (0, "Projection distortion parameter")
        newheader["PV2_7"] = (0, "Projection distortion parameter")
        newheader["PV2_8"] = (0, "Projection distortion parameter")
        newheader["PV2_9"] = (0, "Projection distortion parameter")
        newheader["PV2_10"] = (0, "Projection distortion parameter")
        newheader["PV2_12"] = (0, "Projection distortion parameter")
        newheader["PV2_13"] = (0, "Projection distortion parameter")
        newheader["PV2_14"] = (0, "Projection distortion parameter")
        newheader["PV2_15"] = (0, "Projection distortion parameter")
        newheader["PV2_16"] = (0, "Projection distortion parameter")

        newheader["ASTIRMS1"] = (-9999, "Astrom. dispersion RMS (intern., high S/N)")
        newheader["ASTIRMS2"] = (-9999, "Astrom. dispersion RMS (intern., high S/N)")
        newheader["ASTRRMS1"] = (-9999, "Astrom. dispersion RMS (ref., high S/N)")
        newheader["ASTRRMS2"] = (-9999, "Astrom. dispersion RMS (ref., high S/N)")

        #        newheader['STA_CCRS'] = (
        #            1, "Completion degree of relative astrometric solution in MCI")
        #        newheader['VER_CCRS'] = (
        #            "v2023.01", "Version of CSST relative Astrometry soft in MCI")
        #        newheader['STM_CCRS'] = ("", "Time of last CSST Astrometry in MCI")

        newheader["ASTGATE"] = (" ", "Camera shutter information")
        newheader["ASTCONF"] = (" ", "Configuration file for astrometry")
        newheader["ASTSIM"] = ("normal", "Image classification for MCI Astrometry")
        newheader["GCRSREF"] = (
            "Gaia dr3 v01",
            "Reference Catalogue for MCI Astrometry",
        )
        newheader["ASTHIS"] = (1, "Astrometric solution Record for MCI Astrometry")
        newheader["DELT_RA"] = (-9999, "Change in central RA")
        newheader["DELT_dec"] = (-9999, "Change in central DEC")
        newheader["DELT_ps"] = (-9999, "Change in pixelscale")
        empty_primary = fits.PrimaryHDU(header=newheader)
        empty_primary.writeto(head_output, overwrite=True)


def work_sext(
    image_path: str,
    config_path: str,
    out_path: str,
    out_para_path: str,
    filter_path: str,
):
    """
    Do astrometry by source-extractor.

    Do astrometry by source-extractor, the basic use of source-extractor is 'source-extractor  -c ' + fp_config + fp_image'

    Parameters
    ----------
    image_path : str
        The image fits to be processed, *.fits.
    config_path : str
        The config file of source-extractor, *.sex.
    out_path : str
        The name of the output catalog.
    out_para_path : str
        The parameters of the output catalog, *.param.
    filter_path : str
        The filter of the image, *.conv.

    Returns
    -------
    None

    Examples
    -------
    >>> work_sext(fp_img,fp_config_sext,fp_out_sext,fp_para_sext,fp_filter_sext)
    """
    fp_image = image_path
    fp_config = config_path
    fp_out = out_path
    fp_para = out_para_path
    fp_filter = filter_path

    cmd_sex = (
        "sex  -c "
        + fp_config + " "
        + fp_image
        + " -CATALOG_NAME "
        + fp_out
        + " -PARAMETERS_NAME "
        + fp_para
        + " -FILTER_NAME "
        + fp_filter
    )

    os.system(cmd_sex)


def createxy_upwcs(sext_out_path: str, sext_out_path_up: str):
    """
    Update the fits of source-extractor.

    Update the fits of source-extractor, the part to be updated is data.

    Parameters
    ----------
    sext_out_path : str
        The result from source-extractor, but the type is LDAC *.fits.
    sext_out_path_up : str
        The update fits, *.fits.

    Returns
    -------
    tdata
        The updated data of the fits.

    Examples
    -------
    >>> stardata = createxy_upwcs(fp_out_sext, fp_out_sext_up)
    """
    fp_out = sext_out_path
    fp_out_up = sext_out_path_up

    # filter data from the output fits from source-extractor
    hdulist = fits.open(fp_out, mode="update")
    ptdata1 = hdulist[2].data  # LDAC
    ptdata1["FLUXERR_AUTO"] = ptdata1["FLUX_AUTO"] / 100
    tdata = ptdata1

    region = (tdata["AWIN_IMAGE"] < 1.2) & (tdata["AWIN_IMAGE"] > 0.2)
    tdata = tdata[region]
    if len(tdata["ELLIPTICITY"]) > 800:
        temp = np.sort(tdata, order=["FLUX_AUTO"], axis=0)
        tdata = temp[-150:-1]

    # write filtered data to file, *.reg.
    filesatreg = fp_out + ".reg"
    os.system("rm " + filesatreg)
    with open(filesatreg, "a") as f3:
        f3.write(
            'global color=red dashlist=8 3 width=1 font="helvetica 10 normal roman" '
            "select=1 highlite=1 dash=0 fixed=0 edit=1 move=1 delete=1 include=1 source=1"
            + "\n"
        )
        f3.write("physical" + "\n")
        for kk in range(len(tdata)):
            f3.write(
                "circle("
                + str("%12.6f" % tdata["X_IMAGE"][kk])
                + ",  "
                + str("%12.6f" % tdata["Y_IMAGE"][kk])
                + ","
                + str(4)
                + ")"
                + "\n"
            )

    try:
        assert (len(tdata) > 30) and (len(tdata) < 30000)
    except AssertionError:
        raise AssertionError("too few stars or too much stars ")

    # update the output fits from source-extractor, only change the data part (hdulist[2].data).
    hdulist[2].data = tdata
    hdulist.flush()
    hdulist.writeto(fp_out_up, overwrite="True")

    return tdata


def work_scamp(
    fp_sext_out: str,
    fp_refcat: str,
    fp_config_para: str,
    fp_scamp_head: str,
    fp_up_head: str,
):
    """
    Do astrometry by scamp.

    Do astrometry by scamp from the result of source-extractor.

    Parameters
    ----------
    fp_sext_out : str
        Fits name of result of source-extractor.
    fp_refcat : str
        Gaia reference catalog.
    fp_config_para : str
        Scamp config file.
    fp_scamp_head : str
        Scamp result head file.
    fp_up_head : str
        The aheader suffix file.

    Returns
    -------
    None

    Examples
    -------
    >>> work_scamp(fp_sext_out, fp_refcat, fp_config_para, fp_scamp_head, fp_up_head)
    """

    cmd_scamp = (
        " scamp "
        + fp_sext_out
        + " -c "
        + fp_config_para
        + " -ASTREFCAT_NAME "
        + fp_refcat
        + " -AHEADER_SUFFIX "
        + fp_up_head
    )
    os.system(cmd_scamp)

    try:
        assert os.access(fp_scamp_head, os.F_OK)
    except AssertionError:
        raise AssertionError("no scamp result head file")


def rewrite_wcs_head(scamp_head_path: str, scamp_head_bk_path: str):
    """
    Rewrite the WCS head.

    Rewrite the WCS head from Scamp to the new fits header.

    Parameters
    ----------
    scamp_head_path : str
        Scamp head file name.
    scamp_head_bk_path : str
        Rewrited scamp head file name.

    Returns
    -------
    None

    Examples
    --------
    >>> rewrite_wcs_head(fp_head_scamp, fp_head_bk_scamp)
    """

    fp_head = scamp_head_path
    fp_head_bk = scamp_head_bk_path

    if os.path.splitext(fp_head)[-1] == ".head":
        f = open(fp_head, "r")
        f_bk = open(fp_head_bk, "w")
        a = ""
        i = 0
        for v in f.readlines():
            sp = ""
            asp = ""
            i += 1
            if len(v) <= 81:
                sp = " " * (81 - len(v))
            if "Sorbonne" in v:
                v = "COMMENT   (c) 2010-2018 Sorbonne Universite/Universite de Bordeaux/CNRS"
            if "END" in v:
                asp = " " * 80 * (36 - i % 36)
                i = i + (36 - i % 36)
                # print(i)
            a = a + v + sp + asp
        f_bk.write(a.replace("\n", ""))
        f_bk.close()
        f.close()


def check_astrometry(w: WCS, stardata: fits.fitsrec.FITS_rec, refcat_now: Table):
    """
    Check_astrometry.

    Check_astrometry.

    Parameters
    ----------
    w : WCS
        Scamp head file name.
    stardata : fits.fitsrec.FITS_rec
        The filtered data of the source-extractor fits.
    refcat_now : Table
        Rewrited scamp head file name.

    Returns
    -------
    meanra : float
        Mean of R.A.
    meande : float
        Mean of Declination.
    stdra : float
        Std of R.A.
    stdde : float
        Std of Declination.
    match_num : int
        Number of matched stars.

    Examples
    --------
    >>> check_astrometry(w, stardata, refcat_now)
    """

    coeff0 = coeff01 = 99999
    cstd = cstd1 = 99999
    match_num = 0
    obsc = w.pixel_to_world(stardata["XWIN_IMAGE"] - 1, stardata["YWIN_IMAGE"] - 1)
    mask = abs(refcat_now["X_WORLD"]) > 0  # and (abs( refdata['Y_WORLD'] >0 ))
    refdata = refcat_now[mask]
    gaia_ra = refdata["X_WORLD"]
    gaia_dec = refdata["Y_WORLD"]

    refc = SkyCoord(ra=gaia_ra * u.deg, dec=gaia_dec * u.deg)
    idx, d2d, d3d = obsc.match_to_catalog_sky(refc)
    ref_uid = np.unique(idx)
    obs_uid = np.full_like(ref_uid, -1)
    tmpj = -1
    ccdraoff_med = ccddecoff_med = ccdra_rms = ccddec_rms = -1
    for i in ref_uid:
        tmpj = tmpj + 1
        iid = idx == i
        iiid = d2d.deg[iid] == d2d.deg[iid].min()
        obs_uid[tmpj] = iid.nonzero()[0][iiid.nonzero()[0]][0]

    uidlim = d2d[obs_uid].arcsecond < 1
    if uidlim.sum() > 0:
        obs_uidlim = obs_uid[uidlim]
        ref_uidlim = ref_uid[uidlim]
        obsm1 = obsc[obs_uidlim].ra.deg
        refm1 = refc[ref_uidlim].ra.deg
        obsmd1 = obsc[obs_uidlim].dec.deg
        refmd1 = refc[ref_uidlim].dec.deg
        obsm = obsc[obs_uidlim].ra.arcsec * np.cos(
            obsc[obs_uidlim].dec.deg * np.pi / 180
        )
        refm = refc[ref_uidlim].ra.arcsec * np.cos(
            obsc[obs_uidlim].dec.deg * np.pi / 180
        )

        deltara = obsm - refm

        obsmd = obsc[obs_uidlim].dec.arcsec
        refmd = refc[ref_uidlim].dec.arcsec
        deltadec = obsmd - refmd

        clip = sigma_clip((refm - obsm), sigma=3)
        coeff0 = np.median(clip.data[~clip.mask])
        cstd = np.std(clip.data[~clip.mask])
        cobsm = obsm[~clip.mask]
        crefm = refm[~clip.mask]
        clip1 = sigma_clip(refmd - obsmd, sigma=3)
        coeff01 = np.median(clip1.data[~clip1.mask])
        cstd1 = np.std(clip1.data[~clip1.mask])
        cobsmd = obsmd[~clip1.mask]
        crefmd = refmd[~clip1.mask]
        match_num = len(obs_uidlim)

    return coeff0, coeff01, cstd, cstd1, match_num


def saveresult(ast_head_path, meanra, meande, stdra, stdde, match_num):
    """
    Write the standard fits header.

    Write the standard fits header from the result of scamp and astrometry check.

    Parameters
    ----------
    ast_head_path : str
        Path of ast fits file.
    meanra : float
        Mean of R.A.
    meande : float
        Mean of Declination.
    stdra : float
        Std of R.A.
    stdde : float
        Std of Declination.
    match_num : int
        Number of matched stars.

    Returns
    -------
    None

    Examples
    --------
    >>> saveresult(fp_head_ast, meanra, meande, stdra, stdde, match_num)
    """
    fp_ast_head = ast_head_path
    if match_num > 0:
        header = fits.getheader(fp_ast_head, ignore_missing_simple=True)

        newheader = fits.Header()
        #        del newheader[3:22]
        newheader["STA_AST"] = (0, "Completion degree of MCI astrometric solution")
        newheader["VER_AST"] = ("v2023.01", "Version of MCI Astrometry soft")
        newheader["STM_AST"] = ("", "Time of last MCI Astrometry")

        newheader["EQUINOX"] = (2000.00000000, "Mean equinox")
        newheader["RADESYS"] = ("ICRS    ", "Astrometric system")
        newheader["CTYPE1"] = ("RA---TPV", "WCS projection type for this axis")
        newheader["CTYPE2"] = ("DEC--TPV", "WCS projection type for this axis")
        newheader["CUNIT1"] = ("deg     ", "Axis unit")
        newheader["CUNIT2"] = ("deg     ", "Axis unit")
        newheader["CRVAL1"] = (header["CRVAL1"], "World coordinate on this axis")
        newheader["CRVAL2"] = (header["CRVAL2"], "World coordinate on this axis")
        newheader["CRPIX1"] = (header["CRPIX1"], "Reference pixel on this axis")
        newheader["CRPIX2"] = (header["CRPIX2"], "Reference pixel on this axis")

        newheader["CD1_1"] = (header["CD1_1"], "Linear projection matrix")
        newheader["CD1_2"] = (header["CD1_2"], "Linear projection matrix")
        newheader["CD2_1"] = (header["CD2_1"], "Linear projection matrix")
        newheader["CD2_2"] = (header["CD2_2"], "Linear projection matrix")
        newheader["PV1_0"] = (header["PV1_0"], "Projection distortion parameter")
        newheader["PV1_1"] = (header["PV1_1"], "Projection distortion parameter")
        newheader["PV1_2"] = (header["PV1_2"], "Projection distortion parameter")
        newheader["PV1_4"] = (header["PV1_4"], "Projection distortion parameter")
        newheader["PV1_5"] = (header["PV1_5"], "Projection distortion parameter")
        newheader["PV1_6"] = (header["PV1_6"], "Projection distortion parameter")
        newheader["PV1_7"] = (header["PV1_7"], "Projection distortion parameter")
        newheader["PV1_8"] = (header["PV1_8"], "Projection distortion parameter")
        newheader["PV1_9"] = (header["PV1_9"], "Projection distortion parameter")
        newheader["PV1_10"] = (header["PV1_10"], "Projection distortion parameter")
        newheader["PV1_12"] = (header["PV1_12"], "Projection distortion parameter")
        newheader["PV1_13"] = (header["PV1_13"], "Projection distortion parameter")
        newheader["PV1_14"] = (header["PV1_14"], "Projection distortion parameter")
        newheader["PV1_15"] = (header["PV1_15"], "Projection distortion parameter")
        newheader["PV1_16"] = (header["PV1_16"], "Projection distortion parameter")
        newheader["PV2_0"] = (header["PV2_0"], "Projection distortion parameter")
        newheader["PV2_1"] = (header["PV2_1"], "Projection distortion parameter")
        newheader["PV2_2"] = (header["PV2_2"], "Projection distortion parameter")
        newheader["PV2_4"] = (header["PV2_4"], "Projection distortion parameter")
        newheader["PV2_5"] = (header["PV2_5"], "Projection distortion parameter")
        newheader["PV2_6"] = (header["PV2_6"], "Projection distortion parameter")
        newheader["PV2_7"] = (header["PV2_7"], "Projection distortion parameter")
        newheader["PV2_8"] = (header["PV2_8"], "Projection distortion parameter")
        newheader["PV2_9"] = (header["PV2_9"], "Projection distortion parameter")
        newheader["PV2_10"] = (header["PV2_10"], "Projection distortion parameter")
        newheader["PV2_12"] = (header["PV2_12"], "Projection distortion parameter")
        newheader["PV2_13"] = (header["PV2_13"], "Projection distortion parameter")
        newheader["PV2_14"] = (header["PV2_14"], "Projection distortion parameter")
        newheader["PV2_15"] = (header["PV2_15"], "Projection distortion parameter")
        newheader["PV2_16"] = (header["PV2_16"], "Projection distortion parameter")

        newheader["ASTIRMS1"] = (stdra, "Astrom. dispersion RMS (intern., high S/N)")
        newheader["ASTIRMS2"] = (stdde, "Astrom. dispersion RMS (intern., high S/N)")
        newheader["ASTRRMS1"] = (stdra, "Astrom. dispersion RMS (ref., high S/N)")
        newheader["ASTRRMS2"] = (stdde, "Astrom. dispersion RMS (ref., high S/N)")

        #        newheader['STA_CCRS'] = (
        #            1, "Completion degree of relative astrometric solution in MCI")
        #        newheader['VER_CCRS'] = (
        #            "v2023.01", "Version of CSST relative Astrometry soft in MCI")
        #        newheader['STM_CCRS'] = ("", "Time of last CSST Astrometry in MCI")

        newheader["ASTGATE"] = (" ", "Camera shutter information")
        newheader["ASTCONF"] = (" ", "Configuration file for astrometry")
        newheader["ASTSIM"] = ("normal", "Image classification for MCI Astrometry")
        newheader["GCRSREF"] = (
            "Gaia dr3 v01",
            "Reference Catalogue for MCI Astrometry",
        )
        newheader["ASTHIS"] = (1, "Astrometric solution Record for MCI Astrometry")
        newheader["DELT_RA"] = (meanra, "Change in central RA")
        newheader["DELT_dec"] = (meande, "Change in central DEC")
        newheader["DELT_ps"] = (0, "Change in pixelscale")
        empty_primary = fits.PrimaryHDU(header=newheader)
        empty_primary.writeto(fp_ast_head, overwrite=True)


def run_one_frame(
    refcat_now: Table,
    refcat_path: str,
    img_path: str,
    ast_head_path: str,
    sext_config_path: str,
    sext_out_path: str,
    sext_out_path_up: str,
    sext_para_path: str,
    sext_filter_path: str,
    scamp_config_path: str,
    scamp_head_path: str,
    scamp_head_suffix_path: str,
    scamp_head_bk_path: str,
    logger: get_logger = None,
):
    """
    Fit wcs for one chip.

    This function calculates and save the pv parameters of single chip CSST data.

    Parameters
    ----------

    refcat_now : Table
        The reference catalog.
    use_dfs : bool
        If Ture, using DFS api, if False, using gaia_query.
    logger : logging.Logger
        This is the log.

    Returns
    -------
    CsstStatus
        The status of run.

    Examples
    --------
    >>> run_one_frame(refcat_now, refcat_path, img_path, ast_head_path, sext_config_path, sext_out_path, sext_out_path_up, sext_para_path, sext_filter_path, scamp_config_path, scamp_head_path, scamp_head_suffix_path, scamp_head_bk_path, logger)
    """

    # set default logger
    if logger is None:
        logger = get_logger()

    logger.info("Begin of one frame.")

    # write the WCS head from *img.fits.
    fp_img = img_path
    fp_head_ast = ast_head_path
    write_wcs_head(fp_img, fp_head_ast)

    # run source-extractor for the image fits.
    fp_config_sext = sext_config_path
    fp_out_sext = sext_out_path
    fp_para_sext = sext_para_path
    fp_filter_sext = sext_filter_path

    work_sext(fp_img, fp_config_sext, fp_out_sext, fp_para_sext, fp_filter_sext)

    # update the fits of source-extractor
    fp_out_sext_up = sext_out_path_up
    stardata = createxy_upwcs(fp_out_sext, fp_out_sext_up)

    # run scamp for the result from source-extractor.
    fp_refcat = refcat_path
    fp_config_scamp = scamp_config_path
    fp_head_scamp = scamp_head_path
    fp_head_suffix_scamp = scamp_head_suffix_path
    work_scamp(
        fp_out_sext_up, fp_refcat, fp_config_scamp, fp_head_scamp, fp_head_suffix_scamp
    )

    # rewrite the scamp WCS head
    fp_head_bk_scamp = scamp_head_bk_path
    rewrite_wcs_head(fp_head_scamp, fp_head_bk_scamp)

    #
    wcshdr = fits.getheader(fp_head_bk_scamp, ignore_missing_simple=True)
    if not wcshdr.get("PV1_16"):
        print("Warning: astrometry failure in this chip. Exiting...")
        os.system(" rm " + fp_head_bk_scamp)
    else:
        os.system("cp " + fp_head_bk_scamp + " " + fp_head_ast)

    wcshdr["CTYPE1"] = "RA---TPV"
    wcshdr["CTYPE2"] = "DEC--TPV"
    w = WCS(wcshdr)
    meanra, meande, stdra, stdde, match_num = check_astrometry(w, stardata, refcat_now)

    logger.info("################# astrometry first result: ##############")
    logger.info(" The first step: check match ")
    if match_num < 6:
        logger.info(" match_num  less than 10   bad astrometry")
        return CsstStatus.ERROR
    else:
        logger.info(" check error")
        print(meanra, meande, stdra, stdde, match_num)
        if (
            meanra * 1000 < 1000
            and meande * 1000 < 1000
            and stdra * 1000.0 < 3000
            and stdde * 1000 < 3000
        ):
            logger.info(" good result for " + str(fp_img))
            logger.info(
                "median ra_off,   dec_off (mas) from scamp:"
                + str(meanra * 1000.0)
                + "  "
                + str(meande * 1000.0)
            )
            logger.info(
                "rms ra_off,   dec_off (mas) from scamp:"
                + str(stdra * 1000.0)
                + "  "
                + str(stdde * 1000.0)
            )

        else:
            logger.error("step3 big error")
            return CsstStatus.ERROR

    saveresult(fp_head_ast, meanra, meande, stdra, stdde, match_num)
    logger.info("The fitting ran smoothly.")
    return CsstStatus.PERFECT


def run_mci_astrometry(
    img_path: str, refcat_path: str, target_detectors: list, logger: get_logger = None
):
    """
    Fit wcs for more than one chips.

    This function calculates and save the pv parameters of many chips of CSST data.

    Parameters
    ----------
    logger : logging.Logger
        This is the log.

    Returns
    -------
    CsstStatus
        The status of run.

    Examples
    --------
    >>> result = run_mci_astrometry(logger=None)
    """

    # set default logger
    if logger is None:
        logger = get_logger()

    # get information of input image
    logger.info("Begin of csst_mci_astrometry.")

    fp_image = img_path
    image_header = fits.getheader(fp_image, ext=0)

    pointing_ra = image_header["OBJ_RA"]
    pointing_dec = image_header["OBJ_DEC"]

    # reference catalog
    fp_refcat = refcat_path
    refcat_now = fits.getdata(fp_refcat, ext=1)

    # print(target_detectors)
    # do distortion resolving for each detector in parallel
    results = joblib.Parallel(n_jobs=len(target_detectors))(
        joblib.delayed(run_one_frame)(detector, refcat_now)
        for detector in target_detectors
    )

    return (
        CsstStatus.PERFECT
        if all([_ == CsstStatus.PERFECT for _ in results])
        else CsstStatus.ERROR
    )


### 测试 astrometry 代码 run_astrometry aprt

In [7]:
dm = csst_mci.dm
print(type(dm))
dm.target_detectors = ["C1"]
print(dm.target_detectors)

imgfits = dm.l1_detector(detector=dm.target_detectors[0], post="img.fits")
nimgfits = dm.l1_detector(detector=dm.target_detectors[0], post="nimg.fits")

print(imgfits)
print(nimgfits)
image_header = fits.getheader(imgfits, ext=0)
type(image_header)

<class 'csst_mci_common.data_manager.CsstMCIDataManager'>
['C1']
/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_img.fits
/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_nimg.fits


astropy.io.fits.header.Header

### 参数赋值

In [6]:
logger = get_logger()
CONFIG_PATH = "/home/fangwf/software/miniconda3/envs/csst_mci/lib/python3.11/site-packages/csst_mci_astrometry/data/"

img_path: str = '/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_nimg.fits'
refcat_path: str = '/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/l1_proc_output/gaiadr3nowlac.fits'
target_detectors: list = ['C1']
ast_head_path: str = '/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_ast.fits'
sext_config_path = CONFIG_PATH + "astrom.sex"
sext_out_path: str = "/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_nimg.acat"
sext_out_path_up: str = "/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_ste.acat"
sext_para_path: str = CONFIG_PATH + "astrom.param"
sext_filter_path: str = CONFIG_PATH + "gauss_4.0_7x7.conv"
scamp_config_path: str = CONFIG_PATH + "g3default.scamp"
scamp_head_path: str = "/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_ste.head"
scamp_head_suffix_path: str = ".head"
scamp_head_bk_path: str = '/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_bk.head'


In [7]:
fp_refcat = refcat_path
hdu = fits.open(fp_refcat)
refcat_now = Table(hdu[2].data)

In [49]:
# logger = get_logger()
# CONFIG_PATH = "/home/fangwf/software/miniconda3/envs/csst_mci/lib/python3.11/site-packages/csst_mci_astrometry/data/"

# detector = dm.target_detectors[0]
# img_path = dm.l1_detector(detector=detector, post="nimg.fits")
# refcat_path = dm.l1_file(name="gaiadr3nowlac.fits")

# ast_head_path = dm.l1_detector(detector=detector, post="ast.fits")
# sext_out_path = dm.l1_detector(detector=detector, post="nimg.acat")
# sext_out_path_up = dm.l1_detector(detector=detector, post="ste.acat")
# sext_config_path = CONFIG_PATH + "astrom.sex"
# sext_para_path: str = CONFIG_PATH + "astrom.param"
# sext_filter_path: str = CONFIG_PATH + "gauss_4.0_7x7.conv"

# scamp_head_path = dm.l1_detector(detector=detector, post="ste.head")
# scamp_head_bk_path = dm.l1_detector(detector=detector, post="bk.head")
# scamp_head_suffix_path: str = ".head"
# scamp_config_path: str = CONFIG_PATH + "g3default.scamp"

# fp_refcat = refcat_path
# hdu = fits.open(fp_refcat)
# refcat_now = Table(hdu[2].data)

# result = run_one_frame(refcat_now, 
#                        refcat_path, 
#                        img_path, 
#                        ast_head_path, 
#                        sext_config_path, 
#                        sext_out_path, 
#                        sext_out_path_up, 
#                        sext_para_path, 
#                        sext_filter_path, 
#                        scamp_config_path, 
#                        scamp_head_path, 
#                        scamp_head_suffix_path, 
#                        scamp_head_bk_path, 
#                        logger)

# print(result)

### 测试 run one frame

In [8]:
# write the WCS head from *img.fits.
fp_img = img_path
fp_head_ast = ast_head_path
write_wcs_head(fp_img, fp_head_ast)

# check
# fits.open(fp_head_ast).info()
# print(repr(fits.getheader(fp_head_ast)))
# finished

In [9]:
# run source-extractor for the image fits.

fp_config_sext = sext_config_path
fp_out_sext = sext_out_path
fp_out_sext_up = sext_out_path_up
fp_para_sext = sext_para_path
fp_filter_sext = sext_filter_path
work_sext(fp_img, fp_config_sext, fp_out_sext, fp_para_sext, fp_filter_sext)

stardata = createxy_upwcs(fp_out_sext, fp_out_sext_up)

# check
# stardata
# finished

[1M> 
[1A----- SExtractor 2.28.0 started on 2024-01-04 at 22:19:41 with 1 thread

[1M> Setting catalog parameters
[1A[1M> Reading detection filter
[1A[1M> Initializing catalog
[1A[1M> Looking for CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_nimg.fits
[1A----- Measuring from: CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_nimg.fits [1/1]
      "Unnamed" / no ext. header / 9216x9232 / 32 bits (floats)
Detection+Measurement image: [1M> Setting up background maps
[1A[1M> Setting up background map at line:   64
[1A[1M> Setting up background map at line:  128
[1A[1M> Setting up background map at line:  192
[1A[1M> Setting up background map at line:  256
[1A[1M> Setting up background map at line:  320
[1A[1M> Setting up background map at line:  384
[1A[1M> Setting up background map at line:  448
[1A[1M> Setting up background map at line:  512
[1A[1M> Setting up background map at line:  576
[1A[1M> Setting up backgrou

In [10]:
# run scamp for the result from source-extractor.
fp_refcat = refcat_path
fp_config_scamp = scamp_config_path
fp_head_scamp = scamp_head_path
fp_head_suffix_scamp = scamp_head_suffix_path
work_scamp(fp_out_sext, fp_refcat, fp_config_scamp, fp_head_scamp, fp_head_suffix_scamp)












[1M> 
[1A----- SCAMP 2.10.0 started on 2024-01-04 at 22:20:00 with 8 threads

[1M> 
[1A----- 1 input:
[1M> Examining Catalog CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_nimg.acat
[1ACSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_nimg.acat:  "no ident           "  EXTERN. HEADER   1 set     147 detections

----- 147 detections loaded
[1M> Grouping fields on the sky ...
[1A[1M> Grouping fields: field 1/1, 0 group
[1A[1M> 
[1A
----- 1 instrument found for astrometry:

Instrument A1 :
1 extension
FILTER  = 'u       '                                                            
QRUNID  =                                                                       

----- 1 instrument found for photometry:

Instrument P1 :
FILTER  = 'u       '                                                            

----- 1 field group found:

 Group  1: 1 field at 07:44:43.40 +39:25:23.4 with radius 5.435'
[7m                  instruments  ep

In [15]:
fp_head_bk_scamp = scamp_head_bk_path

fp_head = fp_head_scamp
fp_head_bk = fp_head_bk_scamp


f = open(fp_head, "r")
f_bk = open(fp_head_bk, "w")
a = ""
i = 0
for v in f.readlines():
    sp = ""
    asp = ""
    i += 1
    if len(v) <= 81:
        sp = " " * (81 - len(v))
    if "Sorbonne" in v:
        v = "COMMENT   (c) 2010-2018 Sorbonne Universite/Universite de Bordeaux/CNRS"
    if "END" in v:
        asp = " " * 80 * (36 - i % 36)
        i = i + (36 - i % 36)
        # print(i)
    a = a + v + sp + asp
f_bk.write(a.replace("\n", ""))
f_bk.close()
f.close()


In [21]:
fp_head
fp_head_bk
# os.system("code fp_head")

'/nfsdata/share/pipeline-unittest/csst_mci/data/dfs_dummy/L1/sim_ver20230725/20100000001/ins_output/CSST_MCI_C1_EXDF_20230918003941_20230918004441_20100000001_07_L1_V01_bk.head'

In [53]:
# rewrite the scamp WCS head
fp_head_bk_scamp = scamp_head_bk_path
rewrite_wcs_head(fp_head_scamp, fp_head_bk_scamp)
wcshdr = fits.getheader(fp_head_bk_scamp, ignore_missing_simple=True)

In [None]:
if not wcshdr.get("PV1_16"):
    print("Warning: astrometry failure in this chip. Exiting...")
    os.system(" rm " + fp_head_bk_scamp)
else:
    os.system("cp " + fp_head_bk_scamp + " " + fp_head_ast)

wcshdr["CTYPE1"] = "RA---TPV"
wcshdr["CTYPE2"] = "DEC--TPV"
w = WCS(wcshdr)


In [None]:
meanra, meande, stdra, stdde, match_num = check_astrometry(w, stardata, refcat_now)


In [None]:
logger = get_logger()

logger.info("################# astrometry first result: ##############")
logger.info(" The first step: check match ")
if match_num < 6:
    logger.info(" match_num  less than 10   bad astrometry")
    result =  CsstStatus.ERROR
else:
    logger.info(" check error")
    print(meanra, meande, stdra, stdde, match_num)
    if (
        meanra * 1000 < 1000
        and meande * 1000 < 1000
        and stdra * 1000.0 < 3000
        and stdde * 1000 < 3000
    ):
        logger.info(" good result for " + str(fp_img))
        logger.info(
            "median ra_off,   dec_off (mas) from scamp:"
            + str(meanra * 1000.0)
            + "  "
            + str(meande * 1000.0)
        )
        logger.info(
            "rms ra_off,   dec_off (mas) from scamp:"
            + str(stdra * 1000.0)
            + "  "
            + str(stdde * 1000.0)
        )

    else:
        logger.error("step3 big error")
        result = CsstStatus.ERROR

saveresult(fp_head_ast, meanra, meande, stdra, stdde, match_num)
logger.info("The fitting ran smoothly.")
result = CsstStatus.PERFECT

In [57]:
result

<CsstStatus.PERFECT: 0>

In [58]:
meanra

-0.0010739107674453408

In [60]:
fp_head_scamp


OSError: No SIMPLE card found, this file does not appear to be a valid FITS file. If this is really a FITS file, try with ignore_missing_simple=True