-
Notifications
You must be signed in to change notification settings - Fork 44
/
authenticate.py
158 lines (136 loc) · 4.88 KB
/
authenticate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""
Example scripts that performs XBL authentication
"""
import argparse
import asyncio
import http.server
import os
import queue
import socketserver
import threading
from urllib.parse import parse_qs, urlparse
import webbrowser
from xbox.webapi.authentication.manager import AuthenticationManager
from xbox.webapi.authentication.models import OAuth2TokenResponse
from xbox.webapi.common.signed_session import SignedSession
from xbox.webapi.scripts import CLIENT_ID, CLIENT_SECRET, REDIRECT_URI, TOKENS_FILE
QUEUE = queue.Queue(1)
class AuthCallbackRequestHandler(http.server.BaseHTTPRequestHandler):
"""
Handles the auth callback that's received when Windows Live auth flow completed
"""
def do_GET(self):
try:
url_path = self.requestline.split(" ")[1]
query_params = parse_qs(urlparse(url_path).query)
except Exception as e:
self.send_error(
400,
explain=f"Invalid request='{self.requestline}' - Failed to parse URL Path, error={e}",
)
self.end_headers()
return
if query_params.get("error"):
error_description = query_params.get("error_description")
self.send_error(
400, explain=f"Auth callback failed - Error: {error_description}"
)
self.end_headers()
return
auth_code = query_params.get("code")
if not auth_code:
self.send_error(
400,
explain=f"Auth callback failed - No code received - Original request: {self.requestline}",
)
self.end_headers()
return
if isinstance(auth_code, list):
auth_code = auth_code[0]
elif isinstance(auth_code, str):
pass
else:
raise Exception(f"Invalid code query param: {auth_code}")
# Put auth_code into queue for do_auth to receive
QUEUE.put(auth_code)
response_body = b"<script>window.close()</script>"
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.send_header("Content-Length", str(len(response_body)))
self.end_headers()
self.wfile.write(response_body)
async def do_auth(
client_id: str, client_secret: str, redirect_uri: str, token_filepath: str
):
async with SignedSession() as session:
auth_mgr = AuthenticationManager(
session, client_id, client_secret, redirect_uri
)
# Refresh tokens if we have them
if os.path.exists(token_filepath):
with open(token_filepath) as f:
tokens = f.read()
auth_mgr.oauth = OAuth2TokenResponse.model_validate_json(tokens)
await auth_mgr.refresh_tokens()
# Request new ones if they are not valid
if not (auth_mgr.xsts_token and auth_mgr.xsts_token.is_valid()):
auth_url = auth_mgr.generate_authorization_url()
webbrowser.open(auth_url)
# Wait for auth code from http server thread
code = QUEUE.get()
await auth_mgr.request_tokens(code)
with open(token_filepath, mode="w") as f:
print(f"Finished authentication, writing tokens to {token_filepath}")
f.write(auth_mgr.oauth.json())
async def async_main():
parser = argparse.ArgumentParser(description="Authenticate with XBL")
parser.add_argument(
"--tokens",
"-t",
default=TOKENS_FILE,
help=f"Token filepath. Default: '{TOKENS_FILE}'",
)
parser.add_argument(
"--client-id",
"-cid",
default=os.environ.get("CLIENT_ID", CLIENT_ID),
help="OAuth2 Client ID",
)
parser.add_argument(
"--client-secret",
"-cs",
default=os.environ.get("CLIENT_SECRET", CLIENT_SECRET),
help="OAuth2 Client Secret",
)
parser.add_argument(
"--redirect-uri",
"-ru",
default=os.environ.get("REDIRECT_URI", REDIRECT_URI),
help="OAuth2 Redirect URI",
)
parser.add_argument(
"--port",
"-p",
default=8080,
type=int,
help="""
HTTP Server port for awaiting auth callback
* NOTE: Changing this will break default auth flow and requires providing own OAUTH parameters
""",
)
args = parser.parse_args()
with socketserver.TCPServer(
("0.0.0.0", args.port), AuthCallbackRequestHandler
) as httpd:
print(f"Serving HTTP Server for auth callback at port {args.port}")
server_thread = threading.Thread(target=httpd.serve_forever)
# Exit the server thread when the main thread terminates
server_thread.daemon = True
server_thread.start()
await do_auth(
args.client_id, args.client_secret, args.redirect_uri, args.tokens
)
def main():
asyncio.run(async_main())
if __name__ == "__main__":
main()