Skip to content

Commit

Permalink
Added LGPL licence headers
Browse files Browse the repository at this point in the history
  • Loading branch information
Elias Karakoulakis committed Dec 27, 2011
1 parent cabdac0 commit be7923d
Show file tree
Hide file tree
Showing 14 changed files with 1,206 additions and 138 deletions.
84 changes: 58 additions & 26 deletions Main.cpp
@@ -1,5 +1,29 @@
/*
Thrift4OZW - An Apache Thrift wrapper for OpenZWave
----------------------------------------------------
Copyright (c) 2011 Elias Karakoulakis <elias.karakoulakis@gmail.com>
SOFTWARE NOTICE AND LICENSE
Thrift4OZW is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published
by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
Thrift4OZW is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with Thrift4OZW. If not, see <http://www.gnu.org/licenses/>.
for more information on the LGPL, see:
http://en.wikipedia.org/wiki/GNU_Lesser_General_Public_License
*/

//
// OpenZWave hub for Project Ansible
// Main.cpp: OpenZWave Thrift Server for Project Ansible
// (c) 2011 Elias Karakoulakis <elias.karakoulakis@gmail.com>
//

Expand Down Expand Up @@ -46,8 +70,7 @@ typedef struct
list<ValueID> m_values;
} NodeInfo;
//
static list<NodeInfo*> g_nodes;
static std::map<uint64, ValueID*> g_values;
static list<NodeInfo*> g_nodes;

// OpenZWave includes
#include "Notification.h"
Expand Down Expand Up @@ -100,6 +123,7 @@ void OnNotification
)
{
bool notify_stomp = true;
bool send_valueID = false;

// Must do this inside a critical section to avoid conflicts with the main thread
g_criticalSection.lock();
Expand All @@ -118,12 +142,8 @@ void OnNotification
// Add the new value to the node's value list
ValueID v = _notification->GetValueID();
nodeInfo->m_values.push_back( v );
uint64 key = v.GetId();
// ekarak: also add it to global ValueID map
//std::cout << "========================= Adding "<<key<<std::hex<< " to g_values..."<<std::endl;
g_values[ key ] = &v;
}
//send_valueID = true;
send_valueID = true;
break;
}

Expand Down Expand Up @@ -153,14 +173,13 @@ void OnNotification
}
}
}
g_values.erase(_notification->GetValueID().GetId());
//send_valueID = true;
send_valueID = true;
break;
}

/**< A node value has been updated from the Z-Wave network. */
case Notification::Type_ValueChanged: {
//send_valueID = true;
send_valueID = true;
/**< The associations for the node have changed. The application
should rebuild any group information it holds about the node. */
}
Expand Down Expand Up @@ -281,12 +300,13 @@ void OnNotification
//
if (notify_stomp) {
STOMP::hdrmap headers;
headers["ValueHomeID"] = to_string<uint32_t>(_notification->GetValueID().GetHomeId(), std::hex);
headers["NotificationNodeId"] = to_string<uint16_t>(_notification->GetNodeId(), std::hex);
headers["NotificationType"] = to_string<uint32_t>(_notification->GetType(), std::hex);
headers["NotificationByte"] = to_string<uint32_t>(_notification->GetByte(), std::hex);
//if (send_valueID) {
headers["NotificationByte"] = to_string<uint16_t>(_notification->GetByte(), std::hex);
if (send_valueID) {
headers["HomeID"] = to_string<uint32_t>(_notification->GetValueID().GetHomeId(), std::hex);
headers["ValueID"] = to_string<uint64_t>(_notification->GetValueID().GetId(), std::hex);
//}
}
//
string empty = "" ;
stomp_client->send(*notifications_topic, headers, empty);
Expand All @@ -297,16 +317,26 @@ void OnNotification

// Send all known values via STOMP
void send_all_values() {
std::map<uint64, ValueID*>::iterator it;
for ( it=g_values.begin() ; it != g_values.end(); it++ ) {
ValueID* v = (*it).second;
STOMP::hdrmap headers;
headers["ValueHomeID"] = to_string<uint32_t>(v->GetHomeId(), std::hex);
headers["ValueID"] = to_string<uint64_t>(v->GetId(), std::hex);
//
string empty = "" ;
stomp_client->send(*notifications_topic, headers, empty);
}
//
g_criticalSection.lock();
//
for( list<NodeInfo*>::iterator node_it = g_nodes.begin(); node_it != g_nodes.end(); ++node_it )
{
NodeInfo* nodeInfo = *node_it;

for( list<ValueID>::iterator val_iter = nodeInfo->m_values.begin(); val_iter != nodeInfo->m_values.end(); ++val_iter )
{
ValueID v = *val_iter;
STOMP::hdrmap headers;
headers["HomeID"] = to_string<uint32_t>(v.GetHomeId(), std::hex);
headers["ValueID"] = to_string<uint64_t>(v.GetId(), std::hex);
//
string empty = "" ;
stomp_client->send(*notifications_topic, headers, empty);
}
}
//
g_criticalSection.unlock();
}

// ------------------------
Expand Down Expand Up @@ -357,7 +387,9 @@ int main(int argc, char **argv) {
int port = 9090;
shared_ptr<RemoteManagerHandler> handler(new RemoteManagerHandler());
shared_ptr<TProcessor> processor(new RemoteManagerProcessor(handler));
shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
TServerSocket* ss = new TServerSocket(port);
ss->setRecvTimeout(3000);
shared_ptr<TServerTransport> serverTransport(ss);
shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -83,7 +83,7 @@ Stomp_sm.o: Stomp_sm.cpp
StompSocket.o: StompSocket.cpp StompSocket.h
g++ $(CFLAGS) -c StompSocket.cpp $(INCLUDES)

PocoStomp.o: Stomp_sm.cpp StompSocket.o
PocoStomp.o: PocoStomp.cpp PocoStomp.h Stomp_sm.cpp StompSocket.o
g++ $(CFLAGS) -c PocoStomp.cpp $(INCLUDES)

Main.o: Main.cpp Stomp_sm.o gen-cpp/RemoteManager_server.cpp
Expand Down
172 changes: 112 additions & 60 deletions PocoStomp.cpp
@@ -1,30 +1,30 @@
//-----------------------------------------------------------------------------
//
/*
Thrift4OZW - An Apache Thrift wrapper for OpenZWave
----------------------------------------------------
Copyright (c) 2011 Elias Karakoulakis <elias.karakoulakis@gmail.com>
SOFTWARE NOTICE AND LICENSE
Thrift4OZW is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published
by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
Thrift4OZW is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with Thrift4OZW. If not, see <http://www.gnu.org/licenses/>.
for more information on the LGPL, see:
http://en.wikipedia.org/wiki/GNU_Lesser_General_Public_License
*/

// PocoStomp.cpp
//
// a STOMP (Simple Text Oriented Messaging Protocol) client for OZW
// using the Poco library for platform interoperability
//
// Copyright (c) 2011 Elias Karakoulakis
//
// SOFTWARE NOTICE AND LICENSE
//
// This file is part of OpenZWave.
//
// OpenZWave is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published
// by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// OpenZWave is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with OpenZWave. If not, see <http://www.gnu.org/licenses/>.
//
//-----------------------------------------------------------------------------

#include <stdlib.h>
#include <iostream>
Expand Down Expand Up @@ -57,10 +57,14 @@ namespace STOMP {
// constructor
// ####################################################
PocoStomp::PocoStomp(const std::string& hostname, int port):
m_hostname(hostname),
m_port(port),
m_connection(new Connection),
m_ackmode(ACK_AUTO),
m_fsm(* this) // initialize the state machine
{
m_connection->addr = NULL;
m_connection->socket = NULL;
// insert valid server commands in set
m_stomp_server_commands.insert("CONNECTED");
m_stomp_server_commands.insert("MESSAGE");
Expand All @@ -75,11 +79,6 @@ PocoStomp::PocoStomp(const std::string& hostname, int port):
m_initcond = new Poco::Condition();
m_mutex = new Poco::Mutex();
(m_thread = new Poco::Thread())->start(*this);
m_connection->addr = new Poco::Net::SocketAddress(hostname, port);
m_connection->socket = new Poco::Net::StompSocket(*(m_connection->addr));
// TODO: check connection
// signal FSM that we're connected
m_fsm.socket_connected();
}

// ####################################################
Expand All @@ -92,32 +91,70 @@ PocoStomp::~PocoStomp()
delete(m_thread);
delete(m_connection);
delete(m_mutex);
delete(m_initcond);
delete(m_initcond);
delete(m_initcond_mutex);
}

// ####################################################
bool PocoStomp::socket_connect()
// ####################################################
{
while (m_fsm.getState().getId() != StompFSM_map::SocketConnected.getId()) {
try {
if (m_connection->addr != NULL) {
delete m_connection->addr;
}
if (m_connection->socket != NULL) {
delete m_connection->socket;
}
m_connection->addr = new Poco::Net::SocketAddress(m_hostname, m_port);
m_connection->socket = new Poco::Net::StompSocket(*(m_connection->addr));
m_fsm.socket_connected();
} catch (exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
m_fsm.socket_disconnected();
sleep(3);
} catch (...) {
std::cerr << "other error in socket_connect()" << std::endl;
m_fsm.socket_disconnected();
sleep(3);
}
}
return(m_fsm.getState().getId() == StompFSM_map::SocketConnected.getId());
}

// ####################################################
bool PocoStomp::connect()
// ####################################################
{
bool result=true;
bool result=socket_connect();
m_mutex->lock();
if (m_fsm.getState().getId() == StompFSM_map::SocketConnected.getId()) {
//std::cout << "Sending CONNECT frame...";
Frame _frame("CONNECT");
//
if (stomp_write(&_frame)) {
m_fsm.send_frame(&_frame);
try {
if (m_fsm.getState().getId() == StompFSM_map::SocketConnected.getId()) {
//std::cout << "Sending CONNECT frame...";
Frame _frame("CONNECT");
//
if (stomp_write(&_frame)) {
m_fsm.send_frame(&_frame);
}
//
}
//
} catch (exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
m_fsm.socket_disconnected();
sleep(3);
} catch (...) {
std::cerr << "other error in connect()" << std::endl;
m_fsm.socket_disconnected();
sleep(3);
}
m_mutex->unlock();
return(result);
}


// ####################################################
bool PocoStomp::subscribe (std::string& _topic, pfnOnStompMessage_t _callback)
bool PocoStomp::subscribe(std::string& _topic, pfnOnStompMessage_t _callback)
// ####################################################
{
bool result = true;
Expand Down Expand Up @@ -358,44 +395,59 @@ void PocoStomp::stop_timer()
void PocoStomp::run()
// ####################################################
{

PFrame _frame;
bool cond1, cond2;
bool frame_popped = false;
int state_id;
//wait for stomp client initialization
m_initcond_mutex->lock();
m_initcond->wait(*m_initcond_mutex);
m_initcond_mutex->unlock();
// ok, stomp object is initialized, lets start.
for (;;) {
while (true) {
//
m_mutex->lock();
//
// phase 1: read incoming frames, if any
if (stomp_read(&_frame)) {
m_fsm.receive_frame(_frame);
//std::cout << "received frame:" << _frame->command << std::endl;
free(_frame);
}
//
// phase 2: send first frame in queue, if any
state_id = m_fsm.getState().getId();
cond1 = (state_id == StompFSM_map::Ready.getId());
cond2 = (m_sendqueue.size() > 0);
if (cond1 && cond2) {
_frame = m_sendqueue.front();
m_sendqueue.pop();
if (stomp_write(_frame)) {
m_fsm.send_frame(_frame);
//std::cout << "sent frame:" << _frame->command << std::endl;
run_loop_start:
try {
// phase 1: read incoming frames, if any
if (stomp_read(&_frame)) {
m_fsm.receive_frame(_frame);
//std::cout << "received frame:" << _frame->command << std::endl;
free(_frame);
}
//
// phase 2: send first frame in queue, if any
state_id = m_fsm.getState().getId();
cond1 = (state_id == StompFSM_map::Ready.getId());
cond2 = (m_sendqueue.size() > 0);
if (cond1 && cond2) {
_frame = m_sendqueue.front();
m_sendqueue.pop();
frame_popped = true;
if (stomp_write(_frame)) {
m_fsm.send_frame(_frame);
//std::cout << "sent frame:" << _frame->command << std::endl;
}
free(_frame);
}
free(_frame);
}
} catch (exception& e) {
std::cerr << "Exception in PocoStomp::run(): " << e.what() << std::endl;
if (frame_popped) m_sendqueue.push(_frame);
m_fsm.socket_disconnected();
sleep(3);
connect();
goto run_loop_start;
} catch (...) {
std::cerr << "default exception in PocoStomp::run()";
exit(-1);
};
//
m_mutex->unlock();
//
Poco::Thread::sleep(50);
}
}


}

0 comments on commit be7923d

Please sign in to comment.