Skip to content

Commit

Permalink
Refactor data processing into pipeline component (#2169)
Browse files Browse the repository at this point in the history
  • Loading branch information
andig committed Dec 31, 2021
1 parent bac4e07 commit d97e8d1
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 296 deletions.
8 changes: 6 additions & 2 deletions meter/discovergy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/evcc-io/evcc/api"
"github.com/evcc-io/evcc/provider"
"github.com/evcc-io/evcc/provider/pipeline"
"github.com/evcc-io/evcc/util"
"github.com/evcc-io/evcc/util/request"
"github.com/evcc-io/evcc/util/transport"
Expand Down Expand Up @@ -71,12 +72,15 @@ func NewDiscovergyFromConfig(other map[string]interface{}) (api.Meter, error) {

uri := fmt.Sprintf("%s/last_reading?meterId=%s", discovergyAPI, meterID)
power, err := provider.NewHTTP(log, http.MethodGet, uri, false, 0.001*cc.Scale, 0).WithAuth("basic", cc.User, cc.Password)
if err == nil {
_, err = power.WithJq(".values.power")
if err != nil {
return nil, err
}

pipe, err := new(pipeline.Pipeline).WithJq(".values.power")
if err != nil {
return nil, err
}
power = power.WithPipeline(pipe)

return NewConfigurable(power.FloatGetter())
}
Expand Down
238 changes: 24 additions & 214 deletions provider/http.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
package provider

import (
"bytes"
"encoding/hex"
"fmt"
"io"
"math"
"regexp"
"strconv"
"strings"
"time"

xj "github.com/basgys/goxml2json"
"github.com/evcc-io/evcc/provider/javascript"
"github.com/evcc-io/evcc/provider/pipeline"
"github.com/evcc-io/evcc/util"
"github.com/evcc-io/evcc/util/jq"
"github.com/evcc-io/evcc/util/request"
"github.com/evcc-io/evcc/util/transport"
"github.com/itchyny/gojq"
"github.com/jpfielding/go-http-digest/pkg/digest"
"github.com/robertkrimen/otto"
"github.com/volkszaehler/mbmd/meters/rs485"
)

// HTTP implements HTTP request provider
Expand All @@ -29,15 +21,10 @@ type HTTP struct {
url, method string
headers map[string]string
body string
re *regexp.Regexp
jq *gojq.Query
unpack string
decode string
vm *otto.Otto
script string
scale float64
cache time.Duration
updated time.Time
pipeline *pipeline.Pipeline
val []byte // Cached http response value
err error // Cached http response error
}
Expand All @@ -54,20 +41,15 @@ type Auth struct {
// NewHTTPProviderFromConfig creates a HTTP provider
func NewHTTPProviderFromConfig(other map[string]interface{}) (IntProvider, error) {
cc := struct {
URI, Method string
Headers map[string]string
Body string
Regex string
Jq string
Unpack string
Decode string
VM string
Script string
Scale float64
Insecure bool
Auth Auth
Timeout time.Duration
Cache time.Duration
URI, Method string
Headers map[string]string
Body string
pipeline.Settings `mapstructure:",squash"`
Scale float64
Insecure bool
Auth Auth
Timeout time.Duration
Cache time.Duration
}{
Headers: make(map[string]string),
Scale: 1,
Expand All @@ -85,32 +67,14 @@ func NewHTTPProviderFromConfig(other map[string]interface{}) (IntProvider, error
cc.Insecure,
cc.Scale,
cc.Cache,
).WithHeaders(cc.Headers).WithBody(cc.Body)
http.Client.Timeout = cc.Timeout
).
WithHeaders(cc.Headers).
WithBody(cc.Body)

var err error
if err == nil && cc.Regex != "" {
_, err = http.WithRegex(cc.Regex)
}

if err == nil && cc.Jq != "" {
_, err = http.WithJq(cc.Jq)
}

if err == nil && cc.Unpack != "" {
_, err = http.WithUnpack(cc.Unpack)
}

if err == nil && cc.Decode != "" {
_, err = http.WithDecode(cc.Decode)
}

if err == nil && cc.Script != "" {
_, err = http.WithScript(cc.VM, cc.Script)
}

if err == nil && cc.Auth.Type != "" {
_, err = http.WithAuth(cc.Auth.Type, cc.Auth.User, cc.Auth.Password)
pipe, err := pipeline.New(cc.Settings)
if err == nil {
http = http.WithPipeline(pipe)
http.Client.Timeout = cc.Timeout
}

return http, err
Expand Down Expand Up @@ -151,52 +115,10 @@ func (p *HTTP) WithHeaders(headers map[string]string) *HTTP {
return p
}

// WithRegex adds a regex query applied to the mqtt listener payload
func (p *HTTP) WithRegex(regex string) (*HTTP, error) {
re, err := regexp.Compile(regex)
if err != nil {
return nil, fmt.Errorf("invalid regex '%s': %w", re, err)
}

p.re = re

return p, nil
}

// WithJq adds a jq query applied to the mqtt listener payload
func (p *HTTP) WithJq(jq string) (*HTTP, error) {
op, err := gojq.Parse(jq)
if err != nil {
return nil, fmt.Errorf("invalid jq query '%s': %w", jq, err)
}

p.jq = op

return p, nil
}

// WithUnpack adds data unpacking
func (p *HTTP) WithUnpack(unpack string) (*HTTP, error) {
p.unpack = strings.ToLower(unpack)

return p, nil
}

// WithDecode adds data decoding
func (p *HTTP) WithDecode(decode string) (*HTTP, error) {
p.decode = strings.ToLower(decode)

return p, nil
}

// WithScript adds a javascript script to process the response
func (p *HTTP) WithScript(vm, script string) (*HTTP, error) {
regvm := javascript.RegisteredVM(strings.ToLower(vm))

p.vm = regvm
p.script = script

return p, nil
// WithPipeline adds a processing pipeline
func (p *HTTP) WithPipeline(pipeline *pipeline.Pipeline) *HTTP {
p.pipeline = pipeline
return p
}

// WithAuth adds authorized transport
Expand Down Expand Up @@ -237,72 +159,6 @@ func (p *HTTP) request(body ...string) ([]byte, error) {
return p.val, p.err
}

// transform XML into JSON with attribute names getting 'attr' prefix
func (p *HTTP) transformXML(value []byte) []byte {
// only do a simple check, as some devices e.g. Kostal Piko MP plus don't seem to send proper XML
if !bytes.HasPrefix(value, []byte("<")) {
return value
}

xmlReader := bytes.NewReader(value)

// Decode XML document
root := new(xj.Node)
if err := xj.NewDecoder(xmlReader).DecodeWithCustomPrefixes(root, "", "attr"); err != nil {
return value
}

// Then encode it in JSON
json := new(bytes.Buffer)
if err := xj.NewEncoder(json).Encode(root); err != nil {
return value
}

return json.Bytes()
}

func (p *HTTP) unpackValue(value []byte) (string, error) {
switch p.unpack {
case "hex":
b, err := hex.DecodeString(string(value))
if err != nil {
return "", err
}
return string(b), nil
}

return "", fmt.Errorf("invalid unpack: %s", p.unpack)
}

// decode a hex string to a proper value
// TODO reuse similar code from Modbus
func (p *HTTP) decodeValue(value []byte) (interface{}, error) {
switch p.decode {
case "float32", "ieee754":
return rs485.RTUIeee754ToFloat64(value), nil
case "float32s", "ieee754s":
return rs485.RTUIeee754ToFloat64Swapped(value), nil
case "float64":
return rs485.RTUUint64ToFloat64(value), nil
case "uint16":
return rs485.RTUUint16ToFloat64(value), nil
case "uint32":
return rs485.RTUUint32ToFloat64(value), nil
case "uint32s":
return rs485.RTUUint32ToFloat64Swapped(value), nil
case "uint64":
return rs485.RTUUint64ToFloat64(value), nil
case "int16":
return rs485.RTUInt16ToFloat64(value), nil
case "int32":
return rs485.RTUInt32ToFloat64(value), nil
case "int32s":
return rs485.RTUInt32ToFloat64Swapped(value), nil
}

return nil, fmt.Errorf("invalid decoding: %s", p.decode)
}

// FloatGetter parses float from request
func (p *HTTP) FloatGetter() func() (float64, error) {
g := p.StringGetter()
Expand Down Expand Up @@ -336,55 +192,9 @@ func (p *HTTP) IntGetter() func() (int64, error) {
func (p *HTTP) StringGetter() func() (string, error) {
return func() (string, error) {
b, err := p.request(p.body)
if err != nil {
return string(b), err
}

b = p.transformXML(b)

if p.re != nil {
m := p.re.FindSubmatch(b)
if len(m) > 1 {
b = m[1] // first submatch
}
}

if p.jq != nil {
v, err := jq.Query(p.jq, b)
if err != nil {
return string(b), err
}
b = []byte(fmt.Sprintf("%v", v))
}

if p.unpack != "" {
v, err := p.unpackValue(b)
if err != nil {
return string(b), err
}
b = []byte(fmt.Sprintf("%v", v))
}

if p.decode != "" {
v, err := p.decodeValue(b)
if err != nil {
return string(b), err
}
b = []byte(fmt.Sprintf("%v", v))
}

if p.vm != nil {
err := p.vm.Set("val", string(b))
if err != nil {
return string(b), err
}

v, err := p.vm.Eval(p.script)
if err != nil {
return string(b), err
}

return v.ToString()
if err == nil && p.pipeline != nil {
b, err = p.pipeline.Process(b)
}

return string(b), err
Expand Down

0 comments on commit d97e8d1

Please sign in to comment.