Skip to content

Commit

Permalink
feat!: Migrate Elasticsearch to Plugin SDK v3 for native Arrow support (
Browse files Browse the repository at this point in the history
#10967)

Closes #10716

BEGIN_COMMIT_OVERRIDE
feat: Update to use [Apache Arrow](https://arrow.apache.org/) type system

BREAKING-CHANGE: This release introduces an internal change to our type system to use [Apache Arrow](https://arrow.apache.org/). This should have only one visible breaking change: JSON columns are now stored as strings, due to limitations of Elasticsearch when storing JSON lists with mixed types. If you encounter an issue during the upgrade, please submit a [bug report](https://github.com/cloudquery/cloudquery/issues/new/choose).

END_COMMIT_OVERRIDE
  • Loading branch information
hermanschaaf committed May 25, 2023
1 parent 612d3ea commit 0ef3e12
Show file tree
Hide file tree
Showing 13 changed files with 451 additions and 212 deletions.
5 changes: 2 additions & 3 deletions plugins/destination/elasticsearch/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/cloudquery/plugin-sdk/plugins/destination"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/cloudquery/plugin-pb-go/specs"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/elastic/go-elasticsearch/v8"
"github.com/rs/zerolog"
)

type Client struct {
destination.UnimplementedUnmanagedWriter
destination.DefaultReverseTransformer
logger zerolog.Logger
spec specs.Destination
metrics destination.Metrics
Expand Down
7 changes: 4 additions & 3 deletions plugins/destination/elasticsearch/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"os"
"testing"

"github.com/cloudquery/plugin-sdk/plugins/destination"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/cloudquery/plugin-pb-go/specs"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
)

var migrateStrategy = destination.MigrateStrategy{
Expand Down Expand Up @@ -36,5 +36,6 @@ func TestPlugin(t *testing.T) {

MigrateStrategyOverwrite: migrateStrategy,
MigrateStrategyAppend: migrateStrategy,
})
},
)
}
2 changes: 1 addition & 1 deletion plugins/destination/elasticsearch/client/deletestale.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"io"
"time"

"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/deletebyquery"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"golang.org/x/sync/errgroup"
Expand Down
2 changes: 1 addition & 1 deletion plugins/destination/elasticsearch/client/metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package client

import (
"github.com/cloudquery/plugin-sdk/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
)

func (c *Client) Metrics() destination.Metrics {
Expand Down
171 changes: 131 additions & 40 deletions plugins/destination/elasticsearch/client/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
"strings"

"github.com/cloudquery/plugin-sdk/schema"
"github.com/apache/arrow/go/v13/arrow"
"github.com/cloudquery/plugin-sdk/v3/schema"
cqtypes "github.com/cloudquery/plugin-sdk/v3/types"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
)

Expand All @@ -31,45 +33,7 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
func (c *Client) getIndexTemplate(table *schema.Table) (string, error) {
properties := map[string]types.Property{}
for _, col := range table.Columns {
switch col.Type {
case schema.TypeBool:
properties[col.Name] = types.NewBooleanProperty()
case schema.TypeInt:
properties[col.Name] = types.NewIntegerNumberProperty()
case schema.TypeFloat:
properties[col.Name] = types.NewFloatNumberProperty()
case schema.TypeUUID:
properties[col.Name] = types.NewTextProperty()
case schema.TypeString:
properties[col.Name] = types.NewTextProperty()
case schema.TypeByteArray:
properties[col.Name] = types.NewBinaryProperty()
case schema.TypeStringArray:
properties[col.Name] = types.NewTextProperty()
case schema.TypeTimestamp:
d := types.NewDateProperty()
f := "strict_date_optional_time||epoch_millis"
d.Format = &f
properties[col.Name] = d
case schema.TypeJSON:
properties[col.Name] = types.NewObjectProperty()
case schema.TypeUUIDArray:
properties[col.Name] = types.NewTextProperty()
case schema.TypeCIDR:
properties[col.Name] = types.NewIpRangeProperty()
case schema.TypeCIDRArray:
properties[col.Name] = types.NewIpRangeProperty()
case schema.TypeMacAddr:
properties[col.Name] = types.NewTextProperty()
case schema.TypeMacAddrArray:
properties[col.Name] = types.NewTextProperty()
case schema.TypeInet:
properties[col.Name] = types.NewIpRangeProperty()
case schema.TypeInetArray:
properties[col.Name] = types.NewIpRangeProperty()
case schema.TypeIntArray:
properties[col.Name] = types.NewIntegerNumberProperty()
}
properties[col.Name] = arrowTypeToElasticsearchProperty(col.Type)
}
tmp := types.IndexTemplate{
AllowAutoCreate: nil,
Expand All @@ -88,3 +52,130 @@ func (c *Client) getIndexTemplate(table *schema.Table) (string, error) {
b, err := json.Marshal(tmp)
return string(b), err
}

// arrowTypeToElasticsearchProperty maps an Arrow data type to the
// Elasticsearch mapping property used when building the index template.
// Any type without an explicit mapping falls back to a plain text property.
func arrowTypeToElasticsearchProperty(dataType arrow.DataType) types.Property {
	if dataType == nil {
		return types.NewTextProperty()
	}

	// dateProp builds a date property carrying an explicit format string.
	dateProp := func(format string) types.Property {
		p := types.NewDateProperty()
		p.Format = &format
		return p
	}

	// CloudQuery extension types are all stored as text. JSON in particular
	// is stored as a string because of Elasticsearch limitations when
	// storing JSON lists with mixed types.
	if typeOneOf(dataType,
		cqtypes.ExtensionTypes.UUID,
		cqtypes.ExtensionTypes.MAC,
		cqtypes.ExtensionTypes.Inet,
		cqtypes.ExtensionTypes.JSON) {
		return types.NewTextProperty()
	}

	// Nested types map recursively onto their element/field types.
	switch dataType.ID() {
	case arrow.LIST:
		return arrowTypeToElasticsearchProperty(dataType.(*arrow.ListType).Elem())
	case arrow.LARGE_LIST:
		return arrowTypeToElasticsearchProperty(dataType.(*arrow.LargeListType).Elem())
	case arrow.FIXED_SIZE_LIST:
		return arrowTypeToElasticsearchProperty(dataType.(*arrow.FixedSizeListType).Elem())
	case arrow.STRUCT:
		obj := types.NewObjectProperty()
		for _, field := range dataType.(*arrow.StructType).Fields() {
			obj.Properties[field.Name] = arrowTypeToElasticsearchProperty(field.Type)
		}
		return obj
	case arrow.MAP:
		mapType := dataType.(*arrow.MapType)
		obj := types.NewObjectProperty()
		obj.Properties["key"] = arrowTypeToElasticsearchProperty(mapType.KeyType())
		obj.Properties["value"] = arrowTypeToElasticsearchProperty(mapType.ItemType())
		return obj
	}

	// Primitive and fixed-width types.
	switch {
	case typeOneOf(dataType, arrow.FixedWidthTypes.Boolean):
		return types.NewBooleanProperty()
	case typeOneOf(dataType, arrow.PrimitiveTypes.Int8):
		return types.NewByteNumberProperty()
	case typeOneOf(dataType, arrow.PrimitiveTypes.Int16):
		return types.NewShortNumberProperty()
	case typeOneOf(dataType, arrow.PrimitiveTypes.Int32):
		return types.NewIntegerNumberProperty()
	case typeOneOf(dataType, arrow.PrimitiveTypes.Int64):
		return types.NewLongNumberProperty()
	case typeOneOf(dataType,
		arrow.PrimitiveTypes.Uint8,
		arrow.PrimitiveTypes.Uint16,
		arrow.PrimitiveTypes.Uint32,
		arrow.PrimitiveTypes.Uint64):
		return types.NewUnsignedLongNumberProperty()
	case typeOneOf(dataType, arrow.FixedWidthTypes.Float16):
		return types.NewHalfFloatNumberProperty()
	case typeOneOf(dataType, arrow.PrimitiveTypes.Float32):
		return types.NewFloatNumberProperty()
	case typeOneOf(dataType, arrow.PrimitiveTypes.Float64):
		return types.NewDoubleNumberProperty()
	case typeOneOf(dataType, arrow.BinaryTypes.String, arrow.BinaryTypes.LargeString):
		return types.NewTextProperty()
	case typeOneOf(dataType, arrow.BinaryTypes.Binary, arrow.BinaryTypes.LargeBinary):
		return types.NewBinaryProperty()
	case typeOneOf(dataType, arrow.FixedWidthTypes.Date32, arrow.FixedWidthTypes.Date64):
		return dateProp("yyyy-MM-dd")
	case typeOneOf(dataType, arrow.FixedWidthTypes.Time32s):
		return dateProp("HH:mm:ss")
	case typeOneOf(dataType, arrow.FixedWidthTypes.Time32ms):
		return dateProp("HH:mm:ss.SSS")
	case typeOneOf(dataType, arrow.FixedWidthTypes.Time64us, arrow.FixedWidthTypes.Time64ns):
		// Elasticsearch date formats cannot express sub-millisecond
		// time-of-day values, so these are stored as text.
		return types.NewTextProperty()
	case typeOneOf(dataType,
		arrow.FixedWidthTypes.Timestamp_s,
		arrow.FixedWidthTypes.Timestamp_ms,
		arrow.FixedWidthTypes.Timestamp_us):
		// NOTE(review): arrow.TypeEqual also compares the timestamp's time
		// zone, so timestamps whose zone differs from the FixedWidthTypes
		// defaults fall through to the text fallback — confirm intended.
		return dateProp("strict_date_optional_time")
	case typeOneOf(dataType, arrow.FixedWidthTypes.Timestamp_ns):
		p := types.NewDateNanosProperty()
		f := "strict_date_optional_time_nanos"
		p.Format = &f
		return p
	case typeOneOf(dataType,
		arrow.FixedWidthTypes.DayTimeInterval,
		arrow.FixedWidthTypes.MonthInterval,
		arrow.FixedWidthTypes.MonthDayNanoInterval):
		return types.NewObjectProperty()
	}

	// Anything unmapped is indexed as text.
	return types.NewTextProperty()
}

// typeOneOf reports whether left is equal, per arrow.TypeEqual, to any of
// the candidate types in dt.
func typeOneOf(left arrow.DataType, dt ...arrow.DataType) bool {
	match := false
	for i := 0; i < len(dt) && !match; i++ {
		match = arrow.TypeEqual(left, dt[i])
	}
	return match
}
78 changes: 62 additions & 16 deletions plugins/destination/elasticsearch/client/read.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package client

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"

"github.com/cloudquery/plugin-sdk/schema"
"github.com/goccy/go-json"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
)

func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []any) error {
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error {
index := c.getIndexNamePattern(table.Name)

// refresh index before read, to ensure all written data is available
Expand Down Expand Up @@ -49,21 +53,63 @@ func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName strin
return fmt.Errorf("failed to decode response body: %w", err)
}

sm := table.ToArrowSchema()
for _, hit := range result.Hits.Hits {
values := make([]any, len(table.Columns))
for i, col := range table.Columns {
switch col.Type {
case schema.TypeByteArray:
ba, err := base64.StdEncoding.DecodeString(hit.Source[col.Name].(string))
if err != nil {
return fmt.Errorf("failed to decode base64 string: %w", err)
}
values[i] = ba
default:
values[i] = hit.Source[col.Name]
rb := array.NewRecordBuilder(memory.DefaultAllocator, sm)
for i, field := range rb.Fields() {
err := appendValue(field, hit.Source[sm.Field(i).Name])
if err != nil {
return fmt.Errorf("failed to read from table %s: %w", table.Name, err)
}
}
res <- values
res <- rb.NewRecord()
}
return nil
}

// appendValue appends one JSON-decoded Elasticsearch document value to the
// given Arrow array builder, recursing into list- and struct-like builders.
// A nil value is appended as null.
func appendValue(builder array.Builder, value any) error {
	if value == nil {
		builder.AppendNull()
		return nil
	}
	switch bldr := builder.(type) {
	case array.ListLikeBuilder:
		lst, ok := value.([]any)
		if !ok {
			return fmt.Errorf("expected []any for list builder, got %T", value)
		}
		if lst == nil {
			bldr.AppendNull()
			return nil
		}
		bldr.Append(true)
		valBuilder := bldr.ValueBuilder()
		for _, v := range lst {
			if err := appendValue(valBuilder, v); err != nil {
				return err
			}
		}
		return nil
	case *array.StructBuilder:
		m, ok := value.(map[string]any)
		if !ok {
			return fmt.Errorf("expected map[string]any for struct builder, got %T", value)
		}
		bldr.Append(true)
		// Iterate over the struct's declared fields rather than the map keys,
		// so every child builder receives exactly one value per row: fields
		// missing from the document become nulls, and unknown document keys
		// are ignored. (Iterating map keys and discarding FieldIdx's ok
		// result wrote unknown keys into field 0 and left child builders
		// shorter than the struct's validity bitmap.)
		for i, field := range bldr.Type().(*arrow.StructType).Fields() {
			if err := appendValue(bldr.FieldBuilder(i), m[field.Name]); err != nil {
				return err
			}
		}
		return nil
	case *array.MonthIntervalBuilder, *array.DayTimeIntervalBuilder, *array.MonthDayNanoIntervalBuilder:
		// Intervals round-trip through JSON: re-encode the decoded value and
		// let the builder's own UnmarshalOne parse it.
		b, err := json.Marshal(value)
		if err != nil {
			return err
		}
		dec := json.NewDecoder(bytes.NewReader(b))
		return bldr.UnmarshalOne(dec)
	case *array.Int8Builder, *array.Int16Builder, *array.Int32Builder, *array.Int64Builder:
		// JSON numbers decode as float64; go via int64 so the value renders
		// without a fractional part. NOTE(review): assumes integral values
		// representable in float64 without precision loss — confirm upstream.
		f, ok := value.(float64)
		if !ok {
			return fmt.Errorf("expected float64 for integer builder, got %T", value)
		}
		return bldr.AppendValueFromString(fmt.Sprintf("%d", int64(f)))
	case *array.Uint8Builder, *array.Uint16Builder, *array.Uint32Builder, *array.Uint64Builder:
		f, ok := value.(float64)
		if !ok {
			return fmt.Errorf("expected float64 for unsigned integer builder, got %T", value)
		}
		return bldr.AppendValueFromString(fmt.Sprintf("%d", uint64(f)))
	}
	// Everything else (strings, booleans, floats, dates, ...) is parsed by
	// the builder's own string decoder.
	return builder.AppendValueFromString(fmt.Sprintf("%v", value))
}

0 comments on commit 0ef3e12

Please sign in to comment.