Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/multisafepay/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

from multisafepay.client.api_key import ApiKey
from multisafepay.client.client import Client
from multisafepay.client.credential_resolver import ScopedCredentialResolver

__all__ = [
"ApiKey",
"Client",
"ScopedCredentialResolver",
Comment thread
zulquer marked this conversation as resolved.
]
67 changes: 60 additions & 7 deletions src/multisafepay/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

from ..exception.api import ApiException
from .api_key import ApiKey
from .credential_resolver import (
AuthScope,
CredentialResolver,
ScopedCredentialResolver,
)


class Client:
Expand Down Expand Up @@ -46,28 +51,42 @@ class Client:

def __init__(
self: "Client",
api_key: str,
is_production: bool,
api_key: Optional[str] = None,
is_production: bool = False,
transport: Optional[HTTPTransport] = None,
locale: str = "en_US",
base_url: Optional[str] = None,
credential_resolver: Optional[CredentialResolver] = None,
) -> None:
"""
Initialize the Client.

Parameters
----------
api_key (str): The API key for authentication.
api_key (Optional[str]): The API key for authentication.
Optional only when `credential_resolver` is provided.
is_production (bool): Flag indicating if the client is in production mode.
transport (Optional[HTTPTransport], optional): Custom HTTP transport implementation.
Defaults to RequestsTransport if not provided.
locale (str, optional): Locale for the requests. Defaults to "en_US".
base_url (Optional[str], optional): Custom API base URL.
Only allowed when running with `MSP_SDK_BUILD_PROFILE=dev`
and `MSP_SDK_ALLOW_CUSTOM_BASE_URL=1`.
credential_resolver (Optional[CredentialResolver], optional):
Resolver used to derive API keys by auth scope.

Raises
------
ValueError: If no API key or CredentialResolver is provided.

"""
self.api_key = ApiKey(api_key=api_key)
if api_key is None and credential_resolver is None:
raise ValueError(
"api_key is required when credential_resolver is not provided.",
)

self.api_key = ApiKey(api_key=api_key) if api_key is not None else None
self.credential_resolver = credential_resolver
self.url = self._resolve_base_url(
is_production=is_production,
explicit_base_url=base_url,
Expand Down Expand Up @@ -123,6 +142,7 @@ def create_get_request(
endpoint: str,
params: dict[str, Any] = None,
context: Optional[dict[str, Any]] = None,
auth_scope: Optional[AuthScope] = None,
) -> ApiResponse:
"""
Create a GET request.
Expand All @@ -143,6 +163,7 @@ def create_get_request(
self.METHOD_GET,
url,
context=context,
auth_scope=auth_scope,
)

def create_post_request(
Expand All @@ -151,6 +172,7 @@ def create_post_request(
params: dict[str, Any] = None,
request_body: str = None,
context: Optional[dict[str, Any]] = None,
auth_scope: Optional[AuthScope] = None,
) -> ApiResponse:
"""
Create a POST request.
Expand All @@ -173,6 +195,7 @@ def create_post_request(
url,
request_body=request_body,
context=context,
auth_scope=auth_scope,
)

def create_patch_request(
Expand All @@ -181,6 +204,7 @@ def create_patch_request(
params: dict[str, Any] = None,
request_body: str = None,
context: Optional[dict[str, Any]] = None,
auth_scope: Optional[AuthScope] = None,
) -> ApiResponse:
"""
Create a PATCH request.
Expand All @@ -203,13 +227,15 @@ def create_patch_request(
url,
request_body=request_body,
context=context,
auth_scope=auth_scope,
)

def create_delete_request(
self: "Client",
endpoint: str,
params: dict[str, Any] = None,
context: Optional[dict[str, Any]] = None,
auth_scope: Optional[AuthScope] = None,
) -> ApiResponse:
"""
Create a DELETE request.
Expand All @@ -226,7 +252,12 @@ def create_delete_request(

"""
url = self._build_url(endpoint, params)
return self._create_request(self.METHOD_DELETE, url, context=context)
return self._create_request(
self.METHOD_DELETE,
url,
context=context,
auth_scope=auth_scope,
)

def _build_url(
self: "Client",
Expand Down Expand Up @@ -255,12 +286,33 @@ def _build_url(
)
return f"{self.url}{endpoint}?{query_string}"

def _resolve_api_key(
self: "Client",
auth_scope: Optional[AuthScope],
) -> str:
if self.credential_resolver is not None:
resolved_scope = auth_scope or AuthScope(
scope=ScopedCredentialResolver.AUTH_SCOPE_DEFAULT,
)
return self.credential_resolver.resolve(
auth_scope=resolved_scope.scope,
group_id=resolved_scope.group_id,
)

Comment thread
zulquer marked this conversation as resolved.
if self.api_key is None:
raise ValueError(
"api_key is required when credential_resolver is not provided.",
)

return self.api_key.get()

def _create_request(
self: "Client",
method: str,
url: str,
request_body: Optional[dict[str, Any]] = None,
context: Optional[dict[str, Any]] = None,
auth_scope: Optional[AuthScope] = None,
) -> ApiResponse:
Comment thread
zulquer marked this conversation as resolved.
"""
Create and send an HTTP request.
Expand All @@ -277,9 +329,10 @@ def _create_request(
ApiResponse: The API response.

"""
api_key = self._resolve_api_key(auth_scope)
headers = {
"Authorization": "Bearer " + self.api_key.get(),
"accept-encoding": "application/json",
"Authorization": "Bearer " + api_key,
"Accept": "application/json",
"Content-Type": "application/json",
}

Expand Down
106 changes: 106 additions & 0 deletions src/multisafepay/client/credential_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright (c) MultiSafepay, Inc. All rights reserved.

# This file is licensed under the Open Software License (OSL) version 3.0.
# For a copy of the license, see the LICENSE.txt file in the project root.

# See the DISCLAIMER.md file for disclaimer details.

"""Credential resolver contracts and default scoped resolver."""

from dataclasses import dataclass
from typing import Optional, Protocol


@dataclass(frozen=True)
class AuthScope:
"""Auth scope selection payload for credential resolution."""

scope: str
group_id: Optional[str] = None


class CredentialResolver(Protocol):
"""Protocol for resolving API keys by auth scope and context."""

def resolve(
self: "CredentialResolver",
auth_scope: str,
group_id: Optional[str] = None,
) -> str:
"""Resolve the API key to use for a given scope and context."""


class ScopedCredentialResolver:
"""Default resolver implementation for account, partner and group scopes."""

AUTH_SCOPE_DEFAULT = "default_account"
AUTH_SCOPE_PARTNER_AFFILIATE = "partner_affiliate"
AUTH_SCOPE_TERMINAL_GROUP = "terminal_group"

def __init__(
self: "ScopedCredentialResolver",
default_api_key: str,
partner_affiliate_api_key: Optional[str] = None,
terminal_group_api_keys: Optional[dict[str, str]] = None,
) -> None:
"""
Initialize a scoped credential resolver.

Parameters
----------
default_api_key (str): Fallback/default account API key.
partner_affiliate_api_key (Optional[str]): Partner/affiliate API key.
terminal_group_api_keys (Optional[dict[str, str]]): Mapping of
terminal_group_id to API key.

"""
self.default_api_key = (default_api_key or "").strip()
self.partner_affiliate_api_key = (
partner_affiliate_api_key or ""
).strip() or None
self.terminal_group_api_keys = {
group_id: api_key.strip()
for group_id, api_key in (terminal_group_api_keys or {}).items()
if api_key and api_key.strip()
}

if (
not self.default_api_key
and self.partner_affiliate_api_key is None
and not self.terminal_group_api_keys
):
raise ValueError(
"ScopedCredentialResolver requires at least one API key.",
)

def resolve(
self: "ScopedCredentialResolver",
auth_scope: str,
group_id: Optional[str] = None,
) -> str:
"""Resolve API key for the given scope and auth context."""
if auth_scope == self.AUTH_SCOPE_TERMINAL_GROUP:
if not group_id:
raise ValueError(
"Missing terminal_group_id in auth scope.",
)
api_key = self.terminal_group_api_keys.get(group_id)
if not api_key:
raise ValueError(
"No API key configured for terminal_group_id "
f"'{group_id}'.",
)
return api_key

if auth_scope == self.AUTH_SCOPE_PARTNER_AFFILIATE:
api_key = self.partner_affiliate_api_key or self.default_api_key
if not api_key:
raise ValueError(
"No API key configured for partner_affiliate scope.",
)
return api_key

if not self.default_api_key:
raise ValueError("No API key configured for default scope.")

return self.default_api_key
26 changes: 18 additions & 8 deletions src/multisafepay/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .api.paths.me.me_manager import MeManager
from .api.paths.recurring.recurring_manager import RecurringManager
from .client.client import Client
from .client.credential_resolver import CredentialResolver


class Sdk:
Expand All @@ -38,19 +39,21 @@ class Sdk:

def __init__(
self: "Sdk",
api_key: str,
is_production: bool,
api_key: Optional[str] = None,
is_production: bool = False,
transport: Optional[HTTPTransport] = None,
locale: str = "en_US",
base_url: Optional[str] = None,
credential_resolver: Optional[CredentialResolver] = None,
) -> None:
"""
Initialize the SDK with the provided configuration.

Parameters
----------
api_key : str
api_key : Optional[str]
The API key for authenticating with the MultiSafePay API.
Optional only when `credential_resolver` is provided.
is_production : bool
Flag indicating whether to use the production environment.
transport : Optional[HTTPTransport], optional
Expand All @@ -60,14 +63,21 @@ def __init__(
The locale to use for requests, by default "en_US".
base_url : Optional[str], optional
Custom API base URL (dev-only guardrails apply), by default None.
credential_resolver : Optional[CredentialResolver], optional
Strategy for resolving API keys per auth scope, by default None.

Raises
------
ValueError: If no API key or CredentialResolver is provided.

"""
self.client = Client(
api_key.strip(),
is_production,
transport,
locale,
base_url,
api_key=api_key,
is_production=is_production,
transport=transport,
locale=locale,
base_url=base_url,
credential_resolver=credential_resolver,
)
self.recurring_manager = RecurringManager(self.client)

Expand Down
Loading
Loading