Skip to content

Commit

Permalink
Merge 9fbd8c4 into 55fb6a3
Browse files Browse the repository at this point in the history
  • Loading branch information
minusmajun committed Dec 16, 2020
2 parents 55fb6a3 + 9fbd8c4 commit b47d181
Show file tree
Hide file tree
Showing 28 changed files with 991 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,18 @@ public static Map<String, String> toStringMap(final String text) throws IOExcept
return new HashMap(properties);
}

public static String readString(final byte[] bytes) {
return bytes == null ? null : readString(bytes, 0, bytes.length);
}

public static String readString(byte[] bytes, int offset, int length) {
if (bytes == null) {
return null;
} else {
return length == 0 ? "" : new String(bytes, offset, length, Charset.forName("UTF-8"));
}
}

/**
* 读取字符串,字符长度&lt;=255
*
Expand Down
10 changes: 7 additions & 3 deletions joyqueue-console/joyqueue-data/joyqueue-data-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,13 @@
<artifactId>commons-beanutils</artifactId>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
<!--<dependency>
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-hbase</artifactId>
</dependency>
</dependency>-->
<dependency>
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-nsr-core</artifactId>
Expand All @@ -124,10 +128,10 @@
<artifactId>joyqueue-archive-api</artifactId>
</dependency>
<!-- FIXME: 应该依赖joyqueue-archive-api 而不是这个joyqueue-archive-hbase实现-->
<dependency>
<!--<dependency>
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-archive-hbase</artifactId>
</dependency>
</dependency>-->
<dependency>
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-client-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.convert.CodeConverter;
import org.joyqueue.convert.NsrConsumerConverter;
import org.joyqueue.domain.ClientType;
Expand All @@ -26,7 +27,6 @@
import org.joyqueue.nsr.ConsumerNameServerService;
import org.joyqueue.nsr.NameServerBase;
import org.joyqueue.nsr.model.ConsumerQuery;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Service;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.joyqueue.model.domain.User;
import org.joyqueue.model.query.QApplication;
import org.joyqueue.model.query.QArchive;
import org.joyqueue.server.archive.store.QueryCondition;
import org.joyqueue.server.archive.store.api.ArchiveStore;
import org.joyqueue.server.archive.store.model.ConsumeLog;
import org.joyqueue.server.archive.store.model.SendLog;
import org.joyqueue.server.archive.store.query.QueryCondition;
import org.joyqueue.service.ApplicationService;
import org.joyqueue.service.ArchiveService;
import org.joyqueue.service.TopicService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

import com.alibaba.fastjson.JSON;
import org.joyqueue.broker.archive.ArchiveUtils;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.util.serializer.Serializer;
import org.joyqueue.exception.ServiceException;
import org.joyqueue.handler.error.ErrorCode;
import org.joyqueue.message.SourceType;
import org.joyqueue.server.archive.store.HBaseSerializer;
import org.joyqueue.server.archive.store.utils.ArchiveSerializer;
import org.joyqueue.server.retry.model.RetryMessageModel;
import org.joyqueue.handler.Constants;
import org.joyqueue.service.MessagePreviewService;
Expand All @@ -45,7 +45,6 @@
import com.jd.laf.web.vertx.response.Responses;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -200,10 +199,10 @@ public BrokerMessage filterBrokerMessage(BrokerMessage brokerMessage, SendLog se
}
for(BrokerMessage m:msgs){
String msgId=ArchiveUtils.messageId(brokerMessage.getTopic(),m.getPartition(),m.getMsgIndexNo());
byte[] msgIdMd5Bytes=HBaseSerializer.md5(msgId,null);
byte[] msgIdMd5Bytes= ArchiveSerializer.md5(msgId,null);
if(logger.isDebugEnabled()) {
logger.debug("current message business id {},message id {},md5 length {},base 64 bytes {},hex {}", m.getBusinessId(), msgId, msgIdMd5Bytes.length,
Base64.getEncoder().encodeToString(msgIdMd5Bytes), HBaseSerializer.byteArrayToHexStr(msgIdMd5Bytes));
Base64.getEncoder().encodeToString(msgIdMd5Bytes), ArchiveSerializer.byteArrayToHexStr(msgIdMd5Bytes));
}
if(Arrays.equals(msgIdMd5Bytes,sendLog.getBytesMessageId())){
return m;
Expand All @@ -221,7 +220,7 @@ public String preview(BrokerMessage brokerMessage,String messageType){
return messagePreviewService.preview(messageType, brokerMessage.getDecompressedBody());
} catch (Throwable e) {
logger.error("parse error",e);
return Bytes.toString(brokerMessage.getDecompressedBody());
return Serializer.readString(brokerMessage.getDecompressedBody());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.jd.laf.web.vertx.annotation.QueryParam;
import com.jd.laf.web.vertx.response.Response;
import com.jd.laf.web.vertx.response.Responses;
import org.apache.commons.net.telnet.TelnetClient;
import org.joyqueue.handler.annotation.PageQuery;
import org.joyqueue.handler.error.ConfigException;
import org.joyqueue.handler.routing.command.NsrCommandSupport;
Expand All @@ -29,7 +30,6 @@
import org.joyqueue.model.domain.Broker;
import org.joyqueue.model.query.QBroker;
import org.joyqueue.service.BrokerService;
import org.apache.commons.net.telnet.TelnetClient;

import static org.joyqueue.handler.Constants.ID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.joyqueue.server.archive.store;
package org.joyqueue.server.archive.store.query;

import org.joyqueue.server.archive.store.model.Query;
import org.joyqueue.server.archive.store.utils.ArchiveSerializer;

import java.util.Arrays;

/**
* Created by chengzhiliang on 2018/12/4.
Expand Down Expand Up @@ -72,10 +75,21 @@ public byte[] getStartRowKeyByteArr() {
}

public void setStartRowKeyByteArr(String startRowKeyByteArr) {
byte[] bytes = HBaseSerializer.hexStrToByteArray(startRowKeyByteArr);
byte[] bytes = ArchiveSerializer.hexStrToByteArray(startRowKeyByteArr);
this.startRowKeyByteArr = bytes;
}

@Override
public String toString() {
return "QueryCondition{" +
"startRowKey=" + startRowKey +
", stopRowKey=" + stopRowKey +
", count=" + count +
", rowKey=" + rowKey +
", startRowKeyByteArr=" + Arrays.toString(startRowKeyByteArr) +
'}';
}

/**
* 查询RowKey
*/
Expand Down Expand Up @@ -117,5 +131,14 @@ public void setMessageId(String messageId) {
this.messageId = messageId;
}

@Override
public String toString() {
return "RowKey{" +
"topic='" + topic + '\'' +
", time=" + time +
", businessId='" + businessId + '\'' +
", messageId='" + messageId + '\'' +
'}';
}
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,15 @@
/**
* Copyright 2019 The JoyQueue Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.joyqueue.server.archive.store;
package org.joyqueue.server.archive.store.utils;

import org.joyqueue.server.archive.store.model.ConsumeLog;
import org.joyqueue.server.archive.store.model.SendLog;
import org.joyqueue.toolkit.lang.Pair;
import org.joyqueue.toolkit.security.Md5;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;

/**
* Created by chengzhiliang on 2018/12/12.
*/
public class HBaseSerializer {
public class ArchiveSerializer {

public static ConsumeLog readConsumeLog(Pair<byte[], byte[]> pair) {
ConsumeLog log = new ConsumeLog();
Expand Down Expand Up @@ -247,7 +230,7 @@ public static SendLog readSendLog4BizId(Pair<byte[], byte[]> pair) {
*
**/
public static byte[] md5(String content,byte[] key) throws GeneralSecurityException {
return Md5.INSTANCE.encrypt(content.getBytes(Charset.forName("utf-8")), key);
return Md5.INSTANCE.encrypt(content.getBytes(Charset.forName("utf-8")), key);
}

public static String byteArrayToHexStr(byte[] byteArray) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.joyqueue.server.archive.store.api.ArchiveStore;
import org.joyqueue.monitor.PointTracer;
import org.joyqueue.server.archive.store.model.*;
import org.joyqueue.server.archive.store.query.QueryCondition;
import org.joyqueue.server.archive.store.utils.ArchiveSerializer;
import org.joyqueue.toolkit.lang.Pair;
import org.joyqueue.toolkit.network.IpUtil;
import org.joyqueue.toolkit.security.Md5;
Expand Down Expand Up @@ -118,7 +120,7 @@ public void putConsumeLog(List<ConsumeLog> consumeLogList, PointTracer tracer) t
int appId = topicAppMapping.getAppId(app);
consumeLog.setAppId(appId);

Pair<byte[], byte[]> pair = HBaseSerializer.convertConsumeLogToKVBytes(consumeLog);
Pair<byte[], byte[]> pair = ArchiveSerializer.convertConsumeLogToKVBytes(consumeLog);

logList.add(pair);
}
Expand All @@ -145,10 +147,10 @@ public void putSendLog(List<SendLog> sendLogList, PointTracer tracer) throws Joy
log.setTopicId(topicId);
log.setAppId(appId);

Pair<byte[], byte[]> pair = HBaseSerializer.convertSendLogToKVBytes(log);
Pair<byte[], byte[]> pair = ArchiveSerializer.convertSendLogToKVBytes(log);
logList.add(pair);

Pair<byte[], byte[]> pair4BizId = HBaseSerializer.convertSendLogToKVBytes4BizId(log);
Pair<byte[], byte[]> pair4BizId = ArchiveSerializer.convertSendLogToKVBytes4BizId(log);
logList.add(pair4BizId);

}
Expand Down Expand Up @@ -233,13 +235,13 @@ public List<SendLog> scanSendLog(Query query) throws JoyQueueException {
for (Pair<byte[], byte[]> pair : scan) {
SendLog log;
if (hasBizId) {
log = HBaseSerializer.readSendLog4BizId(pair);
log = ArchiveSerializer.readSendLog4BizId(pair);
} else {
log = HBaseSerializer.readSendLog(pair);
log = ArchiveSerializer.readSendLog(pair);
}

log.setClientIpStr(toIpString(log.getClientIp()));
log.setRowKeyStart(HBaseSerializer.byteArrayToHexStr(pair.getKey()));
log.setRowKeyStart(ArchiveSerializer.byteArrayToHexStr(pair.getKey()));
String topicName = topicAppMapping.getTopicName(log.getTopicId());
log.setTopic(topicName);

Expand Down Expand Up @@ -412,13 +414,13 @@ public SendLog getOneSendLog(Query query) throws JoyQueueException {
allocate.putInt(topicAppMapping.getTopicId(rowKey.getTopic()));
allocate.putLong(rowKey.getTime());
allocate.put(Md5.INSTANCE.encrypt(rowKey.getBusinessId().getBytes(Charset.forName("utf-8")), null));
allocate.put(HBaseSerializer.hexStrToByteArray(rowKey.getMessageId()));
allocate.put(ArchiveSerializer.hexStrToByteArray(rowKey.getMessageId()));
// rowKey
byte[] bytesRowKey = allocate.array();

Pair<byte[], byte[]> bytes = hBaseClient.getKV(namespace, sendLogTable, cf, col, bytesRowKey);

SendLog log = HBaseSerializer.readSendLog(bytes);
SendLog log = ArchiveSerializer.readSendLog(bytes);

StringBuilder clientIp = new StringBuilder();
IpUtil.toAddress(log.getClientIp(), clientIp);
Expand Down Expand Up @@ -452,7 +454,7 @@ public List<ConsumeLog> scanConsumeLog(String messageId, Integer count) throws J
scanParameters.setCf(cf);
scanParameters.setCol(col);

byte[] messageIdBytes = HBaseSerializer.hexStrToByteArray(messageId);
byte[] messageIdBytes = ArchiveSerializer.hexStrToByteArray(messageId);
scanParameters.setStartRowKey(messageIdBytes);

ByteBuffer bytebuffer = ByteBuffer.allocate(messageIdBytes.length + 1);
Expand All @@ -465,8 +467,8 @@ public List<ConsumeLog> scanConsumeLog(String messageId, Integer count) throws J
List<Pair<byte[], byte[]>> scan = hBaseClient.scan(namespace, scanParameters);

for (Pair<byte[], byte[]> pair : scan) {
ConsumeLog log = HBaseSerializer.readConsumeLog(pair);
log.setMessageId(HBaseSerializer.byteArrayToHexStr(log.getBytesMessageId()));
ConsumeLog log = ArchiveSerializer.readConsumeLog(pair);
log.setMessageId(ArchiveSerializer.byteArrayToHexStr(log.getBytesMessageId()));

StringBuilder clientIp = new StringBuilder();
IpUtil.toAddress(log.getClientIp(), clientIp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
* Created by chengzhiliang on 2018/12/6.
*/
public class ArchiveConfig {
public static final String LOG_DETAIL_PRODUCE_PREFIX = "produce.";
public static final String LOG_DETAIL_CONSUME_PREFIX = "consume.";

private static final String ARCHIVE_PATH ="/archive/";
private PropertySupplier propertySupplier;
private String archivePath;
Expand Down Expand Up @@ -60,6 +63,13 @@ public void setPath(String path) {
}
}

public boolean getLogDetail(String archiveType, String brokerId) {
return (boolean) PropertySupplier.getValue(propertySupplier,
ArchiveConfigKey.ARCHIVE_TRACE_LOG.getName() + archiveType + brokerId,
ArchiveConfigKey.ARCHIVE_TRACE_LOG.getType(),
ArchiveConfigKey.ARCHIVE_TRACE_LOG.getValue());
}

public int getConsumeBatchNum() {
return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.CONSUME_BATCH_NUM);
}
Expand Down Expand Up @@ -91,6 +101,10 @@ public String getNamespace() {
return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_STORE_NAMESPACE);
}

public int getStoreFialedRetryCount() {
return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_STORE_RETRY_COUNT);
}

public String getTracerType() {
return PropertySupplier.getValue(propertySupplier, BrokerConfigKey.TRACER_TYPE);
}
Expand All @@ -102,4 +116,8 @@ public boolean isReamingEnable() {
public boolean isBacklogEnable() {
return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_BACKLOG_ENABLE);
}

public int getLogRetainDuration() {
return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_LOG_RETAIN_DURATION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ public enum ArchiveConfigKey implements PropertyDef {
ARCHIVE_SWITCH("archive.switch", false, Type.BOOLEAN),
ARCHIVE_THREAD_POOL_QUEUE_SIZE("archive.thread.pool.queue.size", 10, Type.INT),
ARCHIVE_STORE_NAMESPACE("archive.store.namespace", "joyqueue", Type.STRING),
ARCHIVE_REAMING_ENABLE("archive.reaming.enable", false, Type.BOOLEAN),
ARCHIVE_STORE_RETRY_COUNT("archive.store.retry.count", 3, Type.INT),
ARCHIVE_REAMING_ENABLE("archive.reaming.enable", true, Type.BOOLEAN),
ARCHIVE_BACKLOG_ENABLE("archive.backlog.enable", false, Type.BOOLEAN),

ARCHIVE_TRACE_LOG("archive.trace.log.", false, Type.BOOLEAN),
ARCHIVE_LOG_RETAIN_DURATION("archive.log.retain.duration", 24, Type.INT)
;

private String name;
Expand Down
Loading

0 comments on commit b47d181

Please sign in to comment.