From 178d31bda38e9d6b490afbe0b8674864141a44c4 Mon Sep 17 00:00:00 2001 From: Bee Date: Mon, 16 Sep 2019 09:04:27 +0200 Subject: [PATCH] proper platform --- minecraft_macosx/bin/chardetect.exe | Bin 93066 -> 93074 bytes minecraft_macosx/galaxy/__init__.py | 2 +- minecraft_macosx/galaxy/api/consts.py | 20 +- minecraft_macosx/galaxy/api/errors.py | 2 +- minecraft_macosx/galaxy/api/jsonrpc.py | 51 +- minecraft_macosx/galaxy/api/plugin.py | 556 ++++++++++------------ minecraft_macosx/galaxy/api/types.py | 57 +-- minecraft_macosx/galaxy/http.py | 145 ++++-- minecraft_macosx/galaxy/proc_tools.py | 88 ++++ minecraft_macosx/galaxy/task_manager.py | 49 ++ minecraft_macosx/galaxy/tools.py | 2 + minecraft_macosx/galaxy/unittest/mock.py | 25 +- minecraft_macosx/manifest.json | 2 +- minecraft_macosx/plugin.py | 2 +- minecraft_windows/bin/chardetect.exe | Bin 93066 -> 93074 bytes minecraft_windows/galaxy/__init__.py | 2 +- minecraft_windows/galaxy/api/consts.py | 20 +- minecraft_windows/galaxy/api/errors.py | 2 +- minecraft_windows/galaxy/api/jsonrpc.py | 51 +- minecraft_windows/galaxy/api/plugin.py | 556 ++++++++++------------ minecraft_windows/galaxy/api/types.py | 57 +-- minecraft_windows/galaxy/http.py | 145 ++++-- minecraft_windows/galaxy/proc_tools.py | 88 ++++ minecraft_windows/galaxy/task_manager.py | 49 ++ minecraft_windows/galaxy/tools.py | 2 + minecraft_windows/galaxy/unittest/mock.py | 25 +- minecraft_windows/manifest.json | 2 +- minecraft_windows/plugin.py | 2 +- 28 files changed, 1124 insertions(+), 878 deletions(-) create mode 100644 minecraft_macosx/galaxy/proc_tools.py create mode 100644 minecraft_macosx/galaxy/task_manager.py create mode 100644 minecraft_windows/galaxy/proc_tools.py create mode 100644 minecraft_windows/galaxy/task_manager.py diff --git a/minecraft_macosx/bin/chardetect.exe b/minecraft_macosx/bin/chardetect.exe index b6ef9bf05e14df72ea65d256ae94a10acdc85f49..65cc0613d9fd2740457c4515fb918500ffe635cb 100644 GIT binary patch delta 58 zcmeCW&N}HjYr_^sg)H&B)ROr8wD_dNl9HU%V%@U54AYd7?Ck0KS&RzIie75dgR&UE Kf*D@fj1B;Xu@oNw delta 50 zcmbPqowe&aYr_^sg)IK0)RL0a4Bec9vZ|tr)b#1*S&RzI8yY#LM`ba71v7%O865!b C)DndN diff --git a/minecraft_macosx/galaxy/__init__.py b/minecraft_macosx/galaxy/__init__.py index 69e3be5..97b69ed 100644 --- a/minecraft_macosx/galaxy/__init__.py +++ b/minecraft_macosx/galaxy/__init__.py @@ -1 +1 @@ -__path__ = __import__('pkgutil').extend_path(__path__, __name__) +__path__: str = __import__('pkgutil').extend_path(__path__, __name__) diff --git a/minecraft_macosx/galaxy/api/consts.py b/minecraft_macosx/galaxy/api/consts.py index d006714..d636613 100644 --- a/minecraft_macosx/galaxy/api/consts.py +++ b/minecraft_macosx/galaxy/api/consts.py @@ -81,6 +81,16 @@ class Platform(Enum): NintendoDs = "nds" Nintendo3Ds = "3ds" PathOfExile = "pathofexile" + Twitch = "twitch" + Minecraft = "minecraft" + GameSessions = "gamesessions" + Nuuvem = "nuuvem" + FXStore = "fxstore" + IndieGala = "indiegala" + Playfire = "playfire" + Oculus = "oculus" + Test = "test" + class Feature(Enum): """Possible features that can be implemented by an integration. @@ -98,6 +108,8 @@ class Feature(Enum): ImportUsers = "ImportUsers" VerifyGame = "VerifyGame" ImportFriends = "ImportFriends" + ShutdownPlatformClient = "ShutdownPlatformClient" + LaunchPlatformClient = "LaunchPlatformClient" class LicenseType(Enum): @@ -116,11 +128,3 @@ class LocalGameState(Flag): None_ = 0 Installed = 1 Running = 2 - - -class PresenceState(Enum): - """"Possible states that a user can be in.""" - Unknown = "Unknown" - Online = "online" - Offline = "offline" - Away = "away" diff --git a/minecraft_macosx/galaxy/api/errors.py b/minecraft_macosx/galaxy/api/errors.py index 6564b48..f53479f 100644 --- a/minecraft_macosx/galaxy/api/errors.py +++ b/minecraft_macosx/galaxy/api/errors.py @@ -1,6 +1,6 @@ from galaxy.api.jsonrpc import ApplicationError, UnknownError -UnknownError = UnknownError +assert UnknownError class AuthenticationRequired(ApplicationError): def __init__(self, data=None): diff --git a/minecraft_macosx/galaxy/api/jsonrpc.py b/minecraft_macosx/galaxy/api/jsonrpc.py index 87bff71..8b14ca7 100644 --- a/minecraft_macosx/galaxy/api/jsonrpc.py +++ b/minecraft_macosx/galaxy/api/jsonrpc.py @@ -6,6 +6,7 @@ import json from galaxy.reader import StreamLineReader +from galaxy.task_manager import TaskManager class JsonRpcError(Exception): def __init__(self, code, message, data=None): @@ -52,7 +53,8 @@ def __init__(self, data=None): super().__init__(0, "Unknown error", data) Request = namedtuple("Request", ["method", "params", "id"], defaults=[{}, None]) -Method = namedtuple("Method", ["callback", "signature", "internal", "sensitive_params"]) +Method = namedtuple("Method", ["callback", "signature", "immediate", "sensitive_params"]) + def anonymise_sensitive_params(params, sensitive_params): anomized_data = "****" @@ -74,9 +76,9 @@ def __init__(self, reader, writer, encoder=json.JSONEncoder()): self._encoder = encoder self._methods = {} self._notifications = {} - self._eof_listeners = [] + self._task_manager = TaskManager("jsonrpc server") - def register_method(self, name, callback, internal, sensitive_params=False): + def register_method(self, name, callback, immediate, sensitive_params=False): """ Register method @@ -86,9 +88,9 @@ def register_method(self, name, callback, internal, sensitive_params=False): :param sensitive_params: list of parameters that are anonymized before logging; \ if False - no params are considered sensitive, if True - all params are considered sensitive """ - self._methods[name] = Method(callback, inspect.signature(callback), internal, sensitive_params) + self._methods[name] = Method(callback, inspect.signature(callback), immediate, sensitive_params) - def register_notification(self, name, callback, internal, sensitive_params=False): + def register_notification(self, name, callback, immediate, sensitive_params=False): """ Register notification @@ -98,10 +100,7 @@ def register_notification(self, name, callback, internal, sensitive_params=False :param sensitive_params: list of parameters that are anonymized before logging; \ if False - no params are considered sensitive, if True - all params are considered sensitive """ - self._notifications[name] = Method(callback, inspect.signature(callback), internal, sensitive_params) - - def register_eof(self, callback): - self._eof_listeners.append(callback) + self._notifications[name] = Method(callback, inspect.signature(callback), immediate, sensitive_params) async def run(self): while self._active: @@ -118,14 +117,16 @@ async def run(self): self._handle_input(data) await asyncio.sleep(0) # To not starve task queue - def stop(self): + def close(self): + logging.info("Closing JSON-RPC server - not more messages will be read") self._active = False + async def wait_closed(self): + await self._task_manager.wait() + def _eof(self): logging.info("Received EOF") - self.stop() - for listener in self._eof_listeners: - listener() + self.close() def _handle_input(self, data): try: @@ -145,7 +146,7 @@ def _handle_notification(self, request): logging.error("Received unknown notification: %s", request.method) return - callback, signature, internal, sensitive_params = method + callback, signature, immediate, sensitive_params = method self._log_request(request, sensitive_params) try: @@ -153,12 +154,11 @@ def _handle_notification(self, request): except TypeError: self._send_error(request.id, InvalidParams()) - if internal: - # internal requests are handled immediately + if immediate: callback(*bound_args.args, **bound_args.kwargs) else: try: - asyncio.create_task(callback(*bound_args.args, **bound_args.kwargs)) + self._task_manager.create_task(callback(*bound_args.args, **bound_args.kwargs), request.method) except Exception: logging.exception("Unexpected exception raised in notification handler") @@ -169,7 +169,7 @@ def _handle_request(self, request): self._send_error(request.id, MethodNotFound()) return - callback, signature, internal, sensitive_params = method + callback, signature, immediate, sensitive_params = method self._log_request(request, sensitive_params) try: @@ -177,8 +177,7 @@ def _handle_request(self, request): except TypeError: self._send_error(request.id, InvalidParams()) - if internal: - # internal requests are handled immediately + if immediate: response = callback(*bound_args.args, **bound_args.kwargs) self._send_response(request.id, response) else: @@ -190,11 +189,13 @@ async def handle(): self._send_error(request.id, MethodNotFound()) except JsonRpcError as error: self._send_error(request.id, error) + except asyncio.CancelledError: + self._send_error(request.id, Aborted()) except Exception as e: #pylint: disable=broad-except logging.exception("Unexpected exception raised in plugin handler") self._send_error(request.id, UnknownError(str(e))) - asyncio.create_task(handle()) + self._task_manager.create_task(handle(), request.method) @staticmethod def _parse_request(data): @@ -215,7 +216,7 @@ def _send(self, data): logging.debug("Sending data: %s", line) data = (line + "\n").encode("utf-8") self._writer.write(data) - asyncio.create_task(self._writer.drain()) + self._task_manager.create_task(self._writer.drain(), "drain") except TypeError as error: logging.error(str(error)) @@ -255,6 +256,7 @@ def __init__(self, writer, encoder=json.JSONEncoder()): self._writer = writer self._encoder = encoder self._methods = {} + self._task_manager = TaskManager("notification client") def notify(self, method, params, sensitive_params=False): """ @@ -273,13 +275,16 @@ def notify(self, method, params, sensitive_params=False): self._log(method, params, sensitive_params) self._send(notification) + async def close(self): + await self._task_manager.wait() + def _send(self, data): try: line = self._encoder.encode(data) data = (line + "\n").encode("utf-8") logging.debug("Sending %d byte of data", len(data)) self._writer.write(data) - asyncio.create_task(self._writer.drain()) + self._task_manager.create_task(self._writer.drain(), "drain") except TypeError as error: logging.error("Failed to parse outgoing message: %s", str(error)) diff --git a/minecraft_macosx/galaxy/api/plugin.py b/minecraft_macosx/galaxy/api/plugin.py index bfa1d75..f573ebb 100644 --- a/minecraft_macosx/galaxy/api/plugin.py +++ b/minecraft_macosx/galaxy/api/plugin.py @@ -1,21 +1,17 @@ import asyncio +import dataclasses import json import logging import logging.handlers -import dataclasses -from enum import Enum -from collections import OrderedDict import sys +from enum import Enum +from typing import Any, Dict, List, Optional, Set, Union -from typing import Any, List, Dict, Optional, Union - -from galaxy.api.types import Achievement, Game, LocalGame, FriendInfo, GameTime, UserInfo, Room - -from galaxy.api.jsonrpc import Server, NotificationClient, ApplicationError from galaxy.api.consts import Feature -from galaxy.api.errors import UnknownError, ImportInProgress -from galaxy.api.types import Authentication, NextStep, Message - +from galaxy.api.errors import ImportInProgress, UnknownError +from galaxy.api.jsonrpc import ApplicationError, NotificationClient, Server +from galaxy.api.types import Achievement, Authentication, FriendInfo, Game, GameTime, LocalGame, NextStep +from galaxy.task_manager import TaskManager class JSONEncoder(json.JSONEncoder): def default(self, o): # pylint: disable=method-hidden @@ -23,6 +19,7 @@ def default(self, o): # pylint: disable=method-hidden # filter None values def dict_factory(elements): return {k: v for k, v in elements if v is not None} + return dataclasses.asdict(o, dict_factory=dict_factory) if isinstance(o, Enum): return o.value @@ -31,14 +28,14 @@ def dict_factory(elements): class Plugin: """Use and override methods of this class to create a new platform integration.""" + def __init__(self, platform, version, reader, writer, handshake_token): logging.info("Creating plugin for platform %s, version %s", platform.value, version) self._platform = platform self._version = version - self._feature_methods = OrderedDict() + self._features: Set[Feature] = set() self._active = True - self._pass_control_task = None self._reader, self._writer = reader, writer self._handshake_token = handshake_token @@ -47,25 +44,25 @@ def __init__(self, platform, version, reader, writer, handshake_token): self._server = Server(self._reader, self._writer, encoder) self._notification_client = NotificationClient(self._writer, encoder) - def eof_handler(): - self._shutdown() - self._server.register_eof(eof_handler) - self._achievements_import_in_progress = False self._game_times_import_in_progress = False self._persistent_cache = dict() + self._internal_task_manager = TaskManager("plugin internal") + self._external_task_manager = TaskManager("plugin external") + # internal self._register_method("shutdown", self._shutdown, internal=True) - self._register_method("get_capabilities", self._get_capabilities, internal=True) + self._register_method("get_capabilities", self._get_capabilities, internal=True, immediate=True) self._register_method( "initialize_cache", self._initialize_cache, internal=True, + immediate=True, sensitive_params="data" ) - self._register_method("ping", self._ping, internal=True) + self._register_method("ping", self._ping, internal=True, immediate=True) # implemented by developer self._register_method( @@ -81,139 +78,123 @@ def eof_handler(): self._register_method( "import_owned_games", self.get_owned_games, - result_name="owned_games", - feature=Feature.ImportOwnedGames - ) - self._register_method( - "import_unlocked_achievements", - self.get_unlocked_achievements, - result_name="unlocked_achievements", - feature=Feature.ImportAchievements - ) - self._register_method( - "start_achievements_import", - self.start_achievements_import, - ) - self._register_method( - "import_local_games", - self.get_local_games, - result_name="local_games", - feature=Feature.ImportInstalledGames - ) - self._register_notification("launch_game", self.launch_game, feature=Feature.LaunchGame) - self._register_notification("install_game", self.install_game, feature=Feature.InstallGame) - self._register_notification( - "uninstall_game", - self.uninstall_game, - feature=Feature.UninstallGame - ) - self._register_method( - "import_friends", - self.get_friends, - result_name="friend_info_list", - feature=Feature.ImportFriends - ) - self._register_method( - "import_user_infos", - self.get_users, - result_name="user_info_list", - feature=Feature.ImportUsers - ) - self._register_method( - "send_message", - self.send_message, - feature=Feature.Chat - ) - self._register_method( - "mark_as_read", - self.mark_as_read, - feature=Feature.Chat - ) - self._register_method( - "import_rooms", - self.get_rooms, - result_name="rooms", - feature=Feature.Chat - ) - self._register_method( - "import_room_history_from_message", - self.get_room_history_from_message, - result_name="messages", - feature=Feature.Chat - ) - self._register_method( - "import_room_history_from_timestamp", - self.get_room_history_from_timestamp, - result_name="messages", - feature=Feature.Chat - ) - self._register_method( - "import_game_times", - self.get_game_times, - result_name="game_times", - feature=Feature.ImportGameTime - ) - self._register_method( - "start_game_times_import", - self.start_game_times_import, + result_name="owned_games" ) + self._detect_feature(Feature.ImportOwnedGames, ["get_owned_games"]) - @property - def features(self): - features = [] - if self.__class__ != Plugin: - for feature, handlers in self._feature_methods.items(): - if self._implements(handlers): - features.append(feature) + self._register_method("start_achievements_import", self._start_achievements_import) + self._detect_feature(Feature.ImportAchievements, ["get_unlocked_achievements"]) + + self._register_method("import_local_games", self.get_local_games, result_name="local_games") + self._detect_feature(Feature.ImportInstalledGames, ["get_local_games"]) + + self._register_notification("launch_game", self.launch_game) + self._detect_feature(Feature.LaunchGame, ["launch_game"]) + + self._register_notification("install_game", self.install_game) + self._detect_feature(Feature.InstallGame, ["install_game"]) + + self._register_notification("uninstall_game", self.uninstall_game) + self._detect_feature(Feature.UninstallGame, ["uninstall_game"]) + + self._register_notification("shutdown_platform_client", self.shutdown_platform_client) + self._detect_feature(Feature.ShutdownPlatformClient, ["shutdown_platform_client"]) + + self._register_notification("launch_platform_client", self.launch_platform_client) + self._detect_feature(Feature.LaunchPlatformClient, ["launch_platform_client"]) + + self._register_method("import_friends", self.get_friends, result_name="friend_info_list") + self._detect_feature(Feature.ImportFriends, ["get_friends"]) + + self._register_method("start_game_times_import", self._start_game_times_import) + self._detect_feature(Feature.ImportGameTime, ["get_game_time"]) - return features + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + self.close() + await self.wait_closed() + + @property + def features(self) -> List[Feature]: + return list(self._features) @property - def persistent_cache(self) -> Dict: + def persistent_cache(self) -> Dict[str, str]: """The cache is only available after the :meth:`~.handshake_complete()` is called. """ return self._persistent_cache - def _implements(self, handlers): - for handler in handlers: - if handler.__name__ not in self.__class__.__dict__: + def _implements(self, methods: List[str]) -> bool: + for method in methods: + if method not in self.__class__.__dict__: return False return True - def _register_method(self, name, handler, result_name=None, internal=False, sensitive_params=False, feature=None): - if internal: + def _detect_feature(self, feature: Feature, methods: List[str]): + if self._implements(methods): + self._features.add(feature) + + def _register_method(self, name, handler, result_name=None, internal=False, immediate=False, sensitive_params=False): + def wrap_result(result): + if result_name: + result = { + result_name: result + } + return result + + if immediate: def method(*args, **kwargs): result = handler(*args, **kwargs) - if result_name: - result = { - result_name: result - } - return result + return wrap_result(result) + self._server.register_method(name, method, True, sensitive_params) else: async def method(*args, **kwargs): - result = await handler(*args, **kwargs) - if result_name: - result = { - result_name: result - } - return result - self._server.register_method(name, method, False, sensitive_params) + if not internal: + handler_ = self._wrap_external_method(handler, name) + else: + handler_ = handler + result = await handler_(*args, **kwargs) + return wrap_result(result) - if feature is not None: - self._feature_methods.setdefault(feature, []).append(handler) + self._server.register_method(name, method, False, sensitive_params) - def _register_notification(self, name, handler, internal=False, sensitive_params=False, feature=None): - self._server.register_notification(name, handler, internal, sensitive_params) + def _register_notification(self, name, handler, internal=False, immediate=False, sensitive_params=False): + if not internal and not immediate: + handler = self._wrap_external_method(handler, name) + self._server.register_notification(name, handler, immediate, sensitive_params) - if feature is not None: - self._feature_methods.setdefault(feature, []).append(handler) + def _wrap_external_method(self, handler, name: str): + async def wrapper(*args, **kwargs): + return await self._external_task_manager.create_task(handler(*args, **kwargs), name, False) + return wrapper async def run(self): """Plugin's main coroutine.""" await self._server.run() - if self._pass_control_task is not None: - await self._pass_control_task + await self._external_task_manager.wait() + + def close(self) -> None: + if not self._active: + return + + logging.info("Closing plugin") + self._server.close() + self._external_task_manager.cancel() + self._internal_task_manager.create_task(self.shutdown(), "shutdown") + self._active = False + + async def wait_closed(self) -> None: + await self._external_task_manager.wait() + await self._internal_task_manager.wait() + await self._server.wait_closed() + await self._notification_client.close() + + def create_task(self, coro, description): + """Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown""" + return self._external_task_manager.create_task(coro, description) async def _pass_control(self): while self._active: @@ -223,11 +204,11 @@ async def _pass_control(self): logging.exception("Unexpected exception raised in plugin tick") await asyncio.sleep(1) - def _shutdown(self): + async def _shutdown(self): logging.info("Shutting down") - self._server.stop() - self._active = False - self.shutdown() + self.close() + await self._external_task_manager.wait() + await self._internal_task_manager.wait() def _get_capabilities(self): return { @@ -238,8 +219,11 @@ def _get_capabilities(self): def _initialize_cache(self, data: Dict): self._persistent_cache = data - self.handshake_complete() - self._pass_control_task = asyncio.create_task(self._pass_control()) + try: + self.handshake_complete() + except Exception: + logging.exception("Unhandled exception during `handshake_complete` step") + self._internal_task_manager.create_task(self._pass_control(), "tick") @staticmethod def _ping(): @@ -267,8 +251,10 @@ async def pass_login_credentials(self, step, credentials, cookies): self.store_credentials(user_data['credentials']) return Authentication(user_data['userId'], user_data['username']) - """ - self.persistent_cache['credentials'] = credentials + """ + # temporary solution for persistent_cache vs credentials issue + self.persistent_cache['credentials'] = credentials # type: ignore + self._notification_client.notify("store_credentials", credentials, sensitive_params=True) def add_game(self, game: Game) -> None: @@ -297,7 +283,7 @@ def remove_game(self, game_id: str) -> None: """Notify the client to remove game from the list of owned games of the currently authenticated user. - :param game_id: game id of the game to remove from the list of owned games + :param game_id: the id of the game to remove from the list of owned games Example use case of remove_game: @@ -327,7 +313,7 @@ def update_game(self, game: Game) -> None: def unlock_achievement(self, game_id: str, achievement: Achievement) -> None: """Notify the client to unlock an achievement for a specific game. - :param game_id: game_id of the game for which to unlock an achievement. + :param game_id: the id of the game for which to unlock an achievement. :param achievement: achievement to unlock. """ params = { @@ -336,26 +322,14 @@ def unlock_achievement(self, game_id: str, achievement: Achievement) -> None: } self._notification_client.notify("achievement_unlocked", params) - def game_achievements_import_success(self, game_id: str, achievements: List[Achievement]) -> None: - """Notify the client that import of achievements for a given game has succeeded. - This method is called by import_games_achievements. - - :param game_id: id of the game for which the achievements were imported - :param achievements: list of imported achievements - """ + def _game_achievements_import_success(self, game_id: str, achievements: List[Achievement]) -> None: params = { "game_id": game_id, "unlocked_achievements": achievements } self._notification_client.notify("game_achievements_import_success", params) - def game_achievements_import_failure(self, game_id: str, error: ApplicationError) -> None: - """Notify the client that import of achievements for a given game has failed. - This method is called by import_games_achievements. - - :param game_id: id of the game for which the achievements import failed - :param error: error which prevented the achievements import - """ + def _game_achievements_import_failure(self, game_id: str, error: ApplicationError) -> None: params = { "game_id": game_id, "error": { @@ -365,9 +339,7 @@ def game_achievements_import_failure(self, game_id: str, error: ApplicationError } self._notification_client.notify("game_achievements_import_failure", params) - def achievements_import_finished(self) -> None: - """Notify the client that importing achievements has finished. - This method is called by import_games_achievements_task""" + def _achievements_import_finished(self) -> None: self._notification_client.notify("achievements_import_finished", None) def update_local_game_status(self, local_game: LocalGame) -> None: @@ -387,7 +359,7 @@ async def _check_statuses(self): continue self.update_local_game_status(LocalGame(game.id, game.status)) self._cached_games_statuses[game.id] = game.status - asyncio.sleep(5) # interval + await asyncio.sleep(5) # interval def tick(self): if self._check_statuses_task is None or self._check_statuses_task.done(): @@ -412,26 +384,6 @@ def remove_friend(self, user_id: str) -> None: params = {"user_id": user_id} self._notification_client.notify("friend_removed", params) - def update_room( - self, - room_id: str, - unread_message_count: Optional[int]=None, - new_messages: Optional[List[Message]]=None - ) -> None: - """WIP, Notify the client to update the information regarding - a chat room that the currently authenticated user is in. - - :param room_id: id of the room to update - :param unread_message_count: information about the new unread message count in the room - :param new_messages: list of new messages that the user received - """ - params = {"room_id": room_id} - if unread_message_count is not None: - params["unread_message_count"] = unread_message_count - if new_messages is not None: - params["messages"] = new_messages - self._notification_client.notify("chat_room_updated", params) - def update_game_time(self, game_time: GameTime) -> None: """Notify the client to update game time for a game. @@ -440,22 +392,11 @@ def update_game_time(self, game_time: GameTime) -> None: params = {"game_time": game_time} self._notification_client.notify("game_time_updated", params) - def game_time_import_success(self, game_time: GameTime) -> None: - """Notify the client that import of a given game_time has succeeded. - This method is called by import_game_times. - - :param game_time: game_time which was imported - """ + def _game_time_import_success(self, game_time: GameTime) -> None: params = {"game_time": game_time} self._notification_client.notify("game_time_import_success", params) - def game_time_import_failure(self, game_id: str, error: ApplicationError) -> None: - """Notify the client that import of a game time for a given game has failed. - This method is called by import_game_times. - - :param game_id: id of the game for which the game time could not be imported - :param error: error which prevented the game time import - """ + def _game_time_import_failure(self, game_id: str, error: ApplicationError) -> None: params = { "game_id": game_id, "error": { @@ -465,10 +406,7 @@ def game_time_import_failure(self, game_id: str, error: ApplicationError) -> Non } self._notification_client.notify("game_time_import_failure", params) - def game_times_import_finished(self) -> None: - """Notify the client that importing game times has finished. - This method is called by :meth:`~.import_game_times_task`. - """ + def _game_times_import_finished(self) -> None: self._notification_client.notify("game_times_import_finished", None) def lost_authentication(self) -> None: @@ -514,7 +452,7 @@ def tick(self): """ - def shutdown(self) -> None: + async def shutdown(self) -> None: """This method is called on integration shutdown. Override it to implement tear down. This method is called by the GOG Galaxy Client.""" @@ -549,7 +487,7 @@ async def authenticate(self, stored_credentials=None): raise NotImplementedError() async def pass_login_credentials(self, step: str, credentials: Dict[str, str], cookies: List[Dict[str, str]]) \ - -> Union[NextStep, Authentication]: + -> Union[NextStep, Authentication]: """This method is called if we return galaxy.api.types.NextStep from authenticate or from pass_login_credentials. This method's parameters provide the data extracted from the web page navigation that previous NextStep finished on. This method should either return galaxy.api.types.Authentication if the authentication is finished @@ -597,50 +535,63 @@ async def get_owned_games(self): """ raise NotImplementedError() - async def get_unlocked_achievements(self, game_id: str) -> List[Achievement]: - """ - .. deprecated:: 0.33 - Use :meth:`~.import_games_achievements`. - """ - raise NotImplementedError() - - async def start_achievements_import(self, game_ids: List[str]) -> None: - """Starts the task of importing achievements. - This method is called by the GOG Galaxy Client. - - :param game_ids: ids of the games for which the achievements are imported - """ + async def _start_achievements_import(self, game_ids: List[str]) -> None: if self._achievements_import_in_progress: raise ImportInProgress() - async def import_games_achievements_task(game_ids): + context = await self.prepare_achievements_context(game_ids) + + async def import_game_achievements(game_id, context_): + try: + achievements = await self.get_unlocked_achievements(game_id, context_) + self._game_achievements_import_success(game_id, achievements) + except ApplicationError as error: + self._game_achievements_import_failure(game_id, error) + except Exception: + logging.exception("Unexpected exception raised in import_game_achievements") + self._game_achievements_import_failure(game_id, UnknownError()) + + async def import_games_achievements(game_ids_, context_): try: - await self.import_games_achievements(game_ids) + imports = [import_game_achievements(game_id, context_) for game_id in game_ids_] + await asyncio.gather(*imports) finally: - self.achievements_import_finished() + self._achievements_import_finished() self._achievements_import_in_progress = False + self.achievements_import_complete() - asyncio.create_task(import_games_achievements_task(game_ids)) + self._external_task_manager.create_task( + import_games_achievements(game_ids, context), + "unlocked achievements import", + handle_exceptions=False + ) self._achievements_import_in_progress = True - async def import_games_achievements(self, game_ids: List[str]) -> None: + async def prepare_achievements_context(self, game_ids: List[str]) -> Any: + """Override this method to prepare context for get_unlocked_achievements. + This allows for optimizations like batch requests to platform API. + Default implementation returns None. + + :param game_ids: the ids of the games for which achievements are imported + :return: context """ - Override this method to return the unlocked achievements - of the user that is currently logged in to the plugin. - Call game_achievements_import_success/game_achievements_import_failure for each game_id on the list. - This method is called by the GOG Galaxy Client. + return None + + async def get_unlocked_achievements(self, game_id: str, context: Any) -> List[Achievement]: + """Override this method to return list of unlocked achievements + for the game identified by the provided game_id. + This method is called by import task initialized by GOG Galaxy Client. - :param game_ids: ids of the games for which to import unlocked achievements + :param game_id: the id of the game for which the achievements are returned + :param context: the value returned from :meth:`prepare_achievements_context` + :return: list of Achievement objects """ - async def import_game_achievements(game_id): - try: - achievements = await self.get_unlocked_achievements(game_id) - self.game_achievements_import_success(game_id, achievements) - except Exception as error: - self.game_achievements_import_failure(game_id, error) + raise NotImplementedError() - imports = [import_game_achievements(game_id) for game_id in game_ids] - await asyncio.gather(*imports) + def achievements_import_complete(self): + """Override this method to handle operations after achievements import is finished + (like updating cache). + """ async def get_local_games(self) -> List[LocalGame]: """Override this method to return the list of @@ -669,7 +620,7 @@ async def launch_game(self, game_id: str) -> None: identified by the provided game_id. This method is called by the GOG Galaxy Client. - :param str game_id: id of the game to launch + :param str game_id: the id of the game to launch Example of possible override of the method: @@ -687,7 +638,7 @@ async def install_game(self, game_id: str) -> None: identified by the provided game_id. This method is called by the GOG Galaxy Client. - :param str game_id: id of the game to install + :param str game_id: the id of the game to install Example of possible override of the method: @@ -705,7 +656,7 @@ async def uninstall_game(self, game_id: str) -> None: identified by the provided game_id. This method is called by the GOG Galaxy Client. - :param str game_id: id of the game to uninstall + :param str game_id: the id of the game to uninstall Example of possible override of the method: @@ -718,6 +669,16 @@ async def uninstall_game(self, game_id): """ raise NotImplementedError() + async def shutdown_platform_client(self) -> None: + """Override this method to gracefully terminate platform client. + This method is called by the GOG Galaxy Client.""" + raise NotImplementedError() + + async def launch_platform_client(self) -> None: + """Override this method to launch platform client. Preferably minimized to tray. + This method is called by the GOG Galaxy Client.""" + raise NotImplementedError() + async def get_friends(self) -> List[FriendInfo]: """Override this method to return the friends list of the currently authenticated user. @@ -738,105 +699,63 @@ async def get_friends(self): """ raise NotImplementedError() - async def get_users(self, user_id_list: List[str]) -> List[UserInfo]: - """WIP, Override this method to return the list of users matching the provided ids. - This method is called by the GOG Galaxy Client. - - :param user_id_list: list of user ids - """ - raise NotImplementedError() - - async def send_message(self, room_id: str, message_text: str) -> None: - """WIP, Override this method to send message to a chat room. - This method is called by the GOG Galaxy Client. + async def _start_game_times_import(self, game_ids: List[str]) -> None: + if self._game_times_import_in_progress: + raise ImportInProgress() - :param room_id: id of the room to which the message should be sent - :param message_text: text which should be sent in the message - """ - raise NotImplementedError() + context = await self.prepare_game_times_context(game_ids) - async def mark_as_read(self, room_id: str, last_message_id: str) -> None: - """WIP, Override this method to mark messages in a chat room as read up to the id provided in the parameter. - This method is called by the GOG Galaxy Client. + async def import_game_time(game_id, context_): + try: + game_time = await self.get_game_time(game_id, context_) + self._game_time_import_success(game_time) + except ApplicationError as error: + self._game_time_import_failure(game_id, error) + except Exception: + logging.exception("Unexpected exception raised in import_game_time") + self._game_time_import_failure(game_id, UnknownError()) - :param room_id: id of the room - :param last_message_id: id of the last message; room is marked as read only if this id matches - the last message id known to the client - """ - raise NotImplementedError() + async def import_game_times(game_ids_, context_): + try: + imports = [import_game_time(game_id, context_) for game_id in game_ids_] + await asyncio.gather(*imports) + finally: + self._game_times_import_finished() + self._game_times_import_in_progress = False + self.game_times_import_complete() - async def get_rooms(self) -> List[Room]: - """WIP, Override this method to return the chat rooms in which the user is currently in. - This method is called by the GOG Galaxy Client - """ - raise NotImplementedError() + self._external_task_manager.create_task( + import_game_times(game_ids, context), + "game times import", + handle_exceptions=False + ) + self._game_times_import_in_progress = True - async def get_room_history_from_message(self, room_id: str, message_id: str) -> List[Message]: - """WIP, Override this method to return the chat room history since the message provided in parameter. - This method is called by the GOG Galaxy Client. + async def prepare_game_times_context(self, game_ids: List[str]) -> Any: + """Override this method to prepare context for get_game_time. + This allows for optimizations like batch requests to platform API. + Default implementation returns None. - :param room_id: id of the room - :param message_id: id of the message since which the history should be retrieved + :param game_ids: the ids of the games for which game time are imported + :return: context """ - raise NotImplementedError() + return None - async def get_room_history_from_timestamp(self, room_id: str, from_timestamp: int) -> List[Message]: - """WIP, Override this method to return the chat room history since the timestamp provided in parameter. - This method is called by the GOG Galaxy Client. - - :param room_id: id of the room - :param from_timestamp: timestamp since which the history should be retrieved - """ - raise NotImplementedError() + async def get_game_time(self, game_id: str, context: Any) -> GameTime: + """Override this method to return the game time for the game + identified by the provided game_id. + This method is called by import task initialized by GOG Galaxy Client. - async def get_game_times(self) -> List[GameTime]: - """ - .. deprecated:: 0.33 - Use :meth:`~.import_game_times`. + :param game_id: the id of the game for which the game time is returned + :param context: the value returned from :meth:`prepare_game_times_context` + :return: GameTime object """ raise NotImplementedError() - async def start_game_times_import(self, game_ids: List[str]) -> None: - """Starts the task of importing game times - This method is called by the GOG Galaxy Client. - - :param game_ids: ids of the games for which the game time is imported - """ - if self._game_times_import_in_progress: - raise ImportInProgress() - - async def import_game_times_task(game_ids): - try: - await self.import_game_times(game_ids) - finally: - self.game_times_import_finished() - self._game_times_import_in_progress = False - - asyncio.create_task(import_game_times_task(game_ids)) - self._game_times_import_in_progress = True - - async def import_game_times(self, game_ids: List[str]) -> None: + def game_times_import_complete(self) -> None: + """Override this method to handle operations after game times import is finished + (like updating cache). """ - Override this method to return game times for - games owned by the currently authenticated user. - Call game_time_import_success/game_time_import_failure for each game_id on the list. - This method is called by GOG Galaxy Client. - - :param game_ids: ids of the games for which the game time is imported - """ - try: - game_times = await self.get_game_times() - game_ids_set = set(game_ids) - for game_time in game_times: - if game_time.game_id not in game_ids_set: - continue - self.game_time_import_success(game_time) - game_ids_set.discard(game_time.game_id) - for game_id in game_ids_set: - self.game_time_import_failure(game_id, UnknownError()) - except Exception as error: - for game_id in game_ids: - self.game_time_import_failure(game_id, error) def create_and_run_plugin(plugin_class, argv): @@ -880,10 +799,13 @@ async def coroutine(): reader, writer = await asyncio.open_connection("127.0.0.1", port) extra_info = writer.get_extra_info("sockname") logging.info("Using local address: %s:%u", *extra_info) - plugin = plugin_class(reader, writer, token) - await plugin.run() + async with plugin_class(reader, writer, token) as plugin: + await plugin.run() try: + if sys.platform == "win32": + asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) + asyncio.run(coroutine()) except Exception: logging.exception("Error while running plugin") diff --git a/minecraft_macosx/galaxy/api/types.py b/minecraft_macosx/galaxy/api/types.py index 21466ac..37d55a3 100644 --- a/minecraft_macosx/galaxy/api/types.py +++ b/minecraft_macosx/galaxy/api/types.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from typing import List, Dict, Optional -from galaxy.api.consts import LicenseType, LocalGameState, PresenceState +from galaxy.api.consts import LicenseType, LocalGameState @dataclass class Authentication(): @@ -61,7 +61,6 @@ async def authenticate(self, stored_credentials=None): :param auth_params: configuration options: {"window_title": :class:`str`, "window_width": :class:`str`, "window_height": :class:`int`, "start_uri": :class:`int`, "end_uri_regex": :class:`str`} :param cookies: browser initial set of cookies :param js: a map of the url regex patterns into the list of *js* scripts that should be executed on every document at given step of internal browser authentication. - """ next_step: str auth_params: Dict[str, str] @@ -130,34 +129,6 @@ class LocalGame(): game_id: str local_game_state: LocalGameState -@dataclass -class Presence(): - """Information about a presence of a user. - - :param presence_state: the state in which the user's presence is - :param game_id: id of the game which the user is currently playing - :param presence_status: optional attached string with the detailed description of the user's presence - """ - presence_state: PresenceState - game_id: Optional[str] = None - presence_status: Optional[str] = None - -@dataclass -class UserInfo(): - """Detailed information about a user. - - :param user_id: of the user - :param is_friend: whether the user is a friend of the currently authenticated user - :param user_name: of the user - :param avatar_url: to the avatar of the user - :param presence: about the users presence - """ - user_id: str - is_friend: bool - user_name: str - avatar_url: str - presence: Presence - @dataclass class FriendInfo(): """Information about a friend of the currently authenticated user. @@ -168,32 +139,6 @@ class FriendInfo(): user_id: str user_name: str -@dataclass -class Room(): - """WIP, Chatroom. - - :param room_id: id of the room - :param unread_message_count: number of unread messages in the room - :param last_message_id: id of the last message in the room - """ - room_id: str - unread_message_count: int - last_message_id: str - -@dataclass -class Message(): - """WIP, A chatroom message. - - :param message_id: id of the message - :param sender_id: id of the sender of the message - :param sent_time: time at which the message was sent - :param message_text: text attached to the message - """ - message_id: str - sender_id: str - sent_time: int - message_text: str - @dataclass class GameTime(): """Game time of a game, defines the total time spent in the game diff --git a/minecraft_macosx/galaxy/http.py b/minecraft_macosx/galaxy/http.py index 667f55a..615daa0 100644 --- a/minecraft_macosx/galaxy/http.py +++ b/minecraft_macosx/galaxy/http.py @@ -1,5 +1,37 @@ +""" +This module standarize http traffic and the error handling for further communication with the GOG Galaxy 2.0. + +It is recommended to use provided convenient methods for HTTP requests, especially when dealing with authorized sessions. +Examplary simple web service could looks like: + + .. code-block:: python + + import logging + from galaxy.http import create_client_session, handle_exception + + class BackendClient: + AUTH_URL = 'my-integration.com/auth' + HEADERS = { + "My-Custom-Header": "true", + } + def __init__(self): + self._session = create_client_session(headers=self.HEADERS) + + async def authenticate(self): + await self._session.request('POST', self.AUTH_URL) + + async def close(self): + # to be called on plugin shutdown + await self._session.close() + + async def _authorized_request(self, method, url, *args, **kwargs): + with handle_exceptions(): + return await self._session.request(method, url, *args, **kwargs) +""" + import asyncio import ssl +from contextlib import contextmanager from http import HTTPStatus import aiohttp @@ -12,44 +44,101 @@ ) +#: Default limit of the simultaneous connections for ssl connector. +DEFAULT_LIMIT = 20 +#: Default timeout in seconds used for client session. +DEFAULT_TIMEOUT = 60 + + class HttpClient: - def __init__(self, limit=20, timeout=aiohttp.ClientTimeout(total=60), cookie_jar=None): - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - ssl_context.load_verify_locations(certifi.where()) - connector = aiohttp.TCPConnector(limit=limit, ssl=ssl_context) - self._session = aiohttp.ClientSession(connector=connector, timeout=timeout, cookie_jar=cookie_jar) + """ + .. deprecated:: 0.41 + Use http module functions instead + """ + def __init__(self, limit=DEFAULT_LIMIT, timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT), cookie_jar=None): + connector = create_tcp_connector(limit=limit) + self._session = create_client_session(connector=connector, timeout=timeout, cookie_jar=cookie_jar) async def close(self): + """Closes connection. Should be called in :meth:`~galaxy.api.plugin.Plugin.shutdown`""" await self._session.close() async def request(self, method, url, *args, **kwargs): - try: - response = await self._session.request(method, url, *args, **kwargs) - except asyncio.TimeoutError: - raise BackendTimeout() - except aiohttp.ServerDisconnectedError: - raise BackendNotAvailable() - except aiohttp.ClientConnectionError: - raise NetworkError() - except aiohttp.ContentTypeError: - raise UnknownBackendResponse() - except aiohttp.ClientError: - logging.exception( - "Caught exception while running {} request for {}".format(method, url)) - raise UnknownError() - if response.status == HTTPStatus.UNAUTHORIZED: + with handle_exception(): + return await self._session.request(method, url, *args, **kwargs) + + +def create_tcp_connector(*args, **kwargs) -> aiohttp.TCPConnector: + """ + Creates TCP connector with resonable defaults. + For details about available parameters refer to + `aiohttp.TCPConnector `_ + """ + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ssl_context.load_verify_locations(certifi.where()) + kwargs.setdefault("ssl", ssl_context) + kwargs.setdefault("limit", DEFAULT_LIMIT) + return aiohttp.TCPConnector(*args, **kwargs) # type: ignore due to https://github.com/python/mypy/issues/4001 + + +def create_client_session(*args, **kwargs) -> aiohttp.ClientSession: + """ + Creates client session with resonable defaults. + For details about available parameters refer to + `aiohttp.ClientSession `_ + + Examplary customization: + + .. code-block:: python + + from galaxy.http import create_client_session, create_tcp_connector + + session = create_client_session( + headers={ + "Keep-Alive": "true" + }, + connector=create_tcp_connector(limit=40), + timeout=100) + """ + kwargs.setdefault("connector", create_tcp_connector()) + kwargs.setdefault("timeout", aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT)) + kwargs.setdefault("raise_for_status", True) + return aiohttp.ClientSession(*args, **kwargs) # type: ignore due to https://github.com/python/mypy/issues/4001 + + +@contextmanager +def handle_exception(): + """ + Context manager translating network related exceptions + to custom :mod:`~galaxy.api.errors`. + """ + try: + yield + except asyncio.TimeoutError: + raise BackendTimeout() + except aiohttp.ServerDisconnectedError: + raise BackendNotAvailable() + except aiohttp.ClientConnectionError: + raise NetworkError() + except aiohttp.ContentTypeError: + raise UnknownBackendResponse() + except aiohttp.ClientResponseError as error: + if error.status == HTTPStatus.UNAUTHORIZED: raise AuthenticationRequired() - if response.status == HTTPStatus.FORBIDDEN: + if error.status == HTTPStatus.FORBIDDEN: raise AccessDenied() - if response.status == HTTPStatus.SERVICE_UNAVAILABLE: + if error.status == HTTPStatus.SERVICE_UNAVAILABLE: raise BackendNotAvailable() - if response.status == HTTPStatus.TOO_MANY_REQUESTS: + if error.status == HTTPStatus.TOO_MANY_REQUESTS: raise TooManyRequests() - if response.status >= 500: + if error.status >= 500: raise BackendError() - if response.status >= 400: + if error.status >= 400: logging.warning( - "Got status {} while running {} request for {}".format(response.status, method, url)) + "Got status %d while performing %s request for %s", + error.status, error.request_info.method, str(error.request_info.url) + ) raise UnknownError() - - return response + except aiohttp.ClientError: + logging.exception("Caught exception while performing request") + raise UnknownError() diff --git a/minecraft_macosx/galaxy/proc_tools.py b/minecraft_macosx/galaxy/proc_tools.py new file mode 100644 index 0000000..b0de0bc --- /dev/null +++ b/minecraft_macosx/galaxy/proc_tools.py @@ -0,0 +1,88 @@ +import sys +from dataclasses import dataclass +from typing import Iterable, NewType, Optional, List, cast + + + +ProcessId = NewType("ProcessId", int) + + +@dataclass +class ProcessInfo: + pid: ProcessId + binary_path: Optional[str] + + +if sys.platform == "win32": + from ctypes import byref, sizeof, windll, create_unicode_buffer, FormatError, WinError + from ctypes.wintypes import DWORD + + + def pids() -> Iterable[ProcessId]: + _PROC_ID_T = DWORD + list_size = 4096 + + def try_get_pids(list_size: int) -> List[ProcessId]: + result_size = DWORD() + proc_id_list = (_PROC_ID_T * list_size)() + + if not windll.psapi.EnumProcesses(byref(proc_id_list), sizeof(proc_id_list), byref(result_size)): + raise WinError(descr="Failed to get process ID list: %s" % FormatError()) # type: ignore + + return cast(List[ProcessId], proc_id_list[:int(result_size.value / sizeof(_PROC_ID_T()))]) + + while True: + proc_ids = try_get_pids(list_size) + if len(proc_ids) < list_size: + return proc_ids + + list_size *= 2 + + + def get_process_info(pid: ProcessId) -> Optional[ProcessInfo]: + _PROC_QUERY_LIMITED_INFORMATION = 0x1000 + + process_info = ProcessInfo(pid=pid, binary_path=None) + + h_process = windll.kernel32.OpenProcess(_PROC_QUERY_LIMITED_INFORMATION, False, pid) + if not h_process: + return process_info + + try: + def get_exe_path() -> Optional[str]: + _MAX_PATH = 260 + _WIN32_PATH_FORMAT = 0x0000 + + exe_path_buffer = create_unicode_buffer(_MAX_PATH) + exe_path_len = DWORD(len(exe_path_buffer)) + + return cast(str, exe_path_buffer[:exe_path_len.value]) if windll.kernel32.QueryFullProcessImageNameW( + h_process, _WIN32_PATH_FORMAT, exe_path_buffer, byref(exe_path_len) + ) else None + + process_info.binary_path = get_exe_path() + finally: + windll.kernel32.CloseHandle(h_process) + return process_info +else: + import psutil + + + def pids() -> Iterable[ProcessId]: + for pid in psutil.pids(): + yield pid + + + def get_process_info(pid: ProcessId) -> Optional[ProcessInfo]: + process_info = ProcessInfo(pid=pid, binary_path=None) + try: + process_info.binary_path = psutil.Process(pid=pid).as_dict(attrs=["exe"])["exe"] + except psutil.NoSuchProcess: + pass + finally: + return process_info + + +def process_iter() -> Iterable[Optional[ProcessInfo]]: + for pid in pids(): + yield get_process_info(pid) diff --git a/minecraft_macosx/galaxy/task_manager.py b/minecraft_macosx/galaxy/task_manager.py new file mode 100644 index 0000000..1f6d457 --- /dev/null +++ b/minecraft_macosx/galaxy/task_manager.py @@ -0,0 +1,49 @@ +import asyncio +import logging +from collections import OrderedDict +from itertools import count + +class TaskManager: + def __init__(self, name): + self._name = name + self._tasks = OrderedDict() + self._task_counter = count() + + def create_task(self, coro, description, handle_exceptions=True): + """Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown""" + + async def task_wrapper(task_id): + try: + result = await coro + logging.debug("Task manager %s: finished task %d (%s)", self._name, task_id, description) + return result + except asyncio.CancelledError: + if handle_exceptions: + logging.debug("Task manager %s: canceled task %d (%s)", self._name, task_id, description) + else: + raise + except Exception: + if handle_exceptions: + logging.exception("Task manager %s: exception raised in task %d (%s)", self._name, task_id, description) + else: + raise + finally: + del self._tasks[task_id] + + task_id = next(self._task_counter) + logging.debug("Task manager %s: creating task %d (%s)", self._name, task_id, description) + task = asyncio.create_task(task_wrapper(task_id)) + self._tasks[task_id] = task + return task + + def cancel(self): + for task in self._tasks.values(): + task.cancel() + + async def wait(self): + # Tasks can spawn other tasks + while True: + tasks = self._tasks.values() + if not tasks: + return + await asyncio.gather(*tasks, return_exceptions=True) diff --git a/minecraft_macosx/galaxy/tools.py b/minecraft_macosx/galaxy/tools.py index 3996d25..8cb5540 100644 --- a/minecraft_macosx/galaxy/tools.py +++ b/minecraft_macosx/galaxy/tools.py @@ -3,6 +3,7 @@ import zipfile from glob import glob + def zip_folder(folder): files = glob(os.path.join(folder, "**"), recursive=True) files = [file.replace(folder + os.sep, "") for file in files] @@ -14,6 +15,7 @@ def zip_folder(folder): zipf.write(os.path.join(folder, file), arcname=file) return zip_buffer + def zip_folder_to_file(folder, filename): zip_content = zip_folder(folder).getbuffer() with open(filename, "wb") as archive: diff --git a/minecraft_macosx/galaxy/unittest/mock.py b/minecraft_macosx/galaxy/unittest/mock.py index 264c3fa..b439671 100644 --- a/minecraft_macosx/galaxy/unittest/mock.py +++ b/minecraft_macosx/galaxy/unittest/mock.py @@ -1,12 +1,31 @@ -from asyncio import coroutine +import asyncio from unittest.mock import MagicMock + class AsyncMock(MagicMock): + """ + .. deprecated:: 0.45 + Use: :class:`MagicMock` with meth:`~.async_return_value`. + """ async def __call__(self, *args, **kwargs): return super(AsyncMock, self).__call__(*args, **kwargs) + def coroutine_mock(): + """ + .. deprecated:: 0.45 + Use: :class:`MagicMock` with meth:`~.async_return_value`. + """ coro = MagicMock(name="CoroutineResult") - corofunc = MagicMock(name="CoroutineFunction", side_effect=coroutine(coro)) + corofunc = MagicMock(name="CoroutineFunction", side_effect=asyncio.coroutine(coro)) corofunc.coro = coro - return corofunc \ No newline at end of file + return corofunc + +async def skip_loop(iterations=1): + for _ in range(iterations): + await asyncio.sleep(0) + + +async def async_return_value(return_value, loop_iterations_delay=0): + await skip_loop(loop_iterations_delay) + return return_value diff --git a/minecraft_macosx/manifest.json b/minecraft_macosx/manifest.json index 8c98db6..e17b77e 100644 --- a/minecraft_macosx/manifest.json +++ b/minecraft_macosx/manifest.json @@ -1,6 +1,6 @@ { "name": "Galaxy Minecraft plugin", - "platform": "bb", + "platform": "minecraft", "guid": "cb57391f-1675-35b1-05c0-896d43bdf8f4", "version": "0.5", "description": "Galaxy Minecraft plugin", diff --git a/minecraft_macosx/plugin.py b/minecraft_macosx/plugin.py index 973bc58..5dd5fc2 100644 --- a/minecraft_macosx/plugin.py +++ b/minecraft_macosx/plugin.py @@ -22,7 +22,7 @@ class MinecraftPlugin(Plugin): def __init__(self, reader, writer, token): - super().__init__(Platform.BestBuy, __version__, reader, writer, token) + super().__init__(Platform.Minecraft, __version__, reader, writer, token) self.local_client = LocalClient() self.minecraft_launcher = None self.minecraft_uninstall_command = None diff --git a/minecraft_windows/bin/chardetect.exe b/minecraft_windows/bin/chardetect.exe index b6ef9bf05e14df72ea65d256ae94a10acdc85f49..65cc0613d9fd2740457c4515fb918500ffe635cb 100644 GIT binary patch delta 58 zcmeCW&N}HjYr_^sg)H&B)ROr8wD_dNl9HU%V%@U54AYd7?Ck0KS&RzIie75dgR&UE Kf*D@fj1B;Xu@oNw delta 50 zcmbPqowe&aYr_^sg)IK0)RL0a4Bec9vZ|tr)b#1*S&RzI8yY#LM`ba71v7%O865!b C)DndN diff --git a/minecraft_windows/galaxy/__init__.py b/minecraft_windows/galaxy/__init__.py index 69e3be5..97b69ed 100644 --- a/minecraft_windows/galaxy/__init__.py +++ b/minecraft_windows/galaxy/__init__.py @@ -1 +1 @@ -__path__ = __import__('pkgutil').extend_path(__path__, __name__) +__path__: str = __import__('pkgutil').extend_path(__path__, __name__) diff --git a/minecraft_windows/galaxy/api/consts.py b/minecraft_windows/galaxy/api/consts.py index d006714..d636613 100644 --- a/minecraft_windows/galaxy/api/consts.py +++ b/minecraft_windows/galaxy/api/consts.py @@ -81,6 +81,16 @@ class Platform(Enum): NintendoDs = "nds" Nintendo3Ds = "3ds" PathOfExile = "pathofexile" + Twitch = "twitch" + Minecraft = "minecraft" + GameSessions = "gamesessions" + Nuuvem = "nuuvem" + FXStore = "fxstore" + IndieGala = "indiegala" + Playfire = "playfire" + Oculus = "oculus" + Test = "test" + class Feature(Enum): """Possible features that can be implemented by an integration. @@ -98,6 +108,8 @@ class Feature(Enum): ImportUsers = "ImportUsers" VerifyGame = "VerifyGame" ImportFriends = "ImportFriends" + ShutdownPlatformClient = "ShutdownPlatformClient" + LaunchPlatformClient = "LaunchPlatformClient" class LicenseType(Enum): @@ -116,11 +128,3 @@ class LocalGameState(Flag): None_ = 0 Installed = 1 Running = 2 - - -class PresenceState(Enum): - """"Possible states that a user can be in.""" - Unknown = "Unknown" - Online = "online" - Offline = "offline" - Away = "away" diff --git a/minecraft_windows/galaxy/api/errors.py b/minecraft_windows/galaxy/api/errors.py index 6564b48..f53479f 100644 --- a/minecraft_windows/galaxy/api/errors.py +++ b/minecraft_windows/galaxy/api/errors.py @@ -1,6 +1,6 @@ from galaxy.api.jsonrpc import ApplicationError, UnknownError -UnknownError = UnknownError +assert UnknownError class AuthenticationRequired(ApplicationError): def __init__(self, data=None): diff --git a/minecraft_windows/galaxy/api/jsonrpc.py b/minecraft_windows/galaxy/api/jsonrpc.py index 87bff71..8b14ca7 100644 --- a/minecraft_windows/galaxy/api/jsonrpc.py +++ b/minecraft_windows/galaxy/api/jsonrpc.py @@ -6,6 +6,7 @@ import json from galaxy.reader import StreamLineReader +from galaxy.task_manager import TaskManager class JsonRpcError(Exception): def __init__(self, code, message, data=None): @@ -52,7 +53,8 @@ def __init__(self, data=None): super().__init__(0, "Unknown error", data) Request = namedtuple("Request", ["method", "params", "id"], defaults=[{}, None]) -Method = namedtuple("Method", ["callback", "signature", "internal", "sensitive_params"]) +Method = namedtuple("Method", ["callback", "signature", "immediate", "sensitive_params"]) + def anonymise_sensitive_params(params, sensitive_params): anomized_data = "****" @@ -74,9 +76,9 @@ def __init__(self, reader, writer, encoder=json.JSONEncoder()): self._encoder = encoder self._methods = {} self._notifications = {} - self._eof_listeners = [] + self._task_manager = TaskManager("jsonrpc server") - def register_method(self, name, callback, internal, sensitive_params=False): + def register_method(self, name, callback, immediate, sensitive_params=False): """ Register method @@ -86,9 +88,9 @@ def register_method(self, name, callback, internal, sensitive_params=False): :param sensitive_params: list of parameters that are anonymized before logging; \ if False - no params are considered sensitive, if True - all params are considered sensitive """ - self._methods[name] = Method(callback, inspect.signature(callback), internal, sensitive_params) + self._methods[name] = Method(callback, inspect.signature(callback), immediate, sensitive_params) - def register_notification(self, name, callback, internal, sensitive_params=False): + def register_notification(self, name, callback, immediate, sensitive_params=False): """ Register notification @@ -98,10 +100,7 @@ def register_notification(self, name, callback, internal, sensitive_params=False :param sensitive_params: list of parameters that are anonymized before logging; \ if False - no params are considered sensitive, if True - all params are considered sensitive """ - self._notifications[name] = Method(callback, inspect.signature(callback), internal, sensitive_params) - - def register_eof(self, callback): - self._eof_listeners.append(callback) + self._notifications[name] = Method(callback, inspect.signature(callback), immediate, sensitive_params) async def run(self): while self._active: @@ -118,14 +117,16 @@ async def run(self): self._handle_input(data) await asyncio.sleep(0) # To not starve task queue - def stop(self): + def close(self): + logging.info("Closing JSON-RPC server - not more messages will be read") self._active = False + async def wait_closed(self): + await self._task_manager.wait() + def _eof(self): logging.info("Received EOF") - self.stop() - for listener in self._eof_listeners: - listener() + self.close() def _handle_input(self, data): try: @@ -145,7 +146,7 @@ def _handle_notification(self, request): logging.error("Received unknown notification: %s", request.method) return - callback, signature, internal, sensitive_params = method + callback, signature, immediate, sensitive_params = method self._log_request(request, sensitive_params) try: @@ -153,12 +154,11 @@ def _handle_notification(self, request): except TypeError: self._send_error(request.id, InvalidParams()) - if internal: - # internal requests are handled immediately + if immediate: callback(*bound_args.args, **bound_args.kwargs) else: try: - asyncio.create_task(callback(*bound_args.args, **bound_args.kwargs)) + self._task_manager.create_task(callback(*bound_args.args, **bound_args.kwargs), request.method) except Exception: logging.exception("Unexpected exception raised in notification handler") @@ -169,7 +169,7 @@ def _handle_request(self, request): self._send_error(request.id, MethodNotFound()) return - callback, signature, internal, sensitive_params = method + callback, signature, immediate, sensitive_params = method self._log_request(request, sensitive_params) try: @@ -177,8 +177,7 @@ def _handle_request(self, request): except TypeError: self._send_error(request.id, InvalidParams()) - if internal: - # internal requests are handled immediately + if immediate: response = callback(*bound_args.args, **bound_args.kwargs) self._send_response(request.id, response) else: @@ -190,11 +189,13 @@ async def handle(): self._send_error(request.id, MethodNotFound()) except JsonRpcError as error: self._send_error(request.id, error) + except asyncio.CancelledError: + self._send_error(request.id, Aborted()) except Exception as e: #pylint: disable=broad-except logging.exception("Unexpected exception raised in plugin handler") self._send_error(request.id, UnknownError(str(e))) - asyncio.create_task(handle()) + self._task_manager.create_task(handle(), request.method) @staticmethod def _parse_request(data): @@ -215,7 +216,7 @@ def _send(self, data): logging.debug("Sending data: %s", line) data = (line + "\n").encode("utf-8") self._writer.write(data) - asyncio.create_task(self._writer.drain()) + self._task_manager.create_task(self._writer.drain(), "drain") except TypeError as error: logging.error(str(error)) @@ -255,6 +256,7 @@ def __init__(self, writer, encoder=json.JSONEncoder()): self._writer = writer self._encoder = encoder self._methods = {} + self._task_manager = TaskManager("notification client") def notify(self, method, params, sensitive_params=False): """ @@ -273,13 +275,16 @@ def notify(self, method, params, sensitive_params=False): self._log(method, params, sensitive_params) self._send(notification) + async def close(self): + await self._task_manager.wait() + def _send(self, data): try: line = self._encoder.encode(data) data = (line + "\n").encode("utf-8") logging.debug("Sending %d byte of data", len(data)) self._writer.write(data) - asyncio.create_task(self._writer.drain()) + self._task_manager.create_task(self._writer.drain(), "drain") except TypeError as error: logging.error("Failed to parse outgoing message: %s", str(error)) diff --git a/minecraft_windows/galaxy/api/plugin.py b/minecraft_windows/galaxy/api/plugin.py index bfa1d75..f573ebb 100644 --- a/minecraft_windows/galaxy/api/plugin.py +++ b/minecraft_windows/galaxy/api/plugin.py @@ -1,21 +1,17 @@ import asyncio +import dataclasses import json import logging import logging.handlers -import dataclasses -from enum import Enum -from collections import OrderedDict import sys +from enum import Enum +from typing import Any, Dict, List, Optional, Set, Union -from typing import Any, List, Dict, Optional, Union - -from galaxy.api.types import Achievement, Game, LocalGame, FriendInfo, GameTime, UserInfo, Room - -from galaxy.api.jsonrpc import Server, NotificationClient, ApplicationError from galaxy.api.consts import Feature -from galaxy.api.errors import UnknownError, ImportInProgress -from galaxy.api.types import Authentication, NextStep, Message - +from galaxy.api.errors import ImportInProgress, UnknownError +from galaxy.api.jsonrpc import ApplicationError, NotificationClient, Server +from galaxy.api.types import Achievement, Authentication, FriendInfo, Game, GameTime, LocalGame, NextStep +from galaxy.task_manager import TaskManager class JSONEncoder(json.JSONEncoder): def default(self, o): # pylint: disable=method-hidden @@ -23,6 +19,7 @@ def default(self, o): # pylint: disable=method-hidden # filter None values def dict_factory(elements): return {k: v for k, v in elements if v is not None} + return dataclasses.asdict(o, dict_factory=dict_factory) if isinstance(o, Enum): return o.value @@ -31,14 +28,14 @@ def dict_factory(elements): class Plugin: """Use and override methods of this class to create a new platform integration.""" + def __init__(self, platform, version, reader, writer, handshake_token): logging.info("Creating plugin for platform %s, version %s", platform.value, version) self._platform = platform self._version = version - self._feature_methods = OrderedDict() + self._features: Set[Feature] = set() self._active = True - self._pass_control_task = None self._reader, self._writer = reader, writer self._handshake_token = handshake_token @@ -47,25 +44,25 @@ def __init__(self, platform, version, reader, writer, handshake_token): self._server = Server(self._reader, self._writer, encoder) self._notification_client = NotificationClient(self._writer, encoder) - def eof_handler(): - self._shutdown() - self._server.register_eof(eof_handler) - self._achievements_import_in_progress = False self._game_times_import_in_progress = False self._persistent_cache = dict() + self._internal_task_manager = TaskManager("plugin internal") + self._external_task_manager = TaskManager("plugin external") + # internal self._register_method("shutdown", self._shutdown, internal=True) - self._register_method("get_capabilities", self._get_capabilities, internal=True) + self._register_method("get_capabilities", self._get_capabilities, internal=True, immediate=True) self._register_method( "initialize_cache", self._initialize_cache, internal=True, + immediate=True, sensitive_params="data" ) - self._register_method("ping", self._ping, internal=True) + self._register_method("ping", self._ping, internal=True, immediate=True) # implemented by developer self._register_method( @@ -81,139 +78,123 @@ def eof_handler(): self._register_method( "import_owned_games", self.get_owned_games, - result_name="owned_games", - feature=Feature.ImportOwnedGames - ) - self._register_method( - "import_unlocked_achievements", - self.get_unlocked_achievements, - result_name="unlocked_achievements", - feature=Feature.ImportAchievements - ) - self._register_method( - "start_achievements_import", - self.start_achievements_import, - ) - self._register_method( - "import_local_games", - self.get_local_games, - result_name="local_games", - feature=Feature.ImportInstalledGames - ) - self._register_notification("launch_game", self.launch_game, feature=Feature.LaunchGame) - self._register_notification("install_game", self.install_game, feature=Feature.InstallGame) - self._register_notification( - "uninstall_game", - self.uninstall_game, - feature=Feature.UninstallGame - ) - self._register_method( - "import_friends", - self.get_friends, - result_name="friend_info_list", - feature=Feature.ImportFriends - ) - self._register_method( - "import_user_infos", - self.get_users, - result_name="user_info_list", - feature=Feature.ImportUsers - ) - self._register_method( - "send_message", - self.send_message, - feature=Feature.Chat - ) - self._register_method( - "mark_as_read", - self.mark_as_read, - feature=Feature.Chat - ) - self._register_method( - "import_rooms", - self.get_rooms, - result_name="rooms", - feature=Feature.Chat - ) - self._register_method( - "import_room_history_from_message", - self.get_room_history_from_message, - result_name="messages", - feature=Feature.Chat - ) - self._register_method( - "import_room_history_from_timestamp", - self.get_room_history_from_timestamp, - result_name="messages", - feature=Feature.Chat - ) - self._register_method( - "import_game_times", - self.get_game_times, - result_name="game_times", - feature=Feature.ImportGameTime - ) - self._register_method( - "start_game_times_import", - self.start_game_times_import, + result_name="owned_games" ) + self._detect_feature(Feature.ImportOwnedGames, ["get_owned_games"]) - @property - def features(self): - features = [] - if self.__class__ != Plugin: - for feature, handlers in self._feature_methods.items(): - if self._implements(handlers): - features.append(feature) + self._register_method("start_achievements_import", self._start_achievements_import) + self._detect_feature(Feature.ImportAchievements, ["get_unlocked_achievements"]) + + self._register_method("import_local_games", self.get_local_games, result_name="local_games") + self._detect_feature(Feature.ImportInstalledGames, ["get_local_games"]) + + self._register_notification("launch_game", self.launch_game) + self._detect_feature(Feature.LaunchGame, ["launch_game"]) + + self._register_notification("install_game", self.install_game) + self._detect_feature(Feature.InstallGame, ["install_game"]) + + self._register_notification("uninstall_game", self.uninstall_game) + self._detect_feature(Feature.UninstallGame, ["uninstall_game"]) + + self._register_notification("shutdown_platform_client", self.shutdown_platform_client) + self._detect_feature(Feature.ShutdownPlatformClient, ["shutdown_platform_client"]) + + self._register_notification("launch_platform_client", self.launch_platform_client) + self._detect_feature(Feature.LaunchPlatformClient, ["launch_platform_client"]) + + self._register_method("import_friends", self.get_friends, result_name="friend_info_list") + self._detect_feature(Feature.ImportFriends, ["get_friends"]) + + self._register_method("start_game_times_import", self._start_game_times_import) + self._detect_feature(Feature.ImportGameTime, ["get_game_time"]) - return features + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + self.close() + await self.wait_closed() + + @property + def features(self) -> List[Feature]: + return list(self._features) @property - def persistent_cache(self) -> Dict: + def persistent_cache(self) -> Dict[str, str]: """The cache is only available after the :meth:`~.handshake_complete()` is called. """ return self._persistent_cache - def _implements(self, handlers): - for handler in handlers: - if handler.__name__ not in self.__class__.__dict__: + def _implements(self, methods: List[str]) -> bool: + for method in methods: + if method not in self.__class__.__dict__: return False return True - def _register_method(self, name, handler, result_name=None, internal=False, sensitive_params=False, feature=None): - if internal: + def _detect_feature(self, feature: Feature, methods: List[str]): + if self._implements(methods): + self._features.add(feature) + + def _register_method(self, name, handler, result_name=None, internal=False, immediate=False, sensitive_params=False): + def wrap_result(result): + if result_name: + result = { + result_name: result + } + return result + + if immediate: def method(*args, **kwargs): result = handler(*args, **kwargs) - if result_name: - result = { - result_name: result - } - return result + return wrap_result(result) + self._server.register_method(name, method, True, sensitive_params) else: async def method(*args, **kwargs): - result = await handler(*args, **kwargs) - if result_name: - result = { - result_name: result - } - return result - self._server.register_method(name, method, False, sensitive_params) + if not internal: + handler_ = self._wrap_external_method(handler, name) + else: + handler_ = handler + result = await handler_(*args, **kwargs) + return wrap_result(result) - if feature is not None: - self._feature_methods.setdefault(feature, []).append(handler) + self._server.register_method(name, method, False, sensitive_params) - def _register_notification(self, name, handler, internal=False, sensitive_params=False, feature=None): - self._server.register_notification(name, handler, internal, sensitive_params) + def _register_notification(self, name, handler, internal=False, immediate=False, sensitive_params=False): + if not internal and not immediate: + handler = self._wrap_external_method(handler, name) + self._server.register_notification(name, handler, immediate, sensitive_params) - if feature is not None: - self._feature_methods.setdefault(feature, []).append(handler) + def _wrap_external_method(self, handler, name: str): + async def wrapper(*args, **kwargs): + return await self._external_task_manager.create_task(handler(*args, **kwargs), name, False) + return wrapper async def run(self): """Plugin's main coroutine.""" await self._server.run() - if self._pass_control_task is not None: - await self._pass_control_task + await self._external_task_manager.wait() + + def close(self) -> None: + if not self._active: + return + + logging.info("Closing plugin") + self._server.close() + self._external_task_manager.cancel() + self._internal_task_manager.create_task(self.shutdown(), "shutdown") + self._active = False + + async def wait_closed(self) -> None: + await self._external_task_manager.wait() + await self._internal_task_manager.wait() + await self._server.wait_closed() + await self._notification_client.close() + + def create_task(self, coro, description): + """Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown""" + return self._external_task_manager.create_task(coro, description) async def _pass_control(self): while self._active: @@ -223,11 +204,11 @@ async def _pass_control(self): logging.exception("Unexpected exception raised in plugin tick") await asyncio.sleep(1) - def _shutdown(self): + async def _shutdown(self): logging.info("Shutting down") - self._server.stop() - self._active = False - self.shutdown() + self.close() + await self._external_task_manager.wait() + await self._internal_task_manager.wait() def _get_capabilities(self): return { @@ -238,8 +219,11 @@ def _get_capabilities(self): def _initialize_cache(self, data: Dict): self._persistent_cache = data - self.handshake_complete() - self._pass_control_task = asyncio.create_task(self._pass_control()) + try: + self.handshake_complete() + except Exception: + logging.exception("Unhandled exception during `handshake_complete` step") + self._internal_task_manager.create_task(self._pass_control(), "tick") @staticmethod def _ping(): @@ -267,8 +251,10 @@ async def pass_login_credentials(self, step, credentials, cookies): self.store_credentials(user_data['credentials']) return Authentication(user_data['userId'], user_data['username']) - """ - self.persistent_cache['credentials'] = credentials + """ + # temporary solution for persistent_cache vs credentials issue + self.persistent_cache['credentials'] = credentials # type: ignore + self._notification_client.notify("store_credentials", credentials, sensitive_params=True) def add_game(self, game: Game) -> None: @@ -297,7 +283,7 @@ def remove_game(self, game_id: str) -> None: """Notify the client to remove game from the list of owned games of the currently authenticated user. - :param game_id: game id of the game to remove from the list of owned games + :param game_id: the id of the game to remove from the list of owned games Example use case of remove_game: @@ -327,7 +313,7 @@ def update_game(self, game: Game) -> None: def unlock_achievement(self, game_id: str, achievement: Achievement) -> None: """Notify the client to unlock an achievement for a specific game. - :param game_id: game_id of the game for which to unlock an achievement. + :param game_id: the id of the game for which to unlock an achievement. :param achievement: achievement to unlock. """ params = { @@ -336,26 +322,14 @@ def unlock_achievement(self, game_id: str, achievement: Achievement) -> None: } self._notification_client.notify("achievement_unlocked", params) - def game_achievements_import_success(self, game_id: str, achievements: List[Achievement]) -> None: - """Notify the client that import of achievements for a given game has succeeded. - This method is called by import_games_achievements. - - :param game_id: id of the game for which the achievements were imported - :param achievements: list of imported achievements - """ + def _game_achievements_import_success(self, game_id: str, achievements: List[Achievement]) -> None: params = { "game_id": game_id, "unlocked_achievements": achievements } self._notification_client.notify("game_achievements_import_success", params) - def game_achievements_import_failure(self, game_id: str, error: ApplicationError) -> None: - """Notify the client that import of achievements for a given game has failed. - This method is called by import_games_achievements. - - :param game_id: id of the game for which the achievements import failed - :param error: error which prevented the achievements import - """ + def _game_achievements_import_failure(self, game_id: str, error: ApplicationError) -> None: params = { "game_id": game_id, "error": { @@ -365,9 +339,7 @@ def game_achievements_import_failure(self, game_id: str, error: ApplicationError } self._notification_client.notify("game_achievements_import_failure", params) - def achievements_import_finished(self) -> None: - """Notify the client that importing achievements has finished. - This method is called by import_games_achievements_task""" + def _achievements_import_finished(self) -> None: self._notification_client.notify("achievements_import_finished", None) def update_local_game_status(self, local_game: LocalGame) -> None: @@ -387,7 +359,7 @@ async def _check_statuses(self): continue self.update_local_game_status(LocalGame(game.id, game.status)) self._cached_games_statuses[game.id] = game.status - asyncio.sleep(5) # interval + await asyncio.sleep(5) # interval def tick(self): if self._check_statuses_task is None or self._check_statuses_task.done(): @@ -412,26 +384,6 @@ def remove_friend(self, user_id: str) -> None: params = {"user_id": user_id} self._notification_client.notify("friend_removed", params) - def update_room( - self, - room_id: str, - unread_message_count: Optional[int]=None, - new_messages: Optional[List[Message]]=None - ) -> None: - """WIP, Notify the client to update the information regarding - a chat room that the currently authenticated user is in. - - :param room_id: id of the room to update - :param unread_message_count: information about the new unread message count in the room - :param new_messages: list of new messages that the user received - """ - params = {"room_id": room_id} - if unread_message_count is not None: - params["unread_message_count"] = unread_message_count - if new_messages is not None: - params["messages"] = new_messages - self._notification_client.notify("chat_room_updated", params) - def update_game_time(self, game_time: GameTime) -> None: """Notify the client to update game time for a game. @@ -440,22 +392,11 @@ def update_game_time(self, game_time: GameTime) -> None: params = {"game_time": game_time} self._notification_client.notify("game_time_updated", params) - def game_time_import_success(self, game_time: GameTime) -> None: - """Notify the client that import of a given game_time has succeeded. - This method is called by import_game_times. - - :param game_time: game_time which was imported - """ + def _game_time_import_success(self, game_time: GameTime) -> None: params = {"game_time": game_time} self._notification_client.notify("game_time_import_success", params) - def game_time_import_failure(self, game_id: str, error: ApplicationError) -> None: - """Notify the client that import of a game time for a given game has failed. - This method is called by import_game_times. - - :param game_id: id of the game for which the game time could not be imported - :param error: error which prevented the game time import - """ + def _game_time_import_failure(self, game_id: str, error: ApplicationError) -> None: params = { "game_id": game_id, "error": { @@ -465,10 +406,7 @@ def game_time_import_failure(self, game_id: str, error: ApplicationError) -> Non } self._notification_client.notify("game_time_import_failure", params) - def game_times_import_finished(self) -> None: - """Notify the client that importing game times has finished. - This method is called by :meth:`~.import_game_times_task`. - """ + def _game_times_import_finished(self) -> None: self._notification_client.notify("game_times_import_finished", None) def lost_authentication(self) -> None: @@ -514,7 +452,7 @@ def tick(self): """ - def shutdown(self) -> None: + async def shutdown(self) -> None: """This method is called on integration shutdown. Override it to implement tear down. This method is called by the GOG Galaxy Client.""" @@ -549,7 +487,7 @@ async def authenticate(self, stored_credentials=None): raise NotImplementedError() async def pass_login_credentials(self, step: str, credentials: Dict[str, str], cookies: List[Dict[str, str]]) \ - -> Union[NextStep, Authentication]: + -> Union[NextStep, Authentication]: """This method is called if we return galaxy.api.types.NextStep from authenticate or from pass_login_credentials. This method's parameters provide the data extracted from the web page navigation that previous NextStep finished on. This method should either return galaxy.api.types.Authentication if the authentication is finished @@ -597,50 +535,63 @@ async def get_owned_games(self): """ raise NotImplementedError() - async def get_unlocked_achievements(self, game_id: str) -> List[Achievement]: - """ - .. deprecated:: 0.33 - Use :meth:`~.import_games_achievements`. - """ - raise NotImplementedError() - - async def start_achievements_import(self, game_ids: List[str]) -> None: - """Starts the task of importing achievements. - This method is called by the GOG Galaxy Client. - - :param game_ids: ids of the games for which the achievements are imported - """ + async def _start_achievements_import(self, game_ids: List[str]) -> None: if self._achievements_import_in_progress: raise ImportInProgress() - async def import_games_achievements_task(game_ids): + context = await self.prepare_achievements_context(game_ids) + + async def import_game_achievements(game_id, context_): + try: + achievements = await self.get_unlocked_achievements(game_id, context_) + self._game_achievements_import_success(game_id, achievements) + except ApplicationError as error: + self._game_achievements_import_failure(game_id, error) + except Exception: + logging.exception("Unexpected exception raised in import_game_achievements") + self._game_achievements_import_failure(game_id, UnknownError()) + + async def import_games_achievements(game_ids_, context_): try: - await self.import_games_achievements(game_ids) + imports = [import_game_achievements(game_id, context_) for game_id in game_ids_] + await asyncio.gather(*imports) finally: - self.achievements_import_finished() + self._achievements_import_finished() self._achievements_import_in_progress = False + self.achievements_import_complete() - asyncio.create_task(import_games_achievements_task(game_ids)) + self._external_task_manager.create_task( + import_games_achievements(game_ids, context), + "unlocked achievements import", + handle_exceptions=False + ) self._achievements_import_in_progress = True - async def import_games_achievements(self, game_ids: List[str]) -> None: + async def prepare_achievements_context(self, game_ids: List[str]) -> Any: + """Override this method to prepare context for get_unlocked_achievements. + This allows for optimizations like batch requests to platform API. + Default implementation returns None. + + :param game_ids: the ids of the games for which achievements are imported + :return: context """ - Override this method to return the unlocked achievements - of the user that is currently logged in to the plugin. - Call game_achievements_import_success/game_achievements_import_failure for each game_id on the list. - This method is called by the GOG Galaxy Client. + return None + + async def get_unlocked_achievements(self, game_id: str, context: Any) -> List[Achievement]: + """Override this method to return list of unlocked achievements + for the game identified by the provided game_id. + This method is called by import task initialized by GOG Galaxy Client. - :param game_ids: ids of the games for which to import unlocked achievements + :param game_id: the id of the game for which the achievements are returned + :param context: the value returned from :meth:`prepare_achievements_context` + :return: list of Achievement objects """ - async def import_game_achievements(game_id): - try: - achievements = await self.get_unlocked_achievements(game_id) - self.game_achievements_import_success(game_id, achievements) - except Exception as error: - self.game_achievements_import_failure(game_id, error) + raise NotImplementedError() - imports = [import_game_achievements(game_id) for game_id in game_ids] - await asyncio.gather(*imports) + def achievements_import_complete(self): + """Override this method to handle operations after achievements import is finished + (like updating cache). + """ async def get_local_games(self) -> List[LocalGame]: """Override this method to return the list of @@ -669,7 +620,7 @@ async def launch_game(self, game_id: str) -> None: identified by the provided game_id. This method is called by the GOG Galaxy Client. - :param str game_id: id of the game to launch + :param str game_id: the id of the game to launch Example of possible override of the method: @@ -687,7 +638,7 @@ async def install_game(self, game_id: str) -> None: identified by the provided game_id. This method is called by the GOG Galaxy Client. - :param str game_id: id of the game to install + :param str game_id: the id of the game to install Example of possible override of the method: @@ -705,7 +656,7 @@ async def uninstall_game(self, game_id: str) -> None: identified by the provided game_id. This method is called by the GOG Galaxy Client. - :param str game_id: id of the game to uninstall + :param str game_id: the id of the game to uninstall Example of possible override of the method: @@ -718,6 +669,16 @@ async def uninstall_game(self, game_id): """ raise NotImplementedError() + async def shutdown_platform_client(self) -> None: + """Override this method to gracefully terminate platform client. + This method is called by the GOG Galaxy Client.""" + raise NotImplementedError() + + async def launch_platform_client(self) -> None: + """Override this method to launch platform client. Preferably minimized to tray. + This method is called by the GOG Galaxy Client.""" + raise NotImplementedError() + async def get_friends(self) -> List[FriendInfo]: """Override this method to return the friends list of the currently authenticated user. @@ -738,105 +699,63 @@ async def get_friends(self): """ raise NotImplementedError() - async def get_users(self, user_id_list: List[str]) -> List[UserInfo]: - """WIP, Override this method to return the list of users matching the provided ids. - This method is called by the GOG Galaxy Client. - - :param user_id_list: list of user ids - """ - raise NotImplementedError() - - async def send_message(self, room_id: str, message_text: str) -> None: - """WIP, Override this method to send message to a chat room. - This method is called by the GOG Galaxy Client. + async def _start_game_times_import(self, game_ids: List[str]) -> None: + if self._game_times_import_in_progress: + raise ImportInProgress() - :param room_id: id of the room to which the message should be sent - :param message_text: text which should be sent in the message - """ - raise NotImplementedError() + context = await self.prepare_game_times_context(game_ids) - async def mark_as_read(self, room_id: str, last_message_id: str) -> None: - """WIP, Override this method to mark messages in a chat room as read up to the id provided in the parameter. - This method is called by the GOG Galaxy Client. + async def import_game_time(game_id, context_): + try: + game_time = await self.get_game_time(game_id, context_) + self._game_time_import_success(game_time) + except ApplicationError as error: + self._game_time_import_failure(game_id, error) + except Exception: + logging.exception("Unexpected exception raised in import_game_time") + self._game_time_import_failure(game_id, UnknownError()) - :param room_id: id of the room - :param last_message_id: id of the last message; room is marked as read only if this id matches - the last message id known to the client - """ - raise NotImplementedError() + async def import_game_times(game_ids_, context_): + try: + imports = [import_game_time(game_id, context_) for game_id in game_ids_] + await asyncio.gather(*imports) + finally: + self._game_times_import_finished() + self._game_times_import_in_progress = False + self.game_times_import_complete() - async def get_rooms(self) -> List[Room]: - """WIP, Override this method to return the chat rooms in which the user is currently in. - This method is called by the GOG Galaxy Client - """ - raise NotImplementedError() + self._external_task_manager.create_task( + import_game_times(game_ids, context), + "game times import", + handle_exceptions=False + ) + self._game_times_import_in_progress = True - async def get_room_history_from_message(self, room_id: str, message_id: str) -> List[Message]: - """WIP, Override this method to return the chat room history since the message provided in parameter. - This method is called by the GOG Galaxy Client. + async def prepare_game_times_context(self, game_ids: List[str]) -> Any: + """Override this method to prepare context for get_game_time. + This allows for optimizations like batch requests to platform API. + Default implementation returns None. - :param room_id: id of the room - :param message_id: id of the message since which the history should be retrieved + :param game_ids: the ids of the games for which game time are imported + :return: context """ - raise NotImplementedError() + return None - async def get_room_history_from_timestamp(self, room_id: str, from_timestamp: int) -> List[Message]: - """WIP, Override this method to return the chat room history since the timestamp provided in parameter. - This method is called by the GOG Galaxy Client. - - :param room_id: id of the room - :param from_timestamp: timestamp since which the history should be retrieved - """ - raise NotImplementedError() + async def get_game_time(self, game_id: str, context: Any) -> GameTime: + """Override this method to return the game time for the game + identified by the provided game_id. + This method is called by import task initialized by GOG Galaxy Client. - async def get_game_times(self) -> List[GameTime]: - """ - .. deprecated:: 0.33 - Use :meth:`~.import_game_times`. + :param game_id: the id of the game for which the game time is returned + :param context: the value returned from :meth:`prepare_game_times_context` + :return: GameTime object """ raise NotImplementedError() - async def start_game_times_import(self, game_ids: List[str]) -> None: - """Starts the task of importing game times - This method is called by the GOG Galaxy Client. - - :param game_ids: ids of the games for which the game time is imported - """ - if self._game_times_import_in_progress: - raise ImportInProgress() - - async def import_game_times_task(game_ids): - try: - await self.import_game_times(game_ids) - finally: - self.game_times_import_finished() - self._game_times_import_in_progress = False - - asyncio.create_task(import_game_times_task(game_ids)) - self._game_times_import_in_progress = True - - async def import_game_times(self, game_ids: List[str]) -> None: + def game_times_import_complete(self) -> None: + """Override this method to handle operations after game times import is finished + (like updating cache). """ - Override this method to return game times for - games owned by the currently authenticated user. - Call game_time_import_success/game_time_import_failure for each game_id on the list. - This method is called by GOG Galaxy Client. - - :param game_ids: ids of the games for which the game time is imported - """ - try: - game_times = await self.get_game_times() - game_ids_set = set(game_ids) - for game_time in game_times: - if game_time.game_id not in game_ids_set: - continue - self.game_time_import_success(game_time) - game_ids_set.discard(game_time.game_id) - for game_id in game_ids_set: - self.game_time_import_failure(game_id, UnknownError()) - except Exception as error: - for game_id in game_ids: - self.game_time_import_failure(game_id, error) def create_and_run_plugin(plugin_class, argv): @@ -880,10 +799,13 @@ async def coroutine(): reader, writer = await asyncio.open_connection("127.0.0.1", port) extra_info = writer.get_extra_info("sockname") logging.info("Using local address: %s:%u", *extra_info) - plugin = plugin_class(reader, writer, token) - await plugin.run() + async with plugin_class(reader, writer, token) as plugin: + await plugin.run() try: + if sys.platform == "win32": + asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) + asyncio.run(coroutine()) except Exception: logging.exception("Error while running plugin") diff --git a/minecraft_windows/galaxy/api/types.py b/minecraft_windows/galaxy/api/types.py index 21466ac..37d55a3 100644 --- a/minecraft_windows/galaxy/api/types.py +++ b/minecraft_windows/galaxy/api/types.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from typing import List, Dict, Optional -from galaxy.api.consts import LicenseType, LocalGameState, PresenceState +from galaxy.api.consts import LicenseType, LocalGameState @dataclass class Authentication(): @@ -61,7 +61,6 @@ async def authenticate(self, stored_credentials=None): :param auth_params: configuration options: {"window_title": :class:`str`, "window_width": :class:`str`, "window_height": :class:`int`, "start_uri": :class:`int`, "end_uri_regex": :class:`str`} :param cookies: browser initial set of cookies :param js: a map of the url regex patterns into the list of *js* scripts that should be executed on every document at given step of internal browser authentication. - """ next_step: str auth_params: Dict[str, str] @@ -130,34 +129,6 @@ class LocalGame(): game_id: str local_game_state: LocalGameState -@dataclass -class Presence(): - """Information about a presence of a user. - - :param presence_state: the state in which the user's presence is - :param game_id: id of the game which the user is currently playing - :param presence_status: optional attached string with the detailed description of the user's presence - """ - presence_state: PresenceState - game_id: Optional[str] = None - presence_status: Optional[str] = None - -@dataclass -class UserInfo(): - """Detailed information about a user. - - :param user_id: of the user - :param is_friend: whether the user is a friend of the currently authenticated user - :param user_name: of the user - :param avatar_url: to the avatar of the user - :param presence: about the users presence - """ - user_id: str - is_friend: bool - user_name: str - avatar_url: str - presence: Presence - @dataclass class FriendInfo(): """Information about a friend of the currently authenticated user. @@ -168,32 +139,6 @@ class FriendInfo(): user_id: str user_name: str -@dataclass -class Room(): - """WIP, Chatroom. - - :param room_id: id of the room - :param unread_message_count: number of unread messages in the room - :param last_message_id: id of the last message in the room - """ - room_id: str - unread_message_count: int - last_message_id: str - -@dataclass -class Message(): - """WIP, A chatroom message. - - :param message_id: id of the message - :param sender_id: id of the sender of the message - :param sent_time: time at which the message was sent - :param message_text: text attached to the message - """ - message_id: str - sender_id: str - sent_time: int - message_text: str - @dataclass class GameTime(): """Game time of a game, defines the total time spent in the game diff --git a/minecraft_windows/galaxy/http.py b/minecraft_windows/galaxy/http.py index 667f55a..615daa0 100644 --- a/minecraft_windows/galaxy/http.py +++ b/minecraft_windows/galaxy/http.py @@ -1,5 +1,37 @@ +""" +This module standarize http traffic and the error handling for further communication with the GOG Galaxy 2.0. + +It is recommended to use provided convenient methods for HTTP requests, especially when dealing with authorized sessions. +Examplary simple web service could looks like: + + .. code-block:: python + + import logging + from galaxy.http import create_client_session, handle_exception + + class BackendClient: + AUTH_URL = 'my-integration.com/auth' + HEADERS = { + "My-Custom-Header": "true", + } + def __init__(self): + self._session = create_client_session(headers=self.HEADERS) + + async def authenticate(self): + await self._session.request('POST', self.AUTH_URL) + + async def close(self): + # to be called on plugin shutdown + await self._session.close() + + async def _authorized_request(self, method, url, *args, **kwargs): + with handle_exceptions(): + return await self._session.request(method, url, *args, **kwargs) +""" + import asyncio import ssl +from contextlib import contextmanager from http import HTTPStatus import aiohttp @@ -12,44 +44,101 @@ ) +#: Default limit of the simultaneous connections for ssl connector. +DEFAULT_LIMIT = 20 +#: Default timeout in seconds used for client session. +DEFAULT_TIMEOUT = 60 + + class HttpClient: - def __init__(self, limit=20, timeout=aiohttp.ClientTimeout(total=60), cookie_jar=None): - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - ssl_context.load_verify_locations(certifi.where()) - connector = aiohttp.TCPConnector(limit=limit, ssl=ssl_context) - self._session = aiohttp.ClientSession(connector=connector, timeout=timeout, cookie_jar=cookie_jar) + """ + .. deprecated:: 0.41 + Use http module functions instead + """ + def __init__(self, limit=DEFAULT_LIMIT, timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT), cookie_jar=None): + connector = create_tcp_connector(limit=limit) + self._session = create_client_session(connector=connector, timeout=timeout, cookie_jar=cookie_jar) async def close(self): + """Closes connection. Should be called in :meth:`~galaxy.api.plugin.Plugin.shutdown`""" await self._session.close() async def request(self, method, url, *args, **kwargs): - try: - response = await self._session.request(method, url, *args, **kwargs) - except asyncio.TimeoutError: - raise BackendTimeout() - except aiohttp.ServerDisconnectedError: - raise BackendNotAvailable() - except aiohttp.ClientConnectionError: - raise NetworkError() - except aiohttp.ContentTypeError: - raise UnknownBackendResponse() - except aiohttp.ClientError: - logging.exception( - "Caught exception while running {} request for {}".format(method, url)) - raise UnknownError() - if response.status == HTTPStatus.UNAUTHORIZED: + with handle_exception(): + return await self._session.request(method, url, *args, **kwargs) + + +def create_tcp_connector(*args, **kwargs) -> aiohttp.TCPConnector: + """ + Creates TCP connector with resonable defaults. + For details about available parameters refer to + `aiohttp.TCPConnector `_ + """ + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ssl_context.load_verify_locations(certifi.where()) + kwargs.setdefault("ssl", ssl_context) + kwargs.setdefault("limit", DEFAULT_LIMIT) + return aiohttp.TCPConnector(*args, **kwargs) # type: ignore due to https://github.com/python/mypy/issues/4001 + + +def create_client_session(*args, **kwargs) -> aiohttp.ClientSession: + """ + Creates client session with resonable defaults. + For details about available parameters refer to + `aiohttp.ClientSession `_ + + Examplary customization: + + .. code-block:: python + + from galaxy.http import create_client_session, create_tcp_connector + + session = create_client_session( + headers={ + "Keep-Alive": "true" + }, + connector=create_tcp_connector(limit=40), + timeout=100) + """ + kwargs.setdefault("connector", create_tcp_connector()) + kwargs.setdefault("timeout", aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT)) + kwargs.setdefault("raise_for_status", True) + return aiohttp.ClientSession(*args, **kwargs) # type: ignore due to https://github.com/python/mypy/issues/4001 + + +@contextmanager +def handle_exception(): + """ + Context manager translating network related exceptions + to custom :mod:`~galaxy.api.errors`. + """ + try: + yield + except asyncio.TimeoutError: + raise BackendTimeout() + except aiohttp.ServerDisconnectedError: + raise BackendNotAvailable() + except aiohttp.ClientConnectionError: + raise NetworkError() + except aiohttp.ContentTypeError: + raise UnknownBackendResponse() + except aiohttp.ClientResponseError as error: + if error.status == HTTPStatus.UNAUTHORIZED: raise AuthenticationRequired() - if response.status == HTTPStatus.FORBIDDEN: + if error.status == HTTPStatus.FORBIDDEN: raise AccessDenied() - if response.status == HTTPStatus.SERVICE_UNAVAILABLE: + if error.status == HTTPStatus.SERVICE_UNAVAILABLE: raise BackendNotAvailable() - if response.status == HTTPStatus.TOO_MANY_REQUESTS: + if error.status == HTTPStatus.TOO_MANY_REQUESTS: raise TooManyRequests() - if response.status >= 500: + if error.status >= 500: raise BackendError() - if response.status >= 400: + if error.status >= 400: logging.warning( - "Got status {} while running {} request for {}".format(response.status, method, url)) + "Got status %d while performing %s request for %s", + error.status, error.request_info.method, str(error.request_info.url) + ) raise UnknownError() - - return response + except aiohttp.ClientError: + logging.exception("Caught exception while performing request") + raise UnknownError() diff --git a/minecraft_windows/galaxy/proc_tools.py b/minecraft_windows/galaxy/proc_tools.py new file mode 100644 index 0000000..b0de0bc --- /dev/null +++ b/minecraft_windows/galaxy/proc_tools.py @@ -0,0 +1,88 @@ +import sys +from dataclasses import dataclass +from typing import Iterable, NewType, Optional, List, cast + + + +ProcessId = NewType("ProcessId", int) + + +@dataclass +class ProcessInfo: + pid: ProcessId + binary_path: Optional[str] + + +if sys.platform == "win32": + from ctypes import byref, sizeof, windll, create_unicode_buffer, FormatError, WinError + from ctypes.wintypes import DWORD + + + def pids() -> Iterable[ProcessId]: + _PROC_ID_T = DWORD + list_size = 4096 + + def try_get_pids(list_size: int) -> List[ProcessId]: + result_size = DWORD() + proc_id_list = (_PROC_ID_T * list_size)() + + if not windll.psapi.EnumProcesses(byref(proc_id_list), sizeof(proc_id_list), byref(result_size)): + raise WinError(descr="Failed to get process ID list: %s" % FormatError()) # type: ignore + + return cast(List[ProcessId], proc_id_list[:int(result_size.value / sizeof(_PROC_ID_T()))]) + + while True: + proc_ids = try_get_pids(list_size) + if len(proc_ids) < list_size: + return proc_ids + + list_size *= 2 + + + def get_process_info(pid: ProcessId) -> Optional[ProcessInfo]: + _PROC_QUERY_LIMITED_INFORMATION = 0x1000 + + process_info = ProcessInfo(pid=pid, binary_path=None) + + h_process = windll.kernel32.OpenProcess(_PROC_QUERY_LIMITED_INFORMATION, False, pid) + if not h_process: + return process_info + + try: + def get_exe_path() -> Optional[str]: + _MAX_PATH = 260 + _WIN32_PATH_FORMAT = 0x0000 + + exe_path_buffer = create_unicode_buffer(_MAX_PATH) + exe_path_len = DWORD(len(exe_path_buffer)) + + return cast(str, exe_path_buffer[:exe_path_len.value]) if windll.kernel32.QueryFullProcessImageNameW( + h_process, _WIN32_PATH_FORMAT, exe_path_buffer, byref(exe_path_len) + ) else None + + process_info.binary_path = get_exe_path() + finally: + windll.kernel32.CloseHandle(h_process) + return process_info +else: + import psutil + + + def pids() -> Iterable[ProcessId]: + for pid in psutil.pids(): + yield pid + + + def get_process_info(pid: ProcessId) -> Optional[ProcessInfo]: + process_info = ProcessInfo(pid=pid, binary_path=None) + try: + process_info.binary_path = psutil.Process(pid=pid).as_dict(attrs=["exe"])["exe"] + except psutil.NoSuchProcess: + pass + finally: + return process_info + + +def process_iter() -> Iterable[Optional[ProcessInfo]]: + for pid in pids(): + yield get_process_info(pid) diff --git a/minecraft_windows/galaxy/task_manager.py b/minecraft_windows/galaxy/task_manager.py new file mode 100644 index 0000000..1f6d457 --- /dev/null +++ b/minecraft_windows/galaxy/task_manager.py @@ -0,0 +1,49 @@ +import asyncio +import logging +from collections import OrderedDict +from itertools import count + +class TaskManager: + def __init__(self, name): + self._name = name + self._tasks = OrderedDict() + self._task_counter = count() + + def create_task(self, coro, description, handle_exceptions=True): + """Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown""" + + async def task_wrapper(task_id): + try: + result = await coro + logging.debug("Task manager %s: finished task %d (%s)", self._name, task_id, description) + return result + except asyncio.CancelledError: + if handle_exceptions: + logging.debug("Task manager %s: canceled task %d (%s)", self._name, task_id, description) + else: + raise + except Exception: + if handle_exceptions: + logging.exception("Task manager %s: exception raised in task %d (%s)", self._name, task_id, description) + else: + raise + finally: + del self._tasks[task_id] + + task_id = next(self._task_counter) + logging.debug("Task manager %s: creating task %d (%s)", self._name, task_id, description) + task = asyncio.create_task(task_wrapper(task_id)) + self._tasks[task_id] = task + return task + + def cancel(self): + for task in self._tasks.values(): + task.cancel() + + async def wait(self): + # Tasks can spawn other tasks + while True: + tasks = self._tasks.values() + if not tasks: + return + await asyncio.gather(*tasks, return_exceptions=True) diff --git a/minecraft_windows/galaxy/tools.py b/minecraft_windows/galaxy/tools.py index 3996d25..8cb5540 100644 --- a/minecraft_windows/galaxy/tools.py +++ b/minecraft_windows/galaxy/tools.py @@ -3,6 +3,7 @@ import zipfile from glob import glob + def zip_folder(folder): files = glob(os.path.join(folder, "**"), recursive=True) files = [file.replace(folder + os.sep, "") for file in files] @@ -14,6 +15,7 @@ def zip_folder(folder): zipf.write(os.path.join(folder, file), arcname=file) return zip_buffer + def zip_folder_to_file(folder, filename): zip_content = zip_folder(folder).getbuffer() with open(filename, "wb") as archive: diff --git a/minecraft_windows/galaxy/unittest/mock.py b/minecraft_windows/galaxy/unittest/mock.py index 264c3fa..b439671 100644 --- a/minecraft_windows/galaxy/unittest/mock.py +++ b/minecraft_windows/galaxy/unittest/mock.py @@ -1,12 +1,31 @@ -from asyncio import coroutine +import asyncio from unittest.mock import MagicMock + class AsyncMock(MagicMock): + """ + .. deprecated:: 0.45 + Use: :class:`MagicMock` with meth:`~.async_return_value`. + """ async def __call__(self, *args, **kwargs): return super(AsyncMock, self).__call__(*args, **kwargs) + def coroutine_mock(): + """ + .. deprecated:: 0.45 + Use: :class:`MagicMock` with meth:`~.async_return_value`. + """ coro = MagicMock(name="CoroutineResult") - corofunc = MagicMock(name="CoroutineFunction", side_effect=coroutine(coro)) + corofunc = MagicMock(name="CoroutineFunction", side_effect=asyncio.coroutine(coro)) corofunc.coro = coro - return corofunc \ No newline at end of file + return corofunc + +async def skip_loop(iterations=1): + for _ in range(iterations): + await asyncio.sleep(0) + + +async def async_return_value(return_value, loop_iterations_delay=0): + await skip_loop(loop_iterations_delay) + return return_value diff --git a/minecraft_windows/manifest.json b/minecraft_windows/manifest.json index 8c98db6..e17b77e 100644 --- a/minecraft_windows/manifest.json +++ b/minecraft_windows/manifest.json @@ -1,6 +1,6 @@ { "name": "Galaxy Minecraft plugin", - "platform": "bb", + "platform": "minecraft", "guid": "cb57391f-1675-35b1-05c0-896d43bdf8f4", "version": "0.5", "description": "Galaxy Minecraft plugin", diff --git a/minecraft_windows/plugin.py b/minecraft_windows/plugin.py index 973bc58..5dd5fc2 100644 --- a/minecraft_windows/plugin.py +++ b/minecraft_windows/plugin.py @@ -22,7 +22,7 @@ class MinecraftPlugin(Plugin): def __init__(self, reader, writer, token): - super().__init__(Platform.BestBuy, __version__, reader, writer, token) + super().__init__(Platform.Minecraft, __version__, reader, writer, token) self.local_client = LocalClient() self.minecraft_launcher = None self.minecraft_uninstall_command = None