-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
core.py
372 lines (318 loc) · 14.6 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
"""
This file defines the Interpreter class.
It's the main file. `from interpreter import interpreter` will import an instance of this class.
"""
import json
import os
import threading
import time
from datetime import datetime
from typing import Any
from ..terminal_interface.terminal_interface import terminal_interface
from ..terminal_interface.utils.display_markdown_message import display_markdown_message
from ..terminal_interface.utils.local_storage_path import get_storage_path
from ..terminal_interface.utils.oi_dir import oi_dir
from .computer.computer import Computer
from .default_system_message import default_system_message
from .llm.llm import Llm
from .respond import respond
from .server import server
from .utils.telemetry import send_telemetry
from .utils.truncate_output import truncate_output
class OpenInterpreter:
"""
This class (one instance is called an `interpreter`) is the "grand central station" of this project.
Its responsibilities are to:
1. Given some user input, prompt the language model.
2. Parse the language models responses, converting them into LMC Messages.
3. Send code to the computer.
4. Parse the computer's response (which will already be LMC Messages).
5. Send the computer's response back to the language model.
...
The above process should repeat—going back and forth between the language model and the computer— until:
6. Decide when the process is finished based on the language model's response.
"""
def __init__(
self,
messages: str | None = None,
offline: bool = False,
auto_run: bool = False,
verbose: bool = False,
debug: bool = False,
max_output: int = 2800,
safe_mode: str = "off",
shrink_images: bool = False,
force_task_completion: bool = False,
force_task_completion_message: str = """Proceed. You CAN run code on my machine. If you want to run code, start your message with "```"! If the entire task I asked for is done, say exactly 'The task is done.' If you need some specific information (like username or password) say EXACTLY 'Please provide more information.' If it's impossible, say 'The task is impossible.' (If I haven't provided a task, say exactly 'Let me know what you'd like to do next.') Otherwise keep going.""",
force_task_completion_breakers: list[str] = [
"the task is done.",
"the task is impossible.",
"let me know what you'd like to do next.",
"please provide more information.",
],
anonymous_telemetry: bool = bool(os.getenv("ANONYMIZED_TELEMETRY", True))
== True,
in_terminal_interface: bool = False,
conversation_history: bool = True,
conversation_filename: bool = None,
conversation_history_path: str = get_storage_path("conversations"),
os: bool = False,
speak_messages: bool = False,
llm: Llm = None,
system_message=default_system_message,
custom_instructions="",
computer: Computer | None = None,
sync_computer: bool = True,
import_computer_api: bool = False,
skills_path=None,
import_skills: bool = True,
multi_line: bool = False,
):
# State
self.messages: list[dict[str, Any]] = [] if messages is None else messages
self.responding = False
self.last_messages_count = 0
# Settings
self.offline = offline
self.auto_run = auto_run
self.verbose = verbose
self.debug = debug
self.max_output = max_output
self.safe_mode = safe_mode
self.shrink_images = shrink_images
self.anonymous_telemetry = anonymous_telemetry
self.in_terminal_interface = in_terminal_interface
self.multi_line = multi_line
# Loop messages
self.force_task_completion = force_task_completion
self.force_task_completion_message = force_task_completion_message
self.force_task_completion_breakers = force_task_completion_breakers
# Conversation history
self.conversation_history = conversation_history
self.conversation_filename = conversation_filename
self.conversation_history_path = conversation_history_path
# OS control mode related attributes
self.os = os
self.speak_messages = speak_messages
# LLM
self.llm = Llm(self) if llm is None else llm
# These are LLM related
self.system_message = system_message
self.custom_instructions = custom_instructions
# Computer
self.computer = Computer(self) if computer is None else computer
self.sync_computer = sync_computer
self.computer.import_computer_api = import_computer_api
# Skills
if skills_path:
self.computer.skills.path = skills_path
self.import_skills = import_skills
if import_skills:
if self.verbose:
print("Importing skills")
self.computer.skills.import_skills()
def server(self, *args, **kwargs):
server(self, *args, **kwargs)
def wait(self):
while self.responding:
time.sleep(0.2)
# Return new messages
return self.messages[self.last_messages_count :]
def chat(
self,
message: str | None = None,
display: bool = True,
stream: bool = False,
blocking: bool = True,
) -> list[dict[str, Any]] | str:
try:
self.responding = True
if self.anonymous_telemetry and not self.offline:
message_type = type(
message
).__name__ # Only send message type, no content
send_telemetry(
"started_chat",
properties={
"in_terminal_interface": self.in_terminal_interface,
"message_type": message_type,
"os_mode": self.os,
},
)
if not blocking:
chat_thread = threading.Thread(
target=self.chat, args=(message, display, stream, True)
) # True as in blocking = True
chat_thread.start()
return
if stream:
return self._streaming_chat(message=message, display=display)
# If stream=False, *pull* from the stream.
for _ in self._streaming_chat(message=message, display=display):
pass
# Return new messages
self.responding = False
return self.messages[self.last_messages_count :]
except Exception as e:
self.responding = False
if self.anonymous_telemetry and not self.offline:
message_type = type(message).__name__
send_telemetry(
"errored",
properties={
"error": str(e),
"in_terminal_interface": self.in_terminal_interface,
"message_type": message_type,
"os_mode": self.os,
},
)
raise
def _streaming_chat(self, message: str | None = None, display: bool = True):
# Sometimes a little more code -> a much better experience!
# Display mode actually runs interpreter.chat(display=False, stream=True) from within the terminal_interface.
# wraps the vanilla .chat(display=False) generator in a display.
# Quite different from the plain generator stuff. So redirect to that
if display:
yield from terminal_interface(self, message)
return
# One-off message
if message or message == "":
if message == "":
message = "No entry from user - please suggest something to enter."
## We support multiple formats for the incoming message:
# Dict (these are passed directly in)
if isinstance(message, dict):
if "role" not in message:
message["role"] = "user"
self.messages.append(message)
# String (we construct a user message dict)
elif isinstance(message, str):
self.messages.append(
{"role": "user", "type": "message", "content": message}
)
# List (this is like the OpenAI API)
elif isinstance(message, list):
self.messages = message
# Now that the user's messages have been added, we set last_messages_count.
# This way we will only return the messages after what they added.
self.last_messages_count = len(self.messages)
# DISABLED because I think we should just not transmit images to non-multimodal models?
# REENABLE this when multimodal becomes more common:
# Make sure we're using a model that can handle this
# if not self.llm.supports_vision:
# for message in self.messages:
# if message["type"] == "image":
# raise Exception(
# "Use a multimodal model and set `interpreter.llm.supports_vision` to True to handle image messages."
# )
# This is where it all happens!
yield from self._respond_and_store()
# Save conversation if we've turned conversation_history on
if self.conversation_history:
# If it's the first message, set the conversation name
if not self.conversation_filename:
first_few_words = "_".join(
self.messages[0]["content"][:25].split(" ")[:-1]
)
for char in '<>:"/\\|?*!': # Invalid characters for filenames
first_few_words = first_few_words.replace(char, "")
date = datetime.now().strftime("%B_%d_%Y_%H-%M-%S")
self.conversation_filename = (
"__".join([first_few_words, date]) + ".json"
)
# Check if the directory exists, if not, create it
if not os.path.exists(self.conversation_history_path):
os.makedirs(self.conversation_history_path)
# Write or overwrite the file
with open(
os.path.join(
self.conversation_history_path, self.conversation_filename
),
"w",
) as f:
json.dump(self.messages, f)
return
raise Exception(
"`interpreter.chat()` requires a display. Set `display=True` or pass a message into `interpreter.chat(message)`."
)
def _respond_and_store(self):
"""
Pulls from the respond stream, adding delimiters. Some things, like active_line, console, confirmation... these act specially.
Also assembles new messages and adds them to `self.messages`.
"""
# Utility function
def is_active_line_chunk(chunk):
return "format" in chunk and chunk["format"] == "active_line"
last_flag_base = None
for chunk in respond(self):
if chunk["content"] == "":
continue
# Handle the special "confirmation" chunk, which neither triggers a flag or creates a message
if chunk["type"] == "confirmation":
# Emit a end flag for the last message type, and reset last_flag_base
if last_flag_base:
yield {**last_flag_base, "end": True}
last_flag_base = None
yield chunk
# We want to append this now, so even if content is never filled, we know that the execution didn't produce output.
# ... rethink this though.
self.messages.append(
{
"role": "computer",
"type": "console",
"format": "output",
"content": "",
}
)
continue
# Check if the chunk's role, type, and format (if present) match the last_flag_base
if (
last_flag_base
and "role" in chunk
and "type" in chunk
and last_flag_base["role"] == chunk["role"]
and last_flag_base["type"] == chunk["type"]
and (
"format" not in last_flag_base
or (
"format" in chunk
and chunk["format"] == last_flag_base["format"]
)
)
):
# If they match, append the chunk's content to the current message's content
# (Except active_line, which shouldn't be stored)
if not is_active_line_chunk(chunk):
self.messages[-1]["content"] += chunk["content"]
else:
# If they don't match, yield a end message for the last message type and a start message for the new one
if last_flag_base:
yield {**last_flag_base, "end": True}
last_flag_base = {"role": chunk["role"], "type": chunk["type"]}
# Don't add format to type: "console" flags, to accomodate active_line AND output formats
if "format" in chunk and chunk["type"] != "console":
last_flag_base["format"] = chunk["format"]
yield {**last_flag_base, "start": True}
# Add the chunk as a new message
if not is_active_line_chunk(chunk):
self.messages.append(chunk)
# Yield the chunk itself
yield chunk
# Truncate output if it's console output
if chunk["type"] == "console" and chunk["format"] == "output":
self.messages[-1]["content"] = truncate_output(
self.messages[-1]["content"], self.max_output
)
# Yield a final end flag
if last_flag_base:
yield {**last_flag_base, "end": True}
def reset(self):
self.computer.terminate() # Terminates all languages
self.messages = []
self.last_messages_count = 0
def display_message(self, markdown):
# This is just handy for start_script in profiles.
display_markdown_message(markdown)
def get_oi_dir(self):
# Again, just handy for start_script in profiles.
return oi_dir