Migrate PortabilityJob, AuthData, and job stores to the new directory structure #113

Merged · 4 commits · Feb 2, 2018
Changes from 2 commits
3 changes: 3 additions & 0 deletions extensions/cloud/portability-cloud-google/build.gradle
@@ -17,4 +17,7 @@
dependencies {
compile project(':portability-spi-cloud')
compile project(':portability-spi-transfer')

compile("com.google.cloud:google-cloud-storage:${googleDatastoreVersion}")
compile("com.google.cloud:google-cloud-datastore:${googleDatastoreVersion}")
}
@@ -39,37 +39,36 @@
import java.io.ObjectOutputStream;
import java.util.Map;
import java.util.Map.Entry;
import org.dataportabilityproject.cloud.interfaces.PersistentKeyValueStore;
import org.dataportabilityproject.job.PortabilityJob.JobState;
import org.dataportabilityproject.job.PortabilityJob;
import org.dataportabilityproject.job.PortabilityJobConverter;
import org.dataportabilityproject.shared.settings.CommonSettings;
import org.dataportabilityproject.spi.cloud.storage.JobStore;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJob;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJob.JobState;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJobConverter;

/**
* A {@link PersistentKeyValueStore} implementation based on Google Cloud Platform's Datastore.
* A {@link JobStore} implementation based on Google Cloud Platform's Datastore.
*/
public final class GooglePersistentKeyValueStore implements PersistentKeyValueStore {
public final class GoogleCloudDatastore implements JobStore {
private static final String KIND = "persistentKey";
private static final String CREATED_FIELD = "created";

private final Datastore datastore;
private final CommonSettings commonSettings;
private final boolean encryptedFlow;

public GooglePersistentKeyValueStore(Datastore datastore, CommonSettings commonSettings) {
public GoogleCloudDatastore(Datastore datastore, boolean encryptedFlow) {
this.datastore = datastore;
this.commonSettings = commonSettings;
this.encryptedFlow = encryptedFlow;
}

/**
* Inserts a new {@link PortabilityJob} keyed by its job ID in Datastore.
* Inserts a new {@link OldPortabilityJob} keyed by its job ID in Datastore.
*
* <p>To update an existing {@link PortabilityJob} instead, use {@link #update}.
* <p>To update an existing {@link OldPortabilityJob} instead, use {@link #update}.
*
* @throws IOException if a job already exists for {@code jobId}, or if there was a different
* problem inserting the job.
*/
@Override
public void create(PortabilityJob job) throws IOException {
public void create(OldPortabilityJob job) throws IOException {
Preconditions.checkNotNull(job.id());
String jobId = job.id();
Transaction transaction = datastore.newTransaction();
@@ -90,24 +89,24 @@ public void create(PortabilityJob job) throws IOException {
}

/**
* Finds the {@link PortabilityJob} keyed by {@code jobId} in Datastore, or null if none found.
* Finds the {@link OldPortabilityJob} keyed by {@code jobId} in Datastore, or null if none found.
*/
@Override
public PortabilityJob find(String jobId) {
public OldPortabilityJob find(String jobId) {
Entity entity = datastore.get(getKey(jobId));
if (entity == null) {
return null;
}
return PortabilityJob.mapToJob(getProperties(entity));
return OldPortabilityJob.mapToJob(getProperties(entity));
}

/**
* Finds the {@link PortabilityJob} keyed by {@code jobId} in Datastore, and verify it is in
* Finds the {@link OldPortabilityJob} keyed by {@code jobId} in Datastore, and verify it is in
* state {@code jobState}.
*/
@Override
public PortabilityJob find(String jobId, JobState jobState) {
PortabilityJob job = find(jobId);
public OldPortabilityJob find(String jobId, JobState jobState) {
OldPortabilityJob job = find(jobId);
Preconditions.checkNotNull(job,
"Expected job {} to be in state {}, but the job was not found", jobId, jobState);
Preconditions.checkState(job.jobState() == jobState,
@@ -116,14 +115,14 @@ public PortabilityJob find(String jobId, JobState jobState) {
}

/**
* Finds the ID of the first {@link PortabilityJob} in state {@code jobState} in Datastore, or null
* Finds the ID of the first {@link OldPortabilityJob} in state {@code jobState} in Datastore, or null
* if none found.
*/
@Override
public String findFirst(JobState jobState) {
Query<Entity> query = Query.newEntityQueryBuilder()
.setKind(KIND)
.setFilter(PropertyFilter.eq(PortabilityJobConverter.JOB_STATE, jobState.name()))
.setFilter(PropertyFilter.eq(OldPortabilityJobConverter.JOB_STATE, jobState.name()))
.setOrderBy(OrderBy.asc("created"))
.setLimit(1)
.build();
@@ -132,11 +131,11 @@ public String findFirst(JobState jobState) {
return null;
}
Entity entity = results.next();
return (String) entity.getValue(PortabilityJobConverter.ID_DATA_KEY).get();
return (String) entity.getValue(OldPortabilityJobConverter.ID_DATA_KEY).get();
}

/**
* Removes the {@link PortabilityJob} keyed by {@code jobId} in Datastore.
* Removes the {@link OldPortabilityJob} keyed by {@code jobId} in Datastore.
*
* @throws IOException if the job doesn't exist, or there was a different problem deleting it.
*/
@@ -150,15 +149,15 @@ public void remove(String jobId) throws IOException {
}

/**
* Atomically updates the {@link PortabilityJob} keyed by {@code jobId} to {@code portabilityJob},
* Atomically updates the {@link OldPortabilityJob} keyed by {@code jobId} to {@code OldPortabilityJob},
* in Datastore using a {@link Transaction}, and verifies that it was previously in the expected
* {@code previousState}.
*
* @throws IOException if the job was not in the expected state in Datastore, or there was another
* problem updating it.
*/
@Override
public void update(PortabilityJob job, JobState previousState)
public void update(OldPortabilityJob job, JobState previousState)
throws IOException {
Preconditions.checkNotNull(job.id());
String jobId = job.id();
@@ -259,13 +258,13 @@ private Key getKey(String jobId) {
/**
* Return {@code entity}'s {@link JobState}, or null if missing.
*
* @param entity a {@link PortabilityJob}'s representation in {@link #datastore}.
* @param entity a {@link OldPortabilityJob}'s representation in {@link #datastore}.
*/
private JobState getJobState(Entity entity) {
String jobState = entity.getString(PortabilityJobConverter.JOB_STATE);
String jobState = entity.getString(OldPortabilityJobConverter.JOB_STATE);
// TODO: Remove null check once we enable encryptedFlow everywhere. Null should only be allowed
// in legacy non-encrypted case
if (!commonSettings.getEncryptedFlow()) {
if (!encryptedFlow) {
return jobState == null ? null : JobState.valueOf(jobState);
}
Preconditions.checkNotNull(jobState, "Job should never exist without a state");
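The renamed class is consumed through the JobStore SPI instead of the old PersistentKeyValueStore interface. Below is a minimal caller sketch that uses only the methods visible in this diff; the JobPoller class itself is illustrative and not part of the PR.

// Sketch only (not part of this PR): a caller that uses just the JobStore methods
// visible in the diff above. A GoogleCloudDatastore instance is one implementation
// that could be injected here.
import org.dataportabilityproject.spi.cloud.storage.JobStore;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJob;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJob.JobState;

final class JobPoller {
  private final JobStore store;

  JobPoller(JobStore store) {
    this.store = store;
  }

  /** Returns the oldest job waiting for auth data, or null if there is none. */
  OldPortabilityJob pollOnce() {
    // findFirst() behaves like an index lookup: the oldest job in the given state.
    String jobId = store.findFirst(JobState.PENDING_AUTH_DATA);
    if (jobId == null) {
      return null;
    }
    // The two-argument find() re-reads the job and checks it is still in that state.
    return store.find(jobId, JobState.PENDING_AUTH_DATA);
  }
}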
20 changes: 20 additions & 0 deletions extensions/cloud/portability-cloud-local/build.gradle
@@ -0,0 +1,20 @@
/*
* Copyright 2018 The Data-Portability Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
compile project(':portability-spi-cloud')
compile project(':portability-spi-transfer')
}
@@ -20,35 +20,34 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.dataportabilityproject.cloud.interfaces.PersistentKeyValueStore;
import org.dataportabilityproject.job.PortabilityJob.JobState;
import org.dataportabilityproject.job.PortabilityJob;
import org.dataportabilityproject.job.PortabilityJobConverter;
import org.dataportabilityproject.shared.settings.CommonSettings;
import org.dataportabilityproject.spi.cloud.storage.JobStore;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJob;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJob.JobState;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJobConverter;

/**
* An in-memory {@link PersistentKeyValueStore} implementation that uses a concurrent map as its
* An in-memory {@link JobStore} implementation that uses a concurrent map as its
* store.
*/
public final class InMemoryPersistentKeyValueStore implements PersistentKeyValueStore {
public final class InMemoryKeyValueStore implements JobStore {
private final ConcurrentHashMap<String, Map<String, Object>> map;
private final CommonSettings commonSettings;
private final boolean encryptedFlow;

public InMemoryPersistentKeyValueStore(CommonSettings commonSettings) {
map = new ConcurrentHashMap<>();
this.commonSettings = commonSettings;
public InMemoryKeyValueStore(boolean encryptedFlow) {
this.map = new ConcurrentHashMap<>();
this.encryptedFlow = encryptedFlow;
}

/**
* Inserts a new {@link PortabilityJob} keyed by its job ID in the map.
* Inserts a new {@link OldPortabilityJob} keyed by its job ID in the map.
*
* <p>To update an existing {@link PortabilityJob} instead, use {@link #update}.
* <p>To update an existing {@link OldPortabilityJob} instead, use {@link #update}.
*
* @throws IOException if a job already exists for {@code jobId}, or if there was a different
* problem inserting the job.
*/
@Override
public synchronized void create(PortabilityJob job) throws IOException {
public synchronized void create(OldPortabilityJob job) throws IOException {
Preconditions.checkNotNull(job.id());
String jobId = job.id();
if (map.get(jobId) != null) {
@@ -58,23 +57,23 @@ public synchronized void create(PortabilityJob job) throws IOException {
}

/**
* Finds the {@link PortabilityJob} keyed by {@code jobId} in the map, or null if not found.
* Finds the {@link OldPortabilityJob} keyed by {@code jobId} in the map, or null if not found.
*/
@Override
public PortabilityJob find(String key) {
public OldPortabilityJob find(String key) {
if (!map.containsKey(key)) {
return null;
}
return PortabilityJob.mapToJob(map.get(key));
return OldPortabilityJob.mapToJob(map.get(key));
}

/**
* Finds the {@link PortabilityJob} keyed by {@code jobId} in the map, and verify it is in
* Finds the {@link OldPortabilityJob} keyed by {@code jobId} in the map, and verify it is in
* state {@code jobState}.
*/
@Override
public PortabilityJob find(String jobId, JobState jobState) {
PortabilityJob job = find(jobId);
public OldPortabilityJob find(String jobId, JobState jobState) {
OldPortabilityJob job = find(jobId);
Preconditions.checkNotNull(job,
"Expected job {} to be in state {}, but the job was not found", jobId, jobState);
Preconditions.checkState(job.jobState() == jobState,
@@ -83,15 +82,15 @@ public PortabilityJob find(String jobId, JobState jobState) {
}

/**
* Finds the ID of the first {@link PortabilityJob} in state {@code jobState} in the map, or null
* Finds the ID of the first {@link OldPortabilityJob} in state {@code jobState} in the map, or null
* if none found.
*/
@Override
public synchronized String findFirst(JobState jobState) {
// Mimic an index lookup
for (Entry<String, Map<String, Object>> job : map.entrySet()) {
Map<String, Object> properties = job.getValue();
if (JobState.valueOf(properties.get(PortabilityJobConverter.JOB_STATE).toString())
if (JobState.valueOf(properties.get(OldPortabilityJobConverter.JOB_STATE).toString())
== jobState) {
String jobId = job.getKey();
return jobId;
@@ -101,7 +100,7 @@ public synchronized String findFirst(JobState jobState) {
}

/**
* Removes the {@link PortabilityJob} keyed by {@code jobId} in the map.
* Removes the {@link OldPortabilityJob} keyed by {@code jobId} in the map.
*
* @throws IOException if the job doesn't exist, or there was a different problem deleting it.
*/
@@ -114,14 +113,14 @@ public void remove(String jobId) throws IOException {
}

/**
* Atomically updates the {@link PortabilityJob} keyed by {@code jobId} to {@code portabilityJob}
* Atomically updates the {@link OldPortabilityJob} keyed by {@code jobId} to {@code OldPortabilityJob}
* in the map, and verifies that it was previously in the expected {@code previousState}.
*
* @throws IOException if the job was not in the expected state in the map, or there was another
* problem updating it.
*/
@Override
public void update(PortabilityJob job, JobState previousState)
public void update(OldPortabilityJob job, JobState previousState)
throws IOException{
Preconditions.checkNotNull(job.id());
String jobId = job.id();
@@ -143,13 +142,13 @@ public void update(PortabilityJob job, JobState previousState)
/**
* Return {@code data}'s {@link JobState}, or null if missing.
*
* @param data a {@link PortabilityJob}'s representation in {@link #map}.
* @param data a {@link OldPortabilityJob}'s representation in {@link #map}.
*/
private JobState getJobState(Map<String, Object> data) {
Object jobState = data.get(PortabilityJobConverter.JOB_STATE);
Object jobState = data.get(OldPortabilityJobConverter.JOB_STATE);
// TODO: Remove null check once we enable encryptedFlow everywhere. Null should only be allowed
// in legacy non-encrypted case
if (!commonSettings.getEncryptedFlow()) {
if (!encryptedFlow) {
return jobState == null ? null : JobState.valueOf(jobState.toString());
}
Preconditions.checkNotNull(jobState, "Job should never exist without a state");
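The in-memory variant backs findFirst with a linear scan over the map ("Mimic an index lookup"), which makes it convenient for local runs and tests. A rough sanity check follows, assuming JUnit 4 is on the test classpath; it sticks to the lookup methods shown here because the diff does not show how OldPortabilityJob instances are constructed.

// Sketch only: exercises the lookup paths of the in-memory JobStore.
import static org.junit.Assert.assertNull;

import org.dataportabilityproject.spi.cloud.types.OldPortabilityJob.JobState;
import org.junit.Test;

public class InMemoryKeyValueStoreTest {

  @Test
  public void emptyStoreReturnsNull() {
    // Constructor signature taken from the diff: a single encryptedFlow boolean.
    InMemoryKeyValueStore store = new InMemoryKeyValueStore(/* encryptedFlow= */ true);
    assertNull(store.find("no-such-job"));
    assertNull(store.findFirst(JobState.PENDING_AUTH_DATA));
  }
}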
@@ -1,26 +1,23 @@
/*
* Copyright 2017 Google Inc.
* Copyright 2018 The Data-Portability Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dataportabilityproject.shared.auth;

import java.io.Serializable;

/**
 * Holder of auth information that can be passed into a service specific {@code AuthGenerator}
* to generate service specific
* Extensions for running the system on Google App Engine.
*/
public abstract class AuthData implements Serializable {
package org.dataportabilityproject.cloud.local;




}
@@ -25,14 +25,14 @@
import java.net.HttpCookie;
import javax.crypto.SecretKey;
import org.dataportabilityproject.cloud.interfaces.CloudFactory;
import org.dataportabilityproject.cloud.interfaces.PersistentKeyValueStore;
import org.dataportabilityproject.job.Crypter;
import org.dataportabilityproject.job.CrypterFactory;
import org.dataportabilityproject.job.PortabilityJob.JobState;
import org.dataportabilityproject.job.PortabilityJob;
import org.dataportabilityproject.job.SecretKeyGenerator;
import org.dataportabilityproject.shared.ServiceMode;
import org.dataportabilityproject.shared.auth.AuthData;
import org.dataportabilityproject.spi.cloud.storage.JobStore;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJob;
import org.dataportabilityproject.spi.cloud.types.OldPortabilityJob.JobState;
import org.dataportabilityproject.types.transfer.auth.AuthData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -43,11 +43,11 @@ class CryptoHelper {
private static final Logger logger = LoggerFactory.getLogger(CryptoHelper.class);
private static final Gson GSON = new Gson();

private final PersistentKeyValueStore store;
private final JobStore store;

@Inject
CryptoHelper(CloudFactory cloudFactory) {
this.store = cloudFactory.getPersistentKeyValueStore();
this.store = cloudFactory.getJobStore();
}

/**
@@ -81,7 +81,7 @@ void encryptAndSetCookie(Headers headers, String jobId, ServiceMode serviceMode,
}

private SecretKey getSessionKey(String jobId) {
PortabilityJob job = store.find(jobId);
OldPortabilityJob job = store.find(jobId);
Preconditions.checkState(job != null && job.jobState() == JobState.PENDING_AUTH_DATA);
String encodedSessionKey = job.sessionKey();
Preconditions.checkState(!Strings.isNullOrEmpty(encodedSessionKey),
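getSessionKey() resolves the job through the new JobStore, checks it is still in PENDING_AUTH_DATA, and only then reads the encoded session key stored on the job. For orientation, here is a generic sketch of turning an encoded key back into a SecretKey; Base64 and AES are assumptions, and the project's own SecretKeyGenerator (not shown in this diff) may do this differently.

// Sketch only: decode an encoded symmetric session key back into a SecretKey.
// Base64 encoding and the "AES" algorithm are assumptions for illustration.
import java.util.Base64;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;

final class SessionKeys {
  static SecretKey decode(String encodedSessionKey) {
    byte[] raw = Base64.getDecoder().decode(encodedSessionKey);
    return new SecretKeySpec(raw, "AES");
  }
}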