Skip to content

Commit

Permalink
optimize websocket log
Browse files Browse the repository at this point in the history
  • Loading branch information
andeya committed May 12, 2017
1 parent 2239312 commit 5007d19
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 63 deletions.
26 changes: 26 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,33 @@
*.o
*.a
*.so
_obj
_test
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.exe~
*.test
*.prof
*.rar
*.zip
*.gz
*.psd
*.bmd
*.cfg
*.pptx
*.log
*nohup.out
*.sublime-project
*.sublime-workspace

pholcus
pholcus_pkg/cache
pholcus_pkg/file_out
pholcus_pkg/history
Expand Down
2 changes: 1 addition & 1 deletion app/aid/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/henrylee2cn/pholcus/app/downloader/request"
"github.com/henrylee2cn/pholcus/app/downloader/surfer"
"github.com/henrylee2cn/pholcus/common/ping"
"github.com/henrylee2cn/pholcus/config"
"github.com/henrylee2cn/pholcus/logs"
"sync/atomic"
)

type Proxy struct {
Expand Down
117 changes: 55 additions & 62 deletions web/logsocketController.go
Original file line number Diff line number Diff line change
@@ -1,105 +1,98 @@
package web

import (
"regexp"
"sync"
"sync/atomic"

ws "github.com/henrylee2cn/pholcus/common/websocket"
"github.com/henrylee2cn/pholcus/logs"
)

// log发送api
// send log api
func wsLogHandle(conn *ws.Conn) {
defer func() {
if p := recover(); p != nil {
logs.Log.Error("%v", p)
}
}()
var err error
// var err error
sess, _ := globalSessions.SessionStart(nil, conn.Request())
sessID := sess.SessionID()
if Lsc.connPool[sessID] == nil {
connPool := Lsc.connPool.Load().(map[string]*ws.Conn)
if connPool[sessID] == nil {
Lsc.Add(sessID, conn)
}
go func() {
defer func() {
// 关闭web前端log输出并断开websocket连接
Lsc.Remove(sessID, conn)
}()
for {
if err := ws.JSON.Receive(conn, nil); err != nil {
// logs.Log.Debug("websocket log接收出错断开 (%v) !", err)
return
}
}
defer func() {
Lsc.Remove(sessID)
}()

for msg := range Lsc.lvPool[sessID].logChan {
if _, err = ws.Message.Send(conn, msg); err != nil {
for {
if err := ws.JSON.Receive(conn, nil); err != nil {
return
}
}
}

type LogSocketController struct {
connPool map[string]*ws.Conn
lvPool map[string]*LogView
connPool atomic.Value
lock sync.Mutex
}

var (
// Lsc log set
Lsc = func() *LogSocketController {
l := new(LogSocketController)
l.connPool.Store(make(map[string]*ws.Conn))
return l
}()
colorRegexp = regexp.MustCompile("\033\\[[0-9]{1,2}m")
)

func (self *LogSocketController) Write(p []byte) (int, error) {
for sessID, lv := range self.lvPool {
if self.connPool[sessID] != nil {
lv.Write(p)
defer func() {
recover()
}()
p = colorRegexp.ReplaceAll(p, []byte{})
connPool := self.connPool.Load().(map[string]*ws.Conn)
for sessID, conn := range connPool {
_, err := ws.Message.Send(conn, (string(p) + "\r\n"))
if err != nil {
self.Remove(sessID)
}
}
return len(p), nil
}

func (self *LogSocketController) Add(sessID string, conn *ws.Conn) {
self.connPool[sessID] = conn
self.lvPool[sessID] = newLogView()
self.lock.Lock()
defer self.lock.Unlock()

connPool := self.connPool.Load().(map[string]*ws.Conn)
newConnPool := make(map[string]*ws.Conn, len(connPool)+1)
for k, v := range connPool {
newConnPool[k] = v
}
newConnPool[sessID] = conn
self.connPool.Store(newConnPool)
}

func (self *LogSocketController) Remove(sessID string, conn *ws.Conn) {
func (self *LogSocketController) Remove(sessID string) {
self.lock.Lock()
defer self.lock.Unlock()
defer func() {
recover()
}()
if self.connPool[sessID] == nil {
connPool := self.connPool.Load().(map[string]*ws.Conn)
conn := connPool[sessID]
if conn == nil {
return
}
lv := self.lvPool[sessID]
lv.closed = true
close(lv.logChan)
conn.Close()
delete(self.connPool, sessID)
delete(self.lvPool, sessID)
}

var Lsc = &LogSocketController{
connPool: make(map[string]*ws.Conn),
lvPool: make(map[string]*LogView),
}

// 设置所有log输出位置为Log
type LogView struct {
closed bool
logChan chan string
}

func newLogView() *LogView {
return &LogView{
logChan: make(chan string, 1024),
closed: false,
}
}

func (self *LogView) Write(p []byte) (int, error) {
if self.closed {
goto end
newConnPool := make(map[string]*ws.Conn, len(connPool)+1)
for k, v := range connPool {
if k != sessID {
newConnPool[k] = v
}
}
defer func() { recover() }()
self.logChan <- (string(p) + "\r\n")
end:
return len(p), nil
}

func (self *LogView) Sprint() string {
return <-self.logChan
self.connPool.Store(newConnPool)
}

0 comments on commit 5007d19

Please sign in to comment.