
Enhanced monitor with system metrics logging and wandb support #90

Merged
merged 14 commits on May 24, 2022
@@ -39,8 +39,12 @@ RUN conda install -y numpy=1.21.2 scikit-learn=1.0.2 scipy=1.7.3 pandas=1.4.1 -c
RUN conda install -y pytorch=1.10.1 torchvision=0.11.2 torchaudio=0.10.1 cudatoolkit=11.3 -c pytorch -c conda-forge \
&& conda clean -a -y

+# torch helper package
+RUN conda install -y fvcore iopath -c fvcore -c iopath -c conda-forge \
+&& conda clean -a -y
+
# auxiliaries (communications, monitoring, etc.)
-RUN conda install -y wandb tensorboard tensorboardX -c conda-forge \
+RUN conda install -y wandb tensorboard tensorboardX pympler -c conda-forge \
&& pip install grpcio grpcio-tools protobuf==3.19.4 setuptools==61.2.0 \
&& conda clean -a -y

6 changes: 5 additions & 1 deletion enviroment/docker_files/federatedscope-torch1.10.Dockerfile
@@ -39,7 +39,11 @@ RUN conda install -y numpy=1.21.2 scikit-learn=1.0.2 scipy=1.7.3 pandas=1.4.1 -c
RUN conda install -y pytorch=1.10.1 torchvision=0.11.2 torchaudio=0.10.1 cudatoolkit=11.3 -c pytorch -c conda-forge \
&& conda clean -a -y

+# torch helper package
+RUN conda install -y fvcore iopath -c fvcore -c iopath -c conda-forge \
+&& conda clean -a -y
+
# auxiliaries (communications, monitoring, etc.)
-RUN conda install -y wandb tensorboard tensorboardX -c conda-forge \
+RUN conda install -y wandb tensorboard tensorboardX pympler -c conda-forge \
&& pip install grpcio grpcio-tools protobuf==3.19.4 setuptools==61.2.0 \
&& conda clean -a -y
@@ -39,6 +39,10 @@ RUN conda install -y numpy=1.21.2 scikit-learn=1.0.2 scipy=1.7.3 pandas=1.4.1 -c
RUN conda install -y pytorch==1.8.0 torchvision==0.9.0 torchaudio==0.8.0 cudatoolkit=10.2 -c pytorch \
&& conda clean -a -y

+# torch helper package
+RUN conda install -y fvcore iopath -c fvcore -c iopath -c conda-forge \
+&& conda clean -a -y

# for graph
RUN conda install -y pyg==2.0.1 -c pyg \
&& conda install -y rdkit=2021.09.4 -c conda-forge \
@@ -52,6 +56,6 @@ RUN conda install -y sentencepiece textgrid typeguard -c conda-forge \
&& conda clean -a -y

# auxiliaries (communications, monitoring, etc.)
-RUN conda install -y wandb tensorboard tensorboardX -c conda-forge \
+RUN conda install -y wandb tensorboard tensorboardX pympler -c conda-forge \
&& pip install grpcio grpcio-tools protobuf==3.19.4 setuptools==61.2.0 \
&& conda clean -a -y
3 changes: 3 additions & 0 deletions enviroment/requirements-torch1.10-application.txt
@@ -24,4 +24,7 @@ transformers==4.16.2
tokenizers==0.10.3
torchtext
datasets
+fvcore
+pympler
+iopath

5 changes: 4 additions & 1 deletion enviroment/requirements-torch1.10.txt
@@ -13,4 +13,7 @@ tensorboardX
grpcio
grpcio-tools
protobuf==3.19.4
-setuptools==61.2.0
+setuptools==61.2.0
+fvcore
+pympler
+iopath
3 changes: 3 additions & 0 deletions enviroment/requirements-torch1.8-application.txt
@@ -24,5 +24,8 @@ transformers==4.16.2
tokenizers==0.10.3
torchtext
datasets
+fvcore
+pympler
+iopath
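The fvcore, iopath, and pympler packages added across these Dockerfiles and requirements files back the enhanced monitor (fvcore is commonly used for FLOPs counting, pympler for memory sizing). As a minimal sketch of the kind of measurement pympler enables, assuming only the pympler package itself (this snippet is not code from the PR):

from pympler import asizeof

# Stand-in for a model's parameters; any Python object works.
model_params = {'w': [0.0] * 1024, 'b': [0.0] * 16}

# asizeof reports the deep size in bytes, following references.
size_bytes = asizeof.asizeof(model_params)
print(f"approx. in-memory footprint: {size_bytes} bytes")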


15 changes: 10 additions & 5 deletions federatedscope/core/auxiliaries/trainer_builder.py
@@ -22,22 +22,25 @@ def get_trainer(model=None,
device=None,
config=None,
only_for_eval=False,
-is_attacker=False):
+is_attacker=False,
+monitor=None):
if config.trainer.type == 'general':
if config.backend == 'torch':
from federatedscope.core.trainers import GeneralTorchTrainer
trainer = GeneralTorchTrainer(model=model,
data=data,
device=device,
config=config,
-only_for_eval=only_for_eval)
+only_for_eval=only_for_eval,
+monitor=monitor)
elif config.backend == 'tensorflow':
from federatedscope.core.trainers.tf_trainer import GeneralTFTrainer
trainer = GeneralTFTrainer(model=model,
data=data,
device=device,
config=config,
-only_for_eval=only_for_eval)
+only_for_eval=only_for_eval,
+monitor=monitor)
else:
raise ValueError
elif config.trainer.type == 'none':
@@ -70,7 +73,8 @@ def get_trainer(model=None,
data=data,
device=device,
config=config,
-only_for_eval=only_for_eval)
+only_for_eval=only_for_eval,
+monitor=monitor)
else:
# try to find user registered trainer
trainer = None
@@ -81,7 +85,8 @@
data=data,
device=device,
config=config,
-only_for_eval=only_for_eval)
+only_for_eval=only_for_eval,
+monitor=monitor)
if trainer is None:
raise ValueError('Trainer {} is not provided'.format(
config.trainer.type))
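With this change, callers thread a shared monitor into whichever trainer gets built. A usage sketch (my_model, my_data, my_cfg, and my_monitor are illustrative placeholders, not names from this PR):

from federatedscope.core.auxiliaries.trainer_builder import get_trainer

# Placeholders for objects created elsewhere in a FederatedScope run.
trainer = get_trainer(model=my_model,
                      data=my_data,
                      device='cuda:0',
                      config=my_cfg,
                      only_for_eval=False,
                      monitor=my_monitor)  # new keyword introduced by this PR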
108 changes: 105 additions & 3 deletions federatedscope/core/auxiliaries/utils.py
@@ -1,3 +1,5 @@
+import copy
+import json
import logging
import os
import random
@@ -11,6 +13,7 @@
import signal

import numpy as np

# Blind torch
try:
import torch
@@ -38,7 +41,6 @@ def setup_seed(seed):

def update_logger(cfg, clear_before_add=False):
import os
-import sys
import logging

root_logger = logging.getLogger("federatedscope")
@@ -62,6 +64,8 @@ def update_logger(cfg, clear_before_add=False):
# ================ create outdir to save log, exp_config, models, etc,.
if cfg.outdir == "":
cfg.outdir = os.path.join(os.getcwd(), "exp")
+if cfg.expname == "":
+    cfg.expname = f"{cfg.federate.method}_{cfg.model.type}_on_{cfg.data.type}"
cfg.outdir = os.path.join(cfg.outdir, cfg.expname)

# if exist, make directory with given name and time
@@ -85,10 +89,37 @@
"%(asctime)s (%(module)s:%(lineno)d) %(levelname)s: %(message)s")
fh.setFormatter(logger_formatter)
root_logger.addHandler(fh)
-#sys.stderr = sys.stdout
+# sys.stderr = sys.stdout

root_logger.info(f"the output dir is {cfg.outdir}")

if cfg.wandb.use:
init_wandb(cfg)


def init_wandb(cfg):
try:
import wandb
except ImportError:
logger.error("cfg.wandb.use=True but not install the wandb package")
exit()
dataset_name = cfg.data.type
method_name = cfg.federate.method
exp_name = cfg.expname

tmp_cfg = copy.deepcopy(cfg)
tmp_cfg.cfg_check_funcs = []
import yaml
cfg_yaml = yaml.safe_load(tmp_cfg.dump())

wandb.init(project=cfg.wandb.name_project,
entity=cfg.wandb.name_user,
config=cfg_yaml,
group=dataset_name,
job_type=method_name,
name=exp_name,
notes=f"{method_name}, {exp_name}")


def get_dataset(type, root, transform, target_transform, download=True):
if isinstance(type, str):
@@ -159,7 +190,6 @@ def get_random(type, sample_shape, params, device):


def batch_iter(data, batch_size=64, shuffled=True):

assert 'x' in data and 'y' in data
data_x = data['x']
data_y = data['y']
@@ -265,3 +295,75 @@ def block(self):

def exceed_max_failure(self, num_failure):
return num_failure > self.max_failure


def logfile_2_wandb_dict(exp_log_f, raw_out=True):
"""
parse the logfiles [exp_print.log, eval_results.log] into wandb_dict that contains non-nested dicts

:param exp_log_f: opened exp_log file
:param raw_out: True indicates "exp_print.log", otherwise indicates "eval_results.log",
the difference is whether contains the logger header such as "2022-05-02 16:55:02,843 (client:197) INFO:"

:return: tuple including (all_log_res, exp_stop_normal, last_line, log_res_best)
"""
log_res_best = {}
exp_stop_normal = False
all_log_res = []
last_line = None
for line in exp_log_f:
last_line = line
if " Find new best result" in line:
# e.g.,
# 2022-03-22 10:48:42,562 (server:459) INFO: Find new best result for client_individual.test_acc with value 0.5911787974683544
parse_res = line.split("INFO: ")[1].split("with value")
best_key, best_val = parse_res[-2], parse_res[-1]
# client_individual.test_acc -> client_individual/test_acc
best_key = best_key.replace("Find new best result for",
"").replace(".", "/")
log_res_best[best_key.strip()] = float(best_val.strip())

if "'Role': 'Server #'" in line:
if raw_out:
line = line.split("INFO: ")[1]
res = line.replace("\'", "\"")
res = json.loads(s=res)
if res['Role'] == 'Server #':
cur_round = res['Round']
res.pop('Role')
if cur_round != "Final" and 'Results_raw' in res:
res.pop('Results_raw')

log_res = {}
for key, val in res.items():
if not isinstance(val, dict):
log_res[key] = val
else:
if cur_round != "Final":
for key_inner, val_inner in val.items():
assert not isinstance(
val_inner, dict), "Un-expected log format"
log_res[f"{key}/{key_inner}"] = val_inner

else:
exp_stop_normal = True
if key == "Results_raw":
for final_type, final_type_dict in res[
"Results_raw"].items():
for inner_key, inner_val in final_type_dict.items(
):
log_res_best[
f"{final_type}/{inner_key}"] = inner_val
# log_res_best = {}
Collaborator (inline review comment): What are the commented codes for?

# for best_res_type, val_dict in val.items():
# for key_inner, val_inner in val_dict.items():
# assert not isinstance(val_inner, dict), "Un-expected log format"
# log_res_best[f"{best_res_type}/{key_inner}"] = val_inner
# if log_res_best is not None and "Results_weighted_avg/val_loss" in log_res and \
# log_res_best["client_summarized_weighted_avg/val_loss"] > \
# log_res["Results_weighted_avg/val_loss"]:
# print("Missing the results of last round, update best results")
# for key, val in log_res.items():
# log_res_best[key.replace("Results", "client_summarized")] = val
all_log_res.append(log_res)
return all_log_res, exp_stop_normal, last_line, log_res_best
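A usage sketch for the parser above (the file path and wandb project name are placeholders, not from this PR):

import wandb
from federatedscope.core.auxiliaries.utils import logfile_2_wandb_dict

wandb.init(project="placeholder-project")  # illustrative settings
with open("exp_print.log", "r") as exp_log_f:  # path is illustrative
    all_log_res, exp_stop_normal, last_line, log_res_best = \
        logfile_2_wandb_dict(exp_log_f, raw_out=True)
for log_res in all_log_res:
    wandb.log(log_res)   # per-round flat dicts
wandb.log(log_res_best)  # best-so-far metrics collected from the log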
6 changes: 5 additions & 1 deletion federatedscope/core/communication.py
@@ -11,9 +11,10 @@ class StandaloneCommManager(object):
"""
The communicator used for standalone mode
"""
-def __init__(self, comm_queue):
+def __init__(self, comm_queue, monitor=None):
self.comm_queue = comm_queue
self.neighbors = dict()
+self.monitor = monitor  # used to track the communication related metrics

def receive(self):
# we don't need receive() in standalone
@@ -37,6 +38,8 @@ def get_neighbors(self, neighbor_id=None):

def send(self, message):
self.comm_queue.append(message)
+download_bytes, upload_bytes = message.count_bytes()
+self.monitor.track_upload_bytes(upload_bytes)


class gRPCCommManager(object):
@@ -60,6 +63,7 @@ def __init__(self, host='0.0.0.0', port='50050', client_num=2):
port=port,
options=options)
self.neighbors = dict()
+self.monitor = None  # used to track the communication related metrics

def serve(self, max_workers, host, port, options):
"""
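The send() hook above assumes a monitor object exposing track_upload_bytes(). A minimal stand-in for experimentation (illustrative only; the PR's actual Monitor class lives elsewhere in FederatedScope):

class ByteTrackingMonitor:
    """Toy monitor that accumulates communication traffic in bytes."""
    def __init__(self):
        self.total_upload_bytes = 0
        self.total_download_bytes = 0

    def track_upload_bytes(self, bytes_sent):
        # Called from StandaloneCommManager.send() with message.count_bytes()[1]
        self.total_upload_bytes += bytes_sent

    def track_download_bytes(self, bytes_received):
        self.total_download_bytes += bytes_received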
2 changes: 0 additions & 2 deletions federatedscope/core/configs/__init__.py
@@ -13,7 +13,6 @@
if "config" in all_sub_configs:
all_sub_configs.remove('config')


from federatedscope.core.configs.config import CN, init_global_cfg
__all__ = __all__ + \
[
@@ -28,4 +27,3 @@
for base_config in base_configs:
all_sub_configs.pop(all_sub_configs.index(base_config))
all_sub_configs.insert(0, base_config)

6 changes: 4 additions & 2 deletions federatedscope/core/configs/cfg_data.py
@@ -13,9 +13,11 @@ def extend_data_cfg(cfg):
cfg.data.args = [] # args for external dataset, eg. [{'download': True}]
cfg.data.splitter = ''
cfg.data.splitter_args = [] # args for splitter, eg. [{'alpha': 0.5}]
-cfg.data.transform = []  # transform for x, eg. [['ToTensor'], ['Normalize', {'mean': [0.1307], 'std': [0.3081]}]]
+cfg.data.transform = [
+]  # transform for x, eg. [['ToTensor'], ['Normalize', {'mean': [0.1307], 'std': [0.3081]}]]
cfg.data.target_transform = [] # target_transform for y, use as above
-cfg.data.pre_transform = []  # pre_transform for `torch_geometric` dataset, use as above
+cfg.data.pre_transform = [
+]  # pre_transform for `torch_geometric` dataset, use as above
cfg.data.batch_size = 64
cfg.data.drop_last = False
cfg.data.sizes = [10, 5]
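For reference, the transform fields take nested [name, kwargs] lists, as the inline comments note. A config sketch using the example values from those comments:

# `cfg` is a FederatedScope config node; values come from cfg_data.py's comments.
cfg.data.transform = [
    ['ToTensor'],
    ['Normalize', {'mean': [0.1307], 'std': [0.3081]}],
]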
21 changes: 20 additions & 1 deletion federatedscope/core/configs/config.py
@@ -1,10 +1,13 @@
import copy
import logging
import os

from yacs.config import CfgNode

import federatedscope.register as register

logger = logging.getLogger(__name__)


class CN(CfgNode):
"""
@@ -83,7 +86,9 @@ def clean_unused_sub_cfgs(self):

def freeze(self):
"""
-make the cfg attributes immutable, and save the freezed cfg_check_funcs into "self.outdir/config.yaml" for better reproducibility
+1) make the cfg attributes immutable;
+2) save the frozen cfg_check_funcs into "self.outdir/config.yaml" for better reproducibility;
+3) if self.wandb.use=True, update the frozen config

:return:
"""
@@ -96,6 +101,20 @@ def freeze(self):
tmp_cfg = copy.deepcopy(self)
tmp_cfg.cfg_check_funcs = []
print(tmp_cfg.dump())
if self.wandb.use:
# update the frozen config
try:
import wandb
except ImportError:
logger.error(
"cfg.wandb.use=True but not install the wandb package")
exit()

import yaml
cfg_yaml = yaml.safe_load(tmp_cfg.dump())
wandb.config.update(cfg_yaml, allow_val_change=True)

logger.info("the used configs are: \n" + str(tmp_cfg))

super(CN, self).freeze()
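Putting the wandb pieces together end to end, a hedged sketch (the entity and project values are placeholders, and global_cfg as the entry point is an assumption about the repo's config API; the cfg.wandb.* fields are the ones read by init_wandb above):

from federatedscope.core.auxiliaries.utils import update_logger
from federatedscope.core.configs.config import global_cfg  # assumed entry point

cfg = global_cfg.clone()
cfg.wandb.use = True                # makes update_logger() call init_wandb()
cfg.wandb.name_user = "my-entity"   # placeholder wandb entity
cfg.wandb.name_project = "fs-demo"  # placeholder wandb project
update_logger(cfg)                  # sets outdir/expname and starts the wandb run
cfg.freeze()                        # dumps the config and syncs it via wandb.config.update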
