Skip to content

Commit

Permalink
chore: create record if not exists
Browse files Browse the repository at this point in the history
  • Loading branch information
jirevwe committed May 22, 2024
1 parent fad5d45 commit ca89a44
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
28 changes: 23 additions & 5 deletions internal/pkg/limiter/pg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import (
"errors"
"github.com/frain-dev/convoy/database"
"github.com/frain-dev/convoy/pkg/log"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
)

var ErrRateLimitExceeded = errors.New("rate limit exceeded")

type SlidingWindowRateLimiter struct {
db database.Database
}
Expand All @@ -31,28 +35,42 @@ func (p *SlidingWindowRateLimiter) takeToken(ctx context.Context, key string, ra

tx, err := p.db.GetDB().BeginTxx(ctx, nil)
if err != nil {
log.Infof("ratelimit failed: %v", err)
return nil
}

var allowed bool
err = tx.QueryRowContext(ctx, `select convoy.take_token($1, $2, $3)::bool;`, key, rate, windowSize).Scan(&allowed)
if err != nil {
log.Infof("ratelimit failed: %v", err)
return nil
return postgresErrorTransform(tx, err)
}

err = tx.Commit()
if err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
log.Infof("update failed: %v, unable to rollback: %v", err, rollbackErr)
log.Infof("failed: %v, unable to rollback: %v", err, rollbackErr)
}
return nil
}

if !allowed {
return errors.New("rate limit error")
return ErrRateLimitExceeded
}

return nil
}

func postgresErrorTransform(tx *sqlx.Tx, err error) error {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
log.Infof("failed: %v, unable to rollback: %v", err, rollbackErr)
}

var pgErr *pq.Error
ok := errors.As(err, &pgErr)
if ok {
if pgErr.Code == "23505" {
return ErrRateLimitExceeded
}
}

return err
}
15 changes: 5 additions & 10 deletions sql/1715118159.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ DECLARE
can_take BOOLEAN;
row RECORD;
BEGIN
next_min := current_timestamp + make_interval(secs := _bucket_size);

SELECT expires_at, tokens FROM convoy.token_bucket WHERE key = _key FOR UPDATE SKIP LOCKED LIMIT 1 INTO row;
if row is null then
return false;
INSERT INTO convoy.token_bucket (key, rate, expires_at)
VALUES (_key, _rate, next_min);
return true;
end if;

next_min := current_timestamp + make_interval(secs := _bucket_size);

IF current_timestamp < row.expires_at AND row.tokens = _rate THEN
RETURN FALSE;
END IF;
Expand All @@ -42,13 +44,6 @@ BEGIN
WHERE key = _key
RETURNING TRUE INTO can_take;

-- Insert if no record found
IF NOT FOUND THEN
INSERT INTO convoy.token_bucket (key, rate, expires_at)
VALUES (_key, _rate, next_min);
return true;
END IF;

RETURN can_take;
END;
$$;
Expand Down

0 comments on commit ca89a44

Please sign in to comment.