In [1]:
import sys
import sklearn
import tensorflow as tf
from tensorflow import keras
import numpy as np
import os
import matplotlib.pyplot as plt

In [2]:
# Build an MNIST training model: load the data and prepare the splits.

(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()

# Append a trailing channel axis, cast to float32 and divide the pixel values by 255.
X_train_full = np.expand_dims(X_train_full, axis=-1).astype(np.float32) / 255.
X_test = np.expand_dims(X_test, axis=-1).astype(np.float32) / 255.

# The first 5,000 training samples are held out for validation; the rest are used for training.
X_valid, y_valid = X_train_full[:5000], y_train_full[:5000]
X_train, y_train = X_train_full[5000:], y_train_full[5000:]

# Three test images reused as the "new" instances in the serving queries.
X_new = X_test[:3]

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [3]:
# Version 1 of the classifier: flatten the image, one hidden ReLU layer of
# 100 units, then a 10-way softmax output.
model = keras.models.Sequential()
model.add(keras.layers.Flatten(input_shape=[28, 28, 1]))
model.add(keras.layers.Dense(100, activation="relu"))
model.add(keras.layers.Dense(10, activation="softmax"))

model.compile(optimizer=keras.optimizers.SGD(learning_rate=1e-2),
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])

# Train for 10 epochs, evaluating on the validation split after each epoch.
model.fit(X_train, y_train,
          validation_data=(X_valid, y_valid),
          epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x79f39a4a7a30>

In [4]:
# Location of the exported model: <model_name>/<model_version>

model_name = "my_mnist_model"
model_version = "0001"
model_path = os.path.join(model_name, model_version)
model_path

'my_mnist_model/0001'

In [5]:
# Export the model: write the trained `model` to the `model_path` directory
# using tf.saved_model.save.

tf.saved_model.save(model, model_path)

In [6]:
# Print the exported directory tree; every entry is indented by four spaces
# per path separator found in its directory path.
for dirpath, _subdirs, filenames in os.walk(model_name):
    depth = dirpath.count(os.sep)
    pad = '    ' * depth
    print(f"{pad}{os.path.basename(dirpath)}/")
    for fname in filenames:
        print(f"{pad}    {fname}")

my_mnist_model/
    0001/
        fingerprint.pb
        saved_model.pb
        variables/
            variables.index
            variables.data-00000-of-00001
        assets/


계산 그래프를 정의한 saved_model.pb와 변수값을 담고 있는 variables 그리고 부가적인 데이터가 들어있는 assets이 있다.

In [None]:
# Inspect the saved model with the saved_model_cli command-line tool.
# `{model_path}` is interpolated by IPython from the Python variable of that name.

!saved_model_cli show --dir {model_path} --all

In [None]:
# Install TensorFlow Serving: register its apt repository and signing key,
# install the tensorflow-model-server package, then install the
# tensorflow-serving-api Python package into the kernel's environment.
# NOTE(review): this writes under /etc/apt and runs apt, so it presumably needs
# root privileges (e.g. a Colab VM) — confirm before running elsewhere.
url = "https://storage.googleapis.com/tensorflow-serving-apt"
src = "stable tensorflow-model-server tensorflow-model-server-universal"
!echo 'deb {url} {src}' > /etc/apt/sources.list.d/tensorflow-serving.list
!curl '{url}/tensorflow-serving.release.pub.gpg' | apt-key add -
!apt update -q && apt-get install -y tensorflow-model-server
%pip install -q -U tensorflow-serving-api

In [9]:
# Server setup: export MODEL_DIR, the absolute path of the directory that
# contains the version sub-directory (i.e. the parent of `model_path`).

os.environ["MODEL_DIR"] = os.path.dirname(os.path.abspath(model_path))

In [10]:
%%bash --bg
# Start tensorflow_model_server as a background bash script: the REST API listens
# on port 8501, the model is served as "my_mnist_model" from $MODEL_DIR, and
# stdout/stderr are redirected to server.log.
# NOTE(review): no gRPC port is passed here while the gRPC client later in the
# notebook connects to localhost:8500 — presumably the server default; confirm.
nohup tensorflow_model_server \
     --rest_api_port=8501 \
     --model_name=my_mnist_model \
     --model_base_path="${MODEL_DIR}" >server.log 2>&1

# NOTE(review): as written, the next line is part of the %%bash cell, so it is
# handed to bash (not IPython) and only reached after the server exits.
# It was presumably meant to be a separate notebook cell — confirm.
!tail server.log

In [11]:
# JSON payload for the prediction query: the signature to call plus the
# input instances converted to nested Python lists.

import json

input_data_json = json.dumps(dict(
    signature_name="serving_default",
    instances=X_new.tolist(),
))

In [12]:
# Preview only the first 100 and last 10 characters of the (long) payload.
f"{input_data_json[:100]}...{input_data_json[-10:]}"

'{"signature_name": "serving_default", "instances": [[[[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0... [0.0]]]]}'

In [14]:
# Send the query to TF Serving with an HTTP POST request.

import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'

# Without a timeout, requests waits indefinitely; since the server was started
# in the background it may not be up yet, so fail fast instead of hanging.
response = requests.post(SERVER_URL, data=input_data_json, timeout=10.0)
response.raise_for_status()  # raise on HTTP 4xx/5xx instead of decoding an error body
response = response.json()  # from here on `response` is the decoded JSON dict

In [15]:
# Response: list the top-level keys of the decoded JSON dict.

response.keys()

dict_keys(['predictions'])

In [16]:
# Turn the nested "predictions" list into a NumPy array and show it rounded
# to two decimals.
y_proba = np.asarray(response["predictions"])
np.round(y_proba, 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.97, 0.01, 0.  , 0.  , 0.01, 0.  , 0.  , 0.  ],
       [0.  , 0.98, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

모델이 첫 번째 이미지를 7이라고 예측하고 두 번째 이미지를 2, 세 번째 이미지를 1로 예측한다.

REST API는 JSON기반이기 때문에 텍스트를 사용한다.

따라서 모든 실수를 문자열로 변환했다가 다시 실수로 변환해야 하므로 표현 방식이 비효율적이며, 큰 넘파이 배열을 전송할 때는 응답 속도가 느려진다는 단점이 있다.

In [17]:
# Query through the gRPC API: build a PredictRequest protocol buffer.

from tensorflow_serving.apis.predict_pb2 import PredictRequest

# The request input is keyed by the name of the model's first input.
input_name = model.input_names[0]

request = PredictRequest()
request.model_spec.signature_name = "serving_default"
request.model_spec.name = model_name
# Serialize the input array into a TensorProto and copy it into the request.
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))

In [20]:
import grpc
from tensorflow_serving.apis import prediction_service_pb2_grpc

# Open an insecure gRPC channel to the server on localhost:8500, create the
# prediction-service stub on it, and send the request with a 10-second timeout.
channel = grpc.insecure_channel('localhost:8500') # gRPC communication channel
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)
response = predict_service.Predict(request, timeout=10.0)
response

outputs {
  key: "dense_1"
  value {
    dtype: DT_FLOAT
    tensor_shape {
      dim {
        size: 3
      }
      dim {
        size: 10
      }
    }
    float_val: 5.527950270334259e-05
    float_val: 5.7247678597605045e-08
    float_val: 0.0005158474668860435
    float_val: 0.0025093748699873686
    float_val: 5.276315846458601e-07
    float_val: 0.00011779415217461064
    float_val: 9.380375587397793e-09
    float_val: 0.9965420365333557
    float_val: 2.7229811166762374e-05
    float_val: 0.0002318693877896294
    float_val: 0.0007611507317051291
    float_val: 0.0002578407875262201
    float_val: 0.974459707736969
    float_val: 0.01315978541970253
    float_val: 1.2955864825414665e-08
    float_val: 0.002541010733693838
    float_val: 0.007578456774353981
    float_val: 1.6366380251042756e-08
    float_val: 0.001242046244442463
    float_val: 8.777319315811383e-09
    float_val: 4.369686939753592e-05
    float_val: 0.9818556904792786
    float_val: 0.005598597228527069
    f

In [21]:
# Convert the gRPC response into an array: look up the output by the name of
# the model's first output, then decode the TensorProto with tf.make_ndarray.

output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
y_proba = tf.make_ndarray(outputs_proto)
np.round(y_proba, 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.97, 0.01, 0.  , 0.  , 0.01, 0.  , 0.  , 0.  ],
       [0.  , 0.98, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

In [22]:
# Deploy a new model version: train a second network with two hidden layers
# of 50 ReLU units each.

model = keras.models.Sequential()
model.add(keras.layers.Flatten(input_shape=[28, 28, 1]))
for _ in range(2):
    model.add(keras.layers.Dense(50, activation="relu"))
model.add(keras.layers.Dense(10, activation="softmax"))

model.compile(optimizer=keras.optimizers.SGD(learning_rate=1e-2),
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
history = model.fit(X_train, y_train,
                    validation_data=(X_valid, y_valid),
                    epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [23]:
# Export the retrained model as version "0002", next to the first version.
model_name = "my_mnist_model"
model_version = "0002"
model_path = os.path.join(model_name, model_version)
tf.saved_model.save(model, model_path)

In [24]:
# List the model directory again, indenting each entry by four spaces per
# path separator in its directory path.
for folder, _children, names in os.walk(model_name):
    prefix = '    ' * folder.count(os.sep)
    print('{}{}/'.format(prefix, os.path.basename(folder)))
    child_prefix = prefix + '    '
    for name in names:
        print('{}{}'.format(child_prefix, name))

my_mnist_model/
    0002/
        fingerprint.pb
        saved_model.pb
        variables/
            variables.index
            variables.data-00000-of-00001
        assets/
    0001/
        fingerprint.pb
        saved_model.pb
        variables/
            variables.index
            variables.data-00000-of-00001
        assets/


새로운 버전이 추가된 것을 볼 수 있다.

In [25]:
import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'

# Same REST query as before, sent after the second version was exported.
# A timeout is set so the cell fails fast instead of hanging if the server
# does not answer.
response = requests.post(SERVER_URL, data=input_data_json, timeout=10.0)
response.raise_for_status()  # raise on HTTP 4xx/5xx
response = response.json()  # `response` now holds the decoded JSON dict

y_proba = np.array(response["predictions"])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.98, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]])

응답도 잘 되었다.

In [26]:
# Speeding up computation with a GPU: list the physical GPU devices TensorFlow can see.

tf.config.list_physical_devices('GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [27]:
from tensorflow.python.client.device_lib import list_local_devices

# List the local devices together with their attributes
# (see the recorded output below for the fields reported per device).
devices = list_local_devices()
devices

[name: "/device:CPU:0"
 device_type: "CPU"
 memory_limit: 268435456
 locality {
 }
 incarnation: 12060627275045159122
 xla_global_id: -1,
 name: "/device:GPU:0"
 device_type: "GPU"
 memory_limit: 14357954560
 locality {
   bus_id: 1
   links {
   }
 }
 incarnation: 10270904221930885266
 physical_device_desc: "device: 0, name: Tesla T4, pci bus id: 0000:00:04.0, compute capability: 7.5"
 xla_global_id: 416903419]

In [28]:
def create_model():
    """Build an (uncompiled) convolutional classifier for 28x28x1 inputs.

    Architecture: one 7x7 convolution with 64 filters, max-pooling, two 3x3
    convolutions with 128 filters each, max-pooling, then a 64-unit dense
    layer with 50% dropout and a 10-way softmax output.

    Returns:
        A new ``keras.models.Sequential`` instance.
    """
    layers = keras.layers
    # Every convolution shares the same activation and padding settings.
    conv_options = dict(activation="relu", padding="same")

    net = keras.models.Sequential()
    net.add(layers.Conv2D(filters=64, kernel_size=7,
                          input_shape=[28, 28, 1], **conv_options))
    net.add(layers.MaxPooling2D(pool_size=2))
    net.add(layers.Conv2D(filters=128, kernel_size=3, **conv_options))
    net.add(layers.Conv2D(filters=128, kernel_size=3, **conv_options))
    net.add(layers.MaxPooling2D(pool_size=2))
    net.add(layers.Flatten())
    net.add(layers.Dense(units=64, activation='relu'))
    net.add(layers.Dropout(0.5))
    net.add(layers.Dense(units=10, activation='softmax'))
    return net

In [30]:
# Large-scale training with the Distribution Strategy API.

distribution = tf.distribute.MirroredStrategy()

# Create and compile the model inside the strategy's scope.
with distribution.scope():
    model = create_model()
    model.compile(optimizer=keras.optimizers.SGD(learning_rate=1e-2),
                  loss="sparse_categorical_crossentropy",
                  metrics=["accuracy"])

batch_size = 100
model.fit(X_train, y_train,
          batch_size=batch_size,
          epochs=10,
          validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x79f390415f00>

In [31]:
# Predict class probabilities for the three sample images, rounded to two decimals.
model.predict(X_new).round(2)



array([[0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],
       [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.]], dtype=float32)

In [35]:
# Custom training loop with a distribution strategy.
#
# Uses the TF2 strategy API (experimental_distribute_dataset / run) in place of
# make_dataset_iterator / experimental_run, which the TF2 MirroredStrategy does
# not provide.

K = keras.backend

distribution = tf.distribute.MirroredStrategy()

# The model and the optimizer are created under the strategy scope.
with distribution.scope():
    model = create_model()
    optimizer = keras.optimizers.SGD()

# `batch_size` is the global batch size; distributing the dataset lets the
# strategy split every batch across its replicas. The dataset repeats forever,
# so the loop below decides how many steps make up an epoch.
dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat().batch(batch_size)
input_iterator = iter(distribution.experimental_distribute_dataset(dataset))


@tf.function
def _distributed_train_step(inputs):
    """Run one optimization step on every replica.

    Args:
        inputs: one distributed (X, y) batch taken from `input_iterator`.

    Returns:
        The per-replica losses summed across replicas.
    """
    def step_fn(inputs):
        X, y = inputs
        with tf.GradientTape() as tape:
            # training=True so the model's Dropout layer is active during training.
            Y_proba = model(X, training=True)
            # Each replica divides by the GLOBAL batch size, so summing the
            # per-replica losses gives the mean loss over the whole batch.
            loss = K.sum(keras.losses.sparse_categorical_crossentropy(y, Y_proba)) / batch_size

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return loss

    per_replica_losses = distribution.run(step_fn, args=(inputs,))
    mean_loss = distribution.reduce(tf.distribute.ReduceOp.SUM,
                                    per_replica_losses, axis=None)
    return mean_loss


def train_step():
    """Train on the next distributed batch and return that batch's mean loss."""
    return _distributed_train_step(next(input_iterator))


n_epochs = 10
for epoch in range(n_epochs):
    print("Epoch {}/{}".format(epoch + 1, n_epochs))
    for iteration in range(len(X_train) // batch_size):
        # "\r" rewrites the same output line with the latest loss value.
        print("\rLoss: {:.3f}".format(train_step().numpy()), end="")
    print()

Epoch 1/10
Loss: 0.432
Epoch 2/10
Loss: 0.321
Epoch 3/10
Loss: 0.280
Epoch 4/10
Loss: 0.264
Epoch 5/10
Loss: 0.255
Epoch 6/10
Loss: 0.251
Epoch 7/10
Loss: 0.245
Epoch 8/10
Loss: 0.239
Epoch 9/10
Loss: 0.238
Epoch 10/10
Loss: 0.234


텐서플로 클러스터는 일반적으로 여러 서버에서 병렬로 실행되는 텐서플로 프로세스의 그룹이다.

신경망을 훈련하거나 실행하는 작업을 완료하기 위해 프로세스들이 서로 통신한다. 클러스터에 있는 개별 TF 프로세스를 TF 서버라고 하며, 각 TF 서버는 IP 주소, 포트, 타입을 가진다.

이때 타입은 worker, chief, ps, evaluator가 있다.

 - 여기서 worker는 일반적으로 하나 이상의 GPU를 가진 머신에서 계산을 수행한다.

 - chief도 계산을 수행 하지만 텐서 보드 로그를 기록하거나 체크포인트를 저장하는 등의 추가적인 작업을 처리한다.

  클러스터에는 하나의 치프가 있고 일반적으로 첫 번째 워커가 치프이다.

 - 파라미터 서버(ps)는 일반적으로 CPU만 가진 머신으로 변수 값만 가지고 있다.

 - evaluator는 평가를 담당하고, 일반적으로 클러스터 내에 하나의 evaluator가 있다.

텐서플로 클러스터를 시작하려면 먼저 모든 TF 서버를 지정해야 한다.

In [36]:
# Training a model on a TensorFlow cluster: the cluster specification maps
# each job name to the list of "host:port" addresses of its tasks.

cluster_spec = dict(
    worker=[
        "machine-a.example.com:2222",  # /job:worker/task:0
        "machine-b.example.com:2222",  # /job:worker/task:1
    ],
    ps=["machine-c.example.com:2222"],  # /job:ps/task:0
)

In [37]:
# Publish the cluster layout and this process's own task (worker #1) through
# the TF_CONFIG environment variable, encoded as JSON.

os.environ["TF_CONFIG"] = json.dumps(dict(
    cluster=cluster_spec,
    task=dict(type="worker", index=1),
))
os.environ["TF_CONFIG"]

'{"cluster": {"worker": ["machine-a.example.com:2222", "machine-b.example.com:2222"], "ps": ["machine-c.example.com:2222"]}, "task": {"type": "worker", "index": 1}}'

In [38]:
# Build a cluster resolver from TF_CONFIG and display the cluster spec it reports.
resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
resolver.cluster_spec()

ClusterSpec({'ps': ['machine-c.example.com:2222'], 'worker': ['machine-a.example.com:2222', 'machine-b.example.com:2222']})

In [40]:
%%writefile my_mnist_multiworker_task.py

# Stand-alone training script executed by every worker of the cluster.
# NOTE(review): the resolver name suggests the cluster layout and this task's
# role are read from the TF_CONFIG environment variable — the launcher is
# expected to set it for each process; confirm.

import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
import time

# The strategy is created first, before any model or dataset is built.
distribution = tf.distribute.MultiWorkerMirroredStrategy()

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
print("Starting task {}{}".format(resolver.task_type, resolver.task_id))

# Only worker #0 saves checkpoints and writes TensorBoard logs
if resolver.task_id == 0:
    root_logdir = os.path.join(os.curdir, "my_mnist_multiworker_logs")
    # Each run logs into its own timestamped sub-directory.
    run_id = time.strftime("run_%Y_%m_%d-%H_%M_%S")
    run_dir = os.path.join(root_logdir, run_id)
    callbacks = [
        keras.callbacks.TensorBoard(run_dir),
        keras.callbacks.ModelCheckpoint("my_mnist_multiworker_model.h5",
                                        save_best_only=True),
    ]
else:
    callbacks = []

# Load the MNIST dataset, add a channel axis and divide the pixel values by 255
# NOTE(review): unlike the notebook cell there is no astype(np.float32) here,
# so the arrays are presumably float64 after the division — confirm.
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full = X_train_full[..., np.newaxis] / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

# Build and compile the model inside the strategy scope.
with distribution.scope():
    model = keras.models.Sequential([
        keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                            padding="same", input_shape=[28, 28, 1]),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(units=10, activation='softmax'),
    ])
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(learning_rate=1e-2),
                  metrics=["accuracy"])

# Train for 10 epochs; `callbacks` is empty on every task except task 0.
model.fit(X_train, y_train, validation_data=(X_valid, y_valid),
          epochs=10, callbacks=callbacks)

Writing my_mnist_multiworker_task.py


In [41]:
# Start the two workers as separate OS processes with the subprocess module.

import subprocess

cluster_spec = {"worker": ["127.0.0.1:9901", "127.0.0.1:9902"]}

# Keep the Popen handles so the workers can later be polled, waited on or
# terminated from the notebook.
worker_processes = []
for index in range(len(cluster_spec["worker"])):
    # Give each worker its own TF_CONFIG through a private copy of the
    # environment, so the notebook process's own TF_CONFIG is not overwritten.
    worker_env = dict(os.environ)
    worker_env["TF_CONFIG"] = json.dumps({
        "cluster": cluster_spec,
        "task": {"type": "worker", "index": index}
    })
    # sys.executable runs the script with the same interpreter as the kernel;
    # an argument list avoids going through a shell.
    worker_processes.append(
        subprocess.Popen([sys.executable, "my_mnist_multiworker_task.py"],
                         env=worker_env))