diff --git a/README.md b/README.md index e7ee196324..89eb16c2e2 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ EVCC is an extensible EV Charge Controller with PV integration implemented in [G - [MQTT](#mqtt-readwrite) - [Script](#script-readwrite) - [HTTP](#http-readwrite) + - [Websocket](#websocket-read-only) - [Combined status](#combined-status-read-only) - [Developer information](#developer-information) - [Background](#background) @@ -374,7 +375,7 @@ Sample configuration: type: mqtt topic: mbmd/sdm1-1/Power timeout: 30s # don't accept values older than timeout -scale: 0.001 # floating point factor applied to result, e.g. for kW to W conversion +scale: 0.001 # floating point factor applied to result, e.g. for Wh to kWh conversion ``` Sample write configuration: @@ -435,6 +436,20 @@ Sample write configuration: body: %v # only applicable for PUT or POST requests ``` +### Websocket (read only) + +The `websocket` plugin implements a web socket listener. Includes the ability to read and parse JSON using jq-like queries. It can for example be used to receive messages from Volkszähler's push server. + +Sample configuration (read only): + +```yaml +type: http +uri: ws:///socket +jq: .data | select(.uuid=="") .tuples[0][1] # parse message json +scale: 0.001 # floating point factor applied to result, e.g. for Wh to kWh conversion +timeout: 30s # error if no update received in 30 seconds +``` + ### Combined status (read only) The `combined` status plugin is used to convert a mixed boolean status of plugged/charging into an EVCC-compatible charger status of A..F. It is typically used together with OpenWB MQTT integration. diff --git a/provider/config.go b/provider/config.go index 2f28e0fdf7..6e7e7d3f41 100644 --- a/provider/config.go +++ b/provider/config.go @@ -67,6 +67,8 @@ func NewFloatGetterFromConfig(log *util.Logger, config Config) (res FloatGetter) res = NewCalcFromConfig(log, config.Other) case "http": res = NewHTTPProviderFromConfig(log, config.Other).FloatGetter + case "websocket", "ws": + res = NewSocketProviderFromConfig(log, config.Other).FloatGetter case "mqtt": pc := mqttFromConfig(log, config.Other) res = MQTT.FloatGetter(pc.Topic, pc.Scale, pc.Timeout) @@ -90,6 +92,8 @@ func NewIntGetterFromConfig(log *util.Logger, config Config) (res IntGetter) { switch strings.ToLower(config.Type) { case "http": res = NewHTTPProviderFromConfig(log, config.Other).IntGetter + case "websocket", "ws": + res = NewSocketProviderFromConfig(log, config.Other).IntGetter case "mqtt": pc := mqttFromConfig(log, config.Other) res = MQTT.IntGetter(pc.Topic, int64(pc.Scale), pc.Timeout) @@ -113,6 +117,8 @@ func NewStringGetterFromConfig(log *util.Logger, config Config) (res StringGette switch strings.ToLower(config.Type) { case "http": res = NewHTTPProviderFromConfig(log, config.Other).StringGetter + case "websocket", "ws": + res = NewSocketProviderFromConfig(log, config.Other).StringGetter case "mqtt": pc := mqttFromConfig(log, config.Other) res = MQTT.StringGetter(pc.Topic, pc.Timeout) @@ -136,6 +142,8 @@ func NewBoolGetterFromConfig(log *util.Logger, config Config) (res BoolGetter) { switch strings.ToLower(config.Type) { case "http": res = NewHTTPProviderFromConfig(log, config.Other).BoolGetter + case "websocket", "ws": + res = NewSocketProviderFromConfig(log, config.Other).BoolGetter case "mqtt": pc := mqttFromConfig(log, config.Other) res = MQTT.BoolGetter(pc.Topic, pc.Timeout) diff --git a/provider/socket.go b/provider/socket.go new file mode 100644 index 0000000000..caa132be08 --- /dev/null +++ b/provider/socket.go @@ -0,0 +1,217 @@ +package provider + +import ( + "crypto/tls" + "fmt" + "math" + "net/http" + "strconv" + "sync" + "time" + + "github.com/andig/evcc/util" + "github.com/andig/evcc/util/jq" + "github.com/gorilla/websocket" + "github.com/itchyny/gojq" +) + +// Socket implements websocket request provider +type Socket struct { + *util.HTTPHelper + mux sync.Mutex + once sync.Once + url string + headers map[string]string + scale float64 + jq *gojq.Query + timeout time.Duration + val interface{} + updated time.Time +} + +// NewSocketProviderFromConfig creates a HTTP provider +func NewSocketProviderFromConfig(log *util.Logger, other map[string]interface{}) *Socket { + cc := struct { + URI string + Headers map[string]string + Jq string + Scale float64 + Insecure bool + Auth Auth + Timeout time.Duration + }{} + util.DecodeOther(log, other, &cc) + + logger := util.NewLogger("ws") + + p := &Socket{ + HTTPHelper: util.NewHTTPHelper(logger), + url: cc.URI, + headers: cc.Headers, + scale: cc.Scale, + timeout: cc.Timeout, + } + + // handle basic auth + if cc.Auth.Type != "" { + if p.headers == nil { + p.headers = make(map[string]string) + } + NewAuth(log, cc.Auth, p.headers) + } + + // ignore the self signed certificate + if cc.Insecure { + customTransport := http.DefaultTransport.(*http.Transport).Clone() + customTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + p.HTTPHelper.Client.Transport = customTransport + } + + if cc.Jq != "" { + op, err := gojq.Parse(cc.Jq) + if err != nil { + log.FATAL.Fatalf("config: invalid jq query: %s", p.jq) + } + + p.jq = op + } + + go p.listen() + + return p +} + +func (p *Socket) listen() { + log := p.HTTPHelper.Log + + headers := make(http.Header) + for k, v := range p.headers { + headers.Set(k, v) + } + + for { + client, _, err := websocket.DefaultDialer.Dial(p.url, headers) + if err != nil { + log.ERROR.Println("dial:", err) + } + + for { + _, b, err := client.ReadMessage() + if err != nil { + log.TRACE.Println("read:", err) + _ = client.Close() + break + } + + log.TRACE.Printf("recv: %s", b) + + p.mux.Lock() + if p.jq != nil { + v, err := jq.Query(p.jq, b) + if err == nil { + p.val = v + p.updated = time.Now() + } else { + log.WARN.Printf("invalid: %s", string(b)) + } + } else { + p.val = string(b) + p.updated = time.Now() + } + p.mux.Unlock() + } + } +} + +func (p *Socket) waitForInitialValue() { + p.mux.Lock() + defer p.mux.Unlock() + + if p.updated.IsZero() { + p.HTTPHelper.Log.TRACE.Println("wait for initial value") + + // wait for initial update + for p.updated.IsZero() { + p.mux.Unlock() + time.Sleep(waitTimeout) + p.mux.Lock() + } + } +} + +func (p *Socket) hasValue() (interface{}, error) { + p.once.Do(p.waitForInitialValue) + p.mux.Lock() + defer p.mux.Unlock() + + if elapsed := time.Since(p.updated); p.timeout != 0 && elapsed > p.timeout { + return nil, fmt.Errorf("outdated: %v", elapsed.Truncate(time.Second)) + } + + return p.val, nil +} + +// StringGetter sends string request +func (p *Socket) StringGetter() (string, error) { + v, err := p.hasValue() + if err != nil { + return "", err + } + + return jq.String(v) +} + +// FloatGetter parses float from string getter +func (p *Socket) FloatGetter() (float64, error) { + v, err := p.hasValue() + if err != nil { + return 0, err + } + + // v is always string when jq not used + if p.jq == nil { + v, err = strconv.ParseFloat(v.(string), 64) + if err != nil { + return 0, err + } + } + + f, err := jq.Float64(v) + return f * p.scale, err +} + +// IntGetter parses int64 from float getter +func (p *Socket) IntGetter() (int64, error) { + v, err := p.hasValue() + if err != nil { + return 0, err + } + + // v is always string when jq not used + if p.jq == nil { + v, err = strconv.ParseInt(v.(string), 10, 64) + if err != nil { + return 0, err + } + } + + i, err := jq.Int64(v) + f := float64(i) * p.scale + + return int64(math.Round(f)), err +} + +// BoolGetter parses bool from string getter +func (p *Socket) BoolGetter() (bool, error) { + v, err := p.hasValue() + if err != nil { + return false, err + } + + // v is always string when jq not used + if p.jq == nil { + v = util.Truish(v.(string)) + } + + return jq.Bool(v) +} diff --git a/util/jq/jq.go b/util/jq/jq.go index 4902a4f13b..0bb0995565 100644 --- a/util/jq/jq.go +++ b/util/jq/jq.go @@ -2,6 +2,7 @@ package jq import ( "encoding/json" + "fmt" "github.com/itchyny/gojq" "github.com/pkg/errors" @@ -31,3 +32,50 @@ func Query(query *gojq.Query, input []byte) (interface{}, error) { return v, nil } + +// Float64 converts interface to float64 +func Float64(v interface{}) (float64, error) { + switch v := v.(type) { + case int: + return float64(v), nil + case float64: + return v, nil + default: + return 0, fmt.Errorf("unexpected float type: %T", v) + } +} + +// Int64 converts interface to int64 +func Int64(v interface{}) (int64, error) { + switch v := v.(type) { + case int: + return int64(v), nil + case float64: + if float64(int64(v)) == v { + return int64(v), nil + } + return 0, fmt.Errorf("unexpected int64: %v", v) + default: + return 0, fmt.Errorf("unexpected int64 type: %T", v) + } +} + +// String converts interface to string +func String(v interface{}) (string, error) { + switch v := v.(type) { + case string: + return v, nil + default: + return "", fmt.Errorf("unexpected string type: %T", v) + } +} + +// Bool converts interface to bool +func Bool(v interface{}) (bool, error) { + switch v := v.(type) { + case bool: + return v, nil + default: + return false, fmt.Errorf("unexpected bool type: %T", v) + } +}