Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/CloudyKit/jet/v6 v6.2.0 // indirect
github.com/Joker/jade v1.1.3 // indirect
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect
github.com/adrg/xdg v0.4.0 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 h1:KkH3I3sJuOLP3TjA/dfr4NAY8bghDwnXiU7cTKxQqo0=
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06/go.mod h1:7erjKLwalezA0k99cWs5L11HWOAPNjdUZ6RxH1BXbbM=
github.com/adrg/xdg v0.4.0 h1:RzRqFcjH4nE5C6oTAxhBtoE2IRyjBSa62SCbyPidvls=
github.com/adrg/xdg v0.4.0/go.mod h1:N6ag73EX4wyxeaoeHctc1mas01KZgsj5tYiAIwqJE/E=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down Expand Up @@ -556,6 +558,7 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
100 changes: 100 additions & 0 deletions premium/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package premium

import (
"context"
"errors"
"time"
)

var ErrNoQuota = errors.New("no remaining quota for the month, please increase your usage limit if you want to continue syncing this plugin")

const DefaultQuotaCheckInterval = 30 * time.Second
const DefaultMaxQuotaFailures = 10 // 5 minutes

type quotaChecker struct {
qm QuotaMonitor
duration time.Duration
maxConsecutiveFailures int
}

type QuotaCheckOption func(*quotaChecker)

// WithQuotaCheckPeriod controls the time interval between quota checks
func WithQuotaCheckPeriod(duration time.Duration) QuotaCheckOption {
return func(m *quotaChecker) {
m.duration = duration
}
}

// WithQuotaMaxConsecutiveFailures controls the number of consecutive failed quota checks before the context is cancelled
func WithQuotaMaxConsecutiveFailures(n int) QuotaCheckOption {
return func(m *quotaChecker) {
m.maxConsecutiveFailures = n
}
}

// WithCancelOnQuotaExceeded monitors the quota usage at intervals defined by duration and cancels the context if the quota is exceeded
func WithCancelOnQuotaExceeded(ctx context.Context, qm QuotaMonitor, ops ...QuotaCheckOption) (context.Context, error) {
m := quotaChecker{
qm: qm,
duration: DefaultQuotaCheckInterval,
maxConsecutiveFailures: DefaultMaxQuotaFailures,
}
for _, op := range ops {
op(&m)
}

if err := m.checkInitialQuota(ctx); err != nil {
return ctx, err
}

newCtx := m.startQuotaMonitor(ctx)

return newCtx, nil
}

func (qc quotaChecker) checkInitialQuota(ctx context.Context) error {
hasQuota, err := qc.qm.HasQuota(ctx)
if err != nil {
return err
}

if !hasQuota {
return ErrNoQuota
}

return nil
}

func (qc quotaChecker) startQuotaMonitor(ctx context.Context) context.Context {
newCtx, cancelWithCause := context.WithCancelCause(ctx)
go func() {
ticker := time.NewTicker(qc.duration)
consecutiveFailures := 0
var hasQuotaErrors error
for {
select {
case <-newCtx.Done():
return
case <-ticker.C:
hasQuota, err := qc.qm.HasQuota(newCtx)
if err != nil {
consecutiveFailures++
hasQuotaErrors = errors.Join(hasQuotaErrors, err)
if consecutiveFailures >= qc.maxConsecutiveFailures {
cancelWithCause(hasQuotaErrors)
return
}
continue
}
consecutiveFailures = 0
hasQuotaErrors = nil
if !hasQuota {
cancelWithCause(ErrNoQuota)
return
}
}
}
}()
return newCtx
}
77 changes: 77 additions & 0 deletions premium/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package premium

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type quotaResponse struct {
hasQuota bool
err error
}

func newFakeQuotaMonitor(hasQuota ...quotaResponse) *fakeQuotaMonitor {
return &fakeQuotaMonitor{responses: hasQuota}
}

type fakeQuotaMonitor struct {
responses []quotaResponse
calls int
}

func (f *fakeQuotaMonitor) HasQuota(_ context.Context) (bool, error) {
resp := f.responses[f.calls]
if f.calls < len(f.responses)-1 {
f.calls++
}
return resp.hasQuota, resp.err
}

func TestWithCancelOnQuotaExceeded_NoInitialQuota(t *testing.T) {
ctx := context.Background()

responses := []quotaResponse{
{false, nil},
}
_, err := WithCancelOnQuotaExceeded(ctx, newFakeQuotaMonitor(responses...))

require.Error(t, err)
}

func TestWithCancelOnQuotaExceeded_NoQuota(t *testing.T) {
ctx := context.Background()

responses := []quotaResponse{
{true, nil},
{false, nil},
}
ctx, err := WithCancelOnQuotaExceeded(ctx, newFakeQuotaMonitor(responses...), WithQuotaCheckPeriod(1*time.Millisecond))
require.NoError(t, err)

<-ctx.Done()
cause := context.Cause(ctx)
require.Equal(t, ErrNoQuota, cause)
}

func TestWithCancelOnQuotaCheckConsecutiveFailures(t *testing.T) {
ctx := context.Background()

responses := []quotaResponse{
{true, nil},
{false, errors.New("test2")},
{false, errors.New("test3")},
}
ctx, err := WithCancelOnQuotaExceeded(ctx,
newFakeQuotaMonitor(responses...),
WithQuotaCheckPeriod(1*time.Millisecond),
WithQuotaMaxConsecutiveFailures(2),
)
require.NoError(t, err)
<-ctx.Done()
cause := context.Cause(ctx)
require.Equal(t, "test2\ntest3", cause.Error())
}
27 changes: 27 additions & 0 deletions premium/tables.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package premium

import "github.com/cloudquery/plugin-sdk/v4/schema"

// ContainsPaidTables returns true if any of the tables are paid
func ContainsPaidTables(tables schema.Tables) bool {
for _, t := range tables {
if t.IsPaid {
return true
}
}
return false
}

// MakeAllTablesPaid sets all tables to paid
func MakeAllTablesPaid(tables schema.Tables) schema.Tables {
for _, table := range tables {
MakeTablePaid(table)
}
return tables
}

// MakeTablePaid sets the table to paid
func MakeTablePaid(table *schema.Table) *schema.Table {
table.IsPaid = true
return table
}
39 changes: 39 additions & 0 deletions premium/tables_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package premium

import (
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/stretchr/testify/assert"
"testing"
)

func TestContainsPaidTables(t *testing.T) {
noPaidTables := schema.Tables{
&schema.Table{Name: "table1", IsPaid: false},
&schema.Table{Name: "table2", IsPaid: false},
&schema.Table{Name: "table3", IsPaid: false},
}

paidTables := schema.Tables{
&schema.Table{Name: "table1", IsPaid: false},
&schema.Table{Name: "table2", IsPaid: true},
&schema.Table{Name: "table3", IsPaid: false},
}

assert.False(t, ContainsPaidTables(noPaidTables), "no paid tables")
assert.True(t, ContainsPaidTables(paidTables), "paid tables")
}

func TestMakeAllTablesPaid(t *testing.T) {
noPaidTables := schema.Tables{
&schema.Table{Name: "table1", IsPaid: false},
&schema.Table{Name: "table2", IsPaid: false},
&schema.Table{Name: "table3", IsPaid: false},
}

paidTables := MakeAllTablesPaid(noPaidTables)

assert.Equal(t, 3, len(paidTables))
for _, table := range paidTables {
assert.True(t, table.IsPaid)
}
}
Loading