Skip to content

Commit

Permalink
Merge branch 'sweref99_projection_output' into smhi-msb-enhancements2023
Browse files Browse the repository at this point in the history
# Conflicts:
#	activefires_pp/post_processing.py
#	activefires_pp/tests/conftest.py
#	activefires_pp/tests/test_fires_filtering.py
#	activefires_pp/tests/test_geojson_utils.py
  • Loading branch information
Adam.Dybbroe committed Jul 18, 2023
2 parents 34e1cb6 + efe15ba commit 9068b02
Show file tree
Hide file tree
Showing 8 changed files with 567 additions and 496 deletions.
88 changes: 83 additions & 5 deletions activefires_pp/geojson_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2022 Adam Dybbroe
# Copyright (c) 2022 - 2023 Adam Dybbroe

# Author(s):

Expand All @@ -22,15 +22,20 @@

"""Geojson utilities."""

import os
import pyproj
import geojson
from geojson import Feature, Point, FeatureCollection, dump
import json
import logging
from trollsift import Parser, globify
import pytz
from datetime import datetime
import numpy as np

LOG = logging.getLogger(__name__)
from activefires_pp.utils import json_serial

logger = logging.getLogger(__name__)


def read_geojson_data(filename):
Expand All @@ -41,9 +46,9 @@ def read_geojson_data(filename):
with open(filename, "r") as fpt:
return geojson.load(fpt)
except json.decoder.JSONDecodeError:
LOG.exception("Geojson file invalid and cannot be read: %s", str(filename))
logger.exception("Geojson file invalid and cannot be read: %s", str(filename))
else:
LOG.error("No valid filename to read: %s", str(filename))
logger.error("No valid filename to read: %s", str(filename))


def get_geojson_files_in_observation_time_order(path, pattern, time_interval):
Expand Down Expand Up @@ -72,6 +77,68 @@ def get_geojson_files_in_observation_time_order(path, pattern, time_interval):
return files.tolist()


def geojson_feature_collection_from_detections(detections, platform_name=None):
"""Create the Geojson feature collection from fire detection data."""
if len(detections) == 0:
logger.debug("No detections to save!")
return None

# Convert points to GeoJSON
features = []
for idx in range(len(detections)):
starttime = detections.iloc[idx].starttime
endtime = detections.iloc[idx].endtime
mean_granule_time = starttime.to_pydatetime() + (endtime.to_pydatetime() -
starttime.to_pydatetime()) / 2.

prop = {'power': detections.iloc[idx].power,
'tb': detections.iloc[idx].tb,
'confidence': int(detections.iloc[idx].conf),
'observation_time': json_serial(mean_granule_time)
}

try:
prop['tb_celcius'] = detections.iloc[idx].tb_celcius
except AttributeError:
logger.debug("Failed adding the TB in celcius!")
pass
try:
prop['id'] = detections.iloc[idx].detection_id
except AttributeError:
logger.debug("Failed adding the unique detection id!")
pass

if platform_name:
prop['platform_name'] = platform_name
else:
logger.debug("No platform name specified for output")

feat = Feature(
geometry=Point(map(float, [detections.iloc[idx].longitude, detections.iloc[idx].latitude])),
properties=prop)
features.append(feat)

return FeatureCollection(features)


def map_coordinates_in_feature_collection(feature_collection, epsg_str):
"""Map the Point coordinates of all data in Feature Collection."""
outp = pyproj.Proj(init='EPSG:'+epsg_str)

mapped_features = []
# Iterate through each feature of the feature collection
for feature in feature_collection['features']:
lon, lat = feature['geometry']['coordinates']
prop = feature['properties']
feature_out = Feature(geometry=Point(map(float, [lon, lat])), properties=prop)
# Project/transform coordinate pairs of each Point
result = outp(lon, lat)
feature_out['geometry']['coordinates'] = [result[0], result[1]]
mapped_features.append(feature_out)

return FeatureCollection(mapped_features)


def store_geojson_alarm(fires_alarms_dir, file_parser, idx, alarm):
"""Store the fire alarm to a geojson file."""
utc = pytz.timezone('utc')
Expand All @@ -82,6 +149,17 @@ def store_geojson_alarm(fires_alarms_dir, file_parser, idx, alarm):
'platform_name': platform_name})
output_filename = fires_alarms_dir / fname
with open(output_filename, 'w') as fpt:
geojson.dump(alarm, fpt)
dump(alarm, fpt)

return output_filename


def store_geojson(output_filename, feature_collection):
"""Store the Geojson feature collection of fire detections on disk."""
path = os.path.dirname(output_filename)
if not os.path.exists(path):
logger.info("Create directory: %s", path)
os.makedirs(path)

with open(output_filename, 'w') as fpt:
dump(feature_collection, fpt)
133 changes: 35 additions & 98 deletions activefires_pp/post_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import numpy as np
import os
from six.moves.urllib.parse import urlparse
from geojson import Feature, Point, FeatureCollection, dump

import logging
import signal
from queue import Empty
Expand All @@ -42,10 +42,13 @@
from matplotlib.path import Path
import shapely

from activefires_pp.geojson_utils import store_geojson
from activefires_pp.geojson_utils import geojson_feature_collection_from_detections
from activefires_pp.geojson_utils import map_coordinates_in_feature_collection

from activefires_pp.utils import datetime_utc2local
from activefires_pp.utils import UnitConverter
from activefires_pp.utils import get_local_timezone_offset
from activefires_pp.utils import json_serial
from activefires_pp.config import read_config
from activefires_pp.geometries_from_shapefiles import ShapeGeometry

Expand Down Expand Up @@ -254,61 +257,6 @@ def store(output_filename, detections):
return None


def geojson_feature_collection_from_detections(detections, platform_name=None):
"""Create the Geojson feature collection from fire detection data."""
if len(detections) == 0:
logger.debug("No detections to save!")
return None

# Convert points to GeoJSON
features = []
for idx in range(len(detections)):
starttime = detections.iloc[idx].starttime
endtime = detections.iloc[idx].endtime
mean_granule_time = starttime.to_pydatetime() + (endtime.to_pydatetime() -
starttime.to_pydatetime()) / 2.

prop = {'power': detections.iloc[idx].power,
'tb': detections.iloc[idx].tb,
'confidence': int(detections.iloc[idx].conf),
'observation_time': json_serial(mean_granule_time)
}

try:
prop['tb_celcius'] = detections.iloc[idx].tb_celcius
except AttributeError:
logger.debug("Failed adding the TB in celcius!")
pass
try:
prop['id'] = detections.iloc[idx].detection_id
except AttributeError:
logger.debug("Failed adding the unique detection id!")
pass

if platform_name:
prop['platform_name'] = platform_name
else:
logger.debug("No platform name specified for output")

feat = Feature(
geometry=Point(map(float, [detections.iloc[idx].longitude, detections.iloc[idx].latitude])),
properties=prop)
features.append(feat)

return FeatureCollection(features)


def store_geojson(output_filename, feature_collection):
"""Store the Geojson feature collection of fire detections on disk."""
path = os.path.dirname(output_filename)
if not os.path.exists(path):
logger.info("Create directory: %s", path)
os.makedirs(path)

with open(output_filename, 'w') as fpt:
dump(feature_collection, fpt)


def get_mask_from_multipolygon(points, geometry, start_idx=1):
"""Get mask for points from a shapely Multipolygon."""
shape = geometry.geoms[0]
Expand Down Expand Up @@ -370,6 +318,7 @@ def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
self.infile_pattern = self.options.get('af_pattern_ibands')

self.outfile_pattern_national = self.options.get('geojson_file_pattern_national')
self.outfile_pattern_national_sweref99 = self.options.get('geojson_file_pattern_national_sweref99')
self.outfile_pattern_regional = self.options.get('geojson_file_pattern_regional')

# self.regional_outputs = self.options.get('geojson-regional')
Expand Down Expand Up @@ -454,6 +403,31 @@ def check_incoming_message_and_get_filename(self, msg):

return filename

def _national_save_and_publish(self, feature_collection, ndata, af_shapeff, msg, sweref99=False):
"""Take the fearure collection and store the results in a Geojson file and publish."""
if feature_collection is None:
logger.info("No geojson file created, number of fires after filtering = %d", ndata)
output_messages = self._generate_no_fires_messages(msg,
'No true fire detections inside National borders') # noqa
return

fmda = af_shapeff.metadata
if sweref99:
pout = Parser(self.outfile_pattern_national_sweref99)
else:
pout = Parser(self.outfile_pattern_national)

out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
logger.debug("Output file path = %s", out_filepath)

store_geojson(out_filepath, feature_collection)
output_messages = self.get_output_messages(out_filepath, msg, ndata)

for output_msg in output_messages:
if output_msg:
logger.debug("Sending message: %s", str(output_msg))
self.publisher.send(str(output_msg))

def do_postprocessing_on_message(self, msg, filename):
"""Do the fires post processing on a message."""
platform_name = msg.data.get('platform_name')
Expand All @@ -480,26 +454,14 @@ def do_postprocessing_on_message(self, msg, filename):

# 1) Create geojson feature collection
# 2) Dump geojson data to disk

feature_collection = geojson_feature_collection_from_detections(afdata,
platform_name=af_shapeff.platform_name)

fmda = af_shapeff.metadata
pout = Parser(self.outfile_pattern_national)
out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
logger.debug("Output file path = %s", out_filepath)

if feature_collection is None:
logger.info("No geojson file created, number of fires after filtering = %d", len(afdata))
output_messages = self._generate_no_fires_messages(msg,
'No true fire detections inside National borders') # noqa
else:
store_geojson(out_filepath, feature_collection)
output_messages = self.get_output_messages(out_filepath, msg, len(afdata))
self._national_save_and_publish(feature_collection, len(afdata), af_shapeff, msg)

for output_msg in output_messages:
if output_msg:
logger.debug("Sending message: %s", str(output_msg))
self.publisher.send(str(output_msg))
sweref99_fc = map_coordinates_in_feature_collection(feature_collection, '3006')
self._national_save_and_publish(sweref99_fc, len(afdata), af_shapeff, msg, sweref99=True)

# Do the regional filtering now:
if not self.regional_filtermask:
Expand Down Expand Up @@ -602,31 +564,6 @@ def fires_filtering(self, msg, af_shapeff):

return afdata_ff

# def create_output(self, data, metadata, outputs):
# """Create geojson output and return filepaths."""
# paths_and_units = []
# for item in outputs:
# for output in item:
# filepath = os.path.join(self.output_dir, item[output]['parser'].compose(metadata))
# if 'unit' in item[output]:
# paths_and_units.append({'filepath': filepath, 'unit': item[output]['unit']})
# else:
# paths_and_units.append({'filepath': filepath})

# filepaths = []
# for item in paths_and_units:
# out_filepath = item['filepath']
# logger.debug("Output file path = %s", out_filepath)
# if 'unit' in item:
# filepath = store_geojson(out_filepath, data, platform_name=metadata['platform'],
# units={'temperature': item['unit']})
# else:
# filepath = store_geojson(out_filepath, data, platform_name=metadata['platform'])

# filepaths.append(filepath)

# return filepaths

def get_output_messages(self, filepath, msg, number_of_data):
"""Generate the adequate output message(s) depending on if an output file was created or not."""
logger.info("Geojson file created! Number of fires = %d", number_of_data)
Expand Down
Loading

0 comments on commit 9068b02

Please sign in to comment.