Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/frain-dev/convoy into pelum…
Browse files Browse the repository at this point in the history
…i/feature/user-management
  • Loading branch information
Oluwadaminiola committed Jun 17, 2022
2 parents 0f4142c + bb56a36 commit ef1c6f0
Show file tree
Hide file tree
Showing 56 changed files with 1,810 additions and 260 deletions.
8 changes: 3 additions & 5 deletions cache/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,23 @@ import (
"errors"
"time"

"github.com/frain-dev/convoy/internal/pkg/rdb"
"github.com/go-redis/cache/v8"
"github.com/go-redis/redis/v8"
)

type RedisCache struct {
cache *cache.Cache
}

func NewRedisCache(dsn string) (*RedisCache, error) {
opts, err := redis.ParseURL(dsn)
rdb, err := rdb.NewClient(dsn)

if err != nil {
return nil, err
}

client := redis.NewClient(opts)

c := cache.New(&cache.Options{
Redis: client,
Redis: rdb.Client(),
})

r := &RedisCache{cache: c}
Expand Down
33 changes: 16 additions & 17 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ import (

"github.com/frain-dev/convoy/cache"
"github.com/frain-dev/convoy/datastore/badger"
"github.com/frain-dev/convoy/internal/pkg/apm"
"github.com/frain-dev/convoy/internal/pkg/rdb"
"github.com/frain-dev/convoy/searcher"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/newrelic/go-agent/v3/newrelic"
"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/frain-dev/convoy/logger"
redisqueue "github.com/frain-dev/convoy/queue/redis"
"github.com/frain-dev/convoy/tracer"
"github.com/getsentry/sentry-go"
prefixed "github.com/x-cray/logrus-prefixed-formatter"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -170,34 +171,32 @@ func preRun(app *app, db datastore.DatabaseClient) func(cmd *cobra.Command, args
return err
}

db, err := NewDB(cfg)
nwCfg := cfg.Tracer.NewRelic
nRApp, err := newrelic.NewApplication(
newrelic.ConfigAppName(nwCfg.AppName),
newrelic.ConfigLicense(nwCfg.LicenseKey),
newrelic.ConfigDistributedTracerEnabled(nwCfg.DistributedTracerEnabled),
newrelic.ConfigEnabled(nwCfg.ConfigEnabled),
)

if err != nil {
return err
}

err = sentry.Init(sentry.ClientOptions{
Debug: true,
Dsn: cfg.Sentry.Dsn,
Environment: cfg.Environment,
})
apm.SetApplication(nRApp)

db, err := NewDB(cfg)
if err != nil {
return err
}

defer sentry.Recover() // recover any panic and report to sentry
defer sentry.Flush(2 * time.Second) // send any events in sentry before exiting

sentryHook := convoy.NewSentryHook(convoy.DefaultLevels)
log.AddHook(sentryHook)

var aC *asynq.Client
var tr tracer.Tracer
var ca cache.Cache
var li limiter.RateLimiter
var q queue.Queuer

if cfg.Queue.Type == config.RedisQueueProvider {
aC, err = redisqueue.NewClient(cfg)
rdb, err := rdb.NewClient(cfg.Queue.Redis.Dsn)
if err != nil {
return err
}
Expand All @@ -209,7 +208,7 @@ func preRun(app *app, db datastore.DatabaseClient) func(cmd *cobra.Command, args
}
opts := queue.QueueOptions{
Names: queueNames,
Client: aC,
RedisClient: rdb,
RedisAddress: cfg.Queue.Redis.Dsn,
Type: string(config.RedisQueueProvider),
PrometheusAddress: cfg.Prometheus.Dsn,
Expand Down
2 changes: 1 addition & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func loadServerConfigFromCliFlags(cmd *cobra.Command, c *config.Configuration) e
}

if !util.IsStringEmpty(newReplicKey) {
c.Tracer.NewRelic.AppName = newReplicKey
c.Tracer.NewRelic.LicenseKey = newReplicKey
}

// CONVOY_SEARCH_TYPE
Expand Down
4 changes: 4 additions & 0 deletions datastore/badger/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ type subscriptionRepo struct {
client *badgerhold.Store
}

func (s *subscriptionRepo) FindSubscriptionsByAppID(ctx context.Context, groupId string, appID string) ([]datastore.Subscription, error) {
return nil, nil
}

func (*subscriptionRepo) UpdateSubscriptionStatus(context.Context, string, string, datastore.SubscriptionStatus) error {
return nil
}
Expand Down
35 changes: 29 additions & 6 deletions datastore/mongo/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ func Test_CreateGroup(t *testing.T) {
groups: []datastore.Group{
{
Name: "group 2",
OrganisationID: "123abc",
UID: uuid.NewString(),
DocumentStatus: datastore.ActiveDocumentStatus,
},

{
Name: "group 2",
OrganisationID: "123abc",
UID: uuid.NewString(),
DocumentStatus: datastore.ActiveDocumentStatus,
},
Expand All @@ -75,46 +77,67 @@ func Test_CreateGroup(t *testing.T) {
groups: []datastore.Group{
{
Name: "group 3",
OrganisationID: "abc",
UID: uuid.NewString(),
DocumentStatus: datastore.DeletedDocumentStatus,
},

{
Name: "group 3",
OrganisationID: "abc",
UID: uuid.NewString(),
DocumentStatus: datastore.ActiveDocumentStatus,
},
},
},
{
name: "can create group with existing name in a different organisation",
groups: []datastore.Group{
{
Name: "group 4",
OrganisationID: uuid.NewString(),
UID: uuid.NewString(),
DocumentStatus: datastore.ActiveDocumentStatus,
},

{
Name: "group 4",
OrganisationID: uuid.NewString(),
UID: uuid.NewString(),
DocumentStatus: datastore.ActiveDocumentStatus,
},
},
isDuplicate: true,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
groupRepo := NewGroupRepo(db)

for i, group := range tc.groups {
newOrg := &datastore.Group{
newGroup := &datastore.Group{
Name: group.Name,
UID: group.UID,
DocumentStatus: group.DocumentStatus,
}

if i == 0 {
require.NoError(t, groupRepo.CreateGroup(context.Background(), newOrg))
require.NoError(t, groupRepo.CreateGroup(context.Background(), newGroup))

org, err := groupRepo.FetchGroupByID(context.Background(), newOrg.UID)
g, err := groupRepo.FetchGroupByID(context.Background(), newGroup.UID)
require.NoError(t, err)
require.Equal(t, org.UID, newOrg.UID)
require.Equal(t, g.UID, newGroup.UID)
}

if i > 0 && tc.isDuplicate {
err := groupRepo.CreateGroup(context.Background(), newOrg)
err := groupRepo.CreateGroup(context.Background(), newGroup)
require.Error(t, err)
require.ErrorIs(t, err, datastore.ErrDuplicateGroupName)
}

if i > 0 && !tc.isDuplicate {
require.NoError(t, groupRepo.CreateGroup(context.Background(), newOrg))
require.NoError(t, groupRepo.CreateGroup(context.Background(), newGroup))
}
}

Expand Down
13 changes: 11 additions & 2 deletions datastore/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ func (c *Client) ConfigurationRepo() datastore.ConfigurationRepository {

func (c *Client) ensureMongoIndices() {
c.ensureIndex(GroupCollection, "uid", true, nil)
c.ensureIndex(GroupCollection, "name", true, bson.M{"document_status": datastore.ActiveDocumentStatus})

c.ensureIndex(OrganisationCollection, "uid", true, nil)

Expand All @@ -179,11 +178,11 @@ func (c *Client) ensureMongoIndices() {
c.ensureIndex(SourceCollection, "mask_id", true, nil)
c.ensureIndex(SubscriptionCollection, "uid", true, nil)
c.ensureIndex(SubscriptionCollection, "filter_config.event_type", false, nil)
c.ensureCompoundIndex(AppCollections)

c.ensureCompoundIndex(EventCollection)
c.ensureCompoundIndex(UserCollection)
c.ensureCompoundIndex(AppCollections)
c.ensureCompoundIndex(GroupCollection)
c.ensureCompoundIndex(EventDeliveryCollection)
c.ensureCompoundIndex(OrganisationInvitesCollection)
c.ensureCompoundIndex(OrganisationMembersCollection)
Expand Down Expand Up @@ -242,6 +241,16 @@ func (c *Client) ensureCompoundIndex(collectionName string) bool {

func compoundIndices() map[string][]mongo.IndexModel {
compoundIndices := map[string][]mongo.IndexModel{
GroupCollection: {
{
Keys: bson.D{
{Key: "organisation_id", Value: 1},
{Key: "name", Value: 1},
{Key: "document_status", Value: 1},
},
Options: options.Index().SetUnique(true),
},
},
EventCollection: {
{
Keys: bson.D{
Expand Down
1 change: 1 addition & 0 deletions datastore/mongo/organisation_invite.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (db *orgInviteRepo) UpdateOrganisationInvite(ctx context.Context, iv *datas
iv.UpdatedAt = primitive.NewDateTimeFromTime(time.Now())
update := bson.D{primitive.E{Key: "$set", Value: bson.D{
primitive.E{Key: "role", Value: iv.Role},
primitive.E{Key: "status", Value: iv.Status},
primitive.E{Key: "updated_at", Value: iv.UpdatedAt},
}}}

Expand Down
9 changes: 8 additions & 1 deletion datastore/mongo/organisation_invite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,23 @@ func TestUpdateOrganisationInvite(t *testing.T) {
Groups: []string{uuid.NewString()},
Apps: nil,
}
status := datastore.InviteStatusAccepted
updatedAt := primitive.NewDateTimeFromTime(time.Now())

iv.Role = role
iv.Status = status
iv.UpdatedAt = updatedAt

err = inviteRepo.UpdateOrganisationInvite(context.Background(), iv)
require.NoError(t, err)

invite, err := inviteRepo.FetchOrganisationInviteByID(context.Background(), iv.UID)
require.NoError(t, err)

require.Equal(t, invite.Role, role)
require.Equal(t, invite.UID, iv.UID)
require.Equal(t, invite.Role, role)
require.Equal(t, invite.UpdatedAt, updatedAt)
require.Equal(t, invite.Status, status)
}

func TestDeleteOrganisationInvite(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions datastore/mongo/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,27 @@ func (s *subscriptionRepo) FindSubscriptionByEventType(ctx context.Context, grou
return subscription, nil
}

func (s *subscriptionRepo) FindSubscriptionsByAppID(ctx context.Context, groupId string, appID string) ([]datastore.Subscription, error) {
filter := bson.M{
"app_id": appID,
"group_id": groupId,
"document_status": datastore.ActiveDocumentStatus,
}

c, err := s.client.Find(ctx, filter)
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, datastore.ErrSubscriptionNotFound
}

var subscription []datastore.Subscription
err = c.All(ctx, &subscription)
if err != nil {
return nil, err
}

return subscription, nil
}

func (s *subscriptionRepo) FindSubscriptionBySourceIDs(ctx context.Context, groupId string, sourceId string) ([]datastore.Subscription, error) {
var subscription []datastore.Subscription
filter := bson.M{"group_id": groupId, "source_id": sourceId, "document_status": datastore.ActiveDocumentStatus}
Expand Down
32 changes: 32 additions & 0 deletions datastore/mongo/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func createSubscription() *datastore.Subscription {
UID: uuid.NewString(),
Name: "Subscription",
Type: "incoming",
AppID: "app-id-1",
GroupID: "group-id-1",
SourceID: "source-id-1",
EndpointID: "endpoint-id-1",
Expand Down Expand Up @@ -185,3 +186,34 @@ func Test_FindSubscriptionByID(t *testing.T) {
require.Equal(t, sub.SourceID, newSub.SourceID)
require.Equal(t, sub.EndpointID, newSub.EndpointID)
}

func Test_FindSubscriptionByAppID(t *testing.T) {
db, closeFn := getDB(t)
defer closeFn()

subRepo := NewSubscriptionRepo(db)

for i := 0; i < 20; i++ {
subscription := &datastore.Subscription{
UID: uuid.NewString(),
Name: fmt.Sprintf("Subscription %d", i),
Type: "incoming",
AppID: "app-id-1",
GroupID: "group-id-1",
SourceID: "source-id-1",
EndpointID: "endpoint-id-1",
DocumentStatus: datastore.ActiveDocumentStatus,
}
require.NoError(t, subRepo.CreateSubscription(context.Background(), subscription.GroupID, subscription))
}

// Fetch sub again
subs, err := subRepo.FindSubscriptionsByAppID(context.Background(), "group-id-1", "app-id-1")
require.NoError(t, err)

for _, sub := range subs {
require.NotEmpty(t, sub.UID)
require.Equal(t, sub.AppID, "app-id-1")
require.Equal(t, sub.GroupID, "group-id-1")
}
}
1 change: 1 addition & 0 deletions datastore/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type SubscriptionRepository interface {
FindSubscriptionByID(context.Context, string, string) (*Subscription, error)
FindSubscriptionByEventType(context.Context, string, string, EventType) ([]Subscription, error)
FindSubscriptionBySourceIDs(context.Context, string, string) ([]Subscription, error)
FindSubscriptionsByAppID(ctx context.Context, groupId string, appID string) ([]Subscription, error)
UpdateSubscriptionStatus(context.Context, string, string, SubscriptionStatus) error
}

Expand Down
2 changes: 1 addition & 1 deletion docs/docs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package docs GENERATED BY THE COMMAND ABOVE; DO NOT EDIT
// This file was generated by swaggo/swag at
// 2022-06-11 23:17:36.882931 +0100 WAT m=+106.272259251
// 2022-06-16 12:57:29.873543 +0100 WAT m=+106.525094960
package docs

import (
Expand Down
Loading

0 comments on commit ef1c6f0

Please sign in to comment.