Skip to content

Commit

Permalink
Merge pull request #21 from muflihun/develop
Browse files Browse the repository at this point in the history
v2.0.0 compatible
  • Loading branch information
abumq committed Mar 1, 2018
2 parents ab656b4 + 6f75011 commit c8b4f11
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 370 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Change Log

## [2.0.0] - 01-03-2018
- Compatibility for server 2.0.0

## [1.1.1] - 23-02-2018
- Removed plain request

Expand Down
Expand Up @@ -81,9 +81,7 @@

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;

import org.bouncycastle.asn1.ASN1EncodableVector;
import org.bouncycastle.asn1.ASN1Integer;
Expand All @@ -105,21 +103,17 @@
public class Residue {

private static final Integer TOUCH_THRESHOLD = 60; // should always be min(client_age)
private static final String DEFAULT_ACCESS_CODE = "default";
private static final Integer ALLOCATION_BUFFER_SIZE = 4098;

private final ResidueClient connectionClient = new ResidueClient();
private final ResidueClient tokenClient = new ResidueClient();
private final ResidueClient loggingClient = new ResidueClient();

private final Deque<JsonObject> backlog = new ArrayDeque<>();
private final Map<String, ResidueToken> tokens = new HashMap<>();
private final Map<String, Logger> loggers = new HashMap<>();

private String host;
private Integer port;
private Integer loggingPort;
private Integer tokenPort;
private String applicationName;
private Integer rsaKeySize = 2048;
private Integer keySize = 128;
Expand All @@ -132,8 +126,6 @@ public class Residue {
private Integer bulkSize = 0;
private String defaultLoggerId = "default";

private Map<String, String> accessCodeMap;

private String privateKeySecret;
private String privateKeyFilename;
private String privateKeyPEM;
Expand Down Expand Up @@ -205,10 +197,6 @@ public Boolean isConnected() {
return connected;
}

public void setAccessCodeMap(final Map<String, String> accessCodeMap) {
this.accessCodeMap = accessCodeMap;
}

public void setTimeOffset(final Integer timeOffset) {
this.timeOffset = timeOffset;
}
Expand Down Expand Up @@ -352,30 +340,6 @@ public synchronized void loadConfigurationsFromJson(final String json) throws Ex
Thread.currentThread().setName(jsonObject.get("main_thread_id").getAsString());
}

if (jsonObject.has("access_codes")) {
Type type = new TypeToken<List<Map<String, String>>>() {
}.getType();

Map<String, String> newMap = new HashMap<>();
List<Map<String, String>> accessCodes = new Gson().fromJson(jsonObject.getAsJsonArray("access_codes"), type);
for (Map<String, String> accessCodeMap : accessCodes) {
String loggerId = "";
String code = "";
for (String key : accessCodeMap.keySet()) {
if ("logger_id".equals(key)) {
loggerId = accessCodeMap.get(key);
} else if ("code".equals(key)) {
code = accessCodeMap.get(key);
}
}
if (!loggerId.isEmpty() && !code.isEmpty()) {
newMap.put(loggerId, code);
}
}
setAccessCodeMap(newMap);

}

if (jsonObject.has("server_public_key")) {
setServerKeyFilename(jsonObject.get("server_public_key").getAsString());
}
Expand Down Expand Up @@ -860,11 +824,7 @@ public static boolean connect(final String host, final Integer port) throws Exce
getInstance().connecting = true;
getInstance().connected = false;
getInstance().connectionClient.destroy();
getInstance().tokenClient.destroy();
getInstance().loggingClient.destroy();
synchronized (getInstance().tokens) {
getInstance().tokens.clear();
}

final CountDownLatch latch = new CountDownLatch(3); // 3 connection sockets
if (getInstance().clientId != null && !getInstance().clientId.isEmpty()
Expand Down Expand Up @@ -969,7 +929,6 @@ public void handle(String data, boolean hasError) {
if (finalConnection.get("status").getAsInt() == 0) {
getInstance().age = finalConnection.get("age").getAsInt();
getInstance().loggingPort = finalConnection.get("logging_port").getAsInt();
getInstance().tokenPort = finalConnection.get("token_port").getAsInt();
getInstance().maxBulkSize = finalConnection.get("max_bulk_size").getAsInt();
getInstance().serverFlags = finalConnection.get("flags").getAsInt();
getInstance().serverVersion = finalConnection.get("server_info").getAsJsonObject().get("version").getAsString();
Expand All @@ -984,26 +943,6 @@ public void handle(String data, boolean hasError) {
getInstance().bulkDispatch = false;
}
getInstance().connected = true;
try {
getInstance().tokenClient.connect(getInstance().host, getInstance().tokenPort, new ResponseHandler("tokenClient.reconnect") {
@Override
public void handle(String data, boolean hasError) {
logForDebugging();
if (Flag.REQUIRES_TOKEN.isSet() && Residue.getInstance().accessCodeMap != null) {
for (String key : Residue.getInstance().accessCodeMap.keySet()) {
try {
getInstance().obtainToken(key, Residue.getInstance().accessCodeMap.get(key));
} catch (Exception e) {
e.printStackTrace();
}
}
}
latch.countDown();
}
});
} catch (IOException e) {
latch.countDown();
}
try {
getInstance().loggingClient.connect(getInstance().host, getInstance().loggingPort, new ResponseHandler("loggingClient.reconnect") {
@Override
Expand Down Expand Up @@ -1120,8 +1059,6 @@ public Integer getValue() {
private enum Flag {
NONE(0),
ALLOW_UNKNOWN_LOGGERS(1),
REQUIRES_TOKEN(2),
ALLOW_DEFAULT_ACCESS_CODE(4),
ALLOW_PLAIN_LOG_REQUEST(8),
ALLOW_BULK_LOG_REQUEST(16),
COMPRESSION(256);
Expand Down Expand Up @@ -1233,12 +1170,6 @@ public void failed(Throwable exc, AsynchronousSocketChannel channel) {
}
}

private static class ResidueToken {
private String data;
private Date dateCreated;
private Integer life;
}

/**
* Residue utility functions
*/
Expand Down Expand Up @@ -1462,79 +1393,6 @@ private static long getTimestamp() {
}
}

private void obtainToken(final String loggerId, String accessCode) throws Exception {

if (tokenClient.isConnected) {
if (accessCode == null) {
if (accessCodeMap != null && accessCodeMap.containsKey(loggerId)) {
accessCode = accessCodeMap.get(loggerId);
} else {
accessCode = DEFAULT_ACCESS_CODE;
}
}
if (DEFAULT_ACCESS_CODE.equals(accessCode) && !Flag.ALLOW_DEFAULT_ACCESS_CODE.isSet()) {
throw new Exception("ERROR: Access code for logger [" + loggerId + "] not provided. Loggers without access code are not allowed by the server.");
} else if (DEFAULT_ACCESS_CODE.equals(accessCode) && Flag.ALLOW_DEFAULT_ACCESS_CODE.isSet()) {
// we don't need to get token, server will accept request
// without tokens
return;
}
ResidueUtils.debugLog("Obtaining new token for [" + loggerId + "]");
final CountDownLatch latch = new CountDownLatch(1);
JsonObject j = new JsonObject();
j.addProperty("_t", ResidueUtils.getTimestamp());
j.addProperty("logger_id", loggerId);
j.addProperty("access_code", accessCode);
String request = new Gson().toJson(j);
ResidueUtils.debugLog("Token request: " + request);

String r = ResidueUtils.encrypt(request, key);
tokenClient.send(r, new ResponseHandler("tokenClient.send") {
@Override
public void handle(String data, boolean hasError) {
logForDebugging();
if (!hasError && !data.isEmpty()) {
String finalConnectionStr = ResidueUtils.decrypt(data, key);
JsonObject tokenResponse = new Gson().fromJson(finalConnectionStr, JsonObject.class);
if (tokenResponse == null) {
latch.countDown();
return;
}
if (tokenResponse.get("status").getAsInt() == 0) {
ResidueToken token = new ResidueToken();
token.data = tokenResponse.get("token").getAsString();
token.life = tokenResponse.get("life").getAsInt();
token.dateCreated = new Date();
synchronized (tokens) {
tokens.put(loggerId, token);
}
} else {
lastError = tokenResponse.get("error_text").getAsString();
ResidueUtils.debugLog("Error: " + getInstance().lastError);
}
} else {
ResidueUtils.debugLog("Error while obtaining token");
}
latch.countDown();
}
});
ResidueUtils.debugLog("Waiting 5s for token...");
latch.await(5L, TimeUnit.SECONDS);
}

}

private synchronized boolean hasValidToken(String loggerId) {
if (!Flag.REQUIRES_TOKEN.isSet()) {
return true;
}
if (!tokens.containsKey(loggerId)) {
return false;
}
ResidueToken t = tokens.get(loggerId);
return t != null && (t.life == 0 || (new Date().getTime() / 1000) - (t.dateCreated.getTime() / 1000) < t.life);
}

private boolean isClientValid() {
if (!connected || dateCreated == null) {
return false;
Expand Down Expand Up @@ -1648,7 +1506,6 @@ public void run() {

final JsonArray bulkJ = new JsonArray();
final Set<String> loggerIds = new HashSet<>();
final Map<String, String> tokenList = new HashMap<>();

// build up bulk request
synchronized (backlog) {
Expand All @@ -1662,44 +1519,6 @@ public void run() {
}
}

// Obtain tokens for all the loggers (unique)
for (String loggerId : loggerIds) {
if (!hasValidToken(loggerId)) {
try {
obtainToken(loggerId, null /* means read from map */);
} catch (Exception e) {
// Ignore
e.printStackTrace();
}
}
ResidueToken tokenObj = null;
synchronized (tokens) {
tokenObj = tokens.get(loggerId);
}
if (tokenObj == null
&& (Flag.ALLOW_DEFAULT_ACCESS_CODE.isSet() || !Flag.REQUIRES_TOKEN.isSet())) {
tokenList.put(loggerId, "");
} else if (tokenObj != null) {
tokenList.put(loggerId, tokenObj.data);
} else {
tokenList.put(loggerId, null); // token not found
}
}

// Integrate token to the request
for (JsonElement jElem : bulkJ) {
JsonObject j = jElem.getAsJsonObject();
String loggerId = j.get("logger").getAsString();
String foundToken = tokenList.get(loggerId);
if (foundToken == null) {
ResidueUtils.log("ERROR: Failed to obtain token [" + loggerId + "]");
continue;
}
if (!foundToken.isEmpty()) {
j.addProperty("token", foundToken);
}
}

String request = new Gson().toJson(Boolean.TRUE.equals(bulkDispatch) ? bulkJ : bulkJ.get(0));
if (Flag.COMPRESSION.isSet()) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Expand Down Expand Up @@ -1810,6 +1629,7 @@ public void log(Long datetime, String loggerId, String msg,
Integer sourceLineNumber, String sourceMethodName, String threadName,
Integer vlevel) {
JsonObject j = new JsonObject();
j.addProperty("_t", ResidueUtils.getTimestamp());
j.addProperty("datetime", datetime);
j.addProperty("logger", loggerId);
j.addProperty("msg", msg);
Expand Down
Expand Up @@ -21,7 +21,7 @@ public void connect() {
Residue r = Residue.getInstance();
System.out.println("reconnect()...");

if (false) {
if (true) {
// unencrypted private key
r.setClientId("muflihun00102030");
r.setPrivateKeyPEM("-----BEGIN RSA PRIVATE KEY-----\n" +
Expand Down Expand Up @@ -55,7 +55,7 @@ public void connect() {

if (false) {

// unencrypted private key
// encrypted private key
r.setClientId("muflihun00102031");
r.setPrivateKeySecret("8583fFir");
r.setPrivateKeyPEM("-----BEGIN RSA PRIVATE KEY-----\n" +
Expand Down Expand Up @@ -90,12 +90,6 @@ public void connect() {
"-----END RSA PRIVATE KEY-----");
}

if (false) {
r.setAccessCodeMap(new HashMap<String, String>() {{
put("sample-app", "a2dcb");
}});
}

r.setHost(hostText.getText().toString(), Integer.valueOf(portText.getText().toString()));
r.setUtcTime(true);

Expand Down

0 comments on commit c8b4f11

Please sign in to comment.