Skip to content

Commit

Permalink
Add Websocket plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
andig committed May 20, 2020
1 parent bc4fcea commit 68cceb8
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 1 deletion.
17 changes: 16 additions & 1 deletion README.md
Expand Up @@ -34,6 +34,7 @@ EVCC is an extensible EV Charge Controller with PV integration implemented in [G
- [MQTT](#mqtt-readwrite)
- [Script](#script-readwrite)
- [HTTP](#http-readwrite)
- [Websocket](#websocket-read-only)
- [Combined status](#combined-status-read-only)
- [Developer information](#developer-information)
- [Background](#background)
Expand Down Expand Up @@ -374,7 +375,7 @@ Sample configuration:
type: mqtt
topic: mbmd/sdm1-1/Power
timeout: 30s # don't accept values older than timeout
scale: 0.001 # floating point factor applied to result, e.g. for kW to W conversion
scale: 0.001 # floating point factor applied to result, e.g. for Wh to kWh conversion
```

Sample write configuration:
Expand Down Expand Up @@ -435,6 +436,20 @@ Sample write configuration:
body: %v # only applicable for PUT or POST requests
```

### Websocket (read only)

The `websocket` plugin implements a web socket listener. Includes the ability to read and parse JSON using jq-like queries. It can for example be used to receive messages from Volkszähler's push server.

Sample configuration (read only):

```yaml
type: http
uri: ws://<volkszaehler host:port>/socket
jq: .data | select(.uuid=="<uuid>") .tuples[0][1] # parse message json
scale: 0.001 # floating point factor applied to result, e.g. for Wh to kWh conversion
timeout: 30s # error if no update received in 30 seconds
```

### Combined status (read only)

The `combined` status plugin is used to convert a mixed boolean status of plugged/charging into an EVCC-compatible charger status of A..F. It is typically used together with OpenWB MQTT integration.
Expand Down
8 changes: 8 additions & 0 deletions provider/config.go
Expand Up @@ -67,6 +67,8 @@ func NewFloatGetterFromConfig(log *util.Logger, config Config) (res FloatGetter)
res = NewCalcFromConfig(log, config.Other)
case "http":
res = NewHTTPProviderFromConfig(log, config.Other).FloatGetter
case "websocket", "ws":
res = NewSocketProviderFromConfig(log, config.Other).FloatGetter
case "mqtt":
pc := mqttFromConfig(log, config.Other)
res = MQTT.FloatGetter(pc.Topic, pc.Scale, pc.Timeout)
Expand All @@ -90,6 +92,8 @@ func NewIntGetterFromConfig(log *util.Logger, config Config) (res IntGetter) {
switch strings.ToLower(config.Type) {
case "http":
res = NewHTTPProviderFromConfig(log, config.Other).IntGetter
case "websocket", "ws":
res = NewSocketProviderFromConfig(log, config.Other).IntGetter
case "mqtt":
pc := mqttFromConfig(log, config.Other)
res = MQTT.IntGetter(pc.Topic, int64(pc.Scale), pc.Timeout)
Expand All @@ -113,6 +117,8 @@ func NewStringGetterFromConfig(log *util.Logger, config Config) (res StringGette
switch strings.ToLower(config.Type) {
case "http":
res = NewHTTPProviderFromConfig(log, config.Other).StringGetter
case "websocket", "ws":
res = NewSocketProviderFromConfig(log, config.Other).StringGetter
case "mqtt":
pc := mqttFromConfig(log, config.Other)
res = MQTT.StringGetter(pc.Topic, pc.Timeout)
Expand All @@ -136,6 +142,8 @@ func NewBoolGetterFromConfig(log *util.Logger, config Config) (res BoolGetter) {
switch strings.ToLower(config.Type) {
case "http":
res = NewHTTPProviderFromConfig(log, config.Other).BoolGetter
case "websocket", "ws":
res = NewSocketProviderFromConfig(log, config.Other).BoolGetter
case "mqtt":
pc := mqttFromConfig(log, config.Other)
res = MQTT.BoolGetter(pc.Topic, pc.Timeout)
Expand Down
217 changes: 217 additions & 0 deletions provider/socket.go
@@ -0,0 +1,217 @@
package provider

import (
"crypto/tls"
"fmt"
"math"
"net/http"
"strconv"
"sync"
"time"

"github.com/andig/evcc/util"
"github.com/andig/evcc/util/jq"
"github.com/gorilla/websocket"
"github.com/itchyny/gojq"
)

// Socket implements websocket request provider
type Socket struct {
*util.HTTPHelper
mux sync.Mutex
once sync.Once
url string
headers map[string]string
scale float64
jq *gojq.Query
timeout time.Duration
val interface{}
updated time.Time
}

// NewSocketProviderFromConfig creates a HTTP provider
func NewSocketProviderFromConfig(log *util.Logger, other map[string]interface{}) *Socket {
cc := struct {
URI string
Headers map[string]string
Jq string
Scale float64
Insecure bool
Auth Auth
Timeout time.Duration
}{}
util.DecodeOther(log, other, &cc)

logger := util.NewLogger("ws")

p := &Socket{
HTTPHelper: util.NewHTTPHelper(logger),
url: cc.URI,
headers: cc.Headers,
scale: cc.Scale,
timeout: cc.Timeout,
}

// handle basic auth
if cc.Auth.Type != "" {
if p.headers == nil {
p.headers = make(map[string]string)
}
NewAuth(log, cc.Auth, p.headers)
}

// ignore the self signed certificate
if cc.Insecure {
customTransport := http.DefaultTransport.(*http.Transport).Clone()
customTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
p.HTTPHelper.Client.Transport = customTransport
}

if cc.Jq != "" {
op, err := gojq.Parse(cc.Jq)
if err != nil {
log.FATAL.Fatalf("config: invalid jq query: %s", p.jq)
}

p.jq = op
}

go p.listen()

return p
}

func (p *Socket) listen() {
log := p.HTTPHelper.Log

headers := make(http.Header)
for k, v := range p.headers {
headers.Set(k, v)
}

for {
client, _, err := websocket.DefaultDialer.Dial(p.url, headers)
if err != nil {
log.ERROR.Println("dial:", err)
}

for {
_, b, err := client.ReadMessage()
if err != nil {
log.TRACE.Println("read:", err)
_ = client.Close()
break
}

log.TRACE.Printf("recv: %s", b)

p.mux.Lock()
if p.jq != nil {
v, err := jq.Query(p.jq, b)
if err == nil {
p.val = v
p.updated = time.Now()
} else {
log.WARN.Printf("invalid: %s", string(b))
}
} else {
p.val = string(b)
p.updated = time.Now()
}
p.mux.Unlock()
}
}
}

func (p *Socket) waitForInitialValue() {
p.mux.Lock()
defer p.mux.Unlock()

if p.updated.IsZero() {
p.HTTPHelper.Log.TRACE.Println("wait for initial value")

// wait for initial update
for p.updated.IsZero() {
p.mux.Unlock()
time.Sleep(waitTimeout)
p.mux.Lock()
}
}
}

func (p *Socket) hasValue() (interface{}, error) {
p.once.Do(p.waitForInitialValue)
p.mux.Lock()
defer p.mux.Unlock()

if elapsed := time.Since(p.updated); p.timeout != 0 && elapsed > p.timeout {
return nil, fmt.Errorf("outdated: %v", elapsed.Truncate(time.Second))
}

return p.val, nil
}

// StringGetter sends string request
func (p *Socket) StringGetter() (string, error) {
v, err := p.hasValue()
if err != nil {
return "", err
}

return jq.String(v)
}

// FloatGetter parses float from string getter
func (p *Socket) FloatGetter() (float64, error) {
v, err := p.hasValue()
if err != nil {
return 0, err
}

// v is always string when jq not used
if p.jq == nil {
v, err = strconv.ParseFloat(v.(string), 64)
if err != nil {
return 0, err
}
}

f, err := jq.Float64(v)
return f * p.scale, err
}

// IntGetter parses int64 from float getter
func (p *Socket) IntGetter() (int64, error) {
v, err := p.hasValue()
if err != nil {
return 0, err
}

// v is always string when jq not used
if p.jq == nil {
v, err = strconv.ParseInt(v.(string), 10, 64)
if err != nil {
return 0, err
}
}

i, err := jq.Int64(v)
f := float64(i) * p.scale

return int64(math.Round(f)), err
}

// BoolGetter parses bool from string getter
func (p *Socket) BoolGetter() (bool, error) {
v, err := p.hasValue()
if err != nil {
return false, err
}

// v is always string when jq not used
if p.jq == nil {
v = util.Truish(v.(string))
}

return jq.Bool(v)
}
48 changes: 48 additions & 0 deletions util/jq/jq.go
Expand Up @@ -2,6 +2,7 @@ package jq

import (
"encoding/json"
"fmt"

"github.com/itchyny/gojq"
"github.com/pkg/errors"
Expand Down Expand Up @@ -31,3 +32,50 @@ func Query(query *gojq.Query, input []byte) (interface{}, error) {

return v, nil
}

// Float64 converts interface to float64
func Float64(v interface{}) (float64, error) {
switch v := v.(type) {
case int:
return float64(v), nil
case float64:
return v, nil
default:
return 0, fmt.Errorf("unexpected float type: %T", v)
}
}

// Int64 converts interface to int64
func Int64(v interface{}) (int64, error) {
switch v := v.(type) {
case int:
return int64(v), nil
case float64:
if float64(int64(v)) == v {
return int64(v), nil
}
return 0, fmt.Errorf("unexpected int64: %v", v)
default:
return 0, fmt.Errorf("unexpected int64 type: %T", v)
}
}

// String converts interface to string
func String(v interface{}) (string, error) {
switch v := v.(type) {
case string:
return v, nil
default:
return "", fmt.Errorf("unexpected string type: %T", v)
}
}

// Bool converts interface to bool
func Bool(v interface{}) (bool, error) {
switch v := v.(type) {
case bool:
return v, nil
default:
return false, fmt.Errorf("unexpected bool type: %T", v)
}
}

0 comments on commit 68cceb8

Please sign in to comment.