Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions sqle/api/controller/v1/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package v1

import (
"context"
dmsV1 "github.com/actiontech/dms/pkg/dms-common/api/dms/v1"
"fmt"
v1 "github.com/actiontech/dms/pkg/dms-common/api/dms/v1"
"github.com/actiontech/sqle/sqle/errors"
"net/http"
"strconv"

Expand All @@ -14,17 +16,33 @@ import (

// pipelineDetail 流水线的信息详情
type pipelineDetail struct {
ID uint `json:"id"` // 流水线的唯一标识符
NodeCount uint32 `json:"node_count"` // 节点个数
ID uint `json:"id"` // 流水线的唯一标识符
NodeCount uint32 `json:"node_count"` // 节点个数
DataSources []string `json:"data_sources"` // 数据源
pipelineBase
}

func (p *pipelineDetail) fillWith(pipe *pipeline.Pipeline) {
func (p *pipelineDetail) fillWith(pipe *pipeline.Pipeline) error {
p.ID = pipe.ID
p.NodeCount = pipe.NodeCount()
p.Name = pipe.Name
p.Description = pipe.Description
p.Address = pipe.Address
dataSources := make([]string, 0)
for _, node := range pipe.PipelineNodes {
if node.InstanceID != 0 {
instance, exist, err := dms.GetInstancesById(context.TODO(), fmt.Sprint(node.InstanceID))
if err != nil {
return err
}
if !exist {
return errors.NewInstanceNoExistErr()
}
dataSources = append(dataSources, instance.Name)
}
}
p.DataSources = dataSources
return nil
}

// pipelineBase 流水线基础信息
Expand Down Expand Up @@ -212,24 +230,32 @@ func GetPipelines(c echo.Context) error {
}
// 3. 计算分页参数
limit, offset := controller.GetLimitAndOffset(req.PageIndex, req.PageSize)
hasPermission, err := hasViewPermission(user.GetIDStr(), projectUid, dmsV1.OpPermissionViewPipeline)
userPermission, err := dms.NewUserPermission(user.GetIDStr(), projectUid)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
return errors.New(errors.ConnectStorageError, fmt.Errorf("check get pipelines failed: %v", err))
}
userId := ""
if !hasPermission {
if !userPermission.CanViewProject() {
userId = user.GetIDStr()
}
rangeDatasourceIds := make([]string, 0)
viewPipelinePermission := userPermission.GetOnePermission(v1.OpPermissionViewPipeline)
if viewPipelinePermission != nil {
userId = ""
rangeDatasourceIds = viewPipelinePermission.RangeUids
}
// 4. 获取存储对象并查询流水线列表
var pipelineSvc pipeline.PipelineSvc
count, pipelineList, err := pipelineSvc.GetPipelineList(limit, offset, req.FuzzySearchNameDesc, projectUid, userId)
count, pipelineList, err := pipelineSvc.GetPipelineList(limit, offset, req.FuzzySearchNameDesc, projectUid, userId, rangeDatasourceIds)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

data := make([]pipelineDetail, len(pipelineList))
for idx, pipe := range pipelineList {
data[idx].fillWith(pipe)
if err = data[idx].fillWith(pipe); err != nil {
return controller.JSONBaseErrorReq(c, err)
}
}
// 5. 返回成功响应
return c.JSON(http.StatusOK, &GetPipelinesResV1{
Expand Down Expand Up @@ -277,7 +303,7 @@ func GetPipelineDetail(c echo.Context) error {
}

var pipelineDetail pipelineDetail
pipelineDetail.fillWith(pipe)
err = pipelineDetail.fillWith(pipe)
nodeDetails := make([]pipelineNodeDetail, len(pipe.PipelineNodes))
for i, node := range pipe.PipelineNodes {
nodeDetails[i].fillWith(c.Request().Context(), node, c.Param("project_name"))
Expand Down
9 changes: 9 additions & 0 deletions sqle/dms/permission.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ func (p *UserPermission) HasOnePermission(opPermissionType v1.OpPermissionType)
return false
}

func (p *UserPermission) GetOnePermission(opPermissionType v1.OpPermissionType) *v1.OpPermissionItem {
for i := range p.opPermissionItem {
if p.opPermissionItem[i].OpPermissionType == opPermissionType {
return &p.opPermissionItem[i]
}
}
return nil
}

func (p *UserPermission) IsProjectAdmin() bool {
for _, userOpPermission := range p.opPermissionItem {
if userOpPermission.OpPermissionType == v1.OpPermissionTypeProjectAdmin {
Expand Down
14 changes: 14 additions & 0 deletions sqle/docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20584,6 +20584,13 @@ var doc = `{
"description": "关联流水线地址",
"type": "string"
},
"data_sources": {
"description": "数据源",
"type": "array",
"items": {
"type": "string"
}
},
"description": {
"description": "流水线描述",
"type": "string"
Expand All @@ -20609,6 +20616,13 @@ var doc = `{
"description": "关联流水线地址",
"type": "string"
},
"data_sources": {
"description": "数据源",
"type": "array",
"items": {
"type": "string"
}
},
"description": {
"description": "流水线描述",
"type": "string"
Expand Down
14 changes: 14 additions & 0 deletions sqle/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -20568,6 +20568,13 @@
"description": "关联流水线地址",
"type": "string"
},
"data_sources": {
"description": "数据源",
"type": "array",
"items": {
"type": "string"
}
},
"description": {
"description": "流水线描述",
"type": "string"
Expand All @@ -20593,6 +20600,13 @@
"description": "关联流水线地址",
"type": "string"
},
"data_sources": {
"description": "数据源",
"type": "array",
"items": {
"type": "string"
}
},
"description": {
"description": "流水线描述",
"type": "string"
Expand Down
10 changes: 10 additions & 0 deletions sqle/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5587,6 +5587,11 @@ definitions:
address:
description: 关联流水线地址
type: string
data_sources:
description: 数据源
items:
type: string
type: array
description:
description: 流水线描述
type: string
Expand All @@ -5605,6 +5610,11 @@ definitions:
address:
description: 关联流水线地址
type: string
data_sources:
description: 数据源
items:
type: string
type: array
description:
description: 流水线描述
type: string
Expand Down
7 changes: 6 additions & 1 deletion sqle/model/pipline.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func isValidAuditMethod(a string) bool {
return false
}

func (s *Storage) GetPipelineList(projectID ProjectUID, fuzzySearchContent string, limit, offset uint32, userId string) ([]*Pipeline, uint64, error) {
func (s *Storage) GetPipelineList(projectID ProjectUID, fuzzySearchContent string, limit, offset uint32, userId string, rangeDatasourceIds []string) ([]*Pipeline, uint64, error) {
var count int64
var pipelines []*Pipeline
query := s.db.Model(&Pipeline{}).Where("project_uid = ?", projectID)
Expand All @@ -111,6 +111,11 @@ func (s *Storage) GetPipelineList(projectID ProjectUID, fuzzySearchContent strin
if fuzzySearchContent != "" {
query = query.Where("name LIKE ? OR description LIKE ?", "%"+fuzzySearchContent+"%", "%"+fuzzySearchContent+"%")
}
if len(rangeDatasourceIds) > 0 {
query = query.Joins("JOIN pipeline_nodes ON pipelines.id = pipeline_nodes.pipeline_id").
Where("pipeline_nodes.instance_id IN (?)", rangeDatasourceIds).
Group("pipelines.id")
}

err := query.Count(&count).Error
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions sqle/server/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,9 @@ func (svc PipelineSvc) GetPipeline(projectUID string, pipelineID uint) (*Pipelin
return svc.toPipeline(modelPipeline, modelPiplineNodes), nil
}

func (svc PipelineSvc) GetPipelineList(limit, offset uint32, fuzzySearchNameDesc string, projectUID string, userId string) (count uint64, pipelines []*Pipeline, err error) {
func (svc PipelineSvc) GetPipelineList(limit, offset uint32, fuzzySearchNameDesc string, projectUID string, userId string, rangeDatasourceIds []string) (count uint64, pipelines []*Pipeline, err error) {
s := model.GetStorage()
modelPipelines, count, err := s.GetPipelineList(model.ProjectUID(projectUID), fuzzySearchNameDesc, limit, offset, userId)
modelPipelines, count, err := s.GetPipelineList(model.ProjectUID(projectUID), fuzzySearchNameDesc, limit, offset, userId, rangeDatasourceIds)
if err != nil {
return 0, nil, err
}
Expand Down
Loading