From 1d2782608f922e71d6efa5ea7e8a06f0bcbc0ed5 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 30 Jun 2023 11:36:39 +0100 Subject: [PATCH] Implement plugin Read --- go.mod | 2 +- go.sum | 2 ++ internal/clients/state/v3/state.go | 15 +++-------- internal/servers/plugin/v3/plugin.go | 38 ++++++++++++++++++++++++++++ plugin/plugin.go | 2 +- plugin/plugin_destination.go | 15 +++++++++++ 6 files changed, 61 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 61198d6df0..7f6672976c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/apache/arrow/go/v13 v13.0.0-20230622042343-ec413b7763fe github.com/bradleyjkemp/cupaloy/v2 v2.8.0 - github.com/cloudquery/plugin-pb-go v1.4.0 + github.com/cloudquery/plugin-pb-go v1.5.0 github.com/cloudquery/plugin-sdk/v2 v2.7.0 github.com/getsentry/sentry-go v0.20.0 github.com/goccy/go-json v0.10.0 diff --git a/go.sum b/go.sum index ba54473bbf..049ba8119c 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a h1:O/FNq1 github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc= github.com/cloudquery/plugin-pb-go v1.4.0 h1:sfy0oWSFac2JCJQJuKoR+8flZGKkEoUVORwZDNM3aiI= github.com/cloudquery/plugin-pb-go v1.4.0/go.mod h1:NbWAtT2BzJQ9+XUWwh3IKBg3MOeV9ZEpHoHNAQ/YDV8= +github.com/cloudquery/plugin-pb-go v1.5.0 h1:A/RE1U1l34W5T+JlXJzrHz0IMzfpdUK4VSg+J1Hw0gw= +github.com/cloudquery/plugin-pb-go v1.5.0/go.mod h1:NbWAtT2BzJQ9+XUWwh3IKBg3MOeV9ZEpHoHNAQ/YDV8= github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= diff --git a/internal/clients/state/v3/state.go b/internal/clients/state/v3/state.go index fc40096519..d4c01fc375 100644 --- a/internal/clients/state/v3/state.go +++ b/internal/clients/state/v3/state.go @@ -72,8 +72,8 @@ func NewClient(ctx context.Context, pbClient pb.PluginClient, tableName string) return nil, err } - syncClient, err := c.client.Sync(ctx, &pb.Sync_Request{ - Tables: []string{tableName}, + readClient, err := c.client.Read(ctx, &pb.Read_Request{ + Table: tableBytes, }) if err != nil { return nil, err @@ -81,21 +81,14 @@ func NewClient(ctx context.Context, pbClient pb.PluginClient, tableName string) c.mutex.Lock() defer c.mutex.Unlock() for { - res, err := syncClient.Recv() + res, err := readClient.Recv() if err != nil { if err == io.EOF { break } return nil, err } - var insertMessage *pb.Sync_Response_Insert - switch m := res.Message.(type) { - case *pb.Sync_Response_MigrateTable: - continue - case *pb.Sync_Response_Insert: - insertMessage = m - } - rdr, err := ipc.NewReader(bytes.NewReader(insertMessage.Insert.Record)) + rdr, err := ipc.NewReader(bytes.NewReader(res.Record)) if err != nil { return nil, err } diff --git a/internal/servers/plugin/v3/plugin.go b/internal/servers/plugin/v3/plugin.go index 7a23bc1b35..8477b5c026 100644 --- a/internal/servers/plugin/v3/plugin.go +++ b/internal/servers/plugin/v3/plugin.go @@ -5,6 +5,7 @@ import ( "fmt" "io" + "github.com/apache/arrow/go/v13/arrow" pb "github.com/cloudquery/plugin-pb-go/pb/plugin/v3" "github.com/cloudquery/plugin-sdk/v4/message" "github.com/cloudquery/plugin-sdk/v4/plugin" @@ -63,6 +64,43 @@ func (s *Server) Init(ctx context.Context, req *pb.Init_Request) (*pb.Init_Respo return &pb.Init_Response{}, nil } +func (s *Server) Read(req *pb.Read_Request, stream pb.Plugin_ReadServer) error { + records := make(chan arrow.Record) + var syncErr error + ctx := stream.Context() + + sc, err := pb.NewSchemaFromBytes(req.Table) + if err != nil { + return status.Errorf(codes.InvalidArgument, "failed to create schema from bytes: %v", err) + } + table, err := schema.NewTableFromArrowSchema(sc) + if err != nil { + return status.Errorf(codes.InvalidArgument, "failed to create table from schema: %v", err) + } + go func() { + defer close(records) + err := s.Plugin.Read(ctx, table, records) + if err != nil { + syncErr = fmt.Errorf("failed to sync records: %w", err) + } + }() + + for rec := range records { + recBytes, err := pb.RecordToBytes(rec) + if err != nil { + return status.Errorf(codes.Internal, "failed to convert record to bytes: %v", err) + } + resp := &pb.Read_Response{ + Record: recBytes, + } + if err := stream.Send(resp); err != nil { + return status.Errorf(codes.Internal, "failed to send read response: %v", err) + } + } + + return syncErr +} + func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error { msgs := make(chan message.SyncMessage) var syncErr error diff --git a/plugin/plugin.go b/plugin/plugin.go index ca4874daed..301f408550 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -27,7 +27,7 @@ func (UnimplementedDestination) Write(context.Context, <-chan message.WriteMessa } func (UnimplementedDestination) Read(context.Context, *schema.Table, chan<- arrow.Record) error { - return fmt.Errorf("not implemented") + return ErrNotImplemented } type UnimplementedSource struct{} diff --git a/plugin/plugin_destination.go b/plugin/plugin_destination.go index af1bff7b21..e8b9b8aefd 100644 --- a/plugin/plugin_destination.go +++ b/plugin/plugin_destination.go @@ -37,3 +37,18 @@ func (p *Plugin) Write(ctx context.Context, res <-chan message.WriteMessage) err } return p.client.Write(ctx, res) } + +// Read is read data from the requested table to the given channel, returned in the same format as the table +func (p *Plugin) Read(ctx context.Context, table *schema.Table, res chan<- arrow.Record) error { + if !p.mu.TryLock() { + return fmt.Errorf("plugin already in use") + } + defer p.mu.Unlock() + if p.client == nil { + return fmt.Errorf("plugin not initialized. call Init() first") + } + if err := p.client.Read(ctx, table, res); err != nil { + return fmt.Errorf("failed to read: %w", err) + } + return nil +}