Skip to content

Commit

Permalink
Try to get password prompts from sudo command with pty.
Browse files Browse the repository at this point in the history
  • Loading branch information
Steve235lab committed May 11, 2024
1 parent d316a99 commit 39d1d49
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 25 deletions.
2 changes: 2 additions & 0 deletions interpreter/core/computer/terminal/languages/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ def __init__(
self.start_cmd = [os.environ.get("SHELL", "bash")]

def preprocess_code(self, code):
# Check if running sudo command
self.is_sudo = "sudo" in code
return preprocess_shell(code)

def line_postprocessor(self, line):
Expand Down
138 changes: 113 additions & 25 deletions interpreter/core/computer/terminal/languages/subprocess_language.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import pty
import queue
import re
import subprocess
Expand All @@ -16,6 +17,8 @@ def __init__(self):
self.verbose = False
self.output_queue = queue.Queue()
self.done = threading.Event()
self.slave_fd = None
self.is_sudo = False

def detect_active_line(self, line):
return None
Expand Down Expand Up @@ -47,28 +50,52 @@ def start_process(self):

my_env = os.environ.copy()
my_env["PYTHONIOENCODING"] = "utf-8"
self.process = subprocess.Popen(
self.start_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0,
universal_newlines=True,
env=my_env,
encoding="utf-8",
errors="replace",
)
threading.Thread(
target=self.handle_stream_output,
args=(self.process.stdout, False),
daemon=True,
).start()
threading.Thread(
target=self.handle_stream_output,
args=(self.process.stderr, True),
daemon=True,
).start()
if self.is_sudo:
# Create a pseudo-terminal to get prompts of sudo command
master_fd, slave_fd = pty.openpty()
self.slave_fd = slave_fd
self.process = subprocess.Popen(
self.start_cmd,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
text=True,
bufsize=0,
universal_newlines=True,
env=my_env,
encoding="utf-8",
errors="replace",
)

# Handle output from the pseudo-terminal
threading.Thread(
target=self.handle_sudo_output,
args=(master_fd,),
daemon=True
).start()
else:
self.process = subprocess.Popen(
self.start_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0,
universal_newlines=True,
env=my_env,
encoding="utf-8",
errors="replace",
)
threading.Thread(
target=self.handle_stream_output,
args=(self.process.stdout, False),
daemon=True,
).start()
threading.Thread(
target=self.handle_stream_output,
args=(self.process.stderr, True),
daemon=True,
).start()

def run(self, code):
retry_count = 0
Expand All @@ -94,9 +121,13 @@ def run(self, code):
self.done.clear()

try:
self.process.stdin.write(code + "\n")
self.process.stdin.flush()
break
if self.is_sudo:
os.write(self.slave_fd, (code + "\n").encode('utf-8')) # 修改这里,使用os.write
break
else:
self.process.stdin.write(code + "\n")
self.process.stdin.flush()
break
except:
if retry_count != 0:
# For UX, I like to hide this if it happens once. Obviously feels better to not see errors
Expand Down Expand Up @@ -191,3 +222,60 @@ def handle_stream_output(self, stream, is_error_stream):
print("Stream closed while reading.")
else:
raise e

def handle_sudo_output(self, fd):
with os.fdopen(fd, 'r') as stream:
try:
for line in iter(stream.readline, ""):
# print(f"Received output line:\n{line}\n---")
if self.verbose:
print(f"Received output line:\n{line}\n---")

line = self.line_postprocessor(line)

if line is None:
continue # `line = None` is the postprocessor's signal to discard completely

if self.detect_active_line(line):
active_line = self.detect_active_line(line)
self.output_queue.put(
{
"type": "console",
"format": "active_line",
"content": active_line,
}
)
# Sometimes there's a little extra on the same line, so be sure to send that out
line = re.sub(r"##active_line\d+##", "", line)
if line:
self.output_queue.put(
{"type": "console", "format": "output", "content": line}
)
elif self.detect_end_of_execution(line):
# Sometimes there's a little extra on the same line, so be sure to send that out
line = line.replace("##end_of_execution##", "").strip()
if line:
self.output_queue.put(
{"type": "console", "format": "output", "content": line}
)
self.done.set()
elif "KeyboardInterrupt" in line:
self.output_queue.put(
{
"type": "console",
"format": "output",
"content": "KeyboardInterrupt",
}
)
time.sleep(0.1)
self.done.set()
else:
self.output_queue.put(
{"type": "console", "format": "output", "content": line}
)
except ValueError as e:
if "operation on closed file" in str(e):
if self.verbose:
print("Stream closed while reading.")
else:
raise e

0 comments on commit 39d1d49

Please sign in to comment.