Skip to content

Commit

Permalink
Fixed date-time format used in stats (#909)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Nov 19, 2017
1 parent dd3318e commit bcc7404
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 55 deletions.
Expand Up @@ -20,9 +20,6 @@

import static com.google.common.base.Preconditions.checkNotNull;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -63,6 +60,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
Expand Down Expand Up @@ -350,8 +348,8 @@ public void asyncGetManagedLedgerInfo(String name, ManagedLedgerInfoCallback cal
public void operationComplete(MLDataFormats.ManagedLedgerInfo pbInfo, Stat stat) {
ManagedLedgerInfo info = new ManagedLedgerInfo();
info.version = stat.getVersion();
info.creationDate = DATE_FORMAT.format(Instant.ofEpochMilli(stat.getCreationTimestamp()));
info.modificationDate = DATE_FORMAT.format(Instant.ofEpochMilli(stat.getModificationTimestamp()));
info.creationDate = DateFormatter.format(stat.getCreationTimestamp());
info.modificationDate = DateFormatter.format(stat.getModificationTimestamp());

info.ledgers = new ArrayList<>(pbInfo.getLedgerInfoCount());
if (pbInfo.hasTerminatedPosition()) {
Expand Down Expand Up @@ -385,10 +383,9 @@ public void operationComplete(List<String> cursorsList, Stat stat) {
public void operationComplete(ManagedCursorInfo pbCursorInfo, Stat stat) {
CursorInfo cursorInfo = new CursorInfo();
cursorInfo.version = stat.getVersion();
cursorInfo.creationDate = DATE_FORMAT
.format(Instant.ofEpochMilli(stat.getCreationTimestamp()));
cursorInfo.modificationDate = DATE_FORMAT
.format(Instant.ofEpochMilli(stat.getModificationTimestamp()));
cursorInfo.creationDate = DateFormatter.format(stat.getCreationTimestamp());
cursorInfo.modificationDate = DateFormatter
.format(stat.getModificationTimestamp());

cursorInfo.cursorsLedgerId = pbCursorInfo.getCursorsLedgerId();

Expand Down Expand Up @@ -475,6 +472,4 @@ public BookKeeper getBookKeeper() {
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class);

private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withZone(ZoneId.systemDefault());
}
Expand Up @@ -98,6 +98,7 @@
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
Expand All @@ -122,8 +123,6 @@
public class PersistentTopics extends AdminResource {
private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);

private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withZone(ZoneId.systemDefault());

protected static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;

Expand Down Expand Up @@ -1026,7 +1025,7 @@ public void resetCursorOnPosition(@PathParam("property") String property, @PathP
}
}
}

@GET
@Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/position/{messagePosition}")
@ApiOperation(value = "Peek nth message on a topic subscription.")
Expand Down Expand Up @@ -1077,8 +1076,10 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara
responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time",
DATE_FORMAT.format(Instant.ofEpochMilli(metadata.getPublishTime())));
responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
}
if (metadata.hasEventTime()) {
responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
Expand Down Expand Up @@ -1410,7 +1411,7 @@ public void operationComplete(List<String> cursors,
});
// wait for all subscriptions to be created
FutureUtil.waitForAll(subscriptionCreationFuture).handle((res, subscriptionException) -> {
// close all topics and then complete result future
// close all topics and then complete result future
FutureUtil.waitForAll(
topics.stream().map(topic -> topic.close()).collect(Collectors.toList()))
.handle((closed, topicCloseException) -> {
Expand Down Expand Up @@ -1444,7 +1445,7 @@ public void operationFailed(MetaStoreException ex) {
});
return result;
}

protected void unloadTopic(DestinationName destination, boolean authoritative) {
validateSuperUserAccess();
validateDestinationOwnership(destination, authoritative);
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT;
import static org.apache.pulsar.common.api.Commands.readChecksum;

import java.time.Instant;
Expand All @@ -33,19 +32,17 @@
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -117,7 +114,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
stats = new ConsumerStats();
stats.address = cnx.clientAddress().toString();
stats.consumerName = consumerName;
stats.connectedSince = DATE_FORMAT.format(Instant.now());
stats.connectedSince = DateFormatter.now();
stats.clientVersion = cnx.getClientVersion();

if (subType == SubType.Shared) {
Expand Down
Expand Up @@ -19,12 +19,10 @@
package org.apache.pulsar.broker.service;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT;
import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import static org.apache.pulsar.common.api.Commands.hasChecksum;
import static org.apache.pulsar.common.api.Commands.readChecksum;

import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

Expand All @@ -38,6 +36,7 @@
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,7 +84,7 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName
this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;
this.stats = isNonPersistentTopic ? new NonPersistentPublisherStats() : new PublisherStats();
stats.address = cnx.clientAddress().toString();
stats.connectedSince = DATE_FORMAT.format(Instant.now());
stats.connectedSince = DateFormatter.now();
stats.clientVersion = cnx.getClientVersion();
stats.producerName = producerName;
stats.producerId = producerId;
Expand Down
Expand Up @@ -20,9 +20,8 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -76,16 +75,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.RecyclableDuplicateByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

public class NonPersistentTopic implements Topic {
private final String topic;
Expand Down Expand Up @@ -113,9 +111,6 @@ public class NonPersistentTopic implements Topic {

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSZ")
.withZone(ZoneId.systemDefault());

// Timestamp of when this topic was last seen active
private volatile long lastActive;

Expand Down
Expand Up @@ -92,6 +92,7 @@
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
Expand Down Expand Up @@ -143,8 +144,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {

private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;

public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withZone(ZoneId.systemDefault());

// Timestamp of when this topic was last seen active
private volatile long lastActive;

Expand Down Expand Up @@ -1187,10 +1186,9 @@ public PersistentTopicInternalStats getInternalStats() {
stats.totalSize = ml.getTotalSize();
stats.currentLedgerEntries = ml.getCurrentLedgerEntries();
stats.currentLedgerSize = ml.getCurrentLedgerSize();
stats.lastLedgerCreatedTimestamp = DATE_FORMAT.format(Instant.ofEpochMilli(ml.getLastLedgerCreatedTimestamp()));
stats.lastLedgerCreatedTimestamp = DateFormatter.format(ml.getLastLedgerCreatedTimestamp());
if (ml.getLastLedgerCreationFailureTimestamp() != 0) {
stats.lastLedgerCreationFailureTimestamp = DATE_FORMAT
.format(Instant.ofEpochMilli(ml.getLastLedgerCreationFailureTimestamp()));
stats.lastLedgerCreationFailureTimestamp = DateFormatter.format(ml.getLastLedgerCreationFailureTimestamp());
}

stats.waitingCursorsCount = ml.getWaitingCursorsCount();
Expand Down Expand Up @@ -1220,7 +1218,7 @@ public PersistentTopicInternalStats getInternalStats() {
cs.cursorLedger = cursor.getCursorLedger();
cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
cs.lastLedgerSwitchTimestamp = DATE_FORMAT.format(Instant.ofEpochMilli(cursor.getLastLedgerSwitchTimestamp()));
cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
cs.state = cursor.getState();
cs.numberOfEntriesSinceFirstNotAckedMessage = cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange();
Expand Down
Expand Up @@ -55,7 +55,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;

import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -824,7 +824,7 @@ void connectionOpened(final ClientCnx cnx) {

log.info("[{}] [{}] Created producer on cnx {}", topic, producerName, cnx.ctx().channel());
connectionId = cnx.ctx().channel().toString();
connectedSince = DATE_FORMAT.format(Instant.now());
connectedSince = DateFormatter.now();

if (this.producerName == null) {
this.producerName = producerName;
Expand Down Expand Up @@ -1239,9 +1239,6 @@ public int getPendingQueueSize() {
return pendingMessages.size();
}

private static DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSZ")
.withZone(ZoneId.systemDefault());

private PulsarApi.CompressionType convertCompressionType(CompressionType compressionType) {
switch (compressionType) {
case NONE:
Expand Down
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.util;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
* Date-time String formatter utility class
*/
public class DateFormatter {

private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ISO_OFFSET_DATE_TIME
.withZone(ZoneId.systemDefault());

/**
* @return a String representing the current datetime
*/
public static String now() {
return format(Instant.now());
}

/**
* @return a String representing a particular timestamp (in milliseconds)
*/
public static String format(long timestamp) {
return format(Instant.ofEpochMilli(timestamp));
}

/**
* @return a String representing a particular time instant
*/
public static String format(Instant instant) {
return DATE_FORMAT.format(instant);
}

private DateFormatter() {
}
}
Expand Up @@ -21,9 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,6 +36,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerAck;
import org.apache.pulsar.websocket.data.ConsumerMessage;
Expand Down Expand Up @@ -125,7 +123,10 @@ private void receiveMessage() {
dm.messageId = Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray());
dm.payload = Base64.getEncoder().encodeToString(msg.getData());
dm.properties = msg.getProperties();
dm.publishTime = DATE_FORMAT.format(Instant.ofEpochMilli(msg.getPublishTime()));
dm.publishTime = DateFormatter.format(msg.getPublishTime());
if (msg.getEventTime() != 0) {
dm.eventTime = DateFormatter.format(msg.getEventTime());
}
if (msg.hasKey()) {
dm.key = msg.getKey();
}
Expand Down Expand Up @@ -285,8 +286,6 @@ private static String extractSubscription(HttpServletRequest request) {
return parts.get(8);
}

private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withZone(ZoneId.systemDefault());

private static final Logger log = LoggerFactory.getLogger(ConsumerHandler.class);

}

0 comments on commit bcc7404

Please sign in to comment.