Skip to content
This repository has been archived by the owner on May 20, 2021. It is now read-only.

S3 cse encryption and compression #160

Merged
merged 3 commits into from Feb 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/main/java/com/airbnb/airpal/AirpalConfiguration.java
Expand Up @@ -72,6 +72,11 @@ public class AirpalConfiguration extends Configuration
@JsonProperty
private String s3Bucket;

@Getter
@Setter
@JsonProperty
private String s3EncryptionMaterialsProvider;

@Getter
@Setter
@JsonProperty
Expand Down Expand Up @@ -107,4 +112,11 @@ public class AirpalConfiguration extends Configuration
@JsonProperty
@NotNull
private boolean useS3 = false;

@Getter
@Setter
@Valid
@JsonProperty
@NotNull
private boolean compressedOutput = false;
}
Expand Up @@ -10,9 +10,11 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.List;
import java.util.UUID;
import java.util.zip.GZIPOutputStream;

@Slf4j
public class CsvOutputBuilder implements JobOutputBuilder
Expand All @@ -34,13 +36,20 @@ public class CsvOutputBuilder implements JobOutputBuilder
@JsonIgnore
private final UUID jobUUID;

public CsvOutputBuilder(boolean includeHeader, UUID jobUUID, long maxFileSizeBytes) throws IOException {
public CsvOutputBuilder(boolean includeHeader, UUID jobUUID, long maxFileSizeBytes, boolean compressedOutput) throws IOException {
this.includeHeader = includeHeader;
this.jobUUID = jobUUID;
this.outputFile = File.createTempFile(jobUUID.toString(), FILE_SUFFIX);
this.maxFileSizeBytes = maxFileSizeBytes;
this.countingOutputStream = new CountingOutputStream(new FileOutputStream(this.outputFile));
this.csvWriter = new CSVWriter(new OutputStreamWriter(this.countingOutputStream));
OutputStreamWriter writer;
if (compressedOutput) {
writer = new OutputStreamWriter(new GZIPOutputStream(this.countingOutputStream));
}
else {
writer = new OutputStreamWriter(this.countingOutputStream);
}
this.csvWriter = new CSVWriter(writer);
}

@Override
Expand Down
Expand Up @@ -15,14 +15,15 @@
public class OutputBuilderFactory
{
private final long maxFileSizeBytes;
private final boolean isCompressedOutput;

public JobOutputBuilder forJob(Job job)
throws IOException, InvalidQueryException
{
PersistentJobOutput output = job.getOutput();
switch (output.getType()) {
case "csv":
return new CsvOutputBuilder(true, job.getUuid(), maxFileSizeBytes);
return new CsvOutputBuilder(true, job.getUuid(), maxFileSizeBytes, isCompressedOutput);
case "hive":
HiveTablePersistentOutput hiveOutput = (HiveTablePersistentOutput) output;
URI location = output.getLocation();
Expand Down
Expand Up @@ -13,12 +13,13 @@ public class CSVPersistorFactory
private AmazonS3 s3Client;
private String s3Bucket;
private ExpiringFileStore expiringFileStore;
private boolean compressedOutput;

public Persistor getPersistor(Job job, PersistentJobOutput jobOutput)
{
// TODO: Support variable CSV persistor.
if (useS3Persistor) {
return new S3FilePersistor(s3Client, s3Bucket, 0L);
return new S3FilePersistor(s3Client, s3Bucket, 0L, compressedOutput);
} else {
return new FlatFilePersistor(expiringFileStore);
}
Expand Down
Expand Up @@ -25,6 +25,7 @@ public class S3FilePersistor
private final AmazonS3 s3Client;
private final String outputBucket;
private final long maxSizeForTextView;
private final boolean compressedOutput;

@Override
public boolean canPersist(QueryExecutionAuthorizer authorizer)
Expand All @@ -46,6 +47,9 @@ public URI persist(JobOutputBuilder outputBuilder, Job job)
val objectMetaData = new ObjectMetadata();
objectMetaData.setContentLength(file.length());
objectMetaData.setContentType(MediaType.CSV_UTF_8.toString());
if (compressedOutput) {
objectMetaData.setContentEncoding("gzip");
}

val putRequest = new PutObjectRequest(
outputBucket,
Expand Down
48 changes: 42 additions & 6 deletions src/main/java/com/airbnb/airpal/modules/AirpalModule.java
Expand Up @@ -43,6 +43,8 @@
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3EncryptionClient;
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.eventbus.AsyncEventBus;
Expand All @@ -62,6 +64,7 @@
import io.dropwizard.setup.Environment;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.web.env.EnvironmentLoaderListener;
import org.jetbrains.annotations.Nullable;
import org.skife.jdbi.v2.DBI;

import javax.inject.Named;
Expand Down Expand Up @@ -268,14 +271,47 @@ public AWSCredentials provideAWSCredentials()

@Singleton
@Provides
public AmazonS3 provideAmazonS3Client(AWSCredentials awsCredentials)
public AmazonS3 provideAmazonS3Client(AWSCredentials awsCredentials, EncryptionMaterialsProvider encryptionMaterialsProvider)
{
if (awsCredentials == null) {
InstanceProfileCredentialsProvider iamCredentials = new InstanceProfileCredentialsProvider();
return new AmazonS3Client(iamCredentials);
if (encryptionMaterialsProvider == null) {
return new AmazonS3Client(new InstanceProfileCredentialsProvider());
}
else {
return new AmazonS3EncryptionClient(new InstanceProfileCredentialsProvider(), encryptionMaterialsProvider);
}
}

return new AmazonS3Client(awsCredentials);
if (encryptionMaterialsProvider == null) {
return new AmazonS3Client(awsCredentials);
}
else {
return new AmazonS3EncryptionClient(awsCredentials, encryptionMaterialsProvider);
}
}

@Nullable
@Singleton
@Provides
private EncryptionMaterialsProvider provideEncryptionMaterialsProvider() {
String empClassName = config.getS3EncryptionMaterialsProvider();
if (empClassName != null) {
try {
Class<?> empClass = Class.forName(empClassName);
Object instance = empClass.newInstance();
if (instance instanceof EncryptionMaterialsProvider) {
return (EncryptionMaterialsProvider)instance;
}
else {
throw new IllegalArgumentException("Class " + empClassName + " must implement EncryptionMaterialsProvider");
}
}
catch (Exception x) {
throw new RuntimeException("Unable to initialize EncryptionMaterialsProvider class " + empClassName + ": " + x, x);
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to move the EncryptionMaterialsProvider into its own provider method, having this method depend on it.


return null;
}

@Singleton
Expand Down Expand Up @@ -318,7 +354,7 @@ public ExpiringFileStore provideExpiringFileStore()
@Singleton
public CSVPersistorFactory provideCSVPersistorFactory(ExpiringFileStore fileStore, AmazonS3 s3Client, @Named("s3Bucket") String s3Bucket)
{
return new CSVPersistorFactory(config.isUseS3(), s3Client, s3Bucket, fileStore);
return new CSVPersistorFactory(config.isUseS3(), s3Client, s3Bucket, fileStore, config.isCompressedOutput());
}

@Provides
Expand All @@ -333,6 +369,6 @@ public PersistorFactory providePersistorFactory(CSVPersistorFactory csvPersistor
public OutputBuilderFactory provideOutputBuilderFactory()
{
long maxFileSizeInBytes = Math.round(Math.floor(config.getMaxOutputSize().getValue(DataSize.Unit.BYTE)));
return new OutputBuilderFactory(maxFileSizeInBytes);
return new OutputBuilderFactory(maxFileSizeInBytes, config.isCompressedOutput());
}
}
@@ -1,5 +1,6 @@
package com.airbnb.airpal.resources;

import com.amazonaws.services.s3.model.ObjectMetadata;
import com.opencsv.CSVReader;
import com.airbnb.airpal.core.store.files.ExpiringFileStore;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -23,13 +24,15 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.net.URI;
import java.util.zip.GZIPInputStream;

@Slf4j
@Path("/api/preview")
Expand Down Expand Up @@ -82,9 +85,19 @@ private Response getS3Preview(URI fileURI, int numLines) {
outputKey
).withRange(0, 100 * 1024);
val object = s3Client.getObject(request);
try (val s3Reader = new CSVReader(new BufferedReader(new InputStreamReader(object.getObjectContent())))) {
return getPreviewFromCSV(s3Reader, numLines);
} catch (IOException e) {
ObjectMetadata objectMetadata = object.getObjectMetadata();
boolean gzip = "gzip".equalsIgnoreCase(objectMetadata.getContentEncoding());
try (InputStream input = object.getObjectContent()) {
InputStreamReader reader;
if (gzip) {
reader = new InputStreamReader(new GZIPInputStream(input));
}
else {
reader = new InputStreamReader(input);
}
return getPreviewFromCSV(new CSVReader(reader), numLines);
}
catch (IOException e) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
}
Expand Down
Expand Up @@ -2,6 +2,7 @@

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import lombok.val;
Expand Down Expand Up @@ -52,7 +53,12 @@ public Response getFile(@PathParam("filename") String filename)
if (object == null) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.ok(new StreamingOutput() {
ObjectMetadata objectMetadata = object.getObjectMetadata();
Response.ResponseBuilder builder = Response.ok().type(objectMetadata.getContentType());
if (objectMetadata.getContentEncoding() != null) {
builder = builder.encoding(objectMetadata.getContentEncoding()); // gzip
}
return builder.entity(new StreamingOutput() {
@Override
public void write(OutputStream output)
throws IOException, WebApplicationException
Expand Down