Skip to content

Commit

Permalink
Merge pull request #14 from TOMToolkit/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
jnation3406 committed Apr 27, 2023
2 parents 8853f0a + b986aad commit 8d6b36b
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def get_moc_url_from_skymap_fits_url(skymap_fits_url):
def handle_message(message):
# It receives a bytestring message or a Kafka message in the LIGO GW format
# fields must be extracted from the message text and stored into in the model
# It returns the nonlocalizedevent and event sequence ingested or None, None.
if not isinstance(message, bytes):
bytes_message = message.value()
else:
Expand Down Expand Up @@ -100,6 +101,9 @@ def handle_message(message):
f'{event_sequence} for NonLocalizedEvent: {nonlocalizedevent}')
logger.warning(warning_msg)

return nonlocalizedevent, event_sequence
return None, None


def handle_retraction(message):
# It receives a bytestring message or a Kafka message in the LIGO GW format
Expand All @@ -118,6 +122,8 @@ def handle_retraction(message):
retracted_event = NonLocalizedEvent.objects.get(event_id=fields['trigger_num'])
retracted_event.state = NonLocalizedEvent.NonLocalizedEventState.RETRACTED
retracted_event.save()
return retracted_event
except NonLocalizedEvent.DoesNotExist:
logger.warning((f"Got a Retraction notice for event id {fields['trigger_num']}"
f"which does not exist in the database"))
return None
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def handle_igwn_message(message: JSONBlob, metadata: Metadata):
)
logger.error(traceback.format_exc())

logger.warning(f"Storing igwn alert: {alert}")
logger.debug(f"Storing igwn alert: {alert}")

# Now ingest the sequence for that event
sequence_number = get_sequence_number(alert['superevent_id'])
Expand All @@ -98,3 +98,5 @@ def handle_igwn_message(message: JSONBlob, metadata: Metadata):
f'{event_sequence} for NonLocalizedEvent: {nonlocalizedevent}'
)
logger.warning(warning_msg)

return nonlocalizedevent, event_sequence
3 changes: 2 additions & 1 deletion tom_nonlocalizedevents/healpix_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
EventLocalization, CredibleRegion)
from django.db import transaction
from django.conf import settings
from django.utils import timezone
from healpix_alchemy.constants import HPX, LEVEL
from healpix_alchemy.types import Tile, Point
import sqlalchemy as sa
Expand Down Expand Up @@ -122,7 +123,7 @@ def create_localization_for_skymap(nonlocalizedevent: NonLocalizedEvent, skymap_
skymap = Table.read(BytesIO(skymap_bytes))
distance_mean = skymap.meta['DISTMEAN']
distance_std = skymap.meta['DISTSTD']
date = parse(skymap.meta['DATE'])
date = parse(skymap.meta['DATE']).replace(tzinfo=timezone.utc)
skymap_uuid = uuid.UUID(skymap_hash)
skymap_version = get_skymap_version(nonlocalizedevent, skymap_hash=skymap_uuid, is_combined=is_combined)
if not skymap_url:
Expand Down
95 changes: 95 additions & 0 deletions tom_nonlocalizedevents/ingestion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import logging
import requests
import traceback

from tom_nonlocalizedevents.healpix_utils import create_localization_for_skymap
from tom_nonlocalizedevents.models import NonLocalizedEvent, EventSequence, ExternalCoincidence


logger = logging.getLogger(__name__)


def ingest_sequence_from_hermes_message(message):
event_id = message.get('message', {}).get('data', {}).get('superevent_id', None)
if not event_id:
logger.warn(f"Got a malformed HermesBroker message, cannot created nonlocalizedevent. Message = {message}")
return

data = message.get('message', {}).get('data', {})

if data.get('alert_type', '') == 'RETRACTION':
NonLocalizedEvent.objects.update_or_create(
event_id=event_id,
event_type=NonLocalizedEvent.NonLocalizedEventType.GRAVITATIONAL_WAVE,
defaults={'state': NonLocalizedEvent.NonLocalizedEventState.RETRACTED}
)
return

nonlocalizedevent, nle_created = NonLocalizedEvent.objects.get_or_create(
event_id=event_id,
event_type=NonLocalizedEvent.NonLocalizedEventType.GRAVITATIONAL_WAVE,
)
if nle_created:
logger.info(f"Ingested a new GW event with id {event_id} from HermesBroker")

localization = None
skymap_url = data.get('urls', {}).get('skymap')
if skymap_url:
try:
skymap_resp = requests.get(skymap_url)
skymap_resp.raise_for_status()
localization = create_localization_for_skymap(
nonlocalizedevent=nonlocalizedevent,
skymap_bytes=skymap_resp.content,
skymap_url=skymap_url
)
except Exception as e:
localization = None
logger.error(
f"Failed to retrieve and process localization from skymap file at {skymap_url}. Exception: {e}"
)
logger.error(traceback.format_exc())

combined_skymap_url = data.get('urls', {}).get('combined_skymap')
external_coincidence = None
if combined_skymap_url:
try:
combined_skymap_resp = requests.get(combined_skymap_url)
combined_skymap_resp.raise_for_status()
combined_localization = create_localization_for_skymap(
nonlocalizedevent=nonlocalizedevent,
skymap_bytes=combined_skymap_resp.content,
skymap_url=combined_skymap_url,
is_combined=True
)
external_coincidence, _ = ExternalCoincidence.objects.get_or_create(
localization=combined_localization,
defaults={'details': data.get('external_coinc')}
)
except Exception as e:
combined_localization = None
logger.error((
f"Failed to retrieve and process combined localization from combined skymap file at"
f"{combined_skymap_url}. Exception: {e}"
))
logger.error(traceback.format_exc())

# Now ingest the sequence for that event
sequence_number = data.get('sequence_num')
event_sequence, es_created = EventSequence.objects.update_or_create(
nonlocalizedevent=nonlocalizedevent,
localization=localization,
external_coincidence=external_coincidence,
sequence_id=sequence_number,
details=data.get('event'),
defaults={
'event_subtype': data.get('alert_type'),
'ingestor_source': 'hop'
}
)
if es_created and localization is None:
warning_msg = (
f'{"Creating" if es_created else "Updating"} EventSequence without EventLocalization:'
f'{event_sequence} for NonLocalizedEvent: {nonlocalizedevent}'
)
logger.warning(warning_msg)
2 changes: 1 addition & 1 deletion tom_nonlocalizedevents/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
path('', views.NonLocalizedEventListView.as_view(), name='index'),
path('<int:pk>/', SupereventPkView.as_view(), name='detail'),
path('<str:event_id>/', SupereventIdView.as_view(), name='event-detail'),
path('alert/createfrom', views.CreateEventFromSCiMMAAlertView.as_view(), name='create-from-alert')
path('alert/createfrom', views.CreateEventFromHermesAlertView.as_view(), name='create-from-alert')
]
74 changes: 29 additions & 45 deletions tom_nonlocalizedevents/views.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging

from django.contrib import messages
from django.core.cache import cache
Expand All @@ -13,11 +14,15 @@
from rest_framework import permissions, viewsets
from django_filters.rest_framework import DjangoFilterBackend

from tom_nonlocalizedevents.ingestion import ingest_sequence_from_hermes_message
from tom_nonlocalizedevents.models import EventCandidate, EventLocalization, NonLocalizedEvent
from tom_nonlocalizedevents.serializers import (EventCandidateSerializer, EventLocalizationSerializer,
NonLocalizedEventSerializer)


logger = logging.getLogger(__name__)


class NonLocalizedEventListView(LoginRequiredMixin, ListView):
"""
Unadorned Django ListView subclass for NonLocalizedEvent model.
Expand All @@ -32,72 +37,51 @@ def get_queryset(self):


# from the tom_alerts query_result.html

class CreateEventFromSCiMMAAlertView(View):
class CreateEventFromHermesAlertView(View):
"""
Creates the models.NonLocalizedEvent instance and redirect to NonLocalizedEventDetailView
"""
pass

def post(self, request, *args, **kwargs):
"""
"""
# the request.POST is a QueryDict object;
# for SCiMMA, alerts: list items are PKs to skip.dev.hop.scimma.org/api/alerts/PK/
query_id = self.request.POST['query_id']
# broker_name = self.request.POST['broker'] # should be "SCIMMA"
# broker_class = get_service_class(broker_name) # should be <class 'tom_scimma.scimma.SCIMMABroker'>
alerts = [int(id) for id in request.POST.getlist('alerts', [])]

errors = []
# events is a list[str] of NonLocalizedEvent event_id's: (e.g. 'MS230417a')
# (i.e the selected events from the query result form)
events = request.POST.getlist('events', [])

# if the user didn't select an alert; warn and re-direct back
if not alerts:
messages.warning(request, 'Please select at least one alert from which to create an event.')
if not events:
messages.warning(request, 'Please select at least one Event to create.')
reverse_url: str = reverse('tom_alerts:run', kwargs={'pk': query_id})
return redirect(reverse_url)

# try to extract EventID from Alert and use it to create a NoneLocalizedEvent
for alert_id in alerts:
cached_alert = json.loads(cache.get(f'alert_{alert_id}')) # cached by tom_alerts.views.py::RunQueryView
# Create NonLocalizedEvents for each of the selected events.
for event_id in events:
logger.debug(f'Creating event {event_id}...')
# extract the Hermes event from the cache
# (it was cached by tom_alerts.views.py::RunQueryView)
cached_event = json.loads(cache.get(f'alert_{event_id}'))

# early return: alert not in cache
if not cached_alert:
messages.error(request, 'Could not create event(s). Try re-running the query to refresh the cache.')
if not cached_event:
messages.error(request, 'Could not createn event(s). Try re-running the query to refresh the cache.')
return redirect(reverse('tom_alerts:run', kwargs={'pk': query_id}))

# early return: alert not LVC/LVC COUNTERPART NOTICE
if cached_alert.get('topic', '') != 'lvc.lvc-counterpart':
messages.error(request,
('Only Alerts from the lvc.lvc-counterpart topic have '
'parsed event_trig_num required for Event origination. '
'Please select an appropriate alert.'))
return redirect(reverse('tom_alerts:run', kwargs={'pk': query_id}))
# the NonLocalizedEvent is created by handling all the messages from
# the event sequence as if they were ingested
for sequenced_message in cached_event['sequences']:
logger.debug(f"Creating sequence from HermesBroker: {sequenced_message}")
ingest_sequence_from_hermes_message(sequenced_message)

# early return: event_trig_num not found in parsed alert message
event_trig_num = cached_alert['message'].get('event_trig_num', None)
if event_trig_num is None:
messages.error(request,
(f'Could not create event for alert: {alert_id}. '
'event_trig_num not found in alert message.'))
return redirect(reverse('tom_alerts:run', kwargs={'pk': query_id}))
return redirect(reverse('nonlocalizedevents:index'))

nonlocalizedevent, created = NonLocalizedEvent.objects.get_or_create(event_id=event_trig_num)
if not created:
# the nonlocalizedevent already existed
messages.warning(request, f'Event {event_trig_num} already exists.')
errors.append(nonlocalizedevent.event_id)

if len(alerts) == len(errors):
# zero nonlocalizedevents created
return redirect(reverse('tom_alerts:run', kwargs={'pk': query_id}))
elif len(alerts) == 1:
# one nonlocalizedevent created
return redirect(reverse('nonlocalizedevents:detail', kwargs={'pk': nonlocalizedevent.pk}))
else:
# multipe nonlocalizedevent created
return redirect(reverse('nonlocalizedevents:index'))

#
# Django Rest Framework Views

#

class NonLocalizedEventViewSet(viewsets.ModelViewSet):
"""
Expand Down

0 comments on commit 8d6b36b

Please sign in to comment.