Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ticket21315 #3

Merged
merged 12 commits into from Jun 28, 2019
@@ -149,10 +149,20 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
return
}
log.Println("Received snowflake: ", id)

// Log geoip stats
remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
log.Println("Error processing proxy IP: ", err.Error())
} else {
ctx.metrics.UpdateCountryStats(remoteIP)
}

// Wait for a client to avail an offer to the snowflake, or timeout if nil.
offer := ctx.RequestOffer(id)
if nil == offer {
log.Println("Proxy " + id + " did not receive a Client offer.")
ctx.metrics.proxyIdleCount++
w.WriteHeader(http.StatusGatewayTimeout)
return
}
@@ -176,6 +186,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
// Immediately fail if there are no snowflakes available.
if ctx.snowflakes.Len() <= 0 {
log.Println("Client: No snowflake proxies available.")
ctx.metrics.clientDeniedCount++
w.WriteHeader(http.StatusServiceUnavailable)
return
}
@@ -189,6 +200,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
select {
case answer := <-snowflake.answerChannel:
log.Println("Client: Retrieving answer")
ctx.metrics.clientProxyMatchCount++
w.Write(answer)
// Initial tracking of elapsed time.
ctx.metrics.clientRoundtripEstimate = time.Since(startTime) /
@@ -221,15 +233,6 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
return
}

// Get proxy country stats
remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
log.Println("Error processing proxy IP: ", err.Error())
} else {

ctx.metrics.UpdateCountryStats(remoteIP)
}

log.Println("Received answer.")
snowflake.answerChannel <- body
}
@@ -289,7 +292,7 @@ func main() {
metricsFile = os.Stdout
}

metricsLogger := log.New(metricsFile, "", log.LstdFlags|log.LUTC)
metricsLogger := log.New(metricsFile, "", 0)

This comment has been minimized.

@NullHypothesis

NullHypothesis Jun 13, 2019
Contributor

Why are we getting rid of the flags here? I'm not saying we shouldn't; I'm just curious.

This comment has been minimized.

@cohosh

cohosh Jun 14, 2019
Author Owner

The flags for loggers are mostly about formatting timestamps and prefixes for log messages. We don't want any prefixes for the log file, only the information in the specification.


ctx := NewBrokerContext(metricsLogger)

@@ -1,9 +1,51 @@
/*
We export metrics in the following format:
"snowflake-stats-end" YYYY-MM-DD HH:MM:SS (NSEC s) NL
[At most once.]
YYYY-MM-DD HH:MM:SS defines the end of the included measurement
interval of length NSEC seconds (86400 seconds by default).
"snowflake-ips" CC=NUM,CC=NUM,... NL
[At most once.]
List of mappings from two-letter country codes to the number of
unique IP addresses of snowflake proxies that have polled.
"snowflake-ips-total" NUM NL
[At most once.]
A count of the total number of unique IP addresses of snowflake
proxies that have polled.
"snowflake-idle-count" NUM NL
[At most once.]
A count of the number of times a proxy has polled but received
no client offer, rounded up to the nearest multiple of 8.
"client-denied-count" NUM NL
[At most once.]
A count of the number of times a client has requested a proxy
from the broker but no proxies were available, rounded up to
the nearest multiple of 8.
"client-snowflake-match-count" NUM NL
[At most once.]
A count of the number of times a client successfully received a
proxy from the broker, rounded up to the nearest multiple of 8.
*/

package main

import (
// "golang.org/x/net/internal/timeseries"
"fmt"
"log"
"math"
"net"
"sync"
"time"
@@ -13,28 +55,49 @@ var (
once sync.Once
)

const metricsResolution = 60 * 60 * 24 * time.Second //86400 seconds

type CountryStats struct {
addrs map[string]bool
counts map[string]int
}

// Implements Observable
type Metrics struct {
tablev4 *GeoIPv4Table
tablev6 *GeoIPv6Table
countryStats CountryStats
// snowflakes timeseries.Float
logger *log.Logger
tablev4 *GeoIPv4Table
tablev6 *GeoIPv6Table

countryStats CountryStats
clientRoundtripEstimate time.Duration
proxyIdleCount uint
clientDeniedCount uint
clientProxyMatchCount uint
}

func (s CountryStats) Display() string {
return fmt.Sprint(s.counts)
output := ""
for cc, count := range s.counts {
output += fmt.Sprintf("%s=%d,", cc, count)
This conversation was marked as resolved by cohosh

This comment has been minimized.

@NullHypothesis

NullHypothesis Jun 13, 2019
Contributor

This leads to the line ending with a trailing comma. Tor doesn't do this in its bridge-ips line so I think we shouldn't do it here either.

}

// cut off trailing ","
if len(output) > 0 {
return output[:len(output)-1]
}

return output
}

func (m *Metrics) UpdateCountryStats(addr string) {

var country string
var ok bool

if m.countryStats.addrs[addr] {

This comment has been minimized.

@NullHypothesis

NullHypothesis Jun 14, 2019
Contributor

Go suggests the use of two-value assignments to test for the existence of a key, but I think relying on the zero value of a bool should also be fine: https://blog.golang.org/go-maps-in-action#TOC_3.

return
}

ip := net.ParseIP(addr)
if ip.To4() != nil {
//This is an IPv4 address
@@ -54,8 +117,9 @@ func (m *Metrics) UpdateCountryStats(addr string) {
log.Println("Unknown geoip")
}

//update map of countries and counts
//update map of unique ips and counts
m.countryStats.counts[country]++
m.countryStats.addrs[addr] = true

return
}
@@ -90,19 +154,45 @@ func NewMetrics(metricsLogger *log.Logger) (*Metrics, error) {

m.countryStats = CountryStats{
counts: make(map[string]int),
addrs: make(map[string]bool),
}

m.logger = metricsLogger

// Write to log file every hour with updated metrics
go once.Do(func() {
heartbeat := time.Tick(time.Hour)
for range heartbeat {
metricsLogger.Println("Country stats: ", m.countryStats.Display())
go once.Do(m.logMetrics)

return m, nil
}

//restore all metrics to original values
m.countryStats.counts = make(map[string]int)
// Logs metrics in intervals specified by metricsResolution
func (m *Metrics) logMetrics() {
heartbeat := time.Tick(metricsResolution)
for range heartbeat {
m.printMetrics()
m.zeroMetrics()
}
}

}
})
func (m *Metrics) printMetrics() {
m.logger.Println("snowflake-stats-end", time.Now().UTC().Format("2006-01-02 15:04:05"), fmt.Sprintf("(%d s)", int(metricsResolution.Seconds())))
m.logger.Println("snowflake-ips", m.countryStats.Display())
m.logger.Println("snowflake-ips-total", len(m.countryStats.addrs))
m.logger.Println("snowflake-idle-count", binCount(m.proxyIdleCount))
m.logger.Println("client-denied-count", binCount(m.clientDeniedCount))
m.logger.Println("client-snowflake-match-count", binCount(m.clientProxyMatchCount))
}

return m, nil
// Restores all metrics to original values
func (m *Metrics) zeroMetrics() {
m.proxyIdleCount = 0
m.clientDeniedCount = 0
m.clientProxyMatchCount = 0
m.countryStats.counts = make(map[string]int)
m.countryStats.addrs = make(map[string]bool)
}

// Rounds up a count to the nearest multiple of 8.
func binCount(count uint) uint {
return uint((math.Ceil(float64(count) / 8)) * 8)
}
@@ -11,6 +11,7 @@ import (
"net/http/httptest"
"os"
"testing"
"time"
)

func NullLogger() *log.Logger {
@@ -390,3 +391,131 @@ func TestGeoip(t *testing.T) {

})
}

func TestMetrics(t *testing.T) {

Convey("Test metrics...", t, func() {
done := make(chan bool)
buf := new(bytes.Buffer)
ctx := NewBrokerContext(log.New(buf, "", 0))

err := ctx.metrics.LoadGeoipDatabases("test_geoip", "test_geoip6")
So(err, ShouldEqual, nil)

//Test addition of proxy polls
Convey("for proxy polls", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
r.Header.Set("X-Session-ID", "test")
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
done <- true
}(ctx)
p := <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done

ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips CA=1\nsnowflake-ips-total 1\nsnowflake-idle-count 8\nclient-denied-count 0\nclient-snowflake-match-count 0\n")
})

//Test addition of client failures
Convey("for no proxies available", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)

clientOffers(ctx, w, r)

ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 8\nclient-snowflake-match-count 0\n")

// Test reset
buf.Reset()
ctx.metrics.zeroMetrics()
ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 0\nclient-snowflake-match-count 0\n")
})
//Test addition of client matches
Convey("for client-proxy match", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)

// Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake")
go func() {
clientOffers(ctx, w, r)
done <- true
}()
offer := <-snowflake.offerChannel
So(offer, ShouldResemble, []byte("test"))
snowflake.answerChannel <- []byte("fake answer")
<-done

ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 0\nclient-snowflake-match-count 8\n")
})
//Test rounding boundary
Convey("binning boundary", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)

clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)

ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 8\nclient-snowflake-match-count 0\n")

clientOffers(ctx, w, r)
buf.Reset()
ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 16\nclient-snowflake-match-count 0\n")
})

//Test unique ip
Convey("proxy counts by unique ip", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
r.Header.Set("X-Session-ID", "test")
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
done <- true
}(ctx)
p := <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done

data = bytes.NewReader([]byte("test"))
r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
r.Header.Set("X-Session-ID", "test")
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
done <- true
}(ctx)
p = <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done

ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips CA=1\nsnowflake-ips-total 1\nsnowflake-idle-count 8\nclient-denied-count 0\nclient-snowflake-match-count 0\n")
})
})
}