Skip to content
This repository has been archived by the owner on Sep 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #82 from miri64/scripts/enh/black
Browse files Browse the repository at this point in the history
sniffer / spectrum-scanner: reformat using black
  • Loading branch information
chrysn committed Sep 20, 2022
2 parents 4de0fce + 42c868a commit afdacca
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 54 deletions.
82 changes: 46 additions & 36 deletions sniffer/tools/sniffer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
"""
(C) 2012, Mariano Alvira <mar@devl.org>
(C) 2014, Oliver Hahm <oliver.hahm@inria.fr>
(C) 2015, Hauke Petersen <hauke.petersen@fu-berlin.de>
Expand Down Expand Up @@ -30,7 +30,7 @@
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
'''
"""

from __future__ import print_function
import argparse
Expand All @@ -42,70 +42,69 @@
from serial import Serial

# PCAP setup
MAGIC = 0xa1b2c3d4
MAGIC = 0xA1B2C3D4
MAJOR = 2
MINOR = 4
ZONE = 0
SIG = 0
SNAPLEN = 0xffff
NETWORK = 230 # 802.15.4 no FCS
SNAPLEN = 0xFFFF
NETWORK = 230 # 802.15.4 no FCS

DEFAULT_BAUDRATE = 115200


def configure_interface(port, channel):
line = ""
iface = 0
port.write('ifconfig\n'.encode())
port.write("ifconfig\n".encode())
while True:
line = port.readline()
if line == b'':
print("Application has no network interface defined",
file=sys.stderr)
if line == b"":
print("Application has no network interface defined", file=sys.stderr)
sys.exit(2)
match = re.search(r'^Iface +(\d+)', line.decode(errors="ignore"))
match = re.search(r"^Iface +(\d+)", line.decode(errors="ignore"))
if match is not None:
iface = int(match.group(1))
break

# set channel, raw mode, and promiscuous mode
print('ifconfig %d set chan %d' % (iface, channel), file=sys.stderr)
print('ifconfig %d raw' % iface, file=sys.stderr)
print('ifconfig %d promisc' % iface, file=sys.stderr)
port.write(('ifconfig %d set chan %d\n' % (iface, channel)).encode())
port.write(('ifconfig %d raw\n' % iface).encode())
port.write(('ifconfig %d promisc\n' % iface).encode())
print("ifconfig %d set chan %d" % (iface, channel), file=sys.stderr)
print("ifconfig %d raw" % iface, file=sys.stderr)
print("ifconfig %d promisc" % iface, file=sys.stderr)
port.write(("ifconfig %d set chan %d\n" % (iface, channel)).encode())
port.write(("ifconfig %d raw\n" % iface).encode())
port.write(("ifconfig %d promisc\n" % iface).encode())


def generate_pcap(port, out):
# count incoming packets
count = 0
# output overall PCAP header
out.write(pack('<LHHLLLL', MAGIC, MAJOR, MINOR, ZONE, SIG, SNAPLEN,
NETWORK))
out.write(pack("<LHHLLLL", MAGIC, MAJOR, MINOR, ZONE, SIG, SNAPLEN, NETWORK))
sys.stderr.write("RX: %i\r" % count)
while True:
line = port.readline().rstrip()

pkt_header = re.match(r">? *rftest-rx --- len (\w+).*",
line.decode(errors="ignore"))
pkt_header = re.match(
r">? *rftest-rx --- len (\w+).*", line.decode(errors="ignore")
)
if pkt_header:
now = time()
sec = int(now)
usec = int((now - sec) * 1000000)
length = int(pkt_header.group(1), 16)
out.write(pack('<LLLL', sec, usec, length, length))
out.write(pack("<LLLL", sec, usec, length, length))
out.flush()
count += 1
sys.stderr.write("RX: %i\r" % count)
continue

pkt_data = re.match(r"(\w\w )+", line.decode(errors="ignore"))
if pkt_data:
for part in line.decode(errors="ignore").split(' '):
for part in line.decode(errors="ignore").split(" "):
byte = re.match(r"(\w\w)", part)
if byte:
out.write(pack('<B', int(byte.group(1), 16)))
out.write(pack("<B", int(byte.group(1), 16)))
out.flush()


Expand All @@ -114,15 +113,14 @@ def connect(args):
if args.conn.startswith("/dev/tty") or args.conn.startswith("COM"):
# open serial port
try:
conn = Serial(args.conn, args.baudrate, dsrdtr=0, rtscts=0,
timeout=1)
conn = Serial(args.conn, args.baudrate, dsrdtr=0, rtscts=0, timeout=1)
except IOError:
print("error opening serial port %s" % args.conn, file=sys.stderr)
sys.exit(2)
else:
try:
port = args.conn.split(":")[-1]
host = args.conn[:-(len(port)+1)]
host = args.conn[: -(len(port) + 1)]
port = int(port)
except (IndexError, ValueError):
print("Can't parse host:port pair %s" % args.conn, file=sys.stderr)
Expand All @@ -143,17 +141,29 @@ def main():
else:
default_outfile = sys.stdout
p = argparse.ArgumentParser()
p.add_argument("-b", "--baudrate", type=int, default=DEFAULT_BAUDRATE,
help="Baudrate of the serial port (only evaluated "
"for non TCP-terminal, default: %d)" %
DEFAULT_BAUDRATE)
p.add_argument("conn", metavar="tty/host:port", type=str,
help="Serial port or TCP (host, port) tuple to "
"terminal with sniffer application")
p.add_argument(
"-b",
"--baudrate",
type=int,
default=DEFAULT_BAUDRATE,
help="Baudrate of the serial port (only evaluated "
"for non TCP-terminal, default: %d)" % DEFAULT_BAUDRATE,
)
p.add_argument(
"conn",
metavar="tty/host:port",
type=str,
help="Serial port or TCP (host, port) tuple to "
"terminal with sniffer application",
)
p.add_argument("channel", type=int, help="Channel to sniff on")
p.add_argument("outfile", type=argparse.FileType("w+b"),
default=default_outfile, nargs="?",
help="PCAP file to output to (default: stdout)")
p.add_argument(
"outfile",
type=argparse.FileType("w+b"),
default=default_outfile,
nargs="?",
help="PCAP file to output to (default: stdout)",
)
args = p.parse_args()

conn = connect(args)
Expand Down
56 changes: 38 additions & 18 deletions spectrum-scanner/tools/plot_rssi.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@


class SpectrumEmitter(object):

def __init__(self, port):
self.port = port

Expand All @@ -35,7 +34,10 @@ def data_gen(self):
while True:
# Read one line from the spectrum device
line = self.port.readline().rstrip()
pkt_data = re.match(r"\[([-+]?\d+),\s*([-+]?\d+),\s*([-+]?\d+)\]\s*(.*)", line.decode(errors='replace'))
pkt_data = re.match(
r"\[([-+]?\d+),\s*([-+]?\d+),\s*([-+]?\d+)\]\s*(.*)",
line.decode(errors="replace"),
)
if pkt_data:
ed = {}
try:
Expand All @@ -60,21 +62,24 @@ def data_gen(self):


class RSSIPlot(object):

def __init__(self, ax, *args, tlen=120, dt=0.5, nchannels=27):
self.ax = ax
self.count = 0
self.dt = dt
self.tlen = tlen
# Generate mesh for plotting, this creates a grid of nchannel rows and
# (tlen / dt) columns
self.Y, self.X = np.mgrid[slice(0 - .5, nchannels + 0.5, 1),
slice(-self.tlen - self.dt / 2, 0 + 1 - self.dt / 2, self.dt)]
self.Y, self.X = np.mgrid[
slice(0 - 0.5, nchannels + 0.5, 1),
slice(-self.tlen - self.dt / 2, 0 + 1 - self.dt / 2, self.dt),
]
Z = np.zeros_like(self.X)
# X and Y are the bounds, so Z should be the value *inside* those bounds.
# Therefore, remove the last row and column from the Z array.
self.Z = Z[:-1, :-1]
self.pcm = self.ax.pcolormesh(self.X, self.Y, self.Z, vmin=-100, vmax=-20, cmap=plt.cm.get_cmap('jet'))
self.pcm = self.ax.pcolormesh(
self.X, self.Y, self.Z, vmin=-100, vmax=-20, cmap=plt.cm.get_cmap("jet")
)
self.ax.get_figure().colorbar(self.pcm, label="Measured signal level [dB]")
self.ax.set_ylabel("Channel number")
self.ax.set_xlabel("Time [s]")
Expand All @@ -95,29 +100,42 @@ def update(self, ed):
col[ch, 0] = ed[ch]
self.Z = np.hstack((self.Z[:, 1:], col))
if resize:
self.ax.set_ylim([self.ch_min - .5, self.ch_max + 0.5])
self.ax.set_ylim([self.ch_min - 0.5, self.ch_max + 0.5])
self.ax.set_yticks(range(self.ch_min, self.ch_max + 1))
self.pcm.set_array(self.Z.ravel())
return self.pcm,
return (self.pcm,)


def main(argv):
loglevels = [logging.CRITICAL, logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
loglevels = [
logging.CRITICAL,
logging.ERROR,
logging.WARN,
logging.INFO,
logging.DEBUG,
]
parser = argparse.ArgumentParser(argv)
parser.add_argument('-v', '--verbosity', type=int, default=4,
help='set logging verbosity, 1=CRITICAL, 5=DEBUG')
parser.add_argument('tty',
help='Serial port device file name')
parser.add_argument('-b', '--baudrate', default=115200, type=int,
help='Serial port baudrate')
parser.add_argument(
"-v",
"--verbosity",
type=int,
default=4,
help="set logging verbosity, 1=CRITICAL, 5=DEBUG",
)
parser.add_argument("tty", help="Serial port device file name")
parser.add_argument(
"-b", "--baudrate", default=115200, type=int, help="Serial port baudrate"
)
args = parser.parse_args()
# logging setup
logging.basicConfig(level=loglevels[args.verbosity-1])
logging.basicConfig(level=loglevels[args.verbosity - 1])

# open serial port
try:
logging.debug("Open serial port %s, baud=%d", args.tty, args.baudrate)
port = serial.Serial(port=args.tty, baudrate=9600, dsrdtr=0, rtscts=0, timeout=0.3)
port = serial.Serial(
port=args.tty, baudrate=9600, dsrdtr=0, rtscts=0, timeout=0.3
)
# This baudrate reconfiguration is necessary for certain USB to serial
# adapters, the Linux cdc_acm driver will keep repeating stale buffer
# contents otherwise. No idea about the cause, but this fixes the symptom.
Expand All @@ -131,7 +149,9 @@ def main(argv):
fig, ax = plt.subplots()
graph = RSSIPlot(ax)
emitter = SpectrumEmitter(port)
animation.FuncAnimation(fig, graph.update, emitter.data_gen, interval=10, blit=True)
animation.FuncAnimation(
fig, graph.update, emitter.data_gen, interval=10, blit=True
)
plt.show()
except KeyboardInterrupt:
port.close()
Expand Down

0 comments on commit afdacca

Please sign in to comment.