channel-box
it is a package for Starlette & FastAPI framework that allows
you send messages to named websocket channels from any part of your code.
Example of use:
- group chats
- notifications from backend
- alerts for user groups
https://github.com/Sobolev5/channel-box
To install run:
pip install channel-box
https://channel-box.andrey-sobolev.ru/
https://github.com/Sobolev5/channel-box/tree/master/example
http://nginx.org/en/docs/http/websocket.html
pip install uvicorn[standard]
from starlette.endpoints import WebSocketEndpoint
from channel_box import Channel, ChannelBox
class WsChatEndpoint(WebSocketEndpoint):
async def on_connect(self, websocket):
group_name = websocket.query_params.get(
"group_name"
) # group name */ws?group_name=MyChat
if group_name:
channel = Channel(
websocket,
expires=60 * 60,
payload_type="json",
) # Create new user channel
channel_add_status = await ChannelBox.add_channel_to_group(
channel=channel,
group_name=group_name,
) # Add channel to named group
sprint(channel_add_status)
await websocket.accept()
async def on_receive(self, websocket, data):
data = json.loads(data)
message = data["message"]
username = data["username"]
if message.strip():
payload = {
"username": username,
"message": message,
}
group_name = websocket.query_params.get("group_name")
if group_name:
await ChannelBox.group_send(
group_name=group_name,
payload=payload,
save_history=True,
) # Send to all user channels
## Send messages
Send message to any channel from any part of your code:
```python
from channel_box import ChannelBox
await ChannelBox.channel_send(
group_name="MyChat",
payload={
"username": "Message from any part of your code",
"message": "hello world"
},
save_history=True,
)
Show & flush groups:
from channel_box import ChannelBox
await ChannelBox.show_groups()
await ChannelBox.flush_groups()
Show & flush history:
from channel_box import ChannelBox
await ChannelBox.show_history()
await ChannelBox.flush_history()
tox