Skip to content

Commit

Permalink
VEDirect driver: Add software tests for SBC/CPython and MPY platforms
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 17, 2021
1 parent 6cb0d5d commit 013ce38
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 5 deletions.
1 change: 1 addition & 0 deletions requirements-test.txt
Expand Up @@ -17,3 +17,4 @@ pytest-httpserver==0.3.4
paho-mqtt==1.5.0

freezegun==1.1.0
dummyserial==1.0.0
2 changes: 1 addition & 1 deletion src/lib/terkin/driver/vedirect_sensor.py
Expand Up @@ -58,7 +58,7 @@ def __init__(self, settings=None):
self.driver = None

def start(self):
log.info('Initializing sensor "Victron Energy VE.Direct"')
log.info('Initializing sensor "Victron Energy VE.Direct" on "{}"'.format(self.device))

# Initialize the hardware driver.
try:
Expand Down
3 changes: 3 additions & 0 deletions test/fixtures/__init__.py
Expand Up @@ -15,3 +15,6 @@

# Fixture to emulate Pycom's "network.LoRa" API.
from .lora import network_lora

# Fixture to emulate a Linux serial port.
from .serial import fake_serial
18 changes: 18 additions & 0 deletions test/fixtures/serial.py
@@ -0,0 +1,18 @@
# https://github.com/nznobody/vedirect/blob/master/tests/conftest.py
from pytest import fixture
from unittest.mock import patch, Mock
import dummyserial
import logging
from serial import SerialBase


@fixture(scope="function")
def fake_serial():
dummy = dummyserial.Serial(port="COM50", baudrate=9600)
dummy.flushInput = Mock()
dummy.reset_input_buffer = Mock()
dummy._logger.setLevel(logging.INFO)
with patch("serial.Serial", spec=SerialBase) as mock:
# All possible parameters
mock.return_value = dummy
yield dummy
16 changes: 16 additions & 0 deletions test/settings/sensors_vedirect_mpy.py
@@ -0,0 +1,16 @@
"""Datalogger configuration"""

# Sensor configuration.
sensors = {
'environment': [
{
'id': 'vedirect-mpy-1',
'description': 'Victron Energy SmartSolar Charge Controller MPPT 75|15',
'type': 'vedirect',
'enabled': True,
# 'bus': 'serial:0',
'device': '1',
'port': '1',
},
],
}
15 changes: 15 additions & 0 deletions test/settings/sensors_vedirect_sbc.py
@@ -0,0 +1,15 @@
"""Datalogger configuration"""

# Sensor configuration.
sensors = {
'environment': [
{
'id': 'vedirect-sbc-1',
'description': 'Victron Energy SmartSolar Charge Controller MPPT 75|15',
'type': 'vedirect',
'enabled': True,
# 'bus': 'serial:0',
'device': '/dev/ttysdummy042',
},
],
}
115 changes: 115 additions & 0 deletions test/test_sensors_vedirect.py
@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
# (c) 2020 Richard Pobering <richard@hiveeyes.org>
# (c) 2020-2021 Andreas Motl <andreas@hiveeyes.org>
# (c) 2021 Manu Lange <Manu.Lange@plantandfood.co.nz>
# License: GNU General Public License, Version 3
import sys
import types
import mock
import pytest
from test.util.terkin import invoke_datalogger_raspberrypi, invoke_datalogger_pycom

# https://github.com/nznobody/vedirect/blob/901ed6e/tests/vedirect_device_emulator.py#L61-L69
MPPT_DATA = '\r\nV\t12800\r\nVPV\t3350\r\nPPV\t130\r\nI\t15000\r\nIL\t1500\r\nLOAD\tON\r\nRelay\tOFF\r\nH19\t456\r\nH20\t45\r\nH21\t300\r\nH22\t45\r\nH23\t350\r\nERR\t0\r\nCS\t5\r\nFW\t1.19\r\nPID\t0xA042\r\nSER#\tHQ141112345\r\nHSDS\t0\r\nMPPT\t2\r\nChecksum\t\x99'


@pytest.mark.sensors
@pytest.mark.sbc
@mock.patch('adafruit_blinka.agnostic.board_id', "RASPBERRY_PI_4B")
@mock.patch('adafruit_blinka.agnostic.chip_id', "BCM2XXX")
@mock.patch('adafruit_platformdetect.board.Board.any_raspberry_pi_40_pin', True)
@mock.patch('adafruit_platformdetect.board.Board.any_embedded_linux', True)
def test_sensors_vedirect_sbc(mocker, caplog, fake_serial):
"""
Check the whole sensor machinery.
"""

fake_serial._waiting_data = MPPT_DATA

# Acquire settings.
from test.settings import sensors_vedirect_sbc as sensor_settings

# Invoke datalogger for a single duty cycle.
datalogger = invoke_datalogger_raspberrypi(caplog, settings=sensor_settings)

# Capture log output.
captured = caplog.text

# Proof it works by verifying log output.
assert 'Initializing sensor "Victron Energy VE.Direct" on "/dev/ttysdummy042"' in captured, captured

# Get hold of the last reading.
last_reading = datalogger.storage.last_reading

# Proof it works by verifying last sensor readings.
verify_reading(last_reading)


@pytest.mark.sensors
@pytest.mark.esp32
@mock.patch('sys.platform', 'esp32')
@mock.patch('sys.implementation', types.SimpleNamespace(_multiarch='micropython', name='micropython', cache_tag='micropython-1.14', version=sys.version_info))
def test_sensors_vedirect_mpy(caplog):

import vedirect
vedirect.vedirect.MICROPYTHON = True

# Acquire settings.
from test.settings import sensors_vedirect_mpy as sensor_settings

# Invoke datalogger for a single duty cycle.
datalogger = invoke_datalogger_pycom(caplog, settings=sensor_settings, after_setup=setup_dummy_uart)

# Capture log output.
captured = caplog.text

# Proof it works by verifying log output.
assert 'Initializing sensor "Victron Energy VE.Direct" on "1"' in captured, captured

# Get hold of the last reading.
last_reading = datalogger.storage.last_reading

# Proof it works by verifying measurement values.
verify_reading(last_reading)


def verify_reading(last_reading):
assert last_reading['vedirect-0xA042:CS'] == 5
assert last_reading['vedirect-0xA042:ERR'] == 0
assert last_reading['vedirect-0xA042:FW'] == "1.19"
assert last_reading['vedirect-0xA042:H19'] == 456
assert last_reading['vedirect-0xA042:H20'] == 45
assert last_reading['vedirect-0xA042:H21'] == 300
assert last_reading['vedirect-0xA042:H22'] == 45
assert last_reading['vedirect-0xA042:H23'] == 350
assert last_reading['vedirect-0xA042:HSDS'] == 0
assert last_reading['vedirect-0xA042:I'] == 15000
assert last_reading['vedirect-0xA042:IL'] == 1500
assert last_reading['vedirect-0xA042:LOAD'] == "ON"
assert last_reading['vedirect-0xA042:MPPT'] == 2
assert last_reading['vedirect-0xA042:PID'] == "0xA042"
assert last_reading['vedirect-0xA042:PPV'] == 130
assert last_reading['vedirect-0xA042:Relay'] == "OFF"
assert last_reading['vedirect-0xA042:SER#'] == "HQ141112345"
assert last_reading['vedirect-0xA042:V'] == 12800
assert last_reading['vedirect-0xA042:VPV'] == 3350


def setup_dummy_uart(datalogger):
"""
Patch UART interface to use a dummy one instead of a mocked one.
"""
sens = datalogger.sensor_manager.get_sensor_by_id("vedirect-mpy-1")
if sens:
class DummyUART:

def __init__(self):
self.position = 0
self.data = MPPT_DATA

def read(self, count):
char = MPPT_DATA[self.position]
self.position += 1
return bytes(char, "latin1")

sens.driver.ser = DummyUART()
12 changes: 8 additions & 4 deletions test/util/terkin.py
Expand Up @@ -30,8 +30,8 @@ def invoke_umal():
return bootloader


def invoke_datalogger_pycom(caplog, settings):
return invoke_datalogger(caplog, settings, pycom=True)
def invoke_datalogger_pycom(caplog, settings, after_setup=None):
return invoke_datalogger(caplog, settings, pycom=True, after_setup=after_setup)


def invoke_datalogger_raspberrypi(caplog, settings):
Expand All @@ -50,10 +50,10 @@ def invoke_datalogger_raspberrypi(caplog, settings):
return invoke_datalogger(caplog, settings, raspberrypi=True)


def invoke_datalogger(caplog, settings, pycom=False, raspberrypi=False):
def invoke_datalogger(caplog, settings, pycom=False, raspberrypi=False, after_setup=None):

# Use a fake filesystem.
with FakeFS():
with FakeFS(additional_skip_names=["serial", "serial.serialutil", "serial.serialposix"]):

# Pycom mounts the main filesystem at "/flash".
if pycom:
Expand Down Expand Up @@ -90,6 +90,10 @@ def invoke_datalogger(caplog, settings, pycom=False, raspberrypi=False):
from terkin.datalogger import TerkinDatalogger
datalogger = TerkinDatalogger(settings, platform_info=bootloader.platform_info)
datalogger.setup()

if callable(after_setup):
after_setup(datalogger=datalogger)

datalogger.duty_cycle()

return datalogger

0 comments on commit 013ce38

Please sign in to comment.