Skip to content

Commit

Permalink
more detailed connection statistics, first char recv bugfix, double c…
Browse files Browse the repository at this point in the history
…onnection bugfix, websocket send queue, loading screen hide bugfix on slow connection, disable user reload
  • Loading branch information
shortcutme committed Feb 25, 2015
1 parent 31d4609 commit e8368a8
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 56 deletions.
79 changes: 46 additions & 33 deletions src/Connection/Connection.py
Expand Up @@ -15,24 +15,35 @@ def __init__(self, server, ip, port, sock=None):
self.port = port
self.peer_id = None # Bittorrent style peer id (not used yet)
self.id = server.last_connection_id
self.protocol = "?"
server.last_connection_id += 1
self.protocol = "?"

self.server = server
self.log = logging.getLogger(str(self))
self.unpacker = msgpack.Unpacker() # Stream incoming socket messages here
self.req_id = 0 # Last request id
self.handshake = None # Handshake info got from peer
self.event_handshake = gevent.event.AsyncResult() # Solves on handshake received
self.event_connected = gevent.event.AsyncResult() # Solves on handshake received
self.closed = False

self.zmq_sock = None # Zeromq sock if outgoing connection
self.zmq_queue = [] # Messages queued to send
self.zmq_working = False # Zmq currently working, just add to queue
self.forward_thread = None # Zmq forwarder thread

# Stats
self.start_time = time.time()
self.last_recv_time = 0
self.last_message_time = 0
self.last_send_time = 0
self.last_sent_time = 0
self.incomplete_buff_recv = 0
self.bytes_recv = 0
self.bytes_sent = 0
self.last_ping_delay = None
self.last_req_time = 0

self.waiting_requests = {} # Waiting sent requests
if not sock: self.connect() # Not an incoming connection, connect to peer


def __str__(self):
Expand All @@ -44,12 +55,13 @@ def __repr__(self):

# Open connection to peer and wait for handshake
def connect(self):
self.log.debug("Connecting...")
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.ip, self.port))
# Detect protocol
self.send({"cmd": "handshake", "req_id": 0, "params": self.handshakeInfo()})
gevent.spawn(self.messageLoop)
return self.event_handshake.get() # Wait for handshake
return self.event_connected.get() # Wait for first char



Expand All @@ -61,7 +73,7 @@ def handleIncomingConnection(self, sock):

self.protocol = "zeromq"
self.log.name = str(self)
self.event_handshake.set(self.protocol)
self.event_connected.set(self.protocol)

if self.server.zmq_running:
zmq_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand All @@ -80,14 +92,19 @@ def handleIncomingConnection(self, sock):
# Message loop for connection
def messageLoop(self, firstchar=None):
sock = self.sock
if not firstchar: firstchar = sock.recv(1)
try:
if not firstchar: firstchar = sock.recv(1)
except Exception, err:
self.log.debug("Socket firstchar error: %s" % Debug.formatException(err))
self.close()
return False
if firstchar == "\xff": # Backward compatibility to zmq
self.sock.close() # Close normal socket
if zmq:
if config.debug_socket: self.log.debug("Connecting as ZeroMQ")
self.protocol = "zeromq"
self.log.name = str(self)
self.event_handshake.set(self.protocol) # Mark handshake as done
self.event_connected.set(self.protocol) # Mark handshake as done

try:
context = zmq.Context()
Expand All @@ -105,39 +122,26 @@ def messageLoop(self, firstchar=None):
else: # Normal socket
self.protocol = "v2"
self.log.name = str(self)
self.event_handshake.set(self.protocol) # Mark handshake as done
self.event_connected.set(self.protocol) # Mark handshake as done

unpacker = self.unpacker
unpacker.feed(firstchar) # Feed the first char we already requested
try:
while True:
buff = sock.recv(16*1024)
if not buff: break # Connection closed
self.last_recv_time = time.time()
self.incomplete_buff_recv += 1
self.bytes_recv += len(buff)
unpacker.feed(buff)
for message in unpacker:
self.incomplete_buff_recv = 0
self.handleMessage(message)
except Exception, err:
self.log.debug("Socket error: %s" % Debug.formatException(err))
self.close() # MessageLoop ended, close connection


# Read one line (not used)
def recvLine(self):
sock = self.sock
data = sock.recv(16*1024)
if not data: return
if not data.endswith("\n"): # Multipart, read until \n
buff = StringIO()
buff.write(data)
while not data.endswith("\n"):
data = sock.recv(16*1024)
if not data: break
buff.write(data)
return buff.getvalue().strip("\n")

return data.strip("\n")


# My handshake info
def handshakeInfo(self):
return {
Expand All @@ -150,12 +154,15 @@ def handshakeInfo(self):

# Handle incoming message
def handleMessage(self, message):
self.last_message_time = time.time()
if message.get("cmd") == "response": # New style response
if message["to"] in self.waiting_requests:
self.waiting_requests[message["to"]].set(message) # Set the response to event
del self.waiting_requests[message["to"]]
elif message["to"] == 0: # Other peers handshake
if config.debug_socket: self.log.debug("Got handshake response: %s" % message)
ping = time.time()-self.start_time
if config.debug_socket: self.log.debug("Got handshake response: %s, ping: %s" % (message, ping))
self.last_ping_delay = ping
self.handshake = message
self.port = message["fileserver_port"] # Set peer fileserver port
else:
Expand All @@ -180,29 +187,35 @@ def handleMessage(self, message):


# Send data to connection
def send(self, data):
if config.debug_socket: self.log.debug("Send: %s" % data.get("cmd"))
def send(self, message):
if config.debug_socket: self.log.debug("Send: %s, to: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("req_id")))
self.last_send_time = time.time()
if self.protocol == "zeromq":
if self.zmq_sock: # Outgoing connection
self.zmq_queue.append(data)
self.zmq_queue.append(message)
if self.zmq_working:
self.log.debug("ZeroMQ already working...")
return
while self.zmq_queue:
self.zmq_working = True
data = self.zmq_queue.pop(0)
self.zmq_sock.send(msgpack.packb(data))
message = self.zmq_queue.pop(0)
self.zmq_sock.send(msgpack.packb(message))
self.handleMessage(msgpack.unpackb(self.zmq_sock.recv()))
self.zmq_working = False

else: # Incoming request
self.server.zmq_sock.send(msgpack.packb(data))
self.server.zmq_sock.send(msgpack.packb(message))
else: # Normal connection
self.sock.sendall(msgpack.packb(data))
data = msgpack.packb(message)
self.bytes_sent += len(data)
self.sock.sendall(data)
self.last_sent_time = time.time()
if config.debug_socket: self.log.debug("Sent: %s, to: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("req_id")))


# Create and send a request to peer
def request(self, cmd, params={}):
self.last_req_time = time.time()
self.req_id += 1
data = {"cmd": cmd, "req_id": self.req_id, "params": params}
event = gevent.event.AsyncResult() # Create new event for response
Expand Down
21 changes: 14 additions & 7 deletions src/Connection/ConnectionServer.py
Expand Up @@ -13,7 +13,7 @@ def __init__(self, ip=None, port=None, request_handler=None):
self.ip = ip
self.port = port
self.last_connection_id = 1 # Connection id incrementer
self.log = logging.getLogger(__name__)
self.log = logging.getLogger("ConnServer")

self.connections = [] # Connections
self.ips = {} # Connection by ip
Expand Down Expand Up @@ -57,18 +57,25 @@ def handleIncomingConnection(self, sock, addr):



def connect(self, ip=None, port=None, peer_id=None):
def getConnection(self, ip=None, port=None, peer_id=None):
if peer_id and peer_id in self.peer_ids: # Find connection by peer id
return self.peer_ids.get(peer_id)
connection = self.peer_ids.get(peer_id)
connection.event_connected.get() # Wait for connection
return connection
if ip in self.ips: # Find connection by ip
return self.ips[ip]
connection = self.ips[ip]
connection.event_connected.get() # Wait for connection
return connection

# No connection found yet
try:
connection = Connection(self, ip, port)
self.ips[ip] = connection
self.connections.append(connection)
connection.connect()
except Exception, err:
self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err)))
connection.close()
raise err
return connection

Expand All @@ -77,10 +84,10 @@ def connect(self, ip=None, port=None, peer_id=None):
def removeConnection(self, connection):
if self.ips.get(connection.ip) == connection: # Delete if same as in registry
del self.ips[connection.ip]
if connection in self.connections:
self.connections.remove(connection)
if connection.peer_id and self.peer_ids.get(connection.peer_id) == connection: # Delete if same as in registry
del self.peer_ids[connection.peer_id]
if connection in self.connections:
self.connections.remove(connection)


def zmqServer(self):
Expand Down Expand Up @@ -204,7 +211,7 @@ def testZmqSlowClient(num):

def testConnection():
global server
connection = server.connect("127.0.0.1", 1234)
connection = server.getConnection("127.0.0.1", 1234)
connection.send({"res": "Sending: Hello!"})
print connection

Expand Down
4 changes: 2 additions & 2 deletions src/File/FileServer.py
Expand Up @@ -22,9 +22,9 @@ def __init__(self):
# Handle request to fileserver
def handleRequest(self, connection, message):
if "params" in message:
self.log.debug("FileRequest: %s %s %s" % (message["cmd"], message["params"].get("site"), message["params"].get("inner_path")))
self.log.debug("FileRequest: %s %s %s %s" % (str(connection), message["cmd"], message["params"].get("site"), message["params"].get("inner_path")))
else:
self.log.debug("FileRequest: %s" % req["cmd"])
self.log.debug("FileRequest: %s %s" % (str(connection), req["cmd"]))
req = FileRequest(self, connection)
req.route(message["cmd"], message.get("req_id"), message.get("params"))

Expand Down
14 changes: 9 additions & 5 deletions src/Peer/Peer.py
Expand Up @@ -28,15 +28,18 @@ def __init__(self, ip, port, site=None):

# Connect to host
def connect(self):
if self.connection: self.connection.close()
if not self.log: self.log = logging.getLogger("Peer:%s:%s %s" % (self.ip, self.port, self.site.address_short))
if self.connection:
self.log.debug("Getting connection (Closing %s)..." % self.connection)
self.connection.close()
else:
self.log.debug("Getting connection...")
self.connection = None
if not self.log: self.log = logging.getLogger("Peer:%s:%s" % (self.ip, self.port))

self.log.debug("Connecting...")
try:
self.connection = self.connection_server.connect(self.ip, self.port)
self.connection = self.connection_server.getConnection(self.ip, self.port)
except Exception, err:
self.log.debug("Connecting error: %s" % Debug.formatException(err))
self.log.debug("Getting connection error: %s" % Debug.formatException(err))
self.onConnectionError()

def __str__(self):
Expand Down Expand Up @@ -118,6 +121,7 @@ def ping(self):
break # All fine, exit from for loop
# Timeout reached or bad response
self.onConnectionError()
self.connect()
time.sleep(1)

if response_time:
Expand Down
3 changes: 3 additions & 0 deletions src/Site/Site.py
Expand Up @@ -416,6 +416,9 @@ def verifyFiles(self, quick_check=False): # Fast = using file size
self.needFile("content.json", update=True) # Force update to fix corrupt file
self.content_manager.loadContent() # Reload content.json
for content_inner_path, content in self.content_manager.contents.items():
if not os.path.isfile(self.getPath(content_inner_path)): # Missing content.json file
self.log.error("[MISSING] %s" % content_inner_path)
bad_files.append(content_inner_path)
for file_relative_path in content["files"].keys():
file_inner_path = self.content_manager.toDir(content_inner_path)+file_relative_path # Relative to content.json
file_inner_path = file_inner_path.strip("/") # Strip leading /
Expand Down
38 changes: 35 additions & 3 deletions src/Ui/UiRequest.py
Expand Up @@ -276,22 +276,54 @@ def actionConsole(self):
raise Exception("Here is your console")


def formatTableRow(self, row):
back = []
for format, val in row:
if val == None:
formatted = "n/a"
elif format == "since":
if val:
formatted = "%.0f" % (time.time()-val)
else:
formatted = "n/a"
else:
formatted = format % val
back.append("<td>%s</td>" % formatted)
return "<tr>%s</tr>" % "".join(back)

def actionStats(self):
import gc, sys
from greenlet import greenlet
greenlets = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
self.sendHeader()
main = sys.modules["src.main"]
yield """
<style>
* { font-family: monospace }
table * { text-align: right; padding: 0px 10px }
</style>
"""

yield "<pre>"
yield "Connections (%s):<br>" % len(main.file_server.connections)
yield "<table><tr> <th>id</th> <th>protocol</th> <th>ip</th> <th>zmqs</th> <th>ping</th> <th>buff</th> <th>idle</th> <th>delay</th> <th>sent</th> <th>received</th> </tr>"
for connection in main.file_server.connections:
yield "%s: %s %s<br>" % (connection.protocol, connection.ip, connection.zmq_sock)
yield self.formatTableRow([
("%3d", connection.id),
("%s", connection.protocol),
("%s", connection.ip),
("%s", bool(connection.zmq_sock)),
("%6.3f", connection.last_ping_delay),
("%s", connection.incomplete_buff_recv),
("since", max(connection.last_send_time, connection.last_recv_time)),
("%.3f", connection.last_sent_time-connection.last_send_time),
("%.0fkB", connection.bytes_sent/1024),
("%.0fkB", connection.bytes_recv/1024)
])
yield "</table>"

yield "Greenlets (%s):<br>" % len(greenlets)
for thread in greenlets:
yield " - %s<br>" % cgi.escape(repr(thread))
yield "</pre>"


# - Tests -
Expand Down
16 changes: 12 additions & 4 deletions src/Ui/UiWebsocket.py
Expand Up @@ -15,6 +15,8 @@ def __init__(self, ws, site, server, user):
self.next_message_id = 1
self.waiting_cb = {} # Waiting for callback. Key: message_id, Value: function pointer
self.channels = [] # Channels joined to
self.sending = False # Currently sending to client
self.send_queue = [] # Messages to send to client


# Start listener loop
Expand Down Expand Up @@ -69,10 +71,16 @@ def cmd(self, cmd, params={}, cb = None):
def send(self, message, cb = None):
message["id"] = self.next_message_id # Add message id to allow response
self.next_message_id += 1
if cb: # Callback after client responsed
self.waiting_cb[message["id"]] = cb
if self.sending: return # Already sending
self.send_queue.append(message)
try:
self.ws.send(json.dumps(message))
if cb: # Callback after client responsed
self.waiting_cb[message["id"]] = cb
while self.send_queue:
self.sending = True
message = self.send_queue.pop(0)
self.ws.send(json.dumps(message))
self.sending = False
except Exception, err:
self.log.debug("Websocket send error: %s" % Debug.formatException(err))

Expand Down Expand Up @@ -177,7 +185,7 @@ def formatSiteInfo(self, site):
"next_size_limit": site.getNextSizeLimit(),
"last_downloads": len(site.last_downloads),
"peers": site.settings.get("peers", len(site.peers)),
"tasks": len([task["inner_path"] for task in site.worker_manager.tasks]),
"tasks": len(site.worker_manager.tasks),
"content": content
}
if site.settings["serving"] and content: ret["peers"] += 1 # Add myself if serving
Expand Down

0 comments on commit e8368a8

Please sign in to comment.