# Test Redshift Database on everest

In [1]:
import os
import re
import glob

import numpy as np
from astropy.io import fits
from astropy.table import Table
from pytz import utc

from sqlalchemy import (create_engine, event, ForeignKey, Column, DDL,
                        BigInteger, Boolean, Integer, String, Float, DateTime,
                        bindparam)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.orm import scoped_session, sessionmaker, relationship
from sqlalchemy.schema import CreateSchema

from desiutil.log import get_logger, DEBUG, INFO

from desispec.io.meta import specprod_root
from desispec.database.util import convert_dateobs, parse_pgpass

# Declarative base shared by every mapped class in this notebook.
Base = declarative_base()
# Session registry; setup_db() binds it to the engine it creates.
dbSession = scoped_session(sessionmaker())
# Placeholders that setup_db() fills in via ``global``.
engine = None
schemaname = None


class SchemaMixin:
    """Mixin that supplies ``__tablename__`` and ``__table_args__``.

    The table name is the lower-cased class name.  The schema comes from the
    module-level ``schemaname`` at the moment the declarative attribute is
    evaluated, i.e. when a mapped subclass is declared; a schema chosen later
    has to be assigned to the ``Table`` objects directly (setup_db() does this).
    """

    @declared_attr
    def __tablename__(cls):
        class_name = cls.__name__
        return class_name.lower()

    @declared_attr
    def __table_args__(cls):
        return dict(schema=schemaname)


In [2]:
class Target(SchemaMixin, Base):
    """Representation of the target table.

    ``targetid`` is the primary key.  Attribute names must equal the
    lower-cased column names of the input target files (after array-valued
    columns such as ``DCHISQ`` are expanded), because rows are inserted with
    ``bulk_insert_mappings`` using dictionaries keyed on those names.  Every
    column is declared ``NOT NULL``.
    """

    release = Column(Integer, nullable=False)
    brickid = Column(Integer, nullable=False)
    brickname = Column(String(8), nullable=False)
    brick_objid = Column(Integer, nullable=False)
    morphtype = Column(String(4), nullable=False)
    ra = Column(Float, nullable=False)
    ra_ivar = Column(Float, nullable=False)
    dec = Column(Float, nullable=False)
    dec_ivar = Column(Float, nullable=False)
    # The five dchisq_* columns are filled by splitting the array-valued
    # DCHISQ input column (see the ``expand`` mapping in the ingestion cell).
    dchisq_psf = Column(Float, nullable=False)
    dchisq_rex = Column(Float, nullable=False)
    dchisq_dev = Column(Float, nullable=False)
    dchisq_exp = Column(Float, nullable=False)
    dchisq_ser = Column(Float, nullable=False)
    ebv = Column(Float, nullable=False)
    flux_g = Column(Float, nullable=False)
    flux_r = Column(Float, nullable=False)
    flux_z = Column(Float, nullable=False)
    flux_ivar_g = Column(Float, nullable=False)
    flux_ivar_r = Column(Float, nullable=False)
    flux_ivar_z = Column(Float, nullable=False)
    mw_transmission_g = Column(Float, nullable=False)
    mw_transmission_r = Column(Float, nullable=False)
    mw_transmission_z = Column(Float, nullable=False)
    fracflux_g = Column(Float, nullable=False)
    fracflux_r = Column(Float, nullable=False)
    fracflux_z = Column(Float, nullable=False)
    fracmasked_g = Column(Float, nullable=False)
    fracmasked_r = Column(Float, nullable=False)
    fracmasked_z = Column(Float, nullable=False)
    fracin_g = Column(Float, nullable=False)
    fracin_r = Column(Float, nullable=False)
    fracin_z = Column(Float, nullable=False)
    nobs_g = Column(Integer, nullable=False)
    nobs_r = Column(Integer, nullable=False)
    nobs_z = Column(Integer, nullable=False)
    psfdepth_g = Column(Float, nullable=False)
    psfdepth_r = Column(Float, nullable=False)
    psfdepth_z = Column(Float, nullable=False)
    galdepth_g = Column(Float, nullable=False)
    galdepth_r = Column(Float, nullable=False)
    galdepth_z = Column(Float, nullable=False)
    flux_w1 = Column(Float, nullable=False)
    flux_w2 = Column(Float, nullable=False)
    flux_w3 = Column(Float, nullable=False)
    flux_w4 = Column(Float, nullable=False)
    flux_ivar_w1 = Column(Float, nullable=False)
    flux_ivar_w2 = Column(Float, nullable=False)
    flux_ivar_w3 = Column(Float, nullable=False)
    flux_ivar_w4 = Column(Float, nullable=False)
    mw_transmission_w1 = Column(Float, nullable=False)
    mw_transmission_w2 = Column(Float, nullable=False)
    mw_transmission_w3 = Column(Float, nullable=False)
    mw_transmission_w4 = Column(Float, nullable=False)
    # NOTE(review): allmask_* are declared Float while maskbits and
    # wisemask_* are Integer -- confirm the intended type against the
    # input files.
    allmask_g = Column(Float, nullable=False)
    allmask_r = Column(Float, nullable=False)
    allmask_z = Column(Float, nullable=False)
    fiberflux_g = Column(Float, nullable=False)
    fiberflux_r = Column(Float, nullable=False)
    fiberflux_z = Column(Float, nullable=False)
    fibertotflux_g = Column(Float, nullable=False)
    fibertotflux_r = Column(Float, nullable=False)
    fibertotflux_z = Column(Float, nullable=False)
    ref_epoch = Column(Float, nullable=False)
    wisemask_w1 = Column(Integer, nullable=False)
    wisemask_w2 = Column(Integer, nullable=False)
    maskbits = Column(Integer, nullable=False)
    shape_r = Column(Float, nullable=False)
    shape_e1 = Column(Float, nullable=False)
    shape_e2 = Column(Float, nullable=False)
    shape_r_ivar = Column(Float, nullable=False)
    shape_e1_ivar = Column(Float, nullable=False)
    shape_e2_ivar = Column(Float, nullable=False)
    sersic = Column(Float, nullable=False)
    sersic_ivar = Column(Float, nullable=False)
    ref_id = Column(BigInteger, nullable=False)
    ref_cat = Column(String(2), nullable=False)
    gaia_phot_g_mean_mag = Column(Float, nullable=False)
    gaia_phot_g_mean_flux_over_error = Column(Float, nullable=False)
    gaia_phot_bp_mean_mag = Column(Float, nullable=False)
    gaia_phot_bp_mean_flux_over_error = Column(Float, nullable=False)
    gaia_phot_rp_mean_mag = Column(Float, nullable=False)
    gaia_phot_rp_mean_flux_over_error = Column(Float, nullable=False)
    gaia_phot_bp_rp_excess_factor = Column(Float, nullable=False)
    gaia_astrometric_excess_noise = Column(Float, nullable=False)
    gaia_duplicated_source = Column(Boolean, nullable=False)
    gaia_astrometric_sigma5d_max = Column(Float, nullable=False)
    gaia_astrometric_params_solved = Column(BigInteger, nullable=False)
    parallax = Column(Float, nullable=False)
    parallax_ivar = Column(Float, nullable=False)
    pmra = Column(Float, nullable=False)
    pmra_ivar = Column(Float, nullable=False)
    pmdec = Column(Float, nullable=False)
    pmdec_ivar = Column(Float, nullable=False)
    photsys = Column(String(1), nullable=False)
    # Primary key.  autoincrement=False: values come from the input files
    # and are never generated by the database.
    targetid = Column(BigInteger, primary_key=True, autoincrement=False)
    desi_target = Column(BigInteger, nullable=False)
    bgs_target = Column(BigInteger, nullable=False)
    mws_target = Column(BigInteger, nullable=False)
    subpriority = Column(Float, nullable=False)
    obsconditions = Column(BigInteger, nullable=False)
    priority_init = Column(BigInteger, nullable=False)
    numobs_init = Column(BigInteger, nullable=False)
    scnd_target = Column(BigInteger, nullable=False)
    hpxpixel = Column(BigInteger, nullable=False)

    def __repr__(self):
        return "<Target(targetid={0.targetid})>".format(self)


In [3]:
def setup_db(options=None, **kwargs):
    """Initialize the database connection and create all tables.

    Parameters
    ----------
    options : :class:`argparse.Namespace`, optional
        Parsed command-line options.  Must have the attributes ``schema``,
        ``overwrite``, ``hostname``, ``username``, ``dbfile``, ``datapath``
        and ``verbose``.
    kwargs : keywords
        Used instead of `options` when `options` is ``None``.  This is more
        user-friendly than setting up an :class:`argparse.Namespace`
        object in, *e.g.* a Jupyter Notebook.  Accepts the same names as
        above; missing ones default to ``schema=None``,
        ``overwrite=False``, ``hostname=None``,
        ``username='desidev_admin'``, ``dbfile='redshift.db'``,
        ``datapath=None`` and ``verbose=False``.

    Returns
    -------
    :class:`bool`
        ``True`` if the configured database is a PostgreSQL database.

    Raises
    ------
    ValueError
        If `options` is ``None`` and no keywords are given, or if the
        PostgreSQL connection information could not be loaded.

    Notes
    -----
    Without ``hostname`` an SQLite file is used.  A bare `dbfile` name is
    placed in `datapath` when that is given; otherwise `dbfile` is used
    as-is.  ``overwrite=True`` removes an existing SQLite file, or (with a
    `schema`) drops that schema before the tables are created.
    """
    global engine, schemaname
    #
    # Gather options.
    #
    if options is None:
        if not kwargs:
            raise ValueError("No options specified!")
        schema = kwargs.get('schema', None)
        overwrite = kwargs.get('overwrite', False)
        hostname = kwargs.get('hostname', None)
        username = kwargs.get('username', 'desidev_admin')
        dbfile = kwargs.get('dbfile', 'redshift.db')
        datapath = kwargs.get('datapath', None)
        verbose = kwargs.get('verbose', False)
    else:
        schema = options.schema
        overwrite = options.overwrite
        hostname = options.hostname
        username = options.username
        dbfile = options.dbfile
        datapath = options.datapath
        verbose = options.verbose
    #
    # Schema creation.
    #
    if schema:
        schemaname = schema
        # NOTE(review): these listeners stay registered on Base.metadata, so
        # calling setup_db() again in the same process runs them again
        # (including a DROP registered by an earlier overwrite=True call).
        if overwrite:
            event.listen(Base.metadata, 'before_create',
                         DDL('DROP SCHEMA IF EXISTS {0} CASCADE'.format(schemaname)))
        event.listen(Base.metadata, 'before_create',
                     DDL('CREATE SCHEMA IF NOT EXISTS {0}'.format(schemaname)))
    #
    # Connection string.
    #
    postgresql = bool(hostname)
    if postgresql:
        db_connection = parse_pgpass(hostname=hostname,
                                     username=username)
        if db_connection is None:
            message = "Could not load database information!"
            log.critical(message)
            # Raise rather than return an error code: the documented return
            # value is a bool, and an integer 1 would read as True.
            raise ValueError(message)
    else:
        if datapath and os.path.basename(dbfile) == dbfile:
            db_file = os.path.join(datapath, dbfile)
        else:
            # Absolute/relative path given, or no datapath to prepend
            # (os.path.join(None, ...) would raise TypeError).
            db_file = dbfile
        if overwrite and os.path.exists(db_file):
            log.info("Removing file: %s.", db_file)
            os.remove(db_file)
        db_connection = 'sqlite:///' + db_file
    #
    # SQLAlchemy stuff.
    #
    engine = create_engine(db_connection, echo=verbose)
    dbSession.remove()
    dbSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    log.info("Begin creating tables.")
    # SchemaMixin read schemaname when the classes were declared; apply the
    # value chosen here to every table before creating them.
    for tab in Base.metadata.tables.values():
        tab.schema = schemaname
    Base.metadata.create_all(engine)
    log.info("Finished creating tables.")
    return postgresql


In [4]:
# This notebook always logs at DEBUG level; use INFO for quieter output.
log = get_logger(DEBUG, timestamp=True)
# overwrite=True makes setup_db() delete any existing everest.db first.
postgresql = setup_db(
    overwrite=True,
    dbfile=os.path.join(os.environ['CSCRATCH'], 'everest.db'),
    verbose=True,
)

INFO:<ipython-input-3-499e3d74b407>:87:setup_db:2021-11-02T15:04:40: Removing file: /global/cscratch1/sd/bweaver/everest.db.
INFO:<ipython-input-3-499e3d74b407>:96:setup_db:2021-11-02T15:04:40: Begin creating tables.
2021-11-02 15:04:40,843 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2021-11-02 15:04:40,843 INFO sqlalchemy.engine.base.Engine ()
2021-11-02 15:04:40,845 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2021-11-02 15:04:40,845 INFO sqlalchemy.engine.base.Engine ()
2021-11-02 15:04:40,847 INFO sqlalchemy.engine.base.Engine PRAGMA main.table_info("target")
2021-11-02 15:04:40,848 INFO sqlalchemy.engine.base.Engine ()
2021-11-02 15:04:40,849 INFO sqlalchemy.engine.base.Engine PRAGMA temp.table_info("target")
2021-11-02 15:04:40,850 INFO sqlalchemy.engine.base.Engine ()
2021-11-02 15:04:40,856 INFO sqlalchemy.engine.base.Engine 
CREATE TABLE target (
	release INTEGER NOT NULL, 
	br

In [5]:
# glob() returns files in file-system order; sort them so the ingestion
# order (and the log it produces) is reproducible from run to run.
targets = sorted(glob.glob(os.path.join(os.environ['DESI_TARGET'], 'catalogs', 'dr9', '1.1.1',
                                        'targets', 'main', 'resolve', 'dark',
                                        'targets-dark-hp-*.fits')))
log.info("Identified %d target files for ingestion.", len(targets))
if not targets:
    # An empty list would make the ingestion cell silently do nothing.
    log.warning("No target files found; check $DESI_TARGET.")

INFO:<ipython-input-5-129cd5884f6a>:2:<module>:2021-11-02T15:04:46: Identified 460 target files for ingestion.


In [6]:
hdu = 'TARGETS'
#
# Input columns to transform before insertion.  A tuple value splits an
# array-valued column into one database column per element; a str value
# simply renames the column.
#
expand = {'DCHISQ': ('dchisq_psf', 'dchisq_rex', 'dchisq_dev', 'dchisq_exp', 'dchisq_ser',)}
for target in targets:
    # The code below relies on ``data`` staying usable after the file is
    # closed; .data is accessed inside the with block for that reason.
    with fits.open(target) as hdulist:
        data = hdulist[hdu].data
    log.info("Read data from %s, HDU %s", target, hdu)
    #
    # Replace NaN in floating-point columns with a sentinel value; any
    # other non-finite value (e.g. inf) cannot be ingested.
    #
    for col in data.names:
        if data[col].dtype.kind != 'f':
            continue
        bad = np.isnan(data[col])
        if np.any(bad):
            # Count rows, not elements: array-valued columns such as DCHISQ
            # can have several bad elements in a single row.
            nbad = bad.reshape(bad.shape[0], -1).any(axis=1).sum()
            log.warning("%d rows of bad data detected in column %s of %s.",
                        nbad, col, target)
            #
            # This replacement may be deprecated in the future.
            #
            data[col][bad] = -9999.0
        # Explicit check instead of assert, which ``python -O`` strips.
        if not np.all(np.isfinite(data[col])):
            raise ValueError("Non-finite values in column {0} of {1}.".format(col, target))
    n_rows = len(data)
    data_list = [data[col].tolist() for col in data.names]
    data_names = [col.lower() for col in data.names]
    log.info("Initial column conversion complete on target = %s.", target)
    if expand is not None:
        for col in expand:
            i = data_names.index(col.lower())
            if isinstance(expand[col], str):
                #
                # Just rename a column.
                #
                log.debug("Renaming column %s (at index %d) to %s.", data_names[i], i, expand[col])
                data_names[i] = expand[col]
            else:
                #
                # Expand an array-valued column into individual columns,
                # inserted at the original column's position.
                #
                del data_names[i]
                del data_list[i]
                for j, n in enumerate(expand[col]):
                    log.debug("Expanding column %d of %s (at index %d) to %s.", j, col, i, n)
                    data_names.insert(i + j, n)
                    data_list.insert(i + j, data[col][:, j].tolist())
                log.debug(data_names)
    log.info("Column expansion complete on %s.", target)
    # Free the FITS data before building the row-oriented copy.
    del data
    data_rows = list(zip(*data_list))
    del data_list
    log.info("Converted columns into rows on target = %s.", target)
    try:
        dbSession.bulk_insert_mappings(Target, [dict(zip(data_names, row))
                                                for row in data_rows])
        log.info("Inserted %d rows in %s from %s.",
                 n_rows, Target.__tablename__, target)
        dbSession.commit()
    except Exception:
        # Leave the session usable (e.g. for re-running this cell) after a
        # failed insert or commit, then propagate the error.
        dbSession.rollback()
        raise


INFO:<ipython-input-6-f94a76546d8c>:6:<module>:2021-11-02T15:04:51: Read data from /global/cfs/cdirs/desi/target/catalogs/dr9/1.1.1/targets/main/resolve/dark/targets-dark-hp-604.fits, HDU TARGETS
INFO:<ipython-input-6-f94a76546d8c>:23:<module>:2021-11-02T15:04:51: Initial column conversion complete on target = /global/cfs/cdirs/desi/target/catalogs/dr9/1.1.1/targets/main/resolve/dark/targets-dark-hp-604.fits.
DEBUG:<ipython-input-6-f94a76546d8c>:41:<module>:2021-11-02T15:04:51: Expanding column 0 of DCHISQ (at index 9) to dchisq_psf.
DEBUG:<ipython-input-6-f94a76546d8c>:41:<module>:2021-11-02T15:04:51: Expanding column 1 of DCHISQ (at index 9) to dchisq_rex.
DEBUG:<ipython-input-6-f94a76546d8c>:41:<module>:2021-11-02T15:04:51: Expanding column 2 of DCHISQ (at index 9) to dchisq_dev.
DEBUG:<ipython-input-6-f94a76546d8c>:41:<module>:2021-11-02T15:04:51: Expanding column 3 of DCHISQ (at index 9) to dchisq_exp.
DEBUG:<ipython-input-6-f94a76546d8c>:41:<module>:2021-11-02T15:04:51: Expanding