Skip to content

Commit

Permalink
feat(plc4go/connection-pool): Added a PlcConnectionPool for go
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdutz committed Jan 19, 2022
1 parent 9a061f6 commit c9c0374
Show file tree
Hide file tree
Showing 15 changed files with 1,806 additions and 308 deletions.
1 change: 1 addition & 0 deletions plc4go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.20.0
github.com/snksoft/crc v1.1.0
github.com/stretchr/testify v1.5.1
github.com/subchen/go-xmldom v1.1.2
github.com/tebeka/go2xunit v1.4.10 // indirect
github.com/viney-shih/go-lock v1.1.1
Expand Down
101 changes: 91 additions & 10 deletions plc4go/internal/plc4go/simulated/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/internal/plc4go/spi/default"
internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
"github.com/apache/plc4x/plc4go/pkg/plc4go"
"github.com/apache/plc4x/plc4go/pkg/plc4go/model"
"github.com/pkg/errors"
Expand All @@ -36,6 +37,8 @@ type Connection struct {
valueHandler spi.PlcValueHandler
options map[string][]string
connected bool
connectionId string
tracer *spi.Tracer
}

func NewConnection(device *Device, fieldHandler spi.PlcFieldHandler, valueHandler spi.PlcValueHandler, options map[string][]string) *Connection {
Expand All @@ -45,17 +48,43 @@ func NewConnection(device *Device, fieldHandler spi.PlcFieldHandler, valueHandle
valueHandler: valueHandler,
options: options,
connected: false,
connectionId: utils.GenerateId(4),
}
if traceEnabledOption, ok := options["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
connection.tracer = spi.NewTracer(connection.connectionId)
}
}
return connection
}

func (c *Connection) GetConnectionId() string {
return c.connectionId
}

func (c *Connection) IsTraceEnabled() bool {
return c.tracer != nil
}

func (c *Connection) GetTracer() *spi.Tracer {
return c.tracer
}

func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
// Check if the connection was already connected
if c.connected {
if c.tracer != nil {
c.tracer.AddTrace("connect", "error: already connected")
}
// Return an error to the user.
ch <- _default.NewDefaultPlcConnectionConnectResult(c, errors.New("already connected"))
return
}
var txId string
if c.tracer != nil {
txId = c.tracer.AddTransactionalStartTrace("connect", "started")
}
if delayString, ok := c.options["connectionDelay"]; ok {
// This is the length of the array, not the string
Expand All @@ -66,10 +95,24 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
}
}
}
// Mark the connection as "connected"
c.connected = true
// Return the connection in a connected state to the user.
ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
// If we want the connection to fail, do so, otherwise return the connection.
if errorString, ok := c.options["connectionError"]; ok {
// If the ping operation should fail with an error, do so.
if len(errorString) == 1 {
ch <- _default.NewDefaultPlcConnectionConnectResult(c, errors.New(errorString[0]))
}
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "connect", "error: "+errorString[0])
}
} else {
// Mark the connection as "connected"
c.connected = true
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "connect", "success")
}
// Return the connection in a connected state to the user.
ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
}
}()
return ch
}
Expand All @@ -81,11 +124,20 @@ func (c *Connection) BlockingClose() {
func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
ch := make(chan plc4go.PlcConnectionCloseResult)
go func() {
// Check if the connection is connected
// Check if the connection is connected.
if !c.connected {
if c.tracer != nil {
c.tracer.AddTrace("close", "error: not connected")
}
// Return an error to the user.
ch <- _default.NewDefaultPlcConnectionCloseResult(c, errors.New("not connected"))
return
}
var txId string
if c.tracer != nil {
txId = c.tracer.AddTransactionalStartTrace("close", "started")
}
// If a delay was configured, wait for the pre-configured time.
if delayString, ok := c.options["closingDelay"]; ok {
// This is the length of the array, not the string
if len(delayString) == 1 {
Expand All @@ -95,8 +147,11 @@ func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
}
}
}
// Mark the connection as "disconnected"
// Mark the connection as "disconnected".
c.connected = false
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "close", "success")
}
// Return a new connection to the user.
ch <- _default.NewDefaultPlcConnectionCloseResult(c, nil)
}()
Expand All @@ -110,6 +165,19 @@ func (c *Connection) IsConnected() bool {
func (c *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
ch := make(chan plc4go.PlcConnectionPingResult)
go func() {
// Check if the connection is connected
if !c.connected {
if c.tracer != nil {
c.tracer.AddTrace("ping", "error: not connected")
}
// Return an error to the user.
ch <- _default.NewDefaultPlcConnectionPingResult(errors.New("not connected"))
return
}
var txId string
if c.tracer != nil {
txId = c.tracer.AddTransactionalStartTrace("ping", "started")
}
if delayString, ok := c.options["pingDelay"]; ok {
// This is the length of the array, not the string
if len(delayString) == 1 {
Expand All @@ -119,8 +187,21 @@ func (c *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
}
}
}
// Return a new connection to the user.
ch <- _default.NewDefaultPlcConnectionPingResult(nil)
if errorString, ok := c.options["pingError"]; ok {
// If the ping operation should fail with an error, do so.
if len(errorString) == 1 {
ch <- _default.NewDefaultPlcConnectionPingResult(errors.New(errorString[0]))
}
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "ping", "error: "+errorString[0])
}
} else {
// Otherwise, give a positive response.
if c.tracer != nil {
c.tracer.AddTransactionalTrace(txId, "ping", "success")
}
ch <- _default.NewDefaultPlcConnectionPingResult(nil)
}
}()
return ch
}
Expand All @@ -142,11 +223,11 @@ func (c *Connection) GetMetadata() model.PlcConnectionMetadata {
}

func (c *Connection) ReadRequestBuilder() model.PlcReadRequestBuilder {
return internalModel.NewDefaultPlcReadRequestBuilder(c.fieldHandler, NewReader(c.device, c.options))
return internalModel.NewDefaultPlcReadRequestBuilder(c.fieldHandler, NewReader(c.device, c.options, c.tracer))
}

func (c *Connection) WriteRequestBuilder() model.PlcWriteRequestBuilder {
return internalModel.NewDefaultPlcWriteRequestBuilder(c.fieldHandler, c.valueHandler, NewWriter(c.device, c.options))
return internalModel.NewDefaultPlcWriteRequestBuilder(c.fieldHandler, c.valueHandler, NewWriter(c.device, c.options, c.tracer))
}

func (c *Connection) SubscriptionRequestBuilder() model.PlcSubscriptionRequestBuilder {
Expand Down
27 changes: 2 additions & 25 deletions plc4go/internal/plc4go/simulated/Connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ func TestConnection_ReadRequestBuilder(t *testing.T) {
options: map[string][]string{},
connected: true,
},
want: internalModel.NewDefaultPlcReadRequestBuilder(NewFieldHandler(), NewReader(NewDevice("hurz"), map[string][]string{})),
want: internalModel.NewDefaultPlcReadRequestBuilder(NewFieldHandler(), NewReader(NewDevice("hurz"), map[string][]string{}, nil)),
},
}
for _, tt := range tests {
Expand Down Expand Up @@ -715,7 +715,7 @@ func TestConnection_WriteRequestBuilder(t *testing.T) {
options: map[string][]string{},
connected: true,
},
want: internalModel.NewDefaultPlcWriteRequestBuilder(NewFieldHandler(), NewValueHandler(), NewWriter(NewDevice("hurz"), map[string][]string{})),
want: internalModel.NewDefaultPlcWriteRequestBuilder(NewFieldHandler(), NewValueHandler(), NewWriter(NewDevice("hurz"), map[string][]string{}, nil)),
},
}
for _, tt := range tests {
Expand All @@ -733,26 +733,3 @@ func TestConnection_WriteRequestBuilder(t *testing.T) {
})
}
}

func TestNewConnection(t *testing.T) {
type args struct {
device *Device
fieldHandler spi.PlcFieldHandler
valueHandler spi.PlcValueHandler
options map[string][]string
}
tests := []struct {
name string
args args
want *Connection
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewConnection(tt.args.device, tt.args.fieldHandler, tt.args.valueHandler, tt.args.options); !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewConnection() = %v, want %v", got, tt.want)
}
})
}
}
12 changes: 11 additions & 1 deletion plc4go/internal/plc4go/simulated/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package simulated

import (
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
model2 "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/pkg/plc4go/model"
"github.com/apache/plc4x/plc4go/pkg/plc4go/values"
Expand All @@ -30,18 +31,24 @@ import (
type Reader struct {
device *Device
options map[string][]string
tracer *spi.Tracer
}

func NewReader(device *Device, options map[string][]string) Reader {
func NewReader(device *Device, options map[string][]string, tracer *spi.Tracer) Reader {
return Reader{
device: device,
options: options,
tracer: tracer,
}
}

func (r Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
ch := make(chan model.PlcReadRequestResult)
go func() {
var txId string
if r.tracer != nil {
txId = r.tracer.AddTransactionalStartTrace("read", "started")
}
// Possibly add a delay.
if delayString, ok := r.options["readDelay"]; ok {
if len(delayString) == 1 {
Expand Down Expand Up @@ -73,6 +80,9 @@ func (r Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadReque
}
}

if r.tracer != nil {
r.tracer.AddTransactionalTrace(txId, "read", "success")
}
// Emit the response
ch <- &model2.DefaultPlcReadRequestResult{
Request: readRequest,
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/plc4go/simulated/Reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestReader_Read(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := NewReader(tt.fields.device, tt.fields.options)
r := NewReader(tt.fields.device, tt.fields.options, nil)
readRequest := model3.NewDefaultPlcReadRequest(tt.args.fields, tt.args.fieldNames, r, nil)
timeBeforeReadRequest := time.Now()
readResponseChannel := r.Read(readRequest)
Expand Down
12 changes: 11 additions & 1 deletion plc4go/internal/plc4go/simulated/Writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package simulated

import (
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
model2 "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/pkg/plc4go/model"
"strconv"
Expand All @@ -29,18 +30,24 @@ import (
type Writer struct {
device *Device
options map[string][]string
tracer *spi.Tracer
}

func NewWriter(device *Device, options map[string][]string) Writer {
func NewWriter(device *Device, options map[string][]string, tracer *spi.Tracer) Writer {
return Writer{
device: device,
options: options,
tracer: tracer,
}
}

func (w Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
ch := make(chan model.PlcWriteRequestResult)
go func() {
var txId string
if w.tracer != nil {
txId = w.tracer.AddTransactionalStartTrace("write", "started")
}
// Possibly add a delay.
if delayString, ok := w.options["writeDelay"]; ok {
if len(delayString) == 1 {
Expand All @@ -65,6 +72,9 @@ func (w Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR
}
}

if w.tracer != nil {
w.tracer.AddTransactionalTrace(txId, "write", "success")
}
// Emit the response
ch <- &model2.DefaultPlcWriteRequestResult{
Request: writeRequest,
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/plc4go/simulated/Writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestWriter_Write(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := NewWriter(tt.fields.device, tt.fields.options)
w := NewWriter(tt.fields.device, tt.fields.options, nil)
writeRequest := model3.NewDefaultPlcWriteRequest(tt.args.fields, tt.args.fieldNames, tt.args.values, w, nil)
timeBeforeWriteRequest := time.Now()
writeResponseChannel := w.Write(writeRequest)
Expand Down
32 changes: 32 additions & 0 deletions plc4go/internal/plc4go/spi/PlcConnection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package spi

import (
"github.com/apache/plc4x/plc4go/pkg/plc4go"
)

type PlcConnection interface {
plc4go.PlcConnection

GetConnectionId() string
IsTraceEnabled() bool
GetTracer() *Tracer
}
Loading

0 comments on commit c9c0374

Please sign in to comment.