Changes in AcheToCdrExporter and S3Uploader
aecio committed Jun 30, 2017
1 parent bbdda6e commit c8928fc
Showing 2 changed files with 92 additions and 57 deletions.
117 changes: 75 additions & 42 deletions src/main/java/focusedCrawler/memex/cdr/AcheToCdrExporter.java
@@ -3,35 +3,37 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.zip.GZIPOutputStream;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import focusedCrawler.target.model.TargetModelJson;
import focusedCrawler.target.repository.FileSystemTargetRepository;
import focusedCrawler.target.repository.FileSystemTargetRepository.DataFormat;
import focusedCrawler.target.repository.FilesTargetRepository;
import focusedCrawler.tools.SimpleBulkIndexer;
import focusedCrawler.util.CliTool;
import focusedCrawler.util.persistence.PersistentHashtable;
import io.airlift.airline.Command;
import io.airlift.airline.Option;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.hash.Hasher;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

@Command(name="AcheToCdrExporter", description="Exports crawled data to CDR format")
public class AcheToCdrExporter extends CliTool {

@@ -99,11 +101,14 @@ public enum CDRVersion {
@Option(name={"--secretkey", "-sk"}, description="AWS SECRET KEY ID")
String secretKeyID = "";

@Option(name={"--bucket", "-bk"}, description="AWS S3 BUCKET NAME")
@Option(name = {"--bucket", "-bk"}, description = "AWS S3 BUCKET NAME")
String bucketName = "";

private HashMap<String, CDR31MediaObject> mediaObjectCache = new HashMap<String, CDR31MediaObject>(); // key: original url
private S3Uploader s3Uploader = new S3Uploader(); // Requires initialization
@Option(name = {"--tmp-path", "-tmp"}, description = "Path to temporary working folder")
String temp = null;

private PersistentHashtable<CDR31MediaObject> mediaObjectCache;
private S3Uploader s3Uploader;

//
// Runtime variables
Expand All @@ -125,15 +130,29 @@ public void execute() throws Exception {
System.out.println("Generating CDR file at: "+outputFile);
System.out.println(" Compressed repository: "+compressData);
System.out.println(" Hashed file name: "+hashFilename);

s3Uploader.init(this.accessKeyID, this.secretKeyID, this.bucketName);

if(outputFile != null) {
if (temp == null) {
Path tmpPath = Files.createTempDirectory("cdr-export-tmp");
Files.createDirectories(tmpPath);
temp = tmpPath.toString();
}

s3Uploader = new S3Uploader(this.accessKeyID, this.secretKeyID, this.bucketName);
mediaObjectCache =
new PersistentHashtable<CDR31MediaObject>(temp, 1000, CDR31MediaObject.class);

if (outputFile != null) {
GZIPOutputStream gzipStream = new GZIPOutputStream(new FileOutputStream(outputFile));
out = new PrintWriter(gzipStream, true);
}

if(elasticSearchServer != null) {
if (elasticSearchServer != null) {
if (this.outputIndex == null || this.outputIndex.isEmpty())
throw new IllegalArgumentException(
"Argument for Elasticsearch index can't be empty");
if (this.outputType == null || this.outputType.isEmpty())
throw new IllegalArgumentException(
"Argument for Elasticsearch type can't be empty");
bulkIndexer = new SimpleBulkIndexer(elasticSearchServer, userPass, bulkSize);
}

@@ -175,8 +194,6 @@ public void execute() throws Exception {
}
System.out.printf("Processed %d pages\n", processedPages);

//it.close();

if(out != null) out.close();
if(bulkIndexer!= null) bulkIndexer.close();

@@ -273,10 +290,10 @@ public void createCDR3DocumentJson(TargetModelJson pageModel) {

public void createCDR31MediaObject(TargetModelJson pageModel) throws IOException {
// Hash and upload to S3
String storedUrl = this.uploadMediaFile(pageModel.getContent());
System.out.println(storedUrl);
String storedUrl = this.uploadMediaFile(pageModel.getContent(), pageModel.getUrl());
System.out.println("Uploaded object: " + storedUrl);

// Creat Media Object for the image
// Create Media Object for the image
CDR31MediaObject obj = new CDR31MediaObject();
obj.setContentType(pageModel.getContentType());
obj.setTimestampCrawl(new Date(pageModel.getFetchTime()));
@@ -288,18 +305,18 @@ public void createCDR31MediaObject(TargetModelJson pageModel) throws IOException
this.mediaObjectCache.put(pageModel.getUrl(), obj);
}

public String uploadMediaFile(byte[] content) throws IOException {
HashFunction hf = Hashing.sha256();
Hasher hasher = hf.newHasher();
hasher.putBytes(content);
String hs = hasher.hash().toString();
String storedUrl = this.s3Uploader.upload(hs, content);
return storedUrl;
private String uploadMediaFile(byte[] content, String url) throws IOException {
HashFunction hf = Hashing.sha256();
Hasher hasher = hf.newHasher();
hasher.putBytes(content);
String host = new URL(url).getHost();
String hs = reverseDomain(host) + "/" + hasher.hash().toString();
this.s3Uploader.upload(hs, content);
return hs;
}

public String[] extractImgLinks(String html) {
HashSet<String> links = new HashSet();
HashSet<String> links = new HashSet<>();
Document doc = Jsoup.parse(html);
Elements media = doc.select("[src]");

@@ -312,15 +329,14 @@ public String[] extractImgLinks(String html) {
}

public void createCDR31DocumentJson(TargetModelJson pageModel) {
//HashMap<String, Object> crawlData = new HashMap<>();
//crawlData.put("response_headers", pageModel.getResponseHeaders());
List<CDR31MediaObject> mediaObjects = new ArrayList();
String[] imgLinks = extractImgLinks(pageModel.getContentAsString());
for (String link:imgLinks) {
if (this.mediaObjectCache.containsKey(link)) {
mediaObjects.add(this.mediaObjectCache.get(link));
}
}
List<CDR31MediaObject> mediaObjects = new ArrayList<>();
String[] imgLinks = extractImgLinks(pageModel.getContentAsString());
for (String link : imgLinks) {
CDR31MediaObject object = this.mediaObjectCache.get(link);
if (object != null) {
mediaObjects.add(object);
}
}

CDR31Document.Builder builder = new CDR31Document.Builder()
.setUrl(pageModel.getUrl())
Expand All @@ -337,5 +353,22 @@ public void createCDR31DocumentJson(TargetModelJson pageModel) {
this.id = doc.getId();
this.doc = doc;
}

private String reverseDomain(String domain) {
if(domain == null || domain.isEmpty()) {
return null;
}
String[] hostParts = domain.split("\\.");
if(hostParts.length == 0 ) {
return null;
}
StringBuilder reverseDomain = new StringBuilder();
reverseDomain.append(hostParts[hostParts.length-1]);
for (int i = hostParts.length-2; i >= 0; i--) {
reverseDomain.append('/');
reverseDomain.append(hostParts[i]);
}
return reverseDomain.toString();
}

}
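For reference, a minimal, self-contained sketch (not part of this commit) of the object-key scheme introduced above: the reversed host of the media URL becomes a prefix, and the SHA-256 of the content becomes the object name. The class name and sample values are hypothetical; only the Guava Hashing call and the reversal logic mirror the code shown in the diff.

import java.net.URL;
import java.nio.charset.StandardCharsets;

import com.google.common.hash.Hashing;

public class MediaKeyExample {

    public static void main(String[] args) throws Exception {
        // Hypothetical inputs: a media URL and its downloaded bytes
        String url = "http://images.example.com/photo.jpg";
        byte[] content = "fake image bytes".getBytes(StandardCharsets.UTF_8);

        String host = new URL(url).getHost();                 // images.example.com
        String prefix = reverseDomain(host);                  // com/example/images
        String sha256 = Hashing.sha256().hashBytes(content).toString();

        // Objects from the same site share a common prefix inside the S3 bucket
        String key = prefix + "/" + sha256;
        System.out.println(key);
    }

    // Same reversal logic as AcheToCdrExporter.reverseDomain(), repeated here so the
    // example compiles on its own
    private static String reverseDomain(String domain) {
        String[] parts = domain.split("\\.");
        StringBuilder sb = new StringBuilder(parts[parts.length - 1]);
        for (int i = parts.length - 2; i >= 0; i--) {
            sb.append('/').append(parts[i]);
        }
        return sb.toString();
    }
}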
32 changes: 17 additions & 15 deletions src/main/java/focusedCrawler/memex/cdr/S3Uploader.java
@@ -1,31 +1,31 @@
package focusedCrawler.memex.cdr;

import java.io.File;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ByteArrayInputStream;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;

public class S3Uploader {
private final AmazonS3 s3client = new AmazonS3Client(new ProfileCredentialsProvider());

private final AmazonS3 s3client;
private String bucketName = "";

public S3Uploader() {}

public void init(String access_key_id, String secret_key_id, String bucketName) {
public S3Uploader(String access_key_id, String secret_key_id, String bucketName) {
System.out.println("S3 Access Key: "+access_key_id);
System.out.println("S3 Secret Key ID: "+secret_key_id);
System.out.println("S3 Bucket Name: "+bucketName);
BasicAWSCredentials awsCreds = new BasicAWSCredentials(access_key_id, secret_key_id);
AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
this.s3client = AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.withRegion("us-east-1")
.build();
this.bucketName = bucketName;
System.out.println("Initializing S3 Uploader");
@@ -35,8 +35,10 @@ public void init(String access_key_id, String secret_key_id, String bucketName)
public String upload(String keyName, byte[] content) throws IOException {
try {
InputStream is = new ByteArrayInputStream(content);
s3client.putObject(this.bucketName, keyName, is, new ObjectMetadata());

ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(content.length);
PutObjectRequest put = new PutObjectRequest(this.bucketName, keyName, is, metadata);
s3client.putObject(put);
} catch (AmazonServiceException ase) {
System.out.println("Caught an AmazonServiceException, which " +
"means your request made it " +
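A short usage sketch for the reworked S3Uploader, which now receives its credentials and bucket through the constructor rather than a separate init() call. The key, credentials, bucket name, and file path below are placeholders, not values from the commit.

import java.nio.file.Files;
import java.nio.file.Paths;

import com.google.common.hash.Hashing;

import focusedCrawler.memex.cdr.S3Uploader;

public class S3UploadExample {

    public static void main(String[] args) throws Exception {
        // Placeholder content; in the exporter this is the crawled media payload
        byte[] image = Files.readAllBytes(Paths.get("photo.jpg"));

        // Key scheme matching the exporter: reversed-domain prefix + SHA-256 of the content
        String key = "com/example/images/" + Hashing.sha256().hashBytes(image).toString();

        // Placeholder credentials and bucket, passed directly to the constructor
        S3Uploader uploader = new S3Uploader("ACCESS_KEY_ID", "SECRET_KEY_ID", "my-cdr-bucket");
        uploader.upload(key, image);
    }
}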
