From 2d2022d184f31f53c2328b5e5ca804e5ea46ff6c Mon Sep 17 00:00:00 2001 From: totaam Date: Wed, 12 May 2021 20:21:00 +0700 Subject: [PATCH] #3002 use pinentry for ssh confirmations and passwords / passphrases --- xpra/client/gtk_base/gtk_client_base.py | 56 ++++----------- xpra/net/ssh.py | 25 +++++-- xpra/scripts/main.py | 95 +++++++++++++++++++++++-- 3 files changed, 125 insertions(+), 51 deletions(-) diff --git a/xpra/client/gtk_base/gtk_client_base.py b/xpra/client/gtk_base/gtk_client_base.py index 5710fac903..1b89c9d9d1 100644 --- a/xpra/client/gtk_base/gtk_client_base.py +++ b/xpra/client/gtk_base/gtk_client_base.py @@ -18,7 +18,7 @@ ) from xpra.os_util import ( bytestostr, strtobytes, hexstr, monotonic_time, load_binary_file, - WIN32, OSX, POSIX, is_Wayland, is_gnome, is_kde, which, + WIN32, OSX, POSIX, is_Wayland, ) from xpra.simple_stats import std_unit from xpra.exit_codes import EXIT_PASSWORD_REQUIRED @@ -272,28 +272,14 @@ def do_process_challenge_prompt(self, packet, prompt="password"): authlog = Logger("auth") self.show_progress(100, "authentication") PINENTRY = os.environ.get("XPRA_PINENTRY", "") - authlog("do_process_challenge_prompt%s PINENTRY=%s", (packet, prompt), PINENTRY) - if PINENTRY.lower() not in FALSE_OPTIONS: - pinentry_cmd = PINENTRY - def find_pinentry_bin(): - if is_gnome(): - return which("pinentry-gnome3") - if is_kde(): - return which("pinentry-qt") - return None - if PINENTRY=="" or PINENTRY.lower()=="auto": - #figure out if we should use it: - if WIN32 or OSX: - #not enabled by default on those platforms - pinentry_cmd = None - else: - pinentry_cmd = find_pinentry_bin() - if PINENTRY.lower() in TRUE_OPTIONS: - pinentry_cmd = find_pinentry_bin() - if pinentry_cmd: - start_thread(self.handle_challenge_with_pinentry, - "pinentry", True, (packet, prompt, pinentry_cmd)) - return True + from xpra.scripts.main import get_pinentry_command + pinentry_cmd = get_pinentry_command(PINENTRY) + authlog("do_process_challenge_prompt%s get_pinentry_command(%s)=%s", + (packet, prompt), PINENTRY, pinentry_cmd) + if pinentry_cmd: + start_thread(self.handle_challenge_with_pinentry, + "pinentry", True, (packet, prompt, pinentry_cmd)) + return True return self.do_process_challenge_prompt_dialog(packet, prompt) def stop_pinentry(self): @@ -306,7 +292,6 @@ def stop_pinentry(self): def handle_challenge_with_pinentry(self, packet, prompt="password", cmd="pinentry"): authlog = Logger("auth") authlog("handle_challenge_with_pinentry%s", (packet, prompt, cmd)) - from xpra.scripts.main import do_run_pinentry try: proc = Popen([cmd], stdin=PIPE, stdout=PIPE, stderr=PIPE) except OSError: @@ -322,23 +307,12 @@ def handle_challenge_with_pinentry(self, packet, prompt="password", cmd="pinentr cinfo = conn.get_info() endpoint = pretty_socket(cinfo.get("endpoint", conn.target)).split("?")[0] q += " for %s" % endpoint - messages = [ - "SETPROMPT Xpra Server Authentication:", - "SETDESC %s:" % q, - "GETPIN", - ] - def get_input(): - if not messages: - return None - return messages.pop(0) - def process_output(message, output): - if message=="GETPIN": - if output.startswith(b"D "): - password = output[2:].rstrip(b"\n\r").decode() - self.idle_add(self.send_challenge_reply, packet, password) - else: - self.idle_add(self.quit, EXIT_PASSWORD_REQUIRED) - do_run_pinentry(proc, get_input, process_output) + def got_pin(value): + self.idle_add(self.send_challenge_reply, packet, value) + def no_pin(): + self.idle_add(self.quit, EXIT_PASSWORD_REQUIRED) + from xpra.scripts.main import pinentry_getpin + pinentry_getpin(proc, "Xpra Server Authentication:", q, got_pin, no_pin) return True return self.do_process_challenge_prompt_dialog(packet, prompt) diff --git a/xpra/net/ssh.py b/xpra/net/ssh.py index 4c9a5ecbc0..6910e6ca75 100644 --- a/xpra/net/ssh.py +++ b/xpra/net/ssh.py @@ -11,7 +11,11 @@ from time import sleep from subprocess import PIPE, Popen -from xpra.scripts.main import InitException, InitExit, shellquote, host_target_string +from xpra.scripts.main import ( + InitException, InitExit, + shellquote, host_target_string, + get_pinentry_command, run_pinentry_getpin, run_pinentry_confirm, + ) from xpra.platform.paths import get_ssh_known_hosts_files from xpra.platform import get_username from xpra.scripts.config import parse_bool @@ -36,6 +40,7 @@ WINDOW_SIZE = envint("XPRA_SSH_WINDOW_SIZE", 2**27-1) TIMEOUT = envint("XPRA_SSH_TIMEOUT", 60) SKIP_UI = envbool("XPRA_SKIP_UI", False) +PINENTRY = envbool("XPRA_SSH_PINENTRY", POSIX and not OSX) VERIFY_HOSTKEY = envbool("XPRA_SSH_VERIFY_HOSTKEY", True) VERIFY_STRICT = envbool("XPRA_SSH_VERIFY_STRICT", False) @@ -97,6 +102,10 @@ def main_thread_run(): return code[0] def dialog_pass(title="Password Input", prompt="enter password", icon="") -> str: + if PINENTRY: + pinentry_cmd = get_pinentry_command() + if pinentry_cmd: + return run_pinentry_getpin(pinentry_cmd, title, prompt) from xpra.client.gtk_base.pass_dialog import PasswordInputDialogWindow dialog = PasswordInputDialogWindow(title, prompt, icon) try: @@ -116,14 +125,18 @@ def dialog_confirm(title, prompt, qinfo=(), icon="", buttons=(("OK", 1),)) -> in return r -def confirm_key(info=()) -> bool: +def confirm_key(info=(), title="Confirm Key", prompt="Are you sure you want to continue connecting?") -> bool: if SKIP_UI: return False from xpra.platform.paths import get_icon_filename + if PINENTRY: + pinentry_cmd = get_pinentry_command() + if pinentry_cmd: + messages = list(info)+["", prompt] + return run_pinentry_confirm(pinentry_cmd, title, "%0A".join(messages)) if use_gui_prompt(): icon = get_icon_filename("authentication", "png") or "" - prompt = "Are you sure you want to continue connecting?" - code = dialog_confirm("Confirm Key", prompt, info, icon, buttons=[("yes", 200), ("NO", 201)]) + code = dialog_confirm(title, prompt, info, icon, buttons=[("yes", 200), ("NO", 201)]) log("dialog return code=%s", code) r = code==200 log.info("host key %sconfirmed", ["not ", ""][r]) @@ -141,7 +154,7 @@ def input_pass(prompt) -> str: if SKIP_UI: return None from xpra.platform.paths import get_icon_filename - if use_gui_prompt(): + if PINENTRY or use_gui_prompt(): icon = get_icon_filename("authentication", "png") or "" return dialog_pass("Password Input", prompt, icon) from getpass import getpass @@ -513,7 +526,7 @@ def adddnscheckinfo(q): else: assert (not keys) or (host_key.get_name() not in keys) if not keys: - log.warn("Warning: unknown SSH host") + log.warn("Warning: unknown SSH host '%s'", host) else: log.warn("Warning: unknown %s SSH host key", keyname()) qinfo = [ diff --git a/xpra/scripts/main.py b/xpra/scripts/main.py index 06ef8127ae..2f02223439 100755 --- a/xpra/scripts/main.py +++ b/xpra/scripts/main.py @@ -1396,6 +1396,38 @@ def sockpathfail_cb(msg): raise InitException("unsupported display type: %s" % dtype) +def get_pinentry_command(setting="yes"): + log = Logger("exec") + log("get_pinentry_command(%s)", setting) + if setting.lower() in FALSE_OPTIONS: + return None + from xpra.os_util import is_gnome, is_kde, which + def find_pinentry_bin(): + if is_gnome(): + return which("pinentry-gnome3") + if is_kde(): + return which("pinentry-qt") + return None + if setting.lower() in TRUE_OPTIONS: + return find_pinentry_bin() or which("pinentry") + if setting=="" or setting.lower()=="auto": + #figure out if we should use it: + if WIN32 or OSX: + #not enabled by default on those platforms + return None + return find_pinentry_bin() + return setting + +def popen_pinentry(pinentry_cmd): + try: + return Popen([pinentry_cmd], stdin=PIPE, stdout=PIPE, stderr=PIPE) + except OSError as e: + log = Logger("exec") + log("popen_pinentry(%s) failed", pinentry_cmd, exc_info=True) + log.error("Error: failed to run '%s'", pinentry_cmd) + log.error(" %s", e) + return None + def run_pinentry(extra_args): messages = list(extra_args) log = Logger("exec") @@ -1409,10 +1441,10 @@ def process_output(message, line): log.error(" %s", line.rstrip(b"\n\r").decode()) else: log("pinentry sent %r", line) - try: - proc = Popen(["pinentry"], stdin=PIPE, stdout=PIPE, stderr=PIPE) - except OSError as e: - raise InitExit(EXIT_UNSUPPORTED, "cannot run pinentry: %s" % (e,)) from None + pinentry_cmd = get_pinentry_command() or "pinentry" + proc = popen_pinentry(pinentry_cmd) + if not proc: + raise InitExit(EXIT_UNSUPPORTED, "cannot run pinentry") return do_run_pinentry(proc, get_input, process_output) def do_run_pinentry(proc, get_input, process_output): @@ -1431,6 +1463,61 @@ def do_run_pinentry(proc, get_input, process_output): proc.terminate() log("pinentry ended: %s" % proc.poll()) +def pinentry_getpin(pinentry_proc, title, description, pin_cb, err_cb): + messages = [ + "SETPROMPT %s" % title, + "SETDESC %s:" % description, + "GETPIN", + ] + def get_input(): + if not messages: + return None + return messages.pop(0) + def process_output(message, output): + if message=="GETPIN": + if output.startswith(b"D "): + pin_value = output[2:].rstrip(b"\n\r").decode() + pin_cb(pin_value) + else: + err_cb() + do_run_pinentry(pinentry_proc, get_input, process_output) + return True + +def run_pinentry_getpin(pinentry_cmd, title, description): + proc = popen_pinentry(pinentry_cmd) + if proc is None: + return None + values = [] + def rec(value=None): + values.append(value) + try: + pinentry_getpin(proc, title, description, rec, rec) + finally: + noerr(proc.terminate) + if not values: + return None + return values[0] + +def run_pinentry_confirm(pinentry_cmd, title, prompt): + proc = popen_pinentry(pinentry_cmd) + if proc is None: + return None + messages = [ + "SETPROMPT %s" % title, + "SETDESC %s:" % prompt, + "CONFIRM", + ] + def get_input(): + if not messages: + return None + return messages.pop(0) + confirm_values = [] + def process_output(message, output): + if message=="CONFIRM": + confirm_values.append(output.strip(b"\n\r")) + do_run_pinentry(proc, get_input, process_output) + return len(confirm_values)==1 and confirm_values[0]==b"OK" + def run_dialog(extra_args): from xpra.client.gtk_base.confirm_dialog import show_confirm_dialog