Skip to content

Commit

Permalink
[minor]Reduce redundancy (#6611)
Browse files Browse the repository at this point in the history
This PR fixes the following four kinds of redundancy:
- redundant collection operations 
- redundant string operation
- redundant type cast 
- redundant array creation
  • Loading branch information
yjshen committed Mar 26, 2020
1 parent 8d129e0 commit 7de44a9
Show file tree
Hide file tree
Showing 48 changed files with 79 additions and 83 deletions.
Expand Up @@ -114,7 +114,7 @@ public ByteBuf getDataBuffer() {

@Override
public byte[] getData() {
byte[] array = new byte[(int) data.readableBytes()];
byte[] array = new byte[data.readableBytes()];
data.getBytes(data.readerIndex(), array);
return array;
}
Expand Down
Expand Up @@ -1120,7 +1120,7 @@ public synchronized void readEntryFailed(ManagedLedgerException mle, Object ctx)
if (((PositionImpl) p).compareTo(this.readPosition) == 0) {
this.setReadPosition(this.readPosition.getNext());
log.warn("[{}][{}] replayPosition{} equals readPosition{}," + " need set next readPositio",
ledger.getName(), name, (PositionImpl) p, this.readPosition);
ledger.getName(), name, p, this.readPosition);
}
ledger.asyncReadEntry((PositionImpl) p, cb, ctx);
});
Expand Down Expand Up @@ -1474,7 +1474,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
}
callback.markDeleteFailed(
new ManagedLedgerException("Reset cursor in progress - unable to mark delete position "
+ ((PositionImpl) position).toString()),
+ position.toString()),
ctx);
}

Expand Down
Expand Up @@ -1925,7 +1925,7 @@ private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
private boolean isLedgerRetentionOverSizeQuota() {
// Handle the -1 size limit as "infinite" size quota
return config.getRetentionSizeInMB() >= 0
&& TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB()) * 1024 * 1024;
&& TOTAL_SIZE_UPDATER.get(this) > config.getRetentionSizeInMB() * 1024 * 1024;
}

private boolean isOffloadedNeedsDelete(OffloadContext offload) {
Expand Down
Expand Up @@ -220,7 +220,7 @@ private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, fin
MetaStore store = factory.getMetaStore();
BookKeeper bk = factory.getBookKeeper();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
final long errorInReadingCursor = (long) -1;
final long errorInReadingCursor = -1;
ConcurrentOpenHashMap<String, Long> ledgerRetryMap = new ConcurrentOpenHashMap<>();

final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
Expand Down
Expand Up @@ -146,7 +146,7 @@ public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallba
this.getLedgerHandle(position.getLedgerId()).thenAccept((ledger) -> {
asyncReadEntry(ledger, position, callback, ctx);
}).exceptionally((ex) -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", new Object[]{this.name, position, ex.getMessage()});
log.error("[{}] Error opening ledger for reading at position {} - {}", this.name, position, ex.getMessage());
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
return null;
});
Expand Down
Expand Up @@ -76,8 +76,8 @@ private SaslServer createSaslServer(final Subject subject)
int indexOf = servicePrincipalNameAndHostname.indexOf("/");

// e.g. serviceHostnameAndKerbDomain := "myhost.foo.com@EXAMPLE.COM"
final String serviceHostnameAndKerbDomain = servicePrincipalNameAndHostname.substring(indexOf + 1,
servicePrincipalNameAndHostname.length());
final String serviceHostnameAndKerbDomain = servicePrincipalNameAndHostname.substring(indexOf + 1
);
int indexOfAt = serviceHostnameAndKerbDomain.indexOf("@");

// Handle Kerberos Service as well as User Principal Names
Expand Down
Expand Up @@ -79,7 +79,7 @@ public boolean hasDataFromTls() {
@Override
public Certificate[] getTlsCertificates() {
try {
return (Certificate[]) sslSession.getPeerCertificates();
return sslSession.getPeerCertificates();
} catch (SSLPeerUnverifiedException e) {
return null;
}
Expand Down
Expand Up @@ -111,7 +111,7 @@ public ClusterData deserialize(String path, byte[] content) throws Exception {
@Override
@SuppressWarnings("unchecked")
public NamespaceIsolationPolicies deserialize(String path, byte[] content) throws Exception {
return new NamespaceIsolationPolicies((Map<String, NamespaceIsolationData>) ObjectMapperFactory
return new NamespaceIsolationPolicies(ObjectMapperFactory
.getThreadLocal().readValue(content, new TypeReference<Map<String, NamespaceIsolationData>>() {
}));
}
Expand Down
Expand Up @@ -130,9 +130,9 @@ public static boolean isComplete(Object obj) throws IllegalArgumentException {
if (log.isDebugEnabled()) {
log.debug("Validating configuration field '{}' = '{}'", field.getName(), value);
}
boolean isRequired = ((FieldContext) field.getAnnotation(FieldContext.class)).required();
long minValue = ((FieldContext) field.getAnnotation(FieldContext.class)).minValue();
long maxValue = ((FieldContext) field.getAnnotation(FieldContext.class)).maxValue();
boolean isRequired = field.getAnnotation(FieldContext.class).required();
long minValue = field.getAnnotation(FieldContext.class).minValue();
long maxValue = field.getAnnotation(FieldContext.class).maxValue();
if (isRequired && isEmpty(value)) {
error.append(String.format("Required %s is null,", field.getName()));
}
Expand Down
Expand Up @@ -326,7 +326,7 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori
try {
List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
for (String topic : topics) {
NamespaceBundle topicBundle = (NamespaceBundle) pulsar().getNamespaceService()
NamespaceBundle topicBundle = pulsar().getNamespaceService()
.getBundle(TopicName.get(topic));
if (bundle.equals(topicBundle)) {
throw new RestException(Status.CONFLICT, "Cannot delete non empty bundle");
Expand Down Expand Up @@ -1744,7 +1744,7 @@ protected List<String> internalGetAntiAffinityNamespaces(String cluster, String
return namespaces.stream().filter(ns -> {
Optional<Policies> policies;
try {
policies = policiesCache().get(AdminResource.path(POLICIES, ns.toString()));
policies = policiesCache().get(AdminResource.path(POLICIES, ns));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -1804,7 +1804,7 @@ private boolean checkQuotas(Policies policies, RetentionPolicies retention) {
if (quota.getLimit() < 0 && (retention.getRetentionSizeInMB() > 0 || retention.getRetentionTimeInMinutes() > 0)) {
return false;
}
if (quota.getLimit() >= ((long) retention.getRetentionSizeInMB() * 1024 * 1024)) {
if (quota.getLimit() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {
return false;
}
return true;
Expand Down
Expand Up @@ -2529,7 +2529,7 @@ protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean aut
return;
}

((PersistentTopic) topic).getLastMessageId().whenComplete((v, e) -> {
topic.getLastMessageId().whenComplete((v, e) -> {
if (e != null) {
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()));
} else {
Expand Down
Expand Up @@ -81,7 +81,7 @@ void doCalculateBrokerHostUsage() {
}

private double getTotalCpuLimit() {
return (double) (100 * Runtime.getRuntime().availableProcessors());
return 100 * Runtime.getRuntime().availableProcessors();
}

private double getTotalCpuUsage() {
Expand Down
Expand Up @@ -123,7 +123,7 @@ public void calculateBrokerHostUsage() {
}

private double getTotalCpuLimit() {
return (double) (100 * Runtime.getRuntime().availableProcessors());
return 100 * Runtime.getRuntime().availableProcessors();
}

/**
Expand Down Expand Up @@ -189,7 +189,7 @@ private double getTotalNicLimitKbps(List<String> nics) {
if (overrideBrokerNicSpeedGbps.isPresent()) {
// Use the override value as configured. Return the total max speed across all available NICs, converted
// from Gbps into Kbps
return ((double) overrideBrokerNicSpeedGbps.get()) * nics.size() * 1024 * 1024;
return overrideBrokerNicSpeedGbps.get() * nics.size() * 1024 * 1024;
}

// Nic speed is in Mbits/s, return kbits/s
Expand Down
Expand Up @@ -209,7 +209,7 @@ public static String getBundleRangeFromBundleName(String bundleName) {
// the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF
int pos = bundleName.lastIndexOf("/");
checkArgument(pos != -1);
return bundleName.substring(pos + 1, bundleName.length());
return bundleName.substring(pos + 1);
}

// From a full bundle name, extract the namespace name.
Expand Down
Expand Up @@ -519,9 +519,7 @@ private void updateBundleData() {
}

// This is needed too in case a broker which was assigned a bundle dies and comes back up.
if ( preallocatedBundleToBroker.containsKey(preallocatedBundleName) ) {
preallocatedBundleToBroker.remove(preallocatedBundleName);
}
preallocatedBundleToBroker.remove(preallocatedBundleName);
}
}

Expand Down
Expand Up @@ -250,7 +250,7 @@ public static String getReplicatorName(String replicatorPrefix, String cluster)
private void validatePartitionedTopic(String topic, BrokerService brokerService) throws NamingException {
TopicName topicName = TopicName.get(topic);
String partitionedTopicPath = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
topicName.getNamespace().toString(), topicName.getDomain().toString(),
topicName.getNamespace(), topicName.getDomain().toString(),
topicName.getEncodedLocalName());
boolean isPartitionedTopic = false;
try {
Expand Down
Expand Up @@ -639,7 +639,7 @@ public void unloadNamespaceBundlesGracefully() {
serviceUnits.forEach(su -> {
if (su instanceof NamespaceBundle) {
try {
pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) su, 1, TimeUnit.MINUTES);
pulsar.getNamespaceService().unloadNamespaceBundle(su, 1, TimeUnit.MINUTES);
} catch (Exception e) {
log.warn("Failed to unload namespace bundle {}", su, e);
}
Expand Down Expand Up @@ -1780,7 +1780,7 @@ private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigur
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
if (((FieldContext) field.getAnnotation(FieldContext.class)).dynamic()) {
if (field.getAnnotation(FieldContext.class).dynamic()) {
dynamicConfigurationMap.put(field.getName(), new ConfigField(field));
}
}
Expand Down
Expand Up @@ -321,7 +321,7 @@ public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(St
for (int i = lastIndex; i >= 0; i--) {
if (list.get(i).schema.isDeleted()) {
if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare
return Collections.<SchemaAndMetadata>emptyList();
return Collections.emptyList();
} else {
return list.subList(i + 1, list.size());
}
Expand Down
Expand Up @@ -126,6 +126,6 @@ public List<MetricFamilySamples> collect() {

@Override
public List<MetricFamilySamples> describe() {
return Collections.<MetricFamilySamples> singletonList(new SummaryMetricFamily(fullname, help, labelNames));
return Collections.singletonList(new SummaryMetricFamily(fullname, help, labelNames));
}
}
Expand Up @@ -116,7 +116,7 @@ public static String joinPath(String... parts) {
public static String splitPath(String source, int slice) {
Iterable<String> parts = Splitter.on('/').limit(slice).split(source);
Iterator<String> s = parts.iterator();
String result = new String();
String result = "";
for (int i = 0; i < slice; i++) {
result = s.next();
}
Expand Down
Expand Up @@ -1247,7 +1247,7 @@ private List<Message<byte[]>> getMessageFromHttpResponse(String topic, Response
for (Entry<String, List<Object>> entry : headers.entrySet()) {
String header = entry.getKey();
if (header.contains("X-Pulsar-PROPERTY-")) {
String keyName = header.substring("X-Pulsar-PROPERTY-".length(), header.length());
String keyName = header.substring("X-Pulsar-PROPERTY-".length());
properties.put(keyName, (String) entry.getValue().get(0));
}
}
Expand Down
Expand Up @@ -48,8 +48,8 @@ public int getEnd() {
}

public Range intersect(Range range) {
int start = range.getStart() > this.getStart() ? range.getStart() : this.getStart();
int end = range.getEnd() < this.getEnd() ? range.getEnd() : this.getEnd();
int start = Math.max(range.getStart(), this.getStart());
int end = Math.min(range.getEnd(), this.getEnd());
if (end >= start) {
return Range.of(start, end);
} else {
Expand Down
Expand Up @@ -104,16 +104,16 @@ public static MessageId newMessageIdFromByteArrayWithTopic(byte[] data, String t
}

public static Authentication newAuthenticationToken(String token) {
return catchExceptions(() -> (Authentication) AUTHENTICATION_TOKEN_String.newInstance(token));
return catchExceptions(() -> AUTHENTICATION_TOKEN_String.newInstance(token));
}

public static Authentication newAuthenticationToken(Supplier<String> supplier) {
return catchExceptions(() -> (Authentication) AUTHENTICATION_TOKEN_Supplier.newInstance(supplier));
return catchExceptions(() -> AUTHENTICATION_TOKEN_Supplier.newInstance(supplier));
}

public static Authentication newAuthenticationTLS(String certFilePath, String keyFilePath) {
return catchExceptions(
() -> (Authentication) AUTHENTICATION_TLS_String_String.newInstance(certFilePath, keyFilePath));
() -> AUTHENTICATION_TLS_String_String.newInstance(certFilePath, keyFilePath));
}

public static Authentication createAuthentication(String authPluginClassName, String authParamsString)
Expand Down
Expand Up @@ -570,7 +570,7 @@ public void connectionOpened(final ClientCnx cnx) {
builder.setLedgerId(startMessageId.getLedgerId());
builder.setEntryId(startMessageId.getEntryId());
if (startMessageId instanceof BatchMessageIdImpl) {
builder.setBatchIndex(((BatchMessageIdImpl) startMessageId).getBatchIndex());
builder.setBatchIndex(startMessageId.getBatchIndex());
}

startMessageIdData = builder.build();
Expand Down
Expand Up @@ -217,7 +217,7 @@ private PublicKey loadPublicKey(byte[] keyBytes) throws Exception {
ecParam.getH(), ecParam.getSeed());
KeyFactory keyFactory = KeyFactory.getInstance(ECDSA, BouncyCastleProvider.PROVIDER_NAME);
ECPublicKeySpec keySpec = new ECPublicKeySpec(((BCECPublicKey) publicKey).getQ(), ecSpec);
publicKey = (PublicKey) keyFactory.generatePublic(keySpec);
publicKey = keyFactory.generatePublic(keySpec);
}
} catch (IOException | NoSuchAlgorithmException | NoSuchProviderException | InvalidKeySpecException e) {
throw new Exception(e);
Expand Down Expand Up @@ -270,7 +270,7 @@ private PrivateKey loadPrivateKey(byte[] keyBytes) throws Exception {
ecParam.getH(), ecParam.getSeed());
KeyFactory keyFactory = KeyFactory.getInstance(ECDSA, BouncyCastleProvider.PROVIDER_NAME);
ECPrivateKeySpec keySpec = new ECPrivateKeySpec(((BCECPrivateKey) privateKey).getS(), ecSpec);
privateKey = (PrivateKey) keyFactory.generatePrivate(keySpec);
privateKey = keyFactory.generatePrivate(keySpec);
}

} catch (IOException e) {
Expand Down
Expand Up @@ -716,7 +716,7 @@ private boolean topicNameValid(String topicName) {
checkArgument(!topics.containsKey(topicName), "Topics already contains topic:" + topicName);

if (this.namespaceName != null) {
checkArgument(TopicName.get(topicName).getNamespace().toString().equals(this.namespaceName.toString()),
checkArgument(TopicName.get(topicName).getNamespace().equals(this.namespaceName.toString()),
"Topic " + topicName + " not in same namespace with Topics");
}

Expand Down Expand Up @@ -871,7 +871,7 @@ private void doSubscribeTopicPartitions(Schema<T> schema,
.filter(consumer1 -> {
String consumerTopicName = consumer1.getTopic();
if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
TopicName.get(topicName).getPartitionedTopicName().toString())) {
TopicName.get(topicName).getPartitionedTopicName())) {
return true;
} else {
return false;
Expand Down Expand Up @@ -1194,7 +1194,7 @@ public CompletableFuture<MessageId> getLastMessageIdAsync() {
CompletableFuture
.allOf(messageIdFutures.entrySet().stream().map(Map.Entry::getValue).toArray(CompletableFuture<?>[]::new))
.whenComplete((ignore, ex) -> {
Builder<String, MessageId> builder = ImmutableMap.<String, MessageId>builder();
Builder<String, MessageId> builder = ImmutableMap.builder();
messageIdFutures.forEach((key, future) -> {
MessageId messageId;
try {
Expand Down
Expand Up @@ -45,7 +45,7 @@ class NegativeAcksTracker {

public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
this.consumer = consumer;
this.timer = ((PulsarClientImpl) consumer.getClient()).timer();
this.timer = consumer.getClient().timer();
this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
MIN_NACK_DELAY_NANOS);
this.timerIntervalNanos = nackDelayNanos / 3;
Expand Down
Expand Up @@ -1702,7 +1702,7 @@ private static ByteBufPair serializeCommandMessageWithSize(BaseCommand cmd, Byte
throw new RuntimeException(e);
}

return (ByteBufPair) ByteBufPair.get(headers, metadataAndPayload);
return ByteBufPair.get(headers, metadataAndPayload);
}

public static int getNumberOfMessagesInBatch(ByteBuf metadataAndPayload, String subscription,
Expand Down

0 comments on commit 7de44a9

Please sign in to comment.