Negotiation environment - simulates a buyer–seller interaction over a fixed number of rounds and captures essential elements such as turns, offers, and rewards

In [None]:
class Negotiation_RL_Agent(gym.Env):
    """Turn-based buyer/seller price negotiation environment.

    The negotiation opens at ``initial_selling_price`` with the buyer to move.
    On each step the acting side may accept (``1``), reject (``2``) or make a
    counteroffer (``(0, new_price)``). The episode ends on accept, on reject,
    or once ``max_rounds`` valid counteroffers have been exchanged.

    Observations are ``float32`` arrays ``[current_offer, round, turn,
    deal_status]`` where ``turn`` is 0 for the buyer and 1 for the seller.
    Rewards are returned as a dict with ``'buyer'`` and ``'seller'`` entries.
    """

    def __init__(self, max_rounds, buyer_max_amt,
                 seller_min_amt, initial_selling_price,
                 gamma_seller, gamma_buyer, max_increase_rate):
        """Store the negotiation parameters and declare the gym spaces.

        Parameters:
          max_rounds: Number of valid counteroffers after which the
            negotiation times out.
          buyer_max_amt: Upper limit for seller counteroffers and upper bound
            of the observed offer.
          seller_min_amt: Lower limit for buyer counteroffers and lower bound
            of the observed offer.
          initial_selling_price: Opening offer; seller counteroffers may not
            exceed it.
          gamma_seller: Multiplier applied to the seller's reward on accept.
          gamma_buyer: Multiplier applied to the buyer's reward on accept.
          max_increase_rate: Stored as an attribute only; the environment's
            own logic does not read it.
        """
        super().__init__()

        self.max_rounds = max_rounds
        self.buyer_max_amt = buyer_max_amt
        self.seller_min_amt = seller_min_amt
        self.initial_selling_price = initial_selling_price
        self.gamma_seller = gamma_seller
        self.gamma_buyer = gamma_buyer
        self.max_increase_rate = max_increase_rate

        # Define the observation space: [current_offer, round, turn, deal_status]
        low = np.array([seller_min_amt, 0, 0, 0], dtype=np.float32)
        high = np.array([buyer_max_amt, max_rounds, 1, 1], dtype=np.float32)
        self.observation_space = spaces.Box(low=low, high=high, dtype=np.float32)

        # Placeholder: real actions are 1, 2 or the tuple (0, new_price),
        # which Discrete(3) cannot fully describe.
        self.action_space = spaces.Discrete(3)

        self.init_buyer_price = 0
        self.final_price = 0

    def reset(self):
        """Start a new negotiation and return the initial observation."""
        self.round = 0

        # Initialize the current offer to the initial selling price.
        self.current_offer = self.initial_selling_price
        self.turn = 0
        self.deal_status = False

        # Clear per-episode bookkeeping so an accept early in this episode is
        # not scored against prices left over from the previous one.
        self.init_buyer_price = 0
        self.final_price = 0
        return self.get_observation()

    def step(self, action):
        """Apply one action for the side whose turn it is.

        Parameters:
          action: ``1`` to accept the current offer, ``2`` to reject and end
            the negotiation, or a tuple ``(0, new_price)`` to counteroffer.
            Anything else is penalised as unrecognized.

        Returns:
          ``(observation, reward, done, info)`` where ``reward`` is a dict
          with ``'buyer'`` and ``'seller'`` entries and ``info`` is empty.
        """
        done = False
        info = {}

        reward = {'buyer': 0, 'seller': 0}

        # With round == 1 the offer on the table is recorded as the buyer's
        # initial price, used as the baseline of the buyer's accept reward.
        if self.round == 1:
            self.init_buyer_price = self.current_offer

        # Check if maximum rounds are reached or not
        if self.round >= self.max_rounds:
            done = True
            reward['buyer'] = -10
            reward['seller'] = -10
            return self.get_observation(), reward, done, info

        if action == 1:

            # Accept the offer; negotiation ends
            done = True
            self.deal_status = True
            self.final_price = self.current_offer
            # NOTE(review): as written the seller's payoff grows as the final
            # price falls and the buyer's grows as it rises - confirm that
            # this orientation is intended.
            reward['seller'] = self.gamma_seller * (self.initial_selling_price - self.final_price)
            print(f"Init Buyer Price: {self.init_buyer_price}")
            print(f"Initial Price: {self.initial_selling_price}")
            print(f"Final Price: {self.final_price}")
            reward['buyer'] = self.gamma_buyer * (self.final_price - self.init_buyer_price)

        elif action == 2:

            # Reject the negotiation; negotiation ends
            done = True
            reward['buyer'] = -5
            reward['seller'] = -5

        elif isinstance(action, tuple) and action[0] == 0:
            # Counteroffer action
            new_offer = action[1]

            if self.turn == 0:
                # Buyer must go strictly below the current offer without
                # dropping under seller_min_amt.
                if new_offer >= self.current_offer or new_offer < self.seller_min_amt:
                    # Invalid counteroffer by buyer: penalize and do not update offer.
                    reward['buyer'] = -3
                else:
                    self.current_offer = new_offer
                    self.turn = 1  # Switch turn to seller
                    reward['buyer'] = -1
                    reward['seller'] = -1
                    self.round += 1

            # Seller must go strictly above the current offer, capped by both
            # buyer_max_amt and initial_selling_price.
            elif self.turn == 1:
                if new_offer <= self.current_offer or new_offer > self.buyer_max_amt or new_offer > self.initial_selling_price:
                    # Invalid counteroffer by seller: penalize and keep the turn.
                    reward['seller'] = -3
                else:
                    self.current_offer = new_offer
                    self.turn = 0  # Switch turn to buyer
                    reward['buyer'] = -1
                    reward['seller'] = -1
                    self.round += 1

        else:

            # Unrecognized action
            reward['buyer'] = -2
            reward['seller'] = -2

        # A counteroffer that used up the last round overrides the per-move
        # penalty with the timeout penalty.
        if self.round >= self.max_rounds:
            done = True
            reward['buyer'] = -10
            reward['seller'] = -10

        return self.get_observation(), reward, done, info

    def get_observation(self):
        """Return ``[current_offer, round, turn, deal_status]`` as float32."""
        return np.array([self.current_offer, self.round, self.turn, int(self.deal_status)], dtype=np.float32)

    def render(self, mode='human'):
        """Print the current round, side to move, offer and deal status."""
        turn_str = "Buyer" if self.turn == 0 else "Seller"
        print(f"Round: {self.round}, Turn: {turn_str}, Current Offer: {self.current_offer}, Deal Status: {self.deal_status}")
        
def parse_buyer_action(raw):
    """Translate one line of buyer console input into an environment action.

    Parameters:
      raw: The text typed by the buyer, e.g. ``"1"``, ``"2"`` or ``"0 950"``.

    Returns:
      ``(action, error)``. On success ``action`` is ``1``, ``2`` or
      ``(0, new_price)`` with ``new_price`` a float and ``error`` is ``None``.
      On failure ``action`` is ``None`` and ``error`` is the message to show.
    """
    parts = raw.strip().split()
    if not parts:
        # Blank input: indexing parts[0] would raise IndexError.
        return None, "Invalid input. Try again."

    choice = parts[0]
    if choice == "0":
        if len(parts) < 2:
            return None, "Please provide a new price for your counteroffer."
        try:
            new_price = float(parts[1])
        except ValueError:
            return None, "Invalid price. Try again."
        # nan/inf parse as floats; nan compares False against every bound, so
        # it would pass the environment's range checks and become the offer.
        if not np.isfinite(new_price):
            return None, "Invalid price. Try again."
        return (0, new_price), None

    if choice in ("1", "2"):
        return int(choice), None

    return None, "Invalid input. Try again."


if __name__ == "__main__":
    # Interactive demo: a human buyer negotiates against a seller that makes
    # random counteroffers.

    in_selling_price = np.random.randint(800, 1500)
    print(f"Initial Selling Price: {in_selling_price}")
    Negotiation_RL_env = Negotiation_RL_Agent(max_rounds=10, buyer_max_amt=1500, seller_min_amt=500, initial_selling_price=in_selling_price, gamma_seller=1.0, gamma_buyer=1.0, max_increase_rate = 0.05)
    state = Negotiation_RL_env.reset()
    Negotiation_RL_env.render()

    human_buyer_mode = True  # Buyer is controlled by a human.
    done = False

    while not done:
        if state[2] == 0 and human_buyer_mode:
            inp = input("Buyer - Enter action: accept (1), reject (2), or counteroffer (0 new_price): ")
            action, error = parse_buyer_action(inp)
            if error is not None:
                print(error)
                continue
        elif state[2] == 1:
            # Seller's turn: propose a whole-number price strictly above the
            # current offer, raised by at most max_increase_rate and capped by
            # the initial selling price and buyer_max_amt.
            current_offer = float(state[0])
            low_bound = int(current_offer) + 1  # must be greater than current_offer
            allowed_increase = max(1, int(current_offer * Negotiation_RL_env.max_increase_rate))
            high_bound = int(min(current_offer + allowed_increase,
                                 Negotiation_RL_env.initial_selling_price,
                                 Negotiation_RL_env.buyer_max_amt))
            if low_bound > high_bound:
                # No valid raise exists. Re-submitting the current price is
                # always rejected by the environment without changing turn,
                # which would leave this loop stuck, so the seller accepts.
                action = 1
                print(f"Seller accepts the current offer: {Negotiation_RL_env.current_offer}")
            else:
                new_price = int(np.random.randint(low_bound, high_bound + 1))
                action = (0, new_price)
                print(f"Seller proposes counteroffer: {new_price}")
        else:
            # Buyer's turn without a human: draw from the placeholder space.
            action = Negotiation_RL_env.action_space.sample()

        state, reward, done, info = Negotiation_RL_env.step(action)
        Negotiation_RL_env.render()
        print("Reward:", reward)

    print("Negotiation ended.")

# The code above is a simulation of a negotiation environment with a buyer and seller agent.

Initial Selling Price: 1294
Round: 0, Turn: Buyer, Current Offer: 1294, Deal Status: False
Round: 0, Turn: Buyer, Current Offer: 1294, Deal Status: False
Reward: {'buyer': -5, 'seller': -5}
Negotiation ended.


In [None]:
class Nego_Trial(gym.Env):
    """Buyer/seller negotiation in which the seller may only concede.

    The negotiation opens at ``initial_selling_price`` with the buyer to move.
    Buyer counteroffers must undercut the current offer without going below
    ``seller_min_amt``; seller counteroffers must beat the buyer's offer
    without exceeding the seller's own previous offer.
    """

    def __init__(self, seller_min_amt, max_rounds=10, initial_selling_price=None,
                 gamma_seller=1.0, gamma_buyer=1.0):
        """
        Parameters:
          seller_min_amt: Floor for buyer counteroffers and baseline of the
            seller's reward on acceptance.
          max_rounds: Number of valid counteroffers before the negotiation
            times out.
          initial_selling_price: Opening offer; drawn at random from
            [2000, 10000) when omitted.
          gamma_seller: Reward multiplier for seller on acceptance.
          gamma_buyer: Reward multiplier for buyer on acceptance.
        """
        super().__init__()
        if initial_selling_price is None:
            initial_selling_price = np.random.randint(2000, 10000)

        self.max_rounds = max_rounds
        self.seller_min_amt = seller_min_amt
        self.initial_selling_price = initial_selling_price
        self.gamma_seller = gamma_seller
        self.gamma_buyer = gamma_buyer

        # Observation layout: [current_offer, round, turn, deal_status]
        self.observation_space = spaces.Box(
            low=np.array([seller_min_amt, 0, 0, 0], dtype=np.float32),
            high=np.array([initial_selling_price, max_rounds, 1, 1], dtype=np.float32),
            dtype=np.float32,
        )

        # Placeholder only; the actions step() understands are
        # 1 (accept), 2 (reject) and (0, new_price) (counteroffer).
        self.action_space = spaces.Discrete(3)

    def reset(self):
        """Begin a fresh negotiation and return the first observation."""
        opening_price = self.initial_selling_price
        self.round = 0
        self.turn = 0  # 0: Buyer, 1: Seller.
        self.deal_status = False
        # The opening price is both the offer on the table and the seller's
        # most recent offer.
        self.current_offer = opening_price
        self.last_seller_offer = opening_price
        return self.get_observation()

    def _commit_offer(self, price, next_turn):
        """Put ``price`` on the table, hand over the turn and count the round."""
        self.current_offer = price
        self.turn = next_turn
        self.round += 1

    def step(self, action):
        """Apply ``action`` for the side to move.

        Returns ``(observation, reward, done, info)``; ``reward`` is a dict
        with ``'buyer'`` and ``'seller'`` entries and ``info`` is empty.
        """
        info = {}

        # Round budget already used up before this call.
        if self.round >= self.max_rounds:
            return self.get_observation(), {'buyer': -10, 'seller': -10}, True, info

        payoff = {'buyer': 0, 'seller': 0}
        finished = False

        if action == 1:
            # Accept: the deal closes at the offer on the table.
            finished = True
            self.deal_status = True
            agreed = self.current_offer
            payoff['seller'] = self.gamma_seller * (agreed - self.seller_min_amt)
            payoff['buyer'] = self.gamma_buyer * (self.initial_selling_price - agreed)
        elif action == 2:
            # Reject: both sides walk away with a penalty.
            finished = True
            payoff['buyer'] = -5
            payoff['seller'] = -5
        elif isinstance(action, tuple) and action[0] == 0:
            proposal = action[1]
            if self.turn == 0:
                # Buyer: not at or above the table price, not below the floor.
                out_of_range = proposal >= self.current_offer or proposal < self.seller_min_amt
                if out_of_range:
                    payoff['buyer'] = -3
                else:
                    self._commit_offer(proposal, 1)
                    payoff['buyer'] = -1
                    payoff['seller'] = -1
            elif self.turn == 1:
                # Seller: must beat the buyer's offer without exceeding the
                # seller's own previous offer.
                out_of_range = proposal <= self.current_offer or proposal > self.last_seller_offer
                if out_of_range:
                    payoff['seller'] = -3
                else:
                    self._commit_offer(proposal, 0)
                    self.last_seller_offer = proposal
                    payoff['buyer'] = -1
                    payoff['seller'] = -1
        else:
            # Anything else is an unrecognized action.
            payoff['buyer'] = -2
            payoff['seller'] = -2

        # A counteroffer that consumed the final round turns into a timeout.
        if self.round >= self.max_rounds:
            finished = True
            payoff['buyer'] = -10
            payoff['seller'] = -10

        return self.get_observation(), payoff, finished, info

    def get_observation(self):
        """Return ``[current_offer, round, turn, deal_status]`` as float32."""
        fields = [self.current_offer, self.round, self.turn, int(self.deal_status)]
        return np.array(fields, dtype=np.float32)

    def render(self, mode='human'):
        """Print a one-line summary of the negotiation state."""
        if self.turn == 0:
            turn_str = "Buyer"
        else:
            turn_str = "Seller"
        print(f"Round: {self.round}, Turn: {turn_str}, Current Offer: {self.current_offer}, Deal Status: {self.deal_status}")

In [None]:
def parse_int_buyer_action(raw):
    """Translate one line of buyer console input into an environment action.

    Parameters:
      raw: The text typed by the buyer, e.g. ``"1"``, ``"2"`` or ``"0 950"``.

    Returns:
      ``(action, error)``. On success ``action`` is ``1``, ``2`` or
      ``(0, new_price)`` with ``new_price`` truncated to an int and ``error``
      is ``None``. On failure ``action`` is ``None`` and ``error`` is the
      message to show.
    """
    parts = raw.strip().split()
    if not parts:
        # Blank input: indexing parts[0] would raise IndexError.
        return None, "Invalid input. Try again."

    choice = parts[0]
    if choice == "0":
        if len(parts) < 2:
            return None, "Please provide a new price for your counteroffer."
        try:
            new_price = int(float(parts[1]))
        except (ValueError, OverflowError):
            # ValueError: not a number (or nan); OverflowError: infinity.
            return None, "Invalid price. Try again."
        return (0, new_price), None

    if choice in ("1", "2"):
        return int(choice), None

    return None, "Invalid input. Try again."


if __name__ == "__main__":
    # Interactive demo: a human buyer negotiates against a seller that makes
    # random, never-increasing counteroffers.

    # Generate a random initial selling price between 2000 and 10000.
    random_initial_price = np.random.randint(2000, 10000)
    print(f"Initial Selling Price (target): {random_initial_price}")
    # For example, set seller_min_amt = 1500.
    env = Nego_Trial(seller_min_amt=1500, max_rounds=10, initial_selling_price=random_initial_price,
                     gamma_seller=1.0, gamma_buyer=1.0)

    state = env.reset()
    env.render()

    human_buyer_mode = True  # Buyer is controlled by a human.
    done = False

    while not done:
        if state[2] == 0 and human_buyer_mode:
            inp = input("Buyer - Enter action: accept (1), reject (2), or counteroffer (0 new_price): ")
            action, error = parse_int_buyer_action(inp)
            if error is not None:
                print(error)
                continue
        elif state[2] == 1:
            # Seller's turn: propose a price strictly greater than the current
            # offer but not above the seller's previous offer.
            low_bound = int(state[0]) + 1
            high_bound = int(env.last_seller_offer)
            if low_bound > high_bound:
                # No valid raise exists. Re-submitting the current price is
                # always rejected by the environment without changing turn,
                # which would leave this loop stuck, so the seller accepts.
                action = 1
                print(f"Seller accepts the current offer: {env.current_offer}")
            else:
                new_price = int(np.random.randint(low_bound, high_bound + 1))
                action = (0, new_price)
                print(f"Seller proposes counteroffer: {new_price}")
        else:
            # Buyer's turn without a human: draw from the placeholder space.
            action = env.action_space.sample()

        state, reward, done, info = env.step(action)
        env.render()
        print("Reward:", reward)

    print("Negotiation ended.")

Initial Selling Price (target): 2318
Round: 0, Turn: Buyer, Current Offer: 2318, Deal Status: False


KeyboardInterrupt: Interrupted by user