Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions internal/apiserver/middleware/operation_record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package middleware

import (
"strings"

"github.com/actiontech/dms/pkg/dms-common/i18nPkg"
"github.com/labstack/echo/v4"
)

type ApiInterfaceInfo struct {
RouterPath string
Method string
OperationType string
OperationAction string
GetProjectAndContentFunc func(c echo.Context, dms interface{}) (projectName string, content i18nPkg.I18nStr, err error)
}

var ApiInterfaceInfoList []ApiInterfaceInfo

func pathMatch(pattern, path string) bool {
ps := strings.Split(strings.Trim(pattern, "/"), "/")
pathSegs := strings.Split(strings.Trim(path, "/"), "/")
if len(ps) != len(pathSegs) {
return false
}
for i := range ps {
if len(ps[i]) > 0 && ps[i][0] == ':' {
continue
}
if ps[i] != pathSegs[i] {
return false
}
}
return true
}
14 changes: 14 additions & 0 deletions internal/apiserver/middleware/operation_record_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//go:build !enterprise

package middleware

import (
"github.com/actiontech/dms/internal/dms/service"
"github.com/labstack/echo/v4"
)

func OperationRecordMiddleware(_ *service.DMSService) echo.MiddlewareFunc {
return func(next echo.HandlerFunc) echo.HandlerFunc {
return next
}
}
2 changes: 2 additions & 0 deletions internal/apiserver/service/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ func (s *APIServer) installMiddleware() error {
i18nPkg.GetLangByAcceptLanguage,
))

s.echo.Use(dmsMiddleware.OperationRecordMiddleware(s.DMSController.DMS))

return nil
}

Expand Down
33 changes: 18 additions & 15 deletions internal/dms/biz/cron_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,27 @@ import (
)

type CronTaskUsecase struct {
log *utilLog.Helper
cronTask *cronTask
workflowUsecase *DataExportWorkflowUsecase
cbOperationLogUsecase *CbOperationLogUsecase
licenseUsecase *LicenseUsecase
oauth2SessionUsecase *OAuth2SessionUsecase
log *utilLog.Helper
cronTask *cronTask
workflowUsecase *DataExportWorkflowUsecase
cbOperationLogUsecase *CbOperationLogUsecase
operationRecordUsecase *OperationRecordUsecase
licenseUsecase *LicenseUsecase
oauth2SessionUsecase *OAuth2SessionUsecase
}
type cronTask struct {
cron *cron.Cron
}

func NewCronTaskUsecase(log utilLog.Logger, wu *DataExportWorkflowUsecase, cu *CbOperationLogUsecase, os *OAuth2SessionUsecase) *CronTaskUsecase {
func NewCronTaskUsecase(log utilLog.Logger, wu *DataExportWorkflowUsecase, cu *CbOperationLogUsecase, oru *OperationRecordUsecase, os *OAuth2SessionUsecase) *CronTaskUsecase {
ctu := &CronTaskUsecase{
log: utilLog.NewHelper(log, utilLog.WithMessageKey("biz.cronTask")),
cronTask: &cronTask{
cron: cron.New(),
},
workflowUsecase: wu,
cbOperationLogUsecase: cu,
oauth2SessionUsecase: os,
log: utilLog.NewHelper(log, utilLog.WithMessageKey("biz.cronTask")),
cronTask: &cronTask{cron: cron.New()},
workflowUsecase: wu,
cbOperationLogUsecase: cu,
operationRecordUsecase: oru,
oauth2SessionUsecase: os,
}

return ctu
}

Expand All @@ -48,6 +47,10 @@ func (ctu *CronTaskUsecase) InitialTask() error {
return err
}

if _, err := ctu.cronTask.cron.AddFunc("@hourly", ctu.operationRecordUsecase.DoClean); err != nil {
return err
}

if _, err := ctu.cronTask.cron.AddFunc("@hourly", ctu.oauth2SessionUsecase.DeleteExpiredSessions); err != nil {
return err
}
Expand Down
66 changes: 59 additions & 7 deletions internal/dms/biz/operation_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package biz

import (
"context"
"strconv"
"time"

"github.com/actiontech/dms/pkg/dms-common/i18nPkg"
Expand All @@ -12,6 +13,7 @@ type OperationRecordRepo interface {
SaveOperationRecord(ctx context.Context, record *OperationRecord) error
ListOperationRecords(ctx context.Context, opt *ListOperationRecordOption) ([]*OperationRecord, uint64, error)
ExportOperationRecords(ctx context.Context, opt *ListOperationRecordOption) ([]*OperationRecord, error)
CleanOperationRecordOpTimeBefore(ctx context.Context, t time.Time) (rowsAffected int64, err error)
}

type OperationRecord struct {
Expand All @@ -37,18 +39,68 @@ type ListOperationRecordOption struct {
FilterOperateTypeName string
FilterOperateAction string
// 权限相关字段
CanViewGlobal bool // 是否有全局查看权限(admin/sys/全局权限)
AccessibleProjectNames []string // 可访问的项目名称列表(项目管理员)
CanViewGlobal bool // 是否有全局查看权限(admin/sys/全局权限)
AccessibleProjectNames []string // 可访问的项目名称列表(项目管理员)
}

type OperationRecordUsecase struct {
repo OperationRecordRepo
log *utilLog.Helper
repo OperationRecordRepo
systemVariableUsecase *SystemVariableUsecase
log *utilLog.Helper
}

func NewOperationRecordUsecase(logger utilLog.Logger, repo OperationRecordRepo) *OperationRecordUsecase {
func NewOperationRecordUsecase(logger utilLog.Logger, repo OperationRecordRepo, svu *SystemVariableUsecase) *OperationRecordUsecase {
return &OperationRecordUsecase{
repo: repo,
log: utilLog.NewHelper(logger, utilLog.WithMessageKey("biz.operationRecord")),
repo: repo,
systemVariableUsecase: svu,
log: utilLog.NewHelper(logger, utilLog.WithMessageKey("biz.operationRecord")),
}
}

func (u *OperationRecordUsecase) DoClean() {
if u.systemVariableUsecase == nil {
u.log.Errorf("failed to clean operation record when get systemVariableUsecase")
return
}

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

variables, err := u.systemVariableUsecase.GetSystemVariables(ctx)
if err != nil {
u.log.Errorf("failed to clean operation record when get expired duration: %v", err)
return
}

operationRecordExpiredHoursVar, ok := variables[SystemVariableOperationRecordExpiredHours]
if !ok {
u.log.Debugf("system variable %s not found, using default value", SystemVariableOperationRecordExpiredHours)
operationRecordExpiredHoursVar = SystemVariable{
Key: SystemVariableOperationRecordExpiredHours,
Value: strconv.Itoa(DefaultOperationRecordExpiredHours),
}
}

operationRecordExpiredHours, err := strconv.Atoi(operationRecordExpiredHoursVar.Value)
if err != nil {
u.log.Errorf("failed to parse operation_record_expired_hours value: %v", err)
return
}

if operationRecordExpiredHours <= 0 {
u.log.Errorf("got OperationRecordExpiredHours: %d", operationRecordExpiredHours)
return
}

cleanTime := time.Now().Add(time.Duration(-operationRecordExpiredHours) * time.Hour)
rowsAffected, err := u.repo.CleanOperationRecordOpTimeBefore(ctx, cleanTime)
if err != nil {
u.log.Errorf("failed to clean operation record: %v", err)
return
}
u.log.Infof("OperationRecord regular cleaned rows: %d operation time before: %s", rowsAffected, cleanTime.Format("2006-01-02 15:04:05"))
}

func (u *OperationRecordUsecase) GetLog() *utilLog.Helper {
return u.log
}
4 changes: 2 additions & 2 deletions internal/dms/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewAndInitDMSService(logger utilLog.Logger, opts *conf.DMSOptions) (*DMSSer
swaggerUseCase := biz.NewSwaggerUseCase(logger, dmsProxyUsecase)
systemVariableUsecase := biz.NewSystemVariableUsecase(logger, storage.NewSystemVariableRepo(logger, st))
operationRecordRepo := storage.NewOperationRecordRepo(logger, st)
operationRecordUsecase := biz.NewOperationRecordUsecase(logger, operationRecordRepo)
operationRecordUsecase := biz.NewOperationRecordUsecase(logger, operationRecordRepo, systemVariableUsecase)
cbOperationRepo := storage.NewCbOperationLogRepo(logger, st)
CbOperationLogUsecase := biz.NewCbOperationLogUsecase(logger, cbOperationRepo, opPermissionVerifyUsecase, dmsProxyTargetRepo, systemVariableUsecase)
workflowRepo := storage.NewWorkflowRepo(logger, st)
Expand All @@ -156,7 +156,7 @@ func NewAndInitDMSService(logger utilLog.Logger, opts *conf.DMSOptions) (*DMSSer
}
dataMaskingUsecase := biz.NewMaskingUsecase(logger, dataMasking)

cronTask := biz.NewCronTaskUsecase(logger, DataExportWorkflowUsecase, CbOperationLogUsecase, oauth2SessionUsecase)
cronTask := biz.NewCronTaskUsecase(logger, DataExportWorkflowUsecase, CbOperationLogUsecase, operationRecordUsecase, oauth2SessionUsecase)
err = cronTask.InitialTask()
if err != nil {
return nil, fmt.Errorf("failed to new cron task: %v", err)
Expand Down
13 changes: 13 additions & 0 deletions internal/dms/storage/operation_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
"time"

"github.com/actiontech/dms/internal/dms/biz"
"github.com/actiontech/dms/internal/dms/storage/model"
Expand Down Expand Up @@ -136,6 +137,18 @@ func (d *operationRecordRepo) ExportOperationRecords(ctx context.Context, opt *b
return ret, nil
}

func (d *operationRecordRepo) CleanOperationRecordOpTimeBefore(ctx context.Context, t time.Time) (rowsAffected int64, err error) {
err = transaction(d.log, ctx, d.db, func(tx *gorm.DB) error {
result := tx.WithContext(ctx).Unscoped().Delete(&model.OperationRecord{}, "operation_time < ?", t)
if err := result.Error; err != nil {
return err
}
rowsAffected = result.RowsAffected
return nil
})
return
}

func convertBizOperationRecord(src *biz.OperationRecord) *model.OperationRecord {
return &model.OperationRecord{
ID: src.ID,
Expand Down
36 changes: 35 additions & 1 deletion internal/pkg/locale/active.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ NameRoleDevEngineer = "Developer"
NameRoleDevManager = "Development manager"
NameRoleOpsEngineer = "Operation engineer"
NameRoleProjectAdmin = "Project admin"
NotifyDataWorkflowBodyApprovalReminder = "⏰ The export workflow has been approved. Please complete the export within 1 day, otherwise it will expire and cannot be executed"
NotifyDataWorkflowBodyConfigUrl = "Please add a global URL in the system settings - global configuration"
NotifyDataWorkflowBodyHead = "\n📋 Data Export Workflow Topic: %v\n📍 ProjectName: %v\n🆔 Workflow ID: %v\n📝 Workflow Description: %v\n👤 Applicant: %v\n⏰ Creation Time: %v"
NotifyDataWorkflowBodyInstanceAndSchema = "🗄️ Data Source: %v\n📊 Schema: %v"
Expand All @@ -138,7 +139,6 @@ NotifyDataWorkflowBodyReason = "❌ Rejection Reason: %v"
NotifyDataWorkflowBodyReport = "⭐ Data Export Workflow Audit Score: %v"
NotifyDataWorkflowBodyStartEnd = "▶️ Execute Start Time: %v\n◀️ Execute End Time: %v"
NotifyDataWorkflowBodyWorkFlowErr = "⚠️ Failed to read data export workflow task content, please check the workflow status through the SQLE interface"
NotifyDataWorkflowBodyApprovalReminder = "⏰ The export workflow has been approved. Please complete the export within 1 day, otherwise it will expire and cannot be executed"
OAuth2AutoCreateUserErr = "Failed to automatically create user: %v"
OAuth2AutoCreateUserWithoutDefaultPwdErr = "Failed to automatically create user: default password not configured"
OAuth2BackendLogoutFailed = "; Failed to log out of third-party platform session: %v"
Expand All @@ -155,6 +155,40 @@ OAuth2SyncSessionErr = "Failed to synchronize OAuth2 session: %v"
OAuth2UserNotBoundAndDisableManuallyBindErr = "No user associated with %q was found and manual binding is disabled; please contact the system administrator"
OAuth2UserNotBoundAndNoPermErr = "This OAuth2 user is not bound and has no login permissions"
OAuth2UserStatIsDisableErr = "User %q is disabled"
OpRecordConfigFeishu = "Update Feishu configuration"
OpRecordConfigLDAP = "Update LDAP configuration"
OpRecordConfigLogin = "Update login configuration"
OpRecordConfigOAuth2 = "Update OAuth2 configuration"
OpRecordConfigSMTP = "Update SMTP configuration"
OpRecordConfigSms = "Update SMS configuration"
OpRecordConfigSystemVariables = "Update system variables configuration"
OpRecordConfigWebhook = "Update Webhook configuration"
OpRecordConfigWechat = "Update WeChat Work configuration"
OpRecordDBServiceCreate = "Create data source"
OpRecordDBServiceCreateWithName = "Create data source %s"
OpRecordDBServiceDelete = "Delete data source %s"
OpRecordDBServiceImport = "Import data sources"
OpRecordDBServiceUpdate = "Update data source %s"
OpRecordDataExportApproveWithName = "Approve data export workflow %s"
OpRecordDataExportCancelWithName = "Cancel data export workflow %s"
OpRecordDataExportCreate = "Create data export workflow"
OpRecordDataExportCreateWithName = "Create data export workflow %s"
OpRecordDataExportExportWithName = "Execute data export %s"
OpRecordDataExportRejectWithName = "Reject data export workflow %s"
OpRecordMemberCreate = "Add member"
OpRecordMemberCreateWithName = "Add member %s"
OpRecordMemberDelete = "Delete member %s"
OpRecordMemberUpdate = "Update member %s"
OpRecordProjectArchive = "Archive project %s"
OpRecordProjectCreate = "Create project"
OpRecordProjectCreateWithName = "Create project %s"
OpRecordProjectDelete = "Delete project %s"
OpRecordProjectUnarchive = "Unarchive project %s"
OpRecordProjectUpdate = "Update project %s"
OpRecordUserCreate = "Create user"
OpRecordUserCreateWithName = "Create user %s"
OpRecordUserDelete = "Delete user %s"
OpRecordUserUpdate = "Update user %s"
ProjectAvailable = "Available"
ProjectBusiness = "Available business"
ProjectCreateTime = "Create time"
Expand Down
36 changes: 35 additions & 1 deletion internal/pkg/locale/active.zh.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ NameRoleDevEngineer = "开发工程师"
NameRoleDevManager = "开发主管"
NameRoleOpsEngineer = "运维工程师"
NameRoleProjectAdmin = "项目管理员"
NotifyDataWorkflowBodyApprovalReminder = "⏰ 导出工单已审批通过,请在1天内完成导出,过期后将无法执行"
NotifyDataWorkflowBodyConfigUrl = "请在系统设置-全局配置中补充全局url"
NotifyDataWorkflowBodyHead = "\n📋 数据导出工单主题: %v\n📍 所属项目: %v\n🆔 数据导出工单ID: %v\n📝 数据导出工单描述: %v\n👤 申请人: %v\n⏰ 创建时间: %v\n"
NotifyDataWorkflowBodyInstanceAndSchema = "🗄️ 数据源: %v\n📊 schema: %v\n"
Expand All @@ -138,7 +139,6 @@ NotifyDataWorkflowBodyReason = "❌ 驳回原因: %v"
NotifyDataWorkflowBodyReport = "⭐ 数据导出工单审核得分: %v"
NotifyDataWorkflowBodyStartEnd = "▶️ 数据导出开始时间: %v\n◀️ 数据导出结束时间: %v"
NotifyDataWorkflowBodyWorkFlowErr = "❌ 读取工单任务内容失败,请通过SQLE界面确认工单状态"
NotifyDataWorkflowBodyApprovalReminder = "⏰ 导出工单已审批通过,请在1天内完成导出,过期后将无法执行"
OAuth2AutoCreateUserErr = "自动创建用户失败: %v"
OAuth2AutoCreateUserWithoutDefaultPwdErr = "自动创建用户失败,默认密码未配置"
OAuth2BackendLogoutFailed = ";注销第三方平台会话失败: %v"
Expand All @@ -155,6 +155,40 @@ OAuth2SyncSessionErr = "同步OAuth2会话失败: %v"
OAuth2UserNotBoundAndDisableManuallyBindErr = "未查询到 %q 关联的用户且关闭了手动绑定功能,请联系系统管理员"
OAuth2UserNotBoundAndNoPermErr = "该OAuth2用户未绑定且没有登陆权限"
OAuth2UserStatIsDisableErr = "用户 %q 被禁用"
OpRecordConfigFeishu = "更新飞书配置"
OpRecordConfigLDAP = "更新LDAP配置"
OpRecordConfigLogin = "更新登录配置"
OpRecordConfigOAuth2 = "更新OAuth2配置"
OpRecordConfigSMTP = "更新SMTP配置"
OpRecordConfigSms = "更新短信配置"
OpRecordConfigSystemVariables = "更新系统变量配置"
OpRecordConfigWebhook = "更新Webhook配置"
OpRecordConfigWechat = "更新企业微信配置"
OpRecordDBServiceCreate = "创建数据源"
OpRecordDBServiceCreateWithName = "创建数据源 %s"
OpRecordDBServiceDelete = "删除数据源 %s"
OpRecordDBServiceImport = "导入数据源"
OpRecordDBServiceUpdate = "更新数据源 %s"
OpRecordDataExportApproveWithName = "审批通过数据导出工单 %s"
OpRecordDataExportCancelWithName = "取消数据导出工单 %s"
OpRecordDataExportCreate = "创建数据导出工单"
OpRecordDataExportCreateWithName = "创建数据导出工单 %s"
OpRecordDataExportExportWithName = "执行数据导出 %s"
OpRecordDataExportRejectWithName = "驳回数据导出工单 %s"
OpRecordMemberCreate = "添加成员"
OpRecordMemberCreateWithName = "添加成员 %s"
OpRecordMemberDelete = "删除成员 %s"
OpRecordMemberUpdate = "更新成员 %s"
OpRecordProjectArchive = "归档项目 %s"
OpRecordProjectCreate = "创建项目"
OpRecordProjectCreateWithName = "创建项目 %s"
OpRecordProjectDelete = "删除项目 %s"
OpRecordProjectUnarchive = "取消归档项目 %s"
OpRecordProjectUpdate = "更新项目 %s"
OpRecordUserCreate = "创建用户"
OpRecordUserCreateWithName = "创建用户 %s"
OpRecordUserDelete = "删除用户 %s"
OpRecordUserUpdate = "更新用户 %s"
ProjectAvailable = "可用"
ProjectBusiness = "所属业务"
ProjectCreateTime = "创建时间"
Expand Down
Loading