diff --git a/src/oidcop/session/database.py b/src/oidcop/session/database.py index 47171d3e..bf75991b 100644 --- a/src/oidcop/session/database.py +++ b/src/oidcop/session/database.py @@ -126,46 +126,46 @@ def get(self, path: List[str]) -> Union[SessionInfo, Grant]: def delete(self, path: List[str]): uid, client_id, grant_id = self._eval_path(path) - try: - _user_info = self.db[uid] - except KeyError: - pass - else: - if client_id: - if client_id in _user_info.subordinate: - try: - _client_info = self.db[self.session_key(uid, client_id)] - except KeyError: - pass - else: - if grant_id: - if grant_id in _client_info.subordinate: - try: - self.db.__delitem__( - self.session_key(uid, client_id, grant_id)) - except KeyError: - pass - _client_info.subordinate.remove(grant_id) - else: - for grant_id in _client_info.subordinate: - self.db.__delitem__( - self.session_key(uid, client_id, grant_id)) - _client_info.subordinate = [] - - if len(_client_info.subordinate) == 0: - self.db.__delitem__(self.session_key(uid, client_id)) - _user_info.subordinate.remove(client_id) - else: - self.db[client_id] = _client_info - - if len(_user_info.subordinate) == 0: - self.db.__delitem__(uid) - else: - self.db[uid] = _user_info - else: - pass + + if uid not in self.db: + return + elif not client_id: + self.db.__delitem__(uid) + return + + _user_info = self.db[uid] + skey_uid_client = self.session_key(uid, client_id) + skey_uid_client_grant = self.session_key( + uid, client_id, grant_id or '' + ) + + if client_id not in _user_info.subordinate: + self.db.__delitem__(client_id) + return + + elif skey_uid_client in self.db: + _client_info = self.db[skey_uid_client] + if grant_id: + if skey_uid_client_grant in self.db: + self.db.__delitem__(skey_uid_client_grant) + if grant_id in _client_info.subordinate: + _client_info.subordinate.remove(grant_id) else: - self.db.__delitem__(uid) + for grant_id in _client_info.subordinate: + if skey_uid_client_grant in self.db: + self.db.__delitem__(skey_uid_client_grant) + _client_info.subordinate = [] + + if len(_client_info.subordinate) == 0: + self.db.__delitem__(skey_uid_client) + _user_info.subordinate.remove(client_id) + else: + self.db[client_id] = _client_info + + if len(_user_info.subordinate) == 0: + self.db.__delitem__(uid) + else: + self.db[uid] = _user_info def update(self, path: List[str], new_info: dict): _info = self.get(path)