Skip to content

Commit

Permalink
[#131] [event][cloudevents] development
Browse files Browse the repository at this point in the history
  • Loading branch information
heaven-chp committed Apr 19, 2024
1 parent 24da92b commit affb3aa
Show file tree
Hide file tree
Showing 9 changed files with 464 additions and 0 deletions.
97 changes: 97 additions & 0 deletions event/cloudevents/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Package cloudevents provides cloudevents client and server implementations.
package cloudevents

import (
"context"
"sync"

cloudeventssdk "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/protocol/http"
)

type clientType int

const (
clientTypeHttp clientType = iota + 1
)

// NewHttp creates and returns a http client.
//
// ex) client, err := cloudevents.NewHttp(address, nil, nil)
func NewHttp(address string, httpOption []http.Option, clientOption []client.Option) (*Client, error) {
if protocol, err := cloudeventssdk.NewHTTP(httpOption...); err != nil {
return nil, err
} else if client, err := cloudeventssdk.NewClient(protocol, clientOption...); err != nil {
return nil, err
} else {
return &Client{clientType: clientTypeHttp, client: client, address: address}, nil
}
}

// Client is a struct that provides client related methods.
type Client struct {
clientType clientType

client client.Client
address string

wgForReceiver sync.WaitGroup
cancelFuncForReceiver context.CancelFunc
}

// Send transmits an event.
//
// ex) result := client.Send(event)
func (this *Client) Send(event Event) Result {
return Result{result: this.client.Send(this.getContext(), event)}
}

// Send transmits an event and returns a response event.
//
// ex) responseEvent, result := client.Request(event)
func (this *Client) Request(event Event) (*Event, Result) {
responseEvent, result := this.client.Request(this.getContext(), event)

return responseEvent, Result{result: result}
}

// StartReceiver receives events until StopReceiver is called.
//
// ex)
//
// httpOption := []cloudeventssdk_http.Option{cloudeventssdk_http.WithPort(port)}
// receiveclient, err := cloudevents.NewHttp("", httpOption, nil)
// receiveclient.StartReceiver(handler, failureFunc)
func (this *Client) StartReceiver(handler func(context.Context, Event), failureFunc func(error)) {
this.wgForReceiver.Add(1)
go func() {
defer this.wgForReceiver.Done()

ctx, cancel := context.WithCancel(this.getContext())
this.cancelFuncForReceiver = cancel

if err := this.client.StartReceiver(ctx, handler); err != nil {
failureFunc(err)
}
}()
}

// StopReceiver stops receiving events by StartReceiver.
//
// ex)client.StopReceiver()
func (this *Client) StopReceiver() {
if this.cancelFuncForReceiver != nil {
this.cancelFuncForReceiver()
}
this.wgForReceiver.Wait()
}

func (this *Client) getContext() context.Context {
switch this.clientType {
case clientTypeHttp:
return cloudeventssdk.ContextWithTarget(context.Background(), this.address)
default:
return nil
}
}
84 changes: 84 additions & 0 deletions event/cloudevents/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cloudevents_test

import (
"context"
"math/rand/v2"
"net/http"
"strconv"
"testing"

cloudeventssdk_http "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/common-library/go/event/cloudevents"
)

func TestSend(t *testing.T) {
address, server := startServer(t)
defer stopServer(t, server)

if client, err := cloudevents.NewHttp("http://"+address, nil, nil); err != nil {
t.Fatal(err)
} else {
for i := 0; i < 100; i++ {
if result := client.Send(getEvent(t)); result.IsUndelivered() {
t.Fatal(result.Error())
} else if statusCode, err := result.GetHttpStatusCode(); err != nil {
t.Fatal(err)
} else if statusCode != http.StatusOK {
t.Fatal("invalid -", statusCode)
}
}
}
}

func TestRequest(t *testing.T) {
address, server := startServer(t)
defer stopServer(t, server)

if client, err := cloudevents.NewHttp("http://"+address, nil, nil); err != nil {
t.Fatal(err)
} else {
for i := 0; i < 100; i++ {
if event, result := client.Request(getEvent(t)); result.IsUndelivered() {
t.Fatal(result.Error())
} else if statusCode, err := result.GetHttpStatusCode(); err != nil {
t.Fatal(err)
} else if statusCode != http.StatusOK {
t.Fatal("invalid -", statusCode)
} else {
consistencyEvent(t, event)
}
}
}
}

func TestStartReceiver(t *testing.T) {
port := rand.IntN(1000) + 10000
httpOption := []cloudeventssdk_http.Option{cloudeventssdk_http.WithPort(port)}
handler := func(ctx context.Context, event cloudevents.Event) {
consistencyEvent(t, &event)
}
failureFunc := func(err error) { t.Fatal(err) }
if receiveClient, err := cloudevents.NewHttp("", httpOption, nil); err != nil {
t.Fatal(err)
} else {
receiveClient.StartReceiver(handler, failureFunc)

for i := 0; i < 100; i++ {
if sendCient, err := cloudevents.NewHttp("http://:"+strconv.Itoa(port), nil, nil); err != nil {
t.Fatal(err)
} else if result := sendCient.Send(getEvent(t)); result.IsUndelivered() {
t.Fatal(result.Error())
}
}

receiveClient.StopReceiver()
}
}

func TestStopReceiver(t *testing.T) {
if client, err := cloudevents.NewHttp("", nil, nil); err != nil {
t.Fatal(err)
} else {
client.StopReceiver()
}
}
70 changes: 70 additions & 0 deletions event/cloudevents/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Package cloudevents provides cloudevents client and server implementations.
package cloudevents

import (
"errors"

cloudeventssdk "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/protocol"
cloudeventssdk_http "github.com/cloudevents/sdk-go/v2/protocol/http"
)

// NewResult creates and returns a result.
//
// ex) result := cloudevents.NewResult("ok")
func NewResult(format string, arguments ...any) Result {
return Result{result: cloudeventssdk.NewResult(format, arguments)}
}

// NewResult creates and returns a result.
//
// ex) result := cloudevents.NewHTTPResult(http.StatusOK, "")
func NewHTTPResult(statusCode int, format string, arguments ...any) Result {
return Result{result: cloudeventssdk.NewHTTPResult(statusCode, format, arguments)}
}

// Result is the result of event delivery.
type Result struct {
result protocol.Result
}

// IsACK returns whether the recipient acknowledged the event.
//
// ex) isACK := result.IsACK()
func (this *Result) IsACK() bool {
return cloudeventssdk.IsACK(this.result)
}

// IsNACK returns whether the recipient did not acknowledge the event.
//
// ex) isNACK := result.IsNACK()
func (this *Result) IsNACK() bool {
return cloudeventssdk.IsNACK(this.result)
}

// IsUndelivered returns whether it was delivered or not.
//
// ex) isUndelivered := result.IsUndelivered()
func (this *Result) IsUndelivered() bool {
return cloudeventssdk.IsUndelivered(this.result)
}

// GetHttpStatusCode returns the status code if the result is http.
//
// ex) statusCode, err := result.GetHttpStatusCode()
func (this *Result) GetHttpStatusCode() (int, error) {
httpResult := new(cloudeventssdk_http.Result)

if cloudeventssdk.ResultAs(this.result, &httpResult) == false {
return -1, errors.New("match failed.")
} else {
return httpResult.StatusCode, nil
}
}

// Error returns the error string.
//
// ex) errString := result.Error()
func (this *Result) Error() string {
return this.result.Error()
}
44 changes: 44 additions & 0 deletions event/cloudevents/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Package cloudevents provides cloudevents client and server implementations.
package cloudevents

import (
"context"
"time"

cloudeventssdk "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/common-library/go/http"
)

// Server is a struct that provides server related methods.
type Server struct {
server http.Server
}

// Start is start the server.
//
// ex) err := server.Start(address, handler, listenAndServeFailureFunc)
func (this *Server) Start(address string, handler func(Event) (*Event, Result), listenAndServeFailureFunc func(error)) error {
finalHandler := func(requestEvent Event) (*Event, protocol.Result) {
responseEvent, result := handler(requestEvent)
return responseEvent, result.result
}

if protocol, err := cloudeventssdk.NewHTTP(); err != nil {
return err
} else if eventReceiver, err := cloudeventssdk.NewHTTPReceiveHandler(context.Background(), protocol, finalHandler); err != nil {

return err
} else {
this.server.RegisterPathPrefixHandler("/", eventReceiver)

return this.server.Start(address, listenAndServeFailureFunc)
}
}

// Stop is stop the server.
//
// ex) err := server.Stop(10)
func (this *Server) Stop(shutdownTimeout time.Duration) error {
return this.server.Stop(shutdownTimeout)
}
35 changes: 35 additions & 0 deletions event/cloudevents/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package cloudevents_test

import (
"net/http"
"testing"

"github.com/common-library/go/event/cloudevents"
)

func TestStart(t *testing.T) {
address, server := startServer(t)
defer stopServer(t, server)

if client, err := cloudevents.NewHttp("http://"+address, nil, nil); err != nil {
t.Fatal(err)
} else {
for i := 0; i < 100; i++ {
if result := client.Send(getEvent(t)); result.IsUndelivered() {
t.Fatal(result.Error())
} else if statusCode, err := result.GetHttpStatusCode(); err != nil {
t.Fatal(err)
} else if statusCode != http.StatusOK {
t.Fatal("invalid -", statusCode)
}
}
}
}

func TestStop(t *testing.T) {
server := cloudevents.Server{}

if err := server.Stop(10); err != nil {
t.Fatal(err)
}
}
10 changes: 10 additions & 0 deletions event/cloudevents/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Package cloudevents provides cloudevents client and server implementations.
package cloudevents

import (
v2 "github.com/cloudevents/sdk-go/v2"
)

var NewEvent = v2.NewEvent

type Event = v2.Event

0 comments on commit affb3aa

Please sign in to comment.