# Stock Market Environment

In [43]:
# https://pettingzoo.farama.org/content/environment_creation/
from __future__ import annotations

import functools
import math
from typing import Any, Dict, Optional, Sequence, Tuple, Union

import gymnasium
import numpy as np
from gymnasium.spaces import Box, Discrete
from gymnasium.spaces import Tuple as TupleSpace
from gymnasium.utils import seeding
from pettingzoo.utils.env import AECEnv, ParallelEnv
from pettingzoo.utils.agent_selector import agent_selector

from src.environment.stock_market import StockMarketEnv

In [2]:
# Build the packaged `StockMarketEnv` with ten agents and reset it; the bare
# `env.reset()` on the last line is what the notebook renders as cell output.
env = StockMarketEnv(num_agents=10)
env.reset()

(array(100.),
 {'correlated_stocks': array([ 63.83968111,   1.        ,  40.63328277,  35.98764514,
         105.96758765, 102.48256775,  46.35390938, 134.59696041,
          46.45290038, 116.80422879,   1.        , 146.53469398,
         172.25595875,   1.        ,  47.27094134, 197.15284372,
          15.98519562,   1.        ,   7.15286487]),
  'uncorrelated_stocks': array([272.42225093, 217.55926924,  84.11957443, 104.03986975,
         107.94425404, 155.54074355,  29.3089015 , 208.01105892,
         180.54239694, 193.42114659]),
  'budgets': array([6529.216 , 9136.279 , 5085.9067, 6105.6934, 9710.354 , 7322.015 ,
         6359.48  , 5481.886 , 5643.1816, 9357.217 ], dtype=float32),
  'shares': array([27735, 81585, 67087,   274, 39415, 85740, 55431,  3359, 76489,
         72965]),
  'valid_mask': array([[False,  True,  True,  True,  True,  True,  True,  True,  True,
           True,  True,  True,  True,  True,  True,  True,  True,  True,
           True,  True,  True,  True,  True,

## Design Idea

- The shared full state of the environment is a numpy array of shape [2, $N_{stocks}$]
- Each agent's local observation is the element-wise product of the shared state vector and that agent's boolean visibility mask
- Each agent has their own initial holdings and budget limit.

In [46]:
class StockMarket(ParallelEnv):
    """Parallel multi-agent market for one traded stock plus reference stocks.

    Every step each agent submits ``(price, shares)`` for the single traded
    stock (positive ``shares`` = bid, negative = ask, zero = no order).  Orders
    are matched in random order by :meth:`_clear`; the last matched ask price
    becomes the new price of the traded stock.  A set of "correlated" stocks
    follows the relative move of the traded stock plus noise, and a set of
    "uncorrelated" stocks performs an independent random walk.

    The global state is the 1-D vector
    ``[current_price, correlated prices..., uncorrelated prices...]`` and each
    agent observes that vector multiplied element-wise by its boolean
    ``valid_mask`` (masked entries therefore read as ``0``).
    """

    metadata = {"render_modes": ["human"], "name": "stock_market_v2"}

    def __init__(self,
                 num_agents: int,
                 num_company: int = 5,
                 num_correlated_stocks: int = 19,
                 num_uncorrelated_stocks: int = 10,
                 max_shares: int = 100000,
                 start_prices: Union[float, Sequence[float]] = 100.0,
                 budget_range: Tuple[float, float] = (100.0, 10000.0),
                 budget_discount: float = 0.99,
                 step_size: float = 1.0,
                 price_std: float = 100.0,
                 noise_std: float = 10.0,
                 worth_of_stocks: float = 0.1,
                 seed: int = 0) -> None:
        """Create the market and draw an initial random configuration.

        Args:
            num_agents: Number of trading agents (named ``agent_0`` ...).
            num_company: Stored on the instance; not used by this class.
            num_correlated_stocks: Number of stocks that follow the traded one.
            num_uncorrelated_stocks: Number of independent random-walk stocks.
            max_shares: Bound on the per-step order size and exclusive upper
                bound on the initial holdings.
            start_prices: Either one scalar (price of the traded stock and mean
                of the randomly drawn other prices) or a sequence of
                ``num_stocks`` prices ordered as in :meth:`state`.
            budget_range: ``(low, high)`` range of the initial budgets.
            budget_discount: Weight of the cash budget in the utility argument.
            step_size: Stored as ``self.dt``; not used by this class.
            price_std: Std-dev of the uncorrelated random walk and of the
                initial random prices.
            noise_std: Base std-dev of the noise added to correlated stocks.
            worth_of_stocks: Weight of the share holdings' market value in the
                utility argument.
            seed: Seed for the environment's random generator.
        """
        super().__init__()

        # Agent parameters
        self.num_company = num_company
        self.budget_range = budget_range
        self.budget_discount = budget_discount
        # Plain int (a stray trailing comma used to turn this into a 1-tuple).
        self.max_shares = max_shares
        self.worth_of_stocks = worth_of_stocks
        self.possible_agents = [f'agent_{i}' for i in range(num_agents)]
        self.agents = self.possible_agents[:]  # Just to follow AECEnv
        self._index_map = {
            name: idx for idx, name in enumerate(self.possible_agents)
        }

        # Stock market parameters
        self.dt = step_size
        self.start_prices = start_prices
        self.price_std = price_std
        self.noise_std = noise_std
        self.num_correlated_stocks = num_correlated_stocks
        self.num_uncorrelated_stocks = num_uncorrelated_stocks
        self.num_stocks = num_correlated_stocks + \
                          num_uncorrelated_stocks + 1
        # Honour the constructor's seed so that runs are reproducible.
        self.seed(seed)
        self._reset_market()

        # Observation and Action Spaces
        self._observation_spaces: Dict[str, Any] = {}
        self._action_spaces: Dict[str, Any] = {}
        for agent in self.agents:
            self._observation_spaces[agent] = Box(
                # Masked entries are reported as 0.0, so the lower bound is 0
                # rather than the $1 price floor.
                low=0.0,
                high=+float("inf"),
                shape=(self.num_stocks, )
            )
            self._action_spaces[agent] = TupleSpace((
                Box(low=1.0,  # raw price, with no logarithm
                    high=+float("inf"),
                    shape=(1, )),
                Discrete(2 * max_shares + 1, start=-max_shares)
            ))
        self._state_space = Box(
            low=1.0,  # minimum price is $1.0
            high=+float("inf"),
            shape=(1 + num_correlated_stocks + num_uncorrelated_stocks,),
            dtype=np.float32
        )

        self._agent_selector = agent_selector(self.agents)

    def observation_space(self, agent: str) -> gymnasium.spaces.Space:
        """Return the observation space of ``agent``."""
        return self._observation_spaces[agent]

    def action_space(self, agent: str) -> gymnasium.spaces.Space:
        """Return the ``(price, shares)`` action space of ``agent``."""
        return self._action_spaces[agent]

    def reset(self,
              seed: Optional[int] = None,
              return_info: bool = False,
              options: Optional[Dict] = None) -> Dict[str, Any]:
        """Re-draw the market and return every agent's masked observation.

        Args:
            seed: If given, re-seeds the random generator before resetting.
            return_info: Accepted for API compatibility; ignored.
            options: Accepted for API compatibility; ignored.

        Returns:
            Mapping from agent name to its masked state vector.
        """
        if seed is not None:
            self.seed(seed)
        self._reset_market()
        return {
            agent: self.state() * self.valid_mask[agent]
            for agent in self.agents
        }

    def seed(self, seed: Optional[int] = None) -> None:
        """(Re)create the environment's numpy random generator from ``seed``."""
        self._np_rng, seed = seeding.np_random(seed)

    def state(self) -> np.ndarray:
        """Return ``[current_price, correlated..., uncorrelated...]`` as 1-D."""
        return np.hstack([
            self.current_price,
            self._corr_prices,
            self._uncorr_prices
        ])

    def step(self,
             actions: Dict[str, Tuple[np.ndarray, np.ndarray]]
             ) -> Tuple[Dict, Dict, Dict, Dict, Dict]:
        """Clear one round of orders and advance all prices.

        Args:
            actions: Mapping from agent name to ``(price, shares)``; must
                contain an entry for every agent in ``self.agents``.

        Returns:
            ``(observations, rewards, dones, truncations, infos)``, each keyed
            by agent name.  A reward of ``-100`` is given to agents whose order
            violates the budget / holdings check, otherwise the utility of the
            agent's weighted wealth.  When the episode ends (any negative
            reward or ``ep_len`` steps reached) the environment resets itself,
            the returned observations belong to the new episode and every
            ``done`` flag is ``True``.
        """
        # Read the actions in agent order so that the arrays line up with
        # `self.budgets` / `self.shares` regardless of the dict's own order.
        _prop_prices = np.hstack([actions[agent][0] for agent in self.agents])
        _prop_shares = np.asarray([actions[agent][1] for agent in self.agents],
                                  dtype="float32")
        _prev_price = self.current_price

        # Execute stock market clearing
        profits, delta_shares, close, volatility = self._clear(
            _prop_prices.copy(), _prop_shares.copy(), self.current_price
        )
        # The clearing price becomes the traded stock's new price; without
        # this the price difference below is always zero.
        self.current_price = float(close)

        # Update correlated stock prices
        diff = self.current_price - _prev_price
        diffs = diff / _prev_price * self._corr_prices + \
            self._np_rng.normal(loc=0,
                                scale=self.noise_std * volatility,
                                size=(self.num_correlated_stocks,))
        self._corr_prices = np.clip(
            self._corr_prices + diffs,
            a_min=1.0, a_max=None
        )

        # Update uncorrelated stock prices
        self._uncorr_prices = np.clip(
            self._uncorr_prices + self._np_rng.normal(
                loc=0, scale=self.price_std,
                size=(self.num_uncorrelated_stocks)
            ),
            a_min=1.0, a_max=None
        )
        self.budgets = self.budgets + profits
        self.shares = self.shares + delta_shares
        c = 1.0 + self.budget_discount * self.budgets + \
            self.shares * self.current_price * self.worth_of_stocks

        # Assert if violating the budget constraint
        # NOTE(review): the check uses budgets/shares *after* the trades were
        # applied -- confirm that this is the intended constraint.
        potential_budget = self.budgets - _prop_prices * _prop_shares
        potential_share_held = self.shares + _prop_shares
        violations = (potential_budget < 0.0) | (potential_share_held < 0.0)
        rewards = {
            agent: -100.0 if violate else self.utility(c[i], self.eta[i])
            for i, (agent, violate) in enumerate(zip(self.agents, violations))
        }

        self.timestep += 1
        timed_out = self.timestep >= self.ep_len
        done = {agent: bool(rewards[agent] < 0.0) for agent in self.agents}
        # Always return per-agent dicts (previously `None` and a bare `{}`).
        truncations = {agent: timed_out for agent in self.agents}
        infos: Dict[str, Dict] = {agent: {} for agent in self.agents}

        if timed_out or any(done.values()):
            next_obs = self.reset()
            done = {agent: True for agent in self.agents}
        else:
            next_obs = {
                agent: self.state() * self.valid_mask[agent]
                for agent in self.agents
            }

        return next_obs, rewards, done, truncations, infos

    def _clear(self,
               proposed_prices: np.ndarray,
               proposed_shares: np.ndarray,
               close: np.ndarray) -> Tuple:
        """Match bids against asks in random order.

        Args:
            proposed_prices: Per-agent limit prices, in agent order.
            proposed_shares: Per-agent order sizes (``>0`` bid, ``<0`` ask).
            close: Price to report when no trade happens.

        Returns:
            ``(profits, delta_shares, close, volatility)`` where ``profits``
            and ``delta_shares`` are per-agent cash and holding changes,
            ``close`` is the last matched ask price (or the input ``close``)
            and ``volatility`` is ``1`` plus the std-dev of all matched bid and
            ask prices.
        """
        # The standard deviation of share_prices will determine
        # correlated stock standard deviation
        volatility = 1.
        share_prices = []
        n = self.num_agents

        buy_mask = proposed_shares > 0
        sell_mask = proposed_shares < 0

        # Now randomly order each
        b = int(np.count_nonzero(buy_mask))
        s = int(np.count_nonzero(sell_mask))
        bid_indices = self._np_rng.permutation(b)
        seller_indices = self._np_rng.permutation(s)
        bidder_prices = proposed_prices[buy_mask]
        seller_prices = proposed_prices[sell_mask]
        bidder_shares_left = np.copy(proposed_shares[buy_mask])
        seller_shares_left = np.abs(np.copy(proposed_shares[sell_mask]))
        bid_profits = np.zeros((b))
        seller_profits = np.zeros((s))
        delta_bid_shares = np.zeros((b))
        delta_ask_shares = np.zeros((s))

        for bid_idx in bid_indices:
            if seller_indices.size == 0:
                break  # nobody left to sell
            bid_price = bidder_prices[bid_idx]
            bid_vol = bidder_shares_left[bid_idx]
            to_delete = []  # positions in `seller_indices` that sold out
            for j, ask_idx in enumerate(seller_indices):
                ask_price = seller_prices[ask_idx]
                ask_vol = seller_shares_left[ask_idx]
                if bid_price < ask_price:  # this may change when adding market maker
                    continue
                close = ask_price  # sellers are technically last in transaction even with market markers
                share_prices.append(bid_price)  # This allows us to calculate market volatility
                share_prices.append(ask_price)

                traded = min(bid_vol, ask_vol)
                seller_shares_left[ask_idx] -= traded
                bidder_shares_left[bid_idx] -= traded
                delta_bid_shares[bid_idx] += traded
                delta_ask_shares[ask_idx] -= traded
                # The bidder pays its own bid price, the seller receives its
                # own ask price.
                bid_profits[bid_idx] -= traded * bid_price
                seller_profits[ask_idx] += traded * ask_price
                if ask_vol <= bid_vol:
                    to_delete.append(j)
                bid_vol -= traded
                if bid_vol == 0:
                    break  # this bid is completely filled
            if to_delete:
                seller_indices = np.delete(
                    seller_indices, np.asarray(to_delete))

        if share_prices:
            volatility += np.std(np.asarray(share_prices))

        # Scatter the per-bidder / per-seller results back to agent order
        # (boolean masks preserve the order used when extracting them).
        profits = np.zeros((n))
        delta_shares = np.zeros((n))
        profits[buy_mask] = bid_profits
        profits[sell_mask] = seller_profits
        delta_shares[buy_mask] = delta_bid_shares
        delta_shares[sell_mask] = delta_ask_shares

        return profits, delta_shares, np.asarray(close), volatility

    def _reset_market(self) -> None:
        """Draw fresh prices, masks, budgets, holdings and utility parameters.

        Raises:
            ValueError: If ``start_prices`` is a sequence whose length is not
                ``num_stocks``.
        """
        n_other = self.num_correlated_stocks + self.num_uncorrelated_stocks
        if np.isscalar(self.start_prices):
            # Accept ints as well as floats for a single start price.
            self.current_price = float(self.start_prices)
            _other_stocks = np.clip(
                self._np_rng.normal(loc=self.start_prices,
                                    scale=self.price_std,
                                    size=(n_other,)),
                a_min=1.0, a_max=None  # Can never have prices below $1
            )
        else:
            # Convert to an array so that later price arithmetic works for
            # plain lists / tuples too.
            _prices = np.asarray(self.start_prices, dtype=np.float64)
            if _prices.shape != (self.num_stocks,):
                raise ValueError(
                    f"start_prices must contain {self.num_stocks} values, "
                    f"got shape {_prices.shape}"
                )
            self.current_price = float(_prices[0])
            _other_stocks = _prices[1:].copy()
        # Split by position; a `[-k:]` slice would select everything for k=0.
        self._corr_prices = _other_stocks[:self.num_correlated_stocks]
        self._uncorr_prices = _other_stocks[self.num_correlated_stocks:]

        # Randomly initialize masks for agents, format: agent_name, bool_mask
        self.valid_mask = {agent: np.zeros([self.num_stocks,], dtype="bool")
                           for agent in self.agents}
        # One 0/1 draw per agent decides whether that agent can see the
        # correlated and uncorrelated stocks.  (The draws used to be misused
        # as agent indices, so only agents 0 and 1 could ever be unmasked.)
        # NOTE(review): index 0 (the traded stock) stays masked for everyone,
        # as before -- confirm this is intended.
        _flags = self._np_rng.choice(2, self.num_agents)
        for agent, _flag in zip(self.agents, _flags):
            if _flag:
                self.valid_mask[agent][1:] = True

        # Initialize budget and holdings
        self.budgets = self.budget_range[0] + self._np_rng.random(
            size=(self.num_agents, ), dtype='float32'
        ) * (self.budget_range[1] - self.budget_range[0])
        self.shares = self._np_rng.integers(low=1,
                                            high=self.max_shares,
                                            size=(self.num_agents,))

        # Randomly initialize utility functions (drawn from the seeded
        # generator rather than the global numpy state).
        self.eta = np.clip(
            self._np_rng.normal(loc=1.5, scale=1.5, size=(self.num_agents,)),
            a_min=0, a_max=10
        )
        self.timestep = 0
        self.ep_len = 390

    @staticmethod
    def utility(c: float, eta: float) -> float:
        """Isoelastic (CRRA) utility of ``c`` with risk-aversion ``eta``.

        Returns ``(c**(1 - eta) - 1) / (1 - eta)``, or ``log(c)`` when
        ``eta == 1``.  ``c`` is expected to be positive.
        """
        if eta != 1.0:
            return (c ** (1.0 - eta) - 1.0) / (1.0 - eta)
        else:
            return math.log(c)

# Smoke test: build a five-agent StockMarket and display the per-agent
# observations returned by `reset` (rendered as the cell output).
env = StockMarket(5)
env.reset()

{'agent_0': array([  0.        ,  14.18634986, 195.30663708,  96.0203079 ,
          1.        ,   3.66224873, 161.91772894, 195.89844083,
        115.00538929,  62.46555497, 344.3207975 ,  95.08870114,
         91.19839787,  62.70912436, 192.74830995, 138.5837778 ,
        183.67039745, 214.37974521,  52.60015283, 184.35587908,
          1.        , 180.9229266 , 226.06379968,  17.0555419 ,
        108.39648349, 142.53301595,   1.        ,  87.5106152 ,
          1.        ,  65.1027801 ]),
 'agent_1': array([  0.        ,  14.18634986, 195.30663708,  96.0203079 ,
          1.        ,   3.66224873, 161.91772894, 195.89844083,
        115.00538929,  62.46555497, 344.3207975 ,  95.08870114,
         91.19839787,  62.70912436, 192.74830995, 138.5837778 ,
        183.67039745, 214.37974521,  52.60015283, 184.35587908,
          1.        , 180.9229266 , 226.06379968,  17.0555419 ,
        108.39648349, 142.53301595,   1.        ,  87.5106152 ,
          1.        ,  65.1027801 ]),
 'agen

In [47]:
# Drive the environment for ten steps with actions sampled from each agent's
# action space, printing whatever `step` returns.
for _ in range(10):
    rng_acs = dict(
        (agent, env.action_space(agent).sample()) for agent in env.agents
    )
    print(env.step(rng_acs))

({'agent_0': array([  0.        ,  62.53934681,  44.77744928,  74.20531805,
        74.19722346, 140.62647748, 139.72160216, 225.0137742 ,
       162.79276883, 113.44854952, 112.77831316, 108.43250937,
       182.40693185, 241.01306113, 137.03925961, 111.17126428,
        84.62581595, 197.58916117, 119.36060701,   1.        ,
       114.5259473 , 233.41560585, 138.2902134 , 163.86817868,
        17.26707658, 245.19455513,  54.66779195,  30.86743433,
        97.95938254, 221.44676554]), 'agent_1': array([  0.        ,  62.53934681,  44.77744928,  74.20531805,
        74.19722346, 140.62647748, 139.72160216, 225.0137742 ,
       162.79276883, 113.44854952, 112.77831316, 108.43250937,
       182.40693185, 241.01306113, 137.03925961, 111.17126428,
        84.62581595, 197.58916117, 119.36060701,   1.        ,
       114.5259473 , 233.41560585, 138.2902134 , 163.86817868,
        17.26707658, 245.19455513,  54.66779195,  30.86743433,
        97.95938254, 221.44676554]), 'agent_2': array([0.

In [4]:
# Load a stock PettingZoo parallel environment -- presumably as a reference
# for how the Parallel API behaves (keys returned by `reset`, `state` docs).
from pettingzoo.mpe import simple_adversary_v2

# NOTE: this rebinds `env`, replacing whichever environment it referred to.
env = simple_adversary_v2.parallel_env()
print(env.reset().keys())
# actions = {k:env.action_space(k).sample() for k in env.agents}
# env.step(actions)
# Show the library's own description of the global `state()` method.
help(env.state)

dict_keys(['adversary_0', 'agent_0', 'agent_1'])
Help on method state in module pettingzoo.utils.conversions:

state() method of pettingzoo.utils.conversions.aec_to_parallel_wrapper instance
    Returns the state.
    
    State returns a global view of the environment appropriate for
    centralized training decentralized execution methods like QMIX

