Skip to content

Commit

Permalink
#79
Browse files Browse the repository at this point in the history
  • Loading branch information
sunface committed May 15, 2019
1 parent 68eb7fd commit 27a1776
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 176 deletions.
8 changes: 4 additions & 4 deletions agent/agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ agent:

etcd:
addrs:
# - "127.0.0.1:2379"
- "127.0.0.1:2379"
# 测试环境
- "10.7.24.191:2379"
- "10.7.24.192:2379"
# - "10.7.24.191:2379"
# - "10.7.24.192:2379"
timeout: 10
watchdir: "/testcollecotr"
watchdir: "/collecotr"

collector:
keeplive: 2
Expand Down
272 changes: 128 additions & 144 deletions agent/service/agent.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,24 @@
package service

import (
"os"
"strings"
"sync/atomic"
"time"

"github.com/imdevlab/tracing/pkg/constant"
"github.com/imdevlab/tracing/pkg/network"
"github.com/vmihailenco/msgpack"

"go.uber.org/zap"
)

// Agent ...
type Agent struct {
appName string // 服务名
agentID string // 服务agent ID
etcd *Etcd // 服务发现
collector *Collector // 监控指标上报
pinpoint *Pinpoint // pinpoint采集服务
isLive bool // app是否存活
isReportInfo bool // 是否已经上报agentInfo
syncID uint32 // 同步请求ID
syncCall *SyncCall // 同步请求
agentInfo *network.AgentInfo // 监控上报的agent info原信息
appName string // 服务名
agentID string // 服务agent ID
etcd *Etcd // 服务发现
collector *Collector // 监控指标上报
pinpoint *Pinpoint // pinpoint采集服务
isLive bool // app是否存活
syncID uint32 // 同步请求ID
syncCall *SyncCall // 同步请求
agentInfo *network.AgentInfo // 监控上报的agent info原信息
}

var gAgent *Agent
Expand Down Expand Up @@ -65,7 +59,7 @@ func (a *Agent) Start() error {
}

// agent 信息上报服务
go reportAgentInfo()
// go reportAgentInfo()

logger.Info("Agent start ok")

Expand All @@ -77,134 +71,124 @@ func (a *Agent) Close() error {
return nil
}

func getApplicationName() error {
hostname, err := os.Hostname()
if err != nil {
logger.Warn("get host name error", zap.Error(err))
return err
}

names := strings.Split(hostname, "-")
if len(names) == 1 {
gAgent.appName = hostname
} else if len(names) == 3 {
gAgent.appName = names[1]
} else if len(names) == 4 {
gAgent.appName = names[1] + names[2]
}
return nil
}

func getAgentID() error {
hostname, err := os.Hostname()
if err != nil {
logger.Warn("get host name error", zap.Error(err))
return err
}
names := strings.Split(hostname, "-")
if len(names) == 1 {
gAgent.agentID = hostname
} else if len(names) == 3 {
var id string
if strings.EqualFold(names[2], "vip") {
id = "v"
} else if strings.EqualFold(names[2], "yf") {
id = "y"
} else {
id = names[2]
}
gAgent.agentID = names[1] + id
} else if len(names) == 4 {
var id string
if strings.EqualFold(names[3], "vip") {
id = "v"
} else if strings.EqualFold(names[3], "yf") {
id = "y"
} else {
id = names[3]
}
gAgent.agentID = names[1] + names[2] + id
}
return nil
}

// getAppname 获取本机App名
func (a *Agent) getAppname() error {

getApplicationName()
getAgentID()

logger.Info("init", zap.String("appName", a.appName))
logger.Info("init", zap.String("agentID", a.agentID))

return nil
}

// reportAgentInfo 上报agent 信息
func reportAgentInfo() {
for {
time.Sleep(1 * time.Second)
if !gAgent.isLive {
continue
}
break
}
for {
time.Sleep(10 * time.Second)
spanPackets := network.NewSpansPacket()
spanPackets.Type = constant.TypeOfTCPData
spanPackets.AppName = gAgent.appName
spanPackets.AgentID = gAgent.agentID

agentInfo, err := msgpack.Marshal(gAgent.agentInfo)
if err != nil {
logger.Warn("msgpack Marshal", zap.String("error", err.Error()))
continue
}
spans := &network.Spans{
Spans: agentInfo,
}
if gAgent.isLive == false {
spans.Type = constant.TypeOfAgentOffline
} else {
spans.Type = constant.TypeOfRegister
}

spanPackets.Payload = append(spanPackets.Payload, spans)
payload, err := msgpack.Marshal(spanPackets)
if err != nil {
logger.Warn("msgpack Marshal", zap.String("error", err.Error()))
continue
}

id := gAgent.getSyncID()
tracePacket := &network.TracePack{
Type: constant.TypeOfPinpoint,
IsSync: constant.TypeOfSyncYes,
IsCompress: constant.TypeOfCompressNo,
ID: id,
Payload: payload,
}

if err := gAgent.collector.write(tracePacket); err != nil {
logger.Warn("write info", zap.String("error", err.Error()))
continue
}

// 创建chan
if _, ok := gAgent.syncCall.newChan(id, 10); !ok {
logger.Warn("syncCall newChan", zap.String("error", "创建sync chan失败"))
continue
}

// 阻塞同步等待,并关闭chan
if _, err := gAgent.syncCall.syncRead(id, 10, true); err != nil {
logger.Warn("syncRead", zap.String("error", err.Error()))
continue
}
time.Sleep(10 * time.Second)
}
}
// // reportAgentInfo 上报agent 信息
// func reportAgentInfo() {
// for {
// time.Sleep(1 * time.Second)
// if !gAgent.isLive {
// continue
// }
// break
// }
// for {
// time.Sleep(10 * time.Second)
// spanPackets := network.NewSpansPacket()
// spanPackets.Type = constant.TypeOfTCPData
// spanPackets.AppName = gAgent.appName
// spanPackets.AgentID = gAgent.agentID

// agentInfo, err := msgpack.Marshal(gAgent.agentInfo)
// if err != nil {
// logger.Warn("msgpack Marshal", zap.String("error", err.Error()))
// continue
// }
// spans := &network.Spans{
// Spans: agentInfo,
// }
// if gAgent.isLive == false {
// spans.Type = constant.TypeOfAgentOffline
// } else {
// spans.Type = constant.TypeOfRegister
// }

// spanPackets.Payload = append(spanPackets.Payload, spans)
// payload, err := msgpack.Marshal(spanPackets)
// if err != nil {
// logger.Warn("msgpack Marshal", zap.String("error", err.Error()))
// continue
// }

// tracePacket := &network.TracePack{
// Type: constant.TypeOfPinpoint,
// IsSync: constant.TypeOfSyncNo,
// IsCompress: constant.TypeOfCompressNo,
// Payload: payload,
// }

// if err := gAgent.collector.write(tracePacket); err != nil {
// logger.Warn("write info", zap.String("error", err.Error()))
// continue
// }

// time.Sleep(10 * time.Second)
// }
// }

// // reportAgentInfo 上报agent 信息
// func reportAgentInfo() {
// for {
// time.Sleep(1 * time.Second)
// if !gAgent.isLive {
// continue
// }
// break
// }
// for {
// time.Sleep(10 * time.Second)
// spanPackets := network.NewSpansPacket()
// spanPackets.Type = constant.TypeOfTCPData
// spanPackets.AppName = gAgent.appName
// spanPackets.AgentID = gAgent.agentID

// agentInfo, err := msgpack.Marshal(gAgent.agentInfo)
// if err != nil {
// logger.Warn("msgpack Marshal", zap.String("error", err.Error()))
// continue
// }
// spans := &network.Spans{
// Spans: agentInfo,
// }
// if gAgent.isLive == false {
// spans.Type = constant.TypeOfAgentOffline
// } else {
// spans.Type = constant.TypeOfRegister
// }

// spanPackets.Payload = append(spanPackets.Payload, spans)
// payload, err := msgpack.Marshal(spanPackets)
// if err != nil {
// logger.Warn("msgpack Marshal", zap.String("error", err.Error()))
// continue
// }

// id := gAgent.getSyncID()
// tracePacket := &network.TracePack{
// Type: constant.TypeOfPinpoint,
// IsSync: constant.TypeOfSyncYes,
// IsCompress: constant.TypeOfCompressNo,
// ID: id,
// Payload: payload,
// }

// if err := gAgent.collector.write(tracePacket); err != nil {
// logger.Warn("write info", zap.String("error", err.Error()))
// continue
// }

// // 创建chan
// if _, ok := gAgent.syncCall.newChan(id, 10); !ok {
// logger.Warn("syncCall newChan", zap.String("error", "创建sync chan失败"))
// continue
// }

// // 阻塞同步等待,并关闭chan
// if _, err := gAgent.syncCall.syncRead(id, 10, true); err != nil {
// logger.Warn("syncRead", zap.String("error", err.Error()))
// continue
// }
// time.Sleep(10 * time.Second)
// }
// }

// getSyncID ...
func (a *Agent) getSyncID() uint32 {
Expand Down
1 change: 0 additions & 1 deletion agent/service/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func (t *tcpClient) init() error {
for {
select {
case <-ticker.C:
// logger.Debug("keeplive", zap.String("addr", t.addr))
if err := t.keeplive(); err != nil {
logger.Warn("keeplive", zap.String("error", err.Error()))
return
Expand Down
Loading

0 comments on commit 27a1776

Please sign in to comment.