Fix warnings
* unused variables, private methods
* unclosed AutoCloseables
* missing serialVersionUID
* non-static access to static method
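
Each bullet above maps to a standard Java fix, and the hunks below apply them across the webindex code. As a quick reference, here is a minimal, self-contained sketch (a hypothetical class, not code from this commit) showing the idiomatic shape of each fix: drop locals that are never read, put AutoCloseable resources in try-with-resources, declare a serialVersionUID on Serializable classes, and call static members through the class name.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Paths;

public class WarningFixes implements Serializable {

  // missing serialVersionUID: a Serializable class should declare one explicitly.
  private static final long serialVersionUID = 1L;

  // non-static access to static method: static members are invoked through the
  // class name (Files.newBufferedReader, Paths.get), not via an instance reference.
  static long countLines(String path) throws IOException {
    // unclosed AutoCloseable: try-with-resources guarantees the reader is closed,
    // even if an exception is thrown while reading.
    try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {
      return reader.lines().count();
    }
  }

  public static void main(String[] args) throws IOException {
    // unused variables: use the value directly instead of assigning it to a local
    // that is never read.
    if (args.length > 0) {
      System.out.println(countLines(args[0]) + " lines");
    }
  }
}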
ctubbsii committed Feb 18, 2016
1 parent 3ea920c commit 6282e87d9eb057eafd0b9004211f697c06dda094
Showing 8 changed files with 138 additions and 134 deletions.
DomainStats.java
@@ -16,12 +16,9 @@

public class DomainStats {

private String domain;
private Long total = (long) 0;

public DomainStats(String domain) {
this.domain = domain;
}
public DomainStats(String domain) {}

public Long getTotal() {
return total;
Copy.java
@@ -20,8 +20,6 @@
import java.net.URL;
import java.util.List;

import io.fluo.webindex.core.DataConfig;
import io.fluo.webindex.data.spark.IndexEnv;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -31,6 +29,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fluo.webindex.core.DataConfig;
import io.fluo.webindex.data.spark.IndexEnv;

public class Copy {

private static final Logger log = LoggerFactory.getLogger(Copy.class);
@@ -59,44 +60,47 @@ public static void main(String[] args) throws Exception {
DataConfig dataConfig = DataConfig.load();

SparkConf sparkConf = new SparkConf().setAppName("webindex-copy");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {

FileSystem hdfs = FileSystem.get(ctx.hadoopConfiguration());
Path destPath = new Path(args[2]);
if (!hdfs.exists(destPath)) {
hdfs.mkdirs(destPath);
}
FileSystem hdfs = FileSystem.get(ctx.hadoopConfiguration());
Path destPath = new Path(args[2]);
if (!hdfs.exists(destPath)) {
hdfs.mkdirs(destPath);
}

log.info("Copying {} files (Range {} of paths file {}) from AWS to HDFS {}", copyList.size(),
args[1], args[0], destPath.toString());

JavaRDD<String> copyRDD = ctx.parallelize(copyList, dataConfig.getNumExecutorInstances());

log.info("Copying {} files (Range {} of paths file {}) from AWS to HDFS {}", copyList.size(),
args[1], args[0], destPath.toString());

JavaRDD<String> copyRDD = ctx.parallelize(copyList, dataConfig.getNumExecutorInstances());

final String prefix = DataConfig.CC_URL_PREFIX;
final String destDir = destPath.toString();

copyRDD.foreachPartition(iter -> {
FileSystem fs = IndexEnv.getHDFS(hadoopConfDir);
iter.forEachRemaining(ccPath -> {
try {
Path dfsPath = new Path(destDir + "/" + getFilename(ccPath));
if (fs.exists(dfsPath)) {
log.error("File {} exists in HDFS and should have been previously filtered",
dfsPath.getName());
} else {
String urlToCopy = prefix + ccPath;
log.info("Starting copy of {} to {}", urlToCopy, destDir);
try (OutputStream out = fs.create(dfsPath);
BufferedInputStream in = new BufferedInputStream(new URL(urlToCopy).openStream())) {
IOUtils.copy(in, out);
}
log.info("Created {}", dfsPath.getName());
}
} catch (IOException e) {
log.error("Exception while copying {}", ccPath, e);
}
});
});
ctx.stop();
final String prefix = DataConfig.CC_URL_PREFIX;
final String destDir = destPath.toString();

copyRDD
.foreachPartition(iter -> {
FileSystem fs = IndexEnv.getHDFS(hadoopConfDir);
iter.forEachRemaining(ccPath -> {
try {
Path dfsPath = new Path(destDir + "/" + getFilename(ccPath));
if (fs.exists(dfsPath)) {
log.error("File {} exists in HDFS and should have been previously filtered",
dfsPath.getName());
} else {
String urlToCopy = prefix + ccPath;
log.info("Starting copy of {} to {}", urlToCopy, destDir);
try (OutputStream out = fs.create(dfsPath);
BufferedInputStream in =
new BufferedInputStream(new URL(urlToCopy).openStream())) {
IOUtils.copy(in, out);
}
log.info("Created {}", dfsPath.getName());
}
} catch (IOException e) {
log.error("Exception while copying {}", ccPath, e);
}
});
});
ctx.stop();
}
}
}
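
The main behavioral change in Copy.java above (and repeated in LoadHdfs.java and LoadS3.java below) is wrapping the JavaSparkContext in try-with-resources, so the unclosed-AutoCloseable warning goes away and the context is stopped even if the job fails. A minimal standalone sketch of that pattern, assuming a local master and a trivial job body in place of the real copy logic:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkCloseSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("sketch").setMaster("local[*]");
    // JavaSparkContext is Closeable, so leaving the try block stops the context
    // even when the body throws; the explicit ctx.stop() kept in the commit is
    // redundant after this change but harmless, since stop() is idempotent.
    try (JavaSparkContext ctx = new JavaSparkContext(conf)) {
      long count = ctx.parallelize(Arrays.asList(1, 2, 3)).count();
      System.out.println("processed " + count + " items");
      ctx.stop();
    }
  }
}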
LoadHdfs.java
@@ -19,15 +19,6 @@
import java.util.ArrayList;
import java.util.List;

import io.fluo.api.client.FluoClient;
import io.fluo.api.client.FluoFactory;
import io.fluo.api.client.LoaderExecutor;
import io.fluo.api.config.FluoConfiguration;
import io.fluo.webindex.core.DataConfig;
import io.fluo.webindex.core.models.Page;
import io.fluo.webindex.data.fluo.PageLoader;
import io.fluo.webindex.data.spark.IndexEnv;
import io.fluo.webindex.data.util.ArchiveUtil;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -42,6 +33,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fluo.api.client.FluoClient;
import io.fluo.api.client.FluoFactory;
import io.fluo.api.client.LoaderExecutor;
import io.fluo.api.config.FluoConfiguration;
import io.fluo.webindex.core.DataConfig;
import io.fluo.webindex.core.models.Page;
import io.fluo.webindex.data.fluo.PageLoader;
import io.fluo.webindex.data.spark.IndexEnv;
import io.fluo.webindex.data.util.ArchiveUtil;

public class LoadHdfs {

private static final Logger log = LoggerFactory.getLogger(LoadHdfs.class);
@@ -56,7 +57,7 @@ public static void main(String[] args) throws Exception {
IndexEnv.validateDataDir(dataDir);

final String hadoopConfDir = IndexEnv.getHadoopConfDir();
DataConfig dataConfig = DataConfig.load();
DataConfig.load();

List<String> loadPaths = new ArrayList<>();
FileSystem hdfs = IndexEnv.getHDFS();
@@ -71,37 +72,38 @@ public static void main(String[] args) throws Exception {
log.info("Loading {} files into Fluo from {}", loadPaths.size(), dataDir);

SparkConf sparkConf = new SparkConf().setAppName("webindex-load-hdfs");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {

JavaRDD<String> paths = ctx.parallelize(loadPaths, loadPaths.size());
JavaRDD<String> paths = ctx.parallelize(loadPaths, loadPaths.size());

paths.foreachPartition(iter -> {
final FluoConfiguration fluoConfig = new FluoConfiguration(new File("fluo.properties"));
FileSystem fs = IndexEnv.getHDFS(hadoopConfDir);
try (FluoClient client = FluoFactory.newClient(fluoConfig);
LoaderExecutor le = client.newLoaderExecutor()) {
iter.forEachRemaining(path -> {
Path filePath = new Path(path);
try {
if (fs.exists(filePath)) {
FSDataInputStream fsin = fs.open(filePath);
ArchiveReader reader = WARCReaderFactory.get(filePath.getName(), fsin, true);
for (ArchiveRecord record : reader) {
Page page = ArchiveUtil.buildPageIgnoreErrors(record);
if (page.getOutboundLinks().size() > 0) {
log.info("Loading page {} with {} links", page.getUrl(), page.getOutboundLinks()
.size());
le.execute(PageLoader.updatePage(page));
paths.foreachPartition(iter -> {
final FluoConfiguration fluoConfig = new FluoConfiguration(new File("fluo.properties"));
FileSystem fs = IndexEnv.getHDFS(hadoopConfDir);
try (FluoClient client = FluoFactory.newClient(fluoConfig);
LoaderExecutor le = client.newLoaderExecutor()) {
iter.forEachRemaining(path -> {
Path filePath = new Path(path);
try {
if (fs.exists(filePath)) {
FSDataInputStream fsin = fs.open(filePath);
ArchiveReader reader = WARCReaderFactory.get(filePath.getName(), fsin, true);
for (ArchiveRecord record : reader) {
Page page = ArchiveUtil.buildPageIgnoreErrors(record);
if (page.getOutboundLinks().size() > 0) {
log.info("Loading page {} with {} links", page.getUrl(), page
.getOutboundLinks().size());
le.execute(PageLoader.updatePage(page));
}
}
}
} catch (IOException e) {
log.error("Exception while processing {}", path, e);
}
} catch (IOException e) {
log.error("Exception while processing {}", path, e);
}
});
}
});
});
}
});

ctx.stop();
ctx.stop();
}
}
}
LoadS3.java
@@ -18,6 +18,15 @@
import java.net.URL;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveRecord;
import org.archive.io.warc.WARCReaderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fluo.api.client.FluoClient;
import io.fluo.api.client.FluoFactory;
import io.fluo.api.client.LoaderExecutor;
@@ -27,14 +36,6 @@
import io.fluo.webindex.data.fluo.PageLoader;
import io.fluo.webindex.data.spark.IndexEnv;
import io.fluo.webindex.data.util.ArchiveUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveRecord;
import org.archive.io.warc.WARCReaderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadS3 {

@@ -52,41 +53,42 @@ public static void main(String[] args) throws Exception {
System.exit(1);
}

DataConfig dataConfig = DataConfig.load();
DataConfig.load();

SparkConf sparkConf = new SparkConf().setAppName("webindex-load-s3");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {

log.info("Loading {} files (Range {} of paths file {}) from AWS", loadList.size(), args[1],
args[0]);
log.info("Loading {} files (Range {} of paths file {}) from AWS", loadList.size(), args[1],
args[0]);

JavaRDD<String> loadRDD = ctx.parallelize(loadList, loadList.size());
JavaRDD<String> loadRDD = ctx.parallelize(loadList, loadList.size());

final String prefix = DataConfig.CC_URL_PREFIX;
final String prefix = DataConfig.CC_URL_PREFIX;

loadRDD.foreachPartition(iter -> {
final FluoConfiguration fluoConfig = new FluoConfiguration(new File("fluo.properties"));
try (FluoClient client = FluoFactory.newClient(fluoConfig);
LoaderExecutor le = client.newLoaderExecutor()) {
iter.forEachRemaining(path -> {
String urlToCopy = prefix + path;
log.info("Loading {} to Fluo", urlToCopy);
try {
ArchiveReader reader = WARCReaderFactory.get(new URL(urlToCopy), 0);
for (ArchiveRecord record : reader) {
Page page = ArchiveUtil.buildPageIgnoreErrors(record);
if (page.getOutboundLinks().size() > 0) {
log.info("Loading page {} with {} links", page.getUrl(), page.getOutboundLinks()
.size());
le.execute(PageLoader.updatePage(page));
loadRDD.foreachPartition(iter -> {
final FluoConfiguration fluoConfig = new FluoConfiguration(new File("fluo.properties"));
try (FluoClient client = FluoFactory.newClient(fluoConfig);
LoaderExecutor le = client.newLoaderExecutor()) {
iter.forEachRemaining(path -> {
String urlToCopy = prefix + path;
log.info("Loading {} to Fluo", urlToCopy);
try {
ArchiveReader reader = WARCReaderFactory.get(new URL(urlToCopy), 0);
for (ArchiveRecord record : reader) {
Page page = ArchiveUtil.buildPageIgnoreErrors(record);
if (page.getOutboundLinks().size() > 0) {
log.info("Loading page {} with {} links", page.getUrl(), page.getOutboundLinks()
.size());
le.execute(PageLoader.updatePage(page));
}
}
} catch (Exception e) {
log.error("Exception while processing {}", path, e);
}
} catch (Exception e) {
log.error("Exception while processing {}", path, e);
}
});
}
});
ctx.stop();
});
}
});
ctx.stop();
}
}
}
IndexStats.java
@@ -23,6 +23,8 @@

public class IndexStats implements Serializable {

private static final long serialVersionUID = 1L;

private static final Logger log = LoggerFactory.getLogger(IndexUtil.class);

private Accumulator<Integer> numPages;
WebIndexApp.java
@@ -16,14 +16,15 @@

import java.io.File;

import org.apache.accumulo.core.client.Connector;

import io.dropwizard.Application;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import io.dropwizard.views.ViewBundle;
import io.fluo.api.config.FluoConfiguration;
import io.fluo.webindex.core.DataConfig;
import io.fluo.core.util.AccumuloUtil;
import org.apache.accumulo.core.client.Connector;
import io.fluo.webindex.core.DataConfig;

public class WebIndexApp extends Application<WebIndexConfig> {

@@ -44,7 +45,7 @@ public void initialize(Bootstrap<WebIndexConfig> bootstrap) {
@Override
public void run(WebIndexConfig config, Environment environment) {

DataConfig dataConfig = config.getDataConfig();
DataConfig dataConfig = WebIndexConfig.getDataConfig();
File fluoConfigFile = new File(dataConfig.getFluoPropsPath());
FluoConfiguration fluoConfig = new FluoConfiguration(fluoConfigFile);
