Skip to content

Commit

Permalink
Merge pull request #141 from google/portability-job
Browse files Browse the repository at this point in the history
Make JobStore impls use the new PortabilityJob for #126
  • Loading branch information
rtannenbaum committed Feb 15, 2018
2 parents d8bb624 + 978ffd7 commit dc85c0e
Show file tree
Hide file tree
Showing 10 changed files with 288 additions and 424 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@
import java.util.UUID;
import org.dataportabilityproject.spi.cloud.storage.JobStore;
import org.dataportabilityproject.spi.cloud.types.JobAuthorization;
import org.dataportabilityproject.spi.cloud.types.LegacyPortabilityJob;
import org.dataportabilityproject.spi.cloud.types.LegacyPortabilityJobConverter;
import org.dataportabilityproject.spi.cloud.types.PortabilityJob;

/**
* A {@link JobStore} implementation based on Google Cloud Platform's Datastore.
Expand All @@ -52,69 +51,101 @@ public final class GoogleCloudDatastore implements JobStore {
private static final String CREATED_FIELD = "created";

private final Datastore datastore;
private final boolean encryptedFlow;

public GoogleCloudDatastore(Datastore datastore, boolean encryptedFlow) {
public GoogleCloudDatastore(Datastore datastore) {
this.datastore = datastore;
this.encryptedFlow = encryptedFlow;
}

/**
* Inserts a new {@link LegacyPortabilityJob} keyed by its job ID in Datastore.
* Inserts a new {@link PortabilityJob} keyed by its job ID in Datastore.
*
* <p>To update an existing {@link LegacyPortabilityJob} instead, use {@link #update}.
* <p>To update an existing {@link PortabilityJob} instead, use {@link #update}.
*
* @throws IOException if a job already exists for {@code jobId}, or if there was a different
* @throws IOException if a job already exists for {@code job}'s ID, or if there was a different
* problem inserting the job.
*/
@Override
public void create(UUID jobId, LegacyPortabilityJob job) throws IOException {
public void createJob(UUID jobId, PortabilityJob job) throws IOException {
Preconditions.checkNotNull(jobId);
Transaction transaction = datastore.newTransaction();
Entity shouldNotExist = transaction.get(getKey(jobId));
if (shouldNotExist != null) {
transaction.rollback();
throw new IOException("Record already exists for jobID " + jobId + ": " + shouldNotExist);
throw new IOException("Record already exists for jobID: " + jobId + ". Record: "
+ shouldNotExist);
}
Entity entity = createEntity(jobId, job.asMap());
Entity entity = createEntity(jobId, job.toMap());
try {
transaction.put(entity);
} catch (DatastoreException e) {
transaction.rollback();
throw new IOException(
"Could not create initial record for jobID " + jobId + " (record: " + entity + ")", e);
"Could not create initial record for jobID: " + jobId + ". Record: " + entity, e);
}
transaction.commit();
}

/**
* Finds the {@link LegacyPortabilityJob} keyed by {@code jobId} in Datastore, or null if none found.
* Atomically updates the {@link PortabilityJob} keyed by {@code jobId} to {@code job},
* in Datastore using a {@link Transaction}.
*
* TODO(rtannenbaum): Consider validating authorization state was the previous one, when updating
* authorization state within this transaction. Previous API allowed for passing in of a previous
* expected state, but we shouldn't need to pass that in, given the context of the new state we
* should know what comes before it.
*/
@Override
public LegacyPortabilityJob find(UUID jobId) {
Entity entity = datastore.get(getKey(jobId));
if (entity == null) {
return null;
public void updateJob(UUID jobId, PortabilityJob job) throws IOException {
Preconditions.checkNotNull(jobId);
Transaction transaction = datastore.newTransaction();
Key key = getKey(jobId);

try {
Entity previousEntity = transaction.get(key);
if (previousEntity == null) {
transaction.rollback();
throw new IOException("Could not find record for jobId: " + jobId);
}

Entity newEntity = createEntity(key, job.toMap());
transaction.put(newEntity);
transaction.commit();
} catch (Throwable t) {
transaction.rollback();
throw new IOException("Failed atomic update of jobId: " + jobId, t);
}
}

/**
* Removes the {@link PortabilityJob} keyed by {@code jobId} in Datastore.
*
* @throws IOException if the job doesn't exist, or there was a different problem deleting it.
*/
@Override
public void remove(UUID jobId) throws IOException {
try {
datastore.delete(getKey(jobId));
} catch (DatastoreException e) {
throw new IOException("Could not remove jobId: " + jobId, e);
}
return LegacyPortabilityJob.mapToJob(getProperties(entity));
}

/**
* Finds the {@link LegacyPortabilityJob} keyed by {@code jobId} in Datastore, and verify it is in
* state {@code jobState}.
* Returns the job for the id or null if not found.
*
* @param jobId the job id
*/
@Override
public LegacyPortabilityJob find(UUID jobId, JobAuthorization.State jobState) {
LegacyPortabilityJob job = find(jobId);
Preconditions.checkNotNull(job,
"Expected job {} to be in state {}, but the job was not found", jobId, jobState);
Preconditions.checkState(job.jobState() == jobState,
"Expected job {} to be in state {}, but was {}", jobState, job.jobState());
return job;
public PortabilityJob findJob(UUID jobId) {
Entity entity = datastore.get(getKey(jobId));
if (entity == null) {
return null;
}
return PortabilityJob.fromMap(getProperties(entity));
}

/**
* Finds the ID of the first {@link LegacyPortabilityJob} in state {@code jobState} in Datastore,
* Finds the ID of the first {@link PortabilityJob} in state {@code jobState} in Datastore,
* or null if none found.
*
* TODO(rtannenbaum): Order by creation time so we can process jobs in a FIFO manner. Trying to
Expand All @@ -124,7 +155,7 @@ public LegacyPortabilityJob find(UUID jobId, JobAuthorization.State jobState) {
public UUID findFirst(JobAuthorization.State jobState) {
Query<Key> query = Query.newKeyQueryBuilder()
.setKind(KIND)
.setFilter(PropertyFilter.eq(LegacyPortabilityJobConverter.JOB_STATE, jobState.name()))
.setFilter(PropertyFilter.eq(PortabilityJob.AUTHORIZATION_STATE, jobState.name()))
//.setOrderBy(OrderBy.asc("created"))
.setLimit(1)
.build();
Expand All @@ -136,56 +167,6 @@ public UUID findFirst(JobAuthorization.State jobState) {
return UUID.fromString(key.getName());
}

/**
* Removes the {@link LegacyPortabilityJob} keyed by {@code jobId} in Datastore.
*
* @throws IOException if the job doesn't exist, or there was a different problem deleting it.
*/
@Override
public void remove(UUID jobId) throws IOException {
try {
datastore.delete(getKey(jobId));
} catch (DatastoreException e) {
throw new IOException("Could not remove job " + jobId, e);
}
}

/**
* Atomically updates the {@link LegacyPortabilityJob} keyed by {@code jobId} to {@code job},
* in Datastore using a {@link Transaction}, and verifies that it was previously in the expected
* {@code previousState}.
*
* @throws IOException if the job was not in the expected state in Datastore, or there was another
* problem updating it.
*/
@Override
public void update(UUID jobId, LegacyPortabilityJob job, JobAuthorization.State previousState)
throws IOException {
Preconditions.checkNotNull(jobId);
Transaction transaction = datastore.newTransaction();
Key key = getKey(jobId);

try {
Entity previousEntity = transaction.get(key);
if (previousEntity == null) {
transaction.rollback();
throw new IOException("Could not find record for jobId " + jobId);
}
if (previousState != null && getJobState(previousEntity) != previousState) {
throw new IOException("Job " + jobId + " existed in an unexpected state. "
+ "Expected: " + previousState + " but was: " + getJobState(previousEntity));
}

Entity newEntity = createEntity(key, job.asMap());
transaction.put(newEntity);
transaction.commit();
} catch (Throwable t) {
transaction.rollback();
throw new IOException("Failed atomic update of job " + jobId + " (was state "
+ previousState + ").", t);
}
}

private Entity createEntity(Key key, Map<String, Object> data) throws IOException {
Entity.Builder builder = Entity.newBuilder(key).set(CREATED_FIELD, Timestamp.now());

Expand Down Expand Up @@ -255,20 +236,4 @@ private static Map<String, Object> getProperties(Entity entity) {
private Key getKey(UUID jobId) {
return datastore.newKeyFactory().setKind(KIND).newKey(jobId.toString());
}

/**
* Return {@code entity}'s {@link JobAuthorization.State}, or null if missing.
*
* @param entity a {@link LegacyPortabilityJob}'s representation in {@link #datastore}.
*/
private JobAuthorization.State getJobState(Entity entity) {
String jobState = entity.getString(LegacyPortabilityJobConverter.JOB_STATE);
// TODO: Remove null check once we enable encryptedFlow everywhere. Null should only be allowed
// in legacy non-encrypted case
if (!encryptedFlow) {
return jobState == null ? null : JobAuthorization.State.valueOf(jobState);
}
Preconditions.checkNotNull(jobState, "Job should never exist without a state");
return JobAuthorization.State.valueOf(jobState);
}
}

0 comments on commit dc85c0e

Please sign in to comment.