diff --git a/.talismanrc b/.talismanrc index 3a7877a..b443cb3 100644 --- a/.talismanrc +++ b/.talismanrc @@ -394,9 +394,13 @@ fileignoreconfig: - filename: tests/unit/contentstack/test_contentstack.py checksum: 98503cbd96cb546a19aed037a6ca28ef54fcea312efcd9bac1171e43760f6e86 - filename: contentstack_management/contentstack.py - checksum: 520f6fa236569a05579011fa67cb29381f187616d96526ecdfad5ec8255231a5 + checksum: 591978d70ecbe5fc3e6587544e9c112a6cd85fd8da2051b48ff87ab6a2e9eb57 - filename: tests/unit/test_oauth_handler.py checksum: 8b6853ba64c3de4f9097ca506719c5e33c7468ae5985b8adcda3eb6461d76be5 - filename: contentstack_management/oauth/oauth_handler.py checksum: e33cfd32d90c0553c4959c0d266fef1247cd0e0fe7bbe85cae98bb205e62c70e +- filename: tests/unit/user_session/test_user_session_totp.py + checksum: 0db30c5a306783b10d345d73cff3c61490d7cbc47273623df47e6849c3e97002 +- filename: tests/unit/contentstack/test_totp_login.py + checksum: cefad0ddc1a2db1bf59d6e04501c4381acc8b44fad1e5e2e24c06e33d827c859 version: "1.0" diff --git a/contentstack_management/contentstack.py b/contentstack_management/contentstack.py index 9d951b3..c9e9cb8 100644 --- a/contentstack_management/contentstack.py +++ b/contentstack_management/contentstack.py @@ -1,4 +1,6 @@ from enum import Enum +import os +import pyotp from ._api_client import _APIClient from contentstack_management.organizations import organization from contentstack_management.stack import stack @@ -36,12 +38,14 @@ def __init__(self, host: str = 'api.contentstack.io', scheme: str = 'https://', region: Region = Region.US.value, version='v3', timeout=2, max_retries: int = 18, early_access: list = None, oauth_config: dict = None, **kwargs): self.endpoint = 'https://api.contentstack.io/v3/' - if region is not None and host is not None and region is not Region.US.value: - self.endpoint = f'{scheme}{region}-{host}/{version}/' - if region is not None and host is None and region is not Region.US.value: - host = 'api.contentstack.com' - self.endpoint = f'{scheme}{region}-{host}/{version}/' - if host is not None and region is None: + + if region is not None and region is not Region.US.value: + if host is not None and host != 'api.contentstack.io': + self.endpoint = f'{scheme}{region}-api.{host}/{version}/' + else: + host = 'api.contentstack.com' + self.endpoint = f'{scheme}{region}-{host}/{version}/' + elif host is not None and host != 'api.contentstack.io': self.endpoint = f'{scheme}{host}/{version}/' if headers is None: headers = {} @@ -91,9 +95,36 @@ def __init__(self, host: str = 'api.contentstack.io', scheme: str = 'https://', ------------------------------- """ - def login(self, email: str, password: str, tfa_token: str = None): - return user_session.UserSession(self.client).login(email, password, tfa_token) - pass + def login(self, email: str, password: str, tfa_token: str = None, mfa_secret: str = None): + """ + Login to Contentstack with optional TOTP support. + + :param email: User's email address + :param password: User's password + :param tfa_token: Optional two-factor authentication token + :param mfa_secret: Optional MFA secret for automatic TOTP generation. + If not provided, will check MFA_SECRET environment variable + :return: Response object from the login request + """ + final_tfa_token = tfa_token + + if not mfa_secret: + mfa_secret = os.getenv('MFA_SECRET') + + if mfa_secret and not tfa_token: + final_tfa_token = self._generate_totp(mfa_secret) + + return user_session.UserSession(self.client).login(email, password, final_tfa_token) + + def _generate_totp(self, secret: str) -> str: + """ + Generate a Time-Based One-Time Password (TOTP) from the provided secret. + + :param secret: The MFA secret key for TOTP generation + :return: The current TOTP code as a string + """ + totp = pyotp.TOTP(secret) + return totp.now() def logout(self): return user_session.UserSession(client=self.client).logout() diff --git a/contentstack_management/user_session/user_session.py b/contentstack_management/user_session/user_session.py index 8c05ca7..91588cd 100644 --- a/contentstack_management/user_session/user_session.py +++ b/contentstack_management/user_session/user_session.py @@ -63,7 +63,7 @@ def login(self, email=None, password=None, tfa_token=None): } if tfa_token is not None: - data["user"]["tf_token"] = tfa_token + data["user"]["tfa_token"] = tfa_token data = json.dumps(data) response = self.client.post(_path, headers=self.client.headers, data=data, json_data=None) diff --git a/requirements.txt b/requirements.txt index 72b0ca9..cc66228 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ requests>=2.32.0,<3.0.0 pylint>=2.0.0 bson>=0.5.9,<1.0.0 requests-toolbelt>=1.0.0,<2.0.0 +pyotp==2.9.0 diff --git a/tests/unit/contentstack/test_contentstack.py b/tests/unit/contentstack/test_contentstack.py index 3ebef2d..e12cb26 100644 --- a/tests/unit/contentstack/test_contentstack.py +++ b/tests/unit/contentstack/test_contentstack.py @@ -8,25 +8,25 @@ class ContentstackRegionUnitTests(unittest.TestCase): def test_au_region(self): """Test that au region creates the correct endpoint URL""" client = contentstack_management.Client(authtoken='your_authtoken', region='au') - expected_endpoint = 'https://au-api.contentstack.io/v3/' + expected_endpoint = 'https://au-api.contentstack.com/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_gcp_eu_region(self): """Test that gcp-eu region creates the correct endpoint URL""" client = contentstack_management.Client(authtoken='your_authtoken', region='gcp-eu') - expected_endpoint = 'https://gcp-eu-api.contentstack.io/v3/' + expected_endpoint = 'https://gcp-eu-api.contentstack.com/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_azure_eu_region(self): """Test that azure-eu region creates the correct endpoint URL""" client = contentstack_management.Client(authtoken='your_authtoken', region='azure-eu') - expected_endpoint = 'https://azure-eu-api.contentstack.io/v3/' + expected_endpoint = 'https://azure-eu-api.contentstack.com/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_azure_na_region(self): """Test that azure-na region creates the correct endpoint URL""" client = contentstack_management.Client(authtoken='your_authtoken', region='azure-na') - expected_endpoint = 'https://azure-na-api.contentstack.io/v3/' + expected_endpoint = 'https://azure-na-api.contentstack.com/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_au_region_with_custom_host(self): @@ -34,9 +34,9 @@ def test_au_region_with_custom_host(self): client = contentstack_management.Client( authtoken='your_authtoken', region='au', - host='custom.contentstack.io' + host='example.com' ) - expected_endpoint = 'https://au-custom.contentstack.io/v3/' + expected_endpoint = 'https://au-api.example.com/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_gcp_eu_region_with_custom_host(self): @@ -46,19 +46,19 @@ def test_gcp_eu_region_with_custom_host(self): region='gcp-eu', host='custom.contentstack.io' ) - expected_endpoint = 'https://gcp-eu-custom.contentstack.io/v3/' + expected_endpoint = 'https://gcp-eu-api.custom.contentstack.io/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_au_region_enum_value(self): """Test that au region using enum value creates the correct endpoint URL""" client = contentstack_management.Client(authtoken='your_authtoken', region=Region.AU.value) - expected_endpoint = 'https://au-api.contentstack.io/v3/' + expected_endpoint = 'https://au-api.contentstack.com/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_gcp_eu_region_enum_value(self): """Test that gcp-eu region using enum value creates the correct endpoint URL""" client = contentstack_management.Client(authtoken='your_authtoken', region=Region.GCP_EU.value) - expected_endpoint = 'https://gcp-eu-api.contentstack.io/v3/' + expected_endpoint = 'https://gcp-eu-api.contentstack.com/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_au_region_with_custom_scheme(self): @@ -68,7 +68,7 @@ def test_au_region_with_custom_scheme(self): region='au', scheme='http://' ) - expected_endpoint = 'http://au-api.contentstack.io/v3/' + expected_endpoint = 'http://au-api.contentstack.com/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_gcp_eu_region_with_custom_scheme(self): @@ -78,7 +78,7 @@ def test_gcp_eu_region_with_custom_scheme(self): region='gcp-eu', scheme='http://' ) - expected_endpoint = 'http://gcp-eu-api.contentstack.io/v3/' + expected_endpoint = 'http://gcp-eu-api.contentstack.com/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_au_region_with_custom_version(self): @@ -88,7 +88,7 @@ def test_au_region_with_custom_version(self): region='au', version='v2' ) - expected_endpoint = 'https://au-api.contentstack.io/v2/' + expected_endpoint = 'https://au-api.contentstack.com/v2/' self.assertEqual(client.endpoint, expected_endpoint) def test_gcp_eu_region_with_custom_version(self): @@ -98,7 +98,7 @@ def test_gcp_eu_region_with_custom_version(self): region='gcp-eu', version='v2' ) - expected_endpoint = 'https://gcp-eu-api.contentstack.io/v2/' + expected_endpoint = 'https://gcp-eu-api.contentstack.com/v2/' self.assertEqual(client.endpoint, expected_endpoint) def test_au_region_headers(self): @@ -222,13 +222,13 @@ def test_us_region_default_behavior(self): def test_eu_region(self): """Test that eu region creates the correct endpoint URL""" client = contentstack_management.Client(authtoken='your_authtoken', region='eu') - expected_endpoint = 'https://eu-api.contentstack.io/v3/' + expected_endpoint = 'https://eu-api.contentstack.com/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_gcp_na_region(self): """Test that gcp-na region creates the correct endpoint URL""" client = contentstack_management.Client(authtoken='your_authtoken', region='gcp-na') - expected_endpoint = 'https://gcp-na-api.contentstack.io/v3/' + expected_endpoint = 'https://gcp-na-api.contentstack.com/v3/' self.assertEqual(client.endpoint, expected_endpoint) def test_region_with_none_host(self): diff --git a/tests/unit/contentstack/test_contentstack_integration.py b/tests/unit/contentstack/test_contentstack_integration.py index e1370ef..8adbcef 100644 --- a/tests/unit/contentstack/test_contentstack_integration.py +++ b/tests/unit/contentstack/test_contentstack_integration.py @@ -115,7 +115,7 @@ def test_region_endpoint_construction_logic(self): # Test non-US region with default host client = contentstack_management.Client(region='eu') - self.assertEqual(client.endpoint, 'https://eu-api.contentstack.io/v3/') + self.assertEqual(client.endpoint, 'https://eu-api.contentstack.com/v3/') # Skip custom host tests due to implementation issues # Test custom host without region diff --git a/tests/unit/contentstack/test_totp_login.py b/tests/unit/contentstack/test_totp_login.py new file mode 100644 index 0000000..bf6bd7e --- /dev/null +++ b/tests/unit/contentstack/test_totp_login.py @@ -0,0 +1,146 @@ +import unittest +import os +import sys +from unittest.mock import patch, MagicMock + +# Add the contentstack_management module to the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..', '..')) + +import contentstack_management +from contentstack_management.contentstack import Client + + +class TOTPLoginTests(unittest.TestCase): + """Unit tests for TOTP login functionality in Contentstack Management Python SDK""" + + def setUp(self): + """Set up test fixtures before each test method""" + self.client = Client() + self.test_email = "test@example.com" + self.test_password = "test_password" + self.test_secret = "JBSWY3DPEHPK3PXP" # Standard test secret for TOTP + self.test_tfa_token = "123456" + + def tearDown(self): + """Clean up after each test method""" + # Clean up environment variables + if 'MFA_SECRET' in os.environ: + del os.environ['MFA_SECRET'] + + def test_login_method_signature_with_totp(self): + """Test that login method accepts TOTP parameters""" + client = contentstack_management.Client() + # Test that the method exists and can be called with the expected parameters + self.assertTrue(hasattr(client, 'login')) + self.assertTrue(callable(client.login)) + + # Test that the method accepts TOTP parameters without error + try: + client.login(self.test_email, self.test_password, tfa_token=self.test_tfa_token) + client.login(self.test_email, self.test_password, mfa_secret=self.test_secret) + client.login(self.test_email, self.test_password, tfa_token=self.test_tfa_token, mfa_secret=self.test_secret) + except Exception as e: + self.fail(f"Login method should accept TOTP parameters without error: {e}") + + def test_generate_totp_method(self): + """Test the _generate_totp method generates correct TOTP codes""" + # Test with a known secret and verify the TOTP generation + totp_code = self.client._generate_totp(self.test_secret) + + # Verify the TOTP code is a 6-digit string + self.assertIsInstance(totp_code, str) + self.assertEqual(len(totp_code), 6) + self.assertTrue(totp_code.isdigit()) + + def test_login_with_mfa_secret_generates_totp(self): + """Test that login with mfa_secret generates TOTP automatically""" + with patch.object(self.client, 'client') as mock_client: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {'user': {'authtoken': 'test_token'}} + mock_client.post.return_value = mock_response + + # Mock the UserSession class + with patch('contentstack_management.user_session.user_session.UserSession') as mock_user_session: + mock_session_instance = MagicMock() + mock_session_instance.login.return_value = mock_response + mock_user_session.return_value = mock_session_instance + + # Mock the _generate_totp method to return a predictable value + with patch.object(self.client, '_generate_totp', return_value='654321') as mock_generate_totp: + result = self.client.login( + self.test_email, + self.test_password, + mfa_secret=self.test_secret + ) + + # Verify _generate_totp was called with the secret + mock_generate_totp.assert_called_once_with(self.test_secret) + + # Verify UserSession was called with generated TOTP + mock_session_instance.login.assert_called_once_with( + self.test_email, + self.test_password, + '654321' + ) + self.assertEqual(result, mock_response) + + def test_login_with_environment_variable(self): + """Test that login uses MFA_SECRET environment variable when mfa_secret is not provided""" + # Set environment variable + os.environ['MFA_SECRET'] = self.test_secret + + with patch.object(self.client, 'client') as mock_client: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {'user': {'authtoken': 'test_token'}} + mock_client.post.return_value = mock_response + + # Mock the UserSession class + with patch('contentstack_management.user_session.user_session.UserSession') as mock_user_session: + mock_session_instance = MagicMock() + mock_session_instance.login.return_value = mock_response + mock_user_session.return_value = mock_session_instance + + # Mock the _generate_totp method + with patch.object(self.client, '_generate_totp', return_value='789012') as mock_generate_totp: + result = self.client.login(self.test_email, self.test_password) + + # Verify _generate_totp was called with the environment secret + mock_generate_totp.assert_called_once_with(self.test_secret) + + # Verify UserSession was called with generated TOTP + mock_session_instance.login.assert_called_once_with( + self.test_email, + self.test_password, + '789012' + ) + self.assertEqual(result, mock_response) + + def test_backward_compatibility(self): + """Test that existing login patterns continue to work (backward compatibility)""" + with patch.object(self.client, 'client') as mock_client: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {'user': {'authtoken': 'test_token'}} + mock_client.post.return_value = mock_response + + # Mock the UserSession class + with patch('contentstack_management.user_session.user_session.UserSession') as mock_user_session: + mock_session_instance = MagicMock() + mock_session_instance.login.return_value = mock_response + mock_user_session.return_value = mock_session_instance + + # Test old pattern: client.login(email, password) + result1 = self.client.login(self.test_email, self.test_password) + + # Test old pattern: client.login(email, password, tfa_token) + result2 = self.client.login(self.test_email, self.test_password, self.test_tfa_token) + + # Both should work without errors + self.assertEqual(result1, mock_response) + self.assertEqual(result2, mock_response) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/unit/user_session/test_user_session_totp.py b/tests/unit/user_session/test_user_session_totp.py new file mode 100644 index 0000000..9107fee --- /dev/null +++ b/tests/unit/user_session/test_user_session_totp.py @@ -0,0 +1,144 @@ +import unittest +import os +import sys +import json +from unittest.mock import patch, MagicMock + +# Add the contentstack_management module to the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..', '..')) + +from contentstack_management.user_session.user_session import UserSession + + +class UserSessionTOTPTests(unittest.TestCase): + """Unit tests for TOTP-related functionality in UserSession class""" + + def setUp(self): + """Set up test fixtures before each test method""" + self.mock_client = MagicMock() + self.user_session = UserSession(self.mock_client) + self.test_email = "test@example.com" + self.test_password = "test_password" + self.test_tfa_token = "123456" + + def test_login_with_tfa_token_uses_correct_field_name(self): + """Test that login with TFA token uses 'tfa_token' field name (not 'tf_token')""" + # Mock the client post method + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {'user': {'authtoken': 'test_token'}} + self.mock_client.post.return_value = mock_response + + # Call login with TFA token + result = self.user_session.login(self.test_email, self.test_password, self.test_tfa_token) + + # Verify the request was made correctly + self.mock_client.post.assert_called_once() + call_args = self.mock_client.post.call_args + + # Check the data - this is the critical test for the field name fix + expected_data = { + "user": { + "email": self.test_email, + "password": self.test_password, + "tfa_token": self.test_tfa_token # Should be "tfa_token", not "tf_token" + } + } + actual_data = json.loads(call_args[1]['data']) + self.assertEqual(actual_data, expected_data) + + # Verify the correct field name is used + self.assertIn("tfa_token", actual_data["user"]) + self.assertNotIn("tf_token", actual_data["user"]) + self.assertEqual(actual_data["user"]["tfa_token"], self.test_tfa_token) + + # Check the response + self.assertEqual(result, mock_response) + + def test_login_without_tfa_token(self): + """Test login without TFA token (original behavior)""" + # Mock the client post method + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {'user': {'authtoken': 'test_token'}} + self.mock_client.post.return_value = mock_response + + # Call login without TFA token + result = self.user_session.login(self.test_email, self.test_password) + + # Verify the request was made correctly + self.mock_client.post.assert_called_once() + call_args = self.mock_client.post.call_args + + # Check the data + expected_data = { + "user": { + "email": self.test_email, + "password": self.test_password + } + } + actual_data = json.loads(call_args[1]['data']) + self.assertEqual(actual_data, expected_data) + + # Check the response + self.assertEqual(result, mock_response) + + def test_login_with_none_tfa_token(self): + """Test login with None TFA token""" + # Mock the client post method + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {'user': {'authtoken': 'test_token'}} + self.mock_client.post.return_value = mock_response + + # Call login with None TFA token + result = self.user_session.login(self.test_email, self.test_password, None) + + # Verify the request was made correctly + self.mock_client.post.assert_called_once() + call_args = self.mock_client.post.call_args + + # Check the data - None should not be included + expected_data = { + "user": { + "email": self.test_email, + "password": self.test_password + } + } + actual_data = json.loads(call_args[1]['data']) + self.assertEqual(actual_data, expected_data) + + # Check the response + self.assertEqual(result, mock_response) + + def test_login_parameter_validation(self): + """Test login parameter validation""" + # Test with empty email + with self.assertRaises(PermissionError) as context: + self.user_session.login("", self.test_password, self.test_tfa_token) + self.assertIn("Email Id is required", str(context.exception)) + + # Test with empty password + with self.assertRaises(PermissionError) as context: + self.user_session.login(self.test_email, "", self.test_tfa_token) + self.assertIn("Password is required", str(context.exception)) + + def test_login_method_signature(self): + """Test that login method has the correct signature""" + import inspect + + # Get the signature of the login method + sig = inspect.signature(self.user_session.login) + params = list(sig.parameters.keys()) + + # Verify the method has the expected parameters + expected_params = ['email', 'password', 'tfa_token'] + for param in expected_params: + self.assertIn(param, params) + + # Verify parameter defaults + self.assertEqual(sig.parameters['tfa_token'].default, None) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/unit/users/test_users.py b/tests/unit/users/test_users.py index 61eb6a3..a789d87 100644 --- a/tests/unit/users/test_users.py +++ b/tests/unit/users/test_users.py @@ -14,7 +14,8 @@ class UserUnitTests(unittest.TestCase): def setUp(self): self.client = contentstack_management.Client(host=host) - self.client.login(username, password) + # Note: Login call removed to avoid network requests in unit tests + # The actual login is not needed for testing request structure def test_get_user(self): response = self.client.user().fetch()