Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
addressing test failures
Browse files Browse the repository at this point in the history
  • Loading branch information
jrbrinlee1 committed Apr 9, 2023
1 parent e24c73e commit 5e13f5e
Show file tree
Hide file tree
Showing 19 changed files with 244 additions and 226 deletions.
40 changes: 26 additions & 14 deletions .gitlab/.gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,9 @@ run go tests dax/test/dax:
run go tests cli kafka integration:
stage: integration
image: golang:$GOVERSION
tags:
- shell
- aws
variables:
KAFKA_RUNNER_TEST_FEATUREBASE_HOST: pilosa:10101
KAFKA_RUNNER_TEST_FEATUREBASEGRPC_HOST: pilosa:20101
Expand All @@ -375,19 +378,14 @@ run go tests cli kafka integration:
script:
- echo "running fbsql integration tests"
- cd ./idk/
- BRANCH_NAME=${CI_COMMIT_REF_SLUG} make start-pilosa
- BRANCH_NAME=${CI_COMMIT_REF_SLUG} make start-zookeeper
- BRANCH_NAME=${CI_COMMIT_REF_SLUG} make start-kafka
- BRANCH_NAME=${CI_COMMIT_REF_SLUG} make start-schema-registry
- cd ..
- go test -coverprofile=coverage-cli-kafka-integration.out -run -timeout=10m TestKafkaRunner ./cli
- make test-cli
after_script:
- cd ./idk/
- make save-pilosa-logs
- make shutdown
- rm -rf /mnt/ramdisk/test-$CI_JOB_ID
artifacts:
paths:
- coverage-cli-kafka-integration.out
- ./idk/testdata/*_coverage.out
needs:
- job: build amd container fb
- job: build fbsql amd64
Expand Down Expand Up @@ -523,16 +521,23 @@ upload to sonarcloud:
package for linux amd64:
stage: build
image: golang:$GOVERSION
extends: .go-cache
rules:
- if: '$CI_PIPELINE_SOURCE == "push" || $CI_PIPELINE_SOURCE == "schedule" || $CI_PIPELINE_SOURCE == "web"'
variables:
GOOS: "linux"
GOARCH: "amd64"
script:
- export VERSION=$(git describe --tags 2>/dev/null || echo unknown)
- echo 'deb [trusted=yes] https://repo.goreleaser.com/apt/ /' | tee /etc/apt/sources.list.d/goreleaser.list
- apt update && apt install nfpm=2.11.3
- make package
- apt-get update -y && apt-get install -y -qq nfpm=2.11.3
- nfpm version # smoke
- mv "featurebase_${GOOS}_${GOARCH}" featurebase
- mv "./build/fbsql_${GOOS}_${GOARCH}" fbsql
- nfpm package --packager deb --target "featurebase.${VERSION}.${GOARCH}.deb"
- nfpm package --packager rpm --target "featurebase.${VERSION}.${GOARCH}.rpm"
needs:
- build featurebase
- build fbsql amd64
artifacts:
paths:
- "*.deb"
Expand Down Expand Up @@ -561,16 +566,23 @@ trigger_m-cloud-images:
package for linux arm64:
stage: build
image: golang:$GOVERSION
extends: .go-cache
rules:
- if: '$CI_PIPELINE_SOURCE == "push" || $CI_PIPELINE_SOURCE == "schedule" || $CI_PIPELINE_SOURCE == "web"'
variables:
GOOS: "linux"
GOARCH: "arm64"
script:
- export VERSION=$(git describe --tags 2>/dev/null || echo unknown)
- echo 'deb [trusted=yes] https://repo.goreleaser.com/apt/ /' | tee /etc/apt/sources.list.d/goreleaser.list
- apt update && apt install nfpm=2.11.3
- make package
- apt-get update -y && apt-get install -y -qq nfpm=2.11.3
- nfpm version # smoke
- mv "featurebase_${GOOS}_${GOARCH}" featurebase
- mv "./build/fbsql_${GOOS}_${GOARCH}" fbsql
- nfpm package --packager deb --target "featurebase.${VERSION}.${GOARCH}.deb"
- nfpm package --packager rpm --target "featurebase.${VERSION}.${GOARCH}.rpm"
needs:
- build featurebase
- build fbsql arm64 linux
artifacts:
paths:
- "*.deb"
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ build:

package:
GOOS=$(GOOS) GOARCH=$(GOARCH) $(MAKE) build
GOOS=$(GOOS) GOARCH=$(GOARCH) $(MAKE) build-fbsql
GOOS=$(GOOS) GOARCH=$(GOARCH) $(MAKE) docker-build-fbsql
GOARCH=$(GOARCH) VERSION=$(VERSION) nfpm package --packager deb --target featurebase.$(VERSION).$(GOARCH).deb
GOARCH=$(GOARCH) VERSION=$(VERSION) nfpm package --packager rpm --target featurebase.$(VERSION).$(GOARCH).rpm

Expand Down
7 changes: 5 additions & 2 deletions cli/cli_kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io/ioutil"
"os"
"sort"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -340,9 +341,11 @@ func verifyQueryReponse(t *testing.T, wqr *featurebase.WireQueryResponse, expect
for j, element := range line {
switch newElement := element.(type) {
case featurebase.StringSet:
newline[j] = newElement.SortedStringSlice()
sort.Strings(newElement)
newline[j] = newElement
case featurebase.IDSet:
newline[j] = newElement.SortedInt64Slice()
sort.Slice(newElement, func(i, j int) bool { return newElement[i] < newElement[j] })
newline[j] = newElement
default:
newline[j] = element
}
Expand Down
16 changes: 8 additions & 8 deletions cli/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func (cmd *Command) newKafkaRunner(cfgFile string) (*kafka.Runner, error) {

cfg, err := kafka.ConfigFromFile(cfgFile)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "getting config from file")
}

if err := kafka.ValidateConfig(cfg); err != nil {
Expand Down Expand Up @@ -49,14 +49,14 @@ func (cmd *Command) newKafkaRunner(cfgFile string) (*kafka.Runner, error) {
return nil, errors.Wrap(err, "getting fields from config")
}

// for avro, let the SchemaManager and IDK handle fields
if cfg.Encode == "avro" {
flds = nil
}

return kafka.NewRunner(
kr := kafka.NewRunner(
idkCfg,
batch.NewSQLBatcher(cmd, flds),
cmd.stderr,
), nil
)

// set pilosa host which is the only IDK config to come through
// configuration flags rather than the kafka config file
kr.Main.PilosaHosts = []string{cmd.host + ":" + cmd.port}
return kr, nil
}
82 changes: 49 additions & 33 deletions cli/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ type Config struct {
Table string `mapstructure:"table" help:"Destination table name."`
Fields []Field `mapstructure:"fields"`

SchemaRegistryURL string `mapstructure:"schema-registry-url" help:"host and port of schema registry. Defaults to localhost:8081"`
	SchemaRegistryUsername string `mapstructure:"schema-registry-username" help:"authentication key provided by confluent for schema registry."`
	SchemaRegistryPassword string `mapstructure:"schema-registry-password" help:"authentication secret provided by confluent for schema registry."`

Encode string `mapstructure:"encode" help:"Encoding format (currently supported formats: avro, json)"`
AllowMissingFields bool `mapstructure:"allow-missing-fields" help:"allow missing fields in messages from kafka"`
	MaxMessages int `mapstructure:"max-messages" help:"max messages read from kafka"`
ConfluentConfig string `mapstructure:"confluent-config" help:"max messages read from kakfka"`
ConfluentConfig string `mapstructure:"confluent-config" help:"path to JSON file mapping librdkafka consumer configurations to configuration values"`
}

// Field is a user-facing configuration field.
Expand All @@ -62,6 +66,10 @@ type ConfigForIDK struct {
PrimaryKeys []string
Fields []idk.RawField

SchemaRegistryURL string
SchemaRegistryUsername string
SchemaRegistryPassword string

Encode string
AllowMissingFields bool
MaxMessages int
Expand Down Expand Up @@ -117,40 +125,40 @@ func ValidateConfig(c Config) error {
return validateConfigJSON(c)
case encodingTypeAvro:
return validateConfigAvro(c)
default:
return errors.Errorf("encode configuration value must be %s or %s: got %s", encodingTypeJSON, encodingTypeAvro, c.Encode)
}

return nil

}

func validateConfigJSON(c Config) error {

if len(c.Fields) > 0 {
switch len(c.Fields) {
case 0:
// We only need to do these checks if any fields are specified at all.
// If no fields are specified, that's ok because then we default to
// using fields based off the existing table.
if len(c.Fields) < 2 {
return errors.Errorf("at least two fields are required (one should be a primary key)")
} else {
var found int
for i := range c.Fields {
if c.Fields[i].PrimaryKey {
found++
}
if c.Fields[i].Name == "" {
return errors.Errorf("a name attribute (which isn't equal to \"\") should exist for all fields")
}
if c.Fields[i].SourceType == "" {
return errors.Errorf("a source-type attribute (which isn't equal to \"\") should exist for all fields")
}
return nil
case 1:
return errors.Errorf("at least two fields are required (one should be a primary key)")
default:
var found int
for i := range c.Fields {
if c.Fields[i].PrimaryKey {
found++
}
if c.Fields[i].Name == "" {
return errors.Errorf("a name attribute (which isn't equal to \"\") should exist for all fields")
}
if found < 1 {
return errors.Errorf("at least one primary key field is required")
if c.Fields[i].SourceType == "" {
return errors.Errorf("a source-type attribute (which isn't equal to \"\") should exist for all fields")
}
}
if found < 1 {
return errors.Errorf("at least one primary key field is required")
}
return nil
}

return nil
}

// Only primary key fields required
Expand Down Expand Up @@ -181,17 +189,20 @@ func ConvertConfig(c Config) (ConfigForIDK, error) {

// Copy all the shared members from Config to ConfigForIDK.
out := ConfigForIDK{
Hosts: c.Hosts,
Group: c.Group,
Topics: c.Topics,
BatchSize: c.BatchSize,
BatchMaxStaleness: c.BatchMaxStaleness,
Timeout: c.Timeout,
Table: c.Table,
Encode: c.Encode,
AllowMissingFields: c.AllowMissingFields,
MaxMessages: c.MaxMessages,
ConfluentConfig: c.ConfluentConfig,
Hosts: c.Hosts,
Group: c.Group,
Topics: c.Topics,
BatchSize: c.BatchSize,
BatchMaxStaleness: c.BatchMaxStaleness,
Timeout: c.Timeout,
Table: c.Table,
Encode: c.Encode,
AllowMissingFields: c.AllowMissingFields,
MaxMessages: c.MaxMessages,
ConfluentConfig: c.ConfluentConfig,
SchemaRegistryURL: c.SchemaRegistryURL,
SchemaRegistryUsername: c.SchemaRegistryUsername,
SchemaRegistryPassword: c.SchemaRegistryPassword,
}

if len(c.Fields) == 0 {
Expand Down Expand Up @@ -282,6 +293,11 @@ func ConfigToFields(c Config, primaryKeys []string) ([]*dax.Field, error) {
// capacity to `len(c.Fields)-1`.
out := make([]*dax.Field, 0, len(c.Fields))

// for avro, let the SchemaManager and IDK handle fields
if c.Encode == encodingTypeAvro {
return nil, nil
}

for _, fld := range c.Fields {
// When we have a single primary key, don't also store that value as a
// field in FeatureBase. However, when we have more than one primary
Expand Down
8 changes: 4 additions & 4 deletions cli/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ func TestConfigFromFile(t *testing.T) {
tests := []configFromFileTest{
{ // confirm json config struct fields are being set
configFilePath: "./testdata/config/config01.toml",
expectedStruct: `{"Hosts":["kafka:9090"],"Group":"testGroup","Topics":["testTopic"],"BatchSize":35,"BatchMaxStaleness":25000000000,"Timeout":16000000000,"Table":"testTable","Fields":[{"Name":"id","SourceType":"id","SourcePath":null,"PrimaryKey":true},{"Name":"name","SourceType":"string","SourcePath":["test","path"],"PrimaryKey":false},{"Name":"age","SourceType":"int","SourcePath":null,"PrimaryKey":false},{"Name":"hobbies","SourceType":"stringset","SourcePath":null,"PrimaryKey":false}],"Encode":"json","AllowMissingFields":false,"MaxMessages":0,"ConfluentConfig":""}`,
expectedStruct: `{"Hosts":["kafka:9090"],"Group":"testGroup","Topics":["testTopic"],"BatchSize":35,"BatchMaxStaleness":25000000000,"Timeout":16000000000,"Table":"testTable","Fields":[{"Name":"id","SourceType":"id","SourcePath":null,"PrimaryKey":true},{"Name":"name","SourceType":"string","SourcePath":["test","path"],"PrimaryKey":false},{"Name":"age","SourceType":"int","SourcePath":null,"PrimaryKey":false},{"Name":"hobbies","SourceType":"stringset","SourcePath":null,"PrimaryKey":false}],"SchemaRegistryURL":"","SchemaRegistryUsername":"","SchemaRegistryPassword":"","Encode":"json","AllowMissingFields":false,"MaxMessages":0,"ConfluentConfig":""}`,
},
{ // confirm defaults are being set
configFilePath: "./testdata/config/config00.toml",
expectedStruct: `{"Hosts":["localhost:9092"],"Group":"default-featurebase-group","Topics":["topic00"],"BatchSize":1,"BatchMaxStaleness":5000000000,"Timeout":5000000000,"Table":"table00","Fields":[{"Name":"id","SourceType":"string","SourcePath":null,"PrimaryKey":true},{"Name":"name","SourceType":"string","SourcePath":null,"PrimaryKey":false},{"Name":"age","SourceType":"int","SourcePath":null,"PrimaryKey":false},{"Name":"hobbies","SourceType":"stringset","SourcePath":null,"PrimaryKey":false}],"Encode":"json","AllowMissingFields":false,"MaxMessages":0,"ConfluentConfig":""}`,
expectedStruct: `{"Hosts":["localhost:9092"],"Group":"default-featurebase-group","Topics":["topic00"],"BatchSize":1,"BatchMaxStaleness":5000000000,"Timeout":5000000000,"Table":"table00","Fields":[{"Name":"id","SourceType":"string","SourcePath":null,"PrimaryKey":true},{"Name":"name","SourceType":"string","SourcePath":null,"PrimaryKey":false},{"Name":"age","SourceType":"int","SourcePath":null,"PrimaryKey":false},{"Name":"hobbies","SourceType":"stringset","SourcePath":null,"PrimaryKey":false}],"SchemaRegistryURL":"","SchemaRegistryUsername":"","SchemaRegistryPassword":"","Encode":"json","AllowMissingFields":false,"MaxMessages":0,"ConfluentConfig":""}`,
},
		{ // confirm avro config struct fields are being set
configFilePath: "./testdata/config/config02.toml",
expectedStruct: `{"Hosts":["kafka:9090"],"Group":"testGroup","Topics":["testTopic"],"BatchSize":35,"BatchMaxStaleness":25000000000,"Timeout":16000000000,"Table":"testTable","Fields":[{"Name":"id","SourceType":"id","SourcePath":null,"PrimaryKey":true},{"Name":"name","SourceType":"string","SourcePath":["test","path"],"PrimaryKey":false},{"Name":"age","SourceType":"int","SourcePath":null,"PrimaryKey":false},{"Name":"hobbies","SourceType":"stringset","SourcePath":null,"PrimaryKey":false}],"Encode":"avro","AllowMissingFields":false,"MaxMessages":0,"ConfluentConfig":""}`,
expectedStruct: `{"Hosts":["kafka:9090"],"Group":"testGroup","Topics":["testTopic"],"BatchSize":35,"BatchMaxStaleness":25000000000,"Timeout":16000000000,"Table":"testTable","Fields":[{"Name":"id","SourceType":"id","SourcePath":null,"PrimaryKey":true},{"Name":"name","SourceType":"string","SourcePath":["test","path"],"PrimaryKey":false},{"Name":"age","SourceType":"int","SourcePath":null,"PrimaryKey":false},{"Name":"hobbies","SourceType":"stringset","SourcePath":null,"PrimaryKey":false}],"SchemaRegistryURL":"","SchemaRegistryUsername":"","SchemaRegistryPassword":"","Encode":"avro","AllowMissingFields":false,"MaxMessages":0,"ConfluentConfig":""}`,
},
		{ // confirm avro config struct fields are being set
configFilePath: "./testdata/config/config03.toml",
expectedStruct: `{"Hosts":["kafka:9090"],"Group":"testGroup","Topics":["testTopic"],"BatchSize":35,"BatchMaxStaleness":25000000000,"Timeout":16000000000,"Table":"testTable","Fields":[{"Name":"id","SourceType":"id","SourcePath":null,"PrimaryKey":true},{"Name":"name","SourceType":"string","SourcePath":["test","path"],"PrimaryKey":false},{"Name":"age","SourceType":"int","SourcePath":null,"PrimaryKey":false},{"Name":"hobbies","SourceType":"stringset","SourcePath":null,"PrimaryKey":false}],"Encode":"avro","AllowMissingFields":true,"MaxMessages":100,"ConfluentConfig":"./test/confluent/config.json"}`,
expectedStruct: `{"Hosts":["kafka:9090"],"Group":"testGroup","Topics":["testTopic"],"BatchSize":35,"BatchMaxStaleness":25000000000,"Timeout":16000000000,"Table":"testTable","Fields":[{"Name":"id","SourceType":"id","SourcePath":null,"PrimaryKey":true},{"Name":"name","SourceType":"string","SourcePath":["test","path"],"PrimaryKey":false},{"Name":"age","SourceType":"int","SourcePath":null,"PrimaryKey":false},{"Name":"hobbies","SourceType":"stringset","SourcePath":null,"PrimaryKey":false}],"SchemaRegistryURL":"","SchemaRegistryUsername":"","SchemaRegistryPassword":"","Encode":"avro","AllowMissingFields":true,"MaxMessages":100,"ConfluentConfig":"./test/confluent/config.json"}`,
},
}

Expand Down
12 changes: 8 additions & 4 deletions cli/kafka/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func NewRunner(cfg ConfigForIDK, batcher fbbatch.Batcher, logWriter io.Writer) *
}

// NewSource should be set based on the encoding of the source (e.g. JSON, Avro)
if cfg.Encode == encodingTypeAvro {
switch cfg.Encode {
case encodingTypeAvro:
kr.GetAvroNewSource(cfg)
} else if cfg.Encode == encodingTypeJSON {
default:
kr.GetJSONNewSource(cfg)
}

Expand Down Expand Up @@ -89,16 +90,19 @@ func (r *Runner) GetAvroNewSource(cfg ConfigForIDK) {
r.NewSource = func() (idk.Source, error) {
source := kafka.NewSource()
source.KafkaBootstrapServers = r.Hosts
source.SchemaRegistryURL = cfg.SchemaRegistryURL
source.SchemaRegistryUsername = cfg.SchemaRegistryUsername
source.SchemaRegistryPassword = cfg.SchemaRegistryPassword
source.Group = r.Group
source.Topics = r.Topics
source.Log = r.Main.Log()
source.Timeout = r.Timeout
source.KafkaConfiguration = cfg.ConfluentConfig
confluentcfg, err := common.SetupConfluent(&r.ConfluentCommand)
confluentCfg, err := common.SetupConfluent(&r.ConfluentCommand)
if err != nil {
return nil, err
}
source.ConfigMap = confluentcfg
source.ConfigMap = confluentCfg

err = source.Open()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cli/kafka/testdata/runner/config/config04.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ batch-size = 1
batch-max-staleness = "5s"
timeout = "5s"
encode = "avro"
schemaRegistryHost = "localhost:8081"
schema-registry-url = "SCHEMA_REGISTRY_SERVICE"
max-messages = MAX_MESSAGES

[[fields]]
Expand Down
2 changes: 1 addition & 1 deletion cli/kafka/testdata/runner/config/config06.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ batch-size = 1
batch-max-staleness = "5s"
timeout = "5s"
encode = "avro"
schemaRegistryHost = "localhost:8081"
schema-registry-url = "SCHEMA_REGISTRY_SERVICE"
max-messages = MAX_MESSAGES

[[fields]]
Expand Down
2 changes: 1 addition & 1 deletion cli/kafka/testdata/runner/config/config07.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ batch-size = 1
batch-max-staleness = "5s"
timeout = "5s"
encode = "avro"
schemaRegistryHost = "localhost:8081"
schema-registry-url = "SCHEMA_REGISTRY_SERVICE"
max-messages = MAX_MESSAGES

[[fields]]
Expand Down
28 changes: 28 additions & 0 deletions idk/Dockerfile-cli-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Test image for the CLI Kafka integration tests. The first stage builds
# librdkafka from source so the confluent-kafka-go bindings can link against
# it with the "dynamic" build tag.
ARG GO_VERSION=1.19

FROM golang:${GO_VERSION} as build_base

WORKDIR /
RUN ["apt-get","update","-y"]
RUN ["apt-get","install","-y","git","unixodbc","unixodbc-dev","netcat", "build-essential","musl-tools"]
RUN ["git", "clone", "https://github.com/edenhill/librdkafka.git"]
WORKDIR /librdkafka
RUN ["./configure", "--install-deps"]
RUN ["./configure", "--prefix", "/usr"]
RUN ["make"]
RUN ["make", "install"]

FROM build_base

# Service endpoints the integration tests connect to; overridable at build time.
# NOTE(review): ARG values exist only at build time. If the tests read these
# names as environment variables when the container runs, they must also be
# exported via ENV (or passed with `docker run -e`) — confirm against the
# docker-compose/Make setup that runs this image.
ARG KAFKA_RUNNER_TEST_FEATUREBASE_HOST=pilosa:10101
ARG KAFKA_RUNNER_TEST_FEATUREBASEGRPC_HOST=pilosa:20101
ARG KAFKA_RUNNER_TEST_KAFKA_HOST=kafka:9092
ARG KAFKA_RUNNER_TEST_REGISTRY_HOST=schema-registry:8081

WORKDIR /go/src/github.com/featurebasedb/featurebase/

COPY . .

WORKDIR /go/src/github.com/featurebasedb/featurebase/cli/

# Exec-form CMD must be valid JSON (comma between every element), and each
# flag and its value must be separate elements, otherwise the Go flag parser
# rejects "-run TestKafkaRunner" as a single unknown flag. The original line
# was missing the comma after "-tags=odbc,dynamic", which silently degraded
# the CMD to shell form.
CMD ["go","test","-v","-mod=vendor","-tags=odbc,dynamic","-run","TestKafkaRunner","./..."]

0 comments on commit 5e13f5e

Please sign in to comment.