Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ansys/rep/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@
__version__,
__version_no_dots__,
)
from .auth import AuthApi
from .client import Client
from .exceptions import APIError, ClientError, REPError
from .jms import JmsApi, ProjectApi
161 changes: 113 additions & 48 deletions ansys/rep/client/auth/api/auth_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,34 @@ class AuthApi:
users as well as modify or delete existing ones. Non-admin users are only allowed
to query the list of existing users.

Args:
rep_url (str): The base path for the server to call, e.g. "https://127.0.0.1:8443/rep".
username (str): Username
password (str): Password

Example:

>>> from ansys.rep.client.auth import Client, User
>>> cl = Client(
rep_url="https://127.0.0.1:8443/rep/", username="repadmin", password="repadmin"
)
>>> existing_users = cl.get_users()
>>> new_user = User(username='test_user', password='dummy',
>>> email='test_user@test.com', fullname='Test User',
>>> is_admin=False)
>>> cl.create_user(new_user)
Parameters
----------
client : Client
A REP client object.

Examples
--------

Get users whose first name contains "john":

>>> from ansys.rep.client import Client
>>> from ansys.rep.client.auth import AuthApi, User
>>> cl = Client(
... rep_url="https://127.0.0.1:8443/rep/", username="repadmin", password="repadmin"
... )
>>> auth_api = AuthApi(cl)
>>> users = auth_api.get_users(firstName="john", exact=False)

Create a new user:

>>> new_user = User(
... username="new_user",
... password="dummy",
... email=f"new_user@test.com",
... first_name="New",
... last_name="User",
... )
>>> auth_api.create_user(new_user)

"""

Expand All @@ -45,11 +57,63 @@ def __init__(self, client):

@property
def url(self):
"""Returns the API url"""
return f"{self.client.rep_url}/auth/"

def get_users(self, as_objects=True) -> List[User]:
"""Return a list of users."""
return get_users(self.client, as_objects=as_objects)
@property
def keycloak_admin_client(self) -> KeycloakAdmin:
"""Returns an authenticated client for the Keycloak Admin API"""
return _admin_client(self.client)

def get_users(self, as_objects=True, **query_params) -> List[User]:
"""Return users, filtered according to query parameters

Examples of query parameters are:
- `username`
- `firstName`
- `lastName`
- `exact`

Pagination is also supported using the `first` and `max` parameters.

For the complete list of supported query parameters, please
refer to the Keycloak API documentation.
"""
return get_users(self.keycloak_admin_client, as_objects=as_objects, **query_params)

def get_user(self, id: str) -> User:
"""Returns the user representation for a given user id."""
return get_user(self.keycloak_admin_client, id)

def get_user_groups(self, id: str) -> List[str]:
"""Get name of groups the user belongs to"""
return [g["name"] for g in self.keycloak_admin_client.get_user_groups(id)]

def get_user_realm_roles(self, id: str) -> List[str]:
"""Get name of realm roles for a user"""
return [r["name"] for r in self.keycloak_admin_client.get_realm_roles_of_user(id)]

def user_is_admin(self, id: str) -> bool:
"""Check whether the user is system admin"""

from ansys.rep.client.jms import JmsApi

# the admin keys are configurable settings of JMS
# they need to be queried, can't be hardcoded
jms_api = JmsApi(self.client)
admin_keys = jms_api.get_api_info()["settings"]["admin_keys"]

# query user groups and roles and store in the same format
# as admin keys
user_keys = [f"groups.{name}" for name in self.get_user_groups(id)] + [
f"roles.{name}" for name in self.get_user_realm_roles(id)
]

# match admin and user keys
if set(admin_keys).intersection(user_keys):
return True

return False

def create_user(self, user: User, as_objects=True) -> User:
"""Create a new user.
Expand All @@ -58,7 +122,7 @@ def create_user(self, user: User, as_objects=True) -> User:
user (:class:`ansys.rep.client.auth.User`): A User object. Defaults to None.
as_objects (bool, optional): Defaults to True.
"""
return create_user(self.client, user, as_objects=as_objects)
return create_user(self.keycloak_admin_client, user, as_objects=as_objects)

def update_user(self, user: User, as_objects=True) -> User:
"""Modify an existing user.
Expand All @@ -67,15 +131,15 @@ def update_user(self, user: User, as_objects=True) -> User:
user (:class:`ansys.rep.client.auth.User`): A User object. Defaults to None.
as_objects (bool, optional): Defaults to True.
"""
return update_user(self.client, user, as_objects=as_objects)
return update_user(self.keycloak_admin_client, user, as_objects=as_objects)

def delete_user(self, user: User) -> None:
"""Delete an existing user.

Args:
user (:class:`ansys.rep.client.auth.User`): A User object. Defaults to None.
"""
return delete_user(self.client, user)
return self.keycloak_admin_client.delete_user(user.id)


def _admin_client(client):
Expand All @@ -96,23 +160,29 @@ def _admin_client(client):
return keycloak_admin


def get_users(client, as_objects=True):
admin = _admin_client(client)
data = admin.get_users({})
for d in data:
uid = d["id"]
groups = admin.get_user_groups(uid)
d["groups"] = [g["name"] for g in groups]
realm_roles = admin.get_realm_roles_of_user(uid)
d["realm_roles"] = [r["name"] for r in realm_roles]
d["is_admin"] = d # Force admin check
def get_users(admin_client: KeycloakAdmin, as_objects=True, **query_params):

users = admin_client.get_users(query=query_params)

if not as_objects:
return users

schema = UserSchema(many=True)
users = schema.load(data)
return users
return schema.load(users)


def get_user(admin_client: KeycloakAdmin, id: str, as_objects=True):

user = admin_client.get_user(user_id=id)

if not as_objects:
return user

def create_user(client, user, as_objects=True):
schema = UserSchema(many=False)
return schema.load(user)


def create_user(admin_client: KeycloakAdmin, user: User, as_objects=True):
schema = UserSchema(many=False)
data = schema.dump(user)

Expand All @@ -126,14 +196,11 @@ def create_user(client, user, as_objects=True):
]
data["enabled"] = True

admin = _admin_client(client)
uid = admin.create_user(data)
data = admin.get_user(uid)
user = schema.load(data)
return user
uid = admin_client.create_user(data)
return get_user(admin_client, uid, as_objects)


def update_user(client, user, as_objects=True):
def update_user(admin_client: KeycloakAdmin, user: User, as_objects=True):
schema = UserSchema(many=False)
data = schema.dump(user)

Expand All @@ -146,12 +213,10 @@ def update_user(client, user, as_objects=True):
}
]

admin = _admin_client(client)
data = admin.update_user(user.id, data)
user = schema.load(data)
return user
data = admin_client.update_user(user.id, data)

if not as_objects:
return data

def delete_user(client, user):
admin = _admin_client(client)
admin.delete_user(user.id)
user = schema.load(data)
return user
95 changes: 5 additions & 90 deletions ansys/rep/client/auth/authenticate.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,69 +15,18 @@
log = logging.getLogger(__name__)


def get_oidc_response(
server_url: str,
*,
grant_type: str,
client_id: str,
client_secret: str = None,
username: str = None,
password: str = None,
realm: str = "rep",
scope="openid",
refresh_token=None,
timeout=10,
):
session = requests.Session()
session.verify = False
session.headers = ({"content-type": "application/x-www-form-urlencoded"},)

path = f"auth/realms/{realm}"
if path in server_url:
auth_url = server_url
else:
auth_url = f"{server_url}/{path}"
token_url = f"{auth_url}/protocol/openid-connect/token"

data = {
"client_id": client_id,
"grant_type": grant_type,
"scope": scope,
}
if client_secret is not None:
data["client_secret"] = client_secret
if username is not None:
data["username"] = username
if password is not None:
data["password"] = password
if refresh_token is not None:
data["refresh_token"] = refresh_token

log.debug(
f"Retrieving access token for client {client_id} from {auth_url} using {grant_type} grant"
)
r = session.post(token_url, data=data, timeout=timeout)

if r.status_code != 200:
raise RuntimeError(
f"""Failed to retrieve access token for client {client_id} from {token_url}
using {grant_type} grant, status code {r.status_code}: {r.content.decode()}"""
)

return r.json()


def authenticate(
url: str = "https://127.0.0.1:8443/rep",
realm: str = "rep",
grant_type: str = "password",
scope="openid",
client_id: str = "rep-cli",
client_secret: str = None,
username: str = "repadmin",
password: str = "repadmin",
username: str = None,
password: str = None,
refresh_token: str = None,
timeout: float = 10.0,
**kwargs,
):
"""
Authenticate user with either password or refresh token against REP authentication service.
Expand Down Expand Up @@ -124,46 +73,12 @@ def authenticate(
if refresh_token is not None:
data["refresh_token"] = refresh_token

data.update(**kwargs)

log.debug(
f"Retrieving access token for client {client_id} from {auth_url} using {grant_type} grant"
)
r = session.post(token_url, data=data, timeout=timeout)

raise_for_status(r)
# if r.status_code != 200:
# raise ClientError(f"Failed to retrieve access token for client {client_id} from
# {token_url} using {grant_type} grant, status code {r.status_code}: {r.content.decode()}", **d)

return r.json()

# auth_data={}
# if refresh_token:
# auth_data = {'client_id': client_id,
# 'grant_type': 'refresh_token',
# 'scope': scope,
# 'refresh_token' : refresh_token}
# log.debug("Authenticate on %s with refresh token" % auth_api_url)
# elif password:
# auth_data = {'client_id': client_id,
# 'grant_type': 'password',
# 'scope': scope,
# 'username': username,
# 'password': password}
# log.debug("Authenticate on %s with user %s and password" % (auth_api_url,username))

# with requests.Session() as session:
# # Disable SSL certificate verification and warnings about it
# session.verify = False
# requests.packages.urllib3.disable_warnings(
# requests.packages.urllib3.exceptions.InsecureRequestWarning
# )

# # Set basic content type
# session.headers.update({'content-type': 'application/x-www-form-urlencoded'})

# # Query tokens
# resp = session.post("%s/oauth/token" % auth_api_url, data=auth_data, timeout=timeout)
# raise_for_status(resp)
# log.debug("Authentication successful, returning tokens")

# return resp.json()
11 changes: 0 additions & 11 deletions ansys/rep/client/auth/resource/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ class User(Object):
Last name
email : str, optional
E-mail address (optional).
groups : str
Groups the user belongs to
realm_roles : str
Realm roles assigned to the user
is_admin Whether the user has admin rights or not.

"""

Expand All @@ -39,9 +34,6 @@ def __init__(self,
first_name=missing,
last_name=missing,
email=missing,
groups=missing,
realm_roles=missing,
is_admin=missing,
**kwargs
):
self.id = id
Expand All @@ -50,9 +42,6 @@ def __init__(self,
self.first_name = first_name
self.last_name = last_name
self.email = email
self.groups = groups
self.realm_roles = realm_roles
self.is_admin = is_admin

super().__init__(**kwargs)

Expand Down
Loading