Skip to content

Commit

Permalink
[#6] - Callback functions in cloud_comm
Browse files Browse the repository at this point in the history
Allow the user to register a callback function to be executed when a
full message is received (over UDP)

I've also fixed a couple failing unit tests.
  • Loading branch information
ZacBlanco committed Jan 24, 2017
1 parent 8ba0b24 commit a56453d
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 10 deletions.
38 changes: 33 additions & 5 deletions cloud_ksvd/cloud_comm.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import socket
import threading
import time
import inspect
import json
import logging
import math
import socket
import struct
import logging
import consensus
import threading
import time
import inspect
from types import *
from threading import Lock

import consensus

logging.basicConfig(filename='cloud_comm.log')

TAG_SIZE = 4
Expand Down Expand Up @@ -181,6 +185,28 @@ def __init__(self, protocol, listen_port, send_port=None):
self.tmp_data = {}
self.data_store = {}
self.mtu = get_mtu()
self.recv_callback = None

def register_recv_callback(self, callback):
'''Allows one to register a callback function which is executed whenever a
full message is received and decoded.
The callback must have the signature of ``(str, bytes, bytes)`` where ``str`` is the sender address.
The first ``bytes`` is the tag. The second ``bytes`` is the data.
Args:
callback (func): A function with arguments of (sender, tag, data)
Returns:
N/A
'''
if type(callback) != FunctionType:
raise TypeError("Callback must be a function")
fullspec = inspect.getfullargspec(callback)
n_args = len(fullspec[0])
if n_args != 3:
raise ValueError("Callback function did not have 3 arguments.")
self.recv_callback = callback

def close(self):
'''Closes both listening sockets and the sending sockets
Expand Down Expand Up @@ -493,5 +519,7 @@ def receive(self, data, addr):
lock.acquire()
self.data_store[addr][data_tag] = reassembled
lock.release()
if self.recv_callback != None:
self.recv_callback(addr, data_tag, reassembled) # run a callback on the newly collected packets.
self.tmp_data[addr][data_tag]['packets'] = {}
self.tmp_data[addr][data_tag]['seq_total'] = {}
19 changes: 19 additions & 0 deletions cloud_ksvd/tests/test_cloud_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,25 @@ def test_multi_get(self):
s2 = c1.get('local', 'tg11'.encode('utf-8'))
self.assertNotEqual(s2, s, "Should not be able to retrieve the same data again")
c1.close()

def test_register_callback(self):
def cbk(a, b, c):
return "callback"
def bck(a, b):
return "bad"
c1 = Communicator('udp', 9071)
c1.register_recv_callback(cbk)

# Should raise error on non-function
with self.assertRaises(TypeError):
c1.register_recv_callback("a")

# Should raise error on bad function signature
with self.assertRaises(ValueError):
c1.register_recv_callback(bck)






Expand Down
6 changes: 1 addition & 5 deletions cloud_ksvd/tests/test_node_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,8 @@ def test_kickoff(self, mock2, mock1, mock3, mock4, mock5):
consensus.run = MagicMock()
task = n.TASK_RUNNING
n.kickoff(task, 20)
self.assertEqual(mock1.call_count, 5)
mock1.assert_any_call('http://192.168.2.180:9090/start/consensus?tc=20')
mock1.assert_any_call('http://192.168.2.181:9090/start/consensus?tc=20')
mock1.assert_any_call('http://192.168.2.182:9090/start/consensus?tc=20')
self.assertEqual(mock1.call_count, 1)
mock1.assert_any_call('http://192.168.2.183:9090/start/consensus?tc=20')
mock1.assert_any_call('http://192.168.2.184:9090/start/consensus?tc=20')


def test_load_data(self):
Expand Down

0 comments on commit a56453d

Please sign in to comment.