Skip to content

Commit

Permalink
Prototype for Room and Job Queue integration and cache invalidation p…
Browse files Browse the repository at this point in the history
…attern
  • Loading branch information
DwayneJengSage committed Jun 23, 2018
1 parent a08a255 commit b4a6426
Show file tree
Hide file tree
Showing 10 changed files with 682 additions and 1 deletion.
8 changes: 7 additions & 1 deletion android-sdk/build.gradle
Expand Up @@ -33,7 +33,7 @@ android {
}

dependencies {
api 'org.sagebionetworks.bridge:rest-client:0.15.6', {
api 'org.sagebionetworks.bridge:rest-client:0.15.23', {
exclude group: 'joda-time', module: 'joda-time'
}
api 'org.sagebionetworks:BridgeDataUploadUtils:0.2.3', {
Expand All @@ -51,6 +51,12 @@ dependencies {

implementation fileTree(dir: 'libs', include: ['*.jar'])

def room_version = "1.1.0"
implementation "android.arch.persistence.room:runtime:$room_version"
annotationProcessor "android.arch.persistence.room:compiler:$room_version"

implementation 'com.birbit:android-priority-jobqueue:2.0.1'

implementation 'com.google.dagger:dagger-android:2.14.1'
implementation 'com.google.dagger:dagger-android-support:2.14.1'
annotationProcessor 'com.google.dagger:dagger-android-processor:2.14.1'
Expand Down
@@ -0,0 +1,85 @@
/*
* Copyright 2018 Sage Bionetworks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.sagebionetworks.bridge.android.jobqueue;

import javax.inject.Inject;

import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import com.birbit.android.jobqueue.Job;
import com.birbit.android.jobqueue.Params;
import com.birbit.android.jobqueue.RetryConstraint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.sagebionetworks.bridge.android.manager.ActivityManagerV2;
import org.sagebionetworks.bridge.rest.exceptions.BridgeSDKException;

public abstract class BridgeJob extends Job {
private static final Logger LOG = LoggerFactory.getLogger(BridgeJob.class);

private static final long BACKOFF_DELAY = 100;
private static final int MAX_TRIES = 5;

// As per sample code, priority 500 is considered medium priority.
private static final int PRIORITY = 500;

@Inject
private ActivityManagerV2 activityManagerV2;

protected BridgeJob() {
super(new Params(PRIORITY).requireNetwork().persist());
}

protected ActivityManagerV2 getActivityManagerV2() {
return activityManagerV2;
}

@Override
protected int getRetryLimit() {
return MAX_TRIES;
}

@Override
public void onAdded() {
// Job has been scheduled but not executed yet. Nothing to do yet.
}

@Override
protected void onCancel(int cancelReason, @Nullable Throwable throwable) {
LOG.error("Error running async job, job=" + this.getClass().getName() + ", cancelReason=" +
cancelReason + ", errorClass=" +
(throwable != null ? throwable.getClass().getName() : "null") + ", message=" +
(throwable != null ? throwable.getMessage() : "null"));
}

@Override
protected RetryConstraint shouldReRunOnThrowable(@NonNull Throwable throwable, int runCount,
int maxRunCount) {
if (throwable instanceof BridgeSDKException) {
int statusCode = ((BridgeSDKException) throwable).getStatusCode();
if (statusCode >= 400 && statusCode <= 499) {
// Determinisitic failure from the server. Do not retry.
return RetryConstraint.CANCEL;
}
}

// In all other cases, we should retry, up to the limit, with exponential backoff.
return RetryConstraint.createExponentialBackoff(runCount, BACKOFF_DELAY);
}
}
@@ -0,0 +1,44 @@
/*
* Copyright 2018 Sage Bionetworks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.sagebionetworks.bridge.android.jobqueue;

import org.joda.time.DateTime;
import rx.Observable;

import org.sagebionetworks.bridge.android.persistence.ScheduledActivityEntity;

public class GetActivitiesByDateJob extends BridgeJob {
private final DateTime startTime;
private final DateTime endTime;

public GetActivitiesByDateJob(DateTime startTime, DateTime endTime) {
this.startTime = startTime;
this.endTime = endTime;
}

@Override
public void onRun() {
// Defer to activity manager to get and cache activities.
Observable<ScheduledActivityEntity> observable = getActivityManagerV2()
.downloadAndCacheScheduledActivitiesForDates(startTime, endTime);

// Queued jobs are run asynchronously. The pattern here is that when onRun completes, the
// job is complete, so block on observable completion.
observable.toBlocking().lastOrDefault(null);
}
}
@@ -0,0 +1,63 @@
/*
* Copyright 2018 Sage Bionetworks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.sagebionetworks.bridge.android.jobqueue;

import java.lang.reflect.Type;
import java.util.List;

import com.google.gson.reflect.TypeToken;
import rx.Completable;
import rx.Observable;

import org.sagebionetworks.bridge.android.persistence.ScheduledActivityEntity;
import org.sagebionetworks.bridge.rest.RestUtils;
import org.sagebionetworks.bridge.rest.model.ScheduledActivity;

public class UpdateActivitiesJob extends BridgeJob {
private static final Type ACTIVITY_LIST_TYPE = new TypeToken<List<ScheduledActivity>>(){}
.getType();

private transient List<ScheduledActivity> activityList;
private final String serializedActivityList;

public UpdateActivitiesJob(List<ScheduledActivity> activityList) {
this.activityList = activityList;

// Job is serializable so that JobPriorityQueue can persist the job status. However,
// ScheduledActivity is not serializable. To make this serializable, we store it as JSON.
this.serializedActivityList = RestUtils.GSON.toJson(activityList);
}

private List<ScheduledActivity> getActivityList() {
if (activityList == null) {
activityList = RestUtils.GSON.fromJson(serializedActivityList, ACTIVITY_LIST_TYPE);
}
return activityList;
}

@Override
public void onRun() {
// Defer to activity manager to update activities.
Completable completable = getActivityManagerV2().updateRemoteScheduledActivities(
getActivityList());

// Queued jobs are run asynchronously. The pattern here is that when onRun completes, the
// job is complete, so block on completable completion.
completable.await();
}
}
@@ -0,0 +1,161 @@
/*
* Copyright 2018 Sage Bionetworks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.sagebionetworks.bridge.android.manager;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.sagebionetworks.bridge.android.util.retrofit.RxUtils.toBodySingle;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Singleton;

import android.support.annotation.AnyThread;
import android.support.annotation.NonNull;
import com.birbit.android.jobqueue.JobManager;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.Observable;

import org.sagebionetworks.bridge.android.jobqueue.GetActivitiesByDateJob;
import org.sagebionetworks.bridge.android.persistence.PersistenceUtils;
import org.sagebionetworks.bridge.android.persistence.ScheduledActivityDAO;
import org.sagebionetworks.bridge.android.persistence.ScheduledActivityEntity;
import org.sagebionetworks.bridge.rest.model.ScheduledActivity;

@AnyThread
@Singleton
public class ActivityManagerV2 {
private static final Logger LOG = LoggerFactory.getLogger(ActivityManagerV2.class);

// Cache activities for 24 hours.
private static final long CACHE_TTL_MILLIS = 24 * 60 * 60 * 1000;

@NonNull
private final AtomicReference<AuthenticationManager.AuthStateHolder>
authStateHolderAtomicReference;

@NonNull
private final JobManager jobManager;

@NonNull
private final ScheduledActivityDAO scheduledActivityDAO;

public ActivityManagerV2(@NonNull AuthenticationManager authManager,
@NonNull JobManager jobManager, @NonNull ScheduledActivityDAO scheduledActivityDAO) {
checkNotNull(authManager);
checkNotNull(jobManager);
checkNotNull(scheduledActivityDAO);

this.authStateHolderAtomicReference = authManager.getAuthStateReference();
this.jobManager = jobManager;
this.scheduledActivityDAO = scheduledActivityDAO;
}

public Observable<ScheduledActivity> getActivities(@NonNull DateTime startTime,

This comment has been minimized.

Copy link
@liujoshua

liujoshua Jun 25, 2018

Thinking about the serialization/deserialization and mapping issues, I'm thinking it'd be good for this manager to return its own ScheduledActivityModel. This would keep ActivityManager better encapsulated in case we do something like move to a /v4/activities API

@NonNull DateTime endTime) {
DateTime normalizedEndTime;
try {
normalizedEndTime = normalizeEndTime(startTime, endTime);
} catch (Exception e) {
// in case we do something wrong with JodaTime
return Observable.error(e);
}

return Observable.fromCallable(() -> scheduledActivityDAO
.getScheduledActivitiesForDates(startTime.getMillis(), normalizedEndTime.getMillis()))
.flatMap(activityList -> {
if (activityList == null || activityList.isEmpty()) {
// Cache has no elements. Call server and return an in-place Observable.
return downloadAndCacheScheduledActivitiesForDates(startTime,
normalizedEndTime);
} else {
// Check cache expiration.
long now = System.currentTimeMillis();
boolean hasExpired = false;
for (ScheduledActivityEntity oneActivity : activityList) {
if (oneActivity.getLastSyncedOn() + CACHE_TTL_MILLIS < now) {
hasExpired = true;
break;
}
}

// If we have expired cache elements, kick off a job to update the cache in
// the background.
if (hasExpired) {
jobManager.addJobInBackground(new GetActivitiesByDateJob(startTime,
normalizedEndTime));
}

// Return the cached elements we have, in case we don't have network
// connectivity or the call is otherwise too expensive.
return Observable.from(activityList);
}
}).map(PersistenceUtils::databaseActivityToServerActivity);
}

public Observable<ScheduledActivityEntity> downloadAndCacheScheduledActivitiesForDates(
@NonNull DateTime startTime, @NonNull DateTime normalizedEndTime) {
// todo pagination
return toBodySingle(authStateHolderAtomicReference.get().forConsentedUsersApi
.getScheduledActivitiesByDateRange(startTime, normalizedEndTime))
.toObservable()
.flatMap(list -> Observable.from(list.getItems()))
.map(activity -> {
ScheduledActivityEntity activityEntity = PersistenceUtils
.serverActivityToDatabaseActivity(activity);
scheduledActivityDAO.writeScheduledActivities(activityEntity);
return activityEntity;
});
}

private static DateTime normalizeEndTime(@NonNull DateTime startTime,
@NonNull DateTime endTime) {
int startOffset = startTime.getZone().getOffset(startTime);
int endOffset = endTime.getZone().getOffset(endTime);
if (startOffset != endOffset) {
DateTime normalizedEndTime = endTime.toDateTime(DateTimeZone.forOffsetMillis(startOffset));
LOG.warn("Correcting for mismatched offset. startTime: {}, endTime: {}, newEndTime: {}", startTime,
endTime, normalizedEndTime);
return normalizedEndTime;
} else {
return endTime;
}
}

public Completable updateActivities(@NonNull List<ScheduledActivity> activityList) {
return Completable.fromAction(() -> {
List<ScheduledActivityEntity> entityList = Lists.transform(activityList,
PersistenceUtils::serverActivityToDatabaseActivity);
scheduledActivityDAO.writeScheduledActivities(Iterables.toArray(entityList,
ScheduledActivityEntity.class));
}).mergeWith(Completable.fromAction(() -> {

}));
}

public Completable updateRemoteScheduledActivities(
@NonNull List<ScheduledActivity> activityList) {
return toBodySingle(authStateHolderAtomicReference.get().forConsentedUsersApi
.updateScheduledActivities(activityList)).toCompletable();
}
}

0 comments on commit b4a6426

Please sign in to comment.