Skip to content

Commit

Permalink
Merge 45f47b5 into 10bc198
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Apr 29, 2021
2 parents 10bc198 + 45f47b5 commit 6414e1b
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 9 deletions.
8 changes: 7 additions & 1 deletion actor/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type ActorSystem struct {
Guardians *guardiansValue
DeadLetter *deadLetterProcess
Extensions *extensions.Extensions
Config *Config
}

func (as *ActorSystem) NewLocalPID(id string) *PID {
Expand All @@ -42,15 +43,20 @@ func (as *ActorSystem) GetHostPort() (host string, port int, err error) {
}

func NewActorSystem() *ActorSystem {
system := &ActorSystem{}
return NewActorSystemWithConfig(defaultActorSystemConfig())
}

func NewActorSystemWithConfig(config Config) *ActorSystem {
system := &ActorSystem{}
system.Config = &config
system.ProcessRegistry = NewProcessRegistry(system)
system.Root = NewRootContext(system, EmptyMessageHeader)
system.Guardians = NewGuardians(system)
system.EventStream = eventstream.NewEventStream()
system.DeadLetter = NewDeadLetter(system)
system.Extensions = extensions.NewExtensions()
SubscribeSupervision(system)
system.Extensions.Register(NewMetrics())

return system
}
52 changes: 52 additions & 0 deletions actor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package actor

import "time"

type Config struct {
DeadLetterThrottleInterval time.Duration //throttle deadletter logging after this interval
DeadLetterThrottleCount int //throttle deadletter logging after this count
DeadLetterRequestLogging bool //do not log deadletters with sender
DeveloperSupervisionLogging bool //console log and promote supervision logs to Warning level
DiagnosticsSerializer func(Actor) string //extract diagnostics from actor and return as string
}

func defaultActorSystemConfig() Config {
return Config{
DeadLetterThrottleInterval: time.Duration(0),
DeadLetterThrottleCount: 0,
DeadLetterRequestLogging: true,
DeveloperSupervisionLogging: false,
DiagnosticsSerializer: func(actor Actor) string {
return ""
},
}
}

func NewConfig() Config {
return defaultActorSystemConfig()
}

func (asc Config) WithDeadLetterThrottleInterval(duration time.Duration) Config {
asc.DeadLetterThrottleInterval = duration
return asc
}

func (asc Config) WithDeadLetterThrottleCount(count int) Config {
asc.DeadLetterThrottleCount = count
return asc
}

func (asc Config) WithDeadLetterRequestLogging(enabled bool) Config {
asc.DeadLetterRequestLogging = enabled
return asc
}

func (asc Config) WithDeveloperSupervisionLogging(enabled bool) Config {
asc.DeveloperSupervisionLogging = enabled
return asc
}

func (asc Config) WithDiagnosticsSerializer(serializer func(Actor) string) Config {
asc.DiagnosticsSerializer = serializer
return asc
}
6 changes: 6 additions & 0 deletions actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ func NewDeadLetter(actorSystem *ActorSystem) *deadLetterProcess {
actorSystem.ProcessRegistry.Add(dp, "deadletter")
_ = actorSystem.EventStream.Subscribe(func(msg interface{}) {
if deadLetter, ok := msg.(*DeadLetterEvent); ok {

//bail out if sender is set and deadletter request logging is false
if !actorSystem.Config.DeadLetterRequestLogging && deadLetter.Sender != nil {
return
}

plog.Debug("[DeadLetter]", log.Stringer("pid", deadLetter.PID), log.TypeOf("msg", deadLetter.Message), log.Stringer("sender", deadLetter.Sender))
// send back a response instead of timeout.
if deadLetter.Sender != nil {
Expand Down
32 changes: 32 additions & 0 deletions actor/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package actor

import "github.com/AsynkronIT/protoactor-go/extensions"

var extensionId = extensions.NextExtensionId()

type Metrics struct {
enabled bool
}

func (m *Metrics) Enabled() bool {
return m.enabled
}
func (m *Metrics) Id() extensions.ExtensionId {
return extensionId
}

func NewMetrics() *Metrics {
return &Metrics{}
}

//func (m *Metrics) NewGauge() {
//
//}
//
//func (m *Metrics) NewCounter() {
//
//}
//
//func (m *Metrics) NewHistogram() {
//
//}
22 changes: 14 additions & 8 deletions actor/protos.proto
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
syntax = "proto3";
package actor;

// import "google/protobuf/any.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";

option (gogoproto.gostring_all) = false;
Expand All @@ -13,15 +12,15 @@ message PID {
string id = 2;
}

// user messages
message PoisonPill {}
//user messages
message PoisonPill {
}

// instead of timeout when you request a unreachable PID.
message DeadLetterResponse {
PID Target = 1;
PID target = 1;
}

// system messages
//system messages
message Watch {
PID watcher = 1;
}
Expand All @@ -32,7 +31,14 @@ message Unwatch {

message Terminated {
PID who = 1;
bool address_terminated = 2;
TerminatedReason why = 2;
}

enum TerminatedReason {
Stopped = 0;
AddressTerminated = 1;
NotFound = 2;
}

message Stop {}
message Stop {
}

0 comments on commit 6414e1b

Please sign in to comment.