Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) {
func() error { return runExtractor() },
retryNotification,
)

if err != nil {
run.Error = errors.Wrap(err, "failed to run extractor")
}
Expand Down
2 changes: 1 addition & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ func TestValidate(t *testing.T) {
assert.Equal(t, expectedErrs, errs)
})
t.Run("", func(t *testing.T) {
var invalidRecipe = recipe.Recipe{
invalidRecipe := recipe.Recipe{
Name: "sample",
Source: recipe.PluginRecipe{
Name: "test-extractor",
Expand Down
14 changes: 8 additions & 6 deletions agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"github.com/pkg/errors"
)

type streamMiddleware func(src models.Record) (dst models.Record, err error)
type subscriber struct {
callback func([]models.Record) error
channel chan models.Record
batchSize int
}
type (
streamMiddleware func(src models.Record) (dst models.Record, err error)
subscriber struct {
callback func([]models.Record) error
channel chan models.Record
batchSize int
}
)

type stream struct {
middlewares []streamMiddleware
Expand Down
1 change: 0 additions & 1 deletion cmd/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/goto/meteor/registry"
"github.com/goto/salt/printer"
"github.com/goto/salt/term"

"github.com/spf13/cobra"
)

Expand Down
7 changes: 3 additions & 4 deletions cmd/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func NewRecipeCmd() *cobra.Command {
}

return cmd

}

func recipeSinkSurvey() ([]string, error) {
Expand All @@ -124,7 +123,7 @@ func recipeSinkSurvey() ([]string, error) {
return []string{}, errors.New("no sinks found")
}

var qs = []*survey.Question{
qs := []*survey.Question{
{
Name: "sink",
Prompt: &survey.MultiSelect{
Expand Down Expand Up @@ -154,7 +153,7 @@ func recipeProcessorSurvey() ([]string, error) {
return []string{}, errors.New("no processors found")
}

var qs = []*survey.Question{
qs := []*survey.Question{
{
Name: "processor",
Prompt: &survey.MultiSelect{
Expand Down Expand Up @@ -183,7 +182,7 @@ func recipeExtractorSurvey() (string, error) {
return "", errors.New("no extractors found")
}

var qs = []*survey.Question{
qs := []*survey.Question{
{
Name: "extractor",
Prompt: &survey.Select{
Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// New adds all child commands to the root command and sets flags appropriately.
func New() *cobra.Command {
var cmd = &cobra.Command{
cmd := &cobra.Command{
Use: "meteor <command> <subcommand> [flags]",
Short: "Metadata CLI",
Long: "Metadata collection tool.",
Expand Down
1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"

"github.com/goto/meteor/cmd"

_ "github.com/goto/meteor/plugins/extractors"
_ "github.com/goto/meteor/plugins/processors"
_ "github.com/goto/meteor/plugins/sinks"
Expand Down
5 changes: 2 additions & 3 deletions metrics/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"net"
"strconv"

"github.com/pkg/errors"

statsd "github.com/etsy/statsd/examples/go"
"github.com/goto/meteor/agent"
"github.com/goto/meteor/recipe"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -65,7 +64,7 @@ func (m *StatsdMonitor) RecordPlugin(recipeName, pluginName, pluginType string,

// createMetricName creates a metric name for a given recipe and success
func (m *StatsdMonitor) createMetricName(metricName string, recipe recipe.Recipe, success bool) string {
var successText = "false"
successText := "false"
if success {
successText = "true"
}
Expand Down
3 changes: 1 addition & 2 deletions metrics/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import (
"os"
"testing"

"github.com/goto/meteor/test/utils"

"github.com/goto/meteor/agent"
"github.com/goto/meteor/metrics"
"github.com/goto/meteor/recipe"
"github.com/goto/meteor/test/utils"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/assert"
Expand Down
3 changes: 0 additions & 3 deletions plugins/extractors/bigquery/auditlog/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func (l *AuditLog) Collect(ctx context.Context, tableID string) (tableStats *Tab
}

func (l *AuditLog) buildFilter(tableID string) string {

timeNow := time.Now().UTC()
dayDuration := time.Duration(24*l.config.UsagePeriodInDay) * time.Hour
timeFrom := timeNow.Add(-1 * dayDuration)
Expand All @@ -126,7 +125,6 @@ func (l *AuditLog) buildFilter(tableID string) string {
}

func parsePayload(payload interface{}) (ld *LogData, err error) {

ad := &loggingpb.AuditData{}
pl, ok := payload.(*auditpb.AuditLog)
if !ok {
Expand Down Expand Up @@ -166,7 +164,6 @@ func getAuditDataFromServiceData(pl *auditpb.AuditLog, ad *loggingpb.AuditData)
}

func getAuditDataFromMetadata(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {

if pl.GetMetadata() == nil {
return errors.New("metadata field is nil")
}
Expand Down
1 change: 0 additions & 1 deletion plugins/extractors/bigquery/auditlog/logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func (ld *LogData) GetReferencedTablesURN() (refTablesURN []string) {
}

func (ld *LogData) GetQuery() (sqlQuery string, err error) {

if jobConfig := ld.GetJobCompletedEvent().GetJob().GetJobConfiguration(); jobConfig == nil {
err = errors.New("jobConfiguration is nil")
return
Expand Down
1 change: 0 additions & 1 deletion plugins/extractors/bigquery/auditlog/logdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
)

func TestValidateAuditData(t *testing.T) {

t.Run("return error if AuditData does not have JobCompletedEvent data", func(t *testing.T) {
ld := &LogData{
&loggingpb.AuditData{},
Expand Down
3 changes: 1 addition & 2 deletions plugins/extractors/bigquery/auditlog/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (b *TableStats) populateTableUsage(tableURN string) {
b.TableUsage[tableURN]++
}

func (b *TableStats) populateJoinDetail(tableURN string, refTablesURN []string, jcs []string) {
func (b *TableStats) populateJoinDetail(tableURN string, refTablesURN, jcs []string) {
if _, exist := b.JoinDetail[tableURN]; !exist {
b.JoinDetail[tableURN] = map[string]JoinDetail{}
}
Expand Down Expand Up @@ -111,7 +111,6 @@ func (b *TableStats) populateJoinDetail(tableURN string, refTablesURN []string,
}

}

}

func (b *TableStats) populateFilterConditions(tableURN string, fcs []string) {
Expand Down
3 changes: 2 additions & 1 deletion plugins/extractors/bigquery/auditlog/testdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ var testDataLogData3 = &LogData{
*
(SELECT order_id FROM FROM project1.dataset1.table1 WHERE column_1 IS TRUE)
JOIN project3.dataset1.table1
USING (somefield,anotherfield)`},
USING (somefield,anotherfield)`,
},
},
},
JobStatistics: &loggingpb.JobStatistics{
Expand Down
1 change: 0 additions & 1 deletion plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error)

// Extract checks if the table is valid and extracts the table schema
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) {

// Fetch and iterate over datasets
it := e.client.Datasets(ctx)
it.PageInfo().MaxSize = e.getMaxPageSize()
Expand Down
12 changes: 7 additions & 5 deletions plugins/extractors/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"context"
"testing"

"github.com/goto/meteor/test/utils"

"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/extractors/bigquery"
"github.com/goto/meteor/test/utils"
"github.com/stretchr/testify/assert"
)

Expand All @@ -23,7 +22,8 @@ func TestInit(t *testing.T) {
URNScope: "test-bigquery",
RawConfig: map[string]interface{}{
"wrong-config": "sample-project",
}})
},
})

assert.ErrorAs(t, err, &plugins.InvalidConfigError{})
})
Expand All @@ -35,7 +35,8 @@ func TestInit(t *testing.T) {
URNScope: "test-bigquery",
RawConfig: map[string]interface{}{
"project_id": "sample-project",
}})
},
})

assert.NotEqual(t, plugins.InvalidConfigError{}, err)
})
Expand All @@ -48,7 +49,8 @@ func TestInit(t *testing.T) {
RawConfig: map[string]interface{}{
"project_id": "google-project-id",
"service_account_base64": "----", // invalid
}})
},
})

assert.ErrorContains(t, err, "failed to decode base64 service account")
})
Expand Down
1 change: 0 additions & 1 deletion plugins/extractors/bigquery/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
func TestBuildTableProfile(t *testing.T) {
tableURN := plugins.BigQueryURN("project1", "dataset1", "table1")
t.Run("table profile usage related fields are empty if usage collection is disabled", func(t *testing.T) {

var tableStats *auditlog.TableStats
extr := &Extractor{
config: Config{
Expand Down
2 changes: 0 additions & 2 deletions plugins/extractors/bigquery/sqlparser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
)

func TestParseSimpleJoin(t *testing.T) {

type set map[string]bool
newSet := func(values ...string) set {
s := make(set)
Expand Down Expand Up @@ -93,7 +92,6 @@ func TestParseSimpleJoin(t *testing.T) {
}

func TestParseSimpleFilter(t *testing.T) {

type set map[string]bool
newSet := func(values ...string) set {
s := make(set)
Expand Down
3 changes: 1 addition & 2 deletions plugins/extractors/bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func (e *Extractor) getTablesInfo(ctx context.Context, emit plugins.Emit) error
Data: tableMeta,
}
emit(models.NewRecord(&asset))

}(table)
}
wg.Wait()
Expand All @@ -171,7 +170,7 @@ func createInstanceAdminClient(ctx context.Context, config Config) (*bigtable.In
return bigtable.NewInstanceAdminClient(ctx, config.ProjectID, config.clientOptions()...)
}

func (e *Extractor) createAdminClient(ctx context.Context, instance string, projectID string) (*bigtable.AdminClient, error) {
func (e *Extractor) createAdminClient(ctx context.Context, instance, projectID string) (*bigtable.AdminClient, error) {
return bigtable.NewAdminClient(ctx, projectID, instance, e.config.clientOptions()...)
}

Expand Down
12 changes: 7 additions & 5 deletions plugins/extractors/bigtable/bigtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"context"
"testing"

"github.com/goto/meteor/test/utils"

"github.com/goto/meteor/plugins"
bt "github.com/goto/meteor/plugins/extractors/bigtable"
"github.com/goto/meteor/test/utils"
"github.com/stretchr/testify/assert"
)

Expand All @@ -24,7 +23,8 @@ func TestInit(t *testing.T) {
URNScope: urnScope,
RawConfig: map[string]interface{}{
"wrong-config": "sample-project",
}})
},
})

assert.ErrorAs(t, err, &plugins.InvalidConfigError{})
})
Expand All @@ -34,7 +34,8 @@ func TestInit(t *testing.T) {
URNScope: urnScope,
RawConfig: map[string]interface{}{
"project_id": "",
}})
},
})

assert.ErrorAs(t, err, &plugins.InvalidConfigError{})
})
Expand All @@ -48,7 +49,8 @@ func TestInit(t *testing.T) {
RawConfig: map[string]interface{}{
"project_id": "google-project-id",
"service_account_base64": "----", // invalid
}})
},
})

assert.ErrorContains(t, err, "decode Base64 encoded service account")
})
Expand Down
2 changes: 1 addition & 1 deletion plugins/extractors/caramlstore/asset_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (b featureTableBuilder) buildAsset(ft *core.FeatureTable) (*v1beta2.Asset,
}

func (b featureTableBuilder) buildLineage(ft *core.FeatureTable) (
upstreams []*v1beta2.Resource, downstreams []*v1beta2.Resource, err error,
upstreams, downstreams []*v1beta2.Resource, err error,
) {
upstreams, err = b.buildUpstreams(ft)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion plugins/extractors/caramlstore/caramlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {

// Extract checks if the table is valid and extracts the table schema
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {

projects, err := e.client.Projects(ctx)
if err != nil {
if shouldRetry(err) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions plugins/extractors/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (e *Extractor) extractTables(keyspace string) (err error) {
}

// processTable build and push table to out channel
func (e *Extractor) processTable(keyspace string, tableName string) (err error) {
func (e *Extractor) processTable(keyspace, tableName string) (err error) {
var columns []*v1beta2.Column
columns, err = e.extractColumns(keyspace, tableName)
if err != nil {
Expand Down Expand Up @@ -180,7 +180,7 @@ func (e *Extractor) processTable(keyspace string, tableName string) (err error)
}

// extractColumns extract columns from a given table
func (e *Extractor) extractColumns(keyspace string, tableName string) (columns []*v1beta2.Column, err error) {
func (e *Extractor) extractColumns(keyspace, tableName string) (columns []*v1beta2.Column, err error) {
query := `SELECT column_name, type
FROM system_schema.columns
WHERE keyspace_name = ?
Expand Down
2 changes: 1 addition & 1 deletion plugins/extractors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (e *Extractor) extractTables(emit plugins.Emit) (err error) {
return
}

func (e *Extractor) getColumnsInfo(dbName string, tableName string) (result []*v1beta2.Column, err error) {
func (e *Extractor) getColumnsInfo(dbName, tableName string) (result []*v1beta2.Column, err error) {
sqlStr := fmt.Sprintf("DESCRIBE TABLE %s.%s", dbName, tableName)

rows, err := e.db.Query(sqlStr)
Expand Down
3 changes: 2 additions & 1 deletion plugins/extractors/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func TestInit(t *testing.T) {
URNScope: urnScope,
RawConfig: map[string]interface{}{
"invalid_config": "invalid_config_value",
}})
},
})

assert.ErrorAs(t, err, &plugins.InvalidConfigError{})
})
Expand Down
Loading