Skip to content

Commit

Permalink
Fix: OsmPbfReader shuts down cleanly if bad input file
Browse files Browse the repository at this point in the history
  • Loading branch information
clarisma committed Jan 17, 2023
1 parent 2245dc3 commit 073ed64
Showing 1 changed file with 36 additions and 22 deletions.
58 changes: 36 additions & 22 deletions src/main/java/com/geodesk/io/osm/OsmPbfReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public class OsmPbfReader

protected static void log(String s)
{
Log.debug("%s: %s", Thread.currentThread().getName(), s);
// Log.debug("%s: %s", Thread.currentThread().getName(), s);
}

protected long fileSize()
Expand Down Expand Up @@ -177,13 +177,8 @@ public String name()
protected void fail(Throwable ex)
{
/*
synchronized (log)
{
// TODO: remove
log.error("Thread {} failed: {}",
Thread.currentThread().getName(), ex.getMessage());
ex.printStackTrace();
}
Log.debug("%s: Failing with exception: %s",
Thread.currentThread().getName(), ex.getMessage());
*/
error = ex;
inputThread.interrupt();
Expand Down Expand Up @@ -253,6 +248,10 @@ private class InputThread extends Thread
int headerLength = (headerLengthBuffer[0] << 24) |
(headerLengthBuffer[1] << 16) | (headerLengthBuffer[2] << 8) |
headerLengthBuffer[3];
if(headerLength < 8 || headerLength > 64 * 1024)
{
throw new PbfException("Invalid OSM-PBF header block.");
}
byte[] headerData = new byte[headerLength];
if (in.read(headerData) != headerLength)
{
Expand Down Expand Up @@ -282,7 +281,7 @@ private class InputThread extends Thread
{
block.data = new byte[dataLength];
}
catch(OutOfMemoryError ex)
catch(Throwable ex)
{
throw new RuntimeException(
String.format("Failed to allocate block data (%d bytes)", dataLength, ex));
Expand Down Expand Up @@ -388,14 +387,6 @@ protected int currentBlockLength()
*/
protected void header(HeaderData hd)
{
/*
Log.debug("Source: %s", hd.source);
Log.debug("Writing Program: %s", hd.writingProgram);
Log.debug("Required features:");
for(String s: hd.requiredFeatures) Log.debug("- %s", s);
Log.debug("Optional features:");
for(String s: hd.optionalFeatures) Log.debug("- %s", s);
*/
}

/**
Expand Down Expand Up @@ -513,10 +504,10 @@ private void switchPhase(int newPhase) throws InterruptedException
if(newPhase == currentPhase) return;

/*
log(String.format("Switching phase from %s to %s...",
phases[currentPhase].name(),
newPhase == PHASE_DONE ? "DONE" : phases[newPhase].name()));
*/
Log.debug("%s: Switching phase from %s to %s...",
Thread.currentThread().getName(), phases[currentPhase].name(),
newPhase == PHASE_DONE ? "DONE" : phases[newPhase].name());
*/

if(newPhase < currentPhase)
{
Expand Down Expand Up @@ -569,11 +560,15 @@ private void switchPhase(int newPhase) throws InterruptedException
}
try
{
// log("Waiting for other threads to finish phase " + phases[newPhase-1].name());
/*
Log.debug("%s: Waiting for other threads to finish phase %s",
Thread.currentThread().getName(), phases[newPhase-1].name());
*/
phases[newPhase-1].await();
}
catch (InterruptedException ex)
{
// Log.debug("%s interrupted", Thread.currentThread().getName());
throw ex;
}
currentPhase = newPhase;
Expand All @@ -594,6 +589,23 @@ private void switchPhase(int newPhase) throws InterruptedException
}
}

/**
* Marks all phases as complete, without invoking any processing (and
* without waiting for the other threads to complete their phases).
* Used to cleanly shut down in the event of an unrecoverable error.
*/
private void cancelPhases()
{
while(currentPhase < PHASE_DONE)
{
/*
Log.debug("%s: Cancelling phase %s",
Thread.currentThread().getName(), phases[currentPhase].name());
*/
phases[currentPhase++].countDown();
}
}

private byte[] inflate(byte[] data, int start, int len, int uncompressedLen) throws DataFormatException
{
Inflater unzip = new Inflater();
Expand Down Expand Up @@ -628,10 +640,12 @@ private byte[] inflate(byte[] data, int start, int len, int uncompressedLen) thr
}
catch (InterruptedException ex)
{
cancelPhases();
break; // stop processing
}
catch (Throwable ex)
{
cancelPhases();
fail(ex); // everything else is a real error
}
}
Expand Down

0 comments on commit 073ed64

Please sign in to comment.