/
TestPublisher.java
189 lines (166 loc) · 7.18 KB
/
TestPublisher.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/*
*
*
* Distributed under the OpenDDS License.
* See: http://www.opendds.org/license.html
*/
import DDS.*;
import OpenDDS.DCPS.*;
import org.omg.CORBA.StringSeqHolder;
import Messenger.*;
public class TestPublisher {
private static final int N_MSGS = 40;
public static boolean checkReliable(String[] args) {
for (int i = 0; i < args.length; ++i) {
if (args[i].equals("-r")) {
return true;
}
}
return false;
}
public static void main(String[] args) {
System.out.println("Start Publisher");
boolean reliable = checkReliable(args);
DomainParticipantFactory dpf =
TheParticipantFactory.WithArgs(new StringSeqHolder(args));
if (dpf == null) {
System.err.println("ERROR: Domain Participant Factory not found");
return;
}
DomainParticipant dp = dpf.create_participant(4,
PARTICIPANT_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);
if (dp == null) {
System.err.println("ERROR: Domain Participant creation failed");
return;
}
MessageTypeSupportImpl servant = new MessageTypeSupportImpl();
if (servant.register_type(dp, "") != RETCODE_OK.value) {
System.err.println("ERROR: register_type failed");
return;
}
Topic top = dp.create_topic("Movie Discussion List",
servant.get_type_name(),
TOPIC_QOS_DEFAULT.get(),
null,
DEFAULT_STATUS_MASK.value);
if (top == null) {
System.err.println("ERROR: Topic creation failed");
return;
}
Publisher pub = dp.create_publisher(PUBLISHER_QOS_DEFAULT.get(), null,
DEFAULT_STATUS_MASK.value);
if (pub == null) {
System.err.println("ERROR: Publisher creation failed");
return;
}
// Use the default transport configuration (do nothing)
DataWriterQos dw_qos = new DataWriterQos();
dw_qos.durability = new DurabilityQosPolicy();
dw_qos.durability.kind = DurabilityQosPolicyKind.from_int(0);
dw_qos.durability_service = new DurabilityServiceQosPolicy();
dw_qos.durability_service.history_kind = HistoryQosPolicyKind.from_int(0);
dw_qos.durability_service.service_cleanup_delay = new Duration_t();
dw_qos.deadline = new DeadlineQosPolicy();
dw_qos.deadline.period = new Duration_t();
dw_qos.latency_budget = new LatencyBudgetQosPolicy();
dw_qos.latency_budget.duration = new Duration_t();
dw_qos.liveliness = new LivelinessQosPolicy();
dw_qos.liveliness.kind = LivelinessQosPolicyKind.from_int(0);
dw_qos.liveliness.lease_duration = new Duration_t();
dw_qos.reliability = new ReliabilityQosPolicy();
dw_qos.reliability.kind = ReliabilityQosPolicyKind.from_int(0);
dw_qos.reliability.max_blocking_time = new Duration_t();
dw_qos.destination_order = new DestinationOrderQosPolicy();
dw_qos.destination_order.kind = DestinationOrderQosPolicyKind.from_int(0);
dw_qos.history = new HistoryQosPolicy();
dw_qos.history.kind = HistoryQosPolicyKind.from_int(0);
dw_qos.resource_limits = new ResourceLimitsQosPolicy();
dw_qos.transport_priority = new TransportPriorityQosPolicy();
dw_qos.lifespan = new LifespanQosPolicy();
dw_qos.lifespan.duration = new Duration_t();
dw_qos.user_data = new UserDataQosPolicy();
dw_qos.user_data.value = new byte[0];
dw_qos.ownership = new OwnershipQosPolicy();
dw_qos.ownership.kind = OwnershipQosPolicyKind.from_int(0);
dw_qos.ownership_strength = new OwnershipStrengthQosPolicy();
dw_qos.writer_data_lifecycle = new WriterDataLifecycleQosPolicy();
dw_qos.representation = new DataRepresentationQosPolicy();
dw_qos.representation.value = new short[0];
DataWriterQosHolder qosh = new DataWriterQosHolder(dw_qos);
pub.get_default_datawriter_qos(qosh);
qosh.value.history.kind = HistoryQosPolicyKind.KEEP_ALL_HISTORY_QOS;
if (reliable) {
qosh.value.reliability.kind =
ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
}
DataWriter dw = pub.create_datawriter(top,
qosh.value,
null,
DEFAULT_STATUS_MASK.value);
if (dw == null) {
System.err.println("ERROR: DataWriter creation failed");
return;
}
System.out.println("Publisher Created DataWriter");
StatusCondition sc = dw.get_statuscondition();
sc.set_enabled_statuses(PUBLICATION_MATCHED_STATUS.value);
WaitSet ws = new WaitSet();
ws.attach_condition(sc);
PublicationMatchedStatusHolder matched =
new PublicationMatchedStatusHolder(new PublicationMatchedStatus());
Duration_t timeout = new Duration_t(DURATION_INFINITE_SEC.value,
DURATION_INFINITE_NSEC.value);
while (true) {
final int result = dw.get_publication_matched_status(matched);
if (result != RETCODE_OK.value) {
System.err.println("ERROR: get_publication_matched_status()" +
"failed.");
return;
}
if (matched.value.current_count >= 1) {
System.out.println("Publisher Matched");
break;
}
ConditionSeqHolder cond = new ConditionSeqHolder(new Condition[]{});
if (ws.wait(cond, timeout) != RETCODE_OK.value) {
System.err.println("ERROR: wait() failed.");
return;
}
}
ws.detach_condition(sc);
MessageDataWriter mdw = MessageDataWriterHelper.narrow(dw);
Message msg = new Message();
msg.subject_id = 99;
int handle = mdw.register_instance(msg);
msg.from = "OpenDDS-Java";
msg.subject = "Review";
msg.text = "Worst. Movie. Ever.";
msg.count = 0;
int ret = RETCODE_TIMEOUT.value;
for (; msg.count < N_MSGS; ++msg.count) {
while ((ret = mdw.write(msg, handle)) == RETCODE_TIMEOUT.value) {
}
if (ret != RETCODE_OK.value) {
System.err.println("ERROR " + msg.count +
" write() returned " + ret);
}
try {
Thread.sleep(100);
} catch(InterruptedException ie) {
}
}
while (matched.value.current_count != 0) {
final int result = mdw.get_publication_matched_status(matched);
try {
Thread.sleep(100);
} catch(InterruptedException ie) {
}
}
System.out.println("Stop Publisher");
// Clean up
dp.delete_contained_entities();
dpf.delete_participant(dp);
TheServiceParticipant.shutdown();
System.out.println("Publisher exiting");
}
}