Skip to content
Permalink
Browse files

update

  • Loading branch information...
sunface committed May 15, 2019
1 parent f5e323c commit ca49eb4d63603f0028981462136e7153ccc6cb62
Showing with 23 additions and 17 deletions.
  1. +9 −6 agent/service/pinpoint.go
  2. +4 −2 alert/service/alert.go
  3. +3 −2 collector/service/app.go
  4. +4 −5 collector/service/stats.go
  5. +3 −2 collector/storage/storage.go
@@ -177,7 +177,6 @@ func (p *Pinpoint) tcpCollector() {

// udpCollector ...
func (p *Pinpoint) udpCollector() {

// 定时器
ticker := time.NewTicker(time.Duration(misc.Conf.Pinpoint.SpanReportInterval) * time.Millisecond)
defer ticker.Stop()
@@ -237,7 +236,6 @@ func (p *Pinpoint) udpCollector() {
continue
}
tracePack.Payload = payload
// @TODO 重连优化
// 发送
if err := gAgent.collector.write(tracePack); err != nil {
logger.Warn("write", zap.String("error", err.Error()))
@@ -270,7 +268,7 @@ func (p *Pinpoint) agentInfo(conn net.Conn) error {
conn.Close()
}
}()

isSendOnline := false
reader := bufio.NewReaderSize(conn, proto.TCP_MAX_PACKET_SIZE)
buf := make([]byte, 2)
for {
@@ -430,9 +428,12 @@ func (p *Pinpoint) agentInfo(conn net.Conn) error {
gAgent.agentInfo.OperatingEnv = agentInfo.OperatingEnv
gAgent.agentInfo.AgentInfo = agentInfo.AgentInfo

if err := updateAgentStats(true); err != nil {
logger.Warn("agent update stats", zap.String("error", err.Error()), zap.Bool("live", true))
return err
if !isSendOnline {
if err := updateAgentStats(true); err != nil {
logger.Warn("agent update stats", zap.String("error", err.Error()), zap.Bool("live", true))
return err
}
isSendOnline = true
}

rePacket, err = createResponse(controlHandShake)
@@ -449,6 +450,8 @@ func (p *Pinpoint) agentInfo(conn net.Conn) error {
logger.Warn("agent update stats", zap.String("error", err.Error()), zap.Bool("live", false))
return err
}

isSendOnline = false
break

case proto.CONTROL_PING:
@@ -1,6 +1,8 @@
package service

import (
"time"

"github.com/gocql/gocql"
"github.com/imdevlab/tracing/alert/misc"
"github.com/imdevlab/tracing/alert/ticker"
@@ -86,7 +88,7 @@ func (a *Alert) initTraceCql() error {
cluster.Consistency = gocql.Quorum
//设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
cluster.NumConns = misc.Conf.DB.NumConns

cluster.ReconnectInterval = 1 * time.Second
session, err := cluster.CreateSession()
if err != nil {
logger.Warn("create session", zap.String("error", err.Error()))
@@ -103,7 +105,7 @@ func (a *Alert) initStaticCql() error {
cluster.Consistency = gocql.Quorum
//设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
cluster.NumConns = misc.Conf.DB.NumConns

cluster.ReconnectInterval = 1 * time.Second
session, err := cluster.CreateSession()
if err != nil {
logger.Warn("create session", zap.String("error", err.Error()))
@@ -259,10 +259,11 @@ func (a *App) start() {
// 获取任务ID
a.taskID = gCollector.ticker.NewID()
logger.Info("app start", zap.String("name", a.name), zap.Int64("taskID", a.taskID))
// 加入定时模块

// 统计信息定时服务
gCollector.ticker.AddTask(a.taskID, a.tickerC)

// api二次定时模块
// api二次聚合计算定时服务
gCollector.apiTicker.AddTask(a.taskID, a.apiTickerC)

// 启动计算模块
@@ -406,13 +406,11 @@ func (s *Stats) eventCounter(appName, apiStr string, event *trace.TSpanEvent) {
if isDubbo(event.GetServiceType()) {
// dubbo
s.dubboCounter(appName, event)
} else {
} else if isHttp(event.GetServiceType()) {
// http 统计
s.urlCounter(event)
}

// 数据库统计
if isDB(event.GetServiceType()) {
} else if isDB(event.GetServiceType()) {
// 数据库统计
s.sqlCount(event)
}

@@ -504,6 +502,7 @@ func cutVip(destination string) string {

// 计算child拓扑图
func (s *Stats) targetMapCounter(event *trace.TSpanEvent) {

if event.ServiceType == constant.DUBBO_CONSUMER ||
event.ServiceType == constant.HTTP_CLIENT_3 ||
event.ServiceType == constant.HTTP_CLIENT_3_INTERNAL ||
@@ -60,12 +60,13 @@ func (s *Storage) initTraceCql() error {
cluster.Consistency = gocql.Quorum
//设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
cluster.NumConns = misc.Conf.Storage.NumConns

cluster.ReconnectInterval = 1 * time.Second
session, err := cluster.CreateSession()
if err != nil {
s.logger.Warn("create session", zap.String("error", err.Error()))
return err
}

s.traceCql = session
return nil
}
@@ -77,7 +78,7 @@ func (s *Storage) initStaticCql() error {
cluster.Consistency = gocql.Quorum
//设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
cluster.NumConns = misc.Conf.Storage.NumConns

cluster.ReconnectInterval = 1 * time.Second
session, err := cluster.CreateSession()
if err != nil {
s.logger.Warn("create session", zap.String("error", err.Error()))

0 comments on commit ca49eb4

Please sign in to comment.
You can’t perform that action at this time.