Skip to content

Commit

Permalink
feat: add authorize methods
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinHjelmare committed May 20, 2022
1 parent bfc36e4 commit 3bb9347
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ version = "0.1.0"
[tool.poetry.dependencies]
aiohttp = "^3.8"
python = "^3.9"
yarl = "^1.7"

[tool.poetry.dev-dependencies]
bandit = "^1.7"
Expand Down
40 changes: 31 additions & 9 deletions src/aiortm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

import hashlib
from http import HTTPStatus
from typing import Any
from typing import Any, cast

from aiohttp import ClientResponse, ClientResponseError, ClientSession
from yarl import URL

from .exceptions import (
APIAuthError,
Expand Down Expand Up @@ -51,31 +52,52 @@ async def request(self, url: str, **kwargs: Any) -> ClientResponse:
headers=headers,
)

async def authenticate_desktop(self) -> tuple[str, str]:
"""Authenticate as a desktop application."""
data = await self.call_api("rtm.auth.getFrob", api_key=self.api_key)
frob: str = data["frob"]
url = self.generate_authorize_url(
api_key=self.api_key, perms=self.permission, frob=frob
)
return url, frob

def generate_authorize_url(self, **params: Any) -> str:
"""Generate a URL for authorization."""
all_params = params | {"api_sig": self._sign_request(params)}
return str(URL(AUTH_URL).with_query(all_params))

async def get_token(self, frob: str) -> dict[str, Any]:
"""Fetch the authentication token with the frob."""
data = await self.call_api("rtm.auth.getToken", api_key=self.api_key, frob=frob)
auth_data: dict[str, Any] = data["auth"]
self.auth_token = cast(str, auth_data["token"])
return auth_data

async def check_token(self) -> bool:
"""Check if auth token is valid."""
if self.auth_token is None:
return False
try:
await self._call_api(
await self.call_api(
"rtm.auth.checkToken", api_key=self.api_key, auth_token=self.auth_token
)
except APIAuthError:
return False

return True

async def _call_api_auth(self, api_method: str, **params: Any) -> dict[str, Any]:
async def call_api_auth(self, api_method: str, **params: Any) -> dict[str, Any]:
"""Call an api method that requires authentication."""
if self.auth_token is None:
raise RuntimeError("Missing authentication token.")
all_params = {"api_key": self.api_key, "auth_token": self.auth_token, **params}
return await self._call_api(api_method, **all_params)
all_params = {"api_key": self.api_key, "auth_token": self.auth_token} | params
return await self.call_api(api_method, **all_params)

async def _call_api(self, api_method: str, **params: Any) -> dict[str, Any]:
async def call_api(self, api_method: str, **params: Any) -> dict[str, Any]:
"""Call an api method."""
params |= {"format": "json"}
params |= {"api_sig": self._sign_request(params)}
response = await self.request(REST_URL, method=api_method, **params)
all_params = params | {"format": "json"}
all_params |= {"api_sig": self._sign_request(all_params)}
response = await self.request(REST_URL, method=api_method, **all_params)

try:
response.raise_for_status()
Expand Down

0 comments on commit 3bb9347

Please sign in to comment.