Приступаем к созданию первого сервиса для отправки признаков в одну очередь, а истинных ответов — в другую.

Создадим файл features.py с признаками.

In [1]:
import numpy as np
from sklearn.datasets import load_diabetes
X, y = load_diabetes(return_X_y=True)
random_row = np.random.randint(0, X.shape[0]-1)

Таким образом в нашем скрипте можно будет получить случайный вектор признаков X[random_row] и истинный ответ для него y[random_row].

Давайте подключимся к брокеру и попробуем отправлять в очередь y_true случайную метку y[random_row]:

In [4]:
import pika
# Подключение к серверу на локальном хосте:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

Если вы захотите подключиться к удаленному серверу, то вместо localhost укажите его IP-адрес.

In [5]:
# Создадим очередь, с которой будем работать:
channel.queue_declare(queue='y_true')

<METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=y_true'])>"])>

Теперь отправим сообщение и закроем подключение:

In [6]:
# Опубликуем сообщение
# exchange определяет, в какую очередь отправляется сообщение, 
# параметр routing_key указывает имя очереди, 
# параметр body тело самого сообщения, 
channel.basic_publish(exchange='',
                      routing_key='y_true',
                      body=y[random_row])
print('Сообщение с правильным ответом, отправлено в очередь')
# Закроем подключение 
connection.close()

TypeError: object of type 'numpy.float64' has no len()

Давайте попробуем запустить код. Код выдал ошибку. Надеемся, вы уже догадались, почему это произошло.

Потому что наш объект не был сериализован!

ЗАДАНИЕ 1. 

Воспользуйтесь стандартной библиотекой json, чтобы сериализовать наш объект в этот формат. Таким образом, в качестве параметра body мы будем передавать байт-строку вектора признака.

Обратите внимание, что у вас не получится сериализовать объект array. Просто превратите его в список с помощью функции list().

In [7]:
import pika
import json
import numpy as np
from sklearn.datasets import load_diabetes
X, y = load_diabetes(return_X_y=True)
random_row = np.random.randint(0, X.shape[0]-1)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='Features')
channel.queue_declare(queue='y_true')

channel.basic_publish(exchange='',
                      routing_key='Features',
                      body=json.dumps(list(X[random_row])))
print('Сообщение с вектором признаков, отправлено в очередь')

channel.basic_publish(exchange='',
                      routing_key='y_true',
                      body=json.dumps(y[random_row]))

print('Сообщение с правильным ответом, отправлено в очередь')
connection.close()

Сообщение с вектором признаков, отправлено в очередь
Сообщение с правильным ответом, отправлено в очередь
