Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipe argument #14

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
129 changes: 129 additions & 0 deletions PcapPipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#via https://github.com/cdealti/Adafruit_BLESniffer_Python

import logging
import os
import sys
import time

"""
PcapPipe.py: an Unix named pipe where PCAP packets are written.
This pipe represents the interface between the sniffer and Wireshark.
The original code has been posted on the Nordic Developer Zone [1][2] by
a Nordic employee and is not accompanied by a license.

[1] https://devzone.nordicsemi.com/blogs/750/ble-sniffer-in-linux-using-wireshark
[2] https://devzone.nordicsemi.com/attachment/74532982f9e4b627b4cddfec2cb585e7
"""

__author__ = "Stian"
__copyright__ = "Copyright (c) 2014, Nordic Semiconductor ASA"
__license__ = "MIT"
__version__ = "0.1.0"

class PcapPipe(object):
def open_and_init(self, pipeFilePath):
try:
os.mkfifo(pipeFilePath)
except OSError:
logging.warn("fifo already exists?")
raise SystemExit(1)
self._pipe = open(pipeFilePath, 'w')
self.write(self.makeGlobalHeader())

def write(self, message):
if not self._pipe: return
try:
self._pipe.write(''.join(map(chr, message)))
self._pipe.flush()
except IOError:
exc_type, exc_value, exc_tb = sys.exc_info()
logging.error('Got exception trying to write to pipe: %s', exc_value)
self.close()

def close(self):
logging.debug("closing pipe")
if not self._pipe: return
self._pipe.close()
self._pipe = None

def isOpen(self):
return self._pipe is not None and not self._pipe.closed

def newBlePacket(self, notification):
packet = notification.msg["packet"]
packetList = packet.getList()
snifferList = self.makePacketHeader(len(packetList) + 1) + [packet.boardId] + packetList
self.write(snifferList)

def makeGlobalHeader(self):
LINKTYPE_BLUETOOTH_LE_LL = 251
LINKTYPE_NORDIC_BLE = 157

MAGIC_NUMBER = 0xa1b2c3d4
VERSION_MAJOR = 2
VERSION_MINOR = 4
THISZONE = 0
SIGFIGS = 0
SNAPLEN = 0xFFFF
NETWORK = LINKTYPE_NORDIC_BLE

headerString = [
((MAGIC_NUMBER >> 0) & 0xFF),
((MAGIC_NUMBER >> 8) & 0xFF),
((MAGIC_NUMBER >> 16) & 0xFF),
((MAGIC_NUMBER >> 24) & 0xFF),
((VERSION_MAJOR >> 0) & 0xFF),
((VERSION_MAJOR >> 8) & 0xFF),
((VERSION_MINOR >> 0) & 0xFF),
((VERSION_MINOR >> 8) & 0xFF),
((THISZONE >> 0) & 0xFF),
((THISZONE >> 8) & 0xFF),
((THISZONE >> 16) & 0xFF),
((THISZONE >> 24) & 0xFF),
((SIGFIGS >> 0) & 0xFF),
((SIGFIGS >> 8) & 0xFF),
((SIGFIGS >> 16) & 0xFF),
((SIGFIGS >> 24) & 0xFF),
((SNAPLEN >> 0) & 0xFF),
((SNAPLEN >> 8) & 0xFF),
((SNAPLEN >> 16) & 0xFF),
((SNAPLEN >> 24) & 0xFF),
((NETWORK >> 0) & 0xFF),
((NETWORK >> 8) & 0xFF),
((NETWORK >> 16) & 0xFF),
((NETWORK >> 24) & 0xFF)
]

return headerString

def makePacketHeader(self, length):

if(os.name == 'posix'):
timeNow = time.time()
else:
timeNow = time.clock()

TS_SEC = int(timeNow)
TS_USEC = int((timeNow-TS_SEC)*1000000)
INCL_LENGTH = length
ORIG_LENGTH = length

headerString = [
((TS_SEC >> 0) & 0xFF),
((TS_SEC >> 8) & 0xFF),
((TS_SEC >> 16) & 0xFF),
((TS_SEC >> 24) & 0xFF),
((TS_USEC >> 0) & 0xFF),
((TS_USEC >> 8) & 0xFF),
((TS_USEC >> 16) & 0xFF),
((TS_USEC >> 24) & 0xFF),
((INCL_LENGTH >> 0) & 0xFF),
((INCL_LENGTH >> 8) & 0xFF),
((INCL_LENGTH >> 16) & 0xFF),
((INCL_LENGTH >> 24) & 0xFF),
((ORIG_LENGTH >> 0) & 0xFF),
((ORIG_LENGTH >> 8) & 0xFF),
((ORIG_LENGTH >> 16) & 0xFF),
((ORIG_LENGTH >> 24) & 0xFF)
]
return headerString
84 changes: 65 additions & 19 deletions sniffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@
import sys
import time
import argparse
import logging
from sys import platform

from SnifferAPI import Logger
from SnifferAPI import Sniffer
from SnifferAPI import CaptureFiles
from SnifferAPI.Devices import Device
from SnifferAPI.Devices import DeviceList

from PcapPipe import PcapPipe

mySniffer = None
"""@type: SnifferAPI.Sniffer.Sniffer"""

myPipe = None
"""@type: PcapPipe.PcapPipe"""

def setup(serport, delay=6):
"""
Expand Down Expand Up @@ -94,22 +99,47 @@ def selectDevice(devlist):
# This will start a new scan
return None

def loop():
"""Main loop printing some useful statistics"""
nLoops = 0
nPackets = 0
connected = False

while True:
time.sleep(0.1)

packets = mySniffer.getPackets()
nLoops += 1
nPackets += len(packets)

if args.verbose:
for packet in packets:
if packet.blePacket is not None:
# Display the raw BLE packet payload
# Note: 'BlePacket' is nested inside the higher level 'Packet' wrapper class
print packet.blePacket.payload
else:
print packet
else:
if connected != mySniffer.inConnection or nLoops % 20 == 0:
connected = mySniffer.inConnection
print "\rconnected: {}, packets: {}, missed: {}".format(mySniffer.inConnection, nPackets, mySniffer.missedPackets),
sys.stdout.flush()

def dumpPackets():
"""Dumps incoming packets to the display"""
# Get (pop) unprocessed BLE packets.
packets = mySniffer.getPackets()
# Display the packets on the screen in verbose mode
if args.verbose:
for packet in packets:
if packet.blePacket is not None:
# Display the raw BLE packet payload
# Note: 'BlePacket' is nested inside the higher level 'Packet' wrapper class
print packet.blePacket.payload
else:
print packet
else:
print '.' * len(packets)

def setupPipe():
"""setup pipe"""
# Create a named pipe for Wireshark capture
pipeFilePath = os.path.join(Logger.logFilePath, "ble.pipe")
if os.path.exists(pipeFilePath):
os.remove(pipeFilePath)

print "Pipe ready, run: wireshark -Y btle -k -i %s" % os.path.abspath(pipeFilePath)

myPipe = PcapPipe()
myPipe.open_and_init(pipeFilePath)

mySniffer.subscribe("NEW_BLE_PACKET", myPipe.newBlePacket)


if __name__ == '__main__':
Expand All @@ -124,6 +154,12 @@ def dumpPackets():
help="serial port location ('COM14', '/dev/tty.usbserial-DN009WNO', etc.)")

# Optional arguments:
argparser.add_argument("-p", "--pipe",
dest="pipe",
action="store_true",
default=False,
help="Pipe packets to wireshark")

argparser.add_argument("-l", "--logfile",
dest="logfile",
default=CaptureFiles.captureFilePath,
Expand All @@ -148,6 +184,11 @@ def dumpPackets():
# Parser the arguments passed in from the command-line
args = argparser.parse_args()

if args.pipe:
if not (platform.startswith('linux') or platform == "darwin"):
print "Pipes only available on MacOS and Linux"
sys.exit(-1)

# Display the libpcap logfile location
print "Capturing data to " + args.logfile
CaptureFiles.captureFilePath = args.logfile
Expand Down Expand Up @@ -195,22 +236,27 @@ def dumpPackets():
"%02X" % d.address[5])
# Make sure we actually followed the selected device (i.e. it's still available, etc.)
if d is not None:
if args.pipe:
setupPipe()

mySniffer.follow(d)

else:
print "ERROR: Could not find the selected device"

# Dump packets
while True:
dumpPackets()
time.sleep(1)
loop()

# Close gracefully
mySniffer.doExit()
if myPipe is not None:
myPipe.close()
sys.exit()

except (KeyboardInterrupt, ValueError, IndexError) as e:
# Close gracefully on CTRL+C
if 'KeyboardInterrupt' not in str(type(e)):
print "Caught exception:", e
mySniffer.doExit()
if myPipe is not None:
myPipe.close()
sys.exit(-1)