Commit f1228df: Merge branch 'dev' into feature/update_lco_facility
jchate6 committed Jan 31, 2023 (2 parents: a555272 + a81f993)
Showing 23 changed files with 936 additions and 20 deletions.
8 changes: 8 additions & 0 deletions docs/managing_data/index.rst
@@ -9,10 +9,18 @@ Managing Data
../api/tom_dataproducts/views
plotting_data
customizing_data_processing
tom_direct_sharing
stream_pub_sub


:doc:`Creating Plots from TOM Data <plotting_data>` - Learn how to create plots using plot.ly and your TOM
data to display anywhere in your TOM.

:doc:`Adding Custom Data Processing <customizing_data_processing>` - Learn how you can process data into your
TOM from uploaded data products.

:doc:`TOM-TOM Direct Sharing <tom_direct_sharing>` - Learn how you can send and receive data between your TOM and another TOM-Toolkit TOM via an API.

:doc:`Publish and Subscribe to a Kafka Stream <stream_pub_sub>` - Learn how to publish and subscribe to a Kafka stream topic.


78 changes: 78 additions & 0 deletions docs/managing_data/stream_pub_sub.rst
@@ -0,0 +1,78 @@
Publish and Subscribe to a Kafka Stream
---------------------------------------

Publishing data to a stream and subscribing to a stream are handled independently; each is described below.


Publish Data to a Kafka Topic
#############################

TOM Toolkit supports publishing data to a Kafka stream such as `Hermes <https://hermes.lco.global>`_ (an interface to
`HOPSKOTCH <https://hop.scimma.org>`_) and `GCNClassicOverKafka <https://gcn.nasa.gov>`_.

When sharing photometry data via Hermes, the TOM publishes the data to be shared to a topic on the HOPSKOTCH
Kafka stream. At this time, only photometry data is supported.


Configuring your TOM to Publish Data to a stream:
*************************************************

You will need to add a ``DATA_SHARING`` configuration dictionary to your ``settings.py`` that gives the credentials
for the various streams with which you wish to share data.

.. code:: python

    # Define the valid data sharing destinations for your TOM.
    DATA_SHARING = {
        'hermes': {
            'DISPLAY_NAME': os.getenv('HERMES_DISPLAY_NAME', 'Hermes'),
            'BASE_URL': os.getenv('HERMES_BASE_URL', 'https://hermes.lco.global/'),
            'CREDENTIAL_USERNAME': os.getenv('SCIMMA_CREDENTIAL_USERNAME',
                                             'set SCIMMA_CREDENTIAL_USERNAME value in environment'),
            'CREDENTIAL_PASSWORD': os.getenv('SCIMMA_CREDENTIAL_PASSWORD',
                                             'set SCIMMA_CREDENTIAL_PASSWORD value in environment'),
            'USER_TOPICS': ['hermes.test', 'tomtoolkit.test']
        },
    }

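The ``os.getenv(name, default)`` pattern used above means a missing environment variable does not crash the TOM at startup; instead the placeholder string is carried through and surfaces later, reminding you what to configure. A minimal sketch of that behavior (plain Python, not TOM Toolkit code):

```python
import os

# Unset the variable to simulate a fresh, unconfigured environment.
os.environ.pop('SCIMMA_CREDENTIAL_USERNAME', None)

# With the variable unset, the placeholder default is returned.
username = os.getenv('SCIMMA_CREDENTIAL_USERNAME',
                     'set SCIMMA_CREDENTIAL_USERNAME value in environment')
print(username)  # the placeholder string

# Once the variable is exported, the real value wins over the default.
os.environ['SCIMMA_CREDENTIAL_USERNAME'] = 'my-scimma-user'
print(os.getenv('SCIMMA_CREDENTIAL_USERNAME', 'unused default'))  # my-scimma-user
```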
Subscribe to a Kafka Topic
##########################

TOM Toolkit allows a TOM to subscribe to a topic on a Kafka stream, ingesting messages from that topic and handling the data.
This could be as simple as logging the message, or it could involve extracting the data from the message and saving it if the message is properly formatted.

Configuring your TOM to subscribe to a stream:
**********************************************

First you will need to add ``tom_alertstreams`` to your list of ``INSTALLED_APPS`` in your ``settings.py``.

.. code:: python

    INSTALLED_APPS = [
        ...
        'tom_alertstreams',
    ]

Then you will need to add an ``ALERT_STREAMS`` configuration dictionary to your ``settings.py``. This gives the credentials
for the various streams to which you wish to subscribe. Additionally, the ``TOPIC_HANDLERS`` section of the stream ``OPTIONS``
will include a list of handlers for each topic.

Some alert handlers are included as examples. Below we demonstrate how to connect to a Hermes Topic. You'll want to check
out the ``tom-alertstreams`` `README <https://github.com/TOMToolkit/tom-alertstreams>`_ for more details.

.. code:: python

    ALERT_STREAMS = [
        {
            'ACTIVE': True,
            'NAME': 'tom_alertstreams.alertstreams.hopskotch.HopskotchAlertStream',
            'OPTIONS': {
                'URL': 'kafka://kafka.scimma.org/',
                'USERNAME': os.getenv('SCIMMA_CREDENTIAL_USERNAME',
                                      'set SCIMMA_CREDENTIAL_USERNAME value in environment'),
                'PASSWORD': os.getenv('SCIMMA_CREDENTIAL_PASSWORD',
                                      'set SCIMMA_CREDENTIAL_PASSWORD value in environment'),
                'TOPIC_HANDLERS': {
                    'tomtoolkit.test': 'tom_dataproducts.alertstreams.hermes.hermes_alert_handler',
                },
            },
        },
    ]
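Each entry in ``TOPIC_HANDLERS`` is a dotted path to a callable that processes incoming messages. As a rough illustration only (the function name below is our own invention, and the exact handler signature should be confirmed against the ``tom-alertstreams`` README rather than taken from this sketch), a do-nothing-but-log handler might look like:

```python
import logging

logger = logging.getLogger(__name__)


def log_alert_handler(alert):
    """Hypothetical topic handler: log a short summary of an incoming alert.

    Assumes the deserialized alert behaves like a mapping; real handlers
    would extract and save the fields they care about instead.
    """
    content = alert if isinstance(alert, dict) else dict(alert)
    keys = sorted(content.keys())
    logger.info('Received alert with keys: %s', keys)
    return keys
```

You would then reference it by dotted path, e.g. ``'my.topic': 'mytom.handlers.log_alert_handler'``, where both names here are placeholders.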
32 changes: 32 additions & 0 deletions docs/managing_data/tom_direct_sharing.rst
@@ -0,0 +1,32 @@
Sharing Data with Other TOMs
############################

TOM Toolkit does not yet support direct sharing between TOMs; however, we hope to add this functionality soon.


.. Configuring your TOM to submit data to another TOM:
.. ***************************************************
..
.. You will need to add a ``DATA_SHARING`` configuration dictionary to your ``settings.py`` that gives the credentials
.. for the various TOMs with which you wish to share data.
..
.. .. code:: python
..
.. # Define the valid data sharing destinations for your TOM.
.. DATA_SHARING = {
.. 'tom-demo-dev': {
.. 'DISPLAY_NAME': os.getenv('TOM_DEMO_DISPLAY_NAME', 'TOM Demo Dev'),
.. 'BASE_URL': os.getenv('TOM_DEMO_BASE_URL', 'http://tom-demo-dev.lco.gtn/'),
.. 'USERNAME': os.getenv('TOM_DEMO_USERNAME', 'set TOM_DEMO_USERNAME value in environment'),
.. 'PASSWORD': os.getenv('TOM_DEMO_PASSWORD', 'set TOM_DEMO_PASSWORD value in environment'),
.. },
.. 'localhost-tom': {
.. # for testing; share with yourself
.. 'DISPLAY_NAME': os.getenv('LOCALHOST_TOM_DISPLAY_NAME', 'Local'),
.. 'BASE_URL': os.getenv('LOCALHOST_TOM_BASE_URL', 'http://127.0.0.1:8000/'),
.. 'USERNAME': os.getenv('LOCALHOST_TOM_USERNAME', 'set LOCALHOST_TOM_USERNAME value in environment'),
.. 'PASSWORD': os.getenv('LOCALHOST_TOM_PASSWORD', 'set LOCALHOST_TOM_PASSWORD value in environment'),
.. }
..
.. }
..
23 changes: 23 additions & 0 deletions tom_alerts/migrations/0005_alertstreammessage.py
@@ -0,0 +1,23 @@
# Generated by Django 4.1 on 2022-10-26 21:16

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('tom_alerts', '0004_auto_20210204_2300'),
]

operations = [
migrations.CreateModel(
name='AlertStreamMessage',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('topic', models.CharField(help_text='The destination or source of sharing for the message.', max_length=500, verbose_name='Message Topic')),
('message_id', models.CharField(help_text='An external message identifier that can be used to locate the message within the given topic.', max_length=50, verbose_name='Message ID')),
('date_shared', models.DateTimeField(auto_now_add=True, help_text='The date on which the message is shared. (Date created by default.)', verbose_name='Date Shared')),
('exchange_status', models.CharField(default='', help_text='Whether this message was sent or received.', max_length=10, verbose_name='Exchange Status')),
],
),
]
@@ -0,0 +1,18 @@
# Generated by Django 4.1 on 2022-10-26 23:57

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('tom_alerts', '0005_alertstreammessage'),
]

operations = [
migrations.AlterField(
model_name='alertstreammessage',
name='exchange_status',
field=models.CharField(choices=[('published', 'Published'), ('ingested', 'Ingested')], help_text='Whether this message was sent or received.', max_length=10, verbose_name='Exchange Status'),
),
]
47 changes: 47 additions & 0 deletions tom_alerts/models.py
@@ -33,3 +33,50 @@ class BrokerQuery(models.Model):

def __str__(self):
return self.name


class AlertStreamMessage(models.Model):
"""
Class representing a streaming message containing data sent/received either over Kafka or to/from another TOM
:param topic: The destination or source of sharing for the message.
:type topic: str
:param message_id: An external message identifier that can be used to locate the message within the given topic.
:type message_id: str
:param date_shared: The date on which the message is shared. (Date created by default.)
:type date_shared: datetime
:param exchange_status: Whether this message was sent or received.
:type exchange_status: str
"""

EXCHANGE_STATUS_CHOICES = (
('published', 'Published'),
('ingested', 'Ingested')
)

topic = models.CharField(
max_length=500,
verbose_name='Message Topic',
help_text='The destination or source of sharing for the message.'
)
message_id = models.CharField(
max_length=50,
verbose_name='Message ID',
help_text='An external message identifier that can be used to locate the message within the given topic.'
)
date_shared = models.DateTimeField(
auto_now_add=True,
verbose_name='Date Shared',
help_text='The date on which the message is shared. (Date created by default.)'
)
exchange_status = models.CharField(
max_length=10,
verbose_name='Exchange Status',
choices=EXCHANGE_STATUS_CHOICES,
help_text='Whether this message was sent or received.'
)

def __str__(self):
return f'Message {self.message_id} on {self.topic}.'
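To illustrate how the fields of ``AlertStreamMessage`` fit together, here is a plain-Python stand-in (a dataclass, not a ``models.Model``, so it runs without Django; the class name is ours) showing the ``__str__`` format defined above:

```python
from dataclasses import dataclass


@dataclass
class AlertStreamMessageSketch:
    """Illustrative stand-in for the Django AlertStreamMessage model."""
    topic: str            # destination or source of sharing
    message_id: str       # external identifier within the topic
    exchange_status: str  # 'published' (sent) or 'ingested' (received)

    def __str__(self):
        # Same format as the real model's __str__.
        return f'Message {self.message_id} on {self.topic}.'


msg = AlertStreamMessageSketch(topic='hermes.test', message_id='42',
                               exchange_status='published')
print(str(msg))  # Message 42 on hermes.test.
```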
42 changes: 37 additions & 5 deletions tom_base/settings.py
@@ -195,11 +195,23 @@
},
}


TOM_FACILITY_CLASSES = [
'tom_observations.facilities.lco.LCOFacility',
'tom_observations.facilities.gemini.GEMFacility',
'tom_observations.facilities.soar.SOARFacility',
]


#
# tom_alerts configuration
#
LASAIR_TOKEN = os.getenv('LASAIR_TOKEN', None)


#
# tom_dataproducts configuration
#
# Define the valid data product types for your TOM. Be careful when removing items, as previously valid types will no
# longer be valid, and may cause issues unless the offending records are modified.
DATA_PRODUCT_TYPES = {
@@ -214,11 +226,31 @@
'spectroscopy': 'tom_dataproducts.processors.spectroscopy_processor.SpectroscopyProcessor',
}

TOM_FACILITY_CLASSES = [
'tom_observations.facilities.lco.LCOFacility',
'tom_observations.facilities.gemini.GEMFacility',
'tom_observations.facilities.soar.SOARFacility',
]
# Configuration for the TOM/Kafka Stream receiving data from this TOM
DATA_SHARING = {
'hermes': {
'DISPLAY_NAME': os.getenv('HERMES_DISPLAY_NAME', 'Hermes'),
'BASE_URL': os.getenv('HERMES_BASE_URL', 'https://hermes.lco.global/'),
'CREDENTIAL_USERNAME': os.getenv('SCIMMA_CREDENTIAL_USERNAME',
'set SCIMMA_CREDENTIAL_USERNAME value in environment'),
'CREDENTIAL_PASSWORD': os.getenv('SCIMMA_CREDENTIAL_PASSWORD',
'set SCIMMA_CREDENTIAL_PASSWORD value in environment'),
'USER_TOPICS': ['hermes.test', 'tomtoolkit.test']
},
'tom-demo-dev': {
'DISPLAY_NAME': os.getenv('TOM_DEMO_DISPLAY_NAME', 'TOM Demo Dev'),
'BASE_URL': os.getenv('TOM_DEMO_BASE_URL', 'http://tom-demo-dev.lco.gtn/'),
'USERNAME': os.getenv('TOM_DEMO_USERNAME', 'set TOM_DEMO_USERNAME value in environment'),
'PASSWORD': os.getenv('TOM_DEMO_PASSWORD', 'set TOM_DEMO_PASSWORD value in environment'),
},
'localhost-tom': {
# for testing; share with yourself
'DISPLAY_NAME': os.getenv('LOCALHOST_TOM_DISPLAY_NAME', 'Local'),
'BASE_URL': os.getenv('LOCALHOST_TOM_BASE_URL', 'http://127.0.0.1:8000/'),
'USERNAME': os.getenv('LOCALHOST_TOM_USERNAME', 'set LOCALHOST_TOM_USERNAME value in environment'),
'PASSWORD': os.getenv('LOCALHOST_TOM_PASSWORD', 'set LOCALHOST_TOM_PASSWORD value in environment'),
}
}

TOM_CADENCE_STRATEGIES = [
'tom_observations.cadences.retry_failed_observations.RetryFailedObservationsStrategy',