Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ cffi==1.12.2
ndeflib==0.3.3
pycparser==2.19
wheel==0.33.6
pytest-mock
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setuptools.setup(
name="cuplcodec",
version="1.0.1",
version="2.0.2",
author="Malcolm Mackay",
author_email="malcolm@plotsensor.com",
description="Package for creating and decoding URLs that contain temperature and humidity samples.",
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def instr_sample_populated(instr_sample, request):
# Decode the URL
par = instr_sample.eepromba.get_url_parsedqs()
decodedurl = decode(secretkey=instr_sample.secretkey, statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], ver=par['v'][0])
circb64=par['q'][0], vfmtb64=par['v'][0])

urllist = decodedurl.get_samples_list()
for d in urllist:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cursorpos.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ def test_cursorpos(instr_sample, n):
# Decode the URL
par = instr_sample.eepromba.get_url_parsedqs()
decodedurl = decode(secretkey=instr_sample.secretkey, statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], ver=par['v'][0])
circb64=par['q'][0], vfmtb64=par['v'][0])

assert encoder_cursorpos == decodedurl.endmarkerpos
2 changes: 1 addition & 1 deletion tests/test_minuteoffset.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_elapsedmins(instr_sample):
# Decode the URL
par = instr_sample.eepromba.get_url_parsedqs()
decodedurl = decode(secretkey=instr_sample.secretkey, statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], ver=par['v'][0])
circb64=par['q'][0], vfmtb64=par['v'][0])

assert i == decodedurl.elapsedmins

Expand Down
2 changes: 1 addition & 1 deletion tests/test_ndef.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,4 @@ def test_decode_raises_indexerrror(blankurlqs):
timeintb64=timeintb64,
statb64=statb64,
circb64=circb64,
ver=ver)
vfmtb64=ver)
8 changes: 4 additions & 4 deletions tests/test_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_status(instr_sample, n=1):
# Decode the URL
par = instr_sample.eepromba.get_url_parsedqs()
decodedurl = decode(secretkey=instr_sample.secretkey, statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], ver=par['v'][0])
circb64=par['q'][0], vfmtb64=par['v'][0])

assert encoder_cursorpos == decodedurl.endmarkerpos

Expand All @@ -40,7 +40,7 @@ def test_loopcount(instr_sample, n):

par = instr_sample.eepromba.get_url_parsedqs()
decodedurl = decode(secretkey=instr_sample.secretkey, statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], ver=par['v'][0])
circb64=par['q'][0], vfmtb64=par['v'][0])

assert decodedurl.status.loopcount == n

Expand All @@ -57,7 +57,7 @@ def test_resetsalltime(resetsalltime):

par = instr_sample.eepromba.get_url_parsedqs()
decodedurl = decode(secretkey=instr_sample.secretkey, statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], ver=par['v'][0])
circb64=par['q'][0], vfmtb64=par['v'][0])

assert decodedurl.status.resetsalltime == resetsalltime

Expand All @@ -80,7 +80,7 @@ def test_batteryvoltage(resetcause, batteryadc):

par = instr_sample.eepromba.get_url_parsedqs()
decodedurl = decode(secretkey=instr_sample.secretkey, statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], ver=par['v'][0])
circb64=par['q'][0], vfmtb64=par['v'][0])

assert decodedurl.status.get_batvoltageraw() == batteryadc
assert decodedurl.status.get_resetcauseraw() == resetcause
121 changes: 114 additions & 7 deletions tests/test_trhsample.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytest
from wscodec.encoder.pyencoder.instrumented import InstrumentedSampleTRH, InstrumentedSample
from wscodec.decoder import decode
from wscodec.decoder.exceptions import DelimiterNotFoundError
from wscodec.decoder.exceptions import NoCircularBufferError, DelimiterNotFoundError, InvalidMajorVersionError, \
InvalidFormatError, MessageIntegrityError
from wscodec.decoder.status import SVSH_BIT

INPUT_SERIAL = 'abcdabcd'
Expand Down Expand Up @@ -57,7 +58,7 @@ def test_md5(n, usehmac):
# Decode the URL
par = instr_md5.eepromba.get_url_parsedqs()
decodedurl = decode(secretkey=INPUT_SECKEY, statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], ver=par['v'][0], usehmac=usehmac)
circb64=par['q'][0], vfmtb64=par['v'][0], usehmac=usehmac)

urllist = decodedurl.get_samples_list()
for d in urllist:
Expand Down Expand Up @@ -85,12 +86,12 @@ def test_batteryvoltage():
# Decode the URL
par = instr_batv.eepromba.get_url_parsedqs()
decodedurl = decode(secretkey="", statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], ver=par['v'][0], usehmac=False)
circb64=par['q'][0], vfmtb64=par['v'][0], usehmac=False)

assert decodedurl.status.get_batvoltageraw() == testbatv


def test_errorcondition():
def test_error_nocircularbuffer():
instr = InstrumentedSample(baseurl=INPUT_BASEURL,
serial=INPUT_SERIAL,
secretkey="",
Expand All @@ -99,13 +100,119 @@ def test_errorcondition():
)

resetcause = SVSH_BIT
instr.ffimodule.lib.enc_init(resetcause, True)
instr.ffimodule.lib.enc_init(resetcause, True, 0)

par = instr.eepromba.get_url_parsedqs()

with pytest.raises(DelimiterNotFoundError) as excinfo:
with pytest.raises(NoCircularBufferError) as excinfo:
# Attempt to decode the parameters
decodedurl = decode(secretkey="", statb64=par['x'][0], timeintb64=par['t'][0],
circb64="", ver=par['v'][0], usehmac=False)
circb64="", vfmtb64=par['v'][0], usehmac=False)

assert excinfo.value.status.resetcause['supervisor'] == True


def test_error_delimiternotfound():
SECRETKEY = "12345678ABCDEFGH"
instr = InstrumentedSampleTRH(baseurl=INPUT_BASEURL,
serial=INPUT_SERIAL,
secretkey=SECRETKEY,
smplintervalmins=INPUT_TIMEINT,
usehmac=True,
)

instr.ffimodule.lib.enc_init(0, False, 0)

par = instr.eepromba.get_url_parsedqs()

with pytest.raises(DelimiterNotFoundError) as excinfo:
# Attempt to decode the parameters
decodedurl = decode(secretkey=SECRETKEY, statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], vfmtb64=par['v'][0], usehmac=True)


def test_error_versionmismatch(mocker):
instr = InstrumentedSample(baseurl=INPUT_BASEURL,
serial=INPUT_SERIAL,
secretkey="",
smplintervalmins=INPUT_TIMEINT,
usehmac=False,
)

BAD_VERSION = 65500

def mock_version():
return BAD_VERSION

instr.ffimodule.lib.enc_init(0, False, 0)

par = instr.eepromba.get_url_parsedqs()

mocker.patch('wscodec.decoder.decoderfactory._get_decoderversion', mock_version)

with pytest.raises(InvalidMajorVersionError) as excinfo:
# Attempt to decode the parameters
decodedurl = decode(secretkey="", statb64=par['x'][0], timeintb64=par['t'][0],
circb64="", vfmtb64=par['v'][0], usehmac=False)

assert excinfo.value.decoderversion == BAD_VERSION


def test_error_formatmismatch():
instr = InstrumentedSample(baseurl=INPUT_BASEURL,
serial=INPUT_SERIAL,
secretkey="",
smplintervalmins=INPUT_TIMEINT,
usehmac=False,
)

BAD_FORMAT = 254
instr.ffimodule.lib.nv.format = bytes([BAD_FORMAT])
instr.ffimodule.lib.enc_init(0, False, 0)

par = instr.eepromba.get_url_parsedqs()

with pytest.raises(InvalidFormatError) as excinfo:
# Attempt to decode the parameters
decodedurl = decode(secretkey="", statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], vfmtb64=par['v'][0], usehmac=False)

assert excinfo.value.circformat == BAD_FORMAT


def test_error_messageintegrity():
instr = InstrumentedSampleTRH(baseurl=INPUT_BASEURL,
serial=INPUT_SERIAL,
secretkey="12345678ABCDEFGH",
smplintervalmins=INPUT_TIMEINT,
usehmac=True,
)

instr.ffimodule.lib.enc_init(0, False, 0)
instr.pushsamples(100)

par = instr.eepromba.get_url_parsedqs()

with pytest.raises(MessageIntegrityError) as excinfo:
# Attempt to decode the parameters
decodedurl = decode(secretkey="notthekey", statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], vfmtb64=par['v'][0], usehmac=True)

def test_error_messageintegrity_wrongalgorithm():
SECRETKEY = "12345678ABCDEFGH"
instr = InstrumentedSampleTRH(baseurl=INPUT_BASEURL,
serial=INPUT_SERIAL,
secretkey=SECRETKEY,
smplintervalmins=INPUT_TIMEINT,
usehmac=True,
)

instr.ffimodule.lib.enc_init(0, False, 0)
instr.pushsamples(100)

par = instr.eepromba.get_url_parsedqs()

with pytest.raises(MessageIntegrityError) as excinfo:
# Attempt to decode the parameters
decodedurl = decode(secretkey=SECRETKEY, statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], vfmtb64=par['v'][0], usehmac=False)
2 changes: 1 addition & 1 deletion tests/test_tsample.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_md5(instr_sample, n):
# Decode the URL
par = instr_md5.eepromba.get_url_parsedqs()
decodedurl = decode(secretkey="", statb64=par['x'][0], timeintb64=par['t'][0],
circb64=par['q'][0], ver=par['v'][0], usehmac=False)
circb64=par['q'][0], vfmtb64=par['v'][0], usehmac=False)

urllist = decodedurl.get_samples_list()
for d in urllist:
Expand Down
5 changes: 4 additions & 1 deletion wscodec/decoder/circularbuffer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .b64decode import B64Decoder
from .status import Status
from .exceptions import DelimiterNotFoundError
from .exceptions import DelimiterNotFoundError, NoCircularBufferError
from struct import unpack


Expand Down Expand Up @@ -58,6 +58,9 @@ def _linearise(self):
either side of the end stop.
"""

if len(self.circb64) == 0:
raise NoCircularBufferError(self.status)

# Split query string at the end of the endstop marker.
splitend = self.circb64.split(self.ENDSTOP_BYTE)

Expand Down
36 changes: 27 additions & 9 deletions wscodec/decoder/decoderfactory.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from datetime import datetime
import pkg_resources
from .samples import SamplesURL
from .exceptions import InvalidMajorVersionError, InvalidCircFormatError
from .b64decode import B64Decoder
from .exceptions import InvalidMajorVersionError, InvalidFormatError
from . import hdc2021


def decode(secretkey: str,
statb64: str,
timeintb64: str,
circb64: str,
ver: str,
vfmtb64: str,
usehmac: bool = True,
scantimestamp: datetime = None) -> SamplesURL:
"""
Expand All @@ -30,8 +32,8 @@ def decode(secretkey: str,
circb64: str
Value of the URL parameter that contains the circular buffer of base64 encoded samples.

ver: str
Value of the URL parameter that contains the version string (base64 encoded).
vfmtb64: str
Value of the URL parameter that contains the version and format string (base64 encoded).

usehmac: bool
True if the hash inside the circular buffer endstop is HMAC-MD5. False if it is MD5.
Expand All @@ -45,16 +47,32 @@ def decode(secretkey: str,
An object containing a list of timestamped environmental sensor samples.

"""
majorversion = int(ver[-2:-1])
formatcode = int(ver[-1:])

if majorversion != 1:
raise InvalidMajorVersionError
encodermajorversion, formatcode = _get_encoderversion(vfmtb64)
decodermajorversion = _get_decoderversion()

if encodermajorversion != decodermajorversion:
raise InvalidMajorVersionError(encodermajorversion, decodermajorversion)

decoder = _get_decoder(formatcode)(statb64=statb64, timeintb64=timeintb64, circb64=circb64, usehmac=usehmac, secretkey=secretkey, scantimestamp=scantimestamp)
return decoder


def _get_encoderversion(vfmtb64):
vfmtb64 = vfmtb64[-4:]
vfmtbytes = B64Decoder.b64decode(vfmtb64)
encoderbytes = vfmtbytes[0:2]
encoderversion = int.from_bytes(encoderbytes, byteorder='big')
formatcode = vfmtbytes[2]
return encoderversion, formatcode


def _get_decoderversion():
decoderversionstr = pkg_resources.require("cuplcodec")[0].version
decodermajorversion = int(decoderversionstr.split('.', 1)[0])
return decodermajorversion


def _get_decoder(formatcode: int):
"""
Parameters
Expand All @@ -74,7 +92,7 @@ def _get_decoder(formatcode: int):
try:
decoder = decoders[formatcode]
except KeyError:
raise InvalidCircFormatError(formatcode)
raise InvalidFormatError(formatcode)

return decoder

Expand Down
Loading