diff --git a/README.md b/README.md index 952357b..58fcf42 100644 --- a/README.md +++ b/README.md @@ -1 +1,149 @@ -# StreamChatAgent \ No newline at end of file +# StreamChatAgent +YouTube chat poller which can get massages very smothly by using internal queue. + +## The user of this library can +- receive YouTube chat messages continiously by registering callback. +- natively obtain high performance by using internal queue. + +## Hou to install +- Clone this repository.
+ ```clone + $ clone https://github.com/GeneralYadoc/StreamChatAgent.git + ``` +- Change directory to the root of the repository.
+ ```cd + $ cd StreamChatAgent + ``` +- Install package to your environment.
+ ```install + pip install . + ``` + +## How to use + +- Sample codes + ``` sample.py + import sys + import re + import StreamChatAgent as sca # Import this. + + # callback for getting YouTube chat items + # You can implement several processes in it. + # This example prints datetime, ahthor name, message, of each item. + def get_item_cb(c): + print(f"{c.datetime} [{c.author.name}]- {c.message}") + + # pre putting queue filter + # You can edit YouTube chat items before putting internal queue. + # You can avoid putting internal queue by returning None. + # This example removes items whose message consists of stamps only. + def pre_filter_cb(c): + return None if re.match(r'^(:[^:]+:)+$', c.message) else c + + # post getting queue filter + # You can edit YouTube chat items after popping internal queue. + # You can avoid sending items to get_item_cb by returning None. + # This example removes stamps from message of items. + def post_filter_cb(c): + c.message = re.sub(r':[^:]+:','', c.message) + return c + + # Video ID is given from command line in this example. + if len(sys.argv) <= 1: + exit(0) + + # Create StreamChatAgent instance. + agent = sca.StreamChatAgent( video_id=sys.argv[1], + get_item_cb=get_item_cb, + pre_filter_cb=pre_filter_cb, + post_filter_cb=post_filter_cb ) + + # Start async getting YouTube chat items. + # Then get_item_cb is called continuosly. + agent.start() + + # Wait any key inputted from keyboad. + input() + + # Finish getting items. + # Internal thread will stop soon. + agent.disconnect() + + # Wait terminating internal threads. + agent.join() + + del agent + ``` + +- Output of the sample + ```output + % ./test.py MB57rMXXXXs + 2023-05-19 05:21:26 [John]- Hello! + 2023-05-19 05:21:27 [Kelly]- Hello everyone! + 2023-05-19 05:21:27 [Taro]- Welcome to our stream. + ``` +## Arguments of Constractor +- StreamChatAgent object can be configured with following arguments of its constractor. + + | name | description | default | + |------|------------|---------| + | video_id | String following after 'v=' in url of target YouTube live | - | + | get_item_cb | Chat items are thrown to this callback | - | + | pre_filter_cb | Filter set before internal queue | None | + | post_filter_cb | Filter set between internal queue and get_item_cb | None | + | max_queue_size | Max slots of internal queue (0 is no limit) | 1000 | + | interval_sec | Polling interval of picking up items from YouTube | 0.01 \[sec\] | + +## Methods +### start() +- Start polling and calling user callbacks asyncronously. +- No arguments required, nothing returns. + +### join() +- Wait terminating internal threads kicked by start(). +- No arguments required, nothing returns. + +### connect() +- Start polling and calling user callbacks syncronously. +- Lines following the call of the method never executen before terminate of internal threads. +- No arguments required, nothing returns. + +### disconnect() +- Request to terminate polling and calling user callbacks. +- Internal process will be terminated soon after. +- No arguments required, nothing returns. + +And other [threading.Thread](https://docs.python.org/3/library/threading.html) public pethods are available. + +## Callbacks +### get_item_callback +- Callback for getting YouTube chat items. +- You can implement several processes in it. +- YouTube chat item is thrown as an argument. +- It's not be assumed that any values are returned. +### pre_filter_callback +- pre putting queue filter. +- YouTube chat item is thrown as an argument. +- You can edit YouTube chat items before putting internal queue. +- It's required that edited chat item is returned. +- You can avoid putting internal queue by returning None. +### post_filter_callback +- post getting queue filter +- You can edit YouTube chat items after popping internal queue. +- It's required that edited chat item is returned. +- You can avoid sending items to get_item_cb by returning None. + + +## Type of YouTube Chat item +- Please refer [pytchat README](https://github.com/taizan-hokuto/pytchat) + +## Concept of design +- Putting thread is separated from getting thread in order to avoid locking polling.
+Unexpected sleep of pytchat may reduce by ths approach. +- If internal queue is full when putting thread try to push new item, oldest item is removed from the queue before pushing. + ![](ReadMeParts/concept.png) + +## Links +StreamingChaatAgent uses following libraries internally. + +- [pytchat](https://github.com/taizan-hokuto/pytchat)   Python library for fetching youtube live chat. diff --git a/ReadMeParts/concept.png b/ReadMeParts/concept.png new file mode 100644 index 0000000..32c6033 Binary files /dev/null and b/ReadMeParts/concept.png differ diff --git a/samples/sample.py b/samples/sample.py new file mode 100644 index 0000000..873d088 --- /dev/null +++ b/samples/sample.py @@ -0,0 +1,51 @@ +import sys +import re +import StreamChatAgent as sca # Import this. + +# callback for getting YouTube chat items +# You can implement several processes in it. +# This example prints datetime, ahthor name, message, of each item. +def get_item_cb(c): + print(f"{c.datetime} [{c.author.name}]- {c.message}") + +# pre putting queue filter +# You can edit YouTube chat items before putting internal queue. +# You can avoid putting internal queue by returning None. +# This example removes items whose message consists of stamps only. +def pre_filter_cb(c): + return None if re.match(r'^(:[^:]+:)+$', c.message) else c + +# post getting queue filter +# You can edit YouTube chat items after popping internal queue. +# You can avoid sending items to get_item_cb by returning None. +# This example removes stamps from message of items. +def post_filter_cb(c): + c.message = re.sub(r':[^:]+:','', c.message) + return c + +# Video ID is given from command line in this example. +if len(sys.argv) <= 1: + exit(0) + +# Create StreamChatAgent instance. +agent = sca.StreamChatAgent( video_id=sys.argv[1], + get_item_cb=get_item_cb, + pre_filter_cb=pre_filter_cb, + post_filter_cb=post_filter_cb ) + +# Start async getting YouTube chat items. +# Then get_item_cb is called continuosly. +agent.start() + +# Wait any key inputted from keyboad. +input() + +# Finish getting items. +# Internal thread will stop soon. +agent.disconnect() + +# Wait terminating internal threads. +agent.join() + +del agent + diff --git a/src/StreamChatAgent.py b/src/StreamChatAgent.py index 517e589..79d3132 100644 --- a/src/StreamChatAgent.py +++ b/src/StreamChatAgent.py @@ -1,72 +1,107 @@ import time import threading import queue +import math import pytchat -class StreamChatAgent: +class StreamChatAgent(threading.Thread): def __init__( self, video_id, get_item_cb, pre_filter_cb=None, post_filter_cb=None, - max_queue_size=0, - interval_sec=0.001 ): - self.get_item_cb = get_item_cb - self.pre_filter_cb = pre_filter_cb - self.post_filter_cb = post_filter_cb - self.item_queue = queue.Queue(max_queue_size) - self.interval_sec = interval_sec - self.alive = False + max_queue_size=1000, + interval_sec=0.01 ): + self.__get_item_cb = get_item_cb + self.__pre_filter_cb = pre_filter_cb + self.__post_filter_cb = post_filter_cb + self.__item_queue = queue.Queue(max_queue_size) + self.__interval_sec = interval_sec + self.__keeping_connection = False - self.chat = pytchat.create(video_id=video_id) + self.__chat = pytchat.create(video_id=video_id) - self.my_put_thread = threading.Thread(target=self.put_items) - self.my_get_thread = threading.Thread(target=self.get_items) + self.__my_put_thread = threading.Thread(target=self.__put_items) + self.__my_get_thread = threading.Thread(target=self.__get_items) + + super(StreamChatAgent, self).__init__(daemon=True) def connect(self): - self.alive = True - self.my_put_thread.start() - self.my_get_thread.start() - self.my_get_thread.join() - self.my_put_thread.join() + self.start() + self.join() + + def run(self): + self.__keeping_connection = True + self.__my_put_thread.start() + self.__my_get_thread.start() + self.__my_get_thread.join() + self.__my_put_thread.join() def disconnect(self): - self.alive = False + self.__keeping_connection = False - def is_chat_alive(self, immediate=False, retry_count=5, sleep=1.0): + def __is_alive(self, immediate=True, wait_sec=0): + if not self.__my_get_thread.is_alive() and not self.__my_put_thread.is_alive(): + return False + + retry_count = math.floor(wait_sec / 0.01) if immediate: retry_count=1 + + steps = 0 if retry_count == 0 else 1 i = 0 while True: - if not self.alive: + if not self.__keeping_connection: return False - if self.chat.is_alive(): + if self.__chat.is_alive(): return True - i += 1 - if retry_count > 0 and i >= retry_count: + i += steps + if retry_count != 0 and i >= retry_count: return False - time.sleep(sleep) + time.sleep(0.01) - def put_items(self): - while self.alive and self.is_chat_alive(): - for c in self.chat.get().sync_items(): - if not self.alive: + def __put_items(self): + start_time = time.time() + while self.__is_alive(immediate=False): + for c in self.__chat.get().sync_items(): + if not self.__keeping_connection: break prefiltered_c = c - if self.pre_filter_cb: - prefiltered_c = self.pre_filter_cb(c) + if self.__pre_filter_cb: + prefiltered_c = self.__pre_filter_cb(c) if prefiltered_c: - if self.item_queue.full(): - self.item_queue.get() - self.item_queue.put(prefiltered_c) + if self.__item_queue.full(): + self.__item_queue.get() + self.__item_queue.put(prefiltered_c) + self.__sleep_from(start_time) + start_time = time.time() + self.__keeping_connection = False + - def get_items(self): - while self.alive and self.is_chat_alive(): - while self.alive and not self.item_queue.empty(): - c = self.item_queue.get() + def __get_items(self): + start_time = time.time() + while self.__is_alive(immediate=False): + while self.__keeping_connection and not self.__item_queue.empty(): + c = self.__item_queue.get() postfiltered_c = c - if self.post_filter_cb: - postfiltered_c = self.post_filter_cb(c) + if self.__post_filter_cb: + postfiltered_c = self.__post_filter_cb(c) if postfiltered_c: - self.get_item_cb(postfiltered_c) - time.sleep(self.interval_sec) + self.__get_item_cb(postfiltered_c) + self.__sleep_from(start_time, 0.01) + start_time = time.time() + self.__keeping_connection = False + + def __sleep_from(self, start_time, interval_sec=None): + if not interval_sec: + interval_sec = self.__interval_sec + cur_time = time.time() + sleep = interval_sec - (cur_time - start_time) + if sleep > 0.: + sleep_counter = math.floor(sleep * 10) + sleep_frac = sleep - (sleep_counter / 10.) + for i in range(sleep_counter): + if not self.__keeping_connection: + break + time.sleep(0.1) + time.sleep(sleep_frac) diff --git a/__init__.py b/src/__init__.py similarity index 100% rename from __init__.py rename to src/__init__.py