# Run PyTorchJob From Function

In this notebook, we create a [Kubeflow PyTorchJob](https://www.kubeflow.org/docs/components/training/pytorch/).

The PyTorchJob runs distributed training using the [DistributedDataParallel strategy](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html).

> **Note**: Kubeflow Trainer v2 has been released. Running this notebook against the v2 API requires some modifications (new CR and new Python API).

Reference: [Train-CNN-with-FashionMNIST.ipynb](https://github.com/kubeflow/trainer/blob/release-1.9/examples/pytorch/image-classification/Train-CNN-with-FashionMNIST.ipynb)

## Install Kubeflow Python SDKs

You need to install PyTorch packages and Kubeflow SDKs to run this Notebook.

In [3]:
# !pip install torch==2.1.2
!pip install -U kubeflow-training
!pip install tensorboard
!pip install numpy --upgrade

Collecting kubeflow-training
  Using cached kubeflow_training-1.9.3-py3-none-any.whl.metadata (1.8 kB)
Collecting retrying>=1.3.3 (from kubeflow-training)
  Using cached retrying-1.4.2-py3-none-any.whl.metadata (5.5 kB)
Using cached kubeflow_training-1.9.3-py3-none-any.whl (113 kB)
Using cached retrying-1.4.2-py3-none-any.whl (10 kB)
Installing collected packages: retrying, kubeflow-training
Successfully installed kubeflow-training-1.9.3 retrying-1.4.2
Collecting tensorboard
  Using cached tensorboard-2.20.0-py3-none-any.whl.metadata (1.8 kB)
Collecting absl-py>=0.4 (from tensorboard)
  Using cached absl_py-2.3.1-py3-none-any.whl.metadata (3.3 kB)
Collecting grpcio>=1.48.2 (from tensorboard)
  Using cached grpcio-1.74.0-cp311-cp311-manylinux_2_17_aarch64.whl.metadata (3.8 kB)
Collecting markdown>=2.6.8 (from tensorboard)
  Using cached markdown-3.8.2-py3-none-any.whl.metadata (5.1 kB)
Collecting tensorboard-data-server<0.8.0,>=0.7.0 (from tensorboard)
  Using cached tensorboard_data_se

## Create Train Script for CTR prediction Model

- Model: [DCNv2](https://arxiv.org/abs/2008.13535)
- Dataset: [TaobaoAd_x1](https://github.com/reczoo/Datasets/tree/main/Taobao/TaobaoAd_x1)
    * Please refer to the link for dataset description
    * Due to hardware limits, we sample 1% of `train.csv` as `train_sample.csv` for model training
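
The notebook does not show how `train_sample.csv` was produced. The following is a minimal sketch of one way to do it, assuming a plain CSV with a header row. The helper name `sample_csv` and the seed are illustrative, not part of the original workflow. It keeps each row independently with probability `frac`, so the result is approximately (not exactly) 1% of the rows.

```python
import csv
import random


def sample_csv(src_path, dst_path, frac=0.01, seed=42):
    """Copy the header and a random ~frac share of data rows from src to dst."""
    rng = random.Random(seed)
    kept = 0
    with open(src_path, newline="") as src, open(dst_path, "w", newline="") as dst:
        reader = csv.reader(src)
        writer = csv.writer(dst)
        writer.writerow(next(reader))  # keep the header row
        for row in reader:
            # Keep each row independently with probability `frac`.
            if rng.random() < frac:
                writer.writerow(row)
                kept += 1
    return kept
```

Calling `sample_csv("train.csv", "train_sample.csv")` streams the file row by row, so the full dataset never has to fit in memory.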

In [6]:
def train_pytorch_model(parameters):
    import logging
    import pandas as pd
    import numpy as np
    import pickle
    import os
    import sys
    
    os.chdir("/home/jovyan")
    sys.path.append("/home/jovyan")
    
    import torch
    import torch.distributed as dist
    import torch.nn.functional as F
    from torch import nn
    from torch.utils.data import DistributedSampler, DataLoader
    from torch.utils.data import Dataset
    from sklearn.preprocessing import StandardScaler, LabelEncoder
    from datetime import datetime

    from model import DCNv2
    from feature_encoder import FeatureEncoder


    logging.basicConfig(
        format="%(asctime)s %(levelname)-8s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%SZ",
        level=logging.INFO,
    )

    time_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")


    # Custom dataset
    class CTRDataset(Dataset):
        def __init__(self, df, feature_encoder, label_col=None, is_train=True):
            self.encoder = feature_encoder
            self.label_col = label_col
    
            if is_train:
                self.encoder.fit(df)
    
            self.dense, self.sparse = self.encoder.transform(df, is_train=is_train)
            self.dense_tensor = torch.tensor(self.dense, dtype=torch.float32)
            self.sparse_tensor = torch.tensor(self.sparse, dtype=torch.long)
    
            if label_col:
                self.labels = torch.tensor(df[label_col].values, dtype=torch.float32).unsqueeze(1)
            else:
                self.labels = None
    
        def __len__(self):
            return len(self.dense_tensor)
    
        def __getitem__(self, idx):
            if self.labels is not None:
                return self.dense_tensor[idx], self.sparse_tensor[idx], self.labels[idx]
            else:
                return self.dense_tensor[idx], self.sparse_tensor[idx]
                

    # If a GPU is available, use the NCCL backend. Otherwise, use the Gloo backend.
    if torch.cuda.is_available():
        device = "cuda"
        backend = "nccl"
    else:
        device = "cpu"
        backend = "gloo"
    
    logging.info(f"Using Device: {device}, Backend: {backend}")

    # Setup PyTorch DDP. Distributed environment will be set automatically by Training Operator.
    dist.init_process_group(backend=backend)
    Distributor = torch.nn.parallel.DistributedDataParallel
    local_rank = int(os.getenv("LOCAL_RANK", 0))
    logging.info(
        "Distributed Training for WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}".format(
            dist.get_world_size(),
            dist.get_rank(),
            local_rank,
        )
    )

    dense_cols = ["price"]
    sparse_cols = ["userid", "cms_segid", "cms_group_id", "final_gender_code", "age_level", "pvalue_level",
                "shopping_level", "occupation", "new_user_class_level", "adgroup_id", "cate_id",
                "campaign_id", "customer", "brand", "pid", "btag"]
    label_col = "clk"

    # Prepare dataset
    df = pd.read_csv("train_sample.csv")
    encoder = FeatureEncoder(dense_cols, sparse_cols)
    train_set = CTRDataset(df, encoder, label_col=label_col, is_train=True)
    encoder.save("preprocess_metadata.pkl")

    # Every PyTorchJob worker reads its own shard of the dataset through DistributedSampler.
    train_loader = torch.utils.data.DataLoader(
        train_set,
        batch_size=128,
        sampler=DistributedSampler(train_set),
    )

    # Attach model to the correct device. Only CUDA devices take a per-process index.
    device = torch.device(f"cuda:{local_rank}" if device == "cuda" else "cpu")
    sparse_cardinalities = encoder.get_sparse_cardinalities()
    model = DCNv2(
        dense_dim=len(dense_cols),
        sparse_cardinalities=sparse_cardinalities
    ).to(device)
    model = Distributor(model)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    loss_fn = torch.nn.BCELoss()

    # Start Training.
    logging.info(f"Start training for RANK: {dist.get_rank()}. WORLD_SIZE: {dist.get_world_size()}")

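    for epoch in range(int(parameters["NUM_EPOCHS"])):
        model.train()
        # Reseed the sampler so each epoch uses a different shuffle order.
        train_loader.sampler.set_epoch(epoch)
        total_loss = 0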
        for batch_idx, (dense, sparse, label) in enumerate(train_loader):
            # Attach tensors to the device.
            dense, sparse, label = dense.to(device), sparse.to(device), label.to(device)

            optimizer.zero_grad()
            pred = model(dense, sparse)
            loss = loss_fn(pred, label)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            if batch_idx % 10 == 0 and dist.get_rank() == 0:
                logging.info(
                    "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
                        epoch,
                        batch_idx * len(label),
                        len(train_loader.dataset),
                        100.0 * batch_idx / len(train_loader),
                        loss.item(),
                    )
                )

        
        if dist.get_rank() == 0:
            logging.info(f"Epoch {epoch}, Loss: {total_loss/len(train_loader):.4f}")

    if dist.get_rank() == 0:
        logging.info("Training is finished")
        
        # save weights
        torch.save(model.module.state_dict(), 'model_weights.pth')

        

## Run Training Locally in the Notebook


In [None]:
# Set dist env variables to run the above training locally on the Notebook.
import os

os.environ["RANK"] = "0"
os.environ["LOCAL_RANK"] = "0"
os.environ["WORLD_SIZE"] = "1"
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "1234"

# Train Model locally in the Notebook.
train_pytorch_model({"NUM_EPOCHS": "1"})
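
Setting `os.environ` directly leaves the variables in place for the rest of the kernel session. As a hedged alternative, the sketch below wraps the same five variables in a context manager that restores the previous values afterwards. The name `single_process_dist_env` is hypothetical and only used for illustration.

```python
import os
from contextlib import contextmanager


@contextmanager
def single_process_dist_env(master_port="1234"):
    """Temporarily set the env variables for a one-process 'distributed' run."""
    overrides = {
        "RANK": "0",
        "LOCAL_RANK": "0",
        "WORLD_SIZE": "1",
        "MASTER_ADDR": "localhost",
        "MASTER_PORT": str(master_port),
    }
    previous = {key: os.environ.get(key) for key in overrides}
    os.environ.update(overrides)
    try:
        yield overrides
    finally:
        # Restore whatever was there before, removing keys that were unset.
        for key, value in previous.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value
```

Usage would look like `with single_process_dist_env(): train_pytorch_model({"NUM_EPOCHS": "1"})`. Note that the training function initializes the default process group, so running it a second time in the same kernel typically requires calling `torch.distributed.destroy_process_group()` first.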

## Start Distributed Training with PyTorchJob

Before creating PyTorchJob, you have to create `TrainingClient()`. It uses [Kubernetes Python client](https://github.com/kubernetes-client/python) to communicate with Kubernetes API server. You can set path and context for [the kubeconfig file](https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/). The default location for the kubeconfig is `~/.kube/config`.

The Kubeflow Training Operator automatically sets the appropriate env variables (`MASTER_PORT`, `MASTER_ADDR`, `WORLD_SIZE`, `RANK`) for each PyTorchJob container.

The PyTorchJob will train the model for 3 epochs with 2 PyTorch workers.

In [7]:
from kubeflow.training import TrainingClient, constants

# Start PyTorchJob Training.
pytorchjob_name = "train-pytorch"

# Since we set `job_kind=PyTorchJob`, the client APIs use PyTorchJob as the default Job kind.
training_client = TrainingClient(job_kind=constants.PYTORCHJOB_KIND)

training_client.create_job(
    name=pytorchjob_name,
    train_func=train_pytorch_model,
    parameters={"NUM_EPOCHS": "3"}, # Input parameters for the train function.
    num_workers=2,  # How many PyTorch Nodes will be created.
    num_procs_per_worker=1, # How many procs per node will be used (e.g. number of CPUs/GPUs in a single Node)
    resources_per_worker={},  # "cpu": "0.5"
    base_image="quay.io/jupyter/pytorch-notebook:latest",
    volume_mounts=[
        {
            "mountPath": "/home/jovyan", 
            "name": "model-volume"
        }
    ],
    volumes=[
        {
            "name": "model-volume",
            "persistentVolumeClaim": {
                "claimName": "torch-workspace"
            }
        }
    ]
)

### Check the PyTorchJob Status

Use the `TrainingClient()` APIs to get information about the created PyTorchJob.

In [22]:
print(f"PyTorchJob Status: {training_client.is_job_running(name=pytorchjob_name)}")

PyTorchJob Status: True
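
A single status check only gives a snapshot. The sketch below shows one way to poll until the job stops running. It takes any zero-argument callable, so it makes no assumptions about the SDK beyond the `is_job_running` call already used in this notebook. The helper name `wait_while_running` and its defaults are hypothetical.

```python
import time


def wait_while_running(is_running, timeout_s=600.0, poll_s=5.0, sleep=time.sleep):
    """Poll `is_running()` until it returns False; return True if it stopped in time."""
    deadline = time.monotonic() + timeout_s
    while is_running():
        if time.monotonic() >= deadline:
            return False
        sleep(poll_s)
    return True
```

For example: `wait_while_running(lambda: training_client.is_job_running(name=pytorchjob_name))`. A job that is "not running" may have succeeded, failed, or not started yet, so the final state still needs a separate check.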


### Get PyTorchJob Pod Names

Since we used 2 workers, the PyTorchJob creates 1 master pod and 1 worker pod to execute distributed training.

In [23]:
training_client.get_job_pod_names(pytorchjob_name)

['train-pytorch-master-0', 'train-pytorch-worker-0']
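
The output is consistent with a naming pattern in which the master counts as one of the `num_workers` nodes. The sketch below encodes that pattern as an assumption inferred from this output; `expected_pod_names` is a hypothetical helper, not an SDK function.

```python
def expected_pod_names(job_name, num_workers):
    """Names assumed to follow '<job>-master-0' and '<job>-worker-<index>'."""
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    names = [f"{job_name}-master-0"]
    # The master counts as one of the `num_workers` nodes.
    names += [f"{job_name}-worker-{i}" for i in range(num_workers - 1)]
    return names


print(expected_pod_names("train-pytorch", 2))
# ['train-pytorch-master-0', 'train-pytorch-worker-0']
```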

### Get PyTorchJob Training Logs

We can get the logs from the master pod.

Each worker processes about half of the rows in `train_sample.csv` per epoch, because `DistributedSampler` splits the dataset across the 2 workers.
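
The per-worker share follows from how `DistributedSampler` assigns indices. The sketch below reproduces that arithmetic in plain Python for the unshuffled case with `drop_last=False`, assuming the dataset has at least as many samples as there are ranks. The function name `shard_indices` is illustrative. With shuffling enabled (the default), the same split is applied to a permuted index list.

```python
import math


def shard_indices(dataset_len, world_size, rank):
    """Indices one rank reads per epoch (no shuffling, drop_last=False)."""
    per_rank = math.ceil(dataset_len / world_size)
    total = per_rank * world_size
    indices = list(range(dataset_len))
    # Pad by repeating indices from the start so the total divides evenly.
    indices += indices[: total - dataset_len]
    # Each rank takes every `world_size`-th index, starting at its own rank.
    return indices[rank:total:world_size]
```

Every rank therefore sees `ceil(N / world_size)` samples per epoch, with a few samples repeated when `N` is not divisible by the world size.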

In [25]:
logs, _ = training_client.get_job_logs(pytorchjob_name)

print(logs["train-pytorch-master-0"])

2025-08-18T15:25:36Z INFO     Using Device: cpu, Backend: gloo
2025-08-18T15:25:36Z INFO     Distributed Training for WORLD_SIZE: 2, RANK: 0, LOCAL_RANK: 0
[Gloo] Rank 0 is connected to 1 peer ranks. Expected number of connected peer ranks is : 1
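
Once the job starts logging training progress, the per-batch loss values can be pulled out of the log text. The sketch below matches the `Train Epoch: ... loss=...` format string used in the training function. `parse_losses` is a hypothetical helper.

```python
import re

# Mirrors the format string "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".
LOSS_LINE = re.compile(r"Train Epoch: (\d+) \[(\d+)/(\d+) \(\d+%\)\]\s+loss=([0-9.]+)")


def parse_losses(log_text):
    """Return (epoch, samples_seen, loss) tuples found in the log text."""
    return [
        (int(m.group(1)), int(m.group(2)), float(m.group(4)))
        for m in LOSS_LINE.finditer(log_text)
    ]
```

For example, `parse_losses(logs["train-pytorch-master-0"])` returns an empty list for the log excerpt shown here, since no batch has been logged yet.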



## Delete PyTorchJob

When the PyTorchJob is finished, you can delete the resource.

In [8]:
training_client.delete_job(pytorchjob_name)