Skip to content

Commit

Permalink
Merge pull request #572 from TOMToolkit/feature/share-photometry
Browse files Browse the repository at this point in the history
Share Photometry
  • Loading branch information
jchate6 committed Jan 11, 2023
2 parents fbc3e59 + 7e23b0a commit be2bfdf
Show file tree
Hide file tree
Showing 22 changed files with 857 additions and 20 deletions.
36 changes: 36 additions & 0 deletions docs/managing_data/customizing_data_sharing.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
Customizing Data Sharing
---------------------------

Currently, data sharing is only possible with HERMES.

You will need to add ``DATA_SHARING`` to your ``settings.py`` that will give the proper credentials for the various
streams, TOMS, etc. with which you desire to share data.

.. code:: python
# Define the valid data sharing destinations for your TOM.
DATA_SHARING = {
'hermes': {
'DISPLAY_NAME': os.getenv('HERMES_DISPLAY_NAME', 'Hermes'),
'BASE_URL': os.getenv('HERMES_BASE_URL', 'https://hermes.lco.global/'),
'CREDENTIAL_USERNAME': os.getenv('SCIMMA_CREDENTIAL_USERNAME',
'set SCIMMA_CREDENTIAL_USERNAME value in environment'),
'CREDENTIAL_PASSWORD': os.getenv('SCIMMA_CREDENTIAL_PASSWORD',
'set SCIMMA_CREDENTIAL_PASSWORD value in environment'),
'USER_TOPICS': ['hermes.test', 'tomtoolkit.test']
},
'tom-demo-dev': {
'DISPLAY_NAME': os.getenv('TOM_DEMO_DISPLAY_NAME', 'TOM Demo Dev'),
'BASE_URL': os.getenv('TOM_DEMO_BASE_URL', 'http://tom-demo-dev.lco.gtn/'),
'USERNAME': os.getenv('TOM_DEMO_USERNAME', 'set TOM_DEMO_USERNAME value in environment'),
'PASSWORD': os.getenv('TOM_DEMO_PASSWORD', 'set TOM_DEMO_PASSWORD value in environment'),
},
'localhost-tom': {
# for testing; share with yourself
'DISPLAY_NAME': os.getenv('LOCALHOST_TOM_DISPLAY_NAME', 'Local'),
'BASE_URL': os.getenv('LOCALHOST_TOM_BASE_URL', 'http://127.0.0.1:8000/'),
'USERNAME': os.getenv('LOCALHOST_TOM_USERNAME', 'set LOCALHOST_TOM_USERNAME value in environment'),
'PASSWORD': os.getenv('LOCALHOST_TOM_PASSWORD', 'set LOCALHOST_TOM_PASSWORD value in environment'),
}
}
3 changes: 3 additions & 0 deletions docs/managing_data/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ Managing Data
../api/tom_dataproducts/views
plotting_data
customizing_data_processing
customizing_data_sharing


:doc:`Creating Plots from TOM Data <plotting_data>` - Learn how to create plots using plot.ly and your TOM
data to display anywhere in your TOM.

:doc:`Adding Custom Data Processing <customizing_data_processing>` - Learn how you can process data into your
TOM from uploaded data products.

:doc:`Adding Custom Data Sharing <customizing_data_sharing>` - Learn how you can share data from your TOM.
23 changes: 23 additions & 0 deletions tom_alerts/migrations/0005_alertstreammessage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 4.1 on 2022-10-26 21:16

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('tom_alerts', '0004_auto_20210204_2300'),
]

operations = [
migrations.CreateModel(
name='AlertStreamMessage',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('topic', models.CharField(help_text='The destination or source of sharing for the message.', max_length=500, verbose_name='Message Topic')),
('message_id', models.CharField(help_text='An external message identifier that can be used to locate the message within the given topic.', max_length=50, verbose_name='Message ID')),
('date_shared', models.DateTimeField(auto_now_add=True, help_text='The date on which the message is shared. (Date created by default.)', verbose_name='Date Shared')),
('exchange_status', models.CharField(default='', help_text='Whether this message was sent or received.', max_length=10, verbose_name='Exchange Status')),
],
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.1 on 2022-10-26 23:57

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('tom_alerts', '0005_alertstreammessage'),
]

operations = [
migrations.AlterField(
model_name='alertstreammessage',
name='exchange_status',
field=models.CharField(choices=[('published', 'Published'), ('ingested', 'Ingested')], help_text='Whether this message was sent or received.', max_length=10, verbose_name='Exchange Status'),
),
]
47 changes: 47 additions & 0 deletions tom_alerts/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,50 @@ class BrokerQuery(models.Model):

def __str__(self):
return self.name


class AlertStreamMessage(models.Model):
"""
Class representing a streaming message containing data sent/received either over Kafka or to/from another TOM
:param topic: The destination or source of sharing for the message.
:type topic: str
:param message_id: An external message identifier that can be used to locate the message within the given topic.
:type message_id: str
:param date_shared: The date on which the message is shared. (Date created by default.)
:type date_shared: datetime
:param exchange_status: Whether this message was sent or received.
:type exchange_status: str
"""

EXCHANGE_STATUS_CHOICES = (
('published', 'Published'),
('ingested', 'Ingested')
)

topic = models.CharField(
max_length=500,
verbose_name='Message Topic',
help_text='The destination or source of sharing for the message.'
)
message_id = models.CharField(
max_length=50,
verbose_name='Message ID',
help_text='An external message identifier that can be used to locate the message within the given topic.'
)
date_shared = models.DateTimeField(
auto_now_add=True,
verbose_name='Date Shared',
help_text='The date on which the message is shared. (Date created by default.)'
)
exchange_status = models.CharField(
max_length=10,
verbose_name='Exchange Status',
choices=EXCHANGE_STATUS_CHOICES,
help_text='Whether this message was sent or received.'
)

def __str__(self):
return f'Message {self.message_id} on {self.topic}.'
42 changes: 37 additions & 5 deletions tom_base/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,23 @@
},
}


TOM_FACILITY_CLASSES = [
'tom_observations.facilities.lco.LCOFacility',
'tom_observations.facilities.gemini.GEMFacility',
'tom_observations.facilities.soar.SOARFacility',
]


#
# tom_alerts configuration
#
LASAIR_TOKEN = os.getenv('LASAIR_TOKEN', None)


#
# tom_dataproducts configuration
#
# Define the valid data product types for your TOM. Be careful when removing items, as previously valid types will no
# longer be valid, and may cause issues unless the offending records are modified.
DATA_PRODUCT_TYPES = {
Expand All @@ -214,11 +226,31 @@
'spectroscopy': 'tom_dataproducts.processors.spectroscopy_processor.SpectroscopyProcessor',
}

TOM_FACILITY_CLASSES = [
'tom_observations.facilities.lco.LCOFacility',
'tom_observations.facilities.gemini.GEMFacility',
'tom_observations.facilities.soar.SOARFacility',
]
# Configuration for the TOM/Kafka Stream receiving data from this TOM
DATA_SHARING = {
'hermes': {
'DISPLAY_NAME': os.getenv('HERMES_DISPLAY_NAME', 'Hermes'),
'BASE_URL': os.getenv('HERMES_BASE_URL', 'https://hermes.lco.global/'),
'CREDENTIAL_USERNAME': os.getenv('SCIMMA_CREDENTIAL_USERNAME',
'set SCIMMA_CREDENTIAL_USERNAME value in environment'),
'CREDENTIAL_PASSWORD': os.getenv('SCIMMA_CREDENTIAL_PASSWORD',
'set SCIMMA_CREDENTIAL_PASSWORD value in environment'),
'USER_TOPICS': ['hermes.test', 'tomtoolkit.test']
},
'tom-demo-dev': {
'DISPLAY_NAME': os.getenv('TOM_DEMO_DISPLAY_NAME', 'TOM Demo Dev'),
'BASE_URL': os.getenv('TOM_DEMO_BASE_URL', 'http://tom-demo-dev.lco.gtn/'),
'USERNAME': os.getenv('TOM_DEMO_USERNAME', 'set TOM_DEMO_USERNAME value in environment'),
'PASSWORD': os.getenv('TOM_DEMO_PASSWORD', 'set TOM_DEMO_PASSWORD value in environment'),
},
'localhost-tom': {
# for testing; share with yourself
'DISPLAY_NAME': os.getenv('LOCALHOST_TOM_DISPLAY_NAME', 'Local'),
'BASE_URL': os.getenv('LOCALHOST_TOM_BASE_URL', 'http://127.0.0.1:8000/'),
'USERNAME': os.getenv('LOCALHOST_TOM_USERNAME', 'set LOCALHOST_TOM_USERNAME value in environment'),
'PASSWORD': os.getenv('LOCALHOST_TOM_PASSWORD', 'set LOCALHOST_TOM_PASSWORD value in environment'),
}
}

TOM_CADENCE_STRATEGIES = [
'tom_observations.cadences.retry_failed_observations.RetryFailedObservationsStrategy',
Expand Down
Empty file.
165 changes: 165 additions & 0 deletions tom_dataproducts/alertstreams/hermes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import logging
from dateutil.parser import parse

from django.conf import settings

# from hop.io import Metadata

from tom_alerts.models import AlertStreamMessage
from tom_targets.models import Target
from tom_dataproducts.models import ReducedDatum

import requests

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class BuildHermesMessage(object):
"""
A HERMES Message Object that can be submitted to HOP through HERMES
"""
def __init__(self, title='', submitter='', authors='', message='', topic='hermes.test', **kwargs):
self.title = title
self.submitter = submitter
self.authors = authors
self.message = message
self.topic = topic
self.extra_info = kwargs


def publish_photometry_to_hermes(message_info, datums, **kwargs):
"""
Submits a typical hermes photometry alert using the datums supplied to build a photometry table.
-- Stores an AlertStreamMessage connected to each datum to show that the datum has previously been shared.
:param message_info: HERMES Message Object created with BuildHermesMessage
:param datums: Queryset of Reduced Datums to be built into table.
:return: response
"""
stream_base_url = settings.DATA_SHARING['hermes']['BASE_URL']
submit_url = stream_base_url + 'api/v0/' + 'submit_photometry/'
headers = {'SCIMMA-API-Auth-Username': settings.DATA_SHARING['hermes']['CREDENTIAL_USERNAME'],
'SCIMMA-API-Auth-Password': settings.DATA_SHARING['hermes']['CREDENTIAL_PASSWORD']}
hermes_photometry_data = []
hermes_alert = AlertStreamMessage(topic=message_info.topic, exchange_status='published')
hermes_alert.save()
for tomtoolkit_photometry in datums:
tomtoolkit_photometry.message.add(hermes_alert)
hermes_photometry_data.append(create_hermes_phot_table_row(tomtoolkit_photometry, **kwargs))
alert = {
'topic': message_info.topic,
'title': message_info.title,
'submitter': message_info.submitter,
'authors': message_info.authors,
'data': {
'photometry': hermes_photometry_data,
'extra_info': message_info.extra_info
},
'message_text': message_info.message,
}

response = requests.post(url=submit_url, json=alert, headers=headers)
return response


def create_hermes_phot_table_row(datum, **kwargs):
"""Build a row for a Hermes Photometry Table using a TOM Photometry datum
"""
table_row = {
'target_name': datum.target.name,
'ra': datum.target.ra,
'dec': datum.target.dec,
'date': datum.timestamp.isoformat(),
'telescope': datum.value.get('telescope', ''),
'instrument': datum.value.get('instrument', ''),
'band': datum.value.get('filter', ''),
'brightness_unit': datum.value.get('unit', 'AB mag'),
}
if datum.value.get('magnitude', None):
table_row['brightness'] = datum.value['magnitude']
else:
table_row['brightness'] = datum.value.get('limit', None)
table_row['nondetection'] = True
if datum.value.get('magnitude_error', None):
table_row['brightness_error'] = datum.value['magnitude_error']
return table_row


def get_hermes_topics():
"""
!CURRENTLY UNUSED!
Method to retrieve a list of available topics from HOP.
Intended to be called from forms when building topic list.
TODO: Retrieve list from HOP, currently unavailable due to authentication issues.
:return: List of topics available for users
"""
# stream_base_url = settings.DATA_SHARING['hermes']['BASE_URL']
# submit_url = stream_base_url + "api/v0/profile/"
# headers = {'SCIMMA-API-Auth-Username': settings.DATA_SHARING['hermes']['CREDENTIAL_USERNAME'],
# 'SCIMMA-API-Auth-Password': settings.DATA_SHARING['hermes']['CREDENTIAL_PASSWORD']}
# user = settings.DATA_SHARING['hermes']['SCIMMA_AUTH_USERNAME']
# headers = {}

# response = requests.get(url=submit_url, headers=headers)
topics = settings.DATA_SHARING['hermes']['USER_TOPICS']
return topics


def hermes_alert_handler(alert, metadata):
"""Alert Handler to record data streamed through Hermes as a new ReducedDatum.
-- Only Reads Photometry Data
-- Only ingests Data if exact match for Target Name
-- Does not Ingest Data if exact match already exists
-- Requires 'tom_alertstreams' in settings.INSTALLED_APPS
-- Requires ALERT_STREAMS['topic_handlers'] in settings
"""
alert_as_dict = alert.content
photometry_table = alert_as_dict['data'].get('photometry', None)
if photometry_table:
hermes_alert = AlertStreamMessage(topic=alert_as_dict['topic'], exchange_status='ingested')
for row in photometry_table:
try:
target = Target.objects.get(name=row['target_name'])
except Target.DoesNotExist:
continue

try:
obs_date = parse(row['date'])
except ValueError:
continue

datum = {
'target': target,
'data_type': 'photometry',
'source_name': alert_as_dict['topic'],
'source_location': 'Hermes via HOP', # TODO Add message URL here once message ID's exist
'timestamp': obs_date,
'value': get_hermes_phot_value(row)
}
new_rd, created = ReducedDatum.objects.get_or_create(**datum)
if created:
hermes_alert.save()
new_rd.message.add(hermes_alert)
new_rd.save()


def get_hermes_phot_value(phot_data):
"""
Convert Hermes Message format for a row of Photometry table into parameters accepted by the Reduced Datum model
:param phot_data: Dictionary containing Hermes Photometry table.
:return: Dictionary containing properly formatted parameters for Reduced_Datum
"""
data_dictionary = {
'magnitude_error': phot_data.get('brightness_error', ''),
'filter': phot_data['band'],
'telescope': phot_data.get('telescope', ''),
'instrument': phot_data.get('instrument', ''),
'unit': phot_data['brightness_unit'],
}

if not phot_data.get('nondetection', False):
data_dictionary['magnitude'] = phot_data['brightness']
else:
data_dictionary['limit'] = phot_data['brightness']

return data_dictionary
Loading

0 comments on commit be2bfdf

Please sign in to comment.