# Kart Behavior-Clone

In [1]:
import numpy as np
import random
import copy
import datetime
import platform
import torch
import os
import torch.nn.functional as F
from torchvision.utils import save_image
from torch.utils.tensorboard import SummaryWriter
from collections import deque
from pathlib import Path
from mlagents_envs.environment import UnityEnvironment, ActionTuple
from mlagents_envs.side_channel.engine_configuration_channel\
                             import EngineConfigurationChannel
from mlagents.trainers.demo_loader import demo_to_buffer
from mlagents.trainers.buffer import BufferKey, ObservationKeyPrefix


## Setting environments

In [2]:
# Global settings: every directory is derived from the current working directory
cur_dir = os.getcwd()
# Sibling folder (one level up) that the Unity executables are looked up in
env_dir = os.path.abspath(os.path.join(cur_dir, os.pardir, "Unity6000_Envs"))
# Root folder for everything this notebook writes (checkpoints, TensorBoard logs)
test_dir = os.path.abspath(os.path.join(cur_dir, "temp", "pytorch_output"))


### Pytorch Device

In [3]:
# Pytorch Device
# Preference order: Apple Metal (MPS) -> CUDA -> CPU.
# `torch.backends.mps` is not present on every PyTorch build, so it is looked
# up defensively instead of letting an AttributeError abort the notebook.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    g_device = torch.device("mps")
elif torch.cuda.is_available():
    g_device = torch.device("cuda")
else:
    g_device = torch.device("cpu")

print(g_device)


mps


### Unity Environment

In [4]:
# Unity Environment
game = "Kart"
os_name = platform.system()

# Only Linux and macOS builds are wired up; on any other platform env_name is
# simply not assigned here.
if os_name in ('Linux', 'Darwin'):
    _build_ext = "x86_64" if os_name == 'Linux' else "app"
    env_name = os.path.join(env_dir, f"{game}_{os_name}.{_build_ext}")

### Setting parameters for BC Network

In [5]:
# Setting parameters for Behavior Clone Network

# --- network input / output widths
state_size = 12 * 4
action_size = 1

# --- run switches
load_model, train_mode = False, True

# --- optimisation
batch_size = 128
learning_rate = 0.0003
discount_factor = 0.9

# --- schedule: training epochs and number of environment steps per test run
train_epoch = 50_000
test_step = 1_000

# --- how often (in epochs) to log the loss and to write a checkpoint
print_interval, save_interval = 10, 100

# --- port handed to UnityEnvironment as base_port
unity_base_port = 1900

In [6]:
# NN model : Save and Load
# Each run gets its own timestamped output directory below test_dir.
date_time = f"{datetime.datetime.now():%Y%m%d%H%M%S}"
save_path = os.path.join(test_dir, f"saved_models/{game}/BC/{date_time}")
os.makedirs(save_path, exist_ok=True)
save_model_path = os.path.join(save_path, 'Kart_BC.ckpt')
# Directory of a previously saved run; only read when load_model is enabled.
load_path = ""  # Need to update

## Actor class -> Behavioral Cloning Actor class

In [7]:
# Actor class -> Behavioral Cloning Actor class
class Actor(torch.nn.Module):
    """Two-hidden-layer MLP policy: state vector -> tanh-squashed action vector.

    Args:
        state_dim: Width of the input state vector. When omitted, the
            module-level ``state_size`` is used (original behaviour).
        action_dim: Number of action outputs. When omitted, the module-level
            ``action_size`` is used (original behaviour).
        hidden_dim: Width of both hidden layers; defaults to the original 128.
    """

    def __init__(self, state_dim=None, action_dim=None, hidden_dim=128):
        super().__init__()
        # Fall back to the notebook-level settings so a bare ``Actor()`` call
        # builds exactly the same network as before.
        if state_dim is None:
            state_dim = state_size
        if action_dim is None:
            action_dim = action_size

        # Attribute names double as state_dict keys, so they must stay
        # fc1 / fc2 / mu for existing checkpoints to load.
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.mu = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        """Return the action for ``state``; tanh keeps every output within [-1, 1]."""
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return torch.tanh(self.mu(x))

## BCAgent class


In [8]:
# BCAgent class
class BCAgent():
    """Behavioral-cloning agent: fits an Actor to demonstrated actions with an MSE loss.

    Configuration is read from module-level names: ``g_device``,
    ``learning_rate``, ``batch_size``, ``save_path``, ``save_model_path``,
    ``load_model`` and ``load_path``.
    """

    def __init__(self):
        self.actor = Actor().to(g_device)
        self.optimizer = torch.optim.Adam(self.actor.parameters(), lr=learning_rate)
        self.writer = SummaryWriter(save_path)

        if load_model:
            # save_model() writes "<dir>/Kart_BC.ckpt", so look for that file
            # name first; fall back to the legacy "<dir>/ckpt" location that
            # this loader used before.
            ckpt_file = os.path.join(load_path, os.path.basename(save_model_path))
            if not os.path.isfile(ckpt_file):
                ckpt_file = load_path+'/ckpt'
            print(f"... Load Model from {ckpt_file} ...")
            checkpoint = torch.load(ckpt_file, map_location=g_device)
            self.actor.load_state_dict(checkpoint["actor"])
            self.optimizer.load_state_dict(checkpoint["optimizer"])

    def get_action(self, state, training=False):
        """Run the actor on ``state`` and return the action as a numpy array.

        Args:
            state: Array-like accepted by ``torch.FloatTensor``.
            training: Passed to ``actor.train()`` to select train/eval mode.

        Returns:
            The actor output, moved to the CPU and converted to numpy.
        """
        self.actor.train(training)
        # The result is never back-propagated, so skip building the graph.
        with torch.no_grad():
            action = self.actor(torch.FloatTensor(state).to(g_device))
        return action.cpu().numpy()

    def train_model(self, state, action):
        """Run one epoch of mini-batch regression over ``(state, action)``.

        Args:
            state: Tensor of demonstration states, indexed along dim 0.
            action: Tensor of demonstrated actions aligned with ``state``.

        Returns:
            Mean of the per-batch MSE losses of this epoch.

        Raises:
            ValueError: If ``state`` is empty (the mean loss would be NaN).
        """
        n_samples = len(state)
        if n_samples == 0:
            raise ValueError("train_model() received an empty dataset")

        # get_action() may have switched the network to eval mode.
        self.actor.train()

        losses = []
        rand_idx = torch.randperm(n_samples)
        for start in range(0, n_samples, batch_size):
            batch_idx = rand_idx[start:start + batch_size]
            _state = state[batch_idx]
            _action = action[batch_idx]

            action_pred = self.actor(_state)
            # mse_loss(input, target) already reduces with 'mean'.
            loss = F.mse_loss(action_pred, _action)

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            losses.append(loss.item())

        return np.mean(losses)

    def save_model(self):
        """Write the actor and optimizer state dicts to ``save_model_path``."""
        print(f"... Save Model to {save_model_path} ...")
        torch.save({
            "actor" : self.actor.state_dict(),
            "optimizer" : self.optimizer.state_dict(),
        }, save_model_path)

    def write_summray(self, loss, epoch):
        """Log ``loss`` under the "model/loss" tag at step ``epoch``."""
        self.writer.add_scalar("model/loss", loss, epoch)

    # Correctly spelled alias; the misspelled name stays for existing callers.
    write_summary = write_summray

## Train BC Model

In [9]:
agent = BCAgent()
# Demonstration
demo_path = os.path.abspath(os.path.join(cur_dir, "demo", "KartAgent.demo"))

if train_mode:
    # get the information of demo
    behavior_spec, demo_buffer = demo_to_buffer(demo_path, 1)
    print(demo_buffer._fields.keys())

    def demo_to_tensor(key):
        """Return the demo-buffer field ``key`` as a float32 tensor on g_device."""
        # Stack into a single ndarray first: handing torch a Python list of
        # ndarrays is the slow conversion path (the recorded run output shows
        # a warning pointing at the original one-step conversion).
        stacked = np.asarray(demo_buffer[key], dtype=np.float32)
        return torch.from_numpy(stacked).to(g_device)

    state = demo_to_tensor((ObservationKeyPrefix.OBSERVATION, 0))
    action = demo_to_tensor(BufferKey.CONTINUOUS_ACTION)
    reward = demo_to_tensor(BufferKey.ENVIRONMENT_REWARDS)
    done = demo_to_tensor(BufferKey.DONE)

    # Discounted return, accumulated backwards; (1 - done[t]) stops the
    # accumulation from crossing a step flagged as done.
    ret = reward.clone()
    for t in reversed(range(len(ret) - 1)):
        ret[t] += (1. - done[t]) * (discount_factor * ret[t+1])

    # use the pair of (state, action) whose return is greater than 0
    keep = ret > 0
    state, action = state[keep], action[keep]
    if len(state) == 0:
        # Training on nothing would only produce NaN losses for every epoch.
        raise RuntimeError(
            f"No demonstration step in {demo_path} has a positive return; "
            "nothing to train on"
        )

    losses = []
    for epoch in range(1, train_epoch+1):
        loss = agent.train_model(state, action)
        losses.append(loss)

        # record tensorboard
        if epoch % print_interval == 0:
            mean_loss = np.mean(losses)
            print(f"{epoch} Epoch / Loss: {mean_loss:.8f}" )
            agent.write_summray(mean_loss, epoch)
            losses = []

        if epoch % save_interval == 0:
            agent.save_model()

    # Push any scalars still buffered by the SummaryWriter to disk.
    agent.writer.flush()



dict_keys([<BufferKey.DONE: 'done'>, <BufferKey.ENVIRONMENT_REWARDS: 'environment_rewards'>, (<ObservationKeyPrefix.OBSERVATION: 'obs'>, 0), <BufferKey.CONTINUOUS_ACTION: 'continuous_action'>, <BufferKey.PREV_ACTION: 'prev_action'>])


  demo_to_tensor = lambda key: torch.FloatTensor(demo_buffer[key]).to(g_device)


10 Epoch / Loss: 0.06342151
20 Epoch / Loss: 0.02161611
30 Epoch / Loss: 0.01580998
40 Epoch / Loss: 0.01405029
50 Epoch / Loss: 0.01238610
60 Epoch / Loss: 0.01151048
70 Epoch / Loss: 0.01084438
80 Epoch / Loss: 0.00972556
90 Epoch / Loss: 0.00950892
100 Epoch / Loss: 0.00916479
... Save Model to /Users/hyunjae.k/110_HyunJae_Git/2025_Playgrounds/Unity_Robotics_Playgrounds/Agent_Scripts/temp/pytorch_output/saved_models/Kart/BC/20250917203537/Kart_BC.ckpt ...
110 Epoch / Loss: 0.00887570
120 Epoch / Loss: 0.00871482
130 Epoch / Loss: 0.00831754
140 Epoch / Loss: 0.00864271
150 Epoch / Loss: 0.00797472
160 Epoch / Loss: 0.00752518
170 Epoch / Loss: 0.00753493
180 Epoch / Loss: 0.00767598
190 Epoch / Loss: 0.00730436
200 Epoch / Loss: 0.00726893
... Save Model to /Users/hyunjae.k/110_HyunJae_Git/2025_Playgrounds/Unity_Robotics_Playgrounds/Agent_Scripts/temp/pytorch_output/saved_models/Kart/BC/20250917203537/Kart_BC.ckpt ...
210 Epoch / Loss: 0.00728161
220 Epoch / Loss: 0.00712113
230 Epo

## Test the pretrained BC model

In [13]:
# Start Play for testing
print("PLAY START-Kart")

# Unity Environment
game = "Kart"
os_name = platform.system()
if os_name == 'Linux':
    env_name = os.path.join(env_dir, f"{game}_{os_name}.x86_64")
elif os_name == 'Darwin':
    env_name = os.path.join(env_dir, f"{game}_{os_name}.app")
else:
    # Without this branch env_name stays undefined and the failure only
    # surfaces later as a NameError.
    raise RuntimeError(f"No {game} build configured for platform '{os_name}'")

# Setup Unity Environment
engine_configuration_channel = EngineConfigurationChannel()
env = UnityEnvironment(file_name=env_name,
                       side_channels=[engine_configuration_channel],
                       base_port=unity_base_port)
# try/finally: the Unity process (and its port) must be released even when the
# rollout below raises, otherwise the next cell cannot reuse unity_base_port.
try:
    env.reset()

    # setup unity ml agent
    behavior_name = list(env.behavior_specs.keys())[0]
    spec = env.behavior_specs[behavior_name]
    engine_configuration_channel.set_configuration_parameters(time_scale=1.0)
    dec, term = env.get_steps(behavior_name)

    # TEST
    episode, score = 0, 0
    for step in range(test_step):
        state = dec.obs[0]
        action = agent.get_action(state, False)
        action_tuple = ActionTuple()
        action_tuple.add_continuous(action)
        env.set_actions(behavior_name, action_tuple)
        env.step()

        dec, term = env.get_steps(behavior_name)
        # An episode ended on this step when the terminal batch is non-empty.
        done = len(term.agent_id) > 0
        reward = term.reward if done else dec.reward
        score += reward[0]

        if done:
            episode += 1

            # print out
            print(f"{episode} Episode / Step: {step} / Score: {score:.2f} ")
            score = 0
finally:
    env.close()

PLAY START-Kart
[UnityMemory] Configuration Parameters - Can be set up in boot.config
    "memorysetup-allocator-temp-initial-block-size-main=262144"
    "memorysetup-allocator-temp-initial-block-size-worker=262144"
    "memorysetup-bucket-allocator-granularity=16"
    "memorysetup-bucket-allocator-bucket-count=8"
    "memorysetup-bucket-allocator-block-size=4194304"
    "memorysetup-bucket-allocator-block-count=1"
    "memorysetup-main-allocator-block-size=16777216"
    "memorysetup-thread-allocator-block-size=16777216"
    "memorysetup-gfx-main-allocator-block-size=16777216"
    "memorysetup-gfx-thread-allocator-block-size=16777216"
    "memorysetup-cache-allocator-block-size=4194304"
    "memorysetup-typetree-allocator-block-size=2097152"
    "memorysetup-profiler-bucket-allocator-granularity=16"
    "memorysetup-profiler-bucket-allocator-bucket-count=8"
    "memorysetup-profiler-bucket-allocator-block-size=4194304"
    "memorysetup-profiler-bucket-allocator-block-count=1"
    "memo

In [14]:
# Start Play for testing
print("PLAY START-Kart-Country")

# Unity Environment
game = "Kart_Country"
os_name = platform.system()
if os_name == 'Linux':
    env_name = os.path.join(env_dir, f"{game}_{os_name}.x86_64")
elif os_name == 'Darwin':
    env_name = os.path.join(env_dir, f"{game}_{os_name}.app")
else:
    # Otherwise env_name would silently keep the value from an earlier cell
    # (a different game) or be undefined.
    raise RuntimeError(f"No {game} build configured for platform '{os_name}'")

# Setup Unity Environment
engine_configuration_channel = EngineConfigurationChannel()
env = UnityEnvironment(file_name=env_name,
                       side_channels=[engine_configuration_channel],
                       base_port=unity_base_port)
# try/finally: always shut the Unity process down so its port is freed even
# when the rollout below raises.
try:
    env.reset()

    # setup unity ml agent
    behavior_name = list(env.behavior_specs.keys())[0]
    spec = env.behavior_specs[behavior_name]
    engine_configuration_channel.set_configuration_parameters(time_scale=1.0)
    dec, term = env.get_steps(behavior_name)

    # TEST
    episode, score = 0, 0
    for step in range(test_step):
        state = dec.obs[0]
        action = agent.get_action(state, False)
        action_tuple = ActionTuple()
        action_tuple.add_continuous(action)
        env.set_actions(behavior_name, action_tuple)
        env.step()

        dec, term = env.get_steps(behavior_name)
        # An episode ended on this step when the terminal batch is non-empty.
        done = len(term.agent_id) > 0
        reward = term.reward if done else dec.reward
        score += reward[0]

        if done:
            episode += 1

            # print out
            print(f"{episode} Episode / Step: {step} / Score: {score:.2f} ")
            score = 0
finally:
    env.close()

PLAY START-Kart-Country
[UnityMemory] Configuration Parameters - Can be set up in boot.config
    "memorysetup-allocator-temp-initial-block-size-main=262144"
    "memorysetup-allocator-temp-initial-block-size-worker=262144"
    "memorysetup-bucket-allocator-granularity=16"
    "memorysetup-bucket-allocator-bucket-count=8"
    "memorysetup-bucket-allocator-block-size=4194304"
    "memorysetup-bucket-allocator-block-count=1"
    "memorysetup-main-allocator-block-size=16777216"
    "memorysetup-thread-allocator-block-size=16777216"
    "memorysetup-gfx-main-allocator-block-size=16777216"
    "memorysetup-gfx-thread-allocator-block-size=16777216"
    "memorysetup-cache-allocator-block-size=4194304"
    "memorysetup-typetree-allocator-block-size=2097152"
    "memorysetup-profiler-bucket-allocator-granularity=16"
    "memorysetup-profiler-bucket-allocator-bucket-count=8"
    "memorysetup-profiler-bucket-allocator-block-size=4194304"
    "memorysetup-profiler-bucket-allocator-block-count=1"
 

In [1]:
# Start Play for testing
# NOTE: this cell depends on names created by earlier cells (the imports,
# env_dir, unity_base_port, test_step, agent). The NameError in the recorded
# output presumably comes from running it in a fresh kernel without them.
print("PLAY START-Kart-Mountain")

# Unity Environment
game = "Kart_Mountain"
os_name = platform.system()
if os_name == 'Linux':
    env_name = os.path.join(env_dir, f"{game}_{os_name}.x86_64")
elif os_name == 'Darwin':
    env_name = os.path.join(env_dir, f"{game}_{os_name}.app")
else:
    # Otherwise env_name would silently keep the value from an earlier cell
    # (a different game) or be undefined.
    raise RuntimeError(f"No {game} build configured for platform '{os_name}'")

# Setup Unity Environment
engine_configuration_channel = EngineConfigurationChannel()
env = UnityEnvironment(file_name=env_name,
                       side_channels=[engine_configuration_channel],
                       base_port=unity_base_port)
# try/finally: always shut the Unity process down so its port is freed even
# when the rollout below raises.
try:
    env.reset()

    # setup unity ml agent
    behavior_name = list(env.behavior_specs.keys())[0]
    spec = env.behavior_specs[behavior_name]
    engine_configuration_channel.set_configuration_parameters(time_scale=1.0)
    dec, term = env.get_steps(behavior_name)

    # TEST
    episode, score = 0, 0
    for step in range(test_step):
        state = dec.obs[0]
        action = agent.get_action(state, False)
        action_tuple = ActionTuple()
        action_tuple.add_continuous(action)
        env.set_actions(behavior_name, action_tuple)
        env.step()

        dec, term = env.get_steps(behavior_name)
        # An episode ended on this step when the terminal batch is non-empty.
        done = len(term.agent_id) > 0
        reward = term.reward if done else dec.reward
        score += reward[0]

        if done:
            episode += 1

            # print out
            print(f"{episode} Episode / Step: {step} / Score: {score:.2f} ")
            score = 0
finally:
    env.close()

PLAY START-Kart-Mountain


NameError: name 'platform' is not defined