Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move retention policy to instance config #2011

Open
wants to merge 40 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
af08d81
fix: move retention policy to instance config
danvixent May 17, 2024
427bc60
Update retention policy settings (#2004)
Oluwadaminiola May 20, 2024
0fd0c48
Merge branch 'main' into daniel/feat/move_retention_policy_to_instanc…
Oluwadaminiola May 20, 2024
c7839ac
fix:
danvixent May 20, 2024
ae0bd18
fix: fix test syntac errors
danvixent May 20, 2024
79ed8cc
fix: add stub
danvixent May 20, 2024
74fb5cd
fix: remove non-existent fields from query
danvixent May 20, 2024
ed1b61d
fix: fix TestProjectService_CreateProject
danvixent May 20, 2024
0b0a108
fix: fix config tests
danvixent May 20, 2024
fd9bf1e
fix build error
Oluwadaminiola May 21, 2024
fae7417
update project form with search policy
Oluwadaminiola May 21, 2024
1de4261
remove search policy from config form
Oluwadaminiola May 21, 2024
e6fdf42
Merge main
danvixent May 22, 2024
13ca1c1
fix: add retention cli flags
danvixent May 22, 2024
36b8a69
fix: define retention cli flags
danvixent May 22, 2024
70d9c7f
fix: fix cli flag not overriding
danvixent May 22, 2024
a5760d9
Merge branch 'main' into daniel/feat/move_retention_policy_to_instanc…
danvixent May 22, 2024
1f2320b
fix: change cron spec
danvixent Jun 25, 2024
d8d5ec1
fix: change cron spec
danvixent Jun 25, 2024
7cbaf10
fix: change cron spec
danvixent Jun 26, 2024
0672bc2
fix: check for len(projects)
danvixent Jun 26, 2024
cc17e5c
fix: change cron spec
danvixent Jun 26, 2024
dd8bacf
Merge branch 'main' into daniel/feat/move_retention_policy_to_instanc…
danvixent Jun 26, 2024
bf40eae
fix:
danvixent Jun 26, 2024
6a7eabc
fix: remove duplicate import
danvixent Jun 26, 2024
ac6be45
fix: add project.Config.SearchPolicy to params in UpdateProject
danvixent Jun 26, 2024
a3429b5
fix: skip if no record was exported
danvixent Jun 26, 2024
bb96628
Merge branch 'main' into daniel/feat/move_retention_policy_to_instanc…
danvixent Jun 26, 2024
2edbd2f
fix: check len projects in sub loader
danvixent Jun 26, 2024
119025e
Merge branch 'main' into daniel/feat/move_retention_policy_to_instanc…
danvixent Jul 1, 2024
ca9b7f5
fix: web package-lock.json
danvixent Jul 1, 2024
a9c3491
fix: remove go.uber.org/automaxprocs pkg
danvixent Jul 1, 2024
8a6bd42
fix: check for len(projects)
danvixent Jul 1, 2024
bc476ed
fix: add gzip compression
danvixent Jul 1, 2024
d52d258
fix: dont update config singleton with the instance id if command is…
danvixent Jul 1, 2024
e1f8b83
fix: remove compressWriter
danvixent Jul 2, 2024
ec7893a
fix: remove .gz
danvixent Jul 2, 2024
022dfca
fix: skip limiter if limit or duration is zero
danvixent Jul 2, 2024
3c9f1dc
fix: add version to skip config load
danvixent Jul 2, 2024
a40253a
fix:
danvixent Jul 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions api/migrations/v20240517/projects.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package v20240517

Check failure on line 1 in api/migrations/v20240517/projects.go

View workflow job for this annotation

GitHub Actions / lint

: # github.com/frain-dev/convoy/api/migrations/v20240517
jirevwe marked this conversation as resolved.
Show resolved Hide resolved

import (
"encoding/json"
"net/http"
"time"

"github.com/frain-dev/convoy/datastore"

"github.com/frain-dev/convoy/api/models"
"github.com/frain-dev/convoy/util"
"gopkg.in/guregu/null.v4"
)

type oldProjectResponse struct {
UID string `json:"uid" db:"id"`
Name string `json:"name" db:"name"`
LogoURL string `json:"logo_url" db:"logo_url"`
OrganisationID string `json:"organisation_id" db:"organisation_id"`
ProjectConfigID string `json:"-" db:"project_configuration_id"`
Type ProjectType `json:"type" db:"type"`
Config *datastore.ProjectConfig `json:"config" db:"config"`
Statistics *ProjectStatistics `json:"statistics" db:"statistics"`

RetainedEvents int `json:"retained_events" db:"retained_events"`

CreatedAt time.Time `json:"created_at,omitempty" db:"created_at,omitempty" swaggertype:"string"`
UpdatedAt time.Time `json:"updated_at,omitempty" db:"updated_at,omitempty" swaggertype:"string"`
DeletedAt null.Time `json:"deleted_at,omitempty" db:"deleted_at" swaggertype:"string"`
}

type oldProjectConfig struct {
MaxIngestSize uint64 `json:"max_payload_read_size" db:"max_payload_read_size"`
ReplayAttacks bool `json:"replay_attacks_prevention_enabled" db:"replay_attacks_prevention_enabled"`
AddEventIDTraceHeaders bool `json:"add_event_id_trace_headers"`
DisableEndpoint bool `json:"disable_endpoint" db:"disable_endpoint"`
MultipleEndpointSubscriptions bool `json:"multiple_endpoint_subscriptions" db:"multiple_endpoint_subscriptions"`
SearchPolicy string `json:"search_policy" db:"search_policy"`
SSL *SSLConfiguration `json:"ssl" db:"ssl"`
RateLimit *RateLimitConfiguration `json:"ratelimit" db:"ratelimit"`
Strategy *StrategyConfiguration `json:"strategy" db:"strategy"`
Signature *SignatureConfiguration `json:"signature" db:"signature"`
MetaEvent *MetaEventConfiguration `json:"meta_event" db:"meta_event"`
}

type CreateEndpointResponseMigration struct{}

func (c *CreateEndpointResponseMigration) Migrate(b []byte, h http.Header) ([]byte, http.Header, error) {
var serverResponse util.ServerResponse
err := json.Unmarshal(b, &serverResponse)
if err != nil {
return nil, nil, err
}

if len(serverResponse.Data) == 0 {
// nothing to transform.
return b, h, nil
}

var endpointResp *models.EndpointResponse
err = json.Unmarshal(serverResponse.Data, &endpointResp)
if err != nil {
return nil, nil, err
}

endpoint := endpointResp.Endpoint

var old oldProjectResponse
err = migrateEndpoint(&endpoint, &old)
if err != nil {
return nil, nil, err
}

b, err = json.Marshal(old)
if err != nil {
return nil, nil, err
}

serverResponse.Data = b

sb, err := json.Marshal(serverResponse)
if err != nil {
return nil, nil, err
}

return sb, h, nil
}
2 changes: 2 additions & 0 deletions api/models/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Configuration struct {

// Used to configure where events removed by retention policies are stored
StoragePolicy *StoragePolicyConfiguration `json:"storage_policy"`

RetentionPolicy *RetentionPolicyConfiguration
}

func (c *Configuration) Validate() error {
Expand Down
37 changes: 16 additions & 21 deletions api/models/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,14 @@ type ProjectConfig struct {
// Controls if your project will add a timestamp to it's webhook signature header to prevent a replay attack, See this blog post[https://getconvoy.io/blog/generating-stripe-like-webhook-signatures] for more]
ReplayAttacks bool `json:"replay_attacks_prevention_enabled"`

// Controls whether the retention policy is active on this project.
IsRetentionPolicyEnabled bool `json:"retention_policy_enabled"`

// Controls of the Event ID and Event Delivery ID Headers are added to the request when events are dispatched to endpoints
AddEventIDTraceHeaders bool `json:"add_event_id_trace_headers"`

// Controls if the project will disable and endpoint after the retry threshold for an event is reached
DisableEndpoint bool `json:"disable_endpoint"`

// RetentionPolicy is used configure values for our retention and search tokenization policies
RetentionPolicy *RetentionPolicyConfiguration `json:"retention_policy"`
// Specify the interval in hours for which the event tokenizer runs
SearchPolicy string `json:"search_policy" db:"search_policy"`

// RateLimit is used to configure the projects rate limiting config values
RateLimit *RateLimitConfiguration `json:"ratelimit"`
Expand Down Expand Up @@ -85,18 +82,16 @@ func (pc *ProjectConfig) Transform() *datastore.ProjectConfig {
}

return &datastore.ProjectConfig{
MaxIngestSize: pc.MaxIngestSize,
ReplayAttacks: pc.ReplayAttacks,
IsRetentionPolicyEnabled: pc.IsRetentionPolicyEnabled,
DisableEndpoint: pc.DisableEndpoint,
AddEventIDTraceHeaders: pc.AddEventIDTraceHeaders,
MaxIngestSize: pc.MaxIngestSize,
ReplayAttacks: pc.ReplayAttacks,
DisableEndpoint: pc.DisableEndpoint,
AddEventIDTraceHeaders: pc.AddEventIDTraceHeaders,
MultipleEndpointSubscriptions: pc.MultipleEndpointSubscriptions,
SSL: pc.SSL.transform(),
RetentionPolicy: pc.RetentionPolicy.transform(),
RateLimit: pc.RateLimit.Transform(),
Strategy: pc.Strategy.transform(),
Signature: pc.Signature.transform(),
MetaEvent: pc.MetaEvent.transform(),
SSL: pc.SSL.transform(),
RateLimit: pc.RateLimit.Transform(),
Strategy: pc.Strategy.transform(),
Signature: pc.Signature.transform(),
MetaEvent: pc.MetaEvent.transform(),
}
}

Expand All @@ -115,19 +110,19 @@ func (r *SSLConfiguration) transform() *datastore.SSLConfiguration {
}

type RetentionPolicyConfiguration struct {
// Controls whether the retention policy is active on this instance.
IsRetentionPolicyEnabled bool `json:"retention_policy_enabled"`

// Specify the number of hours the policy job should go back before deleting events and deliveries.
Policy string `json:"policy" valid:"duration~please provide a valid retention policy time duration"`

// Specify the interval in hours for which the event tokenizer runs
SearchPolicy string `json:"search_policy" db:"search_policy"`
}

func (r *RetentionPolicyConfiguration) transform() *datastore.RetentionPolicyConfiguration {
func (r *RetentionPolicyConfiguration) Transform() *datastore.RetentionPolicyConfiguration {
if r == nil {
return nil
}

return &datastore.RetentionPolicyConfiguration{Policy: r.Policy, SearchPolicy: r.SearchPolicy}
return &datastore.RetentionPolicyConfiguration{Policy: r.Policy, IsRetentionPolicyEnabled: r.IsRetentionPolicyEnabled}
}

type RateLimitConfiguration struct {
Expand Down
Empty file removed api/ui/build/go_test_stub.txt
Empty file.
43 changes: 25 additions & 18 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ type SentryConfiguration struct {
DSN string `json:"dsn" envconfig:"CONVOY_SENTRY_DSN"`
}

type RetentionPolicyConfiguration struct {
Policy string `json:"policy" db:"policy" valid:"required~please provide a valid retention policy"`
SearchPolicy string `json:"search_policy" db:"search_policy"`
}

type AnalyticsConfiguration struct {
IsEnabled bool `json:"enabled" envconfig:"CONVOY_ANALYTICS_ENABLED"`
}
Expand Down Expand Up @@ -326,24 +331,26 @@ func (ft FlagLevel) MarshalJSON() ([]byte, error) {
}

type Configuration struct {
APIVersion string `json:"api_version" envconfig:"CONVOY_API_VERSION"`
Auth AuthConfiguration `json:"auth,omitempty"`
Database DatabaseConfiguration `json:"database"`
Redis RedisConfiguration `json:"redis"`
Prometheus PrometheusConfiguration `json:"prometheus"`
Server ServerConfiguration `json:"server"`
MaxResponseSize uint64 `json:"max_response_size" envconfig:"CONVOY_MAX_RESPONSE_SIZE"`
SMTP SMTPConfiguration `json:"smtp"`
Environment string `json:"env" envconfig:"CONVOY_ENV"`
Logger LoggerConfiguration `json:"logger"`
Tracer TracerConfiguration `json:"tracer"`
Host string `json:"host" envconfig:"CONVOY_HOST"`
CustomDomainSuffix string `json:"custom_domain_suffix" envconfig:"CONVOY_CUSTOM_DOMAIN_SUFFIX"`
FeatureFlag FlagLevel `json:"feature_flag" envconfig:"CONVOY_FEATURE_FLAG"`
Analytics AnalyticsConfiguration `json:"analytics"`
StoragePolicy StoragePolicyConfiguration `json:"storage_policy"`
ConsumerPoolSize int `json:"consumer_pool_size" envconfig:"CONVOY_CONSUMER_POOL_SIZE"`
EnableProfiling bool `json:"enable_profiling" envconfig:"CONVOY_ENABLE_PROFILING"`
APIVersion string `json:"api_version" envconfig:"CONVOY_API_VERSION"`
Auth AuthConfiguration `json:"auth,omitempty"`
Database DatabaseConfiguration `json:"database"`
Redis RedisConfiguration `json:"redis"`
Prometheus PrometheusConfiguration `json:"prometheus"`
Server ServerConfiguration `json:"server"`
MaxResponseSize uint64 `json:"max_response_size" envconfig:"CONVOY_MAX_RESPONSE_SIZE"`
SMTP SMTPConfiguration `json:"smtp"`
Environment string `json:"env" envconfig:"CONVOY_ENV"`
Logger LoggerConfiguration `json:"logger"`
Tracer TracerConfiguration `json:"tracer"`
Host string `json:"host" envconfig:"CONVOY_HOST"`
CustomDomainSuffix string `json:"custom_domain_suffix" envconfig:"CONVOY_CUSTOM_DOMAIN_SUFFIX"`
FeatureFlag FlagLevel `json:"feature_flag" envconfig:"CONVOY_FEATURE_FLAG"`
Analytics AnalyticsConfiguration `json:"analytics"`
IsRetentionPolicyEnabled bool `json:"retention_policy_enabled" envconfig:"CONVOY_RETENTION_POLICY_ENABLED"`
RetentionPolicy *RetentionPolicyConfiguration `json:"retention_policy" envconfig:"CONVOY_RETENTION_POLICY"`
StoragePolicy StoragePolicyConfiguration `json:"storage_policy"`
ConsumerPoolSize int `json:"consumer_pool_size" envconfig:"CONVOY_CONSUMER_POOL_SIZE"`
EnableProfiling bool `json:"enable_profiling" envconfig:"CONVOY_ENABLE_PROFILING"`
}

// Get fetches the application configuration. LoadConfig must have been called
Expand Down
7 changes: 4 additions & 3 deletions database/listener/project_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package listener

import (
"encoding/json"
"time"

"github.com/frain-dev/convoy"
"github.com/frain-dev/convoy/datastore"
"github.com/frain-dev/convoy/pkg/log"
"github.com/frain-dev/convoy/queue"
"github.com/r3labs/diff/v3"
"time"
)

type ProjectListener struct {
Expand Down Expand Up @@ -35,9 +36,9 @@ func (e *ProjectListener) run(eventType string, data interface{}, changelog inte
switch change.Type {
case diff.UPDATE:
if testSliceEq(change.Path, []string{"Config", "RetentionPolicy", "SearchPolicy"}) {
dur, err := time.ParseDuration(project.Config.RetentionPolicy.SearchPolicy)
dur, err := time.ParseDuration(project.Config.SearchPolicy)
if err != nil {
log.WithError(err).Errorf("%s is not a valid time duration", project.Config.RetentionPolicy.SearchPolicy)
log.WithError(err).Errorf("%s is not a valid time duration", project.Config.SearchPolicy)
return
}

Expand Down
17 changes: 15 additions & 2 deletions database/postgres/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ const (
id, is_analytics_enabled, is_signup_enabled,
storage_policy_type, on_prem_path, s3_prefix,
s3_bucket, s3_access_key, s3_secret_key,
s3_region, s3_session_token, s3_endpoint
s3_region, s3_session_token, s3_endpoint,
retention_policy_policy, retention_policy_enabled
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12);
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14);
`

fetchConfiguration = `
SELECT
id,
is_analytics_enabled,
is_signup_enabled,
retention_policy_enabled AS "retention_policy.retention_policy_enabled",
retention_policy_policy AS "retention_policy.policy",
storage_policy_type AS "storage_policy.type",
on_prem_path AS "storage_policy.on_prem.path",
s3_bucket AS "storage_policy.s3.bucket",
Expand Down Expand Up @@ -58,6 +61,8 @@ const (
s3_session_token = $10,
s3_endpoint = $11,
s3_prefix = $12,
retention_policy_policy = $13,
retention_policy_enabled = $14,
updated_at = NOW()
WHERE id = $1 AND deleted_at IS NULL;
`
Expand Down Expand Up @@ -88,6 +93,8 @@ func (c *configRepo) CreateConfiguration(ctx context.Context, config *datastore.
}
}

rc := config.GetRetentionPolicyConfig()

r, err := c.db.ExecContext(ctx, createConfiguration,
config.UID,
config.IsAnalyticsEnabled,
Expand All @@ -101,6 +108,8 @@ func (c *configRepo) CreateConfiguration(ctx context.Context, config *datastore.
config.StoragePolicy.S3.Region,
config.StoragePolicy.S3.SessionToken,
config.StoragePolicy.S3.Endpoint,
rc.Policy,
rc.IsRetentionPolicyEnabled,
)
if err != nil {
return err
Expand Down Expand Up @@ -148,6 +157,8 @@ func (c *configRepo) UpdateConfiguration(ctx context.Context, cfg *datastore.Con
}
}

rc := cfg.GetRetentionPolicyConfig()

result, err := c.db.ExecContext(ctx, updateConfiguration,
cfg.UID,
cfg.IsAnalyticsEnabled,
Expand All @@ -161,6 +172,8 @@ func (c *configRepo) UpdateConfiguration(ctx context.Context, cfg *datastore.Con
cfg.StoragePolicy.S3.SessionToken,
cfg.StoragePolicy.S3.Endpoint,
cfg.StoragePolicy.S3.Prefix,
rc.Policy,
rc.IsRetentionPolicyEnabled,
)
if err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions database/postgres/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func generateConfig() *datastore.Configuration {
UID: ulid.Make().String(),
IsAnalyticsEnabled: true,
IsSignupEnabled: false,
RetentionPolicy: &datastore.RetentionPolicyConfiguration{
Policy: "720h",
IsRetentionPolicyEnabled: true,
},
StoragePolicy: &datastore.StoragePolicyConfiguration{
Type: datastore.OnPrem,
S3: &datastore.S3Storage{
Expand Down
Loading
Loading