# Import packages

In [None]:

from javascript import require, On, Once, AsyncTask, once, off
import math
import os
import json
from actions import Actions
# Set the timeout to 100,000 milliseconds or disable the timeout
# os.environ['REQ_TIMEOUT'] = '10000000'
os.environ['REQ_TIMEOUT'] = '0'

# Import the javascript libraries for Minecraft
mineflayer = require("mineflayer")
pathfinder = require('mineflayer-pathfinder')
collectBlockPlugin = require('mineflayer-collectblock')
blockFinderPlugin = require('mineflayer-blockfinder')
vec3 = require("vec3")

### Global bot parameters

In [None]:
# edit the serverPort based on your actual situation.
serverPort = 50000 
serverHost = "localhost"
reconnect = True

In [None]:
class Agent:
    def __init__(self, playerUsername, botName, serverHost, serverPort, reconnect=True):
        self.playerUsername = playerUsername
        self.botName = botName
        self.serverHost = serverHost
        self.serverPort = serverPort
        self.reconnect = reconnect
        self.bot = None
        self.botArgs = {
            "host": serverHost,
            "port": serverPort,
            "username": botName,
            "hideErrors": False,
        }
        # edit them with paths for hash files provided in your machine.
        self.hash_path = {
            "option": "option_hash.json", 
            "registry": "blueprint_itemsByName.json"
        }
        self.createBot()
        

    def createBot(self):
        self.bot = mineflayer.createBot(self.botArgs)
        self.bot.loadPlugin(pathfinder.pathfinder)
        self.bot.loadPlugin(collectBlockPlugin.plugin)
        self.bot.loadPlugin(blockFinderPlugin)
        self.mcData = require('minecraft-data')(self.bot.version)
        self.movements = pathfinder.Movements(self.bot, self.mcData)
        self.bot.pathfinder.setMovements(self.movements)
        self.actions = Actions(self.bot, self.mcData, self.movements,self.playerUsername)
        self.startEvents()
        #######
        # default for our provided environment map; 
        # you can edit it based on your environment map to specify a pivot position.
        self.pivot_pos = (1, 1, 1) 
        self.option_hash = self.load_option_hash()
        self.registry_hash = self.load_registry_hash()   
        return self.bot
    
    def log(self, message):
        print(f"[{self.bot.username}] {message}")

    def startEvents(self):
        bot = self.bot
        botName = self.botName
        reconnectState = {"reconnect": self.reconnect}

        @On(bot, "login")
        def handleLogin(this):

            botSocket = bot._client.socket
            print(f"[{botName}] Logged in to {botSocket.server if botSocket.server else botSocket._host }")

        @On(bot, "kicked")
        def handleKicked(this, reason, loggedIn):
            if loggedIn:
                print(f"[{botName}] Kicked whilst trying to connect: {reason}")

        @On(bot, "messagestr")
        def handleMessagestr(this, message, messagePosition, jsonMsg, sender, verified=None):
            if messagePosition == "chat" and "quit" in message:
                reconnectState["reconnect"] = False
                this.quit()
            elif messagePosition == "chat" and'come' in message:
                localPlayers = bot.players
                for player in localPlayers:
                        playerData = localPlayers[player]
                        if playerData["uuid"] == sender:
                            target = localPlayers[player].entity
                            
                if not target:
                    bot.chat("I don't see you !")
                    return
                pos = target.position
                bot.pathfinder.setMovements(self.movements)
                bot.pathfinder.setGoal(pathfinder.goals.GoalNear(pos.x, pos.y, pos.z, 1))
            

        @On(bot, "end")
        def handleEnd(this, reason):
            print(f"[{botName}] Disconnected: {reason}")
            # Turn off old events
            off(bot, "login", handleLogin)
            off(bot, "kicked", handleKicked)
            off(bot, "messagestr", handleMessagestr)

            # Reconnect if the reconnect flag is set to True
            if reconnectState["reconnect"]:
                print(f"[{botName}] Attempting to reconnect")
                self.createBot()

            # Last event listener
            off(bot, "end", handleEnd)
        

    def init_env(self):
        '''
        init environment based on the provided map.
        you can edit the "/fill" command with your specific settings and environment.
        '''
        self.bot.chat("/setworldspawn 0 1 0")
        self.bot.chat("/fill 0 0 0 180 0 180 minecraft:barrier")
        self.bot.chat("/fill 0 0 0 -180 0 180 minecraft:barrier")
        self.bot.chat("/fill 0 0 0 180 0 -180 minecraft:barrier")
        self.bot.chat("/fill 0 0 0 -180 0 -180 minecraft:barrier")
        self.bot.chat("/gamerule doDaylightCycle false")
        self.bot.chat("/time set day")
        self.bot.chat("/gamerule sendCommandFeedback false")

    def load_option_hash(self):
        with open(self.hash_path['option'], 'r') as f1:
            option_hash = json.load(f1)
        return option_hash

    def load_registry_hash(self):
        with open(self.hash_path['registry'], 'r') as f1:
            registry_hash = json.load(f1)
        return registry_hash

    def build_by_blueprint(self, blueprint, reverse_block_materials):
        for h in range(len(blueprint)): # y axis
            for d in range(len(blueprint[h])): # z axis
                for w in range(len(blueprint[h][d])): # x axis
                    # adaptation to several MLLM output formats. please improve it according to the actual situation
                    if blueprint[h][d][w] in reverse_block_materials and reverse_block_materials[blueprint[h][d][w]] != 'air' and reverse_block_materials[blueprint[h][d][w]] != 'Air':
                        if '(' in reverse_block_materials[blueprint[h][d][w]]:
                            # print('yes')
                            stairs_name = reverse_block_materials[blueprint[h][d][w]].split('(')[0].strip(' ')
                            orientation = reverse_block_materials[blueprint[h][d][w]].split('(')[1].split(')')[0]
                            flag = self.option_hash[orientation.replace(', ', ',')]
                            if '_' in stairs_name or stairs_name.islower():
                                self.bot.chat(f'/setblock {self.pivot_pos[0]+w} {self.pivot_pos[1]+h} {self.pivot_pos[2]+d} minecraft:{stairs_name}{flag}')
                            else:
                                self.bot.chat(f'/setblock {self.pivot_pos[0]+w} {self.pivot_pos[1]+h} {self.pivot_pos[2]+d} minecraft:{self.registry_hash[stairs_name]}{flag}')
                        else:
                            if '_' in reverse_block_materials[blueprint[h][d][w]] or reverse_block_materials[blueprint[h][d][w]].islower():
                                self.bot.chat(f'/setblock {self.pivot_pos[0]+w} {self.pivot_pos[1]+h} {self.pivot_pos[2]+d} minecraft:{reverse_block_materials[blueprint[h][d][w]]}')
                            else:
                                self.bot.chat(f'/setblock {self.pivot_pos[0]+w} {self.pivot_pos[1]+h} {self.pivot_pos[2]+d} minecraft:{self.registry_hash[reverse_block_materials[blueprint[h][d][w]]]}')

        # print('finished!')


### Set your agents (building agent and observing agent)

In [None]:
# format: Agent(playerUsername, botName, ...)
agent1 = Agent("forture123123","Agent", "localhost", serverPort)
agent2 = Agent("jessica030327", "bot", "localhost", serverPort)


###
# If you wanna use the visualization tool 'Mineflayer Viewer', just uncomment the following codes, and comment the above 'agent2=Agent()' code line.
# You need to install canvas and corresponding requirements.
###

# canvas = require('canvas')
# viewer = require('prismarine-viewer').mineflayer
# puppeteer = require('puppeteer')
# serverPortViewer = 3100
# new_viewer = viewer(agent2.bot, {'port': serverPortViewer, 'firstPerson': True})
# browser = puppeteer.launch({'executablePath': 'C:/Program Files/Google/Chrome/Application/chrome.exe'})
# page = browser.newPage()
# page.goto(f'http://localhost:3100')


# Data curation

Automatically generate raw data based on real buildings to obtain blueprint data.

In [None]:
from ast import literal_eval
import re
import json
import re


def clean_ansi_escape_codes(s):
    ansi_escape = re.compile(r'\x1b\[[\d;]*[A-Za-z]')
    return ansi_escape.sub('', s)

def clear_irc_color(string):
    pattern = r'\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))'
    return re.sub(pattern, '', string)


def parse_dirty_string(s): # maybe still have some bugs..
    '''
    convert JS class to JSON format.
    e.g.:
    # input_str = "{ waterlogged: false, type: 'bottom' }"
    # print(parse_custom_dict(input_str))  # output: {'waterlogged': False, 'type': 'bottom'}
    '''
    s = s.replace("[Object]", "[]")
    s = clear_irc_color(s)
    
    s = s.replace('\n', '').replace('\r', '') 
    s = re.sub(r'\s+', ' ', s)  
    
    s = re.sub(r'{\s+', '{', s) 
    s = re.sub(r'\s+}', '}', s)  
    
    s = re.sub(r'([{,]\s*)(\w+)(\s*:)',  
               lambda m: f'{m.group(1)}"{m.group(2)}"{m.group(3)}', s)
    s = re.sub(r':\s*(\w+)(\s*[,}])',  
               lambda m: f': "{m.group(1)}"{m.group(2)}' if not m.group(1).lower() in ['true','false','null'] else f': {m.group(1).lower()}{m.group(2)}', 
               s)
    
    s = re.sub(r':\s*true\b', ': true', s, flags=re.IGNORECASE)
    s = re.sub(r':\s*false\b', ': false', s, flags=re.IGNORECASE)
    s = re.sub(r"''", '""', s)
    s = re.sub(r"'([^']+)'", r'"\1"', s)
    try:
        return json.loads(s)
    except json.JSONDecodeError:
        return {}




def output_raw_blockat_data(position1, position2, arch_name):
    agent1.bot.chat(f'/tp Agent {position1[0]-8} {position1[1]} {position1[2]}')
    out = {}
    for w in range(position1[0], position2[0]+1):
        for h in range(position1[1], position2[1]+1):
            for d in range(position1[2], position2[2]+1):
                Position = vec3(w, h, d)
                position_str = f"{w}_{h}_{d}"
                block = agent1.bot.blockAt(Position)
                if block == None:
                    continue
                block_dict = {
                    "type": block.type,
                    "metadata": block.metadata,
                    # "light": block.light,
                    # "skyLight": block.skyLight,
                    # "biome": block.biome,
                    "position": {"x": block.position.x, "y": block.position.y, "z": block.position.z},
                    "stateId": block.stateId,
                    "computedStates": block.computedStates,
                    "name": block.name,
                    "hardness": block.hardness,
                    "displayName": block.displayName,
                    "shapes": block.shapes,
                    "boundingBox": block.boundingBox,
                    "transparent": block.transparent,
                    "diggable": block.diggable,
                    "material": block.material,
                    "harvestTools": block.harvestTools,
                    "drops": block.drops,
                    "_properties": parse_dirty_string(str(block.getProperties())),
                    "isWaterlogged": block.isWaterlogged,  # Handle potential undefined values
                    "entity": block.entity if block.entity is None else parse_dirty_string(str(block.entity)),
                    "painting": block.painting
                }

                out[position_str] = block_dict

    with open(f'./benchmark_data/raw_data_output/{arch_name}.json', 'w') as f1: # edit the output path, or add it into parameters of python functions.
        json.dump(out, f1, indent=4)


with open('village_input.json', 'r') as f1: # an example JSON file to mark the starting block and ending block.
    original_input = json.load(f1)
for k, v in original_input.items():
    print(k)
    output_raw_blockat_data(v["start_pos"], v["end_pos"], k)
    
    # break


