diff --git a/hertzbeat-collector/hertzbeat-collector-collector/pom.xml b/hertzbeat-collector/hertzbeat-collector-collector/pom.xml index 8a108c99de0..ad99d5e286b 100644 --- a/hertzbeat-collector/hertzbeat-collector-collector/pom.xml +++ b/hertzbeat-collector/hertzbeat-collector-collector/pom.xml @@ -70,6 +70,13 @@ ${hertzbeat.version} + + + org.apache.hertzbeat + hertzbeat-collector-milvus + ${hertzbeat.version} + + org.springframework.boot diff --git a/hertzbeat-collector/hertzbeat-collector-milvus/pom.xml b/hertzbeat-collector/hertzbeat-collector-milvus/pom.xml new file mode 100644 index 00000000000..90b93da2c5a --- /dev/null +++ b/hertzbeat-collector/hertzbeat-collector-milvus/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + org.apache.hertzbeat + hertzbeat-collector + 2.0-SNAPSHOT + + + hertzbeat-collector-milvus + ${project.artifactId} + + + 17 + 17 + UTF-8 + + + + + org.apache.hertzbeat + hertzbeat-collector-common + provided + + + org.apache.hertzbeat + hertzbeat-common + + + org.apache.hertzbeat + hertzbeat-collector-basic + ${project.version} + + + diff --git a/hertzbeat-collector/hertzbeat-collector-milvus/src/main/java/org/apache/hertzbeat/collector/milvus/MilvusCollectorImpl.java b/hertzbeat-collector/hertzbeat-collector-milvus/src/main/java/org/apache/hertzbeat/collector/milvus/MilvusCollectorImpl.java new file mode 100644 index 00000000000..c03dac95660 --- /dev/null +++ b/hertzbeat-collector/hertzbeat-collector-milvus/src/main/java/org/apache/hertzbeat/collector/milvus/MilvusCollectorImpl.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.collector.milvus; + +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.collector.collect.AbstractCollect; +import org.apache.hertzbeat.collector.collect.common.http.CommonHttpClient; +import org.apache.hertzbeat.collector.collect.prometheus.parser.MetricFamily; +import org.apache.hertzbeat.collector.collect.prometheus.parser.OnlineParser; +import org.apache.hertzbeat.common.constants.CommonConstants; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.job.protocol.HttpProtocol; +import org.apache.hertzbeat.common.entity.message.CollectRep; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.util.EntityUtils; +import org.springframework.util.CollectionUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Milvus Collector + */ +@Slf4j +public class MilvusCollectorImpl extends AbstractCollect { + + private static final String PROTOCOL = "milvus"; + + @Override + public void preCheck(Metrics metrics) throws IllegalArgumentException { + if (metrics == null || metrics.getMilvus() == null) { + throw new IllegalArgumentException("Milvus collect must has milvus params"); + } + } + + @Override + public void collect(CollectRep.MetricsData.Builder builder, Metrics metrics) { + HttpProtocol milvus = metrics.getMilvus(); + String host = milvus.getHost(); + String port = milvus.getPort(); + String url = "http://" + host + ":" + port + "/metrics"; + + HttpGet request = new HttpGet(url); + try (CloseableHttpResponse response = CommonHttpClient.getHttpClient().execute(request)) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode != 200) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("StatusCode " + statusCode); + return; + } + HttpEntity entity = response.getEntity(); + if (entity != null) { + parseResponse(entity.getContent(), metrics.getAliasFields(), builder); + } + EntityUtils.consumeQuietly(entity); + } catch (Exception e) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg(e.getMessage()); + log.error("Milvus collect error", e); + } + } + + private void parseResponse(InputStream content, List aliasFields, CollectRep.MetricsData.Builder builder) throws IOException { + Map metricFamilyMap = OnlineParser.parseMetrics(content, builder.getMetrics()); + if (metricFamilyMap == null || metricFamilyMap.isEmpty()) { + return; + } + MetricFamily metricFamily = metricFamilyMap.get(builder.getMetrics()); + if (null == metricFamily || CollectionUtils.isEmpty(metricFamily.getMetricList())) { + return; + } + for (MetricFamily.Metric metric : metricFamily.getMetricList()) { + Map labelMap = metric.getLabels() + .stream() + .collect(Collectors.toMap(MetricFamily.Label::getName, MetricFamily.Label::getValue)); + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String aliasField : aliasFields) { + String columnValue = labelMap.get(aliasField); + if (columnValue != null) { + valueRowBuilder.addColumn(columnValue); + } else if (CommonConstants.PROM_VALUE.equals(aliasField) || CommonConstants.PROM_METRIC_VALUE.equals(aliasField)) { + valueRowBuilder.addColumn(String.valueOf(metric.getValue())); + } else { + valueRowBuilder.addColumn(CommonConstants.NULL_VALUE); + } + } + builder.addValueRow(valueRowBuilder.build()); + } + } + + @Override + public String supportProtocol() { + return PROTOCOL; + } +} diff --git a/hertzbeat-collector/pom.xml b/hertzbeat-collector/pom.xml index a85b368439f..667fa300f81 100644 --- a/hertzbeat-collector/pom.xml +++ b/hertzbeat-collector/pom.xml @@ -41,6 +41,7 @@ hertzbeat-collector-nebulagraph hertzbeat-collector-rocketmq hertzbeat-collector-kafka + hertzbeat-collector-milvus diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java index 3b010bc399a..8fc0fab371f 100644 --- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java +++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java @@ -300,6 +300,10 @@ public class Metrics { * Monitoring configuration information using the public s7 protocol */ private S7Protocol s7; + /** + * Monitoring configuration information using the public milvus protocol + */ + private HttpProtocol milvus; /** * collector use - Temporarily store subTask metrics response data */ diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/SshTunnel.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/SshTunnel.java index 154669560fa..9eb37c8590e 100644 --- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/SshTunnel.java +++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/SshTunnel.java @@ -77,4 +77,14 @@ public class SshTunnel implements CommonRequestProtocol, Protocol { * share connection session */ private String shareConnection = "true"; + + @Override + public void setHost(String host) { + this.host = host; + } + + @Override + public void setPort(String port) { + this.port = port; + } } diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/HttpProtocol.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/HttpProtocol.java index 1b48e4871a9..d04836a849a 100644 --- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/HttpProtocol.java +++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/HttpProtocol.java @@ -101,6 +101,16 @@ public class HttpProtocol implements CommonRequestProtocol, Protocol { */ private String enableUrlEncoding = "true"; + @Override + public void setHost(String host) { + this.host = host; + } + + @Override + public void setPort(String port) { + this.port = port; + } + /** * authentication information */ diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/RedisProtocol.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/RedisProtocol.java index 393e43f91ad..b6968c28385 100644 --- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/RedisProtocol.java +++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/RedisProtocol.java @@ -66,5 +66,14 @@ public class RedisProtocol implements CommonRequestProtocol, Protocol { * SSH TUNNEL */ private SshTunnel sshTunnel; + + @Override + public void setHost(String host) { + this.host = host; + } + @Override + public void setPort(String port) { + this.port = port; + } } diff --git a/script/define/app/milvus.yml b/script/define/app/milvus.yml new file mode 100644 index 00000000000..1dc89ccaa30 --- /dev/null +++ b/script/define/app/milvus.yml @@ -0,0 +1,40 @@ +app: milvus +category: cache +name: + zh-CN: Milvus Vector DB + en-US: Milvus Vector DB +params: + - field: host + name: + zh-CN: Host + en-US: Host + type: host + required: true + - field: port + name: + zh-CN: Port + en-US: Port + type: number + range: '[0,65535]' + required: true + defaultValue: 9091 +metrics: + - name: milvus_proxy_reqs_total + priority: 0 + fields: + - field: type + type: 1 + label: true + - field: status + type: 1 + label: true + - field: value + type: 0 + protocol: milvus + milvus: + host: ^_^host^_^ + port: ^_^port^_^ + aliasFields: + - type + - status + - value