
[App] Update multi-node examples (#15700)
Co-authored-by: Jirka Borovec <6035284+Borda@users.noreply.github.com>
Co-authored-by: Carlos Mocholí <carlossmocholi@gmail.com>
3 people committed Nov 21, 2022
1 parent 08d14ec commit 8306797
Showing 7 changed files with 36 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-app-tests.yml
@@ -11,7 +11,7 @@ on:
- ".github/workflows/ci-app-tests.yml"
- "src/lightning_app/**"
- "tests/tests_app/**"
- "examples/app_*" # some tests_app tests call examples files
- "examples/app_*/**" # some tests_app tests call examples files
- "requirements/app/**"
- "setup.py"
- ".actions/**"
@@ -5,8 +5,7 @@


class LightningTrainerDistributed(L.LightningWork):
@staticmethod
def run():
def run(self):
model = BoringModel()
trainer = L.Trainer(max_epochs=10, strategy="ddp")
trainer.fit(model)
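For context, this component is not launched on its own; it is handed to a multi-node executor, as in examples/app_multi_node/train_lt.py further down in this commit. A minimal sketch of that wiring, using the same values as the example:

import lightning as L
from lightning.app.components import LightningTrainerMultiNode

# Fan the LightningWork defined above out over two machines with 4 GPUs each;
# each node calls run(), and the Trainer's "ddp" strategy connects the ranks.
component = LightningTrainerMultiNode(
    LightningTrainerDistributed,
    num_nodes=2,
    cloud_compute=L.CloudCompute("gpu-fast-multi"),  # 4 x V100
)
app = L.LightningApp(component)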
@@ -22,8 +22,7 @@ def distributed_train(local_rank: int, main_address: str, main_port: int, num_no
# 2. PREPARE DISTRIBUTED MODEL
model = torch.nn.Linear(32, 2)
device = torch.device(f"cuda:{local_rank}") if torch.cuda.is_available() else torch.device("cpu")
device_ids = device if torch.cuda.is_available() else None
model = DistributedDataParallel(model, device_ids=device_ids).to(device)
model = DistributedDataParallel(model, device_ids=[local_rank]).to(device)

# 3. SETUP LOSS AND OPTIMIZER
criterion = torch.nn.MSELoss()
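One caveat about the simplified line above: DistributedDataParallel requires device_ids=None when the module lives on the CPU, so device_ids=[local_rank] assumes one CUDA device per process. A CPU-safe variant inside distributed_train (an illustration, not part of this commit) would keep the conditional:

import torch
from torch.nn.parallel.distributed import DistributedDataParallel

# local_rank is the per-node process index passed into distributed_train.
device = torch.device(f"cuda:{local_rank}") if torch.cuda.is_available() else torch.device("cpu")
model = torch.nn.Linear(32, 2).to(device)  # move the module first, then wrap it
model = DistributedDataParallel(
    model,
    device_ids=[local_rank] if torch.cuda.is_available() else None,
)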
2 changes: 1 addition & 1 deletion docs/source-app/levels/basic/hero_components.rst
@@ -1,7 +1,7 @@
.. lit_tabs::
:titles: Hello world; Hello GPU world; PyTorch & ⚡⚡⚡ Trainer (1+ cloud GPUs); Train PyTorch (cloud GPU); Train PyTorch (32 cloud GPUs); Deploy a model on cloud GPUs; Run a model script; XGBoost; Streamlit demo
:code_files: /levels/basic/hello_components/hello_world.py; /levels/basic/hello_components/hello_world_gpu.py; /levels/basic/hello_components/pl_multinode.py; /levels/basic/hello_components/train_pytorch.py; /levels/basic/hello_components/pt_multinode.py; /levels/basic/hello_components/deploy_model.py; /levels/basic/hello_components/run_ptl_script.py; /levels/basic/hello_components/xgboost.py; /levels/basic/hello_components/streamlit_demo.py
:highlights: 7; 10, 11; 10-12, 17, 18; 4, 8, 12, 18-19, 26; 5, 10, 22, 28, 32, 42, 58-60; 3, 11-12, 25, 29; 7, 10; 15, 21; 9, 15, 24
:highlights: 7; 10, 11; 9-11, 16, 17; 4, 8, 12, 18-19, 26; 5, 10, 22, 27, 31, 41, 57-59; 3, 11-12, 25, 29; 7, 10; 15, 21; 9, 15, 24
:enable_run: true
:tab_rows: 3
:height: 620px
@@ -26,7 +26,7 @@ or cloud GPUs without code changes.
.. lit_tabs::
:descriptions: import Lightning; We're using a demo LightningModule; Move your training code here (usually your main.py); Pass your component to the multi-node executor (it works on CPU or single GPUs also); Select the number of machines (nodes). Here we choose 2.; Choose from over 15+ machine types. This one has 4 v100 GPUs.; Initialize the App object that executes the component logic.
:code_files: /levels/basic/hello_components/pl_multinode.py; /levels/basic/hello_components/pl_multinode.py; /levels/basic/hello_components/pl_multinode.py; /levels/basic/hello_components/pl_multinode.py; /levels/basic/hello_components/pl_multinode.py; /levels/basic/hello_components/pl_multinode.py; /levels/basic/hello_components/pl_multinode.py;
:highlights: 2; 4; 10-12; 15-18; 17; 18; 20
:highlights: 2; 4; 9-11; 14-17; 16; 17; 19
:enable_run: true
:tab_rows: 5
:height: 420px
19 changes: 8 additions & 11 deletions examples/app_multi_node/train_lt.py
@@ -1,3 +1,4 @@
# app.py
import lightning as L
from lightning.app.components import LightningTrainerMultiNode
from lightning.pytorch.demos.boring_classes import BoringModel
@@ -6,18 +7,14 @@
class LightningTrainerDistributed(L.LightningWork):
def run(self):
model = BoringModel()
trainer = L.Trainer(
max_steps=1000,
strategy="ddp",
)
trainer = L.Trainer(max_epochs=10, strategy="ddp")
trainer.fit(model)


# Run over 2 nodes of 4 x V100
app = L.LightningApp(
LightningTrainerMultiNode(
LightningTrainerDistributed,
num_nodes=2,
cloud_compute=L.CloudCompute("gpu-fast-multi"), # 4 x V100
)
# 8 GPUs: (2 nodes of 4 x v100)
component = LightningTrainerMultiNode(
LightningTrainerDistributed,
num_nodes=4,
cloud_compute=L.CloudCompute("gpu-fast-multi"), # 4 x v100
)
app = L.LightningApp(component)
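As the comments suggest, the GPU totals in these examples are simply num_nodes times the GPUs on the selected CloudCompute; for instance, with the 4-GPU "gpu-fast-multi" machines (illustrative arithmetic):

num_nodes = 2
gpus_per_machine = 4  # "gpu-fast-multi" machines carry 4 V100s
total_gpus = num_nodes * gpus_per_machine  # 2 nodes x 4 GPUs = 8 GPUs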
54 changes: 23 additions & 31 deletions examples/app_multi_node/train_pytorch.py
@@ -1,3 +1,5 @@
# app.py
# ! pip install torch
import torch
from torch.nn.parallel.distributed import DistributedDataParallel

@@ -6,7 +8,7 @@


def distributed_train(local_rank: int, main_address: str, main_port: int, num_nodes: int, node_rank: int, nprocs: int):
# 1. Setting distributed environment
# 1. SET UP DISTRIBUTED ENVIRONMENT
global_rank = local_rank + node_rank * nprocs
world_size = num_nodes * nprocs
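A quick worked example of the rank bookkeeping above, with illustrative numbers (2 nodes, 4 processes per node):

num_nodes, nprocs = 2, 4
node_rank, local_rank = 1, 2                   # third process on the second node
global_rank = local_rank + node_rank * nprocs  # 2 + 1 * 4 = 6
world_size = num_nodes * nprocs                # 2 * 4 = 8
assert (global_rank, world_size) == (6, 8)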

@@ -18,52 +20,42 @@ def distributed_train(local_rank: int, main_address: str, main_port: int, num_no
init_method=f"tcp://{main_address}:{main_port}",
)

# 2. Prepare the model
model = torch.nn.Sequential(
torch.nn.Linear(1, 1),
torch.nn.ReLU(),
torch.nn.Linear(1, 1),
)

# 3. Setup distributed training
# 2. PREPARE DISTRIBUTED MODEL
model = torch.nn.Linear(32, 2)
device = torch.device(f"cuda:{local_rank}") if torch.cuda.is_available() else torch.device("cpu")
model = DistributedDataParallel(model.to(device), device_ids=[local_rank] if torch.cuda.is_available() else None)
model = DistributedDataParallel(model, device_ids=[local_rank]).to(device)

# 4. Prepare loss and optimizer
# 3. SETUP LOSS AND OPTIMIZER
criterion = torch.nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# 5. Train the model for 1000 steps.
for step in range(1000):
# 4. TRAIN THE MODEL FOR 50 STEPS
for step in range(50):
model.zero_grad()
x = torch.tensor([0.8]).to(device)
target = torch.tensor([1.0]).to(device)
x = torch.randn(64, 32).to(device)
output = model(x)
loss = criterion(output, target)
loss = criterion(output, torch.ones_like(output))
print(f"global_rank: {global_rank} step: {step} loss: {loss}")
loss.backward()
optimizer.step()

# 5. VERIFY ALL COPIES OF THE MODEL HAVE THE SAME WEIGHTS AT END OF TRAINING
weight = model.module.weight.clone()
torch.distributed.all_reduce(weight)
assert torch.equal(model.module.weight, weight / world_size)

print("Multi Node Distributed Training Done!")
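The weight check above relies on a simple identity: after DDP training every rank holds identical weights, so an all_reduce (sum) across world_size ranks returns world_size times the local tensor, and dividing by world_size recovers it. A standalone sketch of the same arithmetic (values are illustrative):

import torch

world_size = 8
local_weight = torch.tensor([0.5, -1.25])  # identical on every rank after DDP
summed = world_size * local_weight         # what all_reduce with SUM would return
assert torch.equal(local_weight, summed / world_size)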


class PyTorchDistributed(L.LightningWork):
def run(
self,
main_address: str,
main_port: int,
num_nodes: int,
node_rank: int,
):
def run(self, main_address: str, main_port: int, num_nodes: int, node_rank: int):
nprocs = torch.cuda.device_count() if torch.cuda.is_available() else 1
torch.multiprocessing.spawn(
distributed_train, args=(main_address, main_port, num_nodes, node_rank, nprocs), nprocs=nprocs
)
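Note that torch.multiprocessing.spawn passes the process index (0..nprocs-1) as the first argument to the target function, which is why distributed_train takes local_rank ahead of the args tuple. A minimal illustration with a hypothetical worker:

import torch.multiprocessing as mp


def _worker(local_rank: int, greeting: str) -> None:
    # spawn calls _worker(i, *args) for i in range(nprocs).
    print(f"{greeting} from local_rank={local_rank}")


if __name__ == "__main__":
    mp.spawn(_worker, args=("hello",), nprocs=2)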


# Run over 2 nodes of 4 x V100
app = L.LightningApp(
MultiNode(
PyTorchDistributed,
num_nodes=2,
cloud_compute=L.CloudCompute("gpu-fast-multi"), # 4 x V100
)
)
# 32 GPUs: (8 nodes x 4 x v100)
compute = L.CloudCompute("gpu-fast-multi") # 4xV100
component = MultiNode(PyTorchDistributed, num_nodes=2, cloud_compute=compute)
app = L.LightningApp(component)
