Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ lint:

.PHONY: gen-proto
gen-proto:
protoc --proto_path=. --go_out . --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/pb/base.proto internal/pb/source.proto internal/pb/destination.proto
protoc --proto_path=. --go_out . --go_opt=module="github.com/cloudquery/plugin-sdk" --go-grpc_out=. --go-grpc_opt=module="github.com/cloudquery/plugin-sdk" cloudquery/base/v0/base.proto cloudquery/destination/v0/destination.proto cloudquery/source/v0/source.proto
protoc --proto_path=. --go_out . --go_opt=module="github.com/cloudquery/plugin-sdk" --go-grpc_out=. --go-grpc_opt=module="github.com/cloudquery/plugin-sdk" cloudquery/source/v1/source.proto

.PHONY: benchmark
benchmark:
Expand Down
12 changes: 12 additions & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: v1
breaking:
use:
- FILE
lint:
use:
- BASIC
ignore:
# We are ignoring those as this is an old version and we are not doing any changes here anymore
- cloudquery/destination/v0/destination.proto
- cloudquery/source/v0/source.proto
- cloudquery/base/v0/base.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package clients
package destination

const (
maxMsgSize = 100 * 1024 * 1024 // 100 MiB
Expand Down
88 changes: 46 additions & 42 deletions clients/destination.go → clients/destination/v0/destination.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package clients
package destination

import (
"context"
Expand All @@ -14,8 +14,12 @@ import (
"sync"
"time"

"github.com/cloudquery/plugin-sdk/internal/pb"
"github.com/cloudquery/plugin-sdk/internal/logging"
pbBase "github.com/cloudquery/plugin-sdk/internal/pb/base/v0"
pb "github.com/cloudquery/plugin-sdk/internal/pb/destination/v0"
"github.com/cloudquery/plugin-sdk/internal/random"
"github.com/cloudquery/plugin-sdk/plugins/destination"
"github.com/cloudquery/plugin-sdk/registry"
"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/rs/zerolog"
Expand All @@ -26,7 +30,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

type DestinationClient struct {
type Client struct {
pbClient pb.DestinationClient
directory string
cmd *exec.Cmd
Expand All @@ -38,43 +42,43 @@ type DestinationClient struct {
wg *sync.WaitGroup
}

type DestinationClientOption func(*DestinationClient)
type ClientOption func(*Client)

func WithDestinationLogger(logger zerolog.Logger) func(*DestinationClient) {
return func(c *DestinationClient) {
func WithDestinationLogger(logger zerolog.Logger) func(*Client) {
return func(c *Client) {
c.logger = logger
}
}

func WithDestinationDirectory(directory string) func(*DestinationClient) {
return func(c *DestinationClient) {
func WithDestinationDirectory(directory string) func(*Client) {
return func(c *Client) {
c.directory = directory
}
}

func WithDestinationGrpcConn(userConn *grpc.ClientConn) func(*DestinationClient) {
return func(c *DestinationClient) {
func WithDestinationGrpcConn(userConn *grpc.ClientConn) func(*Client) {
return func(c *Client) {
// we use a different variable here because we don't want to close a connection that wasn't created by us.
c.userConn = userConn
}
}

func WithDestinationNoSentry() func(*DestinationClient) {
return func(c *DestinationClient) {
func WithDestinationNoSentry() func(*Client) {
return func(c *Client) {
c.noSentry = true
}
}

func NewDestinationClient(ctx context.Context, registry specs.Registry, path string, version string, opts ...DestinationClientOption) (*DestinationClient, error) {
func NewClient(ctx context.Context, registrySpec specs.Registry, path string, version string, opts ...ClientOption) (*Client, error) {
var err error
c := &DestinationClient{
directory: DefaultDownloadDir,
c := &Client{
directory: registry.DefaultDownloadDir,
wg: &sync.WaitGroup{},
}
for _, opt := range opts {
opt(c)
}
switch registry {
switch registrySpec {
case specs.RegistryGrpc:
if c.userConn == nil {
c.conn, err = grpc.DialContext(ctx, path,
Expand Down Expand Up @@ -102,25 +106,25 @@ func NewDestinationClient(ctx context.Context, registry specs.Registry, path str
return nil, fmt.Errorf("invalid github plugin path: %s. format should be owner/repo", path)
}
org, name := pathSplit[0], pathSplit[1]
localPath := filepath.Join(c.directory, "plugins", string(PluginTypeDestination), org, name, version, "plugin")
localPath = withBinarySuffix(localPath)
if err := DownloadPluginFromGithub(ctx, localPath, org, name, version, PluginTypeDestination); err != nil {
localPath := filepath.Join(c.directory, "plugins", string(registry.PluginTypeDestination), org, name, version, "plugin")
localPath = registry.WithBinarySuffix(localPath)
if err := registry.DownloadPluginFromGithub(ctx, localPath, org, name, version, registry.PluginTypeDestination); err != nil {
return nil, err
}
if err := c.newManagedClient(ctx, localPath); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported registry %s", registry)
return nil, fmt.Errorf("unsupported registry %s", registrySpec)
}

return c, nil
}

// newManagedClient starts a new destination plugin process from local file, connects to it via gRPC server
// and returns a new DestinationClient
func (c *DestinationClient) newManagedClient(ctx context.Context, path string) error {
c.grpcSocketName = generateRandomUnixSocketName()
// and returns a new Client
func (c *Client) newManagedClient(ctx context.Context, path string) error {
c.grpcSocketName = random.GenerateRandomUnixSocketName()
// spawn the plugin first and then connect
args := []string{"serve", "--network", "unix", "--address", c.grpcSocketName,
"--log-level", c.logger.GetLevel().String(), "--log-format", "json"}
Expand All @@ -142,13 +146,13 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) e
c.wg.Add(1)
go func() {
defer c.wg.Done()
lr := newLogReader(reader)
lr := logging.NewLogReader(reader)
for {
line, err := lr.NextLine()
if errors.Is(err, io.EOF) {
break
}
if errors.Is(err, errLogLineToLong) {
if errors.Is(err, logging.ErrLogLineToLong) {
c.logger.Err(err).Str("line", string(line)).Msg("skipping too long log line")
continue
}
Expand All @@ -160,7 +164,7 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) e
if err := json.Unmarshal(line, &structuredLogLine); err != nil {
c.logger.Err(err).Str("line", string(line)).Msg("failed to unmarshal log line from plugin")
} else {
jsonToLog(c.logger, structuredLogLine)
logging.JSONToLog(c.logger, structuredLogLine)
}
}
}()
Expand All @@ -180,8 +184,8 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) e
return nil
}

func (c *DestinationClient) GetProtocolVersion(ctx context.Context) (uint64, error) {
res, err := c.pbClient.GetProtocolVersion(ctx, &pb.GetProtocolVersion_Request{})
func (c *Client) GetProtocolVersion(ctx context.Context) (uint64, error) {
res, err := c.pbClient.GetProtocolVersion(ctx, &pbBase.GetProtocolVersion_Request{})
if err != nil {
s := status.Convert(err)
if s.Code() != codes.Unimplemented {
Expand All @@ -193,7 +197,7 @@ func (c *DestinationClient) GetProtocolVersion(ctx context.Context) (uint64, err
return res.Version, nil
}

func (c *DestinationClient) GetMetrics(ctx context.Context) (*destination.Metrics, error) {
func (c *Client) GetMetrics(ctx context.Context) (*destination.Metrics, error) {
res, err := c.pbClient.GetMetrics(ctx, &pb.GetDestinationMetrics_Request{})
if err != nil {
return nil, fmt.Errorf("failed to call GetMetrics: %w", err)
Expand All @@ -205,28 +209,28 @@ func (c *DestinationClient) GetMetrics(ctx context.Context) (*destination.Metric
return &stats, nil
}

func (c *DestinationClient) Name(ctx context.Context) (string, error) {
res, err := c.pbClient.GetName(ctx, &pb.GetName_Request{})
func (c *Client) Name(ctx context.Context) (string, error) {
res, err := c.pbClient.GetName(ctx, &pbBase.GetName_Request{})
if err != nil {
return "", fmt.Errorf("failed to call GetName: %w", err)
}
return res.Name, nil
}

func (c *DestinationClient) Version(ctx context.Context) (string, error) {
res, err := c.pbClient.GetVersion(ctx, &pb.GetVersion_Request{})
func (c *Client) Version(ctx context.Context) (string, error) {
res, err := c.pbClient.GetVersion(ctx, &pbBase.GetVersion_Request{})
if err != nil {
return "", fmt.Errorf("failed to call GetVersion: %w", err)
}
return res.Version, nil
}

func (c *DestinationClient) Initialize(ctx context.Context, spec specs.Destination) error {
func (c *Client) Initialize(ctx context.Context, spec specs.Destination) error {
b, err := json.Marshal(spec)
if err != nil {
return fmt.Errorf("destination configure: failed to marshal spec: %w", err)
}
_, err = c.pbClient.Configure(ctx, &pb.Configure_Request{
_, err = c.pbClient.Configure(ctx, &pbBase.Configure_Request{
Config: b,
})
if err != nil {
Expand All @@ -235,7 +239,7 @@ func (c *DestinationClient) Initialize(ctx context.Context, spec specs.Destinati
return nil
}

func (c *DestinationClient) Migrate(ctx context.Context, tables []*schema.Table) error {
func (c *Client) Migrate(ctx context.Context, tables []*schema.Table) error {
b, err := json.Marshal(tables)
if err != nil {
return fmt.Errorf("destination migrate: failed to marshal plugin: %w", err)
Expand All @@ -250,7 +254,7 @@ func (c *DestinationClient) Migrate(ctx context.Context, tables []*schema.Table)
// Write writes rows as they are received from the channel to the destination plugin.
// resources is marshaled schema.Resource. We are not marshalling this inside the function
// because usually it is already marshalled from the destination plugin.
func (c *DestinationClient) Write(ctx context.Context, source string, syncTime time.Time, resources <-chan []byte) (uint64, error) {
func (c *Client) Write(ctx context.Context, source string, syncTime time.Time, resources <-chan []byte) (uint64, error) {
saveClient, err := c.pbClient.Write(ctx)
if err != nil {
return 0, fmt.Errorf("failed to call Write: %w", err)
Expand All @@ -276,7 +280,7 @@ func (c *DestinationClient) Write(ctx context.Context, source string, syncTime t
return res.FailedWrites, nil
}

func (c *DestinationClient) Write2(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, resources <-chan []byte) error {
func (c *Client) Write2(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, resources <-chan []byte) error {
saveClient, err := c.pbClient.Write2(ctx)
if err != nil {
return fmt.Errorf("failed to call Write2: %w", err)
Expand Down Expand Up @@ -316,14 +320,14 @@ func (c *DestinationClient) Write2(ctx context.Context, sourceSpec specs.Source,
return nil
}

func (c *DestinationClient) Close(ctx context.Context) error {
func (c *Client) Close(ctx context.Context) error {
if _, err := c.pbClient.Close(ctx, &pb.Close_Request{}); err != nil {
return fmt.Errorf("failed to close destination: %w", err)
}
return nil
}

func (c *DestinationClient) DeleteStale(ctx context.Context, tables schema.Tables, source string, timestamp time.Time) error {
func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source string, timestamp time.Time) error {
b, err := json.Marshal(tables)
if err != nil {
return fmt.Errorf("destination delete stale: failed to marshal plugin: %w", err)
Expand All @@ -338,9 +342,9 @@ func (c *DestinationClient) DeleteStale(ctx context.Context, tables schema.Table
return nil
}

// Terminate is used only in conjunction with NewManagedDestinationClient.
// Terminate is used only in conjunction with NewManagedClient.
// It closes the connection it created, kills the spawned process and removes the socket file.
func (c *DestinationClient) Terminate() error {
func (c *Client) Terminate() error {
// wait for log streaming to complete before returning from this function
defer c.wg.Wait()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//go:build !windows

package clients
package destination

import (
"fmt"
"os"
"time"
)

func (c *DestinationClient) terminateProcess() error {
func (c *Client) terminateProcess() error {
if err := c.cmd.Process.Signal(os.Interrupt); err != nil {
c.logger.Error().Err(err).Msg("failed to send interrupt signal to destination plugin")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//go:build windows

package clients
package destination

func (c *DestinationClient) terminateProcess() error {
func (c *Client) terminateProcess() error {
if err := c.cmd.Process.Kill(); err != nil {
c.logger.Error().Err(err).Msg("failed to kill destination plugin")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package clients
package destination

import (
"context"
Expand Down Expand Up @@ -38,7 +38,7 @@ func TestDestinationClient(t *testing.T) {
for _, tc := range newDestinationClientTestCases {
t.Run(tc.Path+"_"+tc.Version, func(t *testing.T) {
dirName := t.TempDir()
c, err := NewDestinationClient(ctx, tc.Registry, tc.Path, tc.Version, WithDestinationLogger(l), WithDestinationDirectory(dirName))
c, err := NewClient(ctx, tc.Registry, tc.Path, tc.Version, WithDestinationLogger(l), WithDestinationDirectory(dirName))
if err != nil {
if strings.HasPrefix(err.Error(), "destination plugin protocol version") {
// this also means success as in this tests we just want to make sure we were able to download and spawn the plugin
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestDestinationClientWriteReturnsCorrectError(t *testing.T) {
ctx := context.Background()
l := zerolog.New(zerolog.NewTestWriter(t)).Output(zerolog.ConsoleWriter{Out: os.Stderr}).Level(zerolog.DebugLevel)
dirName := t.TempDir()
c, err := NewDestinationClient(ctx, specs.RegistryGithub, "cloudquery/sqlite", "v1.0.11", WithDestinationLogger(l), WithDestinationDirectory(dirName))
c, err := NewClient(ctx, specs.RegistryGithub, "cloudquery/sqlite", "v1.0.11", WithDestinationLogger(l), WithDestinationDirectory(dirName))
if err != nil {
t.Fatal(err)
}
Expand Down
22 changes: 0 additions & 22 deletions clients/errors.go

This file was deleted.

Loading