Skip to content

Commit

Permalink
feat(meilisearch): Migrate to github.com/cloudquery/plugin-sdk/v3 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
candiduslynx committed May 18, 2023
1 parent 84f3416 commit 3ffb2af
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 53 deletions.
6 changes: 3 additions & 3 deletions plugins/destination/meilisearch/client/client.go
Expand Up @@ -8,8 +8,8 @@ import (
"time"

"github.com/cloudquery/plugin-pb-go/specs"
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/meilisearch/meilisearch-go"
"github.com/rs/zerolog"
)
Expand All @@ -31,7 +31,7 @@ func (c *Client) Close(context.Context) error {
return nil
}

func (*Client) DeleteStale(context.Context, schema.Schemas, string, time.Time) error {
func (*Client) DeleteStale(context.Context, schema.Tables, string, time.Time) error {
return fmt.Errorf("DeleteStale not supported")
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/destination/meilisearch/client/client_test.go
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/cloudquery/cloudquery/plugins/destination/meilisearch/resources/plugin"
"github.com/cloudquery/plugin-pb-go/specs"
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
)

var migrateStrategy = destination.MigrateStrategy{
Expand Down
14 changes: 4 additions & 10 deletions plugins/destination/meilisearch/client/hash.go
Expand Up @@ -4,27 +4,21 @@ import (
"crypto/sha256"
"fmt"

"github.com/apache/arrow/go/v13/arrow"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/google/uuid"
)

const hashColumnName = "_cq_pk_hash_uuid"

func hashUUID(sc *arrow.Schema) func(map[string]any) string {
pk := schema.PrimaryKeyIndices(sc)
func hashUUID(table *schema.Table) func(map[string]any) string {
pk := table.PrimaryKeys()
if len(pk) == 0 {
return func(map[string]any) string { return uuid.New().String() }
}

names := make([]string, len(pk))
for i, idx := range pk {
names[i] = sc.Field(idx).Name
}

return func(row map[string]any) string {
h := sha256.New()
for _, name := range names {
for _, name := range pk {
h.Write([]byte(name))
h.Write([]byte(fmt.Sprint(row[name])))
}
Expand Down
15 changes: 5 additions & 10 deletions plugins/destination/meilisearch/client/index.go
Expand Up @@ -4,8 +4,7 @@ import (
"context"
"fmt"

"github.com/apache/arrow/go/v13/arrow"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/meilisearch/meilisearch-go"
"golang.org/x/exp/slices"
)
Expand Down Expand Up @@ -35,19 +34,15 @@ func (i *indexSchema) canMigrate(o *indexSchema) bool {
return i.UID == o.UID && i.PrimaryKey == o.PrimaryKey
}

func (c *Client) tableIndexSchema(sc *arrow.Schema) *indexSchema {
attributes := make([]string, len(sc.Fields()))
for i, fld := range sc.Fields() {
attributes[i] = fld.Name
}
func (c *Client) tableIndexSchema(table *schema.Table) *indexSchema {
return &indexSchema{
UID: schema.TableName(sc),
UID: table.Name,
PrimaryKey: c.pkColumn,
Attributes: attributes,
Attributes: table.Columns.Names(),
}
}

func (c *Client) tablesIndexSchemas(tables schema.Schemas) map[string]*indexSchema {
func (c *Client) tablesIndexSchemas(tables schema.Tables) map[string]*indexSchema {
res := make(map[string]*indexSchema)
for _, table := range tables {
s := c.tableIndexSchema(table)
Expand Down
6 changes: 3 additions & 3 deletions plugins/destination/meilisearch/client/migrate.go
Expand Up @@ -5,18 +5,18 @@ import (
"fmt"

"github.com/cloudquery/plugin-pb-go/specs"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

func (c *Client) Migrate(ctx context.Context, tables schema.Schemas) error {
func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
c.logger.Info().Msg("Migrate")

have, err := c.indexes()
if err != nil {
return err
}

want := c.tablesIndexSchemas(tables)
want := c.tablesIndexSchemas(tables.FlattenTables())

var recreate, create, update []*indexSchema
for uid, need := range want {
Expand Down
7 changes: 4 additions & 3 deletions plugins/destination/meilisearch/client/read.go
Expand Up @@ -7,12 +7,13 @@ import (
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/meilisearch/meilisearch-go"
)

func (c *Client) Read(_ context.Context, sc *arrow.Schema, sourceName string, res chan<- arrow.Record) error {
index, err := c.Meilisearch.GetIndex(schema.TableName(sc))
func (c *Client) Read(_ context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error {
sc := table.ToArrowSchema()
index, err := c.Meilisearch.GetIndex(table.Name)
if err != nil {
return err
}
Expand Down
23 changes: 17 additions & 6 deletions plugins/destination/meilisearch/client/transform.go
@@ -1,11 +1,12 @@
package client

import (
"fmt"
"bytes"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cloudquery/plugin-sdk/v2/types"
"github.com/cloudquery/plugin-sdk/v3/types"
"github.com/goccy/go-json"
)

func timestampValues(arr *array.Timestamp) []any {
Expand Down Expand Up @@ -77,6 +78,12 @@ func reverseTransform(builder array.Builder, val any) error {
builder.Append(float32(val.(float64)))
case *array.Float64Builder:
builder.Append(val.(float64))
case *array.BinaryBuilder:
return builder.AppendValueFromString(val.(string))
case *array.StringBuilder:
builder.Append(val.(string))
case *array.LargeStringBuilder:
builder.Append(val.(string))
case *types.JSONBuilder:
builder.Append(val)
case array.ListLikeBuilder:
Expand All @@ -87,14 +94,18 @@ func reverseTransform(builder array.Builder, val any) error {
return err
}
}

default:
v, ok := val.(string)
if !ok {
return fmt.Errorf("unsupported type %T with builder %T", val, builder)
data, err := json.MarshalWithOption(val, json.DisableHTMLEscape())
if err != nil {
return err
}
if err := builder.AppendValueFromString(v); err != nil {

dec := json.NewDecoder(bytes.NewReader(data))
if err := builder.UnmarshalOne(dec); err != nil {
return err
}
}

return nil
}
23 changes: 10 additions & 13 deletions plugins/destination/meilisearch/client/write.go
Expand Up @@ -6,21 +6,21 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/cloudquery/plugin-pb-go/specs"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

func (c *Client) WriteTableBatch(ctx context.Context, sc *arrow.Schema, records []arrow.Record) error {
index, err := c.Meilisearch.GetIndex(schema.TableName(sc))
func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, records []arrow.Record) error {
index, err := c.Meilisearch.GetIndex(table.Name)
if err != nil {
return err
}

var transformer rowTransformer
switch c.dstSpec.WriteMode {
case specs.WriteModeAppend:
transformer = toMap(sc)
transformer = toMap(table)
case specs.WriteModeOverwrite, specs.WriteModeOverwriteDeleteStale:
transformer = toMapWithHash(sc)
transformer = toMapWithHash(table)
default:
return fmt.Errorf("unsupported write mode %q", c.dstSpec.WriteMode.String())
}
Expand Down Expand Up @@ -48,11 +48,8 @@ func (c *Client) WriteTableBatch(ctx context.Context, sc *arrow.Schema, records

type rowTransformer func(record arrow.Record) ([]map[string]any, error)

func toMap(sc *arrow.Schema) rowTransformer {
columns := make([]string, len(sc.Fields()))
for i, fld := range sc.Fields() {
columns[i] = fld.Name
}
func toMap(table *schema.Table) rowTransformer {
columns := table.Columns.Names()
return func(record arrow.Record) ([]map[string]any, error) {
byColumn := make(map[string][]any, len(columns))
for i, col := range record.Columns() {
Expand All @@ -62,9 +59,9 @@ func toMap(sc *arrow.Schema) rowTransformer {
}
}

func toMapWithHash(sc *arrow.Schema) rowTransformer {
m := toMap(sc)
h := hashUUID(sc)
func toMapWithHash(table *schema.Table) rowTransformer {
m := toMap(table)
h := hashUUID(table)
return func(record arrow.Record) ([]map[string]any, error) {
rows, err := m(record)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions plugins/destination/meilisearch/go.mod
Expand Up @@ -5,7 +5,8 @@ go 1.19
require (
github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604
github.com/cloudquery/plugin-pb-go v1.0.8
github.com/cloudquery/plugin-sdk/v2 v2.7.0
github.com/cloudquery/plugin-sdk/v3 v3.5.1
github.com/goccy/go-json v0.9.11
github.com/google/uuid v1.3.0
github.com/meilisearch/meilisearch-go v0.24.0
github.com/rs/zerolog v1.29.0
Expand All @@ -19,10 +20,10 @@ replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13
require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/getsentry/sentry-go v0.20.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand All @@ -39,6 +40,7 @@ require (
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/cobra v1.6.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions plugins/destination/meilisearch/go.sum
Expand Up @@ -51,6 +51,8 @@ github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37
github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc=
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
github.com/cloudquery/plugin-sdk/v3 v3.5.1 h1:797hWUEsojwvp7xtr6LSaf5tk5iG9UDixoRACxu3xrU=
github.com/cloudquery/plugin-sdk/v3 v3.5.1/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
Expand Down Expand Up @@ -185,6 +187,7 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
4 changes: 2 additions & 2 deletions plugins/destination/meilisearch/main.go
Expand Up @@ -3,8 +3,8 @@ package main
import (
"github.com/cloudquery/cloudquery/plugins/destination/meilisearch/client"
"github.com/cloudquery/cloudquery/plugins/destination/meilisearch/resources/plugin"
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
"github.com/cloudquery/plugin-sdk/v2/serve"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/serve"
)

const (
Expand Down

0 comments on commit 3ffb2af

Please sign in to comment.