Skip to content

Commit

Permalink
Merge pull request #113 from google/migrate-stores
Browse files Browse the repository at this point in the history
Migrate PortabilityJob, AuthData, and job stores to the new directory structure
  • Loading branch information
rtannenbaum committed Feb 2, 2018
2 parents c64e9cf + 19140d4 commit f5d17ae
Show file tree
Hide file tree
Showing 56 changed files with 312 additions and 382 deletions.
3 changes: 3 additions & 0 deletions extensions/cloud/portability-cloud-google/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@
dependencies {
compile project(':portability-spi-cloud')
compile project(':portability-spi-transfer')

compile("com.google.cloud:google-cloud-storage:${googleDatastoreVersion}")
compile("com.google.cloud:google-cloud-datastore:${googleDatastoreVersion}")
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,36 @@
import java.io.ObjectOutputStream;
import java.util.Map;
import java.util.Map.Entry;
import org.dataportabilityproject.cloud.interfaces.PersistentKeyValueStore;
import org.dataportabilityproject.job.PortabilityJob.JobState;
import org.dataportabilityproject.job.PortabilityJob;
import org.dataportabilityproject.job.PortabilityJobConverter;
import org.dataportabilityproject.shared.settings.CommonSettings;
import org.dataportabilityproject.spi.cloud.storage.JobStore;
import org.dataportabilityproject.spi.cloud.types.LegacyPortabilityJob;
import org.dataportabilityproject.spi.cloud.types.LegacyPortabilityJob.JobState;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJobConverter;

/**
* A {@link PersistentKeyValueStore} implementation based on Google Cloud Platform's Datastore.
* A {@link JobStore} implementation based on Google Cloud Platform's Datastore.
*/
public final class GooglePersistentKeyValueStore implements PersistentKeyValueStore {
public final class GoogleCloudDatastore implements JobStore {
private static final String KIND = "persistentKey";
private static final String CREATED_FIELD = "created";

private final Datastore datastore;
private final CommonSettings commonSettings;
private final boolean encryptedFlow;

public GooglePersistentKeyValueStore(Datastore datastore, CommonSettings commonSettings) {
public GoogleCloudDatastore(Datastore datastore, boolean encryptedFlow) {
this.datastore = datastore;
this.commonSettings = commonSettings;
this.encryptedFlow = encryptedFlow;
}

/**
* Inserts a new {@link PortabilityJob} keyed by its job ID in Datastore.
* Inserts a new {@link LegacyPortabilityJob} keyed by its job ID in Datastore.
*
* <p>To update an existing {@link PortabilityJob} instead, use {@link #update}.
* <p>To update an existing {@link LegacyPortabilityJob} instead, use {@link #update}.
*
* @throws IOException if a job already exists for {@code jobId}, or if there was a different
* problem inserting the job.
*/
@Override
public void create(PortabilityJob job) throws IOException {
public void create(LegacyPortabilityJob job) throws IOException {
Preconditions.checkNotNull(job.id());
String jobId = job.id();
Transaction transaction = datastore.newTransaction();
Expand All @@ -90,24 +89,24 @@ public void create(PortabilityJob job) throws IOException {
}

/**
* Finds the {@link PortabilityJob} keyed by {@code jobId} in Datastore, or null if none found.
* Finds the {@link LegacyPortabilityJob} keyed by {@code jobId} in Datastore, or null if none found.
*/
@Override
public PortabilityJob find(String jobId) {
public LegacyPortabilityJob find(String jobId) {
Entity entity = datastore.get(getKey(jobId));
if (entity == null) {
return null;
}
return PortabilityJob.mapToJob(getProperties(entity));
return LegacyPortabilityJob.mapToJob(getProperties(entity));
}

/**
* Finds the {@link PortabilityJob} keyed by {@code jobId} in Datastore, and verify it is in
* Finds the {@link LegacyPortabilityJob} keyed by {@code jobId} in Datastore, and verify it is in
* state {@code jobState}.
*/
@Override
public PortabilityJob find(String jobId, JobState jobState) {
PortabilityJob job = find(jobId);
public LegacyPortabilityJob find(String jobId, JobState jobState) {
LegacyPortabilityJob job = find(jobId);
Preconditions.checkNotNull(job,
"Expected job {} to be in state {}, but the job was not found", jobId, jobState);
Preconditions.checkState(job.jobState() == jobState,
Expand All @@ -116,14 +115,14 @@ public PortabilityJob find(String jobId, JobState jobState) {
}

/**
* Finds the ID of the first {@link PortabilityJob} in state {@code jobState} in Datastore, or null
* Finds the ID of the first {@link LegacyPortabilityJob} in state {@code jobState} in Datastore, or null
* if none found.
*/
@Override
public String findFirst(JobState jobState) {
Query<Entity> query = Query.newEntityQueryBuilder()
.setKind(KIND)
.setFilter(PropertyFilter.eq(PortabilityJobConverter.JOB_STATE, jobState.name()))
.setFilter(PropertyFilter.eq(OldPortabilityJobConverter.JOB_STATE, jobState.name()))
.setOrderBy(OrderBy.asc("created"))
.setLimit(1)
.build();
Expand All @@ -132,11 +131,11 @@ public String findFirst(JobState jobState) {
return null;
}
Entity entity = results.next();
return (String) entity.getValue(PortabilityJobConverter.ID_DATA_KEY).get();
return (String) entity.getValue(OldPortabilityJobConverter.ID_DATA_KEY).get();
}

/**
* Removes the {@link PortabilityJob} keyed by {@code jobId} in Datastore.
* Removes the {@link LegacyPortabilityJob} keyed by {@code jobId} in Datastore.
*
* @throws IOException if the job doesn't exist, or there was a different problem deleting it.
*/
Expand All @@ -150,15 +149,15 @@ public void remove(String jobId) throws IOException {
}

/**
* Atomically updates the {@link PortabilityJob} keyed by {@code jobId} to {@code portabilityJob},
* Atomically updates the {@link LegacyPortabilityJob} keyed by {@code jobId} to {@code OldPortabilityJob},
* in Datastore using a {@link Transaction}, and verifies that it was previously in the expected
* {@code previousState}.
*
* @throws IOException if the job was not in the expected state in Datastore, or there was another
* problem updating it.
*/
@Override
public void update(PortabilityJob job, JobState previousState)
public void update(LegacyPortabilityJob job, JobState previousState)
throws IOException {
Preconditions.checkNotNull(job.id());
String jobId = job.id();
Expand Down Expand Up @@ -259,13 +258,13 @@ private Key getKey(String jobId) {
/**
* Return {@code entity}'s {@link JobState}, or null if missing.
*
* @param entity a {@link PortabilityJob}'s representation in {@link #datastore}.
* @param entity a {@link LegacyPortabilityJob}'s representation in {@link #datastore}.
*/
private JobState getJobState(Entity entity) {
String jobState = entity.getString(PortabilityJobConverter.JOB_STATE);
String jobState = entity.getString(OldPortabilityJobConverter.JOB_STATE);
// TODO: Remove null check once we enable encryptedFlow everywhere. Null should only be allowed
// in legacy non-encrypted case
if (!commonSettings.getEncryptedFlow()) {
if (!encryptedFlow) {
return jobState == null ? null : JobState.valueOf(jobState);
}
Preconditions.checkNotNull(jobState, "Job should never exist without a state");
Expand Down
20 changes: 20 additions & 0 deletions extensions/cloud/portability-cloud-local/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2018 The Data-Portability Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
compile project(':portability-spi-cloud')
compile project(':portability-spi-transfer')
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,34 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.dataportabilityproject.cloud.interfaces.PersistentKeyValueStore;
import org.dataportabilityproject.job.PortabilityJob.JobState;
import org.dataportabilityproject.job.PortabilityJob;
import org.dataportabilityproject.job.PortabilityJobConverter;
import org.dataportabilityproject.shared.settings.CommonSettings;
import org.dataportabilityproject.spi.cloud.storage.JobStore;
import org.dataportabilityproject.spi.cloud.types.LegacyPortabilityJob;
import org.dataportabilityproject.spi.cloud.types.LegacyPortabilityJob.JobState;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJobConverter;

/**
* An in-memory {@link PersistentKeyValueStore} implementation that uses a concurrent map as its
* An in-memory {@link JobStore} implementation that uses a concurrent map as its
* store.
*/
public final class InMemoryPersistentKeyValueStore implements PersistentKeyValueStore {
public final class InMemoryKeyValueStore implements JobStore {
private final ConcurrentHashMap<String, Map<String, Object>> map;
private final CommonSettings commonSettings;
private final boolean encryptedFlow;

public InMemoryPersistentKeyValueStore(CommonSettings commonSettings) {
map = new ConcurrentHashMap<>();
this.commonSettings = commonSettings;
public InMemoryKeyValueStore(boolean encryptedFlow) {
this.map = new ConcurrentHashMap<>();
this.encryptedFlow = encryptedFlow;
}

/**
* Inserts a new {@link PortabilityJob} keyed by its job ID in the map.
* Inserts a new {@link LegacyPortabilityJob} keyed by its job ID in the map.
*
* <p>To update an existing {@link PortabilityJob} instead, use {@link #update}.
* <p>To update an existing {@link LegacyPortabilityJob} instead, use {@link #update}.
*
* @throws IOException if a job already exists for {@code jobId}, or if there was a different
* problem inserting the job.
*/
@Override
public synchronized void create(PortabilityJob job) throws IOException {
public synchronized void create(LegacyPortabilityJob job) throws IOException {
Preconditions.checkNotNull(job.id());
String jobId = job.id();
if (map.get(jobId) != null) {
Expand All @@ -58,23 +57,23 @@ public synchronized void create(PortabilityJob job) throws IOException {
}

/**
* Finds the {@link PortabilityJob} keyed by {@code jobId} in the map, or null if not found.
* Finds the {@link LegacyPortabilityJob} keyed by {@code jobId} in the map, or null if not found.
*/
@Override
public PortabilityJob find(String key) {
public LegacyPortabilityJob find(String key) {
if (!map.containsKey(key)) {
return null;
}
return PortabilityJob.mapToJob(map.get(key));
return LegacyPortabilityJob.mapToJob(map.get(key));
}

/**
* Finds the {@link PortabilityJob} keyed by {@code jobId} in the map, and verify it is in
* Finds the {@link LegacyPortabilityJob} keyed by {@code jobId} in the map, and verify it is in
* state {@code jobState}.
*/
@Override
public PortabilityJob find(String jobId, JobState jobState) {
PortabilityJob job = find(jobId);
public LegacyPortabilityJob find(String jobId, JobState jobState) {
LegacyPortabilityJob job = find(jobId);
Preconditions.checkNotNull(job,
"Expected job {} to be in state {}, but the job was not found", jobId, jobState);
Preconditions.checkState(job.jobState() == jobState,
Expand All @@ -83,15 +82,15 @@ public PortabilityJob find(String jobId, JobState jobState) {
}

/**
* Finds the ID of the first {@link PortabilityJob} in state {@code jobState} in the map, or null
* Finds the ID of the first {@link LegacyPortabilityJob} in state {@code jobState} in the map, or null
* if none found.
*/
@Override
public synchronized String findFirst(JobState jobState) {
// Mimic an index lookup
for (Entry<String, Map<String, Object>> job : map.entrySet()) {
Map<String, Object> properties = job.getValue();
if (JobState.valueOf(properties.get(PortabilityJobConverter.JOB_STATE).toString())
if (JobState.valueOf(properties.get(OldPortabilityJobConverter.JOB_STATE).toString())
== jobState) {
String jobId = job.getKey();
return jobId;
Expand All @@ -101,7 +100,7 @@ public synchronized String findFirst(JobState jobState) {
}

/**
* Removes the {@link PortabilityJob} keyed by {@code jobId} in the map.
* Removes the {@link LegacyPortabilityJob} keyed by {@code jobId} in the map.
*
* @throws IOException if the job doesn't exist, or there was a different problem deleting it.
*/
Expand All @@ -114,14 +113,14 @@ public void remove(String jobId) throws IOException {
}

/**
* Atomically updates the {@link PortabilityJob} keyed by {@code jobId} to {@code portabilityJob}
* Atomically updates the {@link LegacyPortabilityJob} keyed by {@code jobId} to {@code OldPortabilityJob}
* in the map, and verifies that it was previously in the expected {@code previousState}.
*
* @throws IOException if the job was not in the expected state in the map, or there was another
* problem updating it.
*/
@Override
public void update(PortabilityJob job, JobState previousState)
public void update(LegacyPortabilityJob job, JobState previousState)
throws IOException{
Preconditions.checkNotNull(job.id());
String jobId = job.id();
Expand All @@ -143,13 +142,13 @@ public void update(PortabilityJob job, JobState previousState)
/**
* Return {@code data}'s {@link JobState}, or null if missing.
*
* @param data a {@link PortabilityJob}'s representation in {@link #map}.
* @param data a {@link LegacyPortabilityJob}'s representation in {@link #map}.
*/
private JobState getJobState(Map<String, Object> data) {
Object jobState = data.get(PortabilityJobConverter.JOB_STATE);
Object jobState = data.get(OldPortabilityJobConverter.JOB_STATE);
// TODO: Remove null check once we enable encryptedFlow everywhere. Null should only be allowed
// in legacy non-encrypted case
if (!commonSettings.getEncryptedFlow()) {
if (!encryptedFlow) {
return jobState == null ? null : JobState.valueOf(jobState.toString());
}
Preconditions.checkNotNull(jobState, "Job should never exist without a state");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
/*
* Copyright 2017 Google Inc.
* Copyright 2018 The Data-Portability Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dataportabilityproject.shared.auth;

import java.io.Serializable;

/**
* Holder of auth information tha can be passed into a service specific {@code AuthGenerator}
* to generate service specific
* Extensions for running the system on Google App Engine.
*/
public abstract class AuthData implements Serializable {
package org.dataportabilityproject.cloud.local;




}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import java.net.HttpCookie;
import javax.crypto.SecretKey;
import org.dataportabilityproject.cloud.interfaces.CloudFactory;
import org.dataportabilityproject.cloud.interfaces.PersistentKeyValueStore;
import org.dataportabilityproject.job.Crypter;
import org.dataportabilityproject.job.CrypterFactory;
import org.dataportabilityproject.job.PortabilityJob.JobState;
import org.dataportabilityproject.job.PortabilityJob;
import org.dataportabilityproject.job.SecretKeyGenerator;
import org.dataportabilityproject.shared.ServiceMode;
import org.dataportabilityproject.shared.auth.AuthData;
import org.dataportabilityproject.spi.cloud.storage.JobStore;
import org.dataportabilityproject.spi.cloud.types.LegacyPortabilityJob;
import org.dataportabilityproject.spi.cloud.types.LegacyPortabilityJob.JobState;
import org.dataportabilityproject.types.transfer.auth.AuthData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,11 +43,11 @@ class CryptoHelper {
private static final Logger logger = LoggerFactory.getLogger(CryptoHelper.class);
private static final Gson GSON = new Gson();

private final PersistentKeyValueStore store;
private final JobStore store;

@Inject
CryptoHelper(CloudFactory cloudFactory) {
this.store = cloudFactory.getPersistentKeyValueStore();
this.store = cloudFactory.getJobStore();
}

/**
Expand Down Expand Up @@ -81,7 +81,7 @@ void encryptAndSetCookie(Headers headers, String jobId, ServiceMode serviceMode,
}

private SecretKey getSessionKey(String jobId) {
PortabilityJob job = store.find(jobId);
LegacyPortabilityJob job = store.find(jobId);
Preconditions.checkState(job != null && job.jobState() == JobState.PENDING_AUTH_DATA);
String encodedSessionKey = job.sessionKey();
Preconditions.checkState(!Strings.isNullOrEmpty(encodedSessionKey),
Expand Down

0 comments on commit f5d17ae

Please sign in to comment.