Skip to content

Commit

Permalink
Merge branch 'master' of github.com:emqx/kuiper
Browse files Browse the repository at this point in the history
  • Loading branch information
jinfahua committed Dec 10, 2019
2 parents aef7b6c + 7ce6430 commit d4bd19e
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 18 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -31,7 +31,7 @@ It can be run at various IoT edge use scenarios, such as real-time processing of
- 60+ functions, includes mathematical, string, aggregate and hash etc
- 4 time windows

- Highly extensibility
- Highly extensibile

Plugin system is provided, and it supports to extend at ``Source``, ``SQL functions `` and ``Sink``.

Expand Down Expand Up @@ -79,4 +79,4 @@ To using cross-compilation, refer to [this doc](docs/en_US/cross-compile.md).

## Open source license

[Apache 2.0](LICENSE)
[Apache 2.0](LICENSE)
3 changes: 2 additions & 1 deletion docker/README.md
Expand Up @@ -70,7 +70,8 @@ It can be run at various IoT edge use scenarios, such as real-time processing of
- Data order, group, aggregation and join
- 60+ functions, includes mathematical, string, aggregate and hash etc
- 4 time windows
- Highly extensibility
- Highly extensibile

Plugin system is provided, and it supports to extend at Source, SQL functions and Sink.
- Source: embedded support for MQTT, and provide extension points for sources
- Sink: embedded support for MQTT and HTTP, and provide extension points for sinks
Expand Down
2 changes: 1 addition & 1 deletion docs/en_US/rules/overview.md
Expand Up @@ -45,7 +45,7 @@ The sql query to run for the rule.

### actions

Currently, 2 kinds of actions are supported: [log](sinks/logs.md) and [mqtt](sinks/mqtt.md). Each action can define its own properties.
Currently, 3 kinds of actions are supported: [log](sinks/logs.md), [mqtt](sinks/mqtt.md) and [rest](sinks/rest.md). Each action can define its own properties.

Actions could be customized to support different kinds of outputs, see [extension](../extension/overview.md) for more detailed info.

Expand Down
23 changes: 23 additions & 0 deletions docs/en_US/rules/sinks/rest.md
@@ -0,0 +1,23 @@
# REST action

The action is used for publish output message into a RESTful API.

| Property name | Optional | Description |
| ----------------- | -------- | ------------------------------------------------------------ |
| method | true | The http method for the RESTful API. It is a case insensitive string whose value is among "get", "post", "put", "patch", "delete" and "head". The default value is "get". |
| url | false | The RESTful API endpoint, such as ``https://www.example.com/api/dummy`` |
| bodyType | true | The type of the body. Currently, 3 types are supported: "none", "raw" and "form". For "get" and "head", no body is required so the default value is "none". For other http methods, the default value is "raw". |
| timeout | true | The timeout (milliseconds) for a http request, defaults to 5000 ms |
| headers | true | The additional headers to be set for the http request. |
| sendSingle | true | The output messages are received as an array. This is indicate whether to send the results one by one. If false, the output message will be ``{"result":"${the string of received message}"}``. For example, ``{"result":"[{\"count\":30},"\"count\":20}]"}``. Otherwise, the result message will be sent one by one with the actual field name. For the same example as above, it will send ``{"count":30}``, then send ``{"count":20}`` to the RESTful endpoint.Default to false. |

Below is sample configuration for connecting to Edgex Foundry core command.
```json
{
"rest": {
"url": "http://127.0.0.1:48082/api/v1/device/cc622d99-f835-4e94-b5cb-b1eff8699dc4/command/51fce08a-ae19-4bce-b431-b9f363bba705",
"method": "post",
"sendSingle": true
}
}
```
2 changes: 1 addition & 1 deletion docs/zh_CN/rules/overview.md
Expand Up @@ -45,7 +45,7 @@

### 动作

当前,支持两种操作: [log](sinks/logs.md) [mqtt](sinks/mqtt.md).。 每个动作可以定义自己的属性。
当前,支持两种操作: [log](sinks/logs.md) [mqtt](sinks/mqtt.md)[rest](sinks/rest.md)。 每个动作可以定义自己的属性。

可以自定义动作以支持不同种类的输出,有关更多详细信息,请参见 [extension](../extension/overview.md)

Expand Down
23 changes: 23 additions & 0 deletions docs/zh_CN/rules/sinks/rest.md
@@ -0,0 +1,23 @@
# REST action

The action is used for publish output message into a RESTful API.

| Property name | Optional | Description |
| ----------------- | -------- | ------------------------------------------------------------ |
| method | true | The http method for the RESTful API. It is a case insensitive string whose value is among "get", "post", "put", "patch", "delete" and "head". The default value is "get". |
| url | false | The RESTful API endpoint, such as ``https://www.example.com/api/dummy`` |
| bodyType | true | The type of the body. Currently, 3 types are supported: "none", "raw" and "form". For "get" and "head", no body is required so the default value is "none". For other http methods, the default value is "raw". |
| timeout | true | The timeout (milliseconds) for a http request, defaults to 5000 ms |
| headers | true | The additional headers to be set for the http request. |
| sendSingle | true | The output messages are received as an array. This is indicate whether to send the results one by one. If false, the output message will be ``{"result":"${the string of received message}"}``. For example, ``{"result":"[{\"count\":30},"\"count\":20}]"}``. Otherwise, the result message will be sent one by one with the actual field name. For the same example as above, it will send ``{"count":30}``, then send ``{"count":20}`` to the RESTful endpoint.Default to false. |

Below is sample configuration for connecting to Edgex Foundry core command.
```json
{
"rest": {
"url": "http://127.0.0.1:48082/api/v1/device/cc622d99-f835-4e94-b5cb-b1eff8699dc4/command/51fce08a-ae19-4bce-b431-b9f363bba705",
"method": "post",
"sendSingle": true
}
}
```
2 changes: 1 addition & 1 deletion xsql/plans/preprocessor.go
Expand Up @@ -63,7 +63,7 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{
if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
if v := ve.Eval(f.Expr); v != nil {
result[f.AName] = v
result[strings.ToLower(f.AName)] = v
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions xsql/processors/xsql_processor.go
Expand Up @@ -477,6 +477,8 @@ func getSink(name string, action map[string]interface{}) (api.Sink, error) {
s = sinks.NewLogSink()
case "mqtt":
s = &sinks.MQTTSink{}
case "rest":
s = &sinks.RestSink{}
default:
nf, err := plugin_manager.GetPlugin(name, "sinks")
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions xsql/processors/xsql_processor_test.go
Expand Up @@ -404,6 +404,19 @@ func TestSingleSQL(t *testing.T) {
"ts": float64(1541152488442),
}},
},
}, {
name: `rule3`,
sql: `SELECT size as Int8, ts FROM demo where size > 3`,
r: [][]map[string]interface{}{
{{
"Int8": float64(6),
"ts": float64(1541152486822),
}},
{{
"Int8": float64(4),
"ts": float64(1541152488442),
}},
},
},
}
fmt.Printf("The test bucket size is %d.\n\n", len(tests))
Expand Down
10 changes: 6 additions & 4 deletions xstream/nodes/sink_node.go
Expand Up @@ -37,10 +37,12 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
for {
select {
case item := <-m.input:
if err := m.sink.Collect(ctx, item); err != nil{
//TODO deal with publish error
logger.Errorf("sink node %s publish %v error: %v", ctx.GetOpId(), item, err)
}
go func() {
if err := m.sink.Collect(ctx, item); err != nil{
//TODO deal with publish error
logger.Errorf("sink node %s publish %v error: %v", ctx.GetOpId(), item, err)
}
}()
case <-ctx.Done():
logger.Infof("sink node %s done", m.name)
if err := m.sink.Close(ctx); err != nil{
Expand Down
14 changes: 6 additions & 8 deletions xstream/sinks/mqtt_sink.go
Expand Up @@ -2,10 +2,10 @@ package sinks

import (
"crypto/tls"
"github.com/emqx/kuiper/common"
"github.com/emqx/kuiper/xstream/api"
"fmt"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/emqx/kuiper/common"
"github.com/emqx/kuiper/xstream/api"
"github.com/google/uuid"
"strings"
)
Expand Down Expand Up @@ -41,7 +41,7 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
}
}
var pVersion uint = 3
pVersionStr, ok := ps["protocolVersion"];
pVersionStr, ok := ps["protocolVersion"]
if ok {
v, _ := pVersionStr.(string)
if v == "3.1" {
Expand All @@ -54,7 +54,7 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
}

uName := ""
un, ok := ps["username"];
un, ok := ps["username"]
if ok {
v, _ := un.(string)
if strings.Trim(v, " ") != "" {
Expand All @@ -63,7 +63,7 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
}

password := ""
pwd, ok := ps["password"];
pwd, ok := ps["password"]
if ok {
v, _ := pwd.(string)
if strings.Trim(v, " ") != "" {
Expand Down Expand Up @@ -154,6 +154,4 @@ func (ms *MQTTSink) Close(ctx api.StreamContext) error {
ms.conn.Disconnect(5000)
}
return nil
}


}

0 comments on commit d4bd19e

Please sign in to comment.