Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 177 additions & 94 deletions schema/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package schema
import (
"encoding/base64"
"fmt"
"reflect"
"sort"
"strconv"
"strings"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/cloudquery/plugin-sdk/v4/types"
"github.com/google/uuid"
"golang.org/x/exp/rand"
"golang.org/x/exp/slices"
)

// TestSourceOptions controls which types are included by TestSourceColumns.
Expand All @@ -31,6 +33,176 @@ type TestSourceOptions struct {
SkipDecimals bool
}

// TestSourceColumns returns columns for all Arrow types and composites thereof. TestSourceOptions controls
// which types are included.
func TestSourceColumns(testOpts TestSourceOptions) []Column {
var basicColumns []Column
basicColumns = append(basicColumns, primitiveColumns()...)
basicColumns = append(basicColumns, binaryColumns()...)
basicColumns = append(basicColumns, fixedWidthColumns()...)

// add extensions
basicColumns = append(basicColumns, Column{Name: "uuid", Type: types.NewUUIDType()})
basicColumns = append(basicColumns, Column{Name: "inet", Type: types.NewInetType()})
basicColumns = append(basicColumns, Column{Name: "mac", Type: types.NewMACType()})

// sort and remove duplicates (e.g. date32 and date64 appear twice)
sort.Slice(basicColumns, func(i, j int) bool {
return basicColumns[i].Name < basicColumns[j].Name
})
basicColumns = removeDuplicates(basicColumns)

// we don't support float16 right now
basicColumns = removeColumnsByType(basicColumns, arrow.FLOAT16)

if !testOpts.SkipDecimals {
basicColumns = append(basicColumns, Column{Name: "decimal", Type: &arrow.Decimal128Type{Precision: 19, Scale: 10}})
}

if testOpts.SkipTimestamps {
// for backwards-compatibility, microsecond timestamps are not removed here
basicColumns = removeColumnsByDataType(basicColumns, &arrow.TimestampType{Unit: arrow.Second, TimeZone: "UTC"})
basicColumns = removeColumnsByDataType(basicColumns, &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "UTC"})
basicColumns = removeColumnsByDataType(basicColumns, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
}
if testOpts.SkipDates {
basicColumns = removeColumnsByType(basicColumns, arrow.DATE32, arrow.DATE64)
}
if testOpts.SkipTimes {
basicColumns = removeColumnsByType(basicColumns, arrow.TIME32, arrow.TIME64)
}
if testOpts.SkipIntervals {
basicColumns = removeColumnsByType(basicColumns, arrow.INTERVAL_DAY_TIME, arrow.INTERVAL_MONTHS, arrow.INTERVAL_MONTH_DAY_NANO)
}
if testOpts.SkipDurations {
basicColumns = removeColumnsByType(basicColumns, arrow.DURATION)
}
if testOpts.SkipLargeTypes {
basicColumns = removeColumnsByType(basicColumns, arrow.LARGE_BINARY, arrow.LARGE_STRING)
}

var compositeColumns []Column

// we don't need to include lists of binary or large binary right now; probably no destinations or sources need to support that
basicColumnsWithExclusions := removeColumnsByType(basicColumns, arrow.BINARY, arrow.LARGE_BINARY)
if testOpts.SkipLists {
// only include lists that were originally supported by CQTypes
cqListColumns := []Column{
{Name: "string", Type: arrow.BinaryTypes.String},
{Name: "uuid", Type: types.NewUUIDType()},
{Name: "inet", Type: types.NewInetType()},
{Name: "mac", Type: types.NewMACType()},
}
compositeColumns = append(compositeColumns, listOfColumns(cqListColumns)...)
} else {
compositeColumns = append(compositeColumns, listOfColumns(basicColumnsWithExclusions)...)
}

if !testOpts.SkipMaps {
compositeColumns = append(compositeColumns, mapOfColumns(basicColumnsWithExclusions)...)
}

// add JSON later, we don't want to include it as a list or map right now (it causes complications with JSON unmarshalling)
basicColumns = append(basicColumns, Column{Name: "json", Type: types.NewJSONType()})
basicColumns = append(basicColumns, Column{Name: "json_array", Type: types.NewJSONType()}) // GenTestData knows to populate this with a JSON array

if !testOpts.SkipStructs {
// struct with all the types
compositeColumns = append(compositeColumns, Column{Name: "struct", Type: arrow.StructOf(columnsToFields(basicColumns...)...)})

// struct with nested struct
// compositeColumns = append(compositeColumns, Column{Name: "nested_struct", Type: arrow.StructOf(arrow.Field{Name: "inner", Type: arrow.StructOf(columnsToFields(basicColumns...)...)})})
}

return append(basicColumns, compositeColumns...)
}

// primitiveColumns returns a list of primitive columns as defined by Arrow types.
func primitiveColumns() []Column {
primitiveTypesValue := reflect.ValueOf(arrow.PrimitiveTypes)
primitiveTypesType := reflect.TypeOf(arrow.PrimitiveTypes)
columns := make([]Column, primitiveTypesType.NumField())
for i := 0; i < primitiveTypesType.NumField(); i++ {
fieldName := primitiveTypesType.Field(i).Name
dataType := primitiveTypesValue.FieldByName(fieldName).Interface().(arrow.DataType)
columns[i] = Column{Name: strings.ToLower(fieldName), Type: dataType}
}
return columns
}

// binaryColumns returns a list of binary columns as defined by Arrow types.
func binaryColumns() []Column {
binaryTypesValue := reflect.ValueOf(arrow.BinaryTypes)
binaryTypesType := reflect.TypeOf(arrow.BinaryTypes)
columns := make([]Column, binaryTypesType.NumField())
for i := 0; i < binaryTypesType.NumField(); i++ {
fieldName := binaryTypesType.Field(i).Name
dataType := binaryTypesValue.FieldByName(fieldName).Interface().(arrow.DataType)
columns[i] = Column{Name: strings.ToLower(fieldName), Type: dataType}
}
return columns
}

// fixedWidthColumns returns a list of fixed width columns as defined by Arrow types.
func fixedWidthColumns() []Column {
fixedWidthTypesValue := reflect.ValueOf(arrow.FixedWidthTypes)
fixedWidthTypesType := reflect.TypeOf(arrow.FixedWidthTypes)
columns := make([]Column, fixedWidthTypesType.NumField())
for i := 0; i < fixedWidthTypesType.NumField(); i++ {
fieldName := fixedWidthTypesType.Field(i).Name
dataType := fixedWidthTypesValue.FieldByName(fieldName).Interface().(arrow.DataType)
columns[i] = Column{Name: strings.ToLower(fieldName), Type: dataType}
}
return columns
}

func removeDuplicates(columns []Column) []Column {
newColumns := make([]Column, 0, len(columns))
seen := map[string]struct{}{}
for _, c := range columns {
if _, ok := seen[c.Name]; ok {
continue
}
newColumns = append(newColumns, c)
seen[c.Name] = struct{}{}
}
return slices.Clip(newColumns)
}

func removeColumnsByType(columns []Column, t ...arrow.Type) []Column {
var newColumns []Column
for _, c := range columns {
shouldRemove := false
for _, d := range t {
if c.Type.ID() == d {
shouldRemove = true
break
}
}
if !shouldRemove {
newColumns = append(newColumns, c)
}
}
return newColumns
}

func removeColumnsByDataType(columns []Column, dt ...arrow.DataType) []Column {
var newColumns []Column
for _, c := range columns {
shouldRemove := false
for _, d := range dt {
if arrow.TypeEqual(c.Type, d) {
shouldRemove = true
break
}
}
if !shouldRemove {
newColumns = append(newColumns, c)
}
}
return newColumns
}

// listOfColumns returns a list of columns that are lists of the given columns.
func listOfColumns(baseColumns []Column) []Column {
columns := make([]Column, len(baseColumns))
Expand All @@ -53,7 +225,7 @@ func mapOfColumns(baseColumns []Column) []Column {
return columns
}

func columnsToFields(columns []Column) []arrow.Field {
func columnsToFields(columns ...Column) []arrow.Field {
fields := make([]arrow.Field, len(columns))
for i := range columns {
fields[i] = arrow.Field{
Expand All @@ -64,102 +236,13 @@ func columnsToFields(columns []Column) []arrow.Field {
return fields
}

func TestTable(name string, opts TestSourceOptions) *Table {
t := &Table{
Name: name,
Columns: make(ColumnList, 0),
}
var columns ColumnList
// primitive columns
columns = append(columns, ColumnList{
{Name: "int8", Type: arrow.PrimitiveTypes.Int8},
{Name: "int16", Type: arrow.PrimitiveTypes.Int16},
{Name: "int32", Type: arrow.PrimitiveTypes.Int32},
{Name: "int64", Type: arrow.PrimitiveTypes.Int64},
{Name: "uint8", Type: arrow.PrimitiveTypes.Uint8},
{Name: "uint16", Type: arrow.PrimitiveTypes.Uint16},
{Name: "uint32", Type: arrow.PrimitiveTypes.Uint32},
{Name: "uint64", Type: arrow.PrimitiveTypes.Uint64},
{Name: "float32", Type: arrow.PrimitiveTypes.Float32},
{Name: "float64", Type: arrow.PrimitiveTypes.Float64},
{Name: "binary", Type: arrow.BinaryTypes.Binary},
{Name: "string", Type: arrow.BinaryTypes.String},
}...)
if !opts.SkipDates {
columns = append(columns, ColumnList{
{Name: "date32", Type: arrow.FixedWidthTypes.Date32},
{Name: "date64", Type: arrow.FixedWidthTypes.Date64},
}...)
}
if !opts.SkipDurations {
columns = append(columns, ColumnList{
{Name: "duration_s", Type: arrow.FixedWidthTypes.Duration_s},
{Name: "duration_ms", Type: arrow.FixedWidthTypes.Duration_ms},
{Name: "duration_us", Type: arrow.FixedWidthTypes.Duration_us},
{Name: "duration_ns", Type: arrow.FixedWidthTypes.Duration_ns},
}...)
}

if !opts.SkipIntervals {
columns = append(columns, ColumnList{
{Name: "interval_month", Type: arrow.FixedWidthTypes.MonthInterval},
{Name: "interval_day_time", Type: arrow.FixedWidthTypes.DayTimeInterval},
{Name: "interval_month_day_nano", Type: arrow.FixedWidthTypes.MonthDayNanoInterval},
}...)
}

if !opts.SkipLargeTypes {
columns = append(columns, ColumnList{
{Name: "large_binary", Type: arrow.BinaryTypes.LargeBinary},
{Name: "large_string", Type: arrow.BinaryTypes.LargeString},
}...)
}

if !opts.SkipTimes {
columns = append(columns, ColumnList{
{Name: "time32_s", Type: arrow.FixedWidthTypes.Time32s},
{Name: "time32_ms", Type: arrow.FixedWidthTypes.Time32ms},
{Name: "time64_us", Type: arrow.FixedWidthTypes.Time64us},
{Name: "time64_ns", Type: arrow.FixedWidthTypes.Time64ns},
}...)
}

if !opts.SkipTimestamps {
columns = append(columns, ColumnList{
{Name: "timestamp_s", Type: arrow.FixedWidthTypes.Timestamp_s},
{Name: "timestamp_ms", Type: arrow.FixedWidthTypes.Timestamp_ms},
{Name: "timestamp_us", Type: arrow.FixedWidthTypes.Timestamp_us},
{Name: "timestamp_ns", Type: arrow.FixedWidthTypes.Timestamp_ns},
}...)
}

if !opts.SkipDecimals {
columns = append(columns, ColumnList{
{Name: "decimal128", Type: &arrow.Decimal128Type{Precision: 19, Scale: 10}},
}...)
}

if !opts.SkipStructs {
columns = append(columns, Column{Name: "struct", Type: arrow.StructOf(columnsToFields(columns)...)})

// struct with nested struct
// columns = append(columns, Column{Name: "nested_struct", Type: arrow.StructOf(arrow.Field{Name: "inner", Type: arrow.StructOf(columnsToFields(basicColumns...)...)})})
}

if !opts.SkipLists {
columns = append(columns, listOfColumns(columns)...)
}

if !opts.SkipMaps {
columns = append(columns, mapOfColumns(columns)...)
}
// var PKColumnNames = []string{"uuid_pk"}

t.Columns = append(t.Columns, columns...)
return t
// TestTable returns a table with columns of all types. Useful for destination testing purposes
func TestTable(name string, testOpts TestSourceOptions) *Table {
return &Table{Name: name, Columns: TestSourceColumns(testOpts)}
}

// var PKColumnNames = []string{"uuid_pk"}

// GenTestDataOptions are options for generating test data
type GenTestDataOptions struct {
// SourceName is the name of the source to set in the source_name column.
Expand Down