Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leveled logging #45

Merged
merged 11 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions go/0kn/cmd/xtrellis/client.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package main

import (
"log"

"github.com/31333337/bmrng/go/0kn/pkg/utils"
"github.com/31333337/bmrng/go/trellis/client"
"github.com/31333337/bmrng/go/trellis/config"
"github.com/31333337/bmrng/go/trellis/errors"
Expand All @@ -17,21 +16,27 @@ func LaunchClient(args ArgsClient) {
addr := args.Addr
errors.Addr = addr

log.Printf("Launching client with address %s", addr)
logger := utils.GetLogger()
sugar := logger.Sugar()
defer sugar.Sync()
sugar.Infow(
"Launching client",
"address %s", addr,
)

servers, err := config.UnmarshalServersFromFile(serversFile)
if err != nil {
log.Fatalf("Could not read servers file %s", serversFile)
sugar.Fatalf("Could not read servers file %s", serversFile)
}

groups, err := config.UnmarshalGroupsFromFile(groupsFile)
if err != nil {
log.Fatalf("Could not read group file %s", groupsFile)
sugar.Fatalf("Could not read group file %s", groupsFile)
}

clients, err := config.UnmarshalServersFromFile(clientsFile)
if err != nil {
log.Fatalf("Could not read clients file %s", clientsFile)
sugar.Fatalf("Could not read clients file %s", clientsFile)
}

clientRunner := client.NewClientRunner(servers, groups)
Expand Down
17 changes: 14 additions & 3 deletions go/0kn/cmd/xtrellis/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const (
)

func LaunchCoordinator(args ArgsCoordinator, argParser *arg.Parser) {
logger := utils.GetLogger()
sugar := logger.Sugar()
defer sugar.Sync()
sugar.Info("Started Launch Coordinator")
processArgs(&args, argParser)

switch {
Expand All @@ -45,6 +49,10 @@ func LaunchCoordinator(args ArgsCoordinator, argParser *arg.Parser) {
}

func processArgs(args *ArgsCoordinator, argParser *arg.Parser) {
logger := utils.GetLogger()
sugar := logger.Sugar()
defer sugar.Sync()

if args.GroupSize == 0 {
if args.F != 0 {
if args.NumGroups != 0 {
Expand All @@ -53,7 +61,7 @@ func processArgs(args *ArgsCoordinator, argParser *arg.Parser) {
args.GroupSize, args.NumGroups = config.CalcFewGroups2(args.F, args.NumServers)
}
} else {
log.Printf("Set groupsize or f")
sugar.Infof("Set groupsize or f")
argParser.WriteHelp(os.Stdout)
return
}
Expand All @@ -68,7 +76,7 @@ func processArgs(args *ArgsCoordinator, argParser *arg.Parser) {
if args.F != 0 {
args.NumLayers = config.NumLayers(args.NumUsers, args.F)
} else {
log.Printf("Set numlayers or f")
sugar.Infof("Set numlayers or f")
argParser.WriteHelp(os.Stdout)
return
}
Expand All @@ -94,7 +102,10 @@ func processArgs(args *ArgsCoordinator, argParser *arg.Parser) {
args.NumClientServers = 0
}

log.Printf("%+v", args)
sugar.Infow(
"args passed to xtrellis",
"args: %v", args,
)
}

func setupNetwork(args ArgsCoordinator) *coordinator.CoordinatorNetwork {
Expand Down
21 changes: 14 additions & 7 deletions go/0kn/cmd/xtrellis/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"

"github.com/31333337/bmrng/go/0kn/internal/conf"
"github.com/31333337/bmrng/go/0kn/pkg/utils"
"github.com/31333337/bmrng/go/trellis/config"
"github.com/31333337/bmrng/go/trellis/errors"
"github.com/31333337/bmrng/go/trellis/network"
Expand All @@ -21,36 +22,42 @@ func LaunchServer(args ArgsServer) {

// from trellis/cmd/server/
func runServer(args ArgsServer) {

serversFile := args.ServerFile
groupsFile := args.GroupFile
addr := args.Addr
serverPrivateFile := args.ServerPrivateFile
errors.Addr = addr

log.Printf("Launching server with address %s", addr)
logger := utils.GetLogger()
sugar := logger.Sugar()
defer sugar.Sync()
sugar.Infow(
"Launching server",
"address %s", addr,
)

// load public server config for mix-net round
servers, err := config.UnmarshalServersFromFile(serversFile)
if err != nil {
log.Fatalf("Could not read servers file %s: %v", serversFile, err)
sugar.Fatalf("Could not read servers file %s", serversFile)
}

// find server id by address in public config
id, _ := network.FindConfig(addr, servers)
if id < 0 {
log.Fatalf("Could not find %s in servers file", addr)
sugar.Fatalf("Could not find %s in servers file", addr)
}

// load private server config
serversPrivate, err := config.UnmarshalServersFromFile(serverPrivateFile)
if err != nil {
log.Fatalf("Could not read private servers file %s: %v", serverPrivateFile, err)
sugar.Fatalf("Could not read private servers file %s: %v", serverPrivateFile, err)
}

// find server config by address in private config
_, cfg := network.FindConfig(addr, serversPrivate)
if cfg == nil {
log.Fatalf("Could not find %s in private servers file", addr)
sugar.Fatalf("Could not find %s in private servers file", addr)
}

// replace public config with private (complete) config
Expand All @@ -59,7 +66,7 @@ func runServer(args ArgsServer) {

groups, err := config.UnmarshalGroupsFromFile(groupsFile)
if err != nil {
log.Fatalf("Could not read group file %s: %v", groupsFile, err)
sugar.Fatalf("Could not read group file %s: %v", groupsFile, err)
}

// will start in blocked state
Expand Down
5 changes: 0 additions & 5 deletions go/0kn/cmd/xtrellis/xtrellis.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"path/filepath"

arg "github.com/alexflint/go-arg"

"github.com/31333337/bmrng/go/0kn/pkg/utils"
)

// set the working directory from env var and change to the directory
Expand Down Expand Up @@ -51,9 +49,6 @@ func main() {
var args Args
argParser := arg.MustParse(&args)

utils.SetDebugLogEnabled(args.Debug)
utils.SetDebugLogCallerEnabled(args.DebugCaller)

setWorkingDirectory()

switch {
Expand Down
2 changes: 2 additions & 0 deletions go/0kn/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/31333337/bmrng/api v0.0.0-20231013211547-989695299184
github.com/31333337/bmrng/go/trellis v0.0.0-20231013211547-989695299184
github.com/alexflint/go-arg v1.4.3
go.uber.org/zap v1.26.0
google.golang.org/protobuf v1.31.0
)

Expand All @@ -15,6 +16,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/oasisprotocol/curve25519-voi v0.0.0-20230904125328-1f23a7beb09a // indirect
go.dedis.ch/kyber/v3 v3.1.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
Expand Down
12 changes: 10 additions & 2 deletions go/0kn/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.dedis.ch/fixbuf v1.0.3/go.mod h1:yzJMt34Wa5xD37V5RTdmp38cz3QhMagdGoem9anUalw=
go.dedis.ch/kyber/v3 v3.0.4/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ=
go.dedis.ch/kyber/v3 v3.0.9/go.mod h1:rhNjUUg6ahf8HEg5HUvVBYoWY4boAafX8tYxX+PS+qg=
Expand All @@ -34,6 +35,12 @@ go.dedis.ch/kyber/v3 v3.1.0/go.mod h1:kXy7p3STAurkADD+/aZcsznZGKVHEqbtmdIzvPfrs1
go.dedis.ch/protobuf v1.0.5/go.mod h1:eIV4wicvi6JK0q/QnfIEGeSFNG0ZeB24kzut5+HaRLo=
go.dedis.ch/protobuf v1.0.7/go.mod h1:pv5ysfkDX/EawiPqcW3ikOxsL5t+BqnV6xHSmE79KI4=
go.dedis.ch/protobuf v1.0.11/go.mod h1:97QR256dnkimeNdfmURz0wAMNVbd1VmLXhG1CrTYrJ4=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
Expand All @@ -54,5 +61,6 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
35 changes: 21 additions & 14 deletions go/0kn/pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ func GetMessageForClient(clientId int64) ([]byte, error) {
// This replaces `coordinator.Check` testing for message ids.
// Note: There are duplicates (why?), so sort unique.
func CheckFinalMessages(messages [][]byte, numExpected int) bool {
logger := utils.GetLogger()
sugar := logger.Sugar()
defer sugar.Sync()

// get a packet identifier that is unique among all packets of a round
getPacketUID := func(h *gatewayv1.PacketHeader) uint64 {
Expand Down Expand Up @@ -322,25 +325,22 @@ func CheckFinalMessages(messages [][]byte, numExpected int) bool {

switch p.Type {
case gatewayv1.PacketType_PACKET_TYPE_START:
utils.DebugLog("[Gateway] <<< [mix-net] 🟢 START stream [%d]", id)
sugar.Debugf("[Gateway] <<< [mix-net] 🟢 START stream [%d]", id)
streamOut[id] = NewMessageQueue()
streamOutStateMu.Lock()
streamOutState[id] = STREAM_OUT_START
streamOutStateMu.Unlock()
break

case gatewayv1.PacketType_PACKET_TYPE_DATA:
utils.DebugLog("[Gateway] <<< [mix-net] 🔶 DATA stream [%d][%d]", id, p.Sequence)
sugar.Debugf("[Gateway] <<< [mix-net] 🔶 DATA stream [%d][%d]", id, p.Sequence)
uid := getPacketUID(p)
streamOut[id].Enqueue(uniqueMessages[uid])
break

case gatewayv1.PacketType_PACKET_TYPE_END:
utils.DebugLog("[Gateway] <<< [mix-net] 🟥 END stream [%d]", id)
sugar.Debugf("[Gateway] <<< [mix-net] 🟥 END stream [%d]", id)
streamOutStateMu.Lock()
streamOutState[id] = STREAM_OUT_END
streamOutStateMu.Unlock()
break
}
}

Expand Down Expand Up @@ -396,10 +396,14 @@ func proxyStart(addrIn string) {
func proxyHandleConnection(conn net.Conn) {
defer conn.Close()

logger := utils.GetLogger()
sugar := logger.Sugar()
defer sugar.Sync()

streamId := getStreamId()
var packetCounter uint64 = 0

utils.DebugLog("[Gateway] >>> Accepted connection from %s id=%d", conn.RemoteAddr(), streamId)
sugar.Debugf("[Gateway] >>> Accepted connection from %s id=%d", conn.RemoteAddr(), streamId)

// Send a message through the mix-net
sendMessage := func(h *gatewayv1.PacketHeader, dataOrMessage []byte, packed bool) {
Expand All @@ -416,7 +420,7 @@ func proxyHandleConnection(conn net.Conn) {
}
}

utils.DebugLog("[Gateway] >>> [mix-net] Send stream [%d][%d]", h.StreamId, h.Sequence)
sugar.Debugf("[Gateway] >>> [mix-net] Send stream [%d][%d]", h.StreamId, h.Sequence)

// stage message for a mix-net client to pick it up
msgQueueIn.Enqueue(message)
Expand All @@ -442,7 +446,7 @@ func proxyHandleConnection(conn net.Conn) {
}

// prepare message buffer, determine space remaining for data
messageBuffer, space, err := prepareMessageBuffer(header)
messageBuffer, space, _ := prepareMessageBuffer(header)

// read at most "space" bytes from stream, record number ("n") of bytes actually read
data := make([]byte, space)
Expand All @@ -452,11 +456,11 @@ func proxyHandleConnection(conn net.Conn) {
// end transmission
if err == io.EOF {
header.Type = gatewayv1.PacketType_PACKET_TYPE_END
utils.DebugLog("[Gateway] >>> Finished receiving data stream id=%d", streamId)
sugar.Debugf("[Gateway] >>> Finished receiving data stream id=%d", streamId)

} else {
header.Type = gatewayv1.PacketType_PACKET_TYPE_ERROR
utils.DebugLog("[Gateway] >>> Error receiving data stream id=%d: %s", streamId, err.Error())
sugar.Debugf("[Gateway] >>> Error receiving data stream id=%d: %s", streamId, err.Error())
}

sendMessage(header, nil, false)
Expand All @@ -483,6 +487,9 @@ func getStreamId() uint64 {
// An HTTP server to serve data output
// conceptual placeholder to get mixed data out of the gateway in lieu of more protocol
func httpServerStart(addrOut string) {
logger := utils.GetLogger()
sugar := logger.Sugar()
defer sugar.Sync()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var id uint64 = 0

Expand Down Expand Up @@ -516,11 +523,11 @@ func httpServerStart(addrOut string) {
}

// wait until there is a stream
utils.DebugLog("[Gateway] <<< Waiting for stream...")
sugar.Debugf("[Gateway] <<< Waiting for stream...")
time.Sleep(time.Duration(40) * time.Millisecond)
}

utils.DebugLog("[Gateway] <<< stream leaving gateway id=%d, number of streams=%d", id, len(streamOutState))
sugar.Debugf("[Gateway] <<< stream leaving gateway id=%d, number of streams=%d", id, len(streamOutState))

if id != 0 {
for {
Expand All @@ -535,7 +542,7 @@ func httpServerStart(addrOut string) {
break
} else if state == STREAM_OUT_START {
// if stream has not finished transmitting, wait for more data to exit the mix-net
utils.DebugLog("[Gateway] <<< Waiting for stream data to exit mix-net...")
sugar.Debugf("[Gateway] <<< Waiting for stream data to exit mix-net...")
time.Sleep(time.Duration(10) * time.Millisecond)
continue
}
Expand Down
31 changes: 31 additions & 0 deletions go/0kn/pkg/utils/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package utils

import (
"encoding/json"

"go.uber.org/zap"
)

func GetLogger() *zap.Logger {
// TODO: get this from a config file
rawJSON := []byte(`{
"level": "debug",
"encoding": "json",
"outputPaths": ["stdout", "/tmp/logs"],
"errorOutputPaths": ["stderr"],
"initialFields": {"foo": "bar"},
"encoderConfig": {
"messageKey": "message",
"levelKey": "level",
"levelEncoder": "lowercase"
}
}`)

var cfg zap.Config
if err := json.Unmarshal(rawJSON, &cfg); err != nil {
panic(err)
}
logger := zap.Must(cfg.Build())

return logger
}
Loading
Loading