Skip to content

Commit

Permalink
Merge branch 'webhook_subscription_rework'
Browse files Browse the repository at this point in the history
  • Loading branch information
Teekeks committed Oct 8, 2020
2 parents 980c1f3 + e2c49b0 commit 669eb52
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 118 deletions.
24 changes: 17 additions & 7 deletions twitchAPI/oauth.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Copyright (c) 2020. Lena "Teekeks" During <info@teawork.de>
"""
User OAuth Authenticator and helper functions
"""
from .twitch import Twitch
from .helper import build_url, build_scope, get_uuid, TWITCH_AUTH_BASE_URL
from .types import AuthScope
Expand All @@ -18,11 +21,14 @@ def refresh_access_token(refresh_token: str,
app_secret: str):
"""Simple helper function for refreshing a user access token.
:param refresh_token: str, the current refresh_token
:param app_id: str, the id of your app
:param app_secret: str, the secret key of your app
:param refresh_token: the current refresh_token
:type refresh_token: str
:param app_id: the id of your app
:type app_id: str
:param app_secret: the secret key of your app
:type app_secret: str
:return: access_token, refresh_token
:rtype: str, str
:rtype: (str, str)
"""
param = {
'refresh_token': refresh_token,
Expand All @@ -39,9 +45,12 @@ def refresh_access_token(refresh_token: str,
class UserAuthenticator:
"""Simple to use client for the Twitch User authentication flow.
:param twitch: :class:`twitchAPI.twitch.Twitch` instance
:param scopes: List of :class:`twitchAPI.types.AuthScope`, the desired Auth scopes
"""
:param twitch: A twitch instance
:type twitch: :class:`twitchAPI.twitch.Twitch`
:param scopes: List of the desired Auth scopes
:type scopes: [:class:`twitchAPI.types.AuthScope`]
:param force_verify: If this is true, the user will always be prompted for authorization by twitch, default False
:type force_verify: bool"""

__twitch: 'Twitch' = None
port: int = 17563
Expand Down Expand Up @@ -143,6 +152,7 @@ def authenticate(self,
:param callback_func: Function to call once the authentication finnished.
:return: None if callback_func is set, otherwise access_token and refresh_token
:rtype: None or (str, str)
"""
self.__callback_func = callback_func
self.__start()
Expand Down
152 changes: 126 additions & 26 deletions twitchAPI/twitch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Copyright (c) 2020. Lena "Teekeks" During <info@teawork.de>

"""
The Twitch API client.
"""
import requests
from typing import Union, List, Optional
from .helper import build_url, TWITCH_API_BASE_URL, TWITCH_AUTH_BASE_URL, make_fields_datetime, build_scope, \
Expand All @@ -11,6 +13,11 @@
class Twitch:
"""
Twitch API client
:param app_id: Your app id
:type app_id: str
:param app_secret: Your app secret
:type app_secret: str
"""
app_id: Optional[str] = None
app_secret: Optional[str] = None
Expand All @@ -19,9 +26,13 @@ class Twitch:
__has_app_auth: bool = False

__user_auth_token: Optional[str] = None
__user_auth_refresh_token: Optional[str] = None
__user_auth_scope: List[AuthScope] = []
__has_user_auth: bool = False

auto_refresh_auth: bool = True
"""If set to true, auto refresh the auth token once it expires"""

def __init__(self, app_id: str, app_secret: str):
self.app_id = app_id
self.app_secret = app_secret
Expand All @@ -48,60 +59,125 @@ def __generate_header(self, auth_type: 'AuthType', required_scope: List[AuthScop
f'Bearer {self.__user_auth_token if self.__has_user_auth else self.__app_auth_token}'
return header

def __refresh_used_token(self):
"""Refreshes the currently used token"""
if self.__has_user_auth:
from .oauth import refresh_access_token
self.__user_auth_token,\
self.__user_auth_refresh_token = refresh_access_token(self.__user_auth_refresh_token,
self.app_id,
self.app_secret)
else:
self.__generate_app_token()

def __api_post_request(self,
url: str,
auth_type: 'AuthType',
required_scope: List[AuthScope],
data: Optional[dict] = None) -> requests.Response:
data: Optional[dict] = None,
retries: int = 1) -> requests.Response:
"""Make POST request with authorization"""
headers = self.__generate_header(auth_type, required_scope)
req = None
if data is None:
return requests.post(url, headers=headers)
req = requests.post(url, headers=headers)
else:
return requests.post(url, headers=headers, json=data)
req = requests.post(url, headers=headers, json=data)
if not auth_type == AuthType.NONE and self.auto_refresh_auth and retries > 0:
if req.status_code == 401:
# unauthorized, lets try to refresh the token once
self.__refresh_used_token()
return self.__api_post_request(url, auth_type, required_scope, data=data, retries=retries - 1)
elif req.status_code == 503:
# service unavailable, retry exactly once as recommended by twitch documentation
return self.__api_post_request(url, auth_type, required_scope, data=data, retries=0)
return req

def __api_put_request(self,
url: str,
auth_type: 'AuthType',
required_scope: List[AuthScope],
data: Optional[dict] = None) -> requests.Response:
data: Optional[dict] = None,
retries: int = 1) -> requests.Response:
"""Make PUT request with authorization"""
headers = self.__generate_header(auth_type, required_scope)
req = None
if data is None:
return requests.put(url, headers=headers)
req = requests.put(url, headers=headers)
else:
return requests.put(url, headers=headers, json=data)
req = requests.put(url, headers=headers, json=data)
if not auth_type == AuthType.NONE and self.auto_refresh_auth and retries > 0:
if req.status_code == 401:
# unauthorized, lets try to refresh the token once
self.__refresh_used_token()
return self.__api_put_request(url, auth_type, required_scope, data=data, retries=retries - 1)
elif req.status_code == 503:
# service unavailable, retry exactly once as recommended by twitch documentation
return self.__api_put_request(url, auth_type, required_scope, data=data, retries=0)
return req

def __api_patch_request(self,
url: str,
auth_type: 'AuthType',
required_scope: List[AuthScope],
data: Optional[dict] = None) -> requests.Response:
data: Optional[dict] = None,
retries: int = 1) -> requests.Response:
"""Make PUT request with authorization"""
headers = self.__generate_header(auth_type, required_scope)
req = None
if data is None:
return requests.patch(url, headers=headers)
req = requests.patch(url, headers=headers)
else:
return requests.patch(url, headers=headers, json=data)
req = requests.patch(url, headers=headers, json=data)
if not auth_type == AuthType.NONE and self.auto_refresh_auth and retries > 0:
if req.status_code == 401:
# unauthorized, lets try to refresh the token once
self.__refresh_used_token()
return self.__api_patch_request(url, auth_type, required_scope, data=data, retries=retries - 1)
elif req.status_code == 503:
# service unavailable, retry exactly once as recommended by twitch documentation
return self.__api_patch_request(url, auth_type, required_scope, data=data, retries=0)
return req

def __api_delete_request(self,
url: str,
auth_type: 'AuthType',
required_scope: List[AuthScope],
data: Optional[dict] = None) -> requests.Response:
data: Optional[dict] = None,
retries: int = 1) -> requests.Response:
"""Make PUT request with authorization"""
headers = self.__generate_header(auth_type, required_scope)
req = None
if data is None:
return requests.delete(url, headers=headers)
req = requests.delete(url, headers=headers)
else:
return requests.delete(url, headers=headers, json=data)
req = requests.delete(url, headers=headers, json=data)
if not auth_type == AuthType.NONE and self.auto_refresh_auth and retries > 0:
if req.status_code == 401:
# unauthorized, lets try to refresh the token once
self.__refresh_used_token()
return self.__api_delete_request(url, auth_type, required_scope, data=data, retries=retries - 1)
elif req.status_code == 503:
# service unavailable, retry exactly once as recommended by twitch documentation
return self.__api_delete_request(url, auth_type, required_scope, data=data, retries=0)
return req

def __api_get_request(self, url: str,
auth_type: 'AuthType',
required_scope: List[AuthScope]) -> requests.Response:
required_scope: List[AuthScope],
retries: int = 1) -> requests.Response:
"""Make GET request with authorization"""
headers = self.__generate_header(auth_type, required_scope)
return requests.get(url, headers=headers)
req = requests.get(url, headers=headers)
if not auth_type == AuthType.NONE and self.auto_refresh_auth and retries > 0:
if req.status_code == 401:
# unauthorized, lets try to refresh the token once
self.__refresh_used_token()
return self.__api_get_request(url, auth_type, required_scope, retries - 1)
elif req.status_code == 503:
# service unavailable, retry exactly once as recommended by twitch documentation
return self.__api_get_request(url, auth_type, required_scope, 0)
return req

def __generate_app_token(self) -> None:
params = {
Expand Down Expand Up @@ -132,14 +208,19 @@ def authenticate_app(self, scope: List[AuthScope]) -> None:
self.__generate_app_token()
self.__has_app_auth = True

def set_user_authentication(self, token: str, scope: List[AuthScope]) -> None:
def set_user_authentication(self, token: str, scope: List[AuthScope], refresh_token: Optional[str] = None) -> None:
"""Set a user token to be used.
:param token: the generated user token
:param scope: List of `AuthScope` that the given user token has
:param refresh_token: str, has to be provided if ``auto_refresh_user_auth`` is True
:return: None
:raises: ValueError
"""
if refresh_token is None and self.auto_refresh_user_auth:
raise ValueError('refresh_token has to be provided when auto_refresh_user_auth is True')
self.__user_auth_token = token
self.__user_auth_refresh_token = refresh_token
self.__user_auth_scope = scope
self.__has_user_auth = True

Expand All @@ -151,6 +232,14 @@ def get_app_token(self) -> Union[str, None]:
"""
return self.__app_auth_token

def get_user_auth_token(self) -> Union[str, None]:
"""Returns the current user auth token, None if no user Authentication is set
:return: current user auth token
:rtype: str, None
"""
return self.__user_auth_token

# ======================================================================================================================
# API calls
# ======================================================================================================================
Expand All @@ -162,25 +251,36 @@ def get_extension_analytics(self,
ended_at: Optional[datetime] = None,
started_at: Optional[datetime] = None,
report_type: Optional[AnalyticsReportType] = None) -> dict:
"""Requires User authentication with scope :class:`~AuthScope.ANALYTICS_READ_EXTENSION`\n
"""Requires User authentication with scope :class:`twitchAPI.types.AuthScope.ANALYTICS_READ_EXTENSION`\n
For detailed documentation, see here: https://dev.twitch.tv/docs/api/reference#get-extension-analytics
:param after: optional str
:param extension_id: optional str
:param first: optional int range 1 to 100
:param ended_at: optional :class:`datetime`
:param started_at: optional :class:`datetime`
:param report_type: optional :class:`~twitchAPI.types.AnalyticsReportType`
:param after: cursor for forward pagination
:type after: str
:param extension_id: If this is specified, the returned URL points to an analytics report for just the specified
extension.
:type extension_id: str
:param first: Maximum number of objects returned, range 1 to 100, default 20
:type first: int
:param ended_at: Ending date/time for returned reports, if this is provided, `started_at` must also be
specified.
:type ended_at: :class:`datetime.datetime`
:param started_at: Starting date/time for returned reports, if this is provided, `ended_at` must also be
specified.
:type started_at: :class:`datetime.datetime`
:param report_type: Type of analytics report that is returned
:type report_type: :class:`~twitchAPI.types.AnalyticsReportType`
:rtype: dict
:raises: :class:`twitchAPI.types.UnauthorizedException`, :class:`twitchAPI.types.MissingScopeException`,
ValueError
"""
if ended_at is not None or started_at is not None:
# you have to put in both:
if ended_at is None or started_at is None:
raise Exception('you must specify both ended_at and started_at')
raise ValueError('you must specify both ended_at and started_at')
if started_at > ended_at:
raise Exception('started_at must be before ended_at')
raise ValueError('started_at must be before ended_at')
if first > 100 or first < 1:
raise Exception('first must be between 1 and 100')
raise ValueError('first must be between 1 and 100')
url_params = {
'after': after,
'ended_at': ended_at.isoformat() if ended_at is not None else None,
Expand Down

0 comments on commit 669eb52

Please sign in to comment.