Skip to content

Commit

Permalink
YARN-9040. Fixed memory leak in LevelDBCacheTimelineStore and DBItera…
Browse files Browse the repository at this point in the history
…tor.

            Contributed by Tarun Parimi
  • Loading branch information
macroadster committed Dec 17, 2018
1 parent 346c0c8 commit 71e0b0d
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 59 deletions.
Expand Up @@ -42,14 +42,14 @@
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;


import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID; import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
import static org.apache.hadoop.yarn.server.timeline.TimelineStoreMapAdapter.CloseableIterator;


/** /**
* Map based implementation of {@link TimelineStore}. A hash map * Map based implementation of {@link TimelineStore}. A hash map
Expand Down Expand Up @@ -114,66 +114,67 @@ public synchronized TimelineEntities getEntities(String entityType, Long limit,
fields = EnumSet.allOf(Field.class); fields = EnumSet.allOf(Field.class);
} }


Iterator<TimelineEntity> entityIterator = null; TimelineEntity firstEntity = null;
if (fromId != null) { if (fromId != null) {
TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId, firstEntity = entities.get(new EntityIdentifier(fromId,
entityType)); entityType));
if (firstEntity == null) { if (firstEntity == null) {
return new TimelineEntities(); return new TimelineEntities();
} else {
entityIterator = entities.valueSetIterator(firstEntity);
} }
} }
if (entityIterator == null) {
entityIterator = entities.valueSetIterator();
}


List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>(); List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
while (entityIterator.hasNext()) {
TimelineEntity entity = entityIterator.next(); try(CloseableIterator<TimelineEntity> entityIterator =
if (entitiesSelected.size() >= limit) { firstEntity == null ? entities.valueSetIterator() :
break; entities.valueSetIterator(firstEntity)) {
} while (entityIterator.hasNext()) {
if (!entity.getEntityType().equals(entityType)) { TimelineEntity entity = entityIterator.next();
continue; if (entitiesSelected.size() >= limit) {
} break;
if (entity.getStartTime() <= windowStart) {
continue;
}
if (entity.getStartTime() > windowEnd) {
continue;
}
if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
entity.getEntityId(), entity.getEntityType())) > fromTs) {
continue;
}
if (primaryFilter != null &&
!KeyValueBasedTimelineStoreUtils.matchPrimaryFilter(
entity.getPrimaryFilters(), primaryFilter)) {
continue;
}
if (secondaryFilters != null) { // AND logic
boolean flag = true;
for (NameValuePair secondaryFilter : secondaryFilters) {
if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils
.matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter)
&& !KeyValueBasedTimelineStoreUtils.matchFilter(
entity.getOtherInfo(), secondaryFilter)) {
flag = false;
break;
}
} }
if (!flag) { if (!entity.getEntityType().equals(entityType)) {
continue; continue;
} }
} if (entity.getStartTime() <= windowStart) {
if (entity.getDomainId() == null) { continue;
entity.setDomainId(DEFAULT_DOMAIN_ID); }
} if (entity.getStartTime() > windowEnd) {
if (checkAcl == null || checkAcl.check(entity)) { continue;
entitiesSelected.add(entity); }
if (fromTs != null && entityInsertTimes.get(
new EntityIdentifier(entity.getEntityId(), entity.getEntityType()))
> fromTs) {
continue;
}
if (primaryFilter != null && !KeyValueBasedTimelineStoreUtils
.matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
continue;
}
if (secondaryFilters != null) { // AND logic
boolean flag = true;
for (NameValuePair secondaryFilter : secondaryFilters) {
if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils
.matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter)
&& !KeyValueBasedTimelineStoreUtils
.matchFilter(entity.getOtherInfo(), secondaryFilter)) {
flag = false;
break;
}
}
if (!flag) {
continue;
}
}
if (entity.getDomainId() == null) {
entity.setDomainId(DEFAULT_DOMAIN_ID);
}
if (checkAcl == null || checkAcl.check(entity)) {
entitiesSelected.add(entity);
}
} }
} }

List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>(); List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
for (TimelineEntity entitySelected : entitiesSelected) { for (TimelineEntity entitySelected : entitiesSelected) {
entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields( entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields(
Expand Down Expand Up @@ -569,6 +570,7 @@ static Object compactNumber(Object o) {
} }
return o; return o;
} }

} }


} }
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;


import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
Expand Down Expand Up @@ -59,26 +60,54 @@ public void remove(K key) {
} }


@Override @Override
public Iterator<V> public CloseableIterator<V>
valueSetIterator() { valueSetIterator() {
return new TreeSet<>(internalMap.values()).iterator(); return wrapClosableIterator(new TreeSet<>(internalMap.values())
.iterator());
} }


@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Iterator<V> valueSetIterator(V minV) { public CloseableIterator<V> valueSetIterator(V minV) {
if (minV instanceof Comparable) { if (minV instanceof Comparable) {
TreeSet<V> tempTreeSet = new TreeSet<>(); TreeSet<V> tempTreeSet = new TreeSet<>();
for (V value : internalMap.values()) { for (V value : internalMap.values()) {
if (((Comparable) value).compareTo(minV) >= 0) { if (((Comparable) value).compareTo(minV) >= 0) {
tempTreeSet.add(value); tempTreeSet.add(value);
} }
} }
return tempTreeSet.iterator(); return wrapClosableIterator(tempTreeSet.iterator());
} else { } else {
return valueSetIterator(); return valueSetIterator();
} }
} }

private CloseableIterator<V> wrapClosableIterator(
final Iterator<V> iterator) {
return new CloseableIterator<V>() {
private final Iterator<V> internalIterator = iterator;
@Override
public void close() throws IOException {
// Not implemented
}

@Override
public boolean hasNext() {
return internalIterator.hasNext();
}

@Override
public V next() {
return internalIterator.next();
}

@Override
public void remove() {
internalIterator.remove();
}
};

}
} }


public MemoryTimelineStore() { public MemoryTimelineStore() {
Expand Down
Expand Up @@ -18,6 +18,7 @@


package org.apache.hadoop.yarn.server.timeline; package org.apache.hadoop.yarn.server.timeline;


import java.io.Closeable;
import java.util.Iterator; import java.util.Iterator;


/** /**
Expand Down Expand Up @@ -48,13 +49,17 @@ interface TimelineStoreMapAdapter<K, V> {
/** /**
* @return the iterator of the value set of the map * @return the iterator of the value set of the map
*/ */
Iterator<V> valueSetIterator(); CloseableIterator<V> valueSetIterator();


/** /**
* Return the iterator of the value set of the map, starting from minV if type * Return the iterator of the value set of the map, starting from minV if type
* V is comparable. * V is comparable.
* @param minV * @param minV
* @return * @return
*/ */
Iterator<V> valueSetIterator(V minV); CloseableIterator<V> valueSetIterator(V minV);

interface CloseableIterator<V> extends Iterator<V>, Closeable {}
} }


Expand Up @@ -37,7 +37,6 @@


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import java.util.Map; import java.util.Map;


/** /**
Expand Down Expand Up @@ -211,18 +210,18 @@ public void remove(K entityId) {
} }


@Override @Override
public Iterator<V> valueSetIterator() { public CloseableIterator<V> valueSetIterator() {
return getIterator(null, Long.MAX_VALUE); return getIterator(null, Long.MAX_VALUE);
} }


@Override @Override
public Iterator<V> valueSetIterator(V minV) { public CloseableIterator<V> valueSetIterator(V minV) {
return getIterator( return getIterator(
new EntityIdentifier(minV.getEntityId(), minV.getEntityType()), new EntityIdentifier(minV.getEntityId(), minV.getEntityType()),
minV.getStartTime()); minV.getStartTime());
} }


private Iterator<V> getIterator( private CloseableIterator<V> getIterator(
EntityIdentifier startId, long startTimeMax) { EntityIdentifier startId, long startTimeMax) {


final DBIterator internalDbIterator = entityDb.iterator(); final DBIterator internalDbIterator = entityDb.iterator();
Expand All @@ -247,7 +246,7 @@ private Iterator<V> getIterator(
= entityPrefixKeyBuilder.getBytesForLookup(); = entityPrefixKeyBuilder.getBytesForLookup();
internalDbIterator.seek(startPrefixBytes); internalDbIterator.seek(startPrefixBytes);


return new Iterator<V>() { return new CloseableIterator<V>() {
@Override @Override
public boolean hasNext() { public boolean hasNext() {
if (!internalDbIterator.hasNext()) { if (!internalDbIterator.hasNext()) {
Expand Down Expand Up @@ -284,6 +283,11 @@ public void remove() {
LOG.error("LevelDB map adapter does not support iterate-and-remove" LOG.error("LevelDB map adapter does not support iterate-and-remove"
+ " use cases. "); + " use cases. ");
} }

@Override
public void close() throws IOException {
internalDbIterator.close();
}
}; };
} }
static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
Expand Down

0 comments on commit 71e0b0d

Please sign in to comment.