-
Notifications
You must be signed in to change notification settings - Fork 201
/
DaprHttpClient.java
197 lines (175 loc) · 5.85 KB
/
DaprHttpClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/*
* Copyright (c) Microsoft Corporation and Dapr Contributors.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import io.dapr.client.DaprHttp;
import reactor.core.publisher.Mono;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
/**
* A DaprClient over HTTP for Actor's runtime.
*/
class DaprHttpClient implements DaprClient {
/**
* Internal serializer for state.
*/
private static final ActorObjectSerializer INTERNAL_SERIALIZER = new ActorObjectSerializer();
/**
* Shared Json Factory as per Jackson's documentation, used only for this class.
*/
private static final JsonFactory JSON_FACTORY = new JsonFactory();
/**
* The HTTP client to be used.
*
* @see DaprHttp
*/
private final DaprHttp client;
/**
* Internal constructor.
*
* @param client Dapr's http client.
*/
DaprHttpClient(DaprHttp client) {
this.client = client;
}
/**
* {@inheritDoc}
*/
@Override
public Mono<byte[]> getState(String actorType, String actorId, String keyName) {
String[] pathSegments = new String[] { DaprHttp.API_VERSION, "actors", actorType, actorId, "state", keyName };
Mono<DaprHttp.Response> responseMono =
this.client.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, null, "", null, null);
return responseMono.map(r -> {
if ((r.getStatusCode() != 200) && (r.getStatusCode() != 204)) {
throw new IllegalStateException(
String.format("Error getting actor state: %s/%s/%s", actorType, actorId, keyName));
}
return r.getBody();
});
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> saveStateTransactionally(
String actorType,
String actorId,
List<ActorStateOperation> operations) {
// Constructing the JSON via a stream API to avoid creating transient objects to be instantiated.
byte[] payload = null;
try (ByteArrayOutputStream writer = new ByteArrayOutputStream()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
// Start array
generator.writeStartArray();
for (ActorStateOperation stateOperation : operations) {
// Start operation object.
generator.writeStartObject();
generator.writeStringField("operation", stateOperation.getOperationType());
// Start request object.
generator.writeObjectFieldStart("request");
generator.writeStringField("key", stateOperation.getKey());
Object value = stateOperation.getValue();
if (value != null) {
if (value instanceof String) {
// DefaultObjectSerializer is a JSON serializer, so we just pass it on.
generator.writeFieldName("value");
generator.writeRawValue((String) value);
} else if (value instanceof byte[]) {
// Custom serializer uses byte[].
// DefaultObjectSerializer is just a passthrough for byte[], so we handle it here too.
generator.writeBinaryField("value", (byte[]) value);
} else {
return Mono.error(() -> {
throw new IllegalArgumentException("Actor state value must be String or byte[]");
});
}
}
// End request object.
generator.writeEndObject();
// End operation object.
generator.writeEndObject();
}
// End array
generator.writeEndArray();
generator.close();
writer.flush();
payload = writer.toByteArray();
} catch (IOException e) {
return Mono.error(e);
}
String[] pathSegments = new String[] { DaprHttp.API_VERSION, "actors", actorType, actorId, "state" };
return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), pathSegments, null, payload, null, null).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> registerReminder(
String actorType,
String actorId,
String reminderName,
ActorReminderParams reminderParams) {
String[] pathSegments = new String[] {
DaprHttp.API_VERSION,
"actors",
actorType,
actorId,
"reminders",
reminderName
};
return Mono.fromCallable(() -> INTERNAL_SERIALIZER.serialize(reminderParams))
.flatMap(data ->
this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), pathSegments, null, data, null, null)
).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> unregisterReminder(String actorType, String actorId, String reminderName) {
String[] pathSegments = new String[] {
DaprHttp.API_VERSION,
"actors",
actorType,
actorId,
"reminders",
reminderName
};
return this.client.invokeApi(DaprHttp.HttpMethods.DELETE.name(), pathSegments, null, null, null).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> registerTimer(
String actorType,
String actorId,
String timerName,
ActorTimerParams timerParams) {
return Mono.fromCallable(() -> INTERNAL_SERIALIZER.serialize(timerParams))
.flatMap(data -> {
String[] pathSegments = new String[] {
DaprHttp.API_VERSION,
"actors",
actorType,
actorId,
"timers",
timerName
};
return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), pathSegments, null, data, null, null);
}).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> unregisterTimer(String actorType, String actorId, String timerName) {
String[] pathSegments = new String[] { DaprHttp.API_VERSION, "actors", actorType, actorId, "timers", timerName };
return this.client.invokeApi(DaprHttp.HttpMethods.DELETE.name(), pathSegments, null, null, null).then();
}
}