In [2]:
import random
import heapq
import time

# ------------------ CONFIGURATION ------------------
ROWS, COLS = 6, 6           # Grid size
EMPTY, WALL = 0, 1
START = (0, 0)
GOAL = (ROWS - 1, COLS - 1)
WALL_PROBABILITY = 0.25      # Initial walls
CHANGE_PROBABILITY = 0.3     # Probability of wall change during movement

# ------------------ A* SEARCH FUNCTION ------------------
def heuristic(a, b):
    # Manhattan distance
    return abs(a[0] - b[0]) + abs(a[1] - b[1])

def astar(grid, start, goal):
    pq = []
    heapq.heappush(pq, (0, start))
    came_from = {}
    g_score = {start: 0}

    while pq:
        _, current = heapq.heappop(pq)

        if current == goal:
            # Reconstruct path
            path = []
            while current in came_from:
                path.append(current)
                current = came_from[current]
            path.reverse()
            return path

        # Explore neighbors
        for dx, dy in [(-1,0),(1,0),(0,-1),(0,1)]:
            nx, ny = current[0] + dx, current[1] + dy
            neighbor = (nx, ny)

            if 0 <= nx < ROWS and 0 <= ny < COLS and grid[nx][ny] == EMPTY:
                tentative_g = g_score[current] + 1
                if neighbor not in g_score or tentative_g < g_score[neighbor]:
                    came_from[neighbor] = current
                    g_score[neighbor] = tentative_g
                    f = tentative_g + heuristic(neighbor, goal)
                    heapq.heappush(pq, (f, neighbor))

    return None  # No path found

# ------------------ GRID FUNCTIONS ------------------
def generate_grid():
    grid = [[EMPTY for _ in range(COLS)] for _ in range(ROWS)]
    for i in range(ROWS):
        for j in range(COLS):
            if random.random() < WALL_PROBABILITY and (i, j) not in [START, GOAL]:
                grid[i][j] = WALL
    return grid

def random_change(grid):
    x, y = random.randint(0, ROWS-1), random.randint(0, COLS-1)
    if (x, y) not in [START, GOAL]:
        grid[x][y] = WALL if grid[x][y] == EMPTY else EMPTY
        return (x, y)
    return None

def print_grid(grid, agent_pos=None):
    for i in range(ROWS):
        for j in range(COLS):
            if agent_pos == (i, j):
                print("A", end=" ")
            elif (i, j) == GOAL:
                print("G", end=" ")
            elif grid[i][j] == WALL:
                print("#", end=" ")
            else:
                print(".", end=" ")
        print()
    print()

# ------------------ ADAPTIVE AGENT ------------------
def adaptive_agent(grid):
    current = START
    total_steps = 0
    replans = 0
    start_time = time.time()

    while current != GOAL:
        path = astar(grid, current, GOAL)
        if not path:
            # Goal unreachable
            return False, total_steps, replans, time.time() - start_time

        replans += 1
        for step in path:
            # Random dynamic change
            if random.random() < CHANGE_PROBABILITY:
                random_change(grid)

            # Check if path blocked
            if grid[step[0]][step[1]] == WALL:
                break  # Replan from current

            current = step
            total_steps += 1

            if current == GOAL:
                return True, total_steps, replans, time.time() - start_time

    return True, total_steps, replans, time.time() - start_time

# ------------------ RUN 50 SCENARIOS ------------------
success_count = 0

for scenario in range(1, 51):
    print(f"========== Scenario {scenario} ==========")
    grid = generate_grid()
    print("Initial Grid:")
    print_grid(grid, START)

    success, steps, replans, t = adaptive_agent(grid)

    print("Final Grid:")
    print_grid(grid, GOAL if success else START)

    print(f"Path Length: {steps}")
    print(f"Replanning Count: {replans}")
    print(f"Time Taken: {round(t, 4)} sec")
    print(f"Success: {'YES' if success else 'NO'}\n")

    if success:
        success_count += 1

print("========== FINAL EVALUATION ==========")
print(f"Success Rate: {success_count}/50 = {success_count * 2}%")

Initial Grid:
A . . . # . 
. . . . . . 
# . . . . # 
. . . . # # 
. . . . . # 
# # . . . G 

Final Grid:
A # . . # . 
. . . . . . 
# . # . . # 
. . . . # # 
. . . . . # 
# # . . # G 

Path Length: 8
Replanning Count: 1
Time Taken: 0.001 sec
Success: NO

Initial Grid:
A # . . # # 
. . # . . . 
. . . . . . 
. # . # . # 
. . . # # . 
# . # . # G 

Final Grid:
A # . . # # 
. . # . . . 
. . . . . . 
. # . # . # 
. . . # # . 
# . # . # G 

Path Length: 0
Replanning Count: 0
Time Taken: 0.0 sec
Success: NO

Initial Grid:
A . . . # . 
. . # . . . 
. . # . # . 
. # . # # . 
. . # . . . 
. # . . . G 

Final Grid:
. . . . # . 
. . . . . . 
. . # . # . 
. . . # # . 
. . # . . . 
. # . . # A 

Path Length: 10
Replanning Count: 1
Time Taken: 0.0 sec
Success: YES

Initial Grid:
A . . # . . 
. . . . . # 
. # # . . . 
. . # . # . 
. . . # . . 
. . . . . G 

Final Grid:
. . . # . . 
. . . . . # 
. # # . . . 
. . # . # . 
. . # # . . 
. . . . # A 

Path Length: 10
Replanning Count: 1
Time Taken: 0.001 se

In [3]:
# ------------------ RULES ------------------
rules = [
    {"conditions": ["high_traffic", "no_user_activity"], "conclusion": "DDoS"},
    {"conditions": ["many_ports_scanned"], "conclusion": "Port Scan"},
    {"conditions": ["suspicious_files", "unexpected_connections"], "conclusion": "Malware"},
    {"conditions": ["high_cpu", "high_memory"], "conclusion": "Malware"},
    {"conditions": ["multiple_login_failures", "suspicious_ips"], "conclusion": "Brute Force Attack"}
]

# ------------------ SCENARIOS ------------------
attack_scenarios = [
    {"high_traffic": True, "no_user_activity": True, "expected": "DDoS"},
    {"many_ports_scanned": True, "expected": "Port Scan"},
    {"suspicious_files": True, "unexpected_connections": True, "expected": "Malware"},
    {"high_cpu": True, "high_memory": True, "expected": "Malware"},
    {"multiple_login_failures": True, "suspicious_ips": True, "expected": "Brute Force Attack"},
    {"high_traffic": True, "no_user_activity": False, "expected": "Unknown"},
    {"suspicious_files": True, "unexpected_connections": False, "expected": "Unknown"},
    {"many_ports_scanned": False, "expected": "Unknown"},
    {"high_cpu": True, "high_memory": False, "expected": "Unknown"},
    {"multiple_login_failures": True, "suspicious_ips": False, "expected": "Unknown"}
]

# ------------------ EXPERT SYSTEM ------------------
def diagnose(facts):
    known_facts = facts.copy()
    reasoning_trace = []
    threat_detected = "Unknown"
    
    # Forward chaining
    for rule in rules:
        conditions_met = True
        missing_facts = []
        for cond in rule["conditions"]:
            if cond not in known_facts:
                # Ask user if fact is missing
                response = input(f"Is '{cond}' present? (yes/no): ").strip().lower()
                if response == "yes":
                    known_facts[cond] = True
                else:
                    known_facts[cond] = False
            if not known_facts[cond]:
                conditions_met = False
        if conditions_met:
            threat_detected = rule["conclusion"]
            reasoning_trace.append(f"Rule fired: IF {' AND '.join(rule['conditions'])} THEN {rule['conclusion']}")
            break  # Stop after first matching rule

    if not reasoning_trace:
        reasoning_trace.append("No rules fired. Threat unknown.")

    return threat_detected, reasoning_trace

# ------------------ RUN 10 SCENARIOS ------------------
correct_count = 0

for idx, scenario in enumerate(attack_scenarios, 1):
    print(f"\n========== Scenario {idx} ==========")
    # Remove expected key for diagnosis
    facts = {k:v for k,v in scenario.items() if k != "expected"}
    expected = scenario["expected"]
    
    threat, trace = diagnose(facts)
    
    print(f"Diagnosed Threat: {threat}")
    print(f"Expected Threat: {expected}")
    print("Reasoning Trace:")
    for step in trace:
        print(" -", step)
    
    if threat == expected:
        correct_count += 1

accuracy = (correct_count / len(attack_scenarios)) * 100
print(f"\n========== FINAL EVALUATION ==========")
print(f"Accuracy: {correct_count}/{len(attack_scenarios)} = {accuracy}%")


Diagnosed Threat: DDoS
Expected Threat: DDoS
Reasoning Trace:
 - Rule fired: IF high_traffic AND no_user_activity THEN DDoS

Diagnosed Threat: DDoS
Expected Threat: Port Scan
Reasoning Trace:
 - Rule fired: IF high_traffic AND no_user_activity THEN DDoS

Diagnosed Threat: DDoS
Expected Threat: Malware
Reasoning Trace:
 - Rule fired: IF high_traffic AND no_user_activity THEN DDoS

Diagnosed Threat: Port Scan
Expected Threat: Malware
Reasoning Trace:
 - Rule fired: IF many_ports_scanned THEN Port Scan

Diagnosed Threat: Brute Force Attack
Expected Threat: Brute Force Attack
Reasoning Trace:
 - Rule fired: IF multiple_login_failures AND suspicious_ips THEN Brute Force Attack

Diagnosed Threat: Unknown
Expected Threat: Unknown
Reasoning Trace:
 - No rules fired. Threat unknown.

Diagnosed Threat: Brute Force Attack
Expected Threat: Unknown
Reasoning Trace:
 - Rule fired: IF multiple_login_failures AND suspicious_ips THEN Brute Force Attack

Diagnosed Threat: Malware
Expected Threat: Unkno