Fix miscellaneous issues from static code analysis 2 (#2899)
We bring back Codacy for static code analysis. Codacy currently flags more than 200 issues in its error-prone category. This commit fixes roughly 50 of them with small, miscellaneous changes; the rest will be addressed in subsequent commits.
chetangudisagar committed Apr 12, 2021
1 parent 5349243 commit 59786d8
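Most of the fixes in this commit follow a handful of patterns flagged by the analysis: unused private fields are deleted, and fields written in only one method are narrowed to local variables. A minimal sketch of the second pattern, mirroring the Thread field change in DefaultClusterManager below (class and method names here are illustrative):

// Before: the field is assigned in start() and never read anywhere else,
// so static analysis flags it as an unnecessary field.
class Worker {
    private Thread thread;

    void start(Runnable task) {
        thread = new Thread(task);
        thread.start();
    }
}

// After: the reference only needs local scope.
class WorkerFixed {
    void start(Runnable task) {
        Thread thread = new Thread(task);
        thread.start();
    }
}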
Showing 31 changed files with 222 additions and 265 deletions.
29 changes: 18 additions & 11 deletions cmdlets/src/main/clojure/org/corfudb/shell.clj 100644 → 100755
@@ -8,6 +8,8 @@
(import org.corfudb.runtime.clients.SequencerClient)
(import org.corfudb.runtime.clients.LogUnitClient)
(import org.corfudb.runtime.clients.ManagementClient)
(import org.corfudb.util.NodeLocator)
(import org.corfudb.runtime.CorfuRuntime$CorfuRuntimeParameters)
(use 'clojure.reflect)

(defn -class-starts-with [obj name] (if (nil? obj) false (.. (.. (.. obj (getClass)) (getName)) (startsWith name))))
@@ -114,19 +116,24 @@ The variable *r holds the last runtime obtained, and *o holds the last router obtained
([endpoint] (get-router endpoint nil))
([endpoint opts] (do
(cond
(nil? opts) (def *o (new NettyClientRouter (get-host endpoint) (get-port endpoint)))
(nil? opts)
(def *o (new NettyClientRouter (get-host endpoint) (get-port endpoint)))
(.. opts (get "--enable-tls"))
(def *o (new NettyClientRouter
(get-host endpoint)
(get-port endpoint)
(.. opts (get "--enable-tls"))
(.. opts (get "--keystore"))
(.. opts (get "--keystore-password-file"))
(.. opts (get "--truststore"))
(.. opts (get "--truststore-password-file"))
(.. opts (get "--enable-sasl-plain-text-auth"))
(.. opts (get "--sasl-plain-text-username-file"))
(.. opts (get "--sasl-plain-text-password-file"))))
(-> (NodeLocator/builder)
(.host (get-host endpoint))
(.port (get-port endpoint))
(.build))
(-> (CorfuRuntime$CorfuRuntimeParameters/builder)
(.tlsEnabled (.. opts (get "--enable-tls")))
(.keyStore (.. opts (get "--keystore")))
(.ksPasswordFile (.. opts (get "--keystore-password-file")))
(.trustStore (.. opts (get "--truststore")))
(.tsPasswordFile (.. opts (get "--truststore-password-file")))
(.saslPlainTextEnabled (.. opts (get "--enable-sasl-plain-text-auth")))
(.usernameFile (.. opts (get "--sasl-plain-text-username-file")))
(.passwordFile (.. opts (get "--sasl-plain-text-password-file")))
(.build))))
:else (def *o (new NettyClientRouter (get-host endpoint) (get-port endpoint))))
(add-client (new org.corfudb.runtime.clients.LayoutHandler))
(add-client (new org.corfudb.runtime.clients.LogUnitHandler))
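For context, the new cond branch above replaces nine positional constructor arguments with a NodeLocator and a CorfuRuntimeParameters object built through their builders. A rough Java equivalent of that construction, assuming the Lombok camelCase builder method names and the two-argument NettyClientRouter constructor used in the diff (host, port, and file paths are placeholders):

import org.corfudb.runtime.CorfuRuntime.CorfuRuntimeParameters;
import org.corfudb.runtime.clients.NettyClientRouter;
import org.corfudb.util.NodeLocator;

class RouterConstructionSketch {
    static NettyClientRouter buildTlsRouter(String host, int port) {
        NodeLocator node = NodeLocator.builder()
                .host(host)
                .port(port)
                .build();
        CorfuRuntimeParameters params = CorfuRuntimeParameters.builder()
                .tlsEnabled(true)
                .keyStore("/path/to/keystore.jks")          // placeholder paths
                .ksPasswordFile("/path/to/keystore.pass")
                .trustStore("/path/to/truststore.jks")
                .tsPasswordFile("/path/to/truststore.pass")
                .build();
        // Two-argument constructor taking the locator and the runtime parameters.
        return new NettyClientRouter(node, params);
    }
}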
@@ -9,8 +9,6 @@
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
import java.util.Optional;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.common.metrics.micrometer.MeterRegistryProvider;
import org.corfudb.common.util.Memory;
@@ -19,6 +17,9 @@
import org.corfudb.protocols.wireprotocol.ILogData;
import org.corfudb.protocols.wireprotocol.LogData;

import java.util.Optional;
import java.util.function.Supplier;

import static java.lang.Math.toIntExact;

/**
@@ -42,12 +43,9 @@ public class LogUnitServerCache {
private final int MAX_STREAM_THRESHOLD = 20;

private final Optional<Timer> readTimer;
private final Optional<Gauge> loadTime;
String loadTimeName = "logunit.cache.load_time";
private final Optional<Gauge> hitRatio;
String hitRatioName = "logunit.cache.hit_ratio";
private final Optional<Gauge> weight;
String weightName = "logunit.cache.weight";
private final String loadTimeName = "logunit.cache.load_time";
private final String hitRatioName = "logunit.cache.hit_ratio";
private final String weightName = "logunit.cache.weight";

public LogUnitServerCache(LogUnitServerConfig config, StreamLog streamLog) {
this.streamLog = streamLog;
@@ -61,14 +59,14 @@ public LogUnitServerCache(LogUnitServerConfig config, StreamLog streamLog) {

MeterRegistryProvider.getInstance().ifPresent(registry ->
CaffeineCacheMetrics.monitor(registry, dataCache, "logunit.read_cache"));
hitRatio = MeterRegistryProvider.getInstance().map(registry ->
MeterRegistryProvider.getInstance().map(registry ->
Gauge.builder(hitRatioName,
dataCache, cache -> cache.stats().hitRate()).register(registry));
loadTime = MeterRegistryProvider.getInstance().map(registry ->
MeterRegistryProvider.getInstance().map(registry ->
Gauge.builder(loadTimeName,
dataCache, cache -> cache.stats().totalLoadTime())
.register(registry));
weight = MeterRegistryProvider.getInstance().map(registry ->
MeterRegistryProvider.getInstance().map(registry ->
Gauge.builder(weightName,
dataCache, cache -> cache.stats().evictionWeight())
.register(registry));
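The Optional<Gauge> fields removed above were never read; the gauges only need to be registered for their side effect. A small sketch of that idiom with a plain Micrometer registry and a Caffeine cache standing in for MeterRegistryProvider and dataCache (ifPresent is used here, since the returned handle is discarded):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.Optional;

class CacheGaugeSketch {
    static void registerHitRatio(Optional<SimpleMeterRegistry> provider, Cache<Long, byte[]> cache) {
        provider.ifPresent(registry ->
                Gauge.builder("logunit.cache.hit_ratio", cache, c -> c.stats().hitRate())
                        .register(registry));   // returned Gauge handle is intentionally not kept
    }

    public static void main(String[] args) {
        Cache<Long, byte[]> cache = Caffeine.newBuilder().recordStats().maximumSize(100).build();
        registerHitRatio(Optional.of(new SimpleMeterRegistry()), cache);
    }
}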
@@ -87,8 +87,8 @@ public class SequencerServerCache {
*
* @param cacheSize cache size
*/
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private final Optional<DistributionSummary> evictionsPerTrimCall;
private final Optional<Gauge> windowSize;

public SequencerServerCache(int cacheSize, long maxConflictNewSequencer) {
this.cacheSize = cacheSize;
@@ -111,7 +111,7 @@ public SequencerServerCache(int cacheSize, long maxConflictNewSequencer) {
.publishPercentileHistogram()
.baseUnit("eviction")
.register(registry));
windowSize = MeterRegistryProvider.getInstance().map(registry ->
MeterRegistryProvider.getInstance().map(registry ->
Gauge.builder(windowSizeName,
conflictKeys, HashMap::size).register(registry));

@@ -32,7 +32,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;

/**
@@ -77,8 +76,6 @@ public class DefaultClusterManager extends CorfuReplicationClusterManagerBaseAda
@Getter
public ClusterManagerCallback clusterManagerCallback;

private Thread thread;

private CorfuRuntime corfuRuntime;

private CorfuStore corfuStore;
@@ -111,7 +108,7 @@ public void start() {
corfuStore.subscribe(configStreamListener, CONFIG_NAMESPACE,
Collections.singletonList(new TableSchema(CONFIG_TABLE_NAME,
UuidMsg.class, UuidMsg.class, UuidMsg.class)), ts);
thread = new Thread(clusterManagerCallback);
Thread thread = new Thread(clusterManagerCallback);
thread.start();
}

@@ -194,11 +194,6 @@ public class LogReplicationFSM {
*/
private final SnapshotSender snapshotSender;

/**
* Remote Cluster Descriptor to which this FSM drives the log replication
*/
private final ClusterDescriptor remoteCluster;

/**
* Ack Reader for Snapshot and Log Entry Syncs
*/
@@ -232,7 +227,7 @@ public LogReplicationFSM(CorfuRuntime runtime, LogReplicationConfig config, Clus
* @param dataSender application callback for snapshot and log entry sync messages
* @param logEntryReader log entry logreader implementation
* @param readProcessor read processor (for data transformation)
* @param remoteCluster remote cluster descriptor
* @param remoteCluster Remote Cluster Descriptor to which this FSM drives the log replication
* @param workers FSM executor service for state tasks
*/
@VisibleForTesting
Expand All @@ -242,7 +237,6 @@ public LogReplicationFSM(CorfuRuntime runtime, SnapshotReader snapshotReader, Da

this.snapshotReader = snapshotReader;
this.logEntryReader = logEntryReader;
this.remoteCluster = remoteCluster;
this.ackReader = ackReader;

// Create transmitters to be used by the the sync states (Snapshot and LogEntry) to read and send data
@@ -327,7 +327,7 @@ private void applyShadowStream(UUID streamId, long snapshot) {
// This variable reflects the minimum timestamp for all shadow streams in the current snapshot cycle.
// We seek up to this address, assuming that no trim should occur beyond this snapshot start
long currentMinShadowStreamTimestamp = logReplicationMetadataManager.getMinSnapshotSyncShadowStreamTs();
OpaqueStream shadowOpaqueStream = new OpaqueStream(rt, rt.getStreamsView().get(shadowStreamId, options));
OpaqueStream shadowOpaqueStream = new OpaqueStream(rt.getStreamsView().get(shadowStreamId, options));
shadowOpaqueStream.seek(currentMinShadowStreamTimestamp);
Stream shadowStream = shadowOpaqueStream.streamUpTo(snapshot);
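As the comment above explains, the shadow stream is positioned at the minimum shadow-stream timestamp of the current snapshot cycle and consumed only up to the snapshot address. A sketch of that access pattern with the new single-argument OpaqueStream constructor; the import paths are best-effort assumptions, and the stream id, options, and addresses are placeholders:

import org.corfudb.protocols.logprotocol.OpaqueEntry;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.view.StreamOptions;
import org.corfudb.runtime.view.stream.OpaqueStream;

import java.util.UUID;
import java.util.stream.Stream;

class ShadowStreamReplaySketch {
    static void replay(CorfuRuntime rt, UUID shadowStreamId, long minShadowTs, long snapshot) {
        StreamOptions options = StreamOptions.builder()
                .ignoreTrimmed(false)
                .cacheEntries(false)
                .build();
        // The runtime is no longer passed to OpaqueStream; the stream view alone is enough.
        OpaqueStream shadowOpaqueStream = new OpaqueStream(rt.getStreamsView().get(shadowStreamId, options));
        shadowOpaqueStream.seek(minShadowTs);                     // skip entries before this cycle's start
        Stream<OpaqueEntry> entries = shadowOpaqueStream.streamUpTo(snapshot);
        entries.forEach(entry -> {
            // apply the opaque entry to the destination stream (omitted)
        });
    }
}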

@@ -1,11 +1,9 @@
package org.corfudb.infrastructure.logreplication.replication.send.logreader;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -47,8 +45,6 @@
*/
public class StreamsLogEntryReader implements LogEntryReader {

private CorfuRuntime rt;

private final LogReplicationEntryType MSG_TYPE = LogReplicationEntryType.LOG_ENTRY_MESSAGE;

// Set of UUIDs for the corresponding streams
@@ -85,8 +81,7 @@ public class StreamsLogEntryReader {
private StreamIteratorMetadata currentProcessedEntryMetadata;

public StreamsLogEntryReader(CorfuRuntime runtime, LogReplicationConfig config) {
this.rt = runtime;
this.rt.parseConfigurationString(runtime.getLayoutServers().get(0)).connect();
runtime.parseConfigurationString(runtime.getLayoutServers().get(0)).connect();
this.maxDataSizePerMsg = config.getMaxDataSizePerMsg();
this.currentProcessedEntryMetadata = new StreamIteratorMetadata(Address.NON_ADDRESS, false);
this.messageSizeDistributionSummary = configureMessageSizeDistributionSummary();
@@ -103,7 +98,7 @@ public StreamsLogEntryReader(CorfuRuntime runtime, LogReplicationConfig config)
log.debug("Streams to replicate total={}, stream_names={}, stream_ids={}", streamUUIDs.size(), streams, streamUUIDs);

//create an opaque stream for transaction stream
txOpaqueStream = new TxOpaqueStream(rt);
txOpaqueStream = new TxOpaqueStream(runtime);
}

private LogReplicationEntryMsg generateMessageWithOpaqueEntryList(
@@ -313,7 +308,7 @@ public static class TxOpaqueStream {
public TxOpaqueStream(CorfuRuntime rt) {
//create an opaque stream for transaction stream
this.rt = rt;
txStream = new OpaqueStream(rt, rt.getStreamsView().get(ObjectsView.TRANSACTION_STREAM_ID));
txStream = new OpaqueStream(rt.getStreamsView().get(ObjectsView.TRANSACTION_STREAM_ID));
streamUpTo();
}

@@ -1,6 +1,5 @@
package org.corfudb.infrastructure.logreplication.replication.send.logreader;

import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
import io.micrometer.core.instrument.DistributionSummary;
import lombok.Getter;
@@ -291,7 +290,7 @@ public static class OpaqueStreamIterator {
.ignoreTrimmed(false)
.cacheEntries(false)
.build();
Stream stream = (new OpaqueStream(rt, rt.getStreamsView().get(uuid, options))).streamUpTo(snapshot);
Stream stream = (new OpaqueStream(rt.getStreamsView().get(uuid, options))).streamUpTo(snapshot);
iterator = stream.iterator();
maxVersion = 0;
}
@@ -36,8 +36,6 @@ public class LogReplicationStreamNameTableManager {

private ILogReplicationConfigAdapter logReplicationConfigAdapter;

private final CorfuRuntime corfuRuntime;

private String pluginConfigFilePath;

private CorfuStore corfuStore;
@@ -49,8 +47,7 @@

public LogReplicationStreamNameTableManager(CorfuRuntime runtime, String pluginConfigFilePath) {
this.pluginConfigFilePath = pluginConfigFilePath;
this.corfuRuntime = runtime;
corfuStore = new CorfuStore(corfuRuntime);
corfuStore = new CorfuStore(runtime);

initStreamNameFetcherPlugin();
}
@@ -25,8 +25,6 @@
public class RestoreRedundancyMergeSegmentsWorkflow implements IWorkflow {


private final RestoreRedundancyMergeSegmentsRequest request;

@Getter
private final UUID id;

@@ -41,7 +39,6 @@ public class RestoreRedundancyMergeSegmentsWorkflow implements IWorkflow {
public RestoreRedundancyMergeSegmentsWorkflow(
RestoreRedundancyMergeSegmentsRequest request) {
this.id = UUID.randomUUID();
this.request = request;
this.actions = ImmutableList.of(
RestoreRedundancyMergeSegments.builder()
.currentNode(request.getEndpoint())
@@ -71,7 +71,6 @@ public class OrchestratorTest {
private Orchestrator orchestrator;

// Additional objects that need to be mocked or spied on.
private ServerContext mockServerContext;
private IServerRouter mockServerRouter;
private ChannelHandlerContext mockChannelHandlerContext;
private Orchestrator.WorkflowFactory workflowFactory;
@@ -148,7 +147,7 @@ private void sendAndValidateWorkflowDispatch(RequestPayloadMsg payload, UUID exp
*/
@Before
public void setup() {
mockServerContext = mock(ServerContext.class);
ServerContext mockServerContext = mock(ServerContext.class);
mockServerRouter = mock(IServerRouter.class);
mockChannelHandlerContext = mock(ChannelHandlerContext.class);
workflowFactory = spy(new Orchestrator.WorkflowFactory());
@@ -17,7 +17,6 @@
import org.corfudb.universe.node.server.vm.VmCorfuServer;
import org.corfudb.universe.node.server.vm.VmCorfuServerParams;
import org.corfudb.universe.node.server.vm.VmCorfuServerParams.VmName;
import org.corfudb.universe.node.stress.vm.VmStress;
import org.corfudb.universe.universe.vm.VmManager;
import org.corfudb.universe.universe.vm.VmUniverseParams;

@@ -60,18 +59,10 @@ protected Node buildServer(VmCorfuServerParams nodeParams) {
.credentials(universeParams.getCredentials().getVmCredentials())
.build();

VmStress stress = VmStress.builder()
.params(params)
.universeParams(universeParams)
.vmManager(vmManager)
.commandHelper(commandHelper)
.build();

return VmCorfuServer.builder()
.universeParams(universeParams)
.params(params)
.vmManager(vmManager)
.stress(stress)
.remoteOperationHelper(commandHelper)
.loggingParams(loggingParams)
.build();
@@ -8,6 +8,7 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.corfudb.universe.node.Node.NodeParams;
import org.corfudb.universe.node.Node.NodeType;
import org.corfudb.universe.node.server.CorfuServer.Mode;
@@ -20,7 +21,7 @@
import java.util.Optional;
import java.util.Set;

@Builder(builderMethodName = "serverParamsBuilder")
@SuperBuilder(builderMethodName = "serverParamsBuilder")
@AllArgsConstructor
@EqualsAndHashCode
@ToString
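The switch from @Builder to @SuperBuilder matters when the annotated class is part of an inheritance hierarchy, which is presumably the case for VmCorfuServerParams: plain @Builder only generates setters for the class's own fields, while @SuperBuilder chains builders across parent classes. A minimal, self-contained illustration with hypothetical classes:

import lombok.Getter;
import lombok.experimental.SuperBuilder;

@Getter
@SuperBuilder
class ParentParams {
    private final String clusterName;
}

@Getter
@SuperBuilder
class ChildParams extends ParentParams {
    private final int port;
}

class SuperBuilderSketch {
    public static void main(String[] args) {
        // The child's builder can set the inherited field, which @Builder alone cannot do.
        ChildParams p = ChildParams.builder()
                .clusterName("corfu")     // declared on ParentParams
                .port(9000)
                .build();
        System.out.println(p.getClusterName() + ":" + p.getPort());
    }
}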
@@ -11,7 +11,6 @@
import org.corfudb.universe.node.server.CorfuServer;
import org.corfudb.universe.node.server.process.CorfuProcessManager;
import org.corfudb.universe.node.server.process.CorfuServerPath;
import org.corfudb.universe.node.stress.vm.VmStress;
import org.corfudb.universe.universe.vm.VmManager;
import org.corfudb.universe.universe.vm.VmUniverseParams;
import org.corfudb.universe.util.IpAddress;
@@ -39,9 +38,6 @@ public class VmCorfuServer extends AbstractCorfuServer<VmCorfuServerParams, VmUn
@NonNull
private final RemoteOperationHelper remoteOperationHelper;

@NonNull
private final VmStress stress;

@NonNull
private final CorfuProcessManager processManager;

@@ -51,11 +47,10 @@ public class VmCorfuServer extends AbstractCorfuServer<VmCorfuServerParams, VmUn
@Builder
public VmCorfuServer(
VmCorfuServerParams params, VmManager vmManager, VmUniverseParams universeParams,
VmStress stress, RemoteOperationHelper remoteOperationHelper, LoggingParams loggingParams) {
RemoteOperationHelper remoteOperationHelper, LoggingParams loggingParams) {
super(params, universeParams, loggingParams);
this.vmManager = vmManager;
this.ipAddress = getIpAddress();
this.stress = stress;
this.remoteOperationHelper = remoteOperationHelper;
this.serverPath = new CorfuServerPath(params);
this.processManager = new CorfuProcessManager(serverPath, params);
