## PLAsTiCC mini alert stream
This code is a collection of pieces from the tests of ap\_association that are being pulled together to make some test alerts that will be edited to make a plasticc alert stream.

Part of the plasticc alerts repository https://github.com/LSSTDESC/plasticc_alerts that is being developed by Cameron Stockton and Renee Hlozek

In [2]:
import os
import numpy as np
import pandas as pd
import shutil
import tempfile
import unittest
import datetime

from lsst.ap.association import (PackageAlertsTask,
                                 PackageAlertsConfig,
                                 AssociationTask,
                                 make_dia_source_schema,
                                 make_dia_object_schema)
from lsst.ap.pipe import ApPipeTask
import lsst.utils.tests
import lsst.pex.exceptions as pexExcept
import lsst.daf.persistence as dafPersist
import lsst.pipe.base as pipeBase
from lsst.afw.cameraGeom.testUtils import DetectorWrapper
import lsst.afw.fits as afwFits
import lsst.afw.image as afwImage
import lsst.afw.image.utils as afwImageUtils
import lsst.daf.base as dafBase
from lsst.dax.apdb import Apdb, ApdbConfig
import lsst.geom as geom
import lsst.meas.base.tests
from lsst.utils import getPackageDir
import lsst.utils.tests
import lsst.afw.geom as afwGeom

In [3]:
# adding what we need for images 

from astropy.table import Table
from lsst.daf.persistence import Butler
import lsst.afw.display as afwDisplay

from astropy.visualization import ZScaleInterval

from desc_dc2_dm_data import REPOS

In [4]:
def _data_file_name(basename, module_name):
    """Return path name of a data file.

    Parameters
    ----------
    basename : `str`
        Name of the file to add to the path string.
    module_name : `str`
        Name of lsst stack package environment variable.

    Returns
    -------
    data_file_path : `str`
       Full path of the file to load from the "data" directory in a given
       repository.
    """
    return os.path.join(getPackageDir(module_name), "data", basename)

### Pull the relevant functions from the test directory of ap\_association

In [5]:
def makeExposure(flipX=False, flipY=False):
    """Create an exposure and flip the x or y (or both) coordinates.

    Returns bounding boxes that are right or left handed around the bounding
    polygon.

    Parameters
    ----------
    flipX : `bool`
        Flip the x coordinate in the WCS.
    flipY : `bool`
        Flip the y coordinate in the WCS.

    Returns
    -------
    exposure : `lsst.afw.image.Exposure`
        Exposure with a valid bounding box and wcs.
    """
    metadata = dafBase.PropertySet()

    metadata.set("SIMPLE", "T")
    metadata.set("BITPIX", -32)
    metadata.set("NAXIS", 2)
    metadata.set("NAXIS1", 1024)
    metadata.set("NAXIS2", 1153)
    metadata.set("RADECSYS", 'FK5')
    metadata.set("EQUINOX", 2000.)

    metadata.setDouble("CRVAL1", 215.604025685476)
    metadata.setDouble("CRVAL2", 53.1595451514076)
    metadata.setDouble("CRPIX1", 1109.99981456774)
    metadata.setDouble("CRPIX2", 560.018167811613)
    metadata.set("CTYPE1", 'RA---SIN')
    metadata.set("CTYPE2", 'DEC--SIN')

    xFlip = 1
    if flipX:
        xFlip = -1
    yFlip = 1
    if flipY:
        yFlip = -1
    metadata.setDouble("CD1_1", xFlip * 5.10808596133527E-05)
    metadata.setDouble("CD1_2", yFlip * 1.85579539217196E-07)
    metadata.setDouble("CD2_2", yFlip * -5.10281493481982E-05)
    metadata.setDouble("CD2_1", xFlip * -8.27440751733828E-07)

    wcs = afwGeom.makeSkyWcs(metadata)
    exposure = afwImage.makeExposure(
        afwImage.makeMaskedImageFromArrays(np.ones((1024, 1153))), wcs)
    detector = DetectorWrapper(id=23, bbox=exposure.getBBox()).detector
    visit = afwImage.VisitInfo(
        exposureId=1234,
        exposureTime=200.,
        date=dafBase.DateTime("2014-05-13T17:00:00.000000000",
                              dafBase.DateTime.Timescale.TAI))
    exposure.setDetector(detector)
    exposure.getInfo().setVisitInfo(visit)
    #exposure.setFilter(afwImage.Filter('g')) # need to fix

    return exposure

In [6]:
def makeDiaObjects(nObjects, exposure):
    """Make a test set of DiaObjects.

    Parameters
    ----------
    nObjects : `int`
        Number of objects to create.
    exposure : `lsst.afw.image.Exposure`
        Exposure to create objects over.

    Returns
    -------
    diaObjects : `pandas.DataFrame`
        DiaObjects generated across the exposure.
    """
    bbox = geom.Box2D(exposure.getBBox())
    rand_x = np.random.uniform(bbox.getMinX(), bbox.getMaxX(), size=nObjects)
    rand_y = np.random.uniform(bbox.getMinY(), bbox.getMaxY(), size=nObjects)

    midPointTaiMJD = exposure.getInfo().getVisitInfo().getDate().get(
        system=dafBase.DateTime.MJD)

    wcs = exposure.getWcs()

    data = []
    for idx, (x, y) in enumerate(zip(rand_x, rand_y)):
        coord = wcs.pixelToSky(x, y)
        htmIdx = 1
        newObject = {"ra": coord.getRa().asDegrees(),
                     "decl": coord.getDec().asDegrees(),
                     "radecTai": midPointTaiMJD,
                     "diaObjectId": idx,
                     "pixelId": htmIdx,
                     "pmParallaxNdata": 0,
                     "nearbyObj1": 0,
                     "nearbyObj2": 0,
                     "nearbyObj3": 0,
                     "flags": 1,
                     "nDiaSources": 5}
        for f in ["u", "g", "r", "i", "z", "y"]:
            newObject["%sPSFluxNdata" % f] = 0
        data.append(newObject)

    return pd.DataFrame(data=data)

In [7]:
def makeDiaSources(nSources, diaObjectIds, exposure):
    """Make a test set of DiaSources.

    Parameters
    ----------
    nSources : `int`
        Number of sources to create.
    diaObjectIds : `numpy.ndarray`
        Integer Ids of diaobjects to "associate" with the DiaSources.
    exposure : `lsst.afw.image.Exposure`
        Exposure to create sources over.
    pixelator : `lsst.sphgeom.HtmPixelization`
        Object to compute spatial indicies from.

    Returns
    -------
    diaSources : `pandas.DataFrame`
        DiaSources generated across the exposure.
    """
    bbox = geom.Box2D(exposure.getBBox())
    rand_x = np.random.uniform(bbox.getMinX(), bbox.getMaxX(), size=nSources)
    rand_y = np.random.uniform(bbox.getMinY(), bbox.getMaxY(), size=nSources)

    midPointTaiMJD = exposure.getInfo().getVisitInfo().getDate().get(
        system=dafBase.DateTime.MJD)

    wcs = exposure.getWcs()
    ccdVisitId = exposure.getInfo().getVisitInfo().getExposureId()

    data = []
    for idx, (x, y) in enumerate(zip(rand_x, rand_y)):
        coord = wcs.pixelToSky(x, y)
        htmIdx = 1
        objId = diaObjectIds[idx % len(diaObjectIds)]
        # Put together the minimum values for the alert.
        data.append({"ra": coord.getRa().asDegrees(),
                     "decl": coord.getDec().asDegrees(),
                     "x": x,
                     "y": y,
                     "ccdVisitId": ccdVisitId,
                     "diaObjectId": objId,
                     "ssObjectId": 0,
                     "parentDiaSourceId": 0,
                     "prv_procOrder": 0,
                     "diaSourceId": idx,
                     "pixelId": htmIdx,
                     "midPointTai": midPointTaiMJD + 1.0 * idx,
                     "filterName": exposure.getFilter().getCanonicalName(),
                     "filterId": 0,
                     "psNdata": 0,
                     "trailNdata": 0,
                     "dipNdata": 0,
                     "flags": 1})

    return pd.DataFrame(data=data)

In [8]:
def _roundTripThroughApdb(objects, sources, dateTime):
    """Run object and source catalogs through the Apdb to get the correct
    table schemas.

    Parameters
    ----------
    objects : `pandas.DataFrame`
        Set of test DiaObjects to round trip.
    sources : `pandas.DataFrame`
        Set of test DiaSources to round trip.
    dateTime : `datetime.datetime`
        Time for the Apdb.

    Returns
    -------
    objects : `pandas.DataFrame`
        Round tripped objects.
    sources : `pandas.DataFrame`
        Round tripped sources.
    """
    tmpFile = tempfile.NamedTemporaryFile()

    apdbConfig = ApdbConfig()
    apdbConfig.db_url = "sqlite:///" + tmpFile.name
    apdbConfig.isolation_level = "READ_UNCOMMITTED"
    apdbConfig.dia_object_index = "baseline"
    apdbConfig.dia_object_columns = []
    apdbConfig.schema_file = _data_file_name(
        "apdb-schema.yaml", "dax_apdb")
    apdbConfig.column_map = _data_file_name(
        "apdb-ap-pipe-afw-map.yaml", "ap_association")
    apdbConfig.extra_schema_file = _data_file_name(
        "apdb-ap-pipe-schema-extra.yaml", "ap_association")

    apdb = Apdb(config=apdbConfig,
                afw_schemas=dict(DiaObject=make_dia_object_schema(),
                                 DiaSource=make_dia_source_schema()))
    apdb.makeSchema()

    minId = objects["pixelId"].min()
    maxId = objects["pixelId"].max()
    diaObjects = apdb.getDiaObjects([[minId, maxId + 1]], return_pandas=True).append(objects)
    diaSources = apdb.getDiaSources(np.unique(objects["diaObjectId"]),
                                    dateTime,
                                    return_pandas=True).append(sources)

    apdb.storeDiaSources(diaSources)
    apdb.storeDiaObjects(diaObjects, dateTime)

    diaObjects = apdb.getDiaObjects([[minId, maxId + 1]], return_pandas=True)
    diaSources = apdb.getDiaSources(np.unique(diaObjects["diaObjectId"]),
                                    dateTime,
                                    return_pandas=True)
    diaObjects.set_index("diaObjectId", drop=False, inplace=True)
    diaSources.set_index(["diaObjectId", "filterName", "diaSourceId"],
                         drop=False,
                         inplace=True)

    return (diaObjects, diaSources)

### Now use the functions above to make a single exposure, a diaObject and some diaSources

In [9]:
exposure = makeExposure()

In [10]:
objects = makeDiaObjects(10, exposure)

In [11]:
sources = makeDiaSources(50, np.array(objects.diaObjectId), exposure)

### Pull these into the ap db with the date and time

In [12]:
dateTime = datetime.datetime.now()

In [13]:
diaObjects, diaSources = _roundTripThroughApdb(objects, sources, dateTime)

In [14]:
diaObjects

Unnamed: 0_level_0,diaObjectId,validityStart,validityEnd,ra,raErr,decl,declErr,ra_decl_Cov,radecTai,pmRa,...,yPSFluxPercentile75,yPSFluxPercentile95,yPSFluxMin,yPSFluxMax,yPSFluxStetsonJ,yPSFluxLinearSlope,yPSFluxLinearIntercept,yPSFluxMaxSlope,yPSFluxErrMean,parent
diaObjectId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
0,0,2020-12-14 07:37:02.630128,,215.533889,,53.164966,,,56790.708333,,...,,,,,,,,,,
1,1,2020-12-14 07:37:02.630128,,215.543033,,53.160865,,,56790.708333,,...,,,,,,,,,,
2,2,2020-12-14 07:37:02.630128,,215.6046,,53.138477,,,56790.708333,,...,,,,,,,,,,
3,3,2020-12-14 07:37:02.630128,,215.596846,,53.1802,,,56790.708333,,...,,,,,,,,,,
4,4,2020-12-14 07:37:02.630128,,215.59854,,53.1589,,,56790.708333,,...,,,,,,,,,,
5,5,2020-12-14 07:37:02.630128,,215.559906,,53.157456,,,56790.708333,,...,,,,,,,,,,
6,6,2020-12-14 07:37:02.630128,,215.527639,,53.164308,,,56790.708333,,...,,,,,,,,,,
7,7,2020-12-14 07:37:02.630128,,215.523414,,53.182846,,,56790.708333,,...,,,,,,,,,,
8,8,2020-12-14 07:37:02.630128,,215.510619,,53.170459,,,56790.708333,,...,,,,,,,,,,
9,9,2020-12-14 07:37:02.630128,,215.604015,,53.165435,,,56790.708333,,...,,,,,,,,,,


In [15]:
diaSources

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,diaSourceId,ccdVisitId,diaObjectId,ssObjectId,parentDiaSourceId,prv_procOrder,ssObjectReassocTime,midPointTai,ra,raErr,...,iyyPSF,ixyPSF,extendedness,spuriousness,flags,pixelId,filterName,filterId,isDipole,bboxSize
diaObjectId,filterName,diaSourceId,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1
0,_unknown_,0,0,1234,0,0,0,0,,56790.708333,215.599346,,...,,,,,1,1,_unknown_,0,,
0,_unknown_,10,10,1234,0,0,0,0,,56800.708333,215.570467,,...,,,,,1,1,_unknown_,0,,
0,_unknown_,20,20,1234,0,0,0,0,,56810.708333,215.572188,,...,,,,,1,1,_unknown_,0,,
0,_unknown_,30,30,1234,0,0,0,0,,56820.708333,215.582256,,...,,,,,1,1,_unknown_,0,,
0,_unknown_,40,40,1234,0,0,0,0,,56830.708333,215.517858,,...,,,,,1,1,_unknown_,0,,
1,_unknown_,1,1,1234,1,0,0,0,,56791.708333,215.544432,,...,,,,,1,1,_unknown_,0,,
1,_unknown_,11,11,1234,1,0,0,0,,56801.708333,215.54396,,...,,,,,1,1,_unknown_,0,,
1,_unknown_,21,21,1234,1,0,0,0,,56811.708333,215.556401,,...,,,,,1,1,_unknown_,0,,
1,_unknown_,31,31,1234,1,0,0,0,,56821.708333,215.566655,,...,,,,,1,1,_unknown_,0,,
1,_unknown_,41,41,1234,1,0,0,0,,56831.708333,215.542816,,...,,,,,1,1,_unknown_,0,,


### What do we have in the columns of the diaObjects?

In [16]:
cols = diaObjects.columns
for col in cols:
    print(col)

diaObjectId
validityStart
validityEnd
ra
raErr
decl
declErr
ra_decl_Cov
radecTai
pmRa
pmRaErr
pmDecl
pmDeclErr
parallax
parallaxErr
pmRa_pmDecl_Cov
pmRa_parallax_Cov
pmDecl_parallax_Cov
pmParallaxLnL
pmParallaxChi2
pmParallaxNdata
uPSFluxMean
uPSFluxMeanErr
uPSFluxSigma
uPSFluxChi2
uPSFluxNdata
uFPFluxMean
uFPFluxMeanErr
uFPFluxSigma
gPSFluxMean
gPSFluxMeanErr
gPSFluxSigma
gPSFluxChi2
gPSFluxNdata
gFPFluxMean
gFPFluxMeanErr
gFPFluxSigma
rPSFluxMean
rPSFluxMeanErr
rPSFluxSigma
rPSFluxChi2
rPSFluxNdata
rFPFluxMean
rFPFluxMeanErr
rFPFluxSigma
iPSFluxMean
iPSFluxMeanErr
iPSFluxSigma
iPSFluxChi2
iPSFluxNdata
iFPFluxMean
iFPFluxMeanErr
iFPFluxSigma
zPSFluxMean
zPSFluxMeanErr
zPSFluxSigma
zPSFluxChi2
zPSFluxNdata
zFPFluxMean
zFPFluxMeanErr
zFPFluxSigma
yPSFluxMean
yPSFluxMeanErr
yPSFluxSigma
yPSFluxChi2
yPSFluxNdata
yFPFluxMean
yFPFluxMeanErr
yFPFluxSigma
uLcPeriodic
gLcPeriodic
rLcPeriodic
iLcPeriodic
zLcPeriodic
yLcPeriodic
uLcNonPeriodic
gLcNonPeriodic
rLcNonPeriodic
iLcNonPeriodic
zLcNonP

### As a dummy test, we want to assign a flux to the g band for these diaObjects, so that we can do so from the PLAsTiCC simulations

The first thing to do is to pring out the values of the flux and then change them.

In [17]:
for c,row in enumerate(diaObjects['gFPFluxMean']):
    #print(diaObjects.loc[c,'gFPFluxMean'], 'before changing')
    row=np.random.rand()*30
    diaObjects.loc[c,'gFPFluxMean']=row    
    #print(diaObjects.loc[c,'gFPFluxMean'], 'after changing')

In [18]:
diaObjects['gFPFluxMean']

diaObjectId
0     17.8862
1     24.9024
2     1.13982
3     5.80546
4     6.30327
5    0.466024
6     5.79468
7     4.03257
8     12.3032
9      27.162
Name: gFPFluxMean, dtype: object

### Let's now read in real objects and sources from the DC2 simulations

In [19]:
object_table_link = "/global/cscratch1/sd/bos0109/results/association_fix_test/diaObject_table.pqt"
object_table = pd.read_parquet(object_table_link, engine='pyarrow')

assoc_table_link = "/global/cscratch1/sd/bos0109/results/association_fix_test/assoc_table.pqt"
assoc_table = pd.read_parquet(assoc_table_link, engine='pyarrow')

source_table_link = "/global/cscratch1/sd/bos0109/results/association_fix_test/full_table_diasrc.pqt"
source_table = pd.read_parquet(source_table_link, engine='pyarrow')

In [20]:
object_table = object_table.head(2)
assoc_table = assoc_table.head(2)

In [21]:
#Helper function for create_source_table
def _find_row(object_number):
    lst = []
    for srcId in assoc_table["diaSrcIds"][object_number]:
        i = 0
        while srcId != source_table["id"][i]:
            i += 1
    
        lst.append(i)
    
    return lst

def create_source_table(object_number):
    row_numbers = _find_row(object_number)
    ids_ = []
    for row in row_numbers:
        ids_.append(source_table['id'][row])
    
    #Now i have a whole list of ids i need to grab
        
    return source_table.query(f'id in {ids_}')

In [22]:
source_table_0 = create_source_table(0)
source_table_1 = create_source_table(1)
#The second one takes a while because there are a lot of sources for 1 object

### The association table is also important because it links the sources to the objects

In [31]:
assoc_table

Unnamed: 0,diaObjectId,diaSrcIds
0,20407348128382977,"[458095239102546, 11713786981384289, 148695130..."
1,20407348128382978,"[458095239102549, 10702459085258959, 107025922..."


### Repeating the above but now on 'real' objects
What do the real objects contain?

In [33]:
object_table

Unnamed: 0,id,coord_ra,coord_dec,parent,nobs,base_PsfFlux_instFlux_Mean_u,base_PsfFlux_instFlux_Ndata_u,base_PsfFlux_instFlux_MeanErr_u,base_PsfFlux_instFlux_Sigma_u,base_PsfFlux_instFlux_Mean_g,...,base_PsfFlux_instFlux_Mean_z,base_PsfFlux_instFlux_Ndata_z,base_PsfFlux_instFlux_MeanErr_z,base_PsfFlux_instFlux_Sigma_z,base_PsfFlux_instFlux_Mean_y,base_PsfFlux_instFlux_Ndata_y,base_PsfFlux_instFlux_MeanErr_y,base_PsfFlux_instFlux_Sigma_y,tract,patch
0,20407348128382977,1.010832,-0.542038,0,11,,0,,,3.881205,...,24.917757,2,6490.442459,9178.871752,,0,,,4640,30
1,20407348128382978,1.01076,-0.542315,0,61,,0,,,22.796342,...,15.473279,8,0.293193,0.829276,,0,,,4640,30


In [34]:
for c,row in enumerate(object_table['base_PsfFlux_instFlux_Mean_z']):
    print(object_table.loc[c,'base_PsfFlux_instFlux_Mean_z'], 'before changing')
    row=row*10
    object_table.loc[c,'base_PsfFlux_instFlux_Mean_z']=row    
    print(object_table.loc[c,'base_PsfFlux_instFlux_Mean_z'], 'after changing')

24.917757085294927 before changing
249.17757085294926 after changing
15.473278576890086 before changing
154.73278576890087 after changing


In [36]:
_roundTripThroughApdb(objects, sources, dateTime)
_roundTripThroughApdb(object_table, source_table, dateTime)

KeyError: 'pixelId'