Skip to content

Commit

Permalink
Merge 582d012 into 6463205
Browse files Browse the repository at this point in the history
  • Loading branch information
adybbroe committed Oct 20, 2022
2 parents 6463205 + 582d012 commit 0974cf1
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 34 deletions.
11 changes: 6 additions & 5 deletions activefires_pp/geometries_from_shapefiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Loading shapefile geometries from files.
"""
"""Loading shapefile geometries from files."""

from glob import glob
import os
Expand All @@ -33,8 +32,8 @@ class ShapeGeometry(object):
"""Geometry from a shape file."""

def __init__(self, shapefilepath, globstr='*.shp'):
"""Initialize the ShapeGeometry class."""
self.filepaths = _get_shapefile_paths(shapefilepath, globstr)

self.geometries = None
self.attributes = None
self._get_proj()
Expand All @@ -51,7 +50,6 @@ def load(self):

def _get_proj(self):
"""Get and return the Proj.4 string."""

self.proj4str = []
for filepath in self.filepaths:
prj_filename = filepath.strip('.shp') + '.prj'
Expand Down Expand Up @@ -84,4 +82,7 @@ def _get_shapefile_paths(path, globstr='*.shp'):
if os.path.isfile(path):
return [path]

return glob(os.path.join(path, globstr))
shapefile_paths = glob(os.path.join(path, globstr))
if len(shapefile_paths) == 0:
raise OSError('No matching shapefiles found on disk. Path = %s, glob-string = %s', path, globstr)
return shapefile_paths
33 changes: 21 additions & 12 deletions activefires_pp/post_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Post processing on the Active Fire detections.
"""
"""Post processing on the Active Fire detections."""

import socket
from trollsift import Parser, globify
Expand Down Expand Up @@ -84,6 +83,7 @@ class ActiveFiresShapefileFiltering(object):
"""

def __init__(self, filepath=None, afdata=None, platform_name=None, timezone='GMT'):
"""Initialize the ActiveFiresShapefileFiltering class."""
self.input_filepath = filepath
self._afdata = afdata
if afdata is None:
Expand All @@ -95,9 +95,7 @@ def __init__(self, filepath=None, afdata=None, platform_name=None, timezone='GMT
self.platform_name = platform_name

def get_af_data(self, filepattern=None, localtime=True):
"""Read the Active Fire results from file - ascii formatted output from CSPP VIIRS-AF.
"""
"""Read the Active Fire results from file - ascii formatted output from CSPP VIIRS-AF."""
if self._afdata is not None:
# Make sure the attrs are populated with metadata instance attribute
self._afdata.attrs.update(self.metadata)
Expand Down Expand Up @@ -152,7 +150,6 @@ def fires_filtering(self, shapefile, start_geometries_index=1, inside=True):
If *inside* is True the filtering will keep those detections that are inside the polygon.
If *inside* is False the filtering will disregard the detections that are inside the polygon.
"""

detections = self._afdata

lons = detections.longitude.values
Expand All @@ -176,6 +173,8 @@ def get_regional_filtermasks(self, shapefile, globstr):
lons = detections.longitude.values
lats = detections.latitude.values

logger.debug("Before ShapeGeometry instance - shapefile name = %s" % str(shapefile))
logger.debug("Shape file glob-string = %s" % str(globstr))
shape_geom = ShapeGeometry(shapefile, globstr)
shape_geom.load()

Expand Down Expand Up @@ -295,7 +294,6 @@ def store_geojson(output_filename, detections, platform_name=None):

def get_mask_from_multipolygon(points, geometry):
"""Get mask for points from a shapely Multipolygon."""

shape = geometry.geoms[0]
pth = Path(shape.exterior.coords)
mask = pth.contains_points(points)
Expand All @@ -315,6 +313,8 @@ def get_mask_from_multipolygon(points, geometry):
def get_global_mask_from_shapefile(shapefile, lonlats, start_geom_index=0):
"""Given geographical (lon,lat) points get a mask to apply when filtering."""
lons, lats = lonlats

logger.debug("Getting the global mask from file: shapefile file path = %s" % str(shapefile))
shape_geom = ShapeGeometry(shapefile)
shape_geom.load()

Expand Down Expand Up @@ -344,6 +344,7 @@ def __init__(self, configfile, shp_boarders, shp_mask, regional_filtermask=None)
super().__init__()
self.shp_boarders = shp_boarders
self.shp_filtermask = shp_mask

self.regional_filtermask = regional_filtermask
self.configfile = configfile
self.options = {}
Expand Down Expand Up @@ -376,6 +377,8 @@ def _setup_and_start_communication(self):
now = datetime_utc2local(datetime.now(), self.timezone)
logger.debug("Output times for timezone: {zone} Now = {time}".format(zone=str(self.timezone), time=now))

self._check_boarders_shapes_exists()

self.listener = ListenerContainer(topics=[self.input_topic])
self.publisher = NoisyPublisher("active_fires_postprocessing")
self.publisher.start()
Expand Down Expand Up @@ -444,6 +447,7 @@ def run(self):
continue

output_messages, afdata = self.fires_filtering(msg, af_shapeff)
logger.debug("After fires_filtering...: Number of messages = %d", len(output_messages))

for output_msg in output_messages:
if output_msg:
Expand Down Expand Up @@ -495,15 +499,15 @@ def regional_fires_filtering_and_publishing(self, msg, regional_fmask, afsff_obj
filepath = store_geojson(out_filepath, data_in_region, platform_name=fmda['platform'])
if not filepath:
logger.warning("Something wrong happended storing regional " +
"data to Geojson - area: {name}".format(str(region_name)))
"data to Geojson - area: {name}".format(name=str(region_name)))
continue

outmsg = self._generate_output_message(filepath, msg, regional_fmask[region_name])
output_messages.append(outmsg)
logger.info("Geojson file created! Number of fires in region = %d", len(data_in_region))

logger.debug("Regional masking done. Number of regions with fire " +
"detections on this granule: %s", str(regions_with_detections))
"detections on this granule: %s" % str(regions_with_detections))
return output_messages

def fires_filtering(self, msg, af_shapeff):
Expand All @@ -524,12 +528,15 @@ def fires_filtering(self, msg, af_shapeff):

# National filtering:
af_shapeff.fires_filtering(self.shp_boarders)

# Metadata should be transfered here!
afdata_ff = af_shapeff.get_af_data()

if len(afdata_ff) > 0:
logger.debug("Doing the fires filtering: shapefile-mask = %s", str(self.shp_filtermask))
af_shapeff.fires_filtering(self.shp_filtermask, start_geometries_index=0, inside=False)
afdata_ff = af_shapeff.get_af_data()
logger.debug("After fires_filtering: Number of fire detections left: %d", len(afdata_ff))

filepath = store_geojson(out_filepath, afdata_ff, platform_name=af_shapeff.platform_name)
out_messages = self.get_output_messages(filepath, msg, len(afdata_ff))
Expand All @@ -548,7 +555,6 @@ def get_output_messages(self, filepath, msg, number_of_data):

def _generate_output_message(self, filepath, input_msg, region=None):
"""Create the output message to publish."""

output_topic = generate_posttroll_topic(self.output_topic, region)
to_send = prepare_posttroll_message(input_msg, region)
to_send['uri'] = ('ssh://%s/%s' % (self.host, filepath))
Expand All @@ -561,7 +567,6 @@ def _generate_output_message(self, filepath, input_msg, region=None):

def _generate_no_fires_messages(self, input_msg, msg_string):
"""Create the output messages to publish."""

to_send = prepare_posttroll_message(input_msg)
to_send['info'] = msg_string
publish_messages = []
Expand All @@ -571,6 +576,11 @@ def _generate_no_fires_messages(self, input_msg, msg_string):

return publish_messages

def _check_boarders_shapes_exists(self):
"""Check that the national boarders shapefile exists on disk."""
if not os.path.exists(self.shp_boarders):
raise OSError("Shape file does not exist! Filename = %s" % self.shp_boarders)

def close(self):
"""Shutdown the Active Fires postprocessing."""
logger.info('Terminating Active Fires post processing.')
Expand Down Expand Up @@ -613,7 +623,6 @@ def generate_posttroll_topic(output_topic, region=None):

def prepare_posttroll_message(input_msg, region=None):
"""Create the basic posttroll-message fields and return."""

to_send = input_msg.data.copy()
to_send.pop('dataset', None)
to_send.pop('collection', None)
Expand Down
6 changes: 3 additions & 3 deletions activefires_pp/spatiotemporal_alarm_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import signal
from queue import Empty
from threading import Thread
from requests.exceptions import HTTPError
from requests.exceptions import HTTPError, ConnectionError
from posttroll.listener import ListenerContainer
from posttroll.message import Message
from posttroll.publisher import NoisyPublisher
Expand Down Expand Up @@ -189,8 +189,8 @@ def send_alarms(self, geojson_alarms, msg):
try:
post_alarm(alarm['features'], self.restapi_url, self._xauth_token)
LOG.info('Alarm sent - status OK')
except HTTPError:
LOG.exception('Failed sending alarm!')
except (HTTPError, ConnectionError) as err:
LOG.exception('Failed sending alarm! Error: %s', str(err))
LOG.error('Data: %s', str(alarm['features']))

output_message = _create_output_message(msg, self.output_topic, alarm, output_filename)
Expand Down
10 changes: 10 additions & 0 deletions activefires_pp/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,13 @@ def fake_past_detections_dir(tmp_path):
fpt.write(PAST_ALARMS_MONSTERAS2)

yield file_path.parent


@pytest.fixture
def fake_national_boarders_shapefile(tmp_path):
"""Write fake national boarders shape file."""
file_path = tmp_path / 'some_national_boarders_shape.yaml'
with open(file_path, 'w') as fpt:
fpt.write('')

yield file_path
71 changes: 57 additions & 14 deletions activefires_pp/tests/test_fires_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
"""
"""Test the Fires filtering functionality."""

import pytest
from unittest.mock import patch
import pandas as pd
import numpy as np
Expand Down Expand Up @@ -75,11 +75,12 @@
CONFIG_EXAMPLE = {'publish_topic': '/VIIRS/L2/Fires/PP',
'subscribe_topics': 'VIIRS/L2/AFI',
'af_pattern_ibands':
'AFIMG_{platform:s}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_hour:%H%M%S%f}_b{orbit:s}_c{processing_time:%Y%m%d%H%M%S%f}_cspp_dev.txt',
'AFIMG_{platform:s}_d{start_time:%Y%m%d_t%H%M%S%f}_e{end_hour:%H%M%S%f}' +
'_b{orbit:s}_c{processing_time:%Y%m%d%H%M%S%f}_cspp_dev.txt',
'geojson_file_pattern_national': 'AFIMG_{platform:s}_d{start_time:%Y%m%d_t%H%M%S}.geojson',
'geojson_file_pattern_regional': 'AFIMG_{platform:s}_d{start_time:%Y%m%d_t%H%M%S}_{region_name:s}.geojson',
'geojson_file_pattern_regional': 'AFIMG_{platform:s}_d{start_time:%Y%m%d_t%H%M%S}_' +
'{region_name:s}.geojson',
'regional_shapefiles_format': 'omr_{region_code:s}_Buffer.{ext:s}',

'output_dir': '/path/where/the/filtered/results/will/be/stored',
'timezone': 'Europe/Stockholm'}

Expand All @@ -90,23 +91,28 @@
"b{orbit:s}_c{processing_time:%Y%m%d%H%M%S%f}_cspp_dev.txt")

TEST_REGIONAL_MASK = {}
TEST_REGIONAL_MASK['Bergslagen (RRB)'] = {'mask': np.array([False, False, False, False, False, False, False, False, False,
False, False, False, False, True, False, False, False, False]),
TEST_REGIONAL_MASK['Bergslagen (RRB)'] = {'mask': np.array([False, False, False, False, False,
False, False, False, False,
False, False, False, False, True,
False, False, False, False]),
'attributes': {'Join_Count': 2, 'TARGET_FID': 142,
'Kod_omr': '1438', 'KNNAMN': 'Dals-Ed',
'LANDAREAKM': 728.0, 'KNBEF96': 5287.0,
'OBJECTID': 1080804, 'Datum_Tid': '2016-06-13',
'Testomr': 'Bergslagen (RRB)',
'Shape_Leng': 2131994.36042, 'Shape_Area': 53512139344.2},
'all_inside_test_area': False, 'some_inside_test_area': True}
TEST_REGIONAL_MASK['Västerviks kommun'] = {'mask': np.array([False, False, False, False, False, False, False, False, False,
False, False, False, False, False, False, False, False, False]),
TEST_REGIONAL_MASK['Västerviks kommun'] = {'mask': np.array([False, False, False, False, False,
False, False, False, False,
False, False, False, False, False,
False, False, False, False]),
'attributes': {'Join_Count': 30, 'TARGET_FID': 85,
'Kod_omr': '0883', 'KNNAMN': 'Västervik',
'LANDAREAKM': 1870.5, 'KNBEF96': 39579.0,
'OBJECTID': 1079223, 'Datum_Tid': '2016-06-13',
'Testomr': 'Västerviks kommun',
'Shape_Leng': 251653.298274, 'Shape_Area': 2040770168.02},
'Shape_Leng': 251653.298274,
'Shape_Area': 2040770168.02},
'all_inside_test_area': False, 'some_inside_test_area': False}

FAKE_MASK1 = np.array([False, False, False, False, False, False, False, False, True,
Expand All @@ -117,7 +123,6 @@
@patch('activefires_pp.post_processing._read_data')
def test_add_start_and_end_time_to_active_fires_data_utc(readdata):
"""Test adding start and end times to the active fires data."""

myfilepath = TEST_ACTIVE_FIRES_FILEPATH

fstream = io.StringIO(TEST_ACTIVE_FIRES_FILE_DATA)
Expand All @@ -140,7 +145,6 @@ def test_add_start_and_end_time_to_active_fires_data_utc(readdata):
@patch('activefires_pp.post_processing._read_data')
def test_add_start_and_end_time_to_active_fires_data_localtime(readdata):
"""Test adding start and end times to the active fires data."""

myfilepath = TEST_ACTIVE_FIRES_FILEPATH

fstream = io.StringIO(TEST_ACTIVE_FIRES_FILE_DATA)
Expand Down Expand Up @@ -243,7 +247,6 @@ def test_regional_fires_filtering(setup_comm, get_config, gethostname):
@patch('activefires_pp.post_processing.get_global_mask_from_shapefile', side_effect=[FAKE_MASK1, FAKE_MASK2])
def test_general_national_fires_filtering(get_global_mask, setup_comm, get_config, gethostname):
"""Test the general/basic national fires filtering."""

get_config.return_value = CONFIG_EXAMPLE
gethostname.return_value = "my.host.name"

Expand Down Expand Up @@ -278,10 +281,50 @@ def test_general_national_fires_filtering(get_global_mask, setup_comm, get_confi

store_geojson.assert_called_once()
get_output_msg.assert_called_once()
get_global_mask.call_count == 2
assert get_global_mask.call_count == 2

assert isinstance(result, pd.core.frame.DataFrame)
assert len(result) == 1
np.testing.assert_almost_equal(result.iloc[0]['latitude'], 59.52483368)
np.testing.assert_almost_equal(result.iloc[0]['longitude'], 17.1681633)
assert outmsg == ["my fake output message"]


@pytest.mark.usefixtures("fake_national_boarders_shapefile")
@patch('socket.gethostname')
@patch('activefires_pp.post_processing.read_config')
@patch('activefires_pp.post_processing.ActiveFiresPostprocessing._setup_and_start_communication')
def test_checking_national_boarders_shapefile_file_exists(setup_comm, get_config,
gethostname, fake_national_boarders_shapefile):
"""Test the checking of the national boarders shapefile - boarders shapefile exists."""
get_config.return_value = CONFIG_EXAMPLE
gethostname.return_value = "my.host.name"

myconfigfile = "/my/config/file/path"
mymask_file = "/my/shape/file/with/polygons/to/filter/out"

afpp = ActiveFiresPostprocessing(myconfigfile, fake_national_boarders_shapefile, mymask_file)
afpp._check_boarders_shapes_exists()

assert afpp.shp_boarders.name == 'some_national_boarders_shape.yaml'
assert afpp.shp_boarders.is_file()


@patch('socket.gethostname')
@patch('activefires_pp.post_processing.read_config')
@patch('activefires_pp.post_processing.ActiveFiresPostprocessing._setup_and_start_communication')
def test_checking_national_boarders_shapefile_file_nonexisting(setup_comm, get_config, gethostname):
"""Test the checking of the national boarders shapefile - boarders shapefile does not exist."""
get_config.return_value = CONFIG_EXAMPLE
gethostname.return_value = "my.host.name"

myconfigfile = "/my/config/file/path"
myboarders_file = "/my/shape/file/with/country/boarders"
mymask_file = "/my/shape/file/with/polygons/to/filter/out"

afpp = ActiveFiresPostprocessing(myconfigfile, myboarders_file, mymask_file)
with pytest.raises(OSError) as exec_info:
afpp._check_boarders_shapes_exists()

expected = "Shape file does not exist! Filename = /my/shape/file/with/country/boarders"
assert str(exec_info.value) == expected

0 comments on commit 0974cf1

Please sign in to comment.