From a79f908198bfbfe671e0fbc441274df8f481343f Mon Sep 17 00:00:00 2001 From: Yash Alapuria Date: Mon, 31 Jul 2023 15:03:32 +0530 Subject: [PATCH 1/3] 1.4 --- mnest/Entities.py | 70 ++++++++++++-- mnest/NeuralNetwork.py | 208 +++++++++++++++++++++++++++++++++++++++++ mnest/perceptron.py | 89 ++++++++++++++++++ 3 files changed, 358 insertions(+), 9 deletions(-) create mode 100644 mnest/NeuralNetwork.py create mode 100644 mnest/perceptron.py diff --git a/mnest/Entities.py b/mnest/Entities.py index 5c91884..7ac9a16 100644 --- a/mnest/Entities.py +++ b/mnest/Entities.py @@ -2,6 +2,10 @@ from scipy.signal import convolve2d from .Laws import * import os +import random +import tensorflow as tf +from .perceptron import * +from .NeuralNetwork import Perceptron data_file_path = os.path.join(os.path.dirname(__file__), 'data', 'random_seed.txt') @@ -11,7 +15,7 @@ class Agent: def __init__(self, world, layer_name, child, position: Vector2 = Vector2(0, 0), direction=E, - brain_type='Q-Table', action_list=('move', 'stay')): + brain_type='Deep-Q', action_list=('move', 'stay')): self.world = world self.layer_name = layer_name @@ -23,7 +27,7 @@ def __init__(self, world, layer_name, child, position: Vector2 = Vector2(0, 0), # Fun Fact. # As We are storing the position of the element inside the layer. and not a copy of the values. - # It is stored as a reference and hence we do not have to update the world values everytime. + # It is stored as a reference and hence we do not have to update the world values every time. # Frankly for all those who are reading this. I was about to write code to update the values when I accidentally # ran the simulation to test and saw them updating automatically. # I jumped* up and down around the room (* Literally.) @@ -79,7 +83,7 @@ def move(self): self.direction *= -1 # Flip direction. # print('Reflect down') - # Right + # Down if self.position.y >= self.world.r_length: if self.world.periodic_boundary: self.position = 0 @@ -134,7 +138,7 @@ def __init__(self, world, layer_name, def disperse(self): """ Uses matrices and convolutions to disperse the essence - In general the rule of thumb is that the dispersion matrix con have values where the total of the values is 1. + In general the rule of thumb is that the dispersion matrix can have values where the total of the values is 1. Also, the sum of all values that is dispersed(Total - Center_Value) = 1 - Center_Value. of the matrix # we need the original layer :return: @@ -155,6 +159,21 @@ def decay(self, decay_type): mask = self.world.layers[self.layer_name] < 0 self.world.layers[self.layer_name][mask] = 0 +''' +# Defining The Q-Network Architecture +class QNetwork(tf.keras.Model): + def __init__(self, state_dim, action_dim): + super(QNetwork, self).__init__() + self.FirstLayer = tf.keras.layers.Dense(64, activation='relu') + self.SecondLayer = tf.keras.layers.Dense(64, activation='relu') + self.OutputLayer = tf.keras.layers.Dense(action_dim) + + def call(self, state): + x = self.FirstLayer(state) + x = self.SecondLayer(x) + output = self.OutputLayer(x) + return output +''' # AI for the Entities. 
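# --- Illustrative sketch (reviewer addition, not part of the original patch) ---
# The TensorFlow QNetwork above is commented out; the pure-NumPy class below is one way
# the same two-hidden-layer forward pass could look without the tf dependency. The layer
# width of 64 mirrors the commented-out Keras model; the class name, weight scale, and
# everything else here are assumptions for illustration only. numpy (np) is assumed to be
# available, as it is already used elsewhere in this module.
class NumpyQNetwork:
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        # Small random weights and zero biases for two hidden layers and a linear output.
        self.w1 = np.random.randn(state_dim, hidden_dim) * 0.01
        self.b1 = np.zeros(hidden_dim)
        self.w2 = np.random.randn(hidden_dim, hidden_dim) * 0.01
        self.b2 = np.zeros(hidden_dim)
        self.w3 = np.random.randn(hidden_dim, action_dim) * 0.01
        self.b3 = np.zeros(action_dim)

    def call(self, state):
        # ReLU on both hidden layers, then a linear output: one Q-value per action.
        x = np.maximum(0, state @ self.w1 + self.b1)
        x = np.maximum(0, x @ self.w2 + self.b2)
        return x @ self.w3 + self.b3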
class Brain: @@ -171,10 +190,15 @@ def __init__(self, brain_type: str, action_list: list, learning_rate=0.2, self.min_exploration = min_exploration self.discounted_return = discounted_return # Gamma or Lambda + # Neural Network variables + '''self.q_network = QNetwork(sta)''' + if self.brain_type == 'Q-Table': self.q_table = {} elif self.brain_type == 'Deep-Q': - pass + print("Hello There 3") + self.q_table = Perceptron() + # pass else: print('There seems to be some mistake on the brain type.') @@ -208,6 +232,7 @@ def predict_action(self, state: str): self.add_state(state) else: + ''' Changes in PRedict_Action''' # Exploit if state in self.q_table: q_values = self.q_table[state] # q_values for that state @@ -225,8 +250,32 @@ def predict_action(self, state: str): self.exploration_rate -= self.exploration_decay elif self.brain_type == 'Deep-Q': - action = 0 - pass + # action = 0 + # print("Hello There 1") + # pass + + '''# Calling Perceptron.predict()''' + +# training = Perceptron(state)[0] +# action = np.argmax(training) + + # Checking Exploration or Exploitation + if np.random.random() < self.exploration_rate: + # Explore + action = np.random.randint(len(self.action_list)) + + else: + # Exploit + action = Perceptron.predict(state) + + if self.exploration_rate > self.min_exploration: + self.exploration_rate -= self.exploration_decay + + '''Exploration => Learning part, call predict for prediction. Fit is for learning. So fit will run only once, and predict everytime (or something like that) + + # Decaying exploration_rate + if self.exploration_rate > self.min_exploration: + self.exploration_rate -= self.exploration_decay ''' else: action = None @@ -237,7 +286,7 @@ def predict_action(self, state: str): def learn(self, state_observed: str, action_taken: int, next_state: str, reward_earned: float): """ - + :param next_state: :param state_observed: :param action_taken: @@ -258,7 +307,10 @@ def learn(self, state_observed: str, action_taken: int, next_state: str, reward_ values_next_state[action_taken] = new_value elif self.brain_type == 'Deep-Q': - pass + Perceptron.fit() + + # pass + else: print('There seems to be some mistake on the brain type.') pass diff --git a/mnest/NeuralNetwork.py b/mnest/NeuralNetwork.py new file mode 100644 index 0000000..ecf6b28 --- /dev/null +++ b/mnest/NeuralNetwork.py @@ -0,0 +1,208 @@ +import random +from typing import Any +import numpy as np +import sys +import h5py +import os +import pickle +# from .perceptron import * + +# class NeuralNetwork(Perceptron.fit): +# def __init__(self, state_dim, action_dim): +# super(NeuralNetwork, self).__init__() +# self.FirstLayer = Perceptron.predict +# self.SecondLayer = Perceptron.predict + + +''' +1. Adam Optimizer +` 2. MeanSquareError ` +3. QNetwork +4. DenseLayer +5. Agent +6. 
Neural Network + +''' + +global reward + + +class Optimizer: + pass + + + +class MyMeanSquareError: + def __call__(self, y_true, y_pred): + return np.mean(np.square(y_true - y_pred)) + + + +class ModelCheckpoint: + # def save_model(self, model): + # # Save the model architecture + # model.save + + def __init__(self, model): + self.model = model + self.step_count = 0 + + def saveModelWeights(self, step_count, save_path): + # Save Model Weights + with open(save_path + "_weights.pkl", "wb") as f: + pickle.dump(self.model.get_weights(), f) + + # Save Step Count + with open(save_path + "_stepCounts.pkl", "wb") as f: + pickle.dump(self.step_count, f) + + def loadModelWeights(self, save_path): + # Load Model Weights + with open(save_path + "_weights.pkl", "rb") as f: + weights = pickle.load(f) + self.model.set_weights(weights) + + # Load Step Count + with open(save_path + '_stepCount.pkl', "rb") as f: + self.step_count = pickle.load(f) + + def incrementStepCount(self): + self.step_count += 1 + +# This is Neural Network with 1 hidden layer +class Perceptron: + def __init__(self, data, weights = None): + print("Perceptron Init Ran") + random.shuffle(data) + inputs = np.array([[float(x) for x in row[0:-1]] for row in data]) + self.inputs = np.hstack((inputs, [[1]] * len(inputs))) # Append 1 to each input row, for the bias weight + self.outputs = np.array([float(row[-1]) for row in data]) # Change no. of o/p to no. of actions + self.numInputs = len(self.inputs[0]) + + if weights == None: + weights = np.array([random.uniform(0, 100) \ + for x in range(self.numInputs)]) + weights[-1] = -1 # Set initial value of bias weights + self.weights = weights + self.error = float(sys.maxsize) # Initialise error to some very high value + self.smallestError = self.error + self.bestWeights = self.weights + self.fitHistory = [] + + def predict(self, x_i): + # Activation functions is the dot product of input vector and weight vector + y = np.dot(x_i, self.weights) + + if y > 0: + return y + else : + return 0.01*y + + ''' + def fit(self, state_dim, action_dim, learning_rate=0.001, gamma=0.99, epsilon=1.0, epsilon_decay=0.999, epsilon_min=0.01, numIters = 100, breakSoon = True): + self.state_dim = state_dim + self.action_dim = action_dim + self.learning_rate = learning_rate + self.gamma = gamma + self.epsilon = epsilon + self.epsilon_decay = epsilon_decay + self.epsilon_min = epsilon_min + self.loss_fn = MyMeanSquareError(1, 0.83) + ''' + + ''' + def nextMove(location, action): + row, col = location + if action == "UP": + return (row - 1, col) + elif action == "DOWN": + return (row + 1, col) + elif action == "LEFT": + return(row, col - 1) + elif action == "RIGHT": + return(row, col + 1) + else: + return location + ''' + + # Deciding Rewards & Punishment + def Reward(self, location): + + self.location = location + if location == 'Target': + print("Reached Target") + return 100 + + elif location == "Home": + print("Reached Home") + return 200 + + # elif location != "Home" and location != "Target": + elif location != "Home" or location != "Target": + return -300 + + else: + return -1 + + + def fit(self, state, lr = 0.5, numIters = 100, breakSoon = True): + errorList = [] + for iter in range(numIters): + totalError = 0.0 + for i in range(len(self.outputs)): + + # Checking The Difference Between The Actual & Predicted Output + pred = self.predict(self.inputs[i]) + # Error is the difference between true and predicted class + error = self.outputs[i] - pred + + + '''# Rewards Function''' + # Calling Reward Function + self.state 
= state + reward = Perceptron.Reward(state) + reward += reward + print("Total rewards: " + reward) + + + # Multiplying with the error yields a positive or negative adjustment depending on a positive or negative prediction error + self.weights = self.weights + \ + lr * error * self.inputs[i] + # totalError += abs(error)**2 + totalError += abs(error) + + self.saveBestFit(self.weights, totalError) + if breakSoon: + if totalError == 0.0: + break + self.printWeights() + errorList.append(totalError) + + # Store error history for the convenient plotting + self.fitHistory = errorList + self.error = totalError + + + + # Store the best performing weights for reuse + def saveBestFit(self, w, e): + if e < self.smallestError: + self.smallestError = e + self.bestWeights = w + + def printWeights(self): + print("\t".join(map(str, self.weights)), file=sys.stderr) + + # Ideally we should split data into train/test sets to feed this method. For now, just use the data passed during initialization. + def test(self): + e = 0.0 + for i in range(len(self.inputs)): + pred = self.predict(self.inputs[i]) + e += self.outputs[i] - pred + print(e, file=sys.stdout) + + def __str__(self) -> str: + s = "Inputs (1 sample): {}\n".format(self.inputs[0]) + s += "weights: {}\n".format(self.weights) + s += "Error: {}\n".format(self.error) + return s \ No newline at end of file diff --git a/mnest/perceptron.py b/mnest/perceptron.py new file mode 100644 index 0000000..7789dfd --- /dev/null +++ b/mnest/perceptron.py @@ -0,0 +1,89 @@ +import random +import numpy as np +import sys + + +class Perceptron: + def __init__(self, data, weights = None): + random.shuffle(data) + inputs = np.array([[float(x) for x in row[0:-1]] for row in data]) + self.inputs = np.hstack((inputs, [[1]] * len(inputs))) # Append 1 to each input row, for the bias weight + self.outputs = np.array([float(row[-1]) for row in data]) + self.numInputs = len(self.inputs[0]) + + ''' + Below until line 22, it says about the weights of the neuron. And there are 100 neuron in the Hidden Layer. + Line 21 tells us that inital weight is '-1' + ''' + if weights == None: + weights = np.array([random.uniform(0, 100) \ + for x in range(self.numInputs)]) + weights[-1] = -1 # set initial value of bias weight + self.weights = weights + self.error = float(sys.maxsize) # initialise error to some very high value + self.smallestError = self.error + self.bestWeights = self.weights + self.fitHistory = [] + + # Changing into Leaky ReLU + def predict(self, x_i): + ''' + - I will call this Bit Function + - There is no normalization going on in this function + - This function is similar to 'RELU', except relu passes the (x, 0) & This function passes (1. 0). + ''' + y = np.dot(x_i, self.weights) # Activation function is the dot product of input vector and weight vector + + # return 1 if y > 0 else 0 + + if y > 0: + return y + else: + return 0.01*y + + ''' + - lr => Learning Rate. (In general) LR is used to change the weight. Higher the LR, faster the NN will be, but it will be less effective. + + - numIters => numIters are basically number of iteration which will take place. Here, it will be 100. 
+ ''' + def fit(self, lr=1, numIters = 100, breakSoon=True): + errorList = [] + for iter in range(numIters): + totalError = 0.0 + for i in range(len(self.outputs)): + pred = self.predict(self.inputs[i]) + error = self.outputs[i] - pred # Error is the difference between true and predicted class + self.weights = self.weights + \ + lr * error * self.inputs[i] # multiplying with the error yields a positive or negative adjustment depending on a positive or negative prediction error + totalError += abs(error) + + self.saveBestFit(self.weights, totalError) + if breakSoon: + if totalError == 0.0: + break + self.printWeights() + errorList.append(totalError) + + self.fitHistory = errorList # Store error history for convenient plotting + self.error = totalError + + def saveBestFit(self, w, e): # Store the best performing weights for reuse + if e < self.smallestError: + self.smallestError = e + self.bestWeights = w + + def printWeights(self): + print("\t".join(map(str, self.weights)), file=sys.stderr) + + def test(self): # Ideally we should split data into train/test sets to feed this method. For now, just use the data passed during initialisation. + e = 0.0 + for i in range(len(self.inputs)): + pred = self.predict(self.inputs[i]) + e += self.outputs[i] - pred + print(e, file=sys.stdout) + + def __str__(self): + s = "inputs (1 sample): {}\n".format(self.inputs[0]) + s += "weights: {}\n".format(self.weights) + s += "error: {}\n".format(self.error) + return s From 293270e5114ff83a88f2ae7ade2023e200c321fa Mon Sep 17 00:00:00 2001 From: Yash Alapuria Date: Fri, 4 Aug 2023 12:36:23 +0530 Subject: [PATCH 2/3] 1.4.1 --- mnest/Entities.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mnest/Entities.py b/mnest/Entities.py index 7ac9a16..a6db411 100644 --- a/mnest/Entities.py +++ b/mnest/Entities.py @@ -3,8 +3,8 @@ from .Laws import * import os import random -import tensorflow as tf -from .perceptron import * +# import tensorflow as tf +# from .perceptron import * from .NeuralNetwork import Perceptron From 2aaca91c1f72900253a3c638894b6ffeed59b4cd Mon Sep 17 00:00:00 2001 From: Yash Alapuria Date: Tue, 17 Oct 2023 12:31:31 +0530 Subject: [PATCH 3/3] Done with adding the Neural Network, testing needs to be done --- mnest/Entities.py | 14 +- mnest/NeuralNetwork.py | 4 +- mnest/TestNeuralNetwork.py | 288 +++++++++++++++++++++++++++++++++++++ mnest/test.py | 128 +++++++++++++++++ 4 files changed, 427 insertions(+), 7 deletions(-) create mode 100644 mnest/TestNeuralNetwork.py create mode 100644 mnest/test.py diff --git a/mnest/Entities.py b/mnest/Entities.py index a6db411..3f596e7 100644 --- a/mnest/Entities.py +++ b/mnest/Entities.py @@ -6,6 +6,7 @@ # import tensorflow as tf # from .perceptron import * from .NeuralNetwork import Perceptron +import TestNeuralNetwork data_file_path = os.path.join(os.path.dirname(__file__), 'data', 'random_seed.txt') @@ -196,12 +197,13 @@ def __init__(self, brain_type: str, action_list: list, learning_rate=0.2, if self.brain_type == 'Q-Table': self.q_table = {} elif self.brain_type == 'Deep-Q': - print("Hello There 3") - self.q_table = Perceptron() + print("Deep-Q Started") + self.q_table = TestNeuralNetwork.Perceptron() # pass else: print('There seems to be some mistake on the brain type.') + # state of add_state becomes the i/p for the perceptron def add_state(self, state: str): """ This function is applicable to the Q-Table type Brain. 
@@ -235,6 +237,7 @@ def predict_action(self, state: str): ''' Changes in PRedict_Action''' # Exploit if state in self.q_table: + # q_value is the array of output. q_values = self.q_table[state] # q_values for that state predict_list = np.where(q_values == max(q_values))[0] # list of all indices with max q_values action = np.random.choice(predict_list) @@ -266,12 +269,12 @@ def predict_action(self, state: str): else: # Exploit - action = Perceptron.predict(state) + action = TestNeuralNetwork.Perceptron.predict(state) if self.exploration_rate > self.min_exploration: self.exploration_rate -= self.exploration_decay - '''Exploration => Learning part, call predict for prediction. Fit is for learning. So fit will run only once, and predict everytime (or something like that) + '''Exploration => Learning part, call predict for prediction. Fit is for learning. So fit will run only once, and predict every time (or something like that) # Decaying exploration_rate if self.exploration_rate > self.min_exploration: @@ -307,8 +310,7 @@ def learn(self, state_observed: str, action_taken: int, next_state: str, reward_ values_next_state[action_taken] = new_value elif self.brain_type == 'Deep-Q': - Perceptron.fit() - + TestNeuralNetwork.main() # pass else: diff --git a/mnest/NeuralNetwork.py b/mnest/NeuralNetwork.py index ecf6b28..89a4fff 100644 --- a/mnest/NeuralNetwork.py +++ b/mnest/NeuralNetwork.py @@ -74,6 +74,8 @@ class Perceptron: def __init__(self, data, weights = None): print("Perceptron Init Ran") random.shuffle(data) + + # state would be 1 step at a time inputs = np.array([[float(x) for x in row[0:-1]] for row in data]) self.inputs = np.hstack((inputs, [[1]] * len(inputs))) # Append 1 to each input row, for the bias weight self.outputs = np.array([float(row[-1]) for row in data]) # Change no. of o/p to no. of actions @@ -92,7 +94,7 @@ def __init__(self, data, weights = None): def predict(self, x_i): # Activation functions is the dot product of input vector and weight vector y = np.dot(x_i, self.weights) - +# Modify it so that gives array output if y > 0: return y else : diff --git a/mnest/TestNeuralNetwork.py b/mnest/TestNeuralNetwork.py new file mode 100644 index 0000000..d335249 --- /dev/null +++ b/mnest/TestNeuralNetwork.py @@ -0,0 +1,288 @@ +import random +import numpy as np +from collections import deque +import sys + +''' +Here, considering number of agents to be 100 +''' + +class WorldEnvironment: + + global sense_state + + def __init__(self, gridSize): + self.gridSize = gridSize + # Initializing Grid: Create a 2D array of the world with all cells set to None (empty) + self.grid = np.zeros(gridSize) + + def sense_state(self, sense_type): + """ + + :param sense_type: + :return: + """ + # first update the state_list then sense the state. 
+ self.update() + + if sense_type == 'Initial': + self.current_observed_state = self.state_hash + # print('initial', self.current_observed_state, self.result_observed_state) + elif sense_type == 'Final': + self.result_observed_state = self.state_hash + # print('final', self.current_observed_state, self.result_observed_state) + else: + print('Something seems wrong with the sense type given.') + + WorldEnvironment.sense_state = sense_type + + def reset(self): + # Reset grid to initial state + self.grid = np.zeros(self.gridSize) + ''' + Placing 100 ants randomly on the grid + + For testing consider 2-10 + ''' + for _ in range(10): + row = np.random.randint(0, self.gridSize[0]) + col = np.random.randint(0, self.gridSize[1]) + self.grid[row, col] = 1 + + def step(self, actions, next_state): + # Reward for each agent + rewards = np.zeros(10) # Change to 100 + + for agent in range(10): # Change to 100 + if next_state[agent] == WorldEnvironment.sense_state[agent]: + rewards[agent] += 1 + + for i in range(10): # Change to 100 + # Moving Up + if actions[i] == 0 and self.grid[i // 10, i % 10] != 0: + newRow = max(i // 10 - 1, 0) + if self.grid[newRow, i % 10] == 0: + self.grid[i // 10, i % 10] = 0 + self.grid[newRow, i % 10] = 1 + + # Moving Down + elif actions[i] == 1 and self.grid[i // 10, i % 10] != 0: + newRow = min(i // 10 + 1, self.grid_size[0] - 1) + if self.grid[newRow, i % 10] == 0: + self.grid[i // 10, i % 10] = 0 + self.grid[newRow, i % 10] = 1 + + # Moving Left + elif actions[i] == 2 and self.grid[i // 10, i % 10] != 0: + newCol = max(i % 10 - 1, 0) + if self.grid[i // 10, newCol] == 0: + self.grid[i // 10, i % 10] = 0 + self.grid[i // 10, newCol] = 1 + + # Moving Right + elif actions[i] == 3 and self.grid[i // 10, i % 10] != 0: + newCol = min(i % 10 + 1, self.gridSize[1] - 1) + if self.grid[i // 10, newCol] == 0: + self.grid[i // 10, i % 10] = 0 + self.grid[i // 10, newCol] = 1 + + # Calculating The Reward Based On The Environment + ''' + if self.grid[i // 10, i % 10] == 1: + rewards[i] = 1 + ''' + for j in range(10): # Change to 100 + if next_state[i] == next_state[j]: + rewards[i] -= 0.5 + rewards[j] -= 0.5 + + # return rewards, self.grid.copy() + return rewards + +class ExperienceReplay: + def __init__(self, buffer_size): + self.buffer_size = buffer_size + self.buffer = deque(maxlen=buffer_size) + + def add_experience(self, experience): + self.buffer.append(experience) + + def sample_batch(self, batch_size): + if len(self.buffer) < batch_size: + return None + + batch = random.sample(self.buffer, batch_size) + states, actions, rewards, next_states, done_flags = zip(*batch) + + return states, actions, rewards, next_states, done_flags + + def size(self): + return len(self.buffer) + + def is_full(self): + return len(self.buffer) == self.buffer_size + + def clear(self): + self.buffer.clear() + pass + + +# Five Hidden Layer +class NeuralNetwork: + def __init__(self, input_dim, output_dim, hidden_dim): + self.input_dim = input_dim + self.output_dim = output_dim + self.hidden_dim = hidden_dim + + # Initialize weights and biases for each agents + self.weights = {} + self.biases = {} + + # Create separate networks for each agent + self.networks = [] + # Assuming each agent has its own output + for agent in range(output_dim): + network = { + 'input_hidden': np.random.randn(input_dim, hidden_dim[0]), + # Single output per agent + 'hidden_output': np.random.randn(hidden_dim[-1], 1) + } + self.weights[agent] = network + self.biases[agent] = { + 'input_hidden': np.zeros((1, hidden_dim[0])), + 
'hidden_output': np.zeros((1, 1)) + } + self.networks.append(network) + + # Forward pass through the Q-Network for the specified agent + def forward(self, state, agent): + x = state + for layer in range(self.num_hidden_layers): + x = np.dot(x, self.weights[agent]['input_hidden']) + self.biases[agent]['input_hidden'] + x = np.maximum(0, x) #ReLU Activation + + q_values = np.dot(x, self.weights[agent]['hidden_output']) + self.biases[agent]['hidden_output'] + return q_values + + def get_weights(self, agent): + return self.weights[agent], self.biases[agent] + + def set_weights(self, agent, weights, biases): + self.weights[agent] = weights + self.biases[agent] = biases + + ''' + # Deciding Rewards & Punishment + def Rewards(self, location): + self.location = location + if location == 'Target': + print("Reached Target") + return 100 + + elif location == "Home": + print("Reached Home") + return 200 + + # elif location != "Home" and location != "Target": + elif location != "Home" or location != "Target": + return -300 + + else: + return -1 + ''' + + +class Perceptron: + def __init__(self, data, hidden_dim, weights = None): + print("Perceptron with Hidden Layer Init Ran") + random.shuffle(data) + + inputs = np.array([[float(x) for x in row[0:-1]] for row in data]) + self.inputs = np.hstack((inputs, [[1]] * len(inputs))) # Append 1 to each input row for bias weights + self.outputs = np.array([float(row[-1]) for row in data]) + self.num_inputs = len(self.inputs[0]) + + self.hidden_dim = hidden_dim + if weights is None: + weights = np.array([random.uniform(0, 100) for _ in range(self.num_inputs)]) + weights[-1] = -1 # Set Initial value of bias weights + self.weights = weights + self.hidden_weights = np.array([random.uniform(0, 100) for _ in range(hidden_dim)]) + self.error = float(sys.maxsize) + self.smallest_error = self.error + self.best_weights = self.weights + self.fit_history = [] + + def predict(self, x_i): + hidden_activation = np.dot(x_i, self.hidden_weights) + hidden_output = np.maximum(hidden_activation, 0) # ReLU Activation + combined = np.hstack((hidden_output, [1])) # Append 1 for bias weights + output = np.dot(combined, self.weights) + if output > 0: + return output + else: + return 0.01 * output + + def fit(self, state, lr=0.5, num_iters=100, break_soon=True): + error_list = [] + for iter in range(num_iters): + total_error = 0.0 + for i in range(len(self.outputs)): + pred = self.predict(self.inputs[i]) + error = self.outputs[i] - pred + + self.state = state + reward = WorldEnvironment.step() # Called Reward function + reward += reward + print("Total Rewards: ", reward) + + self.weights = self.weights + lr * error * self.inputs[i] + total_error += abs(error) + + self.save_best_fit(self.weights, total_error) + if break_soon: + if total_error == 0.0: + break + self.print_weights() + error_list.append(total_error) + + self.fit_history = error_list + self.error = total_error + + def save_best_fit(self, w, e): + if e < self.smallest_error: + self.smallest_error = e + self.best_weights = w + + def print_weights(self): + print("\t".join(map(str, self.weights)), file=sys.stderr) + + def test(self): + e = 0.0 + for i in range(len(self.inputs)): + pred = self.predict(self.inputs[i]) + e += self.outputs[i] - pred + print(e, file=sys.stdout) + + def __str__(self) -> str: + s = "Input (1 sample): {}\n".format(self.inputs[0]) + s += "Weights: {}\n".format(self.weights) + s += "Hidden Weights: {}\n".format(self.hidden_weights) + s += "Error: {}\n".format(self.error) + + +def main(): + # Creating The 
Environment + gridSize = (10, 10) + env = WorldEnvironment(gridSize) + + # Reset The Environment + env.reset() + + # Simulate a few steps in the environment + for _ in range(10): + # Random Action for 100 agents + actions = np.random.randint(0, 4, size=100) + rewards, newGrid = env.step(actions) + print("Rewards:", rewards) + print(newGrid) \ No newline at end of file diff --git a/mnest/test.py b/mnest/test.py new file mode 100644 index 0000000..c08ddf5 --- /dev/null +++ b/mnest/test.py @@ -0,0 +1,128 @@ +import numpy as np + +class DQN: + def __init__(self, input_dim, output_dim, hidden_dims): + self.input_dim = input_dim + self.output_dim = output_dim + self.hidden_dims = hidden_dims + self.num_hidden_layers = len(hidden_dims) + + # Initialize weights and biases for the Q-network + self.weights = {} + self.biases = {} + + # Create separate networks for each agent + self.networks = [] + for agent in range(output_dim): # Assuming each agent has its own output + network = { + 'input_hidden': np.random.randn(input_dim, hidden_dims[0]), + 'hidden_output': np.random.randn(hidden_dims[-1], 1) # Single output per agent + } + self.weights[agent] = network + self.biases[agent] = { + 'input_hidden': np.zeros((1, hidden_dims[0])), + 'hidden_output': np.zeros((1, 1)) + } + self.networks.append(network) + + def forward(self, state, agent): + # Forward pass through the Q-network for the specified agent + x = state + for layer in range(self.num_hidden_layers): + x = np.dot(x, self.weights[agent]['input_hidden']) + self.biases[agent]['input_hidden'] + x = np.maximum(0, x) # ReLU activation + + q_values = np.dot(x, self.weights[agent]['hidden_output']) + self.biases[agent]['hidden_output'] + return q_values + + def get_weights(self, agent): + return self.weights[agent], self.biases[agent] + + def set_weights(self, agent, weights, biases): + self.weights[agent] = weights + self.biases[agent] = biases + + + + + + + +class PerceptronWithHiddenLayer: + def __init__(self, data, hidden_dim, weights=None): + print("Perceptron with Hidden Layer Init Ran") + random.shuffle(data) + + inputs = np.array([[float(x) for x in row[0:-1]] for row in data]) + self.inputs = np.hstack((inputs, [[1]] * len(inputs))) # Append 1 to each input row for bias weight + self.outputs = np.array([float(row[-1]) for row in data]) + self.num_inputs = len(self.inputs[0]) + + self.hidden_dim = hidden_dim + if weights is None: + weights = np.array([random.uniform(0, 100) for _ in range(self.num_inputs)]) + weights[-1] = -1 # Set initial value of bias weights + self.weights = weights + self.hidden_weights = np.array([random.uniform(0, 100) for _ in range(hidden_dim)]) + self.error = float(sys.maxsize) + self.smallest_error = self.error + self.best_weights = self.weights + self.fit_history = [] + + def predict(self, x_i): + hidden_activation = np.dot(x_i, self.hidden_weights) + hidden_output = np.maximum(hidden_activation, 0) # ReLU activation + combined = np.hstack((hidden_output, [1])) # Append 1 for bias + output = np.dot(combined, self.weights) + if output > 0: + return output + else: + return 0.01 * output + + def fit(self, state, lr=0.5, num_iters=100, break_soon=True): + error_list = [] + for iter in range(num_iters): + total_error = 0.0 + for i in range(len(self.outputs)): + pred = self.predict(self.inputs[i]) + error = self.outputs[i] - pred + + self.state = state + reward = PerceptronWithHiddenLayer.reward(state) + reward += reward + print("Total rewards:", reward) + + self.weights = self.weights + lr * error * self.inputs[i] + 
total_error += abs(error) + + self.save_best_fit(self.weights, total_error) + if break_soon: + if total_error == 0.0: + break + self.print_weights() + error_list.append(total_error) + + self.fit_history = error_list + self.error = total_error + + def save_best_fit(self, w, e): + if e < self.smallest_error: + self.smallest_error = e + self.best_weights = w + + def print_weights(self): + print("\t".join(map(str, self.weights)), file=sys.stderr) + + def test(self): + e = 0.0 + for i in range(len(self.inputs)): + pred = self.predict(self.inputs[i]) + e += self.outputs[i] - pred + print(e, file=sys.stdout) + + def __str__(self) -> str: + s = "Inputs (1 sample): {}\n".format(self.inputs[0]) + s += "weights: {}\n".format(self.weights) + s += "hidden_weights: {}\n".format(self.hidden_weights) + s += "Error: {}\n".format(self.error) + return s
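# --- Illustrative usage sketch (reviewer addition, not part of the patch) ---
# A minimal way to exercise the DQN forward pass defined above, assuming a toy
# 5-dimensional state and 3 agents; the dimensions and the __main__ guard are
# assumptions for illustration only. Note that forward() reuses the same
# input->hidden weights on every hidden-layer iteration, so the shapes only line
# up cleanly when a single hidden layer is used (as here).
if __name__ == "__main__":
    dqn = DQN(input_dim=5, output_dim=3, hidden_dims=[8])
    state = np.random.rand(1, 5)  # one toy observation shared by all agents
    for agent in range(3):
        q_value = dqn.forward(state, agent)
        print("agent", agent, "Q-value:", q_value)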