-
Notifications
You must be signed in to change notification settings - Fork 214
/
StreamingType.java
124 lines (108 loc) · 3.9 KB
/
StreamingType.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.pubsub;
import java.util.Arrays;
import java.util.Optional;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
/**
 * The kinds of signals that can be streamed to a consumer (for example an open Websocket connection).
 * Every constant carries the key under which it is published via distributed pub/sub in the Pekko cluster.
 */
public enum StreamingType {

    /**
     * Thing events. The topic doubles as the signal type prefix by which {@link #fromSignal(Signal)}
     * recognizes thing events.
     */
    EVENTS("things.events:"),

    /**
     * Message commands. The pubsub topic must be equal to the type prefix of message commands because
     * {@link #fromSignal(Signal)} matches signal types against it.
     */
    MESSAGES("messages.commands:"),

    /**
     * Live commands.
     */
    LIVE_COMMANDS("things-live-commands"),

    /**
     * Live events.
     */
    LIVE_EVENTS("things-live-events"),

    /**
     * Policy announcements.
     *
     * @since 2.0.0
     */
    POLICY_ANNOUNCEMENTS("policy-announcements");

    /**
     * Name of the live channel; equal to TopicPath.Channel.LIVE.getName().
     * Duplicated here instead of referencing TopicPath in order not to depend on ditto-protocol-adapter.
     */
    private static final String LIVE_CHANNEL_NAME = "live";

    private final String distributedPubSubTopic;

    StreamingType(final String distributedPubSubTopic) {
        this.distributedPubSubTopic = distributedPubSubTopic;
    }

    /**
     * @return the key used for distributed pub/sub in the Pekko cluster for this StreamingType.
     */
    public String getDistributedPubSubTopic() {
        return distributedPubSubTopic;
    }

    /**
     * Looks up the {@code StreamingType} whose pub/sub topic is exactly the given string.
     *
     * @param distributedPubSubTopic the string representation of the topic; must not be {@code null}.
     * @return the StreamingType published under that topic.
     * @throws IllegalStateException if no StreamingType uses the given topic.
     */
    public static StreamingType fromTopic(final String distributedPubSubTopic) {
        final Optional<StreamingType> match = Arrays.stream(values())
                .filter(type -> type.hasTopic(distributedPubSubTopic))
                .findFirst();
        if (match.isPresent()) {
            return match.get();
        }
        throw new IllegalStateException("Unknown distributedPubSubTopic: " + distributedPubSubTopic);
    }

    /**
     * Tells whether a signal's headers declare the live channel.
     *
     * @param signal the signal.
     * @return {@code true} if the channel header is present and equal to the live channel name.
     */
    public static boolean isLiveSignal(final Signal<?> signal) {
        return signal.getDittoHeaders()
                .getChannel()
                .map(LIVE_CHANNEL_NAME::equals)
                .orElse(false);
    }

    /**
     * Determines the streaming type of a signal as far as it can be discerned from its channel and type.
     *
     * @param signal the signal.
     * @return the streaming type most appropriate for the signal, or an empty Optional if none fits.
     */
    public static Optional<StreamingType> fromSignal(final Signal<?> signal) {
        final String signalType = signal.getType();
        final boolean thingEvent = signalType.startsWith(EVENTS.getDistributedPubSubTopic());

        // Outside the live channel only thing events are streamable.
        if (!isLiveSignal(signal)) {
            return thingEvent ? Optional.of(EVENTS) : Optional.empty();
        }

        // Live channel: events take precedence, then messages, then any other command.
        if (thingEvent) {
            return Optional.of(LIVE_EVENTS);
        }
        if (signalType.startsWith(MESSAGES.getDistributedPubSubTopic())) {
            return Optional.of(MESSAGES);
        }
        if (signal instanceof Command<?>) {
            return Optional.of(LIVE_COMMANDS);
        }
        return Optional.empty();
    }

    /**
     * Checks whether this type is published under the given topic.
     * The comparison is invoked on the argument, so a {@code null} topic raises a NullPointerException.
     */
    private boolean hasTopic(final String topic) {
        return topic.equals(distributedPubSubTopic);
    }

}