-
Notifications
You must be signed in to change notification settings - Fork 214
/
HttpPushFactory.java
95 lines (84 loc) · 3.88 KB
/
HttpPushFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.httppush;
import java.time.Duration;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.service.config.HttpPushConfig;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.PreparedTimer;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.javadsl.Flow;
import scala.util.Try;
/**
 * Produces the HTTP request templates and the request-sending flows used for HTTP-push connections.
 */
public interface HttpPushFactory {

    /**
     * Builds an HTTP-push-factory for the given connection by delegating to {@link DefaultHttpPushFactory}.
     * The connection is expected to be a valid HTTP-push connection; the behavior for an invalid connection
     * is undefined.
     *
     * @param connection the HTTP-push connection.
     * @param httpPushConfig the configuration of HTTP connections.
     * @param connectionLogger the logger of the connection.
     * @param tunnelConfigSupplier supplies the {@code SshTunnelState}.
     * @return the created HTTP-push-factory.
     */
    static HttpPushFactory of(final Connection connection,
            final HttpPushConfig httpPushConfig,
            final ConnectionLogger connectionLogger,
            final Supplier<SshTunnelState> tunnelConfigSupplier) {

        final HttpPushFactory factory =
                DefaultHttpPushFactory.of(connection, httpPushConfig, connectionLogger, tunnelConfigSupplier);
        return factory;
    }

    /**
     * Builds the request template for an HTTP publish target. The template carries neither headers nor
     * payload; both are set by the published external messages.
     *
     * @param httpPublishTarget the HTTP publish target.
     * @return the HTTP request for the target.
     */
    HttpRequest newRequest(HttpPublishTarget httpPublishTarget);

    /**
     * Builds a flow that sends HTTP(S) requests, optionally measuring them.
     *
     * @param system the actor system with the default Pekko HTTP configuration.
     * @param log the logger for the flow.
     * @param requestTimeout the timeout of each request.
     * @param timer the timer to measure HTTP requests; may be {@code null}.
     * @param durationConsumer the consumer of measured HTTP request durations; may be {@code null}.
     * @return a flow taking pairs of request and {@code HttpPushContext} and emitting pairs of the
     * attempted response and {@code HttpPushContext}.
     */
    Flow<Pair<HttpRequest, HttpPushContext>, Pair<Try<HttpResponse>, HttpPushContext>, ?> createFlow(
            ActorSystem system,
            LoggingAdapter log,
            Duration requestTimeout,
            @Nullable PreparedTimer timer,
            @Nullable BiConsumer<Duration, ConnectionMonitor.InfoProvider> durationConsumer);

    /**
     * Builds a flow that sends HTTP(S) requests without a timer and without a duration consumer.
     * Equivalent to calling the five-argument overload with {@code null} for both optional arguments.
     *
     * @param system the actor system with the default Pekko HTTP configuration.
     * @param log the logger for the flow.
     * @param requestTimeout the timeout of each request.
     * @return a flow taking pairs of request and {@code HttpPushContext} and emitting pairs of the
     * attempted response and {@code HttpPushContext}.
     */
    default Flow<Pair<HttpRequest, HttpPushContext>, Pair<Try<HttpResponse>, HttpPushContext>, ?> createFlow(
            final ActorSystem system, final LoggingAdapter log, final Duration requestTimeout) {

        // Both optional collaborators are deliberately absent in this overload.
        final PreparedTimer noTimer = null;
        final BiConsumer<Duration, ConnectionMonitor.InfoProvider> noDurationConsumer = null;

        return createFlow(system, log, requestTimeout, noTimer, noDurationConsumer);
    }

}