/
WireAsset.java
471 lines (405 loc) · 16.3 KB
/
WireAsset.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
/*******************************************************************************
* Copyright (c) 2016, 2023 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech
* Amit Kumar Mondal
*******************************************************************************/
package org.eclipse.kura.internal.wire.asset;
import static java.util.Objects.isNull;
import static java.util.Objects.requireNonNull;
import static org.eclipse.kura.channel.ChannelType.READ_WRITE;
import static org.eclipse.kura.channel.ChannelType.WRITE;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.kura.KuraException;
import org.eclipse.kura.asset.AssetConfiguration;
import org.eclipse.kura.asset.provider.BaseAsset;
import org.eclipse.kura.channel.Channel;
import org.eclipse.kura.channel.ChannelRecord;
import org.eclipse.kura.channel.ChannelType;
import org.eclipse.kura.channel.listener.ChannelEvent;
import org.eclipse.kura.channel.listener.ChannelListener;
import org.eclipse.kura.core.configuration.metatype.Tad;
import org.eclipse.kura.core.configuration.metatype.Tocd;
import org.eclipse.kura.driver.PreparedRead;
import org.eclipse.kura.type.TypedValue;
import org.eclipse.kura.type.TypedValues;
import org.eclipse.kura.util.collection.CollectionUtil;
import org.eclipse.kura.wire.WireComponent;
import org.eclipse.kura.wire.WireEmitter;
import org.eclipse.kura.wire.WireEnvelope;
import org.eclipse.kura.wire.WireHelperService;
import org.eclipse.kura.wire.WireReceiver;
import org.eclipse.kura.wire.WireRecord;
import org.eclipse.kura.wire.WireSupport;
import org.osgi.framework.ServiceReference;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.wireadmin.Wire;
/**
* The Class WireAsset is a wire component which provides all necessary higher
* level abstractions of a Kura asset. This wire asset is an integral wire
* component in Kura Wires topology as it represents an industrial device with a
* field protocol driver associated to it.<br/>
* <br/>
*
* The WireRecord to be emitted by every wire asset comprises the following keys
*
* <ul>
* <li>{@code <channelName>}</li>
* <li>{@code <channelName>_assetName}</li>
* <li>{@code <channelName>_timestamp}</li>
* </ul>
*
* For example, if the data of a channel named {@code LED} is processed
* <b>successfully</b>, the emitted data will be as follows:
*
* <pre>
* 1. LED = true
* 2. LED_assetName = MODICON_PLC
* 3. LED_timestamp = 201648274712
* </pre>
*
* <br/>
* Also note that, if the channel name is equal to the received value of the
* channel wire field name, then it would be considered as a WRITE wire field
* value to the specific channel. <br/>
* <br/>
* For instance, {@code A} asset sends a {@link WireRecord} to {@code B} asset
* and the
* received {@link WireRecord} contains list of Wire Fields. If there exists a
* Wire
* Field which signifies the channel name and if this channel name also exists
* in {@code B}'s list of configured channels, then the Wire Field which
* contains the typed value of this channel in the received {@link WireRecord}
* will be
* considered as a WRITE Value in that specific channel in B and this value will
* be written to {@code B}'s channel
*
* @see Channel
* @see ChannelRecord
* @see WireRecord
* @see org.eclipse.kura.asset.Asset
*/
public final class WireAsset extends BaseAsset implements WireEmitter, WireReceiver {
/** Logger shared by all instances of this component. */
private static final Logger logger = LogManager.getLogger(WireAsset.class);
/** Wire Helper Service; assigned by {@code bindWireHelperService} and cleared by {@code unbindWireHelperService}. */
private volatile WireHelperService wireHelperService;
/** Wire support created in {@code activate}; record emission and the Wire Admin callbacks are delegated to it. */
private WireSupport wireSupport;
/** Holds a cache only while the emit-on-change option is enabled; used to filter records before they are emitted. */
private Optional<ValueChangeCache> valueChangeCache = Optional.empty();
/** Current component options; replaced with a new instance on every configuration update. */
private WireAssetOptions options = new WireAssetOptions();
/** Emit helper built when a prepared read is created; {@code null} after the prepared read is released. */
private PreparedEmit preparedEmit;
/**
 * Binds the {@link WireHelperService} dependency.
 * <p>
 * Only the first bound instance is retained: if a service reference is
 * already held, the call leaves it untouched.
 *
 * @param wireHelperService
 *            the service instance being bound
 */
public void bindWireHelperService(final WireHelperService wireHelperService) {
    if (this.wireHelperService != null) {
        // a service is already bound, keep it
        return;
    }
    this.wireHelperService = wireHelperService;
}
/**
 * Unbinds the {@link WireHelperService} dependency.
 * <p>
 * The stored reference is cleared only when the instance being removed is
 * the very same object (identity comparison) that is currently held.
 *
 * @param wireHelperService
 *            the service instance being removed
 */
public void unbindWireHelperService(final WireHelperService wireHelperService) {
    final boolean isCurrentlyBound = wireHelperService == this.wireHelperService;
    if (isCurrentlyBound) {
        this.wireHelperService = null;
    }
}
/**
 * OSGi service component activation callback.
 * <p>
 * Creates the {@link WireSupport} for this component from its own service
 * reference and then runs the superclass activation.
 *
 * @param componentContext
 *            the component context, used to obtain this component's
 *            service reference
 * @param properties
 *            the service properties, forwarded to the superclass
 */
@Override
protected void activate(final ComponentContext componentContext, final Map<String, Object> properties) {
    logger.debug("Activating Wire Asset...");

    final ServiceReference<WireComponent> ownReference = (ServiceReference<WireComponent>) componentContext
            .getServiceReference();
    this.wireSupport = this.wireHelperService.newWireSupport(this, ownReference);

    super.activate(componentContext, properties);
    logger.debug("Activating Wire Asset...Done");
}
/**
 * OSGi service component update callback.
 * <p>
 * Rebuilds the component options from the new properties, installs a fresh
 * {@code ValueChangeCache} when emit-on-change is enabled (or removes it
 * otherwise), and then runs the superclass update.
 *
 * @param properties
 *            the updated service properties
 */
@Override
public void updated(final Map<String, Object> properties) {
    logger.debug("Updating Wire Asset...");

    final WireAssetOptions newOptions = new WireAssetOptions(properties);
    this.options = newOptions;
    this.valueChangeCache = newOptions.emitOnChange() ? Optional.of(new ValueChangeCache()) : Optional.empty();

    super.updated(properties);
    logger.debug("Updating Wire Asset...Done");
}
/**
* OSGi service component deactivate callback.
* <p>
* Logs the start and the end of the deactivation at debug level and leaves
* the actual work to the superclass implementation.
*
* @param context
* the component context, forwarded to the superclass
*/
@Override
protected void deactivate(final ComponentContext context) {
logger.debug("Deactivating Wire Asset...");
super.deactivate(context);
logger.debug("Deactivating Wire Asset...Done");
}
/**
* {@inheritDoc}
* <p>
* This implementation forwards the notification unchanged to the
* {@link WireSupport} of this component.
*/
@Override
public void consumersConnected(final Wire[] wires) {
this.wireSupport.consumersConnected(wires);
}
/**
* {@inheritDoc}
* <p>
* This implementation returns the {@code CONF_PID} constant, which is not
* declared in this class (presumably inherited - confirm where it is defined).
*/
@Override
protected String getFactoryPid() {
return CONF_PID;
}
/**
 * Invoked whenever this wire component receives a {@link WireEnvelope}.
 * The operations are executed in the following order:
 *
 * <ul>
 * <li>All read channels are read and the result is emitted</li>
 * <li>For every {@link WireRecord} of the received envelope, in order, the
 * matching writable channels are determined and written</li>
 * </ul>
 *
 * Both steps run for every non-null {@link WireEnvelope} received from the
 * upstream Wire Component(s).
 *
 * @param wireEnvelope
 *            the received {@link WireEnvelope}
 * @throws NullPointerException
 *             if {@link WireEnvelope} is null
 */
@Override
public void onWireReceive(final WireEnvelope wireEnvelope) {
    requireNonNull(wireEnvelope, "Wire Envelope cannot be null");

    // reads come first, writes afterwards
    emitAllReadChannels();

    wireEnvelope.getRecords().forEach(received -> writeChannels(determineWritingChannels(received)));
}
/**
* Hook overridden from the superclass (presumably invoked when a prepared
* read has been created - confirm against the superclass).
* <p>
* Builds a new {@code PreparedEmit} over the channel records of the given
* prepared read and stores it for later use when emitting records.
*
* @param preparedRead
* the prepared read whose channel records are used
*/
@Override
protected void onPreparedReadCreated(PreparedRead preparedRead) {
this.preparedEmit = new PreparedEmit(preparedRead.getChannelRecords());
}
/**
* Hook overridden from the superclass (presumably invoked when a prepared
* read is released - confirm against the superclass).
* <p>
* Discards the stored {@code PreparedEmit}; the argument is not inspected.
*
* @param preparedRead
* the prepared read being released (unused)
*/
@Override
protected void onPreparedReadReleased(PreparedRead preparedRead) {
this.preparedEmit = null;
}
/**
 * Reads all channels and emits the resulting records, provided the asset
 * has read channels at all.
 * <p>
 * Any exception raised while reading or emitting is logged and not
 * propagated to the caller.
 */
private void emitAllReadChannels() {
    if (!hasReadChannels()) {
        return;
    }
    try {
        final List<ChannelRecord> readRecords = readAllChannels();
        emitChannelRecords(readRecords);
    } catch (final Exception e) {
        logger.error("Error while performing read from the Wire Asset...", e);
    }
}
/**
 * Determines which channels have to be written for the given wire record.
 * <p>
 * A channel is selected when it is of type {@code WRITE} or
 * {@code READ_WRITE}, the record contains a non-null property whose key
 * equals the channel name, and the type of that property equals the value
 * type of the channel. Properties with a different type are skipped.
 *
 * @param record
 *            the {@link WireRecord} to parse
 * @return list of Channel Records containing the values to be written,
 *         empty if nothing matches
 * @throws NullPointerException
 *             if argument is null
 */
private List<ChannelRecord> determineWritingChannels(final WireRecord record) {
    requireNonNull(record, "Wire Record cannot be null");

    final List<ChannelRecord> channelRecordsToWrite = CollectionUtil.newArrayList();
    // the record properties do not depend on the channel, fetch them once
    final Map<String, TypedValue<?>> wireRecordProperties = record.getProperties();

    for (final Channel channel : getAssetConfiguration().getAssetChannels().values()) {
        final ChannelType channelType = channel.getType();
        if (channelType != WRITE && channelType != READ_WRITE) {
            continue;
        }
        // single lookup; a missing key and a null value are both treated as
        // "nothing to write" instead of failing on value.getType()
        final TypedValue<?> value = wireRecordProperties.get(channel.getName());
        if (value != null && channel.getValueType() == value.getType()) {
            channelRecordsToWrite.add(channel.createWriteRecord(value));
        }
    }
    return channelRecordsToWrite;
}
/**
 * Converts the provided channel records into a single {@link WireRecord}
 * and emits it to the associated wires.
 * <p>
 * When a value change cache is present, the records are filtered through
 * it first. The wire record properties are produced by the stored
 * {@code PreparedEmit} if one is available, otherwise by
 * {@code Utils.toWireRecordProperties}. If the resulting properties are
 * empty and the emit-empty-envelopes option is disabled, nothing is
 * emitted. Otherwise the asset name property is added and the record is
 * emitted; if the service PID cannot be obtained, the failure is logged
 * and the record is emitted without that property.
 *
 * @param channelRecords
 *            the list of channel records to emit
 * @throws NullPointerException
 *             if provided records list is null
 * @throws IllegalArgumentException
 *             if provided records list is empty
 */
private void emitChannelRecords(final List<ChannelRecord> channelRecords) {
    requireNonNull(channelRecords, "List of Channel Records cannot be null");
    if (channelRecords.isEmpty()) {
        throw new IllegalArgumentException("Channel Records cannot be empty");
    }

    final List<ChannelRecord> toBeEmitted = this.valueChangeCache.map(c -> c.filterRecords(channelRecords))
            .orElse(channelRecords);

    // Read each reassignable field exactly once. preparedEmit is set to null
    // by onPreparedReadReleased and options is replaced by updated(...); if
    // either happens between a null check and the following use (presumably
    // from another thread - confirm), re-reading the field would fail with a
    // NullPointerException or mix two different option sets.
    final WireAssetOptions currentOptions = this.options;
    final PreparedEmit currentPreparedEmit = this.preparedEmit;

    final Map<String, TypedValue<?>> wireRecordProperties;
    if (currentPreparedEmit != null) {
        wireRecordProperties = currentPreparedEmit.execute(toBeEmitted);
    } else {
        wireRecordProperties = Utils.toWireRecordProperties(toBeEmitted, currentOptions);
    }

    if (!currentOptions.emitEmptyEnvelopes() && wireRecordProperties.isEmpty()) {
        return;
    }

    try {
        wireRecordProperties.put(WireAssetConstants.PROP_ASSET_NAME.value(),
                TypedValues.newStringValue(getKuraServicePid()));
    } catch (KuraException e) {
        // best effort: the record is still emitted, just without the asset name
        logger.error("Configurations cannot be null", e);
    }

    this.wireSupport.emit(Collections.singletonList(new WireRecord(wireRecordProperties)));
}
/**
 * Writes the given channel records, if there are any.
 * <p>
 * An empty list is a no-op. Any exception raised by the write is logged
 * and not propagated to the caller.
 *
 * @param channelRecordsToWrite
 *            the list of {@link ChannelRecord}s to write
 * @throws NullPointerException
 *             if the provided list is null
 */
private void writeChannels(final List<ChannelRecord> channelRecordsToWrite) {
    requireNonNull(channelRecordsToWrite, "List of Channel Records cannot be null");

    if (!channelRecordsToWrite.isEmpty()) {
        try {
            write(channelRecordsToWrite);
        } catch (final Exception e) {
            logger.error("Error while performing write from the Wire Asset...", e);
        }
    }
}
/**
 * Tells whether a channel configuration requests listening, based on its
 * "listen" property.
 * <p>
 * The property value is converted with {@code toString()} and parsed with
 * {@link Boolean#parseBoolean(String)}. When the configuration map or the
 * property is missing, a warning is logged and {@code false} is returned.
 *
 * @param properties
 *            the channel configuration, may be {@code null}
 * @return {@code true} only if the property is present and parses to
 *         {@code true}
 */
private boolean isListeningChannel(final Map<String, Object> properties) {
    // explicit absence check instead of catching the NullPointerException
    final Object listen = properties == null ? null
            : properties.get(WireAssetConstants.LISTEN_PROP_NAME.value());
    if (listen == null) {
        logger.warn("Failed to retreive \"listen\" property from channel configuration");
        return false;
    }
    return Boolean.parseBoolean(listen.toString());
}
/**
 * Checks whether an existing channel listener registration is still valid.
 * <p>
 * A registration rejected by the superclass is invalid. Otherwise it is
 * valid unless its listener is an {@code EmitterChannelListener} whose
 * enclosing {@code WireAsset} is this instance.
 *
 * @param reg
 *            the listener registration to check
 * @param channel
 *            the channel the registration refers to
 * @return {@code true} if the registration is considered valid
 */
@Override
protected boolean isChannelListenerValid(final ChannelListenerHolder reg, final Channel channel) {
    if (!super.isChannelListenerValid(reg, channel)) {
        return false;
    }
    final ChannelListener registered = reg.getChannelListener();
    return !(registered instanceof EmitterChannelListener)
            || ((EmitterChannelListener) registered).outer() != this;
}
/**
 * Extends the listener registrations computed by the superclass with one
 * new {@code EmitterChannelListener} for every channel whose configuration
 * has the "listen" property enabled.
 *
 * @param listeners
 *            the set of registrations to add to
 * @param config
 *            the asset configuration providing the channels
 */
@Override
protected void updateChannelListenerRegistrations(final Set<ChannelListenerHolder> listeners,
        final AssetConfiguration config) {
    super.updateChannelListenerRegistrations(listeners, config);

    for (final Channel candidate : config.getAssetChannels().values()) {
        if (isListeningChannel(candidate.getConfiguration())) {
            listeners.add(new ChannelListenerHolder(candidate, new EmitterChannelListener()));
        }
    }
}
/**
 * Returns the channel descriptor of the wire asset.
 *
 * @return the descriptor obtained from {@code WireAssetChannelDescriptor},
 *         cast to a list of {@link Tad}
 */
@Override
@SuppressWarnings("unchecked")
protected List<Tad> getAssetChannelDescriptor() {
    final Object descriptor = WireAssetChannelDescriptor.get().getDescriptor();
    return (List<Tad>) descriptor;
}
/**
* Provides the OCD of this component.
*
* @return a new {@code WireAssetOCD} instance on every call
*/
@Override
protected Tocd getOCD() {
return new WireAssetOCD();
}
/**
* {@inheritDoc}
* <p>
* This implementation delegates to the {@link WireSupport} of this
* component and returns its result.
*/
@Override
public Object polled(final Wire wire) {
return this.wireSupport.polled(wire);
}
/**
* {@inheritDoc}
* <p>
* This implementation forwards the notification unchanged to the
* {@link WireSupport} of this component.
*/
@Override
public void producersConnected(final Wire[] wires) {
this.wireSupport.producersConnected(wires);
}
/**
* {@inheritDoc}
* <p>
* This implementation forwards both arguments unchanged to the
* {@link WireSupport} of this component.
*/
@Override
public void updated(final Wire wire, final Object value) {
this.wireSupport.updated(wire, value);
}
/**
 * Channel listener that emits wire records in response to channel events.
 * <p>
 * Depending on the emit-all-channels option, an event either triggers a
 * read and emit of all read channels or the emission of the single record
 * carried by the event.
 */
private class EmitterChannelListener implements ChannelListener {

    /**
     * Handles a channel event according to the current options.
     *
     * @param event
     *            the received channel event
     */
    @Override
    public void onChannelEvent(ChannelEvent event) {
        if (WireAsset.this.options.emitAllChannels()) {
            emitAllReadChannels();
            return;
        }
        final ChannelRecord changed = event.getChannelRecord();
        fillMissingUnit(changed);
        emitChannelRecords(Collections.singletonList(changed));
    }

    /**
     * Copies the unit of the configured channel into the record when the
     * record has no unit (null or empty) and a channel with the record's
     * channel name exists in the asset configuration.
     */
    private void fillMissingUnit(final ChannelRecord target) {
        if (target.getUnit() != null && !target.getUnit().isEmpty()) {
            return;
        }
        final Channel configured = getAssetConfiguration().getAssetChannels().get(target.getChannelName());
        if (configured != null) {
            target.setUnit(configured.getUnit());
        }
    }

    /**
     * @return the {@code WireAsset} instance this listener belongs to
     */
    public WireAsset outer() {
        return WireAsset.this;
    }
}
/**
 * Emit helper bound to the record list of a prepared read.
 * <p>
 * The record fillers are created once, at construction time, from that
 * list and the options in effect at that moment.
 */
private class PreparedEmit {

    private final List<ChannelRecord> preparedRecords;
    private final List<RecordFiller> recordFillers;

    /**
     * @param records
     *            the record list of the prepared read; kept by reference
     */
    PreparedEmit(final List<ChannelRecord> records) {
        this.preparedRecords = records;
        this.recordFillers = RecordFillers.create(records, WireAsset.this.options);
    }

    /**
     * Converts the given records to wire record properties.
     *
     * @param channelRecords
     *            the records to convert
     * @return the wire record properties; the precomputed fillers are used
     *         only when the very same list instance (identity comparison)
     *         that was prepared is passed in
     */
    Map<String, TypedValue<?>> execute(final List<ChannelRecord> channelRecords) {
        final WireAssetOptions currentOptions = WireAsset.this.options;
        if (channelRecords == this.preparedRecords) {
            return Utils.toWireRecordProperties(channelRecords, currentOptions, this.recordFillers);
        }
        // driver changed the record list
        // fallback to slow mode
        return Utils.toWireRecordProperties(channelRecords, currentOptions);
    }
}
}