Skip to content

Commit

Permalink
[Controller] The first push link sends full data
Browse files Browse the repository at this point in the history
  • Loading branch information
jin-xiaofeng committed May 10, 2024
1 parent 66afa0d commit 25e2cb7
Showing 1 changed file with 39 additions and 30 deletions.
69 changes: 39 additions & 30 deletions server/controller/trisolaris/services/grpc/synchronize/vtap.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ func (e *VTapEvent) getVTapCache(in *api.SyncRequest, orgID int) (*vtap.VTapCach
return vtapCache, nil
}

func (e *VTapEvent) pushResponse(in *api.SyncRequest) (*api.SyncResponse, error) {
func (e *VTapEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncResponse, error) {
ctrlIP := in.GetCtrlIp()
ctrlMac := in.GetCtrlMac()
teamIDStr := in.GetTeamId()
Expand Down Expand Up @@ -705,42 +705,41 @@ func (e *VTapEvent) pushResponse(in *api.SyncRequest) (*api.SyncResponse, error)
versionPolicy := gVTapInfo.GetVTapPolicyVersion(vtapID, functions)
pushVersionPolicy := vtapCache.GetPushVersionPolicy()
newAcls := gVTapInfo.GetVTapPolicyData(vtapID, functions)
changedInfo := fmt.Sprintf("push data ctrl_ip is %s, ctrl_mac is %s, "+
"(platform data version %d -> %d), "+
"(acls version %d -> %d datalen: %d), "+
"(groups version %d -> %d), "+
"NAME:%s REVISION:%s BOOT_TIME:%d",
ctrlIP, ctrlMac,
versionPlatformData, pushVersionPlatformData,
versionPolicy, pushVersionPolicy, len(newAcls),
versionGroups, pushVersionGroups,
in.GetProcessName(), in.GetRevision(), in.GetBootTime())
if versionPlatformData != pushVersionPlatformData ||
versionGroups != pushVersionGroups || versionPolicy != pushVersionPolicy {
log.Infof("push data ctrl_ip is %s, ctrl_mac is %s, team_id: (str=%s,int=%d), org_id is %d, "+
"(platform data version %d -> %d), "+
"(acls version %d -> %d datalen: %d), "+
"(groups version %d -> %d), "+
"NAME:%s REVISION:%s BOOT_TIME:%d",
ctrlIP, ctrlMac, teamIDStr, teamIDInt, orgID,
versionPlatformData, pushVersionPlatformData,
versionPolicy, pushVersionPolicy, len(newAcls),
versionGroups, pushVersionGroups,
in.GetProcessName(), in.GetRevision(), in.GetBootTime())
log.Infof(changedInfo)
} else {
log.Debugf("push data ctrl_ip is %s, ctrl_mac is %s, team_id: (str=%s,int=%d), org_id is %d, "+
"(platform data version %d -> %d), "+
"(acls version %d -> %d), "+
"(groups version %d -> %d), "+
"NAME:%s REVISION:%s BOOT_TIME:%d",
ctrlIP, ctrlMac, teamIDStr, teamIDInt, orgID,
versionPlatformData, pushVersionPlatformData,
versionPolicy, pushVersionPolicy,
versionGroups, pushVersionGroups,
in.GetProcessName(), in.GetRevision(), in.GetBootTime())
log.Debugf(changedInfo)
}

platformData := []byte{}
if versionPlatformData != pushVersionPlatformData {
platformData = vtapCache.GetSimplePlatformDataStr()
}
groups := []byte{}
if versionGroups != pushVersionGroups {
groups = gVTapInfo.GetGroupData()
}
acls := []byte{}
if versionPolicy != pushVersionPolicy {
if all {
log.Info(changedInfo)
platformData = vtapCache.GetSimplePlatformDataStr()
groups = gVTapInfo.GetGroupData()
acls = gVTapInfo.GetVTapPolicyData(vtapID, functions)
} else {
if versionPlatformData != pushVersionPlatformData {
platformData = vtapCache.GetSimplePlatformDataStr()
}
if versionGroups != pushVersionGroups {
groups = gVTapInfo.GetGroupData()
}
if versionPolicy != pushVersionPolicy {
acls = gVTapInfo.GetVTapPolicyData(vtapID, functions)
}
}

// 只有专属采集器下发tap_types
Expand Down Expand Up @@ -781,6 +780,7 @@ func (e *VTapEvent) pushResponse(in *api.SyncRequest) (*api.SyncResponse, error)
}, nil
}

// The first push link sends full data
func (e *VTapEvent) Push(r *api.SyncRequest, in api.Synchronizer_PushServer) error {
var err error
orgID := trisolaris.GetOrgIDByTeamID(r.GetTeamId())
Expand All @@ -796,8 +796,18 @@ func (e *VTapEvent) Push(r *api.SyncRequest, in api.Synchronizer_PushServer) err

return nil
}
response, err := e.pushResponse(r, true)
if err != nil {
log.Error(err)
}
err = in.Send(response)
if err != nil {
log.Error(err)
return err
}
for {
response, err := e.pushResponse(r)
pushmanager.Wait(orgID)
response, err := e.pushResponse(r, false)
if err != nil {
log.Error(err)
}
Expand All @@ -806,7 +816,6 @@ func (e *VTapEvent) Push(r *api.SyncRequest, in api.Synchronizer_PushServer) err
log.Error(err)
break
}
pushmanager.Wait(orgID)
}
log.Info("exit agent push", r.GetCtrlIp(), r.GetCtrlMac())
return err
Expand Down

0 comments on commit 25e2cb7

Please sign in to comment.