In [1]:
!pip install pandas scikit-learn scipy numpy

Collecting scikit-learn
  Downloading scikit_learn-1.6.1-cp312-cp312-macosx_12_0_arm64.whl.metadata (31 kB)
Collecting joblib>=1.2.0 (from scikit-learn)
  Using cached joblib-1.4.2-py3-none-any.whl.metadata (5.4 kB)
Collecting threadpoolctl>=3.1.0 (from scikit-learn)
  Using cached threadpoolctl-3.5.0-py3-none-any.whl.metadata (13 kB)
Downloading scikit_learn-1.6.1-cp312-cp312-macosx_12_0_arm64.whl (11.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.2/11.2 MB[0m [31m67.4 MB/s[0m eta [36m0:00:00[0m
[?25hUsing cached joblib-1.4.2-py3-none-any.whl (301 kB)
Using cached threadpoolctl-3.5.0-py3-none-any.whl (18 kB)
Installing collected packages: threadpoolctl, joblib, scikit-learn
Successfully installed joblib-1.4.2 scikit-learn-1.6.1 threadpoolctl-3.5.0


In [2]:
import mesa
print(mesa.__version__)

3.1.3


In [5]:
from enum import Enum
from datetime import datetime, timedelta

# Menu table: maps each dish category to its selling "price" and its
# ingredient "cost". CustomerAgent reads "price" to compute the bill and
# ManagerAgent reads "cost" to compute food costs.
food_options = {
    "vegetarian": {"price": 20, "cost": 10},  # Cost is 50% of price (true for every entry)
    "meat": {"price": 30, "cost": 15},
    "gluten_free": {"price": 25, "cost": 12.5}
}

# Enum to track the status of customer orders
class OrderStatus(Enum):
    """Lifecycle stage of a single customer's order.

    A customer starts in WAITING, moves to ORDERED when a waiter takes the
    order, and to SERVED when the dish is delivered. PREPARING is declared
    but not assigned anywhere in the visible notebook.
    """
    WAITING = 0    # Customer hasn't ordered yet
    ORDERED = 1    # Order has been placed
    PREPARING = 2  # Food is being prepared
    SERVED = 3     # Food has been delivered

class Table:
    """A restaurant table that tracks which customers are seated at it.

    Attributes:
        table_id: Identifier supplied by the caller.
        seats: Seating capacity of the table.
        occupied_seats: Number of seats currently taken; kept equal to
            ``len(customers)``.
        customers: Customers currently seated, in seating order.
    """

    def __init__(self, table_id, seats=4):
        """Create an empty table.

        Args:
            table_id: Identifier for this table.
            seats: Seating capacity. Defaults to 4, the previously hard-coded
                value, so existing ``Table(table_id)`` calls are unaffected.
        """
        self.table_id = table_id
        self.seats = seats
        self.occupied_seats = 0
        self.customers = []

    def is_available(self):
        """Return True while at least one seat is free."""
        return self.occupied_seats < self.seats

    def add_customer(self, customer):
        """Seat ``customer`` at this table.

        Sets ``customer.table`` to this table on success.

        Returns:
            True if the customer is seated here after the call (including the
            case where they already were), False if the table is full.
        """
        # Seating the same customer twice must not consume a second seat or
        # leave a duplicate entry that remove_customer would only half undo.
        if customer in self.customers:
            return True
        if not self.is_available():
            return False
        self.customers.append(customer)
        self.occupied_seats += 1
        customer.table = self
        return True

    def remove_customer(self, customer):
        """Free the seat held by ``customer``; no-op if they are not seated here."""
        if customer not in self.customers:
            return
        self.customers.remove(customer)
        self.occupied_seats -= 1
        customer.table = None


In [None]:
class ShiftType(Enum):
    """Kinds of waiter shift; each member's value is the shift length in hours.

    Shift reads ``.value`` directly as its duration.
    """
    FULL = 8    # 8-hour shift
    HALF = 4    # 4-hour shift

class Shift:
    """A contiguous block of working time for a waiter, with its pay.

    Attributes:
        start_time: Datetime at which the shift begins (inclusive).
        duration: Length of the shift in hours, taken from ``shift_type.value``.
        end_time: Datetime at which the shift ends (exclusive).
        hourly_wage: Pay per hour (the original comment gives euros).
        total_wage: ``duration * hourly_wage``.
    """

    def __init__(self, start_time, shift_type, hourly_wage=25):
        """Create a shift.

        Args:
            start_time: Datetime at which the shift starts.
            shift_type: Object whose ``.value`` is the shift length in hours
                (a ``ShiftType`` member in this notebook).
            hourly_wage: Pay per hour. Defaults to 25, the previously
                hard-coded rate.
        """
        self.start_time = start_time
        self.duration = shift_type.value      # Hours
        self.end_time = start_time + timedelta(hours=self.duration)
        self.hourly_wage = hourly_wage
        self.total_wage = self.duration * self.hourly_wage

    def is_active(self, current_time):
        """Return True if ``start_time <= current_time < end_time``."""
        return self.start_time <= current_time < self.end_time

In [6]:
import mesa
import random

class CustomerAgent(mesa.Agent):
    """A diner who occupies a seat, orders one dish, waits, pays and leaves.

    Args:
        model: The owning model. Mesa 3 agents are constructed with the model
            only (the model registers the agent and assigns its id), so the
            previous zero-argument constructor could not run. This class reads
            ``model.current_time`` and calls ``model.remove_customer``.
    """

    def __init__(self, model):
        super().__init__(model)

        # Draw from the model's RNG (self.random) instead of the global
        # ``random`` module so that a seeded model gives reproducible runs.
        self.food_preference = self.random.choice(list(food_options))
        self.bill = 0                                 # Amount to pay for food
        self.waiting_time = 0                         # Steps spent not yet served
        self.order_status = OrderStatus.WAITING       # Current order status
        self.satisfaction = 0                         # Overall satisfaction (0-100)
        self.tip = 0                                  # Amount of tip given
        self.rating = 0                               # Rating given (1-5)
        self.assigned_waiter = None                   # Waiter who took the order

        # Table assignment and timing
        self.table = None                             # Set by Table.add_customer
        self.arrival_time = None                      # Set by the model on arrival
        self.dining_duration = self.random.randint(30, 90)  # Minutes before leaving

    def step(self):
        """Accumulate waiting time until served; afterwards leave once done dining."""
        if self.order_status != OrderStatus.SERVED:
            # waiting_time counts calls to step(), not minutes.
            self.waiting_time += 1
            return

        # Nothing to time if the customer was never seated or stamped on arrival.
        if self.table is None or self.arrival_time is None:
            return

        # timedelta has no total_minutes(); convert from seconds instead.
        elapsed = self.model.current_time - self.arrival_time
        if elapsed.total_seconds() / 60 >= self.dining_duration:
            self.leave_restaurant()

    def order_dish(self, waiter):
        """Place this customer's order with ``waiter``.

        Returns:
            The ordered food type, or None if an order was already placed.
        """
        if self.order_status != OrderStatus.WAITING:
            return None
        self.order_status = OrderStatus.ORDERED
        self.assigned_waiter = waiter
        self.bill = food_options[self.food_preference]["price"]
        return self.food_preference

    def rate_and_pay(self):
        """Derive satisfaction, rating and tip from the waiting time.

        Returns:
            The total payment: bill plus tip.
        """
        self.satisfaction = max(0, 100 - self.waiting_time)
        self.rating = max(1, self.satisfaction / 20)    # Satisfaction 0-100 -> rating 1-5
        tip_percentage = self.satisfaction / 200        # 0-50% of the bill
        self.tip = self.bill * tip_percentage
        return self.bill + self.tip

    def leave_restaurant(self):
        """Rate the visit, free the seat and drop out of the simulation.

        Does nothing if the customer is not seated at a table.
        """
        if self.table:
            # The original called an undefined rate_experience(); rate_and_pay
            # is the rating method this class actually provides.
            self.rate_and_pay()
            self.table.remove_customer(self)
            self.model.remove_customer(self)
            # Deregister from the Mesa model so departed customers do not
            # accumulate in the model's agent registry.
            self.remove()

In [7]:
class WaiterAgent(mesa.Agent):
    """A waiter who works an assigned shift, takes orders and serves dishes.

    Args:
        model: The owning model. Mesa 3 agents are constructed with the model
            only, so the previous zero-argument constructor could not run.
            This class reads ``model.current_time``.

    NOTE(review): nothing in the visible notebook calls take_order or
    serve_dish - confirm where order dispatch is meant to happen.
    """

    def __init__(self, model):
        super().__init__(model)

        # Service state and performance counters
        self.busy = False                # True while holding an unserved order
        self.current_orders = []         # List of (customer, order) tuples
        self.tips = 0                    # Total tips received
        self.total_sales = 0             # Sum of bills served (tips excluded)
        self.avg_rating = 0              # Running mean of customer ratings
        self.ratings_count = 0           # Number of ratings received
        self.served_customers = 0        # Total customers served

        # Shift-related state
        self.shift = None             # Current assigned shift
        self.total_hours = 0          # Hours worked (not updated in this class)
        self.earnings = 0             # Total earnings (wages + tips)
        self.active = False           # Whether waiter is currently on shift

    def assign_shift(self, shift):
        """Assign ``shift`` to the waiter and credit its full wage up front."""
        self.shift = shift
        self.earnings += shift.total_wage

    def step(self):
        """Refresh ``active`` from the assigned shift and the model clock."""
        now = self.model.current_time
        self.active = bool(self.shift and self.shift.is_active(now))
        if not self.active:
            self.busy = False  # Off shift: busy flag is cleared

    def take_order(self, customer):
        """Take ``customer``'s order if on shift and not already busy.

        Returns:
            True if an order was taken, False otherwise (off shift, busy, or
            the customer had already ordered).
        """
        if not self.active or self.busy:
            return False
        order = customer.order_dish(self)
        if not order:
            return False
        self.current_orders.append((customer, order))
        self.busy = True
        return True

    def serve_dish(self, customer):
        """Serve ``customer``'s pending order and record payment and rating.

        Does nothing if this waiter holds no order for ``customer``.
        """
        remaining = [(c, o) for c, o in self.current_orders if c != customer]
        if len(remaining) == len(self.current_orders):
            return

        customer.order_status = OrderStatus.SERVED
        self.current_orders = remaining
        self.busy = len(remaining) > 0
        self.served_customers += 1

        # rate_and_pay() sets customer.tip and customer.rating as a side effect.
        customer.rate_and_pay()
        # Use the bill directly: (bill + tip) - tip is subject to float round-off.
        self.total_sales += customer.bill
        self.tips += customer.tip
        # earnings is documented as wages + tips, so tips are credited here too.
        self.earnings += customer.tip
        self.ratings_count += 1
        # Incremental mean: avoids storing every individual rating.
        self.avg_rating += (customer.rating - self.avg_rating) / self.ratings_count

In [8]:
class ManagerAgent(mesa.Agent):
    """Restaurant manager: owns the shift plan, the food stock and daily stats.

    Args:
        model: The owning model. Mesa 3 agents are constructed with the model
            only; the Mesa 2 ``(unique_id, model)`` form fails under the Mesa
            version this notebook prints. ``model.opening_time`` must already
            be set when the manager is created.
    """

    INITIAL_STOCK = 100  # Units of each food type on hand at start

    def __init__(self, model):
        super().__init__(model)

        # Current stock per food type
        self.food_inventory = {food: self.INITIAL_STOCK for food in food_options}
        # Everything ever put into stock (initial + replenishments). Usage is
        # received - on hand, which stays correct after order_food() calls.
        self._stock_received = dict(self.food_inventory)

        # Track daily statistics
        self.daily_stats = {
            'total_customers': 0,        # Customers currently in the restaurant
            'avg_waiting_time': 0,       # Mean waiting_time of current customers
            'active_waiters': 0,         # Number of waiters currently on shift
            'revenue': 0,                # Sales plus tips
            'food_costs': 0,             # Cost of ingredients used
            'staff_wages': 0,            # Wages of all assigned shifts
            'staff_tips': 0,             # Tips earned by staff
            'total_costs': 0,            # food_costs + staff_wages
            'profit': 0                  # revenue - total_costs
        }

        # Shift start times as offsets from opening. The previous offsets (8h
        # and 12h) made the afternoon full shift run 8h from 19:00 and put the
        # evening half shift wholly after the 23:00 close that RestaurantModel
        # sets, so the staff assigned to it were paid but never on shift.
        morning_start = model.opening_time
        afternoon_start = morning_start + timedelta(hours=ShiftType.HALF.value)
        evening_start = morning_start + timedelta(hours=ShiftType.FULL.value)

        # Shift patterns (clock times below assume the model's 11:00 opening)
        self.shifts = {
            'morning_full': Shift(morning_start, ShiftType.FULL),      # 11:00 - 19:00
            'morning_half': Shift(morning_start, ShiftType.HALF),      # 11:00 - 15:00
            'afternoon_full': Shift(afternoon_start, ShiftType.FULL),  # 15:00 - 23:00
            'afternoon_half': Shift(afternoon_start, ShiftType.HALF),  # 15:00 - 19:00
            'evening_half': Shift(evening_start, ShiftType.HALF)       # 19:00 - 23:00
        }

    def assign_shifts(self):
        """Randomly hand out the day's ten shift slots to the model's waiters.

        Slots: 3 morning-full, 2 morning-half, 3 afternoon-full and 2
        evening-half, intended to cover the peak hours (12:00-14:00 and
        17:00-20:00). Shuffles ``model.waiters`` in place. With more than ten
        waiters the surplus get no shift; with fewer, trailing slots stay
        unfilled.
        """
        available_shifts = []

        # Morning shifts (covering lunch rush)
        available_shifts.extend([self.shifts['morning_full']] * 3)  # 3 full-time morning staff
        available_shifts.extend([self.shifts['morning_half']] * 2)  # 2 part-time lunch staff

        # Afternoon/Evening shifts (covering dinner rush)
        available_shifts.extend([self.shifts['afternoon_full']] * 3)  # 3 full-time evening staff
        available_shifts.extend([self.shifts['evening_half']] * 2)    # 2 part-time dinner staff

        # Use the model's seeded RNG so the assignment is reproducible.
        self.random.shuffle(self.model.waiters)
        for waiter, shift in zip(self.model.waiters, available_shifts):
            waiter.assign_shift(shift)

    def step(self):
        """Refresh headcount stats; update wages hourly and profit at closing."""
        customers = self.model.customers
        self.daily_stats['total_customers'] = len(customers)
        self.daily_stats['active_waiters'] = sum(1 for w in self.model.waiters if w.active)

        # Plain arithmetic mean: avoids depending on numpy, which is only
        # imported in a later cell. Left unchanged when nobody is present.
        if customers:
            self.daily_stats['avg_waiting_time'] = (
                sum(c.waiting_time for c in customers) / len(customers)
            )

        # Recompute staff costs on the hour
        if self.model.current_time.minute == 0:
            self.calculate_staff_costs()

        # Compute profit exactly at closing time
        if self.model.current_time == self.model.closing_time:
            self.calculate_daily_profit()

    def calculate_staff_costs(self):
        """Store total shift wages and total tips across all waiters."""
        # Full wage of every assigned shift, whether or not it is active now
        self.daily_stats['staff_wages'] = sum(
            w.shift.total_wage for w in self.model.waiters if w.shift
        )
        self.daily_stats['staff_tips'] = sum(
            w.tips for w in self.model.waiters
        )

    def calculate_daily_profit(self):
        """Compute revenue, food costs, total costs and profit into daily_stats."""
        # Refresh wages and tips first so profit never uses stale staff figures.
        self.calculate_staff_costs()

        # NOTE(review): tips are counted as revenue but not as a cost, although
        # they are also reported as staff_tips - confirm this is intended.
        total_sales = sum(w.total_sales for w in self.model.waiters)
        self.daily_stats['revenue'] = total_sales + self.daily_stats['staff_tips']

        # Units used = units received - units still on hand
        self.daily_stats['food_costs'] = sum(
            (self._stock_received[food] - on_hand) * food_options[food]['cost']
            for food, on_hand in self.food_inventory.items()
        )

        self.daily_stats['total_costs'] = (
            self.daily_stats['food_costs'] +
            self.daily_stats['staff_wages']
        )

        self.daily_stats['profit'] = (
            self.daily_stats['revenue'] -
            self.daily_stats['total_costs']
        )

    def order_food(self, food_type, amount):
        """Add ``amount`` units of ``food_type`` to stock; unknown types are ignored."""
        if food_type in self.food_inventory:
            self.food_inventory[food_type] += amount
            self._stock_received[food_type] += amount

In [9]:
import numpy as np

class RestaurantModel(mesa.Model):
    """One simulated restaurant day with tables, customers, waiters and a manager.

    Args:
        n_waiters: Number of WaiterAgent instances to create.
        seed: Seed forwarded to ``mesa.Model``; all random draws in this class
            go through the model's own generators so a seed reproduces a run.
        n_tables: Number of tables. Defaults to 100, the previously
            hard-coded value.

    NOTE(review): nothing in the visible notebook calls
    WaiterAgent.take_order / serve_dish, so customers are seated but never
    served - confirm where order dispatch is meant to happen.
    """

    def __init__(self, n_waiters, seed=None, n_tables=100):
        super().__init__(seed=seed)

        # Restaurant operating hours (strptime without a date yields 1900-01-01)
        self.opening_time = datetime.strptime("11:00", "%H:%M")
        self.closing_time = datetime.strptime("23:00", "%H:%M")
        self.current_time = self.opening_time
        self.time_step = 5            # Each step represents 5 minutes

        # Restaurant layout
        self.tables = [Table(i) for i in range(n_tables)]
        self.customers = []           # Customers currently in the restaurant

        # Staff. Mesa 3 agents take only the model and receive their ids from
        # it; Model.next_id() no longer exists. Each agent is created exactly
        # once (the original built the manager and the waiters twice).
        self.manager = ManagerAgent(self)
        self.waiters = [WaiterAgent(self) for _ in range(n_waiters)]

        # step() collects data every tick, so the collector must exist.
        self.datacollector = mesa.DataCollector(
            model_reporters={
                "customers": lambda m: len(m.customers),
                "active_waiters": lambda m: m.manager.daily_stats['active_waiters'],
                "avg_waiting_time": lambda m: m.manager.daily_stats['avg_waiting_time'],
                "revenue": lambda m: m.manager.daily_stats['revenue'],
                "profit": lambda m: m.manager.daily_stats['profit'],
            }
        )

        # Assign initial shifts
        self.manager.assign_shifts()

    def is_peak_hour(self):
        """Return True during the 12:00-14:00 and 17:00-20:00 peak windows.

        The upper bounds are exclusive so the windows match the peak hours
        documented on ManagerAgent.assign_shifts; the previous inclusive test
        extended each window by a full hour.
        """
        hour = self.current_time.hour
        return (12 <= hour < 14) or (17 <= hour < 20)

    def calculate_new_customers(self):
        """Draw the number of arrivals for this step from a Poisson distribution.

        The mean is 5 during peak hours and 2 otherwise.
        """
        base_rate = 5 if self.is_peak_hour() else 2
        # self.rng is the model's seeded numpy Generator (Mesa >= 3.1); the
        # global np.random state would ignore the model seed.
        return int(self.rng.poisson(base_rate))

    def find_available_table(self):
        """Return a random table with a free seat, or None if all are full."""
        available_tables = [t for t in self.tables if t.is_available()]
        if not available_tables:
            return None
        return self.random.choice(available_tables)

    def add_new_customers(self):
        """Create this step's arrivals and seat each one while seats remain.

        Arrivals that find no free seat are not created.
        """
        for _ in range(self.calculate_new_customers()):
            table = self.find_available_table()
            if table is None:
                continue
            customer = CustomerAgent(self)
            customer.arrival_time = self.current_time
            table.add_customer(customer)
            self.customers.append(customer)

    def remove_customer(self, customer):
        """Stop tracking ``customer``; no-op if they are not tracked."""
        if customer in self.customers:
            self.customers.remove(customer)

    def step(self):
        """Advance the simulation by one time step of ``time_step`` minutes."""
        # Update time
        self.current_time += timedelta(minutes=self.time_step)

        # Accept arrivals only during opening hours
        if self.opening_time <= self.current_time <= self.closing_time:
            self.add_new_customers()

        self.datacollector.collect(self)

        # Step every agent once, in random order. The list is a snapshot, so
        # customers removing themselves mid-loop do not disturb iteration.
        agents = self.customers + self.waiters + [self.manager]
        self.random.shuffle(agents)
        for agent in agents:
            agent.step()

        # Stop the run at closing time
        if self.current_time >= self.closing_time:
            self.running = False

        # Reset shifts at the start of each day
        if self.current_time.hour == 11 and self.current_time.minute == 0:
            self.manager.assign_shifts()