Merge pull request #22 from wpreimes/parallel_img2ts
Fix logging issue
wpreimes committed Jan 17, 2024
2 parents cad72d1 + 503cc2b commit 0ebd632
Showing 4 changed files with 54 additions and 35 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.rst
@@ -5,7 +5,7 @@ Changelog
Unreleased changes in master branch
===================================

-
- Added option to parallelize Img2Ts process

Version 0.10
============
17 changes: 12 additions & 5 deletions src/repurpose/img2ts.py
@@ -118,7 +118,10 @@ class that uses the read_img iterator of the input_data dataset
Default: True
n_proc: int, optional (default: 1)
Number of parallel processes. Multiprocessing is only used when
`n_proc` > 1. Applies to data reading and writing.
`n_proc` > 1. Applies to data reading and writing. Should be chosen
according to the file connection: a slow (e.g. network) connection
might be overloaded by too many processes trying to read data at once.
If unsure, it is best to leave this at 1.
"""

def __init__(self,
@@ -193,6 +196,9 @@ def __init__(self,

self.n_proc = n_proc

self.log_filename = \
f"img2ts_{datetime.now().strftime('%Y%m%d%H%M')}.log"

def _read_image(self, date, target_grid):
"""
Function to parallelize reading image data from dataset.
@@ -214,7 +220,6 @@ def _read_image(self, date, target_grid):
orthogonal: bool
Whether the image fits the orthogonal time series format or not.
"""

# optional on-the-fly spatial resampling
resample_kwargs = {
'methods': self.r_methods,
@@ -383,8 +388,8 @@ def _write_non_orthogonal(self,
dataout.add_global_attr(
'geospatial_lon_max', np.max(cell_lons))

# for this dataset we have to loop through the gpis since each time series
# can be different in length
# for this dataset we have to loop through the gpis since each
# time series can be different in length
for i, (gpi, gpi_lon, gpi_lat) in enumerate(
zip(cell_gpis, cell_lons, cell_lats)):
gpi_data = {}
@@ -433,7 +438,7 @@ def calc(self):
os.path.join(self.outputpath, self.gridname), self.target_grid)

for img_stack_dict, timestamps in self.img_bulk():
# ==================================================================
# =================================================================
start_time = datetime.now()

# temporarily drop grids, due to an issue when pickling them...
@@ -500,6 +505,7 @@ def calc(self):
ITER_KWARGS=ITER_KWARGS,
STATIC_KWARGS=STATIC_KWARGS,
log_path=os.path.join(self.outputpath, '000_log'),
log_filename=self.log_filename,
loglevel="INFO",
ignore_errors=True,
n_proc=self.n_proc,
@@ -557,6 +563,7 @@ def img_bulk(self):
STATIC_KWARGS={'target_grid': target_grid},
show_progress_bars=False,
log_path=os.path.join(self.outputpath, '000_log'),
log_filename=self.log_filename,
loglevel="INFO",
ignore_errors=True,
n_proc=self.n_proc,
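Because `self.log_filename` is generated once in `__init__` and passed
to both `parallel_process_async` calls (in `calc()` above and here in
`img_bulk()`), the reading and writing stages now append to one shared,
timestamped log file under `000_log` instead of each call creating its
own file; this is the logging fix this merge addresses.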
68 changes: 41 additions & 27 deletions src/repurpose/process.py
@@ -36,16 +36,16 @@ class ImageBaseConnection:
This protects against processing gaps due to e.g. temporary network issues.
"""

def __init__(self, reader, max_retries=20, retry_delay_s=10):
def __init__(self, reader, max_retries=99, retry_delay_s=1):
"""
Parameters
----------
reader: MultiTemporalImageBase
Reader object for which the filelist is created
max_retries: int
max_retries: int, optional (default: 99)
Number of retries when a file is in the filelist but reading
fails.
retry_delay_s: int
retry_delay_s: int, optional (default: 1)
Number of seconds to wait after each failed retry.
"""
self.reader = reader
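A hedged usage sketch (the wrapped `base_reader` and the `timestamp`
value are placeholders; the class and its arguments are as shown in
this diff):

from repurpose.process import ImageBaseConnection

# `base_reader` stands for any image reader exposing read(), grid and
# tstamps_for_daterange(), e.g. a MultiTemporalImageBase instance.
safe_reader = ImageBaseConnection(base_reader, max_retries=99,
                                  retry_delay_s=1)
img = safe_reader.read(timestamp)  # retries transient read failures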
@@ -61,36 +61,44 @@ def grid(self):
def tstamps_for_daterange(self, *args, **kwargs):
return self.reader.tstamps_for_daterange(*args, **kwargs)

def _gen_filelist(self):
return glob(os.path.join(self.reader.path, '**'), recursive=True)
def _gen_filelist(self) -> list:
flist = glob(os.path.join(self.reader.path, '**'), recursive=True)
return flist

def read(self, timestamp, **kwargs):
retries = 0
retry = 0
img = None
error = None
while img is None and retries <= self.max_retries:
filename = None
filename = None

while (img is None) and (retry <= self.max_retries):
try:
filename = self.reader._build_filename(timestamp)
if filename is None:
filename = self.reader._build_filename(timestamp)
img = self.reader.read(timestamp, **kwargs)
except Exception as e:
logging.error(f"Error reading file (try {retry+1}) "
f"at {timestamp}: {e}. "
f"Trying again.")
if filename is not None:
if filename not in self.filelist:
logging.error(
f"File at {timestamp} does not exist.")
break
else:
img = None
error = e
time.sleep(self.retry_delay_s)
img = None
error = e
time.sleep(self.retry_delay_s)

retries += 1
retry += 1

if img is None:
raise IOError(f"Reading file {filename} failed even after "
f"{retries} retries: {error}")
logging.error(f"Reading file at {timestamp} failed after "
f"{retry} retries: {error}")
else:
logging.info(f"Success reading {filename} after {retries} "
f"retries")
return img
logging.info(f"Success reading {filename} after {retry} "
f"tries.")
return img
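Two behavioral changes are worth noting: the filename is now resolved
only once and reused across retries, and instead of raising an
`IOError` after the last retry, the new code logs the failure and
returns `None`, so a run with `ignore_errors=True` can continue past a
permanently unreadable file.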


def rootdir() -> Path:
@@ -125,6 +133,7 @@ def parallel_process_async(
ignore_errors=False,
activate_logging=True,
log_path=None,
log_filename=None,
loglevel="WARNING",
verbose=False,
progress_bar_label="Processed"
@@ -163,6 +172,9 @@
If False, no logging is done at all (neither to file nor to stdout).
log_path: str, optional (default: None)
If provided, a log file is created in the passed directory.
log_filename: str, optional (default: None)
Name of the log file to create in `log_path`. If None, a name is
created automatically. If `log_path` is None, this has no effect.
loglevel: str, optional (default: "WARNING")
Log level to use for logging. Must be one of
["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"].
@@ -178,22 +190,23 @@
"""
if activate_logging:
logger = logging.getLogger()
streamHandler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)

if STATIC_KWARGS is None:
STATIC_KWARGS = dict()

if verbose:
streamHandler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.setLevel('DEBUG')
logger.addHandler(streamHandler)

if log_path is not None:
log_file = os.path.join(
log_path,
f"{FUNC.__name__}_{datetime.now().strftime('%Y%m%d%H%M')}.log")
if log_filename is None:
d = datetime.now().strftime('%Y%m%d%H%M')
log_filename = f"{FUNC.__name__}_{d}.log"
log_file = os.path.join(log_path, log_filename)
else:
log_file = None

Expand All @@ -204,6 +217,7 @@ def parallel_process_async(
level=loglevel.upper(),
format="%(levelname)s %(asctime)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
force=True,
)
else:
logger = None
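A hedged sketch of a call that uses the new `log_filename` parameter
(the worker function is hypothetical; all keyword names appear in this
diff). Since `logging.basicConfig` is now called with `force=True`,
repeated calls replace the previously installed handlers instead of
stacking them:

from repurpose.process import parallel_process_async

def _square(x):  # hypothetical worker function
    return x * x

results = parallel_process_async(
    FUNC=_square,
    ITER_KWARGS={'x': [1, 2, 3, 4]},
    log_path='/tmp/000_log',
    log_filename='myrun.log',  # same name across calls -> one shared log
    loglevel='INFO',
    ignore_errors=True,
    n_proc=2,
)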
@@ -280,9 +294,9 @@ def error(e) -> None:
logger.handlers.clear()

handlers = logger.handlers[:]
handlers.clear()
for handler in handlers:
logger.removeHandler(handler)
handler.close()
handlers.clear()

return results
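Iterating over a copy of `logger.handlers` avoids mutating the list
while handlers are being removed, and closing each handler releases its
file descriptor so the log file is complete and unlocked when
`parallel_process_async` returns.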
2 changes: 0 additions & 2 deletions tests/test_img2ts.py
@@ -39,9 +39,7 @@
from pygeogrids import BasicGrid
from pygeogrids.netcdf import load_grid
from pynetcf.time_series import OrthoMultiTs, GriddedNcIndexedRaggedTs, GriddedNcOrthoMultiTs
from glob import glob
import xarray as xr
import pytest

import tempfile
import numpy.testing as nptest
