Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

531 implementation network layer #536

Open
wants to merge 11 commits into
base: Release4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2022 Kai Huebl (kai@huebl-sgh.de)

Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser
Datei nur in Übereinstimmung mit der Lizenz erlaubt.
Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0.

Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart,
erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE
GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend.

Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen
im Rahmen der Lizenz finden Sie in der Lizenz.

Autor: Kai Huebl (kai@huebl-sgh.de), Aleksey Timin (atimin@gmail.com)
*/

#include "OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h"

namespace OpcUaStackPubSub
{

DataSetReaderBase::DataSetReaderBase(void)
{
}

DataSetReaderBase::~DataSetReaderBase(void)
{
}

void
DataSetReaderBase::dataSetReaderId(uint32_t dataSetReaderId)
{
dataSetReaderId_ = dataSetReaderId;
}

uint32_t
DataSetReaderBase::dataSetReaderId(void)
{
return dataSetReaderId_;
}

bool
DataSetReaderBase::encodeDataSet(boost::asio::streambuf& dataSet, uint16_t& size)
{
// FIXME: todo
return true;
}

}
50 changes: 50 additions & 0 deletions src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2022 Kai Huebl (kai@huebl-sgh.de)

Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser
Datei nur in Übereinstimmung mit der Lizenz erlaubt.
Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0.

Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart,
erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE
GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend.

Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen
im Rahmen der Lizenz finden Sie in der Lizenz.

Autor: Kai Huebl (kai@huebl-sgh.de), Aleksey Timin (atimin@gmail.com)
*/

#ifndef __OpcUaStackPubSub_DataSetReaderBase_h__
#define __OpcUaStackPubSub_DataSetReaderBase_h__

#include <boost/asio.hpp>

#include <map>

#include "OpcUaStackCore/Base/os.h"

namespace OpcUaStackPubSub
{

class DLLEXPORT DataSetReaderBase
{
public:
typedef boost::shared_ptr<DataSetReaderBase> SPtr;
typedef std::map<uint16_t, SPtr> Map;

DataSetReaderBase(void);
virtual ~DataSetReaderBase(void);

void dataSetReaderId(uint32_t dataSetReaderId);
uint32_t dataSetReaderId(void);

bool encodeDataSet(boost::asio::streambuf& dataSet, uint16_t& size);

public:
uint32_t dataSetReaderId_ = 0; // unique data set writer id
};

}

#endif
164 changes: 164 additions & 0 deletions src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
Copyright 2022 Kai Huebl (kai@huebl-sgh.de)

Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser
Datei nur in Übereinstimmung mit der Lizenz erlaubt.
Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0.

Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart,
erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE
GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend.

Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen
im Rahmen der Lizenz finden Sie in der Lizenz.

Autor: Kai Huebl (kai@huebl-sgh.de)

*/

#include "OpcUaStackPubSub/MessageTransport/MessageTransport.h"
#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h"
#include "OpcUaStackCore/Base/Log.h"
#include "OpcUaStackPubSub/Events/NetworkSendEvent.h"
#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h"

using namespace OpcUaStackCore;

namespace OpcUaStackPubSub
{

MessageTransport::MessageTransport(
const std::string& connectionName, // message bus member name
const std::string& serviceName,
OpcUaStackCore::IOThread::SPtr& ioThread,
OpcUaStackCore::MessageBus::SPtr& messageBus
)
: ServerServiceBase()
{
// set parameter
ioThread_ = ioThread;

// set parameter in server service base
serviceName_ = serviceName;
ServerServiceBase::ioThread_ = ioThread.get();
strand_ = ioThread->createStrand();
messageBus_ = messageBus;
connectionName_ = connectionName;

// register message bus receiver
MessageBusMemberConfig messageBusMemberConfig;
messageBusMemberConfig.strand(strand_);
messageBusMember_ = messageBus_->registerMember(serviceName_, messageBusMemberConfig);

// activate receiver
activateReceiver(
[this, &messageBusMemberConfig](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) {
// receive message from internal message bus

auto event = boost::static_pointer_cast<Event>(message);
switch (event->eventType())
{
case EventType::NetworkRecvEvent:
{
NetworkRecvEvent::SPtr event = boost::static_pointer_cast<NetworkRecvEvent>(message);

std::iostream ios(&(event->streamBuf()));
NetworkMessage networkMessage;
networkMessage.opcUaBinaryDecode(ios);

auto publisherId = networkMessage.networkMessageHeader()->publisherId();

auto it = networkMessageProcessorMap_.find(publisherId);
if (it != networkMessageProcessorMap_.end()) {
auto readerGroupBusMember = messageBus_->registerMember(it->second, messageBusMemberConfig);
messageBus_->messageSend(messageBusMember_, readerGroupBusMember, event);
} else {
Log(Error, "network message processor does not exist for this networkmessage")
.parameter("PublisherId", publisherId);
}

break;
}
case EventType::NetworkSendEvent:
{
Log(Info, "Recieved NetworkSendEvent");
NetworkSendEvent::SPtr event = boost::static_pointer_cast<NetworkSendEvent>(message);
messageBus_->messageSend(messageBusMember_, connectionBusMember_, event);
break;
}
default:
{
Log(Error, "invalid message received in message transport module")
.parameter("ServiceName", serviceName_)
.parameter("Event", (uint32_t)event->eventType());
}
}
}
);
}

MessageTransport::~MessageTransport(void)
{
// deactivate receiver
deactivateReceiver();
messageBus_->deregisterMember(messageBusMember_);
}

bool
MessageTransport::startup(void)
{
// get reference to connection from message bus
if (!messageBus_->existMember(connectionName_)) {
Log(Error, "udp connection message bus member don't exist")
.parameter("UdpConnectionName", connectionName_);
return false;
}
connectionBusMember_ = messageBus_->getMember(connectionName_);

return true;
}

bool
MessageTransport::shutdown(void)
{
// FIXME: todo
return true;
}

bool
MessageTransport::registerNetworkMessageProcessor(
uint32_t publisherId, // publisher id
const std::string& networkMessageProcessorName // message bus member name
)
{
auto it = networkMessageProcessorMap_.find(publisherId);
if (it != networkMessageProcessorMap_.end()) {
Log(Error, "register network message processor error, because network message processor d already exist")
.parameter("MessageTransport", serviceName_)
.parameter("NetworkMessageProcessor PublisherId", publisherId);
return false;
}

// add network message processor to map
networkMessageProcessorMap_.insert(std::make_pair(publisherId, networkMessageProcessorName));
return true;
}

bool
MessageTransport::deregisterNetworkMessageProcessor(
uint32_t publisherId
)
{
auto it = networkMessageProcessorMap_.find(publisherId);
if (it == networkMessageProcessorMap_.end()) {
Log(Error, "deregister network message processor error, because network message processor does not exist")
.parameter("MessageTransport", serviceName_)
.parameter("NetworkMessageProcessor PublisherId", publisherId);
return false;
}

networkMessageProcessorMap_.erase(it);
return true;
}

}
75 changes: 75 additions & 0 deletions src/OpcUaStackPubSub/MessageTransport/MessageTransport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
Copyright 2022 Kai Huebl (kai@huebl-sgh.de)

Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser
Datei nur in Übereinstimmung mit der Lizenz erlaubt.
Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0.

Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart,
erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE
GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend.

Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen
im Rahmen der Lizenz finden Sie in der Lizenz.

Autor: Kai Huebl (kai@huebl-sgh.de)

DESCRIPTION:

1. Subscriber Mode:
The message transport subscriber module receives a message from the communication
module via the internal message bus.The publisher ID is determined from the received
message and the assigned network message processor is determined. The received message
is forwarded to the assigned network message processor module via the internal message
bus.
2. Publisher Mode
The message transport publisher module receives a message from one ore more
network message writer groups via the internal message bus. The received message
is forwarded to the assigned communication module via the internal message bus.

*/

#ifndef __OpcUaStackPubSub_MessageTransport_h__
#define __OpcUaStackPubSub_MessageTransport_h__

#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h"

namespace OpcUaStackPubSub
{

class DLLEXPORT MessageTransport
: public OpcUaStackCore::Object
, public OpcUaStackServer::ServerServiceBase
{
public:
using SPtr = boost::shared_ptr<MessageTransport>;

MessageTransport(
const std::string& connectionName, // message bus member name
const std::string& serviceName,
OpcUaStackCore::IOThread::SPtr& ioThread,
OpcUaStackCore::MessageBus::SPtr& messageBus
);
~MessageTransport(void);

bool startup(void);
bool shutdown(void);

bool registerNetworkMessageProcessor(
uint32_t publisherId, // publisher id
const std::string& networkMessageProcessorName // message bus member name
);
bool deregisterNetworkMessageProcessor(
uint32_t publisherId
);

private:
OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread
std::string connectionName_;
OpcUaStackCore::MessageBusMember::WPtr connectionBusMember_;
std::map<uint32_t, std::string> networkMessageProcessorMap_;
};

}

#endif
Loading