# CS 486/686 Assignment 3

## Question 1

### (a)

In [None]:
from typing import List


def get_successors(cur: str) -> List[str]:
    dangerous_states = {"0201", "0101", "0102", "1212", "2002"}
    successors = []

    for i in range(4):
        num = int(cur[i])
        for change in [1, -1]:
            new_num = (num + change) % 10
            new_state = cur[:i] + str(new_num) + cur[i+1:]
            if new_state not in dangerous_states:
                successors.append(new_state)

    return successors


if __name__ == "__main__":
    print(get_successors("0000"))
    print(get_successors("0100"))

### (b)

In [None]:
from collections import deque

TARGET = "0202"


def search() -> int:  # BFS
    frontier = deque([("0000", 0, ["0000"])])

    while frontier:
        current_state, turns, path = frontier.popleft()
        if current_state == TARGET:
            print(path)
            return turns

        for successor in get_successors(current_state):
            frontier.append((successor, turns + 1, path[:] + [successor]))

    return -1


if __name__ == "__main__":
    print(search())

### (c)

In [None]:
def dfs(limit: int) -> int:
    frontier = [("0000", 0)]

    while frontier:
        current_state, turns = frontier.pop()
        if current_state == TARGET:
            return turns

        if turns == limit:
            continue

        for successor in get_successors(current_state):
            frontier.append((successor, turns + 1))

    return -1


MAX_DEPTH = 20 # K


def ids_search() -> int:
    depth = 1
    while depth <= MAX_DEPTH:
        result = dfs(depth)
        if result != -1:
            return result
        depth += 1


if __name__ == "__main__":
    print(ids_search())

**Is IDS guaranteed to find a solution if it exists?**
- Yes, IDS is complete. It will find the solution if it exists because it explores all depths incrementally, ensuring that no state is left unexplored within the maximum depth.

**Space Complexity:**
- The space complexity of IDS is $O(bM)$, where $b$ is the branching factor, and $M$ is the depth of the goal node.

### (d)

In [None]:
def get_heuristic(cur: str) -> int:
    heuristic = 0
    for i in range(4):
        cur_digit = int(cur[i])
        target_digit = int(TARGET[i])
        distance = abs(cur_digit - target_digit)
        distance = min(distance, 10 - distance)
        heuristic += distance
    return heuristic

if __name__ == "__main__":
    print(get_heuristic("0000"))
    print(get_heuristic("1234"))

The Manhattan distance is admissible because it never overestimates the true cost to reach the goal. In this problem, the Manhattan distance represents the minimum number of moves required to match each digit in the current state to the corresponding digit in the target state.

The Manhattan distance heuristic for a lock state is computed by summing the absolute differences between the digits of the current state and the target state. Since each wheel can wrap around, we take the minimum number of clockwise and anti-clockwise turns (e.g., moving from 9 to 0 is a single move).

### (e)

**Manhattan Distance Consistency**:
- When you move from state $n$ to a neighbour state $m$ by turning a wheel by one position, the Manhattan distance decreases by at most one, hence $ h(n) - h(m) \leq cost(n, m) = 1 $ holds true.

Thus, as the Manhattan distance heuristic is consistent, ensuring that A* search with multi-path pruning remains optimal.

### (f)

Minimum number of turns: 6

0000 -> 1000 -> 1100 -> 1200 -> 1201 -> 1202 -> 0202

## Question 2

In [None]:
import math
import random
from typing import List, Tuple
# Those libraries should be enough for your implementation.
# Do not change the existing function signatures in this file.

random.seed(486686)


def generate_neighbor(given_items: List[Tuple[int, int]], selection: List[int]) -> List[int]:
    """
    Given a list of items to choose from and a current selection,
    returns a neighboring selection by randomly adding or removing an item.

    :param given_items: a list of items to choose from
    :param selection: a current selection
    :return: a neighboring selection
    """

    neighbor = selection[:]
    i = random.randint(0, len(given_items) - 1)
    if neighbor[i] > 0:
        neighbor[i] += random.choice([-1, 1])
    else:
        neighbor[i] += 1
    return neighbor


def knapsack_solver(capacity: int, items: List[Tuple[int, int]]) -> List[int]:
    """
    Given a list of items to choose from and a maximum capacity of a knapsack,
    returns a selection of items that maximize the total value of items in the knapsack
    through the Simulated Annealing Algorithm.

    :param capacity: the maximum capacity of the knapsack
    :param items: a list of items to choose from (weight, value)
    :return: a selection of items that maximize the total value of items in the knapsack
    """

    def calculate_total_value(selection: List[int]) -> int:
        nonlocal items
        return sum(selection[i] * items[i][1] for i in range(len(items)))

    def calculate_total_weight(selection: List[int]) -> int:
        nonlocal items
        return sum(selection[i] * items[i][0] for i in range(len(items)))

    # Initial selection
    current_selection = [0] * len(items)
    current_value = 0
    best_selection = current_selection[:]
    best_value = current_value

    # Simulated annealing parameters
    T = 1000.0
    cooling_rate = 0.99
    min_T = 0.01

    while T > min_T:
        neighbor_selection = generate_neighbor(items, current_selection)
        if calculate_total_weight(neighbor_selection) <= capacity:
            neighbor_value = calculate_total_value(neighbor_selection)
            delta_value = neighbor_value - current_value

            if delta_value > 0 or math.exp(delta_value / T) > random.random():
                current_selection = neighbor_selection
                current_value = neighbor_value

            if current_value > best_value:
                best_selection = current_selection[:]
                best_value = current_value

            T *= cooling_rate

    return best_selection


if __name__ == '__main__':
    capacity = 50
    items = [(10, 60), (20, 200), (30, 120)]
    result = knapsack_solver(capacity, items)
    expected = [1, 2, 0]
    if result == expected:
        print("Pass")
    else:
        print("Incorrect")
        print(f"Expected: {expected}")
        print(f"Output: {result}")

## Question 3

In [None]:
from heapq import heappop, heappush


def parse_input(input_string):
    lines = input_string.strip().split('\n')
    pacman_pos = tuple(map(int, lines[0].split()))
    food_pos = tuple(map(int, lines[1].split()))
    rows, cols = map(int, lines[2].split())
    grid = [list(line) for line in lines[3:3 + rows]]
    return pacman_pos, food_pos, grid, rows, cols


def manhattan_distance(start, end):
    return abs(start[0] - end[0]) + abs(start[1] - end[1])


def get_neighbors(position, grid, rows, cols):
    x, y = position
    neighbors = []
    for dx, dy in [(-1, 0), (0, -1), (0, 1), (1, 0)]:  # Up, Left, Right, Down
        nx, ny = x + dx, y + dy
        if 0 <= nx < rows and 0 <= ny < cols and grid[nx][ny] != '%':
            neighbors.append((nx, ny))
    return neighbors


def a_star_search(pacman_pos, food_pos, grid, rows, cols, h=manhattan_distance):
    frontier = []
    heappush(frontier, (0 + h(pacman_pos, food_pos),
                        0, pacman_pos, [pacman_pos]))  # tuple(f-value, cost, cur_pos, path)
    visited = set()

    while frontier:
        _, cost, cur_pos, path = heappop(frontier)
        visited.add(cur_pos)
        if cur_pos == food_pos:
            return path

        neighbors = get_neighbors(cur_pos, grid, rows, cols)
        for neighbor in neighbors:
            if neighbor not in visited:
                heappush(frontier, (cost + 1 + h(neighbor, food_pos),
                                    cost + 1, neighbor, path + [neighbor]))

    return []


def initialize(grid: str) -> list[list]:
    pacman_pos, food_pos, grid, rows, cols = parse_input(grid)
    path = a_star_search(pacman_pos, food_pos, grid, rows, cols)
    return [[str(x), str(y)] for x, y in path]


if __name__ == '__main__':
    grid = """3 9
5 1
7 20
%%%%%%%%%%%%%%%%%%%%
%--------------%---%
%-%%-%%-%%-%%-%%-%-%
%--------P-------%-%
%%%%%%%%%%%%%%%%%%-%
%.-----------------%
%%%%%%%%%%%%%%%%%%%%"""

    output = initialize(grid)
    expected = [['3', '9'], ['3', '10'], ['3', '11'], ['3', '12'], ['3', '13'],
                ['3', '14'], ['3', '15'], ['3', '16'], ['2', '16'], ['1', '16'],
                ['1', '17'], ['1', '18'], ['2', '18'], ['3', '18'], ['4', '18'],
                ['5', '18'], ['5', '17'], ['5', '16'], ['5', '15'], ['5', '14'],
                ['5', '13'], ['5', '12'], ['5', '11'], ['5', '10'], ['5', '9'],
                ['5', '8'], ['5', '7'], ['5', '6'], ['5', '5'], ['5', '4'],
                ['5', '3'], ['5', '2'], ['5', '1']]

    if output == expected:
        print("Pass")
    else:
        print("Incorrect")
        print(f"Expected: {expected}")
        print(f"Output: {output}")

## Question 4

In [None]:
import numpy as np
from tqdm.auto import tqdm
from scipy.spatial.distance import cdist
from typing import Optional, Union


class KMeans:
    '''
    Sample Class for KMeans Clustering
    DO NOT MODIFY any part of this code unless you are requested to do so.
    '''

    def __init__(
        self,
        n_clusters: int = 3,
        max_iter: int = 30,
        tol: float = 1e-4,
        init: str = 'random',
        n_init: int = 300,
        seed: Optional[int] = None,
        verbose: bool = False
    ) -> None:
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.tol = tol
        self.init = init
        self.n_init = n_init
        self.seed = seed
        self.centroids: Optional[np.ndarray] = None
        self.best_centroids: Optional[np.ndarray] = None
        self.best_objective: float = np.inf
        self.verbose = verbose

        if self.seed is not None:
            np.random.seed(self.seed)

    def fit(self, X: np.ndarray, algorithm: str = 'lloyd') -> None:
        for init in tqdm(range(self.n_init)):
            self.centroids = self._initialize_centroids(X)

            if init == 0:
                self.best_centroids = self.centroids.copy()
                self.best_objective = np.inf

            if algorithm == 'lloyd':
                self._lloyd(X)
            elif algorithm == 'elkan':
                self._elkan(X)
            elif algorithm == 'hamerly':
                self._hamerly(X)
            else:
                raise ValueError("Unknown algorithm type")

        self.centroids = self.best_centroids

    def _initialize_centroids(self, X: np.ndarray) -> np.ndarray:
        if self.init == 'kmeans++':
            return self._kmeans_pp(X)
        return self._random_init(X)

    def _random_init(self, X: np.ndarray) -> np.ndarray:
        indices = np.random.choice(X.shape[0], self.n_clusters, replace=False)
        return X[indices]

    def _kmeans_pp(self, X: np.ndarray) -> np.ndarray:
        centroids = [X[np.random.choice(X.shape[0])]]

        for _ in range(self.n_clusters - 1):
            # Compute the distance of each point to the nearest centroid
            dist_sq = np.array([min([np.inner(c-x, c-x) for c in centroids])
                                for x in X])

            # Choose the next centroid based on the probability proportional to the squared distance
            probs = dist_sq / dist_sq.sum()
            centroids.append(X[np.random.choice(X.shape[0], p=probs)])

        return np.array(centroids)

    def _lloyd(self, X: np.ndarray) -> None:
        for iteration in range(self.max_iter):
            # Assignment step
            clusters = self._assign_clusters(X)

            # Update centroids
            new_centroids = np.array([
                X[clusters == i].mean(axis=0) if np.any(clusters == i)
                else self.centroids[i]
                for i in range(self.n_clusters)
            ])

            # Compute the new objective value
            new_objective = self._compute_objective(X, clusters, new_centroids)

            # Update the best centroids if the objective improves
            if new_objective < self.best_objective:
                self.best_centroids = new_centroids.copy()
                self.best_objective = new_objective

            # Check for convergence
            if np.linalg.norm(self.centroids - new_centroids) <= self.tol:
                break

            # Always update centroids
            self.centroids = new_centroids
            if iteration % 10 == 0 and self.verbose:
                print(f"Iteration {iteration}: objective = {new_objective}")

    def _elkan(self, X: np.ndarray) -> None:
        centroid_dists = np.zeros((self.n_clusters, self.n_clusters))
        distances = cdist(X, self.centroids, 'euclidean')
        clusters = np.argmin(distances, axis=1)
        upper_bounds = np.min(distances, axis=1)
        lower_bounds = distances
        p = np.zeros(self.n_clusters)

        for iteration in range(self.max_iter):
            # Step 1: Compute distances between centroids
            centroid_dists = cdist(self.centroids, self.centroids, 'euclidean')
            np.fill_diagonal(centroid_dists, np.inf)
            s = np.min(centroid_dists, axis=1) / 2

            # Step 2: Update bounds
            upper_bounds += p[clusters]
            lower_bounds = np.maximum(lower_bounds - p, 0)

            # Step 3: Assignment step
            mask1 = upper_bounds > s[clusters]
            X_masked = X[mask1]
            distances = np.linalg.norm(
                X_masked[:, np.newaxis] - self.centroids, axis=2)
            # The commented code below is the algorithm in the original paper but it is hard to vectorize
            # distances = np.full((X_masked.shape[0], self.n_clusters), np.inf)
            # for i in range(self.n_clusters):
            #     mask2 = upper_bounds[mask1] > np.maximum(
            #         lower_bounds[mask1][:, i], centroid_dists[clusters[mask1], i])
            #     distances[mask2, i] = np.linalg.norm(
            #         X_masked[mask2] - self.centroids[i], axis=1)
            upper_bounds[mask1] = np.min(distances, axis=1)
            lower_bounds[mask1] = distances
            clusters[mask1] = np.argmin(distances, axis=1)

            # Step 4: Update centroids
            new_centroids = np.array([
                X[clusters == i].mean(axis=0) if np.any(clusters == i)
                else self.centroids[i]
                for i in range(self.n_clusters)
            ])

            # Calculate the distance that each centroid moved
            p = np.linalg.norm(new_centroids - self.centroids, axis=1)

            # Check for convergence
            if np.linalg.norm(self.centroids - new_centroids) <= self.tol:
                break

            # Compute the new objective value
            new_objective = self._compute_objective(X, clusters, new_centroids)

            # Update the best centroids if the objective improves
            if new_objective < self.best_objective:
                self.best_centroids = new_centroids.copy()
                self.best_objective = new_objective

            # Always update centroids
            self.centroids = new_centroids
            if iteration % 10 == 0 and self.verbose:
                print(f"Iteration {iteration}: objective = {new_objective}")

    def _hamerly(self, X: np.ndarray) -> None:
        centroid_dists = np.zeros((self.n_clusters, self.n_clusters))
        distances = cdist(X, self.centroids, 'euclidean')
        clusters = np.argmin(distances, axis=1)
        upper_bounds = np.min(distances, axis=1)
        lower_bounds = np.partition(distances, 1, axis=1)[:, 1]
        p = np.zeros(self.n_clusters)

        for iteration in range(self.max_iter):
            # Step 1: Compute distances between centroids
            centroid_dists = cdist(self.centroids, self.centroids, 'euclidean')
            np.fill_diagonal(centroid_dists, np.inf)
            s = np.min(centroid_dists, axis=1) / 2

            # Step 2: Update bounds
            sorted_indices = np.argpartition(p, 2)
            r = sorted_indices[0]
            r_prime = sorted_indices[1]
            upper_bounds += p[clusters]
            mask = r == clusters
            lower_bounds[mask] -= p[r_prime]
            lower_bounds[~mask] -= p[r]

            # Step 3: Assignment step
            mask = upper_bounds > np.maximum(s[clusters], lower_bounds)
            distances = np.linalg.norm(
                X[mask][:, np.newaxis] - self.centroids, axis=2)
            upper_bounds[mask] = np.min(distances, axis=1)
            lower_bounds[mask] = np.partition(distances, 2, axis=1)[:, 1]
            clusters[mask] = np.argmin(distances, axis=1)

            # Step 4: Update centroids
            new_centroids = np.array([
                X[clusters == i].mean(axis=0) if np.any(clusters == i)
                else self.centroids[i]
                for i in range(self.n_clusters)
            ])

            # Calculate the distance that each centroid moved
            p = np.linalg.norm(new_centroids - self.centroids, axis=1)

            # Check for convergence
            if np.linalg.norm(self.centroids - new_centroids) <= self.tol:
                break

            # Compute the new objective value
            new_objective = self._compute_objective(X, clusters, new_centroids)

            # Update the best centroids if the objective improves
            if new_objective < self.best_objective:
                self.best_centroids = new_centroids.copy()
                self.best_objective = new_objective

            # Always update centroids
            self.centroids = new_centroids
            if iteration % 10 == 0 and self.verbose:
                print(f"Iteration {iteration}: objective = {new_objective}")

    def _assign_clusters(self, X: np.ndarray, point_index: Optional[int] = None, use_bounds: bool = False,
                         upper_bounds: Optional[np.ndarray] = None,
                         lower_bounds: Optional[np.ndarray] = None) -> Union[np.ndarray, int]:
        if point_index is None:
            if use_bounds and upper_bounds is not None and lower_bounds is not None:
                distances = np.full((X.shape[0], self.n_clusters), np.inf)
                for i in range(self.n_clusters):
                    mask = upper_bounds < lower_bounds[:, i]
                    distances[mask, i] = np.linalg.norm(
                        X[mask] - self.centroids[i], axis=1)
                return np.argmin(distances, axis=1)
            else:
                distances = cdist(X, self.centroids, 'euclidean')
                return np.argmin(distances, axis=1)
        else:
            distances = cdist(X[point_index:point_index + 1],
                              self.centroids, 'euclidean')
            return np.argmin(distances)

    def _compute_objective(self, X: np.ndarray, clusters: np.ndarray, centroids: np.ndarray) -> float:
        objective = 0.0
        for i in range(self.n_clusters):
            mask = clusters == i
            size = np.sum(mask)
            objective += size * \
                np.sum(np.linalg.norm(X[mask] - centroids[i], axis=1) ** 2)
        return objective

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self._assign_clusters(X)