In [1]:
!pip install paho-mqtt


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [None]:
!pip install ipywidgets

In [None]:
!pip install ipycanvas

# MQTT初期化

In [None]:
# --- 1. ライブラリのインストール ---
# !pip install paho-mqtt

import paho.mqtt.client as mqtt
import json
import time

# --- 2. MQTT broker settings ---
# Address of the MQTT broker (when running under WSL2, change this to the WSL IP).
BROKER_ADDRESS: str = "127.0.0.1"
BROKER_PORT: int = 1883

# Identifier of the AMR; both topic names below are derived from it.
AMR_ID: str = "amr_01"
VELOCITY_TOPIC: str = f"amr/{AMR_ID}/velocity"  # topic velocity commands are published to
STATUS_TOPIC: str = f"amr/{AMR_ID}/status"  # topic the AMR status is read from

# --- 3. Callback definitions ---
def on_connect(client, userdata, flags, rc):
    """Connection callback: report the result and subscribe on success.

    Args:
        client: Client object the callback is invoked for; used to subscribe.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 is treated as success.
    """
    if rc != 0:
        print(f"Failed to connect, return code {rc}")
        return

    print("Connected to MQTT Broker!")
    # Subscribe to the status topic once the connection is confirmed.
    client.subscribe(STATUS_TOPIC)


# --- 4. MQTT client setup ---
# When this cell is re-run, shut down the client created by the previous run
# first: two live clients sharing one client_id make the broker drop the one
# that connected earlier, and its background loop would then keep reconnecting.
_previous_client = globals().get("client")
if isinstance(_previous_client, mqtt.Client):
    _previous_client.disconnect()
    _previous_client.loop_stop()

# paho-mqtt 2.x takes the callback API version as its first argument; VERSION1
# keeps the on_connect(client, userdata, flags, rc) signature used in this
# notebook. paho-mqtt 1.x has no CallbackAPIVersion and already uses it.
if hasattr(mqtt, "CallbackAPIVersion"):
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id="jupyter_test")
else:
    client = mqtt.Client(client_id="jupyter_test")
client.on_connect = on_connect

client.connect(BROKER_ADDRESS, BROKER_PORT, 60)

# Run the network loop in a background thread. Without it the broker's reply
# to the connect request is never processed (on_connect never fires) and no
# keepalive pings are sent, so the broker drops the connection once the 60 s
# keepalive lapses.
client.loop_start()

# --- 5. Function that sends a velocity command to Unity ---
def send_velocity(linear, angular):
    """Publish a velocity command as JSON on VELOCITY_TOPIC.

    Args:
        linear: Value sent under the "linear" key.
        angular: Value sent under the "angular" key.

    Prints the topic and payload when the publish was accepted. When the
    client returns a non-zero result code instead (for example because it is
    not connected), a failure message with that code is printed, so a lost
    connection is not reported as a successful publish.
    """
    payload = json.dumps({"linear": linear, "angular": angular})
    info = client.publish(VELOCITY_TOPIC, payload)

    # A disconnected client is reported through the returned rc field rather
    # than an exception; 0 is MQTT_ERR_SUCCESS.
    if info.rc != 0:
        print(f"Failed to publish to {VELOCITY_TOPIC}: return code {info.rc}")
        return

    print(f"Published to {VELOCITY_TOPIC}: {payload}")

# Velocity Publish
ジョイスティックで操作した速度情報をUnityに送信します。（並進速度・回転速度の最大値はテキストボックスで指定可能）

In [None]:
import ipywidgets as widgets
from ipycanvas import Canvas, hold_canvas
from IPython.display import display

# --- Settings ---
canvas_size = 200  # width and height passed to the Canvas below
center = canvas_size // 2  # x and y coordinate of the joystick's rest position
radius = 80  # radius of the joystick ring; also the scale used to normalise offsets
dead_zone = 0.1  # normalised axis values with a smaller magnitude are treated as 0

# Editable scale factors applied to the normalised joystick axes.
max_linear = widgets.FloatText(value=1.0, description='Max Linear:')
max_angular = widgets.FloatText(value=1.0, description='Max Angular:')

canvas = Canvas(width=canvas_size, height=canvas_size)

def draw_joystick(x, y):
    """Redraw the joystick: the outer ring plus the knob at (x, y).

    Args:
        x: Canvas x coordinate of the knob centre.
        y: Canvas y coordinate of the knob centre.
    """
    ring_colour, knob_colour, knob_radius = '#888', '#1e90ff', 20

    with hold_canvas(canvas):
        # Start from a blank canvas so the previous knob position disappears.
        canvas.clear()

        # Outer ring, centred on the canvas.
        canvas.stroke_style = ring_colour
        canvas.stroke_circle(center, center, radius)

        # Knob at the requested position.
        canvas.fill_style = knob_colour
        canvas.fill_circle(x, y, knob_radius)

# Initial render: knob at the centre of the ring.
draw_joystick(center, center)


def on_mouse_move(x, y):
    """Turn a mouse position on the canvas into a velocity command.

    The offset from the canvas centre is normalised by the ring radius, a
    dead zone is applied per axis, and the result is scaled by the values of
    the max_linear / max_angular widgets before being sent.

    Args:
        x: Mouse x coordinate on the canvas.
        y: Mouse y coordinate on the canvas.
    """
    dx, dy = x - center, y - center

    # Positions outside the ring are ignored entirely: nothing is redrawn and
    # no command (not even a stop) is sent.
    # NOTE(review): confirm that sending nothing here is the intended behaviour.
    outside_ring = (dx**2 + dy**2) > radius**2
    if outside_ring:
        return

    draw_joystick(x, y)

    def _with_dead_zone(axis):
        # Dead-zone handling: small deflections count as exactly zero.
        return 0.0 if abs(axis) < dead_zone else axis

    turn_axis = _with_dead_zone(dx / radius)
    drive_axis = _with_dead_zone(-dy / radius)  # negated so that upward is positive

    # Forward/backward deflection -> linear, left/right deflection -> angular.
    linear = drive_axis * max_linear.value
    angular = turn_axis * max_angular.value

    # While reversing, the left/right sense of the rotation is flipped.
    send_velocity(linear, -angular if linear < 0 else angular)

# Register the mouse-move handler on the canvas, then show the two limit
# inputs in one row followed by the canvas itself.
canvas.on_mouse_move(on_mouse_move)
display(widgets.HBox([max_linear, max_angular]), canvas)

# AMR status subscribe
AMRの状態をMQTTで購読し、最初に受信した1件のメッセージを表示します（最大30秒待機）。

In [None]:
import ipywidgets as widgets
import subprocess

# Label that one_time_status() updates with the received status or an error.
status_label = widgets.Label(value="Status: 受信待ち...")

def one_time_status():
    """Wait for a single message on STATUS_TOPIC and show it in status_label.

    Runs ``mosquitto_sub -C 1`` against the configured broker and blocks for
    up to 30 seconds. Afterwards the label shows one of:

    * the received payload,
    * "受信なし" when nothing arrived, including when the 30 s wait ran out,
    * "エラー: ..." when the command could not be run, or exited with a
      non-zero code without delivering a message.
    """
    command = [
        "mosquitto_sub",
        "-h", str(BROKER_ADDRESS),
        "-p", str(BROKER_PORT),
        "-t", str(STATUS_TOPIC),
        "-C", "1",  # exit after receiving one message
    ]

    try:
        result = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,  # kept so a failure reason can be shown
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        # No message within the wait means "nothing received", not an error.
        status_label.value = "Status: 受信なし"
        return
    except Exception as e:
        # For example, the mosquitto_sub executable could not be found.
        status_label.value = f"Status: エラー: {e}"
        return

    status = result.stdout.strip()
    if status:
        status_label.value = f"Status: {status}"
    elif result.returncode != 0:
        # The command failed without output; surface its own explanation.
        reason = result.stderr.strip() or f"exit code {result.returncode}"
        status_label.value = f"Status: エラー: {reason}"
    else:
        status_label.value = "Status: 受信なし"

# Show the label first, then block until one_time_status() has updated it.
display(status_label)
one_time_status()