Skip to content

Commit

Permalink
added Twisted support
Browse files Browse the repository at this point in the history
  • Loading branch information
forrestv committed Dec 14, 2011
1 parent b4841de commit a498558
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 27 deletions.
110 changes: 84 additions & 26 deletions dds.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import ctypes
import os
import struct
import sys
import weakref

Expand Down Expand Up @@ -163,6 +162,21 @@ def ex():

DDS_DYNAMIC_DATA_MEMBER_ID_UNSPECIFIED = 0

DDSType.Listener._fields_ = [
('listener_data', ctypes.c_void_p),
]

DDSType.DataReaderListener._fields_ = [
('as_listener', DDSType.Listener),
('on_requested_deadline_missed', ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(DDSType.DataReader), ctypes.POINTER(DDSType.RequestedDeadlineMissedStatus))),
('on_requested_incompatible_qos', ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(DDSType.DataReader), ctypes.POINTER(DDSType.RequestedIncompatibleQosStatus))),
('on_sample_rejected', ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(DDSType.DataReader), ctypes.POINTER(DDSType.SampleRejectedStatus))),
('on_liveliness_changed', ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(DDSType.DataReader), ctypes.POINTER(DDSType.LivelinessChangedStatus))),
('on_data_available', ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(DDSType.DataReader))),
('on_subscription_matched', ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(DDSType.DataReader), ctypes.POINTER(DDSType.SubscriptionMatchedStatus))),
('on_sample_lost', ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(DDSType.DataReader), ctypes.POINTER(DDSType.SampleLostStatus))),
]

class TCKind(object):
NULL = 0
SHORT = 1
Expand All @@ -171,7 +185,7 @@ class TCKind(object):
ULONG = 4
FLOAT = 5
DOUBLE = 6
BOOLEAN = 7
BOOLEAN = 7
CHAR = 8
OCTET = 9
STRUCT = 10
Expand All @@ -191,6 +205,8 @@ class TCKind(object):
RAW_BYTES = 0x7e
RAW_BYTES_KEYED = 0x7f

DATA_AVAILABLE_STATUS = 0x0001 << 10

# Function prototypes

_dyn_basic_types = {
Expand Down Expand Up @@ -232,6 +248,8 @@ def _define_func((p, errcheck, restype, argtypes)):
('Subscriber_create_datareader', check_null, ctypes.POINTER(DDSType.DataReader), [ctypes.POINTER(DDSType.Subscriber), ctypes.POINTER(DDSType.TopicDescription), ctypes.POINTER(DDSType.DataReaderQos), ctypes.POINTER(DDSType.DataReaderListener), DDS_StatusMask]),
('Subscriber_delete_datareader', check_code, DDS_ReturnCode_t, [ctypes.POINTER(DDSType.Subscriber), ctypes.POINTER(DDSType.DataReader)]),

('DataReader_set_listener', check_code, DDS_ReturnCode_t, [ctypes.POINTER(DDSType.DataReader), ctypes.POINTER(DDSType.DataReaderListener), DDS_StatusMask]),

('DynamicDataTypeSupport_new', check_null, ctypes.POINTER(DDSType.DynamicDataTypeSupport), [ctypes.POINTER(DDSType.TypeCode), ctypes.POINTER(DDSType.DynamicDataTypeProperty_t)]),
('DynamicDataTypeSupport_delete', None, None, [ctypes.POINTER(DDSType.DynamicDataTypeSupport)]),
('DynamicDataTypeSupport_register_type', check_code, DDS_ReturnCode_t, [ctypes.POINTER(DDSType.DynamicDataTypeSupport), ctypes.POINTER(DDSType.DomainParticipant), ctypes.c_char_p]),
Expand All @@ -251,7 +269,7 @@ def _define_func((p, errcheck, restype, argtypes)):
('DynamicData_get_string', check_code, DDS_ReturnCode_t, [ctypes.POINTER(DDSType.DynamicData), ctypes.POINTER(ctypes.c_char_p), ctypes.POINTER(DDS_UnsignedLong), ctypes.c_char_p, DDS_DynamicDataMemberId]),
('DynamicData_get_wstring', check_code, DDS_ReturnCode_t, [ctypes.POINTER(DDSType.DynamicData), ctypes.POINTER(ctypes.c_wchar_p), ctypes.POINTER(DDS_UnsignedLong), ctypes.c_char_p, DDS_DynamicDataMemberId]),
('DynamicData_set_string', check_code, DDS_ReturnCode_t, [ctypes.POINTER(DDSType.DynamicData), ctypes.c_char_p, DDS_DynamicDataMemberId, ctypes.c_char_p]),
('DynamicData_set_wstring', check_code, DDS_ReturnCode_t, [ctypes.POINTER(DDSType.DynamicData), ctypes.c_char_p, DDS_DynamicDataMemberId, ctypes.c_wchar_p]),
('DynamicData_set_wstring', check_code, DDS_ReturnCode_t, [ctypes.POINTER(DDSType.DynamicData), ctypes.c_char_p, DDS_DynamicDataMemberId, ctypes.c_wchar_p]),
('DynamicData_bind_complex_member', check_code, DDS_ReturnCode_t, [ctypes.POINTER(DDSType.DynamicData), ctypes.POINTER(DDSType.DynamicData), ctypes.c_char_p, DDS_DynamicDataMemberId]),
('DynamicData_unbind_complex_member', check_code, DDS_ReturnCode_t, [ctypes.POINTER(DDSType.DynamicData), ctypes.POINTER(DDSType.DynamicData)]),
('DynamicData_get_member_type', check_code, DDS_ReturnCode_t, [ctypes.POINTER(DDSType.DynamicData), ctypes.POINTER(ctypes.POINTER(DDSType.TypeCode)), ctypes.c_char_p, DDS_DynamicDataMemberId]),
Expand Down Expand Up @@ -384,39 +402,83 @@ def unpack_dd(dd):
else:
raise NotImplementedError(kind)

_outside_refs = set()
_refs = set()

class Topic(object):
def __init__(self, dds, name, data_type):
self._dds = dds
self.name = name
self.data_type = data_type
del dds, name, data_type

self._support = DDSFunc.DynamicDataTypeSupport_new(self.data_type._get_typecode(), get('DYNAMIC_DATA_TYPE_PROPERTY_DEFAULT', DDSType.DynamicDataTypeProperty_t))
self._support = support = DDSFunc.DynamicDataTypeSupport_new(self.data_type._get_typecode(), get('DYNAMIC_DATA_TYPE_PROPERTY_DEFAULT', DDSType.DynamicDataTypeProperty_t))
self._support.register_type(self._dds._participant, self.data_type.name)

self._topic = self._dds._participant.create_topic(
self._topic = topic = self._dds._participant.create_topic(
self.name,
self.data_type.name,
get('TOPIC_QOS_DEFAULT', DDSType.TopicQos),
None,
0,
)

self._writer = self._dds._publisher.create_datawriter(
self._writer = writer = self._dds._publisher.create_datawriter(
self._topic,
get('DATAWRITER_QOS_DEFAULT', DDSType.DataWriterQos),
None,
0,
)
self._dyn_narrowed_writer = DDSFunc.DynamicDataWriter_narrow(self._writer)

self._reader = self._dds._subscriber.create_datareader(
self._listener = None
self._reader = reader = self._dds._subscriber.create_datareader(
self._topic.as_topicdescription(),
get('DATAREADER_QOS_DEFAULT', DDSType.DataReaderQos),
None,
self._listener,
0,
)
self._dyn_narrowed_reader = DDSFunc.DynamicDataReader_narrow(self._reader)

self._callbacks = {}

def cleanup(ref):
dds._publisher.delete_datawriter(writer)
dds._subscriber.delete_datareader(reader)
dds._participant.delete_topic(topic)
support.unregister_type(dds._participant, data_type.name)
support.delete()

_refs.remove(ref)
_refs.add(weakref.ref(self, cleanup))

def _enable_listener(self):
assert self._listener is None
self._listener = DDSType.DataReaderListener(on_data_available=ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.POINTER(DDSType.DataReader))(self._data_available_callback))
self._reader.set_listener(self._listener, DATA_AVAILABLE_STATUS)
_outside_refs.add(self) # really want self._listener, but this does the same thing

def _disable_listener(self):
assert self._listener is not None
self._reader.set_listener(None, 0)
self._listener = None
_outside_refs.remove(self)

def add_data_available_callback(self, cb):
'''Warning: callback is called back in another thread!'''
if not self._callbacks:
self._enable_listener()
ref = max(self._callbacks) if self._callbacks else 0
self._callbacks[ref] = cb
return ref

def remove_data_available_callback(self, ref):
del self._callbacks[ref]
if not self._callbacks:
self._disable_listener()

def _data_available_callback(self, listener_data, datareader):
for cb in self._callbacks.itervalues():
cb()

def send(self, msg):
sample = self._support.create_data()
Expand All @@ -438,35 +500,38 @@ def recv(self):
return unpack_dd(data_seq.get_reference(0))
finally:
self._dyn_narrowed_reader.return_loan(ctypes.byref(data_seq), ctypes.byref(info_seq))

def __del__(self):
self._dds._publisher.delete_datawriter(self._writer)
self._dds._subscriber.delete_datareader(self._reader)
self._dds._participant.delete_topic(self._topic)
self._support.unregister_type(self._dds._participant, self.data_type.name)
self._support.delete()

class DDS(object):
def __init__(self, domain_id=0):
self._participant = DDSFunc.DomainParticipantFactory_get_instance().create_participant(
self._participant = participant = DDSFunc.DomainParticipantFactory_get_instance().create_participant(
domain_id,
get('PARTICIPANT_QOS_DEFAULT', DDSType.DomainParticipantQos),
None,
0,
)

self._publisher = self._participant.create_publisher(
self._publisher = publisher = self._participant.create_publisher(
get('PUBLISHER_QOS_DEFAULT', DDSType.PublisherQos),
None,
0,
)

self._subscriber = self._participant.create_subscriber(
self._subscriber = subscriber = self._participant.create_subscriber(
get('SUBSCRIBER_QOS_DEFAULT', DDSType.SubscriberQos),
None,
0,
)
self._open_topics = weakref.WeakValueDictionary()

def cleanup(ref):
participant.delete_subscriber(subscriber)
participant.delete_publisher(publisher)

# very slow for some reason
DDSFunc.DomainParticipantFactory_get_instance().delete_participant(participant)

_refs.remove(ref)
_refs.add(weakref.ref(self, cleanup))

def get_topic(self, name, data_type):
res = self._open_topics.get(name, None)
Expand All @@ -477,13 +542,6 @@ def get_topic(self, name, data_type):
res = Topic(self, name, data_type)
self._open_topics[name] = res
return res

def __del__(self):
self._participant.delete_subscriber(self._subscriber)
self._participant.delete_publisher(self._publisher)

# very slow for some reason
DDSFunc.DomainParticipantFactory_get_instance().delete_participant(self._participant)


class LibraryType(object):
Expand Down
1 change: 0 additions & 1 deletion test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import sys
import time
import traceback
import random

import dds
Expand Down
38 changes: 38 additions & 0 deletions twistedds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from twisted.internet import main, reactor
from twisted.python import failure

import dds

class DDSTransport(object):
def __init__(self, topic, protocol):
self.topic = topic
self.protocol = protocol

def connect(self):
self._cb_ref = self.topic.add_data_available_callback(self._data_available_callback)
self.protocol.transport = self
self.protocol.connectionMade()

def send(self, msg):
self.topic.send(msg)

def _data_available_callback(self):
reactor.callFromThread(self._data_available_callback2)

def _data_available_callback2(self):
while True:
try:
msg = self.topic.recv()
except dds.Error, e:
if e.message == 'no data':
break
raise
else:
self.protocol.messageReceived(msg)

def loseConnection(self):
self.topic.remove_data_available_callback(self._cb_ref)
self.protocol.connectionLost(failure.Failure(main.CONNECTION_DONE))

def connectDDS(topic, protocol):
DDSTransport(topic, protocol).connect()
18 changes: 18 additions & 0 deletions twistedds_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from twisted.internet import reactor

import dds
import twistedds

class TestProtocol(object):
def connectionMade(self):
print 'connection made!'

def messageReceived(self, msg):
print 'message received:', msg

def connectionLost(self, reason):
print 'connection lost! reason:', reason

twistedds.connectDDS(dds.DDS().get_topic('t2', dds.Library('./libddsmessages_c.so').HydrophoneMessage), TestProtocol())

reactor.run()

0 comments on commit a498558

Please sign in to comment.