# Libraries

In [None]:
import gym
import matplotlib.pyplot as plt
from pyswip import Prolog
from utils import process_state, perform_action,extract_monsters, set_health, check_death
from nle import nethack
from utilsbattle import get_money_location, get_stair_location, get_target_location, get_reference_des
import run

# Simulation parameters

In [None]:
NUM_EPISODES = 5
MAX_STEPS = 100
PATH = 'kb.pl'
RISK_THRESHOLD=50# when the agent feels it's not healthy anymore
MONEY_THRESHOLD=1
MONEY=40
ARROWS= 2
ARROWS_THRESHOLD=4

Initialize the knowledge base.

In [None]:
KB = Prolog()
KB.consult(PATH)

#KB.consult(BATTLE_PATH)


#loading initial agent paramters into the knowledge base
#KB.asserta(f"health_threshold({RISK_THRESHOLD})")
#KB.asserta(f"money_threshold({MONEY_THRESHOLD})")
#KB.asserta(f"money({MONEY})")
#KB.retractall('onPlan(_)')
#KB.asserta(f"arrows_threshold({ARROWS_THRESHOLD})")
#KB.asserta(f"arrows({ARROWS})")





#### Main code
- Perform `NUM_EPISODES` experiences in the environment.
- Use `Prolog` to define the axioms and choose the action to perform.
- The main goal is to _reach and eat_ the `apple`.

In [13]:
def vacuum_experiment(des_file,room_type,starting_money,starting_health = 16,starting_items=[],debug = False) :

    rewards = []
    action=None

    #ran=run.Run()
    for _ in range(1):

        #set agent parameters
        KB.asserta(f"health_threshold({RISK_THRESHOLD})")
        KB.asserta(f"money_threshold({MONEY_THRESHOLD})")
        KB.asserta(f"money({starting_money})")
        KB.retractall('onPlan(_)')
        KB.retractall('battle_begin')
        #des_file=ran.getdes()
        #room_type=ran.gettype()
        
        env = gym.make("MiniHack-Skill-Custom-v0",
                        character="Agent-cav-hum-neu-mal",
                        observation_keys=('screen_descriptions','inv_strs','blstats','message','pixel','chars','inv_oclasses','glyphs'),
                        wizard = True,
                        max_episode_steps = MAX_STEPS ,
                        options =(
                            "color",  # Display color for different monsters, objects, etc
                            "showexp",  # Display the experience points on the status line
                            "nobones",  # Disallow saving and loading bones files
                            "nolegacy",  # Not display an introductory message when starting the game
                            "nocmdassist",  # No command assistance
                            "disclose:+i +a +v +g +c +o",  # End of game prompt replies
                            "runmode:teleport",  # Update the map after movement has finished
                            "mention_walls",  # Give feedback when walking against a wall
                            "nosparkle",  # Not display sparkly effect for resisted magical attacks
                            "showscore",  # Shows approximate accumulated score on the bottom line
                            "race:dwa","gender:mal","align:neu","role:cav",
                            "!autopickup",
                            "pettype:none",
                        ),
                        des_file=des_file,
                )
        obs=env.reset(wizkit_items = starting_items + ['20 scroll of fire','6 potion of confusion','unicorn horn'])
        #print(des_file)
        # count the number of steps of the current episode
        steps = 0
        # store the cumulative reward
        reward = 0.0
        # collect obs['pixel'] to visualize
        ep_states = []
        #agent starts in the shop
        in_battle = False
        #agent start with no plan for battle
        planned_actions = False
    

        #process level information
        if any(map(lambda x:x==room_type,['mazeroom','minibossroom'])):
            KB.asserta("corridors")
        monsters=extract_monsters(des_file)
        goal_x,goal_y=-1,-1
        level_heigth=len(obs['screen_descriptions'])
        level_width=len(obs['screen_descriptions'][0])
        for i in range(level_heigth):
            for j in range(level_width):
                objs = bytes(obs['screen_descriptions'][i][j]).decode('utf-8').rstrip('\x00')
                if 'closed door' in objs and i > goal_x:
                    goal_x,goal_y=i,j
        KB.asserta(f'battlefield_start({goal_x},{goal_y})')

        print(f'battlefield_start({goal_x},{goal_y})')
            
        ep_states.append(obs['pixel'])
        done = False

        #set health to starting health (max 16)
        obs, reward, done, info= set_health(env,obs,starting_health)
        for item in obs['inv_strs']:
            item = bytes(item).decode('utf-8').rstrip('\x00')
            print(item)

        

        #---- obtain information about gold and stairs down of the level ------#
        game_map = obs['chars']
        game = obs['pixel']

        #retrieve map informations
        amulet_des=get_reference_des(des_file)
        amulet_map = get_target_location(game_map,symbol='"')
                
        stair_des_pos = get_stair_location(des_file)

        #when looking at des to get information, the position of the items in the real envirment is translated by a certain vector
        delta_x, delta_y = (amulet_map[0]-amulet_des[0],amulet_map[1]-amulet_des[1])


        gold_pos_list = get_money_location(des_file)

        gold_pos_list = [(x+delta_x,y+delta_y)for (_,(x,y)) in gold_pos_list] #modify here if interested in gold amount on ground

        #tell to KB gold positions
        for (x,y) in gold_pos_list :
            KB.asserta(f'position(gold,_,{x},{y})')
        
        #tell to KB stairs down location
        target = (stair_des_pos[0]+delta_x,stair_des_pos[1]+delta_y)
        KB.asserta(f'position(stairs,_,{target[0]},{target[1]})')

        #------ end level processing ------------#

        #---- initialize metric to measure ------#
        metric=dict()
        metric["steps"]=0
        metric["damage"]=0
        metric["money_gain"]=0
        metric["money_loss"]=0
        metric["healing_count"]=0
        # Main loop
        while not done and steps < MAX_STEPS:

            process_state(obs, KB)

            #check if process state has seen that we are in the position to start battle
            if (not in_battle) and len(list(KB.query('battle_begin'))) > 0 : #if yes switch to battle mode
            
                in_battle = True
                #retrieve money amount
                

            #only done to identify monster position on 
            if not in_battle :
                for monster_name,pos_x,pos_y in monsters:
                    KB.asserta(f"position(enemy,\'{monster_name}\',{pos_x},{pos_y})")
            try:
                actions=list(KB.query('action(X)'))
                action = actions[0]
                action = action['X']
            except Exception as e:
                #print("no action available")
                #print(list(KB.query("position(object,Type,R,C)")))
                print(e)
                action = None
            # Perform the action in the environment
            print(f'ACTION:\t {action}')

            if action:
                # gathering info for metric measurement
                agent_r=obs['blstats'][1]
                agent_c=obs['blstats'][0]
                agent_health=int(obs['blstats'][10])
                agent_money= int(list(KB.query("money(M)"))[0]["M"])

                obs, reward, done, info ,planned_actions = perform_action(action, env,KB,planned_actions,obs)
                
                #compare old state with the new one for metric measurments
                agent_new_money=int(list(KB.query("money(M)"))[0]["M"])
                agent_new_health= int(obs['blstats'][10])
                if not done :
                    if agent_r - obs['blstats'][1] != 0 or agent_c - obs['blstats'][0] != 0:
                        metric["steps"]+=1
                    if agent_health > agent_new_health:
                        metric["damage"]+=agent_health - agent_new_health
                    elif 'quaff' in action or 'eat' in action :
                        metric["healing_count"]+=1
                    if agent_new_money > agent_money:
                        metric["money_gain"]+= agent_new_money - agent_money
                    elif agent_new_money < agent_money:
                        metric["money_loss"]+= agent_money - agent_new_money
                
                ep_states.append(obs['pixel'])
                if debug: env.render()
                #env.render()
            else:
                break
            steps += 1

            is_dead, final_obs, final_reward, final_done, final_info = check_death(env,obs)
            if is_dead :
                obs = final_obs
                reward = final_reward
                done = final_done
                info = final_info

        if steps == MAX_STEPS:
            print("reached maximum amount of steps")
        if debug: env.render()
        # Display game with interface
        #show_match(ep_states)
        rewards.append(reward)
        
        # reset the environment and retract axioms that may cause errors
        #print(list(KB.query('battle_begin')))
        #print(list(KB.query('position(stairs,_,X,Y)')))
        metric["health"]= int(obs['blstats'][10])
        metric["total_money"]= int(list(KB.query("money(M)"))[0]["M"])

        obs = env.reset(wizkit_items = ['healing potion','20 scroll of fire','6 potion of confusion','unicorn horn'])
        KB.retractall('stepping_on(agent,_,_)')
        KB.retractall('position(_,_,_,_)')
        KB.retractall('shopping_done')
        KB.retractall('corridors')
        KB.retractall('battle_begin')
        KB.retractall('battlefield_start(_,_)')
        KB.retractall('money(_)')
        #ran.nextlevel()

    #print(f'After {NUM_EPISODES} episodes, mean return is {sum(rewards)/NUM_EPISODES}')
    #print("The rewards of the episodes are:", rewards)
    return metric, info

In [12]:
for item in obs['inv_strs']:
    item = bytes(item).decode('utf-8').rstrip('\x00')
    print(item)

NameError: name 'obs' is not defined

In [None]:
for idx,action in enumerate(env.actions):
    print(idx,action)

In [None]:
#27 to drop
#69 to take off
#52 quaff
#54 read
#env.step(54)
#env.render()
#env.step(env.actions.index(ord('f')))
#you feel less confused now.
#env.render()

set_health(env,1)
obs,_,_,_=env.step(1)
env.render()


In [14]:
ran = run.Run(seed=0)
all_metrics = []
all_info = []
money = MONEY
health = 16
items=[]
i = 1
debug = False
for _ in range(5):
    print(f'\tLEVEL:{i}')
    if i == 2: debug = True
    metric, info = vacuum_experiment(ran.getdes(),ran.gettype(),money,starting_health=health,starting_items=items, debug= debug)
    all_metrics.append(metric)
    all_info.append(info)
    health = metric['health']
    money = metric['total_money']
    if health == 0 : break
    ran.nextlevel()
    i +=1



	LEVEL:1




battlefield_start(17,42)
a +1 club (weapon in hand)
a +2 sling (alternate weapon; not wielded)
15 uncursed flint stones (in quiver pouch)
27 uncursed rocks
an uncursed +0 leather armor (being worn)


















































ACTION:	 get_key(northeast)
ACTION:	 get_key(northeast)
ACTION:	 get_key(northeast)
ACTION:	 get_key(northeast)
ACTION:	 get_key(east)
ACTION:	 get_key(east)
ACTION:	 get_key(east)
ACTION:	 get_key(east)
ACTION:	 get_key(east)
ACTION:	 get_key(east)
ACTION:	 pick_key
ACTION:	 get_key(north)
ACTION:	 pick_key
ACTION:	 apply(east)
ACTION:	 get_into_battlefield(east)
ACTION:	 get_into_battlefield(east)
ACTION:	 get_into_battlefield(east)
ACTION:	 get_into_battlefield(east)
ACTION:	 get_into_battlefield(south)
ACTION:	 get_into_battlefield(south)
ACTION:	 get_into_battlefield(south)
ACTION:	 get_into_battlefield(south)
ACTION:	 get_into_battlefield(south)
ACTION:	 get_into_battlefield(south)
ACTION:	 get_into_battlefield(south)
ACTION:	 get_into_



battlefield_start(17,42)
What do you want to use or apply? [h or ?*] 
a +1 club (weapon in hand)
a +2 sling (alternate weapon; not wielded)
22 uncursed flint stones (in quiver pouch)
20 uncursed rocks
an uncursed +0 leather armor (being worn)


















































ACTION:	 get_to_item(northeast)

[0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m [0;30m 
[0;30m [0;30m [0;30m [0;30m 



In [9]:
all_metrics

[{'steps': 57,
  'damage': 2,
  'money_gain': 20,
  'money_loss': 0,
  'healing_count': 0,
  'health': 16,
  'total_money': 60},
 {'steps': 36,
  'damage': 17,
  'money_gain': 5,
  'money_loss': 10,
  'healing_count': 0,
  'health': 0,
  'total_money': 55}]

In [10]:
all_info

[{'end_status': <StepStatus.TASK_SUCCESSFUL: 2>, 'is_ascended': False},
 {'end_status': <StepStatus.DEATH: 1>, 'is_ascended': False}]