In [None]:
!pip install pygame
!pip install numpy
!pip install tensorflow
!pip install keras




In [None]:
import pygame
import random
import numpy as np
import tensorflow as tf
from tensorflow import keras

# --- Simulation constants ---------------------------------------------------
# Window dimensions in pixels.
WIDTH = 300
HEIGHT = 300

# Starting radius of a cell.
CELL_RADIUS = 1

# RGB colours.
BG_COLOR = (0, 0, 0)
CELL_COLOR = (0, 255, 0)
FOOD_COLOR = (255, 0, 0)
BOUNDARY_COLOR = (255, 255, 255)

# Half-width of the square vision field: it reaches this many pixels in each
# direction, so the field is (2 * VISION_FIELD_SIZE + 1) cells on a side.
VISION_FIELD_SIZE = 20

# Radius given to every food particle.
FOOD_SIZE = 3

# Point size of the label font.
FONT_SIZE = 20

# Reward subtracted for every frame a cell is processed in the main loop.
LIVING_PENALTY = 0.01

# --- Pygame start-up --------------------------------------------------------
# Pygame has to be initialised before the display and font objects are made.
pygame.init()

# Main window surface that everything is drawn onto.
screen = pygame.display.set_mode((WIDTH, HEIGHT))
pygame.display.set_caption("Deep Q-Learning for Cell Navigation")

# Default Pygame font used for the cell labels.
font = pygame.font.Font(None, FONT_SIZE)


import socket
import json

# Define the server and port for the dashboard
DASHBOARD_HOST = 'localhost'
DASHBOARD_PORT = 12345


class _NullDashboardSocket:
    """No-op stand-in used when the dashboard cannot be reached.

    It exposes the two socket methods this script calls (``sendall`` and
    ``close``) so the rest of the program runs unchanged without a dashboard.
    """

    def sendall(self, data):
        """Discard *data*."""

    def close(self):
        """Do nothing; there is no connection to release."""


def _connect_to_dashboard(host, port):
    """Open a TCP connection to the dashboard.

    Returns the connected socket, or a ``_NullDashboardSocket`` when the
    connection attempt fails with ``OSError`` (for example because nothing is
    listening), so that a missing dashboard does not abort the simulation.
    """
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, port))
    except OSError as err:
        # Release the half-open socket before falling back.
        if sock is not None:
            sock.close()
        print(f"Dashboard unavailable at {host}:{port} ({err}); continuing without it.")
        return _NullDashboardSocket()
    return sock


# Create a socket to connect to the dashboard
dashboard_socket = _connect_to_dashboard(DASHBOARD_HOST, DASHBOARD_PORT)

# Function to send data to the dashboard
def send_data_to_dashboard(tstamp, loss_data, visual_field_data):
    """Send one frame of training data to the dashboard as UTF-8 encoded JSON.

    Args:
        tstamp: Value stored under the ``timestamp`` key.
        loss_data: Indexable holding at least four loss values.
        visual_field_data: Indexable holding at least four objects that
            provide ``tolist()`` (NumPy arrays in this script).

    The message is written to the module-level ``dashboard_socket`` and then
    echoed to stdout.
    """
    loss_keys = ('loss_1', 'loss_2', 'loss_3', 'loss_4')
    image_keys = ('image_1', 'image_2', 'image_3', 'image_4')

    data = {'timestamp': tstamp}
    for slot, key in enumerate(loss_keys):
        data[key] = loss_data[slot]
    for slot, key in enumerate(image_keys):
        # Arrays are not JSON serialisable; convert them to nested lists.
        data[key] = visual_field_data[slot].tolist()

    payload = json.dumps(data).encode('utf-8')
    dashboard_socket.sendall(payload)

    print("sending:")
    print(data)


# Food class
class Food:
    """A food particle at a fixed position, drawn as a filled circle."""

    def __init__(self, x, y):
        """Place the particle at (x, y) with radius ``FOOD_SIZE``."""
        self.x, self.y = x, y
        self.radius = FOOD_SIZE

    def draw(self):
        """Draw the particle on the module-level ``screen`` in ``FOOD_COLOR``."""
        centre = (self.x, self.y)
        pygame.draw.circle(screen, FOOD_COLOR, centre, self.radius)

# Cell class
class Cell:
    """An agent that moves on the screen and learns with a deep Q-network.

    Each cell owns an online Q-network, a target network, a bounded replay
    memory and a square vision field of side ``2 * VISION_FIELD_SIZE + 1``
    centred on the cell. Vision field values are 1 for food, -1 for other
    cells and for positions outside the screen, and 0 otherwise.
    """

    # (dx, dy) applied for each action index: up, down, left, right.
    _MOVES = {0: (0, -4), 1: (0, 4), 2: (-4, 0), 3: (4, 0)}
    # Maximum number of transitions kept in the replay memory.
    _MEMORY_CAPACITY = 2500

    def __init__(self, x, y, color, mother_cell=True):
        """Create a cell at (x, y).

        Args:
            x, y: Initial position in pixels.
            color: RGB tuple used for drawing and for naming the weights file.
            mother_cell: When False, the networks are initialised from the
                weights file ``cell_brain_(r, g, b).h5``.
        """
        self.x = x
        self.y = y
        self.size = CELL_RADIUS
        self.color = color
        self.vision_field = np.zeros((2 * VISION_FIELD_SIZE + 1, 2 * VISION_FIELD_SIZE + 1))
        self.actions = 4
        self.q_network = self.build_q_network()
        self.target_network = self.build_q_network()
        self.update_target_network()
        self.memory = []
        self.gamma = 0.6
        self.epsilon = 0.2
        self.batch_size = 32
        self.frames_without_food = 0
        self.frames_to_survive = random.randint(200, 600)
        self.lifespan = 1000
        self.frames_alive = 0
        self.max_size = CELL_RADIUS * 3
        self.mother_cell = mother_cell
        self.label_text = font.render("Mother "+str(self.size), True, (255, 255, 255)) if mother_cell else font.render("Daughter", True, (255, 255, 255))

        # Load model weights if not a mother cell
        if not mother_cell:
            color_string = f"({self.color[0]}, {self.color[1]}, {self.color[2]})"
            weights_path = "cell_brain_" + color_string + ".h5"
            self.q_network.load_weights(weights_path)
            self.target_network.load_weights(weights_path)

    @staticmethod
    def _as_batch(states):
        """Return *states* as a float32 batch of shape (N, side, side, 1).

        Accepts a single 2-D vision field or a stack of them and adds the
        trailing channel axis declared by the network's ``Input`` layer.
        """
        side = 2 * VISION_FIELD_SIZE + 1
        return np.asarray(states, dtype=np.float32).reshape(-1, side, side, 1)

    def build_q_network(self):
        """Build and compile the convolutional Q-network (one output per action)."""
        model = keras.Sequential([
            keras.layers.Input(shape=(2 * VISION_FIELD_SIZE + 1, 2 * VISION_FIELD_SIZE + 1, 1)),
            keras.layers.Conv2D(32, (3, 3), activation='relu'),
            keras.layers.Flatten(),
            keras.layers.Dense(64, activation='relu'),
            keras.layers.Dense(self.actions)
        ])
        model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0001), loss='mse')
        return model

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_network.set_weights(self.q_network.get_weights())

    def act(self, state):
        """Choose an action index with an epsilon-greedy policy.

        With probability ``self.epsilon`` a random action is returned;
        otherwise the action with the highest predicted Q-value for *state*.
        """
        if np.random.rand() < self.epsilon:
            return random.choice(range(self.actions))
        # verbose=0 keeps Keras from printing a progress bar on every frame.
        q_values = self.q_network.predict(self._as_batch(state), verbose=0)
        return int(np.argmax(q_values))

    def remember(self, state, action, reward, next_state, done):
        """Append a transition to the replay memory, dropping the oldest when full."""
        self.memory.append((state, action, reward, next_state, done))
        if len(self.memory) > self._MEMORY_CAPACITY:
            self.memory.pop(0)

    def replay(self):
        """Fit the online network on one random mini-batch from the memory.

        Does nothing until the memory holds at least ``self.batch_size``
        transitions. The target for the taken action is the reward, plus the
        discounted maximum target-network Q-value of the next state unless the
        transition is marked done.
        """
        if len(self.memory) < self.batch_size:
            return
        samples = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*samples)
        states = self._as_batch(np.stack(states))
        next_states = self._as_batch(np.stack(next_states))
        q_values = self.q_network.predict(states, verbose=0)
        next_q_values = self.target_network.predict(next_states, verbose=0)

        rewards = np.asarray(rewards, dtype=np.float32)
        # 0 for terminal transitions so that they receive no bootstrap term.
        not_done = 1.0 - np.asarray(dones, dtype=np.float32)
        targets = rewards + self.gamma * next_q_values.max(axis=1) * not_done
        batch_rows = np.arange(self.batch_size)
        q_values[batch_rows, np.asarray(actions, dtype=int)] = targets

        history = self.q_network.fit(states, q_values, verbose=0, epochs=1)
        print(f"Training loss: {history.history['loss'][0]:.4f}")

    def move(self, action):
        """Move 4 pixels in the direction of *action* and clamp to the screen.

        Actions 0-3 mean up, down, left and right; any other value leaves the
        position unchanged apart from clamping.
        """
        dx, dy = self._MOVES.get(action, (0, 0))
        self.x += dx
        self.y += dy
        self.x = max(self.size, min(self.x, WIDTH - self.size))
        self.y = max(self.size, min(self.y, HEIGHT - self.size))

    def update_vision_field(self, foods, cells):
        """Recompute ``self.vision_field`` around the current position.

        Args:
            foods: Iterable of objects with ``x``, ``y`` and ``radius``.
            cells: Iterable of cells with ``x``, ``y`` and ``size``; this cell
                itself is skipped.

        The first array axis is the x offset and the second the y offset.
        Other cells take precedence over food, and every position outside the
        screen is marked -1.
        """
        offsets = np.arange(-VISION_FIELD_SIZE, VISION_FIELD_SIZE + 1)
        xs = self.x + offsets
        ys = self.y + offsets
        field = np.zeros((offsets.size, offsets.size))

        def mark(obj_x, obj_y, reach, value):
            # Axis-aligned square of half-width `reach` around the object.
            x_hit = np.abs(xs - obj_x) <= reach
            y_hit = np.abs(ys - obj_y) <= reach
            field[np.ix_(x_hit, y_hit)] = value

        # Check for food particles
        for food in foods:
            mark(food.x, food.y, food.radius, 1)

        # Check for other cells (written after food so they override it)
        for other_cell in cells:
            if other_cell is not self:
                mark(other_cell.x, other_cell.y, other_cell.size, -1)

        # Positions outside the screen boundaries are marked as a boundary
        x_inside = (xs >= 0) & (xs < WIDTH)
        y_inside = (ys >= 0) & (ys < HEIGHT)
        field[~(x_inside[:, None] & y_inside[None, :])] = -1

        self.vision_field = field

    def split(self):
        """Return a daughter cell once ``frames_alive`` reaches ``lifespan``.

        Resets ``frames_alive`` and spawns the daughter within ``size`` pixels
        of this cell; returns None when the cell is not yet due to split.
        """
        if self.frames_alive >= self.lifespan:
            self.frames_alive = 0
            # size becomes a float after feeding; randint needs integer bounds.
            reach = int(self.size)
            new_x = self.x + random.randint(-reach, reach)
            new_y = self.y + random.randint(-reach, reach)
            return Cell(new_x, new_y, self.color, mother_cell=False)
        return None

    def feed(self, food):
        """Eat *food* if it is within reach.

        Returns True and grows the cell by 0.1 (capped at 5) and its lifespan
        by 50 when the centre distance is below ``size + FOOD_SIZE``;
        otherwise returns False.
        """
        distance = ((self.x - food.x) ** 2 + (self.y - food.y) ** 2) ** 0.5
        if distance < self.size + FOOD_SIZE:
            self.size += 0.1  # Increase the size when feeding
            if self.size > 5:
                self.size = 5
            self.lifespan += 50
            return True
        return False

    def draw(self):
        """Draw the cell as a circle with a text label above it."""
        # Draw the cell with its current size
        pygame.draw.circle(screen, self.color, (self.x, self.y), self.size)

        # Draw the label text
        prefix = "Mother " if self.mother_cell else "Daughter "
        label = font.render(prefix + str(self.size), True, (255, 255, 255))
        screen.blit(label, (self.x - int(self.size), self.y - int(self.size) - FONT_SIZE))


# Create a list of cells and food particles
cell_colors = [(0, 255, 0)]
cells = [
    Cell(
        random.randint(CELL_RADIUS, WIDTH - CELL_RADIUS),
        random.randint(CELL_RADIUS, HEIGHT - CELL_RADIUS),
        color,
    )
    for color in cell_colors
]
daughter_cells = []  # daughter_cells (Created by reproduction)
foods = [
    Food(
        random.randint(CELL_RADIUS, WIDTH - CELL_RADIUS),
        random.randint(CELL_RADIUS, HEIGHT - CELL_RADIUS),
    )
    for _ in range(250)
]

# Placeholders for loss data and visual field data (four dashboard slots each)
loss_data = [0.0] * 4
# One independent array per slot: `[array] * 4` would put the same array
# object in every slot, so an in-place write to one would change all four.
visual_field_data = [
    np.zeros((2 * VISION_FIELD_SIZE + 1, 2 * VISION_FIELD_SIZE + 1))
    for _ in range(4)
]

# Thickness in pixels of the border rectangles drawn around the screen
BOUNDARY_WIDTH = 2
BOUNDARY_HEIGHT = 2

# Main loop
# Each frame: handle window events, let every cell observe, act and learn,
# draw the scene, and periodically stream loss / vision data to the dashboard.
running = True
episode = 0
frames_since_last_send = 0
# Cleared after the first failed send so a dead dashboard is not retried.
dashboard_alive = True
try:
    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False

        # Clear the screen
        screen.fill(BG_COLOR)
        # Draw boundaries
        pygame.draw.rect(screen, BOUNDARY_COLOR, (0, 0, WIDTH, BOUNDARY_WIDTH))  # Top boundary
        pygame.draw.rect(screen, BOUNDARY_COLOR, (0, HEIGHT - BOUNDARY_WIDTH, WIDTH, BOUNDARY_WIDTH))  # Bottom boundary
        pygame.draw.rect(screen, BOUNDARY_COLOR, (0, 0, BOUNDARY_WIDTH, HEIGHT))  # Left boundary
        pygame.draw.rect(screen, BOUNDARY_COLOR, (WIDTH - BOUNDARY_WIDTH, 0, BOUNDARY_WIDTH, HEIGHT))  # Right boundary

        frames_since_last_send += 1
        # Iterate through cells
        all_cells = cells + daughter_cells
        for i, cell in enumerate(all_cells):
            # Update the cell's vision field; pass every cell so daughters are visible too
            cell.update_vision_field(foods, all_cells)

            # Get the current state (cell's vision field)
            state = cell.vision_field

            # Choose an action using epsilon-greedy policy
            action = cell.act(state)

            # Save current position
            current_x, current_y = cell.x, cell.y

            # Move the cell
            cell.move(action)
            reward = 0
            # Check for boundary collision: every action moves the cell, so an
            # unchanged position means the move was clamped at the screen edge.
            if current_x == cell.x and current_y == cell.y:
                reward -= 0.2  # Negative reward for hitting the boundary

            # Eat every food particle within reach. The survivors are collected
            # into a new list because removing from `foods` while iterating it
            # skips the element that follows each removal.
            uneaten = []
            for food in foods:
                distance = ((cell.x - food.x) ** 2 + (cell.y - food.y) ** 2) ** 0.5
                if distance < (cell.size + FOOD_SIZE) / 2 + food.radius:
                    if cell.size < 5.1:
                        cell.size += 0.1  # Increase the size when feeding
                    cell.lifespan += 50
                    reward += 1.0
                    # Eating restarts the starvation countdown.
                    cell.frames_without_food = 0
                else:
                    uneaten.append(food)
            foods[:] = uneaten  # Remove eaten food in place

            # Living Penalty
            reward -= LIVING_PENALTY
            # Border dwelling penalty
            if (cell.x > WIDTH - 5 or cell.y > HEIGHT - 5 or cell.x < 5 or cell.y < 5):
                reward -= 0.1

            # NOTE: reproduction (splitting into daughter cells) is currently disabled.

            # Starvation check. It runs before the experience is stored so the
            # death penalty actually becomes part of the remembered reward.
            cell.frames_without_food += 1
            starved = cell.frames_without_food >= cell.frames_to_survive
            if starved:
                reward -= 2

            # Observe the world again after moving and eating so that
            # next_state differs from state.
            cell.update_vision_field(foods, all_cells)
            next_state = cell.vision_field

            # Remember the experience; starvation ends the episode for this cell
            cell.remember(state, action, reward, next_state, starved or len(foods) == 0)

            # Handle cell death: respawn at the centre with the initial size
            if starved:
                cell.frames_without_food = 0  # Reset the frame counter
                cell.size = 1
                # Integer division keeps the coordinates integral.
                cell.x = WIDTH // 2
                cell.y = HEIGHT // 2

            # Draw the cell
            cell.draw()

            # Replay and update target network
            if episode % 50 == 0:
                cell.replay()
            if episode % 1000 == 0:
                cell.update_target_network()

            # Save model data for daughter cells
            if cell.mother_cell and episode % 100 == 0:
                color_string = f"({cell.color[0]}, {cell.color[1]}, {cell.color[2]})"
                model_filename = f"cell_brain_{color_string}.h5"
                cell.q_network.save(model_filename)
                print(f"Saved a model checkpoint for daughter cell {color_string}")

            # Collect loss data and visual field data for the dashboard.
            # Only indices that fit the fixed-size dashboard slots are recorded.
            if cell.mother_cell and i < len(loss_data):
                # The model may have no `history` attribute before the first fit.
                fit_history = getattr(cell.q_network, 'history', None)
                recorded = getattr(fit_history, 'history', None) or {}
                if recorded.get('loss'):
                    print("Was true!")
                    loss_data[i] = recorded['loss'][0]
                    print(recorded['loss'][0])

                visual_field_data[i] = cell.vision_field
                print("loss data format in iteration")
                print(loss_data)

            # Spawn a new food particle periodically
            if episode % 10 == 0:
                new_food_x = random.randint(CELL_RADIUS, WIDTH - CELL_RADIUS)
                new_food_y = random.randint(CELL_RADIUS, HEIGHT - CELL_RADIUS)
                foods.append(Food(new_food_x, new_food_y))

            # Increase frames alive
            cell.frames_alive += 1

        # Draw food particles
        for food in foods:
            food.draw()

        # Update the display
        pygame.display.flip()

        # Send data to the dashboard every 3 frames
        if frames_since_last_send >= 3:
            print(loss_data)
            print("loss format")
            if dashboard_alive:
                try:
                    send_data_to_dashboard(episode, loss_data, visual_field_data)
                except OSError as err:
                    # A dropped dashboard connection must not stop the simulation.
                    print(f"Dashboard connection lost ({err}); no further data will be sent.")
                    dashboard_alive = False
            frames_since_last_send = 0

        # Cap the frame rate
        pygame.time.delay(50)

        episode += 1
finally:
    # Close the dashboard socket when done, even if the loop raised
    dashboard_socket.close()
    # Quit Pygame
    pygame.quit()

ConnectionRefusedError: ignored