This repository has been archived by the owner on Nov 3, 2023. It is now read-only.
/
agents.py
159 lines (137 loc) · 4.83 KB
/
agents.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import time
from abc import ABC, abstractmethod
from queue import Queue
from parlai.core.agents import Agent
class ChatServiceAgent(Agent, ABC):
"""
Base class for a person on a chat serivce that can act in a ParlAI world.
"""
def __init__(self, opt, manager, receiver_id, task_id):
super().__init__(opt)
self.manager = manager
self.id = receiver_id
self.task_id = task_id
self.acted_packets = {}
self._data = {}
self.msg_queue = Queue()
self.observed_packets = {}
self.message_request_time = None
self.stored_data = {}
self.message_partners = []
# initialize stored data
self.set_stored_data()
@property
def data(self):
"""
ChatServiceAgent data property.
"""
return self._data
@data.setter
def data(self, value):
"""
Setter for ChatServiceAgent.data.
The data within a ChatServiceAgent is persistent, in the sense that keys
_cannot_ be removed from the data. This is important to ensure persistence
of agent state across various parts of the ChatService pipeline.
To ensure this property, we call `agent._data.update(value)` when explicitly
setting the `data` property of an agent. This protects against cases where,
e.g., the `__init__` function sets a property for the agent, and then
later someone manually sets `agent.data = new_data`.
"""
self._data.update(value)
@abstractmethod
def observe(self, act):
"""
Send an agent a message through the manager.
"""
pass
def _send_payload(self, receiver_id, data, quick_replies=None, persona_id=None):
"""
Send a payload through the message manager.
:param receiver_id:
int identifier for agent to send message to
:param data:
object data to send
:param quick_replies:
list of quick replies
:param persona_id:
identifier of persona
:return:
a dictionary of a json response from the manager observing a payload
"""
return self.manager.observe_payload(
receiver_id, data, quick_replies, persona_id
)
@abstractmethod
def put_data(self, message):
"""
Put data into the message queue if it hasn't already been seen.
"""
pass
def _queue_action(self, action, act_id, act_data=None):
"""
Add an action to the queue with given id and info if it hasn't already been
seen.
:param action:
action to be added to message queue
:param act_id:
an identifier to check if the action has been seen or to
mark the action as seen
:param act_data:
any data about the given action you may want to record when
marking it as seen
"""
if act_id not in self.acted_packets:
self.acted_packets[act_id] = act_data
self.msg_queue.put(action)
def set_stored_data(self):
"""
Gets agent state data from manager.
"""
agent_state = self.manager.get_agent_state(self.id)
if agent_state is not None and hasattr(agent_state, 'stored_data'):
self.stored_data = agent_state.stored_data
def get_new_act_message(self):
"""
Get a new act message if one exists, return None otherwise.
"""
if not self.msg_queue.empty():
return self.msg_queue.get()
return None
def act(self):
"""
Pulls a message from the message queue.
If none exist returns None.
"""
msg = self.get_new_act_message()
return msg
def _check_timeout(self, timeout=None):
"""
Return whether enough time has passed than the timeout amount.
"""
if timeout:
return time.time() - self.message_request_time > timeout
return False
def act_blocking(self, timeout=None):
"""
Repeatedly loop until we retrieve a message from the queue.
"""
while True:
if self.message_request_time is None:
self.message_request_time = time.time()
msg = self.act()
if msg is not None:
self.message_request_time = None
return msg
if self._check_timeout(timeout):
return None
time.sleep(0.2)
def episode_done(self):
"""
Return whether or not this agent believes the conversation to be done.
"""
return self.manager.shutting_down