Skip to content

Commit

Permalink
more flexible token spec for uftpfs (closes #34)
Browse files Browse the repository at this point in the history
  • Loading branch information
BerndSchuller committed Oct 17, 2023
1 parent 71b54fa commit bc67153
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 4 deletions.
28 changes: 24 additions & 4 deletions pyunicore/uftpfs.py
@@ -1,4 +1,6 @@
""" UFTP extension to the pyfilesystem FTPFS class """
from os import getenv

from fs.ftpfs import FTPFS
from fs.opener import Opener

Expand Down Expand Up @@ -68,12 +70,30 @@ def _parse(self, resource_url):
base_dir = tok2[1] if len(tok) > 1 else "/"
return auth_url, base_dir

def open_fs(self, fs_url, parse_result, writeable, create, cwd):
auth_url, base_dir = self._parse(parse_result.resource)
cred = uc_credentials.create_credential(
def _read_token(self, token_spec):
token = None
if token_spec:
if token_spec.startswith("@@"):
env_name = token_spec[2:]
token = getenv(env_name, None)
elif token_spec.startswith("@"):
file_name = token_spec[1:]
with open(file_name) as f:
token = f.read().strip()
else:
token = token_spec
return token

def _create_credential(self, parse_result):
token = self._read_token(parse_result.params.get("token", None))
return uc_credentials.create_credential(
username=parse_result.username,
password=parse_result.password,
token=parse_result.params.get("token", None),
token=token,
identity=parse_result.params.get("identity", None),
)

def open_fs(self, fs_url, parse_result, writeable, create, cwd):
auth_url, base_dir = self._parse(parse_result.resource)
cred = self._create_credential(parse_result)
return UFTPFS(auth_url, cred, base_dir)
31 changes: 31 additions & 0 deletions tests/unit/test_uftpfs.py
@@ -1,4 +1,5 @@
import unittest
from os import environ

from pyunicore.uftpfs import UFTPOpener

Expand All @@ -14,6 +15,36 @@ def test_parse_url(self):
self.assertEqual("https://localhost:9000/rest/auth/TEST", auth_url)
self.assertEqual("/data-dir", base_dir)

def test_credential_username_password(self):
print("*** test_credential_username_password")
parse_result = P()
cred = UFTPOpener()._create_credential(parse_result)
self.assertEqual(cred.username, "demouser")
self.assertEqual(cred.password, "test123")

def test_credential_token(self):
print("*** test_credential_token")
parse_result = P()
parse_result.username = None
parse_result.password = None
parse_result.params["token"] = "some_token"
cred = UFTPOpener()._create_credential(parse_result)
self.assertEqual(cred.token, "some_token")
parse_result.params["token"] = "@tests/unit/token.txt"
cred = UFTPOpener()._create_credential(parse_result)
self.assertEqual(cred.token, "some_token")
parse_result.params["token"] = "@@MY_VAR"
environ["MY_VAR"] = "some_token"
cred = UFTPOpener()._create_credential(parse_result)
self.assertEqual(cred.token, "some_token")


class P:
def __init__(self):
self.username = "demouser"
self.password = "test123"
self.params = {}


if __name__ == "__main__":
unittest.main()
2 changes: 2 additions & 0 deletions tests/unit/token.txt
@@ -0,0 +1,2 @@

some_token

0 comments on commit bc67153

Please sign in to comment.