From 1fde6a563a0a13736edf50e12ca3ef6f201a46f7 Mon Sep 17 00:00:00 2001 From: littleniannian Date: Thu, 26 Jun 2025 13:49:39 +0800 Subject: [PATCH 1/2] fix: get pipeline list filter by permission's datasource --- sqle/api/controller/v1/pipeline.go | 44 +++++++++++++++++++++++------- sqle/dms/permission.go | 9 ++++++ sqle/docs/docs.go | 14 ++++++++++ sqle/docs/swagger.json | 14 ++++++++++ sqle/docs/swagger.yaml | 10 +++++++ sqle/model/pipline.go | 7 ++++- sqle/server/pipeline/pipeline.go | 4 +-- 7 files changed, 89 insertions(+), 13 deletions(-) diff --git a/sqle/api/controller/v1/pipeline.go b/sqle/api/controller/v1/pipeline.go index 277681ca7f..e8b5bcd9b7 100644 --- a/sqle/api/controller/v1/pipeline.go +++ b/sqle/api/controller/v1/pipeline.go @@ -2,7 +2,9 @@ package v1 import ( "context" - dmsV1 "github.com/actiontech/dms/pkg/dms-common/api/dms/v1" + "fmt" + v1 "github.com/actiontech/dms/pkg/dms-common/api/dms/v1" + "github.com/actiontech/sqle/sqle/errors" "net/http" "strconv" @@ -14,17 +16,33 @@ import ( // pipelineDetail 流水线的信息详情 type pipelineDetail struct { - ID uint `json:"id"` // 流水线的唯一标识符 - NodeCount uint32 `json:"node_count"` // 节点个数 + ID uint `json:"id"` // 流水线的唯一标识符 + NodeCount uint32 `json:"node_count"` // 节点个数 + DataSources []string `json:"data_sources"` // 数据源 pipelineBase } -func (p *pipelineDetail) fillWith(pipe *pipeline.Pipeline) { +func (p *pipelineDetail) fillWith(pipe *pipeline.Pipeline) error { p.ID = pipe.ID p.NodeCount = pipe.NodeCount() p.Name = pipe.Name p.Description = pipe.Description p.Address = pipe.Address + dataSources := make([]string, 0) + for _, node := range pipe.PipelineNodes { + if node.InstanceID != 0 { + instance, exist, err := dms.GetInstancesById(context.TODO(), fmt.Sprint(node.InstanceID)) + if err != nil { + return err + } + if !exist { + return errors.NewInstanceNoExistErr() + } + dataSources = append(dataSources, instance.Name) + } + } + p.DataSources = dataSources + return nil } // pipelineBase 流水线基础信息 @@ -212,24 +230,30 @@ func GetPipelines(c echo.Context) error { } // 3. 计算分页参数 limit, offset := controller.GetLimitAndOffset(req.PageIndex, req.PageSize) - hasPermission, err := hasViewPermission(user.GetIDStr(), projectUid, dmsV1.OpPermissionViewPipeline) + userPermission, err := dms.NewUserPermission(user.GetIDStr(), projectUid) if err != nil { - return controller.JSONBaseErrorReq(c, err) + return errors.New(errors.ConnectStorageError, fmt.Errorf("check get pipelines failed: %v", err)) } userId := "" - if !hasPermission { + if !userPermission.CanViewProject() { userId = user.GetIDStr() } + rangeDatasourceIds := make([]string, 0) + viewPipelinePermission := userPermission.GetOnePermission(v1.OpPermissionViewPipeline) + if viewPipelinePermission != nil { + userId = "" + rangeDatasourceIds = viewPipelinePermission.RangeUids + } // 4. 获取存储对象并查询流水线列表 var pipelineSvc pipeline.PipelineSvc - count, pipelineList, err := pipelineSvc.GetPipelineList(limit, offset, req.FuzzySearchNameDesc, projectUid, userId) + count, pipelineList, err := pipelineSvc.GetPipelineList(limit, offset, req.FuzzySearchNameDesc, projectUid, userId, rangeDatasourceIds) if err != nil { return controller.JSONBaseErrorReq(c, err) } data := make([]pipelineDetail, len(pipelineList)) for idx, pipe := range pipelineList { - data[idx].fillWith(pipe) + err = data[idx].fillWith(pipe) } // 5. 返回成功响应 return c.JSON(http.StatusOK, &GetPipelinesResV1{ @@ -277,7 +301,7 @@ func GetPipelineDetail(c echo.Context) error { } var pipelineDetail pipelineDetail - pipelineDetail.fillWith(pipe) + err = pipelineDetail.fillWith(pipe) nodeDetails := make([]pipelineNodeDetail, len(pipe.PipelineNodes)) for i, node := range pipe.PipelineNodes { nodeDetails[i].fillWith(c.Request().Context(), node, c.Param("project_name")) diff --git a/sqle/dms/permission.go b/sqle/dms/permission.go index 6efd6e9836..ae634a0c98 100644 --- a/sqle/dms/permission.go +++ b/sqle/dms/permission.go @@ -97,6 +97,15 @@ func (p *UserPermission) HasOnePermission(opPermissionType v1.OpPermissionType) return false } +func (p *UserPermission) GetOnePermission(opPermissionType v1.OpPermissionType) *v1.OpPermissionItem { + for _, userOpPermission := range p.opPermissionItem { + if userOpPermission.OpPermissionType == opPermissionType { + return &userOpPermission + } + } + return nil +} + func (p *UserPermission) IsProjectAdmin() bool { for _, userOpPermission := range p.opPermissionItem { if userOpPermission.OpPermissionType == v1.OpPermissionTypeProjectAdmin { diff --git a/sqle/docs/docs.go b/sqle/docs/docs.go index 3b6e1c096c..30af0cda39 100644 --- a/sqle/docs/docs.go +++ b/sqle/docs/docs.go @@ -20584,6 +20584,13 @@ var doc = `{ "description": "关联流水线地址", "type": "string" }, + "data_sources": { + "description": "数据源", + "type": "array", + "items": { + "type": "string" + } + }, "description": { "description": "流水线描述", "type": "string" @@ -20609,6 +20616,13 @@ var doc = `{ "description": "关联流水线地址", "type": "string" }, + "data_sources": { + "description": "数据源", + "type": "array", + "items": { + "type": "string" + } + }, "description": { "description": "流水线描述", "type": "string" diff --git a/sqle/docs/swagger.json b/sqle/docs/swagger.json index 8f209126e9..76c83212c6 100644 --- a/sqle/docs/swagger.json +++ b/sqle/docs/swagger.json @@ -20568,6 +20568,13 @@ "description": "关联流水线地址", "type": "string" }, + "data_sources": { + "description": "数据源", + "type": "array", + "items": { + "type": "string" + } + }, "description": { "description": "流水线描述", "type": "string" @@ -20593,6 +20600,13 @@ "description": "关联流水线地址", "type": "string" }, + "data_sources": { + "description": "数据源", + "type": "array", + "items": { + "type": "string" + } + }, "description": { "description": "流水线描述", "type": "string" diff --git a/sqle/docs/swagger.yaml b/sqle/docs/swagger.yaml index fad86814ff..279f02582b 100644 --- a/sqle/docs/swagger.yaml +++ b/sqle/docs/swagger.yaml @@ -5587,6 +5587,11 @@ definitions: address: description: 关联流水线地址 type: string + data_sources: + description: 数据源 + items: + type: string + type: array description: description: 流水线描述 type: string @@ -5605,6 +5610,11 @@ definitions: address: description: 关联流水线地址 type: string + data_sources: + description: 数据源 + items: + type: string + type: array description: description: 流水线描述 type: string diff --git a/sqle/model/pipline.go b/sqle/model/pipline.go index f66d44c10f..8c16050c0f 100644 --- a/sqle/model/pipline.go +++ b/sqle/model/pipline.go @@ -101,7 +101,7 @@ func isValidAuditMethod(a string) bool { return false } -func (s *Storage) GetPipelineList(projectID ProjectUID, fuzzySearchContent string, limit, offset uint32, userId string) ([]*Pipeline, uint64, error) { +func (s *Storage) GetPipelineList(projectID ProjectUID, fuzzySearchContent string, limit, offset uint32, userId string, rangeDatasourceIds []string) ([]*Pipeline, uint64, error) { var count int64 var pipelines []*Pipeline query := s.db.Model(&Pipeline{}).Where("project_uid = ?", projectID) @@ -111,6 +111,11 @@ func (s *Storage) GetPipelineList(projectID ProjectUID, fuzzySearchContent strin if fuzzySearchContent != "" { query = query.Where("name LIKE ? OR description LIKE ?", "%"+fuzzySearchContent+"%", "%"+fuzzySearchContent+"%") } + if len(rangeDatasourceIds) > 0 { + query = query.Joins("JOIN pipeline_nodes ON pipelines.id = pipeline_nodes.pipeline_id"). + Where("pipeline_nodes.instance_id IN (?)", rangeDatasourceIds). + Group("pipelines.id") + } err := query.Count(&count).Error if err != nil { diff --git a/sqle/server/pipeline/pipeline.go b/sqle/server/pipeline/pipeline.go index 5384091061..111e087f1b 100644 --- a/sqle/server/pipeline/pipeline.go +++ b/sqle/server/pipeline/pipeline.go @@ -235,9 +235,9 @@ func (svc PipelineSvc) GetPipeline(projectUID string, pipelineID uint) (*Pipelin return svc.toPipeline(modelPipeline, modelPiplineNodes), nil } -func (svc PipelineSvc) GetPipelineList(limit, offset uint32, fuzzySearchNameDesc string, projectUID string, userId string) (count uint64, pipelines []*Pipeline, err error) { +func (svc PipelineSvc) GetPipelineList(limit, offset uint32, fuzzySearchNameDesc string, projectUID string, userId string, rangeDatasourceIds []string) (count uint64, pipelines []*Pipeline, err error) { s := model.GetStorage() - modelPipelines, count, err := s.GetPipelineList(model.ProjectUID(projectUID), fuzzySearchNameDesc, limit, offset, userId) + modelPipelines, count, err := s.GetPipelineList(model.ProjectUID(projectUID), fuzzySearchNameDesc, limit, offset, userId, rangeDatasourceIds) if err != nil { return 0, nil, err } From 75a05975669c238b3ceb1a70d097491530de9a36 Mon Sep 17 00:00:00 2001 From: littleniannian Date: Thu, 26 Jun 2025 14:07:40 +0800 Subject: [PATCH 2/2] refactor(sqle): opt for error opt for GetOnePermission --- sqle/api/controller/v1/pipeline.go | 4 +++- sqle/dms/permission.go | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sqle/api/controller/v1/pipeline.go b/sqle/api/controller/v1/pipeline.go index e8b5bcd9b7..9559290bd1 100644 --- a/sqle/api/controller/v1/pipeline.go +++ b/sqle/api/controller/v1/pipeline.go @@ -253,7 +253,9 @@ func GetPipelines(c echo.Context) error { data := make([]pipelineDetail, len(pipelineList)) for idx, pipe := range pipelineList { - err = data[idx].fillWith(pipe) + if err = data[idx].fillWith(pipe); err != nil { + return controller.JSONBaseErrorReq(c, err) + } } // 5. 返回成功响应 return c.JSON(http.StatusOK, &GetPipelinesResV1{ diff --git a/sqle/dms/permission.go b/sqle/dms/permission.go index ae634a0c98..8f0e412936 100644 --- a/sqle/dms/permission.go +++ b/sqle/dms/permission.go @@ -98,9 +98,9 @@ func (p *UserPermission) HasOnePermission(opPermissionType v1.OpPermissionType) } func (p *UserPermission) GetOnePermission(opPermissionType v1.OpPermissionType) *v1.OpPermissionItem { - for _, userOpPermission := range p.opPermissionItem { - if userOpPermission.OpPermissionType == opPermissionType { - return &userOpPermission + for i := range p.opPermissionItem { + if p.opPermissionItem[i].OpPermissionType == opPermissionType { + return &p.opPermissionItem[i] } } return nil