From d30ae8453ddd82a128ba1077969f417ac2ab12c2 Mon Sep 17 00:00:00 2001 From: ed Date: Thu, 21 Mar 2024 20:10:27 +0000 Subject: [PATCH] idp: precise expansion of `${u}` (fixes #79); it is now possible to grant access to users other than `${u}` (the user which the volume belongs to) previously, permissions did not apply correctly to IdP volumes due to the way `${u}` and `${g}` was expanded, which was a funky iteration over all known users/groups instead of... just expanding them? also adds another sanchk that a volume's URL must contain a `${u}` to be allowed to mention `${u}` in the accs list, and similarly for `${g}` / `@${g}` since users can be in multiple groups --- copyparty/authsrv.py | 45 ++++++++++++++++---------------- tests/res/idp/6.conf | 24 +++++++++++++++++ tests/test_idp.py | 62 ++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 106 insertions(+), 25 deletions(-) create mode 100644 tests/res/idp/6.conf diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index 777d1892..ecaf63fa 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -1224,7 +1224,9 @@ def _read_vol_str_idp( if un.startswith("@"): grp = un[1:] uns = [x[0] for x in un_gns.items() if grp in x[1]] - if not uns and grp != "${g}" and not self.args.idp_h_grp: + if grp == "${g}": + unames.append(un) + elif not uns and not self.args.idp_h_grp: t = "group [%s] must be defined with --grp argument (or in a [groups] config section)" raise CfgEx(t % (grp,)) @@ -1234,31 +1236,28 @@ def _read_vol_str_idp( # unames may still contain ${u} and ${g} so now expand those; un_gn = [(un, gn) for un, gns in un_gns.items() for gn in gns] - if "*" not in un_gns: - # need ("*","") to match "*" in unames - un_gn.append(("*", "")) - - for _, dst, vu, vg in vols: - unames2 = set() - for un, gn in un_gn: - # if vu/vg (volume user/group) is non-null, - # then each non-null value corresponds to - # ${u}/${g}; consider this a filter to - # apply to unames, as well as un_gn - if (vu and vu != un) or (vg and vg != gn): - continue - for uname in unames + ([un] if vu or vg else []): - if uname == "${u}": - uname = vu or un - elif uname in ("${g}", "@${g}"): - uname = vg or gn + for src, dst, vu, vg in vols: + unames2 = set(unames) - if vu and vu != uname: - continue + if "${u}" in unames: + if not vu: + t = "cannot use ${u} in accs of volume [%s] because the volume url does not contain ${u}" + raise CfgEx(t % (src,)) + unames2.add(vu) + + if "@${g}" in unames: + if not vg: + t = "cannot use @${g} in accs of volume [%s] because the volume url does not contain @${g}" + raise CfgEx(t % (src,)) + unames2.update([un for un, gn in un_gn if gn == vg]) + + if "${g}" in unames: + t = 'the accs of volume [%s] contains "${g}" but the only supported way of specifying that is "@${g}"' + raise CfgEx(t % (src,)) - if uname: - unames2.add(uname) + unames2.discard("${u}") + unames2.discard("@${g}") self._read_vol_str(lvl, list(unames2), axs[dst]) diff --git a/tests/res/idp/6.conf b/tests/res/idp/6.conf new file mode 100644 index 00000000..4d926f75 --- /dev/null +++ b/tests/res/idp/6.conf @@ -0,0 +1,24 @@ +# -*- mode: yaml -*- +# vim: ft=yaml: + +[global] + idp-h-usr: x-idp-user + idp-h-grp: x-idp-group + +[/get/${u}] + /get/${u} + accs: + g: * + r: ${u}, @su + m: @su + +[/priv/${u}] + /priv/${u} + accs: + r: ${u}, @su + m: @su + +[/team/${g}/${u}] + /team/${g}/${u} + accs: + r: @${g} diff --git a/tests/test_idp.py b/tests/test_idp.py index 0354f89d..03fb628f 100644 --- a/tests/test_idp.py +++ b/tests/test_idp.py @@ -15,6 +15,16 @@ def dump(self, vfs): print(json.dumps(vfs, indent=4, sort_keys=True, default=lambda o: o.__dict__)) def log(self, src, msg, c=0): + m = "%s" % (msg,) + if ( + "warning: filesystem-path does not exist:" in m + or "you are sharing a system directory:" in m + or "reinitializing due to new user from IdP:" in m + or m.startswith("hint: argument") + or (m.startswith("loaded ") and " config files:" in m) + ): + return + print(("[%s] %s" % (src, msg)).encode("ascii", "replace").decode("ascii")) def nav(self, au, vp): @@ -30,14 +40,16 @@ def assertAxs(self, axs, expected): self.assertEqual(unpacked, expected + [[]] * pad) def assertAxsAt(self, au, vp, expected): - self.assertAxs(self.nav(au, vp).axs, expected) + vn = self.nav(au, vp) + self.assertAxs(vn.axs, expected) def assertNodes(self, vfs, expected): got = list(sorted(vfs.nodes.keys())) self.assertEqual(got, expected) def assertNodesAt(self, au, vp, expected): - self.assertNodes(self.nav(au, vp), expected) + vn = self.nav(au, vp) + self.assertNodes(vn, expected) def prep(self): here = os.path.abspath(os.path.dirname(__file__)) @@ -140,6 +152,11 @@ def test_4(self): self.assertEqual(self.nav(au, "vg/iga1").realpath, "/g1-iga") self.assertEqual(self.nav(au, "vg/iga2").realpath, "/g2-iga") + au.idp_checkin(None, "iub", "iga") + self.assertAxsAt(au, "vu/iua", [["iua"]]) + self.assertAxsAt(au, "vg/iga1", [["iua", "iub"]]) + self.assertAxsAt(au, "vg/iga2", [["iua", "iub", "ua"]]) + def test_5(self): """ one IdP user in multiple groups @@ -169,3 +186,44 @@ def test_5(self): self.assertAxsAt(au, "g", [["iua"]]) self.assertAxsAt(au, "ga", [["iua"]]) self.assertAxsAt(au, "gb", [["iua"]]) + + def test_6(self): + """ + IdP volumes with anon-get and other users/groups (github#79) + """ + _, cfgdir, xcfg = self.prep() + au = AuthSrv(Cfg(c=[cfgdir + "/6.conf"], **xcfg), self.log) + + self.assertAxs(au.vfs.axs, []) + self.assertEqual(au.vfs.vpath, "") + self.assertEqual(au.vfs.realpath, "") + self.assertNodes(au.vfs, []) + + au.idp_checkin(None, "iua", "") + star = ["*", "iua"] + self.assertNodes(au.vfs, ["get", "priv"]) + self.assertAxsAt(au, "get/iua", [["iua"], [], [], [], star]) + self.assertAxsAt(au, "priv/iua", [["iua"], [], []]) + + au.idp_checkin(None, "iub", "") + star = ["*", "iua", "iub"] + self.assertNodes(au.vfs, ["get", "priv"]) + self.assertAxsAt(au, "get/iua", [["iua"], [], [], [], star]) + self.assertAxsAt(au, "get/iub", [["iub"], [], [], [], star]) + self.assertAxsAt(au, "priv/iua", [["iua"], [], []]) + self.assertAxsAt(au, "priv/iub", [["iub"], [], []]) + + au.idp_checkin(None, "iuc", "su") + star = ["*", "iua", "iub", "iuc"] + self.assertNodes(au.vfs, ["get", "priv", "team"]) + self.assertAxsAt(au, "get/iua", [["iua", "iuc"], [], ["iuc"], [], star]) + self.assertAxsAt(au, "get/iub", [["iub", "iuc"], [], ["iuc"], [], star]) + self.assertAxsAt(au, "get/iuc", [["iuc"], [], ["iuc"], [], star]) + self.assertAxsAt(au, "priv/iua", [["iua", "iuc"], [], ["iuc"]]) + self.assertAxsAt(au, "priv/iub", [["iub", "iuc"], [], ["iuc"]]) + self.assertAxsAt(au, "priv/iuc", [["iuc"], [], ["iuc"]]) + self.assertAxsAt(au, "team/su/iuc", [["iuc"]]) + + au.idp_checkin(None, "iud", "su") + self.assertAxsAt(au, "team/su/iuc", [["iuc", "iud"]]) + self.assertAxsAt(au, "team/su/iud", [["iuc", "iud"]])