Skip to content

Commit

Permalink
Fix #22471: NPE in MapillaryExportDownloadThread.loadingFinished
Browse files Browse the repository at this point in the history
This was due to killing old download threads as quickly as possible, to
avoid filling up the queue and making a user wait.

Signed-off-by: Taylor Smock <tsmock@meta.com>
  • Loading branch information
tsmock committed Oct 27, 2022
1 parent 5f8588b commit 7c5c79f
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,23 @@ public static void downloadPicture(INode img, MapillaryCache.Type type) {
* The listener that is going to receive the picture.
*/
public static void submit(INode image, MapillaryCache.Type type, ICachedLoaderListener lis) {
submit(image, type, true, lis);
}

/**
* Requests the picture with the given key and quality and uses the given
* listener.
*
* @param image
* The picture to be requested.
* @param lis
* The listener that is going to receive the picture.
*/
public static void submit(INode image, MapillaryCache.Type type, boolean removeCurrent, ICachedLoaderListener lis) {
try {
final MapillaryCache cache = new MapillaryCache(image, type);
if (cache.getUrl() != null) {
cache.submit(lis != null ? lis : IGNORE_DOWNLOAD, false);
cache.submit(lis != null ? lis : IGNORE_DOWNLOAD, false, removeCurrent);
} else {
Logging.error("Mapillary: {0} has no url. Maybe API limits have been reached?",
MapillaryImageUtils.getKey(image));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,27 @@ protected BufferedImageCacheEntry createCacheEntry(byte[] content) {
return new BufferedImageCacheEntry(content);
}

@Override
public void submit(ICachedLoaderListener listener, boolean force) throws IOException {
/**
* Submit a new task
*
* @param listener The listener to notify
* @param force {@code true} if the load should skip all caches
* @param removeCurrent {@code true} if any outstanding tasks should be canceled
* @throws IOException If something happens during fetch ore read
*/
public void submit(ICachedLoaderListener listener, boolean force, boolean removeCurrent) throws IOException {
// Clear the queue for larger images
if (this.type == Type.ORIGINAL || this.type == Type.THUMB_2048) {
if (removeCurrent && (this.type == Type.ORIGINAL || this.type == Type.THUMB_2048)) {
this.cancelOutstandingTasks();
}
super.submit(listener, force);
}

@Override
public void submit(ICachedLoaderListener listener, boolean force) throws IOException {
this.submit(listener, force, true);
}

@Override
protected boolean isObjectLoadable() {
if (this.cacheData == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import javax.imageio.ImageIO;

import org.openstreetmap.josm.data.cache.BufferedImageCacheEntry;
import org.openstreetmap.josm.data.cache.CacheEntry;
import org.openstreetmap.josm.data.cache.CacheEntryAttributes;
import org.openstreetmap.josm.data.cache.ICachedLoaderListener;
Expand All @@ -29,15 +31,19 @@
* @see MapillaryExportWriterThread
*/
public class MapillaryExportDownloadThread implements Runnable, ICachedLoaderListener {
private static final AtomicInteger THREAD_COUNT = new AtomicInteger();

private final ArrayBlockingQueue<BufferedImage> queue;
private final ArrayBlockingQueue<INode> queueImages;

private final INode image;
private final MapillaryExportWriterThread exportWriterThread;

/**
* Main constructor.
*
* @param exportWriterThread
* The thread to notify of failures ({@code queue}/{@code queueImages} should both be in the thread)
* @param image
* Image to be downloaded.
* @param queue
Expand All @@ -47,17 +53,30 @@ public class MapillaryExportDownloadThread implements Runnable, ICachedLoaderLis
* Queue of {@link INode} objects for the
* {@link MapillaryExportWriterThread}.
*/
public MapillaryExportDownloadThread(INode image, ArrayBlockingQueue<BufferedImage> queue,
ArrayBlockingQueue<INode> queueImages) {
public MapillaryExportDownloadThread(MapillaryExportWriterThread exportWriterThread, INode image,
ArrayBlockingQueue<BufferedImage> queue, ArrayBlockingQueue<INode> queueImages) {
this.queue = queue;
this.image = image;
this.queueImages = queueImages;
this.exportWriterThread = exportWriterThread;
}

@Override
public void run() {
if (MapillaryImageUtils.getKey(this.image) != 0) {
CacheUtils.submit(this.image, MapillaryCache.Type.ORIGINAL, this);
final int threadCount = MapillaryCache.THREAD_LIMIT.get();
synchronized (THREAD_COUNT) {
while (THREAD_COUNT.get() > threadCount - 1) {
try {
THREAD_COUNT.wait(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
THREAD_COUNT.incrementAndGet();
}
CacheUtils.submit(this.image, MapillaryCache.Type.ORIGINAL, false, this);
} else {
throw new UnsupportedOperationException(tr("We cannot export {0}",
image.getInterestingTags().entrySet().stream()
Expand All @@ -68,9 +87,26 @@ public void run() {

@Override
public synchronized void loadingFinished(CacheEntry data, CacheEntryAttributes attributes, LoadResult result) {
THREAD_COUNT.decrementAndGet();
synchronized (THREAD_COUNT) {
THREAD_COUNT.notifyAll();
}
if (result != LoadResult.SUCCESS) {
this.exportWriterThread.decrementSize();
return;
}
try {
this.queue.put(ImageIO.read(new ByteArrayInputStream(data.getContent())));
final BufferedImage bufferedImage;
if (data instanceof BufferedImageCacheEntry) {
bufferedImage = ((BufferedImageCacheEntry) data).getImage();
} else {
bufferedImage = ImageIO.read(new ByteArrayInputStream(data.getContent()));
}
this.queue.put(bufferedImage);
this.queueImages.put(this.image);
synchronized (this.queue) {
this.queue.notifyAll();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Logging.error(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.openstreetmap.josm.data.osm.INode;
import org.openstreetmap.josm.gui.PleaseWaitRunnable;
import org.openstreetmap.josm.gui.progress.swing.PleaseWaitProgressMonitor;
import org.openstreetmap.josm.plugins.mapillary.cache.MapillaryCache;
import org.openstreetmap.josm.plugins.mapillary.utils.MapillaryImageUtils;
import org.openstreetmap.josm.tools.Logging;

Expand All @@ -33,14 +34,14 @@
*/
public class MapillaryExportManager<T extends INode> extends PleaseWaitRunnable {

private final ArrayBlockingQueue<BufferedImage> queue = new ArrayBlockingQueue<>(10);
private final ArrayBlockingQueue<INode> queueImages = new ArrayBlockingQueue<>(10);
private final ArrayBlockingQueue<BufferedImage> queue = new ArrayBlockingQueue<>(MapillaryCache.THREAD_LIMIT.get());
private final ArrayBlockingQueue<INode> queueImages = new ArrayBlockingQueue<>(MapillaryCache.THREAD_LIMIT.get());

private final int amount;
private final Set<T> images;
private final String path;

private Thread writer;
private MapillaryExportWriterThread writer;
private ThreadPoolExecutor ex;

/**
Expand Down Expand Up @@ -77,25 +78,30 @@ protected void realRun() {
}
return;
}
ArrayBlockingQueue<Runnable> executionQueue = new ArrayBlockingQueue<>(10);
this.ex = new ThreadPoolExecutor(20, 35, 25, TimeUnit.SECONDS, executionQueue);
ArrayBlockingQueue<Runnable> executionQueue = new ArrayBlockingQueue<>(MapillaryCache.THREAD_LIMIT.get());
this.ex = new ThreadPoolExecutor(1, 1, 25, TimeUnit.SECONDS, executionQueue);
for (INode image : this.images) {
if (MapillaryImageUtils.isImage(image)) {
synchronized (this) {
while (this.ex.getQueue().remainingCapacity() == 0) {
try {
this.wait(1000);
this.wait(10);
} catch (InterruptedException e) {
Logging.error(e);
Thread.currentThread().interrupt();
return;
}
}
}
try {
this.ex.execute(new MapillaryExportDownloadThread(image, this.queue, this.queueImages));
this.ex
.execute(new MapillaryExportDownloadThread(this.writer, image, this.queue, this.queueImages));
} catch (RejectedExecutionException e) {
Logging.error(e);
}
} else {
// We need to ensure that the writer thread gets the number of "images" expected.
this.writer.decrementSize();
}
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ public class MapillaryExportWriterThread extends Thread {
private final String path;
private final ArrayBlockingQueue<BufferedImage> queue;
private final ArrayBlockingQueue<INode> queueImages;
private final int amount;
private final ProgressMonitor monitor;
private int amount;
private int written;

/**
* Main constructor.
Expand Down Expand Up @@ -74,6 +75,19 @@ public void run() {
INode mimg;
String finalPath;
for (int i = 0; i < this.amount; i++) {
while (this.queue.peek() == null) {
if (this.amount == this.written) {
return;
}
synchronized (this.queue) {
try {
this.queue.wait(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
try {
img = this.queue.take();
mimg = this.queueImages.take();
Expand Down Expand Up @@ -138,8 +152,12 @@ public void run() {
Logging.info("Unable to set last modified time for {0} to {1}", file,
MapillaryImageUtils.getDate(mimg));
}
this.written++;
} catch (InterruptedException e) {
Logging.info("Mapillary export cancelled");
if (this.written != this.amount) {
Logging.info("Mapillary export cancelled");
Logging.trace(e);
}
Thread.currentThread().interrupt();
return;
} catch (IOException | ImageReadException | ImageWriteException e) {
Expand All @@ -151,4 +169,16 @@ public void run() {
this.monitor.setCustomText("Downloaded " + (i + 1) + "/" + this.amount);
}
}

/**
* Called when the size is decreased
*/
public void decrementSize() {
this.amount = this.amount - 1;
if (this.amount == this.written) {
synchronized (this.queue) {
this.queue.notifyAll();
}
}
}
}

0 comments on commit 7c5c79f

Please sign in to comment.