# Instructions from:

https://www.katacoda.com/orm-chris-fregly/scenarios/01_kubeflow_install


# Background
Deep learning has shown that training large models on vast amounts of data can drastically improve model performance.


However, consider the problem of training a deep network with millions, or even billions, of parameters. How do we achieve this without waiting for days, or even weeks? Dean et al. propose a different training paradigm that allows us to train and serve a model on multiple physical machines. The authors propose two novel methodologies to accomplish this, namely `model parallelism` and `data parallelism`.


## Model Parallelism
When a model is too big to fit into a single node's memory, model-parallel training can be used to handle it. Model-parallel training has two key features:
1. Each worker task is responsible for estimating a different part of the model parameters, so the computation logic in each worker differs from that of the others.
2. There is application-level data communication between workers.

![Model Parallelism](./img/model_parallelism.jpg)
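
The two features above can be illustrated with a minimal pure-Python sketch. It is only a toy: the `LayerWorker` class, the weights, and the input are made up for illustration, and the "workers" are plain objects in one process rather than separate machines.

```python
# Toy model-parallel forward pass: each simulated worker owns one layer.
# All names and numbers here are illustrative assumptions.

def matvec(matrix, vector):
    """Multiply a matrix (list of rows) by a vector."""
    return [sum(w * x for w, x in zip(row, vector)) for row in matrix]


class LayerWorker:
    """Holds the parameters of one layer; no other worker stores them."""

    def __init__(self, weights, use_relu):
        self.weights = weights
        self.use_relu = use_relu

    def forward(self, inputs):
        outputs = matvec(self.weights, inputs)
        if self.use_relu:
            outputs = [max(0.0, value) for value in outputs]
        return outputs


# Worker 0 holds the hidden layer, worker 1 holds the output layer.
worker_0 = LayerWorker([[1.0, -1.0], [0.5, 0.5]], use_relu=True)
worker_1 = LayerWorker([[2.0, 1.0]], use_relu=False)

x = [3.0, 1.0]
hidden = worker_0.forward(x)   # computed on worker 0
# In a real system, `hidden` would be sent over the network at this point.
y = worker_1.forward(hidden)   # computed on worker 1
print(hidden, y)
```

The hand-off of `hidden` from one worker to the next is the application-level communication mentioned in point 2.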


## Data Parallelism

The algorithm distributes the data across the worker tasks.
1. Each worker task is responsible for computing an estimate from a different part of the dataset.
2. Tasks then exchange their estimates with each other to arrive at the right estimate for the step.

![Data Parallelism](./img/data_parallelism.png)
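
As a rough sketch of these two steps, the snippet below fits `y = w * x` with mean squared error. Each simulated worker computes a gradient on its own shard, and the gradients are then averaged. The function names, data, and learning rate are assumptions chosen for illustration; averaging is only one possible way for tasks to combine their estimates.

```python
# Toy data-parallel gradient step in pure Python (illustrative only).

def shard_gradient(w, shard):
    """Gradient of the mean squared error of y ~ w * x on one shard."""
    return sum(2.0 * (w * x - y) * x for x, y in shard) / len(shard)


def data_parallel_step(w, shards, learning_rate):
    # Step 1: each worker computes a gradient from its own part of the data.
    local_grads = [shard_gradient(w, shard) for shard in shards]
    # Step 2: workers exchange their estimates; here we simply average them.
    avg_grad = sum(local_grads) / len(local_grads)
    return w - learning_rate * avg_grad, local_grads


data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
shards = [data[:2], data[2:]]          # two workers, equal-sized shards

w_new, local_grads = data_parallel_step(0.0, shards, learning_rate=0.05)
full_grad = shard_gradient(0.0, data)  # single-machine gradient, for comparison
print(local_grads, full_grad, w_new)
```

With equal-sized shards, the average of the per-shard gradients equals the gradient computed over the whole dataset, which is why the step matches single-machine training.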



# Distributed Training in TensorFlow
Data parallelism is the most common training configuration. It involves multiple tasks in a `worker` job training the same model on different mini-batches of data, updating shared parameters hosted in one or more tasks in a `ps` (parameter server) job. All tasks typically run on different machines or containers. There are many ways to specify this structure in TensorFlow, and the TensorFlow team is building libraries that will simplify the work of specifying a replicated model. Other platforms, such as `MXNet` and `Petuum`, offer the same abstraction. Possible approaches include:

- __In-graph replication__. In this approach, the client builds a single `tf.Graph` that contains one set of parameters (in `tf.Variable` nodes pinned to `/job:ps`) and multiple copies of the compute-intensive part of the model, each pinned to a different task in `/job:worker`.

- __Between-graph replication__. In this approach, there is a separate client for each `/job:worker` task, typically in the same process as the worker task. Each client builds a similar graph containing the parameters (pinned to `/job:ps` as before, using `tf.train.replica_device_setter` to map them deterministically to the same tasks) and a single copy of the compute-intensive part of the model, pinned to the local task in `/job:worker`.

- __Asynchronous training__. In this approach, each replica of the graph has an independent training loop that executes without coordination. It is compatible with both forms of replication above.

- __Synchronous training__. In this approach, all of the replicas read the same values for the current parameters, compute gradients in parallel, and then apply them together. It is compatible with in-graph replication (e.g. using gradient averaging as in the CIFAR-10 multi-GPU trainer) and between-graph replication (e.g. using `tf.train.SyncReplicasOptimizer`).


## Examples

We will look at distributed training with two frameworks: TensorFlow and PyTorch.

### TensorFlow

#### Check TensorFlow PS Job

In [1]:
!cat ./distributed-training-jobs/distributed-tensorflow-job.yaml

apiVersion: "kubeflow.org/v1"
kind: "TFJob"
metadata:
  name: "distributed-tensorflow-job"
spec:
  tfReplicaSpecs:
    PS:
      replicas: 1 
      restartPolicy: Never
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: tensorflow
              image: gcr.io/kubeflow-ci/tf-dist-mnist-test:1.0
    Worker:
      replicas: 2 
      restartPolicy: Never
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: tensorflow
              image: gcr.io/kubeflow-ci/tf-dist-mnist-test:1.0


#### Submit TFJob distributed training job

In [2]:
!kubectl create -f distributed-training-jobs/distributed-tensorflow-job.yaml

tfjob.kubeflow.org/distributed-tensorflow-job created


#### Get all TFJobs

In [3]:
!kubectl get tfjob

NAME                         STATE     AGE
distributed-tensorflow-job   Created   33s


#### Check TFJob Status

In [4]:
!kubectl describe tfjob distributed-tensorflow-job

Name:         distributed-tensorflow-job
Namespace:    anonymous
Labels:       <none>
Annotations:  <none>
API Version:  kubeflow.org/v1
Kind:         TFJob
Metadata:
  Creation Timestamp:  2020-05-09T01:47:33Z
  Generation:          1
  Resource Version:    7391
  Self Link:           /apis/kubeflow.org/v1/namespaces/anonymous/tfjobs/distributed-tensorflow-job
  UID:                 0d23c537-9197-11ea-a820-0242ac110012
Spec:
  Tf Replica Specs:
    PS:
      Replicas:        1
      Restart Policy:  Never
      Template:
        Metadata:
          Annotations:
            sidecar.istio.io/inject:  false
        Spec:
          Containers:
            Image:  gcr.io/kubeflow-ci/tf-dist-mnist-test:1.0
            Name:   tensorflow
    Worker:
      Replicas:        2
      Restart Policy:  Never
      Template:
        Metadata:
          Annotations:
            sidecar.istio.io/inject:  false
        Spec:
          Containers:
            Image:  gcr.io/kubeflow-ci/tf-dist-mnist-te

#### Check all the pods created by this TFJob

In [5]:
!kubectl get pod | grep distributed-tensorflow-job

distributed-tensorflow-job-ps-0       1/1     Running   0          64s
distributed-tensorflow-job-worker-0   1/1     Running   0          64s
distributed-tensorflow-job-worker-1   1/1     Running   0          63s
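
If we want to check the pod states from code instead of reading the listing by eye, the output can be parsed. The helper below is a minimal sketch: `parse_pod_listing` and `all_running` are hypothetical names, and it assumes header-less rows shaped like the `grep` output above, where the first three columns are name, ready count, and status.

```python
# Sketch: parse rows of `kubectl get pod` output (header already filtered out).
# The helper names are hypothetical; only the first three columns are used.

def parse_pod_listing(text):
    """Return a list of dicts with the name, ready count, and status of each pod."""
    pods = []
    for line in text.strip().splitlines():
        fields = line.split()
        if len(fields) < 3:
            continue  # skip rows that do not look like pod entries
        name, ready, status = fields[:3]
        pods.append({"name": name, "ready": ready, "status": status})
    return pods


def all_running(pods):
    """True only if there is at least one pod and every pod is Running."""
    return bool(pods) and all(pod["status"] == "Running" for pod in pods)


sample = """
distributed-tensorflow-job-ps-0       1/1     Running   0          64s
distributed-tensorflow-job-worker-0   1/1     Running   0          64s
distributed-tensorflow-job-worker-1   1/1     Running   0          63s
"""

pods = parse_pod_listing(sample)
print([pod["name"] for pod in pods], all_running(pods))
```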


#### Check logs of one worker pod
Re-run the following cell periodically to see the logs.

In [6]:
!kubectl logs distributed-tensorflow-job-worker-0

  from ._conv import register_converters as _register_converters
2020-05-09 01:48:40.915473: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
2020-05-09 01:48:40.934830: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> distributed-tensorflow-job-ps-0.anonymous.svc:2222}
2020-05-09 01:48:40.935041: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2222, 1 -> distributed-tensorflow-job-worker-1.anonymous.svc:2222}
2020-05-09 01:48:40.935572: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:324] Started server with target: grpc://localhost:2222
Instructions for updating:
Please switch to tf.train.MonitoredTrainingSession
2020-05-09 01:48:41.722287: I tensorflow/core/distributed_runtime/master_session.cc:1017] Start master 
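
The `GrpcChannelCache` lines above show the worker learning the addresses of the `ps` and `worker` tasks. With TFJob, this cluster layout is normally handed to each container through the `TF_CONFIG` environment variable, a JSON document with `cluster` and `task` entries. The sketch below shows how a training script might read it. The `read_tf_config` helper is hypothetical, and the example value is an assumption assembled from the host names in the log, not a dump from a real pod.

```python
# Sketch: read the cluster layout from TF_CONFIG (illustrative values).
import json
import os


def read_tf_config(environ=os.environ):
    """Return (cluster, job_name, task_index) parsed from TF_CONFIG."""
    config = json.loads(environ.get("TF_CONFIG", "{}"))
    cluster = config.get("cluster", {})
    task = config.get("task", {})
    return cluster, task.get("type"), task.get("index")


# Assumed example for worker 0, based on the addresses in the log above.
example_env = {
    "TF_CONFIG": json.dumps({
        "cluster": {
            "ps": ["distributed-tensorflow-job-ps-0.anonymous.svc:2222"],
            "worker": [
                "distributed-tensorflow-job-worker-0.anonymous.svc:2222",
                "distributed-tensorflow-job-worker-1.anonymous.svc:2222",
            ],
        },
        "task": {"type": "worker", "index": 0},
    })
}

cluster, job_name, task_index = read_tf_config(example_env)
print(job_name, task_index, sorted(cluster))
```

A script can use `job_name` to decide whether to act as a parameter server or as a worker, which matches the `ps` and `worker` roles described earlier.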

### PyTorch

In [7]:
!cat ./distributed-training-jobs/distributed-pytorch-job.yaml

apiVersion: "kubeflow.org/v1"
kind: "PyTorchJob"
metadata:
  name: "distributed-pytorch-job"
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: pytorch
              image: gcr.io/kubeflow-ci/pytorch-dist-mnist_test:1.0
              args: ["--backend", "gloo"]
              # Comment out the below resources to use the CPU.
              #resources:
                #limits:
                  #nvidia.com/gpu: 1
    Worker:
      replicas: 2
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: pytorch
              image: gcr.io/kubeflow-ci/pytorch-dist-mnist_test:1.0
              args: ["--backend", "gloo"]
              # Co
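
The manifest above starts one `Master` and two `Worker` replicas and passes `--backend gloo` to each container. The training script inside the image is not shown here, so the following is only a sketch of how such a script might collect its process-group settings. It assumes the PyTorchJob operator exposes `RANK`, `WORLD_SIZE`, `MASTER_ADDR`, and `MASTER_PORT` as environment variables; the helper name, default values, and example values are illustrative.

```python
# Sketch: gather process-group settings from the --backend flag and the
# environment. Helper name, defaults, and example values are assumptions.
import argparse
import os


def process_group_settings(argv, environ=os.environ):
    parser = argparse.ArgumentParser()
    parser.add_argument("--backend", default="gloo",
                        choices=["gloo", "nccl", "mpi"])
    args = parser.parse_args(argv)
    return {
        "backend": args.backend,
        "rank": int(environ.get("RANK", "0")),
        "world_size": int(environ.get("WORLD_SIZE", "1")),
        "master_addr": environ.get("MASTER_ADDR", "localhost"),
        "master_port": int(environ.get("MASTER_PORT", "23456")),
    }


# Assumed environment of the first worker: 1 master + 2 workers = 3 processes.
example_env = {
    "RANK": "1",
    "WORLD_SIZE": "3",
    "MASTER_ADDR": "distributed-pytorch-job-master-0",
    "MASTER_PORT": "23456",
}
settings = process_group_settings(["--backend", "gloo"], example_env)
print(settings)

# With PyTorch installed and these variables set, a script would typically call:
#   torch.distributed.init_process_group(backend=settings["backend"])
```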

In [8]:
!kubectl apply -f ./distributed-training-jobs/distributed-pytorch-job.yaml

pytorchjob.kubeflow.org/distributed-pytorch-job created


In [9]:
!kubectl describe pytorchjob distributed-pytorch-job

Name:         distributed-pytorch-job
Namespace:    anonymous
Labels:       <none>
Annotations:  kubectl.kubernetes.io/last-applied-configuration:
                {"apiVersion":"kubeflow.org/v1","kind":"PyTorchJob","metadata":{"annotations":{},"name":"distributed-pytorch-job","namespace":"anonymous"}...
API Version:  kubeflow.org/v1
Kind:         PyTorchJob
Metadata:
  Creation Timestamp:  2020-05-09T01:51:11Z
  Generation:          1
  Resource Version:    9510
  Self Link:           /apis/kubeflow.org/v1/namespaces/anonymous/pytorchjobs/distributed-pytorch-job
  UID:                 8eec4837-9197-11ea-a820-0242ac110012
Spec:
  Pytorch Replica Specs:
    Master:
      Replicas:        1
      Restart Policy:  OnFailure
      Template:
        Metadata:
          Annotations:
            sidecar.istio.io/inject:  false
        Spec:
          Containers:
            Args:
              --backend
              gloo
            Image:  gcr.io/kubeflow-ci/pytorc

In [10]:
!kubectl get pod | grep distributed-pytorch-job

distributed-pytorch-job-master-0      0/1     ContainerCreating   0          7s
distributed-pytorch-job-worker-0      0/1     Init:0/1            0          6s
distributed-pytorch-job-worker-1      0/1     Init:0/1            0          4s


#### Check logs of the master pod
Re-run the following cell periodically to see the logs.

In [11]:
!kubectl logs distributed-pytorch-job-master-0

Error from server (BadRequest): container "pytorch" in pod "distributed-pytorch-job-master-0" is waiting to start: ContainerCreating
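
The error above only means the container has not started yet. Instead of re-running the cell by hand, the retry can be scripted. This is a minimal sketch: `wait_for_logs` is a hypothetical helper, the attempt count and delay are arbitrary, and it relies only on `kubectl logs` exiting with a non-zero code while the container is still being created.

```python
# Sketch: retry `kubectl logs` until the container has started.
# Helper name and default values are illustrative assumptions.
import subprocess
import time


def wait_for_logs(pod, fetch=None, attempts=10, delay_seconds=5.0,
                  sleep=time.sleep):
    """Return the pod logs once `kubectl logs` succeeds, retrying on failure."""
    if fetch is None:
        def fetch(name):
            return subprocess.run(["kubectl", "logs", name],
                                  capture_output=True, text=True)
    for _ in range(attempts):
        result = fetch(pod)
        if result.returncode == 0:
            return result.stdout
        sleep(delay_seconds)   # container not ready yet; wait and retry
    raise TimeoutError(f"{pod} produced no logs after {attempts} attempts")


# Usage in the notebook (requires kubectl access to the cluster):
# print(wait_for_logs("distributed-pytorch-job-master-0"))
```

The `fetch` and `sleep` parameters exist so the function can be exercised without a cluster, for example by passing a stub in a test.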
