Skip to content

Commit

Permalink
Add RxJava Multi-threading to ProcessClients
Browse files Browse the repository at this point in the history
  • Loading branch information
ekigamba committed Nov 17, 2017
1 parent 3420f17 commit c45fdfc
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 5 deletions.
5 changes: 5 additions & 0 deletions opensrp-path/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ dependencies {
exclude group: 'com.android.support'
}

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxjava:2.1.5'

testCompile("org.robolectric:shadows-multidex:3.4-rc2") {
exclude group: 'com.google.guava', module: 'guava'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
import java.util.Calendar;
import java.util.Map;

import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import util.NetworkUtils;

public class SyncIntentService extends IntentService {
Expand Down Expand Up @@ -88,11 +95,15 @@ private FetchStatus doSync() {

private FetchStatus pullECFromServer(String locations) throws Exception {
int totalCount = 0;
ECSyncUpdater ecUpdater = ECSyncUpdater.getInstance(context);
final ECSyncUpdater ecUpdater = ECSyncUpdater.getInstance(context);

while (true) {
long startSyncTimeStamp = ecUpdater.getLastSyncTimeStamp();
final long startSyncTimeStamp = ecUpdater.getLastSyncTimeStamp();
Timer fetchTimer = new Timer();
fetchTimer.start();
int eCount = ecUpdater.fetchAllClientsAndEvents(AllConstants.SyncFilters.FILTER_LOCATION_ID, locations);
fetchTimer.stop();
fetchTimer.logDuration("Fetch clients and events");
totalCount += eCount;
if (eCount < 0) {
return FetchStatus.fetchedFailed;
Expand All @@ -102,9 +113,25 @@ private FetchStatus pullECFromServer(String locations) throws Exception {

Log.i(getClass().getName(), "Sync count: " + eCount);

long lastSyncTimeStamp = ecUpdater.getLastSyncTimeStamp();
PathClientProcessor.getInstance(context).processClient(ecUpdater.allEvents(startSyncTimeStamp, lastSyncTimeStamp));
sendSyncStatusBroadcastMessage(FetchStatus.fetched);
final long lastSyncTimeStamp = ecUpdater.getLastSyncTimeStamp();

final Subscription subsc = Observable.
just("")
.subscribeOn(Schedulers.io())
.map(new Func1<String, Object>() {
@Override
public Object call(String s) {
processClients(ecUpdater, startSyncTimeStamp, lastSyncTimeStamp);
return null;
}
})
.subscribe(
new Action1<Object>() {
@Override
public void call(Object o) {
}
}
);
}

if (totalCount == 0) {
Expand All @@ -116,6 +143,21 @@ private FetchStatus pullECFromServer(String locations) throws Exception {
}
}

private void processClients(ECSyncUpdater ecUpdater, long startSyncTimeStamp, long lastSyncTimeStamp) {

try {
Timer processClients = new Timer();
processClients.start();
PathClientProcessor.getInstance(context).processClient(ecUpdater.allEvents(startSyncTimeStamp, lastSyncTimeStamp));
processClients.stop();
processClients.logDuration("Process clients");
sendSyncStatusBroadcastMessage(FetchStatus.fetched);
} catch (Exception e) {
e.printStackTrace();
}

}

private void pushToServer() {
pushECToServer();
}
Expand Down Expand Up @@ -181,4 +223,28 @@ private void drishtiLogInfo(String message) {
}


public static class Timer {
private long startTime;
private long stopTime = 0l;
private boolean stopped = false;

public void start() {
startTime = System.currentTimeMillis();
}

public long stop() {
stopTime = System.currentTimeMillis();
stopped = true;
return (stopTime - startTime);
}

public long getDuration() {
return (stopped) ? (stopTime - startTime) : System.currentTimeMillis() - startTime;
}

public void logDuration(String durationName) {
Log.e("TIMER ", durationName + ": " + getDuration());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ private JSONObject fetchAsJsonObject(String filter, String filterValue) throws E
public int fetchAllClientsAndEvents(String filterName, String filterValue) {
try {

SyncIntentService.Timer fetchTimer = new SyncIntentService.Timer();
fetchTimer.start();
JSONObject jsonObject = fetchAsJsonObject(filterName, filterValue);
fetchTimer.stop();
fetchTimer.logDuration("Fetch JSON ");

int eventsCount = jsonObject.has("no_of_events") ? jsonObject.getInt("no_of_events") : 0;
if (eventsCount == 0) {
Expand All @@ -82,7 +86,11 @@ public int fetchAllClientsAndEvents(String filterName, String filterValue) {
JSONArray events = jsonObject.has("events") ? jsonObject.getJSONArray("events") : new JSONArray();
JSONArray clients = jsonObject.has("clients") ? jsonObject.getJSONArray("clients") : new JSONArray();

SyncIntentService.Timer batchSaveTimer = new SyncIntentService.Timer();
batchSaveTimer.start();
long lastSyncTimeStamp = batchSave(events, clients);
batchSaveTimer.stop();
batchSaveTimer.logDuration("Batch Save ");
if (lastSyncTimeStamp > 0l) {
updateLastSyncTimeStamp(lastSyncTimeStamp);
}
Expand Down

0 comments on commit c45fdfc

Please sign in to comment.