Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/CloudyKit/jet/v6 v6.2.0 // indirect
github.com/Joker/jade v1.1.3 // indirect
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect
github.com/adrg/xdg v0.4.0 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 h1:KkH3I3sJuOLP3TjA/dfr4NAY8bghDwnXiU7cTKxQqo0=
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06/go.mod h1:7erjKLwalezA0k99cWs5L11HWOAPNjdUZ6RxH1BXbbM=
github.com/adrg/xdg v0.4.0 h1:RzRqFcjH4nE5C6oTAxhBtoE2IRyjBSa62SCbyPidvls=
github.com/adrg/xdg v0.4.0/go.mod h1:N6ag73EX4wyxeaoeHctc1mas01KZgsj5tYiAIwqJE/E=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down Expand Up @@ -556,6 +558,7 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
4 changes: 4 additions & 0 deletions internal/servers/plugin/v3/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
}
}

if err := s.Plugin.OnSyncFinish(ctx); err != nil {
return status.Errorf(codes.Internal, "failed to finish sync: %v", err)
}

return syncErr
}

Expand Down
13 changes: 13 additions & 0 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@ func (p *Plugin) OnBeforeSend(ctx context.Context, msg message.SyncMessage) (mes
return msg, nil
}

// OnSyncFinisher is an interface that can be implemented by a plugin client to be notified when a sync finishes.
type OnSyncFinisher interface {
OnSyncFinish(context.Context) error
}

// OnSyncFinish gets called after a sync finishes.
func (p *Plugin) OnSyncFinish(ctx context.Context) error {
if v, ok := p.client.(OnSyncFinisher); ok {
return v.OnSyncFinish(ctx)
}
return nil
}

// IsStaticLinkingEnabled whether static linking is to be enabled
func (p *Plugin) IsStaticLinkingEnabled() bool {
return p.staticLinking
Expand Down
80 changes: 80 additions & 0 deletions premium/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package premium

import (
"context"
"errors"
"time"
)

var ErrNoQuota = errors.New("no remaining quota for the month, please increase your usage limit if you want to continue syncing this plugin")

const DefaultQuotaCheckInterval = 30 * time.Second

type quotaChecker struct {
qm QuotaMonitor
duration time.Duration
}

type QuotaCheckOption func(*quotaChecker)

// WithQuotaCheckPeriod the time interval between quota checks
func WithQuotaCheckPeriod(duration time.Duration) QuotaCheckOption {
return func(m *quotaChecker) {
m.duration = duration
}
}

// WithCancelOnQuotaExceeded monitors the quota usage at intervals defined by duration and cancels the context if the quota is exceeded
func WithCancelOnQuotaExceeded(ctx context.Context, qm QuotaMonitor, ops ...QuotaCheckOption) (context.Context, func(), error) {
m := quotaChecker{
qm: qm,
duration: DefaultQuotaCheckInterval,
}
for _, op := range ops {
op(&m)
}

if err := m.checkInitialQuota(ctx); err != nil {
return ctx, nil, err
}

ctx, cancel := m.startQuotaMonitor(ctx)

return ctx, cancel, nil
}

func (qc quotaChecker) checkInitialQuota(ctx context.Context) error {
hasQuota, err := qc.qm.HasQuota(ctx)
if err != nil {
return err
}

if !hasQuota {
return ErrNoQuota
}

return nil
}

func (qc quotaChecker) startQuotaMonitor(ctx context.Context) (context.Context, func()) {
newCtx, cancel := context.WithCancel(ctx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically a cancelable context is created by the caller, but in this case I didn't want to make the caller (plugin developer) have to write this code.

go func() {
defer cancel()
ticker := time.NewTicker(qc.duration)
for {
select {
case <-newCtx.Done():
return
case <-ticker.C:
hasQuota, err := qc.qm.HasQuota(newCtx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add retries to the HasQuota call.

if err != nil {
continue
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should we do with an error determining the quota - since returning an error here would cancel the sync.

}
if !hasQuota {
return
}
}
}
}()
return newCtx, cancel
}
52 changes: 52 additions & 0 deletions premium/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package premium

import (
"context"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func newFakeQuotaMonitor(hasQuota ...bool) *fakeQuotaMonitor {
return &fakeQuotaMonitor{hasQuota: hasQuota}
}

type fakeQuotaMonitor struct {
hasQuota []bool
calls int
}

func (f *fakeQuotaMonitor) HasQuota(_ context.Context) (bool, error) {
hasQuota := f.hasQuota[f.calls]
if f.calls < len(f.hasQuota)-1 {
f.calls++
}
return hasQuota, nil
}

func TestWithCancelOnQuotaExceeded_NoInitialQuota(t *testing.T) {
ctx := context.Background()

_, _, err := WithCancelOnQuotaExceeded(ctx, newFakeQuotaMonitor(false))

require.Error(t, err)
}

func TestWithCancelOnQuotaExceeded_NoQuota(t *testing.T) {
ctx := context.Background()

ctx, _, err := WithCancelOnQuotaExceeded(ctx, newFakeQuotaMonitor(true, false), WithQuotaCheckPeriod(1*time.Millisecond))
require.NoError(t, err)

<-ctx.Done()
}

func TestWithCancelOnQuotaExceeded_HasQuotaCanceled(t *testing.T) {
ctx := context.Background()

ctx, cancel, err := WithCancelOnQuotaExceeded(ctx, newFakeQuotaMonitor(true, true, true), WithQuotaCheckPeriod(1*time.Millisecond))
require.NoError(t, err)
cancel()

<-ctx.Done()
}
27 changes: 27 additions & 0 deletions premium/tables.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package premium

import "github.com/cloudquery/plugin-sdk/v4/schema"

// ContainsPaidTables returns true if any of the tables are paid
func ContainsPaidTables(tables schema.Tables) bool {
for _, t := range tables {
if t.IsPaid {
return true
}
}
return false
}

// MakeAllTablesPaid sets all tables to paid
func MakeAllTablesPaid(tables schema.Tables) schema.Tables {
for _, table := range tables {
MakeTablePaid(table)
}
return tables
}

// MakeTablePaid sets the table to paid
func MakeTablePaid(table *schema.Table) *schema.Table {
table.IsPaid = true
return table
}
39 changes: 39 additions & 0 deletions premium/tables_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package premium

import (
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/stretchr/testify/assert"
"testing"
)

func TestContainsPaidTables(t *testing.T) {
noPaidTables := schema.Tables{
&schema.Table{Name: "table1", IsPaid: false},
&schema.Table{Name: "table2", IsPaid: false},
&schema.Table{Name: "table3", IsPaid: false},
}

paidTables := schema.Tables{
&schema.Table{Name: "table1", IsPaid: false},
&schema.Table{Name: "table2", IsPaid: true},
&schema.Table{Name: "table3", IsPaid: false},
}

assert.False(t, ContainsPaidTables(noPaidTables), "no paid tables")
assert.True(t, ContainsPaidTables(paidTables), "paid tables")
}

func TestMakeAllTablesPaid(t *testing.T) {
noPaidTables := schema.Tables{
&schema.Table{Name: "table1", IsPaid: false},
&schema.Table{Name: "table2", IsPaid: false},
&schema.Table{Name: "table3", IsPaid: false},
}

paidTables := MakeAllTablesPaid(noPaidTables)

assert.Equal(t, 3, len(paidTables))
for _, table := range paidTables {
assert.True(t, table.IsPaid)
}
}
Loading