Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hermes Sharing with Updated Schema #622

Merged
merged 23 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3, 3.9, 3.8]
python-version: ["3.11", "3.10", "3.9", "3.8"]
os: [ubuntu-latest, macos-latest]
steps:
- uses: actions/checkout@v2
Expand Down
8 changes: 3 additions & 5 deletions docs/managing_data/stream_pub_sub.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ TOM Toolkit supports publishing data to a Kafka stream such as `Hermes <https://
`HOPSKOTCH <https://hop.scimma.org>`_) and `GCNClassicOverKafka <https://gcn.nasa.gov>`_.

When sharing photometry data via Hermes, the TOM publishes the data to be shared to a topic on the HOPSKOTCH
Kafka stream. At this time, only photometry data is supported.
Kafka stream. At this time, only photometry data is supported by TOM Toolkit. To submit via the Hermes API, you will
need to copy your Hermes API Key from your Hermes profile page.


Configuring your TOM to Publish Data to a stream:
Expand All @@ -27,10 +28,7 @@ for the various streams with which you wish to share data.
'hermes': {
'DISPLAY_NAME': os.getenv('HERMES_DISPLAY_NAME', 'Hermes'),
'BASE_URL': os.getenv('HERMES_BASE_URL', 'https://hermes.lco.global/'),
'CREDENTIAL_USERNAME': os.getenv('SCIMMA_CREDENTIAL_USERNAME',
'set SCIMMA_CREDENTIAL_USERNAME value in environment'),
'CREDENTIAL_PASSWORD': os.getenv('SCIMMA_CREDENTIAL_PASSWORD',
'set SCIMMA_CREDENTIAL_PASSWORD value in environment'),
'HERMES_API_KEY': os.getenv('HERMES_API_KEY', 'set HERMES_API_KEY value in environment'),
'USER_TOPICS': ['hermes.test', 'tomtoolkit.test']
},
}
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
'astroplan~=0.8',
'astropy>=5.0',
'beautifulsoup4~=4.9',
'django>=3.1,<5.0', # TOM Toolkit requires db math functions
'django>=3.1,<5', # TOM Toolkit requires db math functions
'djangorestframework~=3.12',
'django-bootstrap4>=3,<24',
'django-contrib-comments~=2.0', # Earlier version are incompatible with Django >= 3.0
Expand Down
18 changes: 18 additions & 0 deletions tom_alerts/migrations/0007_alter_alertstreammessage_message_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.1.6 on 2023-03-22 20:57

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('tom_alerts', '0006_alter_alertstreammessage_exchange_status'),
]

operations = [
migrations.AlterField(
model_name='alertstreammessage',
name='message_id',
field=models.CharField(help_text='An external message identifier that can be used to locate the message within the given topic.', max_length=50, null=True, verbose_name='Message ID'),
),
]
1 change: 1 addition & 0 deletions tom_alerts/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class AlertStreamMessage(models.Model):
)
message_id = models.CharField(
max_length=50,
null=True,
verbose_name='Message ID',
help_text='An external message identifier that can be used to locate the message within the given topic.'
)
Expand Down
2 changes: 1 addition & 1 deletion tom_common/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ def test_user_can_update_self(self):
'password2': 'forc34eva!',
}
response = self.client.post(reverse('user-update', kwargs={'pk': user.id}), data=user_data, follow=True)
self.assertContains(response, 'Profile updated')
user.refresh_from_db()
self.assertEqual(user.first_name, 'Luke')
self.assertContains(response, 'Profile updated')

def test_user_cannot_update_other(self):
user = User.objects.create(username='luke', password='forc3')
Expand Down
157 changes: 122 additions & 35 deletions tom_dataproducts/alertstreams/hermes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# from hop.io import Metadata

from tom_alerts.models import AlertStreamMessage
from tom_targets.models import Target
from tom_targets.models import Target, TargetList
from tom_dataproducts.models import ReducedDatum

import requests
Expand Down Expand Up @@ -37,13 +37,17 @@ def publish_photometry_to_hermes(message_info, datums, **kwargs):
:return: response
"""
stream_base_url = settings.DATA_SHARING['hermes']['BASE_URL']
submit_url = stream_base_url + 'api/v0/' + 'submit_photometry/'
headers = {'SCIMMA-API-Auth-Username': settings.DATA_SHARING['hermes']['CREDENTIAL_USERNAME'],
'SCIMMA-API-Auth-Password': settings.DATA_SHARING['hermes']['CREDENTIAL_PASSWORD']}
submit_url = stream_base_url + 'api/v0/' + 'submit_message/'
# You will need your Hermes API key. This can be found on your Hermes profile page.
headers = {'Authorization': f"Token {settings.DATA_SHARING['hermes']['HERMES_API_KEY']}"}

hermes_photometry_data = []
hermes_target_list = []
hermes_alert = AlertStreamMessage(topic=message_info.topic, exchange_status='published')
hermes_alert.save()
for tomtoolkit_photometry in datums:
if tomtoolkit_photometry.target.name not in [target['name'] for target in hermes_target_list]:
hermes_target_list.append(create_hermes_target_table_row(tomtoolkit_photometry.target, **kwargs))
tomtoolkit_photometry.message.add(hermes_alert)
hermes_photometry_data.append(create_hermes_phot_table_row(tomtoolkit_photometry, **kwargs))
alert = {
Expand All @@ -52,8 +56,9 @@ def publish_photometry_to_hermes(message_info, datums, **kwargs):
'submitter': message_info.submitter,
'authors': message_info.authors,
'data': {
'targets': hermes_target_list,
'photometry': hermes_photometry_data,
'extra_info': message_info.extra_info
'extra_data': message_info.extra_info
},
'message_text': message_info.message,
}
Expand All @@ -62,51 +67,82 @@ def publish_photometry_to_hermes(message_info, datums, **kwargs):
return response


def create_hermes_target_table_row(target, **kwargs):
"""Build a row for a Hermes Target Table from a TOM target Model.
"""
if target.type == "SIDEREAL":
target_table_row = {
'name': target.name,
'ra': target.ra,
'dec': target.dec,
}
if target.epoch:
target_table_row['epoch'] = target.epoch
if target.pm_ra:
target_table_row['pm_ra'] = target.pm_ra
if target.pm_dec:
target_table_row['pm_dec'] = target.pm_dec
else:
target_table_row = {
'name': target.name,
'orbital_elements': {
"epoch_of_elements": target.epoch_of_elements,
"eccentricity": target.eccentricity,
"argument_of_the_perihelion": target.arg_of_perihelion,
"mean_anomaly": target.mean_anomaly,
"orbital_inclination": target.inclination,
"longitude_of_the_ascending_node": target.lng_asc_node,
"semimajor_axis": target.semimajor_axis,
"epoch_of_perihelion": target.epoch_of_perihelion,
"perihelion_distance": target.perihdist,
}
}
target_table_row['aliases'] = [alias.name for alias in target.aliases.all()]
return target_table_row


def create_hermes_phot_table_row(datum, **kwargs):
"""Build a row for a Hermes Photometry Table using a TOM Photometry datum
"""
table_row = {
phot_table_row = {
'target_name': datum.target.name,
'ra': datum.target.ra,
'dec': datum.target.dec,
'date': datum.timestamp.isoformat(),
'date_obs': datum.timestamp.isoformat(),
'telescope': datum.value.get('telescope', ''),
'instrument': datum.value.get('instrument', ''),
'band': datum.value.get('filter', ''),
'bandpass': datum.value.get('filter', ''),
'brightness_unit': datum.value.get('unit', 'AB mag'),
}
if datum.value.get('magnitude', None):
table_row['brightness'] = datum.value['magnitude']
phot_table_row['brightness'] = datum.value['magnitude']
else:
table_row['brightness'] = datum.value.get('limit', None)
table_row['nondetection'] = True
phot_table_row['limiting_brightness'] = datum.value.get('limit', None)
if datum.value.get('magnitude_error', None):
table_row['brightness_error'] = datum.value['magnitude_error']
return table_row
phot_table_row['brightness_error'] = datum.value['magnitude_error']
return phot_table_row


def get_hermes_topics():
def get_hermes_topics(**kwargs):
"""
!CURRENTLY UNUSED!
Method to retrieve a list of available topics from HOP.
Intended to be called from forms when building topic list.
TODO: Retrieve list from HOP, currently unavailable due to authentication issues.
:return: List of topics available for users
Extend this method to restrict topics for individual users.
:return: List of writable topics available for TOM.
"""
# stream_base_url = settings.DATA_SHARING['hermes']['BASE_URL']
# submit_url = stream_base_url + "api/v0/profile/"
# headers = {'SCIMMA-API-Auth-Username': settings.DATA_SHARING['hermes']['CREDENTIAL_USERNAME'],
# 'SCIMMA-API-Auth-Password': settings.DATA_SHARING['hermes']['CREDENTIAL_PASSWORD']}
# user = settings.DATA_SHARING['hermes']['SCIMMA_AUTH_USERNAME']
# headers = {}

# response = requests.get(url=submit_url, headers=headers)
topics = settings.DATA_SHARING['hermes']['USER_TOPICS']
try:
stream_base_url = settings.DATA_SHARING['hermes']['BASE_URL']
submit_url = stream_base_url + "api/v0/profile/"
headers = {'Authorization': f"Token {settings.DATA_SHARING['hermes']['HERMES_API_KEY']}"}

response = requests.get(url=submit_url, headers=headers)

topics = response.json()['writable_topics']
except KeyError:
topics = settings.DATA_SHARING['hermes']['USER_TOPICS']
return topics


def hermes_alert_handler(alert, metadata):
"""Alert Handler to record data streamed through Hermes as a new ReducedDatum.
"""Example Alert Handler to record data streamed through Hermes as a new ReducedDatum.
-- Only Reads Photometry Data
-- Only ingests Data if exact match for Target Name
-- Does not Ingest Data if exact match already exists
Expand All @@ -115,8 +151,11 @@ def hermes_alert_handler(alert, metadata):
"""
alert_as_dict = alert.content
photometry_table = alert_as_dict['data'].get('photometry', None)
# target_table = alert_as_dict['data'].get('targets', None)
if photometry_table:
hermes_alert = AlertStreamMessage(topic=alert_as_dict['topic'], exchange_status='ingested')
hermes_alert = AlertStreamMessage(topic=alert_as_dict['topic'],
exchange_status='ingested',
message_id=alert_as_dict.get("uuid", None))
target_name = ''
query = []
for row in photometry_table:
Expand All @@ -126,10 +165,12 @@ def hermes_alert_handler(alert, metadata):
if query:
target = query[0]
else:
# add conditional statements for whether to ingest a target here.
# target = create_new_hermes_target(target_table, target_name, target_list_name="new_hermes_object")
continue

try:
obs_date = parse(row['date'])
obs_date = parse(row['date_obs'])
except ValueError:
continue

Expand All @@ -156,15 +197,61 @@ def get_hermes_phot_value(phot_data):
"""
data_dictionary = {
'magnitude_error': phot_data.get('brightness_error', ''),
'filter': phot_data['band'],
'filter': phot_data['bandpass'],
'telescope': phot_data.get('telescope', ''),
'instrument': phot_data.get('instrument', ''),
'unit': phot_data['brightness_unit'],
}

if not phot_data.get('nondetection', False):
if phot_data.get('brightness', None):
data_dictionary['magnitude'] = phot_data['brightness']
else:
data_dictionary['limit'] = phot_data['brightness']
elif phot_data.get('limiting_brightness', None):
data_dictionary['limit'] = phot_data['limiting_brightness']

return data_dictionary


def create_new_hermes_target(target_table, target_name=None, target_list_name=None):
"""
Ingest a target into your TOM from Hermes.
Takes a target_table and a target_name. If no target name is given, every target on the target table will be
ingested.
:param target_table: Hermes Target table from a Hermes Message
:param target_name: Name for individual target to ingest from target table.
:param target_list_name: Name of TargetList within which new target should be placed.
:return:
"""
target = None
for hermes_target in target_table:
if target_name == hermes_target['name'] or target_name is None:

new_target = {"name": hermes_target.pop('name')}
if "ra" in hermes_target and "dec" in hermes_target:
new_target['type'] = 'SIDEREAL'
new_target['ra'] = hermes_target.pop('ra')
new_target['dec'] = hermes_target.pop('dec')
new_target['pm_ra'] = hermes_target.pop('pm_ra', None)
new_target['pm_dec'] = hermes_target.pop('pm_dec', None)
new_target['epoch'] = hermes_target.pop('epoch', None)
elif "orbital_elements" in hermes_target:
orbital_elements = hermes_target.pop('orbital_elements')
new_target['type'] = 'NON_SIDEREAL'
new_target['epoch_of_elements'] = orbital_elements.pop('epoch_of_elements', None)
new_target['mean_anomaly'] = orbital_elements.pop('mean_anomaly', None)
new_target['arg_of_perihelion'] = orbital_elements.pop('argument_of_the_perihelion', None)
new_target['eccentricity'] = orbital_elements.pop('eccentricity', None)
new_target['lng_asc_node'] = orbital_elements.pop('longitude_of_the_ascending_node', None)
new_target['inclination'] = orbital_elements.pop('orbital_inclination', None)
new_target['semimajor_axis'] = orbital_elements.pop('semimajor_axis', None)
new_target['epoch_of_perihelion'] = orbital_elements.pop('epoch_of_perihelion', None)
new_target['perihdist'] = orbital_elements.pop('perihelion_distance', None)
aliases = hermes_target.pop('aliases', [])
target = Target(**new_target)
target.full_clean()
target.save(names=aliases, extras=hermes_target)
if target_list_name:
target_list, created = TargetList.objects.get_or_create(name=target_list_name)
if created:
logger.debug(f'New target_list created: {target_list_name}')
target_list.targets.add(target)
return target
2 changes: 1 addition & 1 deletion tom_dataproducts/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

def run_data_processor(dp):
"""
Reads the `data_product_type` from the dp parameter and imports the corresponding `DataProcessor` specified in
Reads the `data_product_type` from the dp parameter and imports the corresponding `DATA_PROCESSORS` specified in
`settings.py`, then runs `process_data` and inserts the returned values into the database.

:param dp: DataProduct which will be processed into a list
Expand Down
11 changes: 10 additions & 1 deletion tom_dataproducts/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from tom_dataproducts.models import DataProductGroup, DataProduct
from tom_observations.models import ObservationRecord
from tom_targets.models import Target
from tom_dataproducts.alertstreams.hermes import get_hermes_topics


def get_sharing_destination_options():
Expand All @@ -20,7 +21,11 @@ def get_sharing_destination_options():
if details.get('USER_TOPICS', None):
# If topics exist for a destination (Such as HERMES) give topics as sub-choices
# for non-selectable Destination
topic_list = [(f'{destination}:{topic}', topic) for topic in details['USER_TOPICS']]
if destination == "hermes":
destination_topics = get_hermes_topics()
else:
destination_topics = details['USER_TOPICS']
topic_list = [(f'{destination}:{topic}', topic) for topic in destination_topics]
new_destination.append(tuple(topic_list))
else:
# Otherwise just use destination as option
Expand Down Expand Up @@ -91,3 +96,7 @@ class DataShareForm(forms.Form):
submitter = forms.CharField(
widget=forms.HiddenInput()
)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields['share_destination'].choices = DESTINATION_OPTIONS
5 changes: 2 additions & 3 deletions tom_dataproducts/processors/photometry_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ def _process_photometry_from_plaintext(self, data_product):
time.format = 'datetime'
value = {
'timestamp': time.to_datetime(timezone=utc),
'magnitude': datum['magnitude'],
'filter': datum['filter'],
'error': datum['error']
}
for column_name in datum.colnames:
value[column_name] = datum[column_name]
photometry.append(value)

return photometry
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 4.1.6 on 2023-03-22 20:57

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('tom_targets', '0019_auto_20210811_0018'),
]

operations = [
migrations.AlterField(
model_name='targetname',
name='created',
field=models.DateTimeField(auto_now_add=True, help_text='The time at which this target name was created.'),
),
migrations.AlterField(
model_name='targetname',
name='modified',
field=models.DateTimeField(auto_now=True, help_text='The time at which this target name was changed in the TOM database.', verbose_name='Last Modified'),
),
]