Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions modules/weko-accounts/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ def test_check_in(self, app, mocker):
shibuser.user.roles = MagicMock()

with app.app_context():
# テストでは_find_organization_nameは常にFalseを返すようにモック化
mocker.patch.object(shibuser, '_find_organization_name', return_value=False)

# assign_user_roleがFalseを返す場合のテスト
mocker.patch.object(shibuser, 'assign_user_role', return_value=(False, "test_error"))
result = shibuser.check_in()
Expand Down Expand Up @@ -451,6 +454,58 @@ def test_get_roles_to_add(self, app, mocker):
with pytest.raises(KeyError, match='WEKO_ACCOUNTS_IDP_ENTITY_ID is missing in config'):
shibuser._get_roles_to_add()

#.tox/c1/bin/pytest --cov=weko_accounts tests/test_api.py::TestShibUser::test_find_organization_name -vv -s --cov-branch --cov-report=html --basetemp=/code/modules/weko-workflow/.tox/c1/tmp
def test_find_organization_name(self, shib_user_a, app, mocker):
with app.app_context():
with patch('weko_accounts.api.db.session') as mock_db_session, \
patch('weko_accounts.api.current_app') as mock_current_app:

mock_current_app.config = {
"WEKO_ACCOUNTS_GAKUNIN_ROLE": {
'defaultRole': 'Contributor',
'organizationName': ['Gakunin', 'Gakunin2']
},
"WEKO_ACCOUNTS_ORTHROS_INSIDE_ROLE": {
'defaultRole': 'Repository Administrator',
'organizationName': ['Orthros']
},
"WEKO_ACCOUNTS_ORTHROS_OUTSIDE_ROLE": {
'defaultRole': 'Community Administrator',
'organizationName': ['OutsideOrthros']
},
"WEKO_ACCOUNTS_EXTRA_ROLE": {
'defaultRole': 'None',
'organizationName': ['Extra']
}
}

group_ids = ['test_group_id']

# 学認IdPのorganizationNameに登録がある場合のテスト
mocker.patch("weko_accounts.api.ShibUser.get_organization_from_api", return_value="Gakunin2")
result = shib_user_a._find_organization_name(group_ids)
assert result == True

# 機関内のOrthrosのorganizationNameに登録がある場合のテスト
mocker.patch("weko_accounts.api.ShibUser.get_organization_from_api", return_value="Orthros")
result = shib_user_a._find_organization_name(group_ids)
assert result == True

# 機関外のOrthrosのorganizationNameに登録がある場合のテスト
mocker.patch("weko_accounts.api.ShibUser.get_organization_from_api", return_value="OutsideOrthros")
result = shib_user_a._find_organization_name(group_ids)
assert result == True

# その他のorganizationNameに登録がある場合のテスト
mocker.patch("weko_accounts.api.ShibUser.get_organization_from_api", return_value="Extra")
result = shib_user_a._find_organization_name(group_ids)
assert result == True

# organizationNameに登録がない場合のテスト
mocker.patch("weko_accounts.api.ShibUser.get_organization_from_api", return_value="invalid")
result = shib_user_a._find_organization_name(group_ids)
assert result == False

#.tox/c1/bin/pytest --cov=weko_accounts tests/test_api.py::TestShibUser::test_assign_roles_to_user -vv -s --cov-branch --cov-report=html --basetemp=/code/modules/weko-workflow/.tox/c1/tmp
def test_assign_roles_to_user(self, shib_user_a, app, mocker):
with app.app_context():
Expand Down Expand Up @@ -557,6 +612,37 @@ def test_assign_roles_to_user_exception(self, shib_user_a, app, mocker):
assert mock_datastore.add_role_to_user.call_count == 0
assert mock_db_session.commit.call_count == 1
assert mock_db_session.rollback.call_count == 1

# .tox/c1/bin/pytest --cov=weko_accounts tests/test_api.py::TestShibUser::test_get_ouganization_from_api -vv -s --cov-branch --cov-report=term --basetemp=/code/modules/weko-workflow/.tox/c1/tmp
def test_get_ouganization_from_api(self, app):
"""
PeopleAPIからorganization_nameを取得するメソッドテスト
"""
test_response = {
"entry": [
{
"organizations": [
{"type": "organization", "value": {
"name": "Orthros"
}
}
]
}
]
}
group_id = "test_group_id"
shibuser = ShibUser({})
with app.app_context():
with patch('requests.get') as mock_get, \
patch('weko_accounts.api.current_app') as mock_current_app:
# モックが返すレスポンスを設定
mock_get.return_value.status_code = 200
mock_get.return_value.json = lambda: test_response

# ShibUserクラスのメソッドを呼び出し、結果を確認
result = shibuser.get_organization_from_api(group_id)
assert result == "Orthros" # 期待値を比較


# @classmethod
# def shib_user_logout(cls):
Expand Down
3 changes: 2 additions & 1 deletion modules/weko-accounts/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
_redirect_method,
find_user_by_email
shib_sp_login,
find_user_by_email
)
from weko_admin.models import AdminSettings

Expand Down Expand Up @@ -379,7 +380,7 @@ def test_shib_sp_login(client, redis_connect,mocker, db, users):
# Check if shib_eppn is not included in the blocked user list
try:
db.session.add(AdminSettings(
id=6,
id=11,
name="blocked_user_settings",
settings='{"blocked_ePPNs": ["ePPN1", "ePPN2", "ePPN3", "ePPN5", "ePPP*"]}'
))
Expand Down
4 changes: 2 additions & 2 deletions modules/weko-accounts/weko_accounts/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def index(self):
roles[key] = new_roles[key]
flash(_(f'{key.replace("_", " ").title()} was updated.'), category='success')
AdminSettings.update('default_role_settings', roles)

# 属性マッピングの更新
for key in attributes:
if attributes[key] != new_attributes[key]:
Expand All @@ -113,7 +113,7 @@ def index(self):
_('Blocked user list was updated.'),
category='success')
block_user_list = str(new_eppn_list).replace('"', '\\"')

self.get_latest_current_app()

return self.render(
Expand Down
62 changes: 61 additions & 1 deletion modules/weko-accounts/weko_accounts/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
from .models import ShibbolethUser
from weko_index_tree.models import Index

import urllib.parse
import requests


_datastore = LocalProxy(lambda: current_app.extensions['security'].datastore)


Expand Down Expand Up @@ -287,7 +291,8 @@ def check_in(self):
try:
if current_app.config['WEKO_ACCOUNTS_SHIB_BIND_GAKUNIN_MAP_GROUPS']:
roles_add = self._get_roles_to_add()
self._assign_roles_to_user(roles_add)
if not self._find_organization_name(roles_add):
self._assign_roles_to_user(roles_add)
except Exception as ex:
return str(ex)

Expand Down Expand Up @@ -327,6 +332,40 @@ def _get_roles_to_add(self):
raise ex

return shib_attr_is_member_of

def _find_organization_name(self, group_ids):
"""
事前に各ロールにorganization_nameが登録されていた場合、そのロールを割り当てる
"""
try:
with db.session.begin_nested():
for group in group_ids:
group_id = urllib.parse.quote(group)
organization_name = self.get_organization_from_api(group_id)

setting_role = ""
if organization_name in current_app.config["WEKO_ACCOUNTS_GAKUNIN_ROLE"]["organizationName"]:
setting_role = current_app.config["WEKO_ACCOUNTS_GAKUNIN_ROLE"]["defaultRole"].replace(" ", "_")
elif organization_name in current_app.config["WEKO_ACCOUNTS_ORTHROS_INSIDE_ROLE"]["organizationName"]:
setting_role = current_app.config["WEKO_ACCOUNTS_ORTHROS_INSIDE_ROLE"]["defaultRole"].replace(" ", "_")
elif organization_name in current_app.config["WEKO_ACCOUNTS_ORTHROS_OUTSIDE_ROLE"]["organizationName"]:
setting_role = current_app.config["WEKO_ACCOUNTS_ORTHROS_OUTSIDE_ROLE"]["defaultRole"].replace(" ", "_")
elif organization_name in current_app.config["WEKO_ACCOUNTS_EXTRA_ROLE"]["organizationName"]:
setting_role = current_app.config["WEKO_ACCOUNTS_EXTRA_ROLE"]["defaultRole"].replace(" ", "_")

if len(setting_role) > 0:
role = Role.query.filter_by(name=setting_role).one_or_none()
_datastore.add_role_to_user(self.user, role)
self.shib_user.shib_roles.append(role)
return True

db.session.commit()
return False
except Exception as ex:
current_app.logger.error(f"Error assigning roles: {ex}")
db.session.rollback()
raise ex


def _assign_roles_to_user(self, map_group_names):
try:
Expand Down Expand Up @@ -372,6 +411,27 @@ def _assign_roles_to_user(self, map_group_names):
db.session.rollback()
raise ex

def get_organization_from_api(self, group_id):
url = f"https://cg.gakunin.jp/api/people/@me/{group_id}"
headers = {
'Content-Type': 'application/json',
}

response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
entries = data.get("entry", [])
for entry in entries:
organizations = entry.get("organizations", [])
for organization in organizations:
if organization.get("type") == "organization":
return organization.get("value", {}).get("name")
raise ValueError(f"Organization not found in response: {data}")
else:
raise ValueError(f"API returned error: {response.status_code}")



# ! NEED RELATION SHIB_ATTR
# check_license, error = self.valid_site_license()
# if not check_license:
Expand Down
1 change: 1 addition & 0 deletions modules/weko-accounts/weko_accounts/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,4 @@

WEKO_INNDEXTREE_GAKUNIN_GROUP_DEFAULT_CONTRIBUTE_PERMISSION = False
"""投稿権限のデフォルト権限を設定する"""

6 changes: 3 additions & 3 deletions modules/weko-accounts/weko_accounts/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ def _adjust_shib_admin_DB():
"""
Create or Update Shibboleth Admin database table.
"""
if current_app.config.get('TESTING', False): # テスト環境では何もしない
return
if current_app.config.get('TESTING', False): # テスト環境では何もしない
return

with _app.app_context():
if AdminSettings.query.filter_by(name='blocked_user_settings').first() is None:
Expand Down Expand Up @@ -540,4 +540,4 @@ def dbsession_clean(exception):
db.session.commit()
except:
db.session.rollback()
db.session.remove()
db.session.remove()
Loading