-
Notifications
You must be signed in to change notification settings - Fork 803
/
MicroserviceRegisterTask.java
342 lines (298 loc) · 14.2 KB
/
MicroserviceRegisterTask.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.serviceregistry.task;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.ws.rs.core.Response.Status;
import org.apache.servicecomb.foundation.common.base.ServiceCombConstants;
import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.api.registry.Microservice;
import org.apache.servicecomb.serviceregistry.api.response.GetSchemaResponse;
import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient;
import org.apache.servicecomb.serviceregistry.client.http.Holder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
public class MicroserviceRegisterTask extends AbstractRegisterTask {
private static final Logger LOGGER = LoggerFactory.getLogger(MicroserviceRegisterTask.class);
private boolean schemaIdSetMatch;
public MicroserviceRegisterTask(EventBus eventBus, ServiceRegistryClient srClient, Microservice microservice) {
super(eventBus, srClient, microservice);
this.taskStatus = TaskStatus.READY;
}
public boolean isSchemaIdSetMatch() {
return schemaIdSetMatch;
}
@Subscribe
public void onMicroserviceInstanceHeartbeatTask(MicroserviceInstanceHeartbeatTask task) {
if (task.getHeartbeatResult() != HeartbeatResult.SUCCESS && isSameMicroservice(task.getMicroservice())) {
LOGGER.info("read MicroserviceInstanceHeartbeatTask status is {}", task.taskStatus);
this.taskStatus = TaskStatus.READY;
this.registered = false;
}
}
@Subscribe
public void onInstanceRegistryFailed(MicroserviceInstanceRegisterTask task) {
if (task.taskStatus != TaskStatus.FINISHED) {
LOGGER.info("read MicroserviceInstanceRegisterTask status is {}", task.taskStatus);
this.taskStatus = TaskStatus.READY;
this.registered = false;
}
}
@Override
protected boolean doRegister() {
LOGGER.info("running microservice register task.");
String serviceId = srClient.getMicroserviceId(microservice.getAppId(),
microservice.getServiceName(),
microservice.getVersion(),
microservice.getEnvironment());
if (!StringUtils.isEmpty(serviceId)) {
// This microservice has been registered, so we just use the serviceId gotten from service center
microservice.setServiceId(serviceId);
LOGGER.info(
"Microservice exists in service center, no need to register. id=[{}] appId=[{}], name=[{}], version=[{}], env=[{}]",
serviceId,
microservice.getAppId(),
microservice.getServiceName(),
microservice.getVersion(),
microservice.getEnvironment());
if (!checkSchemaIdSet()) {
return false;
}
} else {
serviceId = srClient.registerMicroservice(microservice);
if (StringUtils.isEmpty(serviceId)) {
LOGGER.error(
"Registry microservice failed. appId=[{}], name=[{}], version=[{}], env=[{}]",
microservice.getAppId(),
microservice.getServiceName(),
microservice.getVersion(),
microservice.getEnvironment());
return false;
}
// In re-register microservice case, the old instanceId should not be cached
microservice.getInstance().setInstanceId(null);
LOGGER.info(
"Registry Microservice successfully. id=[{}] appId=[{}], name=[{}], version=[{}], schemaIds={}, env=[{}]",
serviceId,
microservice.getAppId(),
microservice.getServiceName(),
microservice.getVersion(),
microservice.getSchemas(),
microservice.getEnvironment());
}
microservice.setServiceId(serviceId);
microservice.getInstance().setServiceId(microservice.getServiceId());
return registerSchemas();
}
private boolean checkSchemaIdSet() {
Microservice existMicroservice = srClient.getMicroservice(microservice.getServiceId());
if (existMicroservice == null) {
LOGGER.error("Error to get microservice from service center when check schema set");
return false;
}
Set<String> existSchemas = new HashSet<>(existMicroservice.getSchemas());
Set<String> localSchemas = new HashSet<>(microservice.getSchemas());
schemaIdSetMatch = existSchemas.equals(localSchemas);
if (!schemaIdSetMatch) {
LOGGER.warn(
"SchemaIds is different between local and service center. "
+ "serviceId=[{}] appId=[{}], name=[{}], version=[{}], env=[{}], local schemaIds={}, service center schemaIds={}",
microservice.getServiceId(),
microservice.getAppId(),
microservice.getServiceName(),
microservice.getVersion(),
microservice.getEnvironment(),
localSchemas,
existSchemas);
return true;
}
LOGGER.info(
"SchemaIds are equals to service center. serviceId=[{}], appId=[{}], name=[{}], version=[{}], env=[{}], schemaIds={}",
microservice.getServiceId(),
microservice.getAppId(),
microservice.getServiceName(),
microservice.getVersion(),
microservice.getEnvironment(),
localSchemas);
return true;
}
private boolean registerSchemas() {
Holder<List<GetSchemaResponse>> scSchemaHolder = srClient.getSchemas(microservice.getServiceId());
if (Status.OK.getStatusCode() != scSchemaHolder.getStatusCode()) {
LOGGER.error("failed to get schemas from service center, statusCode = [{}]", scSchemaHolder.getStatusCode());
return false;
}
Map<String, GetSchemaResponse> scSchemaMap = convertScSchemaMap(scSchemaHolder);
// CHECK: local > sc, local != sc
for (Entry<String, String> localSchemaEntry : microservice.getSchemaMap().entrySet()) {
if (!registerSchema(scSchemaMap, localSchemaEntry)) {
return false;
}
}
// CHECK: local < sc
checkRemainingSchema(scSchemaMap);
schemaIdSetMatch = true;
return true;
}
/**
* Check whether a local schema is equal to a sc schema.
* @return true if the local schema is equal to a sc schema, or be registered to sc successfully;
* false if schema is registered to sc but failed.
* @throws IllegalStateException The environment is not modifiable, and the local schema is different from sc schema
* or not exist in sc.
*/
private boolean registerSchema(Map<String, GetSchemaResponse> scSchemaMap,
Entry<String, String> localSchemaEntry) {
GetSchemaResponse scSchema = scSchemaMap.get(localSchemaEntry.getKey());
boolean onlineSchemaExists = scSchema != null;
LOGGER.info("schemaId [{}] exists [{}], summary exists [{}]", localSchemaEntry.getKey(), onlineSchemaExists,
scSchema != null && scSchema.getSummary() != null);
if (!onlineSchemaExists) {
// local > sc
return registerNewSchema(localSchemaEntry);
}
scSchemaMap.remove(localSchemaEntry.getKey());
// local != sc
return compareAndReRegisterSchema(localSchemaEntry, scSchema);
}
/**
* Try to register a new schema to service center, or throw exception if cannot register.
* @param localSchemaEntry local schema to be registered.
* @return whether local schema is registered successfully.
* @throws IllegalStateException The environment is unmodifiable.
*/
private boolean registerNewSchema(Entry<String, String> localSchemaEntry) {
// The ids of schemas are contained by microservice registry request, which means once a microservice
// is registered in the service center, the schemas that it contains are determined.
// If we get a microservice but cannot find the given schemaId in it's schemaId list, this means that
// the schemas of this microservice has been changed, and we should decide whether to register this new
// schema according to it's environment configuration.
if (onlineSchemaIsModifiable()) {
return registerSingleSchema(localSchemaEntry.getKey(), localSchemaEntry.getValue());
}
throw new IllegalStateException(
"There is a schema only existing in local microservice: [" + localSchemaEntry.getKey()
+ "], which means there are interfaces changed. "
+ "You need to increment microservice version before deploying, "
+ "or you can configure service_description.environment="
+ ServiceCombConstants.DEVELOPMENT_SERVICECOMB_ENV
+ " to work in development environment and ignore this error");
}
/**
* Compare schema summary and determine whether to re-register schema or throw exception.
* @param localSchemaEntry local schema
* @param scSchema schema in service center
* @return true if the two copies of schema are the same, or local schema is re-registered successfully,
* false if the local schema is re-registered to service center but failed.
* @throws IllegalStateException The two copies of schema are different and the environment is not modifiable.
*/
private boolean compareAndReRegisterSchema(Entry<String, String> localSchemaEntry, GetSchemaResponse scSchema) {
String scSchemaSummary = getScSchemaSummary(scSchema);
if (null == scSchemaSummary) {
// cannot get scSchemaSummary, which means there is no schema content in sc, register schema directly
return registerSingleSchema(localSchemaEntry.getKey(), localSchemaEntry.getValue());
}
String localSchemaSummary = RegistryUtils.calcSchemaSummary(localSchemaEntry.getValue());
if (!localSchemaSummary.equals(scSchemaSummary)) {
if (onlineSchemaIsModifiable()) {
LOGGER.info(
"schema[{}]'s content is changed and the current environment is [{}], so re-register it!",
localSchemaEntry.getKey(), ServiceCombConstants.DEVELOPMENT_SERVICECOMB_ENV);
return registerSingleSchema(localSchemaEntry.getKey(), localSchemaEntry.getValue());
}
// env is not development, throw an exception and break the init procedure
throw new IllegalStateException(
"The schema(id=[" + localSchemaEntry.getKey()
+ "]) content held by this instance and the service center is different. "
+ "You need to increment microservice version before deploying. "
+ "Or you can configure service_description.environment="
+ ServiceCombConstants.DEVELOPMENT_SERVICECOMB_ENV
+ " to work in development environment and ignore this error");
}
// summaries are the same
return true;
}
/**
* Try to get or calculate scSchema summary.
* @return summary of scSchema,
* or null if there is no schema content in service center
*/
private String getScSchemaSummary(GetSchemaResponse scSchema) {
String scSchemaSummary = scSchema.getSummary();
if (null != scSchemaSummary) {
return scSchemaSummary;
}
// if there is no online summery, query online schema content directly and calculate summary
String onlineSchemaContent = srClient.getSchema(microservice.getServiceId(), scSchema.getSchemaId());
if (null != onlineSchemaContent) {
scSchemaSummary = RegistryUtils.calcSchemaSummary(onlineSchemaContent);
}
return scSchemaSummary;
}
/**
* Check whether there are schemas remaining in service center but not exist in local microservice.
* @throws IllegalStateException There are schemas only existing in service center, and the environment is unmodifiable.
*/
private void checkRemainingSchema(Map<String, GetSchemaResponse> scSchemaMap) {
if (!scSchemaMap.isEmpty()) {
// there are some schemas only exist in service center
if (!onlineSchemaIsModifiable()) {
// env is not development, throw an exception and break the init procedure
throw new IllegalStateException("There are schemas only existing in service center: " + scSchemaMap.keySet()
+ ", which means there are interfaces changed. "
+ "You need to increment microservice version before deploying, "
+ "or if service_description.environment="
+ ServiceCombConstants.DEVELOPMENT_SERVICECOMB_ENV
+ ", you can delete microservice information in service center and restart this instance.");
}
// Currently nothing to do but print a warning
LOGGER.warn("There are schemas only existing in service center: {}, which means there are interfaces changed. "
+ "It's recommended to increment microservice version before deploying.",
scSchemaMap.keySet());
LOGGER.warn("ATTENTION: The schemas in new version are less than the old version, "
+ "which may cause compatibility problems.");
}
}
private boolean onlineSchemaIsModifiable() {
return ServiceCombConstants.DEVELOPMENT_SERVICECOMB_ENV.equalsIgnoreCase(microservice.getEnvironment());
}
/**
* Register a schema directly.
* @return true if register success, otherwise false
*/
private boolean registerSingleSchema(String schemaId, String content) {
return srClient.registerSchema(microservice.getServiceId(), schemaId, content);
}
private Map<String, GetSchemaResponse> convertScSchemaMap(Holder<List<GetSchemaResponse>> scSchemaHolder) {
Map<String, GetSchemaResponse> scSchemaMap = new HashMap<>();
List<GetSchemaResponse> scSchemaList = scSchemaHolder.getValue();
if (null == scSchemaList) {
return scSchemaMap;
}
for (GetSchemaResponse scSchema : scSchemaList) {
scSchemaMap.put(scSchema.getSchemaId(), scSchema);
}
return scSchemaMap;
}
}