From f7fa3fa58d323905e5cc4188031d332e33a67c1d Mon Sep 17 00:00:00 2001 From: Aryaman Date: Wed, 6 Mar 2024 18:44:45 +0530 Subject: [PATCH] added borg logic for validating urls #1631 --- src/vorta/utils.py | 187 +++++++++++++++++++++++++++++ src/vorta/views/repo_add_dialog.py | 12 +- 2 files changed, 198 insertions(+), 1 deletion(-) diff --git a/src/vorta/utils.py b/src/vorta/utils.py index 033b486a5..42c51a8ba 100644 --- a/src/vorta/utils.py +++ b/src/vorta/utils.py @@ -530,3 +530,190 @@ def func(x): return i, item return None + + +class Location: + """Object representing a repository location""" + + # user must not contain "@", ":" or "/". + # Quoting adduser error message: + # "To avoid problems, the username should consist only of letters, digits, + # underscores, periods, at signs and dashes, and not start with a dash + # (as defined by IEEE Std 1003.1-2001)." + # We use "@" as separator between username and hostname, so we must + # disallow it within the pure username part. + optional_user_re = r""" + (?:(?P[^@:/]+)@)? + """ + + # path must not contain :: (it ends at :: or string end), but may contain single colons. + # to avoid ambiguities with other regexes, it must also not start with ":" nor with "//" nor with "ssh://". + local_path_re = r""" + (?!(:|//|ssh://|socket://|smb://)) # not starting with ":" or // or ssh:// or socket:// + (?P([^:]|(:(?!:)))+) # any chars, but no "::" + """ + + # file_path must not contain :: (it ends at :: or string end), but may contain single colons. + # it must start with a / and that slash is part of the path. + file_path_re = r""" + (?P(([^/]*)/([^:]|(:(?!:)))+)) # start opt. servername, then /, then any chars, but no "::" + """ + + # abs_path must not contain :: (it ends at :: or string end), but may contain single colons. + # it must start with a / and that slash is part of the path. + abs_path_re = r""" + (?P(/([^:]|(:(?!:)))+)) # start with /, then any chars, but no "::" + """ + + # host NAME, or host IP ADDRESS (v4 or v6, v6 must be in square brackets) + host_re = r""" + (?P( + (?!\[)[^:/]+(?ssh):// # ssh:// + """ + + optional_user_re + + host_re + + r""" # user@ (optional), host name or address + (?::(?P\d+))? # :port (optional) + """ + + abs_path_re, + re.VERBOSE, + ) # path + + socket_re = re.compile( + r""" + (?Psocket):// # socket:// + """ + + abs_path_re, + re.VERBOSE, + ) # path + + file_re = re.compile( + r""" + (?Pfile):// # file:// + """ + + file_path_re, + re.VERBOSE, + ) # servername/path or path + + local_re = re.compile(local_path_re, re.VERBOSE) # local path + + win_file_re = re.compile( + r""" + (?:file://)? # optional file protocol + (?P + (?:[a-zA-Z]:)? # Drive letter followed by a colon (optional) + (?:[^:]+) # Anything which does not contain a :, at least one char + ) + """, + re.VERBOSE, + ) + + def __init__(self, text="", overrides={}, other=False): + self.repo_env_var = "BORG_OTHER_REPO" if other else "BORG_REPO" + self.valid = False + self.proto = None + self.user = None + self._host = None + self.port = None + self.path = None + self.raw = None + self.processed = None + self.parse(text, overrides) + + def parse(self, text, overrides={}): + if not text: + # we did not get a text to parse, so we try to fetch from the environment + text = os.environ.get(self.repo_env_var) + if text is None: + return + + self.raw = text # as given by user, might contain placeholders + # self.processed = replace_placeholders(self.raw, overrides) # after placeholder replacement + valid = self._parse(text) + if valid: + self.valid = True + else: + self.valid = False + + def _parse(self, text): + def normpath_special(p): + # avoid that normpath strips away our relative path hack and even makes p absolute + relative = p.startswith("/./") + p = os.path.normpath(p) + return ("/." + p) if relative else p + + m = self.ssh_re.match(text) + if m: + self.proto = m.group("proto") + self.user = m.group("user") + self._host = m.group("host") + self.port = m.group("port") and int(m.group("port")) or None + self.path = normpath_special(m.group("path")) + return True + m = self.file_re.match(text) + if m: + self.proto = m.group("proto") + self.path = normpath_special(m.group("path")) + return True + m = self.socket_re.match(text) + if m: + self.proto = m.group("proto") + self.path = normpath_special(m.group("path")) + return True + m = self.local_re.match(text) + if m: + self.proto = "file" + self.path = normpath_special(m.group("path")) + return True + + return False + + def __str__(self): + items = [ + "proto=%r" % self.proto, + "user=%r" % self.user, + "host=%r" % self.host, + "port=%r" % self.port, + "path=%r" % self.path, + ] + return ", ".join(items) + + def __repr__(self): + return "Location(%s)" % self + + @property + def host(self): + # strip square brackets used for IPv6 addrs + if self._host is not None: + return self._host.lstrip("[").rstrip("]") + + def canonical_path(self): + if self.proto in ("file", "socket"): + return self.path + else: + if self.path and self.path.startswith("~"): + path = "/" + self.path # /~/x = path x relative to home dir + elif self.path and not self.path.startswith("/"): + path = "/./" + self.path # /./x = path x relative to cwd + else: + path = self.path + return "ssh://{}{}{}{}".format( + f"{self.user}@" if self.user else "", + self._host, # needed for ipv6 addrs + f":{self.port}" if self.port else "", + path, + ) + + +def is_valid_url(url): + location = Location(url) + return True if location.valid else False diff --git a/src/vorta/views/repo_add_dialog.py b/src/vorta/views/repo_add_dialog.py index cd265a333..f3fbc1dfc 100644 --- a/src/vorta/views/repo_add_dialog.py +++ b/src/vorta/views/repo_add_dialog.py @@ -14,7 +14,13 @@ from vorta.borg.init import BorgInitJob from vorta.keyring.abc import VortaKeyring from vorta.store.models import RepoModel -from vorta.utils import borg_compat, choose_file_dialog, get_asset, get_private_keys +from vorta.utils import ( + borg_compat, + choose_file_dialog, + get_asset, + get_private_keys, + is_valid_url, +) from vorta.views.partials.password_input import PasswordInput, PasswordLineEdit from vorta.views.utils import get_colored_icon @@ -102,6 +108,10 @@ def init_ssh_key(self): def validate(self): """Pre-flight check for valid input and borg binary.""" + if not is_valid_url(self.values['repo_url']): + self._set_status(self.tr('Please use a supported URL format.')) + return False + if self.is_remote_repo and not re.match(r'.+:.+', self.values['repo_url']): self._set_status(self.tr('Please enter a valid repo URL or select a local path.')) return False