Skip to content

Commit

Permalink
Improve message streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
julienlancelot committed Apr 1, 2015
1 parent aea924c commit 2d67b38
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 77 deletions.
Expand Up @@ -52,7 +52,9 @@ private void recursivelyProcessComponent(ComputationContext context, int compone
BatchReport.Component component = reportReader.readComponent(componentRef);
if (component.getType().equals(Constants.ComponentType.FILE)) {
Iterable<BatchReport.Coverage> coverageList = reportReader.readFileCoverage(componentRef);
processCoverage(component, coverageList);
if (coverageList != null) {
processCoverage(component, coverageList);
}
}

for (Integer childRef : component.getChildRefList()) {
Expand Down
Expand Up @@ -70,7 +70,7 @@ public void compute_nothing() throws Exception {

step.execute(new ComputationContext(new BatchReportReader(reportDir), mock(ComponentDto.class)));

assertThat(step.getFileSourceData().getLinesList()).isEmpty();
assertThat(step.getFileSourceData()).isNull();
}

@Test
Expand Down
Expand Up @@ -24,8 +24,6 @@
import com.google.protobuf.Parser;

import java.io.*;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class ProtobufUtil {
private ProtobufUtil() {
Expand All @@ -40,53 +38,20 @@ public static <T extends Message> T readFile(File file, Parser<T> parser) {
}
}

public static <M extends Message> Iterable<M> readFileMessages(final File file, final Parser<M> parser) {
return new Iterable<M>() {
@Override
public Iterator<M> iterator() {
try {
return new Iterator<M>() {
final InputStream inputStream = new BufferedInputStream(new FileInputStream(file));

private M currentMessage;

@Override
public boolean hasNext() {
if (currentMessage == null) {
try {
currentMessage = parser.parseDelimitedFrom(inputStream);
if (currentMessage == null) {
inputStream.close();
}
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Failed to read input stream", e);
} catch (IOException e) {
throw new IllegalStateException("Failed to close input stream", e);
}
}
return currentMessage != null;
}

@Override
public M next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
M messageToReturn = currentMessage;
currentMessage = null;
return messageToReturn;
}
static <T extends Message> T readInputStream(InputStream inputStream, Parser<T> parser) {
try {
return parser.parseDelimitedFrom(inputStream);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Failed to read input stream", e);
}
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
} catch (FileNotFoundException e) {
throw new IllegalStateException("Unable to find file " + file, e);
}
}
};
static InputStream createInputStream(File file) {
try {
return new BufferedInputStream(new FileInputStream(file));
} catch (FileNotFoundException e) {
throw new IllegalStateException("Unable to find file " + file, e);
}
}

public static void writeToFile(Message message, File toFile) {
Expand Down
@@ -0,0 +1,102 @@
/*
* SonarQube, open source software quality management tool.
* Copyright (C) 2008-2014 SonarSource
* mailto:contact AT sonarsource DOT com
*
* SonarQube is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* SonarQube is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.batch.protocol;

import com.google.protobuf.Message;
import com.google.protobuf.Parser;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* An object to iterate over protobuf messages in a file.
* A ReportStream is opened upon creation and is closed by invoking the close method.
*
* Warning, while it extends Iterable, it is not a general-purpose Iterable as it supports only a single Iterator;
* invoking the iterator method to obtain a second or subsequent iterator throws IllegalStateException.
*
* Inspired by {@link java.nio.file.DirectoryStream}
*/
public class ReportStream<R extends Message> implements Closeable, Iterable<R> {

private final Parser<R> parser;
private InputStream inputStream;
private ReportIterator<R> iterator;

public ReportStream(File file, Parser<R> parser) {
this.parser = parser;
this.inputStream = ProtobufUtil.createInputStream(file);
}

@Override
public Iterator<R> iterator() {
if (this.iterator != null) {
throw new IllegalStateException("Iterator already obtained");
} else {
this.iterator = new ReportIterator<>(inputStream, parser);
return this.iterator;
}
}

@Override
public void close() throws IOException {
inputStream.close();
}

public static class ReportIterator<R extends Message> implements Iterator<R> {

private final Parser<R> parser;
private InputStream inputStream;
private R currentMessage;

public ReportIterator(InputStream inputStream, Parser<R> parser) {
this.inputStream = inputStream;
this.parser = parser;
}

@Override
public boolean hasNext() {
if (currentMessage == null) {
currentMessage = ProtobufUtil.readInputStream(inputStream, parser);
}
return currentMessage != null;
}

@Override
public R next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
R messageToReturn = currentMessage;
currentMessage = null;
return messageToReturn;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}
Expand Up @@ -20,6 +20,7 @@
package org.sonar.batch.protocol.output;

import org.sonar.batch.protocol.ProtobufUtil;
import org.sonar.batch.protocol.ReportStream;
import org.sonar.batch.protocol.output.BatchReport.Issues;

import javax.annotation.CheckForNull;
Expand Down Expand Up @@ -120,12 +121,13 @@ public List<BatchReport.SyntaxHighlighting.HighlightingRule> readComponentSyntax
return Collections.emptyList();
}

public Iterable<BatchReport.Coverage> readFileCoverage(int fileRef) {
@CheckForNull
public ReportStream<BatchReport.Coverage> readFileCoverage(int fileRef) {
File file = fileStructure.fileFor(FileStructure.Domain.COVERAGE, fileRef);
if (doesFileExists(file)) {
return ProtobufUtil.readFileMessages(file, BatchReport.Coverage.PARSER);
return new ReportStream<>(file, BatchReport.Coverage.PARSER);
}
return Collections.emptyList();
return null;
}

private boolean doesFileExists(File file) {
Expand Down
Expand Up @@ -47,7 +47,7 @@ public static enum Domain {

private final File dir;

FileStructure(File dir) {
public FileStructure(File dir) {
if (!dir.exists() || !dir.isDirectory()) {
throw new IllegalArgumentException("Directory of analysis report does not exist: " + dir);
}
Expand Down
@@ -0,0 +1,104 @@
/*
* SonarQube, open source software quality management tool.
* Copyright (C) 2008-2014 SonarSource
* mailto:contact AT sonarsource DOT com
*
* SonarQube is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* SonarQube is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.batch.protocol;

import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.sonar.batch.protocol.output.BatchReport;
import org.sonar.batch.protocol.output.BatchReportWriter;
import org.sonar.batch.protocol.output.FileStructure;

import java.io.File;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

import static org.assertj.core.api.Assertions.assertThat;

public class ReportStreamTest {

@Rule
public TemporaryFolder temp = new TemporaryFolder();

File file;

ReportStream<BatchReport.Coverage> sut;

@Before
public void setUp() throws Exception {
File dir = temp.newFolder();
BatchReportWriter writer = new BatchReportWriter(dir);

writer.writeFileCoverage(1, Arrays.asList(
BatchReport.Coverage.newBuilder()
.setLine(1)
.build()
));

file = new FileStructure(dir).fileFor(FileStructure.Domain.COVERAGE, 1);
}

@After
public void tearDown() throws Exception {
IOUtils.closeQuietly(sut);
}

@Test
public void read_report() throws Exception {
sut = new ReportStream<>(file, BatchReport.Coverage.PARSER);
assertThat(sut).hasSize(1);
sut.close();
}

@Test(expected = IllegalStateException.class)
public void fail_to_get_iterator_twice() throws Exception {
sut = new ReportStream<>(file, BatchReport.Coverage.PARSER);
sut.iterator();

// Fail !
sut.iterator();
}

@Test(expected = NoSuchElementException.class)
public void fail_to_get_next_when_no_next() throws Exception {
sut = new ReportStream<>(file, BatchReport.Coverage.PARSER);
Iterator<BatchReport.Coverage> iterator = sut.iterator();
// Get first element
iterator.next();

// Fail !
iterator.next();
}

@Test(expected = UnsupportedOperationException.class)
public void fail_to_remove() throws Exception {
sut = new ReportStream<>(file, BatchReport.Coverage.PARSER);
Iterator<BatchReport.Coverage> iterator = sut.iterator();

// Fail !
iterator.remove();
}

}
Expand Up @@ -79,8 +79,8 @@ public void empty_list_if_no_highlighting_found() throws Exception {
}

@Test
public void empty_list_if_no_coverage_found() throws Exception {
assertThat(sut.readFileCoverage(123)).isEmpty();
public void return_null_if_no_coverage_found() throws Exception {
assertThat(sut.readFileCoverage(123)).isNull();
}

/**
Expand Down

0 comments on commit 2d67b38

Please sign in to comment.