Skip to content

Commit

Permalink
Merge pull request #2070 from horoc/add-eventmesh-message-protocol
Browse files Browse the repository at this point in the history
[ISSUE #2069] Support Go SDK Http EventMesh Message Protocol 
close #2069
  • Loading branch information
xwm1992 committed Nov 7, 2022
2 parents 6c0fff0 + 6e2bac2 commit c81c366
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package producer
package protocol

import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"time"
)

type EventMeshProtocolProducer interface {
Publish(event *cloudevents.Event) error
Request(event *cloudevents.Event, timeout time.Duration) (*cloudevents.Event, error)

// TODO: add EventMeshMessage and OpenMessage support
type EventMeshMessage struct {
BizSeqNo string `json:"biz_seq_no"`
UniqueId string `json:"unique_id"`
Topic string `json:"topic"`
Content string `json:"content"`
Prop map[string]string `json:"prop"`
}
2 changes: 1 addition & 1 deletion eventmesh-sdk-go/examples/http/async_pub_cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,5 @@ func AsyncPubCloudEvents() {

// Publish event
httpProducer := producer.NewEventMeshHttpProducer(eventMeshClientConfig)
httpProducer.Publish(&event)
httpProducer.PublishCloudEvent(&event)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package constant
package constants

const (
EventMeshMessageProtocol = "eventmeshmessage"
CloudEventsProtocol = "cloudevents"
OpenMessageProtocol = "openmessage"
)

const ProtocolDesc = "http"
ProtocolDesc = "http"
EventMeshMessageConstTTL = "ttl"
)
12 changes: 6 additions & 6 deletions eventmesh-sdk-go/http/producer/cloudevent_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"fmt"
gcommon "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/protocol/http/common"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/protocol/http/message"
protocol_message "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/protocol/http/message"
gutils "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/utils"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http/conf"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http/constant"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http/constants"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http/model"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http/utils"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/log"
Expand Down Expand Up @@ -115,13 +115,13 @@ func (c *CloudEventProducer) buildCommonPostParam(event *cloudevents.Event) *mod
requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.USERNAME, c.EventMeshHttpClientConfig.UserName())
requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.PASSWORD, c.EventMeshHttpClientConfig.Password())
requestParam.AddHeader(common.ProtocolKey.LANGUAGE, gcommon.Constants.LANGUAGE_GO)
requestParam.AddHeader(common.ProtocolKey.PROTOCOL_TYPE, constant.CloudEventsProtocol)
requestParam.AddHeader(common.ProtocolKey.PROTOCOL_DESC, constant.ProtocolDesc)
requestParam.AddHeader(common.ProtocolKey.PROTOCOL_TYPE, constants.CloudEventsProtocol)
requestParam.AddHeader(common.ProtocolKey.PROTOCOL_DESC, constants.ProtocolDesc)
requestParam.AddHeader(common.ProtocolKey.PROTOCOL_VERSION, event.SpecVersion())

// todo: move producerGroup tp header
requestParam.AddBody(message.SendMessageRequestBodyKey.PRODUCERGROUP, c.EventMeshHttpClientConfig.ProducerGroup())
requestParam.AddBody(message.SendMessageRequestBodyKey.CONTENT, content)
requestParam.AddBody(protocol_message.SendMessageRequestBodyKey.PRODUCERGROUP, c.EventMeshHttpClientConfig.ProducerGroup())
requestParam.AddBody(protocol_message.SendMessageRequestBodyKey.CONTENT, content)

return requestParam
}
Expand Down
8 changes: 4 additions & 4 deletions eventmesh-sdk-go/http/producer/cloudevent_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"time"
)

func TestEventMeshHttpProducer_Publish(t *testing.T) {
func TestEventMeshHttpProducer_PublishCloudEvent(t *testing.T) {
f := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"retCode":0}`))
Expand All @@ -57,11 +57,11 @@ func TestEventMeshHttpProducer_Publish(t *testing.T) {
}
// Publish event
httpProducer := NewEventMeshHttpProducer(eventMeshClientConfig)
err = httpProducer.Publish(&event)
err = httpProducer.PublishCloudEvent(&event)
assert.Nil(t, err)
}

func TestEventMeshHttpProducer_Request(t *testing.T) {
func TestEventMeshHttpProducer_RequestCloudEvent(t *testing.T) {
f := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"retCode":0, "retMsg":"{\"topic\":\"test-topic\",\"body\":\"{\\\"data\\\":1}\",\"properties\":null}"}`))
Expand All @@ -88,7 +88,7 @@ func TestEventMeshHttpProducer_Request(t *testing.T) {
}

httpProducer := NewEventMeshHttpProducer(eventMeshClientConfig)
ret, err := httpProducer.Request(&event, time.Second)
ret, err := httpProducer.RequestCloudEvent(&event, time.Second)
assert.Nil(t, err)
retData := make(map[string]interface{})
utils.UnMarshalJsonString(string(ret.DataEncoded), &retData)
Expand Down
137 changes: 137 additions & 0 deletions eventmesh-sdk-go/http/producer/eventmesh_message_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package producer

import (
"errors"
"fmt"
gcommon "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/protocol"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/protocol/http/common"
protocol_message "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/protocol/http/message"
gutils "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/utils"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http/conf"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http/constants"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http/model"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http/utils"
nethttp "net/http"
"strconv"
"time"
)

type EventMeshMessageProducer struct {
*http.AbstractHttpClient
}

func NewEventMeshMessageProducer(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *EventMeshMessageProducer {
c := &EventMeshMessageProducer{AbstractHttpClient: http.NewAbstractHttpClient(eventMeshHttpClientConfig)}
return c
}

func (c *EventMeshMessageProducer) Publish(message *protocol.EventMeshMessage) error {
err := c.validateEventMeshMessage(message)
if err != nil {
return err
}

requestParam := c.buildCommonPostParam(message)
requestParam.AddHeader(common.ProtocolKey.REQUEST_CODE, strconv.Itoa(common.DefaultRequestCode.MSG_SEND_ASYNC.RequestCode))

target := c.SelectEventMesh()
resp := utils.HttpPost(c.HttpClient, target, requestParam)
var ret http.EventMeshRetObj
gutils.UnMarshalJsonString(resp, &ret)
if ret.RetCode != common.DefaultEventMeshRetCode.SUCCESS.RetCode {
return fmt.Errorf("publish failed, http request error code: %d", ret.RetCode)
}
return nil
}

func (c *EventMeshMessageProducer) Request(message *protocol.EventMeshMessage, timeout time.Duration) (*protocol.EventMeshMessage, error) {
err := c.validateEventMeshMessage(message)
if err != nil {
return nil, err
}

requestParam := c.buildCommonPostParam(message)
requestParam.AddHeader(common.ProtocolKey.REQUEST_CODE, strconv.Itoa(common.DefaultRequestCode.MSG_SEND_SYNC.RequestCode))
requestParam.SetTimeout(timeout.Milliseconds())
target := c.SelectEventMesh()
resp := utils.HttpPost(c.HttpClient, target, requestParam)
var ret http.EventMeshRetObj
gutils.UnMarshalJsonString(resp, &ret)
if ret.RetCode != common.DefaultEventMeshRetCode.SUCCESS.RetCode {
return nil, fmt.Errorf("request failed, http request code: %d", ret.RetCode)
}
retMessage, err := c.transferMessage(&ret)
if err != nil {
return nil, err
}
return retMessage, nil
}

func (c *EventMeshMessageProducer) transferMessage(retObj *http.EventMeshRetObj) (message *protocol.EventMeshMessage, err error) {
defer func() {
if r := recover(); r != nil {
err = errors.New("fail to transfer from EventMeshRetObj to EventMeshMessage")
}
}()
var replyMessage http.ReplyMessage
gutils.UnMarshalJsonString(retObj.RetMsg, &replyMessage)
return &protocol.EventMeshMessage{
Topic: replyMessage.Topic,
Content: replyMessage.Body,
Prop: replyMessage.Properties,
}, nil
}

func (c *EventMeshMessageProducer) buildCommonPostParam(message *protocol.EventMeshMessage) *model.RequestParam {
requestParam := model.NewRequestParam(nethttp.MethodPost)
requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.ENV, c.EventMeshHttpClientConfig.Env())
requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.IDC, c.EventMeshHttpClientConfig.Idc())
requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.IP, c.EventMeshHttpClientConfig.Ip())
requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.PID, c.EventMeshHttpClientConfig.Pid())
requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.SYS, c.EventMeshHttpClientConfig.Sys())
requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.USERNAME, c.EventMeshHttpClientConfig.UserName())
requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.PASSWORD, c.EventMeshHttpClientConfig.Password())
requestParam.AddHeader(common.ProtocolKey.VERSION, common.DefaultProtocolVersion.V1.Version())
requestParam.AddHeader(common.ProtocolKey.PROTOCOL_TYPE, constants.EventMeshMessageProtocol)
requestParam.AddHeader(common.ProtocolKey.PROTOCOL_DESC, constants.ProtocolDesc)
requestParam.AddHeader(common.ProtocolKey.PROTOCOL_VERSION, common.DefaultProtocolVersion.V1.Version())
requestParam.AddHeader(common.ProtocolKey.LANGUAGE, gcommon.Constants.LANGUAGE_GO)

requestParam.AddBody(protocol_message.SendMessageRequestBodyKey.PRODUCERGROUP, c.EventMeshHttpClientConfig.ProducerGroup())
requestParam.AddBody(protocol_message.SendMessageRequestBodyKey.TOPIC, message.Topic)
requestParam.AddBody(protocol_message.SendMessageRequestBodyKey.CONTENT, message.Content)
requestParam.AddBody(protocol_message.SendMessageRequestBodyKey.TTL, message.Prop[constants.EventMeshMessageConstTTL])
requestParam.AddBody(protocol_message.SendMessageRequestBodyKey.BIZSEQNO, message.BizSeqNo)
requestParam.AddBody(protocol_message.SendMessageRequestBodyKey.UNIQUEID, message.UniqueId)
return requestParam
}

func (c *EventMeshMessageProducer) validateEventMeshMessage(message *protocol.EventMeshMessage) error {
if message == nil {
return errors.New("EventMeshMessage can not be nil")
}
if len(message.Topic) == 0 {
return errors.New("EventMeshMessage topic can not be empty")
}
if len(message.Content) == 0 {
return errors.New("EventMeshMessage content can not be empty")
}
return nil
}
82 changes: 82 additions & 0 deletions eventmesh-sdk-go/http/producer/eventmesh_message_producer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package producer

import (
"fmt"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/protocol"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/utils"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http/conf"
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)

func TestEventMeshHttpProducer_PublishEventMeshMessage(t *testing.T) {
f := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"retCode":0}`))
}
server := httptest.NewServer(http.HandlerFunc(f))
defer server.Close()

eventMeshClientConfig := conf.DefaultEventMeshHttpClientConfig
sp := strings.Split(server.URL, ":")
eventMeshClientConfig.SetLiteEventMeshAddr(fmt.Sprintf("127.0.0.1:%s", sp[len(sp)-1]))

message := &protocol.EventMeshMessage{
BizSeqNo: "test-biz-no",
UniqueId: "test-unique-id",
Topic: "test-topic",
Content: "test-content",
Prop: map[string]string{"hello": "EventMesh"},
}
// Publish event
httpProducer := NewEventMeshHttpProducer(eventMeshClientConfig)
err := httpProducer.PublishEventMeshMessage(message)
assert.Nil(t, err)
}

func TestEventMeshHttpProducer_RequestEventMeshMessage(t *testing.T) {
f := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"retCode":0, "retMsg":"{\"topic\":\"test-topic\",\"body\":\"{\\\"data\\\":1}\",\"properties\":null}"}`))
}

server := httptest.NewServer(http.HandlerFunc(f))
defer server.Close()

eventMeshClientConfig := conf.DefaultEventMeshHttpClientConfig
sp := strings.Split(server.URL, ":")
eventMeshClientConfig.SetLiteEventMeshAddr(fmt.Sprintf("127.0.0.1:%s", sp[len(sp)-1]))

message := &protocol.EventMeshMessage{
BizSeqNo: "test-biz-no",
UniqueId: "test-unique-id",
Topic: "test-topic",
Content: "test-content",
Prop: map[string]string{"hello": "EventMesh"},
}
httpProducer := NewEventMeshHttpProducer(eventMeshClientConfig)
ret, err := httpProducer.RequestEventMeshMessage(message, time.Second)
assert.Nil(t, err)
retData := make(map[string]interface{})
utils.UnMarshalJsonString(ret.Content, &retData)
assert.Equal(t, float64(1), retData["data"])
}
64 changes: 64 additions & 0 deletions eventmesh-sdk-go/http/producer/http_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package producer

import (
"errors"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/protocol"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/http/conf"
cloudevents "github.com/cloudevents/sdk-go/v2"
"time"
)

type EventMeshHttpProducer struct {
cloudEventProducer *CloudEventProducer
eventMeshMessageProducer *EventMeshMessageProducer
}

func NewEventMeshHttpProducer(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *EventMeshHttpProducer {
return &EventMeshHttpProducer{
cloudEventProducer: NewCloudEventProducer(eventMeshHttpClientConfig),
eventMeshMessageProducer: NewEventMeshMessageProducer(eventMeshHttpClientConfig),
}
}

func (e *EventMeshHttpProducer) PublishCloudEvent(event *cloudevents.Event) error {
if event == nil {
return errors.New("publish CloudEvent message failed, message is nil")
}
return e.cloudEventProducer.Publish(event)
}

func (e *EventMeshHttpProducer) RequestCloudEvent(event *cloudevents.Event, timeout time.Duration) (*cloudevents.Event, error) {
if event == nil {
return nil, errors.New("request CloudEvent message failed, message is nil")
}
return e.cloudEventProducer.Request(event, timeout)
}

func (e *EventMeshHttpProducer) PublishEventMeshMessage(message *protocol.EventMeshMessage) error {
if message == nil {
return errors.New("publish EventMesh message failed, message is nil")
}
return e.eventMeshMessageProducer.Publish(message)
}

func (e *EventMeshHttpProducer) RequestEventMeshMessage(message *protocol.EventMeshMessage, timeout time.Duration) (*protocol.EventMeshMessage, error) {
if message == nil {
return nil, errors.New("request EventMesh message failed, message is nil")
}
return e.eventMeshMessageProducer.Request(message, timeout)
}
Loading

0 comments on commit c81c366

Please sign in to comment.