Skip to content

Commit

Permalink
Merge pull request #50 from CactusDev/rel-v0.3.6
Browse files Browse the repository at this point in the history
Rel v0.3.6
  • Loading branch information
Innectic committed Oct 1, 2016
2 parents fcd1ef1 + 85b686a commit 69db9d2
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 107 deletions.
177 changes: 74 additions & 103 deletions beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from tornado.websocket import websocket_connect
from tornado.gen import coroutine
from tornado.ioloop import PeriodicCallback

from requests import Session
from requests.compat import urljoin
Expand All @@ -18,7 +17,6 @@
import time

from models import User, session
from datetime import datetime


class Beam:
Expand Down Expand Up @@ -294,134 +292,107 @@ def read_chat(self, handler=None):
if callable(handler):
handler(response)

def connect_to_liveloading(self, channel_id, user_id):
def connect_to_constellation(self, channel_id, user_id):
"""Connect to Beam liveloading."""

self.liveloading_connection_information = {
self.constellation_connection_information = {
"channel_id": channel_id,
"user_id": user_id
}

liveloading_websocket_connection = websocket_connect(
"wss://realtime.beam.pro/socket.io/?EIO=3&transport=websocket")
liveloading_websocket_connection.add_done_callback(
constellation_websocket_connection = websocket_connect(
"wss://constellation.beam.pro")
constellation_websocket_connection.add_done_callback(
partial(self.subscribe_to_liveloading, channel_id, user_id))

def subscribe_to_liveloading(self, channel_id, user_id, future):
"""Subscribe to Beam liveloading."""
"""Subscribe to Beam constellation."""

if future.exception() is None:
self.liveloading_websocket = future.result()
self.constellation_websocket = future.result()

self.logger.info(
"Successfully connected to liveloading websocket.")

interfaces = (
"channel:{channel_id}:update",
"channel:{channel_id}:followed",
"channel:{channel_id}:subscribed",
"channel:{channel_id}:resubscribed",
"channel:{channel_id}:hosted",
"user:{user_id}:update"
)
self.subscribe_to_interfaces(
*tuple(
interface.format(channel_id=channel_id, user_id=user_id)
for interface in interfaces
)
)
"Successfully connected to constellation websocket.")

interfaces = [
"channel:{channel}:update".format(channel=channel_id),
"channel:{channel}:followed".format(channel=channel_id),
"channel:{channel}:subscribed".format(channel=channel_id),
"channel:{channel}:resubscribed".format(channel=channel_id),
"channel:{channel}:hosted".format(channel=channel_id),
"user:{user}:update".format(user=user_id)
]
self.subscribe_to_interfaces(interfaces)

self.logger.info(
"Successfully subscribed to liveloading interfaces.")
"Successfully subscribed to Constellation interfaces.")

self.watch_liveloading()
self.watch_constellation()
else:
self.logger.warning(future.exception())
self.connect_to_liveloading(channel_id, user_id)

def subscribe_to_interfaces(self, *interfaces):
"""Subscribe to a Beam liveloading interface."""

packet = [
"put",
{
"method": "put",
"headers": {},
"data": {
"slug": interfaces
},
"url": "/api/v1/live"
}
]
self.liveloading_websocket.write_message('420' + dumps(packet))

def parse_liveloading_message(self, message):
"""Parse a message received from the Beam liveloading websocket."""
self.connect_to_constellation(channel_id, user_id)

sections = re.match(r"(\d+)(.+)?$", message).groups()
def subscribe_to_interfaces(self, interfaces: list):
"""Subscribe to a Beam constellation interface."""

return {
"code": sections[0],
"data": loads(sections[1]) if sections[1] is not None else None
packet = {
"type": "method",
"method": "livesubscribe",
"params": {
"events": interfaces
},
"id": 1
}
self.constellation_websocket.write_message(dumps(packet))

def parse_constellation_message(self, packet):
try:
packet = loads(packet)
except:
return ""
else:
if "data" in packet and "payload" in packet["data"]:
return packet["data"]
else:
return ""

@coroutine
def watch_liveloading(self, handler=None):
def watch_constellation(self):
"""Watch and handle packets from the Beam liveloading websocket."""

response = yield self.liveloading_websocket.read_message()
response = yield self.constellation_websocket.read_message()
if response is None:
raise ConnectionError

packet = self.parse_liveloading_message(response)

PeriodicCallback(
partial(self.liveloading_websocket.write_message, '2'),
packet["data"]["pingInterval"]
).start()

while True:
message = yield self.liveloading_websocket.read_message()

if message is None:
self.logger.info("Connection to Liveloading lost.")
self.logger.info("Attempting to reconnect.")

return self.connect_to_liveloading(
**self.liveloading_connection_information)

self.logger.info("Attempting to reconnect.")
self.watch_liveloading()

packet = self.parse_liveloading_message(message)

if packet.get("data") is not None:
self.logger.debug("LIVE: {}".format(packet))

if isinstance(packet["data"], list):
if isinstance(packet["data"][0], str):
if packet["data"][1].get("following"):
self.logger.info("- {} followed.".format(
packet["data"][1]["user"]["username"]))

user = session.query(User).filter_by(
id=packet["data"][1]["user"]["id"]).first()
if user and (datetime.now() - user.follow_date).days:
self.send_message(
"Thanks for the follow, @{}!".format(
packet["data"][1]["user"]["username"]))
user.follow_date = datetime.now()
session.add(user)
session.commit()
elif packet["data"][1].get("subscribed"):
self.logger.info("- {} subscribed.".format(
packet["data"][1]["user"]["username"]))
self.send_message(
"Thanks for the subscription, @{}! <3".format(
packet["data"][1]["user"]["username"]))
elif packet["data"][1].get("hoster"):
self.logger.info("- {} hosted the channel.".format(
packet["data"][1]["hoster"]["token"]))
message = yield self.constellation_websocket.read_message()
message = self.parse_constellation_message(message)
if message is None or message is "":
pass
else:
self.logger.debug("LIVE: {}".format(message))
if "followed" in message["channel"]:
if message["payload"]["following"]:
self.send_message(
"Thanks for hosting the channel, @{}!".format(
packet["data"][1]["hoster"]["token"]))
"Thanks for the follow, @{} !".format(
message["payload"]["user"]["username"]))
self.logger.info("- {} followed.".format(
message["payload"]["user"]["username"]))
elif "subscribed" in message["channel"]:
self.send_message("Thanks for subscribing, @{} !".format(
message["payload"]["user"]["username"]
))
elif "resubscribed" in message["channel"]:
self.send_message("Thanks for subscribing, @{} !".format(
message["payload"]["user"]["username"]
))

# if message is None:
# self.logger.info("Connection to Constellation lost.")
# self.logger.info("Attempting to reconnect.")

# return self.connect_to_constellation(
# **self.constellation_connection_information)

# self.logger.info("Attempting to reconnect.")
# self.watch_constellation()
2 changes: 1 addition & 1 deletion cactus.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def run(self, *args, **kwargs):

def connect_liveloading():
try:
self.connect_to_liveloading(
self.connect_to_constellation(
self.channel_data["id"],
self.channel_data["userId"])
except ConnectionError as e:
Expand Down
9 changes: 6 additions & 3 deletions models.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,14 @@ def __call__(self, args, data):
if len(args) < 2:
return "Not enough arguments."
elif len(args) == 2:
user = re.match(r'@?([\w_-]*[a-z][\w_-]*', args[1])
user = re.match(r'@?([\w_-]*[a-z][\w_-])*', args[1], re.I)
user = str(user.group()).replace("@", "")

# user = user.group().replace("@", "")
if user is None:
return "Invalid username '{}'.".format(args[1])

channel_id = self.get_channel(user.group())
channel_id = self.get_channel(user)

if channel_id.get("statusCode") == 404:
return "User has not entered this channel."
Expand All @@ -506,7 +509,7 @@ def __call__(self, args, data):
query.friend = not query.friend
session.commit()
return "{}ed @{} as a friend.".format(
["Remov", "Add"][query.friend], user.group())
["Remov", "Add"][query.friend], user)
else:
return "User has not entered this channel."
elif len(args) > 2:
Expand Down

0 comments on commit 69db9d2

Please sign in to comment.