Add MPU rank specifiers for EP & DDP #9362

Closed · wants to merge 4 commits
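This PR widens the tuple returned by `fake_initialize_model_parallel` with two extra rank fields, the data-parallel (DDP) rank and the expert-model-parallel (EP) rank, updates every call site that unpacks it, and seeds Megatron-core's new `set_data_parallel_rank` / `set_data_parallel_world_size` globals. For orientation, here is a minimal, self-contained sketch of how a flat global rank decomposes into these coordinates; the tp-innermost / dp-middle / pp-outermost grid order is the conventional Megatron layout and is an assumption for illustration, not something this diff specifies:

```python
def decompose_rank(global_rank: int, tp: int, pp: int, world_size: int):
    """Split a flat rank into (dp, tp, pp) coordinates, assuming the
    conventional Megatron ordering: tp varies fastest, then dp, then pp."""
    model_parallel_size = tp * pp  # same arithmetic the scripts below use
    data_parallel_size = world_size // model_parallel_size
    tensor_rank = global_rank % tp
    data_rank = (global_rank // tp) % data_parallel_size
    pipeline_rank = global_rank // (tp * data_parallel_size)
    return data_rank, tensor_rank, pipeline_rank

# world_size=16 with tp=2, pp=2 leaves a data-parallel size of 4.
assert decompose_rank(5, tp=2, pp=2, world_size=16) == (2, 1, 0)
```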
2 changes: 2 additions & 0 deletions examples/nlp/language_modeling/megatron_export.py
@@ -115,8 +115,10 @@ def nemo_export(cfg):
app_state.tensor_model_parallel_size = cfg.tensor_model_parallel_size
app_state.pipeline_model_parallel_size = cfg.pipeline_model_parallel_size
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
app_state.model_parallel_size,
app_state.data_parallel_size,
app_state.pipeline_model_parallel_split_rank,
10 changes: 8 additions & 2 deletions examples/nlp/language_modeling/megatron_gpt_continue_training.py
@@ -95,8 +95,10 @@ def load_from_checkpoint_dir(cls, cfg, trainer, modify_confg_fn):
app_state.tensor_model_parallel_size = cfg.model.tensor_model_parallel_size
app_state.pipeline_model_parallel_size = cfg.model.pipeline_model_parallel_size
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
app_state.model_parallel_size,
app_state.data_parallel_size,
app_state.pipeline_model_parallel_split_rank,
@@ -115,7 +117,11 @@ def load_from_checkpoint_dir(cls, cfg, trainer, modify_confg_fn):
gpt_cfg = modify_confg_fn(hparams_file.cfg, cfg, add_cfg_to_tree=True)
with tempfile.NamedTemporaryFile(suffix='.yaml') as f:
OmegaConf.save(config=gpt_cfg, f=f.name)
model = cls.load_from_checkpoint(checkpoint_path=checkpoint_path, trainer=trainer, hparams_file=f.name,)
model = cls.load_from_checkpoint(
checkpoint_path=checkpoint_path,
trainer=trainer,
hparams_file=f.name,
)
return model


@@ -145,7 +151,7 @@ def main(cfg) -> None:
scaler = None
if cfg.trainer.precision in [16, '16', '16-mixed']:
scaler = GradScaler(
init_scale=cfg.model.get('native_amp_init_scale', 2 ** 32),
init_scale=cfg.model.get('native_amp_init_scale', 2**32),
growth_interval=cfg.model.get('native_amp_growth_interval', 1000),
hysteresis=cfg.model.get('hysteresis', 2),
)
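The `GradScaler` configured above is NeMo's subclass, which accepts a `hysteresis` argument (overflow steps tolerated before the scale is backed off) that the stock PyTorch scaler lacks. A sketch with `torch.cuda.amp.GradScaler` showing the two knobs the classes share; treat it as an illustration of the fallback values, not NeMo's exact class:

```python
from torch.cuda.amp import GradScaler

# Stock PyTorch scaler configured with the same fallbacks used above.
# NeMo's subclass additionally takes `hysteresis`; the stock class does not.
scaler = GradScaler(
    init_scale=2**32,      # native_amp_init_scale fallback
    growth_interval=1000,  # steps without overflow before the scale doubles
)
```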
1 change: 1 addition & 0 deletions examples/nlp/language_modeling/megatron_gpt_eval.py
@@ -262,6 +262,7 @@ def main(cfg) -> None:
app_state.pipeline_model_parallel_size = cfg.pipeline_model_parallel_size
app_state.expert_model_parallel_size = cfg.get('expert_model_parallel_size', 1)
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
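`expert_model_parallel_size` is read with `cfg.get(..., 1)` so configs written before MoE support still load. A minimal sketch of that fallback pattern:

```python
from omegaconf import OmegaConf

cfg = OmegaConf.create({"tensor_model_parallel_size": 2})  # old config, no EP key
ep_size = cfg.get("expert_model_parallel_size", 1)         # falls back to 1
assert ep_size == 1
```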
6 changes: 5 additions & 1 deletion examples/nlp/language_modeling/megatron_gpt_validate.py
@@ -121,8 +121,10 @@ def main(cfg) -> None:
app_state.pipeline_model_parallel_size = cfg.pipeline_model_parallel_size
app_state.virtual_pipeline_model_parallel_size = cfg.virtual_pipeline_model_parallel_size
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
app_state.model_parallel_size,
app_state.data_parallel_size,
app_state.pipeline_model_parallel_split_rank,
@@ -140,7 +142,9 @@ def main(cfg) -> None:
with tempfile.NamedTemporaryFile(suffix='.yaml') as f:
OmegaConf.save(config=pretrained_cfg, f=f.name)
model = MegatronGPTModel.load_from_checkpoint(
checkpoint_path=checkpoint_path, trainer=trainer, hparams_file=f.name,
checkpoint_path=checkpoint_path,
trainer=trainer,
hparams_file=f.name,
)
else:
raise ValueError("need at least a nemo file or checkpoint dir")
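The reflowed `load_from_checkpoint` call above relies on the script's surrounding pattern: dump the (possibly modified) config to a temporary YAML and hand it to Lightning as the hparams file. A minimal sketch; the commented call assumes the script's `checkpoint_path` and `trainer`:

```python
import tempfile
from omegaconf import OmegaConf

pretrained_cfg = OmegaConf.create({"precision": "16-mixed"})  # stand-in config
with tempfile.NamedTemporaryFile(suffix=".yaml") as f:
    OmegaConf.save(config=pretrained_cfg, f=f.name)
    # model = MegatronGPTModel.load_from_checkpoint(
    #     checkpoint_path=checkpoint_path, trainer=trainer, hparams_file=f.name,
    # )
```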
6 changes: 5 additions & 1 deletion examples/nlp/language_modeling/megatron_retro_eval.py
@@ -60,7 +60,9 @@ def __init__(self, sentences, neighbors):
self.sentences = sentences
self.neighbors = neighbors

def __len__(self,):
def __len__(
self,
):
return len(self.sentences)

def __getitem__(self, idx):
@@ -84,8 +86,10 @@ def main(cfg) -> None:
app_state.tensor_model_parallel_size = cfg.tensor_model_parallel_size
app_state.pipeline_model_parallel_size = cfg.pipeline_model_parallel_size
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
app_state.model_parallel_size,
app_state.data_parallel_size,
app_state.pipeline_model_parallel_split_rank,
6 changes: 5 additions & 1 deletion examples/nlp/language_modeling/megatron_retro_qatask_eval.py
@@ -63,7 +63,9 @@ def __init__(self, sentences, neighbors):
self.sentences = sentences
self.neighbors = neighbors

def __len__(self,):
def __len__(
self,
):
return len(self.sentences)

def __getitem__(self, idx):
@@ -116,8 +118,10 @@ def main(cfg) -> None:
app_state.tensor_model_parallel_size = cfg.tensor_model_parallel_size
app_state.pipeline_model_parallel_size = cfg.pipeline_model_parallel_size
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
app_state.model_parallel_size,
app_state.data_parallel_size,
app_state.pipeline_model_parallel_split_rank,
17 changes: 14 additions & 3 deletions examples/nlp/language_modeling/megatron_t5_eval.py
@@ -40,13 +40,22 @@ def main():
"--tokens_to_generate", type=int, default="16", required=False, help="How many tokens to add to prompt"
)
parser.add_argument(
"--tensor_model_parallel_size", type=int, default=-1, required=False,
"--tensor_model_parallel_size",
type=int,
default=-1,
required=False,
)
parser.add_argument(
"--pipeline_model_parallel_size", type=int, default=-1, required=False,
"--pipeline_model_parallel_size",
type=int,
default=-1,
required=False,
)
parser.add_argument(
"--pipeline_model_parallel_split_rank", type=int, default=-1, required=False,
"--pipeline_model_parallel_split_rank",
type=int,
default=-1,
required=False,
)
parser.add_argument("--precision", default="16", type=str, help="PyTorch Lightning Trainer precision flag")
parser.add_argument("--decoder_starts_with_pad", action="store_true", help="Decoder starts with pad token")
@@ -89,8 +98,10 @@ def main():
if args.tensor_model_parallel_size > 1 or args.pipeline_model_parallel_size > 1:
app_state.model_parallel_size = args.tensor_model_parallel_size * args.pipeline_model_parallel_size
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
app_state.model_parallel_size,
app_state.data_parallel_size,
app_state.pipeline_model_parallel_split_rank,
10 changes: 8 additions & 2 deletions examples/nlp/language_modeling/megatron_t5_seq2seq_finetune.py
@@ -113,8 +113,10 @@ def load_from_checkpoint_dir(cls, cfg, trainer, modify_confg_fn):
app_state.tensor_model_parallel_size = cfg.model.tensor_model_parallel_size
app_state.pipeline_model_parallel_size = cfg.model.pipeline_model_parallel_size
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
app_state.model_parallel_size,
app_state.data_parallel_size,
app_state.pipeline_model_parallel_split_rank,
@@ -133,7 +135,11 @@ def load_from_checkpoint_dir(cls, cfg, trainer, modify_confg_fn):
t5_cfg = modify_confg_fn(hparams_file.cfg, cfg, add_cfg_to_tree=True)
with tempfile.NamedTemporaryFile(suffix='.yaml') as f:
OmegaConf.save(config=t5_cfg, f=f.name)
model = cls.load_from_checkpoint(checkpoint_path=checkpoint_path, trainer=trainer, hparams_file=f.name,)
model = cls.load_from_checkpoint(
checkpoint_path=checkpoint_path,
trainer=trainer,
hparams_file=f.name,
)
return model


@@ -162,7 +168,7 @@ def main(cfg) -> None:
scaler = None
if cfg.trainer.precision in [16, '16', '16-mixed']:
scaler = GradScaler(
init_scale=cfg.model.get('native_amp_init_scale', 2 ** 32),
init_scale=cfg.model.get('native_amp_init_scale', 2**32),
growth_interval=cfg.model.get('native_amp_growth_interval', 1000),
hysteresis=cfg.model.get('hysteresis', 2),
)
12 changes: 10 additions & 2 deletions examples/nlp/machine_translation/nmt_transformer_infer_megatron.py
@@ -58,8 +58,10 @@ def main(cfg) -> None:
app_state = AppState()
app_state.model_parallel_size = cfg.tensor_model_parallel_size * cfg.pipeline_model_parallel_size
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
app_state.model_parallel_size,
app_state.data_parallel_size,
app_state.pipeline_model_parallel_split_rank,
@@ -101,13 +103,19 @@ def main(cfg) -> None:
src_text.append(line.strip())
if len(src_text) == cfg.batch_size:
translations = model.translate(
text=src_text, source_lang=cfg.source_lang, target_lang=cfg.target_lang,
text=src_text,
source_lang=cfg.source_lang,
target_lang=cfg.target_lang,
)
for translation in translations:
tgt_f.write(translation + "\n")
src_text = []
if len(src_text) > 0:
translations = model.translate(text=src_text, source_lang=cfg.source_lang, target_lang=cfg.target_lang,)
translations = model.translate(
text=src_text,
source_lang=cfg.source_lang,
target_lang=cfg.target_lang,
)
for translation in translations:
tgt_f.write(translation + "\n")

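The translation loop above is a standard batch-and-flush shape: accumulate stripped lines until the batch is full, translate, then drain the trailing partial batch after the loop. A generic, runnable sketch with a stand-in `translate`:

```python
def translate(batch):  # stand-in for model.translate(...)
    return [line.upper() for line in batch]

lines, batch_size = ["ein", "zwei", "drei"], 2
batch, translations = [], []
for line in lines:
    batch.append(line.strip())
    if len(batch) == batch_size:
        translations.extend(translate(batch))
        batch = []
if batch:  # don't drop the final partial batch
    translations.extend(translate(batch))
assert translations == ["EIN", "ZWEI", "DREI"]
```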
1 change: 1 addition & 0 deletions nemo/collections/multimodal/parts/utils.py
@@ -394,6 +394,7 @@ def create_neva_model_and_processor(cfg):
app_state.tensor_model_parallel_size = cfg.tensor_model_parallel_size
app_state.pipeline_model_parallel_size = cfg.pipeline_model_parallel_size
(
_,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
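Note the leading `_`: call sites that never consult the data-parallel rank simply discard the tuple's new first element, which keeps the widened return a mechanical change. Schematically, with a hypothetical stand-in for the real function:

```python
def fake_init():  # hypothetical stand-in returning the widened 8-tuple
    return (0, 1, 0, 0, 2, 4, None, None)

_, tp_rank, pp_rank, ep_rank, mp_size, dp_size, split_rank, vpp_rank = fake_init()
```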
@@ -532,8 +532,6 @@ def setup_mcore_distributed_parallel(self):
config,
ddp_config,
model_chunk,
data_parallel_group=parallel_state.get_data_parallel_group(with_context_parallel=True),
expert_data_parallel_group=parallel_state.get_data_modulo_expert_parallel_group(),
# Turn off bucketing for model_chunk 2 onwards, since communication for these
# model chunks is overlapped with compute anyway.
disable_bucketing=(model_chunk_idx > 0),
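The explicit `data_parallel_group` and `expert_data_parallel_group` arguments are dropped here. A plausible reading, an assumption rather than something the diff states, is that Megatron-core's `DistributedDataParallel` now resolves both groups internally from `parallel_state`, making the explicit arguments redundant. A sketch of the resulting construction, using the variable names from the hunk above:

```python
from megatron.core.distributed import DistributedDataParallel as McoreDDP

# config, ddp_config, model_chunk, model_chunk_idx come from the surrounding
# method; group resolution is assumed to happen inside McoreDDP.
ddp_chunk = McoreDDP(
    config,
    ddp_config,
    model_chunk,
    disable_bucketing=(model_chunk_idx > 0),
)
```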
7 changes: 7 additions & 0 deletions nemo/collections/nlp/modules/common/megatron/megatron_init.py
@@ -34,6 +34,8 @@
from megatron.core.parallel_state import (
RankGenerator,
get_pipeline_model_parallel_rank,
set_data_parallel_rank,
set_data_parallel_world_size,
set_expert_model_parallel_rank,
set_expert_model_parallel_world_size,
set_pipeline_model_parallel_rank,
@@ -96,6 +98,7 @@ def initialize_model_parallel_for_nemo(
app_state.use_fp8 = use_fp8
app_state.init_mpi_proc_group = init_mpi_proc_group
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
@@ -122,6 +125,9 @@ def initialize_model_parallel_for_nemo(
set_expert_model_parallel_world_size(app_state.expert_model_parallel_size)
set_expert_model_parallel_rank(app_state.expert_model_parallel_rank)

set_data_parallel_world_size(app_state.data_parallel_size)
set_data_parallel_rank(app_state.data_parallel_rank)

set_pipeline_model_parallel_rank(app_state.pipeline_model_parallel_rank)
if HAVE_INTERLEAVED:
set_virtual_pipeline_model_parallel_world_size(app_state.virtual_pipeline_model_parallel_size)
@@ -354,6 +360,7 @@ def fake_initialize_model_parallel(
logging.info(f'Rank {rank} has embedding rank: {embedding_rank}')

return (
data_parallel_rank,
tensor_model_parallel_rank,
pipeline_model_parallel_rank,
expert_model_parallel_rank,
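With the two new setters, NeMo's fake initialization can seed Megatron-core's data-parallel globals before any process groups exist, mirroring the existing TP/PP/EP plumbing. A brief sketch; that the matching getters fall back to these globals when `torch.distributed` is uninitialized is an assumption, inferred from how the tensor/pipeline setters behave:

```python
from megatron.core import parallel_state

# Seed the globals on a single process, before init_process_group is called.
parallel_state.set_data_parallel_world_size(4)
parallel_state.set_data_parallel_rank(2)
```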
2 changes: 1 addition & 1 deletion nemo/collections/nlp/modules/common/tokenizer_utils.py
@@ -91,7 +91,7 @@ def get_tokenizer(
use_fast: (only for HuggingFace AutoTokenizer) set to True to use fast HuggingFace tokenizer
bpe_dropout: (experimental) BPE dropout tries to corrupt the standard segmentation
procedure of BPE to help
model better learn word compositionality and become robust to segmentation errors.
model better learn word compositionality and become robust to segmentation errors.
It has empirically been shown to improve inference time BLEU scores.
"""
if special_tokens is None:
@@ -72,8 +72,10 @@ def main(cfg) -> None:
if cfg.tensor_model_parallel_size > 1 or cfg.pipeline_model_parallel_size > 1:
app_state.model_parallel_size = cfg.tensor_model_parallel_size * cfg.pipeline_model_parallel_size
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
app_state.model_parallel_size,
app_state.data_parallel_size,
app_state.pipeline_model_parallel_split_rank,
18 changes: 14 additions & 4 deletions scripts/nlp_language_modeling/merge_lora_weights/merge.py
@@ -62,7 +62,9 @@ def __init__(self, sentences):
super().__init__()
self.sentences = sentences

def __len__(self,):
def __len__(
self,
):
return len(self.sentences)

def __getitem__(self, idx):
@@ -131,9 +133,12 @@ def fix_for_O2(state_dict):


def merge(
base_model_state_dict: Dict[str, Any], lora_state_dicts: List[Dict], num_layers: int, mcore: bool,
base_model_state_dict: Dict[str, Any],
lora_state_dicts: List[Dict],
num_layers: int,
mcore: bool,
):
"""
"""
Iterate through all the feedforward weights in all the layers.
Collect the corresponding lora weights for each layer and across tp ranks.
Computes the "full rank" weight from the two low-rank weights and add it to the feedforward weight.
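The arithmetic behind that docstring, in a self-contained sketch: fold the low-rank product `B @ A` into the dense weight. Shapes follow the usual LoRA convention, and the `scale` factor (often `lora_alpha / r`) is an assumption; the PR's `merge` works on NeMo state dicts gathered across TP ranks rather than raw tensors:

```python
import torch

def merge_lora_into_weight(W: torch.Tensor, A: torch.Tensor, B: torch.Tensor,
                           scale: float = 1.0) -> torch.Tensor:
    """Fold a LoRA update into a dense weight: W' = W + scale * B @ A.

    Assumed shapes: W is (out, in), A is (r, in), B is (out, r), r << min(out, in).
    """
    return W + scale * (B @ A)

W = torch.zeros(8, 16)
A = torch.randn(4, 16)   # rank r = 4
B = torch.randn(8, 4)
merged = merge_lora_into_weight(W, A, B)
assert merged.shape == W.shape
```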
@@ -213,7 +218,9 @@ def main(cfg) -> None:

if cfg.tensor_model_parallel_size < 0 or cfg.pipeline_model_parallel_size < 0:
model_config = MegatronGPTModel.restore_from(
restore_path=cfg.gpt_model_file, trainer=trainer, return_config=True,
restore_path=cfg.gpt_model_file,
trainer=trainer,
return_config=True,
)

with open_dict(cfg):
@@ -252,10 +259,13 @@ def main(cfg) -> None:
app_state.tensor_model_parallel_size = cfg.tensor_model_parallel_size
app_state.pipeline_model_parallel_size = cfg.pipeline_model_parallel_size
(
app_state.data_parallel_rank,
app_state.tensor_model_parallel_rank,
app_state.pipeline_model_parallel_rank,
app_state.expert_model_parallel_rank,
app_state.model_parallel_size,
app_state.data_parallel_size,
app_state.pipeline_model_parallel_split_rank,
app_state.virtual_pipeline_model_parallel_rank,
) = fake_initialize_model_parallel(
world_size=app_state.model_parallel_size,
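The first hunk in this file shows the `-1` sentinel convention: when the user does not supply parallel sizes, `restore_from(..., return_config=True)` fetches the checkpoint's saved config so the sizes can be inferred. A sketch of that flow; the write-back under `open_dict` is an assumption about the truncated lines:

```python
from omegaconf import open_dict

# cfg, trainer, MegatronGPTModel come from the surrounding script.
if cfg.tensor_model_parallel_size < 0 or cfg.pipeline_model_parallel_size < 0:
    model_config = MegatronGPTModel.restore_from(
        restore_path=cfg.gpt_model_file, trainer=trainer, return_config=True,
    )
    with open_dict(cfg):  # unlock the OmegaConf struct for writing
        cfg.tensor_model_parallel_size = model_config.get('tensor_model_parallel_size', 1)
        cfg.pipeline_model_parallel_size = model_config.get('pipeline_model_parallel_size', 1)
```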
1 change: 1 addition & 0 deletions tests/collections/nlp/test_initialize.py
@@ -117,6 +117,7 @@ def test_fake_initialize(nodes, num_gpu, tp, pp, cp, ep):
) = old_fake_initialize_model_parallel(nodes * num_gpu, 0, tp, pp, None, None, ep, cp)

(
_,
m_tensor_model_parallel_rank,
n_pipeline_model_parallel_rank,
n_expert_model_parallel_rank,
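The test compares the new `fake_initialize_model_parallel` against the old implementation field by field; the `_` discards the new leading data-parallel rank, which the old function never returned and therefore cannot be checked for agreement.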