diff --git a/cache/redis/client.go b/cache/redis/client.go index bf5ddbe176..b05d3f5a2f 100644 --- a/cache/redis/client.go +++ b/cache/redis/client.go @@ -5,8 +5,8 @@ import ( "errors" "time" + "github.com/frain-dev/convoy/internal/pkg/rdb" "github.com/go-redis/cache/v8" - "github.com/go-redis/redis/v8" ) type RedisCache struct { @@ -14,16 +14,14 @@ type RedisCache struct { } func NewRedisCache(dsn string) (*RedisCache, error) { - opts, err := redis.ParseURL(dsn) + rdb, err := rdb.NewClient(dsn) if err != nil { return nil, err } - client := redis.NewClient(opts) - c := cache.New(&cache.Options{ - Redis: client, + Redis: rdb.Client(), }) r := &RedisCache{cache: c} diff --git a/cmd/main.go b/cmd/main.go index 89fb858c95..81c10ada4b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -14,15 +14,16 @@ import ( "github.com/frain-dev/convoy/cache" "github.com/frain-dev/convoy/datastore/badger" + "github.com/frain-dev/convoy/internal/pkg/apm" + "github.com/frain-dev/convoy/internal/pkg/rdb" "github.com/frain-dev/convoy/searcher" "github.com/google/uuid" - "github.com/hibiken/asynq" + "github.com/newrelic/go-agent/v3/newrelic" "go.mongodb.org/mongo-driver/bson/primitive" "github.com/frain-dev/convoy/logger" redisqueue "github.com/frain-dev/convoy/queue/redis" "github.com/frain-dev/convoy/tracer" - "github.com/getsentry/sentry-go" prefixed "github.com/x-cray/logrus-prefixed-formatter" log "github.com/sirupsen/logrus" @@ -170,34 +171,32 @@ func preRun(app *app, db datastore.DatabaseClient) func(cmd *cobra.Command, args return err } - db, err := NewDB(cfg) + nwCfg := cfg.Tracer.NewRelic + nRApp, err := newrelic.NewApplication( + newrelic.ConfigAppName(nwCfg.AppName), + newrelic.ConfigLicense(nwCfg.LicenseKey), + newrelic.ConfigDistributedTracerEnabled(nwCfg.DistributedTracerEnabled), + newrelic.ConfigEnabled(nwCfg.ConfigEnabled), + ) + if err != nil { return err } - err = sentry.Init(sentry.ClientOptions{ - Debug: true, - Dsn: cfg.Sentry.Dsn, - Environment: cfg.Environment, - }) + apm.SetApplication(nRApp) + + db, err := NewDB(cfg) if err != nil { return err } - defer sentry.Recover() // recover any panic and report to sentry - defer sentry.Flush(2 * time.Second) // send any events in sentry before exiting - - sentryHook := convoy.NewSentryHook(convoy.DefaultLevels) - log.AddHook(sentryHook) - - var aC *asynq.Client var tr tracer.Tracer var ca cache.Cache var li limiter.RateLimiter var q queue.Queuer if cfg.Queue.Type == config.RedisQueueProvider { - aC, err = redisqueue.NewClient(cfg) + rdb, err := rdb.NewClient(cfg.Queue.Redis.Dsn) if err != nil { return err } @@ -209,7 +208,7 @@ func preRun(app *app, db datastore.DatabaseClient) func(cmd *cobra.Command, args } opts := queue.QueueOptions{ Names: queueNames, - Client: aC, + RedisClient: rdb, RedisAddress: cfg.Queue.Redis.Dsn, Type: string(config.RedisQueueProvider), PrometheusAddress: cfg.Prometheus.Dsn, diff --git a/cmd/server.go b/cmd/server.go index 920ce5e912..e219135f25 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -452,7 +452,7 @@ func loadServerConfigFromCliFlags(cmd *cobra.Command, c *config.Configuration) e } if !util.IsStringEmpty(newReplicKey) { - c.Tracer.NewRelic.AppName = newReplicKey + c.Tracer.NewRelic.LicenseKey = newReplicKey } // CONVOY_SEARCH_TYPE diff --git a/datastore/badger/subscription.go b/datastore/badger/subscription.go index c029ebd8ac..8c29a96f63 100644 --- a/datastore/badger/subscription.go +++ b/datastore/badger/subscription.go @@ -11,6 +11,10 @@ type subscriptionRepo struct { client *badgerhold.Store } +func (s *subscriptionRepo) FindSubscriptionsByAppID(ctx context.Context, groupId string, appID string) ([]datastore.Subscription, error) { + return nil, nil +} + func (*subscriptionRepo) UpdateSubscriptionStatus(context.Context, string, string, datastore.SubscriptionStatus) error { return nil } diff --git a/datastore/mongo/group_test.go b/datastore/mongo/group_test.go index 4cc9d685a5..da35824728 100644 --- a/datastore/mongo/group_test.go +++ b/datastore/mongo/group_test.go @@ -57,12 +57,14 @@ func Test_CreateGroup(t *testing.T) { groups: []datastore.Group{ { Name: "group 2", + OrganisationID: "123abc", UID: uuid.NewString(), DocumentStatus: datastore.ActiveDocumentStatus, }, { Name: "group 2", + OrganisationID: "123abc", UID: uuid.NewString(), DocumentStatus: datastore.ActiveDocumentStatus, }, @@ -75,17 +77,38 @@ func Test_CreateGroup(t *testing.T) { groups: []datastore.Group{ { Name: "group 3", + OrganisationID: "abc", UID: uuid.NewString(), DocumentStatus: datastore.DeletedDocumentStatus, }, { Name: "group 3", + OrganisationID: "abc", UID: uuid.NewString(), DocumentStatus: datastore.ActiveDocumentStatus, }, }, }, + { + name: "can create group with existing name in a different organisation", + groups: []datastore.Group{ + { + Name: "group 4", + OrganisationID: uuid.NewString(), + UID: uuid.NewString(), + DocumentStatus: datastore.ActiveDocumentStatus, + }, + + { + Name: "group 4", + OrganisationID: uuid.NewString(), + UID: uuid.NewString(), + DocumentStatus: datastore.ActiveDocumentStatus, + }, + }, + isDuplicate: true, + }, } for _, tc := range tt { @@ -93,28 +116,28 @@ func Test_CreateGroup(t *testing.T) { groupRepo := NewGroupRepo(db) for i, group := range tc.groups { - newOrg := &datastore.Group{ + newGroup := &datastore.Group{ Name: group.Name, UID: group.UID, DocumentStatus: group.DocumentStatus, } if i == 0 { - require.NoError(t, groupRepo.CreateGroup(context.Background(), newOrg)) + require.NoError(t, groupRepo.CreateGroup(context.Background(), newGroup)) - org, err := groupRepo.FetchGroupByID(context.Background(), newOrg.UID) + g, err := groupRepo.FetchGroupByID(context.Background(), newGroup.UID) require.NoError(t, err) - require.Equal(t, org.UID, newOrg.UID) + require.Equal(t, g.UID, newGroup.UID) } if i > 0 && tc.isDuplicate { - err := groupRepo.CreateGroup(context.Background(), newOrg) + err := groupRepo.CreateGroup(context.Background(), newGroup) require.Error(t, err) require.ErrorIs(t, err, datastore.ErrDuplicateGroupName) } if i > 0 && !tc.isDuplicate { - require.NoError(t, groupRepo.CreateGroup(context.Background(), newOrg)) + require.NoError(t, groupRepo.CreateGroup(context.Background(), newGroup)) } } diff --git a/datastore/mongo/mongo.go b/datastore/mongo/mongo.go index 204e97b136..932f81b7d4 100644 --- a/datastore/mongo/mongo.go +++ b/datastore/mongo/mongo.go @@ -156,7 +156,6 @@ func (c *Client) ConfigurationRepo() datastore.ConfigurationRepository { func (c *Client) ensureMongoIndices() { c.ensureIndex(GroupCollection, "uid", true, nil) - c.ensureIndex(GroupCollection, "name", true, bson.M{"document_status": datastore.ActiveDocumentStatus}) c.ensureIndex(OrganisationCollection, "uid", true, nil) @@ -179,11 +178,11 @@ func (c *Client) ensureMongoIndices() { c.ensureIndex(SourceCollection, "mask_id", true, nil) c.ensureIndex(SubscriptionCollection, "uid", true, nil) c.ensureIndex(SubscriptionCollection, "filter_config.event_type", false, nil) - c.ensureCompoundIndex(AppCollections) c.ensureCompoundIndex(EventCollection) c.ensureCompoundIndex(UserCollection) c.ensureCompoundIndex(AppCollections) + c.ensureCompoundIndex(GroupCollection) c.ensureCompoundIndex(EventDeliveryCollection) c.ensureCompoundIndex(OrganisationInvitesCollection) c.ensureCompoundIndex(OrganisationMembersCollection) @@ -242,6 +241,16 @@ func (c *Client) ensureCompoundIndex(collectionName string) bool { func compoundIndices() map[string][]mongo.IndexModel { compoundIndices := map[string][]mongo.IndexModel{ + GroupCollection: { + { + Keys: bson.D{ + {Key: "organisation_id", Value: 1}, + {Key: "name", Value: 1}, + {Key: "document_status", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + }, EventCollection: { { Keys: bson.D{ diff --git a/datastore/mongo/organisation_invite.go b/datastore/mongo/organisation_invite.go index d30c1d3b04..0ba9637a7b 100644 --- a/datastore/mongo/organisation_invite.go +++ b/datastore/mongo/organisation_invite.go @@ -55,6 +55,7 @@ func (db *orgInviteRepo) UpdateOrganisationInvite(ctx context.Context, iv *datas iv.UpdatedAt = primitive.NewDateTimeFromTime(time.Now()) update := bson.D{primitive.E{Key: "$set", Value: bson.D{ primitive.E{Key: "role", Value: iv.Role}, + primitive.E{Key: "status", Value: iv.Status}, primitive.E{Key: "updated_at", Value: iv.UpdatedAt}, }}} diff --git a/datastore/mongo/organisation_invite_test.go b/datastore/mongo/organisation_invite_test.go index 413248b218..2ca890c8c7 100644 --- a/datastore/mongo/organisation_invite_test.go +++ b/datastore/mongo/organisation_invite_test.go @@ -135,7 +135,12 @@ func TestUpdateOrganisationInvite(t *testing.T) { Groups: []string{uuid.NewString()}, Apps: nil, } + status := datastore.InviteStatusAccepted + updatedAt := primitive.NewDateTimeFromTime(time.Now()) + iv.Role = role + iv.Status = status + iv.UpdatedAt = updatedAt err = inviteRepo.UpdateOrganisationInvite(context.Background(), iv) require.NoError(t, err) @@ -143,8 +148,10 @@ func TestUpdateOrganisationInvite(t *testing.T) { invite, err := inviteRepo.FetchOrganisationInviteByID(context.Background(), iv.UID) require.NoError(t, err) - require.Equal(t, invite.Role, role) require.Equal(t, invite.UID, iv.UID) + require.Equal(t, invite.Role, role) + require.Equal(t, invite.UpdatedAt, updatedAt) + require.Equal(t, invite.Status, status) } func TestDeleteOrganisationInvite(t *testing.T) { diff --git a/datastore/mongo/subscription.go b/datastore/mongo/subscription.go index 5cf3503984..55a27ee9b2 100644 --- a/datastore/mongo/subscription.go +++ b/datastore/mongo/subscription.go @@ -136,6 +136,27 @@ func (s *subscriptionRepo) FindSubscriptionByEventType(ctx context.Context, grou return subscription, nil } +func (s *subscriptionRepo) FindSubscriptionsByAppID(ctx context.Context, groupId string, appID string) ([]datastore.Subscription, error) { + filter := bson.M{ + "app_id": appID, + "group_id": groupId, + "document_status": datastore.ActiveDocumentStatus, + } + + c, err := s.client.Find(ctx, filter) + if errors.Is(err, mongo.ErrNoDocuments) { + return nil, datastore.ErrSubscriptionNotFound + } + + var subscription []datastore.Subscription + err = c.All(ctx, &subscription) + if err != nil { + return nil, err + } + + return subscription, nil +} + func (s *subscriptionRepo) FindSubscriptionBySourceIDs(ctx context.Context, groupId string, sourceId string) ([]datastore.Subscription, error) { var subscription []datastore.Subscription filter := bson.M{"group_id": groupId, "source_id": sourceId, "document_status": datastore.ActiveDocumentStatus} diff --git a/datastore/mongo/subscription_test.go b/datastore/mongo/subscription_test.go index bb07c80140..3657c4b8dd 100644 --- a/datastore/mongo/subscription_test.go +++ b/datastore/mongo/subscription_test.go @@ -18,6 +18,7 @@ func createSubscription() *datastore.Subscription { UID: uuid.NewString(), Name: "Subscription", Type: "incoming", + AppID: "app-id-1", GroupID: "group-id-1", SourceID: "source-id-1", EndpointID: "endpoint-id-1", @@ -185,3 +186,34 @@ func Test_FindSubscriptionByID(t *testing.T) { require.Equal(t, sub.SourceID, newSub.SourceID) require.Equal(t, sub.EndpointID, newSub.EndpointID) } + +func Test_FindSubscriptionByAppID(t *testing.T) { + db, closeFn := getDB(t) + defer closeFn() + + subRepo := NewSubscriptionRepo(db) + + for i := 0; i < 20; i++ { + subscription := &datastore.Subscription{ + UID: uuid.NewString(), + Name: fmt.Sprintf("Subscription %d", i), + Type: "incoming", + AppID: "app-id-1", + GroupID: "group-id-1", + SourceID: "source-id-1", + EndpointID: "endpoint-id-1", + DocumentStatus: datastore.ActiveDocumentStatus, + } + require.NoError(t, subRepo.CreateSubscription(context.Background(), subscription.GroupID, subscription)) + } + + // Fetch sub again + subs, err := subRepo.FindSubscriptionsByAppID(context.Background(), "group-id-1", "app-id-1") + require.NoError(t, err) + + for _, sub := range subs { + require.NotEmpty(t, sub.UID) + require.Equal(t, sub.AppID, "app-id-1") + require.Equal(t, sub.GroupID, "group-id-1") + } +} diff --git a/datastore/repository.go b/datastore/repository.go index 23917578a1..9f39a83ce6 100644 --- a/datastore/repository.go +++ b/datastore/repository.go @@ -96,6 +96,7 @@ type SubscriptionRepository interface { FindSubscriptionByID(context.Context, string, string) (*Subscription, error) FindSubscriptionByEventType(context.Context, string, string, EventType) ([]Subscription, error) FindSubscriptionBySourceIDs(context.Context, string, string) ([]Subscription, error) + FindSubscriptionsByAppID(ctx context.Context, groupId string, appID string) ([]Subscription, error) UpdateSubscriptionStatus(context.Context, string, string, SubscriptionStatus) error } diff --git a/docs/docs.go b/docs/docs.go index bec7cc1e87..e85a06ca4a 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1,6 +1,6 @@ // Package docs GENERATED BY THE COMMAND ABOVE; DO NOT EDIT // This file was generated by swaggo/swag at -// 2022-06-11 23:17:36.882931 +0100 WAT m=+106.272259251 +// 2022-06-16 12:57:29.873543 +0100 WAT m=+106.525094960 package docs import ( diff --git a/docs/v3/openapi3.json b/docs/v3/openapi3.json index 3a23e72acb..311427e312 100644 --- a/docs/v3/openapi3.json +++ b/docs/v3/openapi3.json @@ -781,6 +781,37 @@ }, "type": "object" }, + "models.Configuration": { + "properties": { + "is_analytics_enabled": { + "type": "boolean" + } + }, + "type": "object" + }, + "models.ConfigurationResponse": { + "properties": { + "api_version": { + "type": "string" + }, + "created_at": { + "type": "integer" + }, + "deleted_at": { + "type": "integer" + }, + "is_analytics_enabled": { + "type": "boolean" + }, + "uid": { + "type": "string" + }, + "updated_at": { + "type": "integer" + } + }, + "type": "object" + }, "models.Endpoint": { "properties": { "description": { @@ -2839,6 +2870,239 @@ ] } }, + "/configuration": { + "get": { + "description": "This endpoint fetches configuration", + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "allOf": [ + { + "$ref": "#/components/schemas/server.serverResponse" + }, + { + "properties": { + "data": { + "allOf": [ + { + "$ref": "#/components/schemas/server.pagedResponse" + }, + { + "properties": { + "content": { + "items": { + "$ref": "#/components/schemas/models.ConfigurationResponse" + }, + "type": "array" + } + }, + "type": "object" + } + ] + } + }, + "type": "object" + } + ] + } + } + }, + "description": "OK" + }, + "400": { + "content": { + "application/json": { + "schema": { + "allOf": [ + { + "$ref": "#/components/schemas/server.serverResponse" + }, + { + "properties": { + "data": { + "$ref": "#/components/schemas/server.Stub" + } + }, + "type": "object" + } + ] + } + } + }, + "description": "Bad Request" + }, + "401": { + "content": { + "application/json": { + "schema": { + "allOf": [ + { + "$ref": "#/components/schemas/server.serverResponse" + }, + { + "properties": { + "data": { + "$ref": "#/components/schemas/server.Stub" + } + }, + "type": "object" + } + ] + } + } + }, + "description": "Unauthorized" + }, + "500": { + "content": { + "application/json": { + "schema": { + "allOf": [ + { + "$ref": "#/components/schemas/server.serverResponse" + }, + { + "properties": { + "data": { + "$ref": "#/components/schemas/server.Stub" + } + }, + "type": "object" + } + ] + } + } + }, + "description": "Internal Server Error" + } + }, + "security": [ + { + "ApiKeyAuth": [] + } + ], + "summary": "Fetch configuration", + "tags": [ + "Source" + ] + }, + "post": { + "description": "This endpoint creates a configuration", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/models.Configuration" + } + } + }, + "description": "Configuration Details", + "required": true, + "x-originalParamName": "application" + }, + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "allOf": [ + { + "$ref": "#/components/schemas/server.serverResponse" + }, + { + "properties": { + "data": { + "$ref": "#/components/schemas/models.ConfigurationResponse" + } + }, + "type": "object" + } + ] + } + } + }, + "description": "OK" + }, + "400": { + "content": { + "application/json": { + "schema": { + "allOf": [ + { + "$ref": "#/components/schemas/server.serverResponse" + }, + { + "properties": { + "data": { + "$ref": "#/components/schemas/server.Stub" + } + }, + "type": "object" + } + ] + } + } + }, + "description": "Bad Request" + }, + "401": { + "content": { + "application/json": { + "schema": { + "allOf": [ + { + "$ref": "#/components/schemas/server.serverResponse" + }, + { + "properties": { + "data": { + "$ref": "#/components/schemas/server.Stub" + } + }, + "type": "object" + } + ] + } + } + }, + "description": "Unauthorized" + }, + "500": { + "content": { + "application/json": { + "schema": { + "allOf": [ + { + "$ref": "#/components/schemas/server.serverResponse" + }, + { + "properties": { + "data": { + "$ref": "#/components/schemas/server.Stub" + } + }, + "type": "object" + } + ] + } + } + }, + "description": "Internal Server Error" + } + }, + "security": [ + { + "ApiKeyAuth": [] + } + ], + "summary": "Create a configuration", + "tags": [ + "Application" + ] + } + }, "/eventdeliveries": { "get": { "description": "This endpoint fetch event deliveries.", diff --git a/docs/v3/openapi3.yaml b/docs/v3/openapi3.yaml index 4a16c68aa9..fb5ad5f369 100644 --- a/docs/v3/openapi3.yaml +++ b/docs/v3/openapi3.yaml @@ -522,6 +522,26 @@ components: support_email: type: string type: object + models.Configuration: + properties: + is_analytics_enabled: + type: boolean + type: object + models.ConfigurationResponse: + properties: + api_version: + type: string + created_at: + type: integer + deleted_at: + type: integer + is_analytics_enabled: + type: boolean + uid: + type: string + updated_at: + type: integer + type: object models.Endpoint: properties: description: @@ -1677,6 +1697,126 @@ paths: summary: Refresh an access token tags: - User + /configuration: + get: + description: This endpoint fetches configuration + responses: + "200": + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/server.serverResponse' + - properties: + data: + allOf: + - $ref: '#/components/schemas/server.pagedResponse' + - properties: + content: + items: + $ref: '#/components/schemas/models.ConfigurationResponse' + type: array + type: object + type: object + description: OK + "400": + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/server.serverResponse' + - properties: + data: + $ref: '#/components/schemas/server.Stub' + type: object + description: Bad Request + "401": + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/server.serverResponse' + - properties: + data: + $ref: '#/components/schemas/server.Stub' + type: object + description: Unauthorized + "500": + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/server.serverResponse' + - properties: + data: + $ref: '#/components/schemas/server.Stub' + type: object + description: Internal Server Error + security: + - ApiKeyAuth: [] + summary: Fetch configuration + tags: + - Source + post: + description: This endpoint creates a configuration + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/models.Configuration' + description: Configuration Details + required: true + x-originalParamName: application + responses: + "200": + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/server.serverResponse' + - properties: + data: + $ref: '#/components/schemas/models.ConfigurationResponse' + type: object + description: OK + "400": + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/server.serverResponse' + - properties: + data: + $ref: '#/components/schemas/server.Stub' + type: object + description: Bad Request + "401": + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/server.serverResponse' + - properties: + data: + $ref: '#/components/schemas/server.Stub' + type: object + description: Unauthorized + "500": + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/server.serverResponse' + - properties: + data: + $ref: '#/components/schemas/server.Stub' + type: object + description: Internal Server Error + security: + - ApiKeyAuth: [] + summary: Create a configuration + tags: + - Application /eventdeliveries: get: description: This endpoint fetch event deliveries. diff --git a/go.mod b/go.mod index dabd89db2e..d179123d2f 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/newrelic/go-agent/v3 v3.15.2 github.com/newrelic/go-agent/v3/integrations/nrlogrus v1.0.1 github.com/newrelic/go-agent/v3/integrations/nrmongo v1.0.2 + github.com/newrelic/go-agent/v3/integrations/nrredis-v8 v1.0.0 // indirect github.com/olekukonko/tablewriter v0.0.5 github.com/onsi/gomega v1.19.0 // indirect github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index bfd57ad046..07910cd8e7 100644 --- a/go.sum +++ b/go.sum @@ -436,6 +436,7 @@ github.com/go-redis/cache/v8 v8.4.3 h1:+RZ0pQM+zOd6h/oWCsOl3+nsCgii9rn26oCYmU87k github.com/go-redis/cache/v8 v8.4.3/go.mod h1:5lQPQ63uyBt4aZuRmdvUJOJRRjPxfLtJtlcJ/z8o1jA= github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v8 v8.4.0/go.mod h1:A1tbYoHSa1fXwN+//ljcCYYJeLmVrwL9hbQN45Jdy0M= github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M= github.com/go-redis/redis/v8 v8.11.3/go.mod h1:xNJ9xDG09FsIPwh3bWdk+0oDWHbtF9rPN0F/oD9XeKc= github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= @@ -843,6 +844,8 @@ github.com/newrelic/go-agent/v3/integrations/nrlogrus v1.0.1 h1:Tv985B4QriX/KxNw github.com/newrelic/go-agent/v3/integrations/nrlogrus v1.0.1/go.mod h1:JpiVn2lqR9Vk6Iq7mYGQPJhKEnthbba4QqM8Jb1JTW0= github.com/newrelic/go-agent/v3/integrations/nrmongo v1.0.2 h1:1yI2B9BRMqLWhMIY5EE5fzuKkuT7rZxxvLVi+uFSwl4= github.com/newrelic/go-agent/v3/integrations/nrmongo v1.0.2/go.mod h1:iz2tq3oBn0UJhUyiDEgSFNamtGS3iuH0/VA1hHjew9c= +github.com/newrelic/go-agent/v3/integrations/nrredis-v8 v1.0.0 h1:lKNlA35kMBOjJGLusSHE6ydLhmQ7QmjzGzdRidfcWRI= +github.com/newrelic/go-agent/v3/integrations/nrredis-v8 v1.0.0/go.mod h1:xL0cXGWOoPJDg16IqEUncqjZR3Qca5ng7yUCRrPYwyI= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= @@ -863,6 +866,7 @@ github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+ github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= @@ -1188,6 +1192,7 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/otel v0.14.0/go.mod h1:vH5xEuwy7Rts0GNtsCW3HYQoZDY+OmBJ6t1bFGGlxgw= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw= @@ -1428,6 +1433,7 @@ golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200622214017-ed371f2e16b4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/email/email.go b/internal/email/email.go index 5a3f310fec..f65c0f30e5 100644 --- a/internal/email/email.go +++ b/internal/email/email.go @@ -6,7 +6,7 @@ import ( "html/template" "strings" - "github.com/frain-dev/convoy/pkg/smtp" + "github.com/frain-dev/convoy/internal/pkg/smtp" ) //go:embed templates/* diff --git a/internal/email/email_test.go b/internal/email/email_test.go index ab09f74ac1..601def4555 100644 --- a/internal/email/email_test.go +++ b/internal/email/email_test.go @@ -3,8 +3,8 @@ package email import ( "testing" + "github.com/frain-dev/convoy/internal/pkg/smtp" "github.com/frain-dev/convoy/mocks" - "github.com/frain-dev/convoy/pkg/smtp" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" ) diff --git a/internal/pkg/apm/apm.go b/internal/pkg/apm/apm.go new file mode 100644 index 0000000000..df2a490090 --- /dev/null +++ b/internal/pkg/apm/apm.go @@ -0,0 +1,86 @@ +package apm + +import ( + "context" + "net/http" + + "github.com/newrelic/go-agent/v3/newrelic" +) + +var ( + std = New() +) + +func SetApplication(app *newrelic.Application) { + std.SetApplication(app) +} + +func NoticeError(ctx context.Context, err error) { + std.NoticeError(ctx, err) +} + +func StartTransaction(name string) *Transaction { + return std.StartTransaction(name) +} + +func StartWebTransaction(name string, r *http.Request, w http.ResponseWriter) (*Transaction, *http.Request, http.ResponseWriter) { + return std.StartWebTransaction(name, r, w) +} + +type APM struct { + application *newrelic.Application +} + +func New() *APM { + return &APM{} +} + +func (a *APM) SetApplication(app *newrelic.Application) { + a.application = app +} + +func (a *APM) NoticeError(ctx context.Context, err error) { + txn := newrelic.FromContext(ctx) + txn.NoticeError(err) +} + +func (a *APM) StartTransaction(name string) *Transaction { + inner := a.createTransaction(name) + return NewTransaction(inner) +} + +func (a *APM) StartWebTransaction(name string, r *http.Request, w http.ResponseWriter) (*Transaction, *http.Request, http.ResponseWriter) { + inner := a.createTransaction(name) + + // Set the transaction as a web request, gather attributes based on the + // request, and read incoming distributed trace headers. + inner.SetWebRequestHTTP(r) + + // Prepare to capture attributes, errors, and headers from the + // response. + w = inner.SetWebResponse(w) + + // Add the Transaction to the http.Request's Context. + r = newrelic.RequestWithTransactionContext(r, inner) + + // Encapsulate Transaction + txn := NewTransaction(inner) + + return txn, r, w +} + +func (a *APM) createTransaction(name string) *newrelic.Transaction { + return a.application.StartTransaction(name) +} + +type Transaction struct { + txn *newrelic.Transaction +} + +func NewTransaction(inner *newrelic.Transaction) *Transaction { + return &Transaction{inner} +} + +func (t *Transaction) End() { + t.txn.End() +} diff --git a/internal/pkg/rdb/rdb.go b/internal/pkg/rdb/rdb.go new file mode 100644 index 0000000000..f7ef7fed9b --- /dev/null +++ b/internal/pkg/rdb/rdb.go @@ -0,0 +1,46 @@ +package rdb + +import ( + "errors" + + "github.com/frain-dev/convoy/util" + "github.com/go-redis/redis/v8" + "github.com/newrelic/go-agent/v3/integrations/nrredis-v8" +) + +// Redis is our wrapper logic to instrument redis calls +type Redis struct { + dsn string + client *redis.Client +} + +// NewClient is used to create new Redis type. This type +// encapsulates our interaction with redis and provides instrumentation with new relic. +func NewClient(dsn string) (*Redis, error) { + if util.IsStringEmpty(dsn) { + return nil, errors.New("redis dsn cannot be empty") + } + + opts, err := redis.ParseURL(dsn) + + if err != nil { + return nil, err + } + + client := redis.NewClient(opts) + + // Add Instrumentation + client.AddHook(nrredis.NewHook(opts)) + + return &Redis{dsn: dsn, client: client}, nil +} + +// Client is to return underlying redis interface +func (r *Redis) Client() *redis.Client { + return r.client +} + +// MakeRedisClient is used to fulfill asynq's interface +func (r *Redis) MakeRedisClient() interface{} { + return r.client +} diff --git a/pkg/smtp/smtp.go b/internal/pkg/smtp/smtp.go similarity index 100% rename from pkg/smtp/smtp.go rename to internal/pkg/smtp/smtp.go diff --git a/pkg/smtp/smtp_test.go b/internal/pkg/smtp/smtp_test.go similarity index 100% rename from pkg/smtp/smtp_test.go rename to internal/pkg/smtp/smtp_test.go diff --git a/mocks/repository.go b/mocks/repository.go index 349637c7b0..b851bf20a8 100644 --- a/mocks/repository.go +++ b/mocks/repository.go @@ -1160,6 +1160,21 @@ func (mr *MockSubscriptionRepositoryMockRecorder) FindSubscriptionBySourceIDs(ar return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindSubscriptionBySourceIDs", reflect.TypeOf((*MockSubscriptionRepository)(nil).FindSubscriptionBySourceIDs), arg0, arg1, arg2) } +// FindSubscriptionsByAppID mocks base method. +func (m *MockSubscriptionRepository) FindSubscriptionsByAppID(ctx context.Context, groupId, appID string) ([]datastore.Subscription, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindSubscriptionsByAppID", ctx, groupId, appID) + ret0, _ := ret[0].([]datastore.Subscription) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FindSubscriptionsByAppID indicates an expected call of FindSubscriptionsByAppID. +func (mr *MockSubscriptionRepositoryMockRecorder) FindSubscriptionsByAppID(ctx, groupId, appID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindSubscriptionsByAppID", reflect.TypeOf((*MockSubscriptionRepository)(nil).FindSubscriptionsByAppID), ctx, groupId, appID) +} + // LoadSubscriptionsPaged mocks base method. func (m *MockSubscriptionRepository) LoadSubscriptionsPaged(arg0 context.Context, arg1 string, arg2 datastore.Pageable) ([]datastore.Subscription, datastore.PaginationData, error) { m.ctrl.T.Helper() diff --git a/notification/email/email.go b/notification/email/email.go index ef06a2d356..d70bf9fc16 100644 --- a/notification/email/email.go +++ b/notification/email/email.go @@ -6,8 +6,8 @@ import ( "github.com/frain-dev/convoy/config" em "github.com/frain-dev/convoy/internal/email" + "github.com/frain-dev/convoy/internal/pkg/smtp" "github.com/frain-dev/convoy/notification" - "github.com/frain-dev/convoy/pkg/smtp" ) type Email struct { diff --git a/queue/queue.go b/queue/queue.go index be8da672c0..c8114f6b59 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -5,7 +5,7 @@ import ( "time" "github.com/frain-dev/convoy" - "github.com/hibiken/asynq" + "github.com/frain-dev/convoy/internal/pkg/rdb" ) type Queuer interface { @@ -22,7 +22,7 @@ type Job struct { type QueueOptions struct { Names map[string]int Type string - Client *asynq.Client + RedisClient *rdb.Redis RedisAddress string PrometheusAddress string } diff --git a/queue/redis/client.go b/queue/redis/client.go index 394056dcc6..672ddbf40f 100644 --- a/queue/redis/client.go +++ b/queue/redis/client.go @@ -3,12 +3,10 @@ package redis import ( "errors" - "github.com/go-redis/redis/v8" - "github.com/frain-dev/convoy" "github.com/frain-dev/convoy/config" + "github.com/frain-dev/convoy/internal/pkg/rdb" "github.com/frain-dev/convoy/queue" - "github.com/frain-dev/convoy/util" "github.com/google/uuid" "github.com/hibiken/asynq" "github.com/hibiken/asynqmon" @@ -16,6 +14,7 @@ import ( type RedisQueue struct { opts queue.QueueOptions + client *asynq.Client inspector *asynq.Inspector } @@ -24,27 +23,21 @@ func NewClient(cfg config.Configuration) (*asynq.Client, error) { return nil, errors.New("please select the redis driver in your config") } - if util.IsStringEmpty(cfg.Queue.Redis.Dsn) { - return nil, errors.New("please provide the Redis DSN") - } - opts, err := redis.ParseURL(cfg.Queue.Redis.Dsn) + rdb, err := rdb.NewClient(cfg.Queue.Redis.Dsn) if err != nil { - return nil, errors.New("error parsing redis dsn") + return nil, err } - client := asynq.NewClient(asynq.RedisClientOpt{Addr: opts.Addr}) + client := asynq.NewClient(rdb) return client, nil } func NewQueue(opts queue.QueueOptions) queue.Queuer { - rOpts, _ := redis.ParseURL(opts.RedisAddress) - opts.RedisAddress = rOpts.Addr - - inspector := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: opts.RedisAddress, - }) + client := asynq.NewClient(opts.RedisClient) + inspector := asynq.NewInspector(opts.RedisClient) return &RedisQueue{ + client: client, opts: opts, inspector: inspector, } @@ -55,7 +48,7 @@ func (q *RedisQueue) Write(taskName convoy.TaskName, queueName convoy.QueueName, job.ID = uuid.NewString() } t := asynq.NewTask(string(taskName), job.Payload, asynq.Queue(string(queueName)), asynq.TaskID(job.ID), asynq.ProcessIn(job.Delay)) - _, err := q.opts.Client.Enqueue(t) + _, err := q.client.Enqueue(t) return err } @@ -65,12 +58,8 @@ func (q *RedisQueue) Options() queue.QueueOptions { func (q *RedisQueue) Monitor() *asynqmon.HTTPHandler { h := asynqmon.New(asynqmon.Options{ - RootPath: "/queue/monitoring", - RedisConnOpt: asynq.RedisClientOpt{ - Addr: q.opts.RedisAddress, - Password: "", - DB: 0, - }, + RootPath: "/queue/monitoring", + RedisConnOpt: q.opts.RedisClient, PrometheusAddress: q.opts.PrometheusAddress, }) return h diff --git a/queue/redis/client_test.go b/queue/redis/client_test.go index 7808c9149d..27bf94eef9 100644 --- a/queue/redis/client_test.go +++ b/queue/redis/client_test.go @@ -10,9 +10,9 @@ import ( "github.com/frain-dev/convoy" "github.com/frain-dev/convoy/config" "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/internal/pkg/rdb" "github.com/frain-dev/convoy/queue" "github.com/google/uuid" - "github.com/hibiken/asynq" ) func TestWrite(t *testing.T) { @@ -70,10 +70,9 @@ func initializeQueue(configFile string, name string, t *testing.T) queue.Queuer } - var rC *asynq.Client var opts queue.QueueOptions - rC, err = NewClient(cfg) + rdb, err := rdb.NewClient(cfg.Queue.Redis.Dsn) if err != nil { t.Fatalf("Failed to load new client: %v", err) } @@ -84,7 +83,7 @@ func initializeQueue(configFile string, name string, t *testing.T) queue.Queuer } opts = queue.QueueOptions{ Names: queueNames, - Client: rC, + RedisClient: rdb, RedisAddress: cfg.Queue.Redis.Dsn, Type: string(config.RedisQueueProvider), } diff --git a/sentry_hook.go b/sentry_hook.go deleted file mode 100644 index 1205ca9141..0000000000 --- a/sentry_hook.go +++ /dev/null @@ -1,36 +0,0 @@ -package convoy - -import ( - "fmt" - - "github.com/getsentry/sentry-go" - log "github.com/sirupsen/logrus" -) - -var DefaultLevels = []log.Level{ - log.ErrorLevel, - log.PanicLevel, - log.FatalLevel, - log.WarnLevel, -} - -type SentryHook struct { - LogLevels []log.Level -} - -func NewSentryHook(levels []log.Level) *SentryHook { - return &SentryHook{LogLevels: levels} -} - -func (s *SentryHook) Levels() []log.Level { - return s.LogLevels -} - -func (s *SentryHook) Fire(entry *log.Entry) error { - msg, err := entry.String() - if err != nil { - return fmt.Errorf("failed to get entry string - %w", err) - } - sentry.CaptureMessage(msg) - return nil -} diff --git a/server/group_integration_test.go b/server/group_integration_test.go index 43b1e2afd5..28a3818a81 100644 --- a/server/group_integration_test.go +++ b/server/group_integration_test.go @@ -6,6 +6,10 @@ package server import ( "context" "fmt" + "net/http" + "net/http/httptest" + "testing" + "github.com/frain-dev/convoy/auth" "github.com/frain-dev/convoy/config" "github.com/frain-dev/convoy/datastore" @@ -14,9 +18,6 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "net/http" - "net/http/httptest" - "testing" ) type GroupIntegrationTestSuite struct { @@ -67,7 +68,7 @@ func (s *GroupIntegrationTestSuite) TestGetGroup() { expectedStatusCode := http.StatusOK // Just Before. - group, err := testdb.SeedGroup(s.DB, groupID, "", "", nil) + group, err := testdb.SeedGroup(s.DB, groupID, "", "", datastore.OutgoingGroup, nil) require.NoError(s.T(), err) app, _ := testdb.SeedApplication(s.DB, group, uuid.NewString(), "test-app", false) _, _ = testdb.SeedEndpoint(s.DB, app, group.UID) @@ -111,7 +112,7 @@ func (s *GroupIntegrationTestSuite) TestDeleteGroup() { expectedStatusCode := http.StatusOK // Just Before. - group, err := testdb.SeedGroup(s.DB, groupID, "", "", nil) + group, err := testdb.SeedGroup(s.DB, groupID, "", "", datastore.OutgoingGroup, nil) require.NoError(s.T(), err) url := fmt.Sprintf("/api/v1/groups/%s", group.UID) @@ -202,7 +203,7 @@ func (s *GroupIntegrationTestSuite) TestUpdateGroup() { expectedStatusCode := http.StatusAccepted // Just Before. - group, err := testdb.SeedGroup(s.DB, groupID, "", "test-group", nil) + group, err := testdb.SeedGroup(s.DB, groupID, "", "test-group", datastore.OutgoingGroup, nil) require.NoError(s.T(), err) url := fmt.Sprintf("/api/v1/groups/%s", group.UID) @@ -241,9 +242,9 @@ func (s *GroupIntegrationTestSuite) TestGetGroups() { expectedStatusCode := http.StatusOK // Just Before. - group1, _ := testdb.SeedGroup(s.DB, uuid.NewString(), "", "test-group-1", nil) - group2, _ := testdb.SeedGroup(s.DB, uuid.NewString(), "", "test-group-2", nil) - group3, _ := testdb.SeedGroup(s.DB, uuid.NewString(), "", "test-group-3", nil) + group1, _ := testdb.SeedGroup(s.DB, uuid.NewString(), "", "test-group-1", datastore.OutgoingGroup, nil) + group2, _ := testdb.SeedGroup(s.DB, uuid.NewString(), "", "test-group-2", datastore.OutgoingGroup, nil) + group3, _ := testdb.SeedGroup(s.DB, uuid.NewString(), "", "test-group-3", datastore.OutgoingGroup, nil) req := createRequest(http.MethodGet, "/api/v1/groups", nil) w := httptest.NewRecorder() @@ -268,9 +269,9 @@ func (s *GroupIntegrationTestSuite) TestGetGroups_FilterByName() { expectedStatusCode := http.StatusOK // Just Before. - group1, _ := testdb.SeedGroup(s.DB, uuid.NewString(), "abcdef", "", nil) - _, _ = testdb.SeedGroup(s.DB, uuid.NewString(), "test-group-2", "", nil) - _, _ = testdb.SeedGroup(s.DB, uuid.NewString(), "test-group-3", "", nil) + group1, _ := testdb.SeedGroup(s.DB, uuid.NewString(), "abcdef", "", datastore.OutgoingGroup, nil) + _, _ = testdb.SeedGroup(s.DB, uuid.NewString(), "test-group-2", "", datastore.OutgoingGroup, nil) + _, _ = testdb.SeedGroup(s.DB, uuid.NewString(), "test-group-3", "", datastore.OutgoingGroup, nil) url := fmt.Sprintf("/api/v1/groups?name=%s", group1.Name) req := createRequest(http.MethodGet, url, nil) diff --git a/server/middleware.go b/server/middleware.go index 3db64c2433..3bf071579d 100644 --- a/server/middleware.go +++ b/server/middleware.go @@ -12,6 +12,7 @@ import ( "github.com/frain-dev/convoy" "github.com/frain-dev/convoy/cache" + "github.com/frain-dev/convoy/internal/pkg/apm" "github.com/frain-dev/convoy/logger" "github.com/frain-dev/convoy/tracer" "github.com/newrelic/go-agent/v3/newrelic" @@ -68,21 +69,8 @@ func instrumentPath(path string) func(http.Handler) http.Handler { func instrumentRequests(tr tracer.Tracer) func(next http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - cfg, err := config.Get() - - if err != nil { - log.WithError(err).Error("failed to load configuration") - return - } - - if cfg.Tracer.Type == config.NewRelicTracerProvider { - txn := tr.StartTransaction(r.URL.Path) - defer txn.End() - - tr.SetWebRequestHTTP(r, txn) - w = tr.SetWebResponse(w, txn) - r = tr.RequestWithTransactionContext(r, txn) - } + txn, r, w := apm.StartWebTransaction(r.URL.Path, r, w) + defer txn.End() next.ServeHTTP(w, r) }) @@ -710,7 +698,7 @@ func requirePermission(role auth.RoleType) func(next http.Handler) http.Handler return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { authUser := getAuthUserFromContext(r.Context()) if authUser.Role.Type.Is(auth.RoleSuperUser) { - // superuser has access to everything + //superuser has access to everything next.ServeHTTP(w, r) return } @@ -744,7 +732,9 @@ func getAuthFromRequest(r *http.Request) (*auth.Credential, error) { authInfo := strings.Split(val, " ") if len(authInfo) != 2 { - return nil, errors.New("invalid header structure") + err := errors.New("invalid header structure") + apm.NoticeError(r.Context(), err) + return nil, err } credType := auth.CredentialType(strings.ToUpper(authInfo[0])) diff --git a/server/route.go b/server/route.go index 308ec31010..c87fc7c05f 100644 --- a/server/route.go +++ b/server/route.go @@ -358,7 +358,7 @@ func buildRoutes(app *applicationHandler) http.Handler { subscriptionRouter.Post("/", app.CreateSubscription) subscriptionRouter.With(pagination).Get("/", app.GetSubscriptions) - subscriptionRouter.Delete("/", app.DeleteSubscription) + subscriptionRouter.Delete("/{subscriptionID}", app.DeleteSubscription) subscriptionRouter.Get("/{subscriptionID}", app.GetSubscription) subscriptionRouter.Put("/{subscriptionID}", app.UpdateSubscription) }) diff --git a/server/server_suite_test.go b/server/server_suite_test.go index 5f77993d75..0078b6a5c8 100644 --- a/server/server_suite_test.go +++ b/server/server_suite_test.go @@ -17,6 +17,7 @@ import ( "testing" "time" + "github.com/frain-dev/convoy/internal/pkg/rdb" "github.com/frain-dev/convoy/server/models" "github.com/frain-dev/convoy" @@ -72,7 +73,7 @@ func getDB() datastore.DatabaseClient { func getQueueOptions(name string) (queue.QueueOptions, error) { var opts queue.QueueOptions cfg := getConfig() - rC, err := redisqueue.NewClient(cfg) + rdb, err := rdb.NewClient(cfg.Queue.Redis.Dsn) if err != nil { return opts, err } @@ -83,7 +84,7 @@ func getQueueOptions(name string) (queue.QueueOptions, error) { } opts = queue.QueueOptions{ Names: queueNames, - Client: rC, + RedisClient: rdb, RedisAddress: cfg.Queue.Redis.Dsn, Type: string(config.RedisQueueProvider), } diff --git a/server/subscription.go b/server/subscription.go index 359c305a8c..8fd0a83706 100644 --- a/server/subscription.go +++ b/server/subscription.go @@ -3,6 +3,8 @@ package server import ( "net/http" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/server/models" "github.com/frain-dev/convoy/util" "github.com/go-chi/chi/v5" @@ -62,7 +64,8 @@ func (a *applicationHandler) GetSubscription(w http.ResponseWriter, r *http.Requ return } - if subscription.SourceID != "" { + // only incoming groups have sources + if group.Type == datastore.IncomingGroup && subscription.SourceID != "" { source, err := a.sourceService.FindSourceByID(r.Context(), group, subscription.SourceID) if err != nil { _ = render.Render(w, r, newServiceErrResponse(err)) diff --git a/server/subscription_integration_test.go b/server/subscription_integration_test.go index 6391edb516..019a35b9c5 100644 --- a/server/subscription_integration_test.go +++ b/server/subscription_integration_test.go @@ -117,7 +117,7 @@ func (s *SubscriptionIntegrationTestSuite) Test_CreateSubscription_InvalidBody() "event_types": [ "user.created", "user.updated" - ] + ] } }` @@ -147,17 +147,20 @@ func (s *SubscriptionIntegrationTestSuite) Test_GetOneSubscription_SubscriptionN require.Equal(s.T(), http.StatusNotFound, w.Code) } -func (s *SubscriptionIntegrationTestSuite) Test_GetOneSubscription_ValidSubscription() { +func (s *SubscriptionIntegrationTestSuite) Test_GetOneSubscription_OutgoingGroup_ValidSubscription() { subscriptionId := "123456789" + group, err := testdb.SeedGroup(s.DB, uuid.NewString(), "test-group", "", datastore.OutgoingGroup, nil) + require.NoError(s.T(), err) + // Just Before - app, _ := testdb.SeedApplication(s.DB, s.DefaultGroup, uuid.NewString(), "", false) - endpoint, _ := testdb.SeedEndpoint(s.DB, app, s.DefaultGroup.UID) - source, _ := testdb.SeedSource(s.DB, s.DefaultGroup, uuid.NewString()) - _, _ = testdb.SeedSubscription(s.DB, app, s.DefaultGroup, subscriptionId, datastore.OutgoingGroup, source, endpoint, &datastore.RetryConfiguration{}, &datastore.AlertConfiguration{}, &datastore.FilterConfiguration{}) + app, _ := testdb.SeedApplication(s.DB, group, uuid.NewString(), "", false) + endpoint, _ := testdb.SeedEndpoint(s.DB, app, group.UID) + source, _ := testdb.SeedSource(s.DB, group, uuid.NewString()) + _, _ = testdb.SeedSubscription(s.DB, app, group, subscriptionId, group.Type, source, endpoint, &datastore.RetryConfiguration{}, &datastore.AlertConfiguration{}, &datastore.FilterConfiguration{}) // Arrange Request - url := fmt.Sprintf("/api/v1/subscriptions/%s", subscriptionId) + url := fmt.Sprintf("/api/v1/subscriptions/%s?groupID=%s", subscriptionId, group.UID) req := createRequest(http.MethodGet, url, nil) req.SetBasicAuth("test", "test") w := httptest.NewRecorder() @@ -172,7 +175,41 @@ func (s *SubscriptionIntegrationTestSuite) Test_GetOneSubscription_ValidSubscrip var subscription *datastore.Subscription parseResponse(s.T(), w.Result(), &subscription) - dbSub, err := s.DB.SubRepo().FindSubscriptionByID(context.Background(), s.DefaultGroup.UID, subscriptionId) + dbSub, err := s.DB.SubRepo().FindSubscriptionByID(context.Background(), group.UID, subscriptionId) + require.NoError(s.T(), err) + require.Equal(s.T(), subscription.UID, dbSub.UID) + require.Equal(s.T(), subscription.Endpoint.UID, dbSub.EndpointID) +} + +func (s *SubscriptionIntegrationTestSuite) Test_GetOneSubscription_IncomingGroup_ValidSubscription() { + subscriptionId := "123456789" + + group, err := testdb.SeedGroup(s.DB, uuid.NewString(), "test-group", "", datastore.IncomingGroup, nil) + require.NoError(s.T(), err) + + // Just Before + app, _ := testdb.SeedApplication(s.DB, group, uuid.NewString(), "", false) + endpoint, _ := testdb.SeedEndpoint(s.DB, app, group.UID) + source, _ := testdb.SeedSource(s.DB, group, uuid.NewString()) + _, _ = testdb.SeedSubscription(s.DB, app, group, subscriptionId, group.Type, source, endpoint, &datastore.RetryConfiguration{}, &datastore.AlertConfiguration{}, &datastore.FilterConfiguration{}) + + // Arrange Request + url := fmt.Sprintf("/api/v1/subscriptions/%s?groupID=%s", subscriptionId, group.UID) + req := createRequest(http.MethodGet, url, nil) + req.SetBasicAuth("test", "test") + w := httptest.NewRecorder() + + // Act + s.Router.ServeHTTP(w, req) + + // Assert + require.Equal(s.T(), http.StatusOK, w.Code) + + // Deep Assert + var subscription *datastore.Subscription + parseResponse(s.T(), w.Result(), &subscription) + + dbSub, err := s.DB.SubRepo().FindSubscriptionByID(context.Background(), group.UID, subscriptionId) require.NoError(s.T(), err) require.Equal(s.T(), subscription.UID, dbSub.UID) require.Equal(s.T(), subscription.Source.UID, dbSub.SourceID) @@ -258,7 +295,7 @@ func (s *SubscriptionIntegrationTestSuite) Test_UpdateSubscription() { "event_types": [ "user.created", "user.updated" - ] + ] } }` diff --git a/server/testdb/seed.go b/server/testdb/seed.go index d54329fce6..6b77174e41 100644 --- a/server/testdb/seed.go +++ b/server/testdb/seed.go @@ -304,14 +304,14 @@ func SeedAPIKey(db datastore.DatabaseClient, role auth.Role, uid, name, keyType } // seed default group -func SeedGroup(db datastore.DatabaseClient, uid, name, orgID string, cfg *datastore.GroupConfig) (*datastore.Group, error) { +func SeedGroup(db datastore.DatabaseClient, uid, name, orgID string, groupType datastore.GroupType, cfg *datastore.GroupConfig) (*datastore.Group, error) { if orgID == "" { orgID = uuid.NewString() } g := &datastore.Group{ UID: uid, Name: name, - Type: datastore.OutgoingGroup, + Type: groupType, Config: cfg, OrganisationID: orgID, RateLimit: convoy.RATE_LIMIT, diff --git a/services/app_service.go b/services/app_service.go index a07b7e3f2d..3f344dab4f 100644 --- a/services/app_service.go +++ b/services/app_service.go @@ -162,6 +162,7 @@ func (a *AppService) CreateAppEndpoint(ctx context.Context, e models.Endpoint, a Description: e.Description, Secret: e.Secret, RateLimit: e.RateLimit, + HttpTimeout: e.HttpTimeout, RateLimitDuration: duration.String(), CreatedAt: primitive.NewDateTimeFromTime(time.Now()), UpdatedAt: primitive.NewDateTimeFromTime(time.Now()), diff --git a/web/ui/dashboard/src/app/models/group.model.ts b/web/ui/dashboard/src/app/models/group.model.ts index 513fe64ee8..a09f4140e3 100644 --- a/web/ui/dashboard/src/app/models/group.model.ts +++ b/web/ui/dashboard/src/app/models/group.model.ts @@ -29,6 +29,7 @@ export interface GROUP { created_at: Date; updated_at: Date; type: 'incoming' | 'outgoing'; + selected?: boolean; } export interface SOURCE { diff --git a/web/ui/dashboard/src/app/models/teams.model.ts b/web/ui/dashboard/src/app/models/teams.model.ts new file mode 100644 index 0000000000..5fd67d807c --- /dev/null +++ b/web/ui/dashboard/src/app/models/teams.model.ts @@ -0,0 +1,14 @@ +export interface TEAMS { + role: { + groups: string[]; + type: string; + }; + uid: string; + status?: boolean; + invitee_email?: string; + user_metadata: { + first_name: string; + last_name: string; + email: string; + }; +} diff --git a/web/ui/dashboard/src/app/private/components/table-loader/table-loader.component.html b/web/ui/dashboard/src/app/private/components/table-loader/table-loader.component.html index b4373ca817..168285a57a 100644 --- a/web/ui/dashboard/src/app/private/components/table-loader/table-loader.component.html +++ b/web/ui/dashboard/src/app/private/components/table-loader/table-loader.component.html @@ -1,102 +1,102 @@ -
{{ head }} | -|||
---|---|---|---|
-
-
-
- |
- - | - | - |
-
-
-
- |
-
-
-
-
- |
- ||
-
-
-
- |
-
-
-
-
- |
- ||
-
-
-
- |
-
-
-
-
- |
- ||
-
-
-
- |
-
-
-
-
- |
- ||
-
-
-
- |
- - | - | - |
-
-
-
- |
-
-
-
-
- |
- ||
-
-
-
- |
-
-
-
-
- |
-
{{ head }} | +|||
---|---|---|---|
+
+
+
+ |
+ + | + | + |
+
+
+
+ |
+
+
+
+
+ |
+ ||
+
+
+
+ |
+
+
+
+
+ |
+ ||
+
+
+
+ |
+
+
+
+
+ |
+ ||
+
+
+
+ |
+
+
+
+
+ |
+ ||
+
+
+
+ |
+ + | + | + |
+
+
+
+ |
+
+
+
+
+ |
+ ||
+
+
+
+ |
+
+
+
+
+ |
+
All your project's summary at a glance
View and manage your team members.
+{{ head }} | +|||
---|---|---|---|
+
+
+ {{ team?.user_metadata?.first_name?.slice(0, 1) }}{{ team?.user_metadata?.last_name?.slice(0, 1) }}
+ {{ team.user_metadata.first_name || '-' }} {{ team.user_metadata.last_name || '-' }}
+ |
+
+ {{ team.role.type === 'super_user' ? 'Super user' : team.role.type }}
+ |
+
+ All projects
+ |
+
+
+
+
+
+ |
+
{{ selectedFilterOption === 'pending' && head === 'Name' ? 'Email' : head }} | +|||
---|---|---|---|
+ {{ team.invitee_email }}
+ |
+
+ {{ team.role.type === 'super_user' ? 'Super user' : team.role.type }}
+ |
+
+ All projects
+ |
+ + |
You can invite team members to join your organization and assign them roles to projects
+ +Team member created succesfully! We sent them an email for onboarding instructions.
+ ++ Are you sure you want to deactivate + “{{ selectedMember?.user_metadata?.first_name }} {{ selectedMember?.user_metadata?.last_name }}” + ? +
+This action is irrevesible
+ + +My account
Organisation settings
diff --git a/web/ui/dashboard/src/app/private/private.component.ts b/web/ui/dashboard/src/app/private/private.component.ts index 1b6af57d9d..583cedbd24 100644 --- a/web/ui/dashboard/src/app/private/private.component.ts +++ b/web/ui/dashboard/src/app/private/private.component.ts @@ -41,8 +41,7 @@ export class PrivateComponent implements OnInit { const response = await this.privateService.getOrganizations(); this.organisations = response.data.content; const setOrg = localStorage.getItem('CONVOY_ORG'); - - if (!setOrg) { + if (!setOrg || setOrg === 'undefined') { this.selectOrganisation(this.organisations[0]); } else { this.userOrganization = JSON.parse(setOrg); diff --git a/web/ui/dashboard/src/assets/img/no-group.svg b/web/ui/dashboard/src/assets/img/no-group.svg new file mode 100644 index 0000000000..c12cae5cba --- /dev/null +++ b/web/ui/dashboard/src/assets/img/no-group.svg @@ -0,0 +1,97 @@ + diff --git a/web/ui/dashboard/src/assets/img/team-empty-img.svg b/web/ui/dashboard/src/assets/img/team-empty-img.svg new file mode 100644 index 0000000000..44ed030bc5 --- /dev/null +++ b/web/ui/dashboard/src/assets/img/team-empty-img.svg @@ -0,0 +1,40 @@ + diff --git a/worker/consumer.go b/worker/consumer.go index 4a86ee8d7b..2a88d50f4c 100644 --- a/worker/consumer.go +++ b/worker/consumer.go @@ -17,9 +17,8 @@ type Consumer struct { } func NewConsumer(q queue.Queuer) (*Consumer, error) { - dsn := q.Options().RedisAddress srv := asynq.NewServer( - asynq.RedisClientOpt{Addr: dsn}, + q.Options().RedisClient, asynq.Config{ Concurrency: convoy.Concurrency, Queues: q.Options().Names, diff --git a/worker/task/process_event_creation.go b/worker/task/process_event_creation.go index 8393ae9481..d955783956 100644 --- a/worker/task/process_event_creation.go +++ b/worker/task/process_event_creation.go @@ -69,10 +69,12 @@ func ProcessEventCreated(appRepo datastore.ApplicationRepository, eventRepo data } } - subscriptions, err = subRepo.FindSubscriptionByEventType(ctx, group.UID, app.UID, event.EventType) + subs, err := subRepo.FindSubscriptionsByAppID(ctx, group.UID, app.UID) if err != nil { return &EndpointError{Err: errors.New("error fetching subscriptions for event type"), delay: 10 * time.Second} } + + subscriptions = matchSubscriptions(string(event.EventType), subs) } else if group.Type == datastore.IncomingGroup { subscriptions, err = subRepo.FindSubscriptionBySourceIDs(ctx, group.UID, event.SourceID) if err != nil { @@ -155,6 +157,19 @@ func ProcessEventCreated(appRepo datastore.ApplicationRepository, eventRepo data } } +func matchSubscriptions(eventType string, subscriptions []datastore.Subscription) []datastore.Subscription { + var matched []datastore.Subscription + for _, sub := range subscriptions { + for _, ev := range sub.FilterConfig.EventTypes { + if ev == eventType || ev == "*" { // if this event type matches, or is *, add the subscription to matched + matched = append(matched, sub) + } + } + } + + return matched +} + func getEventDeliveryStatus(subscription datastore.Subscription, app *datastore.Application) datastore.EventDeliveryStatus { if app.IsDisabled || subscription.Status != datastore.ActiveSubscriptionStatus { return datastore.DiscardedEventStatus diff --git a/worker/task/process_event_creation_test.go b/worker/task/process_event_creation_test.go new file mode 100644 index 0000000000..b475c934ab --- /dev/null +++ b/worker/task/process_event_creation_test.go @@ -0,0 +1,241 @@ +package task + +import ( + "context" + "encoding/json" + "testing" + "time" + + "go.mongodb.org/mongo-driver/bson/primitive" + + "github.com/frain-dev/convoy" + "github.com/frain-dev/convoy/cache" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/mocks" + "github.com/frain-dev/convoy/queue" + "github.com/golang/mock/gomock" + "github.com/google/uuid" + "github.com/hibiken/asynq" + "github.com/stretchr/testify/require" +) + +type args struct { + appRepo datastore.ApplicationRepository + eventRepo datastore.EventRepository + groupRepo datastore.GroupRepository + eventDeliveryRepo datastore.EventDeliveryRepository + cache cache.Cache + eventQueue queue.Queuer + subRepo datastore.SubscriptionRepository +} + +func provideArgs(ctrl *gomock.Controller) *args { + groupRepo := mocks.NewMockGroupRepository(ctrl) + appRepo := mocks.NewMockApplicationRepository(ctrl) + eventRepo := mocks.NewMockEventRepository(ctrl) + cache := mocks.NewMockCache(ctrl) + queue := mocks.NewMockQueuer(ctrl) + subRepo := mocks.NewMockSubscriptionRepository(ctrl) + eventDeliveryRepo := mocks.NewMockEventDeliveryRepository(ctrl) + + return &args{ + appRepo: appRepo, + eventRepo: eventRepo, + groupRepo: groupRepo, + eventDeliveryRepo: eventDeliveryRepo, + cache: cache, + eventQueue: queue, + subRepo: subRepo, + } +} + +func TestProcessEventCreated(t *testing.T) { + tests := []struct { + name string + event *datastore.Event + dbFn func(args *args) + wantErr bool + wantErrMsg string + wantDelay time.Duration + }{ + { + name: "should_process_event_for_outgoing_group", + event: &datastore.Event{ + UID: uuid.NewString(), + EventType: "*", + ProviderID: uuid.NewString(), + SourceID: "source-id-1", + GroupID: "group-id-1", + AppID: "app-id-1", + Data: []byte(`{}`), + CreatedAt: primitive.NewDateTimeFromTime(time.Now()), + UpdatedAt: primitive.NewDateTimeFromTime(time.Now()), + }, + dbFn: func(args *args) { + mockCache, _ := args.cache.(*mocks.MockCache) + var gr *datastore.Group + mockCache.EXPECT().Get(gomock.Any(), "groups:group-id-1", &gr).Times(1).Return(nil) + + group := &datastore.Group{ + UID: "group-id-1", + Type: datastore.OutgoingGroup, + Config: &datastore.GroupConfig{ + Strategy: &datastore.StrategyConfiguration{ + Type: datastore.LinearStrategyProvider, + Duration: 10, + RetryCount: 3, + }, + }, + } + + g, _ := args.groupRepo.(*mocks.MockGroupRepository) + g.EXPECT().FetchGroupByID(gomock.Any(), "group-id-1").Times(1).Return( + group, + nil, + ) + mockCache.EXPECT().Set(gomock.Any(), "groups:group-id-1", group, 10*time.Minute).Times(1).Return(nil) + + mockCache.EXPECT().Get(gomock.Any(), "applications:app-id-1", gomock.Any()).Times(1).Return(nil) + + a, _ := args.appRepo.(*mocks.MockApplicationRepository) + + app := &datastore.Application{UID: "app-id-1"} + a.EXPECT().FindApplicationByID(gomock.Any(), "app-id-1").Times(1).Return(app, nil) + mockCache.EXPECT().Set(gomock.Any(), "applications:app-id-1", app, 10*time.Minute).Times(1).Return(nil) + + s, _ := args.subRepo.(*mocks.MockSubscriptionRepository) + subscriptions := []datastore.Subscription{ + { + UID: "456", + AppID: "app-id-1", + EndpointID: "098", + Status: datastore.ActiveSubscriptionStatus, + FilterConfig: &datastore.FilterConfiguration{ + EventTypes: []string{"*"}, + }, + }, + } + s.EXPECT().FindSubscriptionsByAppID(gomock.Any(), "group-id-1", "app-id-1").Times(1).Return(subscriptions, nil) + + e, _ := args.eventRepo.(*mocks.MockEventRepository) + e.EXPECT().CreateEvent(gomock.Any(), gomock.Any()).Times(1).Return(nil) + + a.EXPECT().FindApplicationByID(gomock.Any(), "app-id-1").Times(1).Return(app, nil) + + endpoint := &datastore.Endpoint{UID: "098", TargetURL: "https://google.com"} + a.EXPECT().FindApplicationEndpointByID(gomock.Any(), "app-id-1", "098"). + Times(1).Return(endpoint, nil) + + ed, _ := args.eventDeliveryRepo.(*mocks.MockEventDeliveryRepository) + ed.EXPECT().CreateEventDelivery(gomock.Any(), gomock.Any()).Times(1).Return(nil) + + q, _ := args.eventQueue.(*mocks.MockQueuer) + q.EXPECT().Write(convoy.EventProcessor, convoy.EventQueue, gomock.Any()).Times(1).Return(nil) + }, + wantErr: false, + }, + { + name: "should_process_event_for_incoming_group", + event: &datastore.Event{ + UID: uuid.NewString(), + EventType: "*", + ProviderID: uuid.NewString(), + SourceID: "source-id-1", + GroupID: "group-id-1", + AppID: "app-id-1", + Data: []byte(`{}`), + CreatedAt: primitive.NewDateTimeFromTime(time.Now()), + UpdatedAt: primitive.NewDateTimeFromTime(time.Now()), + }, + dbFn: func(args *args) { + mockCache, _ := args.cache.(*mocks.MockCache) + var gr *datastore.Group + mockCache.EXPECT().Get(gomock.Any(), "groups:group-id-1", &gr).Times(1).Return(nil) + + group := &datastore.Group{ + UID: "group-id-1", + Type: datastore.IncomingGroup, + Config: &datastore.GroupConfig{ + Strategy: &datastore.StrategyConfiguration{ + Type: datastore.LinearStrategyProvider, + Duration: 10, + RetryCount: 3, + }, + }, + } + + g, _ := args.groupRepo.(*mocks.MockGroupRepository) + g.EXPECT().FetchGroupByID(gomock.Any(), "group-id-1").Times(1).Return( + group, + nil, + ) + mockCache.EXPECT().Set(gomock.Any(), "groups:group-id-1", group, 10*time.Minute).Times(1).Return(nil) + + a, _ := args.appRepo.(*mocks.MockApplicationRepository) + app := &datastore.Application{UID: "app-id-1"} + + s, _ := args.subRepo.(*mocks.MockSubscriptionRepository) + subscriptions := []datastore.Subscription{ + { + UID: "456", + AppID: "app-id-1", + EndpointID: "098", + Status: datastore.ActiveSubscriptionStatus, + FilterConfig: &datastore.FilterConfiguration{ + EventTypes: []string{"*"}, + }, + }, + } + s.EXPECT().FindSubscriptionBySourceIDs(gomock.Any(), "group-id-1", "source-id-1").Times(1).Return(subscriptions, nil) + + e, _ := args.eventRepo.(*mocks.MockEventRepository) + e.EXPECT().CreateEvent(gomock.Any(), gomock.Any()).Times(1).Return(nil) + + a.EXPECT().FindApplicationByID(gomock.Any(), "app-id-1").Times(1).Return(app, nil) + + endpoint := &datastore.Endpoint{UID: "098", TargetURL: "https://google.com"} + a.EXPECT().FindApplicationEndpointByID(gomock.Any(), "app-id-1", "098"). + Times(1).Return(endpoint, nil) + + ed, _ := args.eventDeliveryRepo.(*mocks.MockEventDeliveryRepository) + ed.EXPECT().CreateEventDelivery(gomock.Any(), gomock.Any()).Times(1).Return(nil) + + q, _ := args.eventQueue.(*mocks.MockQueuer) + q.EXPECT().Write(convoy.EventProcessor, convoy.EventQueue, gomock.Any()).Times(1).Return(nil) + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + args := provideArgs(ctrl) + + if tt.dbFn != nil { + tt.dbFn(args) + } + + payload, err := json.Marshal(tt.event) + require.NoError(t, err) + + job := queue.Job{ + Payload: payload, + } + + task := asynq.NewTask(string(convoy.EventProcessor), job.Payload, asynq.Queue(string(convoy.EventQueue)), asynq.ProcessIn(job.Delay)) + + fn := ProcessEventCreated(args.appRepo, args.eventRepo, args.groupRepo, args.eventDeliveryRepo, args.cache, args.eventQueue, args.subRepo) + err = fn(context.Background(), task) + if tt.wantErr { + require.NotNil(t, err) + require.Equal(t, tt.wantErrMsg, err.(*EndpointError).Error()) + require.Equal(t, tt.wantDelay, err.(*EndpointError).Delay()) + return + } + + require.Nil(t, err) + }) + } +}