Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added cassandra testing module, updated delta to work better with unicode.
  • Loading branch information...
commit e9480d85b78ebd64f07a76e02a9fd5aa17606dba 1 parent b2b1f06
@wehriam wehriam authored
View
37 hiispider/components/cassandra.py
@@ -16,7 +16,7 @@
from twisted.internet.defer import inlineCallbacks, returnValue
from hiispider.components.base import Component, shared
from hiispider.components.logger import Logger
-
+import binascii
LOGGER = logging.getLogger(__name__)
@@ -57,7 +57,7 @@ def initialize(self):
keyspace=self.keyspace,
pool_size=self.pool_size)
self.client.startService()
- LOGGER.info('%s initialized.' % self.__class__.__name__)
+ LOGGER.info('%s initialized, connected to: %s.' % (self.__class__.__name__, self.servers))
def shutdown(self):
LOGGER.info("Stopping %s" % self.__class__.__name__)
@@ -77,6 +77,37 @@ def remove(self, *args, **kwargs):
return self.client.remove(*args, **kwargs)
@shared
+ def get(self, *args, **kwargs):
+ return self.client.get(*args, **kwargs)
+
+ @shared
+ def get_key_range(self, *args, **kwargs):
+ return self.client.get_key_range(*args, **kwargs)
+
+ @shared
+ def get_slice(self, *args, **kwargs):
+ return self.client.get_slice(*args, **kwargs)
+
+ @shared
+ @inlineCallbacks
+ def get_delta(self, delta_id):
+ """Get data from cassandra by user delta_id."""
+ try:
+ columns = yield self.client.get_slice(
+ key=binascii.unhexlify(delta_id),
+ column_family=self.cf_delta)
+ except NotFoundException:
+ LOGGER.error("%s not found." % delta_id)
+ return
+ results = dict([(x.column.name, x.column.value) for x in columns])
+ results["data"] = decompress(results["data"])
+ if "old_data" in results:
+ results["old_data"] = decompress(results["old_data"])
+ if "new_data" in results:
+ results["new_data"] = decompress(results["new_data"])
+ returnValue(results)
+
+ @shared
@inlineCallbacks
def getDataByIDAndUUID(self, user_id, uuid):
"""Get data from cassandra by user id and uuid."""
@@ -88,7 +119,7 @@ def getDataByIDAndUUID(self, user_id, uuid):
except NotFoundException:
return
obj = yield threads.deferToThread(decompress, data.column.value)
- returnValue(obj)
+ returnValue(obj)
@shared
@inlineCallbacks
View
21 hiispider/delta.py
@@ -73,9 +73,11 @@ def _shift(a):
def _simplesort(a):
if type(a) is dict:
- return [(x, _simplesort(a[x])) for x in sorted(a)]
+ return [(unicode(x), _simplesort(a[x])) for x in sorted(a)]
elif type(a) is list:
return sorted([_simplesort(x) for x in a])
+ elif type(a) is str:
+ return unicode(a)
return a
@@ -86,21 +88,22 @@ def _sort(a, ignores, includes):
# If there are include paths, iterate through keys in these paths.
included_keys = _included(includes)
if included_keys:
- return [(x, _sort(a[x], _shift(ignores), _shift(includes)))
+ return [(unicode(x), _sort(a[x], _shift(ignores), _shift(includes)))
for x in sorted(included_keys) if x in a]
else:
# Disregard keys at the top level of the ignore paths.
ignored_keys = _ignored(ignores)
- return [(x, _sort(a[x], _shift(ignores), _shift(includes)))
+ return [(unicode(x), _sort(a[x], _shift(ignores), _shift(includes)))
for x in sorted(a) if x not in ignored_keys]
elif type(a) is list:
try:
return sorted([_sort(x, ignores, includes) for x in a])
except ValueError:
return a
+ elif type(a) is str:
+ return unicode(a)
return a
-
def _hash(a, ignores, includes):
# Item is hashable no need to serialize.
try:
@@ -148,9 +151,9 @@ def _narrow(a, b, path):
indicated by the path.
"""
if not b:
- if isinstance(a, dict):
+ if type(a) is dict:
b = {}
- elif isinstance(a, list):
+ elif type(a) is list:
b = []
# If the path is empty, no need to narrow any further.
# If there is nothing to narrow, no need to narrow further.
@@ -161,7 +164,7 @@ def _narrow(a, b, path):
a.__class__,
b.__class__))
key = path[0]
- if isinstance(a, list):
+ if type(a) is list:
a_dicts = [x[key] for x in a if type(x) is dict and key in x]
b_dicts = [x[key] for x in b if type(x) is dict and key in x]
if a_dicts or b_dicts:
@@ -252,10 +255,10 @@ def _call(self, a, b, pathdata):
raise TypeError("Cannot generate delta from %s to %s." % (
a.__class__,
b.__class__))
- elif isinstance(a, list):
+ elif type(a) is list:
values = _compare_lists(a, b, ignores, includes)
return [Delta(pathstring, x, dates=dates) for x in values]
- elif isinstance(a, dict):
+ elif type(a) is dict:
if self.return_new_keys:
return [Delta(pathstring, {x:a[x]}, dates=dates) for x in set(a) - set(b)]
elif _hash(a, ignores, includes) != _hash(b, ignores, includes):
View
3  hiispider/metacomponents/__init__.py
@@ -13,6 +13,7 @@
from jobscheduler import JobScheduler
from identityscheduler import IdentityScheduler
from testing import Testing
+from deltatesting import DeltaTesting
-__all__ = ['PageGetter', 'Worker', 'JobGetter', 'Interface', "JobScheduler", "IdentityScheduler", "Testing"]
+__all__ = ['PageGetter', 'Worker', 'JobGetter', 'Interface', "JobScheduler", "IdentityScheduler", "Testing", "DeltaTesting"]
View
65 hiispider/metacomponents/deltatesting.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import time
+import logging
+import zlib
+import ujson as json
+from copy import copy
+from telephus.cassandra.c08.ttypes import NotFoundException
+from twisted.internet.defer import inlineCallbacks, returnValue
+from hiispider.components import *
+from hiispider.metacomponents import *
+import logging
+from hiiguid import HiiGUID
+from zlib import decompress
+from difflib import SequenceMatcher
+import binascii
+from pprint import pformat
+
+LOGGER = logging.getLogger(__name__)
+
+
+class DeltaTesting(Component):
+
+ allow_clients = False
+ requires = [Cassandra]
+
+ def __init__(self, server, config, server_mode, **kwargs):
+ super(DeltaTesting, self).__init__(server, server_mode)
+ self.config = copy(config)
+ self.service_mapping = config["service_mapping"]
+ self.server.expose(self.regenerate_by_uuid)
+
+ @inlineCallbacks
+ def regenerate_by_uuid(self, uuid):
+ delta = yield self.server.cassandra.get_delta(uuid)
+ function_name = self.service_mapping.get(
+ delta["subservice"],
+ delta["subservice"])
+ delta_func = self.server.functions[function_name]["delta"]
+ if not delta_func:
+ raise Exception("Delta function %s "
+ "does not exist." % delta["service"])
+ deltas = delta_func(delta["new_data"], delta["old_data"])
+ if not deltas:
+ raise Exception("No deltas were generated.")
+ # Find nearest delta.
+ delta_options = []
+ s = SequenceMatcher()
+ s.set_seq1(json.dumps(delta["data"]))
+ for delta in deltas:
+ LOGGER.debug(pformat(delta.data))
+ value = json.dumps(delta.data)
+ s.set_seq2(value)
+ delta_options.append((s.ratio(), value, delta.data))
+ # Sort to find the most similar option.
+ delta_options = sorted(delta_options, key=lambda x: x[0])
+ replacement_data = zlib.compress(json.dumps(delta_options[-1][2]))
+ ts = str(time.time())
+ mapping = {"updated":ts, "data":replacement_data}
+ yield self.server.cassandra.batch_insert(
+ key=binascii.unhexlify(uuid),
+ column_family=self.server.cassandra.cf_delta,
+ mapping=mapping)
+ LOGGER.debug("%s deltas generated." % len(deltas))
+ returnValue(True)
View
10 hiispider/metacomponents/jobexecuter.py
@@ -144,8 +144,16 @@ def generate_deltas(self, new_data, job, save=True):
old_data = yield self.server.cassandra.getData(job, consistency=2)
if not old_data:
return
+
# make sure old_data and new_data are similar (strings are all unicode, etc.)
- new_data = json.decode(json.encode(new_data))
+
+ # Taking this out as encoding and decoding should be done by the same
+ # module (now ujson) and encoding / decoding is CPU intensive.
+ # Note that the autodelta now compensates for unicode / str differences
+ # - JDW
+
+ # # new_data = json.decode(json.encode(new_data))
+
# get deltas by comparing new and old data sets
deltas = delta_func(new_data, old_data)
for delta in deltas:
View
3  hiispider/metacomponents/testing.py
@@ -62,9 +62,6 @@ def execute_by_uuid(self, uuid):
f = self.server.functions[job.function_name]
try:
data = yield maybeDeferred(f['function'], **job.kwargs)
- delta_enabled = self.config.get('delta_enabled', False)
- if delta_enabled:
- yield self.server.worker.generate_deltas(data, job, save=False)
except Exception, e:
LOGGER.debug(job.kwargs)
if hasattr(e, "response"):
View
3  hiispider/server.py
@@ -46,7 +46,8 @@
Interface,
JobScheduler,
IdentityScheduler,
- Testing]
+ Testing,
+ DeltaTesting]
# The intra-server poll interval
POLL_INTERVAL = 60
View
3  start_configs.txt
@@ -1,6 +1,9 @@
# Job Scheduler
python componentserver.py --components=jobscheduler,mysql,jobqueue,logger --http_port=8000 verbose
+# Delta Testing server
+python componentserver.py --components=deltatesting,cassandra --http_port=8000 verbose
+
# Testing server
python componentserver.py --components=testing,mysql,pagegetter,cassandra,redis --http_port=8000 verbose
View
24 tests/delta.py
@@ -59,15 +59,21 @@ def autogenerate_z_include(*args, **kwargs):
# 'a' and 'b' have the included key, and it changes.
a = [{"x":1, "y":1}, {"x":3, "y":2, "z":1}]
b = [{"x":1, "y":1}, {"x":3, "y":2, "z":2}, {"x":4, "y":9, "z":3}]
- self.assertEqual(autogenerate_z_include(a, b), [{"x":3, "y":2, "z":1}])
+ self.assertEqual(
+ sorted(autogenerate_z_include(a, b)),
+ sorted([{"x":3, "y":2, "z":1}]))
# 'a' and 'b' have the included key, and it changes.
a = [{"x":1, "y":1}, {"x":3, "y":2, "z":2}, {"x":4, "y":9, "z":3}]
b = [{"x":1, "y":1}, {"x":3, "y":2, "z":1}]
- self.assertEqual(autogenerate_z_include(a, b), [{"x":3, "y":2, "z":2}, {"x":4, "y":9, "z":3}])
+ self.assertEqual(
+ sorted(autogenerate_z_include(a, b)),
+ sorted([{"x":3, "y":2, "z":2}, {"x":4, "y":9, "z":3}]))
# 'a' and 'b' have the included key, and it changes.
a = [{"x":1, "y":1, "z":1}, {"x":3, "y":2, "z":2}, {"x":4, "y":9, "z":3}]
b = [{"x":4, "y":5, "z":1}]
- self.assertEqual(autogenerate_z_include(a, b), [{"x":3, "y":2, "z":2}, {"x":4, "y":9, "z":3}])
+ self.assertEqual(
+ sorted(autogenerate_z_include(a, b)),
+ sorted([{"x":3, "y":2, "z":2}, {"x":4, "y":9, "z":3}]))
# Nested, multiple includes
autogenerator = delta.Autogenerator(includes=["example/y", "example/z"], paths="example")
a = {"example":[{"x":1, "y":2, "z":3}]}
@@ -174,3 +180,15 @@ def autogenerate_goodreads(*args, **kwargs):
old = decode(read("%s/goodreads/old.json" % DATAPATH))
new = decode(read("%s/goodreads/new.json" % DATAPATH))
self.assertEqual(autogenerate_goodreads(old, new), [])
+
+ def test_unicode(self):
+ a = {"x":["y", 1, 2]}
+ b = {"x":[1, u"y", 2]}
+ self.assertEqual(autogenerate(a, b), [])
+ a = {"x":["y", 1, 2]}
+ b = {u"x":[1, u"y", 2]}
+ self.assertEqual(autogenerate(a, b), [])
+ a = [{"z":1}, {"x":["y", 1, 2]}]
+ b = [{u"x":[1, u"y", 2]}, {u"z":1}]
+ self.assertEqual(autogenerate(a, b), [])
+
Please sign in to comment.
Something went wrong with that request. Please try again.