diff --git a/docs/managing_data/index.rst b/docs/managing_data/index.rst index 86a7108d4..78979a9f5 100644 --- a/docs/managing_data/index.rst +++ b/docs/managing_data/index.rst @@ -9,6 +9,8 @@ Managing Data ../api/tom_dataproducts/views plotting_data customizing_data_processing + tom_direct_sharing + stream_pub_sub :doc:`Creating Plots from TOM Data ` - Learn how to create plots using plot.ly and your TOM @@ -16,3 +18,9 @@ data to display anywhere in your TOM. :doc:`Adding Custom Data Processing ` - Learn how you can process data into your TOM from uploaded data products. + +:doc:`TOM-TOM Direct Sharing ` - Learn how you can send and receive data between your TOM and another TOM-Toolkit TOM via an API. + +:doc:`Publish and Subscribe to a Kafka Stream ` - Learn how to publish and subscribe to a Kafka stream topic. + + diff --git a/docs/managing_data/stream_pub_sub.rst b/docs/managing_data/stream_pub_sub.rst new file mode 100644 index 000000000..2e072b5a3 --- /dev/null +++ b/docs/managing_data/stream_pub_sub.rst @@ -0,0 +1,78 @@ +Publish and Subscribe to a Kafka Stream +--------------------------------------- + +Publishing data to a stream and subscribing to a stream are handled independently and we describe each below. + + +Publish Data to a Kafka Topic +############################# + +TOM Toolkit supports publishing data to a Kafka stream such as `Hermes `_ (an interface to +`HOPSKOTCH `_) and `GCNClassicOverKafka `_. + +When sharing photometry data via Hermes, the TOM publishes the data to be shared to a topic on the HOPSKOTCH +Kafka stream. At this time, only photometry data is supported. + + +Configuring your TOM to Publish Data to a stream: +************************************************* + +You will need to add a ``DATA_SHARING`` configuration dictionary to your ``settings.py`` that gives the credentials +for the various streams with which you wish to share data. + +.. code:: python + + # Define the valid data sharing destinations for your TOM. + DATA_SHARING = { + 'hermes': { + 'DISPLAY_NAME': os.getenv('HERMES_DISPLAY_NAME', 'Hermes'), + 'BASE_URL': os.getenv('HERMES_BASE_URL', 'https://hermes.lco.global/'), + 'CREDENTIAL_USERNAME': os.getenv('SCIMMA_CREDENTIAL_USERNAME', + 'set SCIMMA_CREDENTIAL_USERNAME value in environment'), + 'CREDENTIAL_PASSWORD': os.getenv('SCIMMA_CREDENTIAL_PASSWORD', + 'set SCIMMA_CREDENTIAL_PASSWORD value in environment'), + 'USER_TOPICS': ['hermes.test', 'tomtoolkit.test'] + }, + } + +Subscribe to a Kafka Topic +########################## + +TOM Toolkit allows a TOM to subscribe to a topic on a Kafka stream, ingesting messages from that topic and handling the data. +This could involve simply logging the message or extracting the data from the message and saving it if it is properly formatted. + +Configuring your TOM to subscribe to a stream: +********************************************** + +First you will need to add ``tom_alertstreams`` to your list of ``INSTALLED_APPS`` in your ``settings.py``. + +.. code:: python + + INSTALLED_APPS = [ + ... + 'tom_alertstreams', + ] + +Then you will need to add an ``ALERT_STREAMS`` configuration dictionary to your ``settings.py``. This gives the credentials +for the various streams to which you wish to subscribe. Additionally, the ``TOPIC_HANDLERS`` section of the stream ``OPTIONS`` +will include a list of handlers for each topic. + +Some alert handlers are included as examples. Below we demonstrate how to connect to a Hermes Topic. You'll want to check +out the ``tom-alertstreams`` `README `_ for more details. + +.. code:: python + + ALERT_STREAMS = [ + { + 'ACTIVE': True, + 'NAME': 'tom_alertstreams.alertstreams.hopskotch.HopskotchAlertStream', + 'OPTIONS': { + 'URL': 'kafka://kafka.scimma.org/', + 'USERNAME': os.getenv('SCIMMA_CREDENTIAL_USERNAME', 'set SCIMMA_CREDENTIAL_USERNAME value in environment'), + 'PASSWORD': os.getenv('SCIMMA_CREDENTIAL_PASSWORD', 'set SCIMMA_CREDENTIAL_USERNAME value in environment'), + 'TOPIC_HANDLERS': { + 'tomtoolkit.test': 'tom_dataproducts.alertstreams.hermes.hermes_alert_handler', + }, + }, + }, + ] diff --git a/docs/managing_data/tom_direct_sharing.rst b/docs/managing_data/tom_direct_sharing.rst new file mode 100644 index 000000000..ee04cb73e --- /dev/null +++ b/docs/managing_data/tom_direct_sharing.rst @@ -0,0 +1,32 @@ +Sharing Data with Other TOMs +############################ + +TOM Toolkit does not yet support direct sharing between TOMs, however we hope to add this functionality soon. + + +.. Configuring your TOM to submit data to another TOM: +.. *************************************************** +.. +.. You will need to add a ``DATA_SHARING`` configuration dictionary to your ``settings.py`` that gives the credentials +.. for the various TOMs with which you wish to share data. +.. +.. .. code:: python +.. +.. # Define the valid data sharing destinations for your TOM. +.. DATA_SHARING = { +.. 'tom-demo-dev': { +.. 'DISPLAY_NAME': os.getenv('TOM_DEMO_DISPLAY_NAME', 'TOM Demo Dev'), +.. 'BASE_URL': os.getenv('TOM_DEMO_BASE_URL', 'http://tom-demo-dev.lco.gtn/'), +.. 'USERNAME': os.getenv('TOM_DEMO_USERNAME', 'set TOM_DEMO_USERNAME value in environment'), +.. 'PASSWORD': os.getenv('TOM_DEMO_PASSWORD', 'set TOM_DEMO_PASSWORD value in environment'), +.. }, +.. 'localhost-tom': { +.. # for testing; share with yourself +.. 'DISPLAY_NAME': os.getenv('LOCALHOST_TOM_DISPLAY_NAME', 'Local'), +.. 'BASE_URL': os.getenv('LOCALHOST_TOM_BASE_URL', 'http://127.0.0.1:8000/'), +.. 'USERNAME': os.getenv('LOCALHOST_TOM_USERNAME', 'set LOCALHOST_TOM_USERNAME value in environment'), +.. 'PASSWORD': os.getenv('LOCALHOST_TOM_PASSWORD', 'set LOCALHOST_TOM_PASSWORD value in environment'), +.. } +.. +.. } +.. \ No newline at end of file diff --git a/tom_alerts/migrations/0005_alertstreammessage.py b/tom_alerts/migrations/0005_alertstreammessage.py new file mode 100644 index 000000000..33c148cf7 --- /dev/null +++ b/tom_alerts/migrations/0005_alertstreammessage.py @@ -0,0 +1,23 @@ +# Generated by Django 4.1 on 2022-10-26 21:16 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('tom_alerts', '0004_auto_20210204_2300'), + ] + + operations = [ + migrations.CreateModel( + name='AlertStreamMessage', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('topic', models.CharField(help_text='The destination or source of sharing for the message.', max_length=500, verbose_name='Message Topic')), + ('message_id', models.CharField(help_text='An external message identifier that can be used to locate the message within the given topic.', max_length=50, verbose_name='Message ID')), + ('date_shared', models.DateTimeField(auto_now_add=True, help_text='The date on which the message is shared. (Date created by default.)', verbose_name='Date Shared')), + ('exchange_status', models.CharField(default='', help_text='Whether this message was sent or received.', max_length=10, verbose_name='Exchange Status')), + ], + ), + ] diff --git a/tom_alerts/migrations/0006_alter_alertstreammessage_exchange_status.py b/tom_alerts/migrations/0006_alter_alertstreammessage_exchange_status.py new file mode 100644 index 000000000..688c604e7 --- /dev/null +++ b/tom_alerts/migrations/0006_alter_alertstreammessage_exchange_status.py @@ -0,0 +1,18 @@ +# Generated by Django 4.1 on 2022-10-26 23:57 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('tom_alerts', '0005_alertstreammessage'), + ] + + operations = [ + migrations.AlterField( + model_name='alertstreammessage', + name='exchange_status', + field=models.CharField(choices=[('published', 'Published'), ('ingested', 'Ingested')], help_text='Whether this message was sent or received.', max_length=10, verbose_name='Exchange Status'), + ), + ] diff --git a/tom_alerts/models.py b/tom_alerts/models.py index 0acace442..6c6eac6a0 100644 --- a/tom_alerts/models.py +++ b/tom_alerts/models.py @@ -33,3 +33,50 @@ class BrokerQuery(models.Model): def __str__(self): return self.name + + +class AlertStreamMessage(models.Model): + """ + Class representing a streaming message containing data sent/received either over Kafka or to/from another TOM + :param topic: The destination or source of sharing for the message. + :type topic: str + + :param message_id: An external message identifier that can be used to locate the message within the given topic. + :type message_id: str + + :param date_shared: The date on which the message is shared. (Date created by default.) + :type date_shared: datetime + + :param exchange_status: Whether this message was sent or received. + :type exchange_status: str + """ + + EXCHANGE_STATUS_CHOICES = ( + ('published', 'Published'), + ('ingested', 'Ingested') + ) + + topic = models.CharField( + max_length=500, + verbose_name='Message Topic', + help_text='The destination or source of sharing for the message.' + ) + message_id = models.CharField( + max_length=50, + verbose_name='Message ID', + help_text='An external message identifier that can be used to locate the message within the given topic.' + ) + date_shared = models.DateTimeField( + auto_now_add=True, + verbose_name='Date Shared', + help_text='The date on which the message is shared. (Date created by default.)' + ) + exchange_status = models.CharField( + max_length=10, + verbose_name='Exchange Status', + choices=EXCHANGE_STATUS_CHOICES, + help_text='Whether this message was sent or received.' + ) + + def __str__(self): + return f'Message {self.message_id} on {self.topic}.' diff --git a/tom_base/settings.py b/tom_base/settings.py index 27c6ed7ef..f45380142 100644 --- a/tom_base/settings.py +++ b/tom_base/settings.py @@ -195,11 +195,23 @@ }, } + +TOM_FACILITY_CLASSES = [ + 'tom_observations.facilities.lco.LCOFacility', + 'tom_observations.facilities.gemini.GEMFacility', + 'tom_observations.facilities.soar.SOARFacility', +] + + # # tom_alerts configuration # LASAIR_TOKEN = os.getenv('LASAIR_TOKEN', None) + +# +# tom_dataproducts configuration +# # Define the valid data product types for your TOM. Be careful when removing items, as previously valid types will no # longer be valid, and may cause issues unless the offending records are modified. DATA_PRODUCT_TYPES = { @@ -214,11 +226,31 @@ 'spectroscopy': 'tom_dataproducts.processors.spectroscopy_processor.SpectroscopyProcessor', } -TOM_FACILITY_CLASSES = [ - 'tom_observations.facilities.lco.LCOFacility', - 'tom_observations.facilities.gemini.GEMFacility', - 'tom_observations.facilities.soar.SOARFacility', -] +# Configuration for the TOM/Kafka Stream receiving data from this TOM +DATA_SHARING = { + 'hermes': { + 'DISPLAY_NAME': os.getenv('HERMES_DISPLAY_NAME', 'Hermes'), + 'BASE_URL': os.getenv('HERMES_BASE_URL', 'https://hermes.lco.global/'), + 'CREDENTIAL_USERNAME': os.getenv('SCIMMA_CREDENTIAL_USERNAME', + 'set SCIMMA_CREDENTIAL_USERNAME value in environment'), + 'CREDENTIAL_PASSWORD': os.getenv('SCIMMA_CREDENTIAL_PASSWORD', + 'set SCIMMA_CREDENTIAL_PASSWORD value in environment'), + 'USER_TOPICS': ['hermes.test', 'tomtoolkit.test'] + }, + 'tom-demo-dev': { + 'DISPLAY_NAME': os.getenv('TOM_DEMO_DISPLAY_NAME', 'TOM Demo Dev'), + 'BASE_URL': os.getenv('TOM_DEMO_BASE_URL', 'http://tom-demo-dev.lco.gtn/'), + 'USERNAME': os.getenv('TOM_DEMO_USERNAME', 'set TOM_DEMO_USERNAME value in environment'), + 'PASSWORD': os.getenv('TOM_DEMO_PASSWORD', 'set TOM_DEMO_PASSWORD value in environment'), + }, + 'localhost-tom': { + # for testing; share with yourself + 'DISPLAY_NAME': os.getenv('LOCALHOST_TOM_DISPLAY_NAME', 'Local'), + 'BASE_URL': os.getenv('LOCALHOST_TOM_BASE_URL', 'http://127.0.0.1:8000/'), + 'USERNAME': os.getenv('LOCALHOST_TOM_USERNAME', 'set LOCALHOST_TOM_USERNAME value in environment'), + 'PASSWORD': os.getenv('LOCALHOST_TOM_PASSWORD', 'set LOCALHOST_TOM_PASSWORD value in environment'), + } +} TOM_CADENCE_STRATEGIES = [ 'tom_observations.cadences.retry_failed_observations.RetryFailedObservationsStrategy', diff --git a/tom_dataproducts/alertstreams/__init__.py b/tom_dataproducts/alertstreams/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tom_dataproducts/alertstreams/hermes.py b/tom_dataproducts/alertstreams/hermes.py new file mode 100644 index 000000000..a44095e0e --- /dev/null +++ b/tom_dataproducts/alertstreams/hermes.py @@ -0,0 +1,165 @@ +import logging +from dateutil.parser import parse + +from django.conf import settings + +# from hop.io import Metadata + +from tom_alerts.models import AlertStreamMessage +from tom_targets.models import Target +from tom_dataproducts.models import ReducedDatum + +import requests + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + + +class BuildHermesMessage(object): + """ + A HERMES Message Object that can be submitted to HOP through HERMES + """ + def __init__(self, title='', submitter='', authors='', message='', topic='hermes.test', **kwargs): + self.title = title + self.submitter = submitter + self.authors = authors + self.message = message + self.topic = topic + self.extra_info = kwargs + + +def publish_photometry_to_hermes(message_info, datums, **kwargs): + """ + Submits a typical hermes photometry alert using the datums supplied to build a photometry table. + -- Stores an AlertStreamMessage connected to each datum to show that the datum has previously been shared. + :param message_info: HERMES Message Object created with BuildHermesMessage + :param datums: Queryset of Reduced Datums to be built into table. + :return: response + """ + stream_base_url = settings.DATA_SHARING['hermes']['BASE_URL'] + submit_url = stream_base_url + 'api/v0/' + 'submit_photometry/' + headers = {'SCIMMA-API-Auth-Username': settings.DATA_SHARING['hermes']['CREDENTIAL_USERNAME'], + 'SCIMMA-API-Auth-Password': settings.DATA_SHARING['hermes']['CREDENTIAL_PASSWORD']} + hermes_photometry_data = [] + hermes_alert = AlertStreamMessage(topic=message_info.topic, exchange_status='published') + hermes_alert.save() + for tomtoolkit_photometry in datums: + tomtoolkit_photometry.message.add(hermes_alert) + hermes_photometry_data.append(create_hermes_phot_table_row(tomtoolkit_photometry, **kwargs)) + alert = { + 'topic': message_info.topic, + 'title': message_info.title, + 'submitter': message_info.submitter, + 'authors': message_info.authors, + 'data': { + 'photometry': hermes_photometry_data, + 'extra_info': message_info.extra_info + }, + 'message_text': message_info.message, + } + + response = requests.post(url=submit_url, json=alert, headers=headers) + return response + + +def create_hermes_phot_table_row(datum, **kwargs): + """Build a row for a Hermes Photometry Table using a TOM Photometry datum + """ + table_row = { + 'target_name': datum.target.name, + 'ra': datum.target.ra, + 'dec': datum.target.dec, + 'date': datum.timestamp.isoformat(), + 'telescope': datum.value.get('telescope', ''), + 'instrument': datum.value.get('instrument', ''), + 'band': datum.value.get('filter', ''), + 'brightness_unit': datum.value.get('unit', 'AB mag'), + } + if datum.value.get('magnitude', None): + table_row['brightness'] = datum.value['magnitude'] + else: + table_row['brightness'] = datum.value.get('limit', None) + table_row['nondetection'] = True + if datum.value.get('magnitude_error', None): + table_row['brightness_error'] = datum.value['magnitude_error'] + return table_row + + +def get_hermes_topics(): + """ + !CURRENTLY UNUSED! + Method to retrieve a list of available topics from HOP. + Intended to be called from forms when building topic list. + TODO: Retrieve list from HOP, currently unavailable due to authentication issues. + :return: List of topics available for users + """ + # stream_base_url = settings.DATA_SHARING['hermes']['BASE_URL'] + # submit_url = stream_base_url + "api/v0/profile/" + # headers = {'SCIMMA-API-Auth-Username': settings.DATA_SHARING['hermes']['CREDENTIAL_USERNAME'], + # 'SCIMMA-API-Auth-Password': settings.DATA_SHARING['hermes']['CREDENTIAL_PASSWORD']} + # user = settings.DATA_SHARING['hermes']['SCIMMA_AUTH_USERNAME'] + # headers = {} + + # response = requests.get(url=submit_url, headers=headers) + topics = settings.DATA_SHARING['hermes']['USER_TOPICS'] + return topics + + +def hermes_alert_handler(alert, metadata): + """Alert Handler to record data streamed through Hermes as a new ReducedDatum. + -- Only Reads Photometry Data + -- Only ingests Data if exact match for Target Name + -- Does not Ingest Data if exact match already exists + -- Requires 'tom_alertstreams' in settings.INSTALLED_APPS + -- Requires ALERT_STREAMS['topic_handlers'] in settings + """ + alert_as_dict = alert.content + photometry_table = alert_as_dict['data'].get('photometry', None) + if photometry_table: + hermes_alert = AlertStreamMessage(topic=alert_as_dict['topic'], exchange_status='ingested') + for row in photometry_table: + try: + target = Target.objects.get(name=row['target_name']) + except Target.DoesNotExist: + continue + + try: + obs_date = parse(row['date']) + except ValueError: + continue + + datum = { + 'target': target, + 'data_type': 'photometry', + 'source_name': alert_as_dict['topic'], + 'source_location': 'Hermes via HOP', # TODO Add message URL here once message ID's exist + 'timestamp': obs_date, + 'value': get_hermes_phot_value(row) + } + new_rd, created = ReducedDatum.objects.get_or_create(**datum) + if created: + hermes_alert.save() + new_rd.message.add(hermes_alert) + new_rd.save() + + +def get_hermes_phot_value(phot_data): + """ + Convert Hermes Message format for a row of Photometry table into parameters accepted by the Reduced Datum model + :param phot_data: Dictionary containing Hermes Photometry table. + :return: Dictionary containing properly formatted parameters for Reduced_Datum + """ + data_dictionary = { + 'magnitude_error': phot_data.get('brightness_error', ''), + 'filter': phot_data['band'], + 'telescope': phot_data.get('telescope', ''), + 'instrument': phot_data.get('instrument', ''), + 'unit': phot_data['brightness_unit'], + } + + if not phot_data.get('nondetection', False): + data_dictionary['magnitude'] = phot_data['brightness'] + else: + data_dictionary['limit'] = phot_data['brightness'] + + return data_dictionary diff --git a/tom_dataproducts/forms.py b/tom_dataproducts/forms.py index 59a766004..8576d3a94 100644 --- a/tom_dataproducts/forms.py +++ b/tom_dataproducts/forms.py @@ -7,6 +7,36 @@ from tom_targets.models import Target +def get_sharing_destination_options(): + """ + Build the Display options and headers for the dropdown form for choosing sharing topics. + Customize for a different selection experience. + :return: Tuple: Possible Destinations and their Display Names + """ + choices = [] + try: + for destination, details in settings.DATA_SHARING.items(): + new_destination = [details.get('DISPLAY_NAME', destination)] + if details.get('USER_TOPICS', None): + # If topics exist for a destination (Such as HERMES) give topics as sub-choices + # for non-selectable Destination + topic_list = [(f'{destination}:{topic}', topic) for topic in details['USER_TOPICS']] + new_destination.append(tuple(topic_list)) + else: + # Otherwise just use destination as option + new_destination.insert(0, destination) + choices.append(tuple(new_destination)) + except AttributeError: + pass + return tuple(choices) + + +DESTINATION_OPTIONS = get_sharing_destination_options() + +DATA_TYPE_OPTIONS = (('photometry', 'Photometry'), + ('spectroscopy', 'Spectroscopy')) + + class AddProductToGroupForm(forms.Form): products = forms.ModelMultipleChoiceField( DataProduct.objects.all(), @@ -46,3 +76,18 @@ def __init__(self, *args, **kwargs): self.fields['groups'] = forms.ModelMultipleChoiceField(Group.objects.none(), required=False, widget=forms.CheckboxSelectMultiple) + + +class DataShareForm(forms.Form): + share_destination = forms.ChoiceField(required=True, choices=DESTINATION_OPTIONS, label="Destination") + share_title = forms.CharField(required=False, label="Title") + share_message = forms.CharField(required=False, label="Message", widget=forms.Textarea()) + share_authors = forms.CharField(required=False, widget=forms.HiddenInput()) + data_type = forms.ChoiceField(required=False, choices=DATA_TYPE_OPTIONS, label="Data Type") + target = forms.ModelChoiceField( + Target.objects.all(), + widget=forms.HiddenInput(), + required=False) + submitter = forms.CharField( + widget=forms.HiddenInput() + ) diff --git a/tom_dataproducts/migrations/0011_reduceddatum_message.py b/tom_dataproducts/migrations/0011_reduceddatum_message.py new file mode 100644 index 000000000..246377019 --- /dev/null +++ b/tom_dataproducts/migrations/0011_reduceddatum_message.py @@ -0,0 +1,19 @@ +# Generated by Django 4.1 on 2022-10-26 21:16 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('tom_alerts', '0005_alertstreammessage'), + ('tom_dataproducts', '0010_manual_20210305_fix_spectroscopy'), + ] + + operations = [ + migrations.AddField( + model_name='reduceddatum', + name='message', + field=models.ManyToManyField(to='tom_alerts.alertstreammessage'), + ), + ] diff --git a/tom_dataproducts/models.py b/tom_dataproducts/models.py index 9ba5bd216..d8fbef1ae 100644 --- a/tom_dataproducts/models.py +++ b/tom_dataproducts/models.py @@ -12,6 +12,7 @@ from PIL import Image from tom_targets.models import Target +from tom_alerts.models import AlertStreamMessage from tom_observations.models import ObservationRecord logger = logging.getLogger(__name__) @@ -311,7 +312,7 @@ class ReducedDatum(models.Model): 'error': .5 } - but could also contain a filter: + but could also contain a filter, a telescope, an instrument, and/or a unit: :: @@ -319,9 +320,14 @@ class ReducedDatum(models.Model): 'magnitude': 18.5, 'magnitude_error': .5, 'filter': 'r' + 'telescope': 'ELP.domeA.1m0a' + 'instrument': 'fa07' } :type value: dict + :param message: Set of ``AlertStreamMessage`` objects this object is associated with. + :type message: ManyRelatedManager object + """ target = models.ForeignKey(Target, null=False, on_delete=models.CASCADE) @@ -334,6 +340,7 @@ class ReducedDatum(models.Model): source_location = models.CharField(max_length=200, default='') timestamp = models.DateTimeField(null=False, blank=False, default=datetime.now, db_index=True) value = models.JSONField(null=False, blank=False) + message = models.ManyToManyField(AlertStreamMessage) class Meta: get_latest_by = ('timestamp',) diff --git a/tom_dataproducts/templates/tom_dataproducts/dataproduct_list.html b/tom_dataproducts/templates/tom_dataproducts/dataproduct_list.html index 7292bc5a0..45366b7df 100644 --- a/tom_dataproducts/templates/tom_dataproducts/dataproduct_list.html +++ b/tom_dataproducts/templates/tom_dataproducts/dataproduct_list.html @@ -43,7 +43,7 @@ {% if product.data_product_type %} {{ product.get_type_display }} {% endif %} - + {% if product.get_file_extension == '.fz' or product.get_file_extension == '.fits' %} {% if product.get_preview %} diff --git a/tom_dataproducts/templates/tom_dataproducts/partials/dataproduct_list_for_target.html b/tom_dataproducts/templates/tom_dataproducts/partials/dataproduct_list_for_target.html index b6c3d3295..eb119fd8a 100644 --- a/tom_dataproducts/templates/tom_dataproducts/partials/dataproduct_list_for_target.html +++ b/tom_dataproducts/templates/tom_dataproducts/partials/dataproduct_list_for_target.html @@ -2,7 +2,14 @@ {% include 'tom_dataproducts/partials/js9_scripts.html' %}

Data

- + + + + + + + + {% for product in products %} @@ -21,9 +28,43 @@

Data

{% if product.data_product_type %} {{ product.get_type_display }} {% endif %} - + + + + + {% endfor %}
FilenameTypeDelete
FilenameTypeShareDelete
+ {% if sharing_destinations %} + + {% else %} +

Not Configured

+ {% endif %}
Delete
+
+ {% csrf_token %} + {% for hidden in data_product_share_form.hidden_fields %} + {{ hidden }} + {% endfor %} +
+
+ {% bootstrap_field data_product_share_form.share_destination %} +
+
+ {% bootstrap_field data_product_share_form.share_title %} +
+
+
+
+ {% bootstrap_field data_product_share_form.share_message %} +
+
+ {% buttons %} + + {% endbuttons %} +
+
+
+
diff --git a/tom_dataproducts/templates/tom_dataproducts/partials/photometry_datalist_for_target.html b/tom_dataproducts/templates/tom_dataproducts/partials/photometry_datalist_for_target.html new file mode 100644 index 000000000..953af3b88 --- /dev/null +++ b/tom_dataproducts/templates/tom_dataproducts/partials/photometry_datalist_for_target.html @@ -0,0 +1,91 @@ +{% load bootstrap4 %} +{% load tom_common_extras %} + +
+ {% csrf_token %} + {% for hidden in target_data_share_form.hidden_fields %} + {{ hidden }} + {% endfor %} +
+
+ Photometry Data +
+ + + + + + + + + + + + + {% for datum in data %} + + + + + + + + + + {% empty %} + + + + {% endfor %} + +
+ + +
TimestampTelescopeFilterMagnitudeErrorSource
{{ datum.timestamp }}{{ datum.telescope }}{{ datum.filter }} + + {% if datum.limit %}>{% endif %} + {{ datum.magnitude|truncate_number }} + {{ datum.magnitude_error }}{{ datum.source }}
No Photometry Data.
+
+
+ Share Selected Data +
+ {% if sharing_destinations %} +
+
+ {% bootstrap_field target_data_share_form.share_destination %} +
+
+ +
+
+ {% else %} + Not Configured + {% endif %} + + +
+
+
+ diff --git a/tom_dataproducts/templates/tom_dataproducts/partials/photometry_for_target.html b/tom_dataproducts/templates/tom_dataproducts/partials/photometry_for_target.html index 11b505923..8cfabb63a 100644 --- a/tom_dataproducts/templates/tom_dataproducts/partials/photometry_for_target.html +++ b/tom_dataproducts/templates/tom_dataproducts/partials/photometry_for_target.html @@ -8,4 +8,4 @@

Photometry

{{ plot|safe }} -
\ No newline at end of file + diff --git a/tom_dataproducts/templates/tom_dataproducts/partials/recent_photometry.html b/tom_dataproducts/templates/tom_dataproducts/partials/recent_photometry.html index 80baa0c56..e69c40b28 100644 --- a/tom_dataproducts/templates/tom_dataproducts/partials/recent_photometry.html +++ b/tom_dataproducts/templates/tom_dataproducts/partials/recent_photometry.html @@ -4,12 +4,20 @@ Recent Photometry - + + + + + {% for datum in data %} - + {% empty %} diff --git a/tom_dataproducts/templates/tom_dataproducts/partials/share_target_data.html b/tom_dataproducts/templates/tom_dataproducts/partials/share_target_data.html new file mode 100644 index 000000000..bcccaabc4 --- /dev/null +++ b/tom_dataproducts/templates/tom_dataproducts/partials/share_target_data.html @@ -0,0 +1,29 @@ +{% load bootstrap4 %} +{% include 'tom_dataproducts/partials/js9_scripts.html' %} +{% load tom_common_extras %} +
+
+ Publish Data for {{ target.name }}: +
+ {% if sharing_destinations %} +
+ {% csrf_token %} + {% for hidden in target_data_share_form.hidden_fields %} + {{ hidden }} + {% endfor %} +
+
+ {% bootstrap_field target_data_share_form.share_destination %} +
+
+ {% bootstrap_field target_data_share_form.data_type %} +
+
+ +
+
+ + {% else %} + Not Configured + {% endif %} +
diff --git a/tom_dataproducts/templatetags/dataproduct_extras.py b/tom_dataproducts/templatetags/dataproduct_extras.py index e1ba594ea..25ed40714 100644 --- a/tom_dataproducts/templatetags/dataproduct_extras.py +++ b/tom_dataproducts/templatetags/dataproduct_extras.py @@ -1,6 +1,8 @@ +import logging from urllib.parse import urlencode from django import template +from django import forms from django.conf import settings from django.contrib.auth.models import Group from django.core.paginator import Paginator @@ -14,12 +16,15 @@ from PIL import Image, ImageDraw import base64 -from tom_dataproducts.forms import DataProductUploadForm +from tom_dataproducts.forms import DataProductUploadForm, DataShareForm from tom_dataproducts.models import DataProduct, ReducedDatum from tom_dataproducts.processors.data_serializers import SpectrumSerializer from tom_observations.models import ObservationRecord from tom_targets.models import Target +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + register = template.Library() @@ -33,9 +38,17 @@ def dataproduct_list_for_target(context, target): else: target_products_for_user = get_objects_for_user( context['request'].user, 'tom_dataproducts.view_dataproduct', klass=target.dataproduct_set.all()) + + initial = {'submitter': context['request'].user, + 'target': target, + 'share_title': f"Updated data for {target.name}."} + form = DataShareForm(initial=initial) + return { 'products': target_products_for_user, - 'target': target + 'target': target, + 'sharing_destinations': form.fields['share_destination'].choices, + 'data_product_share_form': form } @@ -75,7 +88,10 @@ def dataproduct_list_all(context): products = DataProduct.objects.all().order_by('-created') else: products = get_objects_for_user(context['request'].user, 'tom_dataproducts.view_dataproduct') - return {'products': products} + + return { + 'products': products, + } @register.inclusion_tag('tom_dataproducts/partials/upload_dataproduct.html', takes_context=True) @@ -97,13 +113,102 @@ def upload_dataproduct(context, obj): return {'data_product_form': form} +@register.inclusion_tag('tom_dataproducts/partials/share_target_data.html', takes_context=True) +def share_data(context, target): + """ + Publish data to Hermes + """ + + initial = {'submitter': context['request'].user, + 'target': target, + 'share_title': f"Updated data for {target.name} from {getattr(settings, 'TOM_NAME', 'TOM Toolkit')}.", + } + form = DataShareForm(initial=initial) + form.fields['share_title'].widget = forms.HiddenInput() + + context = {'target': target, + 'target_data_share_form': form, + 'sharing_destinations': form.fields['share_destination'].choices} + return context + + @register.inclusion_tag('tom_dataproducts/partials/recent_photometry.html') def recent_photometry(target, limit=1): """ Displays a table of the most recent photometric points for a target. """ photometry = ReducedDatum.objects.filter(data_type='photometry', target=target).order_by('-timestamp')[:limit] - return {'data': [{'timestamp': rd.timestamp, 'magnitude': rd.value['magnitude']} for rd in photometry]} + + # Possibilities for reduced_datums from ZTF/MARS: + # reduced_datum.value: {'error': 0.0929680392146111, 'filter': 'r', 'magnitude': 18.2364940643311} + # reduced_datum.value: {'limit': 20.1023998260498, 'filter': 'g'} + + # for limit magnitudes, set the value of the limit key to True and + # the value of the magnitude key to the limit so the template and + # treat magnitudes as such and prepend a '>' to the limit magnitudes + # see recent_photometry.html + data = [] + for reduced_datum in photometry: + rd_data = {'timestamp': reduced_datum.timestamp} + if 'limit' in reduced_datum.value.keys(): + rd_data['magnitude'] = reduced_datum.value['limit'] + rd_data['limit'] = True + else: + rd_data['magnitude'] = reduced_datum.value['magnitude'] + rd_data['limit'] = False + data.append(rd_data) + + context = {'data': data} + return context + + +@register.inclusion_tag('tom_dataproducts/partials/photometry_datalist_for_target.html', takes_context=True) +def get_photometry_data(context, target): + """ + Displays a table of the all photometric points for a target. + """ + photometry = ReducedDatum.objects.filter(data_type='photometry', target=target).order_by('-timestamp') + + # Possibilities for reduced_datums from ZTF/MARS: + # reduced_datum.value: {'error': 0.0929680392146111, 'filter': 'r', 'magnitude': 18.2364940643311} + # reduced_datum.value: {'limit': 20.1023998260498, 'filter': 'g'} + + # for limit magnitudes, set the value of the limit key to True and + # the value of the magnitude key to the limit so the template and + # treat magnitudes as such and prepend a '>' to the limit magnitudes + # see recent_photometry.html + data = [] + for reduced_datum in photometry: + rd_data = {'id': reduced_datum.pk, + 'timestamp': reduced_datum.timestamp, + 'source': reduced_datum.source_name, + 'filter': reduced_datum.value.get('filter', ''), + 'telescope': reduced_datum.value.get('telescope', ''), + 'magnitude_error': reduced_datum.value.get('magnitude_error', '') + } + + if 'limit' in reduced_datum.value.keys(): + rd_data['magnitude'] = reduced_datum.value['limit'] + rd_data['limit'] = True + else: + rd_data['magnitude'] = reduced_datum.value['magnitude'] + rd_data['limit'] = False + data.append(rd_data) + + initial = {'submitter': context['request'].user, + 'target': target, + 'data_type': 'photometry', + 'share_title': f"Updated data for {target.name} from {getattr(settings, 'TOM_NAME', 'TOM Toolkit')}.", + } + form = DataShareForm(initial=initial) + form.fields['share_title'].widget = forms.HiddenInput() + form.fields['data_type'].widget = forms.HiddenInput() + + context = {'data': data, + 'target': target, + 'target_data_share_form': form, + 'sharing_destinations': form.fields['share_destination'].choices} + return context @register.inclusion_tag('tom_dataproducts/partials/photometry_for_target.html', takes_context=True) @@ -193,9 +298,11 @@ def photometry_for_target(context, target, width=700, height=600, background=Non fig = go.Figure(data=plot_data, layout=layout) fig.update_yaxes(showgrid=grid, color=label_color, showline=True, linecolor=label_color, mirror=True) fig.update_xaxes(showgrid=grid, color=label_color, showline=True, linecolor=label_color, mirror=True) + fig.update_layout(clickmode='event+select') + return { 'target': target, - 'plot': offline.plot(fig, output_type='div', show_link=False) + 'plot': offline.plot(fig, output_type='div', show_link=False), } diff --git a/tom_dataproducts/urls.py b/tom_dataproducts/urls.py index e4b621712..ebd4b450b 100644 --- a/tom_dataproducts/urls.py +++ b/tom_dataproducts/urls.py @@ -4,6 +4,7 @@ from tom_dataproducts.views import DataProductDeleteView, DataProductGroupCreateView from tom_dataproducts.views import DataProductGroupDetailView, DataProductGroupDataView, DataProductGroupDeleteView from tom_dataproducts.views import DataProductUploadView, DataProductFeatureView, UpdateReducedDataView +from tom_dataproducts.views import DataShareView from tom_common.api_router import SharedAPIRootRouter from tom_dataproducts.api_views import DataProductViewSet @@ -24,5 +25,7 @@ path('data/reduced/update/', UpdateReducedDataView.as_view(), name='update-reduced-data'), path('data//delete/', DataProductDeleteView.as_view(), name='delete'), path('data//feature/', DataProductFeatureView.as_view(), name='feature'), + path('data//share/', DataShareView.as_view(), name='share'), + path('target//share/', DataShareView.as_view(), name='share_all'), path('/save/', DataProductSaveView.as_view(), name='save'), ] diff --git a/tom_dataproducts/views.py b/tom_dataproducts/views.py index fd647cf9d..73cb920a8 100644 --- a/tom_dataproducts/views.py +++ b/tom_dataproducts/views.py @@ -1,4 +1,6 @@ from io import StringIO +import logging +import os from urllib.parse import urlencode, urlparse from django.conf import settings @@ -7,6 +9,7 @@ from django.contrib.auth.models import Group from django.core.cache import cache from django.core.cache.utils import make_template_fragment_key +from django.core.exceptions import ImproperlyConfigured from django.core.management import call_command from django.http import HttpResponseRedirect from django.shortcuts import redirect @@ -22,13 +25,22 @@ from tom_common.hooks import run_hook from tom_common.hints import add_hint from tom_common.mixins import Raise403PermissionRequiredMixin +from tom_targets.serializers import TargetSerializer +from tom_targets.models import Target from tom_dataproducts.models import DataProduct, DataProductGroup, ReducedDatum from tom_dataproducts.exceptions import InvalidFileFormatException -from tom_dataproducts.forms import AddProductToGroupForm, DataProductUploadForm +from tom_dataproducts.forms import AddProductToGroupForm, DataProductUploadForm, DataShareForm from tom_dataproducts.filters import DataProductFilter from tom_dataproducts.data_processor import run_data_processor +from tom_dataproducts.alertstreams.hermes import publish_photometry_to_hermes, BuildHermesMessage from tom_observations.models import ObservationRecord from tom_observations.facility import get_service_class +from tom_dataproducts.serializers import DataProductSerializer + +import requests + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) class DataProductSaveView(LoginRequiredMixin, View): @@ -276,6 +288,155 @@ def get(self, request, *args, **kwargs): ) +class DataShareView(FormView): + """ + View that handles the sharing of data either through HERMES or with another TOM. + """ + + form_class = DataShareForm + + def get_form(self, *args, **kwargs): + # TODO: Add permissions + form = super().get_form(*args, **kwargs) + return form + + def form_invalid(self, form): + """ + Adds errors to Django messaging framework in the case of an invalid form and redirects to the previous page. + """ + # TODO: Format error messages in a more human-readable way + messages.error(self.request, 'There was a problem sharing your Data: {}'.format(form.errors.as_json())) + return redirect(form.cleaned_data.get('referrer', '/')) + + def post(self, request, *args, **kwargs): + """ + Method that handles the POST requests for sharing data. + Handles Data Products and All the data of a type for a target as well as individual Reduced Datums. + Submit to Hermes, or Share with TOM (soon). + """ + + data_share_form = DataShareForm(request.POST, request.FILES) + # Check if data points have been selected. + selected_data = request.POST.getlist("share-box") + if data_share_form.is_valid(): + form_data = data_share_form.cleaned_data + # 1st determine if pk is data product, Reduced Datum, or Target. + # Then query relevant Reduced Datums Queryset + product_id = kwargs.get('dp_pk', None) + if product_id: + product = DataProduct.objects.get(pk=product_id) + data_type = product.data_product_type + reduced_datums = ReducedDatum.objects.filter(data_product=product) + else: + target_id = kwargs.get('tg_pk', None) + target = Target.objects.get(pk=target_id) + data_type = form_data['data_type'] + if request.POST.get("share-box", None) is None: + reduced_datums = ReducedDatum.objects.filter(target=target, data_type=data_type) + else: + reduced_datums = ReducedDatum.objects.filter(pk__in=selected_data) + if data_type == 'photometry': + share_destination = form_data['share_destination'] + if 'HERMES' in share_destination.upper(): + # Build and submit hermes table from Reduced Datums + hermes_topic = share_destination.split(':')[1] + destination = share_destination.split(':')[0] + message_info = BuildHermesMessage(title=form_data['share_title'], + submitter=form_data['submitter'], + authors=form_data['share_authors'], + message=form_data['share_message'], + topic=hermes_topic + ) + # Run ReducedDatums Queryset through sharing protocols to make sure they are safe to share. + filtered_reduced_datums = self.get_share_safe_datums(destination, reduced_datums, + topic=hermes_topic) + if filtered_reduced_datums.count() > 0: + response = publish_photometry_to_hermes(message_info, filtered_reduced_datums) + else: + messages.error(self.request, 'No Data to share. (Check sharing Protocol.)') + return redirect(reverse('tom_targets:detail', kwargs={'pk': request.POST.get('target')})) + else: + messages.error(self.request, 'TOM-TOM sharing is not yet supported.') + return redirect(reverse('tom_targets:detail', kwargs={'pk': request.POST.get('target')})) + # response = self.share_with_tom(share_destination, product) + try: + if 'message' in response.json(): + publish_feedback = response.json()['message'] + else: + publish_feedback = f"ERROR: {response.text}" + except ValueError: + publish_feedback = f"ERROR: Returned Response code {response.status_code}" + if "ERROR" in publish_feedback.upper(): + messages.error(self.request, publish_feedback) + else: + messages.success(self.request, publish_feedback) + else: + messages.error(self.request, f'Publishing {data_type} data is not yet supported.') + return redirect(reverse('tom_targets:detail', kwargs={'pk': request.POST.get('target')})) + + def share_with_tom(self, tom_name, product): + """ + When sharing a DataProduct with another TOM we likely want to share the data product itself and let the other + TOM process it rather than share the Reduced Datums + :param tom_name: name of destination tom in settings.DATA_SHARING + :param product: DataProduct model instance + :return: + """ + try: + destination_tom_base_url = settings.DATA_SHARING[tom_name]['BASE_URL'] + username = settings.DATA_SHARING[tom_name]['USERNAME'] + password = settings.DATA_SHARING[tom_name]['PASSWORD'] + except KeyError as err: + raise ImproperlyConfigured(f'Check DATA_SHARING configuration for {tom_name}: Key {err} not found.') + auth = (username, password) + headers = {'Media-Type': 'application/json'} + target = product.target + serialized_target_data = TargetSerializer(target).data + targets_url = destination_tom_base_url + 'api/targets/' + # TODO: Make sure aliases are checked before creating new target + # Attempt to create Target in Destination TOM + response = requests.post(targets_url, headers=headers, auth=auth, data=serialized_target_data) + try: + target_response = response.json() + destination_target_id = target_response['id'] + except KeyError: + # If Target already exists at destination, find ID + response = requests.get(targets_url, headers=headers, auth=auth, data=serialized_target_data) + target_response = response.json() + destination_target_id = target_response['results'][0]['id'] + + serialized_dataproduct_data = DataProductSerializer(product).data + serialized_dataproduct_data['target'] = destination_target_id + dataproducts_url = destination_tom_base_url + 'api/dataproducts/' + # TODO: this should be updated when tom_dataproducts is updated to use django.core.storage + dataproduct_filename = os.path.join(settings.MEDIA_ROOT, product.data.name) + # Save DataProduct in Destination TOM + with open(dataproduct_filename, 'rb') as dataproduct_filep: + files = {'file': (product.data.name, dataproduct_filep, 'text/csv')} + headers = {'Media-Type': 'multipart/form-data'} + response = requests.post(dataproducts_url, data=serialized_dataproduct_data, files=files, + headers=headers, auth=auth) + return response + + def get_share_safe_datums(self, destination, reduced_datums, **kwargs): + """ + Custom sharing protocols used to determine when data is shared with a destination. + This example prevents sharing if a datum has already been published to the given Hermes topic. + :param destination: sharing destination string + :param reduced_datums: selected input datums + :return: queryset of reduced datums to be shared + """ + return reduced_datums + # if 'hermes' in destination: + # message_topic = kwargs.get('topic', None) + # # Remove data points previously shared to the given topic + # filtered_datums = reduced_datums.exclude(Q(message__exchange_status='published') + # & Q(message__topic=message_topic)) + # else: + # filtered_datums = reduced_datums + # return filtered_datums + + class DataProductGroupDetailView(DetailView): """ View that handles the viewing of a specific ``DataProductGroup``. diff --git a/tom_targets/serializers.py b/tom_targets/serializers.py index 6a552f092..a39dec43c 100644 --- a/tom_targets/serializers.py +++ b/tom_targets/serializers.py @@ -28,8 +28,8 @@ class TargetSerializer(serializers.ModelSerializer): json (or other representations). See https://www.django-rest-framework.org/api-guide/serializers/#modelserializer """ - targetextra_set = TargetExtraSerializer(many=True) - aliases = TargetNameSerializer(many=True) + targetextra_set = TargetExtraSerializer(many=True, required=False) + aliases = TargetNameSerializer(many=True, required=False) groups = GroupSerializer(many=True, required=False) # TODO: return groups in detail and list class Meta: diff --git a/tom_targets/templates/tom_targets/target_detail.html b/tom_targets/templates/tom_targets/target_detail.html index 2b06a3def..b58bc942f 100644 --- a/tom_targets/templates/tom_targets/target_detail.html +++ b/tom_targets/templates/tom_targets/target_detail.html @@ -35,6 +35,7 @@ {% target_buttons object %} {% target_data object %} {% recent_photometry object limit=3 %} + {% share_data object %} {% if object.type == 'SIDEREAL' %} {% aladin object %} {% endif %} @@ -94,6 +95,7 @@

Observations

{% photometry_for_target target %} + {% get_photometry_data object %}
{% spectroscopy_for_target target %}
TimestampMagnitude
TimestampMagnitude
{{ datum.timestamp }}{{ datum.magnitude|truncate_number }} + + {% if datum.limit %}>{% endif %} + {{ datum.magnitude|truncate_number }} +