diff --git a/Makefile b/Makefile index a5171c9..b93179f 100644 --- a/Makefile +++ b/Makefile @@ -147,6 +147,11 @@ LOG_LEVEL := info ISTIO_HOST := $(shell ifconfig -a | awk '/inet / {print $$2}' | grep -v '127.' | grep -v '192.' | head -n 1) +WITH_KIND ?= OFF +ifeq ($(WITH_KIND),ON) + NET_FLAGS := --network kind +endif + .PHONY: start-aigw-xds start-aigw-xds: cat etc/envoy-istio.yaml \ @@ -154,6 +159,7 @@ start-aigw-xds: > etc/envoy-xds.yaml @echo "using ${MC_HOST} as Metadata Center Host" docker run --entrypoint /bin/bash --name dev_aigw --rm -d \ + $(NET_FLAGS) \ -e AIGW_META_DATA_CENTER_HOST=${MC_HOST} \ -e AIGW_META_DATA_CENTER_PORT=${MC_PORT} \ -v $(PWD)/etc/envoy-xds.yaml:/etc/envoy.yaml \ @@ -193,20 +199,33 @@ build-image: LOG_LEVEL = info PILOT_IMAGE = $(DOCKER_MIRROR)docker.io/istio/pilot:1.27.3 -PILOT_CMD = pilot-discovery discovery \ - --log_output_level $(LOG_LEVEL) \ - --meshConfig /etc/istio.yaml \ - --configDir /etc/config_crds \ - --httpsAddr= --registries= +PILOT_BASE := pilot-discovery discovery \ + --log_output_level $(LOG_LEVEL) \ + --meshConfig /etc/istio.yaml \ + --configDir /etc/config_crds \ + --httpsAddr= + +ifeq ($(WITH_KIND),ON) + PILOT_REGISTRY := --registries Kubernetes --kubeconfig /etc/kubeconfig.yaml + MAPPED_KIND_CONFIG := -v "$(PWD)/etc/kind-kubeconfig.yaml:/etc/kubeconfig.yaml" +else + PILOT_REGISTRY := --registries= +endif + +PILOT_CMD := $(PILOT_BASE) $(PILOT_REGISTRY) .PHONY: start-istio start-istio: docker run --name dev_istio \ --entrypoint bash --rm -it -d \ + $(NET_FLAGS) \ + --privileged=true \ + --user root \ -e INJECT_ENABLED=false \ -e ENABLE_CA_SERVER=false \ -v $(PWD)/etc/istio.yaml:/etc/istio.yaml \ -v $(PWD)/etc/config_crds:/etc/config_crds \ + $(MAPPED_KIND_CONFIG) \ -p 15010:15010 \ -p 8080:8080\ ${PILOT_IMAGE} \ @@ -215,3 +234,11 @@ start-istio: .PHONY: stop-istio stop-istio: docker stop dev_istio + +# Build, load and deploy the mock upstream; every recipe line runs in its own shell, so the cd is chained with && to apply to the commands that follow. +.PHONY: start-mock-service +start-mock-service: + cd $(PWD)/test/service_discovery/ && \ + docker build -f DockerFile -t 
mock-service:1.0 . && \ + kind load docker-image mock-service:1.0 --name istio-test && \ + kubectl apply -f mock_service_config.yaml \ No newline at end of file diff --git a/docs/en/developer-guide.md b/docs/en/developer-guide.md index efef3d9..a266f11 100644 --- a/docs/en/developer-guide.md +++ b/docs/en/developer-guide.md @@ -1,23 +1,22 @@ -## Developer Guide +# Developer Guide -## Two Ways to Start AIGW +## 1. Two Ways to Start AIGW 1. Local Independent Way: Using static configuration for service discovery, easy for local development. 2. Integrated with Istio: Using Istio as the control plane, leveraging Istio's service discovery capabilities, suitable for production environments. -## Environment Preparation +## 2. Environment Preparation 1. docker 2. golang 1.22+ -## Start Metadata Center +## 3. Start Metadata Center -Both methods require starting the Metadata Center service, as -AIGW leverage the Metadata Center component to implement near real-time load metric collection. +Both methods require starting the Metadata Center service, as AIGW leverage the Metadata Center component to implement near real-time load metric collection. Please refer to the [Metadata Center documentation](https://github.com/aigw-project/metadata-center/blob/main/docs/en/developer_guide.md ) to start the local Metadata Center service. The Metadata Center defaults to listening on the local IP and port `8080`. -## Compilation +## 4. Compilation Compile AIGW into a shared object: @@ -25,16 +24,16 @@ Compile AIGW into a shared object: make build-so ``` -## Local Independent Way +## 5. Local Independent Way For the convenience of local development, AIGW supports static configuration for service discovery, i.e., specifying the address and port of service instances through configuration files. 
-### Static Configuration for Service Discovery +### 5.1 Static Configuration for Service Discovery You can refer to the example at: [etc/clusters.json](../../etc/clusters.json), which defines `127.0.0.1:10001` as the instance of the `qwen3.service`. -## Start Service +#### Start Service Start AIGW using [etc/envoy-local.yaml](../../etc/envoy-local.yaml) as the Envoy configuration file and [etc/clusters.json](../../etc/clusters.json) as the static service discovery configuration file: @@ -42,13 +41,13 @@ Start AIGW using [etc/envoy-local.yaml](../../etc/envoy-local.yaml) as the Envoy make start-aigw-local ``` -## Integrated with Istio Way +### 5.2 Integrated with Istio Comming soon. Integrating with Istio as the control plane, using Istio's service discovery capabilities, can automatically synchronize service instance information with the k8s cluster, suitable for production environments. -### Start Istio +#### Start Istio For easy debugging, we start a local Istio control plane that watch the CRD files in the `etc/config_crds` directory. @@ -56,18 +55,48 @@ For easy debugging, we start a local Istio control plane that watch the CRD file make start-istio ``` -### Service Discovery +#### Service Discovery We use the ServiceEntry resource to define service instances, as shown in the [etc/config_crds/service-entry.yaml](etc/config_crds/service-entry.yaml) file. -### Start Service +#### Start Service Both methods can start AIGW integrated with Istio: ```shell -make start-aigw-istio +make start-aigw-xds ``` -## After Starting +### 5.3 Integration with Istio & Kubernetes + +#### Prepare the Kubernetes cluster +Follow the [Kubernetes + Istio Setup Guidance](../../docs/zh/service_discovery_guide/setup_env_zh.md) to deploy a Kubernetes cluster with Kind and start Istio. 
+ +#### Start Istio & subscribe to the Kubernetes Service API +Export the Kind cluster kubeconfig to the `./etc` directory so Istio can subscribe to it: + +```bash +kind get kubeconfig --name istio-test > ./etc/kind-kubeconfig.yaml +``` + +Start Istio: +```bash +make WITH_KIND=ON start-istio +``` + +#### Start the Mock Service +Launch the Mock Service as an upstream component that will be discovered by AIGW: +```bash +make start-mock-service +``` + +#### Start the AIGW Service +Start AIGW, which brings up a custom xDS server, subscribes CDS/EDS from Istio Pilot, and starts a gRPC server for Envoy to fetch configuration. +Data flow: Istio Pilot => AIGW custom xDS server => Envoy. +```bash +make WITH_KIND=ON start-aigw-xds +``` + +## 6. After Starting Both two ways will start two services: 1. Port `10000`: AIGW service @@ -75,7 +104,7 @@ Both two ways will start two services: It will also use the locally started Metadata Center for load metric collection by default, that listening on the local IP and port `8080`. -## Testing +## 7. Testing Send a request using curl: @@ -95,7 +124,7 @@ curl 'localhost:10000/v1/chat/completions' \ }' ``` -## Stop Service +## 8. Stop Service ```shell make stop-aigw diff --git a/docs/zh/developer-guide.md b/docs/zh/developer-guide.md index b113cac..115704d 100644 --- a/docs/zh/developer-guide.md +++ b/docs/zh/developer-guide.md @@ -1,23 +1,23 @@ -## 开发者指南 +# 开发者指南 -## 运行方式 +## 1. 运行方式 AIGW 支持多种运行方式: 1. 本地独立运行,使用静态配置文件的方式进行服务发现,适合本地开发和调试 2. 集成 Istio 作为控制面,使用 Istio 提供的服务发现能力,适合生产环境 -## 环境准备 +## 2. 环境准备 1. docker 2. golang 1.22+ -## 启动 Metadata Center +## 3. 启动 Metadata Center 两种方式都需要启动 Metadata Center 服务,因为 AIGW 依赖 Metadata Center 组件来实现准实时的负载指标统计, 请参考 [Metadata Center 文档](https://github.com/aigw-project/metadata-center/blob/main/docs/zh/developer_guide.md) 启动本地的 Metadata Center 服务。 Metadata Center 默认监听本地 IP 和 `8080` 端口。 -## 编译 +## 4. 
编译 将 AIGW 编译为 shared object: @@ -25,15 +25,15 @@ Metadata Center 默认监听本地 IP 和 `8080` 端口。 make build-so ``` -## 本地独立运行方式 +## 5. 本地独立运行方式 为了本地开发方便,AIGW 支持本地独立运行方式,使用静态配置的方式进行服务发现,也即通过配置文件指定服务实例的地址和端口。 -### 静态配置服务发现 +### 5.1 静态配置服务发现 示例可以查看:[etc/clusters.json](../../etc/clusters.json),该文件定义了 `127.0.0.1:10001` 作为 `qwen3.service` 这个服务的实例。 -### 启动服务 +#### 启动服务 将使用 [etc/envoy-local.yaml](../../etc/envoy-local.yaml) 作为 Envoy 的配置文件,并使用 [etc/clusters.json](../../etc/clusters.json) 作为静态服务发现的配置文件启动 AIGW: @@ -41,13 +41,13 @@ make build-so make start-aigw-local ``` -## 集成 Istio 运行方式 +### 5.2 集成 Istio 运行方式 Comming soon. 集成 Istio 作为控制面,使用 Istio 的服务发现能力,可以与 k8s 集群自动同步服务实例信息,适合生产环境。 -### 启动 Istio +#### 启动 Istio 为了方便调试,我们启动一个本地的 Istio 控制面,并且监听 `etc/config_crds` 目录下的 CRD 文件。 @@ -55,11 +55,11 @@ Comming soon. make start-istio ``` -### 服务发现 +#### 服务发现 我们使用 ServiceEntry 资源来定义服务实例,如 [etc/config_crds/service-entry.yaml](etc/config_crds/service-entry.yaml) 文件所示。 -### 启动服务 +#### 启动服务 将使用 [etc/envoy-istio.yaml](../../etc/envoy-istio.yaml) 作为 Envoy 的配置文件,并订阅本地 Istio 作为控制面来启动 AIGW: @@ -67,7 +67,36 @@ make start-istio make start-aigw-xds ``` -## 服务启动检查 +### 5.3 集成 Istio & k8s 运行方式 + +#### 配置k8s集群环境 +参考[k8s + Istio 环境搭建指南](../../docs/zh/service_discovery_guide/setup_env_zh.md)使用Kind部署k8s集群并启动Istio; + +#### 启动 Istio & 订阅 k8s Service API +导出Kind集群配置到./etc目录,供Istio订阅: +```bash +kind get kubeconfig --name istio-test > ./etc/kind-kubeconfig.yaml +``` + +启动Istio: +```bash +make WITH_KIND=ON start-istio +``` + +#### 启动 Mock Service +启动Mock Service,作为upstream服务组件被AIGW发现: +```bash +make start-mock-service +``` + +#### 启动服务 +启动AIGW,拉起自定义xDS服务器,从Istio Pilot订阅CDS/EDS信息,并启动gRPC Server供Envoy拉取。 +服务信息传递流程:Istio Pilot => AIGW 自定义xDS服务器 => Envoy。 +```bash +make WITH_KIND=ON start-aigw-xds +``` + +## 6. 服务启动检查 两种启动方式,都将监听两个端口: 1. `10000` 端口:AIGW 服务 @@ -75,7 +104,7 @@ make start-aigw-xds 并将默认使用本地启动的 Metadata Center 进行负载指标统计,默认使用本地 IP 和 `8080` 端口。 -## 测试 +## 7. 
测试 使用 curl 发送请求: @@ -95,7 +124,7 @@ curl 'localhost:10000/v1/chat/completions' \ }' ``` -## 停止服务 +## 8. 停止服务 ```shell make stop-aigw diff --git a/etc/envoy-istio.yaml b/etc/envoy-istio.yaml index fd0ff28..bb648ef 100644 --- a/etc/envoy-istio.yaml +++ b/etc/envoy-istio.yaml @@ -1,5 +1,5 @@ node: - id: router~127.0.0.1~TODO_ID~TO_DOMAIN + id: router~127.0.0.1~aigw.default~cluster.local cluster: cluster admin: diff --git a/pkg/aigateway/discovery/staticdemo/cds_server.go b/pkg/aigateway/discovery/staticdemo/cds_server.go index 9db80cf..8e363b8 100644 --- a/pkg/aigateway/discovery/staticdemo/cds_server.go +++ b/pkg/aigateway/discovery/staticdemo/cds_server.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + managertypes "github.com/aigw-project/aigw/pkg/aigateway/clustermanager/types" "github.com/aigw-project/aigw/pkg/aigateway/discovery/common" ) @@ -100,6 +101,28 @@ func (s *cdsServerImpl) processAllClusters() { s.responseChan <- resp } +func (s *cdsServerImpl) processSubscribedClusters(subscribeClusters []string) { + clusters := []*managertypes.ClusterInfo{} + for _, cname := range subscribeClusters { + c, err := s.provider.GetClusterInfo(cname) + if c == nil || err != nil { + api.LogErrorf("could not find cluster %s, err %v", cname, err) + continue + } + clusters = append(clusters, c) + } + nonce := common.GenerateNonce() + resources := make([]*discovery.Resource, 0, len(clusters)) + for _, c := range clusters { + clustercfg := common.GenerateCluster(c.Name, c.Endpoints, false) + res := common.ConvertClusterToResource(clustercfg, c.Name) + resources = append(resources, res) + } + + resp := common.GenerateDeltaDiscoveryResponseWithRemovedResources(resource.ClusterType, nonce, resources, nil) + s.responseChan <- resp +} + func (s *cdsServerImpl) DeltaClusters(stream cluster.ClusterDiscoveryService_DeltaClustersServer) error { api.LogInfof("new delta clusters stream: %v", stream) @@ -141,9 +164,12 @@ func (s *cdsServerImpl) 
DeltaClusters(stream cluster.ClusterDiscoveryService_Del } // TODO: delta watching cluster + subscribedClusters := []string{} for _, r := range req.ResourceNamesSubscribe { + subscribedClusters = append(subscribedClusters, r) api.LogInfof("delta watching cluster: %s", r) } + s.processSubscribedClusters(subscribedClusters) } } } diff --git a/pkg/aigateway/discovery/staticdemo/provider.go b/pkg/aigateway/discovery/staticdemo/provider.go index 11f5a11..1e0aacf 100644 --- a/pkg/aigateway/discovery/staticdemo/provider.go +++ b/pkg/aigateway/discovery/staticdemo/provider.go @@ -15,11 +15,22 @@ package staticdemo import ( + "context" "encoding/json" "errors" + "fmt" "io" + "net" "os" + "time" + clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/anypb" "mosn.io/htnn/api/pkg/filtermanager/api" managertypes "github.com/aigw-project/aigw/pkg/aigateway/clustermanager/types" @@ -27,6 +38,12 @@ import ( const ( staticClusterFile = "/etc/aigw/static_clusters.json" + + // istio + istioPodAddrEnv = "AIGW_ISTIO_ADDR" + defaultAddr = "istiod.istio-system.svc.cluster.local:15010" + ipv4PortRe = `^((?:\d{1,3}\.){3}\d{1,3}):(\d{1,5})$` + defaultNodeId = "router~127.0.0.1~aigw.default~cluster.local" ) type StaticEndpoint struct { @@ -44,6 +61,7 @@ type ClustersConfig struct { } var config ClustersConfig +var istioXdsAddr string func init() { fp, err := os.Open(staticClusterFile) @@ -65,6 +83,28 @@ func init() { } api.LogInfof("static cluster config loaded: %+v", config) + + // load istio pod addr + istioXdsAddr = resolveIstioAddr() +} + +// get istio xds server host:port from the AIGW_ISTIO_ADDR environment variable; use defaultAddr by 
default +func resolveIstioAddr() string { + raw := os.Getenv(istioPodAddrEnv) + if raw == "" { + api.LogInfof("set istio xds addr, env %s not set, use default %s\n", istioPodAddrEnv, defaultAddr) + return defaultAddr + } + + ip, port, err := net.SplitHostPort(raw) + if err != nil { + api.LogWarnf("set istio xds addr, env %s format invalid: %s, use default %s\n", istioPodAddrEnv, raw, defaultAddr) + return defaultAddr + } + + addr := fmt.Sprintf("%s:%s", ip, port) + api.LogInfof("set istio xds addr, env %s valid, use %s\n", istioPodAddrEnv, addr) + return addr } type StaticClusterProvider struct { @@ -89,12 +129,136 @@ func NewStaticClusterProvider() managertypes.ClusterInfoProvider { } } + p.AutoUpdateFromPilot(defaultNodeId, 10*time.Second) + api.LogInfof("new static cluster provider: %+v", p) startCdsServer(defaultCdsAddress, p) return p } +func extractClusterNames(resources []*anypb.Any) []string { + var names []string + for _, res := range resources { + var c clusterpb.Cluster + if err := res.UnmarshalTo(&c); err != nil { + api.LogErrorf("unmarshal cluster failed: %v", err) + continue + } + names = append(names, c.Name) + } + return names +} + +// extract endpoint info from ClusterLoadAssignment +func extractFromCLA(cla *endpointpb.ClusterLoadAssignment) []managertypes.Endpoint { + var eps []managertypes.Endpoint + for _, locality := range cla.Endpoints { + for _, lb := range locality.LbEndpoints { + sock := lb.GetEndpoint().GetAddress().GetSocketAddress() + if sock == nil { + continue + } + ep := managertypes.Endpoint{ + Address: sock.Address, + // use the generated getter: it yields 0 for a named or unset port instead of panicking like an unchecked type assertion + Port: sock.GetPortValue(), + Labels: make(map[string]string), + } + if meta := lb.GetMetadata(); meta != nil { + for k, v := range meta.GetFilterMetadata() { + if k == "istio" { + for kk, vv := range v.GetFields() { + ep.Labels[kk] = vv.GetStringValue() + } + } + } + } + eps = append(eps, ep) + } + } + return eps +} + +// subscribe istio pilot and pull the cds info +func (p 
*StaticClusterProvider) subscribeIstioPilot(nodeID string) error { + conn, err := grpc.NewClient( + istioXdsAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return err + } + defer conn.Close() + + client := discoverypb.NewAggregatedDiscoveryServiceClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + stream, err := client.StreamAggregatedResources(ctx) + if err != nil { + return err + } + + // get cds + cdsReq := &discoverypb.DiscoveryRequest{ + Node: &corepb.Node{Id: nodeID}, + TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", + } + if err := stream.Send(cdsReq); err != nil { + return err + } + cdsResp, err := stream.Recv() + if err != nil { + return err + } + clusterNames := extractClusterNames(cdsResp.Resources) + if len(clusterNames) == 0 { + return nil + } + + // get eds + edsReq := &discoverypb.DiscoveryRequest{ + Node: &corepb.Node{Id: nodeID}, + TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", + ResourceNames: clusterNames, + } + if err := stream.Send(edsReq); err != nil { + return err + } + edsResp, err := stream.Recv() + if err != nil { + return err + } + + // fill eds info to provider + for _, res := range edsResp.Resources { + var cla endpointpb.ClusterLoadAssignment + if err := res.UnmarshalTo(&cla); err != nil { + api.LogErrorf("unmarshal cla failed: %v", err) + continue + } + p.allClusters[cla.ClusterName] = &managertypes.ClusterInfo{ + Name: cla.ClusterName, + Endpoints: extractFromCLA(&cla), + } + } + return nil +} + +// AutoUpdateFromPilot starts a goroutine that pulls the cluster snapshot from istio pilot, logs any error, then sleeps for interval before pulling again +func (p *StaticClusterProvider) AutoUpdateFromPilot(nodeID string, interval time.Duration) { + go func() { + for { + err := p.subscribeIstioPilot(nodeID) + if err != nil { + api.LogErrorf("failed to pull from pilot: %v", err) + } + time.Sleep(interval) + } + }() +} + func (p *StaticClusterProvider) GetAllClusters() []*managertypes.ClusterInfo { clusters := 
make([]*managertypes.ClusterInfo, 0, len(p.allClusters)) for _, cluster := range p.allClusters { diff --git a/test/service_discovery/DockerFile b/test/service_discovery/DockerFile new file mode 100644 index 0000000..3756f23 --- /dev/null +++ b/test/service_discovery/DockerFile @@ -0,0 +1,24 @@ +# Copyright The AIGW Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM golang:1.21-alpine AS builder +WORKDIR /app +COPY mock_service.go . +RUN go build -o mock-service mock_service.go + +FROM alpine:latest +WORKDIR /app +COPY --from=builder /app/mock-service . +EXPOSE 8081 +CMD ["./mock-service"] \ No newline at end of file diff --git a/test/service_discovery/mock_service.go b/test/service_discovery/mock_service.go new file mode 100644 index 0000000..8de841d --- /dev/null +++ b/test/service_discovery/mock_service.go @@ -0,0 +1,47 @@ +// Copyright The AIGW Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "fmt" + "log" + "net/http" + "os" +) + +func main() { + port := "8081" + if os.Getenv("PORT") != "" { + port = os.Getenv("PORT") + } + + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + hostname, _ := os.Hostname() + fmt.Fprintf(w, `{ + "message": "Hello from mock service", + "hostname": "%s", + "path": "%s", + "method": "%s" + }`, hostname, r.URL.Path, r.Method) + }) + + http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, "OK") + }) + + log.Printf("Mock service starting on port %s", port) + log.Fatal(http.ListenAndServe(":"+port, nil)) +} diff --git a/test/service_discovery/mock_service_config.yaml b/test/service_discovery/mock_service_config.yaml new file mode 100644 index 0000000..761fcd8 --- /dev/null +++ b/test/service_discovery/mock_service_config.yaml @@ -0,0 +1,43 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: mock-service + namespace: default +spec: + replicas: 2 + selector: + matchLabels: + app: mock-service + template: + metadata: + labels: + app: mock-service + version: v1 + spec: + containers: + - name: mock-service + image: mock-service:1.0 + ports: + - containerPort: 8081 + env: + - name: PORT + value: "8081" + readinessProbe: + httpGet: + path: /health + port: 8081 + initialDelaySeconds: 5 + periodSeconds: 5 +--- +apiVersion: v1 +kind: Service +metadata: + name: mock-service + namespace: default +spec: + selector: + app: mock-service + ports: + - port: 8081 + targetPort: 8081 + name: http \ No newline at end of file