Permalink
Browse files

changed strategy for handler threads again (now one per database, off…

… main thread)

changed continuous replication strategy to avoid memory leak
  • Loading branch information...
1 parent 99ca973 commit 6c6cb62305c2e3f6d7aedff9ea53fc4ea8971080 @mschoch mschoch committed Jul 10, 2012
@@ -26,7 +26,6 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Observable;
@@ -37,6 +36,9 @@
import android.database.SQLException;
import android.database.sqlite.SQLiteDatabase;
import android.database.sqlite.SQLiteException;
+import android.os.Handler;
+import android.os.HandlerThread;
+import android.os.Looper;
import android.util.Log;
import com.couchbase.touchdb.TDDatabase.TDContentOptions;
@@ -65,6 +67,9 @@
private List<TDReplicator> activeReplicators;
private TDBlobStore attachments;
+ private HandlerThread handlerThread;
+ private Handler handler;
+
/**
* Options for what metadata to include in document bodies
*/
@@ -168,6 +173,13 @@ public TDDatabase(String path) {
assert(path.startsWith("/")); //path must be absolute
this.path = path;
this.name = FileDirUtils.getDatabaseNameFromPath(path);
+
+ //start a handler thead to do work for this database
+ handlerThread = new HandlerThread("HandlerThread for " + toString());
+ handlerThread.start();
+ //Get the looper from the handlerThread
+ Looper looper = handlerThread.getLooper();
+ handler = new Handler(looper);
}
public String toString() {
@@ -316,12 +328,16 @@ public boolean close() {
views = null;
if(activeReplicators != null) {
- Iterator<TDReplicator> iter = activeReplicators.iterator();
- while(iter.hasNext()) {
- TDReplicator replicator = iter.next();
- replicator.stop();
- iter.remove();
+ for(TDReplicator replicator : activeReplicators) {
+ replicator.databaseClosing();
}
+ activeReplicators = null;
+ }
+
+ if(handlerThread != null) {
+ handler = null;
+ handlerThread.quit();
+ handlerThread = null;
}
if(database != null && database.isOpen()) {
@@ -2258,13 +2274,6 @@ public TDReplicator getReplicator(URL remote, HttpClientFactory httpClientFactor
return result;
}
- public void replicatorDidStop(TDReplicator replicator) {
- replicator.databaseClosing(); // get it to detach from me
- if(activeReplicators != null) {
- activeReplicators.remove(replicator);
- }
- }
-
public String lastSequenceWithRemoteURL(URL url, boolean push) {
Cursor cursor = null;
String result = null;
@@ -2473,6 +2482,10 @@ public TDStatus deleteLocalDocument(String docID, String revID) {
return new TDStatus(TDStatus.INTERNAL_SERVER_ERROR);
}
}
+
+ public Handler getHandler() {
+ return handler;
+ }
}
class TDValidationContextImpl implements TDValidationContext {
@@ -33,7 +33,7 @@
private static final int MAX_OPEN_HTTP_CONNECTIONS = 16;
- protected TDBatcher<List<Object>> revsToInsert;
+ protected TDBatcher<List<Object>> downloadsToInsert;
protected List<TDRevision> revsToPull;
protected long nextFakeSequence;
protected long maxInsertedFakeSequence;
@@ -51,8 +51,8 @@ public TDPuller(TDDatabase db, URL remote, boolean continuous, HttpClientFactory
@Override
public void beginReplicating() {
- if(revsToInsert == null) {
- revsToInsert = new TDBatcher<List<Object>>(200, 1000, new TDBatchProcessor<List<Object>>() {
+ if(downloadsToInsert == null) {
+ downloadsToInsert = new TDBatcher<List<Object>>(db.getHandler(), 200, 1000, new TDBatchProcessor<List<Object>>() {
@Override
public void process(List<List<Object>> inbox) {
insertRevisions(inbox);
@@ -61,7 +61,7 @@ public void process(List<List<Object>> inbox) {
}
nextFakeSequence = maxInsertedFakeSequence = 0;
Log.w(TDDatabase.TAG, this + " starting ChangeTracker with since=" + lastSequence);
- changeTracker = new TDChangeTracker(remote, continuous ? TDChangeTrackerMode.Continuous : TDChangeTrackerMode.OneShot, lastSequence, this);
+ changeTracker = new TDChangeTracker(remote, continuous ? TDChangeTrackerMode.LongPoll : TDChangeTrackerMode.OneShot, lastSequence, this);
if(filterName != null) {
changeTracker.setFilterName(filterName);
if(filterParams != null) {
@@ -74,7 +74,7 @@ public void process(List<List<Object>> inbox) {
@Override
public void stop() {
- inExternalShutdown = true;
+
if(!running) {
return;
}
@@ -83,15 +83,20 @@ public void stop() {
changeTracker.stop();
changeTracker = null;
- asyncTaskFinished(1);
+ synchronized(this) {
+ revsToPull = null;
+ }
+
super.stop();
+
+ downloadsToInsert.flush();
}
@Override
public void stopped() {
- revsToInsert.flush();
- revsToInsert.close();
+ downloadsToInsert.flush();
+ downloadsToInsert.close();
super.stopped();
}
@@ -184,6 +189,17 @@ public void processInbox(TDRevisionList inbox) {
revsToPull.addAll(inbox);
pullRemoteRevisions();
+
+ //TEST
+ //adding wait here to prevent revsToPull from getting too large
+ while(revsToPull != null && revsToPull.size() > 1000) {
+ pullRemoteRevisions();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ //wake up
+ }
+ }
}
/**
@@ -193,7 +209,7 @@ public void processInbox(TDRevisionList inbox) {
* to keep the process moving, need to synchronize check for size with removal
*/
public synchronized void pullRemoteRevisions() {
- while(httpConnectionCount < MAX_OPEN_HTTP_CONNECTIONS && revsToPull.size() > 0) {
+ while(httpConnectionCount < MAX_OPEN_HTTP_CONNECTIONS && revsToPull != null && revsToPull.size() > 0) {
pullRemoteRevision(revsToPull.get(0));
revsToPull.remove(0);
}
@@ -240,7 +256,7 @@ public void onCompletion(Object result, Throwable e) {
List<Object> toInsert = new ArrayList<Object>();
toInsert.add(rev);
toInsert.add(history);
- revsToInsert.queueObject(toInsert);
+ downloadsToInsert.queueObject(toInsert);
asyncTaskStarted();
} else {
Log.w(TDDatabase.TAG, this + ": Missing revision history in response from " + pathInside);
@@ -285,6 +301,9 @@ public int compare(List<Object> list1, List<Object> list2) {
boolean allGood = true;
TDPulledRevision lastGoodRev = null;
+ if(db == null) {
+ return;
+ }
db.beginTransaction();
boolean success = false;
try {
@@ -328,7 +347,10 @@ public int compare(List<Object> list1, List<Object> list2) {
}
List<String> knownCurrentRevIDs(TDRevision rev) {
- return db.getAllRevisionsOfDocumentID(rev.getDocId(), true).getAllRevIds();
+ if(db != null) {
+ return db.getAllRevisionsOfDocumentID(rev.getDocId(), true).getAllRevIds();
+ }
+ return null;
}
public String joinQuotedEscaped(List<String> strings) {
@@ -109,13 +109,16 @@ public void beginReplicating() {
@Override
public void stop() {
- inExternalShutdown = true;
+ stopObserving();
+ super.stop();
+ }
+
+ private void stopObserving() {
if(observing) {
observing = false;
db.deleteObserver(this);
asyncTaskFinished(1);
}
- super.stop();
}
@Override
@@ -12,7 +12,6 @@
import org.apache.http.impl.client.DefaultHttpClient;
import android.os.Handler;
-import android.os.Looper;
import android.util.Log;
import com.couchbase.touchdb.TDDatabase;
@@ -50,8 +49,6 @@
protected String filterName;
protected Map<String,Object> filterParams;
- protected boolean inExternalShutdown = false;
-
protected static final int PROCESSOR_DELAY = 500;
protected static final int INBOX_CAPACITY = 100;
@@ -64,13 +61,10 @@ public TDReplicator(TDDatabase db, URL remote, boolean continuous, HttpClientFac
this.db = db;
this.remote = remote;
this.continuous = continuous;
-
- Looper looper = Looper.getMainLooper();
- //Create a new handler - passing in the looper for it to use
- this.handler = new Handler(looper);
+ this.handler = db.getHandler();
- batcher = new TDBatcher<TDRevision>(INBOX_CAPACITY, PROCESSOR_DELAY, new TDBatchProcessor<TDRevision>() {
+ batcher = new TDBatcher<TDRevision>(db.getHandler(), INBOX_CAPACITY, PROCESSOR_DELAY, new TDBatchProcessor<TDRevision>() {
@Override
public void process(List<TDRevision> inbox) {
Log.v(TDDatabase.TAG, "*** " + toString() + ": BEGIN processInbox (" + inbox.size() + " sequences)");
@@ -183,6 +177,8 @@ public void stop() {
return;
}
Log.v(TDDatabase.TAG, toString() + " STOPPING...");
+ batcher.flush();
+ continuous = false;
if(asyncTaskCount == 0) {
stopped();
}
@@ -191,14 +187,12 @@ public void stop() {
public void stopped() {
Log.v(TDDatabase.TAG, toString() + " STOPPED");
running = false;
+ this.changesProcessed = this.changesTotal = 0;
- batcher.flush();
- batcher.close();
+ saveLastSequence();
- this.changesProcessed = this.changesTotal = 0;
- if(!inExternalShutdown) {
- db.replicatorDidStop(this);
- }
+ batcher = null;
+ db = null;
}
public synchronized void asyncTaskStarted() {
@@ -229,7 +223,7 @@ public void sendAsyncRequest(String method, String relativePath, Object body, TD
String urlStr = remote.toExternalForm() + relativePath;
try {
URL url = new URL(urlStr);
- TDRemoteRequest request = new TDRemoteRequest(clientFacotry, method, url, body, onCompletion);
+ TDRemoteRequest request = new TDRemoteRequest(db.getHandler(), clientFacotry, method, url, body, onCompletion);
request.start();
} catch (MalformedURLException e) {
Log.e(TDDatabase.TAG, "Malformed URL for async request", e);
@@ -248,6 +242,9 @@ public void maybeCreateRemoteDB() {
* and the remote database's URL.
*/
public String remoteCheckpointDocID() {
+ if(db == null) {
+ return null;
+ }
String input = db.privateUUID() + "\n" + remote.toExternalForm() + "\n" + (isPush() ? "1" : "0");
return TDMisc.TDHexSHA1Digest(input.getBytes());
}
@@ -313,8 +310,12 @@ public void saveLastSequence() {
}
body.put("lastSequence", lastSequence);
+ String remoteCheckpointDocID = remoteCheckpointDocID();
+ if(remoteCheckpointDocID == null) {
+ return;
+ }
savingCheckpoint = true;
- sendAsyncRequest("PUT", "/_local/" + remoteCheckpointDocID(), body, new TDRemoteRequestCompletionBlock() {
+ sendAsyncRequest("PUT", "/_local/" + remoteCheckpointDocID, body, new TDRemoteRequestCompletionBlock() {
@Override
public void onCompletion(Object result, Throwable e) {
Oops, something went wrong.

0 comments on commit 6c6cb62

Please sign in to comment.