connection pooling, new style of echoprint matching

commit 03391310e27f46a413ebfb6c39eb1e3036fefc0d (1 parent: 4307111), bwhitman committed May 25, 2011
Showing with 106 additions and 56 deletions.
  1. +49 −54 API/fp.py
  2. +56 −0 API/solr.py
  3. +1 −2 README
103 API/fp.py
@@ -11,14 +11,14 @@
import solr
import pickle
from collections import defaultdict
-import zlib, base64, re, time, random, string
+import zlib, base64, re, time, random, string, math
try:
import json
except ImportError:
import simplejson as json
-_fp_solr = solr.SolrConnection("http://localhost:8502/solr/fp")
+_fp_solr = solr.SolrConnectionPool("http://localhost:8502/solr/fp")
_hexpoch = int(time.time() * 1000)
logger = logging.getLogger(__name__)
@@ -82,7 +82,7 @@ def pairs(l, n=2):
def decode_code_string(compressed_code_string):
- compressed_code_string = compressed_code_string.encode('ascii','ignore')
+ compressed_code_string = compressed_code_string.encode('utf8')
# do the zlib/base64 stuff
try:
# this will decode both URL safe b64 and non-url-safe
@@ -96,11 +96,9 @@ def decode_code_string(compressed_code_string):
actual_code = inflate_code_string(actual_code)
return actual_code
-# Given a query code string of any type (space sep, compressed, hexed, etc), find the best match from the FP flat (or local, or alt.)
-# Do all the elbow stuff, time matching, etc. This is called directly by the API.
-def best_match_for_query(code_string, elbow=8, local=False):
+def best_match_for_query(code_string, elbow=10, local=False):
# DEC strings come in as unicode so we have to force them to ASCII
- code_string = code_string.encode('ascii','ignore')
+ code_string = code_string.encode("utf8")
tic = int(time.time()*1000)
# First see if this is a compressed code
@@ -115,7 +113,7 @@ def best_match_for_query(code_string, elbow=8, local=False):
return Response(Response.NOT_ENOUGH_CODE, tic=tic)
# Query the FP flat directly.
- response = query_fp(code_string, rows=10, local=local, get_data=True)
+ response = query_fp(code_string, rows=30, local=local, get_data=True)
logger.debug("solr qtime is %d" % (response.header["QTime"]))
if len(response.results) == 0:
@@ -128,47 +126,38 @@ def best_match_for_query(code_string, elbow=8, local=False):
return Response(Response.SINGLE_GOOD_MATCH, TRID=response.results[0]["track_id"], score=top_match_score, qtime=response.header["QTime"], tic=tic)
else:
return Response(Response.SINGLE_BAD_MATCH, qtime=response.header["QTime"], tic=tic)
+
+ # Get the actual score for all responses
+ original_scores = {}
+ actual_scores = {}
- # OK, there are at least two matches (we almost always are in this case.)
- # Check if the delta between the top match and the 2nd top match is within elbow.
- if top_match_score - int(response.results[1]["score"]) < elbow:
- # No strong match. This either means there is a duplicate (likely) or this song doesn't exist. So get the actual score for each result
- original_scores = {}
- actual_scores = {}
-
- # For each result compute the "actual score" (based on the histogram matching)
- for r in response.results:
- original_scores[r["track_id"]] = int(r["score"])
- actual_scores[r["track_id"]] = actual_matches(code_string, r["fp"], elbow=elbow)
-
- # Sort the actual scores
- sorted_actual_scores = sorted(actual_scores.iteritems(), key=lambda (k,v): (v,k), reverse=True)
- # Get the top one
- (actual_score_top_track_id, actual_score_top_score) = sorted_actual_scores[0]
- # Get the 2nd top one (we know there is always at least 2 matches)
- (actual_score_2nd_track_id, actual_score_2nd_score) = sorted_actual_scores[1]
-
- # If the top actual score is greater than the minimum (elbow) then ...
- if actual_score_top_score >= elbow:
- # Check if the actual score is greater than its fast score. if it is, it is certainly a match.
- if actual_score_top_score > original_scores[actual_score_top_track_id]:
- return Response(Response.MULTIPLE_GOOD_MATCH_HISTOGRAM_INCREASED, TRID=actual_score_top_track_id, score=actual_score_top_score, qtime=response.header["QTime"], tic=tic)
- else:
- # If the actual score went down it still could be close enough, so check for that
- if original_scores[actual_score_top_track_id] - actual_score_top_score <= (actual_score_top_score / 2):
- return Response(Response.MULTIPLE_GOOD_MATCH_HISTOGRAM_DECREASED, TRID=actual_score_top_track_id, score=actual_score_top_score, qtime=response.header["QTime"], tic=tic)
- else:
- # If the actual score was not close enough, then no match.
- return Response(Response.MULTIPLE_BAD_HISTOGRAM_MATCH, qtime=response.header["QTime"], tic=tic)
- else:
- # last ditch. if the 2nd top actual score is much less than the top score let it through.
- if (actual_score_top_score >= elbow/2) and ((actual_score_top_score - actual_score_2nd_score) >= (actual_score_top_score / 2)): # for examples [10,4], 10-4 = 6, which >= 5, so OK
- return Response(Response.MULTIPLE_GOOD_MATCH_HISTOGRAM_DECREASED, TRID=actual_score_top_track_id, score=actual_score_top_score, qtime=response.header["QTime"], tic=tic)
- else:
- return Response(Response.MULTIPLE_BAD_HISTOGRAM_MATCH, qtime = response.header["QTime"], tic=tic)
+ # For each result compute the "actual score" (based on the histogram matching)
+ for r in response.results:
+ original_scores[r["track_id"]] = int(r["score"])
+ actual_scores[r["track_id"]] = actual_matches(code_string, r["fp"], elbow = elbow)
+
+ # Sort the actual scores
+ sorted_actual_scores = sorted(actual_scores.iteritems(), key=lambda (k,v): (v,k), reverse=True)
+
+ # Find if the highest is an outlier (then it's a match) - >1 SD higher than the mean
+ scores = [s for (tr, s) in sorted_actual_scores]
+
+ sum_scores = sum(scores)
+ mean = float(sum_scores) / float(len(scores))
+ variance = 0.0
+ for s in scores:
+ variance = variance + (s - mean) * (s - mean)
+ variance = variance / (len(scores))
+ std = math.sqrt(variance)
+
+ top_score = sorted_actual_scores[0][1]
+ if top_score > mean + std:
+ logger.debug("top score %d, mean %d, std %d" % (top_score, mean, std))
+ return Response(Response.MULTIPLE_GOOD_MATCH, TRID=sorted_actual_scores[0][0], score=int(top_score), qtime=response.header["QTime"], tic=tic)
else:
- # There was a strong match, the first one.
- return Response(Response.MULTIPLE_GOOD_MATCH, TRID=response.results[0]["track_id"], score=int(response.results[0]["score"]), qtime=response.header["QTime"], tic=tic)
+ return Response(Response.MULTIPLE_BAD_HISTOGRAM_MATCH, qtime=response.header["QTime"], tic=tic)
+
+ # If no outlier, then no match
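
The new matching style above replaces the old elbow-delta cascade (compare the top two fast scores, then walk histogram scores through fixed thresholds) with a single outlier test: compute the histogram-based "actual score" for every candidate, then accept the top candidate only if it sits more than one standard deviation above the mean of all candidate scores. A minimal sketch of just that test, outside the Response plumbing (is_outlier_match and the sample scores are illustrative, not from the commit):

    import math

    def is_outlier_match(scores):
        # scores: actual (histogram) match scores, best first
        mean = float(sum(scores)) / len(scores)
        # population variance, matching the loop in best_match_for_query
        variance = sum((s - mean) ** 2 for s in scores) / len(scores)
        return scores[0] > mean + math.sqrt(variance)

    print is_outlier_match([40, 12, 10, 9])   # True: 40 > 17.75 + ~12.9
    print is_outlier_match([12, 12, 11, 11])  # False: 12 is not > 11.5 + 0.5

This is presumably also why rows went from 10 to 30 in query_fp: the mean and standard deviation only say something useful when there is a reasonable population of candidates to compare against.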
@@ -297,8 +286,8 @@ def delete(track_ids, do_commit=True, local=False):
if local:
print "not implemented yet"
return
-
- _fp_solr.delete_many(track_ids)
+ with solr.pooled_connection(_fp_solr) as host:
+ host.delete_many(track_ids)
if do_commit:
commit()
@@ -312,12 +301,15 @@ def ingest(code_string_dict, do_commit=True, local=False):
for t in code_string_dict.keys():
docs.append({"track_id":t, "fp":code_string_dict[t]})
- _fp_solr.add_many(docs)
+ with solr.pooled_connection(_fp_solr) as host:
+ host.add_many(docs)
+
if do_commit:
commit()
def commit(local=False):
- _fp_solr.commit()
+ with solr.pooled_connection(_fp_solr) as host:
+ host.commit()
def query_fp(code_string, rows=15, local=False, get_data=False):
if local:
@@ -326,9 +318,11 @@ def query_fp(code_string, rows=15, local=False, get_data=False):
try:
# query the fp flat
if get_data:
- resp = _fp_solr.query(code_string, qt="/hashq", rows=rows, fields="track_id,fp")
+ with solr.pooled_connection(_fp_solr) as host:
+ resp = host.query(code_string, qt="/hashq", rows=rows, fields="track_id,fp")
else:
- resp = _fp_solr.query(code_string, qt="/hashq", rows=rows, fields="track_id")
+ with solr.pooled_connection(_fp_solr) as host:
+ resp = host.query(code_string, qt="/hashq", rows=rows, fields="track_id")
return resp
except solr.SolrException:
return None
@@ -337,7 +331,8 @@ def fp_code_for_track_id(track_id, local=False):
if local:
return local_fp_code_for_track_id(track_id)
# Get it from solr
- resp = _fp_solr.query("track_id:"+track_id, rows=1, fields="fp")
+ with solr.pooled_connection(_fp_solr) as host:
+ resp = host.query("track_id:"+track_id, rows=1, fields="fp")
if len(resp.results):
return resp.results[0]["fp"]
else:
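
Every Solr round trip in fp.py now follows the same pattern: borrow a connection from the module-level pool, use it inside a with block, and let the context manager hand it back. The likely motivation is thread safety, since the old module-level SolrConnection object was shared by every caller at once, while the pool gives each concurrent caller its own connection. A minimal sketch of concurrent use (the worker function, pool URL, and thread count are illustrative, using the SolrConnectionPool and pooled_connection added in API/solr.py below):

    import threading
    import solr

    pool = solr.SolrConnectionPool("http://localhost:8502/solr/fp")

    def worker(track_id):
        # each thread checks out its own connection for the duration of the query
        with solr.pooled_connection(pool) as conn:
            conn.query("track_id:" + track_id, rows=1, fields="track_id")

    threads = [threading.Thread(target=worker, args=("TR%06d" % i,)) for i in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()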
56 API/solr.py
@@ -298,6 +298,9 @@
from xml.sax.saxutils import escape, quoteattr
from xml.dom.minidom import parseString
from types import BooleanType, FloatType, IntType, ListType, LongType, StringType, UnicodeType
+from contextlib import contextmanager
+import Queue
+
__version__ = "1.3.0"
@@ -308,6 +311,59 @@
# EN special-use methods
+
+@contextmanager
+def pooled_connection(pool):
+ """
+ Provides some syntactic sugar for using a ConnectionPool. Example use:
+
+ pool = ConnectionPool(SolrConnection, 'http://localhost:8080/solr')
+ with pooled_connection(pool) as conn:
+ docs = conn.query('*:*')
+ """
+ conn = pool.get()
+ try:
+ yield conn
+ except Exception:
+ raise
+ else:
+ # only return connection to pool if an exception wasn't raised
+ pool.put(conn)
+
+class ConnectionPool(object):
+ "Thread-safe connection pool."
+
+ def __init__(self, klass, *args, **kwargs):
+ """
+ Initialize a new connection pool, where klass is the connection class.
+ Provide any additional args or kwargs to pass during initialization of new connections.
+
+ If a kwarg named pool_size is provided, it will dictate the maximum number of connections to retain in the pool.
+ If none is provided, it will default to 20.
+ """
+ self._args = args
+ self._kwargs = kwargs
+ self._queue = Queue.Queue(self._kwargs.pop('pool_size', 20))
+ self._klass = klass
+
+ def get(self):
+ "Get an available connection, creating a new one if needed."
+ try:
+ return self._queue.get_nowait()
+ except Queue.Empty:
+ return self._klass(*self._args, **self._kwargs)
+
+ def put(self, conn):
+ "Return a connection to the pool."
+ try:
+ self._queue.put_nowait(conn)
+ except Queue.Full:
+ pass
+
+class SolrConnectionPool(ConnectionPool):
+ def __init__(self, url, **kwargs):
+ ConnectionPool.__init__(self, SolrConnection, url, **kwargs)
+
def str2bool(s):
if(isinstance(s,bool)):
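
Two design choices in the pool are worth noting: get() constructs a fresh connection whenever the queue is empty, so pool_size caps only how many idle connections are retained, not how many can exist at once; and pooled_connection returns a connection only on clean exit, so one that raised mid-request is discarded rather than recycled in an unknown state. A quick illustration of the first point with a stand-in connection class (DummyConn is hypothetical):

    class DummyConn(object):
        count = 0
        def __init__(self):
            DummyConn.count += 1

    pool = ConnectionPool(DummyConn, pool_size=2)
    conns = [pool.get() for _ in range(3)]  # queue is empty each time: 3 created
    for c in conns:
        pool.put(c)                         # only 2 fit back in; the third is dropped
    print DummyConn.count                   # 3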
3 README
@@ -21,7 +21,6 @@ API/ - python libraries for querying and ingesting into the Echoprint server
API/api.py - web.py sample API wrapper for evaluation
API/fp.py - main python module for Echoprint
API/solr.py - Solr's python module (with slight enhancements)
-API/threadrun.py - simplified threadpool
Hashr/ - java project for a custom solr field type to handle Echoprint data
@@ -85,7 +84,7 @@ fp_code : packed code from codegen
<field name="fp" type="fphash" indexed="true" stored="false" required="true"/>
-Then override fp.py's fp_code_for_track_id method with your own datastore accessor. Stored Echoprint codes are not used in practice unless there is a close match between tracks at the index level. Note that fp_code_for_track_id is called threaded.
+Then override fp.py's fp_code_for_track_id method with your own datastore accessor.
- You can run Echoprint in "local" mode which uses a python dict to store and index codes instead of Solr. You can store and index about 100K tracks in 1GB or so in practice using this mode. This is only useful for small scale testing. Each fp.py method takes an optional "local" kwarg.
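
For that override, fp.py only needs the packed code string back for a track id. One way to wire it in, sketched here, is to replace the module attribute after import (my_datastore and its get method are placeholders for your own accessor):

    import fp

    def my_fp_code_for_track_id(track_id, local=False):
        # my_datastore is a placeholder for your own store;
        # return the packed Echoprint code string for track_id, or None
        return my_datastore.get(track_id)

    fp.fp_code_for_track_id = my_fp_code_for_track_id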
