In [1]:
from IPython.display import clear_output

!pip install -U pymoo
!pip install omegaconf
!pip install dill
clear_output()

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import sys
sys.path.append('/content/drive/MyDrive/Colab Notebooks/Moonshot/src')
%cd /content/drive/MyDrive/Colab Notebooks/Moonshot/20241228

/content/drive/MyDrive/Colab Notebooks/Moonshot/20241228


In [15]:
from robotic_system.data_manager import Manager
from simulator.agent import SimulationAgent
from simulator.simulation import run_simulation

from omegaconf import OmegaConf
import numpy as np
import os, copy

property_file = os.path.join('properties', 'test.yaml')
properties = OmegaConf.load(property_file)

manager = Manager(properties)
tasks = manager.read_task()
module_types = manager.read_module_type()
robot_types = manager.read_robot_type(module_types)
modules = manager.read_module(module_types)
robots = manager.read_robot(robot_types, modules)

for t_name, task in tasks.items():
    print(task)

print(module_types)

# # シミュレーション設定
# seed = properties.simulation.seed
# max_step = properties.simulation.maxSimulationStep
# max_step = 100
# battery_limit = properties.simulation.batteryLimit
# charge_station = properties.simulation.chargeStation

# # シミュレーションエージェントの生成
# agents = [SimulationAgent(robot, battery_limit) for _, robot in robots.items()]

# task_priority = []
# np.random.seed(1111)
# for _ in agents:
#     keys = tasks.keys()
#     # 辞書のキーをリストに変換してシャッフルする
#     task_priority_ = np.array(list(keys))  # keysをリストに変換
#     np.random.shuffle(task_priority_)  # その場で並び替え
#     task_priority.append(copy.deepcopy(task_priority_))


# agent_history, task_history = run_simulation(task_priority, agents, tasks, seed, max_step, charge_station, 0.001)
# print(task_history)
# print(agent_history)

Transport(name='i_t_000', coordinate=array([0., 0.]), workload=[10, 0], task_dependency=[], required_abilities={<RobotPerformanceAttributes.TRANSPORT: 0>: 12, <RobotPerformanceAttributes.MANUFACTURE: 1>: 0}, other_attrs={'category': 'Inflatable'}, origin_destination=[[0.0, 0.0], [5.0, 5.0], 0.5])
Manufacture(name='i_m_000', coordinate=array([5., 5.]), workload=[5, 0], task_dependency=[Transport(name='i_t_000', coordinate=array([0., 0.]), workload=[10, 0], task_dependency=[], required_abilities={<RobotPerformanceAttributes.TRANSPORT: 0>: 12, <RobotPerformanceAttributes.MANUFACTURE: 1>: 0}, other_attrs={'category': 'Inflatable'}, origin_destination=[[0.0, 0.0], [5.0, 5.0], 0.5])], required_abilities={<RobotPerformanceAttributes.TRANSPORT: 0>: 0, <RobotPerformanceAttributes.MANUFACTURE: 1>: 6}, other_attrs={'category': 'Inflatable'})
Manufacture(name='i_m_001', coordinate=array([5., 5.]), workload=[8, 0], task_dependency=[Transport(name='i_t_000', coordinate=array([0., 0.]), workload=[10,

In [9]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import matplotlib.gridspec as gridspec
from matplotlib import rc
from matplotlib.collections import PolyCollection
import matplotlib.patches as patches
from matplotlib.patches import Circle, Rectangle, Ellipse, Polygon
from math import cos, sin, pi

from robotic_system.core import RobotState

# 描画する正三角形のパラメータ
triangle_size = 0.5  # 辺の長さ
rectangle_size = 0.5  # 辺の長さ
pentagon_size = 0.4 # 辺の長さ
ellipse_size = (0.5, 0.2) # 半径
circle_size = 0.4 # 半径


# 正三角形の頂点を計算
def calculate_equilateral_triangle_vertices(center):
    x, y = center
    # 頂点を計算
    vertices = [
        (x + triangle_size * cos(pi / 6 + i * 2 * pi / 3), y + triangle_size * sin(pi / 6 + i * 2 * pi / 3))
        for i in range(3)
    ]
    return vertices

# 逆正三角形の頂点を計算する関数
def calculate_inverted_triangle_vertices(center):
    x, y = center
    # 頂点を計算
    vertices = [
        (x + triangle_size * cos(-pi / 6 + i * 2 * pi / 3), y + triangle_size * sin(-pi / 6 + i * 2 * pi / 3))
        for i in range(3)
    ]
    return vertices

# 正方形の左下の座標を計算する関数
def calculate_rectangle_vertices(center):
    return (center[0] - rectangle_size / 2, center[1] - rectangle_size / 2)

# 五角形の頂点を計算する関数
def calculate_pentagon_vertices(center):
    x, y = center
    # 5つの頂点を計算
    vertices = [
        (x + pentagon_size * cos(2 * pi * i / 5), y + pentagon_size * sin(2 * pi * i / 5))
        for i in range(5)
    ]
    return vertices

def save_animation(agent_history, task_history, numsave_dir, name, max_step, agent_name_list, task_name_list, file_name):
    # 描画するグラフの設定
    # ax[0]: ロボット移動軌跡
    # ax[2]: タスク残量
    # ax[3]: スケジュール表

    fig = plt.figure(figsize=(7.5, 5))
    gs = gridspec.GridSpec(2, 2, height_ratios=(3, 1))
    ax = [plt.subplot(gs[0, 0]), plt.subplot(gs[0, 1]), plt.subplot(gs[1, 0]), plt.subplot(gs[1, 1])]
    legend_flag = True  # 凡例描画のフラグ
    plt.close()

    # カラーマップを用意する
    color_map = {
        'Transport': 'blue',
        'Manufacture': 'green'
    }

    # アニメーション用のグラフ保管場所
    ims = []

    # スケジュール
    scheduleBar = {}
    for member in RobotState:
        scheduleBar[member] = [[] for x in range(len(agent_name_list))]

    assert len(task_history) == len(agent_history), "Mismatch: task_history and agent_history must be of the same length."

    for t, agents in enumerate(agent_history):
        # 描画設定
        if legend_flag:  # 一回のみ凡例を描画
            ax[0].legend(loc='lower center', bbox_to_anchor=(1.1, 1.1), ncol=4)
            ax[0].set_xlim([-6.0, 6.0])
            ax[0].set_ylim([-6.0, 6.0])
            ax[0].tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False, length=0)
            ax[0].tick_params(length=0)
            ax[0].set_title("2D simulation")
            ax[1].axis("off")
            ax[1].set_xlim([-6.0, 6.0])
            ax[1].set_ylim([-6.0, 6.0])
            ax[1].tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False, length=0)
            ax[1].tick_params(length=0)
            ax[2].set_xlim(0, len(task_name_list))
            ax[2].set_ylim(0, 20)
            ax[2].set_xticks([n for n in range(len(task_name_list))])
            ax[2].set_title("Leftover Jobs")
            handles = [plt.Rectangle((0,0),1,1, color=color_map[cat]) for cat in color_map]
            labels = list(color_map.keys())
            ax[2].set_xticklabels(task_name_list, rotation=45, ha='right') # 横軸ラベル回転
            ax[2].legend(handles, labels) # 凡例を追加

            ax[3].set_xlim(0, max_step)
            ax[3].set_ylim(0, len(agent_name_list))
            ax[3].set_title('Timeline')
            ax[3].set_xlabel("period")
            ax[3].set_ylabel("robots")
            ax[3].tick_params(labelbottom=True, labelleft=True, labelright=False, labeltop=False, length=0)
            legend_flag = False
        # subplot0：2D
        im = []
        for r, agent in enumerate(agents):
            robot_coodinate = agent.robot.coordinate
            robot_state = agent.robot.state
            robot_type = agent.robot.type.name
            if robot_type == 'TWSH':
                vertices = calculate_equilateral_triangle_vertices(robot_coodinate)
                triangle = Polygon(vertices, closed=True, color=robot_state.color, alpha=0.8)
                ax[0].add_patch(triangle)
                im.append(triangle)
            if robot_type == 'TWDH':
                vertices = calculate_inverted_triangle_vertices(robot_coodinate)
                triangle = Polygon(vertices, closed=True, color=robot_state.color, alpha=0.8)
                ax[0].add_patch(triangle)
                im.append(triangle)
            if robot_type == 'QWSH':
                vertices = calculate_rectangle_vertices(robot_coodinate)
                rect = Rectangle(vertices, rectangle_size, rectangle_size, color=robot_state.color, alpha=0.8)
                ax[0].add_patch(rect)
                im.append(rect)
            if robot_type == 'QWDH':
                vertices = calculate_pentagon_vertices(robot_coodinate)
                pentagon = Polygon(vertices, closed=True, color=robot_state.color, alpha=0.8)
                ax[0].add_patch(pentagon)
                im.append(pentagon)
            if robot_type == 'Dragon':
                ellipse = Ellipse(robot_coodinate, ellipse_size[0], ellipse_size[1], color=robot_state.color, alpha=0.8)
                ax[0].add_patch(ellipse)
                im.append(ellipse)
            if robot_type == 'Minimal':
                circle = Circle(robot_coodinate, circle_size, color=robot_state.color, alpha=0.8)
                ax[0].add_patch(circle)
                im.append(circle)
            # subplot2：スケジュール表のデータ保存
            scheduleBar[robot_state][r].append((t, 1.0))

        # subplot1：棒グラフ
        tasks = task_history[t]
        keys = [task.name for task in tasks.values()]
        colors = [color_map[type(task).__name__] for task in tasks.values()]
        height = [task.workload[0] - task.workload[1] for task in tasks.values()]
        bars = ax[2].bar(keys, height, color=colors)
        for bar in bars:
            im.append(bar)

        # subplot2：スケジュール表
        for r in range(len(robots)):
            for state, schedule in scheduleBar.items():
                im.append(ax[3].broken_barh(schedule[r], (r, 1), facecolors=state.value[1]))
        im.append(ax[3].axvline(x=t, color='white', linestyle='--'))  # 縦棒の追加

        # 各ターンの図を保管
        ims.append(im)
    # タスクが早めに終わった場合のデータ補完
    if t != max_step -1:
        for _ in range(10):
            ims.append(im)

    ani = animation.ArtistAnimation(fig, ims, blit=True, interval=2000)
    ani.save(file_name, writer="ffmpeg", fps=15, dpi=500)

    return ani

In [10]:
from IPython.display import HTML

agents = agent_history[-1]
tasks = task_history[-1]
agent_name_list = [agent.robot.name for agent in agents]
task_name_list = [task_name for task_name, data in tasks.items()]
ani = save_animation(agent_history, task_history, _ ,_, max_step, agent_name_list, task_name_list, 'without_break.mp4')
# HTML(ani.to_jshtml())

