Skip to content

Commit

Permalink
golint, fix pkg path
Browse files Browse the repository at this point in the history
  • Loading branch information
nejisama committed Sep 11, 2018
1 parent 3a4ca5b commit c3b3ff0
Show file tree
Hide file tree
Showing 15 changed files with 63 additions and 119 deletions.
2 changes: 1 addition & 1 deletion examples/tcpproxy-sample/rpc_server.go
Expand Up @@ -5,7 +5,7 @@ import (
"net"
"time"

"github.com/alipay/sofa-mosn/pkg/network/buffer"
"github.com/alipay/sofa-mosn/pkg/buffer"
"github.com/alipay/sofa-mosn/pkg/protocol/sofarpc"
"github.com/alipay/sofa-mosn/pkg/protocol/sofarpc/codec"
)
Expand Down
44 changes: 22 additions & 22 deletions pkg/config/callback.go
Expand Up @@ -53,20 +53,20 @@ func (config *MOSNConfig) OnAddOrUpdateListeners(listeners []*pb.Listener) {
}
}

if listenerAdapter := server.GetListenerAdapterInstance(); listenerAdapter == nil {
listenerAdapter := server.GetListenerAdapterInstance()
if listenerAdapter == nil {
// if listenerAdapter is nil, return directly
log.DefaultLogger.Errorf("listenerAdapter is nil and hasn't been initiated at this time")
return
} else {
log.DefaultLogger.Debugf("listenerAdapter.AddOrUpdateListener called, with mosn Listener:%+v, networkFilters:%+v, streamFilters: %+v",
listeners, networkFilters, streamFilters)
}
log.DefaultLogger.Debugf("listenerAdapter.AddOrUpdateListener called, with mosn Listener:%+v, networkFilters:%+v, streamFilters: %+v",
listeners, networkFilters, streamFilters)

if err := listenerAdapter.AddOrUpdateListener("", mosnListener, networkFilters, streamFilters); err == nil {
log.DefaultLogger.Debugf("xds AddOrUpdateListener success,listener address = %s", mosnListener.Addr.String())
} else {
log.DefaultLogger.Errorf("xds AddOrUpdateListener failure,listener address = %s, msg = %s ",
mosnListener.Addr.String(), err.Error())
}
if err := listenerAdapter.AddOrUpdateListener("", mosnListener, networkFilters, streamFilters); err == nil {
log.DefaultLogger.Debugf("xds AddOrUpdateListener success,listener address = %s", mosnListener.Addr.String())
} else {
log.DefaultLogger.Errorf("xds AddOrUpdateListener failure,listener address = %s, msg = %s ",
mosnListener.Addr.String(), err.Error())
}
}
}
Expand All @@ -78,17 +78,17 @@ func (config *MOSNConfig) OnDeleteListeners(listeners []*pb.Listener) {
continue
}

if listenerAdapter := server.GetListenerAdapterInstance(); listenerAdapter == nil {
listenerAdapter := server.GetListenerAdapterInstance()
if listenerAdapter == nil {
log.DefaultLogger.Errorf("listenerAdapter is nil and hasn't been initiated at this time")
return
}
if err := listenerAdapter.DeleteListener("", mosnListener.Name); err == nil {
log.DefaultLogger.Debugf("xds OnDeleteListeners success,listener address = %s", mosnListener.Addr.String())
} else {
if err := listenerAdapter.DeleteListener("", mosnListener.Name); err == nil {
log.DefaultLogger.Debugf("xds OnDeleteListeners success,listener address = %s", mosnListener.Addr.String())
} else {
log.DefaultLogger.Errorf("xds OnDeleteListeners failure,listener address = %s, mag = %s ",
mosnListener.Addr.String(), err.Error())
log.DefaultLogger.Errorf("xds OnDeleteListeners failure,listener address = %s, mag = %s ",
mosnListener.Addr.String(), err.Error())

}
}
}
}
Expand Down Expand Up @@ -152,16 +152,16 @@ func (config *MOSNConfig) OnUpdateEndpoints(loadAssignments []*pb.ClusterLoadAss

clusterMngAdapter := clusterAdapter.GetClusterMngAdapterInstance()
if clusterMngAdapter == nil {
log.DefaultLogger.Errorf("xds client update Error: clusterMngAdapter nil , hosts are %+v:", hosts)
errGlobal = fmt.Errorf("xds client update Error: clusterMngAdapter nil , hosts are %+v:", hosts)
log.DefaultLogger.Errorf("xds client update Error: clusterMngAdapter nil , hosts are %+v", hosts)
errGlobal = fmt.Errorf("xds client update Error: clusterMngAdapter nil , hosts are %+v", hosts)
}

if err := clusterAdapter.GetClusterMngAdapterInstance().TriggerClusterHostUpdate(clusterName, hosts); err != nil {
log.DefaultLogger.Errorf("xds client update Error = %s, hosts are %+v:", err.Error(), hosts)
errGlobal = fmt.Errorf("xds client update Error = %s, hosts are %+v:", err.Error(), hosts)
log.DefaultLogger.Errorf("xds client update Error = %s, hosts are %+v", err.Error(), hosts)
errGlobal = fmt.Errorf("xds client update Error = %s, hosts are %+v", err.Error(), hosts)

} else {
log.DefaultLogger.Debugf("xds client update host success,hosts are %+v:", hosts)
log.DefaultLogger.Debugf("xds client update host success,hosts are %+v", hosts)
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/config/parser.go
Expand Up @@ -44,11 +44,10 @@ var protocolsSupported = map[string]bool{
func RegisterProtocolParser(key string) bool {
if _, ok := protocolsSupported[key]; ok {
return false
} else {
log.StartLogger.Infof(" %s added to protocolsSupported", key)
protocolsSupported[key] = true
return true
}
log.StartLogger.Infof(" %s added to protocolsSupported", key)
protocolsSupported[key] = true
return true
}

// ParsedCallback is an
Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/proxy.go
Expand Up @@ -95,11 +95,11 @@ func NewProxy(ctx context.Context, config *v2.Proxy, clusterManager types.Cluste

proxy.context = buffer.NewBufferPoolContext(ctx, false)

extJson, err := json.Marshal(proxy.config.ExtendConfig)
extJSON, err := json.Marshal(proxy.config.ExtendConfig)
if err == nil {
log.DefaultLogger.Tracef("proxy extend config = %v", proxy.config.ExtendConfig)
var xProxyExtendConfig v2.XProxyExtendConfig
json.Unmarshal([]byte(extJson), &xProxyExtendConfig)
json.Unmarshal([]byte(extJSON), &xProxyExtendConfig)
proxy.context = context.WithValue(proxy.context, types.ContextSubProtocol, xProxyExtendConfig.SubProtocol)
log.DefaultLogger.Tracef("proxy extend config subprotocol = %v", xProxyExtendConfig.SubProtocol)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/router/routeruleimpl.go
Expand Up @@ -42,7 +42,7 @@ func NewRouteRuleImplBase(vHost *VirtualHostImpl, route *v2.Router) (RouteRuleIm
routerAction: route.Route,
clusterName: route.Route.ClusterName,
randInstance: rand.New(rand.NewSource(time.Now().UnixNano())),
configHeaders: getRouterHeades(route.Match.Headers),
configHeaders: getRouterHeaders(route.Match.Headers),
}

routeRuleImplBase.weightedClusters, routeRuleImplBase.totalClusterWeight = getWeightedClusterEntry(route.Route.WeightedClusters)
Expand Down
42 changes: 3 additions & 39 deletions pkg/router/routeruleimpl_test.go
Expand Up @@ -206,37 +206,6 @@ func TestWeightedClusterSelect(t *testing.T) {
},
}

routerMock3 := &v2.Router{}
routerMock3.Route = v2.RouteAction{
RouterActionConfig: v2.RouterActionConfig{
ClusterName: "defaultCluster",
WeightedClusters: []v2.WeightedCluster{
{
Cluster: v2.ClusterWeight{
ClusterWeightConfig: v2.ClusterWeightConfig{
Name: "w1",
Weight: 50,
},
MetadataMatch: v2.Metadata{
"version": "v1",
},
},
},
{
Cluster: v2.ClusterWeight{
ClusterWeightConfig: v2.ClusterWeightConfig{
Name: "w2",
Weight: 40,
},
MetadataMatch: v2.Metadata{
"version": "v2",
},
},
},
},
},
}

type testCase struct {
routerCase []*v2.Router
ratio []uint
Expand All @@ -257,11 +226,11 @@ func TestWeightedClusterSelect(t *testing.T) {
clusterName := routeRuleImplBase.ClusterName()
switch clusterName {
case "defaultCluster":
dcCount += 1
dcCount++
case "w1":
w1Count += 1
w1Count++
case "w2":
w2Count += 1
w2Count++
}
}

Expand All @@ -272,9 +241,4 @@ func TestWeightedClusterSelect(t *testing.T) {
}
t.Log("defalut = ", dcCount, "w1 = ", w1Count, "w2 =", w2Count)
}

routeRuleImplBase, _ := NewRouteRuleImplBase(nil, routerMock3)
if len(routeRuleImplBase.weightedClusters) != 0 {
t.Errorf("wanted invalid weighted cluster init but not")
}
}
6 changes: 3 additions & 3 deletions pkg/router/utility.go
Expand Up @@ -41,7 +41,7 @@ func getClusterMosnLBMetaDataMap(metadata v2.Metadata) types.RouteMetaData {
// "runtimeKey" and "loader" are not used currently
func getWeightedClusterEntry(weightedClusters []v2.WeightedCluster) (map[string]weightedClusterEntry, uint32) {
var weightedClusterEntries = make(map[string]weightedClusterEntry)
var totalWeight uint32 = 0
var totalWeight uint32
for _, weightedCluster := range weightedClusters {
subsetLBMetaData := weightedCluster.Cluster.MetadataMatch
totalWeight = totalWeight + weightedCluster.Cluster.Weight
Expand All @@ -56,7 +56,7 @@ func getWeightedClusterEntry(weightedClusters []v2.WeightedCluster) (map[string]
return weightedClusterEntries, totalWeight
}

func getRouterHeades(heades []v2.HeaderMatcher) []*types.HeaderData {
func getRouterHeaders(heades []v2.HeaderMatcher) []*types.HeaderData {
var headerDatas []*types.HeaderData

for _, header := range heades {
Expand All @@ -72,7 +72,7 @@ func getRouterHeades(heades []v2.HeaderMatcher) []*types.HeaderData {
if pattern, err := regexp.Compile(header.Name); err != nil {
headerData.RegexPattern = pattern
} else {
log.DefaultLogger.Errorf("getRouterHeades compile error")
log.DefaultLogger.Errorf("getRouterHeaders compile error")
continue
}
}
Expand Down
16 changes: 1 addition & 15 deletions pkg/router/utility_test.go
Expand Up @@ -69,27 +69,13 @@ func Test_getWeightedClusterEntryAndVerify(t *testing.T) {
},
},
},
{
name: "case1",
args: args{
weightedClusters: []v2.WeightedCluster{
{Cluster: v2.ClusterWeight{ClusterWeightConfig: v2.ClusterWeightConfig{Name: "c1", Weight: 50}, MetadataMatch: v2.Metadata{"label": "green", "version": "v1"}}},
{Cluster: v2.ClusterWeight{ClusterWeightConfig: v2.ClusterWeightConfig{Name: "c2", Weight: 30}, MetadataMatch: v2.Metadata{"label": "blue", "version": "v2"}}},
{Cluster: v2.ClusterWeight{ClusterWeightConfig: v2.ClusterWeightConfig{Name: "c3", Weight: 10}, MetadataMatch: v2.Metadata{"label": "gray", "version": "v0"}}},
},
},
want: result{
valid: false,
value: nil,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
entry, _ := getWeightedClusterEntry(tt.args.weightedClusters)
if !reflect.DeepEqual(entry, tt.want.value) {
t.Errorf("get weighted cluster entry and verify name = %s got1 = %v, want %v", tt.want, entry, tt.want.value)
t.Errorf("get weighted cluster entry and verify name = %s got1 = %v, want %v", tt.name, entry, tt.want.value)
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/adapter.go
Expand Up @@ -74,7 +74,7 @@ func (adapter *ListenerAdapter) AddOrUpdateListener(serverName string, lc *v2.Li
listener, err := connHandler.AddOrUpdateListener(lc, networkFiltersFactories, streamFiltersFactories)

if err != nil {
return fmt.Errorf("connHandler.AddOrUpdateListener called error:", err.Error())
return fmt.Errorf("connHandler.AddOrUpdateListener called error: %s", err.Error())
}

if listener == nil {
Expand Down
21 changes: 10 additions & 11 deletions pkg/server/handler.go
Expand Up @@ -450,20 +450,19 @@ func (al *activeListener) OnNewConnection(ctx context.Context, conn types.Connec
// no filter found, close connection
conn.Close(types.NoFlush, types.LocalClose)
return
} else {
ac := newActiveConnection(al, conn)
}
ac := newActiveConnection(al, conn)

al.connsMux.Lock()
e := al.conns.PushBack(ac)
al.connsMux.Unlock()
ac.element = e
al.connsMux.Lock()
e := al.conns.PushBack(ac)
al.connsMux.Unlock()
ac.element = e

al.stats.DownstreamConnectionActive().Inc(1)
al.stats.DownstreamConnectionTotal().Inc(1)
atomic.AddInt64(&al.handler.numConnections, 1)
al.stats.DownstreamConnectionActive().Inc(1)
al.stats.DownstreamConnectionTotal().Inc(1)
atomic.AddInt64(&al.handler.numConnections, 1)

al.logger.Debugf("new downstream connection %d accepted", conn.ID())
}
al.logger.Debugf("new downstream connection %d accepted", conn.ID())

// todo: this hack is due to http2 protocol process. golang http2 provides a io loop to read/write stream
if !al.disableConnIo {
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/http/codecclient_test.go
Expand Up @@ -38,7 +38,7 @@ type mockClient struct {
t *testing.T
}

func NewMockClient(t *testing.T) *mockClient {
func newMockClient(t *testing.T) *mockClient {
c := &mockClient{
t: t,
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func checkNumbers(t *testing.T, codecClient str.CodecClient, want int) {
}

func TestActiveRequests(t *testing.T) {
cli := NewMockClient(t)
cli := newMockClient(t)
host := cluster.NewHost(v2.Host{
HostConfig: v2.HostConfig{
Address: "127.0.0.1", Hostname: "test", Weight: 0,
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/xprotocol/stream.go
Expand Up @@ -160,8 +160,8 @@ func (conn *streamConnection) Dispatch(buffer types.IoBuffer) {
if ok {
serviceName := tracingCodec.GetServiceName(request)
methodName := tracingCodec.GetMethodName(request)
headers[types.HeaderRpcService] = serviceName
headers[types.HeaderRpcMethod] = methodName
headers[types.HeaderRPCService] = serviceName
headers[types.HeaderRPCMethod] = methodName
log.DefaultLogger.Tracef("xprotocol handle tracing ,serviceName = %v , methodName = %v", serviceName, methodName)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/types/exception.go
Expand Up @@ -29,8 +29,8 @@ const (
HeaderTryTimeout = "x-mosn-try-timeout"
HeaderException = "x-mosn-exception"
HeaderStremEnd = "x-mosn-endstream"
HeaderRpcService = "x-mosn-rpc-service"
HeaderRpcMethod = "x-mosn-rpc-method"
HeaderRPCService = "x-mosn-rpc-service"
HeaderRPCMethod = "x-mosn-rpc-method"
)

// Error messages
Expand Down
10 changes: 4 additions & 6 deletions pkg/upstream/cluster/clustermanager.go
Expand Up @@ -115,10 +115,9 @@ func (cm *clusterManager) AddOrUpdatePrimaryCluster(cluster v2.Cluster) bool {
if v, exist := cm.primaryClusters.Load(clusterName); exist {
if !v.(*primaryCluster).addedViaAPI {
return false
} else {
// update cluster
return cm.updateCluster(cluster, v.(*primaryCluster), true)
}
// update cluster
return cm.updateCluster(cluster, v.(*primaryCluster), true)
}
// add new cluster
return cm.loadCluster(cluster, true)
Expand Down Expand Up @@ -264,10 +263,9 @@ func (cm *clusterManager) RemoveClusterHost(clusterName string, hostAddress stri
log.DefaultLogger.Debugf("RemoveClusterHost success, host address = %s", hostAddress)
// concretedCluster.UpdateHosts(ccHosts)
return nil
} else {
return fmt.Errorf("RemoveClusterHost failed, host address = %s doesn't exist", hostAddress)

}
return fmt.Errorf("RemoveClusterHost failed, host address = %s doesn't exist", hostAddress)

}

return fmt.Errorf("RemoveClusterHost failed, cluster name = %s is not valid", clusterName)
Expand Down
14 changes: 6 additions & 8 deletions pkg/upstream/cluster/loadbalancer_test.go
Expand Up @@ -166,7 +166,7 @@ func TestSmoothWeightedRRLoadBalancer_ChooseHost(t *testing.T) {

for _, tt := range tests {
var a, b, c float64
var i float64 = 0
var i float64

l1 := newSmoothWeightedRRLoadBalancer(tt.args)
runningTimes := float64(rand.Int31n(1000))
Expand All @@ -177,10 +177,9 @@ func TestSmoothWeightedRRLoadBalancer_ChooseHost(t *testing.T) {
if host == nil {
if tt.name == "zeroTest" {
return
} else {
t.Errorf("test sommoth loalbalancer err, want a = %f, b = %f, c = %f, got a = %f, b=%f, c=%f, case = %s", a/runningTimes, b/runningTimes, c/runningTimes,
tt.want[0], tt.want[1], tt.want[2], tt.name)
}
t.Errorf("test sommoth loalbalancer err, want a = %f, b = %f, c = %f, got a = %f, b=%f, c=%f, case = %s", a/runningTimes, b/runningTimes, c/runningTimes,
tt.want[0], tt.want[1], tt.want[2], tt.name)
}

switch host.Hostname() {
Expand Down Expand Up @@ -280,18 +279,17 @@ func TestSmoothWeightedRRLoadBalancer_UpdateHost(t *testing.T) {

runningTimes := float64(rand.Int31n(1000))
var a, b, c, d float64
var i float64 = 0
var i float64

for ; i < runningTimes; i++ {
host := loadbBalancer.ChooseHost(nil)

if host == nil {
if tt.name == "zeroTest" {
return
} else {
t.Errorf("test sommoth loalbalancer err, want a = %f, b = %f, c = %f, got a = %f, b=%f, c=%f, case = %s", a/runningTimes, b/runningTimes, c/runningTimes,
tt.want[0], tt.want[1], tt.want[2], tt.name)
}
t.Errorf("test sommoth loalbalancer err, want a = %f, b = %f, c = %f, got a = %f, b=%f, c=%f, case = %s", a/runningTimes, b/runningTimes, c/runningTimes,
tt.want[0], tt.want[1], tt.want[2], tt.name)
}

switch host.Hostname() {
Expand Down

0 comments on commit c3b3ff0

Please sign in to comment.