Commit

Merge 0249f13 into 46b894f
adybbroe committed Jul 13, 2023
2 parents 46b894f + 0249f13 commit 672023e
Showing 9 changed files with 733 additions and 119 deletions.
30 changes: 23 additions & 7 deletions .github/workflows/ci.yaml
@@ -10,10 +10,10 @@ jobs:
fail-fast: true
matrix:
os: ["ubuntu-latest", "macos-latest"]
python-version: ["3.9", "3.10"]
python-version: ["3.9", "3.10", "3.11"]
experimental: [false]
include:
- python-version: "3.10"
- python-version: "3.11"
os: "ubuntu-latest"
experimental: true

@@ -25,18 +25,34 @@ jobs:

steps:
- name: Checkout source
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Setup Conda Environment
uses: conda-incubator/setup-miniconda@v2
with:
miniconda-version: "latest"
miniforge-variant: Mambaforge
miniforge-version: latest
use-mamba: true
python-version: ${{ matrix.python-version }}
mamba-version: "*"
channels: conda-forge,defaults
environment-file: continuous_integration/environment.yaml
activate-environment: test-environment

- name: Set cache environment variables
shell: bash -l {0}
run: |
echo "DATE=$(date +'%Y%m%d')" >> $GITHUB_ENV
CONDA_PREFIX=$(python -c "import sys; print(sys.prefix)")
echo "CONDA_PREFIX=$CONDA_PREFIX" >> $GITHUB_ENV
- uses: actions/cache@v3
with:
path: ${{ env.CONDA_PREFIX }}
key: ${{ matrix.os }}-${{matrix.python-version}}-conda-${{ hashFiles('continuous_integration/environment.yaml') }}-${{ env.DATE }}-${{matrix.experimental}}-${{ env.CACHE_NUMBER }}
id: cache

- name: Update environment
run: mamba env update -n test-environment -f continuous_integration/environment.yaml
if: steps.cache.outputs.cache-hit != 'true'

- name: Install unstable dependencies
if: matrix.experimental == true
shell: bash -l {0}
235 changes: 163 additions & 72 deletions activefires_pp/post_processing.py
@@ -69,6 +69,8 @@
#
COL_NAMES = ["latitude", "longitude", "tb", "along_scan_res", "along_track_res", "conf", "power"]

NO_FIRES_TEXT = 'No fire detections for this granule'


logger = logging.getLogger(__name__)
logging.getLogger("fiona").setLevel(logging.WARNING)
@@ -251,8 +253,8 @@ def store(output_filename, detections):
return None


def store_geojson(output_filename, detections, platform_name=None):
"""Store the filtered AF detections in Geojson format on disk."""
def geojson_feature_collection_from_detections(detections, platform_name=None):
"""Create the Geojson feature collection from fire detection data."""
if len(detections) == 0:
logger.debug("No detections to save!")
return None
Expand Down Expand Up @@ -280,7 +282,11 @@ def store_geojson(output_filename, detections, platform_name=None):
properties=prop)
features.append(feat)

feature_collection = FeatureCollection(features)
return FeatureCollection(features)


def store_geojson(output_filename, feature_collection):
"""Store the Geojson feature collection of fire detections on disk."""
path = os.path.dirname(output_filename)
if not os.path.exists(path):
logger.info("Create directory: %s", path)
@@ -289,8 +295,6 @@ def store_geojson(output_filename, detections, platform_name=None):
with open(output_filename, 'w') as fpt:
dump(feature_collection, fpt)

return output_filename
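
Aside (illustrative, not part of the commit): the refactor above splits the old store_geojson() into two steps: first build the GeoJSON feature collection, then write it to disk. A minimal sketch of that pattern with the geojson package; the coordinates, properties and output path below are invented for illustration only.

    from geojson import Feature, FeatureCollection, Point, dump

    # Step 1: build the feature collection (the role now played by
    # geojson_feature_collection_from_detections).
    features = [Feature(geometry=Point((16.2, 62.5)),
                        properties={'power': 2.9, 'tb': 330.1,
                                    'platform_name': 'Suomi-NPP'})]
    feature_collection = FeatureCollection(features)

    # Step 2: dump the collection to disk (the role now played by store_geojson).
    with open('/tmp/af_detections_example.geojson', 'w') as fpt:
        dump(feature_collection, fpt)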


def get_mask_from_multipolygon(points, geometry, start_idx=1):
"""Get mask for points from a shapely Multipolygon."""
@@ -346,7 +350,6 @@ def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
self._set_options_from_config(config)

self.host = socket.gethostname()

self.timezone = self.options.get('timezone', 'GMT')

self.input_topic = self.options['subscribe_topics'][0]
@@ -355,10 +358,14 @@ def __init__(self, configfile, shp_borders, shp_mask, regional_filtermask=None):
self.outfile_pattern_national = self.options.get('geojson_file_pattern_national')
self.outfile_pattern_regional = self.options.get('geojson_file_pattern_regional')
self.output_dir = self.options.get('output_dir', '/tmp')
self.filepath_detection_id_cache = self.options.get('filepath_detection_id_cache')

frmt = self.options['regional_shapefiles_format']
self.regional_shapefiles_globstr = globify(frmt)

self._fire_detection_id = None
self._initialize_fire_detection_id()

self.listener = None
self.publisher = None
self.loop = False
@@ -402,6 +409,88 @@ def signal_shutdown(self, *args, **kwargs):
"""Shutdown the Active Fires postprocessing."""
self.close()

def check_incoming_message_and_get_filename(self, msg):
"""Check the message content and return filename if okay."""
if msg.type not in ['file', 'collection', 'dataset']:
logger.debug("Message type not supported: %s", str(msg.type))
return None

filename = get_filename_from_uri(msg.data.get('uri'))
if not os.path.exists(filename):
logger.warning("File does not exist: %s", filename)
return None

file_ok = check_file_type_okay(msg.data.get('type'))
if not file_ok:
output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
for output_msg in output_messages:
logger.debug("Sending message: %s", str(output_msg))
self.publisher.send(str(output_msg))
return None

return filename

def do_postprocessing_on_message(self, msg, filename):
"""Do the fires post processing on a message."""
platform_name = msg.data.get('platform_name')
af_shapeff = ActiveFiresShapefileFiltering(filename, platform_name=platform_name,
timezone=self.timezone)
afdata = af_shapeff.get_af_data(self.infile_pattern)
if len(afdata) == 0:
output_messages = self._generate_no_fires_messages(msg, NO_FIRES_TEXT)
for output_msg in output_messages:
logger.debug("Sending message: %s", str(output_msg))
self.publisher.send(str(output_msg))
return

afdata = self.fires_filtering(msg, af_shapeff)
logger.debug("After fires_filtering...: Number of fire detections = %d", len(afdata))
if len(afdata) == 0:
logger.debug("No fires - so no regional filtering to be done!")
return

# It is here that we should add a unique day-ID to each of the detections!
afdata = self.add_unique_day_id(afdata)
self.save_id_to_file()

# 1) Create geojson feature collection
# 2) Dump geojson data to disk
feature_collection = geojson_feature_collection_from_detections(afdata,
platform_name=af_shapeff.platform_name)

fmda = af_shapeff.metadata
pout = Parser(self.outfile_pattern_national)
out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
logger.debug("Output file path = %s", out_filepath)

if feature_collection is None:
logger.info("No geojson file created, number of fires after filtering = %d", len(afdata))
output_messages = self._generate_no_fires_messages(msg,
'No true fire detections inside National borders') # noqa
else:
store_geojson(out_filepath, feature_collection)
output_messages = self.get_output_messages(out_filepath, msg, len(afdata))

for output_msg in output_messages:
if output_msg:
logger.debug("Sending message: %s", str(output_msg))
self.publisher.send(str(output_msg))

# Do the regional filtering now:
if not self.regional_filtermask:
logger.info("No regional filtering is attempted.")
return

# FIXME! If afdata is empty (len=0) then it seems all data are inside all regions!
af_shapeff = ActiveFiresShapefileFiltering(afdata=afdata, platform_name=platform_name,
timezone=self.timezone)
regional_fmask = af_shapeff.get_regional_filtermasks(self.regional_filtermask,
globstr=self.regional_shapefiles_globstr)
regional_messages = self.regional_fires_filtering_and_publishing(msg, regional_fmask, af_shapeff)
for region_msg in regional_messages:
logger.debug("Sending message: %s", str(region_msg))
self.publisher.send(str(region_msg))

def run(self):
"""Run the AF post processing."""
while self.loop:
@@ -411,60 +500,11 @@ def run(self):
except Empty:
continue
else:
if msg.type not in ['file', 'collection', 'dataset']:
logger.debug("Message type not supported: %s", str(msg.type))
continue

platform_name = msg.data.get('platform_name')
filename = get_filename_from_uri(msg.data.get('uri'))
if not os.path.exists(filename):
logger.warning("File does not exist!")
filename = self.check_incoming_message_and_get_filename(msg)
if not filename:
continue

file_ok = check_file_type_okay(msg.data.get('type'))
no_fires_text = 'No fire detections for this granule'
output_messages = self._generate_no_fires_messages(msg, no_fires_text)
if not file_ok:
for output_msg in output_messages:
logger.debug("Sending message: %s", str(output_msg))
self.publisher.send(str(output_msg))
continue

af_shapeff = ActiveFiresShapefileFiltering(filename, platform_name=platform_name,
timezone=self.timezone)
afdata = af_shapeff.get_af_data(self.infile_pattern)

if len(afdata) == 0:
logger.debug("Sending message: %s", str(output_msg))
self.publisher.send(str(output_msg))
continue

output_messages, afdata = self.fires_filtering(msg, af_shapeff)
logger.debug("After fires_filtering...: Number of messages = %d", len(output_messages))

for output_msg in output_messages:
if output_msg:
logger.debug("Sending message: %s", str(output_msg))
self.publisher.send(str(output_msg))

# Do the regional filtering now:
if not self.regional_filtermask:
logger.info("No regional filtering is attempted.")
continue

if len(afdata) == 0:
logger.debug("No fires - so no regional filtering to be done!")
continue

# FIXME! If afdata is empty (len=0) then it seems all data are inside all regions!
af_shapeff = ActiveFiresShapefileFiltering(afdata=afdata, platform_name=platform_name,
timezone=self.timezone)
regional_fmask = af_shapeff.get_regional_filtermasks(self.regional_filtermask,
globstr=self.regional_shapefiles_globstr)
regional_messages = self.regional_fires_filtering_and_publishing(msg, regional_fmask, af_shapeff)
for region_msg in regional_messages:
logger.debug("Sending message: %s", str(region_msg))
self.publisher.send(str(region_msg))
self.do_postprocessing_on_message(msg, filename)

def regional_fires_filtering_and_publishing(self, msg, regional_fmask, afsff_obj):
"""From the regional-fires-filter-mask and the fire detection data send regional messages."""
Expand All @@ -489,13 +529,18 @@ def regional_fires_filtering_and_publishing(self, msg, regional_fmask, afsff_obj
out_filepath = os.path.join(self.output_dir, pout.compose(fmda))
logger.debug("Output file path = %s", out_filepath)
data_in_region = afdata[regional_fmask[region_name]['mask']]
filepath = store_geojson(out_filepath, data_in_region, platform_name=fmda['platform'])
if not filepath:

# filepath = store_geojson(out_filepath, data_in_region, platform_name=fmda['platform'])
feature_collection = geojson_feature_collection_from_detections(data_in_region,
platform_name=fmda['platform'])
if feature_collection is None:
logger.warning("Something wrong happended storing regional " +
"data to Geojson - area: {name}".format(name=str(region_name)))
continue

outmsg = self._generate_output_message(filepath, msg, regional_fmask[region_name])
store_geojson(out_filepath, feature_collection)

outmsg = self._generate_output_message(out_filepath, msg, regional_fmask[region_name])
output_messages.append(outmsg)
logger.info("Geojson file created! Number of fires in region = %d", len(data_in_region))

@@ -531,20 +576,12 @@ def fires_filtering(self, msg, af_shapeff):
afdata_ff = af_shapeff.get_af_data()
logger.debug("After fires_filtering: Number of fire detections left: %d", len(afdata_ff))

filepath = store_geojson(out_filepath, afdata_ff, platform_name=af_shapeff.platform_name)
out_messages = self.get_output_messages(filepath, msg, len(afdata_ff))

return out_messages, afdata_ff
return afdata_ff

def get_output_messages(self, filepath, msg, number_of_data):
"""Generate the adequate output message(s) depending on if an output file was created or not."""
if filepath:
logger.info("geojson file created! Number of fires after filtering = %d", number_of_data)
return [self._generate_output_message(filepath, msg)]
else:
logger.info("No geojson file created, number of fires after filtering = %d", number_of_data)
return self._generate_no_fires_messages(msg,
'No true fire detections inside National borders')
logger.info("Geojson file created! Number of fires = %d", number_of_data)
return [self._generate_output_message(filepath, msg)]

def _generate_output_message(self, filepath, input_msg, region=None):
"""Create the output message to publish."""
@@ -574,6 +611,60 @@ def _check_borders_shapes_exists(self):
if not os.path.exists(self.shp_borders):
raise OSError("Shape file does not exist! Filename = %s" % self.shp_borders)

def _initialize_fire_detection_id(self):
"""Initialize the fire detection ID."""
if self.filepath_detection_id_cache and os.path.exists(self.filepath_detection_id_cache):
self._fire_detection_id = self.get_id_from_file()
else:
self._fire_detection_id = {'date': datetime.utcnow(), 'counter': 0}

def update_fire_detection_id(self):
"""Update the fire detection ID registry."""
now = datetime.utcnow()
tdelta = now - self._fire_detection_id['date']
if tdelta.total_seconds() > 24*3600:
self._initialize_fire_detection_id()
elif tdelta.total_seconds() > 0 and self._fire_detection_id['date'].day != now.day:
self._initialize_fire_detection_id()

self._fire_detection_id['counter'] = self._fire_detection_id['counter'] + 1

def save_id_to_file(self):
"""Save the (current) detection id on disk."""
with open(self.filepath_detection_id_cache, 'w') as fpt:
id_ = self._create_id_string()
fpt.write(id_)

def get_id_from_file(self):
"""Read the latest stored detection id string from disk and convert to internal format."""
with open(self.filepath_detection_id_cache, 'r') as fpt:
idstr = fpt.read()

return self._get_id_from_string(idstr)

def _get_id_from_string(self, idstr):
"""Get the detection id from string."""
datestr, counter = idstr.split('-')
return {'date': datetime.strptime(datestr, '%Y%m%d'),
'counter': int(counter)}

def _create_id_string(self):
"""From the internal fire detection id create the id string to be exposed to the user."""
return (self._fire_detection_id['date'].strftime('%Y%m%d') +
'-' + str(self._fire_detection_id['counter']))

def add_unique_day_id(self, afdata):
"""Add a unique detection id - date + a running number for the day."""
# Add IDs to the detections:
id_list = []
for _i in range(len(afdata)):
self.update_fire_detection_id()
id_ = self._create_id_string()
id_list.append(id_)

afdata['detection_id'] = id_list
return afdata

def close(self):
"""Shutdown the Active Fires postprocessing."""
logger.info('Terminating Active Fires post processing.')
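
Aside (illustrative, not part of the commit): the new detection-ID handling combines a UTC date with a per-day running counter, exposed as a string of the form YYYYMMDD-N. A minimal, self-contained sketch of the round trip implied by _create_id_string() and _get_id_from_string(); the counter value is invented.

    from datetime import datetime

    # Internal representation assumed by the new methods: a date plus a counter
    # that restarts when a new day begins (the value 18 is made up).
    fire_detection_id = {'date': datetime(2023, 7, 13), 'counter': 18}

    # String exposed to users, as in _create_id_string():
    id_string = (fire_detection_id['date'].strftime('%Y%m%d') +
                 '-' + str(fire_detection_id['counter']))
    assert id_string == '20230713-18'

    # ... and parsed back, as in _get_id_from_string():
    datestr, counter = id_string.split('-')
    parsed = {'date': datetime.strptime(datestr, '%Y%m%d'), 'counter': int(counter)}
    assert parsed == fire_detection_id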
