# Implementing the MQTTSN protocol with Scapy.
Previously, we've shown how to use an off-the-shelf client (Paho) to publish MQTT events.
Using Scapy, we can also implement the protocol manually by constructing the correct packets.

In [1]:
from scapy.all import *
exec(open("MQTTSN.py").read())
# In this case we are sending packets to the local host.
# Due to limitations of the Loopback device, we need to force Scapy to send raw packets
conf.L3socket=L3RawSocket

serverIP = "192.168.3.99"
serverPort = 1884



The first step in communicating with the broker is to establish a connection.  The response to this packet should be an "MQTTSN_CONNACK" packet.  We don't have to populate every field of the packet, since Scapy fills in appropriate defaults.  If no broker is operating at the given IP address, an ICMP error packet may be returned instead.

In [2]:
# Establish a connection
sr1(IP(dst=serverIP)/UDP(sport=50000,dport=serverPort)/MQTTSN()/MQTTSN_CONNECT(client="test"))

Begin emission:
Finished to send 1 packets.

Received 23 packets, got 1 answers, remaining 0 packets


<IP  version=4 ihl=5 tos=0x0 len=31 id=23946 flags=DF frag=0 ttl=64 proto=udp chksum=0x2427 src=192.168.3.99 dst=172.19.72.254 options=[] |<UDP  sport=1884 dport=50000 len=11 chksum=0xb939 |<MQTTSN  len=3 type=CONNACK |<MQTTSN_CONNACK  returnCode=0 |>>>>
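
The `len=3 type=CONNACK` fields in the reply reflect the MQTT-SN wire format: a length byte that counts itself, a message type byte, then the message body. As a rough illustration of what Scapy assembles for us, here is a minimal sketch that builds a CONNECT message and parses a CONNACK by hand. The type codes and field order follow the MQTT-SN v1.2 specification; the helper names `build_connect` and `parse_connack` and the default `duration` are our own assumptions, and the defaults in `MQTTSN.py` may differ.

```python
import struct

# Message type codes from the MQTT-SN v1.2 specification.
MSG_CONNECT = 0x04
MSG_CONNACK = 0x05

def build_connect(client_id, duration=30, clean_session=True):
    """Build the raw bytes of a CONNECT message (hypothetical helper)."""
    flags = 0x04 if clean_session else 0x00  # CleanSession is bit 2 of the flags byte
    protocol_id = 0x01                       # the only value defined by v1.2
    body = struct.pack("!BBBH", MSG_CONNECT, flags, protocol_id, duration)
    body += client_id.encode("ascii")
    length = 1 + len(body)                   # the length byte counts itself
    if length > 255:
        raise ValueError("message too long for the one-byte length form")
    return bytes([length]) + body

def parse_connack(data):
    """Return the return code of a CONNACK message (hypothetical helper)."""
    length, msg_type, return_code = struct.unpack("!BBB", data[:3])
    if length != 3 or msg_type != MSG_CONNACK:
        raise ValueError("not a CONNACK message")
    return return_code

packet = build_connect("test")
print(packet.hex())
print(parse_connack(bytes([0x03, 0x05, 0x00])))
```

A three-byte CONNACK plus the eight-byte UDP header is consistent with the `len=11` reported in the UDP layer of the reply.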

The next step is to register a topic.  The broker should remember that we opened a connection from this source port, so we reuse the same port.  This time we add some error handling to check the response packet.  The expected response is an "MQTTSN_REGACK" packet, which contains a numeric topicID.  MQTTSN often uses numeric topicIDs instead of strings to reduce the size of transmitted packets, so we need to store this topicID for later use.

In [3]:
# Register a new topic.  Note the returned topicID
ack = sr1(IP(dst=serverIP)/UDP(sport=50000,dport=serverPort)/MQTTSN()/MQTTSN_REGISTER(topic="foobar"))
topicID = None  # stays None unless the broker answers with a REGACK
if isinstance(ack[IP].payload, ICMP):
    print("Error response:")
    ack[IP].payload.show()
elif not isinstance(ack[MQTTSN].payload, MQTTSN_REGACK):
    print("Unknown response:")
    ack[MQTTSN].payload.show()
else:
    topicID = ack[MQTTSN_REGACK].topicID
print("topicID = " + str(topicID))

Begin emission:
Finished to send 1 packets.

Received 18 packets, got 1 answers, remaining 0 packets
topicID = 1
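
For reference, the REGACK that carries the topicID is a fixed seven-byte message. The sketch below decodes one by hand, assuming the layout and return codes given in the MQTT-SN v1.2 specification; `parse_regack` and the sample bytes are illustrative only and are not part of `MQTTSN.py`.

```python
import struct

MSG_REGACK = 0x0B  # message type code from the MQTT-SN v1.2 specification

# Return codes defined by the specification.
RETURN_CODES = {
    0x00: "accepted",
    0x01: "rejected: congestion",
    0x02: "rejected: invalid topic ID",
    0x03: "rejected: not supported",
}

def parse_regack(data):
    """Decode a REGACK into (topic_id, msg_id, return_code). Hypothetical helper."""
    if len(data) < 7:
        raise ValueError("REGACK must be 7 bytes long")
    # Length, MsgType, TopicId (2 bytes), MsgId (2 bytes), ReturnCode
    length, msg_type, topic_id, msg_id, return_code = struct.unpack("!BBHHB", data[:7])
    if length != 7 or msg_type != MSG_REGACK:
        raise ValueError("not a REGACK message")
    return topic_id, msg_id, return_code

# Hand-written sample bytes: topic ID 1, message ID 0, accepted.
topic_id, msg_id, code = parse_regack(bytes([0x07, 0x0B, 0x00, 0x01, 0x00, 0x00, 0x00]))
print(topic_id, msg_id, RETURN_CODES[code])
```

Checking the return code as well as the packet type would catch the case where the broker answers with a REGACK but rejects the registration.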


Now that we have a topic ID, we can publish messages and subscribe to topics.

In [4]:
# Publish on the topic, qos=0 implies no response expected.
send(IP(dst=serverIP)/UDP(sport=50000,dport=serverPort)/MQTTSN()/MQTTSN_PUBLISH(topicID=topicID, message="foo"))


Sent 1 packets.


In [5]:
# Subscribe to a topic.  Note the returned topicID
sr1(IP(dst=serverIP)/UDP(sport=50000,dport=serverPort)/MQTTSN()/MQTTSN_SUBSCRIBE(topic="foobar", messageID=1))

Begin emission:
Finished to send 1 packets.

Received 22 packets, got 1 answers, remaining 0 packets


<IP  version=4 ihl=5 tos=0x0 len=36 id=25313 flags=DF frag=0 ttl=64 proto=udp chksum=0x1ecb src=192.168.3.99 dst=172.19.72.254 options=[] |<UDP  sport=1884 dport=50000 len=16 chksum=0xb93e |<MQTTSN  len=8 type=SUBACK |<MQTTSN_SUBACK  dup=0 qos=0 retain=0 will=0 clean=0 topicIDtype=0 topicID=2 messageID=1 returnCode=0 |>>>>
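
The `dup`, `qos`, `retain`, `will`, `clean` and `topicIDtype` fields in the SUBACK are all packed into a single flags byte. The following sketch shows that packing, assuming the bit positions given in the MQTT-SN v1.2 specification; the function names are hypothetical and the Scapy layer in `MQTTSN.py` handles this for us.

```python
def encode_flags(dup=0, qos=0, retain=0, will=0, clean=0, topic_id_type=0):
    """Pack the individual flag fields into one byte (hypothetical helper)."""
    return ((dup & 1) << 7            # bit 7: DUP, set on retransmissions
            | (qos & 3) << 5          # bits 6-5: QoS (0b11 encodes QoS level -1)
            | (retain & 1) << 4       # bit 4: Retain
            | (will & 1) << 3         # bit 3: Will
            | (clean & 1) << 2        # bit 2: CleanSession
            | (topic_id_type & 3))    # bits 1-0: TopicIdType

def decode_flags(byte):
    """Unpack a flags byte, using the field names shown in the Scapy output."""
    return {
        "dup": (byte >> 7) & 1,
        "qos": (byte >> 5) & 3,
        "retain": (byte >> 4) & 1,
        "will": (byte >> 3) & 1,
        "clean": (byte >> 2) & 1,
        "topicIDtype": byte & 3,
    }

flags = encode_flags(qos=1)
print(hex(flags))
print(decode_flags(flags))
```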

In [None]:
# Publish on the topic again.  This time we get a PUBLISH response because we're subscribed.
sr1(IP(dst=serverIP)/UDP(sport=50000,dport=serverPort)/MQTTSN()/MQTTSN_PUBLISH(qos=0,topicID=topicID, message="foo"))

In [None]:
# Publish on the topic again with qos=1 (guaranteed delivery).  This time we get a PUBACK acknowledgement.
sr1(IP(dst=serverIP)/UDP(sport=50000,dport=serverPort)/MQTTSN()/MQTTSN_PUBLISH(qos=1,topicID=topicID, message="foo"))
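
Since UDP can drop packets, a qos=1 publish is only reliable if we resend when no PUBACK arrives. The sketch below shows one possible retry loop. It is transport-agnostic: `send_and_wait` is a hypothetical callable that sends the packet and returns the reply or `None`, and the retry count is an arbitrary choice rather than a value taken from this notebook.

```python
def publish_with_retry(send_and_wait, max_retries=3):
    """Call send_and_wait until it returns a reply or the retries run out.

    send_and_wait(dup) is a hypothetical callable: it should send the PUBLISH
    with the DUP flag set to `dup` and return the reply, or None on timeout.
    """
    for attempt in range(1 + max_retries):
        reply = send_and_wait(dup=(attempt > 0))  # mark retransmissions with DUP
        if reply is not None:
            return reply
    return None

# Stand-in transport for illustration: drops the first two attempts.
attempts = []
def flaky_transport(dup):
    attempts.append(dup)
    return "PUBACK" if len(attempts) == 3 else None

result = publish_with_retry(flaky_transport)
print(result, attempts)
```

In this notebook, `send_and_wait` could wrap `sr1(...)` with a `timeout` argument, since `sr1` returns `None` when no answer arrives within the timeout.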

We can also wrap this up in a more abstract interface (like Paho.MQTT).  The key difference here is that we're generating MQTTSN traffic, rather than MQTT.

In [8]:
def valid_ack(ack,t):
    if(isinstance(ack[IP].payload, ICMP)):
        print("Error response:")
        ack[IP].payload.show()
        return False
    if(not isinstance(ack[MQTTSN].payload, t)):
        print("Unexpected response, should have been " + str(t) + ":")
        ack.payload.show()
        return False
    return True

class MQTT_client:
    def __init__(self, serverIP, serverPort, name, verbose=0):
        self.serverIP = serverIP
        self.serverPort = serverPort
        self.client = name
        self.verbose = verbose  # passed through to Scapy's send()/sr1()
    
    def __enter__(self):
        self.connect()
        # Fixme: on failure throw exception?
        return self
    
    def __exit__(self, type, value, traceback):
        self.disconnect()
        
    def connect(self):
        ### Establish a connection
        connack = sr1(IP(dst=self.serverIP)/UDP(sport=50000,dport=self.serverPort)/MQTTSN()/MQTTSN_CONNECT(client=self.client), verbose=self.verbose)
        return valid_ack(connack,MQTTSN_CONNACK)

    def disconnect(self):
        ### Destroy the connection
        send(IP(dst=self.serverIP)/UDP(sport=50000,dport=self.serverPort)/MQTTSN()/MQTTSN_DISCONNECT(), verbose=self.verbose)
        # rsmb seems to respond without the disconnect payload, so the reply is not validated.
        # send() does not wait for a reply; validating one would require sr1().
    
    def register(self, topic):
        ### Register the given topic.  Return the associated topicID
        regack = sr1(IP(dst=self.serverIP)/UDP(sport=50000,dport=self.serverPort)/MQTTSN()/MQTTSN_REGISTER(topic=topic), verbose=self.verbose)
        if not valid_ack(regack,MQTTSN_REGACK):
            return None
        return regack[MQTTSN_REGACK].topicID

    def publish(self, topicID, message, qos=1):
        ### Publish the message on the given topicID.  qos=1 (the default) waits for a PUBACK; qos=0 sends without waiting.  Return bool indicating success.
        frame = IP(dst=self.serverIP)/UDP(sport=50000,dport=self.serverPort)/MQTTSN()/MQTTSN_PUBLISH(qos=qos, topicID=topicID, message=message)
        if(qos == 0):
            send(frame, verbose=self.verbose)
        if(qos == 1):
            puback_frame = sr1(frame, verbose=self.verbose)
            if not valid_ack(puback_frame, MQTTSN_PUBACK):
                return False
        return True
   

with MQTT_client(serverIP, serverPort, "client-test") as client:
    topicID = client.register("temp")
    if client.publish(topicID, "bar"):
        print("Publish Succeeded")
    else:
        print("Publish Failed")


Publish Succeeded
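
The `Fixme` in `__enter__` asks whether a failed connection should raise an exception. One way to do that is sketched below with a stand-in client, so that it runs without a broker. `ConnectionFailed`, `Managed` and `FakeClient` are hypothetical names introduced only for this illustration.

```python
class ConnectionFailed(Exception):
    """Raised when the broker does not acknowledge the connection."""

class Managed:
    """Context-manager wrapper around any object with connect()/disconnect()."""
    def __init__(self, client):
        self.client = client

    def __enter__(self):
        if not self.client.connect():
            raise ConnectionFailed("broker did not return a CONNACK")
        return self.client

    def __exit__(self, exc_type, exc_value, traceback):
        self.client.disconnect()
        return False  # do not swallow exceptions raised inside the with-block

class FakeClient:
    """Stand-in for MQTT_client; connect() simply reports `reachable`."""
    def __init__(self, reachable):
        self.reachable = reachable
        self.disconnected = False

    def connect(self):
        return self.reachable

    def disconnect(self):
        self.disconnected = True

good = FakeClient(reachable=True)
with Managed(good):
    pass

bad = FakeClient(reachable=False)
try:
    with Managed(bad):
        pass
except ConnectionFailed as err:
    print("connect failed:", err)
```

Because Python skips `__exit__` when `__enter__` raises, a client that never connected is not asked to disconnect.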


Finally, we can use the MQTT_client class to do something useful with the PYNQ board.  In this case, we read the local temperature sensor and write the result to the OLED screen.  In addition, the message is published to the MQTT server.  Is anyone listening, though?

In [None]:
from pynq import Overlay
from pynq.lib.pmod import Pmod_OLED
from pynq.lib.pmod import Pmod_TMP2
from pynq.lib.pmod import PMODB
from pynq.lib.pmod import PMODA
import timeit

ol = Overlay("base.bit")
ol.download()

pmod_oled = Pmod_OLED(PMODA)
mytmp = Pmod_TMP2(PMODB)
print(mytmp.read())
with MQTT_client(serverIP, serverPort, "client-temp") as client:
    temp_topicID = client.register("temp")
    for i in range(0,10):
        temperature = mytmp.read()
        pmod_oled.clear()
        timestr = str(timeit.default_timer())
        tempStr = timestr[:9] + " " + str(temperature) + " C"
        pmod_oled.write(tempStr)
        client.publish(temp_topicID, tempStr)

In [9]:
# Test how fast we can publish events.
from pynq import Overlay
from pynq.lib.pmod import Pmod_OLED
from pynq.lib.pmod import Pmod_TMP2
from pynq.lib.pmod import PMODB
from pynq.lib.pmod import PMODA
import timeit

ol = Overlay("base.bit")
ol.download()

pmod_oled = Pmod_OLED(PMODA)
mytmp = Pmod_TMP2(PMODB)

count = 40
#with sys_pipes():
with MQTT_client(serverIP, serverPort, "client-temp") as client:
    temp_topicID = client.register("temp")
    # Warm-up publish before timing starts, using the topicID registered by this client.
    client.publish(temp_topicID,"test")
    start_time = timeit.default_timer()
    temperature = mytmp.read()
    tempStr = str(temperature)
    for i in range(0,count):
        #pmod_oled.clear()
        #pmod_oled.write(tempStr)
        client.publish(temp_topicID, tempStr, qos=1)
        
    elapsed = timeit.default_timer() - start_time
    print(str(elapsed)+" seconds.")
    print(str(count/elapsed)+" messages/second.")

4.380008551999708 seconds.
9.132402260205145 messages/second.
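
At about 9.13 messages per second, each acknowledged publish takes roughly 110 ms (4.38 s / 40). The timing logic can be factored into a small reusable helper, sketched here with a trivial stand-in action so that it runs anywhere; `measure_rate` is a hypothetical name and not part of the notebook above.

```python
import timeit

def measure_rate(action, count):
    """Call action() `count` times; return (elapsed seconds, calls per second)."""
    start = timeit.default_timer()
    for _ in range(count):
        action()
    elapsed = timeit.default_timer() - start
    # Guard against a zero reading on very coarse timers.
    rate = count / elapsed if elapsed > 0 else float("inf")
    return elapsed, rate

# Stand-in action for illustration; it only records that it was called.
calls = []
elapsed, rate = measure_rate(lambda: calls.append(1), 40)
print(len(calls), "calls")
```

In the cell above, the action would be something like `lambda: client.publish(temp_topicID, tempStr, qos=1)`.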
