Skip to content

Commit

Permalink
1.Support auto relink discovery;2.Add ip tool
Browse files Browse the repository at this point in the history
  • Loading branch information
AbericYang committed May 11, 2019
1 parent 42f456e commit 363e606
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 14 deletions.
17 changes: 17 additions & 0 deletions discovery/consul/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/http"
"strconv"
"strings"
"time"
)

// agentRegister 注册服务到 consul
Expand All @@ -40,6 +41,22 @@ import (
//
// port:注册到 consul 的服务端口(优先通过环境变量 PORT 获取)
func agentRegister(consulURL, serviceID, serviceName, hostname string, port int) {
agentServiceChecks, slips := ServiceCheck(selfServiceName)
if nil == slips {
have := false
for index := range agentServiceChecks {
if agentServiceChecks[index].Service.ID == selfServiceID {
have = true
}
}
if have {
return
}
} else {
time.Sleep(1 * time.Second)
ReEnroll()
return
}
if containerID, err := file.ReadFileFirstLine("/etc/hostname"); nil == err && str.IsEmpty(hostname) {
hostname = containerID
} else {
Expand Down
6 changes: 4 additions & 2 deletions examples/trans1/trans1.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ennoo/rivet/rivet"
"github.com/ennoo/rivet/trans/request"
"github.com/ennoo/rivet/trans/response"
"github.com/ennoo/rivet/utils/ip"
"github.com/ennoo/rivet/utils/slip"
"github.com/gin-gonic/gin"
"net/http"
Expand Down Expand Up @@ -50,6 +51,7 @@ func shunt1get1(context *gin.Context) {
}

func shunt1post1(context *gin.Context) {
fmt.Println("ip = ", ip.Get(context.Request))
rivet.Request().Callback(context, http.MethodPost, "http://localhost:8082", "rivet/post", func() *response.Result {
return &response.Result{ResultCode: response.Success, Msg: "降级处理"}
})
Expand All @@ -72,7 +74,7 @@ func shunt1post2(context *gin.Context) {
uri := "v1/agent/health/service/name/test"
_, err := request.SyncPoolGetRequest().RestJSON(method, remote, uri, nil)
if nil != err {
sliper := err.(*slip.Slip)
fmt.Println("sliper = ", sliper.Msg)
slips := err.(*slip.Slip)
fmt.Println("slips = ", slips.Msg)
}
}
26 changes: 16 additions & 10 deletions rivet/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (
)

var (
hc = false
sm = false
ud = false
cp string
sn string
hc = false // 是否开启健康检查。开启后为 Get 请求,路径为 /health/check
sm = false // 是否开启外界服务管理功能
ud = false // 是否启用发现服务
cp string // 启用的发现服务组件类型
sn string // 注册到发现服务的服务名称(优先通过环境变量 SERVICE_NAME 获取)
)

// ListenServe 启动监听端口服务对象
Expand Down Expand Up @@ -68,11 +68,13 @@ func Initialize(healthCheck bool, serverManager bool, loadBalance bool) {

// UseDiscovery 启用指定的发现服务
//
// component:启用的发现服务组件类型
//
// url:consul 等发现服务注册地址,包括端口号(优先通过环境变量 CONSUL_URL 获取)
//
// serviceName:注册到 consul 等发现服务的服务名称(优先通过环境变量 SERVICE_NAME 获取)
// serviceName:注册到发现服务的服务名称(优先通过环境变量 SERVICE_NAME 获取)
//
// hostname:注册到 consul 等发现服务的服务地址(如果为空,则尝试通过 /etc/hostname 获取)
// hostname:注册到发现服务的服务地址(如果为空,则尝试通过 /etc/hostname 获取)
//
// port:注册到 consul 的服务端口(优先通过环境变量 PORT 获取)
func UseDiscovery(component, url, serviceName, hostname string, port int) {
Expand All @@ -83,7 +85,11 @@ func UseDiscovery(component, url, serviceName, hostname string, port int) {
if !ud {
log.Rivet.Info("use discovery service {}" + discovery.ComponentConsul)
ud = true
consul.Enroll(url, serviceID, ServiceName(), hostname, port)
if request.LB {
consul.Enroll(url, serviceID, ServiceName(), hostname, port)
} else {
go scheduled.ConsulEnroll(url, serviceID, ServiceName(), hostname, port)
}
}
}
}
Expand All @@ -99,9 +105,9 @@ func SetupRouter(routes ...func(*gin.Engine)) *gin.Engine {
}
if request.LB {
if ud {
scheduled.CheckService(sn, cp)
scheduled.CheckService(serviceID, sn, cp)
} else {
scheduled.CheckService(sn, "")
scheduled.CheckService(serviceID, sn, "")
}
}
for _, route := range routes {
Expand Down
53 changes: 53 additions & 0 deletions scheduled/task_discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2019. ENNOO - All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package scheduled

import (
"github.com/ennoo/rivet/discovery/consul"
"github.com/robfig/cron"
)

// ConsulEnroll 定时调用此方法检查注册 consul 是否失效
//
// consulUrl:consul 注册地址,包括端口号(优先通过环境变量 CONSUL_URL 获取)
//
// serviceID:注册到 consul 的服务 ID
//
// serviceName:注册到 consul 的服务名称(优先通过环境变量 SERVICE_NAME 获取)
//
// hostname:注册到 consul 的服务地址(如果为空,则尝试通过 /etc/hostname 获取)
//
// port:注册到 consul 的服务端口(优先通过环境变量 PORT 获取)
func ConsulEnroll(consulURL, serviceID, serviceName, hostname string, port int) {
enrollOnce := false
c := cron.New()
// 每隔5秒执行一次:*/5 * * * * ?
// 每隔1分钟执行一次:0 */1 * * * ?
// 每天23点执行一次:0 0 23 * * ?
// 每天凌晨1点执行一次:0 0 1 * * ?
// 每月1号凌晨1点执行一次:0 0 1 1 * ?
// 在26分、29分、33分执行一次:0 26,29,33 * * * ?
// 每天的0点、13点、18点、21点都执行一次:0 0 0,13,18,21 * * ?
_ = c.AddFunc("*/10 * * * * ?", func() {
if enrollOnce {
consul.ReEnroll()
} else {
enrollOnce = true
consul.Enroll(consulURL, serviceID, serviceName, hostname, port)
}
})
c.Start()
}
4 changes: 3 additions & 1 deletion scheduled/task_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
)

var (
selfServiceID string
selfServiceName string
selfDiscoveryComponent string
)

// CheckService 检查可用负载服务列表
func CheckService(serviceName, component string) {
func CheckService(serviceID, serviceName, component string) {
selfServiceID = serviceID
selfServiceName = serviceName
selfDiscoveryComponent = component
if str.IsEmpty(selfDiscoveryComponent) {
Expand Down
8 changes: 7 additions & 1 deletion scheduled/task_service_consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ func checkServicesByConsul(abortDiscovery chan int) {
// 检查发现服务状态
agentServiceChecks, slips := consul.ServiceCheck(selfServiceName)
if nil == slips {
if nil == agentServiceChecks || len(agentServiceChecks) <= 0 {
have := false
for index := range agentServiceChecks {
if agentServiceChecks[index].Service.ID == selfServiceID {
have = true
}
}
if !have {
consul.ReEnroll()
}
} else {
Expand Down
39 changes: 39 additions & 0 deletions utils/ip/ip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2019. ENNOO - All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ip

import (
"net"
"net/http"
)

// Get 返回客户端 IP
func Get(req *http.Request) string {
remoteAddr := req.RemoteAddr
if ip := req.Header.Get("X-Real-IP"); ip != "" {
remoteAddr = ip
} else if ip = req.Header.Get("X-Forwarded-For"); ip != "" {
remoteAddr = ip
} else {
remoteAddr, _, _ = net.SplitHostPort(remoteAddr)
}

if remoteAddr == "::1" {
remoteAddr = "127.0.0.1"
}

return remoteAddr
}

0 comments on commit 363e606

Please sign in to comment.