Skip to content

Commit

Permalink
Development of 1.10 (#19)
Browse files Browse the repository at this point in the history
* resolves #16

* ignore compiled python

* resolves #15

* resolves #17

* add some smoke tests

* first pass at refactoring managing config data

* cleanup config logging

resolves #9

* method signature changes to handle multi topics

* driver handle multiple topics

* pass queue into processing

prepping for processing multiple queues

* service handle multpile topics

* support multi queues

* subscribe to multiple topics

still need to review tests

* cleanup code after multi-topic support

* proof of concept/hack for handling wind

When individual payload, attempt to put all wind data into a single
packet so that WeeWx accumulator works "out of the box" (no necessary
configuring)

* slightly improved logging

* additional logging

* move wind data to a separate queue

seems a bit cleaner to have data that that needs to be grouped in its
own queue

* get tests working

* tests running under python 3

probably need redo when i fully understand python 3 string handling

* lookup subscription topic

* some additional help comments

* cleanup archive_topic and unit_system

* additional help comments

* add queue limit

* clean up collecting of wind data

* service wind queue processing tests

* driver wind processing tests

* test collect data class

* better handle when archive_topic not configured

When hardware generation configured, but no archive_topic -
fall back to software generation (by throwing not implemented)

* enable configuring MQTT

* ability to configure MQTT logging on/off

* improved comments

* eliminate multiple inheritance

* move client creation

* Move topic management

* improve comment

* mock MQTTSubscribe

* cleanup

* mock MQTTSubscribe

* additional refactoring

* experiment with properties

* prep for python 3

* enable install/configure of driver

progress for #12

* Move files in preparation of installer

* add and option to turn the service on/off

* extension installer

* Install extension and documentation

* Exclude the config editor from tests

* additional formatting

Rendered badly in a different editor

* continue to improve formatting

* renders differently

* additional documentation improvements

* Initial refactor for message callback factory

Need to refactor/rework the tests

* test that base callbacks are returned

* queue size tests

* test keyword payload

* json payload tests

* individual payload tests

* [[message_handler]] with payload_type

* move label_map to [[message_handler]]

* move full_topic_fieldname to [[message_handler]]

* move deliminator and separator to [[message_handler]]

* move max queue to [[message_handler]]

* move unit_system to [[message_handler]]

* initial Topics class implementation

* signature change

* continued cleanup

* continued elimination of accessing TopicX/topics2 directly

* remove dead code

* cleanup adding data

* refactor to hide queue implentation

* hide queue implementation

* improved naming

* append data tests

* test get data

* minor cleanup

* improved logging

* additional logging

* cleanup driver tests

* cleanup service tests

* remove obsolete tests

* cleanup

* improve naming

* fix install

* changes
  • Loading branch information
bellrichm committed May 31, 2019
1 parent 6197be8 commit ea4a4ba
Show file tree
Hide file tree
Showing 17 changed files with 2,852 additions and 1,757 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.pyc
198 changes: 151 additions & 47 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,89 +1,193 @@
# WeeWX-MQTTSubscribe
A Weewx service and driver that receives data from MQTT
A Weewx service and driver that receives data from multiple MQTT topics.

Currently MQTT payloads of json, keyword (field1=value, field2=value..), and individual (each topic contains a single observation) are supported.

## Description
### **Driver** ###
The driver captures the MQTT payload in a separate thread. Then in *genLoopPackets*, every element in the queue is turned into its own packet.

It is also possible to subscribe to a second topic. The MQTT payload from this topic is put into a separate queue. This queue is processed by the *genArchiveRecords* to create archive records (simulating hardware generation). With the combination of the two topics/queues, one can have a WeeWX instance gather the data and publish loop and archive data for other instances to receive.

### Service
The service can bind to either new loop packets or new archive records. In both cases a separate thread captures the MQTT payload and puts it on a queue. On new packets/records events, the main thread takes all the elements from the queue with a dateTime that is less than the packet's/record's dateTime. These elements are accumulated to a single set of data fields. if necessary, the data is converted to the units of the packet/record. The packet/record is then updated with the data.
When generating loop packets and the queue becomes empty, the option, wait_before_retry, controls how long before an attempt is made to get data from the queue.
### **Service**
The service can bind to either new loop packets or new archive records. In both cases a separate thread captures the MQTT payload and puts it on a queue. On new packets/records events, the main thread takes all the elements from the queue with a dateTime that is less than the packet's/record's dateTime. These elements are accumulated to a single dictionary of data fields. If necessary, the data is converted to the units of the packet/record. The packet/record is then updated with the data.

In development, it was noticed that due to the shorter interval, sometimes the MQTT payload was "dropped" because it dateTime was smaller than the expected start (the previous packet's dateTime). The option, overlap, can be set to decrement the start time and capture the payload. This value should be as small as possible.

### Driver
The driver also captures the MQTT payload in a separate thread. Then in genLoopPackets, every element is in the queue is turned into a packet.
It is also possible to subscribe to a second topic. The MQTT payload from this topic is put into a separate queue. This queue is processed by the genArchiveRecords to create archive records. With the combination of the two topics/queues, one can have a WeeWX instance gather the data and publish loop and archive data for other instances to receive.
## Installation and running stand-alone notes
Because there are [multiple methods to install WeeWX](http://weewx.com/docs/usersguide.htm#installation_methods), location of files can vary. See [where to find things](http://weewx.com/docs/usersguide.htm#Where_to_find_things) in the WeeWX [User's Guide](http://weewx.com/docs/usersguide.htm") for the definitive information. The following symbolic names are used to define the various locations:

When generating loop packets and the queue becomes empty, the option, wait_before_retry, controls how long before an attempt is made to get data from the queue.
* *$DOWNLOAD_ROOT* - The directory containing the downloaded *MQTTSubscribe* extension.
* *$BIN_ROOT* - The directory where WeeWX executables are located.
* *$CONFIG_ROOT* - The directory where the configuration (typically, weewx.conf) is located.

## Preqrequisites
1. Install the paho MQTT client.

```
pip install paho-mqtt
```

## Installation
1. Download MQTTSubscribe

```
wget -P $DOWNLOAD_ROOT https://github.com/bellrichm/WeeWX-MQTTSubscribe/archive/v.X.Y.Z.tar.gz
```
2. Install MQTTSubscribe
* As a driver

```
$BIN_ROOT/wee_extension --install=$DOWNLOAD_ROOT/v.X.Y.Z.tar.gz
$BIN_ROOT/wee_config --reconfig
```
**Note:** By default when installing, the service is installed and configured, but not enabled.
To not install and configure the service (only install the file(s)),
set the environment variable MQTTSubscribe_install_type to DRIVER. For example,

```
MQTTSubscribe_install_type=DRIVER --install=$DOWNLOAD_ROOT/v.X.Y.Z.tar.gz
```
And then configure the driver.

```
$BIN_ROOT/wee_config --reconfig
```
In either case, edit the [MQTTSubscribeDriver][\[topics\]] stanza to configure the topics to subscribe to.

* As a service

```
$BIN_ROOT/wee_extension --install=$DOWNLOAD_DIR/v.X.Y.Z.tar.gz
```
**Note:** By default when installing, the service is installed and configured, but not enabled.
To enable, set the environment variable MQTTSubscribe_install_type to SERVICE. For example,

```
MQTTSubscribe_install_type=SERVICE --install=$DOWNLOAD_DIR/v.X.Y.Z.tar.gz
```
In either case, edit the [MQTTSubscribeService] stanza as required.
At the very least the [\[topics\]] stanza must be configured to the topics to subscribe to.
Other settings such as host and port may need to be changed.

3. Optionally run in standalone mode

```
PYTHONPATH=$BIN_ROOT python $BIN_ROOT/user/MQTTSubscribe.py <options> $CONFIG_ROOT/weewx.conf
```
4. Restart WeeWX

```
sudo /etc/init.d/weewx restart
```

or

```
sudo sudo service restart weewx
```

or

```
sudo systemctl restart weewx
```

## Manual installation instructions
1. Download MQTTSubscribe

```
wget -P $DOWNLOAD_ROOT https://github.com/bellrichm/WeeWX-MQTTSubscribe/archive/v.X.Y.Z.tar.gz
```
2. Unpack MQTTSubscribe

```
tar xvfz v.X.Y.Z.tar.gz
```
3. Copy the file

```
cp $DOWNLOAD_ROOT/v.X.Y.X/bin/user/MQTTSubscribe.py $BIN_ROOT/user
```
4. Configure
* As a driver

```
$BIN_ROOT/wee_config --reconfig
```
Edit the [MQTTSubscribeDriver] stanza to configure the topics to subscribe to.

```
[MQTTSubscribeDriver]
[[topics]]
[[[topic1]]]
[[[topic2]]]
```

* As a service
Configure as required and add the service to WeeWX.

```
[MQTTSubscribeService]
host = localhost
payload_type = json
[[topics]]
[[[topic1]]]
[[[topic2]]]
[Engine]
[[Services]]
data_services = user.MQTTSubscribe.MQTTSubscribeService
```
## Running stand-alone
To aid in development and debugging, both the service and driver can be run "stand-alone" via
To aid in development and debugging, both the service and driver can be run "stand-alone" via:

```
PYTHONPATH=bin python bin/user/MQTTSubscribe.py <options> weewx.conf
PYTHONPATH=$BIN_ROOT python $BIN_ROOT/user/MQTTSubscribe.py <options> $CONFIG_ROOT/weewx.conf
```

Where the options are:

```
--verbose
Verbose logging
Verbose logging.
--console
Write all logging to the console/terminal.
This will override the setting in weewx.conf.
--type [driver|service]
Run as driver or service
--records N
Create N records or packets
--interval N
Number of seconds between packet or record generation.
-- delay N
The archive delay in seconds.
--delay N
Number of seconds delay to wait after the archive period is over
--binding [archive|loop]
Bind to either new packets or records when running the service.
Bind to either new packets or records when running the service.
This will override the setting in weewx.conf.
Process loop/packet or archive.record when running the driver.
--units [US|METRIC|METRICWX]
The units of the packet/record passed to the service. Only used when running the service.
```

Examples:

Bind the service to new archive records and update two records.

```
PYTHONPATH=bin python bin/user/MQTTSubscribe.py --type service --binding archive --interval 300 --delay 15 --records 2 weewx.conf
```

Run as a driver every 2 seconds generate a loop packet for a total of 30.

```
PYTHONPATH=bin python bin/user/MQTTSubscribe.py --type driver --binding loop --interval 2 --records 30 weewx.conf
```

## Manual installation Instructions

1. Install the paho MQTT client.
* Bind the service to new archive records and update two records.

```
sudo pip install paho-mqtt
PYTHONPATH=$BIN_ROOT python $BIN_ROOT/user/MQTTSubscribe.py $CONFIG_ROOT/weewx.conf --type service --binding archive --interval 300 --delay 15 --records 2 weewx.conf
```

2. For the driver, update the station type and configure the driver as required. See comments in MQTTSubscribe for configuration information.
* Run as a driver every 2 seconds generate a loop packet for a total of 30.

```
[Station]
station_type = MQTTSubscribeDriver
[MQTTSubscribeDriver]
driver = user.MQTTSubscribe
host = localhost
payload_type = json
```

3. For the service, configure as required and add the service to WeeWX. See comments in MQTTSubscribe for configuration information.

```
[MQTTSubscribeService]
host = localhost
payload_type = json
[Engine]
[[Services]]
data_services = user.MQTTSubscribe.MQTTSubscribeService
```
PYTHONPATH=$BIN_ROOT python $BIN_ROOT/user/MQTTSubscribe.py $CONFIG_ROOT/weewx.conf --type driver --binding loop --interval 2 --records 30 weewx.conf
```
Loading

0 comments on commit ea4a4ba

Please sign in to comment.