Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/routes/v1/lsst/resolver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def resolve_name(payload: dict) -> pd.DataFrame:
results = client.scan(
"",
to_evaluate,
"r:diaObjectId,f:cdsxmatch,r:ra,r:dec",
"r:diaObjectId,f:main_label_classifier,r:ra,r:dec",
0,
False,
False,
Expand Down
10 changes: 10 additions & 0 deletions apps/routes/v1/lsst/schema/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@
example="/api/v1/sources",
required=True,
),
"major_version": fields.Integer(
description="Major version. Default is latest.",
example=9,
required=False,
),
"minor_version": fields.Integer(
description="Minor version. Default is latest.",
example=0,
required=False,
),
},
)

Expand Down
171 changes: 118 additions & 53 deletions apps/routes/v1/lsst/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,6 @@
from line_profiler import profile


def flatten_nested(schema, entry_name):
    """Return the sub-fields of a nested entry of a schema.

    Parameters
    ----------
    schema: dict
        Schema dictionary with a top-level "fields" list. Each element of
        that list is a dict with at least the keys "name" and "type".
    entry_name: str
        Name of the top-level entry whose nested fields are requested.

    Returns
    -------
    list
        The "fields" value of the entry type. When the entry type is a
        list (nullable entry), the first dict in that list is used.

    Raises
    ------
    IndexError
        If no top-level field is named `entry_name`.
    ValueError
        If the entry type is a list that contains no dict.
    """
    # Sentinel rather than None, so that a literal `None` type is not
    # mistaken for a missing entry.
    missing = object()

    # Stop at the first matching entry instead of collecting all of them
    blobs = next(
        (cat["type"] for cat in schema["fields"] if cat["name"] == entry_name),
        missing,
    )
    if blobs is missing:
        raise IndexError(
            "No field named {!r} found in the schema".format(entry_name)
        )

    if isinstance(blobs, list):
        # nullable entry
        # Take the dict entry (non-null)
        dic = next((blob for blob in blobs if isinstance(blob, dict)), None)
        if dic is None:
            raise ValueError(
                "Field {!r} has no nested definition among its types".format(
                    entry_name
                )
            )
    else:
        dic = blobs

    return dic["fields"]


def sort_dict(adict):
    """Return a copy of a dictionary with its keys in sorted order.

    Parameters
    ----------
    adict: dict
        Input dictionary. Its keys must be mutually comparable.

    Returns
    -------
    dict
        New dictionary with the same key/value pairs, inserted in
        ascending key order. The input dictionary is not modified.
    """
    # Iterating a dict yields its keys, so `.keys()` is not needed
    return {key: adict[key] for key in sorted(adict)}
Expand All @@ -49,121 +33,204 @@ def extract_schema(payload: dict) -> Response:
All fields are initially defined in `fink_broker.rubin.hbase_utils`

"""
# LSST candidate fields
# FIXME: allow user to input a version?
r = requests.get(
"https://usdf-alert-schemas-dev.slac.stanford.edu/subjects/alert-packet/versions/latest/schema"
if ("major_version" not in payload) or ("minor_version" not in payload):
# Get latest version
r = requests.get(
"https://raw.githubusercontent.com/lsst/alert_packet/refs/heads/main/python/lsst/alert/packet/schema/latest.txt"
)
version = "{}".format(r.json())
major_version, minor_version = [int(i) for i in version.split(".")]
else:
major_version = payload["major_version"]
minor_version = payload["minor_version"]

base_url = "https://raw.githubusercontent.com/lsst/alert_packet/refs/heads/main/python/lsst/alert/packet/schema"

r_root = requests.get(
"{}/{}/{}/lsst.v{}_{}.alert.avsc".format(
base_url, major_version, minor_version, major_version, minor_version
)
)
rubin_schema = r.json()
root_schema = r_root.json()

# root level should be everywhere
root_rubin_names = ["observation_reason", "target_name", "diaSourceId"]
root_list = [i for i in rubin_schema["fields"] if i["name"] in root_rubin_names]
root_list = [i for i in root_schema["fields"] if i["name"] in root_rubin_names]

cutout_rubin_names = ["cutoutDifference", "cutoutTemplate", "cutoutScience"]
cutout_list = [i for i in rubin_schema["fields"] if i["name"] in cutout_rubin_names]
cutout_list = [i for i in root_schema["fields"] if i["name"] in cutout_rubin_names]

# Other fields
r_diaSource = requests.get(
"{}/{}/{}/lsst.v{}_{}.diaSource.avsc".format(
base_url, major_version, minor_version, major_version, minor_version
)
)
diaSource_schema = r_diaSource.json()["fields"]

r_diaObject = requests.get(
"{}/{}/{}/lsst.v{}_{}.diaObject.avsc".format(
base_url, major_version, minor_version, major_version, minor_version
)
)
diaObject_schema = r_diaObject.json()["fields"]

r_ssSource = requests.get(
"{}/{}/{}/lsst.v{}_{}.ssSource.avsc".format(
base_url, major_version, minor_version, major_version, minor_version
)
)
ssSource_schema = r_ssSource.json()["fields"]

# Fink Science modules
fink_science = [
fink_source_science = [
{
"name": "cdsxmatch",
"name": "crossmatches_simbad_otype",
"type": "string",
"doc": "Object type of the closest source from SIMBAD database; if exists within 1 arcsec. See https://api.fink-portal.org/api/v1/classes",
},
{
"name": "gcvs",
"name": "crossmatches_gcvs_type",
"type": "string",
"doc": "Object type of the closest source from GCVS catalog; if exists within 1 arcsec.",
},
{
"name": "vsx",
"name": "crossmatches_vizier:B/vsx/vsx_Type",
"type": "string",
"doc": "Object type of the closest source from VSX catalog; if exists within 1 arcsec.",
},
{
"name": "DR3Name",
"name": "crossmatches_vizier:I/355/gaiadr3_DR3Name",
"type": "string",
"doc": "Unique source designation of closest source from Gaia catalog; if exists within 1 arcsec.",
},
{
"name": "Plx",
"name": "crossmatches_vizier:I/355/gaiadr3_Plx",
"type": "double",
"doc": "Absolute stellar parallax (in milli-arcsecond) of the closest source from Gaia catalog; if exists within 1 arcsec.",
},
{
"name": "e_Plx",
"name": "crossmatches_vizier:I/355/gaiadr3_e_Plx",
"type": "double",
"doc": "Standard error of the stellar parallax (in milli-arcsecond) of the closest source from Gaia catalog; if exists within 1 arcsec.",
},
{
"name": "x3hsp",
"name": "crossmatches_x3hsp_type",
"type": "string",
"doc": "Counterpart (cross-match) to the 3HSP catalog if exists within 1 arcminute.",
},
{
"name": "x4lac",
"name": "crossmatches_x4lac_type",
"type": "string",
"doc": "Counterpart (cross-match) to the 4LAC DR3 catalog if exists within 1 arcminute.",
},
{
"name": "mangrove_HyperLEDA_name",
"name": "crossmatches_mangrove_HyperLEDA_name",
"type": "string",
"doc": "HyperLEDA source designation of closest source from Mangrove catalog; if exists within 1 arcmin.",
},
{
"name": "mangrove_2MASS_name",
"name": "crossmatches_mangrove_2MASS_name",
"type": "string",
"doc": "2MASS source designation of closest source from Mangrove catalog; if exists within 1 arcmin.",
},
{
"name": "mangrove_lum_dist",
"name": "crossmatches_mangrove_lum_dist",
"type": "string",
"doc": "Luminosity distance of closest source from Mangrove catalog; if exists within 1 arcmin.",
},
{
"name": "mangrove_ang_dist",
"name": "crossmatches_mangrove_ang_dist",
"type": "string",
"doc": "Angular distance of closest source from Mangrove catalog; if exists within 1 arcmin.",
},
{
"name": "spicy_id",
"name": "crossmatches_vizier:J/ApJS/254/33/table1_SPICY",
"type": "string",
"doc": "Unique source designation of closest source from SPICY catalog; if exists within 1.2 arcsec.",
},
{
"name": "spicy_class",
"name": "crossmatches_vizier:J/ApJS/254/33/table1_class",
"type": "string",
"doc": "Class name of closest source from SPICY catalog; if exists within 1.2 arcsec.",
},
{
"name": "tns",
"name": "crossmatches_tns_type",
"type": "string",
"doc": "TNS label, if it exists.",
},
{
"name": "gaiaClass",
"name": "crossmatches_vizier:I/358/vclassre_Class",
"type": "str",
"doc": "Name of best class from Gaia DR3 Part 4. Variability (I/358/vclassre).",
},
{
"name": "gaiaVarFlag",
"name": "crossmatches_vizier:I/355/gaiadr3_VarFlag",
"type": "int",
"doc": "Photometric variability flag from Gaia DR3. 1 if the source is variable, 0 otherwise.",
},
{
"name": "classifiers_cats_class",
"type": "int",
"doc": "CATS classifier broad class prediction with the highest probability. 11=SN-like, 12=Fast (e.g. KN, ulens, Novae, ...), 13=Long (e.g. SLSN, TDE, ...), 21=Periodic (e.g. RRLyrae, EB, ...), 22=Non-periodic (e.g. AGN). See https://arxiv.org/abs/2404.08798",
},
{
"name": "classifiers_earlySNIa_score",
"type": "float",
"doc": "Score (0...1) for the early SN Ia classifier (binary classifier). See https://arxiv.org/abs/2404.08798",
},
{
"name": "classifiers_slsn_score",
"type": "float",
"doc": "Score (0...1) for the super-luminous SN classifier (binary classifier). See https://arxiv.org/abs/2404.08798",
},
{
"name": "classifiers_snnSnVsOthers_score",
"type": "float",
"doc": "Score (0...1) for the SN classifier (binary classifier) using SuperNNova. See https://arxiv.org/abs/2404.08798",
},
{
"name": "fink_broker_version",
"type": "string",
"doc": "fink-broker schema version used",
"doc": "fink-broker schema version used to process the alert",
},
{
"name": "fink_science_version",
"type": "string",
"doc": "fink-science schema version used",
"doc": "fink-science schema version used to process the alert",
},
{
"name": "lsst_schema_version",
"type": "string",
"doc": "LSST schema version used",
"doc": "LSST schema version used to generate the alert",
},
]

fink_object_science = [
{
"name": "is_cataloged",
"type": "boolean",
"doc": "True if the last diaSource (alert) of the diaObject (object) has a counterpart in either SIMBAD or Gaia DR3. False otherwise.",
},
{
"name": "is_sso",
"type": "boolean",
"doc": "True if the diaSource is associate to a known Solar System object. False otherwise.",
},
{
"name": "is_first",
"type": "boolean",
"doc": "True if the alert is not a Solar System object and has no history (first detection at this location).",
},
{
"name": "main_label_classifier",
"type": "int",
"doc": "Main prediction from Fink classifiers for the last received alert of this object. This is currently set to the CATS prediction only (f:classifiers_cats_class). Subject to change.",
},
{
"name": "main_label_crossmatch",
"type": "string",
"doc": "Main association from various crossmatches for the last received alert of this object. This is currently set to the SIMBAD label only (f:crossmatches_simbad_otype). Subject to change.",
},
]

Expand All @@ -173,13 +240,13 @@ def extract_schema(payload: dict) -> Response:
"Rubin original fields (r:)": sort_dict(
{
i["name"]: {"type": i["type"], "doc": i.get("doc", "TBD")}
for i in flatten_nested(rubin_schema, "diaSource") + root_list
for i in diaSource_schema + root_list
}
),
"Fink science module outputs (f:)": sort_dict(
{
i["name"]: {"type": i["type"], "doc": i.get("doc", "TBD")}
for i in fink_science
for i in fink_source_science
}
),
}
Expand All @@ -189,13 +256,13 @@ def extract_schema(payload: dict) -> Response:
"Rubin original fields (r:)": sort_dict(
{
i["name"]: {"type": i["type"], "doc": i.get("doc", "TBD")}
for i in flatten_nested(rubin_schema, "diaObject") + root_list
for i in diaObject_schema + root_list
}
),
"Fink science module outputs (f:)": sort_dict(
{
i["name"]: {"type": i["type"], "doc": i.get("doc", "TBD")}
for i in fink_science
for i in fink_object_science
}
),
}
Expand All @@ -204,15 +271,13 @@ def extract_schema(payload: dict) -> Response:
"Rubin original fields (r:)": sort_dict(
{
i["name"]: {"type": i["type"], "doc": i.get("doc", "TBD")}
for i in flatten_nested(rubin_schema, "diaSource")
+ flatten_nested(rubin_schema, "diaObject")
+ root_list
for i in diaSource_schema + diaObject_schema + root_list
}
),
"Fink science module outputs (f:)": sort_dict(
{
i["name"]: {"type": i["type"], "doc": i.get("doc", "TBD")}
for i in fink_science
for i in fink_source_science + fink_object_science
}
),
}
Expand All @@ -230,7 +295,7 @@ def extract_schema(payload: dict) -> Response:
"Rubin original fields (r:)": sort_dict(
{
i["name"]: {"type": i["type"], "doc": i.get("doc", "TBD")}
for i in flatten_nested(rubin_schema, "ssSource") + root_list
for i in ssSource_schema + root_list
}
),
"Fink science module outputs (f:)": sort_dict(
Expand Down
25 changes: 17 additions & 8 deletions apps/utils/decoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,23 @@ def format_lsst_hbase_output(
if _ in pdfs.columns:
pdfs = pdfs.drop(columns=_)

# Type conversion
for col in pdfs.columns:
pdfs[col] = convert_datatype(
pdfs[col],
hbase_type_converter[schema_client.type(col)],
)
# Create a dictionary to hold the new columns
new_columns = {}

# Use fixed schema
for col in schema_client.columnNames():
if col in pdfs.columns:
# Type conversion
new_columns[col] = convert_datatype(
pdfs[col],
hbase_type_converter[schema_client.type(col)],
)
else:
# Column is only NaN so it was not transferred
new_columns[col] = np.nan

# Create a new DataFrame with the new columns (overwrite)
pdfs = pd.DataFrame(new_columns)

# Booleans
pdfs = pdfs.replace(to_replace={"true": True, "false": False})
Expand All @@ -198,8 +209,6 @@ def format_lsst_hbase_output(
if col in pdfs.columns:
pdfs[col] = pdfs[col].replace("nan", "[]")

pdfs = pdfs.copy() # Fix Pandas' "DataFrame is highly fragmented" warning

# Display only the last alert
if (
group_alerts
Expand Down