Permalink
Browse files

Cache uses linked lists instead of arrays + fixed concurrency issue

  • Loading branch information...
jnioche committed Mar 14, 2017
1 parent 83fa3e3 commit 9fefac8721d5e1e5bbfe762a785d8e53dbdc7e5f
@@ -21,6 +21,8 @@
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -58,7 +60,7 @@
**/
@SuppressWarnings("serial")
public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt implements
RemovalListener<String, Tuple[]>, BulkProcessor.Listener {
RemovalListener<String, List<Tuple>>, BulkProcessor.Listener {
private static final Logger LOG = LoggerFactory
.getLogger(StatusUpdaterBolt.class);
@@ -88,7 +90,7 @@
private ElasticSearchConnection connection;
private Cache<String, Tuple[]> waitAck;
private Cache<String, List<Tuple>> waitAck;
private MultiCountMetric eventCounter;
@@ -164,13 +166,20 @@ public void store(String url, Status status, Metadata metadata,
// without having been sent to ES
synchronized (waitAck) {
// check that the same URL is not being sent to ES
if (waitAck.getIfPresent(sha256hex) != null) {
List<Tuple> alreadySent = waitAck.getIfPresent(sha256hex);
if (alreadySent != null) {
// if this object is discovered - adding another version of it
// won't make any difference
LOG.debug(
"Already being sent to ES {} with status {} and ID {}",
url, status, sha256hex);
if (status.equals(Status.DISCOVERED)) {
// Done to prevent a concurrency issue: the ack method could be
// called after the entry for this ID has been purged from
// waitAck, which would lead to the tuple being added straight
// to waitAck even though nothing was sent to ES. Flag the
// metadata so that ack can detect that sending was skipped.
metadata.setValue("es.status.skipped.sending", "true");
return;
}
}
@@ -242,26 +251,30 @@ public void ack(Tuple t, String url) {
synchronized (waitAck) {
String sha256hex = org.apache.commons.codec.digest.DigestUtils
.sha256Hex(url);
Tuple[] tt = waitAck.getIfPresent(sha256hex);
List<Tuple> tt = waitAck.getIfPresent(sha256hex);
if (tt == null) {
tt = new Tuple[] { t };
} else {
Tuple[] tt2 = new Tuple[tt.length + 1];
System.arraycopy(tt, 0, tt2, 0, tt.length);
tt2[tt.length] = t;
tt = tt2;
// check that there has been no removal of the entry since
Metadata metadata = (Metadata) t.getValueByField("metadata");
if (metadata.getFirstValue("es.status.skipped.sending") != null) {
LOG.debug(
"Indexing skipped for {} with ID {} but key removed since",
url, sha256hex);
return;
}
tt = new LinkedList<>();
}
tt.add(t);
waitAck.put(sha256hex, tt);
LOG.debug("Added to waitAck {} with ID {} total {}", url,
sha256hex, tt.length);
sha256hex, tt.size());
}
}
public void onRemoval(RemovalNotification<String, Tuple[]> removal) {
public void onRemoval(RemovalNotification<String, List<Tuple>> removal) {
if (!removal.wasEvicted())
return;
LOG.error("Purged from waitAck {} with {} values", removal.getKey(),
removal.getValue().length);
removal.getValue().size());
for (Tuple t : removal.getValue()) {
_collector.fail(t);
}
@@ -283,9 +296,9 @@ public void afterBulk(long executionId, BulkRequest request,
BulkItemResponse bir = bulkitemiterator.next();
itemcount++;
String id = bir.getId();
Tuple[] xx = waitAck.getIfPresent(id);
List<Tuple> xx = waitAck.getIfPresent(id);
if (xx != null) {
LOG.debug("Acked {} tuple(s) for ID {}", xx.length, id);
LOG.debug("Acked {} tuple(s) for ID {}", xx.size(), id);
for (Tuple x : xx) {
acked++;
// ack and put in cache
@@ -322,9 +335,9 @@ public void afterBulk(long executionId, BulkRequest request,
while (itreq.hasNext()) {
DocWriteRequest bir = itreq.next();
String id = bir.id();
Tuple[] xx = waitAck.getIfPresent(id);
List<Tuple> xx = waitAck.getIfPresent(id);
if (xx != null) {
LOG.debug("Failed {} tuple(s) for ID {}", xx.length, id);
LOG.debug("Failed {} tuple(s) for ID {}", xx.size(), id);
for (Tuple x : xx) {
// fail it
_collector.fail(x);

0 comments on commit 9fefac8

Please sign in to comment.