Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
fcef98a
feat: add otel support for connect
woshigaopp Aug 19, 2025
e59f4c5
feat: add e2e test for connect integrate
woshigaopp Aug 21, 2025
e678d7e
feat: add otel integrate entry
woshigaopp Aug 21, 2025
b11139d
feat: add s3 metric exporter
woshigaopp Aug 21, 2025
ddc7d97
feat: modify readme
woshigaopp Aug 21, 2025
78ec2c4
feat: modify gradle
woshigaopp Sep 8, 2025
67c1a5c
feat: modify gradle
woshigaopp Sep 8, 2025
08eb317
feat: modify gradle
woshigaopp Sep 8, 2025
599ebaf
feat: modify gradle add e2e
woshigaopp Sep 9, 2025
289ce6a
feat: add log module
woshigaopp Sep 23, 2025
9dea756
feat: add log module
woshigaopp Sep 23, 2025
faac7ab
feat: add log module readme
woshigaopp Sep 23, 2025
f2ecc16
feat: modify log module readme
woshigaopp Sep 23, 2025
40a00d9
feat: modify log module readme
woshigaopp Oct 14, 2025
e0fc4e8
feat: adjust core code
woshigaopp Oct 15, 2025
c3ee0b0
feat: adjust core code
woshigaopp Oct 16, 2025
231b86a
feat: adjust core code
woshigaopp Oct 16, 2025
57e4db1
feat: adjust core code
woshigaopp Oct 16, 2025
651ab89
merge
woshigaopp Oct 16, 2025
9650799
feat: remove opentelemetry proto
woshigaopp Oct 16, 2025
163c2af
feat: refactor code
woshigaopp Oct 16, 2025
a4066df
feat: refactor code
woshigaopp Oct 16, 2025
23abb0f
feat: refactor code
woshigaopp Oct 16, 2025
0e3e1e5
feat: refactor code
woshigaopp Oct 16, 2025
82abcd7
feat: refactor code
woshigaopp Oct 16, 2025
43345d4
Merge branch '1.6' into feature/connect_new_1.6
woshigaopp Oct 16, 2025
febb8b5
feat: refactor code
woshigaopp Oct 16, 2025
a0de21a
feat: refactor code
woshigaopp Oct 16, 2025
0abd7d0
feat: support multi az
woshigaopp Oct 17, 2025
fe0cb08
Merge branch '1.6' into feature/connect_new_1.6
woshigaopp Oct 17, 2025
d440ab2
feat: add connect az test
woshigaopp Oct 17, 2025
ac93c25
feat: add connect az test
woshigaopp Oct 20, 2025
2e4ac84
feat: add connect az test
woshigaopp Oct 20, 2025
4395020
feat: add connect az test
woshigaopp Oct 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions automq-log-uploader/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# AutoMQ Log Uploader Module

This module provides asynchronous S3 log upload capability based on Log4j 1.x. Other submodules only need to depend on this module and configure it simply to synchronize logs to object storage. Core components:

- `com.automq.log.uploader.S3RollingFileAppender`: Extends `RollingFileAppender`, pushes log events to the uploader while writing to local files.
- `com.automq.log.uploader.LogUploader`: Asynchronously buffers, compresses, and uploads logs; supports configuration switches and periodic cleanup.
- `com.automq.log.uploader.S3LogConfig`/`S3LogConfigProvider`: Abstracts the configuration required for uploading. The default implementation `PropertiesS3LogConfigProvider` reads from `automq-log.properties`.

## Quick Integration

1. Add dependency in your module's `build.gradle`:
```groovy
implementation project(':automq-log-uploader')
```
2. Create `automq-log.properties` in the resources directory (or customize `S3LogConfigProvider`):
```properties
log.s3.enable=true
log.s3.bucket=0@s3://your-log-bucket?region=us-east-1
log.s3.cluster.id=my-cluster
log.s3.node.id=1
log.s3.selector.type=kafka
log.s3.selector.kafka.bootstrap.servers=PLAINTEXT://kafka:9092
log.s3.selector.kafka.group.id=automq-log-uploader-my-cluster
```
3. Reference the Appender in `log4j.properties`:
```properties
log4j.appender.s3_uploader=com.automq.log.uploader.S3RollingFileAppender
log4j.appender.s3_uploader.File=logs/server.log
log4j.appender.s3_uploader.MaxFileSize=100MB
log4j.appender.s3_uploader.MaxBackupIndex=10
log4j.appender.s3_uploader.layout=org.apache.log4j.PatternLayout
log4j.appender.s3_uploader.layout.ConversionPattern=[%d] %p %m (%c)%n
```
If you need to customize the configuration provider, you can set:
```properties
log4j.appender.s3_uploader.configProviderClass=com.example.CustomS3LogConfigProvider
```

## Key Configuration Description

| Configuration Item | Description |
| ------ | ---- |
| `log.s3.enable` | Whether to enable S3 upload function.
| `log.s3.bucket` | It is recommended to use AutoMQ Bucket URI (e.g. `0@s3://bucket?region=us-east-1&pathStyle=true`). If using a shorthand bucket name, additional fields such as `log.s3.region` need to be provided.
| `log.s3.cluster.id` / `log.s3.node.id` | Used to construct the object storage path `automq/logs/{cluster}/{node}/{hour}/{uuid}`.
| `log.s3.selector.type` | Leader election strategy (`static`, `nodeid`, `file`, `kafka`, or custom).
| `log.s3.primary.node` | Used with `static` strategy to indicate whether the current node is the primary node.
| `log.s3.selector.kafka.*` | Additional configuration required for Kafka leader election, such as `bootstrap.servers`, `group.id`, etc.
| `log.s3.active.controller` | **Deprecated**, please use `log.s3.selector.type=static` + `log.s3.primary.node=true`.

The upload schedule can be overridden by environment variables:

- `AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL`: Maximum upload interval (milliseconds).
- `AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL`: Retention period (milliseconds), old objects earlier than this time will be cleaned up.

### Leader Election Strategies

To avoid multiple nodes executing S3 cleanup tasks simultaneously, the log uploader has a built-in leader election mechanism consistent with the OpenTelemetry module:

1. **static**: Specify which node is the leader using `log.s3.primary.node=true|false`.
2. **nodeid**: Becomes the leader node when `log.s3.node.id` equals `primaryNodeId`, which can be set in the URL or properties with `log.s3.selector.primary.node.id`.
3. **file**: Uses a shared file for preemptive leader election, configure `log.s3.selector.file.leaderFile=/shared/leader`, `log.s3.selector.file.leaderTimeoutMs=60000`.
4. **kafka**: Default strategy. All nodes join the same consumer group of a single-partition topic, the node holding the partition becomes the leader. Necessary configuration:
```properties
log.s3.selector.type=kafka
log.s3.selector.kafka.bootstrap.servers=PLAINTEXT://kafka:9092
log.s3.selector.kafka.topic=__automq_log_uploader_leader_cluster1
log.s3.selector.kafka.group.id=automq-log-uploader-cluster1
```
Advanced parameters such as security (SASL/SSL), timeout, etc. can be provided through `log.s3.selector.kafka.*`.
5. **custom**: Implement `com.automq.log.uploader.selector.LogUploaderNodeSelectorProvider` and register it through SPI to introduce a custom leader election strategy.

## Extension

If the application already has its own dependency injection/configuration method, you can implement `S3LogConfigProvider` and call it at startup:

```java
import com.automq.log.uploader.S3RollingFileAppender;

S3RollingFileAppender.setConfigProvider(new CustomConfigProvider());
```

All `S3RollingFileAppender` instances will share this provider.
19 changes: 19 additions & 0 deletions automq-log-uploader/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
plugins {
id 'java-library'
}

repositories {
mavenCentral()
}

dependencies {
api project(':s3stream')

implementation project(':clients')
implementation libs.reload4j
implementation libs.slf4jApi
implementation libs.slf4jBridge
implementation libs.nettyBuffer
implementation libs.guava
implementation libs.commonLang
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.log.uploader;

import com.automq.log.uploader.selector.LogUploaderNodeSelector;
import com.automq.log.uploader.selector.LogUploaderNodeSelectorFactory;
import com.automq.stream.s3.operator.BucketURI;
import com.automq.stream.s3.operator.ObjectStorage;
import com.automq.stream.s3.operator.ObjectStorageFactory;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

import static com.automq.log.uploader.LogConfigConstants.DEFAULT_LOG_S3_ACTIVE_CONTROLLER;
import static com.automq.log.uploader.LogConfigConstants.DEFAULT_LOG_S3_CLUSTER_ID;
import static com.automq.log.uploader.LogConfigConstants.DEFAULT_LOG_S3_ENABLE;
import static com.automq.log.uploader.LogConfigConstants.DEFAULT_LOG_S3_NODE_ID;
import static com.automq.log.uploader.LogConfigConstants.LOG_PROPERTIES_FILE;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_ACCESS_KEY;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_ACTIVE_CONTROLLER_KEY;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_BUCKET_KEY;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_CLUSTER_ID_KEY;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_ENABLE_KEY;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_ENDPOINT_KEY;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_NODE_ID_KEY;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_PRIMARY_NODE_KEY;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_REGION_KEY;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_SECRET_KEY;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_SELECTOR_PREFIX;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_SELECTOR_PRIMARY_NODE_ID_KEY;
import static com.automq.log.uploader.LogConfigConstants.LOG_S3_SELECTOR_TYPE_KEY;

public class DefaultS3LogConfig implements S3LogConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3LogConfig.class);

private final Properties props;
private ObjectStorage objectStorage;
private LogUploaderNodeSelector nodeSelector;

public DefaultS3LogConfig() {
this(null);
}

public DefaultS3LogConfig(Properties overrideProps) {
this.props = new Properties();
if (overrideProps != null) {
this.props.putAll(overrideProps);
}
if (overrideProps == null) {
try (InputStream input = getClass().getClassLoader().getResourceAsStream(LOG_PROPERTIES_FILE)) {
if (input != null) {
props.load(input);
LOGGER.info("Loaded log configuration from {}", LOG_PROPERTIES_FILE);
} else {
LOGGER.warn("Could not find {}, using default log configurations.", LOG_PROPERTIES_FILE);
}
} catch (IOException ex) {
LOGGER.error("Failed to load log configuration from {}.", LOG_PROPERTIES_FILE, ex);
}
}
initializeNodeSelector();
}

@Override
public boolean isEnabled() {
return Boolean.parseBoolean(props.getProperty(LOG_S3_ENABLE_KEY, String.valueOf(DEFAULT_LOG_S3_ENABLE)));
}

@Override
public String clusterId() {
return props.getProperty(LOG_S3_CLUSTER_ID_KEY, DEFAULT_LOG_S3_CLUSTER_ID);
}

@Override
public int nodeId() {
return Integer.parseInt(props.getProperty(LOG_S3_NODE_ID_KEY, String.valueOf(DEFAULT_LOG_S3_NODE_ID)));
}

@Override
public synchronized ObjectStorage objectStorage() {
if (this.objectStorage != null) {
return this.objectStorage;
}
String bucket = props.getProperty(LOG_S3_BUCKET_KEY);
if (StringUtils.isBlank(bucket)) {
LOGGER.error("Mandatory log config '{}' is not set.", LOG_S3_BUCKET_KEY);
return null;
}

String normalizedBucket = bucket.trim();
if (!normalizedBucket.contains("@")) {
String region = props.getProperty(LOG_S3_REGION_KEY);
if (StringUtils.isBlank(region)) {
LOGGER.error("'{}' must be provided when '{}' is not a full AutoMQ bucket URI.",
LOG_S3_REGION_KEY, LOG_S3_BUCKET_KEY);
return null;
}
String endpoint = props.getProperty(LOG_S3_ENDPOINT_KEY);
String accessKey = props.getProperty(LOG_S3_ACCESS_KEY);
String secretKey = props.getProperty(LOG_S3_SECRET_KEY);

StringBuilder builder = new StringBuilder("0@s3://").append(normalizedBucket)
.append("?region=").append(region.trim());
if (StringUtils.isNotBlank(endpoint)) {
builder.append("&endpoint=").append(endpoint.trim());
}
if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
builder.append("&authType=static")
.append("&accessKey=").append(accessKey.trim())
.append("&secretKey=").append(secretKey.trim());
}
normalizedBucket = builder.toString();
}

BucketURI logBucket = BucketURI.parse(normalizedBucket);
this.objectStorage = ObjectStorageFactory.instance().builder(logBucket).threadPrefix("s3-log-uploader").build();
return this.objectStorage;
}

@Override
public LogUploaderNodeSelector nodeSelector() {
if (nodeSelector == null) {
initializeNodeSelector();
}
return nodeSelector;
}

private void initializeNodeSelector() {
String selectorType = props.getProperty(LOG_S3_SELECTOR_TYPE_KEY, "static");
Map<String, String> selectorConfig = new HashMap<>();
Map<String, String> rawConfig = getPropertiesWithPrefix(LOG_S3_SELECTOR_PREFIX);
String normalizedType = selectorType == null ? "" : selectorType.toLowerCase(Locale.ROOT);
for (Map.Entry<String, String> entry : rawConfig.entrySet()) {
String key = entry.getKey();
if (normalizedType.length() > 0 && key.toLowerCase(Locale.ROOT).startsWith(normalizedType + ".")) {
key = key.substring(normalizedType.length() + 1);
}
if ("type".equalsIgnoreCase(key) || key.isEmpty()) {
continue;
}
selectorConfig.putIfAbsent(key, entry.getValue());
}

selectorConfig.putIfAbsent("isPrimaryUploader",
props.getProperty(LOG_S3_PRIMARY_NODE_KEY,
props.getProperty(LOG_S3_ACTIVE_CONTROLLER_KEY, String.valueOf(DEFAULT_LOG_S3_ACTIVE_CONTROLLER))));

String primaryNodeId = props.getProperty(LOG_S3_SELECTOR_PRIMARY_NODE_ID_KEY);
if (StringUtils.isNotBlank(primaryNodeId)) {
selectorConfig.putIfAbsent("primaryNodeId", primaryNodeId.trim());
}

try {
this.nodeSelector = LogUploaderNodeSelectorFactory.createSelector(selectorType, clusterId(), nodeId(), selectorConfig);
} catch (Exception e) {
LOGGER.error("Failed to create log uploader selector of type {}", selectorType, e);
this.nodeSelector = LogUploaderNodeSelector.staticSelector(false);
}
}

private Map<String, String> getPropertiesWithPrefix(String prefix) {
Map<String, String> result = new HashMap<>();
if (prefix == null || prefix.isEmpty()) {
return result;
}
for (String key : props.stringPropertyNames()) {
if (key.startsWith(prefix)) {
String trimmed = key.substring(prefix.length());
if (!trimmed.isEmpty()) {
result.put(trimmed, props.getProperty(key));
}
}
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.log.uploader;

public class LogConfigConstants {
private LogConfigConstants() {
}

public static final String LOG_PROPERTIES_FILE = "automq-log.properties";

public static final String LOG_S3_ENABLE_KEY = "log.s3.enable";
public static final boolean DEFAULT_LOG_S3_ENABLE = false;

public static final String LOG_S3_BUCKET_KEY = "log.s3.bucket";
public static final String LOG_S3_REGION_KEY = "log.s3.region";
public static final String LOG_S3_ENDPOINT_KEY = "log.s3.endpoint";

public static final String LOG_S3_ACCESS_KEY = "log.s3.access.key";
public static final String LOG_S3_SECRET_KEY = "log.s3.secret.key";

public static final String LOG_S3_CLUSTER_ID_KEY = "log.s3.cluster.id";
public static final String DEFAULT_LOG_S3_CLUSTER_ID = "automq-cluster";

public static final String LOG_S3_NODE_ID_KEY = "log.s3.node.id";
public static final int DEFAULT_LOG_S3_NODE_ID = 0;

/**
* @deprecated Use selector configuration instead.
*/
@Deprecated
public static final String LOG_S3_ACTIVE_CONTROLLER_KEY = "log.s3.active.controller";
@Deprecated
public static final boolean DEFAULT_LOG_S3_ACTIVE_CONTROLLER = true;

public static final String LOG_S3_PRIMARY_NODE_KEY = "log.s3.primary.node";
public static final String LOG_S3_SELECTOR_PRIMARY_NODE_ID_KEY = "log.s3.selector.primary.node.id";
public static final String LOG_S3_SELECTOR_TYPE_KEY = "log.s3.selector.type";
public static final String LOG_S3_SELECTOR_PREFIX = "log.s3.selector.";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
*/

package com.automq.shell.log;
package com.automq.log.uploader;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -47,10 +47,10 @@ public void validate() {
throw new IllegalArgumentException("Level cannot be blank");
}
if (StringUtils.isBlank(logger)) {
throw new IllegalArgumentException("Level cannot be blank");
throw new IllegalArgumentException("Logger cannot be blank");
}
if (StringUtils.isBlank(message)) {
throw new IllegalArgumentException("Level cannot be blank");
throw new IllegalArgumentException("Message cannot be blank");
}
}

Expand Down
Loading
Loading