diff --git a/tryhackme/badge.py b/tryhackme/badge.py index b94cd97..8d5120f 100644 --- a/tryhackme/badge.py +++ b/tryhackme/badge.py @@ -3,7 +3,7 @@ class Badge: def __init__(self, state, data): self._state = state - + self.earned = False self._from_data(data) def _from_data(self, data): diff --git a/tryhackme/checks.py b/tryhackme/checks.py index 6d82daf..cddfd05 100644 --- a/tryhackme/checks.py +++ b/tryhackme/checks.py @@ -1,27 +1,34 @@ from .errors import * +class _base_check: + @property + def name(self): + return type(self).__name__.split('_')[1] -class _type_convertor: + +class _type_check(_base_check): _TYPES = () _DEFAULT = None def convert(self, cls, arg): if arg is None: return self._DEFAULT if arg in self._TYPES: return arg else: raise TypeNotInTypeList(f"'{str(arg)}' is an invalid {self.name} type.") - - @property - def name(self): - return type(self).__name__.split('_')[1] -class _vpn_types(_type_convertor): +class _notNone_check(_base_check): + def convert(self, cls, arg): + if arg is not None: + return arg + else: raise NotValidUrlParameters(f"'{str(arg)}' is not allowed to be null {self.name} type.") + +class _vpn_types(_type_check): _TYPES = ('machines', 'networks') _DEFAULT = 'machines' -class _county_types(_type_convertor): +class _county_types(_type_check): _TYPES = ("AF","AX","AL","DZ","AS","AD","AO","AI","AQ","AG","AR","AM","AW","AU","AT","AZ","BS","BH","BD","BB","BY","BE","BZ","BJ","BM","BT","BO","BQ","BA","BW","BV","BR","IO","BN","BG","BF","BI","KH","CM","CA","CV","KY","CF","TD","CL","CN","CX","CC","CO","KM","CG","CD","CK","CR","CI","HR","CU","CW","CY","CZ","DK","DJ","DM","DO","EC","EG","SV","GQ","ER","EE","ET","FK","FO","FJ","FI","FR","GF","PF","TF","GA","GM","GE","DE","GH","GI","GR","GL","GD","GP","GU","GT","GG","GN","GW","GY","HT","HM","VA","HN","HK","HU","IS","IN","ID","IR","IQ","IE","IM","IL","IT","JM","JP","JE","JO","KZ","KE","KI","KP","KR","KW","KG","LA","LV","LB","LS","LR","LY","LI","LT","LU","MO","MK","MG","MW","MY","MV","ML","MT","MH","MQ","MR","MU","YT","MX","FM","MD","MC","MN","ME","MS","MA","MZ","MM","NA","NR","NP","NL","NC","NZ","NI","NE","NG","NU","NF","MP","NO","OM","PK","PW","PS","PA","PG","PY","PE","PH","PN","PL","PT","PR","QA","RE","RO","RU","RW","BL","SH","KN","LC","MF","PM","VC","WS","SM","ST","SA","SN","RS","SC","SL","SG","SX","SK","SI","SB","SO","ZA","GS","SS","ES","LK","SD","SR","SJ","SZ","SE","CH","SY","TW","TJ","TZ","TH","TL","TG","TK","TO","TT","TN","TR","TM","TC","TV","UG","UA","AE","GB","US","UM","UY","UZ","VU","VE","VN","VG","VI","WF","EH","YE","ZM","ZW") -class _leaderboard_types(_type_convertor): +class _leaderboard_types(_type_check): _TYPES = ("monthly", "") diff --git a/tryhackme/client.py b/tryhackme/client.py index 6236d4f..68beb7d 100644 --- a/tryhackme/client.py +++ b/tryhackme/client.py @@ -1,8 +1,9 @@ from .http import HTTP from .state import State +from .user import ClientUser -# TODO: build out Hackivities (room search), HTML scrapper for username and CSRF token, error build out, de HTML question object +# TODO: build out Hackivities (room search) error build out # TODO: add VM, add GAMES, add VPN, user: (Team, messages, notifications) # ? maybe a writeup class but maybe not @@ -13,6 +14,12 @@ def __init__(self, session=None): def login(self, session): self.http.static_login(session) + if self._state.authenticated: + try: + self._state.user = ClientUser(state=self._state, username=self.http.username) + except Exception as e: + print("Failed to create CLient user: ", str(e)) + self._state.authenticated = False def get_room(self, room_code): try: @@ -40,18 +47,17 @@ def get_user(self, username): def get_badge(self, badge_name): return self._state.get_badge(badge_name) - def get_badges(self): return self._state.badges def get_practice_rooms(self): practice_rooms = self.http.get_practise_rooms() return_rooms = [] - return_rooms += [self._state.store_room(data=room) for room in practice_rooms.get("featured", [])] - return_rooms += [self._state.store_room(data=room) for room in practice_rooms.get("webExploitation", [])] - return_rooms += [self._state.store_room(data=room) for room in practice_rooms.get("windowsExploitation", [])] - return_rooms += [self._state.store_room(data=room) for room in practice_rooms.get("defensive", [])] - return_rooms += [self._state.store_room(data=room) for room in practice_rooms.get("recommended", [])] + return_rooms += [self._state.get_room(room_code=room.get("code")) for room in practice_rooms.get("featured", [])] + return_rooms += [self._state.get_room(room_code=room.get("code")) for room in practice_rooms.get("webExploitation", [])] + return_rooms += [self._state.get_room(room_code=room.get("code")) for room in practice_rooms.get("windowsExploitation", [])] + return_rooms += [self._state.get_room(room_code=room.get("code")) for room in practice_rooms.get("defensive", [])] + return_rooms += [self._state.get_room(room_code=room.get("code")) for room in practice_rooms.get("recommended", [])] return return_rooms # ! network is basicly nothing at the moment since i cant access is (im not a premium member) @@ -100,4 +106,7 @@ def glossary(self): return self.http.get_glossary_terms() @property def user(self): - return self._state.get_client_user() \ No newline at end of file + return self._state.user + @property + def authenticated(self): + return self._state.authenticated \ No newline at end of file diff --git a/tryhackme/cog.py b/tryhackme/cog.py index e121c41..838d467 100644 --- a/tryhackme/cog.py +++ b/tryhackme/cog.py @@ -35,7 +35,7 @@ def convert(self, *args, **kwargs): if arg in annotions: try: result = self.function.__annotations__[arg] - out_kwargs[arg] = result().convert(kwargs[arg]) + out_kwargs[arg] = result().convert(self.cls, kwargs[arg]) except Exception as e: raise e else: out_kwargs[arg] = kwargs[arg] diff --git a/tryhackme/errors.py b/tryhackme/errors.py index 7f97865..a50c3ff 100644 --- a/tryhackme/errors.py +++ b/tryhackme/errors.py @@ -15,7 +15,7 @@ def __init__(self, request, route, data): self.data = data def __str__(self): - return f"{type(self).__name__}(code={self.request.status_code}, return_URL={self.route.url}, data_length: {self.data.__len__()})" + return f"{type(self).__name__}(code={self.request.status_code}, URL={self.route.path}, returned_url={self.request.url}, data_length: {self.data.__len__()})" class Unauthorized(WebError): pass diff --git a/tryhackme/http.py b/tryhackme/http.py index 273f140..c8bc26e 100644 --- a/tryhackme/http.py +++ b/tryhackme/http.py @@ -5,7 +5,7 @@ import requests from . import __version__, errors, utils -from .checks import _county_types, _leaderboard_types, _vpn_types +from .checks import _county_types, _leaderboard_types, _vpn_types, _notNone_check from .cog import request_cog GET='get' @@ -14,14 +14,14 @@ class HTTPClient: __CSRF_token_regex = re.compile("const csrfToken[ ]{0,1}=[ ]{0,1}[\"|'](.{36})[\"|']") - __Username_regex = re.compile("const username[ ]{0,1}=[ ]{0,1}[\"|'](.{16})[\"|']") + __Username_regex = re.compile("const username[ ]{0,1}=[ ]{0,1}[\"|'](.{1,16})[\"|']") def __init__(self, session=None): self.authenticated = False self.__session = requests.Session() self.satic_session = requests.Session() self.connect_sid = None self._CSRF_token = None - self._username = None + self.username = None self.user_agent = f'Tryhackme: (https://github.com/GnarLito/thm-api-py {__version__}) Python/{sys.version_info[0]}.{sys.version_info[1]} requests/{requests.__version__}' @@ -39,21 +39,30 @@ def static_login(self, session): try: self.request(RouteList.get_unseen_notifications()) self.authenticated = True - self.retrieve_CSRF_token() - self.retrieve_username() - except: pass + self._CSRF_token = self.retrieve_CSRF_token() + self.username = self.retrieve_username() + except Exception as e: + print("session Issue:", e) def retrieve_CSRF_token(self): if not self.authenticated: - return - page = self.request(RouteList.get_profile_page()) - self._CSRF_token = self.__CSRF_token_regex.search(page).group(1) + return None + try: + page = self.request(RouteList.get_profile_page()) + return self._HTTPClient__CSRF_token_regex.search(page).group(1) + except AttributeError: + self.authenticated = False + return None def retrieve_username(self): if not self.authenticated: - return - page = self.request(RouteList.get_profile_page()) - self.username = self.__Username_regex.search(page).group(1) + return None + try: + page = self.request(RouteList.get_profile_page()) + return self._HTTPClient__Username_regex.search(page).group(1) + except AttributeError: + self.authenticated = False + return None def request(self, route, **kwargs): session = self.__session @@ -101,7 +110,7 @@ def request(self, route, **kwargs): raise errors.ServerError(request=r, route=route, data=data) except Exception as e: - print(e) + raise e class Route: @@ -109,13 +118,15 @@ class Route: BASE = "https://www.tryhackme.com" def __init__(self, method=GET, path='', **parameters): self.method = method + self._path = path self.path = path url = self.BASE + self.path options = parameters.pop("options", None) if parameters: try: - self.url = url.format(**{k: _uriquote(v) if isinstance(v, str) else v for k, v in parameters.items()}) + self.path = self.path.format(**{k: _uriquote(v) if isinstance(v, str) else v for k, v in parameters.items()}) + self.url = self.BASE + self.path except Exception as e: raise errors.NotValidUrlParameters(e) else: @@ -200,13 +211,14 @@ def get_team_info(**parameters): return Route(path="/api/team/is-member", **para # * user -notifications - def get_unseen_notifications(**parameters): return Route(path="/api/notifications/has-unseen", **parameters) - def get_all_notifications( **parameters): return Route(path="/api/notifications/get", **parameters) + def get_unseen_notifications(**parameters): return Route(path="/notifications/has-unseen", **parameters) + def get_all_notifications( **parameters): return Route(path="/notifications/get", **parameters) # * user -messages - def get_unseen_messages( **parameters): return Route(path="/api/message/has-unseen", **parameters) - def get_all_group_messages(**parameters): return Route(path="/api/message/group/get-all", **parameters) + def get_unseen_messages( **parameters): return Route(path="/message/has-unseen", **parameters) + def get_all_group_messages(**parameters): return Route(path="/message/group/get-all", **parameters) + def get_group_messages( **parameters): return Route(path="/message/group/get/{group_id}", **parameters) # * user -room @@ -353,6 +365,8 @@ def get_unseen_messages(self): return self.request(RouteList.get_unseen_messages()) def get_all_group_messages(self): return self.request(RouteList.get_all_group_messages()) + def get_group_messages(self, group_id): + return self.request(RouteList.get_group_messages(group_id)) # * user -room @@ -365,17 +379,17 @@ def get_user_created_rooms(self, username, limit:int=10, page:int=1): # * user - def get_user_rank(self, username): + def get_user_rank(self, username : _notNone_check): return self.request(RouteList.get_user_rank(username=username)) - def get_user_activty(self, username): + def get_user_activty(self, username : _notNone_check): return self.request(RouteList.get_user_activty(username=username)) def get_all_friends(self): return self.request(RouteList.get_all_friends()) - def get_discord_user(self, username): + def get_discord_user(self, username : _notNone_check): return self.request(RouteList.get_discord_user(username=username)) - def get_user_exist(self, username): + def get_user_exist(self, username : _notNone_check): return self.request(RouteList.get_user_exist(username=username)) - def search_user(self, username): + def search_user(self, username : _notNone_check): return self.request(RouteList.search_user(username=username)) # * room diff --git a/tryhackme/message.py b/tryhackme/message.py new file mode 100644 index 0000000..0276e02 --- /dev/null +++ b/tryhackme/message.py @@ -0,0 +1,41 @@ +from . import utils + +class Message: + def __init__(self, state, group, data): + self._state = state + self.group = group + self._from_data(data) + + def _from_data(self, data): + self.message = data.get("message") + self.inserted = data.get("inserted") + self.user = self.group.get_user_from_userId(data.get("userId")) + + +# * can only be used on `get_all_message_groups` api call +class MessageGroup: + def __init__(self, state, data): + self._state = state + self.messages = [] + self._from_data(data) + + def _from_data(self, data): + self.id = data.get("groupId") + self.title = data.get("title") + self._users = data.get("users") + + self._sync(data) + + def _sync(self, data): + self.messages = [Message(state=self._state, group=self, data=message) for message in self._state.http.get_group_messages(self.id)] + + @property + def users(self): + return [self._stats.store_user(user) for user in self._users] + + def get_user_from_userId(self, userId): + try: + username = [user.get("username") for user in self._users if user.get("userId") == userId] + return self._state.get_user(username[0]) + except: + return None \ No newline at end of file diff --git a/tryhackme/notifications.py b/tryhackme/notifications.py new file mode 100644 index 0000000..e69de29 diff --git a/tryhackme/question.py b/tryhackme/question.py index 5882b0a..2ccdda3 100644 --- a/tryhackme/question.py +++ b/tryhackme/question.py @@ -1,6 +1,5 @@ from . import utils -# TODO: de HTML all the things class Question: def __init__(self, state, data): self._state = state @@ -8,21 +7,17 @@ def __init__(self, state, data): def _from_data(self, data): self.raw_question = data.get("question") - self.number = data.get("questionNo") + self.question = utils.HTML_parse(self.raw_question) self.raw_hint = data.get("hint") + self.hint = utils.HTML_parse(self.raw_hint) + self.number = data.get("questionNo") + + # * only when valid session is used self.raw_description = data.get("answerDesc", "") + self.description = utils.HTML_parse(self.raw_description) self.extra_points = data.get("extraPoints", None) self.correct = data.get("correct", False) self.attempts = data.get("attempts", 0) self.submission = data.get("submission", "") self.has_answer = data.get("noAnswer", False) - - @property - def question(self): - return utils.HTML_parse(self.raw_question) - @property - def description(self): - return utils.HTML_parse(self.raw_description) - @property - def hint(self): - return utils.HTML_parse(self.raw_hint) \ No newline at end of file + \ No newline at end of file diff --git a/tryhackme/room.py b/tryhackme/room.py index c3ba880..0b05572 100644 --- a/tryhackme/room.py +++ b/tryhackme/room.py @@ -10,7 +10,10 @@ def __init__(self, state, data): self._creators = [] if not data.get('success', False): - raise NotImplemented("failed to create room, no success value returned") + if data.get("code") == 5: + raise NotImplemented(f"Room: {data.get('roomCode')}, Unable to load room: {data.get('message')}") + else: + raise NotImplemented("failed to create room, no success value returned") self._from_data(data) @@ -57,7 +60,6 @@ def scoreboard(self): return self._state.http.get_room_scoreboard(room_code=self.name) @property def tasks(self): - # TODO: add sessionless http client for no session task gathering if self.freeToUse or self._state.subscribed: return [RoomTask(state=self._state, data=task) for task in self._state.http.get_room_tasks(room_code=self.name).get('data')] else: diff --git a/tryhackme/state.py b/tryhackme/state.py index facfee0..a0364bf 100644 --- a/tryhackme/state.py +++ b/tryhackme/state.py @@ -4,27 +4,27 @@ from .path import Path from .module import Module from .user import User, ClientUser +from .message import MessageGroup +from .team import Team from .badge import Badge from .serie import Serie from .network import Network from .vpn import VPN from .http import HTTP -# TODO: vpn +# TODO: vpn class State: def __init__(self, http : HTTP): self.http = http - - self._CRRF_token = self.http._CSRF_token - if self.authenticated: - self.user = ClientUser(self.http.username) - else: - self.user = None + self.user = None + self._team = None + self._CRRF_token = self.http.retrieve_CSRF_token() self._rooms = weakref.WeakValueDictionary() self._paths = weakref.WeakValueDictionary() self._modules = weakref.WeakValueDictionary() self._users = weakref.WeakValueDictionary() + self._message_groups = weakref.WeakValueDictionary() self._badges = weakref.WeakValueDictionary() self._series = weakref.WeakValueDictionary() self._networks = weakref.WeakValueDictionary() @@ -33,9 +33,17 @@ def __init__(self, http : HTTP): def _sync(self): for badge in self.http.get_all_badges(): self.store_badge(badge) + def _clear_client(self): + self._message_groups = weakref.WeakValueDictionary() + self._team = None + def get_client_user(self): return self.user + @property + def rooms(self): + return list(self._rooms.values()) + def store_room(self, data): room_code = data.get("roomCode") try: @@ -51,6 +59,10 @@ def get_room(self, room_code): room_data = self.http.get_room_details(room_code=room_code) return self.store_room(room_data) + @property + def paths(self): + return list(self._paths.values()) + def store_path(self, data): path_code = data.get("code") try: @@ -66,6 +78,10 @@ def get_path(self, path_code): path_data = self.http.get_path(path_code=path_code) return self.store_path(path_data) + @property + def modules(self): + return list(self._modules.values()) + def store_module(self, data): module_code = data.get("moduleURL") try: @@ -81,6 +97,10 @@ def get_module(self, module_code): module_data = self.http.get_module(module_code=module_code) return self.store_module(module_data) + @property + def users(self): + return list(self._users.values()) + def store_user(self, username): try: return self._users[username] @@ -94,6 +114,10 @@ def get_user(self, username): except KeyError: return self.store_user(username) + @property + def badges(self): + return list(self._badges.values()) + def store_badge(self, data): badge_name = data.get("name") try: @@ -106,7 +130,15 @@ def get_badge(self, badge_name): try: return self._badges[badge_name] except KeyError: - return self.store_badge(badge_name) + self._sync() + try: + return self._badges[badge_name] + except KeyError: + raise NotImplemented(f"Badge with name {badge_name} is not found") + + @property + def series(self): + return list(self._series.values()) def store_serie(self, data): serie_code = data.get("id") @@ -123,6 +155,10 @@ def get_serie(self, serie_code): serie_data = self.http.get_serie(serie_code=serie_code) return self.store_serie(serie_data) + @property + def networks(self): + return list(self._networks.values()) + def store_network(self, data): network_code = data.get("code") try: @@ -138,9 +174,40 @@ def get_network(self, network_code): network_data = self.http.get_network(network_code=network_code) return self.store_network(network_data) + @property + def message_groups(self): + return list(self._message_groups.values()) + + def store_message_group(self, data): + group_id = data.get("groupId") + try: + return self._message_groups[group_id] + except KeyError: + message_group = MessageGroup(state=self, data=data) + self._message_groups[group_id] = message_group + return message_group + def get_message_group(self, group_id): + try: + return self._message_groups[group_id] + except KeyError: + raise NotImplemented(f"message group with id {group_id} cannot be found") + + @property + def team(self): + return self.team + + def store_team(self, data): + if self._team is not None: + return self._team + else: + team = Team(state=self, data=data) + self._team = team + return team + @property def authenticated(self): return self.http.authenticated + @property def subscribed(self): if self.authenticated: diff --git a/tryhackme/task.py b/tryhackme/task.py index 40e4239..303cc70 100644 --- a/tryhackme/task.py +++ b/tryhackme/task.py @@ -11,7 +11,9 @@ def __init__(self, state, data): def _from_data(self, data): self.raw_title = data.get('taskTitle') + self.title = utils.HTML_parse(self.raw_title, "*") self.raw_description = data.get('taskDesc') + self.description = utils.HTML_parse(self.raw_description) self.type = data.get('taskType') self.number = data.get('taskNo') self.created = data.get('taskCreated') @@ -19,10 +21,6 @@ def _from_data(self, data): self.uploadId = data.get('uploadId') self._questions = data.get('tasksInfo', []) if self._state.authenticated else data.get('questions', []) - @property - def title(self): - return utils.HTML_parse(self.raw_title, "*") - @property def question_count(self): return self._questions.__len__() diff --git a/tryhackme/team.py b/tryhackme/team.py index d3695a7..3c79263 100644 --- a/tryhackme/team.py +++ b/tryhackme/team.py @@ -10,7 +10,7 @@ def __init__(self, state, data): def _from_data(self, data): self.name = data.get("name") self._members = data.get("members") - self.capitain = self._state.get_user(data.get("capitain")) + self.captain = self._state.get_user(data.get("captain")) self.password = data.get("password") self.university = data.get("university") diff --git a/tryhackme/user.py b/tryhackme/user.py index 914242e..06b4ab9 100644 --- a/tryhackme/user.py +++ b/tryhackme/user.py @@ -1,25 +1,25 @@ from .errors import NotImplemented -from .team import Team -# TODO: add message/notification class -class User: + +class BaseUser: def __init__(self, state, username): self._state = state - self.username = username - self._completed_rooms = [] + if not self._state.http.get_user_exist(username=username).get('success', False): + raise NotImplemented("Unknown user with username: "+ str(username)) - if not self._state.http.get_user_exist(username=self.username).get('success', False): - raise NotImplemented("Unknown user with username: "+ self.username) + self.name = username + self._completed_rooms = [] data = self._fetch() self._from_data(data) - # TODO: fetch is a mess, needs fixing + + # ? fixable. fetch is a mess, but this is also how tryhackme web does it .. def _fetch(self): data = {} - data['badges'] = self._state.http.get_user_badges(username=self.username) - data['rank'] = self._state.http.get_discord_user(username=self.username) - data['completed_rooms'] = self._state.http.get_user_completed_rooms(username=self.username) + data['badges'] = self._state.http.get_user_badges(username=self.name) + data['rank'] = self._state.http.get_discord_user(username=self.name) + data['completed_rooms'] = self._state.http.get_user_completed_rooms(username=self.name) return data def _from_data(self, data): @@ -30,24 +30,34 @@ def _from_data(self, data): self._completed_rooms = data.get('completed_rooms') @property - def completed_rooms(self): - return [self._state.store_room(data=data) for data in self._completed_rooms] - @property def badges(self): - return [self._state.get_badge(badge_code) for badge_code in self._badges] + return [self._state.get_badge(badge.get("name")) for badge in self._badges] + @property + def completed_rooms(self): + return [self._state.get_room(room.get("code")) for room in self._completed_rooms] +class User(BaseUser): + pass -class ClientUser(User): +class ClientUser(BaseUser): def __init__(self, state, username): super().__init__(state, username) - - data = self._fetch() - self._from_data(data) + self._message_groups = [] + self._update() - def _fetch(self): - data = {} - data['team'] = self._state.http.get_team_info() - return data + def _update(self): + self._state._clear_client() + self._message_groups = self._state.http.get_all_group_messages() + self._team = self._state.http.get_team_info() + for badge in self._state.badges: + if [user_badge for user_badge in self.badges if badge.name == user_badge.name].__len__() > 1: + badge.earned = True - def _from_data(self, data): - self.team = Team(state=self._state, data=data.get("team")) \ No newline at end of file + @property + def message_groups(self): + return [self._state.store_message_group(group) for group in self._message_groups] + + # * Team data is semi dynamic, it isnt likly to change much during runtime + @property + def team(self): + return self._state.store_team(self._team) diff --git a/tryhackme/utils.py b/tryhackme/utils.py index f98ecbe..47a73ea 100644 --- a/tryhackme/utils.py +++ b/tryhackme/utils.py @@ -22,4 +22,15 @@ def HTML_parse(text, replace=""): text = text.replace("\n", "") text = html.unescape(text) text = re.sub(_HTML_TAGS_, replace, text) - return text \ No newline at end of file + return text + + +def find(predicate, seq): + for element in seq: + if predicate(element): + return element + return None + +def find_userId(userId, users): + predicate = lambda user : user.id == userId + return find(predicate, users) \ No newline at end of file