In [28]:
# pip3 install loguru https://github.com/UBTEDU/Yan_ADK/archive/latest.tar.gz
import time
import json

import openadk
from openadk.rest import ApiException
from openadk.models.motions_parameter import MotionsParameter
from openadk.models.motions_operation import MotionsOperation
from openadk.api.motions_api import MotionsApi
from loguru import logger


class Robot:
    """Convenience wrapper around the openadk REST client for a Yanshee robot.

    Every public method is a no-op returning ``None`` until ``connect`` has
    created a configuration. API failures (``ApiException``) are logged and,
    when a ``node`` was supplied, forwarded to it as an ``"ERROR"``
    notification instead of being raised.

    Args:
        node: Optional object exposing ``pub_notification(message, type=...)``.
            When omitted, problems are only written to the log.
    """

    def __init__(self, node=None):
        self.node = node
        # Stays None until connect() is called; the other methods use it as
        # their "has connect() been attempted" guard.
        self.configuration = None
        self.is_connected = False

    def _report_error(self, error_message):
        """Log ``error_message`` and forward it to the node, if there is one."""
        logger.error(error_message)
        if self.node:
            self.node.pub_notification(error_message, type="ERROR")

    def connect(self, robot_ip="raspberrypi.local"):
        """Point the client at ``robot_ip`` and check that the robot answers.

        Args:
            robot_ip: Host name or IP address of the robot; the REST endpoint
                is assumed to live at ``http://<robot_ip>:9090/v1``.

        Returns:
            ``True`` when the robot answered the ping, ``False`` otherwise.
            ``is_connected`` always mirrors the returned value, so a failed
            reconnect no longer leaves a stale ``True`` behind.
        """
        self.configuration = openadk.Configuration()
        self.configuration.host = f'http://{robot_ip}:9090/v1'
        self.is_connected = self.ping_robot() == "online"
        if self.is_connected and self.node:
            self.node.pub_notification("Device(Yanshee) Connected!", type="SUCCESS")
        return self.is_connected

    def ping_robot(self):
        """Probe the robot by requesting its battery information.

        Returns:
            ``"online"`` when the request succeeds; ``None`` when no
            configuration exists yet or the request raises ``ApiException``.
        """
        if not self.configuration:
            return None
        self.devices_api_instance = openadk.DevicesApi(
            openadk.ApiClient(self.configuration))
        try:
            # The battery query is used purely as a cheap liveness check.
            api_response = self.devices_api_instance.get_devices_battery()
            logger.debug(api_response)
            return "online"
        except ApiException as e:
            self._report_error(
                f"Exception when calling DevicesApi->get_devices_battery: {str(e)}")
            return None

    def play(
        self,
        name="wave",
        operation='start',
        direction='both',
        speed="fast"):
        """Send a motion command to the robot.

        Args:
            name: Motion name. Names used alongside this class include wave,
                GetUp, Hug, Bow, HappyBirthday, Forward, Backward,
                OneStepForward, TurnLeft and TurnRight.
            operation: One of 'start', 'pause', 'resume', 'stop'.
            direction: Direction passed to the motion; a falsy value omits the
                field from the request entirely.
            speed: Speed passed to the motion (for example "fast" or "slow").

        Returns:
            The response of ``MotionsApi.put_motions``, or ``None`` when the
            robot is not configured or the call raised ``ApiException``.
        """
        if not self.configuration:
            return None
        motion_api_instance = MotionsApi(openadk.ApiClient(self.configuration))
        motion_kwargs = {"name": name, "speed": speed, "repeat": 1}
        if direction:
            motion_kwargs["direction"] = direction
        body = MotionsOperation(motion=MotionsParameter(**motion_kwargs),
                                operation=operation,
                                timestamp=int(time.time()))
        try:
            return motion_api_instance.put_motions(body)
        except ApiException as e:
            self._report_error(
                f"Exception when calling MotionsApi->put_motions: {str(e)}")
            return None

    def bow(self):
        """Play the "bow" motion slowly, without a direction. Returns ``None``."""
        if not self.configuration:
            return
        self.play(name="bow", direction="", speed="slow", operation="start")

    def say(self, content="你好"):
        """Send ``content`` to the robot's text-to-speech endpoint.

        Returns ``None``; an ``ApiException`` is reported the same way as in
        the other methods rather than propagating to the caller.
        """
        if not self.configuration:
            return
        voice_api = openadk.VoiceApi(openadk.ApiClient(self.configuration))
        body = openadk.VoiceTTSStr(content)
        try:
            voice_api.put_voice_tts(body)
        except ApiException as e:
            self._report_error(
                f"Exception when calling VoiceApi->put_voice_tts: {str(e)}")

    def start_auto_transform(self):
        """Start speech recognition (IAT) on the robot.

        The recognised text is fetched separately with
        ``get_transform_result``. Returns ``None``.
        """
        if not self.configuration:
            return
        voice_api = openadk.VoiceApi(openadk.ApiClient(self.configuration))
        body = openadk.VoiceIatRequest()
        try:
            api_response = voice_api.put_voice_iat(body=body)
            logger.debug(api_response)
        except ApiException as e:
            self._report_error(
                f"Exception when calling VoiceApi->put_voice_iat: {str(e)}")

    def get_transform_result(self):
        """Fetch the latest speech recognition result as one string.

        Returns:
            The recognised words concatenated in order; ``""`` when the
            response carries no text; ``None`` when the robot is not
            configured, the request raised ``ApiException``, or the payload
            could not be parsed.
        """
        if not self.configuration:
            return None
        voice_api = openadk.VoiceApi(openadk.ApiClient(self.configuration))
        try:
            api_response = voice_api.get_voice_iat()
            logger.debug(api_response)
        except ApiException as e:
            self._report_error(
                f"Exception when calling VoiceApi->get_voice_iat: {str(e)}")
            return None
        # The endpoint returns a JSON string terminated by a NUL character;
        # strip it before handing the text to the JSON parser.
        raw = (api_response.data or "").replace("\x00", "")
        if not raw:
            return ""
        try:
            segments = json.loads(raw).get("text", {}).get("ws", [])
            # Each segment lists candidate words; the first candidate is used.
            return "".join(segment["cw"][0]["w"] for segment in segments)
        except (ValueError, AttributeError, KeyError, IndexError, TypeError) as e:
            self._report_error(
                f"Unexpected payload from VoiceApi->get_voice_iat: {str(e)}")
            return None

In [29]:
# Create the wrapper without a notification node (node=None). Nothing is
# contacted yet: the constructor only initialises attributes.
robot = Robot()

In [30]:
# Point the client at the robot's address and ping it via the battery query;
# connect() returns True when the robot answers.
robot.connect("192.168.31.109")

2020-06-04 16:13:49.938 | DEBUG    | __main__:ping_robot:38 - {'code': 0,
 'data': {'charging': 0, 'percent': 91, 'voltage': 4021},
 'msg': 'success'}


True

In [10]:
# Play the "bow" motion, then send a greeting to the text-to-speech endpoint.
robot.bow()
robot.say("欢迎来到 codelab")

In [23]:
# Start the "OneStepForward" motion once (play() always sends repeat=1) with
# the default speed ("fast") and direction ("both").
robot.play(name="OneStepForward")

{'code': 0, 'data': {'total_time': 3350}, 'msg': 'success'}

In [24]:
# Start the "Forward" motion with the default speed and direction.
robot.play(name="Forward")

{'code': 0, 'data': {'total_time': 41430}, 'msg': 'success'}

In [5]:
# NOTE(review): this requests a motion *named* "Stop" with the default
# operation='start'. If the intent is to halt the running motion,
# operation='stop' may be what is wanted — confirm against the motions API.
robot.play(name="Stop")

In [25]:
# Ask the robot to start speech recognition (voice IAT); the recognised text
# is fetched separately with get_transform_result().
robot.start_auto_transform()

2020-06-04 16:11:43.717 | DEBUG    | __main__:start_auto_transform:96 - {'code': 0, 'data': '{}', 'msg': 'Success'}


In [31]:
# Fetch the recognised text as a single string.
robot.get_transform_result()  # run after start_auto_transform()

2020-06-04 16:13:55.455 | DEBUG    | __main__:get_transform_result:110 - {'code': 0,
 'data': '{"text":{"bg":0,"ed":0,"ls":false,"sn":1,"ws":[{"bg":0,"cw":[{"sc":0,"w":"你好"}]}]}}\x00',
 'msg': 'Success',
 'status': 'idle',
 'timestamp': None}


'你好'