150 changes: 149 additions & 1 deletion README.md
@@ -1 +1,149 @@
# StreamChatAgent
# StreamChatAgent
YouTube chat poller that retrieves messages smoothly by using an internal queue.

## The user of this library can
- receive YouTube chat messages continuously by registering a callback.
- obtain high performance out of the box thanks to the internal queue.

## How to install
- Clone this repository.<br>
```clone
$ git clone https://github.com/GeneralYadoc/StreamChatAgent.git
```
- Change directory to the root of the repository.<br>
```cd
$ cd StreamChatAgent
```
- Install the package into your environment.<br>
```install
$ pip install .
```

## How to use

- Sample code
``` sample.py
import sys
import re
import StreamChatAgent as sca  # Import this.

# Callback for receiving YouTube chat items.
# You can implement any processing you need in it.
# This example prints the datetime, author name and message of each item.
def get_item_cb(c):
    print(f"{c.datetime} [{c.author.name}]- {c.message}")

# Filter applied before an item is put into the internal queue.
# You can edit YouTube chat items before they are queued.
# Returning None keeps the item out of the internal queue.
# This example drops items whose message consists of stamps only.
def pre_filter_cb(c):
    return None if re.match(r'^(:[^:]+:)+$', c.message) else c

# Filter applied after an item is taken from the internal queue.
# You can edit YouTube chat items after they are popped from the queue.
# Returning None keeps the item from being sent to get_item_cb.
# This example removes stamps from the message of each item.
def post_filter_cb(c):
    c.message = re.sub(r':[^:]+:', '', c.message)
    return c

# The video ID is given on the command line in this example.
if len(sys.argv) <= 1:
    sys.exit(0)

# Create a StreamChatAgent instance.
agent = sca.StreamChatAgent( video_id=sys.argv[1],
                             get_item_cb=get_item_cb,
                             pre_filter_cb=pre_filter_cb,
                             post_filter_cb=post_filter_cb )

# Start fetching YouTube chat items asynchronously.
# From here on, get_item_cb is called continuously.
agent.start()

# Wait for Enter to be pressed on the keyboard.
input()

# Stop fetching items.
# The internal threads will stop soon.
agent.disconnect()

# Wait for the internal threads to terminate.
agent.join()

del agent
```

- Output of the sample
```output
% ./test.py MB57rMXXXXs
2023-05-19 05:21:26 [John]- Hello!
2023-05-19 05:21:27 [Kelly]- Hello everyone!
2023-05-19 05:21:27 [Taro]- Welcome to our stream.
```
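The two regular expressions used by the sample filters can be tried on their own. The sketch below assumes that stamps appear in the message text as `:name:` codes, which is what the patterns suggest; the helper names and the example messages are made up for illustration.

```python
import re

# Same patterns as in the sample's pre_filter_cb and post_filter_cb.
STAMPS_ONLY = re.compile(r'^(:[^:]+:)+$')
STAMP = re.compile(r':[^:]+:')

def is_stamps_only(message):
    """Return True when the message is made up of stamp codes only."""
    return STAMPS_ONLY.match(message) is not None

def strip_stamps(message):
    """Remove every stamp code from the message."""
    return STAMP.sub('', message)

print(is_stamps_only(":smile::heart:"))        # True
print(is_stamps_only("Hello :smile:"))         # False
print(strip_stamps("Hello :smile: everyone"))  # "Hello  everyone" (two spaces remain)
```

Note that the second pattern matches any text enclosed by two colons, so a message such as `10:30 see you at 11:00` would be shortened to `1000`. A stricter pattern may be worth considering if that matters for your stream.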
## Arguments of Constructor
- A StreamChatAgent object can be configured with the following constructor arguments.

| name | description | default |
|------|------------|---------|
| video_id | String following 'v=' in the URL of the target YouTube live stream | - |
| get_item_cb | Chat items are passed to this callback | - |
| pre_filter_cb | Filter applied before the internal queue | None |
| post_filter_cb | Filter applied between the internal queue and get_item_cb | None |
| max_queue_size | Maximum number of slots in the internal queue (0 means no limit) | 1000 |
| interval_sec | Polling interval for picking up items from YouTube | 0.01 \[sec\] |
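As a rough illustration of the optional arguments, the sketch below builds an agent with a smaller queue and a longer polling interval. The helper `build_agent` and the values `100` and `0.1` are arbitrary choices for this example, not recommendations; the import is done inside the function only so that the snippet can be loaded without the package installed.

```python
def build_agent(video_id):
    """Create an agent with a small queue and a longer polling interval."""
    # Imported lazily; requires the StreamChatAgent package to be installed.
    import StreamChatAgent as sca

    def get_item_cb(c):
        print(f"{c.datetime} [{c.author.name}]- {c.message}")

    return sca.StreamChatAgent(
        video_id=video_id,
        get_item_cb=get_item_cb,
        max_queue_size=100,  # illustrative value: keep at most 100 pending items
        interval_sec=0.1,    # illustrative value: polling interval in seconds
    )
```

Both filters are left at their default of `None` here, so every item goes straight to `get_item_cb`.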

## Methods
### start()
- Starts polling and calling user callbacks asynchronously.
- Takes no arguments and returns nothing.

### join()
- Waits for the internal threads started by start() to terminate.
- Takes no arguments and returns nothing.

### connect()
- Starts polling and calling user callbacks synchronously.
- Lines following the call are not executed until the internal threads have terminated.
- Takes no arguments and returns nothing.

### disconnect()
- Requests termination of polling and of calling user callbacks.
- The internal threads terminate soon afterwards.
- Takes no arguments and returns nothing.

Other public methods of [threading.Thread](https://docs.python.org/3/library/threading.html) are also available.
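The difference between the asynchronous and synchronous entry points can be sketched with a stand-in class. `FakeAgent` below is hypothetical and does not talk to YouTube; it only mimics the shape of the methods, with `connect()` written as `start()` followed by `join()` as in the source of this change.

```python
import threading
import time

class FakeAgent(threading.Thread):
    """Hypothetical stand-in with the same start/join/connect/disconnect shape."""

    def __init__(self):
        super().__init__(daemon=True)
        self._stop_requested = threading.Event()
        self.ticks = 0

    def run(self):
        # Work loop: keeps running until disconnect() is called.
        while not self._stop_requested.is_set():
            self.ticks += 1
            time.sleep(0.01)

    def connect(self):
        # Blocking variant: start the thread and wait for it to finish.
        self.start()
        self.join()

    def disconnect(self):
        # Only requests termination; the loop exits on its next check.
        self._stop_requested.set()

# Asynchronous use: start(), do other work, then disconnect() and join().
agent = FakeAgent()
agent.start()
time.sleep(0.05)
agent.disconnect()
agent.join()
print(agent.is_alive())  # False

# Synchronous use: connect() blocks, so disconnect() has to come from elsewhere.
agent2 = FakeAgent()
threading.Timer(0.05, agent2.disconnect).start()
agent2.connect()
print(agent2.is_alive())  # False
```

With `connect()`, some other thread or one of the callbacks has to call `disconnect()`, because the calling thread is blocked until the agent finishes.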

## Callbacks
### get_item_callback
- Callback for receiving YouTube chat items.
- You can implement any processing you need in it.
- A YouTube chat item is passed as the argument.
- No return value is expected.
### pre_filter_callback
- Filter applied before an item is put into the internal queue.
- A YouTube chat item is passed as the argument.
- You can edit YouTube chat items before they are queued.
- The edited chat item must be returned.
- Returning None keeps the item out of the internal queue.
### post_filter_callback
- Filter applied after an item is taken from the internal queue.
- You can edit YouTube chat items after they are popped from the queue.
- The edited chat item must be returned.
- Returning None keeps the item from being sent to get_item_cb.
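The order in which the three callbacks are applied can be sketched without the queue or the threads. `run_pipeline`, the two filters and the fake chat items below are hypothetical and only illustrate the "return None to drop" contract described above.

```python
from types import SimpleNamespace

def run_pipeline(items, get_item_cb, pre_filter_cb=None, post_filter_cb=None):
    """Pass each item through the optional filters, then to get_item_cb."""
    for c in items:
        if pre_filter_cb:
            c = pre_filter_cb(c)
        if not c:
            continue  # dropped before it would reach the queue
        if post_filter_cb:
            c = post_filter_cb(c)
        if not c:
            continue  # dropped before get_item_cb
        get_item_cb(c)

# Hypothetical filters: drop blank messages, then shorten long ones.
def drop_blank(c):
    return c if c.message.strip() else None

def shorten(c):
    c.message = c.message[:10]
    return c

received = []
chat = [SimpleNamespace(message=m) for m in ["hi", "   ", "a very long message"]]
run_pipeline(chat, lambda c: received.append(c.message), drop_blank, shorten)
print(received)  # ['hi', 'a very lon']
```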


## Type of YouTube Chat item
- Please refer to the [pytchat README](https://github.com/taizan-hokuto/pytchat).

## Concept of design
- The putting thread is separated from the getting thread so that polling is never blocked.<br>
This approach may reduce unexpected sleeps in pytchat.
- If the internal queue is full when the putting thread tries to push a new item, the oldest item is removed from the queue before pushing.
![](ReadMeParts/concept.png)
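The "drop the oldest item when full" behaviour can be sketched with the standard `queue.Queue`. The helper name `put_dropping_oldest` is made up for this example. The source of this change calls `get()` at this point; the sketch uses `get_nowait()` so that it cannot block if a consumer empties the queue between the two calls.

```python
import queue

def put_dropping_oldest(q, item):
    """Put item into q; if q is full, discard the oldest entry first."""
    if q.full():
        try:
            q.get_nowait()
        except queue.Empty:
            pass  # a consumer drained the queue in the meantime
    q.put(item)

q = queue.Queue(maxsize=3)
for n in range(5):
    put_dropping_oldest(q, n)

remaining = []
while not q.empty():
    remaining.append(q.get())
print(remaining)  # [2, 3, 4]
```

This keeps the producer from ever waiting on a slow consumer, at the cost of losing the oldest unread messages.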

## Links
StreamChatAgent uses the following libraries internally.

- [pytchat](https://github.com/taizan-hokuto/pytchat) &emsp; Python library for fetching YouTube live chat.
Binary file added ReadMeParts/concept.png
51 changes: 51 additions & 0 deletions samples/sample.py
@@ -0,0 +1,51 @@
import sys
import re
import StreamChatAgent as sca  # Import this.

# Callback for receiving YouTube chat items.
# You can implement any processing you need in it.
# This example prints the datetime, author name and message of each item.
def get_item_cb(c):
    print(f"{c.datetime} [{c.author.name}]- {c.message}")

# Filter applied before an item is put into the internal queue.
# You can edit YouTube chat items before they are queued.
# Returning None keeps the item out of the internal queue.
# This example drops items whose message consists of stamps only.
def pre_filter_cb(c):
    return None if re.match(r'^(:[^:]+:)+$', c.message) else c

# Filter applied after an item is taken from the internal queue.
# You can edit YouTube chat items after they are popped from the queue.
# Returning None keeps the item from being sent to get_item_cb.
# This example removes stamps from the message of each item.
def post_filter_cb(c):
    c.message = re.sub(r':[^:]+:', '', c.message)
    return c

# The video ID is given on the command line in this example.
if len(sys.argv) <= 1:
    sys.exit(0)

# Create a StreamChatAgent instance.
agent = sca.StreamChatAgent( video_id=sys.argv[1],
                             get_item_cb=get_item_cb,
                             pre_filter_cb=pre_filter_cb,
                             post_filter_cb=post_filter_cb )

# Start fetching YouTube chat items asynchronously.
# From here on, get_item_cb is called continuously.
agent.start()

# Wait for Enter to be pressed on the keyboard.
input()

# Stop fetching items.
# The internal threads will stop soon.
agent.disconnect()

# Wait for the internal threads to terminate.
agent.join()

del agent

117 changes: 76 additions & 41 deletions src/StreamChatAgent.py
@@ -1,72 +1,107 @@
 import time
 import threading
 import queue
+import math
 import pytchat

-class StreamChatAgent:
+class StreamChatAgent(threading.Thread):
     def __init__( self,
                   video_id,
                   get_item_cb,
                   pre_filter_cb=None,
                   post_filter_cb=None,
-                  max_queue_size=0,
-                  interval_sec=0.001 ):
-        self.get_item_cb = get_item_cb
-        self.pre_filter_cb = pre_filter_cb
-        self.post_filter_cb = post_filter_cb
-        self.item_queue = queue.Queue(max_queue_size)
-        self.interval_sec = interval_sec
-        self.alive = False
+                  max_queue_size=1000,
+                  interval_sec=0.01 ):
+        self.__get_item_cb = get_item_cb
+        self.__pre_filter_cb = pre_filter_cb
+        self.__post_filter_cb = post_filter_cb
+        self.__item_queue = queue.Queue(max_queue_size)
+        self.__interval_sec = interval_sec
+        self.__keeping_connection = False

-        self.chat = pytchat.create(video_id=video_id)
+        self.__chat = pytchat.create(video_id=video_id)

-        self.my_put_thread = threading.Thread(target=self.put_items)
-        self.my_get_thread = threading.Thread(target=self.get_items)
+        self.__my_put_thread = threading.Thread(target=self.__put_items)
+        self.__my_get_thread = threading.Thread(target=self.__get_items)
+
+        super(StreamChatAgent, self).__init__(daemon=True)

     def connect(self):
-        self.alive = True
-        self.my_put_thread.start()
-        self.my_get_thread.start()
-        self.my_get_thread.join()
-        self.my_put_thread.join()
+        self.start()
+        self.join()
+
+    def run(self):
+        self.__keeping_connection = True
+        self.__my_put_thread.start()
+        self.__my_get_thread.start()
+        self.__my_get_thread.join()
+        self.__my_put_thread.join()

     def disconnect(self):
-        self.alive = False
+        self.__keeping_connection = False

-    def is_chat_alive(self, immediate=False, retry_count=5, sleep=1.0):
+    def __is_alive(self, immediate=True, wait_sec=0):
+        if not self.__my_get_thread.is_alive() and not self.__my_put_thread.is_alive():
+            return False
+
+        retry_count = math.floor(wait_sec / 0.01)
         if immediate:
             retry_count=1

+        steps = 0 if retry_count == 0 else 1
         i = 0
         while True:
-            if not self.alive:
+            if not self.__keeping_connection:
                 return False
-            if self.chat.is_alive():
+            if self.__chat.is_alive():
                 return True
-            i += 1
-            if retry_count > 0 and i >= retry_count:
+            i += steps
+            if retry_count != 0 and i >= retry_count:
                 return False
-            time.sleep(sleep)
+            time.sleep(0.01)

-    def put_items(self):
-        while self.alive and self.is_chat_alive():
-            for c in self.chat.get().sync_items():
-                if not self.alive:
+    def __put_items(self):
+        start_time = time.time()
+        while self.__is_alive(immediate=False):
+            for c in self.__chat.get().sync_items():
+                if not self.__keeping_connection:
                     break
                 prefiltered_c = c
-                if self.pre_filter_cb:
-                    prefiltered_c = self.pre_filter_cb(c)
+                if self.__pre_filter_cb:
+                    prefiltered_c = self.__pre_filter_cb(c)
                 if prefiltered_c:
-                    if self.item_queue.full():
-                        self.item_queue.get()
-                    self.item_queue.put(prefiltered_c)
+                    if self.__item_queue.full():
+                        self.__item_queue.get()
+                    self.__item_queue.put(prefiltered_c)
+            self.__sleep_from(start_time)
+            start_time = time.time()
+        self.__keeping_connection = False


-    def get_items(self):
-        while self.alive and self.is_chat_alive():
-            while self.alive and not self.item_queue.empty():
-                c = self.item_queue.get()
+    def __get_items(self):
+        start_time = time.time()
+        while self.__is_alive(immediate=False):
+            while self.__keeping_connection and not self.__item_queue.empty():
+                c = self.__item_queue.get()
                 postfiltered_c = c
-                if self.post_filter_cb:
-                    postfiltered_c = self.post_filter_cb(c)
+                if self.__post_filter_cb:
+                    postfiltered_c = self.__post_filter_cb(c)
                 if postfiltered_c:
-                    self.get_item_cb(postfiltered_c)
-            time.sleep(self.interval_sec)
+                    self.__get_item_cb(postfiltered_c)
+            self.__sleep_from(start_time, 0.01)
+            start_time = time.time()
+        self.__keeping_connection = False
+
+    def __sleep_from(self, start_time, interval_sec=None):
+        if not interval_sec:
+            interval_sec = self.__interval_sec
+        cur_time = time.time()
+        sleep = interval_sec - (cur_time - start_time)
+        if sleep > 0.:
+            sleep_counter = math.floor(sleep * 10)
+            sleep_frac = sleep - (sleep_counter / 10.)
+            for i in range(sleep_counter):
+                if not self.__keeping_connection:
+                    break
+                time.sleep(0.1)
+            time.sleep(sleep_frac)
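`__sleep_from` above sleeps in 0.1 s slices so that a call to `disconnect()` is noticed during a long interval. A different way to get an interruptible sleep, shown here only as a sketch and not as what this change does, is to wait on a `threading.Event`. The function `sleep_until` and the `stop_event` parameter are hypothetical names.

```python
import threading
import time

def sleep_until(start_time, interval_sec, stop_event):
    """Sleep for what is left of interval_sec, counted from start_time.

    Returns True if stop_event was set before the interval elapsed.
    """
    remaining = interval_sec - (time.time() - start_time)
    if remaining <= 0:
        return stop_event.is_set()
    # Event.wait() returns as soon as the event is set, or after the timeout.
    return stop_event.wait(remaining)

stop = threading.Event()
threading.Timer(0.05, stop.set).start()  # simulate a disconnect request
t0 = time.time()
interrupted = sleep_until(t0, 5.0, stop)
elapsed = time.time() - t0
print(interrupted, elapsed < 1.0)
```

With this approach the wake-up latency does not depend on a slice length, but it would require replacing the boolean connection flag with an event object.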
File renamed without changes.