Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions hertzbeat-collector/hertzbeat-collector-collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@
<version>${hertzbeat.version}</version>
</dependency>

<!-- collector-milvus -->
<dependency>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-collector-milvus</artifactId>
<version>${hertzbeat.version}</version>
</dependency>

<!-- spring -->
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
37 changes: 37 additions & 0 deletions hertzbeat-collector/hertzbeat-collector-milvus/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-collector</artifactId>
<version>2.0-SNAPSHOT</version>
</parent>

<artifactId>hertzbeat-collector-milvus</artifactId>
<name>${project.artifactId}</name>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-collector-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-collector-basic</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.collector.milvus;

import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.collect.AbstractCollect;
import org.apache.hertzbeat.collector.collect.common.http.CommonHttpClient;
import org.apache.hertzbeat.collector.collect.prometheus.parser.MetricFamily;
import org.apache.hertzbeat.collector.collect.prometheus.parser.OnlineParser;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.job.Metrics;
import org.apache.hertzbeat.common.entity.job.protocol.HttpProtocol;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.util.EntityUtils;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Milvus Collector
*/
@Slf4j
public class MilvusCollectorImpl extends AbstractCollect {

private static final String PROTOCOL = "milvus";

@Override
public void preCheck(Metrics metrics) throws IllegalArgumentException {
if (metrics == null || metrics.getMilvus() == null) {
throw new IllegalArgumentException("Milvus collect must has milvus params");
}
}

@Override
public void collect(CollectRep.MetricsData.Builder builder, Metrics metrics) {
HttpProtocol milvus = metrics.getMilvus();
String host = milvus.getHost();
String port = milvus.getPort();
String url = "http://" + host + ":" + port + "/metrics";

HttpGet request = new HttpGet(url);
try (CloseableHttpResponse response = CommonHttpClient.getHttpClient().execute(request)) {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("StatusCode " + statusCode);
return;
}
HttpEntity entity = response.getEntity();
if (entity != null) {
parseResponse(entity.getContent(), metrics.getAliasFields(), builder);
}
EntityUtils.consumeQuietly(entity);
} catch (Exception e) {
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg(e.getMessage());
log.error("Milvus collect error", e);
}
}

private void parseResponse(InputStream content, List<String> aliasFields, CollectRep.MetricsData.Builder builder) throws IOException {
Map<String, MetricFamily> metricFamilyMap = OnlineParser.parseMetrics(content, builder.getMetrics());
if (metricFamilyMap == null || metricFamilyMap.isEmpty()) {
return;
}
MetricFamily metricFamily = metricFamilyMap.get(builder.getMetrics());
if (null == metricFamily || CollectionUtils.isEmpty(metricFamily.getMetricList())) {
return;
}
for (MetricFamily.Metric metric : metricFamily.getMetricList()) {
Map<String, String> labelMap = metric.getLabels()
.stream()
.collect(Collectors.toMap(MetricFamily.Label::getName, MetricFamily.Label::getValue));
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String aliasField : aliasFields) {
String columnValue = labelMap.get(aliasField);
if (columnValue != null) {
valueRowBuilder.addColumn(columnValue);
} else if (CommonConstants.PROM_VALUE.equals(aliasField) || CommonConstants.PROM_METRIC_VALUE.equals(aliasField)) {
valueRowBuilder.addColumn(String.valueOf(metric.getValue()));
} else {
valueRowBuilder.addColumn(CommonConstants.NULL_VALUE);
}
}
builder.addValueRow(valueRowBuilder.build());
}
}

@Override
public String supportProtocol() {
return PROTOCOL;
}
}
1 change: 1 addition & 0 deletions hertzbeat-collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<module>hertzbeat-collector-nebulagraph</module>
<module>hertzbeat-collector-rocketmq</module>
<module>hertzbeat-collector-kafka</module>
<module>hertzbeat-collector-milvus</module>
</modules>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ public class Metrics {
* Monitoring configuration information using the public s7 protocol
*/
private S7Protocol s7;
/**
* Monitoring configuration information using the public milvus protocol
*/
private HttpProtocol milvus;
/**
* collector use - Temporarily store subTask metrics response data
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,14 @@ public class SshTunnel implements CommonRequestProtocol, Protocol {
* share connection session
*/
private String shareConnection = "true";

@Override
public void setHost(String host) {
this.host = host;
}

@Override
public void setPort(String port) {
this.port = port;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ public class HttpProtocol implements CommonRequestProtocol, Protocol {
*/
private String enableUrlEncoding = "true";

@Override
public void setHost(String host) {
this.host = host;
}

@Override
public void setPort(String port) {
this.port = port;
}

/**
* authentication information
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,14 @@ public class RedisProtocol implements CommonRequestProtocol, Protocol {
* SSH TUNNEL
*/
private SshTunnel sshTunnel;

@Override
public void setHost(String host) {
this.host = host;
}

@Override
public void setPort(String port) {
this.port = port;
}
}
40 changes: 40 additions & 0 deletions script/define/app/milvus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
app: milvus
category: cache
name:
zh-CN: Milvus Vector DB
en-US: Milvus Vector DB
params:
- field: host
name:
zh-CN: Host
en-US: Host
type: host
required: true
- field: port
name:
zh-CN: Port
en-US: Port
type: number
range: '[0,65535]'
required: true
defaultValue: 9091
metrics:
- name: milvus_proxy_reqs_total
priority: 0
fields:
- field: type
type: 1
label: true
- field: status
type: 1
label: true
- field: value
type: 0
protocol: milvus
milvus:
host: ^_^host^_^
port: ^_^port^_^
aliasFields:
- type
- status
- value