Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4856]support mcp over xds #5124

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 28 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,31 @@ under the License.
Also, please refer to each LICENSE.<component>.txt file, which is located in
the 'license' directory of the distribution file, for the license terms of the
components that this product depends on.

------
This product has a bundle Protocol Buffers��
Protocol Buffers
=======================
Protocol Buffers for Go with Gadgets

Copyright (c) 2013, The GoGo Authors. All rights reserved.
http://github.com/gogo/protobuf

------
This product has a bundle Istio��
Istio
=======================

Copyright 2018 Istio Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
36 changes: 36 additions & 0 deletions istio/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,23 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/io.envoyproxy.controlplane/api -->
<dependency>
<groupId>io.envoyproxy.controlplane</groupId>
<artifactId>api</artifactId>
<version>0.1.27</version>
</dependency>
</dependencies>


<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<artifactId>maven-source-plugin</artifactId>
Expand All @@ -118,6 +132,28 @@
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf-java.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.version}:exe:${os.detected.classifier}</pluginArtifact>

<useArgumentFile>true</useArgumentFile>
<checkStaleness>true</checkStaleness>
<staleMillis>10000</staleMillis>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@
* @since 1.1.4
*/
public class CollectionTypes {

public static final String SERVICE_ENTRY = "istio/networking/v1alpha3/serviceentries";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.istio.mcp;

import com.alibaba.nacos.istio.misc.Loggers;
import com.google.protobuf.Any;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.stub.StreamObserver;
import istio.mcp.v1alpha1.ResourceOuterClass;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Nacos MCP server.
*
* <p>This MCP serves as a ResourceSource defined by Istio.
*
* @author huaicheng.lzp
* @since 1.2.1
*/
@org.springframework.stereotype.Service
public class NacosMcpOverXdsService extends AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase {

private final AtomicInteger connectIdGenerator = new AtomicInteger(0);

private final Map<Integer, StreamObserver<DiscoveryResponse>> connnections = new ConcurrentHashMap<>(16);

private final ConcurrentHashMap<Integer, Boolean> connectionInited = new ConcurrentHashMap<>();

private static final String MCP_RESOURCES_URL = "type.googleapis.com/istio.mcp.v1alpha1.Resource";

private static final String SERVICEENTY_TYPE = "networking.istio.io/v1alpha3/ServiceEntry";

private Map<String, ResourceOuterClass.Resource> resourceMapCache;

/**
* Send resources to connections.
*
* @param resourceMap all mcp resource
*/
public void sendResources(Map<String, ResourceOuterClass.Resource> resourceMap) {
resourceMapCache = resourceMap;
Loggers.MAIN.info("send resources for mcpOverXds,count : {}", resourceMap.size());
DiscoveryResponse discoveryResponse = generateResponse(resourceMap);
if (Loggers.MAIN.isDebugEnabled()) {
Loggers.MAIN.debug("discoveryResponse:{}", discoveryResponse.toString());
}
for (StreamObserver<DiscoveryResponse> observer : connnections.values()) {
Loggers.MAIN.info("mcpOverXds send to:{}", observer.toString());
observer.onNext(discoveryResponse);
}
}

/**
* generate response by resource.
*
* @param resourceMap all mcp resource
* @return
*/
private DiscoveryResponse generateResponse(Map<String, ResourceOuterClass.Resource> resourceMap) {
List<Any> anies = new ArrayList<>();
for (ResourceOuterClass.Resource resource : resourceMap.values()) {
Any any = Any.newBuilder().setValue(resource.toByteString()).setTypeUrl(MCP_RESOURCES_URL).build();
anies.add(any);
}
return DiscoveryResponse.newBuilder().addAllResources(anies)
.setNonce(String.valueOf(System.currentTimeMillis())).setTypeUrl(SERVICEENTY_TYPE).build();
}

@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
StreamObserver<DiscoveryResponse> responseObserver) {

int id = connectIdGenerator.incrementAndGet();
connnections.put(id, responseObserver);

return new StreamObserver<DiscoveryRequest>() {
private final int connectionId = id;

@Override
public void onNext(DiscoveryRequest discoveryRequest) {
Loggers.MAIN.info("receiving request, {}", discoveryRequest.toString());

if (discoveryRequest.getErrorDetail() != null && discoveryRequest.getErrorDetail().getCode() != 0) {

Loggers.MAIN.error("NACK error code: {}, message: {}", discoveryRequest.getErrorDetail().getCode(),
discoveryRequest.getErrorDetail().getMessage());
return;
}
if (SERVICEENTY_TYPE.equals(discoveryRequest.getTypeUrl())) {
Boolean inited = connectionInited.get(id);
if (inited == null || !inited) {
connectionInited.put(id, true);
if (resourceMapCache != null) {
DiscoveryResponse discoveryResponse = generateResponse(resourceMapCache);
Loggers.MAIN.info("ACK for serviceEntry discoveryRequest {}", discoveryRequest.toString());
responseObserver.onNext(discoveryResponse);
}
}
}
}

@Override
public void onError(Throwable throwable) {
Loggers.MAIN.error("stream error.", throwable);
connnections.remove(connectionId);
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import javax.annotation.PostConstruct;
import java.io.IOException;


/**
* Nacos MCP server.
*
Expand All @@ -52,6 +51,12 @@ public class NacosMcpServer {
@Autowired
private NacosMcpService nacosMcpService;

@Autowired
private NacosMcpOverXdsService nacosMcpOverXdsService;

@Autowired
private NacosToMcpResources nacosToMcpResources;

/**
* Start.
*
Expand All @@ -67,9 +72,9 @@ public void start() throws IOException {
Loggers.MAIN.info("MCP server, starting Nacos MCP server...");

server = ServerBuilder.forPort(port).addService(ServerInterceptors.intercept(nacosMcpService, intercepter))
.build();
.addService(ServerInterceptors.intercept(nacosMcpOverXdsService, intercepter)).build();
server.start();

nacosToMcpResources.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
Expand Down