Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
370 lines (345 sloc) 12.5 KB

记一次goroutine泄漏问题查找原因与解决

问题描述: 有个需求要实现根据url参数在反向代理负载均衡处分发到不同的k8s service,反向代理负载均衡用的是caddy,于是将caddy源码拉下来,在原来proxy插件的基础上修改成以下idproxy插件,实现根据id参数分发到id-xxx.namespace的k8s服务

func init() {
  httpserver.RegisterDevDirective("idproxy", "proxy")
  caddy.RegisterPlugin("idproxy", caddy.Plugin{
    ServerType: "http",
    Action:     setup,
  })
}

type Upstream struct {
  proxy.Upstream
}

func (u Upstream) Select(r *http.Request) *proxy.UpstreamHost {
  uh := u.Upstream.Select(r)
  if uh == nil {
    return nil
  }
  if len(idproxyQueryUrl) > 0 && len(idproxyPaths) > 0 {
    for _, path := range idproxyPaths {
      if strings.HasPrefix(r.URL.Path, path) {
        id, _ := strconv.ParseInt(r.URL.Query().Get("id"), 10, 64)
        baseURL, _ := url.Parse(uh.Name)
        baseURL.Host = fmt.Sprintf("id-%d.%s", id, idproxyNamespace)
        return &proxy.UpstreamHost{
          Name:                         baseURL.String(),
          FailTimeout:                  uh.FailTimeout,
          UpstreamHeaders:              uh.UpstreamHeaders,
          DownstreamHeaders:            uh.DownstreamHeaders,
          WithoutPathPrefix:            uh.WithoutPathPrefix,
          MaxConns:                     uh.MaxConns,
          UpstreamHeaderReplacements:   uh.UpstreamHeaderReplacements,
          DownstreamHeaderReplacements: uh.DownstreamHeaderReplacements,
        }
      }
      return uh
    }
  }
  return uh
}

// setup configures a new Proxy middleware instance.
func setup(c *caddy.Controller) error {
  upstreams, err := proxy.NewStaticUpstreams(c.Dispenser, httpserver.GetConfig(c).Host())
  if err != nil {
    return err
  }
  for i := range upstreams {
    upstreams[i] = Upstream{upstreams[i]}
  }
  httpserver.GetConfig(c).AddMiddleware(func(next httpserver.Handler) httpserver.Handler {
    return proxy.Proxy{Next: next, Upstreams: upstreams}
  })

  // Register shutdown handlers.
  for _, upstream := range upstreams {
    c.OnShutdown(upstream.Stop)
  }

  return nil
}

部署上去后,用了个每秒都有请求过来的服务测试,发现pod隔一段时间就会被k8s OOMKill,重启了几十次,猜想可能是内存泄漏,查看了下内存和CPU使用图,发现内存在一直增长,但速度不快 于是马上加了下net/http/pprof包的handler来查看运行状况,发现goroutine在一直增加,并且发现有两万多goroutine卡在http包的serve这里,readRequest一直在等待,所以应该是tcp连接泄露了,但一直保持连接状态没有关闭,可能是http的连接池那里出问题了 0x7db6c2 net/http.(*conn).readRequest+0x162 /usr/local/go/src/net/http/server.go:967 0x7dfe30 net/http.(*conn).serve+0x850 /usr/local/go/src/net/http/server.go:1878 查看标准库源码查到Request都是由Transport处理的,并且tcp连接也是在这里建立并复用,所以有可能是客户端的Transport出问题了 file: src/net/http/transport.go

func (t *Transport) roundTrip(req *Request) (*Response, error) {
  code...
  // Get the cached or newly-created connection to either the
  // host (for http or https), the http proxy, or the http proxy
  // pre-CONNECTed to https server. In any case, we'll be ready
  // to send it requests.
  pconn, err := t.getConn(treq, cm)
  if err != nil {
    t.setReqCanceler(req, nil)
    req.closeBody()
    return nil, err
  }
  
  var resp *Response
  if pconn.alt != nil {
    // HTTP/2 path.
    t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host
    t.setReqCanceler(req, nil)   // not cancelable with CancelRequest
    resp, err = pconn.alt.RoundTrip(req)
  } else {
    resp, err = pconn.roundTrip(treq)
  }
  if err == nil {
    return resp, nil
  }
  code...
}
// getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS.  If this doesn't return an error, the persistConn
// is ready to write requests to.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
  req := treq.Request
  trace := treq.trace
  ctx := req.Context()
  if trace != nil && trace.GetConn != nil {
    trace.GetConn(cm.addr())
  }
  if pc, idleSince := t.getIdleConn(cm); pc != nil {
    if trace != nil && trace.GotConn != nil {
      trace.GotConn(pc.gotIdleConnTrace(idleSince))
    }
    // set request canceler to some non-nil function so we
    // can detect whether it was cleared between now and when
    // we enter roundTrip
    t.setReqCanceler(req, func(error) {})
    return pc, nil
  }

  type dialRes struct {
    pc  *persistConn
    err error
  }
  dialc := make(chan dialRes)
  cmKey := cm.key()

  // Copy these hooks so we don't race on the postPendingDial in
  // the goroutine we launch. Issue 11136.
  testHookPrePendingDial := testHookPrePendingDial
  testHookPostPendingDial := testHookPostPendingDial

  handlePendingDial := func() {
    testHookPrePendingDial()
    go func() {
      if v := <-dialc; v.err == nil {
        t.putOrCloseIdleConn(v.pc)
      } else {
        t.decHostConnCount(cmKey)
      }
      testHookPostPendingDial()
    }()
  }

  cancelc := make(chan error, 1)
  t.setReqCanceler(req, func(err error) { cancelc <- err })

  if t.MaxConnsPerHost > 0 {
    select {
    case <-t.incHostConnCount(cmKey):
      // count below conn per host limit; proceed
    case pc := <-t.getIdleConnCh(cm):
      if trace != nil && trace.GotConn != nil {
        trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
      }
      return pc, nil
    case <-req.Cancel:
      return nil, errRequestCanceledConn
    case <-req.Context().Done():
      return nil, req.Context().Err()
    case err := <-cancelc:
      if err == errRequestCanceled {
        err = errRequestCanceledConn
      }
      return nil, err
    }
  }

  go func() {
    pc, err := t.dialConn(ctx, cm)
    dialc <- dialRes{pc, err}
    }()
  code...
}

对于向代理负载均衡的服务,负载均衡是客户端,于是回头查看caddy proxy的源码 file: vendor/github.com/mholt/caddy/caddyhttp/proxy/proxy.go

// ServeHTTP satisfies the httpserver.Handler interface.
func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
  code...
  for {
    // since Select() should give us "up" hosts, keep retrying
    // hosts until timeout (or until we get a nil host).
    host := upstream.Select(r)
    if host == nil {
        if backendErr == nil {
            backendErr = errors.New("no hosts available upstream")
        }
        if !keepRetrying(backendErr) {
            break
        }
        continue
    }
    if rr, ok := w.(*httpserver.ResponseRecorder); ok && rr.Replacer != nil {
        rr.Replacer.Set("upstream", host.Name)
    }
  
    proxy := host.ReverseProxy
  
    // a backend's name may contain more than just the host,
    // so we parse it as a URL to try to isolate the host.
    if nameURL, err := url.Parse(host.Name); err == nil {
        outreq.Host = nameURL.Host
        if proxy == nil {
            proxy = NewSingleHostReverseProxy(nameURL,
                host.WithoutPathPrefix,
                http.DefaultMaxIdleConnsPerHost,
                upstream.GetTimeout(),
                upstream.GetFallbackDelay(),
            )
        }
  
        // use upstream credentials by default
        if outreq.Header.Get("Authorization") == "" && nameURL.User != nil {
            pwd, _ := nameURL.User.Password()
            outreq.SetBasicAuth(nameURL.User.Username(), pwd)
        }
    } else {
        outreq.Host = host.Name
    }
    if proxy == nil {
        return http.StatusInternalServerError, errors.New("proxy for host '" + host.Name + "' is nil")
    }
    code...
    }
  }
}

在Proxy.ServeHTTP中,因为重写的Select函数返回的UpstreamHost没有ReverseProxy,所以每次都会新建一个,但是UpstreamHost是没有存下来的,另外ReverseProxy里面又会新建Transport,所以每次新建的Transport就泄露掉了,但是处理请求建立的tcp连接因为没有主动关闭,所以服务端一直保持着长链接,直到tcp链接超时才能被关闭,但定时的请求一直在新建连接,最后导致内存超出限制,被k8s kill调 file: vendor/github.com/mholt/caddy/caddyhttp/proxy/reverseproxy.go

func NewSingleHostReverseProxy(target *url.URL, without string, keepalive int, timeout, fallbackDelay time.Duration) *ReverseProxy {
  code...
  rp := &ReverseProxy{
      Director:      director,
      FlushInterval: 250 * time.Millisecond, // flushing good for streaming & server-sent events
      srvResolver:   net.DefaultResolver,
      dialer:        &dialer,
  }

  if target.Scheme == "unix" {
      rp.Transport = &http.Transport{
          Dial: socketDial(target.String(), timeout),
      }
  } else if target.Scheme == "quic" {
      rp.Transport = &h2quic.RoundTripper{
          QuicConfig: &quic.Config{
              HandshakeTimeout: defaultCryptoHandshakeTimeout,
              KeepAlive:        true,
          },
      }
  } else if keepalive != http.DefaultMaxIdleConnsPerHost || strings.HasPrefix(target.Scheme, "srv") {
      dialFunc := rp.dialer.Dial
      if strings.HasPrefix(target.Scheme, "srv") {
          dialFunc = rp.srvDialerFunc(target.String(), timeout)
      }

      transport := &http.Transport{
          Proxy:                 http.ProxyFromEnvironment,
          Dial:                  dialFunc,
          TLSHandshakeTimeout:   defaultCryptoHandshakeTimeout,
          ExpectContinueTimeout: 1 * time.Second,
      }
      if keepalive == 0 {
          transport.DisableKeepAlives = true
      } else {
          transport.MaxIdleConnsPerHost = keepalive
      }
      if httpserver.HTTP2 {
          if err := http2.ConfigureTransport(transport); err != nil {
              log.Println("[ERROR] failed to configure transport to use HTTP/2: ", err)
          }
      }
      rp.Transport = transport
  } else {
      transport := &http.Transport{
          Proxy: http.ProxyFromEnvironment,
          Dial:  rp.dialer.Dial,
      }
      if httpserver.HTTP2 {
          if err := http2.ConfigureTransport(transport); err != nil {
              log.Println("[ERROR] failed to configure transport to use HTTP/2: ", err)
          }
      }
      rp.Transport = transport
  }
  return rp
}

解决: 将带ReverseProxy的UpstreamHost存下来复用,caddy插件改成这样

type Upstream struct {
    proxy.Upstream
    hostPool map[string]*proxy.UpstreamHost
    lock     sync.RWMutex
}

type NewHoster interface {
    NewHost(host string) (*proxy.UpstreamHost, error)
}

func (u *Upstream) getUpstreamHost(host string) *proxy.UpstreamHost {
    u.lock.RLock()
    defer u.lock.RUnlock()
    return u.hostPool[host]
}

func (u *Upstream) addUpstreamHost(host string, uh *proxy.UpstreamHost) {
    u.lock.Lock()
    defer u.lock.Unlock()
    u.hostPool[host] = uh
}

func (u *Upstream) Select(r *http.Request) *proxy.UpstreamHost {
    uh := u.Upstream.Select(r)
    if uh == nil {
        return nil
    }
    if len(idproxyQueryUrl) > 0 && len(idproxyPaths) > 0 {
        for _, path := range idproxyPaths {
            if strings.HasPrefix(r.URL.Path, path) {
                id, _ := strconv.ParseInt(r.URL.Query().Get("id"), 10, 64)
                host := fmt.Sprintf("id-%d.%s", id, idproxyNamespace)
                uh := u.getUpstreamHost(host)
                if uh != nil {
                    return uh
                }
                uh, err := u.Upstream.(NewHoster).NewHost(host)
                if err != nil {
                    log.Println(err)
                    return nil
                }
                u.addUpstreamHost(host, uh)
                return uh
            }
        }
    }
    return uh
}

// setup configures a new Proxy middleware instance.
func setup(c *caddy.Controller) error {
    upstreams, err := proxy.NewStaticUpstreams(c.Dispenser, httpserver.GetConfig(c).Host())
    if err != nil {
        return err
    }
    for i := range upstreams {
        upstreams[i] = &Upstream{
            Upstream: upstreams[i],
            hostPool: make(map[string]*proxy.UpstreamHost),
        }
    }
    httpserver.GetConfig(c).AddMiddleware(func(next httpserver.Handler) httpserver.Handler {
        return proxy.Proxy{Next: next, Upstreams: upstreams}
    })

    // Register shutdown handlers.
    for _, upstream := range upstreams {
        c.OnShutdown(upstream.Stop)
    }

    return nil
}
You can’t perform that action at this time.