Skip to content

Commit

Permalink
#3002 use pinentry for ssh confirmations and passwords / passphrases
Browse files Browse the repository at this point in the history
  • Loading branch information
totaam committed May 12, 2021
1 parent bc25f4b commit 2d2022d
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 51 deletions.
56 changes: 15 additions & 41 deletions xpra/client/gtk_base/gtk_client_base.py
Expand Up @@ -18,7 +18,7 @@
)
from xpra.os_util import (
bytestostr, strtobytes, hexstr, monotonic_time, load_binary_file,
WIN32, OSX, POSIX, is_Wayland, is_gnome, is_kde, which,
WIN32, OSX, POSIX, is_Wayland,
)
from xpra.simple_stats import std_unit
from xpra.exit_codes import EXIT_PASSWORD_REQUIRED
Expand Down Expand Up @@ -272,28 +272,14 @@ def do_process_challenge_prompt(self, packet, prompt="password"):
authlog = Logger("auth")
self.show_progress(100, "authentication")
PINENTRY = os.environ.get("XPRA_PINENTRY", "")
authlog("do_process_challenge_prompt%s PINENTRY=%s", (packet, prompt), PINENTRY)
if PINENTRY.lower() not in FALSE_OPTIONS:
pinentry_cmd = PINENTRY
def find_pinentry_bin():
if is_gnome():
return which("pinentry-gnome3")
if is_kde():
return which("pinentry-qt")
return None
if PINENTRY=="" or PINENTRY.lower()=="auto":
#figure out if we should use it:
if WIN32 or OSX:
#not enabled by default on those platforms
pinentry_cmd = None
else:
pinentry_cmd = find_pinentry_bin()
if PINENTRY.lower() in TRUE_OPTIONS:
pinentry_cmd = find_pinentry_bin()
if pinentry_cmd:
start_thread(self.handle_challenge_with_pinentry,
"pinentry", True, (packet, prompt, pinentry_cmd))
return True
from xpra.scripts.main import get_pinentry_command
pinentry_cmd = get_pinentry_command(PINENTRY)
authlog("do_process_challenge_prompt%s get_pinentry_command(%s)=%s",
(packet, prompt), PINENTRY, pinentry_cmd)
if pinentry_cmd:
start_thread(self.handle_challenge_with_pinentry,
"pinentry", True, (packet, prompt, pinentry_cmd))
return True
return self.do_process_challenge_prompt_dialog(packet, prompt)

def stop_pinentry(self):
Expand All @@ -306,7 +292,6 @@ def stop_pinentry(self):
def handle_challenge_with_pinentry(self, packet, prompt="password", cmd="pinentry"):
authlog = Logger("auth")
authlog("handle_challenge_with_pinentry%s", (packet, prompt, cmd))
from xpra.scripts.main import do_run_pinentry
try:
proc = Popen([cmd], stdin=PIPE, stdout=PIPE, stderr=PIPE)
except OSError:
Expand All @@ -322,23 +307,12 @@ def handle_challenge_with_pinentry(self, packet, prompt="password", cmd="pinentr
cinfo = conn.get_info()
endpoint = pretty_socket(cinfo.get("endpoint", conn.target)).split("?")[0]
q += " for %s" % endpoint
messages = [
"SETPROMPT Xpra Server Authentication:",
"SETDESC %s:" % q,
"GETPIN",
]
def get_input():
if not messages:
return None
return messages.pop(0)
def process_output(message, output):
if message=="GETPIN":
if output.startswith(b"D "):
password = output[2:].rstrip(b"\n\r").decode()
self.idle_add(self.send_challenge_reply, packet, password)
else:
self.idle_add(self.quit, EXIT_PASSWORD_REQUIRED)
do_run_pinentry(proc, get_input, process_output)
def got_pin(value):
self.idle_add(self.send_challenge_reply, packet, value)
def no_pin():
self.idle_add(self.quit, EXIT_PASSWORD_REQUIRED)
from xpra.scripts.main import pinentry_getpin
pinentry_getpin(proc, "Xpra Server Authentication:", q, got_pin, no_pin)
return True
return self.do_process_challenge_prompt_dialog(packet, prompt)

Expand Down
25 changes: 19 additions & 6 deletions xpra/net/ssh.py
Expand Up @@ -11,7 +11,11 @@
from time import sleep
from subprocess import PIPE, Popen

from xpra.scripts.main import InitException, InitExit, shellquote, host_target_string
from xpra.scripts.main import (
InitException, InitExit,
shellquote, host_target_string,
get_pinentry_command, run_pinentry_getpin, run_pinentry_confirm,
)
from xpra.platform.paths import get_ssh_known_hosts_files
from xpra.platform import get_username
from xpra.scripts.config import parse_bool
Expand All @@ -36,6 +40,7 @@
WINDOW_SIZE = envint("XPRA_SSH_WINDOW_SIZE", 2**27-1)
TIMEOUT = envint("XPRA_SSH_TIMEOUT", 60)
SKIP_UI = envbool("XPRA_SKIP_UI", False)
PINENTRY = envbool("XPRA_SSH_PINENTRY", POSIX and not OSX)

VERIFY_HOSTKEY = envbool("XPRA_SSH_VERIFY_HOSTKEY", True)
VERIFY_STRICT = envbool("XPRA_SSH_VERIFY_STRICT", False)
Expand Down Expand Up @@ -97,6 +102,10 @@ def main_thread_run():
return code[0]

def dialog_pass(title="Password Input", prompt="enter password", icon="") -> str:
if PINENTRY:
pinentry_cmd = get_pinentry_command()
if pinentry_cmd:
return run_pinentry_getpin(pinentry_cmd, title, prompt)
from xpra.client.gtk_base.pass_dialog import PasswordInputDialogWindow
dialog = PasswordInputDialogWindow(title, prompt, icon)
try:
Expand All @@ -116,14 +125,18 @@ def dialog_confirm(title, prompt, qinfo=(), icon="", buttons=(("OK", 1),)) -> in
return r


def confirm_key(info=()) -> bool:
def confirm_key(info=(), title="Confirm Key", prompt="Are you sure you want to continue connecting?") -> bool:
if SKIP_UI:
return False
from xpra.platform.paths import get_icon_filename
if PINENTRY:
pinentry_cmd = get_pinentry_command()
if pinentry_cmd:
messages = list(info)+["", prompt]
return run_pinentry_confirm(pinentry_cmd, title, "%0A".join(messages))
if use_gui_prompt():
icon = get_icon_filename("authentication", "png") or ""
prompt = "Are you sure you want to continue connecting?"
code = dialog_confirm("Confirm Key", prompt, info, icon, buttons=[("yes", 200), ("NO", 201)])
code = dialog_confirm(title, prompt, info, icon, buttons=[("yes", 200), ("NO", 201)])
log("dialog return code=%s", code)
r = code==200
log.info("host key %sconfirmed", ["not ", ""][r])
Expand All @@ -141,7 +154,7 @@ def input_pass(prompt) -> str:
if SKIP_UI:
return None
from xpra.platform.paths import get_icon_filename
if use_gui_prompt():
if PINENTRY or use_gui_prompt():
icon = get_icon_filename("authentication", "png") or ""
return dialog_pass("Password Input", prompt, icon)
from getpass import getpass
Expand Down Expand Up @@ -513,7 +526,7 @@ def adddnscheckinfo(q):
else:
assert (not keys) or (host_key.get_name() not in keys)
if not keys:
log.warn("Warning: unknown SSH host")
log.warn("Warning: unknown SSH host '%s'", host)
else:
log.warn("Warning: unknown %s SSH host key", keyname())
qinfo = [
Expand Down
95 changes: 91 additions & 4 deletions xpra/scripts/main.py
Expand Up @@ -1396,6 +1396,38 @@ def sockpathfail_cb(msg):
raise InitException("unsupported display type: %s" % dtype)


def get_pinentry_command(setting="yes"):
log = Logger("exec")
log("get_pinentry_command(%s)", setting)
if setting.lower() in FALSE_OPTIONS:
return None
from xpra.os_util import is_gnome, is_kde, which
def find_pinentry_bin():
if is_gnome():
return which("pinentry-gnome3")
if is_kde():
return which("pinentry-qt")
return None
if setting.lower() in TRUE_OPTIONS:
return find_pinentry_bin() or which("pinentry")
if setting=="" or setting.lower()=="auto":
#figure out if we should use it:
if WIN32 or OSX:
#not enabled by default on those platforms
return None
return find_pinentry_bin()
return setting

def popen_pinentry(pinentry_cmd):
try:
return Popen([pinentry_cmd], stdin=PIPE, stdout=PIPE, stderr=PIPE)
except OSError as e:
log = Logger("exec")
log("popen_pinentry(%s) failed", pinentry_cmd, exc_info=True)
log.error("Error: failed to run '%s'", pinentry_cmd)
log.error(" %s", e)
return None

def run_pinentry(extra_args):
messages = list(extra_args)
log = Logger("exec")
Expand All @@ -1409,10 +1441,10 @@ def process_output(message, line):
log.error(" %s", line.rstrip(b"\n\r").decode())
else:
log("pinentry sent %r", line)
try:
proc = Popen(["pinentry"], stdin=PIPE, stdout=PIPE, stderr=PIPE)
except OSError as e:
raise InitExit(EXIT_UNSUPPORTED, "cannot run pinentry: %s" % (e,)) from None
pinentry_cmd = get_pinentry_command() or "pinentry"
proc = popen_pinentry(pinentry_cmd)
if not proc:
raise InitExit(EXIT_UNSUPPORTED, "cannot run pinentry")
return do_run_pinentry(proc, get_input, process_output)

def do_run_pinentry(proc, get_input, process_output):
Expand All @@ -1431,6 +1463,61 @@ def do_run_pinentry(proc, get_input, process_output):
proc.terminate()
log("pinentry ended: %s" % proc.poll())

def pinentry_getpin(pinentry_proc, title, description, pin_cb, err_cb):
messages = [
"SETPROMPT %s" % title,
"SETDESC %s:" % description,
"GETPIN",
]
def get_input():
if not messages:
return None
return messages.pop(0)
def process_output(message, output):
if message=="GETPIN":
if output.startswith(b"D "):
pin_value = output[2:].rstrip(b"\n\r").decode()
pin_cb(pin_value)
else:
err_cb()
do_run_pinentry(pinentry_proc, get_input, process_output)
return True

def run_pinentry_getpin(pinentry_cmd, title, description):
proc = popen_pinentry(pinentry_cmd)
if proc is None:
return None
values = []
def rec(value=None):
values.append(value)
try:
pinentry_getpin(proc, title, description, rec, rec)
finally:
noerr(proc.terminate)
if not values:
return None
return values[0]

def run_pinentry_confirm(pinentry_cmd, title, prompt):
proc = popen_pinentry(pinentry_cmd)
if proc is None:
return None
messages = [
"SETPROMPT %s" % title,
"SETDESC %s:" % prompt,
"CONFIRM",
]
def get_input():
if not messages:
return None
return messages.pop(0)
confirm_values = []
def process_output(message, output):
if message=="CONFIRM":
confirm_values.append(output.strip(b"\n\r"))
do_run_pinentry(proc, get_input, process_output)
return len(confirm_values)==1 and confirm_values[0]==b"OK"


def run_dialog(extra_args):
from xpra.client.gtk_base.confirm_dialog import show_confirm_dialog
Expand Down

0 comments on commit 2d2022d

Please sign in to comment.