/
CouchCtrlListener.java
109 lines (87 loc) · 2.71 KB
/
CouchCtrlListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package org.couchdb.android;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import android.util.Log;
/**
 * Watches the {@code <db>-ctrl} control database of a CouchDB server and runs
 * the replications requested by documents that appear in it.
 *
 * <p>{@link #start()} does not return until the listener stops: it stays in a
 * long-poll loop on the control database's {@code _changes} feed. For that
 * reason {@link #cancel()} has to be called from a different thread, and the
 * two state flags are {@code volatile} so that the request is visible to the
 * polling loop.
 */
public class CouchCtrlListener {

    /**
     * Base URL of the CouchDB server. Database and endpoint names are appended
     * to it directly, so it presumably has to end with "/" (confirm against
     * callers).
     */
    private final String couchUrl;

    /** Name of the control database, i.e. the watched database name plus "-ctrl". */
    private final String ctrl;

    /** Credentials sent as HTTP Basic authentication on every request. */
    private final String adminUser;
    private final String adminPass;

    /** Set by {@link #cancel()}; checked by the polling loop before each request. */
    private volatile boolean cancelled = false;

    /** True while a call to {@link #start()} is inside the polling loop. */
    private volatile boolean running = false;

    /**
     * @param couchUrl base URL of the CouchDB server
     * @param db       name of the database whose "-ctrl" companion is watched
     * @param user     user name for HTTP Basic authentication
     * @param pass     password for HTTP Basic authentication
     */
    public CouchCtrlListener(String couchUrl, String db, String user,
            String pass) {
        this.couchUrl = couchUrl;
        this.adminUser = user;
        this.adminPass = pass;
        this.ctrl = db + "-ctrl";
    }

    /**
     * Starts listening for changes on the control database, beginning at its
     * current {@code update_seq}, and keeps listening until cancelled.
     *
     * <p>If the listener is already running, the only effect is to withdraw a
     * pending cancellation. A {@link JSONException} raised while listening is
     * logged and ends the listener; in every case the state flags are reset on
     * exit so that the listener can be started again later.
     */
    public void start() {
        Log.v(CouchProcess.TAG, "Starting Listener for " + ctrl);
        if (running) {
            if (cancelled) {
                cancelled = false;
            }
            return;
        }

        running = true;
        try {
            JSONObject dbInfo = HTTPRequest.get(couchUrl + ctrl, headers()).json;
            int updateSeq = dbInfo.getInt("update_seq");
            changes(updateSeq);
        } catch (JSONException e) {
            Log.e(CouchProcess.TAG, "Changes listener for " + ctrl + " failed", e);
        } finally {
            // Reset on every exit path (normal stop, JSON error, runtime
            // failure); otherwise a failed run would leave running == true and
            // every later start() would be ignored.
            running = false;
            cancelled = false;
        }
    }

    /**
     * Posts the given control document to {@code _replicate}, stores the
     * outcome in the document and writes it back to the control database.
     *
     * <p>The document is marked with status "complete" before it is written
     * back; {@link #handleChange(JSONObject)} skips such documents.
     *
     * @param json control document describing the replication; modified in place
     * @throws JSONException if the document has no {@code _id} or cannot be updated
     */
    private void doReplication(JSONObject json) throws JSONException {
        HTTPRequest req = HTTPRequest.httpRequest("POST", couchUrl
                + "_replicate", json.toString(), headers());

        // Java will just close the connection when it gets a 404
        // without reading the result, so the body may be missing or
        // unparseable; fall back to an empty result object in that case.
        JSONObject result;
        try {
            result = new JSONObject(req.result);
        } catch (Exception ignored) {
            result = new JSONObject();
        }
        result.put("http_status", req.status);

        json.put("result", result);
        json.put("status", "complete");

        String id = json.getString("_id");
        HTTPRequest.httpRequest("PUT", couchUrl + ctrl + "/" + id,
                json.toString(), headers());
    }

    /**
     * Processes one changed control document: documents already marked
     * "complete" are ignored, and documents that carry both a {@code source}
     * and a {@code target} are replicated.
     *
     * @param doc the changed document
     * @throws JSONException if the replication bookkeeping fails
     */
    private void handleChange(JSONObject doc) throws JSONException {
        if ("complete".equals(doc.opt("status"))) {
            return;
        }
        if (doc.has("source") && doc.has("target")) {
            doReplication(doc);
        }
    }

    /**
     * Long-polls the control database's {@code _changes} feed and handles each
     * reported document until {@link #cancel()} has been requested. The
     * cancellation flag is only checked between requests.
     *
     * @param seq sequence number after which changes are requested
     * @throws JSONException if a feed response lacks the expected fields
     */
    private void changes(int seq) throws JSONException {
        while (!cancelled) {
            String url = couchUrl + ctrl
                    + "/_changes?include_docs=true&feed=longpoll&since=" + seq;
            JSONObject json = HTTPRequest.get(url, headers()).json;
            Log.v(CouchProcess.TAG, "Received Changes for " + ctrl);

            // Continue from the last sequence the server reported.
            seq = json.getInt("last_seq");
            JSONArray results = json.getJSONArray("results");
            for (int i = 0; i < results.length(); i++) {
                handleChange(results.getJSONObject(i).getJSONObject("doc"));
            }
        }
        Log.v(CouchProcess.TAG, "Changes listener on " + ctrl + " has stopped");
    }

    /**
     * Asks the listener to stop. The polling loop notices the request the next
     * time it checks the flag, i.e. after the current request has returned.
     */
    public void cancel() {
        Log.v(CouchProcess.TAG, "Cancelling changes listener for " + ctrl);
        cancelled = true;
    }

    /**
     * Builds the request headers: a single HTTP Basic {@code Authorization}
     * header made from the admin credentials.
     *
     * @return header name/value pairs
     */
    private String[][] headers() {
        String auth = Base64Coder.encodeString(adminUser + ":" + adminPass);
        return new String[][] { { "Authorization", "Basic " + auth } };
    }
}