In [None]:
import numpy as np
import random
%matplotlib inline
from IPython.display import HTML
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.animation import FuncAnimation

In [None]:
class Trader:
    """Represents a single trader on the grid."""  
    BUYER, INACTIVE, SELLER = 1, 0, -1 
    def __init__(self):
        self.state = self.INACTIVE

    def activate(self, new_state: int):
        self.state = new_state
    
    def deactivate(self):
        self.state = self.INACTIVE


In [None]:
class Market:
    """Represents a grid of traders."""
    def __init__(self, width: int, height: int):
        self.width = width
        self.height = height
        self.grid = np.array([[Trader() for _ in range(width)] for _ in range(height)])

    def random_activation(self, p: float):
        """Randomly activate a fraction p of traders in the grid."""
        BUYER, SELLER = 1, -1
        total_traders = self.width * self.height
        num_active_traders = int(p * total_traders)

        indices = np.random.choice(total_traders, num_active_traders, replace=False)
        for index in indices:
            x, y = divmod(index, self.width)
            self.grid[x, y].activate(np.random.choice([BUYER, SELLER]))

    def get_neighbors(self, x: int, y: int):
        """Get the neighbors of a cell at position (x, y) using Von Neumann neighborhood."""
        directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        return [
            ((x + dx) % self.height, (y + dy) % self.width)
            for dx, dy in directions
        ]

        return [
            (nx, ny)
            for dx, dy in neumann_directions
            if 0 <= (nx := x + dx) < self.height and 0 <= (ny := y + dy) < self.width
        ]

    def retrieve_trader_states(self):
        """Retrieve the current states of all traders in the grid."""
        return np.vectorize(lambda trader: trader.state)(self.grid)

    def update_trader_states(self, updates):
        """Apply updates to the market."""
        for x, y, new_state in updates:
            self.grid[x, y].state = new_state


In [37]:
m1 = Market(20,20)
m1.retrieve_trader_states().shape

(20, 20)

In [11]:
class PercolationDynamics:
    def __init__(self, market, ph: float, pe: float, pd: float):
        self.market = market
        self.ph = ph  # Probability to activate inactive neighbors
        self.pe = pe  # Probability to activate an inactive trader
        self.pd = pd  # Probability to deactivate a trader

    def trader_update(self, grid, x: int, y: int):
        """
        Determine the state updates for a trader at position (x, y).
        Returns a list of updates [(x, y, new_state)].
        """
        BUYER, INACTIVE, SELLER = 1, 0, -1
        updates = []
        trader = grid[x][y]

        if trader.state == INACTIVE:
            if np.random.random() < self.pe:
                new_state = np.random.choice([BUYER, SELLER])
                updates.append((x, y, new_state))
        else:
            neighbors = self.market.get_neighbors(x, y)
            inactive_neighbors = [
                (nx, ny) for nx, ny in neighbors if grid[nx][ny].state == INACTIVE
            ]

            # Activate inactive neighbors with a probability `ph`
            updates.extend(
                (nx, ny, np.random.choice([BUYER, SELLER]))
                for nx, ny in inactive_neighbors
                if np.random.random() < self.ph
            )

            # Deactivate the trader with a probability proportional to inactive neighbors
            if np.random.random() < self.pd * len(inactive_neighbors):
                updates.append((x, y, INACTIVE))

        return updates

    def log_all_updates(self):
        grid = self.market.grid
        all_updates = []

        for x in range(self.market.height):
            for y in range(self.market.width):
                all_updates.extend(self.trader_update(grid, x, y))

        return all_updates


In [109]:
def animate_market(market, dynamics, num_steps=100):
    """
    Animates the market dynamics over a given number of steps.

    Args:
        market (Market): The Market instance representing the grid of traders.
        dynamics (PercolationDynamics): The dynamics controlling the trader behavior.
        num_steps (int): Number of steps to animate.

    Returns:
        HTML: HTML object containing the animation video.
    """
    # Initialize the grid states for animation
    grid_states = []

    # Set up the figure and axes for grid and line plots
    fig, (ax, ax_line) = plt.subplots(1, 2, figsize=(16, 8))

    # Initialize grid plot with the correct shape
    initial_state = market.retrieve_trader_states()
    
    grid_plot = ax.imshow(
        initial_state, cmap="coolwarm", vmin=-1, vmax=1
    )
    ax.set_title("Cellular Automata (Grid)")
    ax.axis("off")

    # Set up the line plot for tracking buyers, sellers, and total participants
    buyers_line, = ax_line.plot([], [], label="Buyers (1)", color="blue")
    sellers_line, = ax_line.plot([], [], label="Sellers (-1)", color="red")
    inactive_line, = ax_line.plot([], [], label="Inactive (0)", color="gray")

    ax_line.set_title("Market Participation Over Time")
    ax_line.set_xlim(0, num_steps)
    ax_line.set_ylim(0, market.width * market.height)  # Max total participants
    ax_line.set_xlabel("Steps")
    ax_line.set_ylabel("Count")
    ax_line.legend()

    # Arrays to store counts over time
    buyers_counts = []
    sellers_counts = []
    inactive_counts = []

    def update(frame):
        """Update function for each animation frame."""
        # Log updates and apply them to the market
        updates = dynamics.log_all_updates()
        market.update_trader_states(updates)
        grid_state = market.retrieve_trader_states()
        grid_states.append(grid_state)
        grid_plot.set_array(grid_state)

        buyers_count = np.sum(grid_state == 1)
        sellers_count = np.sum(grid_state == -1)
        inactive_count = (market.width * market.height) - (buyers_count + sellers_count)

        buyers_counts.append(buyers_count)
        sellers_counts.append(sellers_count)
        inactive_counts.append(inactive_count)

        buyers_line.set_data(np.arange(len(buyers_counts)), buyers_counts)
        sellers_line.set_data(np.arange(len(sellers_counts)), sellers_counts)
        inactive_line.set_data(np.arange(len(inactive_counts)), inactive_counts)

        max_count = max(max(buyers_counts), max(sellers_counts), max(inactive_counts)) 
        ax_line.set_ylim(0, max(max_count, 1.1 * (market.width * market.height)) / 2)
        return grid_plot, buyers_line, sellers_line, inactive_line



    ani = FuncAnimation(fig, update, frames=num_steps, interval=200, blit=True)
    # Display the animation in the notebook
    plt.close(fig)  # Prevent duplicate static figures in notebooks
    return HTML(ani.to_html5_video())


In [112]:
pd = 0.05
pe = 0.0001
ph = 0.0485
width, height = 20, 20
market = Market(width, height)
market.random_activation(0.75)
dynamics = PercolationDynamics(market, ph=ph, pd=pd, pe=pe)
animate_market(market, dynamics, 100)