From 06986c11658dd13ce68b7d08e69e46bafcde4bcd Mon Sep 17 00:00:00 2001 From: Taras Halturin Date: Wed, 1 Feb 2023 13:50:40 +0100 Subject: [PATCH 1/8] fixes #143 --- node/core.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core.go b/node/core.go index adbcbc08..6d4ddfdf 100644 --- a/node/core.go +++ b/node/core.go @@ -433,7 +433,7 @@ func (c *core) deleteProcess(pid etf.Pid) { c.mutexNames.Unlock() c.mutexAliases.Lock() - for alias := range c.aliases { + for alias := range p.aliases { delete(c.aliases, alias) } c.mutexAliases.Unlock() From 98866857d6b83de185f2ad98f0c50259483db4ac Mon Sep 17 00:00:00 2001 From: Taras Halturin Date: Fri, 10 Feb 2023 00:14:41 +0100 Subject: [PATCH 2/8] update version --- version.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/version.go b/version.go index 03145a08..c2cc7875 100644 --- a/version.go +++ b/version.go @@ -1,7 +1,7 @@ package ergo const ( - Version = "2.2.0" // Ergo Framework version + Version = "2.2.2" // Ergo Framework version VersionPrefix = "ergo" // Prefix using for the full version name - VersionOTP int = 24 // Erlang version support + VersionOTP int = 25 // Erlang version support ) From 27f491c3d87c4171406b0a85982c3f0086206d47 Mon Sep 17 00:00:00 2001 From: Taras Halturin Date: Sat, 18 Feb 2023 10:57:27 +0100 Subject: [PATCH 3/8] gen.Pool implementation --- README.md | 1 + gen/pool.go | 129 +++++++++++++++++++++++++++++++++++++++++++++ gen/pool_worker.go | 84 +++++++++++++++++++++++++++++ node/core.go | 2 +- 4 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 gen/pool.go create mode 100644 gen/pool_worker.go diff --git a/README.md b/README.md index 7cceaa89..94028168 100644 --- a/README.md +++ b/README.md @@ -224,6 +224,7 @@ See [https://github.com/ergo-services/examples](https://github.com/ergo-services * [gen.Application](https://github.com/ergo-services/examples/tree/master/application) * [gen.Supervisor](https://github.com/ergo-services/examples/tree/master/supervisor) * [gen.Server](https://github.com/ergo-services/examples/tree/master/genserver) +* [gen.Pool](https://github.com/ergo-services/examples/tree/master/genpool) * [gen.Stage](https://github.com/ergo-services/examples/tree/master/genstage) * [gen.Saga](https://github.com/ergo-services/examples/tree/master/gensaga) * [gen.Raft](https://github.com/ergo-services/examples/tree/master/genraft) diff --git a/gen/pool.go b/gen/pool.go new file mode 100644 index 00000000..57ef3513 --- /dev/null +++ b/gen/pool.go @@ -0,0 +1,129 @@ +package gen + +import ( + "fmt" + + "github.com/ergo-services/ergo/etf" + "github.com/ergo-services/ergo/lib" +) + +type PoolBehavior interface { + InitPool(process *PoolProcess, args ...etf.Term) (PoolOptions, error) +} + +type PoolProcess struct { + ServerProcess + options PoolOptions + workers []etf.Pid + monitors map[etf.Ref]int + i int +} + +type Pool struct { + Server +} + +type PoolOptions struct { + NumWorkers int + Worker PoolWorkerBehavior + WorkerOptions ProcessOptions + WorkerArgs []etf.Term +} + +func (p *Pool) Init(process *ServerProcess, args ...etf.Term) error { + behavior, ok := process.Behavior().(PoolBehavior) + if !ok { + return fmt.Errorf("Pool: not a PoolBehavior") + } + + pool := &PoolProcess{ + ServerProcess: *process, + monitors: make(map[etf.Ref]int), + } + + // do not inherit parent State + pool.State = nil + poolOptions, err := behavior.InitPool(pool, args...) + if err != nil { + return err + } + + poolOptions.WorkerOptions.Context = process.Context() + pool.options = poolOptions + process.State = pool + + for i := 0; i < poolOptions.NumWorkers; i++ { + w, err := process.Spawn("", poolOptions.WorkerOptions, poolOptions.Worker, + poolOptions.WorkerArgs...) + if err != nil { + return err + } + + pool.workers = append(pool.workers, w.Self()) + ref := process.MonitorProcess(w.Self()) + pool.monitors[ref] = i + } + + return nil +} + +func (p *Pool) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) { + pool := process.State.(*PoolProcess) + msg := workerCallMessage{ + from: from, + message: message, + } + if err := p.send(pool, msg); err != nil { + lib.Warning("Pool (HandleCall): all workers are busy. Message dropped") + } + return nil, ServerStatusIgnore +} +func (p *Pool) HandleCast(process *ServerProcess, message etf.Term) ServerStatus { + pool := process.State.(*PoolProcess) + msg := workerCastMessage{ + message: message, + } + if err := p.send(pool, msg); err != nil { + lib.Warning("Pool (HandleCast): all workers are busy. Message dropped") + } + return ServerStatusOK +} +func (p *Pool) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus { + pool := process.State.(*PoolProcess) + switch m := message.(type) { + case MessageDown: + // worker terminated. restart it + + i, exist := pool.monitors[m.Ref] + if exist == false { + break + } + delete(pool.monitors, m.Ref) + w, err := process.Spawn("", pool.options.WorkerOptions, pool.options.Worker, + pool.options.WorkerArgs...) + if err != nil { + panicMessage := fmt.Sprintf("Pool: can't restart worker - %s", err) + panic(panicMessage) + } + pool.workers[i] = w.Self() + return ServerStatusOK + } + + if err := p.send(pool, message); err != nil { + lib.Warning("Pool (HandleInfo): all workers are busy. Message dropped") + } + + return ServerStatusOK +} + +func (p *Pool) send(pool *PoolProcess, message etf.Term) error { + for retry := 0; retry < pool.options.NumWorkers; retry++ { + pool.i++ + worker := pool.workers[pool.i%pool.options.NumWorkers] + if err := pool.Send(worker, message); err == nil { + return ServerStatusOK + } + } + + return fmt.Errorf("error") +} diff --git a/gen/pool_worker.go b/gen/pool_worker.go new file mode 100644 index 00000000..c481d422 --- /dev/null +++ b/gen/pool_worker.go @@ -0,0 +1,84 @@ +package gen + +import ( + "fmt" + + "github.com/ergo-services/ergo/etf" + "github.com/ergo-services/ergo/lib" +) + +type workerCastMessage struct { + message etf.Term +} +type workerCallMessage struct { + from ServerFrom + message etf.Term +} + +type PoolWorkerBehavior interface { + ServerBehavior + InitPoolWorker(process *PoolWorkerProcess, args ...etf.Term) error + HandleWorkerInfo(process *PoolWorkerProcess, message etf.Term) + HandleWorkerCast(process *PoolWorkerProcess, message etf.Term) + HandleWorkerCall(process *PoolWorkerProcess, message etf.Term) etf.Term +} + +type PoolWorkerProcess struct { + ServerProcess +} + +type PoolWorker struct { + Server +} + +func (pw *PoolWorker) Init(process *ServerProcess, args ...etf.Term) error { + behavior, ok := process.Behavior().(PoolWorkerBehavior) + + if !ok { + return fmt.Errorf("Pool: not a PoolWorkerBehavior") + } + + worker := &PoolWorkerProcess{ + ServerProcess: *process, + } + + worker.State = nil + + if err := behavior.InitPoolWorker(worker, args...); err != nil { + return err + } + + process.State = worker + return nil +} + +func (pw *PoolWorker) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus { + worker := process.State.(*PoolWorkerProcess) + behavior := worker.Behavior().(PoolWorkerBehavior) + switch m := message.(type) { + case workerCallMessage: + result := behavior.HandleWorkerCall(worker, m.message) + process.SendReply(m.from, result) + case workerCastMessage: + behavior.HandleWorkerCast(worker, m.message) + default: + behavior.HandleWorkerInfo(worker, message) + } + return ServerStatusOK +} + +// HandleWorkerInfo +func (pw *PoolWorker) HandleWorkerInfo(process *PoolWorkerProcess, message etf.Term) { + lib.Warning("HandleWorkerInfo: unhandled message %#v", message) +} + +// HandleWorkerCast +func (pw *PoolWorker) HandleWorkerCast(process *PoolWorkerProcess, message etf.Term) { + lib.Warning("HandleWorkerCast: unhandled message %#v", message) +} + +// HandleWorkerCall +func (pw *PoolWorker) HandleWorkerCall(process *PoolWorkerProcess, message etf.Term) etf.Term { + lib.Warning("HandleWorkerCall: unhandled message %#v", message) + return nil +} diff --git a/node/core.go b/node/core.go index 6d4ddfdf..3d9a515c 100644 --- a/node/core.go +++ b/node/core.go @@ -433,7 +433,7 @@ func (c *core) deleteProcess(pid etf.Pid) { c.mutexNames.Unlock() c.mutexAliases.Lock() - for alias := range p.aliases { + for _, alias := range p.aliases { delete(c.aliases, alias) } c.mutexAliases.Unlock() From eb33ae94e5a5c6bdc225bf98410df8cc2608d529 Mon Sep 17 00:00:00 2001 From: Taras Halturin Date: Sat, 18 Feb 2023 11:10:55 +0100 Subject: [PATCH 4/8] update gen/README.md --- gen/README.md | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/gen/README.md b/gen/README.md index d2967f9f..dc564ac8 100644 --- a/gen/README.md +++ b/gen/README.md @@ -4,41 +4,65 @@ ### Server Generic server behavior. +Example: [gen.Server](https://github.com/ergo-services/examples/tree/master/genserver) + ### Supervisor Generic supervisor behavior. A supervisor is responsible for starting, stopping, and monitoring its child processes. The basic idea of a supervisor is that it is to keep its child processes alive by restarting them when necessary. +Example: [gen.Supervisor](https://github.com/ergo-services/examples/tree/master/supervisor) + ### Application Generic application behavior. +Example: [gen.Application](https://github.com/ergo-services/examples/tree/master/application) + +### Pool + Generic pool of workers. + + This behavior implements a basic design pattern with a pool of workers. All messages/requests received by the pool process are forwarded to the workers using the "Round Robin" algorithm. The worker process is automatically restarting on termination. + +Example: [gen.Pool](https://github.com/ergo-services/examples/tree/master/genpool) + ### Web Web API Gateway behavior. - The Web API Gateway pattern is also sometimes known as the "Backend For Frontend" (BFF) because you build it while thinking about the needs of the client app. Therefore, BFF sits between the client apps and the microservices. It acts as a reverse proxy, routing requests from clients to services. Here is example [examples/genweb](/examples/genweb). + The Web API Gateway pattern is also sometimes known as the "Backend For Frontend" (BFF) because you build it while thinking about the needs of the client app. Therefore, BFF sits between the client apps and the microservices. It acts as a reverse proxy, routing requests from clients to services. + +Example: [gen.Web](https://github.com/ergo-services/examples/tree/master/genweb) ### TCP Socket acceptor pool for TCP protocols. This behavior aims to provide everything you need to accept TCP connections and process packets with a small code base and low latency while being easy to use. +Example: [gen.TCP](https://github.com/ergo-services/examples/tree/master/gentcp) + ### UDP UDP acceptor pool for UDP protocols This behavior provides the same feature set as TCP but for handling UDP packets using pool of handlers. +Example: [gen.UDP](https://github.com/ergo-services/examples/tree/master/genudp) + ### Stage Generic stage behavior (originated from Elixir's [GenStage](https://hexdocs.pm/gen_stage/GenStage.html)). -This is abstraction built on top of `gen.Server` to provide a simple way to create a distributed Producer/Consumer architecture, while automatically managing the concept of backpressure. This implementation is fully compatible with Elixir's GenStage. Example is here [examples/genstage](/examples/genstage) or just run `go run ./examples/genstage` to see it in action +This is abstraction built on top of `gen.Server` to provide a simple way to create a distributed Producer/Consumer architecture, while automatically managing the concept of backpressure. This implementation is fully compatible with Elixir's GenStage. + +Example: [gen.Stage](https://github.com/ergo-services/examples/tree/master/genstage) ### Saga Generic saga behavior. -It implements Saga design pattern - a sequence of transactions that updates each service state and publishes the result (or cancels the transaction or triggers the next transaction step). `gen.Saga` also provides a feature of interim results (can be used as transaction progress or as a part of pipeline processing), time deadline (to limit transaction lifespan), two-phase commit (to make distributed transaction atomic). Here is example [examples/gensaga](/examples/gensaga). +It implements Saga design pattern - a sequence of transactions that updates each service state and publishes the result (or cancels the transaction or triggers the next transaction step). `gen.Saga` also provides a feature of interim results (can be used as transaction progress or as a part of pipeline processing), time deadline (to limit transaction lifespan), two-phase commit (to make distributed transaction atomic). + +Example: [gen.Saga](https://github.com/ergo-services/examples/tree/master/gensaga) ### Raft Generic raft behavior. -It's improved implementation of [Raft consensus algorithm](https://raft.github.io). The key improvement is using quorum under the hood to manage the leader election process and make the Raft cluster more reliable. This implementation supports quorums of 3, 5, 7, 9, or 11 quorum members. Here is an example of this feature [examples/raft](/examples/genraft). +It's improved implementation of [Raft consensus algorithm](https://raft.github.io). The key improvement is using quorum under the hood to manage the leader election process and make the Raft cluster more reliable. This implementation supports quorums of 3, 5, 7, 9, or 11 quorum members. +Example: [gen.Raft](https://github.com/ergo-services/examples/tree/master/genraft) From 17f67161521e192d356ea90a170fd975eb12642f Mon Sep 17 00:00:00 2001 From: Taras Halturin Date: Fri, 24 Feb 2023 15:24:43 +0100 Subject: [PATCH 5/8] fixes #130 --- gen/application.go | 16 ++++++++-------- node/node.go | 18 +++++++++++++----- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/gen/application.go b/gen/application.go index f9acf48d..95762c71 100644 --- a/gen/application.go +++ b/gen/application.go @@ -18,21 +18,21 @@ const ( // ApplicationStartPermanent If a permanent application terminates, // all other applications and the runtime system (node) are also terminated. - ApplicationStartPermanent = "permanent" + ApplicationStartPermanent ApplicationStartType = "permanent" // ApplicationStartTemporary If a temporary application terminates, // this is reported but no other applications are terminated. - ApplicationStartTemporary = "temporary" + ApplicationStartTemporary ApplicationStartType = "temporary" // ApplicationStartTransient If a transient application terminates // with reason normal, this is reported but no other applications are // terminated. If a transient application terminates abnormally, that // is with any other reason than normal, all other applications and // the runtime system (node) are also terminated. - ApplicationStartTransient = "transient" + ApplicationStartTransient ApplicationStartType = "transient" - // EnvKeySpec - EnvKeySpec EnvKey = "ergo:AppSpec" + // EnvKeyAppSpec + EnvKeyAppSpec EnvKey = "ergo:AppSpec" ) // ApplicationBehavior interface @@ -78,13 +78,13 @@ type ApplicationInfo struct { // ProcessInit func (a *Application) ProcessInit(p Process, args ...etf.Term) (ProcessState, error) { - spec := p.Env(EnvKeySpec).(*ApplicationSpec) - spec, ok := p.Env(EnvKeySpec).(*ApplicationSpec) + spec := p.Env(EnvKeyAppSpec).(*ApplicationSpec) + spec, ok := p.Env(EnvKeyAppSpec).(*ApplicationSpec) if !ok { return ProcessState{}, fmt.Errorf("ProcessInit: not an ApplicationBehavior") } // remove variable from the env - p.SetEnv(EnvKeySpec, nil) + p.SetEnv(EnvKeyAppSpec, nil) p.SetTrapExit(true) diff --git a/node/node.go b/node/node.go index b93092d2..4ac5e0e8 100644 --- a/node/node.go +++ b/node/node.go @@ -287,14 +287,20 @@ func (n *node) ApplicationStartTransient(appName string, args ...etf.Term) (gen. return n.applicationStart(gen.ApplicationStartTransient, appName, args...) } -// ApplicationStart start Application with start type ApplicationStartTemporary +// ApplicationStartTemporary start Application with start type ApplicationStartTemporary // If an application terminates, this is reported but no other applications // are terminated -func (n *node) ApplicationStart(appName string, args ...etf.Term) (gen.Process, error) { +func (n *node) ApplicationStartTemporary(appName string, args ...etf.Term) (gen.Process, error) { return n.applicationStart(gen.ApplicationStartTemporary, appName, args...) } -func (n *node) applicationStart(startType, appName string, args ...etf.Term) (gen.Process, error) { +// ApplicationStart start Application with start type defined in the gen.ApplicationSpec.StartType +// on the loading application +func (n *node) ApplicationStart(appName string, args ...etf.Term) (gen.Process, error) { + return n.applicationStart("", appName, args...) +} + +func (n *node) applicationStart(startType gen.ApplicationStartType, appName string, args ...etf.Term) (gen.Process, error) { rb, err := n.RegisteredBehavior(appBehaviorGroup, appName) if err != nil { return nil, lib.ErrAppUnknown @@ -305,7 +311,9 @@ func (n *node) applicationStart(startType, appName string, args ...etf.Term) (ge return nil, lib.ErrAppUnknown } - spec.StartType = startType + if startType != "" { + spec.StartType = startType + } // to prevent race condition on starting application we should // make sure that nobodyelse starting it @@ -324,7 +332,7 @@ func (n *node) applicationStart(startType, appName string, args ...etf.Term) (ge } env := map[gen.EnvKey]interface{}{ - gen.EnvKeySpec: spec, + gen.EnvKeyAppSpec: spec, } options := gen.ProcessOptions{ Env: env, From 013a2d6fb9a15ce40b813533cbcf8b9fe21b3c46 Mon Sep 17 00:00:00 2001 From: Taras Halturin Date: Wed, 1 Mar 2023 10:24:24 +0100 Subject: [PATCH 6/8] Update README.md --- README.md | 67 +++++++++++++++++++++---------------------------------- 1 file changed, 25 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 94028168..ce0b4915 100644 --- a/README.md +++ b/README.md @@ -19,36 +19,30 @@ The goal of this project is to leverage Erlang/OTP experience with Golang perfor ### Cloud ### -You can join your services made with Ergo Framework into a single cluster with transparent networking using our **Cloud Overlay Network** where they can connect to each other smoothly, no matter where they run - AWS, Azure or GCP, or anywhere else. All these connections are secured with end-to-end encryption. Read more in this article [https://blog.ergo.services/cloud-overlay-network-3a133d47efe5](https://blog.ergo.services/cloud-overlay-network-3a133d47efe5). +Distributed Cloud is coming. With Ergo Framework you can join your services into a single cluster with transparent networking using our **Cloud Overlay Network** where they can connect to each other smoothly, no matter where they run - AWS, Azure or GCP, or anywhere else. All these connections are secured with end-to-end encryption. Read more in this article [https://blog.ergo.services/cloud-overlay-network-3a133d47efe5](https://blog.ergo.services/cloud-overlay-network-3a133d47efe5). ### Features ### ![image](https://user-images.githubusercontent.com/118860/113710255-c57d5500-96e3-11eb-9970-20f49008a990.png) -* Support Erlang 25 (including [Alias](https://blog.erlang.org/My-OTP-24-Highlights/#eep-53-process-aliases) and [Remote Spawn](https://blog.erlang.org/OTP-23-Highlights/#distributed-spawn-and-the-new-erpc-module) features) +* Support Erlang 25 - allows you connect your node to (and accept connection from) any Erlang/Elixir node within a cluster * Spawn Erlang-like processes * Register/unregister processes with simple atom -* `gen.Server` behavior support (with atomic state) -* `gen.Supervisor` behavior support with all known [restart strategies](https://erlang.org/doc/design_principles/sup_princ.html#restart-strategy) support - * One For One - * One For All - * Rest For One - * Simple One For One -* `gen.Application` behavior support with all known [starting types](https://erlang.org/doc/design_principles/applications.html#application-start-types) support - * Permanent - * Temporary - * Transient -* `gen.Stage` behavior support (originated from Elixir's [GenStage](https://hexdocs.pm/gen_stage/GenStage.html)). This is abstraction built on top of `gen.Server` to provide a simple way to create a distributed Producer/Consumer architecture, while automatically managing the concept of backpressure. This implementation is fully compatible with Elixir's GenStage. Example is here [examples/genstage](https://github.com/ergo-services/examples/tree/master/genstage) or just run `go run ./examples/genstage` to see it in action -* `gen.Saga` behavior support. It implements Saga design pattern - a sequence of transactions that updates each service state and publishes the result (or cancels the transaction or triggers the next transaction step). `gen.Saga` also provides a feature of interim results (can be used as transaction progress or as a part of pipeline processing), time deadline (to limit transaction lifespan), two-phase commit (to make distributed transaction atomic). Here is example [examples/gensaga](https://github.com/ergo-services/examples/tree/master/gensaga). -* `gen.Raft` behavior support. It's improved implementation of [Raft consensus algorithm](https://raft.github.io). The key improvement is using quorum under the hood to manage the leader election process and make the Raft cluster more reliable. This implementation supports quorums of 3, 5, 7, 9, or 11 quorum members. Here is an example of this feature [examples/genraft](https://github.com/ergo-services/examples/tree/master/genraft). -* Connect to (accept connection from) any Erlang/Elixir node within a cluster -* Making sync request `ServerProcess.Call`, async - `ServerProcess.Cast` or `Process.Send` in fashion of `gen_server:call`, `gen_server:cast`, `erlang:send` accordingly -* Monitor processes/nodes, local/remote -* Link processes local/remote -* RPC callbacks support -* [embedded EPMD](#epmd) (in order to get rid of erlang' dependencies) -* Unmarshalling terms into the struct using `etf.TermIntoStruct`, `etf.TermProplistIntoStruct` or to the string using `etf.TermToString` -* Custom marshaling/unmarshaling via `Marshal` and `Unmarshal` interfaces +* Set of ready-to-use disign patterns (behaviors) + * `gen.Server` behavior with atomic state and Erlang's gen_server support to make sync request `ServerProcess.Call`, async - `ServerProcess.Cast` or `Process.Send` in fashion of `gen_server:call`, `gen_server:cast`, `erlang:send` accordingly + * `gen.Supervisor` behavior with all known [restart strategies](https://erlang.org/doc/design_principles/sup_princ.html#restart-strategy) (One For One, One For All, Rest For One, Simple One For One) + * `gen.Application` behavior with all known [starting types](https://erlang.org/doc/design_principles/applications.html#application-start-types) (Permanent, Temporary, Transient) + * `gen.Pool` a basic design pattern with a pool of workers. All messages/requests received by the pool process are forwarded to the workers using the "Round Robin" algorithm. The worker process is automatically restarting on termination + * `gen.TCP` - socket acceptor pool for TCP protocols. This behavior aims to provide everything you need to accept TCP connections and process packets with a small code base and low latency while being easy to use. + * `gen.UDP` - acceptor pool for UDP protocols. This behavior provides the same feature set as TCP but for handling UDP packets using pool of handlers + * `gen.Web` - Web API Gateway behavior. This behavior allows you to listen HTTP port and handle HTTP-request using pool of workers. + * `gen.Stage` behavior support (originated from Elixir's [GenStage](https://hexdocs.pm/gen_stage/GenStage.html)). This is abstraction built on top of `gen.Server` to provide a simple way to create a distributed Producer/Consumer architecture, while automatically managing the concept of backpressure. This implementation is fully compatible with Elixir's GenStage. Example is here [examples/genstage](https://github.com/ergo-services/examples/tree/master/genstage) or just run `go run ./examples/genstage` to see it in action + * `gen.Saga` behavior support. It implements Saga design pattern - a sequence of transactions that updates each service state and publishes the result (or cancels the transaction or triggers the next transaction step). `gen.Saga` also provides a feature of interim results (can be used as transaction progress or as a part of pipeline processing), time deadline (to limit transaction lifespan), two-phase commit (to make distributed transaction atomic). Here is example [examples/gensaga](https://github.com/ergo-services/examples/tree/master/gensaga). + * `gen.Raft` behavior support. It's improved implementation of [Raft consensus algorithm](https://raft.github.io). The key improvement is using quorum under the hood to manage the leader election process and make the Raft cluster more reliable. This implementation supports quorums of 3, 5, 7, 9, or 11 quorum members. Here is an example of this feature [examples/genraft](https://github.com/ergo-services/examples/tree/master/genraft) +* Monitor processes/nodes, local/remote with Erlang support +* Link processes local/remote with Erlang support +* [embedded EPMD](#epmd) (in order to get rid of erlang' dependencies) with Erlang support +* Unmarshalling terms into the struct using `etf.TermIntoStruct`, `etf.TermProplistIntoStruct` or to the string using `etf.TermToString` including custom marshaling/unmarshaling via `Marshal` and `Unmarshal` interfaces. But it's highly recommended to use `etf.RegisterType` so you will be receiving messages in a native Golang-type * Encryption (TLS 1.3) support (including autogenerating self-signed certificates) * Compression support (with customization of compression level and threshold). It can be configured for the node or a particular process. * Proxy support with end-to-end encryption, includeing compression/fragmentation/linking/monitoring features. @@ -67,6 +61,13 @@ Golang introduced [v2 rule](https://go.dev/blog/v2-go-modules) a while ago to so Here are the changes of latest release. For more details see the [ChangeLog](ChangeLog.md) +#### [v2.2.2](https://github.com/ergo-services/ergo/releases/tag/v1.999.222) 2023-03-01 [tag version v1.999.222] #### + +* Introduced `gen.Pool`. This behavior implements a basic design pattern with a pool of workers. All messages/requests received by the pool process are forwarded to the workers using the "Round Robin" algorithm. The worker process is automatically restarting on termination. See example here [examples/genpool](https://github.com/ergo-services/examples/tree/master/genpool) +* Removed Erlang RPC support. A while ago Erlang has changed the way of handling this kind of request making this feature more similar to the regular `gen.Server`. So, there is no reason to keep supporting it. Use a regular way of messaging instead - `gen.Server`. +* Fixed issue #130 (`StartType` option in `gen.ApplicationSpec` is ignored for the autostarting applications) +* Fixed issue #143 (incorrect cleaning up the aliases belonging to the terminated process) + #### [v2.2.1](https://github.com/ergo-services/ergo/releases/tag/v1.999.221) 2023-02-01 [tag version v1.999.221] #### * Now you can join your services made with Ergo Framework into a single cluster with transparent networking using our **Cloud Overlay Network** where they can connect to each other smoothly, no matter where they run - AWS, Azure or GCP, or anywhere else. All these connections are secured with end-to-end encryption. Read more in this article [https://blog.ergo.services/cloud-overlay-network-3a133d47efe5](https://blog.ergo.services/cloud-overlay-network-3a133d47efe5). Here is an example of this feature in action [examples/cloud](https://github.com/ergo-services/examples/tree/master/cloud) @@ -81,24 +82,6 @@ Here are the changes of latest release. For more details see the [ChangeLog](Cha * Fixed #117 (incorrect hanshake process finalization) * Fixed #139 (panic of the gen.Stage partition dispatcher) -#### [v2.2.0](https://github.com/ergo-services/ergo/releases/tag/v1.999.220) 2022-10-18 [tag version v1.999.220] #### - -* Introduced `gen.Web` behavior. It implements **Web API Gateway pattern** is also sometimes known as the "Backend For Frontend" (BFF). See example [examples/genweb](https://github.com/ergo-services/examples/tree/master/genweb) -* Introduced `gen.TCP` behavior - **socket acceptor pool for TCP protocols**. It provides everything you need to accept TCP connections and process packets with a small code base and low latency. Here is simple example [examples/gentcp](https://github.com/ergo-services/examples/tree/master/gentcp) -* Introduced `gen.UDP` - the same as `gen.TCP`, but for UDP protocols. Example is here [examples/genudp](https://github.com/ergo-services/examples/tree/master/genudp) -* Introduced **Events**. This is a simple pub/sub feature within a node - any `gen.Process` can become a producer by registering a new event `gen.Event` using method `gen.Process.RegisterEvent`, while the others can subscribe to these events using `gen.Process.MonitorEvent`. Subscriber process will also receive `gen.MessageEventDown` if a producer process went down (terminated). This feature behaves in a monitor manner but only works within a node. You may also want to subscribe to a system event - `node.EventNetwork` to receive event notification on connect/disconnect any peers. Here is simple example of this feature [examples/events](https://github.com/ergo-services/examples/tree/master/events) -* Introduced **Cloud Client** - allows connecting to the cloud platform [https://ergo.sevices](https://ergo.services). You may want to register your email there, and we will inform you about the platform launch day -* Introduced **type registration** for the ETF encoding/decoding. This feature allows you to get rid of manually decoding with `etf.TermIntoStruct` for the receiving messages. Register your type using `etf.RegisterType(...)`, and you will be receiving messages in a native type -* Predefined set of errors has moved to the `lib` package -* Updated `gen.ServerBehavior.HandleDirect` method (got extra argument `etf.Ref` to distinguish the requests). This change allows you to handle these requests asynchronously using method `gen.ServerProcess.Reply(...)` -* Updated `node.Options`. Now it has field `Listeners` (type `node.Listener`). It allows you to start any number of listeners with custom options - `Port`, `TLS` settings, or custom `Handshake`/`Proto` interfaces -* Fixed build on 32-bit arch -* Fixed freezing on ARM arch #102 -* Fixed problem with encoding negative int8 -* Fixed #103 (there was an issue on interop with Elixir's GenStage) -* Fixed node stuck on start if it uses the name which is already taken in EPMD -* Fixed incorrect `gen.ProcessOptions.Context` handling - ### Benchmarks ### Here is simple EndToEnd test demonstrates performance of messaging subsystem @@ -308,4 +291,4 @@ is your company using Ergo? add your company logo/name here ### Commercial support -please, visit https://ergo.services for more information +please, contact ceo@ergo.services for more information From 157209a51e96f0fdf98021dad5178542c6ef1fde Mon Sep 17 00:00:00 2001 From: Taras Halturin Date: Wed, 1 Mar 2023 10:29:32 +0100 Subject: [PATCH 7/8] Update ChangeLog.md --- ChangeLog.md | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/ChangeLog.md b/ChangeLog.md index dda309c9..8020192a 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. This format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +#### [v2.2.2](https://github.com/ergo-services/ergo/releases/tag/v1.999.222) 2023-03-01 [tag version v1.999.222] #### + +* Introduced `gen.Pool`. This behavior implements a basic design pattern with a pool of workers. All messages/requests received by the pool process are forwarded to the workers using the "Round Robin" algorithm. The worker process is automatically restarting on termination. See example here [examples/genpool](https://github.com/ergo-services/examples/tree/master/genpool) +* Removed Erlang RPC support. A while ago Erlang has changed the way of handling this kind of request making this feature more similar to the regular `gen.Server`. So, there is no reason to keep supporting it. Use a regular way of messaging instead - `gen.Server`. +* Fixed issue #130 (`StartType` option in `gen.ApplicationSpec` is ignored for the autostarting applications) +* Fixed issue #143 (incorrect cleaning up the aliases belonging to the terminated process) + #### [v2.2.1](https://github.com/ergo-services/ergo/releases/tag/v1.999.221) 2023-01-18 [tag version v1.999.221] #### * Now you can join your services made with Ergo Framework into a single cluster with transparent networking using our **Cloud Overlay Network** where they can connect to each other smoothly, no matter where they run - AWS, Azure or GCP, or anywhere else. All these connections are secured with end-to-end encryption. Read more in this article [https://https://medium.com/@ergo-services/cloud-overlay-network](https://https://medium.com/@ergo-services/cloud-overlay-network). Here is an example of this feature in action [examples/cloud](https://github.com/ergo-services/examples/tree/master/cloud) @@ -106,7 +113,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Fixed Atom Cache race conditioned issue #54 * Fixed ETF encoder issues #64 #66 -#### [1.2.0](https://github.com/ergo-services/ergo/releases/tag/v1.2.0) - 2021-04-07 #### +#### [v1.2.0](https://github.com/ergo-services/ergo/releases/tag/v1.2.0) - 2021-04-07 [tag version v1.2.0] #### * Added TLS support. Introduced new option `TLSmode` in `ergo.NodeOptions` with the following values: - `ergo.TLSmodeDisabled` default value. encryption is disabled @@ -126,14 +133,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Fixed issue with termination linked processes * Fixed platform-dependent issues. Now Ergo Framework has tested and confirmed support of Linux, MacOS, Windows. -#### [1.1.0](https://github.com/ergo-services/ergo/releases/tag/1.1.0) - 2020-04-23 #### +#### [v1.1.0](https://github.com/ergo-services/ergo/releases/tag/v1.1.0) - 2020-04-23 [tag version v1.1.0] #### * Fragmentation support (which was introduced in Erlang/OTP 22) * Completely rewritten network subsystem (DIST/ETF). * Improved performance in terms of network messaging (outperforms original Erlang/OTP up to x5 times. See [Benchmarks](#benchmarks)) -#### [1.0.0](https://github.com/ergo-services/ergo/releases/tag/1.0.0) - 2020-03-03 #### -## There is a bunch of changes we deliver with this release +#### [v1.0.0](https://github.com/ergo-services/ergo/releases/tag/1.0.0) - 2020-03-03 [tag version 1.0.0] #### * We have changed the name - Ergo (or Ergo Framework). GitHub's repo has been renamed as well. We also created cloned repo `ergonode` to support users of @@ -149,7 +155,7 @@ the new one. * Improved ETF TermIntoStruct decoder * Improved code structure and readability -#### [0.2.0](https://github.com/ergo-services/ergo/releases/tag/0.2.0) - 2019-02-23 #### +#### [v0.2.0](https://github.com/ergo-services/ergo/releases/tag/0.2.0) - 2019-02-23 [tag version 0.2.0] #### * Now we make versioning releases * Improve node creation. Now you can specify the listening port range. See 'Usage' for details * Add embedded EPMD. Trying to start internal epmd service on starting ergonode. From 41f796140eee4259e9ce48aeb96dffb90a1cb820 Mon Sep 17 00:00:00 2001 From: Taras Halturin Date: Wed, 1 Mar 2023 10:37:47 +0100 Subject: [PATCH 8/8] remove Erlang RPC/Observer support --- apps/erlang/net_kernel.go | 140 -------------------- apps/erlang/observer_backend.go | 181 -------------------------- apps/erlang/rex.go | 223 -------------------------------- gen/server.go | 33 ----- gen/types.go | 11 -- node/node.go | 43 ------ node/types.go | 2 - tests/rpc_test.go | 138 -------------------- 8 files changed, 771 deletions(-) delete mode 100644 apps/erlang/observer_backend.go delete mode 100644 apps/erlang/rex.go delete mode 100644 tests/rpc_test.go diff --git a/apps/erlang/net_kernel.go b/apps/erlang/net_kernel.go index 9fbc9990..18e4e6aa 100644 --- a/apps/erlang/net_kernel.go +++ b/apps/erlang/net_kernel.go @@ -4,13 +4,10 @@ package erlang import ( "context" - "runtime" - "time" "github.com/ergo-services/ergo/etf" "github.com/ergo-services/ergo/gen" "github.com/ergo-services/ergo/lib" - "github.com/ergo-services/ergo/lib/osdep" ) func CreateApp() gen.ApplicationBehavior { @@ -56,14 +53,6 @@ func (nks *netKernelSup) Init(args ...etf.Term) (gen.SupervisorSpec, error) { Name: "global_name_server", Child: &globalNameServer{}, }, - { - Name: "rex", - Child: &rex{}, - }, - { - Name: "observer_backend", - Child: &observerBackend{}, - }, { Name: "erlang", Child: &erlang{}, @@ -93,140 +82,11 @@ func (nk *netKernel) Init(process *gen.ServerProcess, args ...etf.Term) error { // HandleCall func (nk *netKernel) HandleCall(process *gen.ServerProcess, from gen.ServerFrom, message etf.Term) (reply etf.Term, status gen.ServerStatus) { lib.Log("NET_KERNEL: HandleCall: %#v, From: %#v", message, from) - status = gen.ServerStatusOK - - switch t := (message).(type) { - case etf.Tuple: - if len(t) == 2 { - switch tag := t[0].(type) { - case etf.Atom: - if string(tag) == "is_auth" { - lib.Log("NET_KERNEL: is_auth: %#v", t[1]) - reply = etf.Atom("yes") - } - } - } - if len(t) == 5 { - switch t.Element(3) { - case etf.Atom("procs_info"): - // etf.Tuple{"spawn_link", "observer_backend", "procs_info", etf.List{etf.Pid{}}, etf.Pid{}} - sendTo := t.Element(4).(etf.List).Element(1).(etf.Pid) - go sendProcInfo(process, sendTo) - reply = process.Self() - case etf.Atom("fetch_stats"): - // etf.Tuple{"spawn_link", "observer_backend", "fetch_stats", etf.List{etf.Pid{}, 500}, etf.Pid{}} - sendTo := t.Element(4).(etf.List).Element(1).(etf.Pid) - period := t.Element(4).(etf.List).Element(2).(int) - if _, ok := nk.routinesCtx[sendTo]; ok { - reply = etf.Atom("error") - return - } - - process.MonitorProcess(sendTo) - ctx, cancel := context.WithCancel(process.Context()) - nk.routinesCtx[sendTo] = cancel - go sendStats(ctx, process, sendTo, period, cancel) - reply = process.Self() - } - } - - } return } // HandleInfo func (nk *netKernel) HandleInfo(process *gen.ServerProcess, message etf.Term) gen.ServerStatus { lib.Log("NET_KERNEL: HandleInfo: %#v", message) - switch m := message.(type) { - case gen.MessageDown: - if cancel, ok := nk.routinesCtx[m.Pid]; ok { - cancel() - delete(nk.routinesCtx, m.Pid) - } - } return gen.ServerStatusOK } - -func sendProcInfo(p gen.Process, to etf.Pid) { - list := p.ProcessList() - procsInfoList := etf.List{} - for i := range list { - info := list[i].Info() - // {procs_info, self(), etop_collect(Pids, [])} - procsInfoList = append(procsInfoList, - etf.Tuple{ - etf.Atom("etop_proc_info"), // record name #etop_proc_info - list[i].Self(), // pid - 0, // mem - 0, // reds - etf.Atom(list[i].Name()), // etf.Tuple{etf.Atom("ergo"), etf.Atom(list[i].Name()), 0}, // name - 0, // runtime - info.CurrentFunction, // etf.Tuple{etf.Atom("ergo"), etf.Atom(info.CurrentFunction), 0}, // cf - info.MessageQueueLen, // mq - }, - ) - - } - - procsInfo := etf.Tuple{ - etf.Atom("procs_info"), - p.Self(), - procsInfoList, - } - p.Send(to, procsInfo) - // erlang's observer waits for the EXIT message since this function was executed via spawn - p.Send(to, etf.Tuple{etf.Atom("EXIT"), p.Self(), etf.Atom("normal")}) -} - -func sendStats(ctx context.Context, p gen.Process, to etf.Pid, period int, cancel context.CancelFunc) { - var utime, utimetotal, stime, stimetotal int64 - defer cancel() - for { - - select { - case <-time.After(time.Duration(period) * time.Millisecond): - - runtime.ReadMemStats(&m) - - total := etf.Tuple{etf.Atom("total"), m.TotalAlloc} - system := etf.Tuple{etf.Atom("system"), m.HeapSys} - processes := etf.Tuple{etf.Atom("processes"), m.Alloc} - processesUsed := etf.Tuple{etf.Atom("processes_used"), m.HeapInuse} - atom := etf.Tuple{etf.Atom("atom"), 0} - atomUsed := etf.Tuple{etf.Atom("atom_used"), 0} - binary := etf.Tuple{etf.Atom("binary"), 0} - code := etf.Tuple{etf.Atom("code"), 0} - ets := etf.Tuple{etf.Atom("ets"), 0} - - utime, stime = osdep.ResourceUsage() - utimetotal += utime - stimetotal += stime - stats := etf.Tuple{ - etf.Atom("stats"), - 1, - etf.List{ - etf.Tuple{1, utime, utimetotal}, - etf.Tuple{2, stime, stimetotal}, - }, - etf.Tuple{ - etf.Tuple{etf.Atom("input"), 0}, - etf.Tuple{etf.Atom("output"), 0}, - }, - etf.List{ - total, - system, - processes, - processesUsed, - atom, - atomUsed, - binary, - code, - ets, - }, - } - p.Send(to, stats) - case <-ctx.Done(): - return - } - } -} diff --git a/apps/erlang/observer_backend.go b/apps/erlang/observer_backend.go deleted file mode 100644 index bb72644a..00000000 --- a/apps/erlang/observer_backend.go +++ /dev/null @@ -1,181 +0,0 @@ -package erlang - -// https://github.com/erlang/otp/blob/master/lib/observer/src/observer_procinfo.erl - -import ( - "runtime" - "unsafe" - - "github.com/ergo-services/ergo/etf" - "github.com/ergo-services/ergo/gen" - "github.com/ergo-services/ergo/lib" - "github.com/ergo-services/ergo/node" -) - -var m runtime.MemStats - -type observerBackend struct { - gen.Server -} - -// Init initializes process state using arbitrary arguments -func (o *observerBackend) Init(process *gen.ServerProcess, args ...etf.Term) error { - lib.Log("OBSERVER: Init: %#v", args) - - funProcLibInitialCall := func(a ...etf.Term) etf.Term { - return etf.Tuple{etf.Atom("proc_lib"), etf.Atom("init_p"), 5} - } - node := process.Env(node.EnvKeyNode).(node.Node) - node.ProvideRPC("proc_lib", "translate_initial_call", funProcLibInitialCall) - - funAppmonInfo := func(a ...etf.Term) etf.Term { - from := a[0] // pid - am, e := process.Spawn("", gen.ProcessOptions{}, &appMon{}, from) - if e != nil { - return etf.Tuple{etf.Atom("error")} - } - return etf.Tuple{etf.Atom("ok"), am.Self()} - } - node.ProvideRPC("appmon_info", "start_link2", funAppmonInfo) - - return nil -} - -// HandleCall -func (o *observerBackend) HandleCall(state *gen.ServerProcess, from gen.ServerFrom, message etf.Term) (etf.Term, gen.ServerStatus) { - lib.Log("OBSERVER: HandleCall: %v, From: %#v", message, from) - function := message.(etf.Tuple).Element(1).(etf.Atom) - // args := message.(etf.Tuple).Element(2).(etf.List) - switch function { - case etf.Atom("sys_info"): - //etf.Tuple{"call", "observer_backend", "sys_info", - // etf.List{}, etf.Pid{Node:"erl-examplenode@127.0.0.1", Id:0x46, Serial:0x0, Creation:0x2}} - reply := etf.Term(o.sysInfo(state.Process)) - return reply, gen.ServerStatusOK - case etf.Atom("get_table_list"): - // TODO: add here implementation if we decide to support ETS tables - // args should be like: - // etf.List{"ets", etf.List{etf.Tuple{"sys_hidden", "true"}, etf.Tuple{"unread_hidden", "true"}}} - reply := etf.Term(etf.List{}) - return reply, gen.ServerStatusOK - case etf.Atom("get_port_list"): - reply := etf.Term(etf.List{}) - return reply, gen.ServerStatusOK - } - - return "ok", gen.ServerStatusOK -} - -func (o *observerBackend) sysInfo(p gen.Process) etf.List { - // observer_backend:sys_info() - node := p.Env(node.EnvKeyNode).(node.Node) - processCount := etf.Tuple{etf.Atom("process_count"), len(p.ProcessList())} - processLimit := etf.Tuple{etf.Atom("process_limit"), 262144} - atomCount := etf.Tuple{etf.Atom("atom_count"), 0} - atomLimit := etf.Tuple{etf.Atom("atom_limit"), 1} - etsCount := etf.Tuple{etf.Atom("ets_count"), 0} - etsLimit := etf.Tuple{etf.Atom("ets_limit"), 1} - portCount := etf.Tuple{etf.Atom("port_count"), 0} - portLimit := etf.Tuple{etf.Atom("port_limit"), 1} - ut := node.Uptime() - uptime := etf.Tuple{etf.Atom("uptime"), ut * 1000} - runQueue := etf.Tuple{etf.Atom("run_queue"), 0} - ioInput := etf.Tuple{etf.Atom("io_input"), 0} - ioOutput := etf.Tuple{etf.Atom("io_output"), 0} - logicalProcessors := etf.Tuple{etf.Atom("logical_processors"), runtime.NumCPU()} - logicalProcessorsOnline := etf.Tuple{etf.Atom("logical_processors_online"), runtime.NumCPU()} - logicalProcessorsAvailable := etf.Tuple{etf.Atom("logical_processors_available"), runtime.NumCPU()} - schedulers := etf.Tuple{etf.Atom("schedulers"), 1} - schedulersOnline := etf.Tuple{etf.Atom("schedulers_online"), 1} - schedulersAvailable := etf.Tuple{etf.Atom("schedulers_available"), 1} - v := node.Version() - otpRelease := etf.Tuple{etf.Atom("otp_release"), v.OTP} - version := etf.Tuple{etf.Atom("version"), etf.Atom(v.Release)} - systemArchitecture := etf.Tuple{etf.Atom("system_architecture"), etf.Atom(runtime.GOARCH)} - kernelPoll := etf.Tuple{etf.Atom("kernel_poll"), true} - smpSupport := etf.Tuple{etf.Atom("smp_support"), true} - threads := etf.Tuple{etf.Atom("threads"), true} - threadsPoolSize := etf.Tuple{etf.Atom("threads_pool_size"), 1} - i := int(1) - wordsizeInternal := etf.Tuple{etf.Atom("wordsize_internal"), int(unsafe.Sizeof(i))} - wordsizeExternal := etf.Tuple{etf.Atom("wordsize_external"), int(unsafe.Sizeof(i))} - tmp := etf.Tuple{etf.Atom("instance"), 0, - etf.List{ - etf.Tuple{etf.Atom("mbcs"), etf.List{ - etf.Tuple{etf.Atom("blocks_size"), 1, 1, 1}, - etf.Tuple{etf.Atom("carriers_size"), 1, 1, 1}, - }}, - etf.Tuple{etf.Atom("sbcs"), etf.List{ - etf.Tuple{etf.Atom("blocks_size"), 0, 0, 0}, - etf.Tuple{etf.Atom("carriers_size"), 0, 0, 0}, - }}, - }} - - allocInfo := etf.Tuple{etf.Atom("alloc_info"), etf.List{ - etf.Tuple{etf.Atom("temp_alloc"), etf.List{tmp}}, - etf.Tuple{etf.Atom("sl_alloc"), etf.List{tmp}}, - etf.Tuple{etf.Atom("std_alloc"), etf.List{tmp}}, - etf.Tuple{etf.Atom("ll_alloc"), etf.List{tmp}}, - etf.Tuple{etf.Atom("eheap_alloc"), etf.List{tmp}}, - etf.Tuple{etf.Atom("ets_alloc"), etf.List{tmp}}, - etf.Tuple{etf.Atom("fix_alloc"), etf.List{tmp}}, - etf.Tuple{etf.Atom("literal_alloc"), etf.List{tmp}}, - etf.Tuple{etf.Atom("binary_alloc"), etf.List{tmp}}, - etf.Tuple{etf.Atom("driver_alloc"), etf.List{tmp}}, - }} - - // Meminfo = erlang:memory(). - runtime.ReadMemStats(&m) - - total := etf.Tuple{etf.Atom("total"), m.HeapAlloc} - system := etf.Tuple{etf.Atom("system"), m.HeapAlloc} - processes := etf.Tuple{etf.Atom("processes"), m.HeapAlloc} - processesUsed := etf.Tuple{etf.Atom("processes_used"), m.HeapAlloc} - atom := etf.Tuple{etf.Atom("atom"), 0} - atomUsed := etf.Tuple{etf.Atom("atom_used"), 0} - binary := etf.Tuple{etf.Atom("binary"), 0} - code := etf.Tuple{etf.Atom("code"), 0} - ets := etf.Tuple{etf.Atom("ets"), 0} - - info := etf.List{ - processCount, - processLimit, - atomCount, - atomLimit, - etsCount, - etsLimit, - portCount, - portLimit, - uptime, - runQueue, - ioInput, - ioOutput, - logicalProcessors, - logicalProcessorsOnline, - logicalProcessorsAvailable, - schedulers, - schedulersOnline, - schedulersAvailable, - otpRelease, - version, - systemArchitecture, - kernelPoll, - smpSupport, - threads, - threadsPoolSize, - wordsizeInternal, - wordsizeExternal, - allocInfo, - // Meminfo - total, - system, - processes, - processesUsed, - atom, - atomUsed, - binary, - code, - ets, - } - return info -} diff --git a/apps/erlang/rex.go b/apps/erlang/rex.go deleted file mode 100644 index 155ba371..00000000 --- a/apps/erlang/rex.go +++ /dev/null @@ -1,223 +0,0 @@ -package erlang - -// https://github.com/erlang/otp/blob/master/lib/kernel/src/rpc.erl - -import ( - "fmt" - - "github.com/ergo-services/ergo/etf" - "github.com/ergo-services/ergo/gen" - "github.com/ergo-services/ergo/lib" - "github.com/ergo-services/ergo/node" -) - -type modFun struct { - module string - function string -} - -var ( - allowedModFun = []string{ - "observer_backend", - } -) - -type rex struct { - gen.Server - // Keep methods in the object. Won't be lost if the process restarted for some reason - methods map[modFun]gen.RPC -} - -// Init -func (r *rex) Init(process *gen.ServerProcess, args ...etf.Term) error { - lib.Log("REX: Init: %#v", args) - // Do not overwrite existing methods if this process restarted - if r.methods == nil { - r.methods = make(map[modFun]gen.RPC, 0) - } - - for i := range allowedModFun { - mf := modFun{ - allowedModFun[i], - "*", - } - r.methods[mf] = nil - } - node := process.Env(node.EnvKeyNode).(node.Node) - node.ProvideRemoteSpawn("erpc", &erpc{}) - return nil -} - -// HandleCall -func (r *rex) HandleCall(process *gen.ServerProcess, from gen.ServerFrom, message etf.Term) (etf.Term, gen.ServerStatus) { - lib.Log("REX: HandleCall: %#v, From: %#v", message, from) - switch m := message.(type) { - case etf.Tuple: - //etf.Tuple{"call", "observer_backend", "sys_info", - // etf.List{}, etf.Pid{Node:"erl-examplenode@127.0.0.1", Id:0x46, Serial:0x0, Creation:0x2}} - switch m.Element(1) { - case etf.Atom("call"): - module := m.Element(2).(etf.Atom) - function := m.Element(3).(etf.Atom) - args := m.Element(4).(etf.List) - reply := r.handleRPC(process, module, function, args) - if reply != nil { - return reply, gen.ServerStatusOK - } - - to := gen.ProcessID{Name: string(module), Node: process.NodeName()} - m := etf.Tuple{m.Element(3), m.Element(4)} - reply, err := process.Call(to, m) - if err != nil { - reply = etf.Term(etf.Tuple{etf.Atom("error"), err}) - } - return reply, gen.ServerStatusOK - - } - } - - reply := etf.Term(etf.Tuple{etf.Atom("badrpc"), etf.Atom("unknown")}) - return reply, gen.ServerStatusOK -} - -// HandleInfo -func (r *rex) HandleInfo(process *gen.ServerProcess, message etf.Term) gen.ServerStatus { - // add this handler to suppres any messages from erlang - return gen.ServerStatusOK -} - -// HandleDirect -func (r *rex) HandleDirect(process *gen.ServerProcess, ref etf.Ref, message interface{}) (interface{}, gen.DirectStatus) { - switch m := message.(type) { - case gen.MessageManageRPC: - mf := modFun{ - module: m.Module, - function: m.Function, - } - // provide RPC - if m.Provide { - if _, ok := r.methods[mf]; ok { - return nil, lib.ErrTaken - } - r.methods[mf] = m.Fun - return nil, gen.DirectStatusOK - } - - // revoke RPC - if _, ok := r.methods[mf]; ok { - delete(r.methods, mf) - return nil, gen.DirectStatusOK - } - return nil, fmt.Errorf("unknown RPC name") - - default: - return nil, lib.ErrUnsupportedRequest - } -} - -func (r *rex) handleRPC(process *gen.ServerProcess, module, function etf.Atom, args etf.List) (reply interface{}) { - defer func() { - if x := recover(); x != nil { - err := fmt.Sprintf("panic reason: %s", x) - // recovered - reply = etf.Tuple{ - etf.Atom("badrpc"), - etf.Tuple{ - etf.Atom("EXIT"), - etf.Tuple{ - etf.Atom("panic"), - etf.List{ - etf.Tuple{module, function, args, etf.List{err}}, - }, - }, - }, - } - } - }() - mf := modFun{ - module: string(module), - function: string(function), - } - // calling dynamically declared rpc method - if function, ok := r.methods[mf]; ok { - return function(args...) - } - - // unknown request. return error - badRPC := etf.Tuple{ - etf.Atom("badrpc"), - etf.Tuple{ - etf.Atom("EXIT"), - etf.Tuple{ - etf.Atom("undef"), - etf.List{ - etf.Tuple{ - module, - function, - args, - etf.List{}, - }, - }, - }, - }, - } - // calling a local module if its been registered as a process) - if process.ProcessByName(mf.module) == nil { - return badRPC - } - - if value, err := process.Call(mf.module, args); err != nil { - return badRPC - } else { - return value - } -} - -type erpcMFA struct { - id etf.Ref - m etf.Atom - f etf.Atom - a etf.List -} -type erpc struct { - gen.Server -} - -// Init -func (e *erpc) Init(process *gen.ServerProcess, args ...etf.Term) error { - lib.Log("ERPC [%v]: Init: %#v", process.Self(), args) - mfa := erpcMFA{ - id: args[0].(etf.Ref), - m: args[1].(etf.Atom), - f: args[2].(etf.Atom), - a: args[3].(etf.List), - } - process.Cast(process.Self(), mfa) - return nil - -} - -// HandleCast -func (e *erpc) HandleCast(process *gen.ServerProcess, message etf.Term) gen.ServerStatus { - lib.Log("ERPC [%v]: HandleCast: %#v", process.Self(), message) - mfa := message.(erpcMFA) - rsr := process.Env("ergo:RemoteSpawnRequest").(gen.RemoteSpawnRequest) - - call := etf.Tuple{etf.Atom("call"), mfa.m, mfa.f, mfa.a} - value, _ := process.Call("rex", call) - - reply := etf.Tuple{ - etf.Atom("DOWN"), - rsr.Ref, - etf.Atom("process"), - process.Self(), - etf.Tuple{ - mfa.id, - etf.Atom("return"), - value, - }, - } - process.Send(rsr.From, reply) - - return gen.ServerStatusStop -} diff --git a/gen/server.go b/gen/server.go index a59211dd..6b8fca35 100644 --- a/gen/server.go +++ b/gen/server.go @@ -137,39 +137,6 @@ func (sp *ServerProcess) CallWithTimeout(to interface{}, message etf.Term, timeo } -// CallRPC evaluate rpc call with given node/MFA -func (sp *ServerProcess) CallRPC(node, module, function string, args ...etf.Term) (etf.Term, error) { - return sp.CallRPCWithTimeout(DefaultCallTimeout, node, module, function, args...) -} - -// CallRPCWithTimeout evaluate rpc call with given node/MFA and timeout -func (sp *ServerProcess) CallRPCWithTimeout(timeout int, node, module, function string, args ...etf.Term) (etf.Term, error) { - lib.Log("[%s] RPC calling: %s:%s:%s", sp.NodeName(), node, module, function) - - message := etf.Tuple{ - etf.Atom("call"), - etf.Atom(module), - etf.Atom(function), - etf.List(args), - sp.Self(), - } - to := ProcessID{"rex", node} - return sp.CallWithTimeout(to, message, timeout) -} - -// CastRPC evaluate rpc cast with given node/MFA -func (sp *ServerProcess) CastRPC(node, module, function string, args ...etf.Term) error { - lib.Log("[%s] RPC casting: %s:%s:%s", sp.NodeName(), node, module, function) - message := etf.Tuple{ - etf.Atom("cast"), - etf.Atom(module), - etf.Atom(function), - etf.List(args), - } - to := ProcessID{"rex", node} - return sp.Cast(to, message) -} - // SendReply sends a reply message to the sender made ServerProcess.Call request. // Useful for the case with dispatcher and pool of workers: Dispatcher process // forwards Call requests (asynchronously) within a HandleCall callback to the worker(s) diff --git a/gen/types.go b/gen/types.go index 473ebb63..b0b5adfe 100644 --- a/gen/types.go +++ b/gen/types.go @@ -403,17 +403,6 @@ type MessageFallback struct { Message etf.Term } -// RPC defines rpc function type -type RPC func(...etf.Term) etf.Term - -// MessageManageRPC is using to manage RPC feature provides by "rex" process -type MessageManageRPC struct { - Provide bool - Module string - Function string - Fun RPC -} - // MessageDirectChildren type intended to be used in Process.Children which returns []etf.Pid // You can handle this type of message in your HandleDirect callback to enable Process.Children // support for your gen.Server actor. diff --git a/node/node.go b/node/node.go index 4ac5e0e8..8ff7fda8 100644 --- a/node/node.go +++ b/node/node.go @@ -393,49 +393,6 @@ func (n *node) MonitoredBy(process etf.Pid) []etf.Pid { return n.processMonitoredBy(process) } -// ProvideRPC register given module/function as RPC method -func (n *node) ProvideRPC(module string, function string, fun gen.RPC) error { - lib.Log("[%s] RPC provide: %s:%s %#v", n.name, module, function, fun) - rex := n.ProcessByName("rex") - if rex == nil { - return fmt.Errorf("RPC is disabled") - } - - message := gen.MessageManageRPC{ - Provide: true, - Module: module, - Function: function, - Fun: fun, - } - if _, err := rex.Direct(message); err != nil { - return err - } - - return nil -} - -// RevokeRPC unregister given module/function -func (n *node) RevokeRPC(module, function string) error { - lib.Log("[%s] RPC revoke: %s:%s", n.name, module, function) - - rex := n.ProcessByName("rex") - if rex == nil { - return fmt.Errorf("RPC is disabled") - } - - message := gen.MessageManageRPC{ - Provide: false, - Module: module, - Function: function, - } - - if _, err := rex.Direct(message); err != nil { - return err - } - - return nil -} - // ProvideRemoteSpawn func (n *node) ProvideRemoteSpawn(name string, behavior gen.ProcessBehavior) error { return n.RegisterBehavior(remoteBehaviorGroup, name, behavior, nil) diff --git a/node/types.go b/node/types.go index 7809906a..0a1be51a 100644 --- a/node/types.go +++ b/node/types.go @@ -73,8 +73,6 @@ type Node interface { ApplicationStartTransient(appName string, args ...etf.Term) (gen.Process, error) ApplicationStop(appName string) error - ProvideRPC(module string, function string, fun gen.RPC) error - RevokeRPC(module, function string) error ProvideRemoteSpawn(name string, object gen.ProcessBehavior) error RevokeRemoteSpawn(name string) error diff --git a/tests/rpc_test.go b/tests/rpc_test.go deleted file mode 100644 index 12aa709b..00000000 --- a/tests/rpc_test.go +++ /dev/null @@ -1,138 +0,0 @@ -package tests - -import ( - "fmt" - "testing" - "time" - - "github.com/ergo-services/ergo" - "github.com/ergo-services/ergo/etf" - "github.com/ergo-services/ergo/gen" - "github.com/ergo-services/ergo/node" -) - -type testRPCGenServer struct { - gen.Server - res chan interface{} -} - -type testRPCCase1 struct { - node string - mod string - fun string - args []etf.Term -} - -func (trpc *testRPCGenServer) HandleCall(process *gen.ServerProcess, from gen.ServerFrom, message etf.Term) (etf.Term, gen.ServerStatus) { - return message, gen.ServerStatusOK -} - -func (trpc *testRPCGenServer) HandleInfo(process *gen.ServerProcess, message etf.Term) gen.ServerStatus { - switch m := message.(type) { - case testRPCCase1: - if v, e := process.CallRPC(m.node, m.mod, m.fun, m.args...); e != nil { - trpc.res <- e - } else { - trpc.res <- v - } - return gen.ServerStatusOK - - } - - return gen.ServerStatusStop -} -func TestRPC(t *testing.T) { - fmt.Printf("\n=== Test RPC\n") - - node1, _ := ergo.StartNode("nodeRPC@localhost", "cookies", node.Options{}) - gs1 := &testRPCGenServer{ - res: make(chan interface{}, 2), - } - - testFun1 := func(a ...etf.Term) etf.Term { - // return the last argument as a result - return a[len(a)-1] - } - - fmt.Printf("Registering RPC method 'testMod.testFun' on %s: ", node1.Name()) - time.Sleep(100 * time.Millisecond) // waiting for start 'rex' gen_server - if e := node1.ProvideRPC("testMod", "testFun", testFun1); e != nil { - t.Fatal(e) - } else { - fmt.Println("OK") - } - - node1gs1, _ := node1.Spawn("gs1", gen.ProcessOptions{}, gs1, nil) - - fmt.Printf("Call RPC method 'testMod.testFun' with 1 arg on %s: ", node1.Name()) - case1 := testRPCCase1{ - node: "nodeRPC@localhost", - mod: "testMod", - fun: "testFun", - args: []etf.Term{12345}, - } - if err := node1gs1.Send(node1gs1.Self(), case1); err != nil { - t.Fatal(err) - } - waitForResultWithValue(t, gs1.res, 12345) - - fmt.Printf("Call RPC method 'testMod.testFun' with 3 arg on %s: ", node1.Name()) - case1 = testRPCCase1{ - node: "nodeRPC@localhost", - mod: "testMod", - fun: "testFun", - args: []etf.Term{12345, 5.678, node1gs1.Self()}, - } - if err := node1gs1.Send(node1gs1.Self(), case1); err != nil { - t.Fatal(err) - } - waitForResultWithValue(t, gs1.res, node1gs1.Self()) - - fmt.Printf("Revoking RPC method 'testMod.testFun' on %s: ", node1.Name()) - if e := node1.RevokeRPC("testMod", "testFun"); e != nil { - t.Fatal(e) - } else { - fmt.Println("OK") - } - - fmt.Printf("Call revoked RPC method 'testMod.testFun' with 1 arg on %s: ", node1.Name()) - expected1 := etf.Tuple{etf.Atom("badrpc"), - etf.Tuple{etf.Atom("EXIT"), - etf.Tuple{etf.Atom("undef"), - etf.List{ - etf.Tuple{ - etf.Atom("testMod"), - etf.Atom("testFun"), - etf.List{12345}, etf.List{}}}}}} - case1 = testRPCCase1{ - node: "nodeRPC@localhost", - mod: "testMod", - fun: "testFun", - args: []etf.Term{12345}, - } - if err := node1gs1.Send(node1gs1.Self(), case1); err != nil { - t.Fatal(err) - } - waitForResultWithValue(t, gs1.res, expected1) - - fmt.Printf("Call RPC unknown method 'xxx.xxx' on %s: ", node1.Name()) - expected2 := etf.Tuple{etf.Atom("badrpc"), - etf.Tuple{etf.Atom("EXIT"), - etf.Tuple{etf.Atom("undef"), - etf.List{ - etf.Tuple{ - etf.Atom("xxx"), - etf.Atom("xxx"), - etf.List{12345}, etf.List{}}}}}} - case1 = testRPCCase1{ - node: "nodeRPC@localhost", - mod: "xxx", - fun: "xxx", - args: []etf.Term{12345}, - } - if err := node1gs1.Send(node1gs1.Self(), case1); err != nil { - t.Fatal(err) - } - waitForResultWithValue(t, gs1.res, expected2) - -}