Skip to content

Commit

Permalink
Merge 45e2fd8 into 6463205
Browse files Browse the repository at this point in the history
  • Loading branch information
adybbroe committed Oct 24, 2022
2 parents 6463205 + 45e2fd8 commit 781ee30
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 31 deletions.
87 changes: 65 additions & 22 deletions activefires_pp/spatiotemporal_alarm_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
are:
* Time threshold: 6 hours
* Spacial threshold: 800 meters
* Spatial threshold: 800 meters
* Threshold defining the longest possible fire before it will be split into smaller fires: 1.2 km
"""

Expand All @@ -41,7 +42,7 @@
import signal
from queue import Empty
from threading import Thread
from requests.exceptions import HTTPError
from requests.exceptions import HTTPError, ConnectionError
from posttroll.listener import ListenerContainer
from posttroll.message import Message
from posttroll.publisher import NoisyPublisher
Expand Down Expand Up @@ -78,9 +79,9 @@ def __init__(self, configfile):
super().__init__()
self.configfile = configfile
self.options = {}
self.time_and_space_thresholds = {}

config = read_config(self.configfile)
self._set_options_from_config(config)
self._read_and_check_configuration()

self.input_topic = self.options['subscribe_topics'][0]
LOG.debug("Input topic: %s", self.input_topic)
Expand All @@ -107,6 +108,39 @@ def _setup_and_start_communication(self):
self.loop = True
signal.signal(signal.SIGTERM, self.signal_shutdown)

def _read_and_check_configuration(self):
"""Read and check the configuration parameters."""
config = read_config(self.configfile)
self._check_and_set_thresholds_from_config(config)
self._set_options_from_config(config)

def _check_and_set_thresholds_from_config(self, config):
"""Check that adequate thresholds are provided in config file."""
if 'time_and_space_thresholds' not in config:
raise OSError('Missing time and space thresholds from configuration!')

self._log_message_per_threshold = {
'hour_threshold': ("Threshold defining when a new detection should trigger an " +
"alarm on the same spot (in hours): ",
"Threshold in time is missing!"),
'long_fires_threshold_km': ("Threshold defining the maximum extention of a fire before " +
"dividing it in smaller pieces (in km): %3.1f",
"Threshold for splitting long fires is missing!"),
"spatial_threshold_km": ("Threshold defining the maximum distance between two detections " +
"beyond which they must trigger more than one alarm (km): %3.1f",
"A spatial threshold is missing!")
}
for thr_type in self._log_message_per_threshold:
self._check_and_set_threshold(thr_type, config)

def _check_and_set_threshold(self, thr_type, config):
"""Check if threshold exists in config and set it accordingly or raise an error."""
if thr_type in config['time_and_space_thresholds']:
self.time_and_space_thresholds[thr_type] = config['time_and_space_thresholds'][thr_type]
LOG.info(self._log_message_per_threshold[thr_type][0], self.time_and_space_thresholds[thr_type])
else:
raise OSError(self._log_message_per_threshold[thr_type][1])

def _set_options_from_config(self, config):
"""From the configuration on disk set the option dictionary with all metadata for real-time processing."""
for item in config:
Expand Down Expand Up @@ -169,7 +203,8 @@ def spatio_temporal_alarm_filtering(self, msg):
return None

geojson_alarms = create_alarms_from_fire_detections(ffdata,
self.fire_alarms_dir, self.sos_alarms_file_pattern)
self.fire_alarms_dir, self.sos_alarms_file_pattern,
self.time_and_space_thresholds)

if len(geojson_alarms) == 0:
LOG.info("No alarms to be triggered!")
Expand All @@ -189,8 +224,8 @@ def send_alarms(self, geojson_alarms, msg):
try:
post_alarm(alarm['features'], self.restapi_url, self._xauth_token)
LOG.info('Alarm sent - status OK')
except HTTPError:
LOG.exception('Failed sending alarm!')
except (HTTPError, ConnectionError) as err:
LOG.exception('Failed sending alarm! Error: %s', str(err))
LOG.error('Data: %s', str(alarm['features']))

output_message = _create_output_message(msg, self.output_topic, alarm, output_filename)
Expand Down Expand Up @@ -232,8 +267,9 @@ def dump_collection(idx, features):


def create_alarms_from_fire_detections(fire_data, past_detections_dir, sos_alarms_file_pattern,
long_fires_threshold=1.2, hour_threshold=6.0):
time_space_thresholds):
"""Create alarm(s) from a set of detections."""
long_fires_threshold = time_space_thresholds.get('long_fires_threshold_km', 1.2)
gathered_fires = join_fire_detections(fire_data)

# Now go through the gathered fires and split long/large clusters of
Expand All @@ -249,7 +285,7 @@ def create_alarms_from_fire_detections(fire_data, past_detections_dir, sos_alarm
# Check against the most recent alarms:
alarm_should_be_triggered = check_if_fire_should_trigger_alarm(fire_alarm, past_detections_dir,
sos_alarms_file_pattern,
hour_thr=hour_threshold)
time_space_thresholds)
if alarm_should_be_triggered:
lonlat = fire_alarm['features']['geometry']['coordinates']
power = fire_alarm['features']['properties']['power']
Expand All @@ -267,7 +303,6 @@ def find_neighbours(feature, other_features, thr_dist=0.8):
geom = feat['geometry']
lon, lat = geom['coordinates']
km_dist = distance.distance((lat0, lon0), (lat, lon)).kilometers
# print("Id: %d Distance: %f" % (key, km_dist))
if km_dist < thr_dist:
idx.append(key)

Expand Down Expand Up @@ -324,7 +359,7 @@ def join_fire_detections(gdata):
return feature_collections


def split_large_fire_clusters(features, km_threshold):
def split_large_fire_clusters(features, large_fires_threshold):
"""Take a list of fire detection features and split in smaller clusters/chains."""
num_features = len(features)
LOG.debug("Split large fire clusters - Number of features: %d" % num_features)
Expand All @@ -344,8 +379,8 @@ def split_large_fire_clusters(features, km_threshold):
max_distance = km_dist
max_2comb = tup2

if max_distance < km_threshold:
LOG.debug("Only one cluster - (max_distance, threshold) = (%f, %f)" % (max_distance, km_threshold))
if max_distance < large_fires_threshold:
LOG.debug("Only one cluster - (max_distance, threshold) = (%f, %f)" % (max_distance, large_fires_threshold))
return {'only-one-cluster': list(features.values())}

# Now we have located the two detections in the collection that are
Expand All @@ -355,7 +390,7 @@ def split_large_fire_clusters(features, km_threshold):
start_index = max_2comb[0]
while num_features > 0:
features = gather_neighbours_to_new_collection(start_index, features, feature_collections,
thr_dist=km_threshold)
large_fires_threshold)
if not features:
break
start_index = min(features.keys())
Expand Down Expand Up @@ -398,36 +433,44 @@ def create_single_point_alarms_from_collections(features):
return fcollection


def get_single_point_fires_as_collections(fires, threshold):
def get_single_point_fires_as_collections(fires, long_fires_threshold):
"""Split larger fires into smaller parts and make a single fire-detection out of each and return as collection."""
features = split_large_fire_clusters(fires, threshold)
features = split_large_fire_clusters(fires, long_fires_threshold)
return create_single_point_alarms_from_collections(features)


def check_if_fire_should_trigger_alarm(gjson_data, past_alarms_dir, sos_alarms_file_pattern,
hour_thr=16, km_threshold=0.8):
time_space_thresholds):
"""Check if fire point should trigger an alarm.
The fire point is a GeoJSON object. A search back in time X hours (X=16) is
The fire point is a GeoJSON object. A search back in time X hours (e.g. X=16) is
done, and a check for previous fire alarms on that location is done. Only
if no previous fire alarm at the same position (determined by a distance
threshold) is found an alarm should be issued.
threshold) is found, an alarm should be issued.
"""
utc = pytz.timezone('utc')
end_time = datetime.fromisoformat(gjson_data["properties"]["observation_time"])
end_time = end_time.astimezone(utc).replace(tzinfo=None)

hour_thr = time_space_thresholds.get('hour_threshold', 16)
km_threshold = time_space_thresholds.get('spatial_threshold_km', 0.8)

LOG.debug("Check if fire detection should trigger alarm. Thresholds = (%3.1f h, %3.1f km)",
hour_thr, km_threshold)

start_time = end_time - timedelta(hours=hour_thr)
recent_files = get_recent_geojson_files(past_alarms_dir, sos_alarms_file_pattern, (start_time, end_time))
recent_files = get_recent_geojson_files(past_alarms_dir, sos_alarms_file_pattern,
(start_time, end_time))

# If directory is empty there is no history present and this alarm should be triggered:
if len(recent_files) == 0:
LOG.info("Directory empty - no history present - alarm should be triggered!")
return True

lon0, lat0 = gjson_data["geometry"]["coordinates"]
# Go though the most recent files and see if an alarm has been triggered for the "same" position.
# Go through the files in reverse order, take the most recent file first!
# Go through the most recent files and see if an alarm has been triggered
# for the "same" position. Go through the files in reverse order, take the
# most recent file first!
shall_trigger_alarm = True
for filename in recent_files[::-1]:
gjdata = read_geojson_data((past_alarms_dir / filename))
Expand Down
5 changes: 5 additions & 0 deletions activefires_pp/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
fire_alarms_dir: /path/where/the/filtered/alarms/will/be/stored
restapi_url: "https://xxx.smhi.se:xxxx"
time_and_space_thresholds:
hour_threshold: 6
long_fires_threshold_km: 1.2
spatial_threshold_km: 0.8
"""

TEST_YAML_TOKENS = """xauth_tokens:
Expand Down
76 changes: 67 additions & 9 deletions activefires_pp/tests/test_spatiotemporal_alarm_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from unittest.mock import patch
import pathlib
import json
import logging

from activefires_pp.geojson_utils import read_geojson_data
from activefires_pp.spatiotemporal_alarm_filtering import create_alarms_from_fire_detections
from activefires_pp.spatiotemporal_alarm_filtering import join_fire_detections
Expand Down Expand Up @@ -262,8 +264,10 @@ def test_create_alarms_from_fire_detections(fake_past_detections_dir):

pattern = "sos_{start_time:%Y%m%d_%H%M%S}_{id:d}.geojson"
# Default distance threshold but disregarding past alarms:
space_time_threshold = {'hour_threshold': 0.0,
'long_fires_threshold_km': 1.2}
alarms = create_alarms_from_fire_detections(json_test_data, fake_past_detections_dir,
pattern, 1.2, 0.0)
pattern, space_time_threshold)
assert len(alarms) == 3
assert alarms[0]['features']['geometry']['coordinates'] == [16.247334, 57.172443]
assert alarms[0]['features']['properties']['power'] == 5.85325146
Expand All @@ -286,9 +290,10 @@ def test_create_alarms_from_fire_detections(fake_past_detections_dir):
assert alarms[2]['features']['properties']['tb'] == 310.37322998
assert alarms[2]['features']['properties']['observation_time'] == '2021-06-19T02:58:45.700000+02:00'

# Default thresholds (distance=1.2km, time-interval=6hours):
space_time_threshold = {'hour_threshold': 6.0,
'long_fires_threshold_km': 1.2}
alarms = create_alarms_from_fire_detections(json_test_data, fake_past_detections_dir,
pattern)
pattern, space_time_threshold)

assert len(alarms) == 1
assert alarms[0]['features']['geometry']['coordinates'] == [16.249069, 57.156235]
Expand All @@ -298,10 +303,10 @@ def test_create_alarms_from_fire_detections(fake_past_detections_dir):
assert alarms[0]['features']['properties']['tb'] == 310.37322998
assert alarms[0]['features']['properties']['observation_time'] == '2021-06-19T02:58:45.700000+02:00'

# Default time-interval(=6hours) threshold but smaller distance threshold
# (the threshold used to split larger fires):
space_time_threshold = {'hour_threshold': 6.0,
'long_fires_threshold_km': 0.6}
alarms = create_alarms_from_fire_detections(json_test_data, fake_past_detections_dir,
pattern, 0.6)
pattern, space_time_threshold)

assert len(alarms) == 2
assert alarms[0]['features']['geometry']['coordinates'] == [16.242212, 57.157097]
Expand All @@ -318,8 +323,10 @@ def test_create_alarms_from_fire_detections(fake_past_detections_dir):
assert alarms[1]['features']['properties']['tb'] == 310.37322998
assert alarms[1]['features']['properties']['observation_time'] == '2021-06-19T02:58:45.700000+02:00'

space_time_threshold = {'hour_threshold': 16.0,
'long_fires_threshold_km': 1.2}
alarms = create_alarms_from_fire_detections(json_test_data, fake_past_detections_dir,
pattern, 1.2, 16.0)
pattern, space_time_threshold)

assert len(alarms) == 1
assert alarms[0]['features']['geometry']['coordinates'] == [16.249069, 57.156235]
Expand All @@ -329,8 +336,9 @@ def test_create_alarms_from_fire_detections(fake_past_detections_dir):
assert alarms[0]['features']['properties']['tb'] == 310.37322998
assert alarms[0]['features']['properties']['observation_time'] == '2021-06-19T02:58:45.700000+02:00'

space_time_threshold = {}
alarms = create_alarms_from_fire_detections(json_test_data, fake_past_detections_dir,
pattern, 1.2)
pattern, space_time_threshold)

assert len(alarms) == 1
assert alarms[0]['features']['geometry']['coordinates'] == [16.249069, 57.156235]
Expand Down Expand Up @@ -377,7 +385,10 @@ def test_alarm_filter_runner_init(setup_comm,
'publish_topic': '/VIIRS/L2/Fires/PP/SOSAlarm',
'geojson_file_pattern_alarms': 'sos_{start_time:%Y%m%d_%H%M%S}_{id:d}.geojson',
'fire_alarms_dir': '/path/where/the/filtered/alarms/will/be/stored',
'restapi_url': 'https://xxx.smhi.se:xxxx'}
'restapi_url': 'https://xxx.smhi.se:xxxx',
'time_and_space_thresholds': {'hour_threshold': 6,
'long_fires_threshold_km': 1.2,
'spatial_threshold_km': 0.8}}


@pytest.mark.usefixtures("fake_token_file")
Expand Down Expand Up @@ -456,3 +467,50 @@ def test_alarm_filter_runner_call_spatio_temporal_alarm_filtering_no_alarms(crea
dummy_msg = None
result = alarm_runner.spatio_temporal_alarm_filtering(dummy_msg)
assert result is None


@patch('activefires_pp.spatiotemporal_alarm_filtering.AlarmFilterRunner._setup_and_start_communication')
def test_check_and_set_threshold_threshold_okay(setup_comm,
monkeypatch, fake_yamlconfig_file,
fake_token_file, caplog):
"""Test checking and setting a spatio-temporal threshold - threshold accepted."""
from activefires_pp.config import read_config

monkeypatch.setenv("FIREALARMS_XAUTH_FILEPATH", str(fake_token_file))

alarm_runner = AlarmFilterRunner(str(fake_yamlconfig_file))
config = read_config(alarm_runner.configfile)

thr_type = 'hour_threshold'
alarm_runner.time_and_space_thresholds[thr_type] = None
alarm_runner._log_message_per_threshold = {thr_type:
('okay message - hours: %3.1f', 'error message - hours'), }

with caplog.at_level(logging.INFO):
alarm_runner._check_and_set_threshold(thr_type, config)

log_output = 'okay message - hours: 6.0'
assert log_output in caplog.text
assert alarm_runner.time_and_space_thresholds[thr_type] == 6.0


@patch('activefires_pp.spatiotemporal_alarm_filtering.AlarmFilterRunner._setup_and_start_communication')
def test_check_and_set_threshold_wrong_threshold(setup_comm,
monkeypatch, fake_yamlconfig_file,
fake_token_file, caplog):
"""Test checking and setting a spatio-temporal threshold - wrong threshold name."""
from activefires_pp.config import read_config

monkeypatch.setenv("FIREALARMS_XAUTH_FILEPATH", str(fake_token_file))

alarm_runner = AlarmFilterRunner(str(fake_yamlconfig_file))
config = read_config(alarm_runner.configfile)

thr_type = 'unknown_threshold'
alarm_runner._log_message_per_threshold = {thr_type:
('okay message - hours: %3.1f', 'error message - hours'), }
with pytest.raises(OSError) as exec_info:
alarm_runner._check_and_set_threshold(thr_type, config)

expected = alarm_runner._log_message_per_threshold[thr_type][1]
assert str(exec_info.value) == expected
6 changes: 6 additions & 0 deletions examples/spatio_temporal_alarm_filtering.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ fire_alarms_dir: /path/where/the/filtered/alarms/will/be/stored
#geojson_fires_dir: /path/to/where/the/filtered/detections/are/stored

restapi_url: "https://xxx.smhi.se:xxxx"

# Thresholds in time and space:
time_and_space_thresholds:
hour_threshold: 6
long_fires_threshold_km: 1.2
spatial_threshold_km: 0.8

0 comments on commit 781ee30

Please sign in to comment.