Skip to content

Commit

Permalink
Additions to auth url generation.
Browse files Browse the repository at this point in the history
  • Loading branch information
EvieePy committed Feb 13, 2024
1 parent 5b5e7f9 commit 78d2e87
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 11 deletions.
53 changes: 43 additions & 10 deletions twitchio/authentication/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""
from __future__ import annotations

import secrets
import urllib.parse
from typing import TYPE_CHECKING, ClassVar

Expand All @@ -32,6 +33,7 @@

if TYPE_CHECKING:
from ..types_.responses import (
AuthorizationURLResponse,
ClientCredentialsResponse,
RefreshTokenResponse,
UserTokenResponse,
Expand All @@ -43,12 +45,15 @@
class OAuth(HTTPClient):
CONTENT_TYPE_HEADER: ClassVar[dict[str, str]] = {"Content-Type": "application/x-www-form-urlencoded"}

def __init__(self, *, client_id: str, client_secret: str, redirect_uri: str | None = None) -> None:
def __init__(
self, *, client_id: str, client_secret: str, redirect_uri: str | None = None, scopes: Scopes | None = None
) -> None:
super().__init__()

self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.scopes = scopes

async def validate_token(self, token: str, /) -> ValidateTokenPayload:
token = token.removeprefix("Bearer ").removeprefix("OAuth ")
Expand All @@ -72,15 +77,16 @@ async def refresh_token(self, refresh_token: str, /) -> RefreshTokenPayload:

return RefreshTokenPayload(data)

async def user_access_token(self, code: str, /) -> UserTokenPayload:
if not self.redirect_uri:
raise ValueError("Missing redirect_uri")
async def user_access_token(self, code: str, /, *, redirect_uri: str | None = None) -> UserTokenPayload:
redirect = redirect_uri or self.redirect_uri
if not redirect:
raise ValueError('"redirect_uri" is a required parameter or attribute which is missing.')

params = self._create_params(
{
"code": code,
"grant_type": "authorization_code",
"redirect_uri": self.redirect_uri,
"redirect_uri": redirect,
# "scope": " ".join(SCOPES), #TODO
# "state": #TODO
}
Expand All @@ -105,20 +111,47 @@ async def client_credentials_token(self) -> ClientCredentialsPayload:

return ClientCredentialsPayload(data)

def get_authorization_url(self, scopes: Scopes, state: str = "") -> str:
if not self.redirect_uri:
raise ValueError("Missing redirect_uri")
def get_authorization_url(
self,
*,
scopes: Scopes | None = None,
state: str | None = None,
redirect_uri: str | None = None,
force_verify: bool = False,
) -> AuthorizationURLPayload:
redirect = redirect_uri or self.redirect_uri
if not redirect:
raise ValueError('"redirect_uri" is a required parameter or attribute which is missing.')

scopes = scopes or self.scopes
if not scopes:
raise ValueError('"scopes" is a required parameter or attribute which is missing.')

if state is None:
state = secrets.token_urlsafe(32)

params = {
"client_id": self.client_id,
"redirect_uri": urllib.parse.quote(self.redirect_uri),
"redirect_uri": urllib.parse.quote(redirect),
"response_type": "code",
"scope": scopes.urlsafe(),
"force_verify": "true" if force_verify else "false",
"state": state,
}

route: Route = Route("GET", "/oauth2/authorize", use_id=True, params=params)
return route.url
data: AuthorizationURLResponse = {
"url": route.url,
"client_id": self.client_id,
"redirect_uri": redirect,
"response_type": "code",
"scopes": scopes.selected,
"force_verify": force_verify,
"state": state,
}

payload: AuthorizationURLPayload = AuthorizationURLPayload(data)
return payload

def _create_params(self, extra_params: dict[str, str]) -> dict[str, str]:
params = {
Expand Down
19 changes: 19 additions & 0 deletions twitchio/authentication/payloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"ValidateTokenPayload",
"ClientCredentialsPayload",
"UserTokenPayload",
"AuthorizationURLPayload",
)


Expand Down Expand Up @@ -103,3 +104,21 @@ def __init__(self, raw: ClientCredentialsResponse, /) -> None:
self.access_token: str = raw["access_token"]
self.expires_in: int = raw["expires_in"]
self.token_type: str = raw["token_type"]


class AuthorizationURLPayload(BasePayload):
__slots__ = ("url", "client_id", "redirect_uri", "response_type", "scopes", "force_verify", "state")

def __init__(self, raw: AuthorizationURLResponse, /) -> None:
super().__init__(raw)

self.url: str = raw["url"]
self.client_id: str = raw["client_id"]
self.redirect_uri: str = raw["redirect_uri"]
self.response_type: str = raw["response_type"]
self.scopes: list[str] = raw["scopes"]
self.force_verify: bool = raw["force_verify"]
self.state: str = raw["state"]

def __str__(self) -> str:
return self.url
19 changes: 18 additions & 1 deletion twitchio/types_/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"OAuthResponses",
"UserTokenResponse",
"RawResponse",
"AuthorizationURLResponse",
)


Expand Down Expand Up @@ -60,5 +61,21 @@ class ClientCredentialsResponse(TypedDict):
token_type: str


OAuthResponses: TypeAlias = RefreshTokenResponse | ValidateTokenResponse | ClientCredentialsResponse | UserTokenResponse
class AuthorizationURLResponse(TypedDict):
url: str
client_id: str
redirect_uri: str
response_type: str
scopes: list[str]
force_verify: bool
state: str


OAuthResponses: TypeAlias = (
RefreshTokenResponse
| ValidateTokenResponse
| ClientCredentialsResponse
| UserTokenResponse
| AuthorizationURLResponse
)
RawResponse: TypeAlias = dict[str, Any]

0 comments on commit 78d2e87

Please sign in to comment.