From d15c0781c11c3b673d4af05a855b17a9b3efa625 Mon Sep 17 00:00:00 2001 From: Apex Dev Date: Fri, 7 Apr 2017 10:08:45 -0700 Subject: [PATCH] APEXCORE-691 Use type inference for generic instance creation --- .../com/datatorrent/api/AffinityRule.java | 2 +- .../java/com/datatorrent/api/Attribute.java | 6 +- .../java/com/datatorrent/api/Context.java | 76 +++++++++---------- .../java/com/datatorrent/api/StringCodec.java | 8 +- .../org/apache/apex/api/YarnAppLauncher.java | 6 +- .../com/datatorrent/api/AttributeMapTest.java | 2 +- .../bufferserver/internal/DataList.java | 10 +-- .../bufferserver/internal/LogicalNode.java | 4 +- .../bufferserver/server/Server.java | 4 +- .../datatorrent/bufferserver/util/System.java | 2 +- .../packet/SubscribeRequestTupleTest.java | 2 +- .../bufferserver/support/Subscriber.java | 2 +- .../partitioner/StatelessPartitioner.java | 18 ++--- .../common/security/SecurityContext.java | 6 +- .../common/util/DefaultDelayOperator.java | 2 +- .../util/JacksonObjectMapperProvider.java | 6 +- .../common/util/PubSubMessageCodec.java | 14 ++-- .../partitioner/StatelessPartitionerTest.java | 18 ++--- .../common/util/SerializableObjectTest.java | 4 +- .../BlacklistBasedResourceRequestHandler.java | 4 +- .../stram/ResourceRequestHandler.java | 4 +- .../stram/client/PermissionsInfo.java | 8 +- .../stram/client/StramClientUtils.java | 2 +- .../stram/engine/OperatorContext.java | 4 +- .../stram/plan/logical/LogicalPlan.java | 10 +-- .../stram/plan/physical/PhysicalPlan.java | 4 +- .../stram/util/PubSubWebSocketClient.java | 4 +- .../stram/webapp/asm/CompactUtil.java | 2 +- .../datatorrent/stram/webapp/asm/Type.java | 4 +- .../com/datatorrent/stram/CheckpointTest.java | 4 +- .../GenericOperatorPropertyCodecTest.java | 2 +- .../datatorrent/stram/PartitioningTest.java | 24 +++--- .../datatorrent/stram/StreamCodecTest.java | 8 +- .../stram/StreamingContainerManagerTest.java | 4 +- .../stram/cli/ApexCliMiscTest.java | 2 +- .../datatorrent/stram/cli/ApexCliTest.java | 2 +- .../codec/DefaultStatefulStreamCodecTest.java | 8 +- .../stram/engine/AtMostOnceTest.java | 2 +- .../stram/engine/GenericNodeTest.java | 12 +-- .../stram/engine/InputOperatorTest.java | 10 +-- .../datatorrent/stram/engine/NodeTest.java | 2 +- .../stram/engine/ProcessingModeTests.java | 8 +- .../datatorrent/stram/engine/SliderTest.java | 2 +- .../datatorrent/stram/engine/StatsTest.java | 6 +- .../stram/engine/StreamingContainerTest.java | 2 +- .../engine/TestGeneratorInputOperator.java | 4 +- .../stram/engine/WindowGeneratorTest.java | 2 +- .../moduleexperiment/InjectConfigTest.java | 2 +- .../stram/plan/StreamPersistanceTests.java | 8 +- .../stram/plan/TestPlanContext.java | 2 +- .../logical/LogicalPlanConfigurationTest.java | 6 +- .../stram/plan/logical/LogicalPlanTest.java | 12 +-- .../plan/logical/module/ModuleAppTest.java | 10 +-- .../logical/module/TestModuleExpansion.java | 2 +- .../plan/logical/module/TestModules.java | 4 +- .../stram/plan/physical/PhysicalPlanTest.java | 26 +++---- .../stream/BufferServerSubscriberTest.java | 2 +- .../stram/stream/FastPublisherTest.java | 2 +- .../stram/stream/FastStreamTest.java | 2 +- .../stram/stream/InlineStreamTest.java | 8 +- .../stram/stream/OiOEndWindowTest.java | 4 +- .../stram/stream/OiOStreamTest.java | 14 ++-- .../stram/stream/SocketStreamTest.java | 2 +- .../ManualScheduledExecutorService.java | 2 +- .../stram/support/StramTestSupport.java | 6 +- .../stram/util/StablePriorityQueueTest.java | 4 +- .../stram/webapp/OperatorDiscoveryTest.java | 26 +++---- .../stram/webapp/StramWebServicesTest.java | 6 +- .../stram/webapp/TypeDiscoveryTest.java | 12 +-- 69 files changed, 252 insertions(+), 252 deletions(-) diff --git a/api/src/main/java/com/datatorrent/api/AffinityRule.java b/api/src/main/java/com/datatorrent/api/AffinityRule.java index 5e10ccd674..304c086368 100644 --- a/api/src/main/java/com/datatorrent/api/AffinityRule.java +++ b/api/src/main/java/com/datatorrent/api/AffinityRule.java @@ -92,7 +92,7 @@ public AffinityRule(Type type, Locality locality, boolean relaxLocality) public AffinityRule(Type type, Locality locality, boolean relaxLocality, String firstOperator, String... otherOperators) { this(type, locality, relaxLocality); - LinkedList operators = new LinkedList(); + LinkedList operators = new LinkedList<>(); if (firstOperator != null && otherOperators.length >= 1) { operators.add(firstOperator); diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java index 2efc84fdb9..821ecb26f9 100644 --- a/api/src/main/java/com/datatorrent/api/Attribute.java +++ b/api/src/main/java/com/datatorrent/api/Attribute.java @@ -236,11 +236,11 @@ public boolean contains(Attribute key) */ public static class AttributeInitializer { - static final HashMap, Set>> map = new HashMap, Set>>(); + static final HashMap, Set>> map = new HashMap<>(); public static Map, Object> getAllAttributes(Context context, Class clazz) { - Map, Object> result = new HashMap, Object>(); + Map, Object> result = new HashMap<>(); try { for (Field f: clazz.getDeclaredFields()) { if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) { @@ -273,7 +273,7 @@ public static long initialize(final Class clazz) if (map.containsKey(clazz)) { return 0; } - Set> set = new HashSet>(); + Set> set = new HashSet<>(); try { for (Field f: clazz.getDeclaredFields()) { if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) { diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 3d3cffefad..94022ffa98 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -121,17 +121,17 @@ public interface PortContext extends Context /** * Number of tuples the poll buffer can cache without blocking the input stream to the port. */ - Attribute QUEUE_CAPACITY = new Attribute(1024); + Attribute QUEUE_CAPACITY = new Attribute<>(1024); /** * The amount of buffer memory this port requires. There is a buffer server in each container. This is used to calculate total buffer server memory for container. * Also due to the nature of the application, if buffer server needs to use more RAM, from time to time, this number may * not be adhered to. */ - Attribute BUFFER_MEMORY_MB = new Attribute(8 * 64); + Attribute BUFFER_MEMORY_MB = new Attribute<>(8 * 64); /** * Poll period in milliseconds when the port buffer reaches its limits. */ - Attribute SPIN_MILLIS = new Attribute(10); + Attribute SPIN_MILLIS = new Attribute<>(10); /** * Input port attribute. Extend partitioning of an upstream operator w/o intermediate merge. * Can be used to form parallel partitions that span a group of operators. @@ -139,7 +139,7 @@ public interface PortContext extends Context * If multiple ports of an operator have the setting, incoming streams must track back to * a common root partition, i.e. the operator join forks of the same origin. */ - Attribute PARTITION_PARALLEL = new Attribute(false); + Attribute PARTITION_PARALLEL = new Attribute<>(false); /** * Attribute of output port to specify how many partitions should be merged by a single unifier instance. If the * number of partitions exceeds the limit set, a cascading unifier plan will be created. For example, 4 partitions @@ -147,7 +147,7 @@ public interface PortContext extends Context * network I/O or other resource requirement for each unifier container (depends on the specific functionality of * the unifier), enabling horizontal scale by overcoming the single unifier bottleneck. */ - Attribute UNIFIER_LIMIT = new Attribute(Integer.MAX_VALUE); + Attribute UNIFIER_LIMIT = new Attribute<>(Integer.MAX_VALUE); /** * Attribute to specify that the final unifier be always a single unifier. This is useful when in MxN partitioning @@ -158,16 +158,16 @@ public interface PortContext extends Context * the inputs. In this case the default unifier behavior can be specified on the output port and individual * exceptions can be specified on the corresponding input ports. */ - Attribute UNIFIER_SINGLE_FINAL = new Attribute(Boolean.FALSE); + Attribute UNIFIER_SINGLE_FINAL = new Attribute<>(Boolean.FALSE); /** * Whether or not to auto record the tuples */ - Attribute AUTO_RECORD = new Attribute(false); + Attribute AUTO_RECORD = new Attribute<>(false); /** * Whether the output is unified. * This is a read-only attribute to query that whether the output of the operator from multiple instances is being unified. */ - Attribute IS_OUTPUT_UNIFIED = new Attribute(false); + Attribute IS_OUTPUT_UNIFIED = new Attribute<>(false); /** * Provide the codec which can be used to serialize or deserialize the data * that can be received on the port. If it is unspecified the engine may use @@ -193,13 +193,13 @@ public interface OperatorContext extends Context * of the operator. On subsequent run, it's the windowId of the checkpoint from which the operator state * is recovered. */ - Attribute ACTIVATION_WINDOW_ID = new Attribute(Stateless.WINDOW_ID); + Attribute ACTIVATION_WINDOW_ID = new Attribute<>(Stateless.WINDOW_ID); /** * It is a maximum poll period in milliseconds when there are no tuples available on any of the input ports of the * operator. Platform uses the heuristic to change poll period from 0 to SPIN_MILLIS seconds. * Default value is 10 milliseconds. */ - Attribute SPIN_MILLIS = new Attribute(10); + Attribute SPIN_MILLIS = new Attribute<>(10); /** * The maximum number of attempts to restart a failing operator before shutting down the application. * Until this number is reached, when an operator fails to start it is re-spawned in a new container. Once all the @@ -218,15 +218,15 @@ public interface OperatorContext extends Context * by the engine. The attribute is ignored when the operator was already declared stateless through the * {@link Stateless} annotation. */ - Attribute STATELESS = new Attribute(false); + Attribute STATELESS = new Attribute<>(false); /** * Memory resource that the operator requires for optimal functioning. Used to calculate total memory requirement for containers. */ - Attribute MEMORY_MB = new Attribute(1024); + Attribute MEMORY_MB = new Attribute<>(1024); /** * CPU Cores that the operator requires for optimal functioning. Used to calculate total CPU Cores requirement for containers. */ - Attribute VCORES = new Attribute(0); + Attribute VCORES = new Attribute<>(0); /** * The options to be pass to JVM when launching the operator. Options such as java maximum heap size can be specified here. @@ -235,7 +235,7 @@ public interface OperatorContext extends Context /** * Attribute of the operator that tells the platform how many streaming windows make 1 application window. */ - Attribute APPLICATION_WINDOW_COUNT = new Attribute(1); + Attribute APPLICATION_WINDOW_COUNT = new Attribute<>(1); /** * When set it changes the computation to sliding window computation where duration is determined using {@link #APPLICATION_WINDOW_COUNT} that is * slided by duration determined using value of this attribute. Default value is null which is equivalent to that of {@link #APPLICATION_WINDOW_COUNT}. @@ -251,7 +251,7 @@ public interface OperatorContext extends Context * value. Typically user would define this value to be the same as that of APPLICATION_WINDOW_COUNT so checkpointing * will be done at application window boundary. */ - Attribute CHECKPOINT_WINDOW_COUNT = new Attribute(1); + Attribute CHECKPOINT_WINDOW_COUNT = new Attribute<>(1); /** * Name of host to directly control locality of an operator. Complementary to stream locality (NODE_LOCAL affinity). * For example, the user may wish to specify a locality constraint for an input operator relative to its data source. @@ -274,18 +274,18 @@ public interface OperatorContext extends Context * If the processing mode for an operator is specified as EXACTLY_ONCE then the processing mode for all downstream operators * should be specified as AT_MOST_ONCE otherwise it will result in an error. */ - Attribute PROCESSING_MODE = new Attribute(ProcessingMode.AT_LEAST_ONCE); + Attribute PROCESSING_MODE = new Attribute<>(ProcessingMode.AT_LEAST_ONCE); /** * Timeout to identify stalled processing, specified as count of streaming windows. If the last processed * window does not advance within the specified timeout count, the operator will be considered stuck and the * container restart. There are multiple reasons this could happen: clock drift, hardware issue, networking issue, * blocking operator logic, etc. */ - Attribute TIMEOUT_WINDOW_COUNT = new Attribute(120); + Attribute TIMEOUT_WINDOW_COUNT = new Attribute<>(120); /** * Whether or not to auto record the tuples */ - Attribute AUTO_RECORD = new Attribute(false); + Attribute AUTO_RECORD = new Attribute<>(false); /** * How the operator distributes its state and share the input can be influenced by setting the Partitioner attribute. * If this attribute is set to non null value, the instance of the partitioner is used to partition and merge the @@ -348,7 +348,7 @@ interface DAGContext extends Context * Name under which the application will be shown in the resource manager. * If not set, the default is the configuration Java class or property file name. */ - Attribute APPLICATION_NAME = new Attribute("unknown-application-name"); + Attribute APPLICATION_NAME = new Attribute<>("unknown-application-name"); /** * URL to the application's documentation. */ @@ -387,7 +387,7 @@ interface DAGContext extends Context /** * Dump extra debug information in launcher, master and containers. */ - Attribute DEBUG = new Attribute(false); + Attribute DEBUG = new Attribute<>(false); /** * The options to be pass to JVM when launching the containers. Options such as java maximum heap size can be specified here. */ @@ -396,20 +396,20 @@ interface DAGContext extends Context * The amount of memory to be requested for the application master. Not used in local mode. * Default value is 1GB. */ - Attribute MASTER_MEMORY_MB = new Attribute(1024); + Attribute MASTER_MEMORY_MB = new Attribute<>(1024); /** * Where to spool the data once the buffer server capacity is reached. */ - Attribute BUFFER_SPOOLING = new Attribute(true); + Attribute BUFFER_SPOOLING = new Attribute<>(true); /** * The streaming window size to use for the application. It is specified in milliseconds. Default value is 500ms. */ - Attribute STREAMING_WINDOW_SIZE_MILLIS = new Attribute(500); + Attribute STREAMING_WINDOW_SIZE_MILLIS = new Attribute<>(500); /** * The time interval for saving the operator state. It is specified as a multiple of streaming windows. The operator * state is saved periodically with interval equal to the checkpoint interval. Default value is 60 streaming windows. */ - Attribute CHECKPOINT_WINDOW_COUNT = new Attribute(60); + Attribute CHECKPOINT_WINDOW_COUNT = new Attribute<>(60); /** * The path to store application dependencies, recording and other generated files for application master and containers. */ @@ -418,13 +418,13 @@ interface DAGContext extends Context * The size limit for a file where tuple recordings are stored. When tuples are being recorded they are stored * in files. When a file size reaches this limit a new file is created and tuples start getting stored in the new file. Default value is 128k. */ - Attribute TUPLE_RECORDING_PART_FILE_SIZE = new Attribute(128 * 1024); + Attribute TUPLE_RECORDING_PART_FILE_SIZE = new Attribute<>(128 * 1024); /** * The time limit for a file where tuple recordings are stored. When tuples are being recorded they are stored * in files. When a tuple recording file creation time falls beyond the time limit window from the current time a new file * is created and the tuples start getting stored in the new file. Default value is 30hrs. */ - Attribute TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute(30 * 60 * 60 * 1000); + Attribute TUPLE_RECORDING_PART_FILE_TIME_MILLIS = new Attribute<>(30 * 60 * 60 * 1000); /** * Address to which the application side connects to DT Gateway, in the form of host:port. This will override "dt.gateway.listenAddress" in the configuration. */ @@ -432,7 +432,7 @@ interface DAGContext extends Context /** * Whether or not gateway is expecting SSL connection. */ - Attribute GATEWAY_USE_SSL = new Attribute(false); + Attribute GATEWAY_USE_SSL = new Attribute<>(false); /** * The username for logging in to the gateway, if authentication is enabled. */ @@ -448,48 +448,48 @@ interface DAGContext extends Context /** * Maximum number of simultaneous heartbeat connections to process. Default value is 30. */ - Attribute HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute(30); + Attribute HEARTBEAT_LISTENER_THREAD_COUNT = new Attribute<>(30); /** * How frequently should operators heartbeat to stram. Recommended setting is * 1000ms. Value 0 will disable heartbeat (for unit testing). Default value is 1000ms. */ - Attribute HEARTBEAT_INTERVAL_MILLIS = new Attribute(1000); + Attribute HEARTBEAT_INTERVAL_MILLIS = new Attribute<>(1000); /** * Timeout for master to identify a hung container (full GC etc.). Timeout will result in container restart. * Default value is 30s. */ - Attribute HEARTBEAT_TIMEOUT_MILLIS = new Attribute(30 * 1000); + Attribute HEARTBEAT_TIMEOUT_MILLIS = new Attribute<>(30 * 1000); /** * Timeout for allocating container resources. Default value is 60s. */ - Attribute RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute(Integer.MAX_VALUE); + Attribute RESOURCE_ALLOCATION_TIMEOUT_MILLIS = new Attribute<>(Integer.MAX_VALUE); /** * Maximum number of windows that can be pending for statistics calculation. Statistics are computed when * the metrics are available from all operators for a window. If the information is not available from all operators then * the window is pending. When the number of pending windows reaches this limit the information for the oldest window * is purged. Default value is 1000 windows. */ - Attribute STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute(1000); + Attribute STATS_MAX_ALLOWABLE_WINDOWS_LAG = new Attribute<>(1000); /** * Whether or not we record statistics. The statistics are recorded for each heartbeat if enabled. The default value is false. */ - Attribute ENABLE_STATS_RECORDING = new Attribute(false); + Attribute ENABLE_STATS_RECORDING = new Attribute<>(false); /** * The time interval for throughput calculation. The throughput is periodically calculated with interval greater than or * equal to the throughput calculation interval. The default value is 10s. */ - Attribute THROUGHPUT_CALCULATION_INTERVAL = new Attribute(10000); + Attribute THROUGHPUT_CALCULATION_INTERVAL = new Attribute<>(10000); /** * The maximum number of samples to use when calculating throughput. In practice fewer samples may be used * if the THROUGHPUT_CALCULATION_INTERVAL is exceeded. Default value is 1000 samples. */ - Attribute THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute(1000); + Attribute THROUGHPUT_CALCULATION_MAX_SAMPLES = new Attribute<>(1000); /** * The number of samples to use when using RPC latency to compensate for clock skews and network latency when * calculating stats. Specify 0 if RPC latency should not be used at all to calculate stats. Default value is 100 * samples. */ - Attribute RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute(100); + Attribute RPC_LATENCY_COMPENSATION_SAMPLES = new Attribute<>(100); /** * The agent which can be used to find the jvm options for the container. */ @@ -511,12 +511,12 @@ interface DAGContext extends Context * blacklisting of nodes by application master * Blacklisting for nodes is disabled for the default value */ - Attribute MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute(Integer.MAX_VALUE); + Attribute MAX_CONSECUTIVE_CONTAINER_FAILURES_FOR_BLACKLIST = new Attribute<>(Integer.MAX_VALUE); /** * The amount of time to wait before removing failed nodes from blacklist */ - Attribute BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute(new Long(60 * 60 * 1000)); + Attribute BLACKLISTED_NODE_REMOVAL_TIME_MILLIS = new Attribute<>(new Long(60 * 60 * 1000)); /** * Affinity rules for specifying affinity and anti-affinity between logical operators diff --git a/api/src/main/java/com/datatorrent/api/StringCodec.java b/api/src/main/java/com/datatorrent/api/StringCodec.java index d4a0a41c35..fa8ab2342d 100644 --- a/api/src/main/java/com/datatorrent/api/StringCodec.java +++ b/api/src/main/java/com/datatorrent/api/StringCodec.java @@ -302,7 +302,7 @@ public T fromString(String string) return clazz.getConstructor(String.class).newInstance(parts[1]); } else { T object = clazz.getConstructor(String.class).newInstance(parts[1]); - HashMap hashMap = new HashMap(); + HashMap hashMap = new HashMap<>(); for (int i = 2; i < parts.length; i++) { String[] keyValPair = parts[i].split(propertySeparator, 2); hashMap.put(keyValPair[0], keyValPair[1]); @@ -365,11 +365,11 @@ public Map fromString(String string) } if (string.isEmpty()) { - return new HashMap(); + return new HashMap<>(); } String[] parts = string.split(separator); - HashMap map = new HashMap(); + HashMap map = new HashMap<>(); for (String part : parts) { String[] kvpair = part.split(equal, 2); map.put(keyCodec.fromString(kvpair[0]), valueCodec.fromString(kvpair[1])); @@ -433,7 +433,7 @@ public Collection fromString(String string) } String[] parts = string.split(separator); - ArrayList arrayList = new ArrayList(parts.length); + ArrayList arrayList = new ArrayList<>(parts.length); for (String part : parts) { arrayList.add(codec.fromString(part)); } diff --git a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java index 82cf50ebeb..8ff0205288 100644 --- a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java +++ b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java @@ -36,17 +36,17 @@ public abstract class YarnAppLauncher e /** * Parameter to specify extra jars for launch. */ - public static final Attribute LIB_JARS = new Attribute(new StringCodec.String2String()); + public static final Attribute LIB_JARS = new Attribute<>(new StringCodec.String2String()); /** * Parameter to specify the previous application id to use to resume launch from. */ - public static final Attribute ORIGINAL_APP_ID = new Attribute(new StringCodec.String2String()); + public static final Attribute ORIGINAL_APP_ID = new Attribute<>(new StringCodec.String2String()); /** * Parameter to specify the queue name to use for launch. */ - public static final Attribute QUEUE_NAME = new Attribute(new StringCodec.String2String()); + public static final Attribute QUEUE_NAME = new Attribute<>(new StringCodec.String2String()); static { Attribute.AttributeMap.AttributeInitializer.initialize(YarnAppLauncher.class); diff --git a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java index fcb1809d3b..b463619e45 100644 --- a/api/src/test/java/com/datatorrent/api/AttributeMapTest.java +++ b/api/src/test/java/com/datatorrent/api/AttributeMapTest.java @@ -51,7 +51,7 @@ enum Greeting interface iface { - Attribute greeting = new Attribute(Greeting.hello); + Attribute greeting = new Attribute<>(Greeting.hello); } @Test diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 84999fa237..d08b9fc31a 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -336,14 +336,14 @@ public void addDataListener(DataListener dl) { all_listeners.add(dl); //logger.debug("total {} listeners {} -> {}", all_listeners.size(), dl, this); - ArrayList partitions = new ArrayList(); + ArrayList partitions = new ArrayList<>(); if (dl.getPartitions(partitions) > 0) { for (BitVector partition : partitions) { HashSet set; if (listeners.containsKey(partition)) { set = listeners.get(partition); } else { - set = new HashSet(); + set = new HashSet<>(); listeners.put(partition, set); } set.add(dl); @@ -353,7 +353,7 @@ public void addDataListener(DataListener dl) if (listeners.containsKey(DataListener.NULL_PARTITION)) { set = listeners.get(DataListener.NULL_PARTITION); } else { - set = new HashSet(); + set = new HashSet<>(); listeners.put(DataListener.NULL_PARTITION, set); } @@ -363,7 +363,7 @@ public void addDataListener(DataListener dl) public void removeDataListener(DataListener dl) { - ArrayList partitions = new ArrayList(); + ArrayList partitions = new ArrayList<>(); if (dl.getPartitions(partitions) > 0) { for (BitVector partition : partitions) { if (listeners.containsKey(partition)) { @@ -459,7 +459,7 @@ public Status getStatus() // When the number of subscribers becomes high or the number of blocks becomes high, consider optimize it. Block b = first; - Map indices = new HashMap(); + Map indices = new HashMap<>(); int i = 0; while (b != null) { indices.put(b, i++); diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index 08a483ab13..b06e60a1b8 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -71,8 +71,8 @@ public LogicalNode(String identifier, String upstream, String group, DataListIte this.identifier = identifier; this.upstream = upstream; this.group = group; - this.physicalNodes = new HashSet(); - this.partitions = new HashSet(); + this.physicalNodes = new HashSet<>(); + this.partitions = new HashSet<>(); this.iterator = iterator; this.skipWindowId = skipWindowId; this.eventloop = eventloop; diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 7ac518b06f..8a56b5108c 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -260,7 +260,7 @@ public String toString() } private final ConcurrentHashMap publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1); - private final ConcurrentHashMap subscriberGroups = new ConcurrentHashMap(); + private final ConcurrentHashMap subscriberGroups = new ConcurrentHashMap<>(); private final ConcurrentHashMap publisherChannels = new ConcurrentHashMap<>(); private final int blockSize; private final int numberOfCacheBlocks; @@ -883,7 +883,7 @@ private void teardown() } } - ArrayList list = new ArrayList(); + ArrayList list = new ArrayList<>(); String publisherIdentifier = datalist.getIdentifier(); Iterator iterator = subscriberGroups.values().iterator(); while (iterator.hasNext()) { diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java index 124cc5fa65..000ce00154 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java @@ -31,7 +31,7 @@ */ public class System { - private static final HashMap eventloops = new HashMap(); + private static final HashMap eventloops = new HashMap<>(); public static void startup(String identifier) { diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java index f7b882907b..371f98acff 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java @@ -42,7 +42,7 @@ public void testGetSerializedRequest() String down_type = "SubscriberId/StreamType"; String upstream_id = "PublisherId"; int mask = 7; - ArrayList partitions = new ArrayList(); + ArrayList partitions = new ArrayList<>(); partitions.add(5); long startingWindowId = 0xcafebabe00000078L; diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java index 3c0cb0e564..d6e9b1aa11 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java @@ -32,7 +32,7 @@ */ public class Subscriber extends com.datatorrent.bufferserver.client.Subscriber { - public final ArrayList resetPayloads = new ArrayList(); + public final ArrayList resetPayloads = new ArrayList<>(); public AtomicInteger tupleCount = new AtomicInteger(0); public WindowIdHolder firstPayload; public WindowIdHolder lastPayload; diff --git a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java index 165d8cf3cf..d77b1ae325 100644 --- a/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java +++ b/common/src/main/java/com/datatorrent/common/partitioner/StatelessPartitioner.java @@ -114,7 +114,7 @@ public Collection> definePartitions(Collection> partit newPartitions = Lists.newArrayList(); for (int partitionCounter = 0; partitionCounter < newPartitionCount; partitionCounter++) { - newPartitions.add(new DefaultPartition(partition.getPartitionedInstance())); + newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance())); } // partition the stream that was first connected in the DAG and send full data to remaining input ports @@ -156,8 +156,8 @@ public void partitioned(Map> partitions) */ public static Collection> repartition(Collection> partitions) { - List> newPartitions = new ArrayList>(); - HashMap> lowLoadPartitions = new HashMap>(); + List> newPartitions = new ArrayList<>(); + HashMap> lowLoadPartitions = new HashMap<>(); for (Partition p: partitions) { int load = p.getLoad(); if (load < 0) { @@ -201,7 +201,7 @@ public static Collection> repartition(Collecti } for (int key: newKeys) { - Partition newPartition = new DefaultPartition(p.getPartitionedInstance()); + Partition newPartition = new DefaultPartition<>(p.getPartitionedInstance()); newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key))); newPartitions.add(newPartition); } @@ -224,8 +224,8 @@ public static Collection> repartition(Collecti */ public static Collection> repartitionInputOperator(Collection> partitions) { - List> newPartitions = new ArrayList>(); - List> lowLoadPartitions = new ArrayList>(); + List> newPartitions = new ArrayList<>(); + List> lowLoadPartitions = new ArrayList<>(); for (Partition p: partitions) { int load = p.getLoad(); if (load < 0) { @@ -235,8 +235,8 @@ public static Collection> repartitionInputOper lowLoadPartitions.add(p); } } else if (load > 0) { - newPartitions.add(new DefaultPartition(p.getPartitionedInstance())); - newPartitions.add(new DefaultPartition(p.getPartitionedInstance())); + newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance())); + newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance())); } else { newPartitions.add(p); } @@ -274,7 +274,7 @@ public static Collection> repartitionParallel( T anOperator = newPartitions.iterator().next().getPartitionedInstance(); while (morePartitionsToCreate-- > 0) { - DefaultPartition partition = new DefaultPartition(anOperator); + DefaultPartition partition = new DefaultPartition<>(anOperator); newPartitions.add(partition); } } diff --git a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java index dccd7b79f7..3dc4dda214 100644 --- a/common/src/main/java/com/datatorrent/common/security/SecurityContext.java +++ b/common/src/main/java/com/datatorrent/common/security/SecurityContext.java @@ -32,17 +32,17 @@ public interface SecurityContext extends Context /** * Attribute for the user name for login. */ - Attribute USER_NAME = new Attribute((String)null); + Attribute USER_NAME = new Attribute<>((String)null); /** * Attribute for the password for login. */ - Attribute PASSWORD = new Attribute((char[])null); + Attribute PASSWORD = new Attribute<>((char[])null); /** * Attribute for the realm for login. */ - Attribute REALM = new Attribute((String)null); + Attribute REALM = new Attribute<>((String)null); } diff --git a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java index ca7490db2c..f90a888e4c 100644 --- a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java +++ b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java @@ -48,7 +48,7 @@ public void process(T tuple) } }; - public transient DefaultOutputPort output = new DefaultOutputPort(); + public transient DefaultOutputPort output = new DefaultOutputPort<>(); protected List lastWindowTuples = new ArrayList<>(); diff --git a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java index 7723fed803..ef837a86b7 100644 --- a/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java +++ b/common/src/main/java/com/datatorrent/common/util/JacksonObjectMapperProvider.java @@ -55,9 +55,9 @@ public JacksonObjectMapperProvider() this.objectMapper = new ObjectMapper(); objectMapper.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, true); objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); - module.addSerializer(ObjectMapperString.class, new RawSerializer(Object.class)); - module.addSerializer(JSONObject.class, new RawSerializer(Object.class)); - module.addSerializer(JSONArray.class, new RawSerializer(Object.class)); + module.addSerializer(ObjectMapperString.class, new RawSerializer<>(Object.class)); + module.addSerializer(JSONObject.class, new RawSerializer<>(Object.class)); + module.addSerializer(JSONArray.class, new RawSerializer<>(Object.class)); objectMapper.registerModule(module); } diff --git a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java index 63d16469d9..af5e10edee 100644 --- a/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java +++ b/common/src/main/java/com/datatorrent/common/util/PubSubMessageCodec.java @@ -53,7 +53,7 @@ public PubSubMessageCodec(ObjectMapper mapper) */ public static String constructPublishMessage(String topic, T data, PubSubMessageCodec codec) throws IOException { - PubSubMessage pubSubMessage = new PubSubMessage(); + PubSubMessage pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.PUBLISH); pubSubMessage.setTopic(topic); pubSubMessage.setData(data); @@ -72,7 +72,7 @@ public static String constructPublishMessage(String topic, T data, PubSubMes */ public static String constructSubscribeMessage(String topic, PubSubMessageCodec codec) throws IOException { - PubSubMessage pubSubMessage = new PubSubMessage(); + PubSubMessage pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.SUBSCRIBE); pubSubMessage.setTopic(topic); @@ -90,7 +90,7 @@ public static String constructSubscribeMessage(String topic, PubSubMessageCo */ public static String constructUnsubscribeMessage(String topic, PubSubMessageCodec codec) throws IOException { - PubSubMessage pubSubMessage = new PubSubMessage(); + PubSubMessage pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE); pubSubMessage.setTopic(topic); @@ -108,7 +108,7 @@ public static String constructUnsubscribeMessage(String topic, PubSubMessage */ public static String constructSubscribeNumSubscribersMessage(String topic, PubSubMessageCodec codec) throws IOException { - PubSubMessage pubSubMessage = new PubSubMessage(); + PubSubMessage pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS); pubSubMessage.setTopic(topic); @@ -126,7 +126,7 @@ public static String constructSubscribeNumSubscribersMessage(String topic, P */ public static String constructUnsubscribeNumSubscribersMessage(String topic, PubSubMessageCodec codec) throws IOException { - PubSubMessage pubSubMessage = new PubSubMessage(); + PubSubMessage pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS); pubSubMessage.setTopic(topic); @@ -135,7 +135,7 @@ public static String constructUnsubscribeNumSubscribersMessage(String topic, public String formatMessage(PubSubMessage pubSubMessage) throws IOException { - HashMap map = new HashMap(); + HashMap map = new HashMap<>(); map.put(PubSubMessage.TYPE_KEY, pubSubMessage.getType().getIdentifier()); map.put(PubSubMessage.TOPIC_KEY, pubSubMessage.getTopic()); T data = pubSubMessage.getData(); @@ -156,7 +156,7 @@ public String formatMessage(PubSubMessage pubSubMessage) throws IOException public PubSubMessage parseMessage(String message) throws IOException { HashMap map = mapper.readValue(message, HashMap.class); - PubSubMessage pubSubMessage = new PubSubMessage(); + PubSubMessage pubSubMessage = new PubSubMessage<>(); pubSubMessage.setType(PubSubMessageType.getPubSubMessageType((String)map.get(PubSubMessage.TYPE_KEY))); pubSubMessage.setTopic((String)map.get(PubSubMessage.TOPIC_KEY)); pubSubMessage.setData((T)map.get(PubSubMessage.DATA_KEY)); diff --git a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java index e7c4887d95..2e48f54d6b 100644 --- a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java +++ b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java @@ -41,7 +41,7 @@ public class StatelessPartitionerTest public static class DummyOperator implements Operator { - public final DefaultOutputPort output = new DefaultOutputPort(); + public final DefaultOutputPort output = new DefaultOutputPort<>(); private Integer value; @@ -93,10 +93,10 @@ public int getValue() public void partition1Test() { DummyOperator dummyOperator = new DummyOperator(5); - StatelessPartitioner statelessPartitioner = new StatelessPartitioner(); + StatelessPartitioner statelessPartitioner = new StatelessPartitioner<>(); Collection> partitions = Lists.newArrayList(); - DefaultPartition defaultPartition = new DefaultPartition(dummyOperator); + DefaultPartition defaultPartition = new DefaultPartition<>(dummyOperator); partitions.add(defaultPartition); Collection> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0)); @@ -111,10 +111,10 @@ public void partition1Test() public void partition5Test() { DummyOperator dummyOperator = new DummyOperator(5); - StatelessPartitioner statelessPartitioner = new StatelessPartitioner(5); + StatelessPartitioner statelessPartitioner = new StatelessPartitioner<>(5); Collection> partitions = Lists.newArrayList(); - DefaultPartition defaultPartition = new DefaultPartition(dummyOperator); + DefaultPartition defaultPartition = new DefaultPartition<>(dummyOperator); partitions.add(defaultPartition); Collection> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0)); @@ -137,10 +137,10 @@ public void objectPropertyTest() public void testParallelPartitionScaleUP() { DummyOperator dummyOperator = new DummyOperator(5); - StatelessPartitioner statelessPartitioner = new StatelessPartitioner(); + StatelessPartitioner statelessPartitioner = new StatelessPartitioner<>(); Collection> partitions = Lists.newArrayList(); - partitions.add(new DefaultPartition(dummyOperator)); + partitions.add(new DefaultPartition<>(dummyOperator)); Collection> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 5)); @@ -151,12 +151,12 @@ public void testParallelPartitionScaleUP() public void testParallelPartitionScaleDown() { DummyOperator dummyOperator = new DummyOperator(5); - StatelessPartitioner statelessPartitioner = new StatelessPartitioner(); + StatelessPartitioner statelessPartitioner = new StatelessPartitioner<>(); Collection> partitions = Lists.newArrayList(); for (int i = 5; i-- > 0; ) { - partitions.add(new DefaultPartition(dummyOperator)); + partitions.add(new DefaultPartition<>(dummyOperator)); } Collection> newPartitions = statelessPartitioner.definePartitions(partitions, diff --git a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java index 97debe3364..79fdac72ff 100644 --- a/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java +++ b/common/src/test/java/com/datatorrent/common/util/SerializableObjectTest.java @@ -50,7 +50,7 @@ public void process(T tuple) } }; - public final transient OutputPort output = new DefaultOutputPort(); + public final transient OutputPort output = new DefaultOutputPort<>(); private int i; public void setI(int i) @@ -109,7 +109,7 @@ public String toString() @Test public void testReadResolve() throws Exception { - SerializableOperator pre = new SerializableOperator(); + SerializableOperator pre = new SerializableOperator<>(); pre.setI(10); FileOutputStream fos = new FileOutputStream(filename); diff --git a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java index 53d91a5b95..80314c723b 100644 --- a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java @@ -73,7 +73,7 @@ public void reissueContainerRequests(AMRMClient amRmClient, Ma for (ContainerRequest cr : requests) { ContainerStartRequest csr = hostSpecificRequests.get(cr); ContainerRequest newCr = new ContainerRequest(cr.getCapability(), null, null, cr.getPriority()); - MutablePair pair = new MutablePair(loopCounter, newCr); + MutablePair pair = new MutablePair<>(loopCounter, newCr); requestedResources.put(csr, pair); containerRequests.add(newCr); hostSpecificRequests.remove(cr); @@ -91,7 +91,7 @@ public void reissueContainerRequests(AMRMClient amRmClient, Ma for (Entry entry : otherContainerRequests.entrySet()) { ContainerRequest cr = entry.getKey(); ContainerStartRequest csr = entry.getValue(); - MutablePair pair = new MutablePair(loopCounter, cr); + MutablePair pair = new MutablePair<>(loopCounter, cr); requestedResources.put(csr, pair); containerRequests.add(cr); } diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java index e7f9672217..45206bca9d 100644 --- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java @@ -102,7 +102,7 @@ public void reissueContainerRequests(AMRMClient amRmClient, Ma */ public void addContainerRequest(Map> requestedResources, int loopCounter, List containerRequests, StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr) { - MutablePair pair = new MutablePair(loopCounter, cr); + MutablePair pair = new MutablePair<>(loopCounter, cr); requestedResources.put(csr, pair); containerRequests.add(cr); } @@ -164,7 +164,7 @@ public void updateNodeReports(List nodeReports) public List getNodesExceptHost(List hostNames) { - List nodesList = new ArrayList(); + List nodesList = new ArrayList<>(); Set hostNameSet = Sets.newHashSet(); hostNameSet.addAll(hostNames); for (String host : nodeReportMap.keySet()) { diff --git a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java index 3a61ee6102..b3744471f8 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java +++ b/engine/src/main/java/com/datatorrent/stram/client/PermissionsInfo.java @@ -32,10 +32,10 @@ public class PermissionsInfo { - private final Set readOnlyRoles = new TreeSet(); - private final Set readOnlyUsers = new TreeSet(); - private final Set readWriteRoles = new TreeSet(); - private final Set readWriteUsers = new TreeSet(); + private final Set readOnlyRoles = new TreeSet<>(); + private final Set readOnlyUsers = new TreeSet<>(); + private final Set readWriteRoles = new TreeSet<>(); + private final Set readWriteUsers = new TreeSet<>(); private boolean readOnlyEveryone = false; private boolean readWriteEveryone = false; diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java index 050729dd14..8076d4abc5 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java @@ -271,7 +271,7 @@ private Token getRMHAToken(org.apache.hadoop.yarn.a } Text rmTokenService = new Text(Joiner.on(',').join(services)); - return new Token( + return new Token<>( rmDelegationToken.getIdentifier().array(), rmDelegationToken.getPassword().array(), new Text(rmDelegationToken.getKind()), diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java index 7113280dc1..284aefb3f8 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java @@ -50,8 +50,8 @@ public class OperatorContext extends BaseContext implements Context.OperatorCont private final int id; private final String name; // the size of the circular queue should be configurable. hardcoded to 1024 for now. - private final CircularBuffer statsBuffer = new CircularBuffer(1024); - private final CircularBuffer requests = new CircularBuffer(1024); + private final CircularBuffer statsBuffer = new CircularBuffer<>(1024); + private final CircularBuffer requests = new CircularBuffer<>(1024); public final boolean stateless; private int windowsFromCheckpoint; diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 401eea94cb..62c4fd8308 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -760,7 +760,7 @@ private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort po codecs.put(sinkToPersistPortMeta, inputStreamCodec); InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port); StreamCodec specifiedCodecForPersistOperator = (StreamCodec)persistOperatorPortMeta.getStreamCodec(); - StreamCodecWrapperForPersistance codec = new StreamCodecWrapperForPersistance(codecs, specifiedCodecForPersistOperator); + StreamCodecWrapperForPersistance codec = new StreamCodecWrapperForPersistance<>(codecs, specifiedCodecForPersistOperator); persistOperatorPortMeta.setStreamCodec(codec); } } @@ -1907,14 +1907,14 @@ private void validateAffinityRules() HashMap antiAffinities = new HashMap<>(); HashMap threadLocalAffinities = new HashMap<>(); - List operatorNames = new ArrayList(); + List operatorNames = new ArrayList<>(); for (OperatorMeta operator : getAllOperators()) { operatorNames.add(operator.getName()); - Set containerSet = new HashSet(); + Set containerSet = new HashSet<>(); containerSet.add(operator.getName()); containerAffinities.put(operator.getName(), containerSet); - Set nodeSet = new HashSet(); + Set nodeSet = new HashSet<>(); nodeSet.add(operator.getName()); nodeAffinities.put(operator.getName(), nodeSet); @@ -2073,7 +2073,7 @@ public void combineSets(HashMap> containerAffinities, Operat */ public void convertRegexToList(List operatorNames, AffinityRule rule) { - List operators = new LinkedList(); + List operators = new LinkedList<>(); Pattern p = Pattern.compile(rule.getOperatorRegex()); for (String name : operatorNames) { if (p.matcher(name).matches()) { diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index ce22bfd3da..a1da94a329 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -477,13 +477,13 @@ public PhysicalPlan(LogicalPlan dag, PlanContext ctx) // Log container anti-affinity if (LOG.isDebugEnabled()) { for (PTContainer container : containers) { - List antiOperators = new ArrayList(); + List antiOperators = new ArrayList<>(); for (PTContainer c : container.getStrictAntiPrefs()) { for (PTOperator operator : c.getOperators()) { antiOperators.add(operator.getName()); } } - List containerOperators = new ArrayList(); + List containerOperators = new ArrayList<>(); for (PTOperator operator : container.getOperators()) { containerOperators.add(operator.getName()); } diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java index 38c0f4109d..47986cb892 100644 --- a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java +++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java @@ -116,10 +116,10 @@ public void onError(Throwable t) */ public PubSubWebSocketClient() { - throwable = new AtomicReference(); + throwable = new AtomicReference<>(); ioThreadMultiplier = 1; mapper = (new JacksonObjectMapperProvider()).getContext(null); - codec = new PubSubMessageCodec(mapper); + codec = new PubSubMessageCodec<>(mapper); AsyncHttpClientConfigBean config = new AsyncHttpClientConfigBean(); config.setIoThreadMultiplier(ioThreadMultiplier); diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java index dd75857e21..9fbb54d429 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/CompactUtil.java @@ -148,7 +148,7 @@ private static void setAnnotationNode(FieldNode fn, CompactFieldNode cfn) List annotations = new LinkedList<>(); for (Object visibleAnnotation : fn.visibleAnnotations) { CompactAnnotationNode node = new CompactAnnotationNode(); - Map annotationMap = new HashMap(); + Map annotationMap = new HashMap<>(); if (visibleAnnotation instanceof AnnotationNode) { AnnotationNode annotation = (AnnotationNode)visibleAnnotation; if (annotation.desc.contains("InputPortFieldAnnotation") diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java index 1e87b312d2..91a3cf3e90 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/asm/Type.java @@ -87,7 +87,7 @@ class WildcardTypeNode implements Type char boundChar; - ArrayList bounds = new ArrayList(); + ArrayList bounds = new ArrayList<>(); public Type[] getUpperBounds() { @@ -154,7 +154,7 @@ public TypeNode getRawTypeBound() class ParameterizedTypeNode extends TypeNode { - ArrayList actualTypeArguments = new ArrayList(); + ArrayList actualTypeArguments = new ArrayList<>(); public Type[] getActualTypeArguments() { diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java index d7f96d4c17..0c997ec9af 100644 --- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java +++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java @@ -78,7 +78,7 @@ public class CheckpointTest private static class MockInputOperator extends BaseOperator implements InputOperator, Operator.CheckpointNotificationListener { @OutputPortFieldAnnotation( optional = true) - public final transient DefaultOutputPort outport = new DefaultOutputPort(); + public final transient DefaultOutputPort outport = new DefaultOutputPort<>(); private transient int windowCount; private int checkpointState; @@ -326,7 +326,7 @@ public void testUpdateCheckpointsRecovery() public List getCheckpoints(Long... windowIds) { - List list = new ArrayList(windowIds.length); + List list = new ArrayList<>(windowIds.length); for (Long windowId : windowIds) { list.add(new Checkpoint(windowId, 0, 0)); } diff --git a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java index b1509e0832..b1f3363b08 100644 --- a/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/GenericOperatorPropertyCodecTest.java @@ -41,7 +41,7 @@ public class GenericOperatorPropertyCodecTest public void testGenericOperatorPropertyCodec() { LogicalPlan dag = new LogicalPlan(); - Map, Class>> codecs = new HashMap, Class>>(); + Map, Class>> codecs = new HashMap<>(); codecs.put(GenericOperatorProperty.class, GenericOperatorProperty.GenericOperatorPropertyStringCodec.class); dag.setAttribute(com.datatorrent.api.Context.DAGContext.STRING_CODECS, codecs); dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java index ecbeeb61ac..f0199f90ae 100644 --- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java +++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java @@ -86,7 +86,7 @@ public static class CollectorOperator extends BaseOperator /* * Received tuples are stored in a map keyed with the system assigned operator id. */ - public static final ConcurrentHashMap> receivedTuples = new ConcurrentHashMap>(); + public static final ConcurrentHashMap> receivedTuples = new ConcurrentHashMap<>(); private transient int operatorId; public String prefix = ""; @@ -107,7 +107,7 @@ public void process(Object tuple) synchronized (receivedTuples) { List l = receivedTuples.get(id); if (l == null) { - l = Collections.synchronizedList(new ArrayList()); + l = Collections.synchronizedList(new ArrayList<>()); //LOG.debug("adding {} {}", id, l); receivedTuples.put(id, l); } @@ -121,12 +121,12 @@ public void process(Object tuple) }; @OutputPortFieldAnnotation( optional = true) - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); } public static class TestInputOperator extends BaseOperator implements InputOperator { - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); transient boolean first; transient long windowId; boolean blockEndStream = false; @@ -178,9 +178,9 @@ public void testDefaultPartitioning() throws Exception CollectorOperator.receivedTuples.clear(); TestInputOperator input = dag.addOperator("input", new TestInputOperator()); - input.testTuples = new ArrayList>(); + input.testTuples = new ArrayList<>(); for (Integer[] tuples: testData) { - input.testTuples.add(new ArrayList(Arrays.asList(tuples))); + input.testTuples.add(new ArrayList<>(Arrays.asList(tuples))); } CollectorOperator collector = dag.addOperator("collector", new CollectorOperator()); collector.prefix = "" + System.identityHashCode(collector); @@ -234,7 +234,7 @@ public static void put(PTOperator oper, int load) { Map m = loadIndicators.get(); if (m == null) { - loadIndicators.set(m = new ConcurrentHashMap()); + loadIndicators.set(m = new ConcurrentHashMap<>()); } m.put(oper.getId(), load); } @@ -341,7 +341,7 @@ public void testDynamicDefaultPartitioning() throws Exception Assert.assertNotNull("" + nodeMap, inputDeployed); // add tuple that matches the partition key and check that each partition receives it - ArrayList inputTuples = new ArrayList(); + ArrayList inputTuples = new ArrayList<>(); LOG.debug("Number of partitions {}", partitions.size()); for (PTOperator p: partitions) { // default partitioning has one port mapping with a single partition key @@ -391,7 +391,7 @@ public void emitTuples() @Override public Collection> definePartitions(Collection> partitions, PartitioningContext context) { - List> newPartitions = new ArrayList>(3); + List> newPartitions = new ArrayList<>(3); Iterator> iterator = partitions.iterator(); Partition templatePartition; for (int i = 0; i < 3; i++) { @@ -401,7 +401,7 @@ public Collection> definePartitions(Collec op.partitionProperty = templatePartition.getPartitionedInstance().partitionProperty; } op.partitionProperty += "_" + i; - newPartitions.add(new DefaultPartition(op)); + newPartitions.add(new DefaultPartition<>(op)); } return newPartitions; } @@ -431,7 +431,7 @@ private void testInputOperatorPartitioning(LogicalPlan dag) throws Exception lc.runAsync(); List partitions = assertNumberPartitions(3, lc, dag.getMeta(input)); - Set partProperties = new HashSet(); + Set partProperties = new HashSet<>(); for (PTOperator p : partitions) { LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p); Map> nodeMap = c.getNodes(); @@ -460,7 +460,7 @@ private void testInputOperatorPartitioning(LogicalPlan dag) throws Exception PartitionLoadWatch.remove(partitions.get(0)); partitions = assertNumberPartitions(3, lc, dag.getMeta(input)); - partProperties = new HashSet(); + partProperties = new HashSet<>(); for (PTOperator p: partitions) { LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p); Map> nodeMap = c.getNodes(); diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java index 35bb36320e..cee8247d4b 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java @@ -1007,7 +1007,7 @@ public void testDynamicPartitioningStreamCodec() lastId = assignNewContainers(dnm, lastId); List operators = plan.getOperators(n2meta); - List upstreamOperators = new ArrayList(); + List upstreamOperators = new ArrayList<>(); for (PTOperator operator : operators) { upstreamOperators.addAll(operator.upstreamMerge.values()); /* @@ -1036,7 +1036,7 @@ public void testDynamicPartitioningStreamCodec() lastId = assignNewContainers(dnm, lastId); List operators = plan.getOperators(n3meta); - List upstreamOperators = new ArrayList(); + List upstreamOperators = new ArrayList<>(); for (PTOperator operator : operators) { upstreamOperators.addAll(operator.upstreamMerge.values()); } @@ -1063,7 +1063,7 @@ public void testDynamicPartitioningStreamCodec() lastId = assignNewContainers(dnm, lastId); List operators = plan.getOperators(n2meta); - List upstreamOperators = new ArrayList(); + List upstreamOperators = new ArrayList<>(); for (PTOperator operator : operators) { upstreamOperators.addAll(operator.upstreamMerge.values()); /* @@ -1144,7 +1144,7 @@ private void markAllOperatorsActive(PhysicalPlan plan) private Set getUnifiers(PhysicalPlan plan) { - Set unifiers = new HashSet(); + Set unifiers = new HashSet<>(); for (PTContainer container : plan.getContainers()) { for (PTOperator operator : container.getOperators()) { if (operator.isUnifier()) { diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 84622c4e14..abdfcf7499 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -124,14 +124,14 @@ public void testDeployInfoSerialization() throws Exception input.portName = "inputPortNameOnNode"; input.sourceNodeId = 99; - ndi.inputs = new ArrayList(); + ndi.inputs = new ArrayList<>(); ndi.inputs.add(input); OperatorDeployInfo.OutputDeployInfo output = new OperatorDeployInfo.OutputDeployInfo(); output.declaredStreamId = "streamFromNode"; output.portName = "outputPortNameOnNode"; - ndi.outputs = new ArrayList(); + ndi.outputs = new ArrayList<>(); ndi.outputs.add(output); ContainerHeartbeatResponse scc = new ContainerHeartbeatResponse(); diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java index f6b72775d4..59f9dcc480 100644 --- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java +++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliMiscTest.java @@ -34,7 +34,7 @@ public class ApexCliMiscTest { ApexCli cli; - static Map env = new HashMap(); + static Map env = new HashMap<>(); static String userHome; @Before diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java index 2ac1c500db..f1356dfb6d 100644 --- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java +++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java @@ -59,7 +59,7 @@ public class ApexCliTest static TemporaryFolder testFolder = new TemporaryFolder(); ApexCli cli; - static Map env = new HashMap(); + static Map env = new HashMap<>(); static String userHome; @BeforeClass diff --git a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java index d1a18ae673..26aced84a7 100644 --- a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java @@ -112,8 +112,8 @@ public void testVirginKryo() @Test public void testString() { - StatefulStreamCodec coder = new DefaultStatefulStreamCodec(); - StatefulStreamCodec decoder = new DefaultStatefulStreamCodec(); + StatefulStreamCodec coder = new DefaultStatefulStreamCodec<>(); + StatefulStreamCodec decoder = new DefaultStatefulStreamCodec<>(); String hello = "hello"; @@ -182,7 +182,7 @@ public TestTuple(Integer i) public void testFinalFieldSerialization() throws Exception { TestTuple t1 = new TestTuple(5); - DefaultStatefulStreamCodec c = new DefaultStatefulStreamCodec(); + DefaultStatefulStreamCodec c = new DefaultStatefulStreamCodec<>(); DataStatePair dsp = c.toDataStatePair(t1); TestTuple t2 = (TestTuple)c.fromDataStatePair(dsp); Assert.assertEquals("", t1.finalField, t2.finalField); @@ -208,7 +208,7 @@ public void testInnerClassSerialization() throws Exception Object inner = outer.new InnerClass(); for (Object o: new Object[] {outer, inner}) { - DefaultStatefulStreamCodec c = new DefaultStatefulStreamCodec(); + DefaultStatefulStreamCodec c = new DefaultStatefulStreamCodec<>(); DataStatePair dsp = c.toDataStatePair(o); c.fromDataStatePair(dsp); diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java index cc777f7b26..64298def41 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java @@ -77,7 +77,7 @@ public void testLinearInlineOperatorsRecovery() throws Exception @Override public void testNonLinearOperatorRecovery() throws InterruptedException { - final HashSet collection = new HashSet(); + final HashSet collection = new HashSet<>(); com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0); diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index da5c7b739a..66f1b84ae6 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -197,7 +197,7 @@ public void process(Object tuple) }; @OutputPortFieldAnnotation( optional = true) - DefaultOutputPort op = new DefaultOutputPort(); + DefaultOutputPort op = new DefaultOutputPort<>(); @Override public void beginWindow(long windowId) @@ -226,7 +226,7 @@ public void teardown() public static class CheckpointDistanceOperator extends GenericOperator { - List distances = new ArrayList(); + List distances = new ArrayList<>(); int numWindows = 0; int maxWindows = 0; @@ -245,7 +245,7 @@ public void beginWindow(long windowId) public void testSynchingLogic() throws InterruptedException { long sleeptime = 25L; - final ArrayList list = new ArrayList(); + final ArrayList list = new ArrayList<>(); GenericOperator go = new GenericOperator(); final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator", new DefaultAttributeMap(), null)); @@ -376,8 +376,8 @@ public void testBufferServerSubscriberActivationBeforeOperator() throws Interrup final Server bufferServer = new Server(eventloop, 0); // find random port final int bufferServerPort = bufferServer.run().getPort(); - final StreamCodec serde = new DefaultStatefulStreamCodec(); - final BlockingQueue tuples = new ArrayBlockingQueue(10); + final StreamCodec serde = new DefaultStatefulStreamCodec<>(); + final BlockingQueue tuples = new ArrayBlockingQueue<>(10); GenericTestOperator go = new GenericTestOperator(); final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator", @@ -905,7 +905,7 @@ private void testCheckpointDistance(int dagCheckPoint, int opCheckPoint) throws CheckpointDistanceOperator go = new CheckpointDistanceOperator(); go.maxWindows = maxWindows; - List checkpoints = new ArrayList(); + List checkpoints = new ArrayList<>(); int window = 0; while (window < maxWindows) { diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java index 84217ebeb4..bb2e72faf9 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java @@ -51,10 +51,10 @@ public class InputOperatorTest public static class EvenOddIntegerGeneratorInputOperator implements InputOperator, com.datatorrent.api.Operator.ActivationListener { - public final transient DefaultOutputPort even = new DefaultOutputPort(); - public final transient DefaultOutputPort odd = new DefaultOutputPort(); - private final transient CircularBuffer evenBuffer = new CircularBuffer(1024); - private final transient CircularBuffer oddBuffer = new CircularBuffer(1024); + public final transient DefaultOutputPort even = new DefaultOutputPort<>(); + public final transient DefaultOutputPort odd = new DefaultOutputPort<>(); + private final transient CircularBuffer evenBuffer = new CircularBuffer<>(1024); + private final transient CircularBuffer oddBuffer = new CircularBuffer<>(1024); private volatile Thread dataGeneratorThread; @Override @@ -179,7 +179,7 @@ public void process(T tuple) public void setConnected(boolean flag) { if (flag) { - collections.put(id, list = new ArrayList()); + collections.put(id, list = new ArrayList<>()); } } } diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java index 55b5eab30b..f669832893 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java @@ -171,7 +171,7 @@ static class Call } - static final ArrayList calls = new ArrayList(); + static final ArrayList calls = new ArrayList<>(); @Override public void save(Object object, int operatorId, long windowId) throws IOException diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java index 3e208c8dd0..952a36b383 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java @@ -152,8 +152,8 @@ public void testLinearInlineOperatorsRecovery() throws Exception public static class CollectorOperator extends BaseOperator implements com.datatorrent.api.Operator.CheckpointListener { - public static HashSet collection = new HashSet(20); - public static ArrayList duplicates = new ArrayList(); + public static HashSet collection = new HashSet<>(20); + public static ArrayList duplicates = new ArrayList<>(); private boolean simulateFailure; private long checkPointWindowId; public final transient DefaultInputPort input = new DefaultInputPort() @@ -211,7 +211,7 @@ public static class MultiInputOperator implements Operator { public final transient MyInputPort input1 = new MyInputPort(100); public final transient MyInputPort input2 = new MyInputPort(200); - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); public class MyInputPort extends DefaultInputPort { @@ -255,7 +255,7 @@ public void teardown() @SuppressWarnings("SleepWhileInLoop") public void testNonLinearOperatorRecovery() throws InterruptedException { - final HashSet collection = new HashSet(); + final HashSet collection = new HashSet<>(); com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0); map.put(OperatorContext.PROCESSING_MODE, processingMode); diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java index 8ba57be459..d8d97e00c5 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java @@ -47,7 +47,7 @@ public void beginWindow(long windowId) emit = true; } - public final transient DefaultOutputPort defaultOutputPort = new DefaultOutputPort(); + public final transient DefaultOutputPort defaultOutputPort = new DefaultOutputPort<>(); @Override public void emitTuples() diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java index 46253683f2..47f1ea0577 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java @@ -83,7 +83,7 @@ public void emitTuples() public static class TestInputStatsListener implements StatsListener, Serializable { private static final long serialVersionUID = 1L; - private List inputOperatorStats = new ArrayList(); + private List inputOperatorStats = new ArrayList<>(); @Override public Response processStats(BatchedOperatorStats stats) @@ -101,7 +101,7 @@ public Response processStats(BatchedOperatorStats stats) public static class TestCollector extends GenericTestOperator implements StatsListener { transient long windowId; - List collectorOperatorStats = new ArrayList(); + List collectorOperatorStats = new ArrayList<>(); @Override public Response processStats(BatchedOperatorStats stats) @@ -129,7 +129,7 @@ public void beginWindow(long windowId) public static class TestCollectorStatsListener implements StatsListener, Serializable { private static final long serialVersionUID = 1L; - List collectorOperatorStats = new ArrayList(); + List collectorOperatorStats = new ArrayList<>(); @Override public Response processStats(BatchedOperatorStats stats) diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java index 451972e0c6..36f9d633f3 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java @@ -93,7 +93,7 @@ public void testOiOCommitted() throws IOException, ClassNotFoundException private static class CommitAwareOperator extends BaseOperator implements CheckpointListener, InputOperator { private transient String name; - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); @InputPortFieldAnnotation(optional = true) public final transient DefaultInputPort input = new DefaultInputPort() diff --git a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java index e6b5cd5f80..9e6c788b4b 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/TestGeneratorInputOperator.java @@ -41,9 +41,9 @@ public class TestGeneratorInputOperator implements InputOperator private int emitInterval = 1000; private final int spinMillis = 50; private String myStringProperty; - private final ConcurrentLinkedQueue externallyAddedTuples = new ConcurrentLinkedQueue(); + private final ConcurrentLinkedQueue externallyAddedTuples = new ConcurrentLinkedQueue<>(); @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort outport = new DefaultOutputPort(); + public final transient DefaultOutputPort outport = new DefaultOutputPort<>(); public int getMaxTuples() { diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java index 0c95b7596e..85125c70d8 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java @@ -256,7 +256,7 @@ public void put(Object payload) static class RandomNumberGenerator implements InputOperator { - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); @Override public void emitTuples() diff --git a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java index 07302899fa..7dc0686476 100644 --- a/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java +++ b/engine/src/test/java/com/datatorrent/stram/moduleexperiment/InjectConfigTest.java @@ -220,7 +220,7 @@ public void setIntProp(int prop) public transient String transientProperty = "transientProperty"; public java.util.concurrent.ConcurrentHashMap mapProperty = new java.util.concurrent - .ConcurrentHashMap(); + .ConcurrentHashMap<>(); public java.util.concurrent.ConcurrentHashMap getMapProperty() { diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java index d40fd7bd70..9d66ca31e3 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java @@ -570,12 +570,12 @@ public StreamCodec getStreamCodec() } }; - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); @Override public Collection definePartitions(Collection partitions, PartitioningContext context) { - Collection newPartitions = new ArrayList(); + Collection newPartitions = new ArrayList<>(); // Mostly for 1 partition we dont need to do this int partitionBits = (Integer.numberOfLeadingZeros(0) - Integer.numberOfLeadingZeros(1)); @@ -590,7 +590,7 @@ public Collection definePartitions(Collection partitions, PartitioningContext co // No partitioning done so far.. // Single partition again, but with only even numbers ok? PassThruOperatorWithCodec newInstance = new PassThruOperatorWithCodec(); - Partition partition = new DefaultPartition(newInstance); + Partition partition = new DefaultPartition<>(newInstance); // Consider partitions are 1 & 2 and we are sending only 1 partition // Partition 1 = even numbers @@ -796,7 +796,7 @@ public void process(Object tuple) } }; - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); @Override public Collection definePartitions(Collection partitions, PartitioningContext context) diff --git a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java index 14f2b1b04d..705904e72e 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/TestPlanContext.java @@ -45,7 +45,7 @@ public class TestPlanContext implements PlanContext, StorageAgent { - public List events = new ArrayList(); + public List events = new ArrayList<>(); public Collection undeploy; public Collection deploy; public Set releaseContainers; diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java index 18dfd99f8c..3999acef28 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java @@ -625,7 +625,7 @@ public void process(Integer tuple) output.emit(tuple); } }; - public transient DefaultOutputPort output = new DefaultOutputPort(); + public transient DefaultOutputPort output = new DefaultOutputPort<>(); } class DummyOutputOperator extends BaseOperator @@ -644,8 +644,8 @@ public void process(Integer tuple) class TestUnifierAttributeModule implements Module { - public transient ProxyInputPort moduleInput = new ProxyInputPort(); - public transient ProxyOutputPort moduleOutput = new Module.ProxyOutputPort(); + public transient ProxyInputPort moduleInput = new ProxyInputPort<>(); + public transient ProxyOutputPort moduleOutput = new Module.ProxyOutputPort<>(); @Override public void populateDAG(DAG dag, Configuration conf) diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java index dd32cc75bc..1507e2db7d 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java @@ -173,9 +173,9 @@ public void testCycleDetectionWithDelay() public static class ValidationOperator extends BaseOperator { - public final transient DefaultOutputPort goodOutputPort = new DefaultOutputPort(); + public final transient DefaultOutputPort goodOutputPort = new DefaultOutputPort<>(); - public final transient DefaultOutputPort badOutputPort = new DefaultOutputPort(); + public final transient DefaultOutputPort badOutputPort = new DefaultOutputPort<>(); } public static class CounterOperator extends BaseOperator @@ -641,7 +641,7 @@ public void emitTuples() private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator { - public final transient DefaultOutputPort outport1 = new DefaultOutputPort(); + public final transient DefaultOutputPort outport1 = new DefaultOutputPort<>(); @Override public void emitTuples() @@ -653,9 +653,9 @@ public void emitTuples() private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator { @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort outport1 = new DefaultOutputPort(); + public final transient DefaultOutputPort outport1 = new DefaultOutputPort<>(); @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort outport2 = new DefaultOutputPort(); + public final transient DefaultOutputPort outport2 = new DefaultOutputPort<>(); @Override public void emitTuples() @@ -773,7 +773,7 @@ public void testJdkSerializableOperator() throws Exception public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException { LogicalPlan dag = new LogicalPlan(); - Attribute attr = new Attribute(new TestAttributeValue(), new Object2String()); + Attribute attr = new Attribute<>(new TestAttributeValue(), new Object2String()); Field nameField = Attribute.class.getDeclaredField("name"); nameField.setAccessible(true); nameField.set(attr, "Test_Attribute"); diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java index 19666787d8..64aaa44710 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java @@ -50,7 +50,7 @@ static class DummyInputOperator extends BaseOperator implements InputOperator { Random r = new Random(); - public transient DefaultOutputPort output = new DefaultOutputPort(); + public transient DefaultOutputPort output = new DefaultOutputPort<>(); @Override public void emitTuples() @@ -73,7 +73,7 @@ public void process(Integer tuple) output.emit(tuple); } }; - public transient DefaultOutputPort output = new DefaultOutputPort(); + public transient DefaultOutputPort output = new DefaultOutputPort<>(); } /* @@ -92,7 +92,7 @@ public void process(Integer tuple) output.emit(tuple); } }; - public transient DefaultOutputPort output = new DefaultOutputPort(); + public transient DefaultOutputPort output = new DefaultOutputPort<>(); } /* @@ -118,8 +118,8 @@ public void process(Integer tuple) static class TestModule implements Module { - public transient ProxyInputPort moduleInput = new ProxyInputPort(); - public transient ProxyOutputPort moduleOutput = new Module.ProxyOutputPort(); + public transient ProxyInputPort moduleInput = new ProxyInputPort<>(); + public transient ProxyOutputPort moduleOutput = new Module.ProxyOutputPort<>(); @Override public void populateDAG(DAG dag, Configuration conf) diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java index 5b5583a091..7759363dcb 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java @@ -59,7 +59,7 @@ public static class DummyInputOperator extends BaseOperator implements InputOper private int inputOperatorProp = 0; Random r = new Random(); - public transient DefaultOutputPort out = new DefaultOutputPort(); + public transient DefaultOutputPort out = new DefaultOutputPort<>(); @Override public void emitTuples() diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java index 8fad613b0f..956db88c30 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java @@ -49,10 +49,10 @@ public static class GenericModule implements Module public volatile Object inport1Tuple = null; @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort outport1 = new DefaultOutputPort(); + public final transient DefaultOutputPort outport1 = new DefaultOutputPort<>(); @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort outport2 = new DefaultOutputPort(); + public final transient DefaultOutputPort outport2 = new DefaultOutputPort<>(); private String emitFormat; diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java index 61a85a526b..941f5a4b17 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java @@ -476,7 +476,7 @@ public void testInputOperatorPartitioning() OperatorMeta o1Meta = dag.getMeta(o1); dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); - TestPartitioner> partitioner = new TestPartitioner>(); + TestPartitioner> partitioner = new TestPartitioner<>(); dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, partitioner); TestPlanContext ctx = new TestPlanContext(); @@ -497,7 +497,7 @@ public void testInputOperatorPartitioning() plan.onStatusUpdate(o1p1); Assert.assertEquals("scale up triggered", 1, ctx.events.size()); // add another partition, keep existing as is - partitioner.extraPartitions.add(new DefaultPartition>(o1)); + partitioner.extraPartitions.add(new DefaultPartition<>(o1)); Runnable r = ctx.events.remove(0); r.run(); partitioner.extraPartitions.clear(); @@ -745,7 +745,7 @@ public void testDefaultRepartitioning() partitions.add(new DefaultPartition(operator, p1Keys, 1, null)); } - ArrayList> lowLoadPartitions = new ArrayList>(); + ArrayList> lowLoadPartitions = new ArrayList<>(); for (Partition p : partitions) { lowLoadPartitions.add(new DefaultPartition<>(p.getPartitionedInstance(), p.getPartitionKeys(), -1, null)); } @@ -793,7 +793,7 @@ public void testDefaultRepartitioning() for (Set expectedKeys: expectedKeysSets) { List> clonePartitions = Lists.newArrayList(); for (PartitionKeys pks: twoBitPartitionKeys) { - Map, PartitionKeys> p1Keys = new HashMap, PartitionKeys>(); + Map, PartitionKeys> p1Keys = new HashMap<>(); p1Keys.put(operator.inport1, pks); int load = expectedKeys.contains(pks) ? 0 : -1; clonePartitions.add(new DefaultPartition(operator, p1Keys, load, null)); @@ -876,7 +876,7 @@ public void testInline() Assert.assertEquals("operators container 0", 1, plan.getContainers().get(0).getOperators().size()); Set c2ExpNodes = Sets.newHashSet(dag.getMeta(o2), dag.getMeta(o3)); - Set c2ActNodes = new HashSet(); + Set c2ActNodes = new HashSet<>(); PTContainer c2 = plan.getContainers().get(1); for (PTOperator pNode : c2.getOperators()) { c2ActNodes.add(pNode.getOperatorMeta()); @@ -1139,7 +1139,7 @@ public void testMxNPartitioning() LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); - TestPartitioner o1Partitioner = new TestPartitioner(); + TestPartitioner o1Partitioner = new TestPartitioner<>(); o1Partitioner.setPartitionCount(2); dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, o1Partitioner); dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); @@ -1312,7 +1312,7 @@ public void testMxNPartitioning() } Assert.assertEquals("repartition event", 1, ctx.events.size()); - o1Partitioner.extraPartitions.add(new DefaultPartition(o1)); + o1Partitioner.extraPartitions.add(new DefaultPartition<>(o1)); ctx.events.remove(0).run(); o1Partitioner.extraPartitions.clear(); @@ -1607,7 +1607,7 @@ public Collection> definePartitions(Collection> partit } T paritionable = first.getPartitionedInstance(); for (int i = partitions.size(); i < numTotal; ++i) { - newPartitions.add(new DefaultPartition(paritionable)); + newPartitions.add(new DefaultPartition<>(paritionable)); } return newPartitions; } @@ -1767,7 +1767,7 @@ public void testSingleFinalCascadingUnifier() List o1Unifiers = plan.getMergeOperators(o1Meta); Assert.assertEquals("o1Unifiers " + o1Meta, 3, o1Unifiers.size()); // 2 cascadingUnifiers and one-downstream partition unifier - List finalUnifiers = new ArrayList(); + List finalUnifiers = new ArrayList<>(); for (PTOperator o : o1Unifiers) { Assert.assertEquals("inputs " + o, 2, o.getInputs().size()); Assert.assertEquals("outputs " + o, 1, o.getOutputs().size()); @@ -2054,7 +2054,7 @@ public Collection> definePartitions(Collection> partit if (context.getParallelPartitionCount() > 0 && newPartitions.size() < context.getParallelPartitionCount()) { // parallel partitioned, fill to requested count for (int i = newPartitions.size(); i < context.getParallelPartitionCount(); i++) { - newPartitions.add(new DefaultPartition(partitions.iterator().next().getPartitionedInstance())); + newPartitions.add(new DefaultPartition<>(partitions.iterator().next().getPartitionedInstance())); } } return newPartitions; @@ -2215,9 +2215,9 @@ public void testMxNPartitionForSlidingWindow() GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4); dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); - dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner(2)); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2)); dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 1024); - dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner(2)); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2)); dag.setOperatorAttribute(o2, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); dag.setOperatorAttribute(o2, OperatorContext.APPLICATION_WINDOW_COUNT, 4); @@ -2237,7 +2237,7 @@ public void testParallelPartitionForSlidingWindow() GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); - dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner(2)); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2)); dag.setInputPortAttribute(o2.inport1, PortContext.PARTITION_PARALLEL, true); dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4); diff --git a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java index 77334db9c5..2cb3e589cb 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/BufferServerSubscriberTest.java @@ -41,7 +41,7 @@ public class BufferServerSubscriberTest @Test public void testEmergencySinks() throws InterruptedException { - final List list = new ArrayList(); + final List list = new ArrayList<>(); final StreamCodec myserde = new StreamCodec() { @Override diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java index e094d44f89..a48717623a 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/FastPublisherTest.java @@ -50,7 +50,7 @@ public void testSerialization() throws Exception byte[] buffer = publisher.consume(); FastSubscriber subscriber = new FastSubscriber("subscriber", 1024); - subscriber.serde = subscriber.statefulSerde = new DefaultStatefulStreamCodec(); + subscriber.serde = subscriber.statefulSerde = new DefaultStatefulStreamCodec<>(); SweepableReservoir sr = subscriber.acquireReservoir("res", 1024); sr.setSink(new Sink() { diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java index fd67121272..32e5095c86 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java @@ -94,7 +94,7 @@ public static void tearDown() throws IOException @SuppressWarnings({"SleepWhileInLoop"}) public void testBufferServerStream() throws Exception { - final StreamCodec serde = new DefaultStatefulStreamCodec(); + final StreamCodec serde = new DefaultStatefulStreamCodec<>(); final AtomicInteger messageCount = new AtomicInteger(); Sink sink = new Sink() { diff --git a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java index ed145c7dd0..9db6fe97a0 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/InlineStreamTest.java @@ -55,13 +55,13 @@ public void test() throws Exception { final int totalTupleCount = 5000; - final PassThroughNode operator1 = new PassThroughNode(); + final PassThroughNode operator1 = new PassThroughNode<>(); final GenericNode node1 = new GenericNode(operator1, new OperatorContext(1, "operator1", new DefaultAttributeMap(), null)); node1.setId(1); operator1.setup(node1.context); - final PassThroughNode operator2 = new PassThroughNode(); + final PassThroughNode operator2 = new PassThroughNode<>(); final GenericNode node2 = new GenericNode(operator2, new OperatorContext(2, "operator2", new DefaultAttributeMap(), null)); node2.setId(2); @@ -115,7 +115,7 @@ public int getCount(boolean reset) AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input", 1024 * 5); node1.connectInputPort("input", reservoir1); - Map> activeNodes = new ConcurrentHashMap>(); + Map> activeNodes = new ConcurrentHashMap<>(); launchNodeThread(node1, activeNodes); launchNodeThread(node2, activeNodes); stream.activate(streamContext); @@ -206,7 +206,7 @@ public void process(T tuple) } }; - public final DefaultOutputPort output = new DefaultOutputPort(); + public final DefaultOutputPort output = new DefaultOutputPort<>(); private boolean logMessages = false; public boolean isLogMessages() diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java index f0748ebdea..287a7964d0 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java @@ -47,7 +47,7 @@ public OiOEndWindowTest() public static class TestInputOperator extends BaseOperator implements InputOperator { - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); @Override public void emitTuples() @@ -60,7 +60,7 @@ public void emitTuples() public static class FirstGenericOperator extends BaseOperator { public static long endwindowCount; - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); public final transient DefaultInputPort input = new DefaultInputPort() { @Override diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java index e614750837..26e913bfd7 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java @@ -298,7 +298,7 @@ public void validateNegativeOiOiOExtendeddiamond() public static class ThreadIdValidatingInputOperator implements InputOperator { - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); public static long threadId; @Override @@ -416,12 +416,12 @@ public void teardown() assert (threadList.contains(Thread.currentThread().getId())); } - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); } public static class ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts extends ThreadIdValidatingGenericIntermediateOperator { - public final transient DefaultOutputPort output2 = new DefaultOutputPort(); + public final transient DefaultOutputPort output2 = new DefaultOutputPort<>(); } public static class ThreadIdValidatingGenericOperatorWithTwoInputPorts implements Operator @@ -621,9 +621,9 @@ public void validateOiOiOTreeImplementation() throws Exception slc.run(); Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size()); - Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet(ThreadIdValidatingGenericIntermediateOperator.threadList)).size()); + Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size()); Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size()); - Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet(ThreadIdValidatingOutputOperator.threadList)).size()); + Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size()); Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId)); Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId)); @@ -640,9 +640,9 @@ public void validateOiOiOTreeImplementation() throws Exception slc.run(); Assert.assertEquals("OIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, ThreadIdValidatingGenericIntermediateOperator.threadList.size()); - Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet(ThreadIdValidatingGenericIntermediateOperator.threadList)).size()); + Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, (new HashSet<>(ThreadIdValidatingGenericIntermediateOperator.threadList)).size()); Assert.assertEquals("OIO: Number of threads ThreadIdValidatingOutputOperator", 4, ThreadIdValidatingOutputOperator.threadList.size()); - Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet(ThreadIdValidatingOutputOperator.threadList)).size()); + Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, (new HashSet<>(ThreadIdValidatingOutputOperator.threadList)).size()); Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingOutputOperator", ThreadIdValidatingOutputOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId)); Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", ThreadIdValidatingGenericIntermediateOperator.threadList.contains(ThreadIdValidatingInputOperator.threadId)); } diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java index 38460ea5c8..15de1773a1 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java @@ -149,7 +149,7 @@ public void testBufferServerStreamWithLateActivationForSubscriber() throws Excep @Before public void init() { - final StreamCodec serde = new DefaultStatefulStreamCodec(); + final StreamCodec serde = new DefaultStatefulStreamCodec<>(); messageCount = new AtomicInteger(0); Sink sink = new Sink() diff --git a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java index 902b8b6b4d..af821ea1ec 100644 --- a/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java +++ b/engine/src/test/java/com/datatorrent/stram/support/ManualScheduledExecutorService.java @@ -37,7 +37,7 @@ class TimedRunnable public long time; } - PriorityQueue queue = new PriorityQueue(16, new Comparator() + PriorityQueue queue = new PriorityQueue<>(16, new Comparator() { @Override public int compare(TimedRunnable o1, TimedRunnable o2) diff --git a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java index 0326b6a25a..7399aff8d7 100644 --- a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java +++ b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java @@ -322,7 +322,7 @@ public URI toURI() public static class TestHomeDirectory extends TestWatcher { - Map env = new HashMap(); + Map env = new HashMap<>(); String userHome; @Override @@ -444,7 +444,7 @@ public boolean equals(Object obj) private static final long serialVersionUID = 201404091805L; } - transient HashMap store = new HashMap(); + transient HashMap store = new HashMap<>(); @Override public synchronized void save(Object object, int operatorId, long windowId) throws IOException @@ -467,7 +467,7 @@ public synchronized void delete(int operatorId, long windowId) throws IOExceptio @Override public synchronized long[] getWindowIds(int operatorId) throws IOException { - ArrayList windowIds = new ArrayList(); + ArrayList windowIds = new ArrayList<>(); for (OperatorWindowIdPair key : store.keySet()) { if (key.operatorId == operatorId) { windowIds.add(key.windowId); diff --git a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java index 5bc70dcc1d..9d12fdc00b 100644 --- a/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java +++ b/engine/src/test/java/com/datatorrent/stram/util/StablePriorityQueueTest.java @@ -65,7 +65,7 @@ public void tearDown() @Test public void testElement() { - StablePriorityQueue instance = new StablePriorityQueue(1); + StablePriorityQueue instance = new StablePriorityQueue<>(1); Integer i = 10; instance.add(i); Object result = instance.element(); @@ -78,7 +78,7 @@ public void testElement() @Test public void testOffer() { - StablePriorityQueue instance = new StablePriorityQueue(1); + StablePriorityQueue instance = new StablePriorityQueue<>(1); Integer i = 10; assertTrue(instance.offer(i)); Object result = instance.peek(); diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java index 9ac28c0a35..e1fb860e79 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java @@ -109,9 +109,9 @@ public void process(Map tuple) }; @OutputPortFieldAnnotation(optional = false, error = true) - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); - public final transient DefaultOutputPort output1 = new DefaultOutputPort(); + public final transient DefaultOutputPort output1 = new DefaultOutputPort<>(); @Override public String getName() @@ -417,7 +417,7 @@ public static String[] getClassFileInClasspath() { String classpath = System.getProperty("java.class.path"); String[] paths = classpath.split(":"); - List fnames = new LinkedList(); + List fnames = new LinkedList<>(); for (String cp : paths) { File f = new File(cp); if (!f.isDirectory()) { @@ -480,7 +480,7 @@ public void testFindDescendants() throws Exception @Test public void testValueSerialization() throws Exception { - TestOperator> bean = new TestOperator>(); + TestOperator> bean = new TestOperator<>(); bean.map.put("key1", new Structured()); bean.stringArray = new String[]{"one", "two", "three"}; bean.stringList = Lists.newArrayList("four", "five"); @@ -491,7 +491,7 @@ public void testValueSerialization() throws Exception bean.structuredArray[0].name = "s1"; bean.color = Color.BLUE; bean.booleanProp = true; - bean.nestedList = new LinkedList(); + bean.nestedList = new LinkedList<>(); Structured st = new Structured(); st.name = "nestedone"; st.size = 10; @@ -640,15 +640,15 @@ public static class TestOperator> extends BaseO private List nestedList; private Properties props; private Structured nested; - private Map map = new HashMap(); + private Map map = new HashMap<>(); private String[] stringArray; private Color color; private Structured[] structuredArray; private T[] genericArray; - private Map>> nestedParameterizedType = new HashMap>>(); + private Map>> nestedParameterizedType = new HashMap<>(); private Map wildcardType = new HashMap(); - private List listofIntArray = new LinkedList(); - private List parameterizedTypeVariable = new LinkedList(); + private List listofIntArray = new LinkedList<>(); + private List parameterizedTypeVariable = new LinkedList<>(); private Z genericType; private int[][] multiDimensionPrimitiveArray; private Structured[][] multiDimensionComplexArray; @@ -1056,7 +1056,7 @@ public void testArraySerialization() throws Exception @Test public void testLogicalPlanConfiguration() throws Exception { - TestOperator> bean = new InputTestOperator>(); + TestOperator> bean = new InputTestOperator<>(); bean.map.put("key1", new Structured()); bean.stringArray = new String[]{"one", "two", "three"}; bean.stringList = Lists.newArrayList("four", "five"); @@ -1113,12 +1113,12 @@ public void testLogicalPlanConfiguration() throws Exception public static class SchemaRequiredOperator extends BaseOperator implements InputOperator { @OutputPortFieldAnnotation(schemaRequired = true) - public final transient DefaultOutputPort output = new DefaultOutputPort(); + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); @OutputPortFieldAnnotation(schemaRequired = false) - public final transient DefaultOutputPort output1 = new DefaultOutputPort(); + public final transient DefaultOutputPort output1 = new DefaultOutputPort<>(); - public final transient DefaultOutputPort output2 = new DefaultOutputPort(); + public final transient DefaultOutputPort output2 = new DefaultOutputPort<>(); @Override public void emitTuples() diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java index 9f414dc14c..76fa963201 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java @@ -113,7 +113,7 @@ public FutureTask logicalPlanModification(final List lastRequests = requests; // delegate processing to dispatch thread - FutureTask future = new FutureTask(new Callable() + FutureTask future = new FutureTask<>(new Callable() { @Override public Object call() throws Exception @@ -351,7 +351,7 @@ public void testAttributes() throws Exception @Test public void testSubmitLogicalPlanChange() throws JSONException, Exception { - List requests = new ArrayList(); + List requests = new ArrayList<>(); WebResource r = resource(); CreateOperatorRequest request1 = new CreateOperatorRequest(); @@ -366,7 +366,7 @@ public void testSubmitLogicalPlanChange() throws JSONException, Exception requests.add(request2); ObjectMapper mapper = new ObjectMapper(); - final Map m = new HashMap(); + final Map m = new HashMap<>(); m.put("requests", requests); final JSONObject jsonRequest = new JSONObject(mapper.writeValueAsString(m)); diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java index 09b8785262..8674e9f348 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/TypeDiscoveryTest.java @@ -87,10 +87,10 @@ public void process(T1 tuple) { } }; - final OutputPort outportT2 = new DefaultOutputPort(); - final OutputPort outportNumberParam = new DefaultOutputPort(); + final OutputPort outportT2 = new DefaultOutputPort<>(); + final OutputPort outportNumberParam = new DefaultOutputPort<>(); final StringOutputPort outportString = new StringOutputPort(this); - final OutputPort> outportList = new DefaultOutputPort>(); + final OutputPort> outportList = new DefaultOutputPort<>(); final GenericSubClassOutputPort outClassObject = new GenericSubClassOutputPort(this); } @@ -107,8 +107,8 @@ public void process(String tuple) { } }; - final OutputPort> outportT2 = new DefaultOutputPort>(); - final OutputPort outportNumberParam = new DefaultOutputPort(); + final OutputPort> outportT2 = new DefaultOutputPort<>(); + final OutputPort outportNumberParam = new DefaultOutputPort<>(); final StringOutputPort outportString = new StringOutputPort(this); } @@ -163,7 +163,7 @@ public void testTypeDiscovery() throws Exception static class ParameterizedTypeOperator extends BaseOperator { - final OutputPort output = new DefaultOutputPort(); + final OutputPort output = new DefaultOutputPort<>(); } static class StringParameterOperator extends ParameterizedTypeOperator