Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
github.com/cloudquery/plugin-pb-go v1.0.8
github.com/cloudquery/plugin-sdk/v2 v2.7.0
github.com/getsentry/sentry-go v0.20.0
Expand All @@ -18,6 +19,7 @@ require (
github.com/stretchr/testify v1.8.2
github.com/thoas/go-funk v0.9.3
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
golang.org/x/net v0.9.0
golang.org/x/sync v0.1.0
golang.org/x/text v0.9.0
google.golang.org/grpc v1.54.0
Expand All @@ -42,11 +44,11 @@ require (
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oMMlVBbn9M=
github.com/bradleyjkemp/cupaloy/v2 v2.8.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand Down Expand Up @@ -174,6 +176,7 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -196,10 +199,12 @@ github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUq
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand Down
191 changes: 191 additions & 0 deletions internal/servers/destination/v1/destinations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package destination

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/ipc"
pb "github.com/cloudquery/plugin-pb-go/pb/destination/v1"
"github.com/cloudquery/plugin-pb-go/specs"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Server struct {
pb.UnimplementedDestinationServer
Plugin *destination.Plugin
Logger zerolog.Logger
spec specs.Destination
}

func (s *Server) Configure(ctx context.Context, req *pb.Configure_Request) (*pb.Configure_Response, error) {
var spec specs.Destination
if err := json.Unmarshal(req.Config, &spec); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal spec: %v", err)
}
s.spec = spec
return &pb.Configure_Response{}, s.Plugin.Init(ctx, s.Logger, spec)
}

func (s *Server) GetName(context.Context, *pb.GetName_Request) (*pb.GetName_Response, error) {
return &pb.GetName_Response{
Name: s.Plugin.Name(),
}, nil
}

func (s *Server) GetVersion(context.Context, *pb.GetVersion_Request) (*pb.GetVersion_Response, error) {
return &pb.GetVersion_Response{
Version: s.Plugin.Version(),
}, nil
}

func (s *Server) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migrate_Response, error) {
schemas, err := schema.NewSchemasFromBytes(req.Tables)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to create schemas: %v", err)
}
tables, err := schema.NewTablesFromArrowSchemas(schemas)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to create tables: %v", err)
}
s.setPKsForTables(tables)

return &pb.Migrate_Response{}, s.Plugin.Migrate(ctx, tables)
}

// Note the order of operations in this method is important!
// Trying to insert into the `resources` channel before starting the reader goroutine will cause a deadlock.
func (s *Server) Write(msg pb.Destination_WriteServer) error {
resources := make(chan arrow.Record)

r, err := msg.Recv()
if err != nil {
if err == io.EOF {
return msg.SendAndClose(&pb.Write_Response{})
}
return status.Errorf(codes.Internal, "failed to receive msg: %v", err)
}

schemas, err := schema.NewSchemasFromBytes(r.Tables)
if err != nil {
return status.Errorf(codes.InvalidArgument, "failed to create schemas: %v", err)
}
tables, err := schema.NewTablesFromArrowSchemas(schemas)
if err != nil {
return status.Errorf(codes.InvalidArgument, "failed to create tables: %v", err)
}
Comment on lines +77 to +84
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be using schema.NewTablesFromBytes(r.Tables) (which is a little bit shorter)

var sourceSpec specs.Source
if r.SourceSpec == nil {
// this is for backward compatibility
sourceSpec = specs.Source{
Name: r.Source,
}
} else {
if err := json.Unmarshal(r.SourceSpec, &sourceSpec); err != nil {
return status.Errorf(codes.InvalidArgument, "failed to unmarshal source spec: %v", err)
}
}
syncTime := r.Timestamp.AsTime()
s.setPKsForTables(tables)
eg, ctx := errgroup.WithContext(msg.Context())
eg.Go(func() error {
return s.Plugin.Write(ctx, sourceSpec, tables, syncTime, resources)
})

for {
r, err := msg.Recv()
if err == io.EOF {
close(resources)
if err := eg.Wait(); err != nil {
return status.Errorf(codes.Internal, "write failed: %v", err)
}
return msg.SendAndClose(&pb.Write_Response{})
}
if err != nil {
close(resources)
if wgErr := eg.Wait(); wgErr != nil {
return status.Errorf(codes.Internal, "failed to receive msg: %v and write failed: %v", err, wgErr)
}
return status.Errorf(codes.Internal, "failed to receive msg: %v", err)
}
rdr, err := ipc.NewReader(bytes.NewReader(r.Resource))
if err != nil {
close(resources)
if wgErr := eg.Wait(); wgErr != nil {
return status.Errorf(codes.InvalidArgument, "failed to create reader: %v and write failed: %v", err, wgErr)
}
return status.Errorf(codes.InvalidArgument, "failed to create reader: %v", err)
}
for rdr.Next() {
rec := rdr.Record()
rec.Retain()
select {
case resources <- rec:
case <-ctx.Done():
close(resources)
if err := eg.Wait(); err != nil {
return status.Errorf(codes.Internal, "Context done: %v and failed to wait for plugin: %v", ctx.Err(), err)
}
return status.Errorf(codes.Internal, "Context done: %v", ctx.Err())
}
}
if err := rdr.Err(); err != nil {
return status.Errorf(codes.InvalidArgument, "failed to read resource: %v", err)
}
}
}

func setCQIDAsPrimaryKeysForTables(tables schema.Tables) {
for _, table := range tables {
for i, col := range table.Columns {
table.Columns[i].PrimaryKey = col.Name == schema.CqIDColumn.Name
}
setCQIDAsPrimaryKeysForTables(table.Relations)
}
}

func (s *Server) GetMetrics(context.Context, *pb.GetDestinationMetrics_Request) (*pb.GetDestinationMetrics_Response, error) {
stats := s.Plugin.Metrics()
b, err := json.Marshal(stats)
if err != nil {
return nil, fmt.Errorf("failed to marshal stats: %w", err)
}
return &pb.GetDestinationMetrics_Response{
Metrics: b,
}, nil
}

func (s *Server) DeleteStale(ctx context.Context, req *pb.DeleteStale_Request) (*pb.DeleteStale_Response, error) {
schemas, err := schema.NewSchemasFromBytes(req.Tables)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to create schemas: %v", err)
}
tables, err := schema.NewTablesFromArrowSchemas(schemas)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to create tables: %v", err)
}

if err := s.Plugin.DeleteStale(ctx, tables, req.Source, req.Timestamp.AsTime()); err != nil {
return nil, err
}

return &pb.DeleteStale_Response{}, nil
}

func (s *Server) setPKsForTables(tables schema.Tables) {
if s.spec.PKMode == specs.PKModeCQID {
setCQIDAsPrimaryKeysForTables(tables)
}
}

func (s *Server) Close(ctx context.Context, _ *pb.Close_Request) (*pb.Close_Response, error) {
return &pb.Close_Response{}, s.Plugin.Close(ctx)
}
Loading