diff --git a/helpers/geo_helper.py b/helpers/geo_helper.py
index aa125ee..d45e6f3 100644
--- a/helpers/geo_helper.py
+++ b/helpers/geo_helper.py
@@ -12,6 +12,7 @@
from phonenumbers import geocoder
import util
+from helpers import live_helper
class InvalidCoordinate(Exception):
pass
@@ -24,6 +25,7 @@ def __init__(self, serv_redis_db, cfg):
host=cfg.get('RedisGlobal', 'host'),
port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisMap', 'db'))
+ self.live_helper = live_helper.Live_helper(serv_redis_db, cfg)
#logger
logDir = cfg.get('Log', 'directory')
@@ -118,7 +120,9 @@ def getCoordFromIpAndPublish(self, supposed_ip, categ):
"cityName": rep['full_rep'].city.name,
"regionCode": rep['full_rep'].country.iso_code,
}
- self.serv_coord.publish(self.CHANNELDISP, json.dumps(to_send))
+ j_to_send = json.dumps(to_send)
+ self.serv_coord.publish(self.CHANNELDISP, j_to_send)
+ self.live_helper.add_to_stream_log_cache('Map', j_to_send)
self.logger.info('Published: {}'.format(json.dumps(to_send)))
except ValueError:
self.logger.warning("can't resolve ip")
@@ -163,7 +167,9 @@ def getCoordFromPhoneAndPublish(self, phoneNumber, categ):
"cityName": "",
"regionCode": country_code,
}
- self.serv_coord.publish(self.CHANNELDISP, json.dumps(to_send))
+ j_to_send = json.dumps(to_send)
+ self.serv_coord.publish(self.CHANNELDISP, j_to_send)
+ self.live_helper.add_to_stream_log_cache('Map', j_to_send)
self.logger.info('Published: {}'.format(json.dumps(to_send)))
except phonenumbers.NumberParseException:
self.logger.warning("Can't resolve phone number country")
diff --git a/server.py b/server.py
index f3cdb4a..23e9139 100755
--- a/server.py
+++ b/server.py
@@ -15,6 +15,7 @@
from helpers import contributor_helper
from helpers import users_helper
from helpers import trendings_helper
+from helpers import live_helper
configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config/config.cfg')
cfg = configparser.ConfigParser()
@@ -41,6 +42,10 @@
port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisDB', 'db'))
+streamLogCacheKey = cfg.get('RedisLog', 'streamLogCacheKey')
+streamMapCacheKey = cfg.get('RedisLog', 'streamMapCacheKey')
+
+live_helper = live_helper.Live_helper(serv_redis_db, cfg)
geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg)
contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg)
users_helper = users_helper.Users_helper(serv_redis_db, cfg)
@@ -56,8 +61,6 @@ class LogItem():
FIELDNAME_ORDER = []
FIELDNAME_ORDER_HEADER = []
- FIELDNAME_ORDER.append("Time")
- FIELDNAME_ORDER_HEADER.append("Time")
for item in json.loads(cfg.get('Dashboard', 'fieldname_order')):
if type(item) is list:
FIELDNAME_ORDER_HEADER.append(" | ".join(item))
@@ -66,10 +69,7 @@ class LogItem():
FIELDNAME_ORDER.append(item)
def __init__(self, feed):
- self.time = strftime("%H:%M:%S", now())
- #FIXME Parse feed message?
self.fields = []
- self.fields.append(self.time)
for f in feed:
self.fields.append(f)
@@ -100,15 +100,19 @@ def __init__(self, msg):
logger.error(e)
jsonMsg = { 'name': "undefined" ,'log': json.loads(msg) }
- self.feedName = jsonMsg['name']
+ self.name = jsonMsg['name']
self.zmqName = jsonMsg['zmqName']
self.feed = json.loads(jsonMsg['log'])
self.feed = LogItem(self.feed).get_row()
- def to_json(self):
- to_ret = { 'log': self.feed, 'feedName': self.feedName, 'zmqName': self.zmqName }
+ def to_json_ev(self):
+ to_ret = { 'log': self.feed, 'name': self.name, 'zmqName': self.zmqName }
return 'data: {}\n\n'.format(json.dumps(to_ret))
+ def to_json(self):
+ to_ret = { 'log': self.feed, 'name': self.name, 'zmqName': self.zmqName }
+ return json.dumps(to_ret)
+
###########
## ROUTE ##
###########
@@ -219,11 +223,21 @@ def trendings():
@app.route("/_logs")
def logs():
- return Response(event_stream_log(), mimetype="text/event-stream")
+ if request.accept_mimetypes.accept_json or request.method == 'POST':
+ key = 'Attribute'
+ j = live_helper.get_stream_log_cache(key)
+ return jsonify(j)
+ else:
+ return Response(event_stream_log(), mimetype="text/event-stream")
@app.route("/_maps")
def maps():
- return Response(event_stream_maps(), mimetype="text/event-stream")
+ if request.accept_mimetypes.accept_json or request.method == 'POST':
+ key = 'Map'
+ j = live_helper.get_stream_log_cache(key)
+ return jsonify(j)
+ else:
+ return Response(event_stream_maps(), mimetype="text/event-stream")
@app.route("/_get_log_head")
def getLogHead():
@@ -231,11 +245,12 @@ def getLogHead():
def event_stream_log():
subscriber_log = redis_server_log.pubsub(ignore_subscribe_messages=True)
- subscriber_log.subscribe(cfg.get('RedisLog', 'channel'))
+ subscriber_log.subscribe(live_helper.CHANNEL)
try:
for msg in subscriber_log.listen():
content = msg['data']
- yield EventMessage(content).to_json()
+ ev = EventMessage(content)
+ yield ev.to_json_ev()
except GeneratorExit:
subscriber_log.unsubscribe()
@@ -245,7 +260,8 @@ def event_stream_maps():
try:
for msg in subscriber_map.listen():
content = msg['data'].decode('utf8')
- yield 'data: {}\n\n'.format(content)
+ to_ret = 'data: {}\n\n'.format(content)
+ yield to_ret
except GeneratorExit:
subscriber_map.unsubscribe()
diff --git a/static/js/index/index.js b/static/js/index/index.js
index 21f161c..7c5a752 100644
--- a/static/js/index/index.js
+++ b/static/js/index/index.js
@@ -184,14 +184,19 @@ function connect_source_log() {
source_log.onmessage = function(event) {
var json = jQuery.parseJSON( event.data );
- updateLogTable(json.feedName, json.log, json.zmqName);
+ updateLogTable(json.name, json.log, json.zmqName);
};
}
$(document).ready(function () {
createHead(function() {
if (!!window.EventSource) {
- connect_source_log();
+ $.getJSON( urlForLogs, function( data ) {
+ data.forEach(function(item) {
+ updateLogTable(item.name, item.log, item.zmqName);
+ });
+ connect_source_log();
+ });
} else {
console.log("No event source_log");
}
@@ -202,7 +207,7 @@ $(document).ready(function () {
// LOG TABLE
-function updateLogTable(feedName, log, zmqName) {
+function updateLogTable(name, log, zmqName) {
if (log.length == 0)
return;
@@ -213,7 +218,7 @@ function updateLogTable(feedName, log, zmqName) {
tableBody = document.getElementById('table_log_body');
// only add row for attribute
- if (feedName == "Attribute" ) {
+ if (name == "Attribute" ) {
var categName = log[toPlotLocationLog];
sources.addIfNotPresent(categName);
sources.incCountOnSource(categName);
@@ -226,7 +231,7 @@ function updateLogTable(feedName, log, zmqName) {
tableBody.deleteRow(0);
}
- } else if (feedName == "Keepalive") {
+ } else if (name == "Keepalive") {
// do nothing
} else {
// do nothing
diff --git a/static/js/index/index_map.js b/static/js/index/index_map.js
index c5740bf..12699f2 100644
--- a/static/js/index/index_map.js
+++ b/static/js/index/index_map.js
@@ -23,7 +23,17 @@ class MapEvent {
this.specifName = json.specifName;
this.cityName = json.cityName;
this.text = this.categ + ": " + this.value;
- this.textMarker = "{1}<br>{2}".replace("{1}", this.country).replace("{2}", this.specifName+", "+this.cityName);
+ let underText = "";
+ if (this.specifName !== null && this.cityName !== null) {
+ underText = this.specifName+", "+this.cityName;
+ } else if (this.specifName !== null) {
+ underText = this.specifName;
+ } else if (this.cityName !== null) {
+ underText = this.cityName;
+ } else {
+ underText = "";
+ }
+ this.textMarker = "{1}<br>{2}".replace("{1}", this.country).replace("{2}", underText);
}
}
@@ -218,7 +228,6 @@ function connect_source_map() {
setTimeout(function() { connect_source_map(); }, 5000);
};
}
-connect_source_map()
$(document).ready(function () {
$( "#rotation_wait_time_selector" ).change(function() {
@@ -240,4 +249,15 @@ $(document).ready(function () {
ZOOMLEVEL = sel;
mapEventManager.directZoom();
});
+
+ if (!!window.EventSource) {
+ $.getJSON( urlForMaps, function( data ) {
+ data.forEach(function(item) {
+ var marker = L.marker([item.coord.lat, item.coord.lon]).addTo(myOpenStreetMap);
+ var mapEvent = new MapEvent(item, marker);
+ mapEventManager.addMapEvent(mapEvent);
+ });
+ connect_source_map()
+ });
+ }
});
diff --git a/zmq_dispatcher.py b/zmq_dispatcher.py
index e2a9bb7..cfb2dee 100755
--- a/zmq_dispatcher.py
+++ b/zmq_dispatcher.py
@@ -17,6 +17,7 @@
from helpers import contributor_helper
from helpers import users_helper
from helpers import trendings_helper
+from helpers import live_helper
configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config/config.cfg')
cfg = configparser.ConfigParser()
@@ -30,7 +31,6 @@
logging.basicConfig(filename=logPath, filemode='a', level=logging.INFO)
logger = logging.getLogger('zmq_dispatcher')
-CHANNEL = cfg.get('RedisLog', 'channel')
LISTNAME = cfg.get('RedisLIST', 'listName')
serv_log = redis.StrictRedis(
@@ -46,17 +46,13 @@
port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisLIST', 'db'))
+live_helper = live_helper.Live_helper(serv_redis_db, cfg)
geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg)
contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg)
users_helper = users_helper.Users_helper(serv_redis_db, cfg)
trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg)
-def publish_log(zmq_name, name, content, channel=CHANNEL):
- to_send = { 'name': name, 'log': json.dumps(content), 'zmqName': zmq_name }
- serv_log.publish(channel, json.dumps(to_send))
- logger.debug('Published: {}'.format(json.dumps(to_send)))
-
def getFields(obj, fields):
jsonWalker = fields.split('.')
itemToExplore = obj
@@ -68,6 +64,8 @@ def getFields(obj, fields):
if type(itemToExplore) is list:
return { 'name': lastName , 'data': itemToExplore }
else:
+ if i == 'timestamp':
+ itemToExplore = datetime.datetime.utcfromtimestamp(int(itemToExplore)).strftime('%Y-%m-%d %H:%M:%S')
return itemToExplore
except KeyError as e:
return ""
@@ -87,7 +85,7 @@ def handler_dispatcher(zmq_name, jsonObj):
def handler_keepalive(zmq_name, jsonevent):
logger.info('Handling keepalive')
to_push = [ jsonevent['uptime'] ]
- publish_log(zmq_name, 'Keepalive', to_push)
+ live_helper.publish_log(zmq_name, 'Keepalive', to_push)
def handler_user(zmq_name, jsondata):
logger.info('Handling user')
@@ -226,7 +224,7 @@ def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False):
action,
isLabeled=eventLabeled)
# Push to log
- publish_log(zmq_name, 'Attribute', to_push)
+ live_helper.publish_log(zmq_name, 'Attribute', to_push)
###############