Skip to content

Commit

Permalink
Add V2 Hooks and V1 Configs New integrations test
Browse files Browse the repository at this point in the history
  • Loading branch information
agourdel committed Jul 6, 2024
1 parent 1f251d8 commit 6a07021
Show file tree
Hide file tree
Showing 63 changed files with 1,962 additions and 736 deletions.
7 changes: 3 additions & 4 deletions ee/webhooks/cmd/webhook-all-in-one.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,18 @@ func allInOneRun(cmd *cobra.Command, _ []string) error {
r.AddNoPublisherHandler(fmt.Sprintf("messages-%s", topic), topic, subscriber, Worker.HandleMessage)

}


lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
logging.FromContext(ctx).Infof("Start Webhook_Worker")

return nil
},
OnStop: func(ctx context.Context) error {
logging.FromContext(ctx).Infof("Stop Webhook_Worker")

subscriber.Close()

Worker.Stop()
return nil
},
Expand Down
8 changes: 4 additions & 4 deletions ee/webhooks/cmd/webhook-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,16 @@ func webhookWorkerRun(cmd *cobra.Command, _ []string) error {
Worker.Init()
for _, topic := range topics {
r.AddNoPublisherHandler(fmt.Sprintf("messages-%s", topic), topic, subscriber, Worker.HandleMessage)

}

lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {

return nil
},
OnStop: func(ctx context.Context) error {

subscriber.Close()
r.Close()
Worker.Stop()
Expand Down
2 changes: 1 addition & 1 deletion ee/webhooks/internal/commons/attempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (

type Attempt struct {
ID string `json:"id" bun:",pk"`
HookID string `json:"hookID" bun:"webhook_id"`
HookID string `json:"hookId" bun:"webhook_id"`
HookName string `json:"hookName" bun:"hook_name"`
HookEndpoint string `json:"hookEndpoint" bun:"hook_endpoint"`
Event string `json:"event" bun:"event"`
Expand Down
11 changes: 6 additions & 5 deletions ee/webhooks/internal/components/commons/webhook_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,17 @@ func (wr *WebhookRunner) StartHandleFreshLogs() {
}

func (wr *WebhookRunner) HandleFreshLogs(stopChan chan struct{}) {

ticker := time.NewTicker(time.Duration(wr.RunnerParams.DelayPull) * time.Second)

delay := time.Duration(wr.RunnerParams.DelayPull) * time.Second
ticker := time.NewTicker(time.Duration(delay))
var last_time time.Time = time.Now()

for {
select {
case <-stopChan:
return
case <-ticker.C:
logging.Infof("Webhook_Runner :: HandleFreshLogs() - Case Ticker C : ")
freezeTime := time.Now()
logs, err := wr.Database.GetFreshLogs(wr.LogChannels, last_time)
last_time = freezeTime
Expand All @@ -74,6 +76,7 @@ func (wr *WebhookRunner) HandleFreshLogs(stopChan chan struct{}) {
}

for _, log := range *logs {
logging.Infof("Webhook_Runner :: HandleFreshLogs() - NewLog ")
wr.HandleFreshLog(log)
}
}
Expand All @@ -82,7 +85,7 @@ func (wr *WebhookRunner) HandleFreshLogs(stopChan chan struct{}) {
}

func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) {
fmt.Println("HandleFreshLog")
logging.Infof("Webhook_Runner :: HandleFreshLog() - Log.Payload : %s", log.Payload)
e, err := commons.Event{}.FromPayload(log.Payload)
if err != nil {
message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Error while Event.FromPayload(log.payload): %x", wr.LogChannels, err)
Expand All @@ -104,11 +107,9 @@ func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) {
}

case commons.ChangeHookStatusType:
fmt.Println(e.Value)
strValue := e.Value.(string)
switch commons.HookStatus(strValue) {
case commons.EnableStatus:
fmt.Println("ENABLE CASE")
wr.State.ActivateHook(e.ID)
case commons.DisableStatus:
wr.State.DisableHook(e.ID)
Expand Down
16 changes: 14 additions & 2 deletions ee/webhooks/internal/components/webhook_collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ func (c *Collector) AsyncHandleSharedAttempt(sAttempt *commons.SharedAttempt, wg
return
}

if(sAttempt.Val.HookEndpoint != sHook.Val.Endpoint){
sAttempt.Val.HookEndpoint = sHook.Val.Endpoint
go func(){
_, err := c.Database.UpdateAttemptEndpoint(sAttempt.Val.ID, sAttempt.Val.HookEndpoint)
if err != nil {
message := fmt.Sprintf("Collector:AsyncHandleSharedAttempt:Database.UpdateAttemptEndpoint() : %x", err)
logging.Error(message)
panic(message)
}
}()
}

sAttempt.Val.LastHttpStatusCode = statusCode

if commons.IsHTTPRequestSuccess(statusCode) {
Expand All @@ -117,15 +129,15 @@ func (c *Collector) handleSuccess(sAttempt *commons.SharedAttempt) {
func (c *Collector) handleNextRetry(sAttempt *commons.SharedAttempt) {
sAttempt.Val.NbTry += 1

if(c.RunnerParams.MaxRetry <= sAttempt.Val.NbTry ) {
if c.RunnerParams.MaxRetry <= sAttempt.Val.NbTry {
commons.SetAbortMaxRetryStatus(sAttempt.Val)
_, err := c.Database.AbortAttempt(sAttempt.Val.ID, string(sAttempt.Val.Comment), false)
if err != nil {
message := fmt.Sprintf("Collector:handleNextRetry:Database.AbortAttempt: %x", err)
logging.Error(message)
panic(message)
}
}else {
} else {

commons.SetNextRetry(sAttempt.Val)
c.State.WaitingAttempts.Add(sAttempt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ type ErrorType string

const (
NoneType ErrorType = "NONE"
ValidationType ErrorType = "VALIDATION"
ValidationType ErrorType = "VALIDATION_TYPE"
NotFoundType ErrorType = "NOT_FOUND"
InternalType ErrorType = "INTERNAL"
InternalType ErrorType = "INTERNAL_TYPE"
)

type Response[T interface{}] struct {
Expand Down Expand Up @@ -61,7 +61,7 @@ func NotFoundErrorResp[T interface{}](err error) Response[T] {
}

const (
ErrValidation = "VALIDATION_REQUEST"
ErrValidation = "VALIDATION_TYPE"
ErrHealthcheck = "HEALTHCHECK_STATUS"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,23 @@ func ToV1Hooks(hooks *[]*commons.Hook) []V1Hook {
}

func ToV1Attempt(hook commons.Hook, attempt commons.Attempt) V1Attempt {

var status string
if attempt.Status == commons.SuccessStatus {
status = "success"
} else {
status = "failed"
}
return V1Attempt{
ID: attempt.ID,
WebhookID: hook.ID,
Config: ToV1Hook(hook),
Payload: attempt.Payload,
StatusCode: attempt.LastHttpStatusCode,
RetryAttempt: 0,
ID: attempt.ID,
WebhookID: hook.ID,
Config: ToV1Hook(hook),
Payload: attempt.Payload,
StatusCode: attempt.LastHttpStatusCode,
Status: status,
RetryAttempt: attempt.RetryAttempt,
UpdatedAt: attempt.DateStatus,
NextRetryAfter: attempt.NextTry,
CreatedAt: attempt.CreatedAt,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,7 @@ func RegisterV1HookControllers(server serverInterfaces.IHTTPServer, database sto
id := chi.URLParam(r, "id")
payload := PayloadBody{}

if err := utils.DecodeJSONBody(r, &payload); err != nil {
sharedapi.BadRequest(w, utils.ErrValidation, err)
return
}
payload.Payload = "{\"data\":\"test\"}"

resp := V1TestHookController(database, client, id, payload.Payload)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ func RegisterV2HookControllers(server serverInterfaces.IHTTPServer, database sto

})

server.Register(string(r.V2GetHook.Method), r.V2GetHook.Url, func(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
resp := V2GetHookController(database, id)

if resp.Err != nil {
if resp.T == utils.ValidationType {
sharedapi.BadRequest(w, string(resp.T), resp.Err)
return
}
if resp.T == utils.InternalType {
sharedapi.InternalServerError(w, r, resp.Err)
return
}
if resp.T == utils.NotFoundType {
sharedapi.NotFound(w, resp.Err)
return
}

sharedapi.InternalServerError(w, r, resp.Err)
return
}

sharedapi.Ok(w, *resp.Data)
})

server.Register(string(r.V2GetHooks.Method), r.V2GetHooks.Url, func(w http.ResponseWriter, r *http.Request) {

filterEndpoint := r.URL.Query().Get("endpoint")
Expand Down Expand Up @@ -251,19 +276,28 @@ func RegisterV2HookControllers(server serverInterfaces.IHTTPServer, database sto
sharedapi.BadRequest(w, utils.ErrValidation, err)
}

hook, err := controllersCommons.UpdateEndpoint(database, id, ep.Endpoint)
resp := V2ChangeEndpointController(database, id, ep.Endpoint)

if err != nil {
sharedapi.InternalServerError(w, r, err)
return
}
if resp.Err != nil {
if resp.T == utils.ValidationType {
sharedapi.BadRequest(w, string(resp.T), resp.Err)
return
}
if resp.T == utils.InternalType {
sharedapi.InternalServerError(w, r, resp.Err)
return
}
if resp.T == utils.NotFoundType {
sharedapi.NotFound(w, resp.Err)
return
}

if hook.ID == "" {
sharedapi.NotFound(w, errors.New(fmt.Sprintf("Hook (id : %s) doesn't exist", id)))
sharedapi.InternalServerError(w, r, resp.Err)
return
}

sharedapi.Ok(w, hook)
sharedapi.Ok(w, *resp.Data)
return
})

server.Register(string(r.V2ChangeHookRetry.Method), r.V2ChangeHookRetry.Url, func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -319,6 +353,18 @@ func V2CreateHookController(database storeInterface.IStoreProvider, hookParams c
return utils.SuccessResp(hook)
}

func V2GetHookController(database storeInterface.IStoreProvider, id string) utils.Response[commons.Hook] {

hook, err := controllersCommons.GetHook(database, id)
if err != nil {
return utils.InternalErrorResp[commons.Hook](err)
}
if hook.ID == "" {
return utils.NotFoundErrorResp[commons.Hook](errors.New(fmt.Sprintf("Hook (id : %s) doesn't exist", id)))
}
return utils.SuccessResp(hook)
}

func V2GetHooksController(database storeInterface.IStoreProvider, filterEndpoint, filterCursor string) utils.Response[bunpaginate.Cursor[commons.Hook]] {
hasMore := false
strPrevious := " "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ var V1DeleteHook = s.NewRoute(s.DELETE, "/configs/{id}")
var V1TestHook = s.NewRoute(s.GET, "/configs/{id}/test")
var V1ActiveHook = s.NewRoute(s.PUT, "/configs/{id}/activate")
var V1DeactiveHook = s.NewRoute(s.PUT, "/configs/{id}/deactivate")
var V1ChangeSecret = s.NewRoute(s.PUT, "/configs/{id}/secret")
var V1ChangeSecret = s.NewRoute(s.PUT, "/configs/{id}/secret/change")
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

var V2GetHooks = s.NewRoute(s.GET, "/v2/hooks")
var V2CreateHook = s.NewRoute(s.POST, "/v2/hooks")
var V2GetHook = s.NewRoute(s.GET, "/v2/hooks/{id}")
var V2DeleteHook = s.NewRoute(s.DELETE, "/v2/hooks/{id}")
var V2TestHook = s.NewRoute(s.POST, "/v2/hooks/{id}/test")
var V2ActiveHook = s.NewRoute(s.PUT, "/v2/hooks/{id}/activate")
Expand Down
3 changes: 2 additions & 1 deletion ee/webhooks/internal/components/webhook_worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ func (w *Worker) HandleMessage(msg *message.Message) error {
if eventApp != "" {
event = strings.Join([]string{eventApp, event}, ".")
}

logging.Infof("Webhook_Worker :: Handling The Event : %s", event)
triggedSHooks := w.State.ActiveHooksByEvent.Get(event)

if triggedSHooks == nil || triggedSHooks.Size() == 0 {
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions ee/webhooks/internal/services/httpclient/default_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,14 @@ func (dc *DefaultHttpClient) Call(context context.Context, hook *commons.Hook, a

ts := time.Now().UTC()
timestamp := ts.Unix()
signature, err := security.Sign(hook.ID, timestamp, hook.Secret, []byte(attempt.Payload))
signature, err := security.Sign(attempt.ID, timestamp, hook.Secret, []byte(attempt.Payload))

req.Header.Set("content-type", "application/json")
req.Header.Set("user-agent", "formance-webhooks/v2")
req.Header.Set("formance-webhook-id", attempt.ID)
req.Header.Set("formance-webhook-timestamp", fmt.Sprintf("%d", timestamp))
req.Header.Set("formance-webhook-signature", signature)
req.Header.Set("formance-webhook-test", fmt.Sprintf("%v", isTest))

resp, err := dc.httpClient.Do(req)
if err != nil {
span.RecordError(err)
Expand Down
3 changes: 2 additions & 1 deletion ee/webhooks/internal/services/httpserver/default_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net"
"net/http"
"time"

sharedapi "github.com/formancehq/stack/libs/go-libs/api"
"github.com/formancehq/stack/libs/go-libs/auth"
Expand Down Expand Up @@ -72,7 +73,7 @@ func NewDefaultHTTPServer(addr string, info commons.ServiceInfo, a auth.Auth, lo
server: &http.Server{
Addr: addr,
Handler: router,

ReadHeaderTimeout: time.Duration(1 * time.Second),
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type IStoreProvider interface {
CompleteAttempt(index string) (commons.Attempt, error)
AbortAttempt(index string, comment string, wrapInLog bool) (commons.Attempt, error)
ChangeAttemptStatus(index string, status commons.AttemptStatus, comment string, wrapInLog bool) (commons.Attempt, error)
UpdateAttemptEndpoint(index string, endpoint string) (commons.Attempt, error)
UpdateAttemptNextTry(index string, nextTry time.Time, statusCode int) (commons.Attempt, error)

GetWaitingAttempts(page int, size int) (*[]*commons.Attempt, bool, error)
Expand Down
13 changes: 13 additions & 0 deletions ee/webhooks/internal/services/storage/postgres/attempt_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
insertAttemptQuery = "INSERT INTO attempts (id, webhook_id, hook_name, hook_endpoint, event, payload, status_code, date_occured, status, date_status, comment, next_retry_after) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING *"
updateAttemptStatus = "UPDATE attempts SET status = ?, date_status = NOW(), comment = ? WHERE id = ? RETURNING *"
updateAttemptNextTry = "UPDATE attempts SET next_retry_after = ?, status_code = ? WHERE id = ? RETURNING *"
updateAttemptEndpoint = "UPDATE attempts SET hook_endpoint = ? WHERE id = ? RETURNING *"
)

func (store PostgresStore) GetAttempt(index string) (commons.Attempt, error) {
Expand Down Expand Up @@ -130,6 +131,18 @@ func (store PostgresStore) ChangeAttemptStatus(index string, status commons.Atte
return attempt, err
}

func (store PostgresStore) UpdateAttemptEndpoint(index string, endpoint string) (commons.Attempt, error){
var attempt commons.Attempt

_, err := store.db.NewRaw(updateAttemptEndpoint, endpoint, index).Exec(context.Background(), &attempt)

if err == sql.ErrNoRows {
return attempt, nil
}

return attempt, nil
}

func (store PostgresStore) UpdateAttemptNextTry(index string, nextTry time.Time, statusCode int) (commons.Attempt, error) {
var attempt commons.Attempt

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (store PostgresStore) changeHookStatus(index string, status commons.HookSta

wrapQuery := wrapWithLogQuery(updateHookStatusQuery)

_, err = store.db.NewRaw(wrapQuery, string(status), index, log.ID, log.Channel, log.Payload, log.CreatedAt).Exec(context.Background(), &hook)
_, err = store.db.NewRaw(wrapQuery, string(status), index, log.ID, log.Channel, log.Payload, log.CreatedAt).Exec(context.Background(), &hook)

if err == sql.ErrNoRows {
Expand Down
Loading

0 comments on commit 6a07021

Please sign in to comment.