# Question 1. Maze Problem

In [None]:
import heapq

# Grid used for the maze search: 1 is a wall, 0 is a walkable tile.
# MazeProblem labels rows with letters starting at 'A' (row index 0) and
# columns with numbers starting at 1, so this 11x11 grid spans
# ('A', 1) in the top-left to ('K', 11) in the bottom-right.
MAZE = [
    [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
    [0, 1, 0, 0, 0, 1, 1, 1, 1, 1, 0],
    [0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0],
    [0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0],
    [0, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0],
    [1, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0],
    [0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1],
    [0, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0],
    [0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0],
    [0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0],
    [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]
]

class MazeProblem:
  """Shortest-path search over a grid maze using A*.

  The maze is a list of rows (assumed rectangular: the width of the first row
  is used for every row). A cell equal to 1 is a wall; anything else is
  walkable. Cells are labelled (row_letter, column_number): row index 0 is
  'A' and column index 0 is 1, so maze[10][5] is ('K', 6).
  """

  def __init__(self, maze):
    # The grid exactly as supplied by the caller
    self.maze = maze
    # Maps every cell label to the list of walkable cells one step away
    self.adj_list = {}
    self.generate_graph()

  # Converts zero-based grid indices into a (letter, number) cell label
  @staticmethod
  def _label(row: int, col: int) -> tuple:
    return (chr(row + 65), col + 1)

  def generate_graph(self):
    """Build the adjacency list from the maze grid.

    Every cell becomes a key. Walkable cells list their walkable 4-neighbours
    in up, left, down, right order. Wall cells keep an empty list, because a
    wall can neither be entered nor left. The mapping is rebuilt from scratch,
    so calling this again never duplicates neighbours.
    """
    row_count = len(self.maze)
    col_count = len(self.maze[0]) if row_count else 0
    adj_list = {}

    for i in range(row_count):
      for j in range(col_count):
        neighbors = adj_list[self._label(i, j)] = []

        # A wall is not a place one can stand, so it gets no outgoing edges
        if self.maze[i][j] == 1:
          continue

        # Up, left, down, right
        for adj_i, adj_j in ((i - 1, j), (i, j - 1), (i + 1, j), (i, j + 1)):
          inside_grid = 0 <= adj_i < row_count and 0 <= adj_j < col_count
          if inside_grid and self.maze[adj_i][adj_j] != 1:
            neighbors.append(self._label(adj_i, adj_j))

    self.adj_list = adj_list

  # Defines the heuristic used in the A star search (Manhattan distance)
  def _heuristic_(self, from_node: tuple, to_node: tuple):
    row_gap = abs(ord(from_node[0]) - ord(to_node[0]))
    col_gap = abs(from_node[1] - to_node[1])
    return row_gap + col_gap

  def A_star_search(self, start_node: tuple, target_node: tuple):
    """Find a shortest path between two cells, each step costing 1.

    Args:
      start_node: label of the cell to start from, e.g. ('K', 6).
      target_node: label of the cell to reach.

    Returns:
      A (path_cost, path) tuple, where path is the list of cell labels from
      start_node to target_node inclusive, or None when the target cannot be
      reached.

    Raises:
      KeyError: if start_node is not a cell of the maze.
    """
    # Cheapest known cost to each discovered node, and the node it was reached from
    best_cost = {start_node: 0}
    came_from = {start_node: None}
    visited = set()

    # Heap entries are (path cost + heuristic, path cost, node)
    p_queue = [(self._heuristic_(start_node, target_node), 0, start_node)]

    while p_queue:
      _, path_cost, current_node = heapq.heappop(p_queue)

      # A cheaper entry for this node was already expanded; this one is stale.
      # Leaving stale entries in the heap and skipping them here avoids having
      # to search the heap and re-heapify whenever a better path is found.
      if current_node in visited:
        continue

      if current_node == target_node:
        # Walk the parent links back to the start, then flip the order
        path = []
        node = current_node
        while node is not None:
          path.append(node)
          node = came_from[node]
        path.reverse()
        return path_cost, path

      visited.add(current_node)
      new_cost = path_cost + 1

      for neighbor_node in self.adj_list[current_node]:
        if neighbor_node in visited:
          continue

        # Only a strictly cheaper route replaces what is already queued
        known_cost = best_cost.get(neighbor_node)
        if known_cost is not None and known_cost <= new_cost:
          continue

        best_cost[neighbor_node] = new_cost
        came_from[neighbor_node] = current_node
        estimate = new_cost + self._heuristic_(neighbor_node, target_node)
        heapq.heappush(p_queue, (estimate, new_cost, neighbor_node))

    return None


In [None]:
# Build the solver once; the adjacency list is derived from MAZE on construction
problem_1 = MazeProblem(MAZE)
start_tile = ('K', 6)

# 1) Starting point -> goal 1
result = problem_1.A_star_search(start_tile, ('A', 6))
print("Part 1: From ('K', 6) to ('A', 6)")
tiles_traversed, path_taken = result[0], result[1]
print("Path: ", path_taken)
print("Tiles Traversed: ", tiles_traversed)

# 2) Starting point -> goal 2
result = problem_1.A_star_search(start_tile, ('E', 3))
print("\nPart 2: From ('K', 6) to ('E', 3)")
tiles_traversed, path_taken = result[0], result[1]
print("Path: ", path_taken)
print("Tiles Traversed: ", tiles_traversed)

Part 1: From ('K', 6) to ('A', 6)
Path:  [('K', 6), ('K', 7), ('K', 8), ('K', 9), ('J', 9), ('I', 9), ('I', 8), ('I', 7), ('H', 7), ('G', 7), ('G', 8), ('G', 9), ('F', 9), ('F', 10), ('F', 11), ('E', 11), ('D', 11), ('C', 11), ('B', 11), ('A', 11), ('A', 10), ('A', 9), ('A', 8), ('A', 7), ('A', 6)]
Tiles Traversed:  24

Part 2: From ('K', 6) to ('E', 3)
Path:  [('K', 6), ('K', 5), ('J', 5), ('I', 5), ('I', 4), ('I', 3), ('J', 3), ('K', 3), ('K', 2), ('K', 1), ('J', 1), ('I', 1), ('H', 1), ('G', 1), ('G', 2), ('G', 3), ('F', 3), ('E', 3)]
Tiles Traversed:  17


# Question 2. States of the TicTacToe Board

In [1]:
import json

class TicTacToeProblem:
  def __init__(self):
    self.adj_list = {}

  # Generates all of the tic tac toe states
  def Generate(self, starting_state = 'X'):
    print("Generating States...")
    self._generate_states_("---------", starting_state)
    print(f"Done! Generated {len(self.adj_list)} states")

  def __check_leaf_state(self, current: str):
    win_indices = [
        [0, 1, 2], [3, 4, 5], [6,7, 8],
        [0, 3, 6], [1, 4, 7], [2, 5, 8],
        [0, 4, 8], [2, 4, 6]
    ]
    # Check if any of the players has won, thus no more states are valid after this point.
    for position in win_indices:
      if all(current[i] == 'O' for i in position) or all(current[i] == 'X' for i in position):
        return True

    # Checks if there are no more moves allowed.
    if all(tile != '-' for tile in current):
      return True
    return False

  # Generates the states using DFS
  def _generate_states_(self, current: str, move: str):
    # Base Case
    if self.__check_leaf_state(current):
      # Creates a leaf node in the adj_list
      self.adj_list[current] = set()
      return

    if current not in self.adj_list:
        self.adj_list[current] = set()
    moveComplement = 'X' if move == 'O' else 'O'

    # Iterates over next valid states
    for i in range(len(current)):
      if current[i] != '-':
        continue
      new_node = current[:i] + move + current[i+1:]

      # Adds the node adjacency to the adj_list
      self.adj_list[current].add(new_node)

      # Recursively generate states for the adjacent states
      self._generate_states_(new_node, moveComplement)

  def export_json(self, path = "states.json"):
    # Grabs the adjacency lists
    data = {key: list(values) for key, values in self.adj_list.items()}

    # Write the dictionary to a JSON file
    with open(path, "w") as json_file:
      json.dump(data, json_file, indent=4)
      print(f"Exported states to '{path}'.")

  def import_json(self, path = "states.json"):
    # Read the JSON data from the file
    with open(path, "r") as json_file:
      data = json.load(json_file)

    # Sets the adjacency lists
    self.adj_list = {key: set(values) for key, values in data.items()}
    print(f"Imported states from '{path}'.")


In [3]:
# Instantiates the TicTacToeProblem class that contains the state space generation
problem_2 = TicTacToeProblem()

# Generates with X going first.
# NOTE: `%time` is an IPython line magic that reports how long the statement took;
# this line only runs inside a notebook / IPython session, not as plain Python.
%time problem_2.Generate('X')

# Outputs as json file
problem_2.export_json("game_states.json")

Generating States...
Done! Generated 5478 states
CPU times: user 4.22 s, sys: 17.3 ms, total: 4.24 s
Wall time: 4.69 s
Exported states to 'game_states.json'.


# Problem 3. MNIST Digit Classification

In [None]:
import tensorflow as tf
import numpy as np

# Fetch MNIST as a (train, test) pair, each holding images and labels
train_split, test_split = tf.keras.datasets.mnist.load_data()
X_train, Y_train = train_split
X_test, Y_test = test_split

# Rescale the pixel values by 1/255
X_train = X_train / 255.0
X_test = X_test / 255.0

print(X_train.shape, X_test.shape, Y_train.shape, Y_test.shape)

(60000, 28, 28) (10000, 28, 28) (60000,) (10000,)


In [None]:
# Fully connected classifier: 28x28 image -> flat vector -> three hidden layers -> 10 class scores
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.InputLayer((28, 28)))
model.add(tf.keras.layers.Flatten())

# Hidden layers shrink from 512 to 128 units, all with ReLU
for hidden_units in (512, 256, 128):
  model.add(tf.keras.layers.Dense(hidden_units, activation='relu'))

# One softmax output per digit class
model.add(tf.keras.layers.Dense(10, activation='softmax'))

# Labels are plain integers, hence the sparse loss and metric
model.compile(
    optimizer='adam',
    loss="sparse_categorical_crossentropy",
    metrics=["sparse_categorical_accuracy"],
)
model.summary()

Model: "sequential_11"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 flatten_7 (Flatten)         (None, 784)               0         
                                                                 
 dense_38 (Dense)            (None, 512)               401920    
                                                                 
 dense_39 (Dense)            (None, 256)               131328    
                                                                 
 dense_40 (Dense)            (None, 128)               32896     
                                                                 
 dense_41 (Dense)            (None, 10)                1290      
                                                                 
Total params: 567434 (2.16 MB)
Trainable params: 567434 (2.16 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [None]:
# Fit on the training images, holding back 20% of them for validation each epoch
mnist_fit_options = dict(validation_split=0.2, batch_size=32, epochs=16)
model.fit(X_train, Y_train, **mnist_fit_options)

Epoch 1/16
Epoch 2/16
Epoch 3/16
Epoch 4/16
Epoch 5/16
Epoch 6/16
Epoch 7/16
Epoch 8/16
Epoch 9/16
Epoch 10/16
Epoch 11/16
Epoch 12/16
Epoch 13/16
Epoch 14/16
Epoch 15/16
Epoch 16/16


<keras.src.callbacks.History at 0x7a52005acbe0>

In [None]:
from sklearn.metrics import confusion_matrix, classification_report

# Evaluates the model on the held-out test images.
# Each prediction row holds one score per class; argmax picks the predicted digit.
Y_predict = np.argmax(model.predict(X_test), axis=1)

cm = confusion_matrix(Y_test, Y_predict)
# The matrix is built from X_test / Y_test, so it is labelled as the test set
# (it was previously printed as "Training Set").
print("Test Set Confusion Matrix")
print(cm)
print("Accuracy Report:")
print(classification_report(Y_test, Y_predict))

Training Set Confusion Matrix
[[ 969    0    1    1    0    2    3    2    2    0]
 [   0 1119    1    3    0    1    2    4    5    0]
 [   0    2  922   13    1    0    2   85    7    0]
 [   0    0    0  997    0    2    0    8    2    1]
 [   2    0    2    1  950    0    5    7    1   14]
 [   1    0    0   21    0  859    1    1    4    5]
 [   2    3    0    1    3    4  940    0    5    0]
 [   0    0    1    0    0    0    0 1022    1    4]
 [   1    0    3    7    1    7    0    8  943    4]
 [   2    2    0    5    4    1    0   18    5  972]]
Accuracy Report:
              precision    recall  f1-score   support

           0       0.99      0.99      0.99       980
           1       0.99      0.99      0.99      1135
           2       0.99      0.89      0.94      1032
           3       0.95      0.99      0.97      1010
           4       0.99      0.97      0.98       982
           5       0.98      0.96      0.97       892
           6       0.99      0.98      0.98

# Problem 4. California Housing Prices

In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


# Read the raw data; header=None because the file is loaded without a header row
df = pd.read_csv("cal_housing.data", header=None)

# Every column except the last is a feature; the last column is the regression target
X, Y = df.iloc[:, :-1].to_numpy(), df.iloc[:, -1].to_numpy()

# Hold out 20% of the rows for testing, with a fixed seed for repeatability
split = train_test_split(X, Y, test_size=0.2, random_state=42)
X_train, X_test, Y_train, Y_test = split


# Standardise the features: statistics come from the training rows only and
# are then reused unchanged for the test rows
X_scaler = StandardScaler()
X_scaler.fit(X_train)
X_train = X_scaler.transform(X_train)
X_test = X_scaler.transform(X_test)

# Standardise the training targets too; the scaler needs a 2-D column, so the
# vector is reshaped going in and flattened coming out. Y_test stays unscaled.
Y_scaler = StandardScaler()
Y_train_column = Y_train.reshape(-1, 1)
Y_train = Y_scaler.fit_transform(Y_train_column).flatten()

print(X_train.shape, X_test.shape, Y_train.shape, Y_test.shape)

(16512, 8) (4128, 8) (16512,) (4128,)


In [None]:
# Regression network: 8 inputs -> four Dense+Dropout stages -> a single linear output
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.InputLayer(8))

# (units, dropout rate) for each hidden stage, widest first
for hidden_units, drop_rate in ((256, 0.3), (128, 0.3), (64, 0.3), (32, 0.2)):
  model.add(tf.keras.layers.Dense(hidden_units, activation='relu'))
  model.add(tf.keras.layers.Dropout(drop_rate))

# No activation on the output: the network predicts an unbounded real value
model.add(tf.keras.layers.Dense(1))

optimizer = tf.keras.optimizers.Adam(learning_rate=0.0005)
model.compile(optimizer=optimizer, loss="mean_squared_error", metrics=["mse"])
model.summary()

Model: "sequential_26"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_101 (Dense)           (None, 256)               2304      
                                                                 
 dropout_26 (Dropout)        (None, 256)               0         
                                                                 
 dense_102 (Dense)           (None, 128)               32896     
                                                                 
 dropout_27 (Dropout)        (None, 128)               0         
                                                                 
 dense_103 (Dense)           (None, 64)                8256      
                                                                 
 dropout_28 (Dropout)        (None, 64)                0         
                                                                 
 dense_104 (Dense)           (None, 32)              

In [None]:
# Stop once validation loss has not improved for 16 epochs, and roll back to the best weights
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=16,
    restore_best_weights=True,
)

# Up to 200 epochs, with 20% of the training rows held back for validation
housing_fit_options = dict(
    validation_split=0.2,
    batch_size=32,
    epochs=200,
    callbacks=[early_stopping],
)
model.fit(X_train, Y_train, **housing_fit_options)

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78

<keras.src.callbacks.History at 0x7a51e89ab4c0>

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error, r2_score

# The model predicts in standardised target units; map predictions back to the
# original scale before comparing them with the unscaled Y_test
scaled_predictions = model.predict(X_test).reshape(-1, 1)
Y_pred = Y_scaler.inverse_transform(scaled_predictions).flatten()

# Report each regression metric under its label
reported_metrics = (
    ("MSE: ", mean_squared_error),
    ("MAE: ", mean_absolute_error),
    ("MAPE: ", mean_absolute_percentage_error),
    ("R2_score: ", r2_score),
)
for metric_label, metric_fn in reported_metrics:
  print(metric_label, metric_fn(Y_test, Y_pred))

MSE:  2641750938.5155864
MAE:  34744.302734375
MAPE:  0.20106786078966926
R2_score:  0.7984025611559922


In [None]:
# Absolute error of each prediction as a percentage of the true value
relative_error = (Y_pred - Y_test) / Y_test
percentage_difference = 100 * np.abs(relative_error)

# Share of predictions that land within +-15% of the true value
hits = percentage_difference <= 15
within_threshold = int(np.sum(hits))
print(within_threshold / len(Y_pred))

0.5363372093023255
