-
Notifications
You must be signed in to change notification settings - Fork 0
/
market.py
173 lines (152 loc) · 4.96 KB
/
market.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import random
from agent import Buyer, Seller
class Market:
    """
    A market is the place where Buyers purchase objects from Sellers.

    The market keeps the list of its sellers and buyers, runs a fixed number
    of trading rounds per simulated day, and records every completed trade
    as a ``(seller, buyer, quantity, price)`` tuple in ``self.trades``.
    """

    def __init__(
            self,
            nb_trades_per_day
    ):
        """
        Instantiate an empty market.

        :param nb_trades_per_day: Number of trading rounds run per day by
            ``run_trades``.
        """
        self.sellers = []
        self.buyers = []
        self.trades = []
        self.nb_trades_per_day = nb_trades_per_day

    def populate_market(
            self,
            nb_buyers: int = 0,
            nb_sellers: int = 0
    ):
        """
        Add random Buyers and Sellers to the market.

        :param nb_buyers: Number of buyers to create; 0 picks a default.
        :param nb_sellers: Number of sellers to create; 0 picks a default.
        :return: None
        """
        # randint(2, 2) always yields 2; kept as-is so the bounds can be
        # widened later without touching the rest of the method.
        if nb_sellers == 0:
            nb_sellers = random.randint(2, 2)
        if nb_buyers == 0:
            nb_buyers = random.randint(2, 2)

        # NOTE(review): the meaning of the positional arguments below is
        # defined by agent.Seller / agent.Buyer, which are not visible here;
        # they are passed through unchanged -- confirm against agent.py.
        for i in range(nb_sellers):
            name = f"Seller {i + 1}"
            inventory = random.randint(1, 5)
            min_price = random.randint(3, 8)
            self.sellers.append(
                Seller(name, inventory, 0, min_price, min_price + random.randint(1, 5))
            )

        for i in range(nb_buyers):
            name = f"Buyer {i + 1}"
            inventory = 0
            max_budget = random.randint(3, 10)
            self.buyers.append(
                Buyer(name, inventory, 10, max_budget, max_budget + random.randint(1, 5))
            )

    def record_trade(
            self,
            seller,
            buyer,
            quantity,
            price
    ):
        """
        Record a trade between two agents.

        :param seller: Agent that sold.
        :param buyer: Agent that bought.
        :param quantity: Number of units exchanged.
        :param price: Price paid for the trade.
        :return: None
        """
        self.trades.append((seller, buyer, quantity, price))

    def get_active_sellers(self):
        """
        Get the Sellers that can currently trade.

        :return: List of Sellers with inventory left whose
            ``has_recently_sold`` flag is True.
        """
        # NOTE(review): requiring has_recently_sold to be True means a seller
        # that never sold is never active; this depends on the flag's initial
        # value in agent.Seller -- confirm this filter is intended.
        return [
            seller for seller in self.sellers
            if seller.inventory > 0 and seller.has_recently_sold is True
        ]

    def get_active_buyers(self):
        """
        Get the Buyers that can currently trade.

        :return: List of Buyers that still have money.
        """
        return [buyer for buyer in self.buyers if buyer.money > 0]

    def get_agents(self):
        """
        Get the list of all agents in random order.

        :return: A new shuffled list containing every seller and buyer.
        """
        # random.shuffle works in place and returns None, so shuffle a fresh
        # list and return that list; self.sellers/self.buyers keep their order.
        agents = self.sellers + self.buyers
        random.shuffle(agents)
        return agents

    def check_market_activity(self):
        """
        Check whether some agents are still active.

        :return: True when at least one seller and one buyer are active.
        """
        return bool(self.get_active_sellers()) and bool(self.get_active_buyers())

    def run_trades(self):
        """
        Start the market economy.

        Runs day after day until no active seller or no active buyer is left.
        :return: None
        """
        while self.check_market_activity():
            # Do all trades during a day
            for _ in range(self.nb_trades_per_day):
                self.make_trades()
            # End of day: every agent revises its price expectation.
            self.update_price_expectations()
        print("Market is inactive.")

    def make_trades(self):
        """
        For each active buyer, try to trade one unit with a random seller.

        :return: None
        """
        for buyer in self.get_active_buyers():
            # Recomputed for every buyer because earlier trades in this round
            # may have emptied a seller's inventory.
            sellers = self.get_active_sellers()
            if not sellers:
                continue

            seller = random.choice(sellers)
            # get_trade_price reads .price_expectation on both agents, so it
            # must receive the agents themselves, not their price bounds.
            trade_price = self.get_trade_price(buyer, seller)
            if trade_price is None:
                continue

            # NOTE(review): the buyer's money is not compared to trade_price,
            # so a buyer's balance can become negative here.
            seller.inventory -= 1
            seller.money += trade_price
            seller.has_recently_sold = True
            buyer.inventory += 1
            buyer.money -= trade_price
            buyer.has_recently_bought = True
            self.record_trade(seller, buyer, 1, trade_price)

    @staticmethod
    def get_trade_price(
            buyer: "Buyer",
            seller: "Seller"
    ):
        """
        For now, trades occur only if both price expectations are equal.
        TODO: N proposals, luck factor, complexity if items have more characteristics

        :param buyer: Agent willing to buy.
        :param seller: Agent willing to sell.
        :return: The agreed price, or None when the expectations differ.
        """
        # Affordability is not checked here: only the two expectations are
        # compared.
        if buyer.price_expectation == seller.price_expectation:
            return buyer.price_expectation
        return None

    def update_price_expectations(self):
        """
        All agents update their price expectations at the end of the day.

        An agent whose recent-trade flag is set raises its expectation by 1;
        any other agent lowers it by 1, never going below 1.
        :return: None
        """
        # NOTE(review): has_recently_bought / has_recently_sold are never
        # reset in this class, so once set they stay set -- confirm whether
        # a daily reset is expected.
        for buyer in self.buyers:
            if buyer.has_recently_bought:
                buyer.price_expectation += 1
            else:
                # max() keeps a floor of 1; min() would cap the price at 1
                # and let it reach zero or negative values.
                buyer.price_expectation = max(1, buyer.price_expectation - 1)

        for seller in self.sellers:
            if seller.has_recently_sold:
                seller.price_expectation += 1
            else:
                seller.price_expectation = max(1, seller.price_expectation - 1)

    def print_trades(self):
        """
        Print trades history.

        :return: None
        """
        for seller, buyer, quantity, price in self.trades:
            print(f"{seller.name} sold {quantity} units to {buyer.name} for {price}.")