Skip to content

Commit

Permalink
fix(discovery): ignore unavailable indices for delete by query
Browse files Browse the repository at this point in the history
If there are closed indices with the 'universe' alias, unless we add the
option to ignore unavailable indices, the delete operation fails with an
error.

Add the ignore unavailable option for delete by ID/URN in discovery
repository so that closed indices are ignored.
  • Loading branch information
sudo-suhas committed Apr 18, 2023
1 parent 7c2dde5 commit 0c4a9b6
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 3 deletions.
8 changes: 5 additions & 3 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,13 @@ func (repo *DiscoveryRepository) DeleteByURN(ctx context.Context, assetURN strin
}

func (repo *DiscoveryRepository) deleteWithQuery(ctx context.Context, qry string) error {
res, err := repo.cli.client.DeleteByQuery(
deleteByQ := repo.cli.client.DeleteByQuery
res, err := deleteByQ(
[]string{defaultSearchIndex},
strings.NewReader(qry),
repo.cli.client.DeleteByQuery.WithContext(ctx),
repo.cli.client.DeleteByQuery.WithRefresh(true),
deleteByQ.WithContext(ctx),
deleteByQ.WithRefresh(true),
deleteByQ.WithIgnoreUnavailable(true),
)
if err != nil {
return asset.DiscoveryError{
Expand Down
76 changes: 76 additions & 0 deletions internal/store/elasticsearch/discovery_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {
var (
ctx = context.Background()
bigqueryService = "bigquery-test"
kafkaService = "kafka-test"
)

t.Run("should return error if id empty", func(t *testing.T) {
Expand Down Expand Up @@ -261,12 +262,50 @@ func TestDiscoveryRepositoryDeleteByID(t *testing.T) {

assert.Equal(t, int64(0), body.Hits.Total.Value)
})

t.Run("should ignore unavailable indices", func(t *testing.T) {
ast1 := asset.Asset{
ID: "id1",
Type: asset.TypeTable,
Service: bigqueryService,
URN: "urn1",
}
ast2 := asset.Asset{
ID: "id2",
Type: asset.TypeTopic,
Service: kafkaService,
URN: "urn2",
}
cli, err := esTestServer.NewClient()
require.NoError(t, err)
esClient, err := store.NewClient(
log.NewNoop(),
store.Config{},
store.WithClient(cli),
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient)

err = repo.Upsert(ctx, ast1)
require.NoError(t, err)

err = repo.Upsert(ctx, ast2)
require.NoError(t, err)

_, err = cli.Indices.Close([]string{kafkaService})
require.NoError(t, err)

err = repo.DeleteByID(ctx, ast1.ID)
assert.NoError(t, err)
})
}

func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {
var (
ctx = context.Background()
bigqueryService = "bigquery-test"
kafkaService = "kafka-test"
)

cli, err := esTestServer.NewClient()
Expand Down Expand Up @@ -314,4 +353,41 @@ func TestDiscoveryRepositoryDeleteByURN(t *testing.T) {

assert.Equal(t, int64(0), body.Hits.Total.Value)
})

t.Run("should ignore unavailable indices", func(t *testing.T) {
ast1 := asset.Asset{
ID: "id1",
Type: asset.TypeTable,
Service: bigqueryService,
URN: "urn1",
}
ast2 := asset.Asset{
ID: "id2",
Type: asset.TypeTopic,
Service: kafkaService,
URN: "urn2",
}
cli, err := esTestServer.NewClient()
require.NoError(t, err)
esClient, err := store.NewClient(
log.NewNoop(),
store.Config{},
store.WithClient(cli),
)
require.NoError(t, err)

repo := store.NewDiscoveryRepository(esClient)

err = repo.Upsert(ctx, ast1)
require.NoError(t, err)

err = repo.Upsert(ctx, ast2)
require.NoError(t, err)

_, err = cli.Indices.Close([]string{kafkaService})
require.NoError(t, err)

err = repo.DeleteByURN(ctx, ast1.URN)
assert.NoError(t, err)
})
}

0 comments on commit 0c4a9b6

Please sign in to comment.