# ADTs Gymnasium Env

In [5]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import json
from adt_env import AttackDefenseTreeEnv

# Set random seed for reproducibility
np.random.seed(42)

In [6]:
# Load the JSON
with open('envs/adt_nuovo_env.json', 'r') as f:
    env_spec = json.load(f)

In [3]:
# Explore the state space
print("State Space Variables:")
state_df = pd.DataFrame.from_dict(env_spec['state_space'], orient='index')
display(state_df)

State Space Variables:


Unnamed: 0,type,low,high,description
DataExfiltration,discrete,0,1,Goal achievement state
current_player,discrete,0,1,"Current player (0=attacker, 1=defender)"
AccesstoSensitiveFiles,discrete,0,2,Attacker attribute: AccesstoSensitiveFiles
AccesstoReverseShell,discrete,0,2,Attacker attribute: AccesstoReverseShell
AccesstoMySQL,discrete,0,2,Attacker attribute: AccesstoMySQL
WebReconSuccesful,discrete,0,2,Attacker attribute: WebReconSuccesful
AccesstoExecuteArbitraryCode,discrete,0,2,Attacker attribute: AccesstoExecuteArbitraryCode
UnencryptedFiles,discrete,1,2,Initial system attribute: UnencryptedFiles
SOCKS5ProxyActive,discrete,1,2,Initial system attribute: SOCKS5ProxyActive
MisconfiguredApache,discrete,1,2,Initial system attribute: MisconfiguredApache


In [8]:
# Explore attacker actions
print("Attacker Actions:")
attacker_actions = []
for action_id, action_spec in env_spec['action_space']['attacker']['actions'].items():
    attacker_actions.append({
        'ID': action_id,
        'Name': action_spec['name'],
        'Effect': action_spec['effect'],
        'Cost': action_spec['cost'],
        'Preconditions': action_spec['preconditions']['conditions'],
        'Logic': action_spec['preconditions']['logic'],
        'Refinement': action_spec['refinement']
    })

attacker_df = pd.DataFrame(attacker_actions)
display(attacker_df)

Attacker Actions:


Unnamed: 0,ID,Name,Effect,Cost,Preconditions,Logic,Refinement
0,0,exfiltrateData,DataExfiltration,50,"[AccesstoMySQL, AccesstoExecuteArbitraryCode]",OR,disjunctive
1,1,getLoginData,AccesstoMySQL,10,"[UnencryptedFiles, AccesstoSensitiveFiles]",AND,conjunctive
2,2,bufferOverflow,AccesstoExecuteArbitraryCode,40,"[SOCKS5ProxyActive, WebReconSuccesful]",AND,conjunctive
3,3,getFiles,AccesstoSensitiveFiles,30,"[MisconfiguredApache, AccesstoReverseShell]",AND,conjunctive
4,4,webRecon,WebReconSuccesful,5,"[WebserverPubliclyExposed, WebserverPubliclyEx...",OR,disjunctive
5,5,pathTraversal,AccesstoReverseShell,20,"[CGIscriptsenabled, WebReconSuccesful, Vulnera...",AND,conjunctive


In [5]:
# Explore defender actions
print("Defender Actions:")
defender_actions = []
for action_id, action_spec in env_spec['action_space']['defender']['actions'].items():
    defender_actions.append({
        'ID': action_id,
        'Name': action_spec['name'],
        'Effect': action_spec['effect'],
        'Cost': action_spec['cost']
    })

defender_df = pd.DataFrame(defender_actions)
display(defender_df)

Defender Actions:


Unnamed: 0,ID,Name,Effect,Cost
0,0,changeCredentials,AccesstoMySQL,200
1,1,changeFilePermissions,AccesstoSensitiveFiles,60
2,2,encryptFile,UnencryptedFiles,150
3,3,deactivateSOCKS5Proxy,SOCKS5ProxyActive,120
4,4,reconfigureApache,MisconfiguredApache,50
5,5,disableCGIScripts,CGIscriptsenabled,45
6,6,updateApache,VulnerableApacheHTTPServerVersion,20


In [4]:
# Create the environment
env = AttackDefenseTreeEnv("envs/adt_nuovo_env.json", render_mode="human")

print(f"Environment loaded successfully!")
print(f"State space: {env.observation_space}")
print(f"Action space: {env.action_space}")
print(f"Goal: {env.goal}")
print(f"Number of attacker actions: {env.num_attacker_actions}")
print(f"Number of defender actions: {env.num_defender_actions}")

Number of attacker actions: 6
Number of defender actions: 7
Environment loaded successfully!
State space: Box([0 0 0 0 0 0 0 1 1 1 1 1 1], [1 1 2 2 2 2 2 2 2 2 2 2 2], (13,), int32)
Action space: Discrete(7)
Goal: DataExfiltration
Number of attacker actions: 6
Number of defender actions: 7


In [7]:
# Reset environment and examine initial state
obs, info = env.reset()
print("Initial observation vector:", obs)
print("Initial info:", info)
print("\nInitial state dictionary:")
for key, value in env.state.items():
    print(f"  {key}: {value}")

Initial observation vector: [0 0 0 0 0 0 0 1 1 1 1 1 1]
Initial info: {'current_player': 'attacker', 'goal': 'DataExfiltration', 'terminal': False}

Initial state dictionary:
  DataExfiltration: 0
  current_player: 0
  AccesstoSensitiveFiles: 0
  AccesstoReverseShell: 0
  AccesstoMySQL: 0
  WebReconSuccesful: 0
  AccesstoExecuteArbitraryCode: 0
  UnencryptedFiles: 1
  SOCKS5ProxyActive: 1
  MisconfiguredApache: 1
  WebserverPubliclyExposed: 1
  CGIscriptsenabled: 1
  VulnerableApacheHTTPServerVersion: 1


In [8]:
# Function to get action details
def get_action_details(env, player, action_id):
    if player == "attacker":
        return env.attacker_actions[str(action_id)]
    else:
        return env.defender_actions[str(action_id)]

# Test available actions for current player
available_actions = env.get_available_actions()
current_player = "attacker" if env.state["current_player"] == 0 else "defender"

print(f"Current player: {current_player}")
print(f"Available actions: {available_actions}")
print("\nAction details:")
for action_id in available_actions:
    action_spec = get_action_details(env, current_player, action_id)
    print(f"  Action {action_id}: {action_spec['name']} (cost: {action_spec['cost']})")

Available actions for attacker: {'0': {'name': 'exfiltrateData', 'preconditions': {'conditions': ['AccesstoMySQL', 'AccesstoExecuteArbitraryCode'], 'logic': 'OR', 'required_values': {'AccesstoMySQL': 1, 'AccesstoExecuteArbitraryCode': 1}}, 'effect': 'DataExfiltration', 'cost': 50, 'refinement': 'disjunctive'}, '1': {'name': 'getLoginData', 'preconditions': {'conditions': ['UnencryptedFiles', 'AccesstoSensitiveFiles'], 'logic': 'AND', 'required_values': {'UnencryptedFiles': 1, 'AccesstoSensitiveFiles': 1}}, 'effect': 'AccesstoMySQL', 'cost': 10, 'refinement': 'conjunctive'}, '2': {'name': 'bufferOverflow', 'preconditions': {'conditions': ['SOCKS5ProxyActive', 'WebReconSuccesful'], 'logic': 'AND', 'required_values': {'SOCKS5ProxyActive': 1, 'WebReconSuccesful': 1}}, 'effect': 'AccesstoExecuteArbitraryCode', 'cost': 40, 'refinement': 'conjunctive'}, '3': {'name': 'getFiles', 'preconditions': {'conditions': ['MisconfiguredApache', 'AccesstoReverseShell'], 'logic': 'AND', 'required_values':

In [9]:
# Test a single action step
if available_actions:
    test_action = available_actions[0]  # Take first available action
    action_spec = get_action_details(env, current_player, test_action)
    
    print(f"Testing action {test_action}: {action_spec['name']}")
    print(f"Before action - {action_spec['effect']}: {env.state[action_spec['effect']]}")
    
    obs, reward, terminated, truncated, info = env.step(test_action)
    
    print(f"After action - {action_spec['effect']}: {env.state[action_spec['effect']]}")
    print(f"Reward: {reward}")
    print(f"Terminated: {terminated}")
    print(f"Action info: {info}")

Testing action 4: webRecon
Before action - WebReconSuccesful: 0
After action - WebReconSuccesful: 1
Reward: -5
Terminated: False
Action info: {'current_player': 'defender', 'action_taken': 'webRecon', 'action_valid': True, 'terminal': False}


In [10]:
# Reset for full game simulation
env.reset()

# Track game history
game_history = []
step = 0
max_steps = 20

print("=== Full Game Simulation ===")
env.render()

while step < max_steps:
    current_player = "attacker" if env.state["current_player"] == 0 else "defender"
    available_actions = env.get_available_actions()
    
    if available_actions:
        # Choose action (random for demo)
        action = np.random.choice(available_actions)
        action_spec = get_action_details(env, current_player, action)
    else:
        # No actions available, choose wait/pass
        if current_player == "attacker":
            action = env.num_attacker_actions - 1
        else:
            action = env.num_defender_actions - 1
        action_spec = {"name": "wait", "cost": 0}
    
    # Execute action
    obs, reward, terminated, truncated, info = env.step(action)
    
    # Record step
    game_history.append({
        'step': step + 1,
        'player': current_player,
        'action': action_spec['name'],
        'reward': reward,
        'terminated': terminated,
        'goal_state': env.state[env.goal]
    })
    
    print(f"\nStep {step + 1}: {current_player} -> {action_spec['name']} (reward: {reward})")
    
    if terminated or truncated:
        print(f"\n=== Game Ended after {step + 1} steps! ===")
        env.render()
        break
    
    step += 1

# Display game history
print("\n=== Game History ===")
history_df = pd.DataFrame(game_history)
display(history_df)

=== Full Game Simulation ===

=== Current Player: Attacker ===
Goal (DataExfiltration): 0
Key Attributes:
  DataExfiltration: 0
  AccesstoSensitiveFiles: 0
  AccesstoReverseShell: 0
  AccesstoMySQL: 0
  WebReconSuccesful: 0
  AccesstoExecuteArbitraryCode: 0
  UnencryptedFiles: 1
  SOCKS5ProxyActive: 1
  MisconfiguredApache: 1
  WebserverPubliclyExposed: 1
  CGIscriptsenabled: 1
  VulnerableApacheHTTPServerVersion: 1
Available actions for attacker: {'0': {'name': 'exfiltrateData', 'preconditions': {'conditions': ['AccesstoMySQL', 'AccesstoExecuteArbitraryCode'], 'logic': 'OR', 'required_values': {'AccesstoMySQL': 1, 'AccesstoExecuteArbitraryCode': 1}}, 'effect': 'DataExfiltration', 'cost': 50, 'refinement': 'disjunctive'}, '1': {'name': 'getLoginData', 'preconditions': {'conditions': ['UnencryptedFiles', 'AccesstoSensitiveFiles'], 'logic': 'AND', 'required_values': {'UnencryptedFiles': 1, 'AccesstoSensitiveFiles': 1}}, 'effect': 'AccesstoMySQL', 'cost': 10, 'refinement': 'conjunctive'},

Unnamed: 0,step,player,action,reward,terminated,goal_state
0,1,attacker,webRecon,-5,False,0
1,2,defender,changeCredentials,-200,False,0
2,3,attacker,pathTraversal,-20,False,0
3,4,defender,changeFilePermissions,-60,False,0
4,5,attacker,bufferOverflow,-40,False,0
5,6,defender,wait,0,False,0
6,7,attacker,exfiltrateData,-550,True,1


In [9]:
# Let's compare with the PRISM model
import sys
sys.path.append('../')
import tree_to_prism as tp
from tree import Tree

# Load the same tree and generate PRISM model
tree = tp.parse_file('../trees/adt_nuovo.xml')
df = tree.to_dataframe()
goal, actions_to_goal, initial_attributes, attacker_actions, defender_actions, df_attacker, df_defender = tp.get_info(df)

print("PRISM Model Attacker Actions:")
for action_name, action_data in attacker_actions.items():
    print(f"{action_name}: preconditions={action_data['preconditions']}, refinement={action_data['refinement']}")

print("\nGym Environment Attacker Actions:")
for action_id, action_spec in env_spec['action_space']['attacker']['actions'].items():
    print(f"{action_spec['name']}: preconditions={action_spec['preconditions']['conditions']}, refinement={action_spec['refinement']}")

PRISM Model Attacker Actions:
exfiltrateData: preconditions=['AccesstoExecuteArbitraryCode', 'AccesstoMySQL'], refinement=disjunctive
getLoginData: preconditions=['UnencryptedFiles', 'AccesstoSensitiveFiles'], refinement=conjunctive
bufferOverflow: preconditions=['SOCKS5ProxyActive', 'WebReconSuccesful'], refinement=conjunctive
getFiles: preconditions=['AccesstoReverseShell', 'MisconfiguredApache'], refinement=conjunctive
webRecon: preconditions=['WebserverPubliclyExposed', 'WebserverPubliclyExposed'], refinement=disjunctive
pathTraversal: preconditions=['VulnerableApacheHTTPServerVersion', 'CGIscriptsenabled', 'WebReconSuccesful'], refinement=conjunctive

Gym Environment Attacker Actions:
exfiltrateData: preconditions=['AccesstoMySQL', 'AccesstoExecuteArbitraryCode'], refinement=disjunctive
getLoginData: preconditions=['UnencryptedFiles', 'AccesstoSensitiveFiles'], refinement=conjunctive
bufferOverflow: preconditions=['SOCKS5ProxyActive', 'WebReconSuccesful'], refinement=conjunctive
g

In [10]:
# Let's examine the raw tree dataframe to understand the issue
print("Raw tree dataframe for webRecon action:")
webRecon_rows = df[df['Action'] == 'webRecon']
print(webRecon_rows[['Label', 'Action', 'Role', 'Parent', 'Children', 'Refinement']].to_string())

print("\n" + "="*80)
print("Raw tree dataframe for pathTraversal action:")
pathTraversal_rows = df[df['Action'] == 'pathTraversal']
print(pathTraversal_rows[['Label', 'Action', 'Role', 'Parent', 'Children', 'Refinement']].to_string())

Raw tree dataframe for webRecon action:
       Label    Action      Role             Parent                    Children   Refinement
14  webRecon  webRecon  Attacker  WebReconSuccesful  {WebserverPubliclyExposed}  disjunctive
24  webRecon  webRecon  Attacker  WebReconSuccesful  {WebserverPubliclyExposed}  disjunctive

Raw tree dataframe for pathTraversal action:
            Label         Action      Role                Parent                                                                   Children   Refinement
20  pathTraversal  pathTraversal  Attacker  AccesstoReverseShell  {VulnerableApacheHTTPServerVersion, CGIscriptsenabled, WebReconSuccesful}  conjunctive


In [11]:
# Test the fixed environment
print("=== TESTING FIXED ENVIRONMENT ===")

# Load the fixed JSON
with open('envs/adt_nuovo_env_fixed.json', 'r') as f:
    env_spec_fixed = json.load(f)

# Compare PRISM vs Fixed Environment
tree_fresh = tp.parse_file('../trees/adt_nuovo.xml')
df_fresh = tree_fresh.to_dataframe()
goal_fresh, actions_to_goal_fresh, initial_attributes_fresh, attacker_actions_fresh, defender_actions_fresh, df_attacker_fresh, df_defender_fresh = tp.get_info(df_fresh)

print("FIXED PRISM Model Attacker Actions:")
for action_name, action_data in attacker_actions_fresh.items():
    print(f"{action_name}: preconditions={action_data['preconditions']}, refinement={action_data['refinement']}")

print("\nFIXED Gym Environment Attacker Actions:")
for action_id, action_spec in env_spec_fixed['action_space']['attacker']['actions'].items():
    print(f"{action_spec['name']}: preconditions={action_spec['preconditions']['conditions']}, refinement={action_spec['refinement']}")

print("\n=== Checking for duplicates ===")
for action_name, action_data in attacker_actions_fresh.items():
    prec = action_data['preconditions']
    if len(prec) != len(set(prec)):
        print(f"WARNING: {action_name} still has duplicates: {prec}")
    else:
        print(f"OK: {action_name} has no duplicates: {prec}")

=== TESTING FIXED ENVIRONMENT ===
FIXED PRISM Model Attacker Actions:
exfiltrateData: preconditions=['AccesstoExecuteArbitraryCode', 'AccesstoMySQL'], refinement=disjunctive
getLoginData: preconditions=['UnencryptedFiles', 'AccesstoSensitiveFiles'], refinement=conjunctive
bufferOverflow: preconditions=['SOCKS5ProxyActive', 'WebReconSuccesful'], refinement=conjunctive
getFiles: preconditions=['AccesstoReverseShell', 'MisconfiguredApache'], refinement=conjunctive
webRecon: preconditions=['WebserverPubliclyExposed', 'WebserverPubliclyExposed'], refinement=disjunctive
pathTraversal: preconditions=['VulnerableApacheHTTPServerVersion', 'CGIscriptsenabled', 'WebReconSuccesful'], refinement=conjunctive

FIXED Gym Environment Attacker Actions:
exfiltrateData: preconditions=['AccesstoMySQL', 'AccesstoExecuteArbitraryCode'], refinement=disjunctive
getLoginData: preconditions=['AccesstoSensitiveFiles', 'UnencryptedFiles'], refinement=conjunctive
bufferOverflow: preconditions=['SOCKS5ProxyActive', 

In [13]:
# Debug the webRecon issue specifically - examining Children field
print("Debug webRecon processing:")

# Get fresh tree and dataframe
tree_debug = tp.parse_file('../trees/adt_nuovo.xml')
df_debug = tree_debug.to_dataframe()

# Show all webRecon rows
webRecon_rows = df_debug[df_debug['Action'] == 'webRecon']
print("All webRecon rows:")
for idx, row in webRecon_rows.iterrows():
    print(f"Row {idx}: Children={row['Children']} (type: {type(row['Children'])})")
    print(f"  Converting to list: {list(row['Children'])}")
    print(f"  Role filtering: {[p for p in row['Children'] if row['Role']==df_debug.loc[df_debug['Label'] == p]['Role'].values[0]]}")

# Get just the role filtering step to see what happens
df_defender_debug = df_debug.loc[df_debug["Role"] == "Defender"]
print(f"\nDefender labels: {df_defender_debug['Label'].values}")

# Now see what the actual filtering does for the first webRecon
first_webRecon = webRecon_rows.iloc[0]
raw_children = first_webRecon['Children'] 
print(f"\nFirst webRecon raw children: {raw_children}")
filtered_children = [p for p in raw_children if first_webRecon["Role"]==df_debug.loc[df_debug["Label"] == p]["Role"].values[0]]
print(f"After role filtering: {filtered_children}")
final_filtered = [p for p in filtered_children if p not in df_defender_debug["Label"].values]
print(f"After defender filtering: {final_filtered}")

# Test creating a fresh action dict
test_action = {
    "preconditions": final_filtered,
    "effect": first_webRecon["Parent"], 
    "refinement": df_debug.loc[df_debug['Label'] == first_webRecon["Parent"]]["Refinement"].values[0]
}
print(f"\nTest action dict: {test_action}")

Debug webRecon processing:
All webRecon rows:
Row 14: Children={'WebserverPubliclyExposed'} (type: <class 'set'>)
  Converting to list: ['WebserverPubliclyExposed']
  Role filtering: ['WebserverPubliclyExposed']
Row 24: Children={'WebserverPubliclyExposed'} (type: <class 'set'>)
  Converting to list: ['WebserverPubliclyExposed']
  Role filtering: ['WebserverPubliclyExposed']

Defender labels: ['ChangeCredentials' 'ChangeFilesPermissions' 'EncryptFile'
 'DeactivateSOCKS5proxy' 'ReconfigureApache' 'DisableCGIScripts'
 'UpdateApache']

First webRecon raw children: {'WebserverPubliclyExposed'}
After role filtering: ['WebserverPubliclyExposed']
After defender filtering: ['WebserverPubliclyExposed']

Test action dict: {'preconditions': ['WebserverPubliclyExposed'], 'effect': 'WebReconSuccesful', 'refinement': 'disjunctive'}


In [14]:
# Let's trace the actual get_info execution step by step
import importlib
importlib.reload(tp)  # Reload the module to get our changes

tree_trace = tp.parse_file('../trees/adt_nuovo.xml')
df_trace = tree_trace.to_dataframe()

# Call get_info and check the results
goal_trace, actions_to_goal_trace, initial_attributes_trace, attacker_actions_trace, defender_actions_trace, df_attacker_trace, df_defender_trace = tp.get_info(df_trace)

print("After reloading tree_to_prism module:")
print("webRecon action data:", attacker_actions_trace.get('webRecon', 'NOT FOUND'))

# Double check by rebuilding from scratch 
print("\n=== Manual verification ===")
all_actions = {}
webRecon_count = 0

for _, row in df_trace.iterrows():
    action = row["Action"]
    if action == "webRecon":
        webRecon_count += 1
        print(f"webRecon instance {webRecon_count}: {row['Children']}")
        
        if action not in all_actions:
            all_actions[action] = list(row['Children'])
            print(f"  Created new: {all_actions[action]}")
        else:
            existing = set(all_actions[action])
            new_items = set(row['Children'])
            combined = list(existing.union(new_items))
            all_actions[action] = combined
            print(f"  Updated: {all_actions[action]}")

print(f"\nFinal manual result: {all_actions.get('webRecon', 'NOT FOUND')}")

After reloading tree_to_prism module:
webRecon action data: {'preconditions': ['WebserverPubliclyExposed'], 'effect': 'WebReconSuccesful', 'cost': '5', 'time': '1', 'refinement': 'disjunctive'}

=== Manual verification ===
webRecon instance 1: {'WebserverPubliclyExposed'}
  Created new: ['WebserverPubliclyExposed']
webRecon instance 2: {'WebserverPubliclyExposed'}
  Updated: ['WebserverPubliclyExposed']

Final manual result: ['WebserverPubliclyExposed']


In [15]:
# Final verification test
print("=== FINAL VERIFICATION ===")

# Reload modules to ensure we have the latest changes
import importlib
importlib.reload(tp)

# Test the regenerated environment
with open('envs/adt_nuovo_env.json', 'r') as f:
    env_spec_final = json.load(f)

# Test PRISM model generation  
tree_final = tp.parse_file('../trees/adt_nuovo.xml')
goal_final, actions_to_goal_final, initial_attributes_final, attacker_actions_final, defender_actions_final, df_attacker_final, df_defender_final = tp.get_info(tree_final.to_dataframe())

print("PRISM Model Actions (Fixed):")
for action_name, action_data in attacker_actions_final.items():
    prec = action_data['preconditions']
    has_dups = len(prec) != len(set(prec))
    status = "❌ HAS DUPLICATES" if has_dups else "✅ NO DUPLICATES"
    print(f"  {action_name}: {prec} {status}")

print("\nGym Environment Actions (Fixed):")
for action_id, action_spec in env_spec_final['action_space']['attacker']['actions'].items():
    prec = action_spec['preconditions']['conditions']
    has_dups = len(prec) != len(set(prec))
    status = "❌ HAS DUPLICATES" if has_dups else "✅ NO DUPLICATES"
    print(f"  {action_spec['name']}: {prec} {status}")

# Test action logic consistency
print("\n=== Logic Consistency Check ===")
for action_id, action_spec in env_spec_final['action_space']['attacker']['actions'].items():
    action_name = action_spec['name']
    gym_logic = action_spec['preconditions']['logic']
    gym_refinement = action_spec['refinement']
    
    # Expected logic based on refinement
    expected_logic = "OR" if gym_refinement == "disjunctive" else "AND"
    
    prism_action = attacker_actions_final.get(action_name, {})
    prism_refinement = prism_action.get('refinement', 'UNKNOWN')
    
    consistency = "✅ CONSISTENT" if gym_logic == expected_logic else "❌ INCONSISTENT"
    print(f"  {action_name}: refinement={gym_refinement}, logic={gym_logic}, expected={expected_logic} {consistency}")

print("\n🎉 Precondition analysis complete!")

=== FINAL VERIFICATION ===
PRISM Model Actions (Fixed):
  exfiltrateData: ['AccesstoExecuteArbitraryCode', 'AccesstoMySQL'] ✅ NO DUPLICATES
  getLoginData: ['UnencryptedFiles', 'AccesstoSensitiveFiles'] ✅ NO DUPLICATES
  bufferOverflow: ['SOCKS5ProxyActive', 'WebReconSuccesful'] ✅ NO DUPLICATES
  getFiles: ['AccesstoReverseShell', 'MisconfiguredApache'] ✅ NO DUPLICATES
  webRecon: ['WebserverPubliclyExposed'] ✅ NO DUPLICATES
  pathTraversal: ['VulnerableApacheHTTPServerVersion', 'CGIscriptsenabled', 'WebReconSuccesful'] ✅ NO DUPLICATES

Gym Environment Actions (Fixed):
  exfiltrateData: ['AccesstoMySQL', 'AccesstoExecuteArbitraryCode'] ✅ NO DUPLICATES
  getLoginData: ['AccesstoSensitiveFiles', 'UnencryptedFiles'] ✅ NO DUPLICATES
  bufferOverflow: ['WebReconSuccesful', 'SOCKS5ProxyActive'] ✅ NO DUPLICATES
  getFiles: ['AccesstoReverseShell', 'MisconfiguredApache'] ✅ NO DUPLICATES
  webRecon: ['WebserverPubliclyExposed'] ✅ NO DUPLICATES
  pathTraversal: ['VulnerableApacheHTTPServerVersio

In [16]:
# Test the fixed environment functionality
print("=== TESTING FIXED ENVIRONMENT FUNCTIONALITY ===")

# Create environment with the fixed specification
env_fixed = AttackDefenseTreeEnv("envs/adt_nuovo_env.json", render_mode="human")

print(f"Environment loaded successfully!")
print(f"Goal: {env_fixed.goal}")

# Reset and test initial state
obs, info = env_fixed.reset()
print("\nInitial state:")
for key, value in env_fixed.state.items():
    if key.startswith(('UnencryptedFiles', 'SOCKS5ProxyActive', 'MisconfiguredApache', 'WebserverPubliclyExposed', 'CGIscriptsenabled', 'VulnerableApacheHTTPServerVersion')):
        print(f"  {key}: {value} (initial attribute)")

# Test webRecon action which had the duplicate issue
print(f"\n=== Testing webRecon action (was problematic) ===")
current_player = "attacker" if env_fixed.state["current_player"] == 0 else "defender"
available_actions = env_fixed.get_available_actions()

print(f"Current player: {current_player}")
print(f"Available actions: {available_actions}")

# Find webRecon action
webRecon_action_id = None
for action_id in available_actions:
    action_spec = env_fixed.attacker_actions[str(action_id)]
    if action_spec['name'] == 'webRecon':
        webRecon_action_id = action_id
        break

if webRecon_action_id is not None:
    action_spec = env_fixed.attacker_actions[str(webRecon_action_id)]
    print(f"\nFound webRecon as action {webRecon_action_id}")
    print(f"  Preconditions: {action_spec['preconditions']}")
    print(f"  Effect: {action_spec['effect']}")
    
    # Check if preconditions are met
    print(f"  Checking preconditions:")
    prec_conditions = action_spec['preconditions']['conditions']
    for condition in prec_conditions:
        value = env_fixed.state[condition]
        print(f"    {condition} = {value} (required: 1)")
    
    # Execute the action
    print(f"\nExecuting webRecon action...")
    print(f"  Before: {action_spec['effect']} = {env_fixed.state[action_spec['effect']]}")
    
    obs, reward, terminated, truncated, info = env_fixed.step(webRecon_action_id)
    
    print(f"  After: {action_spec['effect']} = {env_fixed.state[action_spec['effect']]}")
    print(f"  Reward: {reward}")
    print(f"  Action successful: {info.get('action_successful', 'unknown')}")
else:
    print("webRecon action not available (this is expected if preconditions aren't met)")

print("\n✅ Environment functionality test complete!")

=== TESTING FIXED ENVIRONMENT FUNCTIONALITY ===
Number of attacker actions: 6
Number of defender actions: 7
Environment loaded successfully!
Goal: DataExfiltration

Initial state:
  UnencryptedFiles: 1 (initial attribute)
  SOCKS5ProxyActive: 1 (initial attribute)
  MisconfiguredApache: 1 (initial attribute)
  WebserverPubliclyExposed: 1 (initial attribute)
  CGIscriptsenabled: 1 (initial attribute)
  VulnerableApacheHTTPServerVersion: 1 (initial attribute)

=== Testing webRecon action (was problematic) ===
Current player: attacker
Available actions: [4]

Found webRecon as action 4
  Preconditions: {'conditions': ['WebserverPubliclyExposed'], 'logic': 'OR', 'required_values': {'WebserverPubliclyExposed': 1}}
  Effect: WebReconSuccesful
  Checking preconditions:
    WebserverPubliclyExposed = 1 (required: 1)

Executing webRecon action...
  Before: WebReconSuccesful = 0
  After: WebReconSuccesful = 1
  Reward: -5
  Action successful: unknown

✅ Environment functionality test complete!


In [17]:
# Test the refactored function-based approach
print("=== TESTING REFACTORED FUNCTION-BASED APPROACH ===")

# Load the refactored environment
with open('../test_refactored_env.json', 'r') as f:
    env_spec_refactored = json.load(f)

print("Refactored Environment - Attacker Actions:")
for action_id, action_spec in env_spec_refactored['action_space']['attacker']['actions'].items():
    prec = action_spec['preconditions']['conditions']
    has_dups = len(prec) != len(set(prec))
    status = "❌ HAS DUPLICATES" if has_dups else "✅ NO DUPLICATES"
    print(f"  {action_spec['name']}: {prec} {status}")

print("\nRefactored Environment - Defender Actions:")
for action_id, action_spec in env_spec_refactored['action_space']['defender']['actions'].items():
    prec = action_spec['preconditions']['conditions']
    has_dups = len(prec) != len(set(prec))
    status = "❌ HAS DUPLICATES" if has_dups else "✅ NO DUPLICATES"
    print(f"  {action_spec['name']}: {prec} {status}")

# Compare with PRISM model to ensure consistency
print("\n=== CONSISTENCY CHECK WITH PRISM MODEL ===")
# Reload to ensure we have the latest tree_to_prism changes
import importlib
import sys
sys.path.append('../')
import tree_to_prism as tp
importlib.reload(tp)

tree_consistency = tp.parse_file('../trees/adt_nuovo.xml')
goal_c, actions_to_goal_c, initial_attributes_c, attacker_actions_c, defender_actions_c, df_attacker_c, df_defender_c = tp.get_info(tree_consistency.to_dataframe())

print("PRISM vs Gym Environment Comparison:")
for action_name, prism_data in attacker_actions_c.items():
    prism_prec = set(prism_data['preconditions'])
    
    # Find corresponding action in gym env
    gym_prec = None
    for action_id, action_spec in env_spec_refactored['action_space']['attacker']['actions'].items():
        if action_spec['name'] == action_name:
            gym_prec = set(action_spec['preconditions']['conditions'])
            break
    
    if gym_prec is not None:
        match = "✅ MATCH" if prism_prec == gym_prec else "❌ MISMATCH"
        print(f"  {action_name}: PRISM={prism_prec}, GYM={gym_prec} {match}")
    else:
        print(f"  {action_name}: ❌ NOT FOUND IN GYM ENV")

print("\n🎉 Function-based refactoring test complete!")

=== TESTING REFACTORED FUNCTION-BASED APPROACH ===
Refactored Environment - Attacker Actions:
  exfiltrateData: ['AccesstoMySQL', 'AccesstoExecuteArbitraryCode'] ✅ NO DUPLICATES
  getLoginData: ['UnencryptedFiles', 'AccesstoSensitiveFiles'] ✅ NO DUPLICATES
  bufferOverflow: ['WebReconSuccesful', 'SOCKS5ProxyActive'] ✅ NO DUPLICATES
  getFiles: ['AccesstoReverseShell', 'MisconfiguredApache'] ✅ NO DUPLICATES
  webRecon: ['WebserverPubliclyExposed'] ✅ NO DUPLICATES
  pathTraversal: ['WebReconSuccesful', 'CGIscriptsenabled', 'VulnerableApacheHTTPServerVersion'] ✅ NO DUPLICATES

Refactored Environment - Defender Actions:
  changeCredentials: [] ✅ NO DUPLICATES
  changeFilePermissions: [] ✅ NO DUPLICATES
  encryptFile: [] ✅ NO DUPLICATES
  deactivateSOCKS5Proxy: [] ✅ NO DUPLICATES
  reconfigureApache: [] ✅ NO DUPLICATES
  disableCGIScripts: [] ✅ NO DUPLICATES
  updateApache: [] ✅ NO DUPLICATES

=== CONSISTENCY CHECK WITH PRISM MODEL ===
PRISM vs Gym Environment Comparison:
  exfiltrateData: 

In [19]:
# Final test with the new function-based environment
print("=== FINAL TEST WITH FUNCTION-BASED TREE-TO-ENV ===")

# Test the environment creation and basic functionality
env_final = AttackDefenseTreeEnv("envs/adt_nuovo_env.json", render_mode="human")

print(f"✅ Environment loaded successfully!")
print(f"Goal: {env_final.goal}")

# Reset and test a few actions
obs, info = env_final.reset()
print(f"✅ Environment reset successfully")

# Test webRecon action (previously had duplicates)
available_actions = env_final.get_available_actions()
webRecon_id = None
for action_id in available_actions:
    action_spec = env_final.attacker_actions[str(action_id)]
    if action_spec['name'] == 'webRecon':
        webRecon_id = action_id
        break

if webRecon_id is not None:
    print(f"✅ webRecon action found (ID: {webRecon_id})")
    action_spec = env_final.attacker_actions[str(webRecon_id)]
    print(f"  Preconditions: {action_spec['preconditions']['conditions']}")
    print(f"  Logic: {action_spec['preconditions']['logic']}")
    
    # Execute the action
    obs, reward, terminated, truncated, info = env_final.step(webRecon_id)
    print(f"  ✅ Action executed successfully, reward: {reward}")
else:
    print("❌ webRecon action not found")

print(f"\n🎉 REFACTORING COMPLETE!")
print(f"Tree-to-env now uses the same function-based pattern as tree-to-prism:")
print(f"  ✅ get_gym_environment(tree) - for standard environments")
print(f"  ✅ get_gym_environment_time(tree) - for time-based environments") 
print(f"  ✅ Shared get_info(df) function for consistency")
print(f"  ✅ Fixed duplicate preconditions issue")
print(f"  ✅ Consistent logic handling between PRISM and Gym environments")
print(f"  ✅ All preconditions properly deduplicated and validated")

=== FINAL TEST WITH FUNCTION-BASED TREE-TO-ENV ===
Number of attacker actions: 6
Number of defender actions: 7
✅ Environment loaded successfully!
Goal: DataExfiltration
✅ Environment reset successfully
✅ webRecon action found (ID: 4)
  Preconditions: ['WebserverPubliclyExposed']
  Logic: OR
  ✅ Action executed successfully, reward: -5

🎉 REFACTORING COMPLETE!
Tree-to-env now uses the same function-based pattern as tree-to-prism:
  ✅ get_gym_environment(tree) - for standard environments
  ✅ get_gym_environment_time(tree) - for time-based environments
  ✅ Shared get_info(df) function for consistency
  ✅ Fixed duplicate preconditions issue
  ✅ Consistent logic handling between PRISM and Gym environments
  ✅ All preconditions properly deduplicated and validated


In [None]:
# Test the turn system
print("=== TESTING TURN SYSTEM ===")

# Create environment with the fixed transitions
env_turn_test = AttackDefenseTreeEnv("envs/adt_nuovo_env.json", render_mode="human")

obs, info = env_turn_test.reset()

print("Initial state:")
print(f"  Current player: {info['current_player']}")
print(f"  Goal state: {env_turn_test.state[env_turn_test.goal]}")

# Test a few steps to verify turn switching
for step in range(4):
    print(f"\n--- Step {step + 1} ---")
    current_player = "attacker" if env_turn_test.state["current_player"] == 0 else "defender"
    print(f"Current player: {current_player}")
    
    available_actions = env_turn_test.get_available_actions()
    print(f"Available actions: {available_actions}")
    
    if available_actions:
        action = available_actions[0]  # Take first available action
        
        # Get action name for display
        if current_player == "attacker":
            action_name = env_turn_test.attacker_actions[str(action)]["name"]
            action_cost = env_turn_test.attacker_actions[str(action)]["cost"]
        else:
            action_name = env_turn_test.defender_actions[str(action)]["name"]
            action_cost = env_turn_test.defender_actions[str(action)]["cost"]
            
        print(f"Taking action: {action} ({action_name}, cost: {action_cost})")
        
        obs, reward, terminated, truncated, info = env_turn_test.step(action)
        
        print(f"After action:")
        print(f"  Current player: {info['current_player']}")
        print(f"  Action taken: {info['action_taken']}")
        print(f"  Valid action: {info['action_valid']}")
        print(f"  Reward: {reward}")
        print(f"  Goal state: {env_turn_test.state[env_turn_test.goal]}")
        
        if terminated:
            print("🎯 Game terminated!")
            break
    else:
        print("❌ No available actions")
        break

print("\n✅ Turn system test completed!")