Skip to content

Commit

Permalink
UserSession moved to MojangAuthenticatedUser/MicrosoftAuthenticatedUser
Browse files Browse the repository at this point in the history
  • Loading branch information
Lucino772 committed Nov 13, 2021
1 parent 5645a93 commit 9749e10
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 189 deletions.
167 changes: 167 additions & 0 deletions mojang/account/ext/_profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from abc import ABCMeta, abstractmethod
from typing import Optional

import msal

from .. import session
from ..auth import microsoft, security, yggdrasil


class AuthenticatedUser(metaclass=ABCMeta):
def __init__(self, access_token: str, refresh_token: str) -> None:
self.__name = None
self.__uuid = None
self.__is_legacy = False
self.__is_demo = False
self.__names = None
self.__skin = None
self.__cape = None

self.__name_change_allowed = False
self.__created_at = False

self._access_token = access_token
self._refresh_token = refresh_token

self._fetch_profile()

@abstractmethod
def refresh(self):
raise NotImplementedError

@abstractmethod
def close(self):
raise NotImplementedError

@property
def name(self):
return self.__name

@property
def uuid(self):
return self.__uuid

@property
def is_legacy(self):
return self.__is_legacy

@property
def is_demo(self):
return self.__is_demo

@property
def names(self):
return self.__names

@property
def skin(self):
return self.__skin

@property
def cape(self):
return self.__cape

@property
def name_change_allowed(self):
return self.__name_change_allowed

@property
def created_at(self):
return self.__created_at

def _fetch_profile(self):
# Load profile
profile = session.get_profile(self._access_token)
self.__name = profile.name
self.__uuid = profile.uuid
self.__names = profile.names
self.__skin = profile.skin
self.__cape = profile.cape
self.__is_legacy = profile.is_legacy
self.__is_demo = profile.is_demo
del profile

# Load name change
name_change = session.get_user_name_change(self._access_token)
self.__name_change_allowed = name_change.allowed
self.__created_at = name_change.created_at

def change_name(self, name: str):
"""Change user name. For more details checkout [`change_user_name`][mojang.account.session.change_user_name]
Args:
name (str): The new name
"""
session.change_user_name(self._access_token, name)
self._fetch_profile()

def change_skin(self, path: str, variant: Optional[str] = "classic"):
"""Change user skin. For more details checkout [`change_user_skin`][mojang.account.session.change_user_skin]
Args:
path (str): The path to the skin, either local or remote
variant (str, optional): The variant of skin (default to 'classic')
"""
session.change_user_skin(self._access_token, path, variant)
self._fetch_profile()

def reset_skin(self):
"""Reset user skin. For more details checkout [`reset_user_skin`][mojang.account.session.reset_user_skin]"""
session.reset_user_skin(self._access_token, self.uuid)
self._fetch_profile()


class MojangAuthenticatedUser(AuthenticatedUser):
def refresh(self):
auth = yggdrasil.refresh(self._access_token, self._refresh_token)
self._access_token, self._refresh_token = (
auth.access_token,
auth.client_token,
)

def close(self):
yggdrasil.invalidate(self._access_token, self._refresh_token)
self._access_token, self._refresh_token = None

@property
def secure(self):
"""Check wether user IP is secured. For more details checkout [`check_ip`][mojang.account.auth.security.check_ip]"""
return security.check_ip(self._access_token)

@property
def challenges(self):
"""Returns the list of challenges to verify user IP. For more details checkout [`get_challenges`][mojang.account.auth.security.get_challenges]"""
return security.get_challenges(self._access_token)

def verify(self, answers: list):
"""Verify user IP. For more details checkout [`verify_ip`][mojang.account.auth.security.verify_ip]"""
return security.verify_ip(self._access_token, answers)


class MicrosoftAuthenticatedUser(AuthenticatedUser):
def __init__(
self,
access_token: str,
refresh_token: str,
oauth_client: msal.ClientApplication,
) -> None:
super().__init__(access_token, refresh_token)
self.__oauth_client = oauth_client

def refresh(self):
response = self.__oauth_client.acquire_token_by_refresh_token(
self._refresh_token, ["XboxLive.signin"]
)
xbl_token, userhash = microsoft.authenticate_xbl(
response["access_token"]
)
xsts_token, userhash = microsoft.authenticate_xsts(xbl_token)
mc_token = microsoft.authenticate_minecraft(userhash, xsts_token)

self._access_token, self._refresh_token = (
mc_token,
response["refresh_token"],
)

def close(self):
self._access_token, self._refresh_token = None, None
66 changes: 40 additions & 26 deletions mojang/account/ext/microsoft.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from mojang.exceptions import MicrosoftInvalidGrant, MicrosoftUserNotOwner
import msal
from ..auth import microsoft
from .session import UserSession
from mojang.exceptions import MicrosoftInvalidGrant, MicrosoftUserNotOwner

from .. import session
from ..auth import microsoft
from ._profile import MicrosoftAuthenticatedUser

_DEFAULT_SCOPES = ["XboxLive.signin"]

_DEFAULT_SCOPES = ['XboxLive.signin']

def microsoft_app(client_id: str, client_secret: str, redirect_uri: str = 'http://example.com') -> 'MicrosoftApp':
def microsoft_app(
client_id: str,
client_secret: str,
redirect_uri: str = "http://example.com",
) -> "MicrosoftApp":
"""It create an instance of [`MicrosoftApp`][mojang.account.ext.microsoft.MicrosoftApp] with the client id and client secret.
This app can then be used to get a [`UserSession`][mojang.account.ext.session.UserSession] like
This app can then be used to get a [`UserSession`][mojang.account.ext.session.UserSession] like
[`connect`][mojang.account.ext.session.connect] for Microsoft users.
Args:
Expand Down Expand Up @@ -38,52 +43,61 @@ def microsoft_app(client_id: str, client_secret: str, redirect_uri: str = 'http:
# they will be redirect to the uri you choose with a `code` parameter
# http://example.com?code=...
# You can use this code to authenticate the user
code = ...
code = ...
user = app.authenticate(code)
```
"""
client = msal.ClientApplication(client_id, client_credential=client_secret, authority='https://login.microsoftonline.com/consumers')
client = msal.ClientApplication(
client_id,
client_credential=client_secret,
authority="https://login.microsoftonline.com/consumers",
)
return MicrosoftApp(client, redirect_uri)


class MicrosoftApp:
"""This class allows you to authenticate Microsoft users to Minecraft"""

def __init__(self, client: msal.ClientApplication, redirect_uri: str):
self.__client = client
self.__redirect_uri = redirect_uri

def authorization_url(self, redirect_uri: str = None) -> str:
"""Returns the authorization url for Microsfot OAuth"""
return self.__client.get_authorization_request_url(scopes=_DEFAULT_SCOPES, redirect_uri=(redirect_uri or self.__redirect_uri))

def authenticate(self, auth_code: str, redirect_uri: str = None) -> 'UserSession':
return self.__client.get_authorization_request_url(
scopes=_DEFAULT_SCOPES,
redirect_uri=(redirect_uri or self.__redirect_uri),
)

def authenticate(
self, auth_code: str, redirect_uri: str = None
) -> "MicrosoftAuthenticatedUser":
"""Authenticate a user with the auth code
Args:
auth_code (str): The auth code from the redirect
redirect_uri (str, optional): The redirect uri for your app
Returns:
An instance of UserSession
An instance of MicrosoftAuthenticatedUser
"""
response = self.__client.acquire_token_by_authorization_code(auth_code, scopes=_DEFAULT_SCOPES, redirect_uri=(redirect_uri or self.__redirect_uri))
if response.get('error', False):
response = self.__client.acquire_token_by_authorization_code(
auth_code,
scopes=_DEFAULT_SCOPES,
redirect_uri=(redirect_uri or self.__redirect_uri),
)
if response.get("error", False):
raise MicrosoftInvalidGrant(*response.values())

xbl_token, userhash = microsoft.authenticate_xbl(response['access_token'])
xbl_token, userhash = microsoft.authenticate_xbl(
response["access_token"]
)
xsts_token, userhash = microsoft.authenticate_xsts(xbl_token)
access_token = microsoft.authenticate_minecraft(userhash, xsts_token)

if not session.owns_minecraft(access_token):
raise MicrosoftUserNotOwner()

return UserSession(access_token, response['refresh_token'], True, self._refresh_session, None)

def _refresh_session(self, access_token: str, refresh_token: str):
response = self.__client.acquire_token_by_refresh_token(refresh_token, _DEFAULT_SCOPES)
xbl_token, userhash = microsoft.authenticate_xbl(response['access_token'])
xsts_token, userhash = microsoft.authenticate_xsts(xbl_token)
mc_token = microsoft.authenticate_minecraft(userhash, xsts_token)

return mc_token, response['refresh_token']
return MicrosoftAuthenticatedUser(
access_token, response["refresh_token"], self.__client
)

0 comments on commit 9749e10

Please sign in to comment.