Skip to content

Commit

Permalink
Merge pull request #19 from CactusBot/rel-v0.3.2
Browse files Browse the repository at this point in the history
v0.3.2 to develop
  • Loading branch information
Innectic committed Jul 17, 2016
2 parents 5f811c1 + 7d2428e commit 6160fc2
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 111 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ target/
config.json
stats.json
data/*.db
data/*.sqlite

# Virtualenv
venv/
194 changes: 130 additions & 64 deletions beam.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Connects to Beam's chat and liveloading."""

from tornado.websocket import websocket_connect
from tornado.gen import coroutine
from tornado.ioloop import PeriodicCallback
Expand All @@ -12,7 +14,11 @@
from functools import partial
from json import dumps, loads

from re import match
import re
import time

from models import User, session
from datetime import datetime


class Beam:
Expand Down Expand Up @@ -52,7 +58,7 @@ def _init_logger(self, level="INFO", file_logging=True, **kwargs):
except ImportError:
colored_formatter = formatter
self.logger.warning(
"Module 'coloredlogs' unavailable; using ugly logging.")
"Module 'coloredlogs' unavailable; using normal logging.")

stream_handler = StreamHandler()
stream_handler.setLevel(level)
Expand All @@ -69,6 +75,22 @@ def _init_logger(self, level="INFO", file_logging=True, **kwargs):

self.logger.info("Logger initialized with level '{}'.".format(level))

def _init_users(self):
viewers = set(
user["userId"] for user in
self.get_chat_users(self.channel_data["id"]))

stored_users = set(
user[0] for user in session.query(User).with_entities(User.id))

for user in viewers - stored_users:
user = User(id=user, joins=1)
session.add(user)

session.commit()

self.logger.info("Successfully added new users to database.")

def _request(self, url, method="GET", **kwargs):
"""Send HTTP request to Beam."""
response = self.http_session.request(
Expand All @@ -95,13 +117,16 @@ def get_chat(self, id):
"""Get chat server data."""
return self._request("/chats/{id}".format(id=id))

def connect(self, channel_id, bot_id, silent=False):
def get_chat_users(self, id):
return self._request("/chats/{id}/users".format(id=id))

def connect(self, channel_id, bot_id, quiet=False):
"""Connect to a Beam chat through a websocket."""

self.connection_information = {
"channel_id": channel_id,
"bot_id": bot_id,
"silent": silent
"quiet": quiet
}

chat = self.get_chat(channel_id)
Expand All @@ -117,7 +142,7 @@ def connect(self, channel_id, bot_id, silent=False):
websocket_connection = websocket_connect(
self.servers[self.server_offset])

if silent:
if quiet is True:
websocket_connection.add_done_callback(
partial(self.authenticate, channel_id))
else:
Expand All @@ -127,6 +152,8 @@ def connect(self, channel_id, bot_id, silent=False):
def authenticate(self, *args):
"""Authenticate session to a Beam chat through a websocket."""

backoff = 0

future = args[-1]
if future.exception() is None:
self.websocket = future.result()
Expand All @@ -135,23 +162,48 @@ def authenticate(self, *args):

self.send_message(*args[:-1], method="auth")

if self.quiet:
self.http_session = Session()

self.read_chat(self.handle)
else:
raise ConnectionError(future.exception())
self.logger.error("There was an issue connecting.")
self.logger.error("Trying again in {} seconds.".format(backoff))

time.sleep(min(2**backoff, 60))
backoff += 1

self.authenticate(*args)

def send_message(self, *args, method="msg"):
"""Send a message to a Beam chat through a websocket."""

if self.quiet and method != "auth":
if self.quiet is True:
return

if method == "msg":
args = (self.quiet, r'\n'.join(args))
elif method == "whisper":
args = (
self.quiet,
"> {args[0]} | {args[1]}".format(
args=args,
)
)
method = "whisper"

if method == "msg":
for message in args:
message_packet = {
"type": "method",
"method": "msg",
"arguments": (message,),
"id": self.message_id
}
self.websocket.write_message(dumps(message_packet))
self.message_id += 1
for chunk in re.findall(r'.{1,250}', message):
message_packet = {
"type": "method",
"method": "msg",
"arguments": (chunk,),
"id": self.message_id
}
self.websocket.write_message(dumps(message_packet))
self.message_id += 1

else:
message_packet = {
Expand All @@ -164,8 +216,8 @@ def send_message(self, *args, method="msg"):
self.message_id += 1

if method == "whisper":
self.logger.info("$ [{bot_name} > {user}] {message}".format(
bot_name=self.config["auth"]["username"],
self.logger.info("$ [{bot} > {user}] {message}".format(
bot=self.config["auth"]["username"],
user=args[0],
message=args[1]))

Expand All @@ -184,45 +236,39 @@ def read_chat(self, handler=None):
if message is None:
self.logger.warning(
"Connection to chat server lost. Attempting to reconnect.")

self.server_offset += 1
self.server_offset %= len(self.servers)

self.logger.debug("Connecting to: {server}.".format(
server=self.servers[self.server_offset]))

websocket_connection = websocket_connect(
self.servers[self.server_offset])

# NOTE: We'll remove these try/excepts in the future
# Current just for debugging, we need to see the returned
# values from self.get_chat()
try:
authkey = self.get_chat(
self.connection_information["channel_id"])["authkey"]
except TypeError as e:
self.logger.warning("Caught crash-worthy error!")
self.logger.warning(repr(e))
self.logger.warning(self.get_chat(
self.connection_information["channel_id"]))

# Skip this loop
continue

if self.connection_information["silent"]:
websocket_connection.add_done_callback(
partial(
self.authenticate,
self.connection_information["channel_id"]
)
)
except TypeError:
self.logger.error("Couldn't get the auth key from data.")
self.read_chat(self.handle)
else:
websocket_connection.add_done_callback(
partial(
self.authenticate,
self.connection_information["channel_id"],
self.connection_information["bot_id"],
authkey
if self.connection_information["quiet"]:
return websocket_connection.add_done_callback(
partial(
self.authenticate,
self.connection_information["channel_id"]
)
)
else:
return websocket_connection.add_done_callback(
partial(
self.authenticate,
self.connection_information["channel_id"],
self.connection_information["bot_id"],
authkey
)
)
)

else:
response = loads(message)
Expand All @@ -234,6 +280,12 @@ def read_chat(self, handler=None):

def connect_to_liveloading(self, channel_id, user_id):
"""Connect to Beam liveloading."""

self.liveloading_connection_information = {
"channel_id": channel_id,
"user_id": user_id
}

liveloading_websocket_connection = websocket_connect(
"wss://realtime.beam.pro/socket.io/?EIO=3&transport=websocket")
liveloading_websocket_connection.add_done_callback(
Expand All @@ -253,6 +305,7 @@ def subscribe_to_liveloading(self, channel_id, user_id, future):
"channel:{channel_id}:followed",
"channel:{channel_id}:subscribed",
"channel:{channel_id}:resubscribed",
"channel:{channel_id}:hosted",
"user:{user_id}:update"
)
self.subscribe_to_interfaces(
Expand All @@ -269,30 +322,27 @@ def subscribe_to_liveloading(self, channel_id, user_id, future):
else:
self.logger.warning(future.exception())
self.connect_to_liveloading(channel_id, user_id)
# raise ConnectionError(future.exception())

def subscribe_to_interfaces(self, *interfaces):
"""Subscribe to a Beam liveloading interface."""
for interface in interfaces:
packet = [
"put",
{
"method": "put",
"headers": {},
"data": {
"slug": [
interface
]
},
"url": "/api/v1/live"
}
]
self.liveloading_websocket.write_message('420' + dumps(packet))

packet = [
"put",
{
"method": "put",
"headers": {},
"data": {
"slug": interfaces
},
"url": "/api/v1/live"
}
]
self.liveloading_websocket.write_message('420' + dumps(packet))

def parse_liveloading_message(self, message):
"""Parse a message received from the Beam liveloading websocket."""

sections = match("(\d+)(.+)?$", message).groups()
sections = re.match(r"(\d+)(.+)?$", message).groups()

return {
"code": sections[0],
Expand All @@ -318,8 +368,11 @@ def watch_liveloading(self, handler=None):
message = yield self.liveloading_websocket.read_message()

if message is None:
self.logger.info("There was an error connecting.")
raise ConnectionError
self.logger.info("Connection to Liveloading lost.")
self.logger.info("Attempting to reconnect.")

return self.connect_to_liveloading(
**self.liveloading_connection_information)

self.logger.info("Attempting to reconnect.")
self.watch_liveloading()
Expand All @@ -334,12 +387,25 @@ def watch_liveloading(self, handler=None):
if packet["data"][1].get("following"):
self.logger.info("- {} followed.".format(
packet["data"][1]["user"]["username"]))
self.send_message(
"Thanks for the follow, @{}!".format(
packet["data"][1]["user"]["username"]))

user = session.query(User).filter_by(
id=packet["data"][1]["user"]["id"]).first()
if user and (datetime.now() - user.follow_date).days:
self.send_message(
"Thanks for the follow, @{}!".format(
packet["data"][1]["user"]["username"]))
user.follow_date = datetime.now()
session.add(user)
session.commit()
elif packet["data"][1].get("subscribed"):
self.logger.info("- {} subscribed.".format(
packet["data"][1]["user"]["username"]))
self.send_message(
"Thanks for the subscription, @{}! <3".format(
packet["data"][1]["user"]["username"]))
elif packet["data"][1].get("hoster"):
self.logger.info("- {} hosted the channel.".format(
packet["data"][1]["hoster"]["token"]))
self.send_message(
"Thanks for the hosting the channel, @{}!".format(
packet["data"][1]["hoster"]["token"]))

0 comments on commit 6160fc2

Please sign in to comment.