[참고](https://github.com/rickiepark/handson-ml2/blob/master/19_training_and_deploying_at_scale.ipynb)

모델의 버전을 관리할 때 한 모델에서 다른 모델로 부드럽게 이전해야 하고 <br/> 
문제가 생겼을 때 이전 모델을 롤백(rollback)하거나 **A/B 테스트**를 위해 여러 다른 모델을 동시에 실행할 수 있음.

# 19.1 텐서플로 모델 서빙
## 19.1.1 텐서플로 서빙 사용하기
먼저 모델을 텐서플로의 **SavedModel 포맷**으로 내보내야 한다.

### SavedModel로 내보내기

In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np

(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full = X_train_full[..., np.newaxis].astype(np.float32) / 255.
X_test = X_test[..., np.newaxis].astype(np.float32) / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
X_new = X_test[:3]

In [2]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(100, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(lr=1e-2),
              metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x26c84ab6580>

In [3]:
np.round(model.predict(X_new), 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

In [4]:
import os

model_version = "0001"
model_name = "my_mnist_model"
model_path = os.path.join(model_name, model_version)
model_path

'my_mnist_model\\0001'

In [5]:
tf.saved_model.save(model, model_path)

INFO:tensorflow:Assets written to: my_mnist_model\0001\assets


In [6]:
# 디렉터리 구조
for root, dirs, files in os.walk(model_name):
    indent = '    ' * root.count(os.sep)
    print('{}{}/'.format(indent, os.path.basename(root)))
    for filename in files:
        print('{}{}'.format(indent + '    ', filename))

my_mnist_model/
    0001/
        saved_model.pb
        assets/
        variables/
            variables.data-00000-of-00001
            variables.index


In [7]:
!saved_model_cli show --dir {model_path}

The given SavedModel contains the following tag-sets:
'serve'


2021-01-26 11:31:07.743906: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'cudart64_110.dll'; dlerror: cudart64_110.dll not found
2021-01-26 11:31:07.744232: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [8]:
!saved_model_cli show --dir {model_path} --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['__saved_model_init_op']:
  The given SavedModel SignatureDef contains the following input(s):
  The given SavedModel SignatureDef contains the following output(s):
    outputs['__saved_model_init_op'] tensor_info:
        dtype: DT_INVALID
        shape: unknown_rank
        name: NoOp
  Method name is: 

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['flatten_input'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 28, 28, 1)
        name: serving_default_flatten_input:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['dense_1'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 10)
        name: StatefulPartitionedCall:0
  Method name is: tensorflow/serving/predict

Defined Functions:
  Function Name: '__call__'
    Option #1
      Callable with:
        Argument #1
          flatte

2021-01-26 11:31:12.646396: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'cudart64_110.dll'; dlerror: cudart64_110.dll not found
2021-01-26 11:31:12.646722: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.




  Function Name: '_default_save_signature'
    Option #1
      Callable with:
        Argument #1
          flatten_input: TensorSpec(shape=(None, 28, 28, 1), dtype=tf.float32, name='flatten_input')

  Function Name: 'call_and_return_all_conditional_losses'
    Option #1
      Callable with:
        Argument #1
          inputs: TensorSpec(shape=(None, 28, 28, 1), dtype=tf.float32, name='inputs')
        Argument #2
          DType: bool
          Value: False
        Argument #3
          DType: NoneType
          Value: None
    Option #2
      Callable with:
        Argument #1
          inputs: TensorSpec(shape=(None, 28, 28, 1), dtype=tf.float32, name='inputs')
        Argument #2
          DType: bool
          Value: True
        Argument #3
          DType: NoneType
          Value: None
    Option #3
      Callable with:
        Argument #1
          flatten_input: TensorSpec(shape=(None, 28, 28, 1), dtype=tf.float32, name='flatten_input')
        Argument #2
          DType

SavedModel은 하나 이상의 **메타그래프**(metagraph)를 포함한다.

In [9]:
np.save("my_mnist_tests.npy", X_new)

In [10]:
input_name = model.input_names[0]
input_name

'flatten_input'

In [11]:
!saved_model_cli run --dir {model_path} --tag_set serve \
                     --signature_def serving_default    \
                     --inputs {input_name}=my_mnist_tests.npy

Result for output key dense_1:
[[1.1437385e-04 1.5036889e-07 9.8796247e-04 2.7837253e-03 3.7688069e-06
  7.7121476e-05 3.9443869e-08 9.9555272e-01 5.3149332e-05 4.2698340e-04]
 [8.2202948e-04 3.4976089e-05 9.8832816e-01 7.0123803e-03 1.2931406e-07
  2.3006395e-04 2.5330544e-03 9.5024943e-10 1.0390545e-03 8.6442768e-08]
 [4.4567176e-05 9.7030860e-01 9.0850079e-03 2.2700275e-03 4.8450279e-04
  2.8607429e-03 2.2839641e-03 8.3487984e-03 4.0172054e-03 2.9646422e-04]]


2021-01-26 11:31:18.198156: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'cudart64_110.dll'; dlerror: cudart64_110.dll not found
2021-01-26 11:31:18.198556: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2021-01-26 11:31:22.473472: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-26 11:31:22.475301: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'nvcuda.dll'; dlerror: nvcuda.dll not found
2021-01-26 11:31:22.475591: W tensorflow/stream_executor/cuda/cuda_driver.cc:326] failed call to cuInit: UNKNOWN ERROR (303)
2021-01-26 11:31:22.482046: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: Pingu-Kim
2021-01-26 11:31:22.482594: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] ho

### 텐서플로 서빙 설치하기 ???
### REST API로 TF 서빙에 쿼리하기

In [12]:
import json

input_data_json = json.dumps({
    "signature_name": "serving_default",
    "instances": X_new.tolist(),
})

In [13]:
# JSON 문자열로 만들기
repr(input_data_json)[:1500] + "..."

'\'{"signature_name": "serving_default", "instances": [[[[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0

In [14]:
# HTTP POST 메서드로 TF 서빙에 전송
import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'
response = requests.post(SERVER_URL, data=input_data_json) #???
response.raise_for_status() # raise an exception in case of error
response = response.json()

ConnectionError: HTTPConnectionPool(host='localhost', port=8501): Max retries exceeded with url: /v1/models/my_mnist_model:predict (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x0000026C862FEA30>: Failed to establish a new connection: [WinError 10061] 대상 컴퓨터에서 연결을 거부했으므로 연결하지 못했습니다'))

In [None]:
response.keys()

In [None]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

### gRPC API로 TF 서빙에 쿼리하기

In [15]:
from tensorflow_serving.apis.predict_pb2 import PredictRequest # ???

request = PredictRequest()
request.model_spec.name = model_name
request.model_spec.signature_name = "serving_default"
input_name = model.input_names[0]
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))

In [16]:
# 서버로 요청을 보내고 응답을 받는다.
import grpc
from tensorflow_serving.apis import prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)
response = predict_service.Predict(request, timeout=10.0) # ???

_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses"
	debug_error_string = "{"created":"@1611628316.643000000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":4134,"referenced_errors":[{"created":"@1611628316.643000000","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":398,"grpc_status":14}]}"
>

In [None]:
# PredictResponse 프로토콜 버퍼를 텐서로 바꾼다.
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
y_proba = tf.make_ndarray(outputs_proto)
y_proba.round(2)

### 새로운 버전의 모델 배포하기

In [17]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(lr=1e-2),
              metrics=["accuracy"])
history = model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


## 19.1.2 GCP(Google Cloud Platform) AI 플랫폼에서 예측 서비스 만들기
## 19.1.3 예측 서비스 사용하기
### 구글 API 클라이언트 라이브러리
### 구글 클라우드 클라이언트 라이브러리

In [19]:
project_id = "onyx-smoke-242003"

In [21]:
import googleapiclient.discovery

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "my_service_account_private_key.json" # GCP에서 다운로드 받아야 함...
model_id = "my_mnist_model"
model_path = "projects/{}/models/{}".format(project_id, model_id)
model_path += "/versions/v0001/" # if you want to run a specific version
ml_resource = googleapiclient.discovery.build("ml", "v1").projects()

In [22]:
# 리소스 객체를 사용해 예측 결과를 반환하는 함수
def predict(X):
    input_data_json = {"signature_name": "serving_default",
                       "instances": X.tolist()}
    request = ml_resource.predict(name=model_path, body=input_data_json)
    response = request.execute()
    if "error" in response:
        raise RuntimeError(response["error"])
    return np.array([pred[output_name] for pred in response["predictions"]])

In [24]:
# 함수 작동 확인
Y_probas = predict(X_new)
np.round(Y_probas, 2)

# 19.2 모바일 또는 임베디드 장치에 모델 배포하기

    converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_path)
    tflite_model = converter.convert()
    with open("converted_model.tflite", "wb") as f :
        f.write(tflite_model)

훈련 후 양자화

    converter.optimizations = [tf.lite.Optimize.OPTIMIZE_FOR_SIZE]
    
정확소 손실이 너무 크면 **양자화를 고려한 훈련**(quantization-aware training)이 필요할 수도 있음.

### 브라우저를 위한 텐서플로

    import * as tf from '@tensorflow/tfjs' ;
    const model = await tf.loadLayersModel('https://example.com/tfjs/model.json') ;
    const image = tf.fromPixels(webcamElemant) ;
    const prediction = model.predict(image) ;

# 19.3 계산 속도를 높이기 위해 GPU 사용하기
## 19.3.1 GPU 구매하기

In [26]:
tf.test.is_gpu_available()

Instructions for updating:
Use `tf.config.list_physical_devices('GPU')` instead.


False

In [27]:
tf.config.list_physical_devices('GPU')

[]

In [28]:
tf.test.gpu_device_name()

''

## 19.3.2 GPU를 장착한 가상 머신 사용하기
## 19.3.3 코랩(Colaboratory)
## 19.3.4 GPU RAM 관리하기


    for gpu in tf.config.experimental.list_physical_devices("GPU") :
        tf.config.experimental.set_virtual_device_configuration(
            gpu,
            [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048)])

텐서플로가 필요할 때만 메모리를 점유하게 만드는 것

    for gpu intf.config.experimental.list_physical_devices("GPU") :
        tf.config.experimental.set_memory_growth(gpu, True)

GPU를 두 개 이상의 가상 GPU로 나누고 싶을 때

    physical_gpus = tf.config.experimental.list_physical_devices("GPU")
    tf.config.experimental.set_virtual_device_configuration(
        physical_gpus[0],
        [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048),
        tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048)])

## 19.3.5 디바이스에 연산과 변수 할당하기
텐서플로 백서에는 가용한 모든 장치에 연산을 완전히 자동으로 분산하는 **동적 배치자**(dynamic placer)알고리즘이 소개되어 있다.

In [29]:
a = tf.Variable(42.0)
a.device

'/job:localhost/replica:0/task:0/device:CPU:0'

In [30]:
b = tf.Variable(42)
b.device

'/job:localhost/replica:0/task:0/device:CPU:0'

In [31]:
# 연산을 기본 장치 대신에 다른 장치에 배치
with tf.device("/cpu:0") :
    c = tf.Variable(42.0)
    
c.device

'/job:localhost/replica:0/task:0/device:CPU:0'

## 19.3.6 다중 장치에서 병렬 실행
CPU의 평가 큐에 있는 연산은 **inter-op 스레드 풀**(thread pool)로 보내진다.

# 19.4 다중 장치에서 모델 훈련하기
여러 장치에서 하나의 모델을 훈련하는 방법
1. 모델을 여러 장치에 분할하는 **모델 병렬화**(model parallelism)
2. 모델을 각 장치에 복사하고 복사본(replica)을 데이터 일부분에서 훈련하는 **데이터 병렬화**(data parallelism)

## 19.4.1 모델 병렬화
## 19.4.2 데이터 병렬화
데이터 병렬화 : 복제 모델에서 계산된 그레이디언트를 평균하고 그 결과를 사용해 모델 파라미터를 업데이트하는 것.

### 미러드 전략을 사용한 데이터 병렬화
미러드 전략(morrored strategy) : 모델 파라미터를 모든 GPU에 완전이 똑같이 복사하고 항상 모든 GPU에 동일한 파라미터 업데이트를 적용하는 것. <br/>
올리듀스(AllReduce) 알고리즘을 사용해 모든 GPU에서 얻은 그레이디언트의 평균을 효율적으로 계산하고 그 결과를 모든 GPU에게 배포할 수 있다.

### 중앙 집중적인 파라미터를 사용한 데이터 병렬화
계산을 수행하는 GPU 장치인 **워커**(worker) 밖에 모델 파라미터를 저장하는 방식. <br/>
분산 환경에서는 모든 파라미터를 **파라미터 서버**(parameter server)라 부르는 하나 이상의 CPU만 있는 서버에 저장할 수 있다.

### 동기 업데이트(synchronous update)
모든 그레이디언트가 준비될 때까지 그레이디언트 수집기가 기다린 다음 평균 그레이디언트를 계산하여 모델 파라미터를 업데이트할 옵티마이저에게 전달.

### 비동기 업데이트(asynchronous updates)
복제 모델이 그레이디언트 계산을 끝낼 때마다 즉시 이를 사용해 모델 파라미터를 업데이트한다. <br/>
낡은 그레이디언트(stale gradient) : 그레이디언트가 심하게 오래된 것. <br/> 
하나의 복제 모델만 사용해서 처음 몇 번의 에포크를 시작함(**준비 단계**, warmup phase)으로써 낡은 그레이디언트 현상을 줄일 수 있다.

### 대역폭 포화

## 19.4.3 분산 전략 API를 사용한 대규모 훈련


In [32]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

In [33]:
def create_model():
    return keras.models.Sequential([
        keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                            padding="same", input_shape=[28, 28, 1]),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"), 
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(units=10, activation='softmax'),
    ])

In [34]:
batch_size = 100
model = create_model()
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(lr=1e-2),
              metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10,
          validation_data=(X_valid, y_valid), batch_size=batch_size)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x26c87be7640>

In [35]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

distribution = tf.distribute.MirroredStrategy()

# Change the default all-reduce algorithm:
#distribution = tf.distribute.MirroredStrategy(
#    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

# Specify the list of GPUs to use:
#distribution = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

# Use the central storage strategy instead:
#distribution = tf.distribute.experimental.CentralStorageStrategy()

#resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
#tf.tpu.experimental.initialize_tpu_system(resolver)
#distribution = tf.distribute.experimental.TPUStrategy(resolver)

with distribution.scope():
    model = create_model()
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(lr=1e-2),
                  metrics=["accuracy"])

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


In [36]:
batch_size = 100 # must be divisible by the number of workers
model.fit(X_train, y_train, epochs=10,
          validation_data=(X_valid, y_valid), batch_size=batch_size)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x26c89e60400>

In [38]:
# 모델을 로드하여 가능한 모든 장치에서 실행하고 싶다면
with distribution.scope() :
    mirrored_model = keras.models.load_model("my_mnist_model.h5")

## 19.4.4 텐서플로 클러스터에서 모델 훈련하기
텐서플로 클러스터(TensorFlow cluster) : 다른 머신에서 동시에 실행되는 텐서플로 프로세스 그룹.
태스크(task, TF 서버) : 클러스터에 있는 개별 TF 프로세스. IP주소, 포트, 타입(역할role 또는 잡job)을 가진다.
- 워커(worker) : GPU를 한 개 이상 가진 머신에서 계산을 수행.
- 치프(chief) : 계산을 수행하지만 텐서보드 로그를 작성하거나 체크포인트를 저장하는 것과 같은 추가적인 일을 처리한다.
- 파라미터 서버(parameter server) : 변숫값만 저장하고 일반적으로 CPU만 있는 머신을 사용한다.
- 이밸류에이터(evaluator) : 평가를 처리.

In [39]:
# 클러스터 명세(cluster specification)
cluster_spec = tf.train.ClusterSpec({
    "worker": ["127.0.0.1:9901", "127.0.0.1:9902"],
    "ps": ["127.0.0.1:9903"]
})

In [40]:
# 클러스터를 사용하고 시작하려는 태스크가 첫 번째 워커라는 것을 지정
import os
import json

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["my-work0.example.com:9876", "my-work1.example.com:9876"],
        "ps": ["my-ps0.example.com:9876"]
    },
    "task": {"type": "worker", "index": 0}
})
print("TF_CONFIG='{}'".format(os.environ["TF_CONFIG"]))

TF_CONFIG='{"cluster": {"worker": ["my-work0.example.com:9876", "my-work1.example.com:9876"], "ps": ["my-ps0.example.com:9876"]}, "task": {"type": "worker", "index": 0}}'
