Refactoring to use CollectionUtils.mapValues (#8059)
* doc updates and changes to use the CollectionUtils.mapValues utility method

* Add Structural Search patterns to IntelliJ

* refactoring from PR comments

* put -> putIfAbsent

* do single key lookup
Surekha authored and clintropolis committed Jul 18, 2019
1 parent 2d6d1c1 commit da16144
Showing 12 changed files with 78 additions and 63 deletions.
22 changes: 22 additions & 0 deletions .idea/inspectionProfiles/Druid.xml

Some generated files are not rendered by default.

23 changes: 23 additions & 0 deletions core/src/main/java/org/apache/druid/utils/CollectionUtils.java
@@ -21,10 +21,12 @@

import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import org.apache.druid.java.util.common.ISE;

import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
@@ -85,6 +87,8 @@ public static <E> TreeSet<E> newTreeSet(Comparator<? super E> comparator, Iterab
/**
* Returns a transformed map from the given input map where the value is modified based on the given valueMapper
* function.
* Unlike {@link Maps#transformValues}, which returns a lazy view of the source map, this method applies the
* mapping function eagerly to every entry and returns a new {@link HashMap}.
*/
public static <K, V, V2> Map<K, V2> mapValues(Map<K, V> map, Function<V, V2> valueMapper)
{
@@ -93,6 +97,25 @@ public static <K, V, V2> Map<K, V2> mapValues(Map<K, V> map, Function<V, V2> val
return result;
}

/**
* Returns a transformed map from the given input map where the key is modified based on the given keyMapper
* function. This method fails with an {@link ISE} if two keys collide after the keyMapper function is applied.
*
* @throws ISE if key collisions occur while applying the specified keyMapper
*/
public static <K, V, K2> Map<K2, V> mapKeys(Map<K, V> map, Function<K, K2> keyMapper)
{
final Map<K2, V> result = Maps.newHashMapWithExpectedSize(map.size());
map.forEach((k, v) -> {
final K2 k2 = keyMapper.apply(k);
if (result.putIfAbsent(k2, v) != null) {
throw new ISE("Conflicting key[%s] calculated via keyMapper for original key[%s]", k2, k);
}
});
return result;
}

private CollectionUtils()
{
}
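For context, here is a minimal, self-contained sketch of how the two helpers behave. It is not part of this commit, and the map contents and class name are made up for illustration.

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.utils.CollectionUtils;

public class CollectionUtilsExample
{
  public static void main(String[] args)
  {
    Map<String, Integer> segmentCounts = ImmutableMap.of("wikipedia", 3, "metrics", 7);

    // mapValues eagerly applies the function to every value and returns a new HashMap.
    Map<String, Integer> doubled = CollectionUtils.mapValues(segmentCounts, count -> count * 2);
    System.out.println(doubled); // {wikipedia=6, metrics=14} (iteration order may vary)

    // mapKeys eagerly applies the function to every key and returns a new HashMap.
    Map<String, Integer> upperCased = CollectionUtils.mapKeys(segmentCounts, String::toUpperCase);
    System.out.println(upperCased); // {WIKIPEDIA=3, METRICS=7} (iteration order may vary)

    // If the keyMapper collapses two distinct keys into one, mapKeys throws an ISE.
    try {
      CollectionUtils.mapKeys(segmentCounts, key -> "collision");
    }
    catch (ISE e) {
      System.out.println("collision detected: " + e.getMessage());
    }
  }
}

In the changes below, call sites that transformed maps via entrySet().stream().collect(Collectors.toMap(...)) are replaced with these helpers wherever the mapping depends only on the key or only on the value.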
@@ -39,6 +39,7 @@
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CircularBuffer;
import org.apache.druid.utils.CollectionUtils;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

@@ -54,7 +55,6 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Kafka indexing task runner supporting incremental segments publishing
@@ -163,12 +163,10 @@ private void possiblyResetOffsetsOrWait(
}

if (doReset) {
sendResetRequestAndWait(resetPartitions.entrySet()
.stream()
.collect(Collectors.toMap(x -> StreamPartition.of(
x.getKey().topic(),
x.getKey().partition()
), Map.Entry::getValue)), taskToolbox);
sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, streamPartition -> StreamPartition.of(
streamPartition.topic(),
streamPartition.partition()
)), taskToolbox);
} else {
log.warn("Retrying in %dms", task.getPollRetryMs());
pollRetryLock.lockInterruptibly();
@@ -266,6 +266,8 @@ protected List<SeekableStreamIndexTask<Integer, Long>> createIndexTasks(


@Override
// suppress use of CollectionUtils.mapValues() since the valueMapper function depends on the map key here
@SuppressWarnings("SSBasedInspection")
protected Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
{
return currentOffsets
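The suppression above is needed because the lag computation uses the key as well as the value; CollectionUtils.mapValues only sees values, so the stream-based Collectors.toMap form remains. Below is a rough, illustrative sketch of that shape; the class, field names, and offset numbers are invented, not the actual supervisor code.

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.stream.Collectors;

public class LagExample
{
  // Hypothetical "latest offsets seen on the stream", keyed by partition.
  private static final Map<Integer, Long> LATEST_OFFSETS = ImmutableMap.of(0, 100L, 1, 250L);

  public static Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
  {
    // The mapped value needs the key (to look up the latest offset for that partition),
    // so CollectionUtils.mapValues(currentOffsets, ...) would not have enough information.
    return currentOffsets
        .entrySet()
        .stream()
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            e -> LATEST_OFFSETS.getOrDefault(e.getKey(), 0L) - e.getValue()
        ));
  }

  public static void main(String[] args)
  {
    System.out.println(getLagPerPartition(ImmutableMap.of(0, 90L, 1, 200L))); // {0=10, 1=50}
  }
}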
@@ -45,6 +45,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.utils.CollectionUtils;

import java.util.ArrayList;
import java.util.Collection;
@@ -621,25 +622,15 @@ private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Lo

public Map<String, Long> getSuccessfulTaskCount()
{
Map<String, Long> total = totalSuccessfulTaskCount.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().get()
));
Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
prevTotalSuccessfulTaskCount = total;
return delta;
}

public Map<String, Long> getFailedTaskCount()
{
Map<String, Long> total = totalFailedTaskCount.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().get()
));
Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
prevTotalFailedTaskCount = total;
return delta;
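Both counter methods above follow the same snapshot-then-delta pattern. Here is a condensed, illustrative sketch of that pattern; the class name, onTaskSuccess helper, and the inlined delta computation are assumptions for illustration, not the actual supervisor implementation.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.druid.utils.CollectionUtils;

public class TaskCountExample
{
  // Running totals per datasource, incremented as tasks complete.
  private final ConcurrentHashMap<String, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>();
  private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>();

  public void onTaskSuccess(String dataSource)
  {
    totalSuccessfulTaskCount.computeIfAbsent(dataSource, k -> new AtomicLong()).incrementAndGet();
  }

  public synchronized Map<String, Long> getSuccessfulTaskCount()
  {
    // Snapshot the AtomicLong counters into plain Long values.
    Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
    // Report only the change since the previous snapshot.
    Map<String, Long> delta = total
        .entrySet()
        .stream()
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            e -> e.getValue() - prevTotalSuccessfulTaskCount.getOrDefault(e.getKey(), 0L)
        ));
    prevTotalSuccessfulTaskCount = total;
    return delta;
  }

  public static void main(String[] args)
  {
    TaskCountExample counts = new TaskCountExample();
    counts.onTaskSuccess("wikipedia");
    counts.onTaskSuccess("wikipedia");
    System.out.println(counts.getSuccessfulTaskCount()); // {wikipedia=2}
    counts.onTaskSuccess("wikipedia");
    System.out.println(counts.getSuccessfulTaskCount()); // {wikipedia=1}
  }
}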
@@ -80,6 +80,7 @@
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CircularBuffer;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
@@ -1274,8 +1275,10 @@ protected void sendResetRequestAndWait(
)
throws IOException
{
Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap = outOfRangePartitions
.entrySet().stream().collect(Collectors.toMap(x -> x.getKey().getPartitionId(), Map.Entry::getValue));
Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap = CollectionUtils.mapKeys(
outOfRangePartitions,
StreamPartition::getPartitionId
);

boolean result = taskToolbox
.getTaskActionClient()
@@ -35,13 +35,13 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.utils.CollectionUtils;

import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class ExpressionPostAggregator implements PostAggregator
{
@@ -187,12 +187,7 @@ public ExpressionPostAggregator decorate(final Map<String, AggregatorFactory> ag
expression,
ordering,
macroTable,
aggregators.entrySet().stream().collect(
Collectors.toMap(
entry -> entry.getKey(),
entry -> entry.getValue()::finalizeComputation
)
),
CollectionUtils.mapValues(aggregators, aggregatorFactory -> obj -> aggregatorFactory.finalizeComputation(obj)),
parsed,
dependentFields
);
@@ -43,6 +43,7 @@
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

@@ -689,14 +690,7 @@ WrappedCommitter wrapCommitter(final Committer committer)
)
)
),
snapshot.entrySet()
.stream()
.collect(
Collectors.toMap(
Entry::getKey,
e -> e.getValue().lastSegmentId
)
),
CollectionUtils.mapValues(snapshot, segmentsForSequence -> segmentsForSequence.lastSegmentId),
committer.getMetadata()
);

@@ -32,12 +32,11 @@
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.utils.CollectionUtils;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* This class is responsible for managing data sources and their states like timeline, total segment size, and number of
@@ -115,8 +114,7 @@ Map<String, DataSourceState> getDataSources()
*/
public Map<String, Long> getDataSourceSizes()
{
return dataSources.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getTotalSegmentSize()));
return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize);
}

/**
@@ -127,8 +125,7 @@ public Map<String, Long> getDataSourceSizes()
*/
public Map<String, Long> getDataSourceCounts()
{
return dataSources.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getNumSegments()));
return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments);
}

public boolean isSegmentCached(final DataSegment segment)
@@ -36,7 +36,6 @@
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
* Contains a representation of the current state of the cluster by tier.
@@ -69,16 +68,13 @@ private DruidCluster(
)
{
this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes);
this.historicals = historicals
.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
(Map.Entry<String, Iterable<ServerHolder>> e) ->
CollectionUtils.newTreeSet(Comparator.reverseOrder(), e.getValue())
)
);
this.historicals = CollectionUtils.mapValues(
historicals,
holders -> CollectionUtils.newTreeSet(
Comparator.reverseOrder(),
holders
)
);
}

public void add(ServerHolder serverHolder)
@@ -21,11 +21,11 @@

import com.google.common.base.Preconditions;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ClusterCostCache
{
@@ -82,10 +82,7 @@ public void removeServer(String serverName)
public ClusterCostCache build()
{
return new ClusterCostCache(
serversCostCache
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()))
CollectionUtils.mapValues(serversCostCache, ServerCostCache.Builder::build)
);
}
}
@@ -21,10 +21,10 @@

import com.google.common.base.Preconditions;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ServerCostCache
{
@@ -89,10 +89,7 @@ public ServerCostCache build()
{
return new ServerCostCache(
allSegmentsCostCache.build(),
segmentsPerDataSource
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()))
CollectionUtils.mapValues(segmentsPerDataSource, SegmentsCostCache.Builder::build)
);
}
}
