Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Craggs committed Jan 7, 2014
0 parents commit 743aa60
Show file tree
Hide file tree
Showing 21 changed files with 2,914 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*.pyc
__pycache__
*~
298 changes: 298 additions & 0 deletions interoperability/MQTTV311_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
"""
"""

import mbt, socket, time, _thread, sys, traceback, pdb, select, random, mqtt

clientlist = {}

test = None

class Clients:

def __init__(self):
self.msgid = 1
self.running = False
self.packets = []

def getNextMsgid(self):
def getWrappedMsgid():
id = self.msgid + 1
if id == 65535:
id = 1
return id
self.msgid = getWrappedMsgid()
return self.msgid

def __call__(self, sock):
mbt.log("*** running")
clientlist[sock] = self
self.running = True
try:
while True:
packet = MQTTV3.unpackPacket(MQTTV3.getPacket(sock))
if packet == None:
break
if test:
print("received result", packet)
test.addResult(packet)
else:
mbt.observe(packet)
if packet.fh.MessageType == MQTTV3.PUBREC:
mbt.execution.pools["pubrecs"].append(mbt.Choices((sock, packet)))
elif packet.fh.MessageType == MQTTV3.PUBLISH and packet.fh.QoS in [1, 2]:
mbt.execution.pools["publishes"].append(mbt.Choices((sock, packet)))
elif packet.fh.MessageType == MQTTV3.PUBREL:
mbt.execution.pools["pubrels"].append(mbt.Choices((sock, packet)))
elif packet.fh.MessageType == MQTTV3.CONNACK:
self.packets.append(packet)
except:
if sys.exc_info()[0] != socket.error:
print("unexpected exception", sys.exc_info())
mbt.log(traceback.format_exc())
self.running = False
del clientlist[sock]
mbt.log("*** stopping "+str(packet))

client = Clients()


@mbt.action
def socket_create(hostname : "hostnames", port : "ports") -> "socket":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((hostname, port))
id = _thread.start_new_thread(client, (sock,))
return sock


"""
After socket_close, the socket object is not valid any more, so we need to indicate that it should be
thrown away.
"""
@mbt.action
def socket_close(sock : "socket"):
sock.shutdown(socket.SHUT_RDWR)
sock.close()

mbt.finishedWith(socket_close, "sock")


"""
protocol name valid, invalid
protocol version valid, invalid
clientID lengths 0, 1, 22, 23; characters?
cleansession true, false
will: topic, message, qos, retained
keepAlive 0, 60,
username None,
password None
"""
@mbt.action
def connect(sock : "socket", clientid : "clientids", cleansession : "boolean", #willmsg : "willmsgs",
# username : "usernames", password : "passwords"
) -> "connackrc":
connect = MQTTV3.Connects()
connect.ClientIdentifier = clientid
connect.CleanStart = cleansession
connect.KeepAliveTimer = 60
#if username:
# self.usernameFlag = True
# self.username = username
#if password:
# self.passwordFlag = True
# self.password = password
sock.send(connect.pack())
time.sleep(0.1)
response = clientlist[sock].packets.pop(0) #MQTTV3.unpackPacket(MQTTV3.getPacket(sock))
print("+++connect response", response)
if response == None or response.returnCode not in [0, 2]:
raise Exception("Return code "+str(response.returnCode)+" in connack")

#id = _thread.start_new_thread(client, (sock,))
return response.returnCode

def checksocket(sock):
time.sleep(0.1)
if sock not in clientlist.keys():
raise Exception("Must have been socket error")

@mbt.action
def disconnect(sock : "socket"):
disconnect = MQTTV3.Disconnects()
sock.send(disconnect.pack())
checksocket(sock)
#time.sleep(0.2)


@mbt.action
def subscribe(sock : "socket", topics : "topicLists", qoss : "qosLists"):
subscribe = MQTTV3.Subscribes()
subscribe.messageIdentifier = client.getNextMsgid()
count = 0
for t in topics:
subscribe.data.append((t, qoss[count]))
count += 1
sock.send(subscribe.pack())
checksocket(sock)
return subscribe.messageIdentifier


@mbt.action
def unsubscribe(sock : "socket", topics : "topicLists"):
unsubscribe = MQTTV3.Unsubscribes()
unsubscribe.messageIdentifier = client.getNextMsgid()
unsubscribe.data = topics
sock.send(unsubscribe.pack())
checksocket(sock)
return unsubscribe.messageIdentifier


@mbt.action
def publish(sock : "socket", topic : "topics", payload : "payloads", qos : "QoSs", retained : "boolean"):
publish = MQTTV3.Publishes()
publish.fh.QoS = qos
publish.fh.RETAIN = retained
if qos == 0:
publish.messageIdentifier = 0
else:
publish.messageIdentifier = client.getNextMsgid()
publish.topicName = topic
publish.data = payload
sock.send(publish.pack())
checksocket(sock)
return publish.messageIdentifier


@mbt.action
def pubrel(pubrec : "pubrecs"): # pubrecs are observable events
sock, pubrec = pubrec
pubrel = MQTTV3.Pubrels()
pubrel.messageIdentifier = pubrec.messageIdentifier
sock.send(pubrel.pack())

mbt.finishedWith(pubrel, "pubrec")

@mbt.action
def puback(publish : "publishes"):
sock, publish = publish
if publish.fh.QoS == 1:
puback = MQTTV3.Pubacks()
puback.messageIdentifier = publish.messageIdentifier
sock.send(puback.pack())
elif publish.fh.QoS == 2:
pubrec = MQTTV3.Pubrecs()
pubrec.messageIdentifier = publish.messageIdentifier
sock.send(pubrec.pack())

mbt.finishedWith(puback, "publish")

@mbt.action
def pubcomp(pubrel : "pubrels"):
sock, pubrel = pubrel
pubcomp = MQTTV3.Pubcomps()
pubcomp.messageIdentifier = pubrel.messageIdentifier
sock.send(pubcomp.pack())

mbt.finishedWith(pubcomp, "pubrel")

def pingreq():
pingreq = MQTTV3.Pingreqs()
sock.send(pingreq.pack())


#print(mbt.model.getActionNames())

"""
choice lists should be ordered but unique - ordered sets
options:
sequenced - add sequence number
frequency of choices (somehow)
"""

mbt.choices("boolean", (True, False))

mbt.choices("hostnames", ("localhost",))
mbt.choices("ports", (1883,))
mbt.choices("clientids", ("", "normal", "23 characters4567890123", "A clientid that is too long - should fail"))

topics = ("TopicA", "TopicA/B", "Topic/C", "TopicA/C", "/TopicA")
wildTopics = ("TopicA/+", "+/C", "#", "/#", "/+", "+/+")

mbt.choices("topics", topics)
mbt.choices("QoSs", (0, 1, 2))

mbt.choices("topicLists", [(t,) for t in topics + wildTopics])
mbt.choices("qosLists", [(0,), (1,), (2,)])


mbt.choices("payloads", ("", "1", "333", "long"*512), sequenced=True)

mbt.choices("connackrc", (0, 2), output=True)

mbt.model.addReturnType("pubrecs")
mbt.model.addReturnType("pubrels")
mbt.model.addReturnType("publishes")

mbt.model.maxobjects["socket"] = 1

last_free_names = set()
after_socket_create = set()

def select(frees):
global last_free_names, after_socket_create
free_names = set([f[0].getName() for f in frees])
print("*** after_socket_create", after_socket_create, last_free_names)
if last_free_names == set(['socket_create']):
diff = set(free_names).difference(after_socket_create)
print("*** diff", diff)
if diff == set():
frees = [f for f in frees if f[0].getName() == "connect"]
else:
curname = random.choice(list(diff))
frees = [f for f in frees if f[0].getName() == curname]
after_socket_create.add(curname)
else:
for f in frees:
if f[0].getName() in ["pubrel", "puback", "pubcomp"]:
frees = [f]
break
last_free_names = free_names
return frees

mbt.model.selectCallback = select


"""
stepping = False
if len(sys.argv) > 1:
stepping = True
#mbt.run(stepping=stepping)
def socket_check(a, b):
# <socket.socket object, fd=3, family=2, type=1, proto=0>
awords = str(a).split()
del awords[2]
astr = ''.join(awords)
bwords = str(b).split()
del bwords[2]
bstr = ''.join(bwords)
print("checking sockets", astr, "and", bstr)
return astr == bstr
def exception_check(a, b):
return True
checks = {"socket": socket_check, "exception": exception_check}
test = mbt.Tests(mbt.model, "spec.log", checks)
test.run(stepping=False)
"""

19 changes: 19 additions & 0 deletions interoperability/client_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import mqtt.client, time

if __name__ == "__main__":

callback = mqtt.client.Callback()

aclient = mqtt.client.Client("myclientid")
aclient.registerCallback(callback)

aclient.connect(port=1883)
aclient.disconnect()

aclient.connect(port=1883)
aclient.subscribe(["k"], [2])
aclient.publish("k", b"qos 0")
aclient.publish("k", b"qos 1", 1)
aclient.publish("k", b"qos 2", 2)
time.sleep(1.0)
aclient.disconnect()
49 changes: 49 additions & 0 deletions interoperability/mbt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
values of parameters to controllable actions can come from several sources:
1) created as a result of other actions, either controllable or observable
(keep a pool of results obtained)
2) specific sets of values enumerated in the model
"""

__version__ = 0.1

from mbt.main import Models, Executions, Choices, Tests, log

model = Models()
execution = None

def action(fn):
model.addAction(fn)
return fn

def choices(varname, values, sequenced=False, output=False):
model.addChoice(varname, values, output)

def finishedWith(fn, parm_name):
model.finishedWith(fn, parm_name)

def observe(event):
execution.addObservation(event)

def run(stepping=True):
global execution
execution = Executions(model)
execution.run(stepping)

def step(interactive=False):
global execution
if not execution:
execution = Executions(model)
return execution.step(interactive)






Loading

0 comments on commit 743aa60

Please sign in to comment.