/
AbstractProtocolAdapterBase.java
1619 lines (1489 loc) · 71.1 KB
/
AbstractProtocolAdapterBase.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*******************************************************************************
* Copyright (c) 2016, 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/
package org.eclipse.hono.service;
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.adapter.client.registry.TenantClient;
import org.eclipse.hono.adapter.client.telemetry.EventSender;
import org.eclipse.hono.adapter.client.telemetry.TelemetrySender;
import org.eclipse.hono.adapter.client.util.ServiceClient;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.BasicDeviceConnectionClientFactory;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponse;
import org.eclipse.hono.client.CommandResponseSender;
import org.eclipse.hono.client.CommandTargetMapper;
import org.eclipse.hono.client.ConnectionLifecycle;
import org.eclipse.hono.client.CredentialsClientFactory;
import org.eclipse.hono.client.DeviceConnectionClient;
import org.eclipse.hono.client.DeviceConnectionClientFactory;
import org.eclipse.hono.client.DisconnectListener;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.ProtocolAdapterCommandConsumer;
import org.eclipse.hono.client.ProtocolAdapterCommandConsumerFactory;
import org.eclipse.hono.client.ReconnectListener;
import org.eclipse.hono.client.RegistrationClient;
import org.eclipse.hono.client.RegistrationClientFactory;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.auth.ValidityBasedTrustOptions;
import org.eclipse.hono.service.http.HttpUtils;
import org.eclipse.hono.service.limiting.ConnectionLimitManager;
import org.eclipse.hono.service.metric.MetricsTags.ConnectionAttemptOutcome;
import org.eclipse.hono.service.monitoring.ConnectionEventProducer;
import org.eclipse.hono.service.resourcelimits.NoopResourceLimitChecks;
import org.eclipse.hono.service.resourcelimits.ResourceLimitChecks;
import org.eclipse.hono.service.util.ServiceBaseUtils;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.CredentialsConstants;
import org.eclipse.hono.util.DeviceConnectionConstants;
import org.eclipse.hono.util.EventConstants;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.RegistrationConstants;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TelemetryConstants;
import org.eclipse.hono.util.TelemetryExecutionContext;
import org.eclipse.hono.util.TenantConstants;
import org.eclipse.hono.util.TenantObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import io.micrometer.core.instrument.Timer.Sample;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.TrustOptions;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
/**
* A base class for implementing protocol adapters.
* <p>
* Provides connections to device registration and telemetry and event service endpoints.
*
* @param <T> The type of configuration properties used by this service.
*/
public abstract class AbstractProtocolAdapterBase<T extends ProtocolAdapterProperties> extends AbstractServiceBase<T> {
/**
* The <em>application/octet-stream</em> content type.
*/
protected static final String CONTENT_TYPE_OCTET_STREAM = MessageHelper.CONTENT_TYPE_OCTET_STREAM;
/**
* The key used for storing a Micrometer {@code Sample} in an
* execution context.
*/
protected static final String KEY_MICROMETER_SAMPLE = "micrometer.sample";
private TelemetrySender telemetrySender;
private EventSender eventSender;
private RegistrationClientFactory registrationClientFactory;
private TenantClient tenantClient;
private BasicDeviceConnectionClientFactory deviceConnectionClientFactory;
private CredentialsClientFactory credentialsClientFactory;
private ProtocolAdapterCommandConsumerFactory commandConsumerFactory;
private CommandTargetMapper commandTargetMapper;
private ConnectionLimitManager connectionLimitManager;
private ConnectionEventProducer connectionEventProducer;
private ResourceLimitChecks resourceLimitChecks = new NoopResourceLimitChecks();
private final ConnectionEventProducer.Context connectionEventProducerContext = new ConnectionEventProducer.Context() {
@Override
public EventSender getMessageSenderClient() {
return AbstractProtocolAdapterBase.this.eventSender;
}
@Override
public TenantClient getTenantClient() {
return AbstractProtocolAdapterBase.this.tenantClient;
}
};
/**
* Adds a Micrometer sample to a command context.
*
* @param ctx The context to add the sample to.
* @param sample The sample.
* @throws NullPointerException if ctx is {@code null}.
*/
protected static final void addMicrometerSample(final CommandContext ctx, final Sample sample) {
Objects.requireNonNull(ctx);
ctx.put(KEY_MICROMETER_SAMPLE, sample);
}
/**
* Gets the timer used to track the processing of a command message.
*
* @param ctx The command context to extract the sample from.
* @return The sample or {@code null} if the context does not
* contain a sample.
* @throws NullPointerException if ctx is {@code null}.
*/
protected static final Sample getMicrometerSample(final CommandContext ctx) {
Objects.requireNonNull(ctx);
return ctx.get(KEY_MICROMETER_SAMPLE);
}
/**
* Sets the configuration by means of Spring dependency injection.
* <p>
* Most protocol adapters will support a single transport protocol to communicate with devices only. For those
* adapters there will only be a single bean instance available in the application context of type <em>T</em>.
*/
@Autowired
@Override
public void setConfig(final T configuration) {
setSpecificConfig(configuration);
}
/**
* Sets the client to use for accessing the Tenant service.
*
* @param client The client.
* @throws NullPointerException if the client is {@code null}.
*/
@Qualifier(TenantConstants.TENANT_ENDPOINT)
@Autowired
public final void setTenantClient(final TenantClient client) {
this.tenantClient = Objects.requireNonNull(client);
}
/**
* Gets the client used for accessing the Tenant service.
*
* @return The client.
*/
public final TenantClient getTenantClient() {
return tenantClient;
}
/**
* Sets the factory to use for creating a client for the Device Connection service.
*
* @param factory The factory.
* @throws NullPointerException if the factory is {@code null}.
*/
@Qualifier(DeviceConnectionConstants.DEVICE_CONNECTION_ENDPOINT)
@Autowired
public final void setDeviceConnectionClientFactory(final BasicDeviceConnectionClientFactory factory) {
this.deviceConnectionClientFactory = Objects.requireNonNull(factory);
}
/**
* Gets the factory used for creating a client for the Device Connection service.
*
* @return The factory.
* @deprecated Use {@link #getDeviceConnectionClient(String)} in order to access
* device connection information.
*/
@Deprecated(forRemoval = true)
public final DeviceConnectionClientFactory getDeviceConnectionClientFactory() {
if (deviceConnectionClientFactory instanceof DeviceConnectionClientFactory) {
return (DeviceConnectionClientFactory) deviceConnectionClientFactory;
} else {
return null;
}
}
/**
* Gets a client for interacting with the Device Connection service.
*
* @param tenantId The tenant that the client is scoped to.
* @return The client.
* @throws IllegalStateException if no client factory is set.
*/
protected final Future<DeviceConnectionClient> getDeviceConnectionClient(final String tenantId) {
if (deviceConnectionClientFactory == null) {
throw new IllegalStateException("Device Connection client factory is not set");
}
return deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(tenantId);
}
/**
* Sets the client to use for sending telemetry messages downstream.
*
* @param sender The sender.
* @throws NullPointerException if the sender is {@code null}.
*/
@Qualifier(TelemetryConstants.TELEMETRY_ENDPOINT)
@Autowired
public final void setTelemetrySender(final TelemetrySender sender) {
this.telemetrySender = Objects.requireNonNull(sender);
}
/**
* Gets the client being used for sending telemetry messages downstream.
*
* @return The sender.
*/
public final TelemetrySender getTelemetrySender() {
return telemetrySender;
}
/**
* Sets the client to use for sending events downstream.
*
* @param sender The sender.
* @throws NullPointerException if the sender is {@code null}.
*/
@Qualifier(EventConstants.EVENT_ENDPOINT)
@Autowired
public final void setEventSender(final EventSender sender) {
this.eventSender = Objects.requireNonNull(sender);
}
/**
* Gets the client being used for sending events downstream.
*
* @return The sender.
*/
public final EventSender getEventSender() {
return eventSender;
}
/**
* Sets the factory to use for creating a client for the Device Registration service.
*
* @param factory The factory.
* @throws NullPointerException if the factory is {@code null}.
*/
@Qualifier(RegistrationConstants.REGISTRATION_ENDPOINT)
@Autowired
public final void setRegistrationClientFactory(final RegistrationClientFactory factory) {
this.registrationClientFactory = Objects.requireNonNull(factory);
}
/**
* Gets the factory used for creating a client for the Device Registration service.
*
* @return The factory.
*/
public final RegistrationClientFactory getRegistrationClientFactory() {
return registrationClientFactory;
}
/**
* Sets the factory to use for creating a client for the Credentials service.
*
* @param factory The factory.
* @throws NullPointerException if the factory is {@code null}.
*/
@Qualifier(CredentialsConstants.CREDENTIALS_ENDPOINT)
@Autowired
public final void setCredentialsClientFactory(final CredentialsClientFactory factory) {
this.credentialsClientFactory = Objects.requireNonNull(factory);
}
/**
* Gets the factory used for creating a client for the Credentials service.
*
* @return The factory.
*/
public final CredentialsClientFactory getCredentialsClientFactory() {
return credentialsClientFactory;
}
/**
* Sets the producer for connections events.
* <p>
* Note that subclasses are not required to actually emit connection events.
* In particular, adapters for connection-less protocols like e.g. HTTP will
* most likely not emit such events.
*
* @param connectionEventProducer The instance which will handle the production of connection events. Depending on
* the setup this could be a simple log message or an event using the Hono Event API.
* @throws NullPointerException if the producer is {@code null}.
*/
@Autowired(required = false)
public void setConnectionEventProducer(final ConnectionEventProducer connectionEventProducer) {
this.connectionEventProducer = Objects.requireNonNull(connectionEventProducer);
log.info("using [{}] for reporting connection events, if applicable for device protocol", connectionEventProducer);
}
/**
* Gets the producer of connection events.
*
* @return The implementation for producing connection events. May be {@code null}.
*/
public ConnectionEventProducer getConnectionEventProducer() {
return this.connectionEventProducer;
}
/**
* Gets this adapter's type name.
* <p>
* The name should be unique among all protocol adapters that are part of a Hono installation. There is no specific
* scheme to follow but it is recommended to include the adapter's origin and the protocol that the adapter supports
* in the name and to use lower case letters only.
* <p>
* Based on this recommendation, Hono's standard HTTP adapter for instance might report <em>hono-http</em> as its
* type name.
* <p>
* The name returned by this method is added to message that are forwarded to downstream consumers.
*
* @return The adapter's name.
*/
protected abstract String getTypeName();
/**
* Gets the number of seconds after which this protocol adapter should give up waiting for an upstream command for a
* device of a given tenant.
* <p>
* Protocol adapters may override this method to e.g. use a static value for all tenants.
*
* @param tenant The tenant that the device belongs to.
* @param deviceTtd The TTD value provided by the device in seconds.
* @return A succeeded future that contains {@code null} if device TTD is {@code null}, or otherwise the lesser of
* device TTD and the value returned by {@link TenantObject#getMaxTimeUntilDisconnect(String)}.
* @throws NullPointerException if tenant is {@code null}.
*/
protected Future<Integer> getTimeUntilDisconnect(final TenantObject tenant, final Integer deviceTtd) {
Objects.requireNonNull(tenant);
if (deviceTtd == null) {
return Future.succeededFuture();
} else {
return Future.succeededFuture(Math.min(tenant.getMaxTimeUntilDisconnect(getTypeName()), deviceTtd));
}
}
/**
* Sets the factory to use for creating clients to receive commands via the AMQP Messaging Network.
*
* @param factory The factory.
* @throws NullPointerException if factory is {@code null}.
*/
@Autowired
public final void setCommandConsumerFactory(final ProtocolAdapterCommandConsumerFactory factory) {
this.commandConsumerFactory = Objects.requireNonNull(factory);
}
/**
* Gets the factory used for creating clients to receive commands via the AMQP Messaging Network.
*
* @return The factory.
*/
public final ProtocolAdapterCommandConsumerFactory getCommandConsumerFactory() {
return this.commandConsumerFactory;
}
/**
* Sets the component for mapping an incoming command to the gateway (if applicable)
* and protocol adapter instance that can handle it.
*
* @param commandTargetMapper The mapper component.
* @throws NullPointerException if commandTargetMapper is {@code null}.
*/
@Autowired
public final void setCommandTargetMapper(final CommandTargetMapper commandTargetMapper) {
this.commandTargetMapper = Objects.requireNonNull(commandTargetMapper);
}
/**
* Sets the ResourceLimitChecks instance used to check if the number of connections exceeded the limit or not.
*
* @param resourceLimitChecks The ResourceLimitChecks instance
* @throws NullPointerException if the resourceLimitChecks is {@code null}.
*/
@Autowired(required = false)
public final void setResourceLimitChecks(final ResourceLimitChecks resourceLimitChecks) {
this.resourceLimitChecks = Objects.requireNonNull(resourceLimitChecks);
}
/**
* Gets the ResourceLimitChecks instance used to check if the number of connections exceeded the limit or not.
*
* @return The ResourceLimitChecks instance.
*/
protected final ResourceLimitChecks getResourceLimitChecks() {
return this.resourceLimitChecks;
}
/**
* Sets the manager to use for connection limits.
*
* @param connectionLimitManager The implementation that manages the connection limit.
*/
public final void setConnectionLimitManager(final ConnectionLimitManager connectionLimitManager) {
this.connectionLimitManager = connectionLimitManager;
}
/**
* Gets the manager to use for connection limits.
*
* @return The manager. May be {@code null}.
*/
protected final ConnectionLimitManager getConnectionLimitManager() {
return connectionLimitManager;
}
/**
* Establishes the connections to the services this adapter depends on.
* <p>
* Note that the connections will most likely not have been established when the
* returned future completes. The {@link #isConnected()} method can be used to
* determine the current connection status.
*
* @return A future indicating the outcome of the startup process. the future will
* fail if the {@link #getTypeName()} method returns {@code null} or an empty string
* or if any of the service clients are not set. Otherwise the future will succeed.
*/
@Override
protected final Future<Void> startInternal() {
final Promise<Void> result = Promise.promise();
if (Strings.isNullOrEmpty(getTypeName())) {
result.fail(new IllegalStateException("adapter does not define a typeName"));
} else if (tenantClient == null) {
result.fail(new IllegalStateException("Tenant client factory must be set"));
} else if (telemetrySender == null) {
result.fail(new IllegalStateException("Telemetry message sender must be set"));
} else if (eventSender == null) {
result.fail(new IllegalStateException("Event sender must be set"));
} else if (registrationClientFactory == null) {
result.fail(new IllegalStateException("Device Registration client factory must be set"));
} else if (credentialsClientFactory == null) {
result.fail(new IllegalStateException("Credentials client factory must be set"));
} else if (commandConsumerFactory == null) {
result.fail(new IllegalStateException("Command & Control client factory must be set"));
} else if (deviceConnectionClientFactory == null) {
result.fail(new IllegalStateException("Device Connection client factory must be set"));
} else {
log.info("using ResourceLimitChecks [{}]", resourceLimitChecks.getClass().getName());
startServiceClient(telemetrySender, "Telemetry");
startServiceClient(eventSender, "Event");
startServiceClient(tenantClient, "Tenant service");
connectToService(registrationClientFactory, "Device Registration service");
connectToService(credentialsClientFactory, "Credentials service");
if (deviceConnectionClientFactory instanceof ConnectionLifecycle) {
connectToService((ConnectionLifecycle<?>) deviceConnectionClientFactory, "Device Connection service");
}
connectToService(
commandConsumerFactory,
"Command & Control",
this::onCommandConnectionLost,
this::onCommandConnectionEstablished)
.onComplete(c -> {
if (c.succeeded()) {
onCommandConnectionEstablished(c.result());
}
});
// initialize components dependent on the above clientFactories
commandTargetMapper.initialize(registrationClientFactory, deviceConnectionClientFactory);
commandConsumerFactory.initialize(commandTargetMapper, deviceConnectionClientFactory);
doStart(result);
}
return result.future();
}
/**
* Invoked after the adapter has started up.
* <p>
* This default implementation simply completes the promise.
* <p>
* Subclasses should override this method to perform any work required on start-up of this protocol adapter.
*
* @param startPromise The promise to complete once start up is complete.
*/
protected void doStart(final Promise<Void> startPromise) {
startPromise.complete();
}
@Override
protected final Future<Void> stopInternal() {
log.info("stopping protocol adapter");
final Promise<Void> result = Promise.promise();
doStop(result);
return result.future()
.compose(s -> closeServiceClients())
.recover(t -> {
log.info("error while stopping protocol adapter", t);
return Future.failedFuture(t);
})
.map(ok -> {
log.info("successfully stopped protocol adapter");
return null;
});
}
private Future<?> closeServiceClients() {
@SuppressWarnings("rawtypes")
final List<Future> results = new ArrayList<>();
results.add(stopServiceClient(tenantClient));
results.add(disconnectFromService(registrationClientFactory));
results.add(disconnectFromService(credentialsClientFactory));
results.add(disconnectFromService(commandConsumerFactory));
if (deviceConnectionClientFactory instanceof ConnectionLifecycle) {
results.add(disconnectFromService((ConnectionLifecycle<?>) deviceConnectionClientFactory));
}
results.add(stopServiceClient(eventSender));
results.add(stopServiceClient(telemetrySender));
return CompositeFuture.all(results);
}
/**
* Stops a service client.
* <p>
* This method invokes the client's {@link Lifecycle#stop()} method.
*
* @param client The client to stop.
* @return A future indicating the outcome of stopping the client.
* @throws NullPointerException if any of the parameters are {@code null}.
*/
protected final Future<Void> stopServiceClient(final Lifecycle client) {
Objects.requireNonNull(client);
return client.stop();
}
private Future<Void> disconnectFromService(final ConnectionLifecycle<?> connection) {
final Promise<Void> disconnectTracker = Promise.promise();
if (connection == null) {
disconnectTracker.complete();
} else {
connection.disconnect(disconnectTracker);
}
return disconnectTracker.future();
}
/**
* Invoked directly before the adapter is shut down.
* <p>
* This default implementation always completes the promise.
* <p>
* Subclasses should override this method to perform any work required before shutting down this protocol adapter.
*
* @param stopPromise The promise to complete once all work is done and shut down should commence.
*/
protected void doStop(final Promise<Void> stopPromise) {
// to be overridden by subclasses
stopPromise.complete();
}
/**
* Checks if this adapter is enabled for a given tenant, requiring the tenant itself to be enabled as well.
*
* @param tenantConfig The tenant to check for.
* @return A succeeded future if the given tenant and this adapter are enabled.
* Otherwise the future will be failed with a {@link ClientErrorException}
* containing the 403 Forbidden status code.
* @throws NullPointerException if tenant config is {@code null}.
*/
protected final Future<TenantObject> isAdapterEnabled(final TenantObject tenantConfig) {
Objects.requireNonNull(tenantConfig);
if (tenantConfig.isAdapterEnabled(getTypeName())) {
log.debug("protocol adapter [{}] is enabled for tenant [{}]",
getTypeName(), tenantConfig.getTenantId());
return Future.succeededFuture(tenantConfig);
} else if (!tenantConfig.isEnabled()) {
log.debug("tenant [{}] is disabled", tenantConfig.getTenantId());
return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN,
"tenant is disabled"));
} else {
log.debug("protocol adapter [{}] is disabled for tenant [{}]",
getTypeName(), tenantConfig.getTenantId());
return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN,
"adapter disabled for tenant"));
}
}
/**
* Checks if the maximum number of concurrent connections across all protocol
* adapters from devices of a particular tenant has been reached.
* <p>
* This default implementation uses the
* {@link ResourceLimitChecks#isConnectionLimitReached(TenantObject, SpanContext)} method
* to verify if the tenant's overall connection limit across all adapters
* has been reached and also invokes {@link #checkMessageLimit(TenantObject, long, SpanContext)}
* and {@link #checkConnectionDurationLimit(TenantObject, SpanContext)} to check
* if the tenant's message and connection duration limits have been exceeded or not.
*
* @param tenantConfig The tenant to check the connection limit for.
* @param spanContext The currently active OpenTracing span context that is used to
* trace the limits verification or {@code null}
* if no span is currently active.
* @return A succeeded future if the connection and message limits have not been reached yet
* or if the limits could not be checked.
* Otherwise the future will be failed with a {@link AuthorizationException}.
* @throws NullPointerException if tenant is {@code null}.
*/
protected Future<Void> checkConnectionLimit(final TenantObject tenantConfig, final SpanContext spanContext) {
Objects.requireNonNull(tenantConfig);
final Future<Void> connectionLimitCheckResult = resourceLimitChecks.isConnectionLimitReached(tenantConfig, spanContext)
.recover(t -> Future.succeededFuture(Boolean.FALSE))
.compose(isExceeded -> {
if (isExceeded) {
return Future.failedFuture(new TenantConnectionsExceededException(tenantConfig.getTenantId(), null, null));
} else {
return Future.succeededFuture();
}
});
final Future<Void> messageLimitCheckResult = checkMessageLimit(tenantConfig, 1, spanContext)
.recover(t -> {
if (t instanceof ClientErrorException) {
return Future.failedFuture(new DataVolumeExceededException(tenantConfig.getTenantId(), null, null));
}
return Future.failedFuture(t);
});
return CompositeFuture.all(
connectionLimitCheckResult,
checkConnectionDurationLimit(tenantConfig, spanContext),
messageLimitCheckResult).mapEmpty();
}
/**
* Checks if a tenant's message limit will be exceeded by a given payload.
* <p>
* This default implementation uses the
* {@link ResourceLimitChecks#isMessageLimitReached(TenantObject, long, SpanContext)} method
* to verify if the tenant's message limit has been reached.
*
* @param tenantConfig The tenant to check the message limit for.
* @param payloadSize The size of the message payload in bytes.
* @param spanContext The currently active OpenTracing span context that is used to
* trace the limits verification or {@code null}
* if no span is currently active.
* @return A succeeded future if the message limit has not been reached yet
* or if the limits could not be checked.
* Otherwise the future will be failed with a {@link ClientErrorException}
* containing the 429 Too many requests status code.
* @throws NullPointerException if tenant is {@code null}.
*/
protected Future<Void> checkMessageLimit(final TenantObject tenantConfig, final long payloadSize,
final SpanContext spanContext) {
Objects.requireNonNull(tenantConfig);
return resourceLimitChecks
.isMessageLimitReached(tenantConfig,
ServiceBaseUtils.calculatePayloadSize(payloadSize, tenantConfig),
spanContext)
.recover(t -> Future.succeededFuture(Boolean.FALSE))
.compose(isExceeded -> {
if (isExceeded) {
return Future.failedFuture(
new ClientErrorException(HttpResponseStatus.TOO_MANY_REQUESTS.code()));
} else {
return Future.succeededFuture();
}
});
}
/**
* Checks if the maximum connection duration across all protocol adapters
* for a particular tenant has been reached.
* <p>
* This default implementation uses the
* {@link ResourceLimitChecks#isConnectionDurationLimitReached(TenantObject, SpanContext)}
* method to verify if the tenant's overall connection duration across all adapters
* has been reached.
*
* @param tenantConfig The tenant to check the connection duration limit for.
* @param spanContext The currently active OpenTracing span context that is used to
* trace the limits verification or {@code null}
* if no span is currently active.
* @return A succeeded future if the connection duration limit has not yet been reached
* or if the limit could not be checked.
* Otherwise, the future will be failed with a {@link AuthorizationException}.
* @throws NullPointerException if tenantConfig is {@code null}.
*/
protected Future<Void> checkConnectionDurationLimit(final TenantObject tenantConfig,
final SpanContext spanContext) {
Objects.requireNonNull(tenantConfig);
return resourceLimitChecks.isConnectionDurationLimitReached(tenantConfig, spanContext)
.recover(t -> Future.succeededFuture(Boolean.FALSE))
.compose(isExceeded -> {
if (isExceeded) {
return Future.failedFuture(new ConnectionDurationExceededException(tenantConfig.getTenantId(), null, null));
} else {
return Future.succeededFuture();
}
});
}
/**
* Validates a message's target address for consistency with Hono's addressing rules.
*
* @param address The address to validate.
* @param authenticatedDevice The device that has uploaded the message.
* @return A future indicating the outcome of the check.
* <p>
* The future will be completed with the validated target address if all
* checks succeed. Otherwise the future will be failed with a
* {@link ClientErrorException}.
* @throws NullPointerException if address is {@code null}.
*/
protected final Future<ResourceIdentifier> validateAddress(final ResourceIdentifier address, final Device authenticatedDevice) {
Objects.requireNonNull(address);
final Promise<ResourceIdentifier> result = Promise.promise();
if (authenticatedDevice == null) {
if (Strings.isNullOrEmpty(address.getTenantId()) || Strings.isNullOrEmpty(address.getResourceId())) {
result.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST,
"unauthenticated client must provide tenant and device ID in message address"));
} else {
result.complete(address);
}
} else {
if (!Strings.isNullOrEmpty(address.getTenantId()) && Strings.isNullOrEmpty(address.getResourceId())) {
result.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST,
"message address must not contain tenant ID only"));
} else if (!Strings.isNullOrEmpty(address.getTenantId()) && !address.getTenantId().equals(authenticatedDevice.getTenantId())) {
result.fail(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN, "can only publish for device of same tenant"));
} else if (Strings.isNullOrEmpty(address.getTenantId()) && Strings.isNullOrEmpty(address.getResourceId())) {
// use authenticated device's tenant and device ID
final ResourceIdentifier resource = ResourceIdentifier.from(address,
authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
result.complete(resource);
} else if (Strings.isNullOrEmpty(address.getTenantId())) {
// use authenticated device's tenant ID
final ResourceIdentifier resource = ResourceIdentifier.from(address,
authenticatedDevice.getTenantId(), address.getResourceId());
result.complete(resource);
} else {
result.complete(address);
}
}
return result.future().recover(t -> {
log.debug("validation failed for address [{}], device [{}]: {}", address, authenticatedDevice, t.getMessage());
return Future.failedFuture(t);
});
}
/**
* Checks whether a given device is registered and enabled.
*
* @param device The device to check.
* @param context The currently active OpenTracing span that is used to
* trace the retrieval of the assertion or {@code null}
* if no span is currently active.
* @return A future indicating the outcome.
* The future will be succeeded if the device is registered and enabled.
* Otherwise, the future will be failed with a {@link RegistrationAssertionException}
* containing the root cause of the failure to assert the registration.
* @throws NullPointerException if device is {@code null}.
*/
protected final Future<Void> checkDeviceRegistration(final Device device, final SpanContext context) {
Objects.requireNonNull(device);
return getRegistrationAssertion(
device.getTenantId(),
device.getDeviceId(),
null,
context)
.recover(t -> Future.failedFuture(new RegistrationAssertionException(
device.getTenantId(),
"failed to assert registration status of " + device, t)))
.mapEmpty();
}
/**
* Starts a service client.
* <p>
* This method invokes the given client's {@link Lifecycle#start()} method.
*
* @param serviceClient The client to start.
* @param serviceName The name of the service that the client is for (used for logging).
* @return A future indicating the outcome of starting the client.
* @throws NullPointerException if any of the parameters are {@code null}.
*/
protected final Future<Void> startServiceClient(final Lifecycle serviceClient, final String serviceName) {
Objects.requireNonNull(serviceClient);
Objects.requireNonNull(serviceName);
return serviceClient.start().map(c -> {
log.info("{} client [{}] successfully connected", serviceName, serviceClient);
return c;
}).recover(t -> {
log.warn("{} client [{}] failed to connect", serviceName, serviceClient, t);
return Future.failedFuture(t);
});
}
/**
* Establishes a connection to a Hono Service component.
*
* @param factory The client factory for the service that is to be connected.
* @param serviceName The name of the service that is to be connected (used for logging).
* @return A future that will succeed once the connection has been established. The future will fail if the
* connection cannot be established.
* @throws NullPointerException if serviceName is {@code null}.
* @throws IllegalArgumentException if factory is {@code null}.
* @param <C> The type of connection that the factory uses.
*/
protected final <C> Future<C> connectToService(final ConnectionLifecycle<C> factory, final String serviceName) {
return connectToService(factory, serviceName, null, null);
}
/**
* Establishes a connection to a Hono Service component.
*
* @param factory The client factory for the service that is to be connected.
* @param serviceName The name of the service that is to be connected (used for logging).
* @param disconnectListener A listener to invoke when the connection is lost unexpectedly
* or {@code null} if no listener should be invoked.
* @param reconnectListener A listener to invoke when the connection has been re-established
* after it had been lost unexpectedly or {@code null} if no listener
* should be invoked.
* @return A future that will succeed once the connection has been established. The future will fail if the
* connection cannot be established.
* @throws NullPointerException if serviceName is {@code null}.
* @throws IllegalArgumentException if factory is {@code null}.
* @param <C> The type of connection that the factory uses.
*/
protected final <C> Future<C> connectToService(
final ConnectionLifecycle<C> factory,
final String serviceName,
final DisconnectListener<C> disconnectListener,
final ReconnectListener<C> reconnectListener) {
Objects.requireNonNull(factory);
factory.addDisconnectListener(c -> {
log.info("lost connection to {}", serviceName);
if (disconnectListener != null) {
disconnectListener.onDisconnect(c);
}
});
factory.addReconnectListener(c -> {
log.info("connection to {} re-established", serviceName);
if (reconnectListener != null) {
reconnectListener.onReconnect(c);
}
});
return factory.connect().map(c -> {
log.info("connected to {}", serviceName);
return c;
}).recover(t -> {
log.warn("failed to connect to {}", serviceName, t);
return Future.failedFuture(t);
});
}
/**
* Invoked when a connection for receiving commands and sending responses has been
* unexpectedly lost.
* <p>
* Subclasses may override this method in order to perform housekeeping and/or clear
* state that is associated with the connection. Implementors <em>must not</em> try
* to re-establish the connection, the adapter will try to re-establish the connection
* by default.
* <p>
* This default implementation does nothing.
*
* @param commandConnection The lost connection.
*/
protected void onCommandConnectionLost(final HonoConnection commandConnection) {
// empty by default
}
/**
* Invoked when a connection for receiving commands and sending responses has been
* established.
* <p>
* Note that this method is invoked once the initial connection has been established
* but also when the connection has been re-established after a connection loss.
* <p>
* Subclasses may override this method in order to e.g. re-establish device specific
* links for receiving commands or to create a permanent link for receiving commands
* for all devices.
* <p>
* This default implementation does nothing.
*
* @param commandConnection The (re-)established connection.
*/
protected void onCommandConnectionEstablished(final HonoConnection commandConnection) {
// empty by default
}
/**
* Checks if this adapter is connected to the services it depends on.
* <p>
* Subclasses may override this method in order to add checks or omit checks for
* connection to services that are not used/needed by the adapter.
*
* @return A future indicating the outcome of the check. The future will succeed if this adapter is currently
* connected to
* <ul>
* <li>a Tenant service</li>
* <li>a Device Registration service</li>
* <li>a Credentials service</li>
* <li>the AMQP Messaging Network for receiving command messages</li>
* <li>a Device Connection service</li>
* </ul>
* Otherwise, the future will fail.
*/
protected Future<Void> isConnected() {
@SuppressWarnings("rawtypes")
final List<Future> connections = new ArrayList<>();
connections.add(Optional.ofNullable(registrationClientFactory)
.map(client -> client.isConnected())
.orElseGet(() -> Future.failedFuture(new ServerErrorException(
HttpURLConnection.HTTP_UNAVAILABLE, "Device Registration client factory is not set"))));
connections.add(Optional.ofNullable(credentialsClientFactory)
.map(client -> client.isConnected())
.orElseGet(() -> Future.failedFuture(new ServerErrorException(
HttpURLConnection.HTTP_UNAVAILABLE, "Credentials client factory is not set"))));
connections.add(Optional.ofNullable(commandConsumerFactory)
.map(client -> client.isConnected())
.orElseGet(() -> Future.failedFuture(new ServerErrorException(
HttpURLConnection.HTTP_UNAVAILABLE, "Command & Control client factory is not set"))));
connections.add(Optional.ofNullable(deviceConnectionClientFactory)
.map(client -> {
if (deviceConnectionClientFactory instanceof ConnectionLifecycle) {
return ((ConnectionLifecycle<?>) client).isConnected();
} else {
return Future.succeededFuture();
}
})
.orElseGet(() -> Future.failedFuture(new ServerErrorException(
HttpURLConnection.HTTP_UNAVAILABLE, "Device Connection client factory is not set"))));
return CompositeFuture.all(connections).mapEmpty();
}