In [None]:

import os
import pandas as pd
import shutil
import xgboost as xgb

from nvflare.apis.fl_constant import JobConstants 

from sklearn.metrics import roc_auc_score

In [None]:
from multi_modality_fl.utils.data_management import GlobalExperimentsConfiguration, write_json, read_json

### Import Dataset for Training/Testing and Validation

In [None]:
# Experiment bookkeeping is kept under <repo>/multi_modality_fl/experiments.
# Paths are resolved from the working directory, so the notebook must be started from the repository root.
current_experiment = GlobalExperimentsConfiguration(
    base_path=os.path.join(os.getcwd(), 'multi_modality_fl', 'experiments'),
    experiment_name='federated_base_xgb_bagging',
    random_seed=0
)

# Raw data is expected in <repo>/data (previously a machine-specific absolute path).
current_experiment.create_experiment(
    dataset_folder=os.path.join(os.getcwd(), 'data'),
    dataset=GlobalExperimentsConfiguration.MULTIMODALITY,
    split_method=GlobalExperimentsConfiguration.SKLEARN
)

### Define The Experiments

In [None]:
# Name of the FL server site, and the name of the app that is deployed to it.
SERVER_KEY = "server"
SERVER_VAL = "app_" + SERVER_KEY

def CLIENT_KEY(site_name_prefix, i):
    """Return the FL client site name: the prefix followed by the site number (e.g. "site-1")."""
    return "{}{}".format(site_name_prefix, i)

def CLIENT_VAL(site_name_prefix, i):
    """Return the FL client app name deployed to a site: "app_" + prefix + site number (e.g. "app_site-1")."""
    return "app_{}{}".format(site_name_prefix, i)

def get_deploy_map(site_name_prefix: str, n_sites: int):
    """
    Build the NVFlare ``deploy_map`` for a job.

    The server app is mapped to the server site. For each k in 1..n_sites, the client app
    ``app_<prefix><k>`` is mapped to the client site ``<prefix><k>``. With ``n_sites <= 0``,
    only the server entry is returned.

    https://nvflare.readthedocs.io/en/main/real_world_fl/job.html#deploy-map
    """
    client_entries = {
        CLIENT_VAL(site_name_prefix, site_number): [CLIENT_KEY(site_name_prefix, site_number)]
        for site_number in range(1, n_sites + 1)
    }
    return {SERVER_VAL: [SERVER_KEY], **client_entries}

In [None]:
# NVFlare job definitions for this experiment are kept under <experiment_path>/jobs.
ALL_JOBS_PATH = os.path.join(current_experiment.experiment_path, 'jobs')

# Root of the shared base job for this series of experiments (analogous to
# `random_forest_base` in the NVFlare random-forest example). Running several experiments
# at once is convenient, so every generated job is derived from this base.
JOB_BASE_FOLDER = 'xgb_base_bag'

# Folder holding the base job; created on the first run only.
base_path = os.path.join(ALL_JOBS_PATH, JOB_BASE_FOLDER)
if not os.path.exists(base_path):
    os.makedirs(base_path, exist_ok=True)

In [None]:
# Build the shared base job. Every generated job copies this base and then changes a few
# settings according to the design of experiments.

# 1. Job meta file. The deploy map here is only a placeholder ("app" -> all sites). Each
#    generated job replaces it with an explicit server/client mapping.
# Adapted from: https://github.com/NVIDIA/NVFlare/blob/main/examples/advanced/random_forest/jobs/random_forest_base/meta.json
base_job_meta = dict(
    name="xgboost_tree_bagging",
    resource_spec={},
    deploy_map={"app": ["@ALL"]},
)
base_job_meta_path = os.path.join(base_path, JobConstants.META_FILE)
write_json(base_job_meta, base_job_meta_path)

# 2. Server and client app configs for the base job, stored in <base_path>/app/config.
base_job_root = os.path.join(base_path, "app")
base_job_configs = os.path.join(base_job_root, "config")
if not os.path.exists(base_job_configs):
    os.makedirs(base_job_configs, exist_ok=True)

# 2.1. Server config. It wires the XGBoost tree-model persistor, the shareable generator and
#      the bagging aggregator into a ScatterAndGather workflow. `min_clients` and `num_rounds`
#      are placeholders that each generated job overwrites.
# Adapted from: https://github.com/NVIDIA/NVFlare/blob/main/examples/advanced/random_forest/jobs/random_forest_base/app/config/config_fed_server.json
BASE_MODEL_SAVE_NAME = "xgboost_model.json"  # file name the persistor saves the global model under
base_job_server_config_path = os.path.join(base_job_configs, JobConstants.SERVER_JOB_CONFIG)
base_job_server_config = {
    "format_version": 2,
    "server": {"heart_beat_timeout": 600, "task_request_interval": 0.05},
    "task_data_filters": [],
    "task_result_filters": [],
    "components": [
        {
            "id": "persistor",
            "path": "nvflare.app_opt.xgboost.tree_based.model_persistor.XGBModelPersistor",
            "args": {"save_name": BASE_MODEL_SAVE_NAME},
        },
        {
            "id": "shareable_generator",
            "path": "nvflare.app_opt.xgboost.tree_based.shareable_generator.XGBModelShareableGenerator",
            "args": {},
        },
        {
            "id": "aggregator",
            "path": "nvflare.app_opt.xgboost.tree_based.bagging_aggregator.XGBBaggingAggregator",
            "args": {},
        },
    ],
    "workflows": [
        {
            "id": "scatter_and_gather",
            "name": "ScatterAndGather",
            "args": {
                "min_clients": 5,
                "num_rounds": 101,
                "start_round": 0,
                "wait_time_after_min_received": 0,
                "aggregator_id": "aggregator",
                "persistor_id": "persistor",
                "shareable_generator_id": "shareable_generator",
                "train_task_name": "train",
                "train_timeout": 0,
                "allow_empty_global_weights": 1,
                "task_check_period": 0.01,
                "persist_every_n_rounds": 0,
                "snapshot_every_n_rounds": 0,
            },
        }
    ],
}
write_json(base_job_server_config, base_job_server_config_path)

# 2.2. Client config. The custom executor handles the "train" task. Each generated job
#      overwrites `data_split_filename` and several training arguments per site.
# Adapted from: https://github.com/NVIDIA/NVFlare/blob/main/examples/advanced/xgboost/histogram-based/jobs/base/app/config/config_fed_client.json
base_job_client_config_path = os.path.join(base_job_configs, JobConstants.CLIENT_JOB_CONFIG)
base_job_client_config = {
    "format_version": 2,
    "executors": [
        {
            "tasks": ["train"],
            "executor": {
                "id": "Executor",
                "path": "custom_executor.FedXGBTreeExecutor_multi_modality",
                "args": {
                    "data_split_filename": "data_split.json",
                    "training_mode": "bagging",
                    "num_client_bagging": 5,
                    "num_local_parallel_tree": 1,
                    "local_subsample": 1,
                    "lr_mode": "scaled",
                    "local_model_path": "model.json",
                    "global_model_path": "model_global.json",
                    "learning_rate": 0.1,
                    "objective": "binary:logistic",
                    "max_depth": 8,
                    "eval_metric": "auc",
                    "tree_method": "hist",
                    "nthread": 16,
                },
            },
        }
    ],
    "task_result_filters": [],
    "task_data_filters": [],
    "components": [],
}
write_json(base_job_client_config, base_job_client_config_path)

# 3. Copy this experiment's custom code into <base_job_root>/custom. The client config
#    references `custom_executor.FedXGBTreeExecutor_multi_modality`, presumably shipped in this folder.
custom_data, base_job_custom = (
    os.path.join(os.getcwd(), "multi_modality_fl", "models", "nvflare", "xgboost_base_bagging_custom"),
    os.path.join(base_job_root, "custom"),
)
os.makedirs(base_job_custom, exist_ok=True)
# dirs_exist_ok=True (Python 3.8+) lets the cell be re-run over an existing copy.
shutil.copytree(custom_data, base_job_custom, dirs_exist_ok=True)

In [None]:
# each job base folder has a meta file defining the topology of the federation for that particular job.
# each job within the base folder shares the same meta file, but has different parameters specified.

# SOME PARAMETERS TO PLAY WITH: https://github.com/NVIDIA/NVFlare/blob/e217679c4de035564a6ed9c2e2658197b0c8e701/examples/advanced/random_forest/utils/prepare_job_config.py#L35-L44
from typing import Any, Callable


def prepare_nvflare_xgbrf_experiment(
        job_name: str,
        site_prefix: str,
        num_clients: int,
        split_method: str,
        site_config_naming_fn: Callable[..., Any],
        local_subsample: float,
        lr_mode: str
    ) -> dict:
    """
    Materialize one NVFlare simulator job for XGBoost bagging under ``ALL_JOBS_PATH/job_name``.

    The base job's meta, server and client configs are read back from disk and specialized
    for this run. They are then written into the job folder: the meta file at the job root,
    the server config in ``<job>/<SERVER_VAL>/config``, and one client config per site in
    ``<job>/app_<site_prefix><k>/config``. The base job's ``custom`` folder is copied into
    every client app.

    Args:
        job_name: Unique job id; used as the job folder name and as the meta ``name``.
        site_prefix: Client site prefix. Clients are ``<site_prefix>1`` .. ``<site_prefix><num_clients>``.
        num_clients: Number of client sites; also the server workflow's ``min_clients``.
        split_method: Data split method label. It is only recorded in the returned dict.
        site_config_naming_fn: Called with a 0-based site index. Returns that site's data split
            JSON file name, which is joined onto ``current_experiment.experiment_path``.
        local_subsample: Local subsample rate, written to every client's executor args.
        lr_mode: Learning-rate mode ("uniform" or "scaled"), written to every client's executor args.

    Returns:
        Dict describing the job's settings, including ``_name`` and the job folder ``_path``.
    """
    job = {
        "prefix": site_prefix,
        "n_sites": num_clients,
        "split_method": split_method,
        # NOTE(review): recorded here only. It is NOT written to the client config, so clients
        # keep the base job's value (1). Confirm which value is intended before relying on it.
        "num_local_parallel_tree": 5,
        # local subsample rate, see
        # https://github.com/NVIDIA/NVFlare/blob/e217679c4de035564a6ed9c2e2658197b0c8e701/examples/advanced/random_forest/utils/prepare_job_config.py#L38
        "local_subsample": local_subsample,
        # https://github.com/NVIDIA/NVFlare/blob/e217679c4de035564a6ed9c2e2658197b0c8e701/examples/advanced/random_forest/utils/prepare_job_config.py#L93
        "lr_scale": num_clients,
        "lr_mode": lr_mode,  # "uniform" or "scaled"
        "nthread": 16,
        "tree_method": "hist",  # client classifier should be "hist" since we use "tree" in server level
        "training_mode": "bagging",  # | "cyclic"
        "num_client_bagging": num_clients,  # 1 if cyclic
        "num_rounds": 100,
        "_name": job_name,
    }

    path = os.path.join(ALL_JOBS_PATH, job_name)
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)
    job["_path"] = path

    # 1. Meta: explicit per-site deploy map instead of the base job's "@ALL" placeholder.
    meta_config = read_json(base_job_meta_path)
    meta_config["name"] = job_name
    meta_config["deploy_map"] = get_deploy_map(job["prefix"], job["n_sites"])
    meta_config["min_clients"] = job["n_sites"]
    write_json(meta_config, os.path.join(path, JobConstants.META_FILE))

    # 2.1 Server config: override the workflow's round count and minimum number of clients.
    server_config = read_json(base_job_server_config_path)
    workflow_args = server_config["workflows"][0]["args"]
    workflow_args["num_rounds"] = job["num_rounds"]
    workflow_args["min_clients"] = num_clients

    server_config_path = os.path.join(path, SERVER_VAL, "config")
    if not os.path.exists(server_config_path):
        os.makedirs(server_config_path, exist_ok=True)
    write_json(server_config, os.path.join(server_config_path, JobConstants.SERVER_JOB_CONFIG))

    # Executor args shared by every client; only `data_split_filename` differs per site.
    # src: https://github.com/NVIDIA/NVFlare/blame/e217679c4de035564a6ed9c2e2658197b0c8e701/examples/advanced/random_forest/utils/prepare_job_config.py#L75
    shared_client_args = {
        "lr_scale": job["lr_scale"],
        "lr_mode": job["lr_mode"],
        "local_subsample": job["local_subsample"],
        "nthread": job["nthread"],
        "tree_method": job["tree_method"],
        "training_mode": job["training_mode"],
        "num_client_bagging": job["num_client_bagging"],
    }

    # 2.2 One client app per site (sites are 1-indexed in app/site names).
    for site_idx in range(job["n_sites"]):
        client_path = os.path.join(path, CLIENT_VAL(job["prefix"], site_idx + 1))
        client_config_path = os.path.join(client_path, "config")
        if not os.path.exists(client_config_path):
            os.makedirs(client_config_path, exist_ok=True)

        # Re-read the base config from disk for each site so per-site edits never carry over.
        client_config = read_json(base_job_client_config_path)
        executor_args = client_config["executors"][0]["executor"]["args"]
        executor_args.update(shared_client_args)
        executor_args["data_split_filename"] = os.path.join(
            current_experiment.experiment_path, site_config_naming_fn(site_idx)
        )
        write_json(client_config, os.path.join(client_config_path, JobConstants.CLIENT_JOB_CONFIG))

        # Copy the custom code (executor) into the client app; copytree creates the folder.
        shutil.copytree(base_job_custom, os.path.join(client_path, "custom"), dirs_exist_ok=True)

    return job
        

In [None]:
from nvflare.private.fed.app.simulator.simulator_runner import SimulatorRunner  

# Sweep over CV folds and numbers of client sites. For each pair: split the fold's training
# data across the clients, generate an NVFlare job, run it in the simulator, and score the
# resulting global model on the combined test datasets.

# Site configurations to evaluate (the same for every fold). Each configuration is described
# by JSON split files, so the underlying data is not duplicated per configuration.
site_configs = [1, 2, 3, 4, 5, 6, 7, 8]
site_prefixes = ["site-"] * len(site_configs)
split_methods = ["uniform"] * len(site_configs)

# Hyper-parameter defaults from the NVFlare random_forest tutorial.
local_subsample = 1
lr_mode = "uniform"

# Booster parameters used when loading the saved global model for evaluation.
num_trees = 100  # hyper-param?
param = {
    "objective": "binary:logistic",
    "eta": 0.1,
    "max_depth": 8,
    "eval_metric": "auc",
    "nthread": 16,
    "num_parallel_tree": num_trees,
}

for fold_idx in range(current_experiment.K):
    current_experiment.set_fold(fold_idx=fold_idx)

    # use a validation dataset
    current_experiment.set_validation_dataset()

    for num_clients, site_prefix, split_method in zip(site_configs, site_prefixes, split_methods):
        # 1. Split the training data across clients and write each client's subset to disk.
        client_dataframes = current_experiment.get_stratified_client_subsets(
            dataset=current_experiment.training_dataset,
            num_clients=num_clients,
            method=split_method
        )

        client_splits = []
        training_data_paths = []
        validation_data_paths = []
        for client_idx, df in enumerate(client_dataframes):
            # Each client's file holds only its own rows, so its split spans the whole file.
            client_splits.append((0, len(df)))

            training_subset_path = os.path.join(current_experiment.experiment_path, f'client_training_{client_idx}_stratified.h5')
            df.to_hdf(training_subset_path, key='df', mode='w')
            training_data_paths.append(training_subset_path)

            # FIXME(review): the client "validation" file is written from the same frame as the
            # training file, so every client validates on its own training rows. Confirm whether
            # a separate per-client validation subset was intended.
            validation_subset_path = os.path.join(current_experiment.experiment_path, f'client_validation_{client_idx}_stratified.h5')
            df.to_hdf(validation_subset_path, key='df', mode='w')
            validation_data_paths.append(validation_subset_path)

        def site_naming_fn(site_index):
            """Used for naming files in the client data split json (site names are 1-indexed)."""
            return f"{site_prefix}{site_index + 1}"

        def to_subset_id(site_index: int):
            """
            Human-readable file name of a client's data split JSON.

            `prepare_nvflare_xgbrf_experiment` calls this with a 0-based site index.
            NOTE(review): confirm `nvflare_multi_site_split_json` uses the same indexing,
            otherwise the split files written below will not match the ones the jobs reference.
            """
            return f'{split_method}_sid_{site_index}_of_{num_clients}.json'

        filenames, client_jsons = current_experiment.nvflare_multi_site_split_json(
            data_source_path=training_data_paths,
            validation_data_source_path=validation_data_paths,
            client_splits=client_splits,
            site_naming_fn=site_naming_fn,
            site_config_naming_fn=to_subset_id,
        )

        for filename, client_json in zip(filenames, client_jsons):
            print(filename, client_json)
            write_json(client_json, os.path.join(current_experiment.experiment_path, filename))

        # 2. Define the simulation job.
        def get_job_name(local_subsample, lr_mode):
            """
            The unique id for this experiment in the context of NVFlare
            Args:
                local_subsample: Local random forest subsample rate https://github.com/NVIDIA/NVFlare/blame/e217679c4de035564a6ed9c2e2658197b0c8e701/examples/advanced/random_forest/utils/prepare_job_config.py#L38
                lr_mode: Whether to use uniform or scaled shrinkage
            """
            local_subsample = int(local_subsample * 100)
            return f"xgbbase_bagging_{site_prefix}_{num_clients}_sites_{local_subsample}_ls_{split_method}_sm_{lr_mode}_lr"

        job_name = get_job_name(local_subsample, lr_mode)

        job_config = prepare_nvflare_xgbrf_experiment(
            job_name=job_name,
            site_prefix=site_prefix,
            num_clients=num_clients,
            split_method=split_method,
            site_config_naming_fn=to_subset_id,  # used for getting the config file for the sites
            local_subsample=local_subsample,
            lr_mode=lr_mode
        )

        # 3. Run the simulation. Job names do not include the fold, so the same workspace is reused
        #    across folds (and notebook re-runs). Clear it first so the model evaluated below can
        #    only come from this run.
        workspace_path = f"/tmp/nvflare/workspaces/{job_name}"
        shutil.rmtree(workspace_path, ignore_errors=True)

        print(job_config["_path"])

        simulator = SimulatorRunner(
            job_folder=job_config["_path"],
            workspace=workspace_path,
            n_clients=num_clients,
            threads=num_clients
        )
        run_status = simulator.run()
        print("Simulator finished with run_status", run_status)

        # 4. Validate the global model saved by the server-side persistor.
        model_path = os.path.join(workspace_path, "simulate_job", SERVER_VAL, BASE_MODEL_SAVE_NAME)
        if not os.path.isfile(model_path):
            raise FileNotFoundError(
                f"Simulator (run_status={run_status}) produced no global model at {model_path}"
            )

        # Load the model once per job; the same model is scored on every test dataset.
        bst = xgb.Booster(param, model_file=model_path)

        for name, dataset in current_experiment.get_combined_test_dataset():
            X, y = current_experiment.as_features_labels(dataset, current_experiment.LABEL_COL)
            y_pred = 1 * (bst.predict(xgb.DMatrix(X, label=y)) > 0.5)

            current_experiment.add_to_kfold_table(
                algorithm_name='Federated XGB Base Classifier (Bagging)',
                num_clients=num_clients,
                split_method=split_method,
                val_name=name,
                y_true=y,
                y_pred=y_pred,
            )

print(current_experiment.experiment_results)
        

In [None]:
# Mean of every numeric column per experiment configuration, shown separately for the internal
# and external validation sets. numeric_only=True drops non-numeric columns explicitly, because
# pandas >= 2.0 raises on them instead of silently dropping them.
internal_only = current_experiment.kfold_table[
    current_experiment.kfold_table['val_name'] == 'internal_validation'
].groupby(current_experiment.metadata_column_names)
display(internal_only.mean(numeric_only=True))

external_only = current_experiment.kfold_table[
    current_experiment.kfold_table['val_name'] == 'external_validation'
].groupby(current_experiment.metadata_column_names)
display(external_only.mean(numeric_only=True))

# Results dataframes go to <repo>/multi_modality_fl/results/dataframes (cwd = repository root).
current_experiment.write_results(os.path.join(os.getcwd(), 'multi_modality_fl', 'results', 'dataframes'))

In [None]:
# K-fold summary statistics per site configuration for this experiment.
# NOTE(review): this previously wrote to 'federated_random_forest_xgboost'. That name appears to
# belong to the random-forest experiment, and writing there would overwrite its stats with these
# bagging results. It is now named after this experiment ('federated_base_xgb_bagging'); update
# any downstream reader of the old name.
write_json(
    current_experiment.k_fold_results_to_stats(),
    os.path.join(os.getcwd(), 'multi_modality_fl', 'results', 'site_configurations', 'federated_base_xgb_bagging'),
)