For use with the NDVI product from the LTDR raw dataset.

- Prepares list of all files
- Builds list of day files to process
- Processes day files
- Builds list of day files to aggregate to months
- Run month aggregation
- Builds list of month files to aggregate to years
- Run year aggregation

example LTDR product file names (ndvi product code is AVH13C1)

AVH13C1.A1981181.N07.004.2013227210959.hdf

split file name by "."
eg:

full file name - "AVH13C1.A1981181.N07.004.2013227210959.hdf"

0     product code        AVH13C1
1     date of image       A1981181
2     sensor code         N07
3     misc                004
4     processed date      2013227210959
5     extension           hdf

In [41]:
import os
import re
import csv
import ssl
import sys
import json
import time
import shutil
import hashlib
import logging
import multiprocessing
from io import StringIO
from pathlib import Path
from itertools import chain
from datetime import datetime
from urllib.parse import urljoin
from contextlib import contextmanager
from collections import OrderedDict
from configparser import ConfigParser
from tempfile import TemporaryDirectory, mkstemp
from typing import Any, Generator, List, Literal, Tuple, Type, Union

In [42]:
import rasterio
from rasterio.crs import CRS
import requests
import numpy as np
import pandas as pd
from typing import Optional

TODO: check whether the install in the cell below is still needed; GDAL should already have been set up by the Dockerfile.

In [43]:
conda install -c conda-forge gdal

Collecting package metadata (current_repodata.json): done
Solving environment: done


  current version: 23.7.4
  latest version: 24.1.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=24.1.0



# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.


In [44]:
from osgeo import gdal, osr

In [45]:
from dataset import Dataset

In [46]:
def get_config_dict(config_file="config.ini"):
    """
    Load the run settings from an INI file into a plain dictionary.

    Arguments
        config_file: path of the INI file; it must contain a [main] and a
            [run] section

    Return
        dict with the keys "token", "years" (list of int), "raw_dir",
        "output_dir" and "log_dir" (Path objects; log_dir is raw_dir / "logs"),
        "overwrite_download", "validate_download", "overwrite_processing",
        "run_parallel" (booleans), "backend", "task_runner" (strings) and
        "max_workers" (int)

    Raises
        KeyError: a required section or option is missing (this is also what
            happens when the file does not exist, because ConfigParser.read
            silently skips files it cannot open)
        ValueError: a year or max_workers is not an integer
    """
    config = ConfigParser()
    config.read(config_file)
    main_cfg = config["main"]
    run_cfg = config["run"]
    raw_dir = Path(main_cfg["raw_dir"])

    return {
        # the key must be the literal string "token", not the token's value
        "token": main_cfg["token"],
        # int() ignores surrounding whitespace, so "2019,2020" and "2019, 2020" both work
        "years": [int(y) for y in main_cfg["years"].split(",") if y.strip()],
        "raw_dir": raw_dir,
        "output_dir": Path(main_cfg["output_dir"]),
        "overwrite_download": main_cfg.getboolean("overwrite_download"),
        "validate_download": main_cfg.getboolean("validate_download"),
        "overwrite_processing": main_cfg.getboolean("overwrite_processing"),
        "backend": run_cfg["backend"],
        "task_runner": run_cfg["task_runner"],
        "run_parallel": run_cfg.getboolean("run_parallel"),
        "max_workers": int(run_cfg["max_workers"]),
        "log_dir": raw_dir / "logs",
    }

In [47]:
# Parse config.ini once and expose every setting as a module-level name
config = ConfigParser()
config.read("config.ini")

# [main] section: API token, year selection, directories and overwrite switches
_main_cfg = config["main"]
token = _main_cfg["token"]
years = list(map(int, _main_cfg["years"].split(", ")))
raw_dir = Path(_main_cfg["raw_dir"])
output_dir = Path(_main_cfg["output_dir"])
overwrite_download = _main_cfg.getboolean("overwrite_download")
validate_download = _main_cfg.getboolean("validate_download")
overwrite_processing = _main_cfg.getboolean("overwrite_processing")

# [run] section: how tasks are executed
_run_cfg = config["run"]
backend = _run_cfg["backend"]
task_runner = _run_cfg["task_runner"]
run_parallel = _run_cfg.getboolean("run_parallel")
max_workers = int(_run_cfg["max_workers"])

# values derived from the settings above, plus fixed run constants
log_dir = Path(_main_cfg["raw_dir"]) / "logs"
build_list = ["daily", "monthly","yearly"]
auth_headers = { "Authorization": f"Bearer {token}" }
name = "Long-term Data Record NDVI"

In [48]:
 def get_logger():
     """
     Return a logger that implements the Python logging API:
     https://docs.python.org/3/library/logging.html

     When the configured backend is "prefect" the Prefect run logger is
     returned, so the logs are managed by Prefect; otherwise the standard
     library logger named "dataset" is returned.

     Relies on the module-level `backend` setting and on the file-level
     `import logging`.
     """
     if backend == "prefect":
         # imported lazily so Prefect is only required when it is the chosen backend
         from prefect import get_run_logger
         return get_run_logger()
     return logging.getLogger("dataset")

In [49]:
# Base URL of the directory-listing API for this dataset (allData/465);
# sensor directory names are joined onto it
dataset_url = "https://ladsweb.modaps.eosdis.nasa.gov/api/v2/content/details/allData/465/"

# One remote directory per sensor code, each suffixed with the NDVI product code AVH13C1
sensors = [f"N{sensor_number}_AVH13C1" for sensor_number in ("07", "09", "11", "14", "16", "18", "19")]

In [50]:
def build_sensor_download_list(sensor: str) -> List[Tuple[bool, Tuple[str, Path]]]:
    """
    List every remote file of one sensor for the configured years and decide
    whether each one still has to be downloaded.

    Walks the remote listing sensor -> year -> day -> file. Uses the
    module-level settings `dataset_url`, `years`, `raw_dir`,
    `overwrite_download` and `validate_download`.

    Arguments
        sensor (str): name of the sensor directory below `dataset_url`

    Return
        list of tuples, each holding:
          1. a boolean "does the file need to be downloaded?"
          2. another tuple: (url_of_download, dst_path_of_download)

    Raises
        requests.HTTPError: a directory listing request returned an error status
    """
    logger = get_logger()

    def dir_contents(dir_url: str) -> List[dict]:
        """Return the dictionaries that describe each entry of a remote directory."""
        logger.debug(f"Fetching {dir_url}")
        response = requests.get(dir_url)
        # surface HTTP errors directly instead of failing later while
        # parsing an error page as a directory listing
        response.raise_for_status()
        description: dict = json.loads(response.content)
        return description["content"]

    def validate(filepath: Union[str, os.PathLike], md5: str) -> bool:
        """Return True when the md5 hash of the file equals `md5`."""
        digest = hashlib.md5()
        with open(filepath, "rb") as chk:
            # hash in chunks so a large file is never held in memory at once
            for chunk in iter(lambda: chk.read(1024 * 1024), b""):
                digest.update(chunk)
        return md5 == digest.hexdigest()

    download_list: List[Tuple[bool, Tuple[str, Path]]] = []

    sensor_dir: str = urljoin(dataset_url, sensor)
    # for each year the sensor collected data
    for year_details in dir_contents(sensor_dir):
        # is this a year we'd like data from?
        if int(year_details["name"]) not in years:
            continue
        year_dir: str = "/".join([sensor_dir, year_details["name"]])
        # for each day the sensor collected data in this year
        for day_details in dir_contents(year_dir):
            day_dir: str = "/".join([year_dir, day_details["name"]])
            # for each file the sensor created for this day
            for file_detail in dir_contents(day_dir):
                day_download_url: str = file_detail["downloadsLink"]
                dst = raw_dir / sensor / year_details["name"] / day_details["name"] / file_detail["name"]

                if overwrite_download or not dst.exists():
                    logger.info(f"Queuing for download: {day_download_url}")
                    needs_download = True
                elif not validate_download:
                    logger.info(f"File exists, skipping: {dst.as_posix()}")
                    needs_download = False
                elif validate(dst, file_detail["md5sum"]):
                    logger.info(f"File validated: {dst.as_posix()}")
                    needs_download = False
                else:
                    logger.info(f"File validation failed, queuing for download: {dst.as_posix()}")
                    needs_download = True

                download_list.append((needs_download, (day_download_url, dst)))
    return download_list

In [51]:
@contextmanager
def tmp_to_dst_file(final_dst, tmp_dir=None):
    """
    Context manager that yields the path of a temporary file and, once the
    `with` body has finished without raising, moves that file to `final_dst`.

    Arguments
        final_dst: path the temporary file is moved to afterwards
        tmp_dir: directory in which the temporary sub-directory is created
            (None lets the tempfile module choose)

    Yields
        str: path of the temporary file to write to

    A failed move is logged and not re-raised. If the `with` body raises,
    nothing is moved; the temporary directory is removed in every case.
    """
    logger = get_logger()
    with TemporaryDirectory(dir=tmp_dir) as tmp_sub_dir:
        tmp_fd, tmp_file = mkstemp(dir=tmp_sub_dir)
        # mkstemp hands back an open OS-level descriptor; callers reopen the
        # file by path, so close it here instead of leaking it
        os.close(tmp_fd)
        logger.debug(f"Created temporary file {tmp_file} with final destination {str(final_dst)}")
        yield tmp_file
        try:
            shutil.move(tmp_file, final_dst)
        except Exception:
            logger.exception(f"Failed to transfer temporary file {tmp_file} to final destination {str(final_dst)}")
        else:
            logger.debug(f"Successfully transferred {tmp_file} to final destination {str(final_dst)}")

In [52]:
def download(src_url: str, final_dst_path: Union[str, os.PathLike]) -> None:
    """
    Stream one remote file to `final_dst_path`.

    The request is sent with the module-level `auth_headers`. Data is first
    written to a temporary file, which is moved into place afterwards.

    Arguments
        src_url: URL of the file to download
        final_dst_path: where the downloaded file should end up

    Raises
        requests.HTTPError: the server answered with an error status
    """
    logger = get_logger()
    logger.info(f"Downloading {str(final_dst_path)}...")
    with requests.get(src_url, headers=auth_headers, stream=True) as src:
        src.raise_for_status()
        # the sensor/year/day directory is not created anywhere else; without
        # it the final move of the temporary file cannot succeed
        Path(final_dst_path).parent.mkdir(parents=True, exist_ok=True)
        with tmp_to_dst_file(final_dst_path) as dst_path, open(dst_path, "wb") as dst:
            for chunk in src.iter_content(chunk_size=8192):
                dst.write(chunk)

In [53]:
def build_process_list(downloaded_files, filter_ops=None):
    """
    Build the dataframe of daily files to process from the raw file paths.

    Sensor, year and day-of-year are parsed from each file name (split on
    "."); the month is derived from year + day-of-year. The daily output path
    is placed below the module-level `output_dir`.

    Arguments
        downloaded_files: iterable of pathlib.Path objects of raw files
        filter_ops (dict): optional overrides for the default filter options
            listed below; all filter values must be strings and accept/deny
            should not both be enabled for the same field

    Return
        pandas.DataFrame sorted by "input_path" with the columns input_path,
        sensor, year, month, day, year_month, year_day and output_path
    """
    # filter options to accept/deny based on sensor, year
    ops = {
        "use_sensor_accept": False,
        "sensor_accept": [],
        "use_sensor_deny": False,
        "sensor_deny": [],
        "use_year_accept": True,
        "year_accept": ["2019", "2020"],
        "use_year_deny": False,
        "year_deny": ["2019"]
    }
    if filter_ops:
        ops.update(filter_ops)

    columns = ["input_path", "sensor", "year", "month", "day", "year_month", "year_day", "output_path"]
    records = []
    for input_path in downloaded_files:
        items = input_path.stem.split(".")
        year = items[1][1:5]
        day = items[1][5:8]
        sensor = items[2]
        month = "{0:02d}".format(datetime.strptime(f"{year}+{day}", "%Y+%j").month)
        records.append({
            "input_path": input_path,
            "sensor": sensor,
            "year": year,
            "month": month,
            "day": day,
            "year_month": year + "_" + month,
            "year_day": year + "_" + day,
            "output_path": output_dir / "daily" / f"avhrr_ndvi_v5_{sensor}_{year}_{day}.tif",
        })

    # naming the columns explicitly keeps an empty file list from failing in
    # sort_values (an empty frame would otherwise have no "input_path" column)
    df = pd.DataFrame(records, columns=columns).sort_values(by=["input_path"])

    found_sensors = sorted(set(df["sensor"]))
    found_years = sorted(set(df["year"]))

    # NOTE(review): a filter whose result is empty (e.g. an accept list that
    # matches nothing) is skipped, i.e. all rows are kept. This mirrors the
    # previous behaviour; confirm whether an empty result was intended instead.
    filter_sensors = None
    if ops["use_sensor_accept"]:
        filter_sensors = [i for i in found_sensors if i in ops["sensor_accept"]]
    elif ops["use_sensor_deny"]:
        filter_sensors = [i for i in found_sensors if i not in ops["sensor_deny"]]
    if filter_sensors:
        df = df.loc[df["sensor"].isin(filter_sensors)]

    filter_years = None
    if ops["use_year_accept"]:
        filter_years = [i for i in found_years if i in ops["year_accept"]]
    elif ops["use_year_deny"]:
        filter_years = [i for i in found_years if i not in ops["year_deny"]]
    if filter_years:
        df = df.loc[df["year"].isin(filter_years)]

    return df

In [54]:
def create_mask(qa_array, mask_vals):
    """
    Flag every pixel whose QA word has at least one of the selected bits set.

    Arguments
        qa_array: array of integer QA words; only the low 16 bits are examined
        mask_vals: iterable of bit numbers (0-15, bit 0 = least significant)
            that should cause a pixel to be masked

    Return
        numpy boolean array with the same shape as `qa_array`; True where any
        selected bit is set

    Raises
        ValueError: a bit number is outside 0-15
    """
    mask_bin = 0
    for bit in mask_vals:
        if not 0 <= bit <= 15:
            raise ValueError(f"QA bit numbers must be between 0 and 15, got {bit}")
        mask_bin |= 1 << bit

    # widen first so that signed 16-bit words (bit 15 set -> negative value)
    # keep their bit pattern when reduced to the low 16 bits
    qa_words = np.asarray(qa_array).astype(np.int64)

    # one vectorised pass instead of calling a Python function per pixel
    return (qa_words & 0xFFFF & mask_bin) != 0

In [55]:
def process_daily_data(src, output_path):
    """
    Process one raw daily HDF file into a daily NDVI raster.

    Unpack the NDVI and QA subdatasets from the HDF container
    Set pixels flagged by the selected QA bits to nodata (-9999)
    Set values <0 (other than nodata) to 0 and cap values at 10000
    Write to COG, tagged as EPSG:4326 with the source transform

    Nothing is done when `output_path` already exists and the module-level
    `overwrite_processing` setting is off.

    Arguments
        src: pathlib.Path of the raw HDF file
        output_path: pathlib.Path of the raster to create

    Parts of code pulled from:

    https://gis.stackexchange.com/questions/174017/extract-scientific-layers-from-modis-hdf-dataeset-using-python-gdal
    https://gis.stackexchange.com/questions/42584/how-to-call-gdal-translate-from-python-code
    https://stackoverflow.com/questions/10454316/how-to-project-and-resample-a-grid-to-match-another-grid-with-gdal-python/10538634#10538634
    https://jgomezdans.github.io/gdal_notes/reprojection.html
    """
    logger = get_logger()

    # file name layout: <product>.A<YYYY><DDD>.<sensor>.<misc>.<processed>.hdf
    name_parts = src.name.split(".")
    year = name_parts[1][1:5]
    day = name_parts[1][5:8]
    sensor = name_parts[2]

    if output_path.exists() and not overwrite_processing:
        logger.info(f"Skipping day, already processed: {sensor} {year} {day}")
        return

    logger.info(f"Processing day: {sensor} {year} {day}")

    # QA fields and bit numbers
    # https://ltdr.modaps.eosdis.nasa.gov/ltdr/docs/AVHRR_LTDR_V5_Document.pdf
    #   15: Polar flag: latitude > 60deg (land) or > 50deg (ocean)
    #   14: BRDF-correction issues
    #   13: RHO3 value is invalid
    #   12: Channel 5 value is invalid
    #   11: Channel 4 value is invalid
    #   10: Channel 3 value is invalid
    #    9: Channel 2 (NIR) value is invalid
    #    8: Channel 1 (visible) value is invalid
    #    7: Channel 1-5 are invalid
    #    6: Pixel is at night (high solar zenith angle)
    #    5: Pixel is over dense dark vegetation
    #    4: Pixel is over sun glint
    #    3: Pixel is over water
    #    2: Pixel contains cloud shadow
    #    1: Pixel is cloudy
    #    0: Unused
    # alternative, stricter selection: [15, 9, 8, 6, 4, 3, 2, 1]
    qa_mask_vals = [15, 9, 8, 1]

    ndvi_gdal_path = f"HDF4_EOS:EOS_GRID:\"{src.as_posix()}\":Grid:NDVI"
    qa_gdal_path = f"HDF4_EOS:EOS_GRID:\"{src.as_posix()}\":Grid:QA"

    # open data subdataset
    with rasterio.open(ndvi_gdal_path) as ndvi_src:
        ndvi_array = ndvi_src.read(1)
        transform = ndvi_src.transform

    # open quality assurance subdataset
    with rasterio.open(qa_gdal_path) as qa_src:
        qa_array = qa_src.read(1)

    # apply the mask built from our chosen QA bits
    ndvi_array[create_mask(qa_array, qa_mask_vals)] = -9999

    ndvi_array[(ndvi_array < 0) & (ndvi_array > -9999)] = 0
    ndvi_array[ndvi_array > 10000] = 10000

    # take the raster size from the data that is actually written
    height, width = ndvi_array.shape
    profile = {
        "count": 1,
        "driver": "COG",
        "compress": "LZW",
        "dtype": "int16",
        "nodata": -9999,
        "height": height,
        "width": width,
        "crs": CRS.from_epsg(4326),
        "transform": transform,
    }

    with tmp_to_dst_file(output_path) as dst_path:
        with rasterio.open(dst_path, "w", **profile) as dst:
            # for some reason rasterio raises an exception if we don't specify that there is one index
            dst.write(ndvi_array, indexes=1)

In [56]:
def process_monthly_data(year_month, month_files, month_path):
    """
    Create one monthly raster by aggregating that month's daily rasters with
    the "max" method.

    Nothing is written when `month_path` already exists and the module-level
    `overwrite_processing` setting is off.

    Arguments
        year_month: label of the month, used in log messages only
        month_files: paths of the daily rasters to aggregate
        month_path: path of the monthly raster to write
    """
    logger = get_logger()

    already_processed = os.path.exists(month_path)
    if already_processed and not overwrite_processing:
        logger.info(f"Skipping month, already processed: {year_month}")
        return

    logger.info(f"Processing month: {year_month}")
    aggregated = aggregate_rasters(file_list=month_files, method="max")
    data, meta = aggregated
    write_raster(month_path, data, meta)

In [57]:
def aggregate_rasters(file_list, method="mean"):
    """
    Aggregate multiple rasters

    Aggregates multiple rasters with same features (dimensions, transform,
    pixel size, etc.) and creates single layer using aggregation method
    specified. Masked (nodata) pixels are left out of the aggregation.
    Files that cannot be opened are logged and skipped.

    Supported methods: mean (default), max, min, sum

    Arguments
        file_list (list): list of file paths for rasters to be aggregated
        method (str): method used for aggregation

    Return
        tuple (data, profile): the aggregated array, with masked pixels filled
        with the nodata value, and the rasterio profile of the last raster
        that was read

    Raises
        ValueError: unknown method, or none of the files could be opened
        Exception: the rasters do not all have the same dimensions
    """
    if method not in ("mean", "max", "min", "sum"):
        raise ValueError("Invalid method")

    logger = get_logger()
    store = None
    weights = None
    profile = None
    nodata = None

    for file_path in file_list:
        try:
            raster = rasterio.open(file_path)
        except Exception:
            logger.error(f"Could not include file in aggregation ({str(file_path)})")
            continue

        # close every dataset as soon as its data has been read
        with raster:
            active = raster.read(masked=True)
            profile = raster.profile
            nodata = raster.nodata

        if store is None:
            store = active.copy()
            if method == "mean":
                # number of valid observations per pixel so far; set up with
                # the first raster that was actually read, so skipped files
                # cannot leave it undefined
                weights = (~np.ma.getmaskarray(store)).astype(int)
            continue

        # make sure dimensions match
        if active.shape != store.shape:
            raise Exception("Dimensions of rasters do not match")

        stacked = np.ma.array((store, active))
        if method == "max":
            store = stacked.max(axis=0)
        elif method == "min":
            store = stacked.min(axis=0)
        elif method == "sum":
            store = stacked.sum(axis=0)
        else:
            # running mean: the stored mean counts once per valid observation
            # it already contains, the new raster counts once where valid
            active_valid = (~np.ma.getmaskarray(active)).astype(int)
            store = np.ma.average(stacked, axis=0, weights=[weights, active_valid])
            weights = weights + active_valid

    if store is None:
        raise ValueError("None of the input rasters could be read for aggregation")

    return store.filled(nodata), profile

In [58]:
def write_raster(path, data, meta):
    """
    Write an array to a raster file through a temporary file.

    Arguments
        path: destination of the raster; its parent directory is created
            when missing
        data: array to write; its dtype is used for the output
        meta (dict): rasterio profile passed to rasterio.open (not modified)

    Raises
        Exception: whatever rasterio raises while writing; the error is logged
            and re-raised so the incomplete temporary file is not moved to
            `path`
    """
    logger = get_logger()

    parent_dir = os.path.dirname(path)
    # a bare file name has no directory part, and makedirs("") would fail
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # work on a copy so the caller's profile is left untouched
    profile = dict(meta)
    profile["dtype"] = data.dtype

    with tmp_to_dst_file(path) as write_path:
        with rasterio.open(write_path, "w", **profile) as result:
            try:
                result.write(data)
            except Exception:
                logger.exception(f"Error writing raster to {path}")
                raise

In [59]:
def process_yearly_data(year, year_files, year_path):
    """
    Create one yearly raster by aggregating that year's monthly rasters with
    the "mean" method.

    Nothing is written when `year_path` already exists and the module-level
    `overwrite_processing` setting is off.

    Arguments
        year: label of the year, used in log messages only
        year_files: paths of the monthly rasters to aggregate
        year_path: path of the yearly raster to write
    """
    logger = get_logger()

    already_processed = os.path.exists(year_path)
    if already_processed and not overwrite_processing:
        logger.info(f"Skipping year, already processed: {year}")
        return

    logger.info(f"Processing year: {year}")
    aggregated = aggregate_rasters(file_list=year_files, method="mean")
    data, meta = aggregated
    write_raster(year_path, data, meta)

In [60]:
def init_retries(cur_retries: Optional[int], cur_retry_delay: Optional[int], save_settings: bool=False):
    """
    Given a number of task retries and a retry_delay,
    checks to make sure those values are valid
    (ints greater than or equal to zero), and
    optionally saves them as the module-level
    `retries` / `retry_delay` settings.

    A value of None is replaced by the saved module-level setting.

    Arguments
        cur_retries: number of retries, or None to use the saved setting
        cur_retry_delay: seconds to wait before a retry, or None to use the
            saved setting
        save_settings: when True, store the given (valid) values as the
            module-level settings

    Return
        tuple (retries, retry_delay) with the values to use

    Raises
        ValueError: a value is negative, or None was passed and no setting
            has been saved yet
        TypeError: a value is neither an int nor None
    """
    global retries, retry_delay

    if isinstance(cur_retries, int):
        if cur_retries < 0:
            raise ValueError("Number of task retries must be greater than or equal to zero")
        if save_settings:
            retries = cur_retries
    elif cur_retries is None:
        try:
            cur_retries = retries
        except NameError:
            raise ValueError("retries is None and no default number of retries has been saved") from None
    else:
        raise TypeError("retries must be an int greater than or equal to zero")

    if isinstance(cur_retry_delay, int):
        if cur_retry_delay < 0:
            raise ValueError("Retry delay must be greater than or equal to zero")
        if save_settings:
            retry_delay = cur_retry_delay
    elif cur_retry_delay is None:
        try:
            cur_retry_delay = retry_delay
        except NameError:
            raise ValueError("retry_delay is None and no default retry delay has been saved") from None
    else:
        raise TypeError("retry_delay must be an int greater than or equal to zero, representing the number of seconds to wait before retrying a task")

    return cur_retries, cur_retry_delay

In [61]:
def error_wrapper(func, args):
    """
    This is the wrapper that is used when running individual tasks
    It will always return a TaskResult!

    Calls func(*args); when it raises, the call is repeated up to the
    module-level `retries` times, sleeping `retry_delay` seconds in between.

    Arguments
        func: callable to run
        args: sequence of positional arguments for func

    Return
        TaskResult(0, "Success", args, result) on success, or
        TaskResult(1, repr(exception), args, None) once all attempts failed

    NOTE(review): TaskResult is not defined or imported in this file;
    presumably it is provided by the `dataset` module - confirm.
    """
    logger = get_logger()

    # always make at least one attempt, so that a TaskResult is returned even
    # if the retry setting is zero or negative
    attempts = max(retries, 0) + 1

    for attempt_no in range(1, attempts + 1):
        try:
            return TaskResult(0, "Success", args, func(*args))
        except Exception as e:
            if attempt_no < attempts:
                logger.error(f"Task failed with exception (retrying): {repr(e)}")
                time.sleep(retry_delay)
            else:
                logger.error(f"Task failed with exception (giving up): {repr(e)}")
                return TaskResult(1, repr(e), args, None)

In [62]:
 def run_serial_tasks(name, func, input_list):
     """
     Run the tasks one after another in this process.

     Arguments
         name: name of the task run (not used here)
         func: callable to run for every entry
         input_list: iterable of argument sequences, one per task

     Return
         list with one error_wrapper result per entry, in input order
     """
     logger = get_logger()
     logger.debug(f"run_serial_tasks - input_list: {input_list}")

     results = []
     for task_args in input_list:
         results.append(error_wrapper(func, task_args))
     return results

TODO: fix chunksize

In [63]:
# number of tasks handed to a worker process at a time
chunksize = 1
def run_concurrent_tasks(name, func, input_list, force_sequential):
    """
    Run tasks concurrently (locally), given a function a list of inputs
    This will always return a list of TaskResults!

    Arguments
        name: name of the task run (not used here)
        func: callable to run for every entry
        input_list: iterable of argument sequences, one per task
        force_sequential: when True a single worker process is used

    Return
        list with one error_wrapper result per entry, in input order
    """
    # honour the configured worker count instead of a fixed pool size
    pool_size = 1 if force_sequential else max_workers
    work_items = [(func, task_args) for task_args in input_list]
    with multiprocessing.Pool(pool_size) as pool:
        results = pool.starmap(error_wrapper, work_items, chunksize=chunksize)
    return results

In [64]:
def run_tasks(func,
              input_list,
              allow_futures: bool=True,
              name: Optional[str]=None,
              retries: Optional[int]=3,
              retry_delay: Optional[int]=60,
              force_sequential: bool=False,
              force_serial: bool=False):
    """
    Run a bunch of tasks, calling one of the run_*_tasks functions
    depending on the module-level `backend` setting.

    Arguments
        func: callable to run for every entry of input_list
        input_list: iterable of argument sequences, one per task
        allow_futures: accepted for compatibility; not used in this function
        name: name of the task run; defaults to the function's name
        retries: retries per task for this run (None = saved setting)
        retry_delay: seconds between retries for this run (None = saved setting)
        force_sequential: passed on to the concurrent / prefect runners
        force_serial: run serially regardless of the backend

    Return
        ResultTuple built from the list of task results, the run name and the
        start timestamp

    Raises
        TypeError: func is not callable, or name is not a string
        ValueError: unknown backend, or the run produced no results

    NOTE(review): ResultTuple is not defined or imported in this file;
    presumably it is provided by the `dataset` module - confirm.
    """

    timestamp = datetime.today()

    if not callable(func):
        raise TypeError("Function passed to run_tasks is not callable")

    run_retries, run_retry_delay = init_retries(retries, retry_delay)

    logger = get_logger()

    if name is None:
        try:
            name = func.__name__
        except AttributeError:
            logger.warning("No name given for task run, and function does not have a name (multiple unnamed functions may result in log files being overwritten)")
            name = "unnamed"
    elif not isinstance(name, str):
        raise TypeError("Name of task run must be a string")

    # The task runners read the module-level `retries` / `retry_delay`. This
    # function's parameters shadow those names, so the module-level values
    # have to be overridden through globals() for the duration of the run.
    module_vars = globals()
    unset = object()
    saved = {key: module_vars.get(key, unset) for key in ("retries", "retry_delay")}
    module_vars["retries"] = run_retries
    module_vars["retry_delay"] = run_retry_delay
    try:
        if backend == "serial" or force_serial:
            results = run_serial_tasks(name, func, input_list)
        elif backend == "concurrent":
            results = run_concurrent_tasks(name, func, input_list, force_sequential)
        elif backend == "prefect":
            results = run_prefect_tasks(name, func, input_list, force_sequential)
        else:
            raise ValueError("Requested backend not recognized. Have you called this Dataset's run function?")
    finally:
        # restore the previous module-level retry settings, also on errors
        for key, value in saved.items():
            if value is unset:
                module_vars.pop(key, None)
            else:
                module_vars[key] = value

    if len(results) == 0:
        raise ValueError(f"Task run {name} yielded no results. Did it receive any inputs?")

    success_count = sum(1 for r in results if r.status_code == 0)
    error_count = len(results) - success_count
    if error_count == 0:
        logger.info(f"Task run {name} completed with {success_count} successes and no errors")
    else:
        logger.warning(f"Task run {name} completed with {error_count} errors and {success_count} successes")

    return ResultTuple(results, name, timestamp)

NameError: name 'Optional' is not defined

In [40]:
def run_prefect_tasks(name, func, input_list, force_sequential):
    """
    Run tasks using Prefect.
    This will always return a list of TaskResults!

    Every entry of input_list is submitted as a Prefect task that uses the
    module-level `retries` / `retry_delay` settings.

    Arguments
        name: name given to the Prefect task
        func: callable to wrap as a task
        input_list: iterable of argument sequences, one per task
        force_sequential: when True every task waits for all tasks submitted
            before it

    Return
        list of TaskResult, one per submitted task

    NOTE(review): this assumes future.wait() returns the task's state object,
    and TaskResult is not defined or imported in this file (presumably
    provided by the `dataset` module) - confirm both.
    """

    from prefect import task
    logger = get_logger()

    task_wrapper = task(func, name=name, retries=retries, retry_delay_seconds=retry_delay, persist_result=True)

    futures = []
    for inputs in input_list:
        wait_for = [future for _, future in futures] if force_sequential else None
        futures.append((inputs, task_wrapper.submit(*inputs, wait_for=wait_for, return_state=False)))

    results = []
    pending = [(inputs, future.wait()) for inputs, future in futures]

    while pending:
        # build the list of unresolved entries separately: removing items from
        # a list while enumerating it makes the loop skip the following entry
        unresolved = []
        for ix, (inputs, state) in enumerate(pending):
            if state.is_completed():
                logger.info(f'complete - {ix} - {inputs}')
                results.append(TaskResult(0, "Success", inputs, state.result()))
            elif state.is_failed() or state.is_crashed() or state.is_cancelled():
                logger.info(f'fail - {ix} - {inputs}')
                try:
                    msg = repr(state.result(raise_on_failure=True))
                except Exception as e:
                    msg = f"Unable to retrieve error message - {e}"
                results.append(TaskResult(1, msg, inputs, None))
            else:
                # NOTE(review): `state` is the object obtained from wait()
                # above and is not refreshed here, so an entry that reaches
                # this branch keeps the loop polling - confirm this can't occur
                unresolved.append((inputs, state))
        pending = unresolved
        if pending:
            time.sleep(5)

    return results

build download list

In [None]:
# one task per sensor; each task's argument list holds just the sensor name
raw_file_list = run_tasks(build_sensor_download_list, [[sensor_name] for sensor_name in sensors])

We have a list of lists (one per sensor); merge them into a single list.

In [None]:
# flatten the per-sensor lists into one list of (needs_download, (url, dst)) entries
file_list = list(chain.from_iterable(raw_file_list.results()))

Extract list of files to download from file_list

In [None]:
# keep the (url, dst) pair of every entry that is flagged for download
download_list = [url_and_dst for needs_download, url_and_dst in file_list if needs_download]

Download data

In [None]:
# only start a task run when there is something to download
if download_list:
    run_tasks(download, download_list).results()

Make a list of all daily files, regardless of how the downloads went

In [None]:
# destination path of every listed file, whether or not it had to be downloaded
day_files = [dst for _, (_, dst) in file_list]

Build day dataframe

In [None]:
# one row per daily file, with parsed sensor/year/month/day and the daily output path
day_df = build_process_list(downloaded_files=day_files)

build month dataframe
Uses pandas "named aggregation" to ensure predictable column names in the output.
See bottom of this page:
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.core.groupby.DataFrameGroupBy.aggregate.html
see also https://pandas.pydata.org/pandas-docs/stable/user_guide/groupby.html#groupby-aggregate-named

In [None]:
# group the daily outputs by month: tuple of daily paths, number of days, and year
month_df = (
    day_df[["output_path", "year", "year_month"]]
    .groupby("year_month", as_index=False)
    .aggregate(
        day_path_list=pd.NamedAgg(column="output_path", aggfunc=lambda x: tuple(x)),
        count=pd.NamedAgg(column="output_path", aggfunc="count"),
        year=pd.NamedAgg(column="year", aggfunc="last"),
    )
)

# months with fewer daily files than this are not aggregated
minimum_days_in_month = 20
# .copy() so the column added below is set on an independent frame rather
# than on a slice of the grouped result (avoids pandas' SettingWithCopy warning)
month_df = month_df.loc[month_df["count"] >= minimum_days_in_month].copy()

# map over the single column instead of applying row-wise: row-wise apply on
# a frame without rows can hand back a DataFrame, which cannot be assigned to
# one column
month_df["output_path"] = month_df["year_month"].map(
    lambda year_month: (output_dir / "monthly/avhrr_ndvi_v5_{}.tif".format(year_month)).as_posix()
)

build year dataframe

In [None]:
# group the monthly outputs by year: tuple of monthly paths and number of months
year_df = (
    month_df[["output_path", "year"]]
    .groupby("year", as_index=False)
    .aggregate(
        month_path_list=pd.NamedAgg(column="output_path", aggfunc=lambda paths: tuple(paths)),
        count=pd.NamedAgg(column="output_path", aggfunc="count"),
    )
)

# path of the yearly raster built from each year's monthly rasters
year_df["output_path"] = [
    (output_dir / f"yearly/avhrr_ndvi_v5_{year_label}.tif").as_posix()
    for year_label in year_df["year"]
]

Make the *_qlist lists; each entry holds the arguments for one call of the matching process_*_data function and is run as one task.

In [None]:
# each entry is the argument list of one task:
#   daily   -> [input_path, output_path]
#   monthly -> [year_month, day_path_list, output_path]
#   yearly  -> [year, month_path_list, output_path]
day_qlist = [
    list(task_args)
    for task_args in zip(day_df["input_path"], day_df["output_path"])
]

month_qlist = [
    list(task_args)
    for task_args in zip(month_df["year_month"], month_df["day_path_list"], month_df["output_path"])
]

year_qlist = [
    list(task_args)
    for task_args in zip(year_df["year"], year_df["month_path_list"], year_df["output_path"])
]

In [None]:
# run the enabled stages in order: make sure the stage's output directory
# exists, then hand its task list to run_tasks
for stage_name, stage_func, stage_qlist in (
    ("daily", process_daily_data, day_qlist),
    ("monthly", process_monthly_data, month_qlist),
    ("yearly", process_yearly_data, year_qlist),
):
    if stage_name in build_list:
        os.makedirs(output_dir / stage_name, exist_ok=True)
        run_tasks(stage_func, stage_qlist)