Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARCRecord entered inconsistent state for some ARC files #40

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion src/main/java/org/archive/io/arc/ARCReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ protected ARCRecord createArchiveRecord(InputStream is, long offset)
}
return (ARCRecord)getCurrentRecord();
}

/**
* Returns version of this ARC file. Usually read from first record of ARC.
* If we're reading without having first read the first record -- e.g.
Expand Down
20 changes: 12 additions & 8 deletions src/main/java/org/archive/io/arc/ARCRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public class ARCRecord extends ArchiveRecord implements ARCConstants {
public String getHeaderString() {
return this.headerString;
}

/**
* Constructor.
*
Expand Down Expand Up @@ -233,15 +233,15 @@ public ARCRecord(InputStream in, final String identifier,
this(in, identifier, offset, digest, strict, parseHttpHeaders,
false, null);
}

private ArchiveRecordHeader parseHeaders(final InputStream in,
final String identifier, final long offset, final boolean strict,
final boolean isAlignedOnFirstRecord, String version)
throws IOException {

ArrayList<String> firstLineValues = new ArrayList<String>(20);
getTokenizedHeaderLine(in, firstLineValues);

int bodyOffset = 0;
if (offset == 0 && isAlignedOnFirstRecord) {
// If offset is zero and we were aligned at first record on
Expand Down Expand Up @@ -343,7 +343,7 @@ private int getTokenizedHeaderLine(final InputStream stream,

// save verbatim header String
this.headerString = StringUtils.join(list," ");

return read;
}

Expand Down Expand Up @@ -569,7 +569,6 @@ private InputStream readHttpHeader() throws IOException {
getHeader().getLength() <= MIN_HTTP_HEADER_LENGTH) {
return null;
}

String statusLine;
byte[] statusBytes;
int eolCharCount = 0;
Expand Down Expand Up @@ -597,14 +596,19 @@ private InputStream readHttpHeader() throws IOException {

// If it's actually the status line, break, otherwise continue skipping any
// previous header values
if (!statusLine.contains(":") && StatusLine.startsWithHTTP(statusLine)) {
// Old code contained {@code !statusLine.contains(":")}, which conflicts with RFC2616-sec6
if (StatusLine.startsWithHTTP(statusLine)) {
break;
}


if (statusLine.replace("\r", "").isEmpty()) { // No more header lines
break;
}

// Add bytes read to error "offset" to add to position
errOffset += statusBytes.length;
}

if (errOffset > 0) {
this.incrementPosition(errOffset);
}
Expand Down
222 changes: 222 additions & 0 deletions src/test/java/org/archive/io/SubInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* $Header: $
* $Revision$
* $Date$
*
* ====================================================================
*
* Copyright 1999-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
/*
*
*/
package org.archive.io;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/**
* Encapsulates another stream, keeping track of local as well as global offsets.
* The SubStream has a max size and close() ensures that the encapsulated Stream
* is fast-forwarded to that point.
* </p><p>
* Note: Calling close on the SubInputStream does not close the wrapped stream.
* Note 2: This implementation relies on the wrapped InputStream not to return 0
* in {@link java.io.InputStream#available()} until that stream has been depleted.
*/
public class SubInputStream extends InputStream {
private final InputStream inner;
private final long globalPosOrigo;

private long length;
private long mark = -1;
private long pos = 0;

/**
* Wraps the inner Stream with no max on bytes read. Reduces functionality to tracking position.
* </p><p>
* Note: The length can be specified later with {@link #setLength(long)}.
* @param inner data source.
*/
public SubInputStream(InputStream inner) {
this(inner, Long.MAX_VALUE, 0);
}
/**
* Wraps the inner InputStream with a max on bytes read.
* @param inner data source.
* @param length the number of bytes that at a maximum can be read from inner.
*/
public SubInputStream(InputStream inner, long length) {
this(inner, length, 0);
}
/**
* Wraps the inner InputStream with a max on bytes read.
* @param inner data source.
* @param length the number of bytes that at a maximum can be read from inner.
* @param globalPosition the position in the inner stream.
*/
public SubInputStream(InputStream inner, long length, long globalPosition) {
this.inner = inner;
globalPosOrigo = globalPosition;
this.length = length;
}

/**
* @return the position from the virtual stream.
*/
public long getPosition() {
return pos;
}

/**
* @return the position in the wrapped stream, if the starting point was stated during construction.
*/
public long getGlobalPosition() {
return globalPosOrigo + pos;
}

public void setLength(long length) {
if (length <= pos) {
throw new IllegalStateException(
"The position is " + pos + " which is past the allowed virtual length " + length);
}
this.length = length;
}

public long getLength() {
return length;
}

/**
* Reads to the next '\n' and returns the line as an UTF-8 string, excluding trailing
* carriage returns {@code '\r'} and newlines {@code '\n'}.
* @return the next line or null if EOF.
* @throws IOException if there was a problem reading bytes from inner.
*/
public String readLine() throws IOException {
byte[] bytes = readLineBytes();
if (bytes == null) {
return null;
}
int length = bytes.length;
while (length > 0 && (bytes[length-1] == '\n' || bytes[length-1] == '\r')) {
length--;
}
return new String(bytes, 0, length, "utf-8");
}
/**
* Reads to the next '\n' and returns the line as raw bytes, including the delimiting '\n'.
* @return the next line.
* @throws IOException if there was a problem reading bytes from inner.
*/
public byte[] readLineBytes() throws IOException {
ByteArrayOutputStream by = new ByteArrayOutputStream();
int b;
while ((b = read()) != -1) {
by.write(b);
if (b == '\n') {
break;
}
}
return by.size() == 0 && b == -1 ? null : by.toByteArray();
}

/* Delegates from inner InputStream */

@Override
public int read() throws IOException {
if (available() == 0) {
return -1;
}
int c = inner.read();
if (c != -1) {
pos++;
}
return c;
}

@Override
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (len == 0) {
return 0;
}
if (available() == 0) {
return -1;
}
len = Math.min(len, available());
int r = inner.read(b, off, len);
if (r != -1) { // EOF
pos += r;
}
return r;
}

@Override
public long skip(long n) throws IOException {
n = Math.min(n, available());
if (n <= 0) {
return 0;
}
long s = inner.skip(n);
pos += s;
return s;
}

@Override
public int available() throws IOException {
return (int) Math.min(inner.available(), length - pos);
}

@SuppressWarnings("ResultOfMethodCallIgnored")
@Override
public void close() throws IOException {
if (pos < length) {
skip(length - pos);
}
}

@Override
public void mark(int readlimit) {
mark = pos;
inner.mark(readlimit);
}

@Override
public void reset() throws IOException {
if (mark == -1) {
throw new IOException("A mark must be set before reset is called");
}
inner.reset();
pos = mark;
}

@Override
public boolean markSupported() {
return inner.markSupported();
}
}
72 changes: 66 additions & 6 deletions src/test/java/org/archive/io/arc/ARCReaderFactoryTest.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package org.archive.io.arc;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.io.*;
import java.net.URL;
import java.util.List;

import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveRecord;

import junit.framework.TestCase;
import org.archive.io.SubInputStream;

/**
*
Expand All @@ -21,6 +20,9 @@
public class ARCReaderFactoryTest extends TestCase {

private File testfile1 = new File("src/test/resources/org/archive/format/arc/IAH-20080430204825-00000-blackbook-truncated.arc");
//private File testfile_nl = new File("src/test/resources/org/archive/format/arc/137542-153-20111129020925-00316-kb-prod-har-003.kb.dk_truncated.arc");
private File testfile_nl = getResource(
"org/archive/format/arc/137542-153-20111129020925-00316-kb-prod-har-003.kb.dk_truncated.arc");

/**
* Test reading uncompressed arcfile for issue
Expand Down Expand Up @@ -53,5 +55,63 @@ private void offsetResourceTest( File testfile, long offset, String uri ) throws
if( raf != null )
raf.close();
}


public void testBaseSampleARC() throws IOException {
testARCReaderIteration(testfile1, 9, 7);
}
/*
This failed with the old http-header parsing code in {@code ARCRecord#readHttpHeader}.
*/
public void testNewlinedSampleARC() throws IOException {
testARCReaderIteration(testfile_nl, 4, 3); // Status has 2*200 & 1*404
}

// Independent of the ARCReader code
public void testBaseSampleIntegrity() throws IOException {
List<String> urls = ARCTestHelper.getURLs(testfile1);
assertEquals("The correct number of URLs should be extracted", 9, urls.size());
}
public void testVerifyNewlinedSampleIntegrity() throws IOException {
List<String> urls = ARCTestHelper.getURLs(testfile_nl);
assertEquals("The correct number of URLs should be extracted", 4, urls.size());
}

public void testNewlinedSampleARCContentLength() throws IOException {
ARCTestHelper.testARCContentLength(testfile_nl);
}
public void testBaseSampleARCContentLength() throws IOException {
ARCTestHelper.testARCContentLength(testfile1);
}
// public void testLocalSampleARCContentLength() throws IOException {
// ARCTestHelper.testARCContentLength(
// new File("/home/te/tmp/warc/137542-153-20111129020925-00316-kb-prod-har-003.kb.dk.arc"));
// }

// Uncomment println for manual inspection of first content line
private void testARCReaderIteration(File arc, int expectedRecords, int hasStatus) throws IOException {
ARCReader reader = ARCReaderFactory.get(arc);
int recordCount = 0;
int okCount = 0;
for (ArchiveRecord record : reader) {
if (((ARCRecord)record).getStatusCode() != -1) {
okCount++;
}
SubInputStream sub = new SubInputStream(record);
sub.skip(record.getHeader().getContentBegin());
//System.out.println(record.getPosition() + "> " + sub.readLine());
sub.close();
recordCount++;
}
reader.close();
assertEquals("There should be the right number of records in " + arc, expectedRecords, recordCount);
assertEquals("There should be the right number of status 200 records in " + arc, hasStatus, okCount);
}

private static File getResource(String resource) {
URL url = Thread.currentThread().getContextClassLoader().getResource(resource);
if (url == null) {
throw new RuntimeException("The resource '" + resource + "' could not be located in the class path");
}
return new File(url.getFile());
}
}
Loading