Skip to content

Commit

Permalink
Added microsoft_app function to authenticate to minecraft using Micosoft
Browse files Browse the repository at this point in the history
  • Loading branch information
Lucino772 committed Sep 15, 2021
1 parent 381bb4d commit c69ef52
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 129 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ flake8 = "*"
requests = "*"
validators = "*"
pyjwt = {extras = ["crypto"], version = "*"}
msal = "*"

[requires]
python_version = "3.8"
10 changes: 9 additions & 1 deletion Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mojang/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
Checkout the [`documentation`](https://pymojang.readthedocs.io/en/latest/)
"""
from .account import get_uuid, get_uuids, names, status, user, connect
from .account import get_uuid, get_uuids, names, status, user, connect, microsoft_app
3 changes: 2 additions & 1 deletion mojang/account/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .base import status, get_uuid, get_uuids, names, user
from .ext.session import connect
from .ext.session import connect
from .ext.microsoft import microsoft_app
74 changes: 0 additions & 74 deletions mojang/account/auth/microsoft.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,86 +3,12 @@

from ..utils.auth import URLs


HEADERS_ACCEPT_URL_ENCODED = {
'content-type': 'application/x-www-form-urlencoded'
}

HEADERS_ACCEPT_JSON = {
'content-type': 'application/json',
'accept': 'application/json'
}


def get_login_url(client_id: str, redirect_uri: str = 'http://example.com') -> str:
"""Returns the login url for the browser
Args:
client_id (str): Azure Active Directory App's client id
redirect_uri (str, optional): The redirect uri of your application
"""
return URLs.microsoft_authorize(client_id, redirect_uri)


def authorize(client_id: str, client_secret: str, auth_code: str, redirect_uri: str = 'http://example.com') -> tuple:
"""Retrieve the access token and refresh token from the given auth code
Args:
client_id (str): Azure Active Directory App's client id
client_secret (str): Azure Active Directory App's client secret
auth_code (str): The auth code received from the login url
redirect_uri (str, optional): The redirect uri of your application
Returns:
A tuple containing the access token and refresh token
Raises:
MicrosoftInvalidGrant: If the auth code is invalid
"""

data = {
'client_id': client_id,
'client_secret': client_secret,
'code': auth_code,
'grant_type': 'authorization_code',
'redirect_uri': redirect_uri
}

response = requests.post(URLs.microsoft_token(), headers=HEADERS_ACCEPT_URL_ENCODED, data=data)
data = handle_response(response, MicrosoftInvalidGrant)

return data['access_token'], data['refresh_token']

def refresh(client_id: str, client_secret: str, refresh_token: str, redirect_uri: str = 'http://example.com') -> tuple:
"""Refresh an access token
Args:
client_id (str): Azure Active Directory App's client id
client_secret (str): Azure Active Directory App's client secret
refresh_token (str): The refresh token
redirect_uri (str, optional): The redirect uri of your application
Returns:
A tuple containing the access token and refresh token
Raises:
MicrosoftInvalidGrant: If the auth code is invalid
"""

data = {
'client_id': client_id,
'client_secret': client_secret,
'refresh_token': refresh_token,
'grant_type': 'refresh_token',
'redirect_uri': redirect_uri
}

response = requests.post(URLs.microsoft_token(), headers=HEADERS_ACCEPT_URL_ENCODED, data=data)
data = handle_response(response, MicrosoftInvalidGrant)

return data['access_token'], data['refresh_token']


def authenticate_xbl(auth_token: str) -> tuple:
"""Authenticate with Xbox Live
Expand Down
36 changes: 36 additions & 0 deletions mojang/account/ext/microsoft.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import msal
from ..auth import microsoft
from .session import UserSession


_DEFAULT_SCOPES = ['XboxLive.signin']

def microsoft_app(client_id: str, client_secret: str, redirect_uri: str = 'http://example.com'):
client = msal.ClientApplication(client_id, client_credential=client_secret, authority='https://login.microsoftonline.com/consumers')
return MicrosoftApp(client, redirect_uri)


class MicrosoftApp:

def __init__(self, client: msal.ClientApplication, redirect_uri: str):
self.__client = client
self.__redirect_uri = redirect_uri

def authorization_url(self, redirect_uri: str = None) -> str:
return self.__client.get_authorization_request_url(scopes=_DEFAULT_SCOPES, redirect_uri=(redirect_uri or self.__redirect_uri))

def authenticate(self, auth_code: str, redirect_uri: str = None) -> 'UserSession':
response = self.__client.acquire_token_by_authorization_code(auth_code, scopes=_DEFAULT_SCOPES, redirect_uri=(redirect_uri or self.__redirect_uri))
xbl_token, userhash = microsoft.authenticate_xbl(response['access_token'])
xsts_token, userhash = microsoft.authenticate_xsts(xbl_token)
access_token = microsoft.authenticate_minecraft(userhash, xsts_token)

return UserSession(access_token, response['refresh_token'], True, self._refresh_session, None)

def _refresh_session(self, access_token: str, refresh_token: str):
response = self.__client.acquire_token_by_refresh_token(refresh_token, _DEFAULT_SCOPES)
xbl_token, userhash = microsoft.authenticate_xbl(response['access_token'])
xsts_token, userhash = microsoft.authenticate_xsts(xbl_token)
mc_token = microsoft.authenticate_minecraft(userhash, xsts_token)

return mc_token, response['refresh_token']
82 changes: 42 additions & 40 deletions mojang/account/ext/session.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import datetime as dt
from typing import Optional, Tuple
from typing import Callable, Optional, Tuple

from ..structures.session import Cape, Skin

from .. import session, user
from .. import session
from ..auth import security, yggdrasil
from ..structures.base import NameInfoList
from ..structures.session import Cape, Skin


def _refresh_method(access_token: str, client_token: str):
auth = yggdrasil.refresh(access_token, client_token)
return auth.access_token, auth.client_token


def connect(username: str, password: str, client_token: Optional[str] = None) -> 'UserSession':
Expand Down Expand Up @@ -42,8 +46,7 @@ def connect(username: str, password: str, client_token: Optional[str] = None) ->
```
"""
auth = yggdrasil.authenticate(username, password, client_token)
return UserSession(auth.access_token, auth.client_token)

return UserSession(auth.access_token, auth.client_token, False, _refresh_method, yggdrasil.invalidate)


class UserSession:
Expand All @@ -69,52 +72,44 @@ class UserSession:
created_at: dt.datetime
name_change_allowed: bool

def __init__(self, access_token: str, client_token: str):
"""Create a user session with access token and client token.
The access token will be refreshed once the class is initiated
Args:
access_token (str): The session's access token
client_token (str): The session's client token
"""
def __init__(self, access_token: str, client_token: str, has_migrated: bool, refresh_method: Callable[[str, str], Tuple], close_method: Callable[[str, str], Tuple]):
self.__access_token = access_token
self.__client_token = client_token
self.__refresh_method = refresh_method
self.__close_method = close_method

self.refresh()
self.__has_migrated = has_migrated
self._fetch_profile()

def refresh(self):
"""Refresh the full user session, including the data"""
auth = yggdrasil.refresh(self.__access_token, self.__client_token)

# Update tokens
self.__access_token = auth.access_token
self.__client_token = auth.client_token

# Update info
self.uuid = auth.uuid
self.name = auth.name
self.is_demo = auth.demo
self.is_legacy = auth.legacy

# Fetch other data
self._fetch_data()
"""Refresh the session's token"""
if callable(self.__refresh_method):
self.__access_token, self.__client_token = self.__refresh_method(self.__access_token, self.__client_token)

self._fetch_profile()

def _fetch_data(self):
def _fetch_profile(self):
# Load profile
profile = user(self.uuid)
profile = session.get_profile(self.__access_token)
self.name = profile.name
self.uuid = profile.uuid
self.names = profile.names
self.skin = profile.skin
self.cape = profile.cape
self.is_demo = profile.is_demo
self.is_legacy = profile.is_legacy
del profile

# Load name change
name_change = session.get_user_name_change(self.__access_token)
self.name_change_allowed = name_change.allowed
self.created_at = name_change.created_at

def close(self):
"""Close the session and invalidates the access token"""
yggdrasil.invalidate(self.__access_token, self.__client_token)
if callable(self.__close_method):
self.__close_method(self.__access_token, self.__client_token)

self.__access_token = None
self.__client_token = None

Expand All @@ -127,16 +122,23 @@ def token_pair(self) -> Tuple[str, str]:
@property
def secure(self):
"""Check wether user IP is secured. For more details checkout [`check_ip`][mojang.account.auth.security.check_ip]"""
return security.check_ip(self.__access_token)
if not self.__has_migrated:
return security.check_ip(self.__access_token)

return True

@property
def challenges(self):
"""Returns the list of challenges to verify user IP. For more details checkout [`get_challenges`][mojang.account.auth.security.get_challenges]"""
return security.get_challenges(self.__access_token)
if not self.__has_migrated:
return security.get_challenges(self.__access_token)

return []

def verify(self, answers: list):
"""Verify user IP. For more details checkout [`verify_ip`][mojang.account.auth.security.verify_ip]"""
return security.verify_ip(self.__access_token, answers)
if not self.__has_migrated:
return security.verify_ip(self.__access_token, answers)

# Name
def change_name(self, name: str):
Expand All @@ -146,7 +148,7 @@ def change_name(self, name: str):
name (str): The new name
"""
session.change_user_name(self.__access_token, name)
self._fetch_data()
self._fetch_profile()

# Skin
def change_skin(self, path: str, variant: Optional[str] = 'classic'):
Expand All @@ -157,9 +159,9 @@ def change_skin(self, path: str, variant: Optional[str] = 'classic'):
variant (str, optional): The variant of skin (default to 'classic')
"""
session.change_user_skin(self.__access_token, path, variant)
self._fetch_data()
self._fetch_profile()

def reset_skin(self):
"""Reset user skin. For more details checkout [`reset_user_skin`][mojang.account.session.reset_user_skin]"""
session.reset_user_skin(self.__access_token, self.uuid)
self._fetch_data()
self._fetch_profile()
25 changes: 24 additions & 1 deletion mojang/account/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import requests

from ..exceptions import *
from .structures.session import NameChange, Skin
from .structures.session import NameChange, Skin, Cape
from .structures.base import UserProfile
from .utils.auth import BearerAuth
from .utils.urls import URLs
from .base import names


def get_user_name_change(access_token: str) -> NameChange:
Expand Down Expand Up @@ -145,3 +147,24 @@ def owns_minecraft(access_token: str, verify_sig: bool = False, public_key: str
jwt.decode(data['signature'], public_key, algorithms=['RS256'])

return not len(data['items']) == 0

def get_profile(access_token: str):
response = requests.get(URLs.get_profile(), auth=BearerAuth(access_token))
data = handle_response(response, Unauthorized)

_dict = dict.fromkeys(UserProfile._fields, None)

_dict['name'] = data['name']
_dict['uuid'] = data['id']
_dict['names'] = names(data['id'])

if len(data['skins']) > 0:
_dict['skin'] = Skin(data['skins'][0]['url'], data['skins'][0]['variant'])

if len(data['capes']) > 0:
_dict['cape'] = Cape(data['capes'][0]['url'])

_dict['is_legacy'] = False
_dict['is_demo'] = False

return UserProfile(**_dict)
10 changes: 0 additions & 10 deletions mojang/account/utils/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,6 @@ def get_challenges(cls):
return 'https://api.mojang.com/user/security/challenges'

# Microsoft
@classmethod
def microsoft_authorize(cls, client_id: str, redirect_uri: str):
"""Returns the authorization url for Microsoft OAuth"""
return f'https://login.live.com/oauth20_authorize.srf?client_id={client_id}&response_type=code&redirect_uri={redirect_uri}&scope=XboxLive.signin%20offline_access'

@classmethod
def microsoft_token(cls):
"""Returns the token url for Microsoft OAuth"""
return 'https://login.live.com/oauth20_token.srf'

@classmethod
def microsoft_xbl_authenticate(cls):
"""Returns the authentication url for Xbox Live"""
Expand Down
4 changes: 4 additions & 0 deletions mojang/account/utils/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ def reset_skin(cls, uuid: str):
@classmethod
def check_minecraft_onwership(cls):
return 'https://api.minecraftservices.com/entitlements/mcstore'

@classmethod
def get_profile(cls):
return 'https://api.minecraftservices.com/minecraft/profile'
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"Programming Language :: Python :: 3.9"
],
packages=setuptools.find_packages(),
install_requires=['requests', 'validators', 'pyjwt[crypto]'],
install_requires=['requests', 'validators', 'pyjwt[crypto]', 'msal'],
keywords=['minecraft', 'mojang', 'python3'],
project_urls={
'Documentation': 'https://pymojang.readthedocs.io/en/latest/'
Expand Down

0 comments on commit c69ef52

Please sign in to comment.