Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Fix sorting bug for named entities #394

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/repositories/gormimpl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const Description = "description"
const ResourceType = "resource_type"
const State = "state"
const ID = "id"
const CreatedAt = "created_at"

const executionTableName = "executions"
const namedEntityMetadataTableName = "named_entity_metadata"
Expand All @@ -30,7 +31,7 @@ const taskTableName = "tasks"
const limit = "limit"
const filters = "filters"

var identifierGroupBy = fmt.Sprintf("%s, %s, %s", Project, Domain, Name)
var identifierGroupBy = fmt.Sprintf("%s, %s, %s, %s", Project, Domain, Name, CreatedAt)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we always group by created at now? what if someone queries by updated_at or some other field?

Copy link
Contributor Author

@pmahindrakar-oss pmahindrakar-oss Apr 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Facing issue with group by . Let me dig deeper.

I am planning to remove the group by instead do the following queries which should be equivalent

 explain SELECT 
Distinct 
entities.project,
entities.domain,
entities.name,
'2' AS resource_type,
named_entity_metadata.description,
named_entity_metadata.state 

FROM "named_entity_metadata" 
RIGHT JOIN (
select distinct project,domain,name 
from (
        SELECT  project,domain,name, created_at 
        FROM "tasks" 
        WHERE "domain" = 'development' AND "project" = 'admintests' order by created_at
     ) t limit 2 offset 2) AS entities ON named_entity_metadata.resource_type = 2 

AND named_entity_metadata.project = entities.project
AND named_entity_metadata.domain = entities.domain 
AND named_entity_metadata.name = entities.name 

WHERE COALESCE(named_entity_metadata.state, 0) = '0' ;
                                                                                  QUERY PLAN                                                                                  
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Unique  (cost=9.82..9.84 rows=1 width=581)
   ->  Sort  (cost=9.82..9.83 rows=1 width=581)
         Sort Key: tasks.project, tasks.domain, tasks.name, named_entity_metadata.description, named_entity_metadata.state
         ->  Nested Loop Left Join  (cost=1.76..9.81 rows=1 width=581)
               Join Filter: ((named_entity_metadata.project = tasks.project) AND (named_entity_metadata.domain = tasks.domain) AND (named_entity_metadata.name = tasks.name))
               Filter: (COALESCE(named_entity_metadata.state, 0) = 0)
               ->  Limit  (cost=1.61..1.62 rows=1 width=29)
                     ->  HashAggregate  (cost=1.59..1.61 rows=2 width=29)
                           Group Key: tasks.project, tasks.domain, tasks.name
                           ->  Sort  (cost=1.41..1.44 rows=9 width=37)
                                 Sort Key: tasks.created_at
                                 ->  Seq Scan on tasks  (cost=0.00..1.27 rows=9 width=37)
                                       Filter: ((domain = 'development'::text) AND (project = 'admintests'::text))
               ->  Index Scan using named_entity_metadata_type_project_domain_name_idx on named_entity_metadata  (cost=0.14..8.16 rows=1 width=616)
                     Index Cond: (resource_type = 2)
(15 rows)

Also the query plan look more efficient

One with group by

explain SELECT 
entities.project,
entities.domain,
entities.name,
'2' AS resource_type,
named_entity_metadata.description,
named_entity_metadata.state 

FROM "named_entity_metadata" 
RIGHT JOIN (SELECT project,domain,name FROM "workflows" WHERE "domain" = 'development' AND "project" = 'admintests' GROUP BY project, domain, name, created_at ORDER BY created_at LIMIT 2 offset 2) AS entities ON named_entity_metadata.resource_type = 2 
AND named_entity_metadata.project = entities.project
 AND named_entity_metadata.domain = entities.domain 
AND named_entity_metadata.name = entities.name 

WHERE COALESCE(named_entity_metadata.state, 0) = '0' 

GROUP BY "created_at",entities.project, entities.domain, entities.name, named_entity_metadata.description, named_entity_metadata.state 

ORDER BY created_at desc;
                                                                                        QUERY PLAN                                                                                        
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Group  (cost=9.70..9.72 rows=1 width=589)
   Group Key: named_entity_metadata.created_at, workflows.project, workflows.domain, workflows.name, named_entity_metadata.description, named_entity_metadata.state
   ->  Sort  (cost=9.70..9.71 rows=1 width=557)
         Sort Key: named_entity_metadata.created_at DESC, workflows.project, workflows.domain, workflows.name, named_entity_metadata.description, named_entity_metadata.state
         ->  Nested Loop Left Join  (cost=1.58..9.69 rows=1 width=557)
               Join Filter: ((named_entity_metadata.project = workflows.project) AND (named_entity_metadata.domain = workflows.domain) AND (named_entity_metadata.name = workflows.name))
               Filter: (COALESCE(named_entity_metadata.state, 0) = 0)
               ->  Limit  (cost=1.44..1.46 rows=2 width=37)
                     ->  Group  (cost=1.41..1.53 rows=9 width=37)
                           Group Key: workflows.created_at, workflows.project, workflows.domain, workflows.name
                           ->  Sort  (cost=1.41..1.44 rows=9 width=37)
                                 Sort Key: workflows.created_at, workflows.name
                                 ->  Seq Scan on workflows  (cost=0.00..1.27 rows=9 width=37)
                                       Filter: ((domain = 'development'::text) AND (project = 'admintests'::text))
               ->  Materialize  (cost=0.14..8.17 rows=1 width=624)
                     ->  Index Scan using named_entity_metadata_type_project_domain_name_idx on named_entity_metadata  (cost=0.14..8.16 rows=1 width=624)
                           Index Cond: (resource_type = 2)

We dont need an additional sort for the same key in both inner and outer sql queries

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a note we still need something equivalent to the WHERE COALESCE(named_entity_metadata.state, 0) = '0' clause to filter by named entity state

re:

RIGHT JOIN (
select distinct project,domain,name 
from (
        SELECT  project,domain,name, created_at 
        FROM "tasks" 
        WHERE "domain" = 'development' AND "project" = 'admintests' order by created_at
     ) t limit 2 offset 2) AS entities ON named_entity_metadata.resource_type = 2 

why do we need to do the nested select? can't we right join directly on the result of SELECT DISTINCT project, domain, name, created_at


var entityToTableName = map[common.Entity]string{
common.Execution: "executions",
Expand Down
2 changes: 1 addition & 1 deletion pkg/repositories/gormimpl/launch_plan_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (r *LaunchPlanRepo) ListLaunchPlanIdentifiers(ctx context.Context, input in
// Scan the results into a list of launch plans
var launchPlans []models.LaunchPlan
timer := r.metrics.ListIdentifiersDuration.Start()
tx.Select([]string{Project, Domain, Name}).Group(identifierGroupBy).Scan(&launchPlans)
tx.Distinct([]string{Project, Domain, Name}).Group(identifierGroupBy).Scan(&launchPlans)
timer.Stop()
if tx.Error != nil {
return interfaces.LaunchPlanCollectionOutput{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
Expand Down
12 changes: 6 additions & 6 deletions pkg/repositories/gormimpl/named_entity_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"context"
"fmt"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyteadmin/pkg/common"
adminErrors "github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/repositories/errors"
"github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/promutils"

"google.golang.org/grpc/codes"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -65,10 +65,10 @@ var resourceTypeToMetadataJoin = map[core.ResourceType]string{
core.ResourceType_TASK: leftJoinTaskNameToMetadata,
}

var getGroupByForNamedEntity = fmt.Sprintf("%s.%s, %s.%s, %s.%s, %s.%s, %s.%s",
var getGroupByForNamedEntity = fmt.Sprintf("%s.%s, %s.%s, %s.%s, %s.%s, %s.%s, %s.%s",
innerJoinTableAlias, Project, innerJoinTableAlias, Domain, innerJoinTableAlias, Name, namedEntityMetadataTableName,
Description,
namedEntityMetadataTableName, State)
namedEntityMetadataTableName, State, namedEntityMetadataTableName, CreatedAt)

func getSelectForNamedEntity(tableName string, resourceType core.ResourceType) []string {
return []string{
Expand Down Expand Up @@ -198,7 +198,7 @@ func (r *NamedEntityRepo) List(ctx context.Context, input interfaces.ListNamedEn
var entities []models.NamedEntity
timer := r.metrics.ListDuration.Start()

tx.Select(getSelectForNamedEntity(innerJoinTableAlias, input.ResourceType)).Table(namedEntityMetadataTableName).Group(getGroupByForNamedEntity).Scan(&entities)
tx.Distinct(getSelectForNamedEntity(innerJoinTableAlias, input.ResourceType)).Table(namedEntityMetadataTableName).Group(getGroupByForNamedEntity).Scan(&entities)

timer.Stop()

Expand Down
114 changes: 109 additions & 5 deletions pkg/repositories/gormimpl/named_entity_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ package gormimpl

import (
"context"
"fmt"
"testing"

"github.com/flyteorg/flyteadmin/pkg/common"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"

mocket "github.com/Selvatico/go-mocket"
adminErrors "github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/repositories/errors"
"github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
mockScope "github.com/flyteorg/flytestdlib/promutils"

mocket "github.com/Selvatico/go-mocket"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
)

func getMockNamedEntityResponseFromDb(expected models.NamedEntity) map[string]interface{} {
Expand Down Expand Up @@ -155,7 +157,7 @@ func TestListNamedEntity(t *testing.T) {
mockQuery := GlobalMock.NewMock()

mockQuery.WithQuery(
`SELECT entities.project,entities.domain,entities.name,'2' AS resource_type,named_entity_metadata.description,named_entity_metadata.state FROM "named_entity_metadata" RIGHT JOIN (SELECT project,domain,name FROM "workflows" WHERE "domain" = $1 AND "project" = $2 GROUP BY project, domain, name ORDER BY name desc LIMIT 20) AS entities ON named_entity_metadata.resource_type = 2 AND named_entity_metadata.project = entities.project AND named_entity_metadata.domain = entities.domain AND named_entity_metadata.name = entities.name GROUP BY entities.project, entities.domain, entities.name, named_entity_metadata.description, named_entity_metadata.state ORDER BY name desc`).WithReply(results)
`SELECT entities.project,entities.domain,entities.name,'2' AS resource_type,named_entity_metadata.description,named_entity_metadata.state FROM "named_entity_metadata" RIGHT JOIN (SELECT project,domain,name FROM "workflows" WHERE "domain" = $1 AND "project" = $2 GROUP BY project, domain, name, created_at ORDER BY name desc LIMIT 20) AS entities ON named_entity_metadata.resource_type = 2 AND named_entity_metadata.project = entities.project AND named_entity_metadata.domain = entities.domain AND named_entity_metadata.name = entities.name GROUP BY entities.project, entities.domain, entities.name, named_entity_metadata.description, named_entity_metadata.state, named_entity_metadata.created_at ORDER BY name desc`).WithReply(results)

sortParameter, _ := common.NewSortParameter(admin.Sort{
Direction: admin.Sort_DESCENDING,
Expand All @@ -173,3 +175,105 @@ func TestListNamedEntity(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, output.Entities, 1)
}

func TestListNamedEntityTxErrorCases(t *testing.T) {
metadataRepo := NewNamedEntityRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())
GlobalMock := mocket.Catcher.Reset()
GlobalMock.Logging = true
mockQuery := GlobalMock.NewMock()

mockQuery.WithQuery(
`SELECT entities.project,entities.domain,entities.name,'2' AS resource_type,named_entity_metadata.description,named_entity_metadata.state FROM "named_entity_metadata" RIGHT JOIN (SELECT project,domain,name FROM "workflows" WHERE "domain" = $1 AND "project" = $2 GROUP BY project, domain, name, created_at ORDER BY name desc LIMIT 20) AS entities ON named_entity_metadata.resource_type = 2 AND named_entity_metadata.project = entities.project AND named_entity_metadata.domain = entities.domain AND named_entity_metadata.name = entities.name GROUP BY entities.project, entities.domain, entities.name, named_entity_metadata.description, named_entity_metadata.state, named_entity_metadata.created_at ORDER BY name desc%`).WithError(fmt.Errorf("failed"))

sortParameter, _ := common.NewSortParameter(admin.Sort{
Direction: admin.Sort_DESCENDING,
Key: "name",
})
output, err := metadataRepo.List(context.Background(), interfaces.ListNamedEntityInput{
ResourceType: resourceType,
Project: "admintests",
Domain: "development",
ListResourceInput: interfaces.ListResourceInput{
Limit: 20,
SortParameter: sortParameter,
},
})
assert.Equal(t, "Test transformer failed to find transformation to apply", err.Error())
assert.Len(t, output.Entities, 0)
}

func TestListNamedEntityInputErrorCases(t *testing.T) {
type test struct {
input interfaces.ListNamedEntityInput
wantedError error
wantedLength int
}

sortParameter, _ := common.NewSortParameter(admin.Sort{
Direction: admin.Sort_DESCENDING,
Key: "name",
})

tests := []test{
{
input: interfaces.ListNamedEntityInput{
ResourceType: resourceType,
Project: "",
Domain: "development",
ListResourceInput: interfaces.ListResourceInput{
Limit: 20,
SortParameter: sortParameter,
},
},
wantedError: errors.GetInvalidInputError(Project),
wantedLength: 0,
},
{
input: interfaces.ListNamedEntityInput{
ResourceType: resourceType,
Project: "project",
Domain: "",
ListResourceInput: interfaces.ListResourceInput{
Limit: 20,
SortParameter: sortParameter,
},
},
wantedError: errors.GetInvalidInputError(Domain),
wantedLength: 0,
},
{
input: interfaces.ListNamedEntityInput{
ResourceType: resourceType,
Project: "project",
Domain: "development",
ListResourceInput: interfaces.ListResourceInput{
Limit: 0,
SortParameter: sortParameter,
},
},
wantedError: errors.GetInvalidInputError(limit),
wantedLength: 0,
},
{
input: interfaces.ListNamedEntityInput{
ResourceType: -1,
Project: "project",
Domain: "development",
ListResourceInput: interfaces.ListResourceInput{
Limit: 20,
SortParameter: sortParameter,
},
},
wantedError: adminErrors.NewFlyteAdminErrorf(codes.InvalidArgument,
"Cannot list entity names for resource type: %v", -1),
wantedLength: 0,
},
}

metadataRepo := NewNamedEntityRepo(GetDbForTest(t), errors.NewTestErrorTransformer(), mockScope.NewTestScope())
for _, tc := range tests {
output, err := metadataRepo.List(context.Background(), tc.input)
assert.Len(t, output.Entities, tc.wantedLength)
assert.Equal(t, tc.wantedError, err)
}
}
2 changes: 1 addition & 1 deletion pkg/repositories/gormimpl/task_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *TaskRepo) ListTaskIdentifiers(ctx context.Context, input interfaces.Lis
// Scan the results into a list of tasks
var tasks []models.Task
timer := r.metrics.ListIdentifiersDuration.Start()
tx.Select([]string{Project, Domain, Name}).Group(identifierGroupBy).Scan(&tasks)
tx.Distinct([]string{Project, Domain, Name}).Group(identifierGroupBy).Scan(&tasks)
timer.Stop()
if tx.Error != nil {
return interfaces.TaskCollectionOutput{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
Expand Down
2 changes: 1 addition & 1 deletion pkg/repositories/gormimpl/workflow_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (r *WorkflowRepo) ListIdentifiers(ctx context.Context, input interfaces.Lis
// Scan the results into a list of workflows
var workflows []models.Workflow
timer := r.metrics.ListIdentifiersDuration.Start()
tx.Select([]string{Project, Domain, Name}).Group(identifierGroupBy).Scan(&workflows)
tx.Distinct([]string{Project, Domain, Name}).Group(identifierGroupBy).Scan(&workflows)
timer.Stop()
if tx.Error != nil {
return interfaces.WorkflowCollectionOutput{}, r.errorTransformer.ToFlyteAdminError(tx.Error)
Expand Down