Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MicroPython support for VEDirect driver #95

Merged
merged 6 commits into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements-cpython.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ adafruit-circuitpython-bmp280==3.2.1
adafruit-circuitpython-ina219==3.4.2

# Victron Energy VE.Direct text protocol driver.
git+https://github.com/karioja/vedirect@f74c0f2
https://github.com/nznobody/vedirect/tarball/345a688
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ pytest-httpserver==0.3.4
paho-mqtt==1.5.0

freezegun==1.1.0
dummyserial==1.0.0
12 changes: 8 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@
'adafruit-circuitpython-bmp280==3.2.1',
'adafruit-circuitpython-ina219==3.4.2',


# Victron Energy VE.Direct text protocol driver.
#'git+https://github.com/karioja/vedirect@f74c0f2',

]

extras = {
Expand All @@ -63,6 +59,13 @@
# Required for LoRaWAN.
'pycryptodome==3.10.1',
],

# Victron Energy VE.Direct text protocol driver.
# Unless the version at https://github.com/nznobody/vedirect has been published on PyPI,
# it can't be part of the vanilla `install_requires` section.
'vedirect': [
'vedirect==2.0.0',
],
}

setup(name='terkin',
Expand Down Expand Up @@ -126,6 +129,7 @@
extras_require=extras,
#tests_require=extras['test'],
dependency_links=[
"https://github.com/nznobody/vedirect/tarball/345a688#egg=vedirect-2.0.0"
],
entry_points={
'console_scripts': [
Expand Down
55 changes: 40 additions & 15 deletions src/lib/terkin/driver/vedirect_sensor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# (c) 2020 Jan Hoffmann <jan.hoffmann@bergamsee.de>
# (c) 2020 Andreas Motl <andreas.motl@terkin.org>
# (c) 2021 Manu Lange <Manu.Lange@plantandfood.co.nz>
# License: GNU General Public License, Version 3
from terkin import logging
from terkin.sensor import SensorManager, AbstractSensor
Expand Down Expand Up @@ -39,6 +40,7 @@ class VEDirectSensor(AbstractSensor):
Resources
=========
- https://github.com/karioja/vedirect
- https://github.com/nznobody/vedirect
- https://www.victronenergy.com/solar-charge-controllers/smartsolar-mppt-75-10-75-15-100-15-100-20
- https://www.victronenergy.com/solar-charge-controllers/bluesolar-mppt-150-35
- https://www.victronenergy.com/battery-monitors/bmv-700
Expand All @@ -51,47 +53,70 @@ def __init__(self, settings=None):
super().__init__(settings=settings)

# Can be overwritten by ``.set_address()``.
self.device = settings['device']
self.device = settings["device"]
self.timeout = 5
self.driver = None

def start(self):
log.info('Initializing sensor "Victron Energy VE.Direct"')
log.info('Initializing sensor "Victron Energy VE.Direct" on "{}"'.format(self.device))

# Initialize the hardware driver.
try:

# MicroPython
if platform_info.vendor in [platform_info.MICROPYTHON.Vanilla, platform_info.MICROPYTHON.Pycom]:
raise NotImplementedError('VEDirect driver not implemented on MicroPython')
if platform_info.vendor in [
platform_info.MICROPYTHON.Vanilla,
platform_info.MICROPYTHON.Pycom,
]:
try:
from vedirect import VEDirect

uart = int(self.device)
self.driver = VEDirect(serialport=uart, timeout=self.timeout)
except Exception as e:
log.exc(
e,
"Could not start VEDirect interface on device: {}".format(
self.device
),
)
raise e

# CPython
elif platform_info.vendor == platform_info.MICROPYTHON.RaspberryPi:
from vedirect import Vedirect
self.driver = Vedirect(serialport=self.device, timeout=self.timeout)
from vedirect import VEDirect

self.driver = VEDirect(serialport=self.device, timeout=self.timeout)

else:
raise NotImplementedError('VEDirect driver not implemented on this platform')
raise NotImplementedError(
"VEDirect driver not implemented on this platform"
)

return True

except Exception as ex:
log.exc(ex, 'VEDirect hardware driver failed')
log.exc(ex, "VEDirect hardware driver failed")

def read(self):
if not self.driver:
log.error("VEDirect interface not initialised")
return
log.info('Reading sensor "Victron Energy VE.Direct"')

# MicroPython
if platform_info.vendor in [platform_info.MICROPYTHON.Vanilla, platform_info.MICROPYTHON.Pycom]:
raise NotImplementedError('VEDirect driver not implemented on MicroPython')
# Read raw data from sensor.
data_raw = self.driver.read_data_single()

# CPython
elif platform_info.vendor == platform_info.MICROPYTHON.RaspberryPi:
data_raw = self.driver.read_data_single()
# Compute key fragment based on information from data packet.
if "PID" in data_raw:
product_id = str(data_raw["PID"])
else:
product_id = "unknown-pid"

# Aggregate measurement values.
data = {}
for key, value in data_raw.items():
key = 'vedirect:{}'.format(key)
key = "vedirect-{}:{}".format(product_id, key)
data[key] = value

return data
3 changes: 3 additions & 0 deletions test/fixtures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@

# Fixture to emulate Pycom's "network.LoRa" API.
from .lora import network_lora

# Fixture to emulate a Linux serial port.
from .serial import fake_serial
18 changes: 18 additions & 0 deletions test/fixtures/serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# https://github.com/nznobody/vedirect/blob/master/tests/conftest.py
from pytest import fixture
from unittest.mock import patch, Mock
import dummyserial
import logging
from serial import SerialBase


@fixture(scope="function")
def fake_serial():
dummy = dummyserial.Serial(port="COM50", baudrate=9600)
dummy.flushInput = Mock()
dummy.reset_input_buffer = Mock()
dummy._logger.setLevel(logging.INFO)
with patch("serial.Serial", spec=SerialBase) as mock:
# All possible parameters
mock.return_value = dummy
yield dummy
16 changes: 16 additions & 0 deletions test/settings/sensors_vedirect_mpy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Datalogger configuration"""

# Sensor configuration.
sensors = {
'environment': [
{
'id': 'vedirect-mpy-1',
'description': 'Victron Energy SmartSolar Charge Controller MPPT 75|15',
'type': 'vedirect',
'enabled': True,
# 'bus': 'serial:0',
'device': '1',
'port': '1',
},
],
}
15 changes: 15 additions & 0 deletions test/settings/sensors_vedirect_sbc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Datalogger configuration"""

# Sensor configuration.
sensors = {
'environment': [
{
'id': 'vedirect-sbc-1',
'description': 'Victron Energy SmartSolar Charge Controller MPPT 75|15',
'type': 'vedirect',
'enabled': True,
# 'bus': 'serial:0',
'device': '/dev/ttysdummy042',
},
],
}
115 changes: 115 additions & 0 deletions test/test_sensors_vedirect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
# (c) 2020 Richard Pobering <richard@hiveeyes.org>
# (c) 2020-2021 Andreas Motl <andreas@hiveeyes.org>
# (c) 2021 Manu Lange <Manu.Lange@plantandfood.co.nz>
# License: GNU General Public License, Version 3
import sys
import types
import mock
import pytest
from test.util.terkin import invoke_datalogger_raspberrypi, invoke_datalogger_pycom

# https://github.com/nznobody/vedirect/blob/901ed6e/tests/vedirect_device_emulator.py#L61-L69
MPPT_DATA = '\r\nV\t12800\r\nVPV\t3350\r\nPPV\t130\r\nI\t15000\r\nIL\t1500\r\nLOAD\tON\r\nRelay\tOFF\r\nH19\t456\r\nH20\t45\r\nH21\t300\r\nH22\t45\r\nH23\t350\r\nERR\t0\r\nCS\t5\r\nFW\t1.19\r\nPID\t0xA042\r\nSER#\tHQ141112345\r\nHSDS\t0\r\nMPPT\t2\r\nChecksum\t\x99'


@pytest.mark.sensors
@pytest.mark.sbc
@mock.patch('adafruit_blinka.agnostic.board_id', "RASPBERRY_PI_4B")
@mock.patch('adafruit_blinka.agnostic.chip_id', "BCM2XXX")
@mock.patch('adafruit_platformdetect.board.Board.any_raspberry_pi_40_pin', True)
@mock.patch('adafruit_platformdetect.board.Board.any_embedded_linux', True)
def test_sensors_vedirect_sbc(mocker, caplog, fake_serial):
"""
Check the whole sensor machinery.
"""

fake_serial._waiting_data = MPPT_DATA

# Acquire settings.
from test.settings import sensors_vedirect_sbc as sensor_settings

# Invoke datalogger for a single duty cycle.
datalogger = invoke_datalogger_raspberrypi(caplog, settings=sensor_settings)

# Capture log output.
captured = caplog.text

# Proof it works by verifying log output.
assert 'Initializing sensor "Victron Energy VE.Direct" on "/dev/ttysdummy042"' in captured, captured

# Get hold of the last reading.
last_reading = datalogger.storage.last_reading

# Proof it works by verifying last sensor readings.
verify_reading(last_reading)


@pytest.mark.sensors
@pytest.mark.esp32
@mock.patch('sys.platform', 'esp32')
@mock.patch('sys.implementation', types.SimpleNamespace(_multiarch='micropython', name='micropython', cache_tag='micropython-1.14', version=sys.version_info))
def test_sensors_vedirect_mpy(caplog):

import vedirect
vedirect.vedirect.MICROPYTHON = True

# Acquire settings.
from test.settings import sensors_vedirect_mpy as sensor_settings

# Invoke datalogger for a single duty cycle.
datalogger = invoke_datalogger_pycom(caplog, settings=sensor_settings, after_setup=setup_dummy_uart)

# Capture log output.
captured = caplog.text

# Proof it works by verifying log output.
assert 'Initializing sensor "Victron Energy VE.Direct" on "1"' in captured, captured

# Get hold of the last reading.
last_reading = datalogger.storage.last_reading

# Proof it works by verifying measurement values.
verify_reading(last_reading)


def verify_reading(last_reading):
assert last_reading['vedirect-0xA042:CS'] == 5
assert last_reading['vedirect-0xA042:ERR'] == 0
assert last_reading['vedirect-0xA042:FW'] == "1.19"
assert last_reading['vedirect-0xA042:H19'] == 456
assert last_reading['vedirect-0xA042:H20'] == 45
assert last_reading['vedirect-0xA042:H21'] == 300
assert last_reading['vedirect-0xA042:H22'] == 45
assert last_reading['vedirect-0xA042:H23'] == 350
assert last_reading['vedirect-0xA042:HSDS'] == 0
assert last_reading['vedirect-0xA042:I'] == 15000
assert last_reading['vedirect-0xA042:IL'] == 1500
assert last_reading['vedirect-0xA042:LOAD'] == "ON"
assert last_reading['vedirect-0xA042:MPPT'] == 2
assert last_reading['vedirect-0xA042:PID'] == "0xA042"
assert last_reading['vedirect-0xA042:PPV'] == 130
assert last_reading['vedirect-0xA042:Relay'] == "OFF"
assert last_reading['vedirect-0xA042:SER#'] == "HQ141112345"
assert last_reading['vedirect-0xA042:V'] == 12800
assert last_reading['vedirect-0xA042:VPV'] == 3350


def setup_dummy_uart(datalogger):
"""
Patch UART interface to use a dummy one instead of a mocked one.
"""
sens = datalogger.sensor_manager.get_sensor_by_id("vedirect-mpy-1")
if sens:
class DummyUART:

def __init__(self):
self.position = 0
self.data = MPPT_DATA

def read(self, count):
char = MPPT_DATA[self.position]
self.position += 1
return bytes(char, "latin1")

sens.driver.ser = DummyUART()
12 changes: 8 additions & 4 deletions test/util/terkin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ def invoke_umal():
return bootloader


def invoke_datalogger_pycom(caplog, settings):
return invoke_datalogger(caplog, settings, pycom=True)
def invoke_datalogger_pycom(caplog, settings, after_setup=None):
return invoke_datalogger(caplog, settings, pycom=True, after_setup=after_setup)


def invoke_datalogger_raspberrypi(caplog, settings):
Expand All @@ -50,10 +50,10 @@ def invoke_datalogger_raspberrypi(caplog, settings):
return invoke_datalogger(caplog, settings, raspberrypi=True)


def invoke_datalogger(caplog, settings, pycom=False, raspberrypi=False):
def invoke_datalogger(caplog, settings, pycom=False, raspberrypi=False, after_setup=None):

# Use a fake filesystem.
with FakeFS():
with FakeFS(additional_skip_names=["serial", "serial.serialutil", "serial.serialposix"]):

# Pycom mounts the main filesystem at "/flash".
if pycom:
Expand Down Expand Up @@ -90,6 +90,10 @@ def invoke_datalogger(caplog, settings, pycom=False, raspberrypi=False):
from terkin.datalogger import TerkinDatalogger
datalogger = TerkinDatalogger(settings, platform_info=bootloader.platform_info)
datalogger.setup()

if callable(after_setup):
after_setup(datalogger=datalogger)

datalogger.duty_cycle()

return datalogger
5 changes: 5 additions & 0 deletions tools/setup.mk
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ download-requirements-real:
# 5. Install INA219 library
$(fetch) $(target_dir) https://raw.githubusercontent.com/chrisb2/pyb_ina219/f427017/ina219.py

# 6. Install VEDirect library
mkdir -p $(target_dir)/vedirect
$(fetch) $(target_dir)/vedirect https://github.com/nznobody/vedirect/raw/345a688/src/vedirect/__init__.py
$(fetch) $(target_dir)/vedirect https://github.com/nznobody/vedirect/raw/345a688/src/vedirect/vedirect.py


download-requirements-ui:

Expand Down