Skip to content

Commit

Permalink
Merge 8738d9b into bab29b9
Browse files Browse the repository at this point in the history
  • Loading branch information
cupen authored Jan 25, 2021
2 parents bab29b9 + 8738d9b commit b3cd691
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 38 deletions.
6 changes: 6 additions & 0 deletions _examples/cluster-restartgracefully/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ ttl:=10s
loops:=10000
clients:=10
interval:=0ms
env=prod

start:
tmux new-session -d -s eg
tmux setenv -t eg PROTO_ACTOR_ENV $(env)
tmux split-window -t "eg:0" -v
tmux split-window -t "eg:0.0" -h -p 66
tmux split-window -t "eg:0.1" -h -p 50
Expand All @@ -23,6 +25,10 @@ start-with-etcd:
make start cp=etcd


debug:
PROTO_ACTOR_ENV=dev make start cp=etcd


mock-clients-10w:
make start cp=etcd clients=100000 loops=10 interval=100ms

Expand Down
14 changes: 6 additions & 8 deletions _examples/cluster-restartgracefully/go.mod
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
module cluster-restartgracefully

go 1.13
go 1.14

replace github.com/AsynkronIT/protoactor-go => ../..
replace (
github.com/AsynkronIT/protoactor-go => ../..
go.etcd.io/bbolt => github.com/coreos/bbolt v1.3.5
google.golang.org/grpc => google.golang.org/grpc v1.26.0
)

require (
github.com/AsynkronIT/goconsole v0.0.0-20160504192649-bfa12eebf716
github.com/AsynkronIT/protoactor-go v0.0.0-00010101000000-000000000000
github.com/cespare/xxhash v1.1.0 // indirect
github.com/coreos/etcd v3.3.25+incompatible // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis v6.15.9+incompatible
github.com/gogo/protobuf v1.3.1
go.uber.org/zap v1.16.0 // indirect
google.golang.org/grpc v1.25.1
)
1 change: 0 additions & 1 deletion _examples/cluster-restartgracefully/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ var (
)

func main() {
cluster.SetLogLevel(log.InfoLevel)
var provider = flag.String("provider", "consul", "clients count.")
var actorTTL = flag.Duration("ttl", 10*time.Second, "time to live of actor.")
var port = flag.Int("port", 0, "listen port.")
Expand Down
2 changes: 1 addition & 1 deletion cluster/etcd/etcd_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

var (
plog = log.New(log.InfoLevel, "[CLUSTER/ETCD]")
plog = log.New(log.DefaultLevel, "[CLU/ETCD]")
)

type Provider struct {
Expand Down
2 changes: 1 addition & 1 deletion cluster/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

var (
plog = log.New(log.InfoLevel, "[CLUSTER]")
plog = log.New(log.DefaultLevel, "[CLUSTER]")
)

// SetLogLevel sets the log level for the logger.
Expand Down
4 changes: 2 additions & 2 deletions cluster/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func newPartitionActor(p *partitionValue, kind string) actor.Producer {
func (state *partitionActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *actor.Started:
plog.Info("Started", log.String("kind", state.kind), log.String("id", context.Self().Id))
plog.Debug("Started", log.String("kind", state.kind), log.String("id", context.Self().Id))
case *remote.ActorPidRequest:
state.spawn(msg, context)
case *actor.Terminated:
Expand Down Expand Up @@ -270,7 +270,7 @@ func (state *partitionActor) memberLeft(msg *MemberLeftEvent, context actor.Cont
}

func (state *partitionActor) memberJoined(msg *MemberJoinedEvent, context actor.Context) {
plog.Info("Member joined", log.String("kind", state.kind), log.String("name", msg.Name()))
plog.Debug("Member joined", log.String("kind", state.kind), log.String("name", msg.Name()))
for actorID := range state.partition {
address := state.partitionValue.cluster.MemberList.getPartitionMember(actorID, state.kind)
if address != "" && address != state.partitionValue.cluster.ActorSystem.Address() {
Expand Down
4 changes: 2 additions & 2 deletions cluster/partition_identity_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (p *partitionIdentityActor) Receive(ctx actor.Context) {
}

func (p *partitionIdentityActor) onStart(ctx actor.Context) {
plog.Info("Started PartitionIdentity", p.logPartition)
plog.Debug("Started PartitionIdentity", p.logPartition)
p.lastEventTimestamp = time.Now()
p.self = ctx.Self()
}
Expand Down Expand Up @@ -169,9 +169,9 @@ func (p *partitionIdentityActor) handleClusterTopology(msg *ClusterTopology, ctx
}
wg.Wait()
plog.Info("Updated ClusterTopology",
log.String("kind", p.partitionKind.Kind),
log.Uint64("eventId", msg.EventId),
log.Int("members", len(msg.Members)),
log.String("kind", p.partitionKind.Kind),
log.Duration("cost", time.Since(now)))
return
}
Expand Down
4 changes: 3 additions & 1 deletion cluster/partition_kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func (pm *PartitionKind) start(_chash chash.ConsistentHash) error {
return err
}
}
plog.Info("Started partition", log.String("kind", pm.Kind))

address := pm.identity.PID().GetAddress()
plog.Info("Started Partition", log.String("kind", pm.Kind), log.String("address", address))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cluster/partition_placement_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (p *partitionPlacementActor) Receive(ctx actor.Context) {
}

func (p *partitionPlacementActor) onStart(ctx actor.Context) {
plog.Info("Started PartitionPlacement", p.logPartition)
plog.Debug("Started PartitionPlacement", p.logPartition)
p.self = ctx.Self()
}

Expand Down
16 changes: 14 additions & 2 deletions log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"
)

// Level of log.
type Level int32

const (
Expand All @@ -17,6 +18,7 @@ const (
WarnLevel
ErrorLevel
OffLevel
DefaultLevel
)

var (
Expand All @@ -34,8 +36,18 @@ type Logger struct {
enableCaller bool
}

// New a Logger
func New(level Level, prefix string, context ...Field) *Logger {
return &Logger{level: level, prefix: prefix, context: context}
opts := Current
if level == DefaultLevel {
level = opts.logLevel
}
return &Logger{
level: level,
prefix: prefix,
context: context,
enableCaller: opts.enableCaller,
}
}

func (l *Logger) WithCaller() *Logger {
Expand Down Expand Up @@ -83,7 +95,7 @@ func (l *Logger) newEvent(msg string, level Level, fields ...Field) Event {
Fields: fields,
}
if l.enableCaller {
ev.Caller = newCallerInfo(2)
ev.Caller = newCallerInfo(3)
}
return ev
}
Expand Down
79 changes: 66 additions & 13 deletions log/options.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,77 @@
package log

type optionFn func()
import (
"os"
)

var (
Development = &Options{
logLevel: DebugLevel,
enableCaller: true,
}

Production = &Options{
logLevel: InfoLevel,
enableCaller: false,
}

Current = Production
)

func init() {
env := os.Getenv("PROTO_ACTOR_ENV")
switch env {
case "dev":
Current = Development
case "prod":
Current = Production
default:
Current = Production
}
}

// Options for log.
type Options struct {
logLevel Level
enableCaller bool
}

// Setup is used to configure the log system
func (o *Options) With(opts ...option) *Options {
cloned := *o
for _, opt := range opts {
opt(&cloned)
}
return &cloned
}

type option func(*Options)

// WithEventSubscriber option replaces the default Event subscriber with fn.
//
// Specifying nil will disable logging of events.
func WithEventSubscriber(fn func(evt Event)) optionFn {
return func() {
if sub != nil {
Unsubscribe(sub)
}
if fn != nil {
sub = Subscribe(fn)
}
func WithEventSubscriber(fn func(evt Event)) option {
return func(opts *Options) {
resetEventSubscriber(fn)
}
}

// SetOptions is used to configure the log system
func SetOptions(opts ...optionFn) {
for _, opt := range opts {
opt()
// WithCaller option will print the file name and line number.
func WithCaller(enabled bool) option {
return func(opts *Options) {
opts.enableCaller = enabled
}
}

func WithDefaultLevel(level Level) option {
if level == DefaultLevel {
level = InfoLevel
}
return func(opts *Options) {
opts.logLevel = level
}
}

func SetOptions(opts ...option) {
Current = Current.With(opts...)
}
31 changes: 25 additions & 6 deletions log/string_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,20 @@ var (

func init() {
l := &ioLogger{c: make(chan Event, 100), out: os.Stderr}
sub = Subscribe(func(evt Event) {
resetEventSubscriber(func(evt Event) {
l.c <- evt
})
go l.listenEvent()
}

func resetEventSubscriber(f func(evt Event)) {
if sub != nil {
Unsubscribe(sub)
sub = nil
}
sub = Subscribe(f)
}

func (l *ioLogger) listenEvent() {
for true {
e := <-l.c
Expand Down Expand Up @@ -87,14 +95,25 @@ func (l *ioLogger) formatHeader(buf *bytes.Buffer, prefix string, t time.Time, l
buf.WriteByte('\t')
}

func (l *ioLogger) formatCaller(buf *bytes.Buffer, caller *CallerInfo) {
fname := caller.ShortFileName()
buf.WriteString(fname)
buf.WriteByte(':')
buf.WriteString(strconv.Itoa(caller.line))
if v := (32 - len(fname)); v > 16 {
buf.Write([]byte{'\t', '\t', '\t'})
} else if v > 8 {
buf.Write([]byte{'\t', '\t'})
} else {
buf.WriteByte('\t')
}
}

func (l *ioLogger) writeEvent(e Event) {
var buf = bytes.Buffer{}
l.formatHeader(&buf, e.Prefix, e.Time, e.Level)
if e.Caller.line > 0 {
buf.WriteString(e.Caller.ShortFileName())
buf.WriteByte(':')
buf.WriteString(strconv.Itoa(e.Caller.line))
buf.WriteByte('\t')
l.formatCaller(&buf, &e.Caller)
}
if len(e.Message) > 0 {
buf.WriteString(e.Message)
Expand Down Expand Up @@ -163,7 +182,7 @@ func (e ioEncoder) EncodeCaller(key string, val CallerInfo) {
fname := val.fname
idx := strings.LastIndexByte(fname, '/')
if idx >= len(fname) {
fname = fname
// fname = fname
} else {
fname = fname[idx+1:]
}
Expand Down

0 comments on commit b3cd691

Please sign in to comment.