From beb018ec9ef151780717e0f8c99032ff7dfa21be Mon Sep 17 00:00:00 2001 From: cosven Date: Tue, 13 Feb 2018 20:52:44 +0800 Subject: [PATCH] add live lyric --- .travis.yml | 22 ++-- fuocore/__main__.py | 24 ---- fuocore/daemon/__init__.py | 5 - .../{tcp_server.py => aio_tcp_server.py} | 0 .../{lyric_live_pub.py => live_lyric.py} | 17 ++- fuocore/daemon/main.py | 81 +++++++----- fuocore/daemon/pubsub.py | 119 ++++++++++++++++++ fuocore/daemon/thread_tcp_server.py | 33 +++++ fuocore/dispatch.py | 1 + fuocore/player.py | 1 + fuocore/protocol/__init__.py | 0 .../{daemon => protocol}/handlers/__init__.py | 2 +- .../{daemon => protocol}/handlers/helpers.py | 0 fuocore/{daemon => protocol}/handlers/show.py | 4 +- fuocore/{daemon => protocol}/parser.py | 0 makefile | 1 + setup.py | 8 +- tests/test_signal.py | 10 +- 18 files changed, 229 insertions(+), 99 deletions(-) delete mode 100644 fuocore/__main__.py rename fuocore/daemon/{tcp_server.py => aio_tcp_server.py} (100%) rename fuocore/daemon/{lyric_live_pub.py => live_lyric.py} (62%) create mode 100644 fuocore/daemon/pubsub.py create mode 100644 fuocore/daemon/thread_tcp_server.py create mode 100644 fuocore/protocol/__init__.py rename fuocore/{daemon => protocol}/handlers/__init__.py (98%) rename fuocore/{daemon => protocol}/handlers/helpers.py (100%) rename fuocore/{daemon => protocol}/handlers/show.py (97%) rename fuocore/{daemon => protocol}/parser.py (100%) diff --git a/.travis.yml b/.travis.yml index 4d58b8c..79a89fe 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,18 +1,10 @@ -sudo: required +language: python -env: - - ARCH_TRAVIS_VERBOSE=1 - -arch: - packages: - - python3 - - python-pip - - mpv - script: - - python --version - - sudo pip install coveralls - - coverage run --source=fuocore setup.py test - - coveralls +python: + - "3.5" + - "3.6" script: - - "curl -s https://raw.githubusercontent.com/mikkeloscar/arch-travis/master/arch-travis.sh | bash" + - pip install coveralls + - coverage run --source=fuocore setup.py test + - coveralls diff --git a/fuocore/__main__.py b/fuocore/__main__.py deleted file mode 100644 index d4a8514..0000000 --- a/fuocore/__main__.py +++ /dev/null @@ -1,24 +0,0 @@ -import asyncio -import logging -import sys - -from fuocore import setup_logger -from fuocore.app import App -from fuocore.daemon import run, run_live_lyric_pubsub - - -def main(): - debug = '--debug' in sys.argv - setup_logger(debug=debug) - logger = logging.getLogger(__name__) - logger.info('{} mode.'.format('Debug' if debug else 'Release')) - app = App() - app.initialize() - event_loop = asyncio.get_event_loop() - event_loop.create_task(run(app)) - event_loop.create_task(run_live_lyric_pubsub(app)) - event_loop.run_forever() - - -if __name__ == '__main__': - main() diff --git a/fuocore/daemon/__init__.py b/fuocore/daemon/__init__.py index fa34fef..e69de29 100644 --- a/fuocore/daemon/__init__.py +++ b/fuocore/daemon/__init__.py @@ -1,5 +0,0 @@ -from fuocore.daemon.main import run # noqa -from fuocore.daemon.main import run_live_lyric_pubsub # noqa - - -__all__ = ('run', 'run_live_lyric_pubsub') diff --git a/fuocore/daemon/tcp_server.py b/fuocore/daemon/aio_tcp_server.py similarity index 100% rename from fuocore/daemon/tcp_server.py rename to fuocore/daemon/aio_tcp_server.py diff --git a/fuocore/daemon/lyric_live_pub.py b/fuocore/daemon/live_lyric.py similarity index 62% rename from fuocore/daemon/lyric_live_pub.py rename to fuocore/daemon/live_lyric.py index 0c75356..ffeb75e 100644 --- a/fuocore/daemon/lyric_live_pub.py +++ b/fuocore/daemon/live_lyric.py @@ -1,4 +1,3 @@ -import asyncio import logging from fuocore.lyric import parse @@ -9,8 +8,8 @@ class LiveLyric(object): - def __init__(self, send_func): - self.send = send_func + def __init__(self, on_changed): + self.on_changed = on_changed self._lyric = None self._pos_s_map = {} # position sentence map @@ -19,22 +18,20 @@ def __init__(self, send_func): # TODO: performance optimization? def on_position_changed(self, position): - logger.info('position -------------------') if not self._lyric: return - if self._pos is None: - pos = find_previous(position, self._pos_list) - if pos is not None and pos != self._pos: - self.send(self._pos_s_map[pos]) + pos = find_previous(position*1000 + 300, self._pos_list) + if pos is not None and pos != self._pos: + self.on_changed(self._pos_s_map[pos]) + self._pos = pos def on_song_changed(self, song): - logger.info('song !!!!!!!!!!!!!!!!!!!!!') if song.lyric is None: self._lyric = None self._pos_s_map = {} else: self._lyric = song.lyric.content self._pos_s_map = parse(self._lyric) - logger.info('Lyric have been changed.') + self._pos_list = sorted(list(self._pos_s_map.keys())) self._pos = None diff --git a/fuocore/daemon/main.py b/fuocore/daemon/main.py index 2746dae..b536dee 100644 --- a/fuocore/daemon/main.py +++ b/fuocore/daemon/main.py @@ -1,11 +1,15 @@ import asyncio import logging +import sys -from fuocore.daemon.tcp_server import TcpServer -from fuocore.daemon.parser import CmdParser -from fuocore.daemon.handlers import exec_cmd -from fuocore.daemon.lyric_live_pub import LiveLyric +from .aio_tcp_server import TcpServer +from .live_lyric import LiveLyric +from .pubsub import run as run_pubsub +from fuocore import setup_logger +from fuocore.app import App +from fuocore.protocol.parser import CmdParser +from fuocore.protocol.handlers import exec_cmd logger = logging.getLogger(__name__) @@ -31,31 +35,6 @@ async def handle(app, conn, addr): event_loop.sock_sendall(conn, bytes(msg, 'utf-8')) -async def handle_live_lyric(app, conn, addr): - logger.info('live lyric') - event_loop = asyncio.get_event_loop() - event_loop.sock_sendall(conn, b'OK feeluown live lyric\n') - player = app.player - playlist = app.playlist - - q = asyncio.Queue(1) - - def send(s): - q.put_nowait(s) - - livelyric = LiveLyric(send_func=send) - player.position_changed.connect(app.livelyric.on_position_changed) - playlist.song_changed.connect(app.livelyric.on_song_changed) - while True: - s = await q.get() - try: - event_loop.sock_sendall(s) - except Exception as e: - conn.close() - import pdb; pdb.set_trace() - break - - async def run(app, *args, **kwargs): port = 23333 host = '0.0.0.0' @@ -64,9 +43,43 @@ async def run(app, *args, **kwargs): TcpServer(host, port, handle_func=handle).run(app)) -async def run_live_lyric_pubsub(app, *args, **kwargs): - port = 22332 - host = '0.0.0.0' +class LiveLyricPublisher(object): + topic = 'topic.live_lyric' + + def __init__(self, gateway): + self.gateway = gateway + gateway.add_topic(self.topic) + + def publish(self, sentence): + self.gateway.publish(sentence + '\n', self.topic) + + +def main(): + debug = '--debug' in sys.argv + setup_logger(debug=debug) + logger = logging.getLogger(__name__) + logger.info('{} mode.'.format('Debug' if debug else 'Release')) + + app = App() + app.initialize() + event_loop = asyncio.get_event_loop() - event_loop.create_task( - TcpServer(host, port, handle_func=handle_live_lyric).run(app)) + event_loop.create_task(run(app)) + + pubsub_gateway, pubsub_server = run_pubsub() # runs in another thread + live_lyric_publisher = LiveLyricPublisher(pubsub_gateway) + + live_lyric = LiveLyric(on_changed=live_lyric_publisher.publish) + app.player.position_changed.connect(live_lyric.on_position_changed) + app.playlist.song_changed.connect(live_lyric.on_song_changed) + + try: + event_loop.run_forever() + except KeyboardInterrupt: + # NOTE: gracefully shutdown? + pubsub_server.close() + event_loop.close() + + +if __name__ == '__main__': + main() diff --git a/fuocore/daemon/pubsub.py b/fuocore/daemon/pubsub.py new file mode 100644 index 0000000..58a91eb --- /dev/null +++ b/fuocore/daemon/pubsub.py @@ -0,0 +1,119 @@ +from collections import defaultdict +import logging +from threading import Thread + +from fuocore.daemon.thread_tcp_server import TcpServer + + +logger = logging.getLogger(__name__) + + +class DeadSubscriber(Exception): + pass + + +class Subscriber(object): + def __init__(self, addr, conn): + self._addr = addr + self._conn = conn + + def __eq__(self, obj): + return self._addr == obj._addr + + def __hash__(self): + return id(self._addr) + + +def sendto_subscriber(subscriber, msg): + try: + subscriber._conn.send(bytes(msg, 'utf-8')) + except BrokenPipeError: + subscriber._conn.close() + del subscriber + raise DeadSubscriber + + +class Gateway(object): + def __init__(self): + self.topics = set() + self._relations = defaultdict(set) # {'topic': subscriber_set} + + def add_topic(self, topic): + self.topics.add(topic) + + def remove_topic(self, topic): + if topic in self.topics: + self.topics.remove(topic) + + def link(self, topic, subscriber): + self._relations[topic].add(subscriber) + + def unlink(self, topic, subscriber): + if topic in self.topics and subscriber in self._relations[topic]: + self._relations[topic].remove(subscriber) + + def remove_subscriber(self, subscriber): + for topic in self.topics: + if subscriber in self._relations[topic]: + self._relations[topic].remove(subscriber) + + def publish(self, msg, topic): + # NOTE: use queue? maybe. + subscribers = self._relations[topic] + for subscriber in subscribers: + try: + sendto_subscriber(subscriber, msg) + except DeadSubscriber: + # NOTE: need lock? + self._relations[topic].remove(subscriber) + break + + +def handle(conn, addr, gateway, *args, **kwargs): + """ + NOTE: use tcp instead of udp because some operations need ack + """ + conn.sendall(b'OK feeluown live lyric\n') + while True: + try: + s = conn.recv(1024).decode('utf-8').strip() + if not s: + conn.close() + break + except ConnectionResetError: + logger.debug('Client close the connection.') + break + + parts = s.split(' ') + if len(parts) != 2: + conn.send(b"Invalid command\n") + continue + cmd, topic = parts + if cmd.lower() != 'sub': + conn.send(bytes("Unknown command '{}'\n".format(cmd.lower()), 'utf-8')) + continue + if topic not in gateway.topics: + conn.send(bytes("Unknown topic '{}'\n".format(topic), 'utf-8')) + continue + subscriber = Subscriber(addr, conn) + gateway.link(topic, subscriber) + break + + +def run(host='0.0.0.0', port=23334): + gateway = Gateway() + server = TcpServer(host, port, handle_func=handle) + Thread(target=server.run, args=(gateway,)).start() + logger.info('run pubsub server in {host}:{port}'.format( + host=host, port=port)) + return gateway, server + + +if __name__ == '__main__': + import time + gateway, server = run() + print('pubsub is running.') + gateway.add_topic('topic.live_lyric') + while True: + time.sleep(1) + gateway.publish('miao\n', 'topic.live_lyric') diff --git a/fuocore/daemon/thread_tcp_server.py b/fuocore/daemon/thread_tcp_server.py new file mode 100644 index 0000000..64502dd --- /dev/null +++ b/fuocore/daemon/thread_tcp_server.py @@ -0,0 +1,33 @@ +import logging +import socket +from threading import Thread + +logger = logging.getLogger(__name__) + + +class TcpServer(object): + def __init__(self, host, port, handle_func): + self.host = host + self.port = port + self.handle_func = handle_func + self.sock = None + + def run(self, *args, **kwargs): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind((self.host, self.port)) + sock.listen() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock = sock + while True: + try: + conn, addr = sock.accept() + except ConnectionError as e: + logger.warning(e) + break + logger.info('{} connected.'.format(addr)) + Thread(target=self.handle_func, args=(conn, addr, *args), + kwargs=kwargs).start() + + def close(self): + if self.sock is not None: + self.sock.close() diff --git a/fuocore/dispatch.py b/fuocore/dispatch.py index b8f1d68..86df001 100644 --- a/fuocore/dispatch.py +++ b/fuocore/dispatch.py @@ -12,6 +12,7 @@ def __init__(self, *sig, name=''): def emit(self, *args): for receiver in self.receivers: + # import pdb; pdb.set_trace() try: receiver()(*args) except Exception: diff --git a/fuocore/player.py b/fuocore/player.py index 9844662..2487f7d 100644 --- a/fuocore/player.py +++ b/fuocore/player.py @@ -340,6 +340,7 @@ def initialize(self): ) self._mpv.register_event_callback(lambda event: self._on_event(event)) self.song_finished.connect(self._playlist.next) + logger.info('Player initialize finished.') def quit(self): del self._mpv diff --git a/fuocore/protocol/__init__.py b/fuocore/protocol/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fuocore/daemon/handlers/__init__.py b/fuocore/protocol/handlers/__init__.py similarity index 98% rename from fuocore/daemon/handlers/__init__.py rename to fuocore/protocol/handlers/__init__.py index b14241c..9a86396 100644 --- a/fuocore/daemon/handlers/__init__.py +++ b/fuocore/protocol/handlers/__init__.py @@ -161,4 +161,4 @@ def clear(self): self.app.playlist.clear() -from fuocore.daemon.handlers.show import ShowHandler # noqa +from .show import ShowHandler # noqa diff --git a/fuocore/daemon/handlers/helpers.py b/fuocore/protocol/handlers/helpers.py similarity index 100% rename from fuocore/daemon/handlers/helpers.py rename to fuocore/protocol/handlers/helpers.py diff --git a/fuocore/daemon/handlers/show.py b/fuocore/protocol/handlers/show.py similarity index 97% rename from fuocore/daemon/handlers/show.py rename to fuocore/protocol/handlers/show.py index c13947d..fbd9c54 100644 --- a/fuocore/daemon/handlers/show.py +++ b/fuocore/protocol/handlers/show.py @@ -12,8 +12,8 @@ import re from urllib.parse import urlparse -from fuocore.daemon.handlers import AbstractHandler, CmdHandleException -from fuocore.daemon.handlers.helpers import ( +from . import AbstractHandler, CmdHandleException +from .helpers import ( show_songs, show_song, show_artist, show_album, show_user, show_playlist ) diff --git a/fuocore/daemon/parser.py b/fuocore/protocol/parser.py similarity index 100% rename from fuocore/daemon/parser.py rename to fuocore/protocol/parser.py diff --git a/makefile b/makefile index 80bcbbb..ecabbfc 100644 --- a/makefile +++ b/makefile @@ -14,6 +14,7 @@ unittest: test: lint unittest clean: + find . -name "*~" -exec rm -f {} \; find . -name "*.pyc" -exec rm -f {} \; find . -name "*flymake.py" -exec rm -f {} \; find . -name "\#*.py\#" -exec rm -f {} \; diff --git a/setup.py b/setup.py index 4b59c8b..f9fd5ca 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,8 @@ 'fuocore.local', 'fuocore.netease', 'fuocore.daemon', - 'fuocore.daemon.handlers', + 'fuocore.protocol', + 'fuocore.protocol.handlers', ], package_data={ '': [] @@ -39,14 +40,15 @@ 'Programming Language :: Python :: 3 :: Only', ), install_requires=requires, - setup_requires=[], + setup_requires=['pytest-runner'], + test_suite="tests", tests_require=[ 'pytest', 'mock', ], entry_points={ 'console_scripts': [ - 'fuo=fuocore.__main__:main', + 'fuo = fuocore.daemon.main:main', ], 'fuo.provider': [ 'local = fuocore.local.provider:LocalProvider', diff --git a/tests/test_signal.py b/tests/test_signal.py index abbb0c5..634e893 100644 --- a/tests/test_signal.py +++ b/tests/test_signal.py @@ -32,15 +32,15 @@ def test_connect1(self): # pay attention self.assertTrue(self.a1.f == self.a2.f == mock_method_f) s.connect(self.a1.f) - s.emit(arg1=1, arg2='hello') - mock_method_f.assert_called_once_with(arg1=1, arg2='hello') + s.emit(1, 'hello') + mock_method_f.assert_called_once_with(1, 'hello') @mock.patch('test_signal.f', return_value=None) def test_connect2(self, mock_func): s = Signal() s.connect(f) - s.emit(arg1=1, arg2='hello') - s.emit(arg1=1, arg2='hello') + s.emit(1, 'hello') + s.emit(1, 'hello') self.assertEqual(mock_func.call_count, 2) @mock.patch('test_signal.f', return_value=None) @@ -48,7 +48,7 @@ def test_disconnect(self, mock_func): s = Signal() s.connect(f) s.disconnect(f) - s.emit(arg1=1, arg2='hello') + s.emit(1, 'hello') self.assertEqual(mock_func.call_count, 0) @mock.patch.object(Signal, 'connect')