Skip to content

Commit

Permalink
Implement support for Azure blob storage. Intended for large files (e.g. pictures)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmarino committed Mar 6, 2018
1 parent 125cfb5 commit 05fa90e
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 62 deletions.
2 changes: 2 additions & 0 deletions extensions/cloud/portability-cloud-microsoft/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ dependencies {
compile project(':portability-spi-cloud')
// compile 'com.microsoft.azure:azure-documentdb:1.15.1'

compile group: 'com.microsoft.azure', name: 'azure-storage', version: '7.0.0'

compile 'com.datastax.cassandra:cassandra-driver-core:3.4.0'
testCompile 'org.apache.cassandra:cassandra-all:3.11.1'
testCompile 'org.scassandra:java-client:1.1.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,66 @@
*/
package org.dataportabilityproject.cloud.microsoft.cosmos;

import static org.dataportabilityproject.cloud.microsoft.cosmos.MicrosoftCloudConstants.DATA_TABLE;
import static org.dataportabilityproject.cloud.microsoft.cosmos.MicrosoftCloudConstants.JOB_TABLE;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.UUID;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.dataportabilityproject.spi.cloud.storage.JobStore;
import org.dataportabilityproject.spi.cloud.types.JobAuthorization;
import org.dataportabilityproject.spi.cloud.types.PortabilityJob;
import org.dataportabilityproject.types.transfer.models.DataModel;

import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URISyntaxException;
import java.util.UUID;

import static org.dataportabilityproject.cloud.microsoft.cosmos.MicrosoftCloudConstants.DATA_TABLE;
import static org.dataportabilityproject.cloud.microsoft.cosmos.MicrosoftCloudConstants.JOB_TABLE;

/**
 * A {@link JobStore} backed by Cosmos DB. This implementation uses the DataStax Cassandra driver to
 * communicate with Cosmos DB, and Azure blob storage for large binary data streams.
 */
public class CosmosStore implements JobStore {
static final String JOB_INSERT =
String.format("INSERT INTO %s (job_id, job_data) VALUES (?,?)", JOB_TABLE);
static final String JOB_QUERY = String.format("SELECT * FROM %s WHERE job_id = ?", JOB_TABLE);
static final String JOB_DELETE = String.format("DELETE FROM %s WHERE job_id = ?", JOB_TABLE);
static final String JOB_UPDATE =
String.format("UPDATE %s SET job_data = ? WHERE job_id = ?", JOB_TABLE);

static final String DATA_INSERT =
String.format("INSERT INTO %s (data_id, data_model) VALUES (?,?)", DATA_TABLE);
static final String DATA_QUERY = String.format("SELECT * FROM %s WHERE data_id = ?", DATA_TABLE);
static final String DATA_DELETE = String.format("DELETE FROM %s WHERE data_id = ?", DATA_TABLE);
static final String DATA_UPDATE =
String.format("UPDATE %s SET data_model = ? WHERE data_id = ?", DATA_TABLE);

private final Session session;
private final ObjectMapper mapper;

public CosmosStore(Session session, ObjectMapper mapper) {
this.session = session;
this.mapper = mapper;
}
static final String JOB_INSERT =
String.format("INSERT INTO %s (job_id, job_data) VALUES (?,?)", JOB_TABLE);
static final String JOB_QUERY =
String.format("SELECT * FROM %s WHERE job_id = ?", JOB_TABLE);
static final String JOB_DELETE =
String.format("DELETE FROM %s WHERE job_id = ?", JOB_TABLE);
static final String JOB_UPDATE =
String.format("UPDATE %s SET job_data = ? WHERE job_id = ?", JOB_TABLE);

static final String DATA_INSERT =
String.format("INSERT INTO %s (data_id, data_model) VALUES (?,?)", DATA_TABLE);
static final String DATA_QUERY =
String.format("SELECT * FROM %s WHERE data_id = ?", DATA_TABLE);
static final String DATA_DELETE =
String.format("DELETE FROM %s WHERE data_id = ?", DATA_TABLE);
static final String DATA_UPDATE =
String.format("UPDATE %s SET data_model = ? WHERE data_id = ?", DATA_TABLE);

private static final int UNKNOWN_LENGTH = -1;

private final Session session;
private final CloudBlobClient blobClient;
private final ObjectMapper mapper;

public CosmosStore(Session session, CloudBlobClient blobClient, ObjectMapper mapper) {
this.session = session;
this.blobClient = blobClient;
this.mapper = mapper;
}

public void close() {
if (session != null) {
Expand Down Expand Up @@ -107,17 +123,34 @@ public void removeData(UUID id) {
remove(id, DATA_DELETE);
}

@Override
public void create(UUID jobId, String key, InputStream stream) {
// TODO implement with Azure Blob Storage
throw new UnsupportedOperationException("Not yet implemented");
}
@Override
public void create(UUID jobId, String key, InputStream stream) {
try {
CloudBlobContainer container = blobClient.getContainerReference(jobId.toString());
container.createIfNotExists();
CloudBlockBlob blob = container.getBlockBlobReference(key);
blob.upload(stream, UNKNOWN_LENGTH);
} catch (StorageException | URISyntaxException | IOException e) {
// TODO Wrap in a common type?
throw new RuntimeException(e);
}
}

@Override
public InputStream getStream(UUID jobId, String key) {
// TODO implement with Azure Blob Storage
throw new UnsupportedOperationException("Not yet implemented");
}
@Override
public InputStream getStream(UUID jobId, String key) {
try {
CloudBlobContainer container = blobClient.getContainerReference(jobId.toString());
container.createIfNotExists();
CloudBlockBlob blob = container.getBlockBlobReference(key);
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
blob.download(out);
return in;
} catch (StorageException | URISyntaxException | IOException e) {
// TODO Wrap in a common type?
throw new RuntimeException(e);
}
}

@Override
public UUID findFirst(JobAuthorization.State jobState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,39 @@

import com.datastax.driver.core.Session;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobClient;

/** Creates and initializes a {@link CosmosStore} instance. Supports Azure and local setup. */
import java.net.URISyntaxException;
import java.security.InvalidKeyException;

/**
* Creates and initializes a {@link CosmosStore} instance. Supports Azure and local setup.
*/
public class CosmosStoreInitializer extends AbstractCosomosStoreInitializer {

/** Returns a new {@link CosmosStore} instance configured for Azure. */
public CosmosStore createStore(ObjectMapper mapper) {
CassandraCluster.Builder builder = CassandraCluster.Builder.newInstance();
// TODO configure builder
CassandraCluster cassandraCluster = builder.build();
Session session = cassandraCluster.createSession(true);
/**
* Returns a new {@link CosmosStore} instance configured for Azure.
*/
public CosmosStore createStore(ObjectMapper mapper) {
try {
CassandraCluster.Builder builder = CassandraCluster.Builder.newInstance();
// TODO configure builder
CassandraCluster cassandraCluster = builder.build();
Session session = cassandraCluster.createSession(true);

createKeyspace(session);
createTables(session);

String connectionString = ""; // TODO externalize connection string
CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();

return new CosmosStore(session, blobClient, mapper);
} catch (URISyntaxException | InvalidKeyException e) {
throw new RuntimeException(e);
}

createKeyspace(session);
createTables(session);
}

return new CosmosStore(session, mapper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,40 @@

import com.datastax.driver.core.Session;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobClient;

/** Creates and initializes a {@link CosmosStore} instance. Supports Azure and local setup. */
import java.net.URISyntaxException;
import java.security.InvalidKeyException;

/**
* Creates and initializes a {@link CosmosStore} instance. Supports Azure and local setup.
*/
public class LocalCosmosStoreInitializer extends AbstractCosomosStoreInitializer {

/** Returns a new {@link CosmosStore} instance configured for local use. */
public CosmosStore createLocalStore(int port, ObjectMapper mapper) {
CassandraCluster.Builder builder = CassandraCluster.Builder.newInstance();
builder.port(port);
CassandraCluster cassandraCluster = builder.build();
/**
* Returns a new {@link CosmosStore} instance configured for local use.
*/
public CosmosStore createLocalStore(int port, ObjectMapper mapper) {
try {
CassandraCluster.Builder builder = CassandraCluster.Builder.newInstance();
builder.port(port);
CassandraCluster cassandraCluster = builder.build();

Session session = cassandraCluster.createSession(false);

createKeyspace(session);
createTables(session);

String connectionString = "DefaultEndpointsProtocol=http;AccountName=storage_account;AccountKey=storage_key";
CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();

Session session = cassandraCluster.createSession(false);
return new CosmosStore(session, blobClient, mapper);
} catch (URISyntaxException | InvalidKeyException e) {
throw new RuntimeException(e);
}
}

createKeyspace(session);
createTables(session);

return new CosmosStore(session, mapper);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package org.dataportabilityproject.spi.cloud.storage;

import java.io.IOException;
import java.io.InputStream;
import java.util.UUID;
import org.dataportabilityproject.spi.cloud.types.JobAuthorization;
import org.dataportabilityproject.spi.cloud.types.LegacyPortabilityJob;
import org.dataportabilityproject.spi.cloud.types.PortabilityJob;
import org.dataportabilityproject.types.transfer.models.DataModel;

import java.io.IOException;
import java.io.InputStream;
import java.util.UUID;

/**
* A store for {@link PortabilityJob}s.
*
Expand Down Expand Up @@ -121,11 +122,22 @@ default void removeData(UUID id) {
throw new UnsupportedOperationException();
}

/**
 * Persists a new blob entry for the job and key. If one exists for the id/key combination, it may be overwritten.
 *
 * @param jobId the job id
 * @param key the key
 * @param stream the blob data stream
 * @throws UnsupportedOperationException if the implementation does not support blob storage
 */
default void create(UUID jobId, String key, InputStream stream) {
throw new UnsupportedOperationException();
}

/**
 * Returns a blob stream for the job and key or null if not found.
 *
 * @param jobId the job id
 * @param key the key
 * @throws UnsupportedOperationException if the implementation does not support blob storage
 */
default InputStream getStream(UUID jobId, String key) {
throw new UnsupportedOperationException();
}

}

0 comments on commit 05fa90e

Please sign in to comment.