Skip to content

Commit

Permalink
Merge pull request #5 from frecar/frecar/metrics
Browse files Browse the repository at this point in the history
Add: Simple MetricsProducer class and collect some metrics
  • Loading branch information
frecar committed May 29, 2015
2 parents 4f7d4cb + 25f920e commit bae4fec
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 21 deletions.
44 changes: 36 additions & 8 deletions src/main/java/Main.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import metrics.MetricsCollector;
import obelix.ObelixBatchImport;
import obelix.ObelixCache;
import obelix.ObelixFeeder;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import queue.impl.ObelixQueueElement;
import queue.impl.RedisObelixQueue;
import queue.interfaces.ObelixQueue;
import store.impl.RedisObelixStore;
Expand All @@ -13,6 +15,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class Main {

Expand All @@ -34,6 +37,8 @@ public static void main(String... args) {
String neoLocation = "graph.db";
String redisQueuePrefix = "obelix:queue:";
String redisQueueName = "logentries";
String metricsSaveLocation = "obelix_metrics.json";

int maxRelationships = 30;
int workers = 1;
int webPort = 4500;
Expand Down Expand Up @@ -110,13 +115,13 @@ public static void main(String... args) {
for (String arg : args) {
if (arg.equals("--recommendation-depth")) {
try {
int depth = Integer.parseInt(args[carg+1]);
int depth = Integer.parseInt(args[carg + 1]);

if(depth<0 || depth > 10) {
if (depth < 0 || depth > 10) {
throw new NumberFormatException();
}

recommendationDepth = args[carg+1];
recommendationDepth = args[carg + 1];

} catch (NumberFormatException e) {
LOGGER.error("Wrong format for --recommendation-depth option, use a number from 0-10");
Expand All @@ -133,7 +138,7 @@ public static void main(String... args) {
LOGGER.info("--redis-queue-name: " + redisQueueName);
LOGGER.info("--web-port: " + webPort);

if(batchImportAll) {
if (batchImportAll) {
LOGGER.info("Starting batch import of all");
ObelixBatchImport.run(neoLocation, redisQueueName);
LOGGER.info("Done importing everything! woho!");
Expand All @@ -147,7 +152,7 @@ public static void main(String... args) {
registerShutdownHook(graphDb);

ObelixQueue redisQueueManager = new RedisObelixQueue(redisQueuePrefix, redisQueueName);
ObelixQueue usersCacheQueue= new RedisObelixQueue(redisQueuePrefix, "cache:users");
ObelixQueue usersCacheQueue = new RedisObelixQueue(redisQueuePrefix, "cache:users");

// Warm up neo4j cache
/*
Expand All @@ -162,17 +167,40 @@ public static void main(String... args) {
System.out.println("Neo4j is warmed up!");
}*/

(new Thread(new ObelixFeeder(graphDb, maxRelationships,
feedDummyData(redisQueueManager);

MetricsCollector metricsCollector =
new MetricsCollector(metricsSaveLocation, graphDb,
redisQueueManager, usersCacheQueue);

new Thread(metricsCollector).start();

(new Thread(new ObelixFeeder(graphDb, metricsCollector, maxRelationships,
redisQueueManager, usersCacheQueue, 1))).start();

(new Thread(new ObelixWebServer(graphDb, webPort, recommendationDepth, clientSettings()))).start();
(new Thread(new ObelixWebServer(graphDb, webPort,
recommendationDepth, clientSettings()))).start();

(new Thread(new ObelixCache(graphDb, usersCacheQueue, new RedisObelixStore(redisQueuePrefix),
(new Thread(new ObelixCache(graphDb, metricsCollector, usersCacheQueue,
new RedisObelixStore(redisQueuePrefix),
buildForAllUsersOnStartup, recommendationDepth, maxRelationships,
clientSettings()))).start();

}

static void feedDummyData(ObelixQueue queue) {

for (int i = 0; i < 5; i++) {

String user = String.valueOf(new Random().nextInt(90000));
String item = String.valueOf(new Random().nextInt(90000));

String testData = "\"{\\\"file_format\\\": \\\"page_view\\\", \\\"timestamp\\\": 1431962580.7399549, \\\"item\\\": " + item + ", \\\"user\\\": " + user + ", \\\"ip\\\": \\\"188.218.111.19\\\", \\\"type\\\": \\\"events.pageviews\\\"}\"";

queue.push(new ObelixQueueElement(testData));
}
}

public static Map<String, String> clientSettings() {
Map<String, String> result = new HashMap<>();
result.put("redis_prefix", "obelix::");
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/graph/RelationGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,17 @@ public List<String> getAll(String type) {
} catch (NotFoundException e) {
LOGGER.error("Ignores one relationship, probably deleted..");
}

}

LOGGER.info("Newest timestamp: " + newestTimestamp);
LOGGER.info("Oldest timestamp: " + oldestTimestamp);
//LOGGER.info("Newest timestamp: " + newestTimestamp);
//LOGGER.info("Oldest timestamp: " + oldestTimestamp);

tx.success();

return relations_ids;
}
}


/*
public List<String> getAllRelationsForUserGivenLabel(String nodeLabel, String nodeID, String relType,
String sinceTimestamp, String untilTimestmap) {
Expand Down
160 changes: 160 additions & 0 deletions src/main/java/metrics/MetricsCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package metrics;

import com.google.gson.JsonObject;
import graph.ItemGraph;
import graph.RelationGraph;
import graph.UserGraph;
import org.json.JSONObject;
import org.neo4j.graphdb.GraphDatabaseService;
import queue.interfaces.ObelixQueue;
import store.impl.ObelixStoreElement;
import store.impl.RedisObelixStore;
import store.interfaces.ObelixStore;

import java.util.HashMap;
import java.util.Map;

public class MetricsCollector implements Runnable {

private final String metricsSaveLocation;
private final GraphDatabaseService graphDb;
private final ObelixQueue redisQueueManager;
private final ObelixQueue usersCacheQueue;
private final ObelixStore storage;

private Map<String, Integer> metrics = new HashMap<>();
private Map<String, Integer> totalMetrics = new HashMap<>();

public MetricsCollector(String metricsSaveLocation,
GraphDatabaseService graphDb,
ObelixQueue redisQueueManager,
ObelixQueue usersCacheQueue) {

this.storage = new RedisObelixStore();
this.metricsSaveLocation = metricsSaveLocation;
this.graphDb = graphDb;
this.redisQueueManager = redisQueueManager;
this.usersCacheQueue = usersCacheQueue;
loadStoredMetrics();
}

private void loadStoredMetrics() {

ObelixStoreElement obelixMetrics = this.storage.get("total_metrics");
if (obelixMetrics != null) {

if (obelixMetrics.data.has("recommendations_built")) {

this.totalMetrics.put("recommendations_built",
obelixMetrics.data.getInt("recommendations_built"));

this.totalMetrics.put("feeded",
obelixMetrics.data.getInt("feeded"));
}
}
}

private void saveTotalMetrics() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("recommendations_built", this.totalMetrics.get("recommendations_built"));
jsonObject.put("feeded", this.totalMetrics.get("feeded"));
this.storage.set("total_metrics", new ObelixStoreElement(jsonObject));
}

private void saveMetrics() {
JSONObject jsonObject = new JSONObject();

for(String key : this.metrics.keySet()) {
jsonObject.put(key, this.metrics.get(key));
}

for(String key : this.totalMetrics.keySet()) {
jsonObject.put("total_" + key, this.totalMetrics.get(key));
}

this.storage.set("metrics", new ObelixStoreElement(jsonObject));
}

public void printMetrics() {
JsonObject object = new JsonObject();

for (String key : this.metrics.keySet()) {
if (this.totalMetrics.containsKey(key)) {
object.addProperty("total_" + key, this.totalMetrics.get(key));
}

object.addProperty(key, this.metrics.get(key));
}

saveMetrics();
saveTotalMetrics();
resetMetrics();

}

private void resetMetrics() {
for (String key : metrics.keySet()) {
if (this.metrics.containsKey(key)) {
this.metrics.put(key, 0);
}
}
}

private synchronized void addTotalMetricValue(String key, int value) {
if (!this.totalMetrics.containsKey(key)) {
this.totalMetrics.put(key, 0);
}
this.totalMetrics.put(key, this.totalMetrics.get(key) + value);
}

public synchronized void addAccumalativeMetricValue(String key, int value) {
if (!this.metrics.containsKey(key)) {
this.metrics.put(key, 0);
}

this.metrics.put(key, this.metrics.get(key) + value);
this.addTotalMetricValue(key, value);
}

public synchronized void addStaticMetricValue(String key, int value) {
if (!this.metrics.containsKey(key)) {
this.metrics.put(key, 0);
}

this.metrics.put(key, value);

}

private synchronized void addQueueStats() {
addStaticMetricValue("logentries_queue_size", redisQueueManager.getAll().size());
addStaticMetricValue("cache_queue_size", usersCacheQueue.getAll().size());
}

private synchronized void addGraphStats() {
UserGraph userGraph = new UserGraph(graphDb);
RelationGraph relationGraph = new RelationGraph(graphDb);
ItemGraph itemGraph = new ItemGraph(graphDb);

addStaticMetricValue("all_users_count", userGraph.getAll().size());
addStaticMetricValue("all_items_count", itemGraph.getAll().size());
addStaticMetricValue("all_relationships_count", relationGraph.getAll().size());
}

@Override
public void run() {

while (true) {
addQueueStats();
addGraphStats();
this.printMetrics();
//this.resetMetrics();

try {
// wait for 5 seconds
Thread.sleep(300000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
1 change: 0 additions & 1 deletion src/main/java/obelix/Obelix.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public Obelix(Builder builder) {
this.numberOfRelationships = builder.numberOfRelationships;
this.neo4jStore = builder.neo4jstore;
this.redisQueueName = builder.redisQueueName != null ? builder.redisQueueName : "logentries";

}

public void run() {
Expand Down
33 changes: 29 additions & 4 deletions src/main/java/obelix/ObelixCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import graph.UserGraph;
import graph.exceptions.ObelixNodeNotFoundException;
import metrics.MetricsCollector;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
Expand All @@ -25,6 +26,7 @@ public class ObelixCache implements Runnable {


private final static Logger LOGGER = LoggerFactory.getLogger(ObelixCache.class.getName());
private final MetricsCollector metricsCollector;

GraphDatabaseService graphDb;
ObelixQueue redisQueueManager;
Expand All @@ -37,11 +39,30 @@ public class ObelixCache implements Runnable {
int maxRelationships;
boolean buildForAllUsersOnStartup;

public ObelixCache(GraphDatabaseService graphDb, ObelixQueue usersCacheQueue,
public ObelixCache(GraphDatabaseService graphDb, MetricsCollector metricsCollector,
ObelixQueue usersCacheQueue,
ObelixStore obelixStore,
boolean buildForAllUsersOnStartup, String recommendationDepth,
int maxRelationships, Map<String, String> clientSettings) {

this.metricsCollector = metricsCollector;
this.redisStoreManager = obelixStore;
this.graphDb = graphDb;
this.redisQueueManager = usersCacheQueue;
this.userGraph = new UserGraph(graphDb);
this.buildForAllUsersOnStartup = buildForAllUsersOnStartup;
this.recommendationDepth = recommendationDepth;
this.maxRelationships = maxRelationships;
this.clientSettings = clientSettings;
}

public ObelixCache(GraphDatabaseService graphDb,
ObelixQueue usersCacheQueue,
ObelixStore obelixStore,
boolean buildForAllUsersOnStartup, String recommendationDepth,
int maxRelationships, Map<String, String> clientSettings) {

this.metricsCollector = null;
this.redisStoreManager = obelixStore;
this.graphDb = graphDb;
this.redisQueueManager = usersCacheQueue;
Expand All @@ -59,7 +80,7 @@ private void buildSettingsCache() {

private void buildCacheForUser(String userid) {

LOGGER.info("Building cache for userid: " + userid);
LOGGER.info("Building recommendations for user id: " + userid);

Map<String, Double> recommendations;

Expand All @@ -70,8 +91,12 @@ private void buildCacheForUser(String userid) {
"recommendations::" + userid,
new ObelixStoreElement(jsonTransformer.render(recommendations)));

if(metricsCollector != null) {
this.metricsCollector.addAccumalativeMetricValue("recommendations_built", 1);
}

} catch (ObelixNodeNotFoundException | NoSuchElementException | IllegalArgumentException e) {
LOGGER.info("Cache for user " + userid + " failed to build..! Can't find the user");
LOGGER.info("Recommendations for user " + userid + " failed to build..! Can't find the user");
}
}

Expand Down Expand Up @@ -121,8 +146,8 @@ public void run() {
} catch (InterruptedException e) {
e.printStackTrace();
}

}

} catch (Exception e) {
LOGGER.error("ObelixCache Exception", e);
LOGGER.info("Restarting ObelixCache.run()!");
Expand Down

0 comments on commit bae4fec

Please sign in to comment.