Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sqle/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type SeviceOpts struct {
PluginPath string `yaml:"plugin_path"`
Database Database `yaml:"database"`
PluginConfig []PluginConfig `yaml:"plugin_config"`
PprofPort int `yaml:"pprof_port"` // pprof 独立服务器端口,0 表示禁用
}

type Database struct {
Expand Down
135 changes: 135 additions & 0 deletions sqle/pprof/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package pprof

import (
"fmt"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"time"

"github.com/actiontech/sqle/sqle/log"
)

const (
pprofDirName = "pprof"
)

// CollectHeapProfile 采集堆内存 profile 并保存到文件
func CollectHeapProfile(logPath string) error {
return collectProfile("heap", logPath, func(f *os.File) error {
runtime.GC()
return pprof.WriteHeapProfile(f)
})
}

// CollectGoroutineProfile 采集 goroutine profile 并保存到文件
func CollectGoroutineProfile(logPath string) error {
return collectProfile("goroutine", logPath, func(f *os.File) error {
return pprof.Lookup("goroutine").WriteTo(f, 0)
})
}

// CollectAllocsProfile 采集内存分配 profile 并保存到文件
func CollectAllocsProfile(logPath string) error {
return collectProfile("allocs", logPath, func(f *os.File) error {
return pprof.Lookup("allocs").WriteTo(f, 0)
})
}

// CollectBlockProfile 采集阻塞 profile 并保存到文件
func CollectBlockProfile(logPath string) error {
return collectProfile("block", logPath, func(f *os.File) error {
return pprof.Lookup("block").WriteTo(f, 0)
})
}

// CollectMutexProfile 采集互斥锁 profile 并保存到文件
func CollectMutexProfile(logPath string) error {
return collectProfile("mutex", logPath, func(f *os.File) error {
return pprof.Lookup("mutex").WriteTo(f, 0)
})
}

// CollectCPUProfile 采集 CPU profile 并保存到文件(持续指定秒数)
func CollectCPUProfile(logPath string, duration time.Duration) error {
return collectProfile("cpu", logPath, func(f *os.File) error {
if err := pprof.StartCPUProfile(f); err != nil {
return err
}
time.Sleep(duration)
pprof.StopCPUProfile()
return nil
})
}

// collectProfile 通用的 profile 采集函数
func collectProfile(profileType, logPath string, writeFunc func(*os.File) error) error {
pprofDir := filepath.Join(logPath, pprofDirName)
if err := os.MkdirAll(pprofDir, 0755); err != nil {
return fmt.Errorf("failed to create pprof directory: %v", err)
}

timestamp := time.Now().Format("20060102_150405")
filename := fmt.Sprintf("%s_%s.prof", profileType, timestamp)
filePath := filepath.Join(pprofDir, filename)

f, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("failed to create profile file: %v", err)
}
defer f.Close()

if err := writeFunc(f); err != nil {
return fmt.Errorf("failed to write profile: %v", err)
}

log.Logger().Infof("pprof %s profile saved to: %s", profileType, filePath)
return nil
}

// CollectAllProfiles 采集所有类型的 profile(除了 CPU,因为 CPU 需要持续时间)
func CollectAllProfiles(logPath string) error {
profiles := []struct {
name string
fn func(string) error
}{
{"heap", CollectHeapProfile},
{"goroutine", CollectGoroutineProfile},
{"allocs", CollectAllocsProfile},
{"block", CollectBlockProfile},
{"mutex", CollectMutexProfile},
}

var lastErr error
for _, p := range profiles {
if err := p.fn(logPath); err != nil {
log.Logger().Errorf("failed to collect %s profile: %v", p.name, err)
lastErr = err
}
}

return lastErr
}

// StartPeriodicCollection 启动定期自动采集 pprof profile
// interval: 采集间隔时间,如果为 0 则不启用定期采集
func StartPeriodicCollection(logPath string, interval time.Duration) {
if interval <= 0 {
return
}

go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()

log.Logger().Infof("Starting periodic pprof collection, interval: %v", interval)

for range ticker.C {
log.Logger().Infof("Periodic pprof collection triggered")
if err := CollectAllProfiles(logPath); err != nil {
log.Logger().Errorf("Periodic pprof collection failed: %v", err)
}
}
}()
}
43 changes: 43 additions & 0 deletions sqle/pprof/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package pprof

import (
"fmt"
"net/http"
_ "net/http/pprof"

"github.com/actiontech/sqle/sqle/log"
)

// StartServer 启动独立的 pprof HTTP 服务器
// port: pprof 服务器监听端口,如果为 0 则不启动
func StartServer(port int) error {
if port <= 0 {
log.Logger().Infof("pprof server disabled (port: %d)", port)
return nil
}

address := fmt.Sprintf("0.0.0.0:%d", port)
log.Logger().Infof("starting pprof server on %s", address)

// pprof 包在导入时会自动注册路由到 http.DefaultServeMux
// 只需要启动一个 HTTP 服务器即可
if err := http.ListenAndServe(address, nil); err != nil {
return fmt.Errorf("pprof server failed: %v", err)
}

return nil
}

// StartServerAsync 异步启动独立的 pprof HTTP 服务器
func StartServerAsync(port int) {
if port <= 0 {
log.Logger().Infof("pprof server disabled (port: %d)", port)
return
}

go func() {
if err := StartServer(port); err != nil {
log.Logger().Errorf("pprof server error: %v", err)
}
}()
}
51 changes: 36 additions & 15 deletions sqle/sqled.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/actiontech/dms/pkg/dms-common/pkg/http"
"github.com/actiontech/sqle/sqle/api"
"github.com/actiontech/sqle/sqle/dms"
"github.com/actiontech/sqle/sqle/pprof"
knowledge_base "github.com/actiontech/sqle/sqle/server/knowledge_base"
optimization "github.com/actiontech/sqle/sqle/server/optimization"

Expand Down Expand Up @@ -58,7 +59,7 @@ func Run(options *config.SqleOptions) error {
// nofify singal
exitChan := make(chan struct{})
net := &gracenet.Net{}
go NotifySignal(exitChan, net)
go NotifySignal(exitChan, net, sqleCnf.LogPath)

// init plugins
{
Expand Down Expand Up @@ -156,6 +157,9 @@ func Run(options *config.SqleOptions) error {

go api.StartApi(net, exitChan, options, sqleSwaggerYaml)

// start independent pprof server on separate port
pprof.StartServerAsync(sqleCnf.PprofPort)

// Wait for exit signal from NotifySignal goroutine
<-exitChan
log.Logger().Infoln("sqled server will exit")
Expand All @@ -175,22 +179,39 @@ func validateConfig(options *config.SqleOptions) error {
return nil
}

func NotifySignal(exitChan chan struct{}, net *gracenet.Net) {
func NotifySignal(exitChan chan struct{}, net *gracenet.Net, logPath string) {
killChan := make(chan os.Signal, 1)
// os.Kill is like kill -9 which kills a process immediately, can't be caught
signal.Notify(killChan, os.Interrupt, syscall.SIGTERM, syscall.SIGUSR2 /*graceful-shutdown*/)
sig := <-killChan
switch sig {
case syscall.SIGUSR2:
if pid, err := net.StartProcess(); nil != err {
log.Logger().Infof("Graceful restarted by signal SIGUSR2, but failed: %v", err)
} else {
log.Logger().Infof("Graceful restarted, new pid is %v", pid)
// SIGUSR1: trigger pprof collection
// SIGUSR2: graceful restart
signal.Notify(killChan, os.Interrupt, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)

for {
sig := <-killChan
switch sig {
case syscall.SIGUSR1:
// Trigger pprof collection
log.Logger().Infof("Received SIGUSR1, collecting pprof profiles...")
if err := pprof.CollectAllProfiles(logPath); err != nil {
log.Logger().Errorf("Failed to collect pprof profiles: %v", err)
} else {
log.Logger().Infof("pprof profiles collected successfully")
}
// Continue running after collecting profiles
continue
case syscall.SIGUSR2:
if pid, err := net.StartProcess(); nil != err {
log.Logger().Infof("Graceful restarted by signal SIGUSR2, but failed: %v", err)
} else {
log.Logger().Infof("Graceful restarted, new pid is %v", pid)
}
log.Logger().Infof("old sqled exit")
close(exitChan)
return
default:
log.Logger().Infof("Exit by signal %v", sig)
close(exitChan)
return
}
log.Logger().Infof("old sqled exit")
default:
log.Logger().Infof("Exit by signal %v", sig)
}

close(exitChan)
}
Loading