Skip to content

Commit

Permalink
Functions Name refactoring + removing docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
Lucino772 authored and lpalmisa committed Mar 23, 2021
1 parent 5ad9eb1 commit 6a3a5b2
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 596 deletions.
7 changes: 2 additions & 5 deletions mojang/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,5 @@
"""




from .main import user
from .api import names, status, uuid, uuids
from .main import (api_status, connect, get_username, get_uuid, get_uuids,
name_history, user)
1 change: 0 additions & 1 deletion mojang/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
from .base import status, names, uuid, uuids
27 changes: 27 additions & 0 deletions mojang/api/auth/security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from ...error.exceptions import *
from ...utils import web
from ..urls import SECURITY_CHALLENGES, SECURITY_CHECK


def is_user_ip_secure(access_token: str) -> bool:
try:
headers = web.get_auth_header(access_token)
web.request('get', SECURITY_CHECK, exceptions=(PayloadError, Unauthorized, IPNotSecured), headers=headers)
except IPNotSecured:
return False
else:
return True

def get_user_challenges(access_token: str) -> list:
headers = web.get_auth_header(access_token)
data = web.request('get', SECURITY_CHALLENGES, exceptions=(PayloadError, Unauthorized), headers=headers)
return data

def verify_user_ip(access_token: str, answers: list) -> bool:
try:
headers = web.get_auth_header(access_token)
web.request('post', SECURITY_CHECK, exceptions=(PayloadError, Unauthorized, IPVerificationError), json=answers, headers=headers)
except IPVerificationError:
return False
else:
return True
122 changes: 20 additions & 102 deletions mojang/api/auth/yggdrasil.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,22 @@
"""

from ...error.exceptions import *
from ...utils import web
from ..urls import AUTHENTICATE, INVALIDATE, REFRESH, SIGNOUT, VALIDATE
from ..validator import validate_context


@validate_context
def authenticate(ctx):
"""
Authenticate with username and password, only works with a context.
Required context variables
--------------------------
session: requests.Session
username: str
password: str
client_token: str, optional
(default is None)
Returns
-------
A dict with the following key: `access_token`, `client_token`, `uuid`,
`name`, `legacy` and `demo`
"""
def authenticate_user(username: str, password: str, client_token=None) -> dict:
payload = {
'username': ctx.username,
'password': ctx.password,
'clientToken': getattr(ctx, 'client_token', None),
'username': username,
'password': password,
'clientToken': client_token,
'agent': {
'name': 'Minecraft',
'version': 1
}
}

data = ctx.request('post', AUTHENTICATE, exceptions=(PayloadError, CredentialsError), json=payload)
ctx.session.headers.update({'Authorization': 'Bearer {}'.format(data['accessToken'])})
data = web.request('post', AUTHENTICATE, exceptions=(PayloadError, CredentialsError), json=payload)

return {
'access_token': data['accessToken'],
Expand All @@ -50,31 +29,13 @@ def authenticate(ctx):
'demo': not data['selectedProfile'].get('paid', True)
}

@validate_context
def refresh(ctx):
"""
Refresh an invalid token, only works with a context.
Required context variables
--------------------------
session: requests.Session
access_token: str
client_token: str
Returns
-------
A dict with the following key: `access_token`, `client_token`, `uuid`,
`name`, `legacy` and `demo`
"""
def refresh_token(access_token: str, client_token: str):
payload = {
'accessToken': ctx.access_token,
'clientToken': ctx.client_token
'accessToken': access_token,
'clientToken': client_token
}

data = ctx.request('post', REFRESH, exceptions=(PayloadError, TokenError), json=payload)
ctx.session.headers.update({'Authorization': 'Bearer {}'.format(data['accessToken'])})
data = web.request('post', REFRESH, exceptions=(PayloadError, TokenError), json=payload)

return {
'access_token': data['accessToken'],
Expand All @@ -85,69 +46,26 @@ def refresh(ctx):
'demo': not data['selectedProfile'].get('paid', True)
}

@validate_context
def validate(ctx):
"""
Check if token is valid, only works with a context.
Required context variables
--------------------------
session: requests.Session
access_token: str
client_token: str
Returns
-------
True if valid
"""
def validate_token(access_token: str, client_token: str):
payload = {
'accessToken': ctx.access_token,
'clientToken': ctx.client_token
'accessToken': access_token,
'clientToken': client_token
}

ctx.request('post', VALIDATE, exceptions=(PayloadError, TokenError), json=payload)
return True
web.request('post', VALIDATE, exceptions=(PayloadError, TokenError), json=payload)

@validate_context
def signout(ctx):
"""
Signout with username and password, only works with a context.
Required context variables
--------------------------
session: requests.Session
username: str
password: str
"""
def signout_user(username: str, password: str):
payload = {
'username': ctx.username,
'password': ctx.password
}

ctx.request('post', SIGNOUT, exceptions=(PayloadError, CredentialsError), json=payload)
ctx.session.headers.pop('Authorization')
web.request('post', SIGNOUT, exceptions=(PayloadError, CredentialsError), json=payload)

@validate_context
def invalidate(ctx):
"""
Invalidate current token, only works with a context.
Required context variables
--------------------------
session: requests.Session
access_token: str
client_token: str
"""
def invalidate_token(access_token: str, client_token: str):
payload = {
'accessToken': ctx.access_token,
'clientToken': ctx.client_token
'accessToken': access_token,
'clientToken': client_token
}

ctx.request('post', INVALIDATE, exceptions=(PayloadError, TokenError), json=payload)
ctx.session.headers.pop('Authorization')
web.request('post', INVALIDATE, exceptions=(PayloadError, TokenError), json=payload)
134 changes: 45 additions & 89 deletions mojang/api/base.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,21 @@
"""
Functions for the basic MojangAPI
"""

import datetime as dt
import json
from base64 import urlsafe_b64decode

from ..error.exceptions import PayloadError
from ..globals import current_ctx
from .urls import GET_UUID, GET_UUIDS, NAME_HISTORY, STATUS_CHECK
from .validator import default_context

from ..utils import web
from ..utils.cape import Cape
from ..utils.skin import Skin
from .urls import GET_PROFILE, GET_UUID, GET_UUIDS, NAME_HISTORY, STATUS_CHECK

@default_context
def status(service: str = None) -> dict:
"""
Retrieve the Mojang API status, work without context.

Parameters
----------
service: str, optional
If given, return only status for this service (default is None)
Returns
-------
By default it will returns a `dict` where the keys are the
services and the values are the status. If service is not
None then only the status for the given service is returned.
"""
def api_status(service: str = None) -> dict:
res = {}

data = current_ctx.request('get', STATUS_CHECK)
data = web.request('get', STATUS_CHECK)
for s in data:
res.update(s)

Expand All @@ -37,94 +24,63 @@ def status(service: str = None) -> dict:

return res

@default_context
def names(uuid: str) -> list:
"""
Get the name history for a given uuid, work without context.
Parameters
----------
uuid: str
Returns
-------
A list of tuples. Each tuple contains the `name` and the `datetime`
it was changed.
"""
def name_history(uuid: str) -> list:
names = []

data = current_ctx.request('get', NAME_HISTORY.format(uuid=uuid))
data = web.request('get', NAME_HISTORY.format(uuid=uuid))
for item in data:
if 'changedToAt' in item:
item['changedToAt'] = dt.datetime.fromtimestamp(item['changedToAt'])
names.append((item['name'], item.get('changedToAt',None)))

return names

@default_context
def uuid(username: str, only_uuid: bool = True) -> dict:
"""
Return the uuid of a given username
Parameters
----------
username: str
only_uuid: bool, optional
(default is True)
Returns
-------
By default it returns only the uuid for the given username.
If `only_uuid` is set to false, it will also return the
following values `legacy`, `demo` and `name`
"""

data = current_ctx.request('get', GET_UUID.format(name=username))
def get_uuid(username: str, only_uuid: bool = True) -> dict:
data = web.request('get', GET_UUID.format(name=username))

data['uuid'] = data.pop('id')
data['legacy'] = data.get('legacy', False)
data['demo'] = data.get('demo', False)
if data:
data['uuid'] = data.pop('id')
data['legacy'] = data.get('legacy', False)
data['demo'] = data.get('demo', False)

if only_uuid:
return data['uuid']

return data

@default_context
def uuids(usernames: list, only_uuid: bool = True) -> list:
"""
Return the uuid for multiple username
if only_uuid:
return data['uuid']
else:
data = None

Parameters
----------
usernames: list
only_uuid: bool, optional
(default is True)
return data

Returns
-------
By default it returns only the uuid for each username.
If `only_uuid` is set to false, it will also return the
following values `legacy`, `demo` and `name` for each
username.
"""
res = []

if len(usernames) > 0:
data = current_ctx.request('post', GET_UUIDS, exceptions=(PayloadError,),json=usernames[:10])
def get_uuids(usernames: list, only_uuid: bool = True) -> list:
res = [None]*len(usernames)

for i in range(0, len(usernames), 10):
data = web.request('post', GET_UUIDS, exceptions=(PayloadError,),json=usernames[i:i+10])
for item in data:
index = usernames.index(item['name'])
if not only_uuid:
res.append({
res[index] = {
'uuid': item['id'],
'name': item['name'],
'legacy': item.get('legacy',False),
'demo': item.get('demo', False)
})
}
else:
res.append(item['id'])

if len(usernames[:10]) > 0:
res.extend(uuids(usernames[10:], only_uuid=only_uuid))
res[index] = item['id']

return res

def get_profile(uuid: str) -> dict:
data = web.request('get', GET_PROFILE.format(uuid=uuid), exceptions=(PayloadError,))
if data:
res = {'name': None, 'uuid': None,'skins': [], 'capes': []}
res['uuid'] = data['id']
res['name'] = data['name']

for d in data['properties']:
textures = json.loads(urlsafe_b64decode(d['value']))['textures']
if 'SKIN' in textures.keys():
res['skins'].append(Skin(textures['SKIN']['url'], textures['SKIN'].get('metadata',{}).get('model','classic')))
if 'CAPE' in textures.keys():
res['skins'].append(Cape(textures['CAPE']['url']))

return res

0 comments on commit 6a3a5b2

Please sign in to comment.