-
Notifications
You must be signed in to change notification settings - Fork 499
/
client.go
108 lines (88 loc) · 2.53 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package client
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"github.com/cloudquery/plugin-sdk/v4/message"
"github.com/cloudquery/plugin-sdk/v4/plugin"
"github.com/cloudquery/plugin-sdk/v4/writers/batchwriter"
"github.com/meilisearch/meilisearch-go"
"github.com/rs/zerolog"
)
type Client struct {
Meilisearch *meilisearch.Client
logger zerolog.Logger
spec Spec
writer *batchwriter.BatchWriter
plugin.UnimplementedSource
batchwriter.UnimplementedDeleteRecord
}
// Compile-time assertions that *Client implements both the plugin client
// interface and the batch writer client interface.
var (
	_ plugin.Client      = (*Client)(nil)
	_ batchwriter.Client = (*Client)(nil)
)
// Close closes the batch writer and, once that has succeeded, drops the
// reference to the Meilisearch client. If the writer fails to close, the
// wrapped error is returned and the Meilisearch reference is left in place.
func (c *Client) Close(ctx context.Context) error {
	err := c.writer.Close(ctx)
	if err != nil {
		return fmt.Errorf("failed to close writer: %w", err)
	}

	c.Meilisearch = nil
	return nil
}
// DeleteStale always fails: deleting stale records is not supported by this
// client. Both arguments are ignored.
func (_ *Client) DeleteStale(_ context.Context, _ message.WriteDeleteStales) error {
	return fmt.Errorf("DeleteStale not supported")
}
// verifyVersion asks the Meilisearch server for its version and checks that
// it is at least 1.1.
//
// Only the major and minor components of the reported package version are
// compared; the patch component must be present but is otherwise ignored.
//
// It returns an error when the version request fails, when the package
// version has fewer than three dot-separated components, when the major or
// minor component is not an integer, or when the version is older than the
// minimum supported one.
func (c *Client) verifyVersion() error {
	const (
		minMajor = 1
		minMinor = 1
	)

	version, err := c.Meilisearch.Version()
	if err != nil {
		return fmt.Errorf("failed to get Meilisearch version: %w", err)
	}

	parts := strings.Split(version.PkgVersion, ".")
	if len(parts) < 3 {
		// Report the offending version string itself rather than the whole
		// response struct.
		return fmt.Errorf("malformed version %q (expected \"major.minor.patch\")", version.PkgVersion)
	}

	major, err := strconv.Atoi(parts[0])
	if err != nil {
		return fmt.Errorf("failed to parse major version (%q): %w", parts[0], err)
	}

	minor, err := strconv.Atoi(parts[1])
	if err != nil {
		return fmt.Errorf("failed to parse minor version (%q): %w", parts[1], err)
	}

	if major > minMajor || (major == minMajor && minor >= minMinor) {
		return nil
	}

	return fmt.Errorf("unsupported Meilisearch version %s (must be >= 1.1)", version.PkgVersion)
}
// New builds the Meilisearch destination client from a JSON-encoded spec.
//
// The spec is unmarshalled, given its defaults and validated; a Meilisearch
// client is then obtained from it and the server version is verified. Only
// after the version check passes is the batch writer created, configured
// with the batch size, byte size and timeout from the spec.
//
// On any failure New returns a nil client together with the error, so callers
// never receive a partially initialized client. The context and the
// plugin.NewClientOptions arguments are not used.
func New(_ context.Context, logger zerolog.Logger, specBytes []byte, _ plugin.NewClientOptions) (plugin.Client, error) {
	spec := Spec{}
	if err := json.Unmarshal(specBytes, &spec); err != nil {
		return nil, fmt.Errorf("failed to unmarshal spec: %w", err)
	}
	spec.setDefaults()
	if err := spec.validate(); err != nil {
		return nil, err
	}

	mClient, err := spec.getClient()
	if err != nil {
		return nil, err
	}

	client := &Client{
		Meilisearch: mClient,
		logger:      logger.With().Str("module", "dest-meilisearch").Str("host", spec.Host).Logger(),
		spec:        spec,
	}

	// Verify the server version before creating the batch writer: the check
	// only needs the Meilisearch client, and failing here leaves no writer
	// behind that would have to be closed.
	err = client.verifyVersion()
	if err != nil {
		return nil, err
	}

	client.writer, err = batchwriter.New(client,
		batchwriter.WithBatchSize(spec.BatchSize),
		batchwriter.WithBatchSizeBytes(spec.BatchSizeBytes),
		batchwriter.WithBatchTimeout(spec.BatchTimeout.Duration()),
		batchwriter.WithLogger(client.logger),
	)
	if err != nil {
		return nil, err
	}

	return client, nil
}