diff --git a/component-library/akfire/README.md b/component-library/akfire/README.md
new file mode 100644
index 00000000..2705357c
--- /dev/null
+++ b/component-library/akfire/README.md
@@ -0,0 +1,220 @@
+# Wildfire-Risk Pipeline with IBM CLAIMED
+
+A scalable, end-to-end ML workflow for pan-European wildfire-probability mapping. Harnessing open data (Copernicus FWI, MODIS burn-area, HARCI-EU infrastructure, OSM), Dask, and IBM CLAIMED, the pipeline automates:
+
+- **Data ingestion & harmonization** into a distributed Zarr cube
+- **Baseline modeling** with logistic regression + Monte Carlo uncertainty
+- **Advanced training** of monotonic-constraint XGBoost, calibrated via Optuna
+- **Batch inference** for historical (2001–2022) and scenario (RCP4.5/8.5, 2023–2050) outputs at 2.5 km resolution
+
+Containerless operators and Airflow orchestration ensure traceable, CI/CD-friendly execution, while MinIO/S3 provides robust object storage.
+Built by **Alpha-Klima**.
+
+---
+
+## 🗂️ Project files
+
+| Path | Category | Description |
+|------------------------------------------------|----------------|----------------------------------------------------------------|
+| **operators/** | directory | Python entrypoints; each is compiled into a containerless CLAIMED operator. |
+| ├─ `create_training_zarr.py` | operator | Builds the multi-source Zarr training cube. |
+| ├─ `train_logistic.py` | operator | Fits the logistic-regression baseline model. |
+| ├─ `logistic_prediction.py` | operator | Runs batch inference + Monte Carlo uncertainty for the logistic baseline. |
+| ├─ `optimize_xgb_hyperparameters_from_df.py` | operator | Bayesian hyperparameter tuning via Optuna TPE. |
+| ├─ `training_xgboost.py` | operator | Trains XGBoost with monotonic constraints & beta calibration. |
+| └─ `xgboost_prediction.py` | operator | Executes tiled XGBoost inference (historical & scenario data). |
+| **akfire_claimed_dag.py** | Airflow DAG | Orchestrates the six CLAIMED operators end-to-end. |
+| **build_components.sh** | shell script | Compiles & unzips each operator into `claimed-operators/<name>:<version>/runneable.py`. |
+| **config.json** | template | Template for I/O paths, S3 credentials, temporal bounds & ML parameters. |
+| **README.md** | documentation | Project overview, setup & usage instructions. |
+
+---
+
+## 📦 Installation
+
+```bash
+pip install claimed==0.1.9
+pip install claimed-cli==0.1.6
+pip install git+https://github.com/claimed-framework/c3.git
+```
+
+---
+
+## 🧠 Why CLAIMED?
+
+CLAIMED is a modular, reproducible, operator-based execution framework designed to work seamlessly with orchestration tools like Apache Airflow. Its main benefits are:
+
+- **Modular design**: Reusable components (operators) with explicit dependencies
+- **Version control**: Fully traceable builds with rollback support
+- **Scalability**: Compatible with Dask and object stores like MinIO
+- **Easy CI/CD**: Containerless operators simplify packaging and deployment
+
+---
+
+## 🛠️ Build Operators
+
+Each operator is defined by a dedicated Python script placed in the **operators/** folder.
+
+To build a CLAIMED operator, run:
+
+```bash
+c3_create_containerless_operator -v 0.1.0 operators/create_training_zarr.py
+```
+
+In this example, **create_training_zarr.py** is the script defining the operator.
+
+This command generates:
+
+- A **.cwl** metadata file in the same **operators/** folder
+- A ZIP archive in the **claimed-operators/** directory: **claimed-operators/create_training_zarr:0.1.0.zip**
+
+Repeat this process for each operator script you want to package.
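+
+To sanity-check an archive before extracting, you can list its contents. Seeing a `runneable.py` inside is a reasonable expectation given the extraction step below, but treat the exact archive layout as an assumption about the c3 compiler's output:
+
+```bash
+unzip -l claimed-operators/create_training_zarr:0.1.0.zip
+```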
+
+---
+
+## 📂 Why You Must Unzip Operator ZIPs
+
+CLAIMED expects a `runneable.py` inside a directory path that matches the operator name. The CLI **does not look inside the ZIP**, so you must extract it:
+
+```bash
+unzip claimed-operators/create_training_zarr:0.1.0.zip -d claimed-operators/create_training_zarr:0.1.0
+```
+
+Now the CLAIMED CLI can locate `runneable.py`:
+
+```bash
+claimed --component containerless/create_training_zarr:0.1.0
+```
+
+---
+
+## 🔄 Automate Operator Builds
+
+Use a script like **build_components.sh**: it runs `c3_create_containerless_operator -v <version>` for every file in `operators/` and unzips each ZIP into `claimed-operators/<name>:<version>/`.
+
+---
+
+## 🌍 Required Environment Variables
+
+CLAIMED requires specific environment variables to locate configuration files, access datasets, and run operators correctly.
+
+For our pipeline, which uses **MinIO** for object storage, set the following credentials and connection info:
+
+```bash
+export AKFIRE_ACCESS_KEY="..."
+export AKFIRE_SECRET_KEY="..."
+export AKFIRE_BUCKET="ak-fire-v1-0"
+export AKFIRE_ENDPOINT=""
+```
+
+You must also set the path to the main configuration file used by the pipeline. This **config.json** includes settings for credentials, dataset handling, Zarr creation, training, and prediction:
+
+```bash
+export config="/path/to/config.json"
+```
+
+To run CLAIMED containerless operators, define the following paths:
+
+```bash
+export CLAIMED_COMPONENTS_DIR="$PWD"
+export CLAIMED_CONTAINERLESS_OPERATOR_PATH="$PWD/claimed-operators/create_training_zarr:0.1.0"
+```
+
+`CLAIMED_COMPONENTS_DIR` tells CLAIMED where to find the `claimed-operators/` directory. Typically, set it to the parent directory containing that folder (`$PWD` if you're already in the project root).
+
+`CLAIMED_CONTAINERLESS_OPERATOR_PATH` points to the specific operator package to run and must be updated per operator; the value above selects the `create_training_zarr` operator. This path should contain the `runneable.py` that CLAIMED uses to execute the operator logic.
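+
+For orientation, this is roughly how the operators consume these variables: the `config` env var is read as a path to the JSON file, and the `AKFIRE_*` credentials are used by the project-internal `get_s3_fs()` helper to reach MinIO. That helper's implementation is not part of this repo snippet, so the following `s3fs`-based sketch of an equivalent setup is an assumption:
+
+```python
+import json
+import os
+
+import s3fs
+
+# The config path comes from the `config` env var (or the --config flag).
+with open(os.environ["config"]) as f:
+    cfg = json.load(f)
+
+# Assumed equivalent of the internal get_s3_fs() helper: an s3fs filesystem
+# pointed at the MinIO endpoint with the AKFIRE_* credentials.
+fs = s3fs.S3FileSystem(
+    key=os.environ["AKFIRE_ACCESS_KEY"],
+    secret=os.environ["AKFIRE_SECRET_KEY"],
+    client_kwargs={"endpoint_url": os.environ["AKFIRE_ENDPOINT"]},
+)
+fs.get(f"{os.environ['AKFIRE_BUCKET']}/config.json", "/tmp/config.json")
+```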
+
+---
+
+## 🚀 Integration with Apache Airflow
+
+In our Airflow DAG (`akfire_claimed_dag.py`), each CLAIMED operator is executed using a `BashOperator`.
+
+The DAG dynamically sets up all required environment variables and resolves paths, including MinIO credentials and the specific operator folder to run. It also downloads the required `config.json` from MinIO before running any task.
+
+Each step of the pipeline (like `create_training_zarr`, `train_logistic`, etc.) is defined as a `BashOperator` running:
+
+```bash
+claimed --component containerless/<name>:<version>
+```
+
+Here's an example for one task:
+
+```python
+BashOperator(
+    task_id="create_training_zarr",
+    bash_command="claimed --component containerless/create_training_zarr:0.1.0",
+    env={
+        **ENV_VARS,
+        "CLAIMED_CONTAINERLESS_OPERATOR_PATH": op_path("create_training_zarr"),
+        "config": str(config_file),
+    },
+)
+```
+
+## 🧰 Local Runner alternative
+
+If you're not using Airflow, here's a standalone Python script to execute every operator in sequence:
+
+```python
+import os, shutil, subprocess
+from pathlib import Path
+from dataset_generation.utilities.s3_utilities import get_s3_fs
+
+THIS_DIR = Path(__file__).resolve().parent
+COMPONENTS_DIR = THIS_DIR / "claimed-operators"
+VERSION = "0.1.0"
+
+S3 = {
+    "AKFIRE_ACCESS_KEY": os.getenv("AKFIRE_ACCESS_KEY"),
+    "AKFIRE_SECRET_KEY": os.getenv("AKFIRE_SECRET_KEY"),
+    "AKFIRE_BUCKET": os.getenv("AKFIRE_BUCKET"),
+    "AKFIRE_ENDPOINT": os.getenv("AKFIRE_ENDPOINT"),
+}
+
+def download_config(tmp: Path) -> Path:
+    fs = get_s3_fs()
+    tmp.mkdir(exist_ok=True)
+    cfg = tmp / "config.json"
+    fs.get(f"{S3['AKFIRE_BUCKET']}/config.json", str(cfg))
+    return cfg
+
+def run_operator(name: str, cfg: Path):
+    env = os.environ.copy()
+    env.update(S3)
+    env["config"] = str(cfg)
+    # Match the env-var layout described above: COMPONENTS_DIR is the parent
+    # of claimed-operators/, and the operator path selects one package.
+    env["CLAIMED_COMPONENTS_DIR"] = str(THIS_DIR)
+    env["CLAIMED_CONTAINERLESS_OPERATOR_PATH"] = str(COMPONENTS_DIR / f"{name}:{VERSION}")
+    cmd = ["claimed", "--component", f"containerless/{name}:{VERSION}"]
+    subprocess.run(cmd, env=env, check=True)
+
+def main():
+    tmp = THIS_DIR / "tmp"
+    cfg = download_config(tmp)
+    for op in [
+        "create_training_zarr",
+        "train_logistic",
+        "logistic_prediction",
+        "optimize_xgb_hyperparameters_from_df",
+        "training_xgboost",
+        "xgboost_prediction",
+    ]:
+        print(f"Running {op}…")
+        run_operator(op, cfg)
+    shutil.rmtree(tmp)
+
+if __name__ == "__main__":
+    main()
+```
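+
+Assuming you save the script as, say, `run_pipeline.py` (the filename is illustrative, not part of this repo), run it from the project root with the same environment variables exported as above:
+
+```bash
+python run_pipeline.py
+```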
diff --git a/component-library/akfire/akfire_claimed_dag.py b/component-library/akfire/akfire_claimed_dag.py
new file mode 100644
index 00000000..d89b1da3
--- /dev/null
+++ b/component-library/akfire/akfire_claimed_dag.py
@@ -0,0 +1,131 @@
+from __future__ import annotations
+
+import os, shutil
+from datetime import timedelta
+from pathlib import Path
+import pendulum
+
+from airflow.decorators import dag, task
+from airflow.models.baseoperator import chain
+from airflow.operators.bash import BashOperator
+
+from dataset_generation.utilities.s3_utilities import get_s3_fs
+
+
+# Environment variables (loaded once at DAG start)
+AKFIRE_ACCESS_KEY = os.getenv("AKFIRE_ACCESS_KEY")
+AKFIRE_SECRET_KEY = os.getenv("AKFIRE_SECRET_KEY")
+AKFIRE_BUCKET = os.getenv("AKFIRE_BUCKET")
+AKFIRE_ENDPOINT = os.getenv("AKFIRE_ENDPOINT")
+
+THIS_DIR = Path(__file__).resolve().parent
+COMPONENTS_DIR = THIS_DIR
+VERSION = "0.1.0"
+
+
+def op_path(name: str) -> str:
+    """Return the claimed-operators/<name>:<version> folder for one operator."""
+    return str(COMPONENTS_DIR / "claimed-operators" / f"{name}:{VERSION}")
+
+
+ENV_VARS: dict[str, str] = {
+    "AKFIRE_ACCESS_KEY": AKFIRE_ACCESS_KEY,
+    "AKFIRE_SECRET_KEY": AKFIRE_SECRET_KEY,
+    "AKFIRE_BUCKET": AKFIRE_BUCKET,
+    "AKFIRE_ENDPOINT": AKFIRE_ENDPOINT,
+    "CLAIMED_COMPONENTS_DIR": str(COMPONENTS_DIR),
+}
+
+START_DATE = pendulum.datetime(2024, 6, 5, 8, 0, tz="Europe/Madrid")
+DAG_ID = "claimed_pipeline_dataset_generation"
+TAGS = ["claimed", "dataset-generation", "fire"]
+CATCHUP = False
+
+DEFAULT_ARGS = {
+    "env": {
+        **ENV_VARS,
+        "SKIP_TASKS": "",
+    },
+    "retries": 10,
+    "retry_delay": timedelta(minutes=2),
+}
+
+
+def claimed_cmd(component: str) -> str:
+    """Return the shell command that executes one containerless component;
+    the «config» env var is injected per task via the BashOperator's env."""
+    return f"/home/airflow/.local/bin/claimed --component containerless/{component}:{VERSION}"
+
+
+@dag(
+    dag_id=DAG_ID,
+    schedule_interval=None,
+    start_date=START_DATE,
+    catchup=CATCHUP,
+    tags=TAGS,
+    default_args=DEFAULT_ARGS,
+)
+def claimed_dataset_pipeline():
+    tmp_base = THIS_DIR / Path("tmp/akfire_dag")
+    config_file = tmp_base / "config.json"
+
+    @task
+    def download_config() -> str:
+        fs = get_s3_fs()
+        tmp_base.mkdir(parents=True, exist_ok=True)
+        fs.get(f"{ENV_VARS['AKFIRE_BUCKET']}/config.json", str(config_file))
+        return str(config_file)
+
+    @task(trigger_rule="all_done")
+    def cleanup(path: str):
+        """Remove the temporary directory and its contents."""
+        if tmp_base.exists():
+            shutil.rmtree(tmp_base)
+
+    # ── compact definition of the six steps ──────────────────────────
+    operators = {
+        "create_training_zarr": "create_training_zarr",
+        "train_logistic": "train_logistic",
+        "logistic_prediction": "logistic_prediction",
+        "optimize_xgb": "optimize_xgb_hyperparameters_from_df",
+        "train_xgboost": "training_xgboost",
+        "xgboost_prediction": "xgboost_prediction",
+    }
+
+    SKIP_TASKS = DEFAULT_ARGS["env"]["SKIP_TASKS"].split(",")
+
+    tasks = {}
+    for task_id, comp_name in operators.items():
+        if task_id in SKIP_TASKS:
+            tasks[task_id] = BashOperator(
+                task_id=task_id,
+                bash_command="echo 'Skipped by config'",
+            )
+        else:
+            tasks[task_id] = BashOperator(
+                task_id=task_id,
+                bash_command=claimed_cmd(comp_name),
+                env={  # inherits ENV_VARS + individual component path
+                    **ENV_VARS,
+                    "CLAIMED_CONTAINERLESS_OPERATOR_PATH": op_path(comp_name),
+                    "config": f"{config_file}",
+                },
+            )
+
+    cfg = download_config()
+    chain(
+        cfg,
+        tasks["create_training_zarr"],
+        tasks["train_logistic"],
+        tasks["logistic_prediction"],
+        tasks["optimize_xgb"],
+        tasks["train_xgboost"],
+        tasks["xgboost_prediction"],
+        cleanup(cfg),
+    )
+
+
+if __name__ == "__main__":
+    claimed_dataset_pipeline().test()
+
+
+claimed_dataset_pipeline()
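+
+# Usage notes (illustrative; the commands assume a standard Airflow 2.x install):
+#   Local debug run without a scheduler, via dag.test():
+#       python akfire_claimed_dag.py
+#   Trigger through the Airflow CLI once the DAG is deployed:
+#       airflow dags trigger claimed_pipeline_dataset_generation
+#   To skip steps, set SKIP_TASKS in DEFAULT_ARGS above to a comma-separated
+#   list of task ids, e.g. "logistic_prediction,xgboost_prediction".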
"training_models", + "outputs_path": "data_outputs", + "figures_path": "figures", + "years": [ + 2001, + 2002, + 2003, + 2004, + 2005, + 2006, + 2007, + 2008, + 2009, + 2010, + 2011, + 2012, + 2013, + 2014, + 2015, + 2016, + 2017, + 2018, + 2019, + 2020, + 2021, + 2022 + ], + "last_year": 2050, + "folders": { + "shapefile": "shape_file_country_codes", + "gee": "EarthEngineData", + "cds": "CDSData", + "infrastructure": "infrastructures" + }, + "shapefile": { + "url": "", + "archive_name": "", + "filtered_shapefile_name": "", + "rasterized_zarr": "", + "attributes_filter": { + "values": [ + "AL", + "AT", + "BE", + "BG", + "CH", + "CY", + "CZ", + "DE", + "DK", + "EE", + "ES", + "FI", + "FR", + "GB", + "GR", + "HR", + "HU", + "IE", + "IS", + "IT", + "LI", + "LT", + "LU", + "LV", + "ME", + "MK", + "MT", + "NL", + "NO", + "PL", + "PT", + "RO", + "RS", + "SE", + "SI", + "SK", + "TR" + ] + }, + "bounding_box": [], + "raster_resolution": 2500, + "naming_pattern": "country_{code}", + "adjust_boundaries": false + }, + "gee": { + "output_files": { + "modis_lc": "", + "modis_burn_date": "" + }, + "collection": { + "lc": "", + "burn": "" + }, + "band": { + "lc": "LC_Type1", + "burn": "BurnDate" + }, + "scale": 500, + "crs": "EPSG:3035", + "frequency": { + "lc": "", + "burn": "" + }, + "max_workers": 8 + }, + "cds": { + "base_request": { + "time_aggregation": "seasonal_indicators", + "version": "v1_0" + }, + "scenarios": { + "historical": [ + "2001_2005" + ], + "rcp4_5": [ + "2006_2010", + "2011_2015", + "2016_2020", + "2021_2025", + "2026_2030", + "2031_2035", + "2036_2040", + "2041_2045", + "2046_2050" + ], + "rcp8_5": [ + "2006_2010", + "2011_2015", + "2016_2020", + "2021_2025", + "2026_2030", + "2031_2035", + "2036_2040", + "2041_2045", + "2046_2050" + ] + }, + "product_types": [], + "dataset": "", + "variables": [], + "scenario": "rcp85", + "overwrite": false + }, + "infrastructure": { + "url": "", + "files_to_extract": [] + }, + "population_cities": { + "centres_type": [ + "city", + "town" + ] + }, + "xgboost": { + "model": "xgboost", + "variables_to_drop": [], + "bayesian_opt": 0, + "default_params": {}, + "xgboost_monotonic_constraints": [], + "train_rounds": 0, + "early_stopping_rounds": 0, + "prob_threshold": 0, + "monte_carlo_iterations": 0, + "optimized_params": {} + }, + "logistic_regression": { + "model": "logistic_regression", + "variables_to_drop": [], + "standardize_features": [], + "reciprocal_standardize_features": [], + "polynomial_features": {}, + "prob_threshold": 0, + "params": {}, + "train_test_split": { + "test_size": 0 + }, + "evaluation": { + "metrics": [], + "thresholds": {} + }, + "random_state": 0, + "six_years_window": 0, + "squared": 0, + "num_sim": 0 + }, + "prediction_data_scenario": "rcp85", + "training_data_scenario": "rcp45" +} \ No newline at end of file diff --git a/component-library/akfire/operators/create_training_zarr.cwl b/component-library/akfire/operators/create_training_zarr.cwl new file mode 100644 index 00000000..6f179e63 --- /dev/null +++ b/component-library/akfire/operators/create_training_zarr.cwl @@ -0,0 +1,21 @@ +cwlVersion: v1.2 +class: CommandLineTool + +baseCommand: "claimed" + +inputs: + component: + type: string + default: containerless/claimed-operators/create_training_zarr:0.1.0 + inputBinding: + position: 1 + prefix: --component + config: + type: string + default: None + inputBinding: + position: 2 + prefix: --config + + +outputs: [] diff --git a/component-library/akfire/operators/create_training_zarr.py 
diff --git a/component-library/akfire/operators/create_training_zarr.py b/component-library/akfire/operators/create_training_zarr.py
new file mode 100644
index 00000000..31d00725
--- /dev/null
+++ b/component-library/akfire/operators/create_training_zarr.py
@@ -0,0 +1,49 @@
+"""
+Create Training Zarr Operator
+
+This script runs the Wildfire `create_training_zarr` pipeline using a Dask cluster.
+It expects a configuration file (JSON) specified via `--config` or the `config` environment variable.
+The Dask cluster is started locally with 4 workers, each using 1 thread and 12 GiB of memory.
+"""
+
+# operators/create_training_zarr.py
+import os, argparse, dask.config
+from dask.distributed import LocalCluster, Client
+from Wildfire_data_prep.training_zarr import create_training_zarr
+
+
+def parse_args() -> str:
+    p = argparse.ArgumentParser()
+    p.add_argument("--config", default=os.environ.get("config"))
+    a = p.parse_args()
+    if a.config is None:
+        p.error("--config missing (and env var 'config' not set)")
+    return a.config
+
+
+def main() -> None:
+    cfg = parse_args()
+    dask.config.set(
+        {
+            "dataframe.shuffle.method": "p2p",
+            "distributed.worker.memory.target": 0.6,
+            "distributed.worker.memory.spill": 0.7,
+            "distributed.worker.memory.pause": 0.8,
+            "distributed.worker.memory.terminate": 0.95,
+        }
+    )
+    with (
+        LocalCluster(
+            n_workers=4,
+            threads_per_worker=1,
+            memory_limit="12GiB",
+            local_directory="/tmp/dask-worker-space",
+        ) as cluster,
+        Client(cluster),
+    ):
+        create_training_zarr(cfg)
+
+
+if __name__ == "__main__":
+    main()
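+
+# Note: the cluster shape above (4 workers, 1 thread each, 12 GiB each) is
+# fixed. A sketch for making it configurable via environment variables; these
+# DASK_* names are an assumption, not used anywhere else in the repo:
+#   n_workers = int(os.environ.get("DASK_N_WORKERS", "4"))
+#   memory_limit = os.environ.get("DASK_MEMORY_LIMIT", "12GiB")
+#   LocalCluster(n_workers=n_workers, memory_limit=memory_limit, ...)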
+""" + +# operators/create_training_zarr.py +import os, argparse, dask.config +from dask.distributed import LocalCluster, Client +from dataset_generation.training_ml_wf.predict_logistic import logistic_prediction + + +def parse_args() -> str: + p = argparse.ArgumentParser() + p.add_argument("--config", default=os.environ.get("config")) + a = p.parse_args() + if a.config is None: + p.error("--config missing (and env var 'config' not set)") + return a.config + + +def main() -> None: + cfg = parse_args() + dask.config.set( + { + "dataframe.shuffle.method": "p2p", + "distributed.worker.memory.target": 0.6, + "distributed.worker.memory.spill": 0.7, + "distributed.worker.memory.pause": 0.8, + "distributed.worker.memory.terminate": 0.95, + } + ) + with ( + LocalCluster( + n_workers=4, + threads_per_worker=1, + memory_limit="12GiB", + local_directory="/tmp/dask-worker-space", + ) as cluster, + Client(cluster), + ): + logistic_prediction(cfg) + + +if __name__ == "__main__": + main() diff --git a/component-library/akfire/operators/optimize_xgb_hyperparameters_from_df.cwl b/component-library/akfire/operators/optimize_xgb_hyperparameters_from_df.cwl new file mode 100644 index 00000000..725bfe4b --- /dev/null +++ b/component-library/akfire/operators/optimize_xgb_hyperparameters_from_df.cwl @@ -0,0 +1,21 @@ +cwlVersion: v1.2 +class: CommandLineTool + +baseCommand: "claimed" + +inputs: + component: + type: string + default: containerless/claimed-operators/optimize_xgb_hyperparameters_from_df:0.1.0 + inputBinding: + position: 1 + prefix: --component + config: + type: string + default: None + inputBinding: + position: 2 + prefix: --config + + +outputs: [] diff --git a/component-library/akfire/operators/optimize_xgb_hyperparameters_from_df.py b/component-library/akfire/operators/optimize_xgb_hyperparameters_from_df.py new file mode 100644 index 00000000..9bca403c --- /dev/null +++ b/component-library/akfire/operators/optimize_xgb_hyperparameters_from_df.py @@ -0,0 +1,55 @@ +""" +XGBoost Hyperparameter Optimization Operator + +Runs Bayesian optimization to tune XGBoost hyperparameters using cross-validation scores +derived from a preprocessed DataFrame input. The configuration should specify the dataset path, +parameter search space, optimization criteria, and output destination. + +The Dask cluster is started locally with 4 workers, each using 1 thread and 12 GiB of memory. + +Config path must be passed via `--config` or the `config` environment variable. 
+ +""" + +# operators/create_training_zarr.py +import os, argparse, dask.config +from dask.distributed import LocalCluster, Client +from dataset_generation.training_ml_wf.bayesian_optimization import ( + optimize_xgb_hyperparameters_from_df, +) + + +def parse_args() -> str: + p = argparse.ArgumentParser() + p.add_argument("--config", default=os.environ.get("config")) + a = p.parse_args() + if a.config is None: + p.error("--config missing (and env var 'config' not set)") + return a.config + + +def main() -> None: + cfg = parse_args() + dask.config.set( + { + "dataframe.shuffle.method": "p2p", + "distributed.worker.memory.target": 0.6, + "distributed.worker.memory.spill": 0.7, + "distributed.worker.memory.pause": 0.8, + "distributed.worker.memory.terminate": 0.95, + } + ) + with ( + LocalCluster( + n_workers=4, + threads_per_worker=1, + memory_limit="12GiB", + local_directory="/tmp/dask-worker-space", + ) as cluster, + Client(cluster), + ): + optimize_xgb_hyperparameters_from_df(cfg) + + +if __name__ == "__main__": + main() diff --git a/component-library/akfire/operators/train_logistic.cwl b/component-library/akfire/operators/train_logistic.cwl new file mode 100644 index 00000000..aa7501b7 --- /dev/null +++ b/component-library/akfire/operators/train_logistic.cwl @@ -0,0 +1,21 @@ +cwlVersion: v1.2 +class: CommandLineTool + +baseCommand: "claimed" + +inputs: + component: + type: string + default: containerless/claimed-operators/train_logistic:0.1.0 + inputBinding: + position: 1 + prefix: --component + config: + type: string + default: None + inputBinding: + position: 2 + prefix: --config + + +outputs: [] diff --git a/component-library/akfire/operators/train_logistic.py b/component-library/akfire/operators/train_logistic.py new file mode 100644 index 00000000..7a3fc774 --- /dev/null +++ b/component-library/akfire/operators/train_logistic.py @@ -0,0 +1,53 @@ +""" +Logistic Regression Training Operator + +This operator trains a logistic regression model using data specified in a configuration file. +It supports distributed training via a local Dask cluster for efficient parallel computation. +The config should define paths to training data, output model location, and training parameters. + +The Dask cluster is started locally with 4 workers, each using 1 thread and 12 GiB of memory. + +Configuration is required via `--config` flag or `config` environment variable. 
+ +""" + +# operators/create_training_zarr.py +import os, argparse, dask.config +from dask.distributed import LocalCluster, Client +from dataset_generation.training_ml_wf.training_logistic import train_logistic + + +def parse_args() -> str: + p = argparse.ArgumentParser() + p.add_argument("--config", default=os.environ.get("config")) + a = p.parse_args() + if a.config is None: + p.error("--config missing (and env var 'config' not set)") + return a.config + + +def main() -> None: + cfg = parse_args() + dask.config.set( + { + "dataframe.shuffle.method": "p2p", + "distributed.worker.memory.target": 0.6, + "distributed.worker.memory.spill": 0.7, + "distributed.worker.memory.pause": 0.8, + "distributed.worker.memory.terminate": 0.95, + } + ) + with ( + LocalCluster( + n_workers=4, + threads_per_worker=1, + memory_limit="12GiB", + local_directory="/tmp/dask-worker-space", + ) as cluster, + Client(cluster), + ): + train_logistic(cfg) + + +if __name__ == "__main__": + main() diff --git a/component-library/akfire/operators/training_xgboost.cwl b/component-library/akfire/operators/training_xgboost.cwl new file mode 100644 index 00000000..d1684e3a --- /dev/null +++ b/component-library/akfire/operators/training_xgboost.cwl @@ -0,0 +1,21 @@ +cwlVersion: v1.2 +class: CommandLineTool + +baseCommand: "claimed" + +inputs: + component: + type: string + default: containerless/claimed-operators/training_xgboost:0.1.0 + inputBinding: + position: 1 + prefix: --component + config: + type: string + default: None + inputBinding: + position: 2 + prefix: --config + + +outputs: [] diff --git a/component-library/akfire/operators/training_xgboost.py b/component-library/akfire/operators/training_xgboost.py new file mode 100644 index 00000000..ac393835 --- /dev/null +++ b/component-library/akfire/operators/training_xgboost.py @@ -0,0 +1,52 @@ +""" +XGBoost Model Training Operator + +Trains an XGBoost model using parameters and data defined in a configuration file. +The Dask cluster is started locally with 4 workers, each using 1 thread and 12 GiB of memory. + +The configuration should specify training data paths, model parameters, and output destinations. + +Pass the config file via `--config` or set the `config` environment variable. 
+ +""" + +# operators/create_training_zarr.py +import os, argparse, dask.config +from dask.distributed import LocalCluster, Client +from dataset_generation.training_ml_wf.training_xgboost import training_xgboost + + +def parse_args() -> str: + p = argparse.ArgumentParser() + p.add_argument("--config", default=os.environ.get("config")) + a = p.parse_args() + if a.config is None: + p.error("--config missing (and env var 'config' not set)") + return a.config + + +def main() -> None: + cfg = parse_args() + dask.config.set( + { + "dataframe.shuffle.method": "p2p", + "distributed.worker.memory.target": 0.6, + "distributed.worker.memory.spill": 0.7, + "distributed.worker.memory.pause": 0.8, + "distributed.worker.memory.terminate": 0.95, + } + ) + with ( + LocalCluster( + n_workers=4, + threads_per_worker=1, + memory_limit="12GiB", + local_directory="/tmp/dask-worker-space", + ) as cluster, + Client(cluster), + ): + training_xgboost(cfg) + + +if __name__ == "__main__": + main() diff --git a/component-library/akfire/operators/xgboost_prediction.cwl b/component-library/akfire/operators/xgboost_prediction.cwl new file mode 100644 index 00000000..4431f791 --- /dev/null +++ b/component-library/akfire/operators/xgboost_prediction.cwl @@ -0,0 +1,21 @@ +cwlVersion: v1.2 +class: CommandLineTool + +baseCommand: "claimed" + +inputs: + component: + type: string + default: containerless/claimed-operators/xgboost_prediction:0.1.0 + inputBinding: + position: 1 + prefix: --component + config: + type: string + default: None + inputBinding: + position: 2 + prefix: --config + + +outputs: [] diff --git a/component-library/akfire/operators/xgboost_prediction.py b/component-library/akfire/operators/xgboost_prediction.py new file mode 100644 index 00000000..7bc32920 --- /dev/null +++ b/component-library/akfire/operators/xgboost_prediction.py @@ -0,0 +1,53 @@ +""" +XGBoost Prediction Operator + +This operator loads a trained XGBoost model and performs predictions on input data +defined in a configuration file. The Dask cluster is started locally with 4 workers, +each using 1 thread and 12 GiB of memory. + +The configuration must specify the model path, input features, and prediction output location. + +Pass the config file via `--config` or set the `config` environment variable. + +""" + +# operators/create_training_zarr.py +import os, argparse, dask.config +from dask.distributed import LocalCluster, Client +from dataset_generation.training_ml_wf.predict_xgboost import xgboost_prediction + + +def parse_args() -> str: + p = argparse.ArgumentParser() + p.add_argument("--config", default=os.environ.get("config")) + a = p.parse_args() + if a.config is None: + p.error("--config missing (and env var 'config' not set)") + return a.config + + +def main() -> None: + cfg = parse_args() + dask.config.set( + { + "dataframe.shuffle.method": "p2p", + "distributed.worker.memory.target": 0.6, + "distributed.worker.memory.spill": 0.7, + "distributed.worker.memory.pause": 0.8, + "distributed.worker.memory.terminate": 0.95, + } + ) + with ( + LocalCluster( + n_workers=4, + threads_per_worker=1, + memory_limit="12GiB", + local_directory="/tmp/dask-worker-space", + ) as cluster, + Client(cluster), + ): + xgboost_prediction(cfg) + + +if __name__ == "__main__": + main()