From 3f12b6737e18116cf9b51d2f1d472edb7254eb7c Mon Sep 17 00:00:00 2001 From: slitherrr Date: Mon, 10 Feb 2014 09:01:48 -0800 Subject: [PATCH 1/2] Some SQLAlchemy session cleanup Rebuilt #93 on development, then split out the plugin part. This is the session handling cleanup. The point is to abstract any session management away from the endpoints are using the data, and to initiate a session per transaction rather than leaving one session open for the lifetime of the server (the latter was causing concurrency issues when different factories were accessing the session). --- base_plugin.py | 7 +- plugin_manager.py | 4 +- .../admin_command_plugin.py | 22 +- plugins/core/colored_names/colored_names.py | 1 - plugins/core/player_manager/manager.py | 222 +++++++++++++----- plugins/core/player_manager/plugin.py | 14 +- .../planet_protect/planet_protect_plugin.py | 2 +- server.py | 38 ++- 8 files changed, 209 insertions(+), 101 deletions(-) diff --git a/base_plugin.py b/base_plugin.py index c6f9401..2ca7153 100644 --- a/base_plugin.py +++ b/base_plugin.py @@ -3,11 +3,11 @@ class BasePlugin(object): Defines an interface for all plugins to inherit from. Note that the __init__ method should generally not be overrode; all setup work should be done in activate() if possible. If you do override __init__, remember to super()! - + Note that only one instance of each plugin will be instantiated for *all* connected clients. self.protocol will be changed by the plugin manager to the current protocol. - + You may access the factory if necessary via self.factory.protocols to access other clients, but this "Is Not A Very Good Idea" (tm) @@ -357,8 +357,7 @@ def activate(self): for alias in alias_list: self.plugins['command_dispatcher'].register(alias, command) - def deactivate(self): super(SimpleCommandPlugin, self).deactivate() for command in self.commands: - self.plugins['command_dispatcher'].unregister(command) \ No newline at end of file + self.plugins['command_dispatcher'].unregister(command) diff --git a/plugin_manager.py b/plugin_manager.py index eb278d8..d55dc74 100644 --- a/plugin_manager.py +++ b/plugin_manager.py @@ -119,7 +119,6 @@ def load_plugins(self, plugin_dir): self.logger.critical(str(e)) self.activate_plugins() - def reload_plugins(self): self.logger.warning("Reloading plugins.") for x in self.plugins: @@ -132,7 +131,6 @@ def reload_plugins(self): self.logger.exception("Couldn't reload plugins!") raise - def activate_plugins(self): for plugin in [self.plugins[x] for x in self.load_order]: if self.config.config['plugin_config'][plugin.name]['auto_activate']: @@ -217,4 +215,4 @@ def print_this_defered_failure(f): class FatalPluginError(Exception): - pass \ No newline at end of file + pass diff --git a/plugins/core/admin_commands_plugin/admin_command_plugin.py b/plugins/core/admin_commands_plugin/admin_command_plugin.py index b0b128e..a8dfe03 100644 --- a/plugins/core/admin_commands_plugin/admin_command_plugin.py +++ b/plugins/core/admin_commands_plugin/admin_command_plugin.py @@ -107,41 +107,21 @@ def promote(self, data): def make_guest(self, player): self.logger.trace("Setting %s to GUEST", player.name) player.access_level = UserLevels.GUEST - try: - self.player_manager.session.commit() - except: - self.player_manager.session.rollback() - raise @permissions(UserLevels.MODERATOR) def make_registered(self, player): self.logger.trace("Setting %s to REGISTERED", player.name) player.access_level = UserLevels.REGISTERED - try: - self.player_manager.session.commit() - except: - self.player_manager.session.rollback() - raise @permissions(UserLevels.ADMIN) def make_mod(self, player): player.access_level = UserLevels.MODERATOR self.logger.trace("Setting %s to MODERATOR", player.name) - try: - self.player_manager.session.commit() - except: - self.player_manager.session.rollback() - raise @permissions(UserLevels.OWNER) def make_admin(self, player): self.logger.trace("Setting %s to ADMIN", player.name) player.access_level = UserLevels.ADMIN - try: - self.player_manager.session.commit() - except: - self.player_manager.session.rollback() - raise @permissions(UserLevels.MODERATOR) def kick(self, data): @@ -198,7 +178,7 @@ def unban(self, data): ip = data[0] for ban in self.player_manager.bans: if ban.ip == ip: - self.player_manager.session.delete(ban) + self.player_manager.remove_ban(ban) self.protocol.send_chat_message("Unbanned IP: %s" % ip) break else: diff --git a/plugins/core/colored_names/colored_names.py b/plugins/core/colored_names/colored_names.py index 1ed9721..d9df066 100644 --- a/plugins/core/colored_names/colored_names.py +++ b/plugins/core/colored_names/colored_names.py @@ -27,4 +27,3 @@ def on_chat_received(self, data): self.logger.warning("Received AttributeError in colored_name. %s", str(e)) self.protocol.transport.write(data.original_data) return False - diff --git a/plugins/core/player_manager/manager.py b/plugins/core/player_manager/manager.py index 1f288e0..a86c680 100644 --- a/plugins/core/player_manager/manager.py +++ b/plugins/core/player_manager/manager.py @@ -1,3 +1,4 @@ +from contextlib import contextmanager import datetime from functools import wraps import inspect @@ -6,13 +7,29 @@ from enum import Enum from sqlalchemy.ext.mutable import Mutable -from sqlalchemy.orm import Session, relationship, backref +from sqlalchemy.orm import sessionmaker, relationship, backref from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Boolean, func from sqlalchemy.ext.declarative import declarative_base as sqla_declarative_base from twisted.words.ewords import AlreadyLoggedIn from sqlalchemy.types import TypeDecorator, VARCHAR from utility_functions import path + +@contextmanager +def _autoclosing_session(sm): + session = sm() + + try: + yield session + + except: + session.rollback() + raise + + finally: + session.close() + + class JSONEncodedDict(TypeDecorator): impl = VARCHAR @@ -91,6 +108,32 @@ def __delitem__(self, key): MutableDict.associate_with(JSONEncodedDict) +class RecordWithAttachedSession(object): + def __init__(self, record, sessionmaker): + self.__dict__['record'] = record + self.__dict__['sessionmaker'] = sessionmaker + + def __getattr__(self, name): + with _autoclosing_session(self.sessionmaker) as session: + if sessionmaker.object_session(self.record) != session: + session.add(self.record) + session.refresh(self.record) + val = getattr(self.record, name) + + return val + + def __setattr__(self, name, val): + with _autoclosing_session(self.sessionmaker) as session: + if sessionmaker.object_session(self.record) != session: + session.add(self.record) + session.refresh(self.record) + setattr(self.record, name, val) + session.merge(self.record) + session.commit() + + return val + + class Player(Base): __tablename__ = 'players' @@ -160,75 +203,144 @@ class PlayerManager(object): def __init__(self, config): self.config = config self.engine = create_engine('sqlite:///%s' % path.preauthChild(self.config.player_db).path) - self.session = Session(self.engine) Base.metadata.create_all(self.engine) - for player in self.session.query(Player).all(): - player.logged_in = False - player.protocol = None + self.sessionmaker = sessionmaker(bind=self.engine, autoflush=True) + with _autoclosing_session(self.sessionmaker) as session: + for player in session.query(Player).all(): + player.logged_in = False + player.protocol = None + session.commit() + + def _cache_and_return_from_session(self, session, record, collection=False): + to_return = record + + if not isinstance(record, Base): + return to_return + + if collection: + to_return = [] + for r in record: + to_return.append(RecordWithAttachedSession(r, self.sessionmaker)) + else: + to_return = RecordWithAttachedSession(record, self.sessionmaker) + + return to_return def fetch_or_create(self, uuid, name, ip, protocol=None): - if self.session.query(Player).filter_by(uuid=uuid, logged_in=True).first(): - raise AlreadyLoggedIn - if self.check_bans(ip): - raise Banned - while self.whois(name): - logger.info("Got a duplicate player, affixing _ to name") - name += "_" - player = self.session.query(Player).filter_by(uuid=uuid).first() - if player: - if player.name != name: - logger.info("Detected username change.") - player.name = name - if ip not in player.ips: - player.ips.append(IPAddress(ip=ip)) - player.ip = ip - player.protocol = protocol - else: - logger.info("Adding new player with name: %s" % name) - player = Player(uuid=uuid, name=name, - last_seen=datetime.datetime.utcnow(), - access_level=int(UserLevels.GUEST), - logged_in=False, - protocol=protocol, - client_id=-1, - ip=ip, - planet="", - on_ship=True) - player.ips = [IPAddress(ip=ip)] - self.session.add(player) - if uuid == self.config.owner_uuid: - player.access_level = int(UserLevels.OWNER) - try: - self.session.commit() - except: - self.session.rollback() - raise - return player + with _autoclosing_session(self.sessionmaker) as session: + if session.query(Player).filter_by(uuid=uuid, logged_in=True).first(): + raise AlreadyLoggedIn + if self.check_bans(ip): + raise Banned + while self.whois(name): + logger.info("Got a duplicate player, affixing _ to name") + name += "_" + player = session.query(Player).filter_by(uuid=uuid).first() + if player: + if player.name != name: + logger.info("Detected username change.") + player.name = name + if ip not in player.ips: + player.ips.append(IPAddress(ip=ip)) + player.ip = ip + player.protocol = protocol + else: + logger.info("Adding new player with name: %s" % name) + player = Player(uuid=uuid, name=name, + last_seen=datetime.datetime.now(), + access_level=int(UserLevels.GUEST), + logged_in=False, + protocol=protocol, + client_id=-1, + ip=ip, + planet="", + on_ship=True) + player.ips = [IPAddress(ip=ip)] + session.add(player) + if uuid == self.config.owner_uuid: + player.access_level = int(UserLevels.OWNER) + + session.commit() + + return self._cache_and_return_from_session(session, player) + + def delete(self, player_cache): + with _autoclosing_session(self.sessionmaker) as session: + session.delete(player_cache.record) + session.commit() def who(self): - return self.session.query(Player).filter_by(logged_in=True).all() + with _autoclosing_session(self.sessionmaker) as session: + return self._cache_and_return_from_session( + session, + session.query(Player).filter_by(logged_in=True).all(), + collection=True, + ) + + def all(self): + with _autoclosing_session(self.sessionmaker) as session: + return self._cache_and_return_from_session( + session, + session.query(Player).all(), + collection=True, + ) + + def all_like(self, regex): + with _autoclosing_session(self.sessionmaker) as session: + return self._cache_and_return_from_session( + session, + session.query(Player).filter(Player.name.like(regex)).all(), + collection=True, + ) def whois(self, name): - return self.session.query(Player).filter(Player.logged_in == True, - func.lower(Player.name) == func.lower(name)).first() + with _autoclosing_session(self.sessionmaker) as session: + return self._cache_and_return_from_session( + session, + session.query(Player).filter( + Player.logged_in is True, + func.lower(Player.name) == func.lower(name), + ).first(), + ) def check_bans(self, ip): - return self.session.query(Ban).filter_by(ip=ip).first() is not None + with _autoclosing_session(self.sessionmaker) as session: + return session.query(Ban).filter_by(ip=ip).first() is not None def ban(self, ip): - self.session.add(Ban(ip=ip)) - try: - self.session.commit() - except: - self.session.rollback() - raise + with _autoclosing_session(self.sessionmaker) as session: + session.add(Ban(ip=ip)) + session.commit() + + @property + def bans(self): + with _autoclosing_session(self.sessionmaker) as session: + return self._cache_and_return_from_session( + session, + session.query(Ban).all(), + ) + + def delete_ban(self, ban_cache): + with _autoclosing_session(self.sessionmaker) as session: + session.delete(ban_cache.record) + session.commit() def get_by_name(self, name): - return self.session.query(Player).filter(func.lower(Player.name) == func.lower(name)).first() + with _autoclosing_session(self.sessionmaker) as session: + return self._cache_and_return_from_session( + session, + session.query(Player).filter(func.lower(Player.name) == func.lower(name)).first(), + ) def get_logged_in_by_name(self, name): - return self.session.query(Player).filter(Player.logged_in == True, - func.lower(Player.name) == func.lower(name)).first() + with _autoclosing_session(self.sessionmaker) as session: + return self._cache_and_return_from_session( + session, + session.query(Player).filter( + Player.logged_in, + func.lower(Player.name) == func.lower(name), + ).first(), + ) def permissions(level=UserLevels.OWNER): diff --git a/plugins/core/player_manager/plugin.py b/plugins/core/player_manager/plugin.py index 41d3e51..a2b248b 100644 --- a/plugins/core/player_manager/plugin.py +++ b/plugins/core/player_manager/plugin.py @@ -26,7 +26,7 @@ def deactivate(self): del self.player_manager def check_logged_in(self): - for player in self.player_manager.session.query(Player).filter_by(logged_in=True).all(): + for player in self.player_manager.who(): if player.protocol not in self.factory.protocols.keys(): player.logged_in = False @@ -45,8 +45,11 @@ def on_client_connect(self, data): name=client_data.name, uuid=str(client_data.uuid), ip=self.protocol.transport.getPeer().host, - protocol=self.protocol.id) + protocol=self.protocol.id, + ) + return True + except AlreadyLoggedIn: self.reject_with_reason( "You're already logged in! If this is not the case, please wait 10 seconds and try again.") @@ -123,17 +126,16 @@ def delete_player(self, data): self.protocol.send_chat_message( "Couldn't find a player named %s. Please check the spelling and try again." % name) return False - self.player_manager.session.delete(player) + self.player_manager.delete(player) self.protocol.send_chat_message("Deleted player with name %s." % name) @permissions(UserLevels.ADMIN) def list_players(self, data): if len(data) == 0: - self.format_player_response(self.player_manager.session.query(Player).all()) + self.format_player_response(self.player_manager.all()) else: rx = re.sub(r"[\*]", "%", " ".join(data)) - self.format_player_response( - self.player_manager.session.query(Player).filter(Player.name.like(rx)).all()) + self.format_player_response(self.player_manager.all_like(rx)) def format_player_response(self, players): if len(players) <= 25: diff --git a/plugins/planet_protect/planet_protect_plugin.py b/plugins/planet_protect/planet_protect_plugin.py index 7981779..394c3eb 100644 --- a/plugins/planet_protect/planet_protect_plugin.py +++ b/plugins/planet_protect/planet_protect_plugin.py @@ -76,4 +76,4 @@ def on_entity_create(self, data): self.logger.trace( "Player %s attempted to use a prohibited projectile, %s, on a protected planet.", self.protocol.player.name, p_type) - return False \ No newline at end of file + return False diff --git a/server.py b/server.py index 3a09ea3..dddc8aa 100644 --- a/server.py +++ b/server.py @@ -27,6 +27,21 @@ logging.Logger.trace = lambda s, m, *a, **k: s._log(TRACE_LVL, m, a, **k) +def port_check(upstream_hostname, upstream_port): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1) + result = sock.connect_ex((upstream_hostname, upstream_port)) + + if result != 0: + sock.close() + return False + else: + sock.shutdown(SHUT_RDWR) + sock.close() + + return True + + class StarryPyServerProtocol(Protocol): """ The main protocol class for handling connections from Starbound clients. @@ -115,7 +130,7 @@ def connectionMade(self): def string_received(self, packet): """ - This method is called whenever a completed packet is received from the + This method is called whenever a completed packet is received from the client going to the Starbound server. This is the first and only time where these packets can be modified, stopped, or allowed. @@ -415,6 +430,7 @@ def send_chat_message(self, text, channel=0, world='', name=''): logger.trace("Built chat packet. Data: %s", chat_packet.encode("hex")) self.transport.write(chat_packet) logger.debug("Sent chat message with text: %s", text) + def write(self, data): """ Convenience method to send data to the client. @@ -440,6 +456,7 @@ def connectionLost(self, reason=connectionDone): except: self.logger.trace("Protocol was not in factory list. This should not happen.") + class ClientProtocol(Protocol): """ The protocol class which handles the connection to the Starbound server. @@ -460,7 +477,6 @@ def connectionMade(self): """ self.server_protocol.client_protocol = self - def string_received(self, packet): """ This method is called whenever a completed packet is received from the @@ -482,7 +498,6 @@ def string_received(self, packet): self.server_protocol.write( packet.original_data) - def dataReceived(self, data): """ Called whenever a packet is received. Generally this should not be @@ -504,6 +519,7 @@ def disconnect(self): self.transport.write(x) self.transport.abortConnection() + class StarryPyServerFactory(ServerFactory): """ Factory which creates `StarryPyServerProtocol` instances. @@ -569,7 +585,6 @@ def broadcast_planet(self, text, planet, name=''): except: logger.exception("Exception in broadcast.") - def buildProtocol(self, address): """ Builds the protocol to a given address. @@ -623,12 +638,14 @@ def buildProtocol(self, address): if __name__ == '__main__': logger = logging.getLogger('starrypy') logger.setLevel(9) + if TRACE: trace_logger = logging.FileHandler("trace.log") trace_logger.setLevel("TRACE") trace_logger.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")) logger.addHandler(trace_logger) logger.trace("Initialized trace logger.") + fh_d = logging.FileHandler("debug.log") fh_d.setLevel(logging.DEBUG) fh_w = logging.FileHandler("server.log") @@ -647,28 +664,29 @@ def buildProtocol(self, address): fh_d.setFormatter(debugfile_formatter) fh_w.setFormatter(logfile_formatter) sh.setFormatter(console_formatter) + if config.port_check: logger.debug("Port check enabled. Performing port check to %s:%d", config.upstream_hostname, config.upstream_port) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(1) - result = sock.connect_ex((config.upstream_hostname, config.upstream_port)) - if result != 0: + + if port_check(config.upstream_hostname, config.upstream_port): logger.critical("The starbound server is not connectable at the address %s:%d." % ( config.upstream_hostname, config.upstream_port)) logger.critical( "Please ensure that you are running starbound_server on the correct port and that is reflected in the StarryPy configuration.") sys.exit() - sock.shutdown(SHUT_RDWR) - sock.close() + logger.debug("Port check succeeded. Continuing.") + logger.info("Started StarryPy server version %s" % VERSION) factory = StarryPyServerFactory() logger.debug("Attempting to listen on TCP port %d", factory.config.bind_port) + try: reactor.listenTCP(factory.config.bind_port, factory, interface=factory.config.bind_address) except CannotListenError: logger.critical("Cannot listen on TCP port %d. Exiting.", factory.config.bind_port) sys.exit() + logger.info("Listening on port %s" % factory.config.bind_port) reactor.run() From 3e559cbb7e57817640d36718a50f9343babf35f1 Mon Sep 17 00:00:00 2001 From: slitherrr Date: Mon, 10 Feb 2014 17:33:40 -0800 Subject: [PATCH 2/2] Fix for plugin_storage --- plugins/core/player_manager/manager.py | 4 ++-- .../new_player_greeter_plugin/new_player_greeter_plugin.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/plugins/core/player_manager/manager.py b/plugins/core/player_manager/manager.py index a86c680..235f031 100644 --- a/plugins/core/player_manager/manager.py +++ b/plugins/core/player_manager/manager.py @@ -164,7 +164,7 @@ def colored_name(self, colors): @property def storage(self): - caller = inspect.stack()[1][0].f_locals["self"].__class__.name + caller = inspect.stack()[2][0].f_locals["self"].__class__.name if self.plugin_storage is None: self.plugin_storage = {} try: @@ -175,7 +175,7 @@ def storage(self): @storage.setter def storage(self, store): - caller = inspect.stack()[1][0].f_locals["self"].__class__.name + caller = inspect.stack()[2][0].f_locals["self"].__class__.name self.plugin_storage[caller] = store def as_dict(self): diff --git a/plugins/new_player_greeter_plugin/new_player_greeter_plugin.py b/plugins/new_player_greeter_plugin/new_player_greeter_plugin.py index 2c33183..a97e6e3 100644 --- a/plugins/new_player_greeter_plugin/new_player_greeter_plugin.py +++ b/plugins/new_player_greeter_plugin/new_player_greeter_plugin.py @@ -16,9 +16,10 @@ def after_connect_response(self, data): if self.protocol.player is not None and self.protocol.player.logged_in: my_storage = self.protocol.player.storage if not 'given_starter_items' in my_storage or my_storage['given_starter_items'] == "False": - my_storage['given_starter_items'] = "True" self.give_items() self.send_greetings() + my_storage['given_starter_items'] = "True" + self.protocol.player.storage = my_storage self.logger.info("Gave starter items to %s.", self.protocol.player.name) def give_items(self):