Convert to use parent pom
Fix newly enforced findbugs and modernizer rules
Disable jar sealing due to issues with maven-shade-plugin and ITs
Mitigate some checkstyle issues, but leave it disabled
(checkstyle was not previously running on this project, and some of its
 indent rules conflict with the formatter-maven-plugin for lambdas)
ctubbsii committed Feb 26, 2016
1 parent c997183 commit df2ba6ae7b2a40ac9488fffd9d3a562120d0cfe9
Showing 13 changed files with 75 additions and 252 deletions.
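
The hunks below are mostly mechanical fixes for the newly enforced rules: never-reassigned public static fields become final (findbugs MS_SHOULD_BE_FINAL), boxing constructors such as new Long(0) give way to Long.valueOf(0) (modernizer), lowercase long-literal suffixes (0l) become 0L, and braceless if bodies gain braces. A minimal before/after sketch of the same patterns, using hypothetical names rather than code from this repo:

public class RuleFixExample {

  // findbugs MS_SHOULD_BE_FINAL: a public static field that is never
  // reassigned should be final (compare WI_EXECUTOR_INSTANCES and URI_MAP_ID).
  public static final String EXAMPLE_ID = "example";

  public Long outboundOrZero(Long numOutbound) {
    // style: single-statement if bodies get braces (compare getReverseHost);
    // modernizer: Long.valueOf replaces the new Long(...) boxing constructor.
    if (numOutbound == null) {
      return Long.valueOf(0L);
    }
    return numOutbound;
  }
}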
@@ -22,7 +22,7 @@
 public class DataConfig {
 
   public static String CC_URL_PREFIX = "https://aws-publicdatasets.s3.amazonaws.com/";
-  public static String WI_EXECUTOR_INSTANCES = "WI_EXECUTOR_INSTANCES";
+  public static final String WI_EXECUTOR_INSTANCES = "WI_EXECUTOR_INSTANCES";
 
   public String fluoHome;
   public String hadoopConfDir;
@@ -31,7 +31,7 @@ public class URL implements Serializable {
   private static final String HTTP_PROTO = "http://";
   private static final String HTTPS_PROTO = "https://";
   private static final String PAGE_ID_SEP = ">";
-  public static InetAddressValidator validator = InetAddressValidator.getInstance();
+  public static final InetAddressValidator validator = InetAddressValidator.getInstance();
 
   private static final long serialVersionUID = 1L;

@@ -169,8 +169,9 @@ public String getHost() {
   }
 
   public String getReverseHost() {
-    if (hasIPHost())
+    if (hasIPHost()) {
       return host;
+    }
     return reverseHost(host);
   }

@@ -26,13 +26,13 @@ public void testBasic() {
     Page page = new Page(URLTest.from("http://example.com").toPageID());
     Assert.assertEquals("http://example.com/", page.getUrl());
     Assert.assertEquals("com.example>>o>/", page.getPageID());
-    Assert.assertEquals(new Long(0), page.getNumOutbound());
+    Assert.assertEquals(Long.valueOf(0), page.getNumOutbound());
     Assert.assertTrue(page.addOutbound(Link.of(URLTest.from("http://test1.com"), "test1")));
-    Assert.assertEquals(new Long(1), page.getNumOutbound());
+    Assert.assertEquals(Long.valueOf(1), page.getNumOutbound());
     Assert.assertTrue(page.addOutbound(Link.of(URLTest.from("http://test2.com"), "test2")));
-    Assert.assertEquals(new Long(2), page.getNumOutbound());
+    Assert.assertEquals(Long.valueOf(2), page.getNumOutbound());
     Assert.assertFalse(page.addOutbound(Link.of(URLTest.from("http://test2.com"), "test1234")));
-    Assert.assertEquals(new Long(2), page.getNumOutbound());
+    Assert.assertEquals(Long.valueOf(2), page.getNumOutbound());
 
     Gson gson = new Gson();
     String json = gson.toJson(page);
@@ -19,7 +19,6 @@
 import io.fluo.api.data.Bytes;
 import io.fluo.api.data.RowColumn;
 import io.fluo.webindex.core.models.Page;
-import io.fluo.webindex.core.models.URL;
 import io.fluo.webindex.data.fluo.UriMap.UriInfo;
 import io.fluo.webindex.data.spark.IndexEnv;
 import io.fluo.webindex.data.spark.IndexStats;
@@ -45,32 +45,32 @@ public static void main(String[] args) throws Exception {
       System.exit(1);
     }
 
-    DataConfig dataConfig = DataConfig.load();
+    DataConfig.load();
 
     SparkConf sparkConf = new SparkConf().setAppName("webindex-test-parser");
-    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
+    try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
 
-    log.info("Parsing {} files (Range {} of paths file {}) from AWS", loadList.size(), args[1],
-        args[0]);
+      log.info("Parsing {} files (Range {} of paths file {}) from AWS", loadList.size(), args[1],
+          args[0]);
 
-    JavaRDD<String> loadRDD = ctx.parallelize(loadList, loadList.size());
+      JavaRDD<String> loadRDD = ctx.parallelize(loadList, loadList.size());
 
-    final String prefix = DataConfig.CC_URL_PREFIX;
+      final String prefix = DataConfig.CC_URL_PREFIX;
 
-    loadRDD.foreachPartition(iter -> {
-      iter.forEachRemaining(path -> {
-        String urlToCopy = prefix + path;
-        log.info("Parsing {}", urlToCopy);
-        try {
-          ArchiveReader reader = WARCReaderFactory.get(new URL(urlToCopy), 0);
-          for (ArchiveRecord record : reader) {
-            ArchiveUtil.buildPageIgnoreErrors(record);
-          }
-        } catch (Exception e) {
-          log.error("Exception while processing {}", path, e);
-        }
-      });
-    });
-    ctx.stop();
+      loadRDD.foreachPartition(iter -> {
+        iter.forEachRemaining(path -> {
+          String urlToCopy = prefix + path;
+          log.info("Parsing {}", urlToCopy);
+          try {
+            ArchiveReader reader = WARCReaderFactory.get(new URL(urlToCopy), 0);
+            for (ArchiveRecord record : reader) {
+              ArchiveUtil.buildPageIgnoreErrors(record);
+            }
+          } catch (Exception e) {
+            log.error("Exception while processing {}", path, e);
+          }
+        });
+      });
+    }
   }
 }
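
The hunk above wraps JavaSparkContext in try-with-resources so the context is released even when parsing throws, which also makes the trailing ctx.stop() unnecessary. A minimal sketch of the pattern, assuming a Spark version whose JavaSparkContext implements java.io.Closeable (close() stops the context); the app name and data are illustrative:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class TryWithResourcesSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("sketch").setMaster("local");
    try (JavaSparkContext ctx = new JavaSparkContext(conf)) {
      // Any exception thrown here still closes (and stops) the context.
      long count = ctx.parallelize(Arrays.asList(1, 2, 3)).count();
      System.out.println(count);
    } // ctx.close() runs here
  }
}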
@@ -34,7 +34,7 @@ public DomainExport(Optional<Long> oldCount, Optional<Long> newCount) {

   @Override
   protected Map<RowColumn, Bytes> generateData(String domain, Optional<Long> count) {
-    if (count.orElse(0l) == 0) {
+    if (count.orElse(0L) == 0) {
       return Collections.emptyMap();
     }
     return Collections.singletonMap(new RowColumn("d:" + domain, PAGECOUNT_COL),
@@ -15,6 +15,7 @@
 package io.fluo.webindex.data.fluo;
 
 import java.net.MalformedURLException;
+import java.util.Objects;
 
 import com.google.common.base.Preconditions;
 import com.google.gson.Gson;
@@ -46,7 +47,7 @@ public static PageLoader updatePage(Page page) {
   }
 
   public static PageLoader deletePage(URL url) throws MalformedURLException {
-    Preconditions.checkNotNull(url, "Url cannot be null");
+    Objects.requireNonNull(url, "Url cannot be null");
     PageLoader update = new PageLoader();
     update.action = Action.DELETE;
     update.delUrl = url;
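
The swap above is behavior-preserving: java.util.Objects.requireNonNull, like Guava's Preconditions.checkNotNull, throws a NullPointerException carrying the supplied message, but it needs no third-party dependency. A standalone sketch with a hypothetical method name:

import java.util.Objects;

public class NullCheckSketch {
  static String normalize(String url) {
    // Throws NullPointerException("Url cannot be null") when url is null,
    // matching what Preconditions.checkNotNull(url, "Url cannot be null") did.
    Objects.requireNonNull(url, "Url cannot be null");
    return url.trim();
  }

  public static void main(String[] args) {
    System.out.println(normalize(" http://example.com "));
  }
}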
@@ -40,7 +40,7 @@
  */
 public class UriMap {
 
-  public static String URI_MAP_ID = "um";
+  public static final String URI_MAP_ID = "um";
 
   public static class UriInfo implements Serializable {

@@ -82,6 +82,11 @@ public boolean equals(Object o) {
       return false;
     }
 
+    @Override
+    public int hashCode() {
+      return docs + (int) linksTo;
+    }
+
     public static UriInfo merge(UriInfo u1, UriInfo u2) {
       UriInfo total = new UriInfo(0, 0);
       total.add(u1);
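
The hashCode override added above pairs with the existing equals to satisfy the contract findbugs enforces (HE_EQUALS_NO_HASHCODE): values that compare equal must hash equal, or hash-based collections silently misbehave. A hedged illustration with a stand-in class; UriInfo's real field types and constructor-argument order are assumptions here:

import java.util.HashSet;
import java.util.Set;

public class UriInfoLike {
  int docs;
  long linksTo;

  UriInfoLike(int docs, long linksTo) {
    this.docs = docs;
    this.linksTo = linksTo;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof UriInfoLike && ((UriInfoLike) o).docs == docs
        && ((UriInfoLike) o).linksTo == linksTo;
  }

  // Same shape as the override above; without it, the lookup below may
  // probe the wrong bucket and print false for an equal value.
  @Override
  public int hashCode() {
    return docs + (int) linksTo;
  }

  public static void main(String[] args) {
    Set<UriInfoLike> set = new HashSet<>();
    set.add(new UriInfoLike(2, 3L));
    System.out.println(set.contains(new UriInfoLike(2, 3L))); // prints true
  }
}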
@@ -143,9 +148,9 @@ public void updatingValues(TransactionBase tx, Iterator<Update<String, UriInfo>>

       String pageDomain = URL.fromPageID(update.getKey()).getReverseDomain();
       if (oldVal.equals(UriInfo.ZERO) && !newVal.equals(UriInfo.ZERO)) {
-        domainUpdates.merge(pageDomain, 1l, (o, n) -> o + n);
+        domainUpdates.merge(pageDomain, 1L, (o, n) -> o + n);
       } else if (newVal.equals(UriInfo.ZERO) && !oldVal.equals(UriInfo.ZERO)) {
-        domainUpdates.merge(pageDomain, -1l, (o, n) -> o + n);
+        domainUpdates.merge(pageDomain, -1L, (o, n) -> o + n);
       }
     }

@@ -38,7 +38,6 @@
 import io.fluo.recipes.common.Pirtos;
 import io.fluo.webindex.core.DataConfig;
 import io.fluo.webindex.core.models.Page;
-import io.fluo.webindex.core.models.URL;
 import io.fluo.webindex.data.FluoApp;
 import io.fluo.webindex.data.fluo.PageObserver;
 import io.fluo.webindex.data.fluo.UriMap.UriInfo;
@@ -65,8 +64,8 @@ public class IndexEnv {

   private static final Logger log = LoggerFactory.getLogger(IndexEnv.class);
 
+  private final String accumuloTable;
   private Connector conn;
-  final private String accumuloTable;
   private FluoConfiguration fluoConfig;
   private FileSystem hdfs;
   private Path failuresDir;
@@ -126,6 +125,10 @@ private static FluoConfiguration getFluoConfig(DataConfig dataConfig) {
     return new FluoConfiguration(new File(dataConfig.getFluoPropsPath()));
   }
 
+  public FluoConfiguration getFluoConfig() {
+    return fluoConfig;
+  }
+
   private static SortedSet<Text> getSplits(String filename) {
     SortedSet<Text> splits = new TreeSet<>();
     InputStream is = IndexEnv.class.getClassLoader().getResourceAsStream("splits/" + filename);
@@ -151,6 +154,12 @@ public static FileSystem getHDFS() throws IOException {
     return getHDFS(getHadoopConfDir());
   }
 
+  public static FileSystem getHDFS(String hadoopConfDir) throws IOException {
+    Configuration config = new Configuration();
+    config.addResource(hadoopConfDir);
+    return FileSystem.get(config);
+  }
+
   public static void validateDataDir(String dataDir) {
     try {
       FileSystem hdfs = getHDFS();
@@ -173,12 +182,6 @@ public static void validateDataDir(String dataDir) {
     }
   }
 
-  public static FileSystem getHDFS(String hadoopConfDir) throws IOException {
-    Configuration config = new Configuration();
-    config.addResource(hadoopConfDir);
-    return FileSystem.get(config);
-  }
-
   public void initAccumuloIndexTable() {
     if (conn.tableOperations().exists(accumuloTable)) {
       try {
@@ -283,10 +286,6 @@ public Connector getAccumuloConnector() {
     return conn;
   }
 
-  public FluoConfiguration getFluoConfig() {
-    return fluoConfig;
-  }
-
   public static List<String> getPathsRange(String ccPaths, String range) {
     if (!(new File(ccPaths).exists())) {
       log.error("CC paths file {} does not exist", ccPaths);
@@ -17,13 +17,9 @@
 import com.google.common.net.HostSpecifier;
 import com.google.common.net.InternetDomainName;
 import io.fluo.webindex.core.models.URL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class DataUrl {
 
-  private static final Logger log = LoggerFactory.getLogger(DataUrl.class);
-
   public static String domainFromHost(String host) {
     return InternetDomainName.from(host).topPrivateDomain().name();
   }
@@ -12,7 +12,7 @@
  * the License.
  */
 
-package io.fluo.webindex.data.fluo;
+package io.fluo.webindex.data.fluo.it;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -42,6 +42,7 @@
 import io.fluo.webindex.core.models.URL;
 import io.fluo.webindex.data.FluoApp;
 import io.fluo.webindex.data.SparkTestUtil;
+import io.fluo.webindex.data.fluo.PageLoader;
 import io.fluo.webindex.data.fluo.UriMap.UriInfo;
 import io.fluo.webindex.data.spark.Hex;
 import io.fluo.webindex.data.spark.IndexEnv;
@@ -78,7 +79,7 @@ public class IndexIT {

   private static final Logger log = LoggerFactory.getLogger(IndexIT.class);
 
-  public static TemporaryFolder folder = new TemporaryFolder();
+  public static final TemporaryFolder folder = new TemporaryFolder();
   public static MiniAccumuloCluster cluster;
   private static MiniFluo miniFluo;
   private static final PasswordToken password = new PasswordToken("secret");
@@ -25,7 +25,6 @@
 import io.fluo.api.data.RowColumn;
 import io.fluo.webindex.core.models.Link;
 import io.fluo.webindex.core.models.Page;
-import io.fluo.webindex.core.models.URL;
 import io.fluo.webindex.data.FluoApp;
 import io.fluo.webindex.data.SparkTestUtil;
 import io.fluo.webindex.data.fluo.UriMap.UriInfo;
