Skip to content

Commit

Permalink
Merge 44b49e1 into 33a20c2
Browse files Browse the repository at this point in the history
  • Loading branch information
adybbroe committed Aug 16, 2023
2 parents 33a20c2 + 44b49e1 commit f151fc8
Show file tree
Hide file tree
Showing 8 changed files with 527 additions and 271 deletions.
77 changes: 72 additions & 5 deletions activefires_pp/geojson_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2022 Adam Dybbroe
# Copyright (c) 2022 - 2023 Adam Dybbroe

# Author(s):

Expand All @@ -22,15 +22,20 @@

"""Geojson utilities."""

import os
import pyproj
import geojson
from geojson import Feature, Point, FeatureCollection, dump
import json
import logging
from trollsift import Parser, globify
import pytz
from datetime import datetime
import numpy as np

LOG = logging.getLogger(__name__)
from activefires_pp.utils import json_serial

logger = logging.getLogger(__name__)


def read_geojson_data(filename):
Expand All @@ -41,9 +46,9 @@ def read_geojson_data(filename):
with open(filename, "r") as fpt:
return geojson.load(fpt)
except json.decoder.JSONDecodeError:
LOG.exception("Geojson file invalid and cannot be read: %s", str(filename))
logger.exception("Geojson file invalid and cannot be read: %s", str(filename))
else:
LOG.error("No valid filename to read: %s", str(filename))
logger.error("No valid filename to read: %s", str(filename))


def get_geojson_files_in_observation_time_order(path, pattern, time_interval):
Expand Down Expand Up @@ -72,6 +77,57 @@ def get_geojson_files_in_observation_time_order(path, pattern, time_interval):
return files.tolist()


def geojson_feature_collection_from_detections(detections, platform_name=None):
"""Create the Geojson feature collection from fire detection data."""
if len(detections) == 0:
logger.debug("No detections to save!")
return None

# Convert points to GeoJSON
features = []
for idx in range(len(detections)):
starttime = detections.iloc[idx].starttime
endtime = detections.iloc[idx].endtime
mean_granule_time = starttime.to_pydatetime() + (endtime.to_pydatetime() -
starttime.to_pydatetime()) / 2.

prop = {'power': detections.iloc[idx].power,
'tb': detections.iloc[idx].tb,
'confidence': int(detections.iloc[idx].conf),
'id': detections.iloc[idx].detection_id,
'observation_time': json_serial(mean_granule_time)
}
if platform_name:
prop['platform_name'] = platform_name
else:
logger.debug("No platform name specified for output")

feat = Feature(
geometry=Point(map(float, [detections.iloc[idx].longitude, detections.iloc[idx].latitude])),
properties=prop)
features.append(feat)

return FeatureCollection(features)


def map_coordinates_in_feature_collection(feature_collection, epsg_str):
"""Map the Point coordinates of all data in Feature Collection."""
outp = pyproj.Proj(init='EPSG:'+epsg_str)

mapped_features = []
# Iterate through each feature of the feature collection
for feature in feature_collection['features']:
lon, lat = feature['geometry']['coordinates']
prop = feature['properties']
feature_out = Feature(geometry=Point(map(float, [lon, lat])), properties=prop)
# Project/transform coordinate pairs of each Point
result = outp(lon, lat)
feature_out['geometry']['coordinates'] = [result[0], result[1]]
mapped_features.append(feature_out)

return FeatureCollection(mapped_features)


def store_geojson_alarm(fires_alarms_dir, file_parser, idx, alarm):
"""Store the fire alarm to a geojson file."""
utc = pytz.timezone('utc')
Expand All @@ -82,6 +138,17 @@ def store_geojson_alarm(fires_alarms_dir, file_parser, idx, alarm):
'platform_name': platform_name})
output_filename = fires_alarms_dir / fname
with open(output_filename, 'w') as fpt:
geojson.dump(alarm, fpt)
dump(alarm, fpt)

return output_filename


def store_geojson(output_filename, feature_collection):
"""Store the Geojson feature collection of fire detections on disk."""
path = os.path.dirname(output_filename)
if not os.path.exists(path):
logger.info("Create directory: %s", path)
os.makedirs(path)

with open(output_filename, 'w') as fpt:
dump(feature_collection, fpt)
108 changes: 42 additions & 66 deletions activefires_pp/post_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import numpy as np
import os
from six.moves.urllib.parse import urlparse
from geojson import Feature, Point, FeatureCollection, dump

import logging
import signal
from queue import Empty
Expand All @@ -42,9 +42,12 @@
from matplotlib.path import Path
import shapely

from activefires_pp.geojson_utils import store_geojson
from activefires_pp.geojson_utils import geojson_feature_collection_from_detections
from activefires_pp.geojson_utils import map_coordinates_in_feature_collection

from activefires_pp.utils import datetime_utc2local
from activefires_pp.utils import get_local_timezone_offset
from activefires_pp.utils import json_serial
from activefires_pp.config import read_config
from activefires_pp.geometries_from_shapefiles import ShapeGeometry

Expand Down Expand Up @@ -253,50 +256,6 @@ def store(output_filename, detections):
return None


def geojson_feature_collection_from_detections(detections, platform_name=None):
"""Create the Geojson feature collection from fire detection data."""
if len(detections) == 0:
logger.debug("No detections to save!")
return None

# Convert points to GeoJSON
features = []
for idx in range(len(detections)):
starttime = detections.iloc[idx].starttime
endtime = detections.iloc[idx].endtime
mean_granule_time = starttime.to_pydatetime() + (endtime.to_pydatetime() -
starttime.to_pydatetime()) / 2.

prop = {'power': detections.iloc[idx].power,
'tb': detections.iloc[idx].tb,
'confidence': int(detections.iloc[idx].conf),
'id': detections.iloc[idx].detection_id,
'observation_time': json_serial(mean_granule_time)
}
if platform_name:
prop['platform_name'] = platform_name
else:
logger.debug("No platform name specified for output")

feat = Feature(
geometry=Point(map(float, [detections.iloc[idx].longitude, detections.iloc[idx].latitude])),
properties=prop)
features.append(feat)

return FeatureCollection(features)


def store_geojson(output_filename, feature_collection):
"""Store the Geojson feature collection of fire detections on disk."""
path = os.path.dirname(output_filename)
if not os.path.exists(path):
logger.info("Create directory: %s", path)
os.makedirs(path)

with open(output_filename, 'w') as fpt:
dump(feature_collection, fpt)


def get_mask_from_multipolygon(points, geometry, start_idx=1):
"""Get mask for points from a shapely Multipolygon."""
shape = geometry.geoms[0]
Expand Down Expand Up @@ -357,6 +316,7 @@ def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
self.output_topic = self.options['publish_topic']
self.infile_pattern = self.options.get('af_pattern_ibands')
self.outfile_pattern_national = self.options.get('geojson_file_pattern_national')
self.outfile_pattern_national_sweref99 = self.options.get('geojson_file_pattern_national_sweref99')
self.outfile_pattern_regional = self.options.get('geojson_file_pattern_regional')
self.output_dir = self.options.get('output_dir', '/tmp')
self.filepath_detection_id_cache = self.options.get('filepath_detection_id_cache')
Expand Down Expand Up @@ -431,6 +391,31 @@ def check_incoming_message_and_get_filename(self, msg):

return filename

def _national_save_and_publish(self, feature_collection, ndata, af_shapeff, msg, sweref99=False):
"""Take the feature collection and store the results in a Geojson file and publish."""
if feature_collection is None:
logger.info("No geojson file created, number of fires after filtering = %d", ndata)
output_messages = self._generate_no_fires_messages(msg,
'No true fire detections inside National borders') # noqa
return

fmda = af_shapeff.metadata
if sweref99:
pout = Parser(self.outfile_pattern_national_sweref99)
else:
pout = Parser(self.outfile_pattern_national)

out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
logger.debug("Output file path = %s", out_filepath)

store_geojson(out_filepath, feature_collection)
output_messages = self.get_output_messages(out_filepath, msg, ndata, sweref99=sweref99)

for output_msg in output_messages:
if output_msg:
logger.debug("Sending message: %s", str(output_msg))
self.publisher.send(str(output_msg))

def do_postprocessing_on_message(self, msg, filename):
"""Do the fires post processing on a message."""
platform_name = msg.data.get('platform_name')
Expand All @@ -456,26 +441,14 @@ def do_postprocessing_on_message(self, msg, filename):

# 1) Create geojson feature collection
# 2) Dump geojson data to disk

feature_collection = geojson_feature_collection_from_detections(afdata,
platform_name=af_shapeff.platform_name)

fmda = af_shapeff.metadata
pout = Parser(self.outfile_pattern_national)
out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
logger.debug("Output file path = %s", out_filepath)
self._national_save_and_publish(feature_collection, len(afdata), af_shapeff, msg)

if feature_collection is None:
logger.info("No geojson file created, number of fires after filtering = %d", len(afdata))
output_messages = self._generate_no_fires_messages(msg,
'No true fire detections inside National borders') # noqa
else:
store_geojson(out_filepath, feature_collection)
output_messages = self.get_output_messages(out_filepath, msg, len(afdata))

for output_msg in output_messages:
if output_msg:
logger.debug("Sending message: %s", str(output_msg))
self.publisher.send(str(output_msg))
sweref99_fc = map_coordinates_in_feature_collection(feature_collection, '3006')
self._national_save_and_publish(sweref99_fc, len(afdata), af_shapeff, msg, sweref99=True)

# Do the regional filtering now:
if not self.regional_filtermask:
Expand Down Expand Up @@ -579,20 +552,23 @@ def fires_filtering(self, msg, af_shapeff):

return afdata_ff

def get_output_messages(self, filepath, msg, number_of_data):
def get_output_messages(self, filepath, msg, number_of_data, sweref99=False):
"""Generate the adequate output message(s) depending on if an output file was created or not."""
logger.info("Geojson file created! Number of fires = %d", number_of_data)
return [self._generate_output_message(filepath, msg)]
return [self._generate_output_message(filepath, msg, sweref99=sweref99)]

def _generate_output_message(self, filepath, input_msg, region=None):
def _generate_output_message(self, filepath, input_msg, region=None, sweref99=False):
"""Create the output message to publish."""
output_topic = generate_posttroll_topic(self.output_topic, region)
to_send = prepare_posttroll_message(input_msg, region)
to_send['uri'] = ('ssh://%s/%s' % (self.host, filepath))
to_send['uid'] = os.path.basename(filepath)
to_send['type'] = 'GEOJSON-filtered'
to_send['format'] = 'geojson'
to_send['product'] = 'afimg'
if sweref99:
to_send['product'] = 'afimg_sweref99'
else:
to_send['product'] = 'afimg'
pubmsg = Message(output_topic, 'file', to_send)
return pubmsg

Expand Down
Loading

0 comments on commit f151fc8

Please sign in to comment.