Skip to content

Commit

Permalink
Style fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexkiro committed Jul 12, 2014
1 parent 20ec88c commit 3159206
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 71 deletions.
4 changes: 2 additions & 2 deletions pyzor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ class BatchClient(Client):
def __init__(self, accounts=None, timeout=None, spec=None, batch_size=10):
Client.__init__(self, accounts=accounts, timeout=timeout, spec=spec)
self.batch_size = batch_size
self.r_requests = None
self.w_requests = None
self.r_requests = {}
self.w_requests = {}
self.flush()

def report(self, digest, address=("public.pyzor.org", 24441)):
Expand Down
26 changes: 13 additions & 13 deletions pyzor/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import pyzor.account

# Configuration files for the Pyzor Server

# Configuration files for the Pyzor Server
def load_access_file(access_fn, accounts):
"""Load the ACL from the specified file, if it exists, and return an
ACL dictionary, where each key is a username and each value is a set
Expand Down Expand Up @@ -57,7 +57,7 @@ def load_access_file(access_fn, accounts):
log.warn("Invalid ACL line: %r", line)
continue
try:
allowed = {"allow": True, "deny" : False}[allowed]
allowed = {"allow": True, "deny": False}[allowed]
except KeyError:
log.warn("Invalid ACL line: %r", line)
continue
Expand Down Expand Up @@ -85,6 +85,7 @@ def load_access_file(access_fn, accounts):
log.info("ACL: %r", acl)
return acl


def load_passwd_file(passwd_fn):
"""Load the accounts from the specified file.
Expand Down Expand Up @@ -116,8 +117,8 @@ def load_passwd_file(passwd_fn):
log.info("Accounts: %s", ",".join(accounts))
return accounts

# Configuration files for the Pyzor Client

# Configuration files for the Pyzor Client
def load_accounts(filepath):
"""Layout of file is: host : port : username : salt,key"""
accounts = {}
Expand All @@ -136,8 +137,8 @@ def load_accounts(filepath):
continue
try:
port = int(port)
except ValueError, e:
log.warn("account file: invalid line %d: %s", lineno, e)
except ValueError as ex:
log.warn("account file: invalid line %d: %s", lineno, ex)
address = (host, port)
salt, key = pyzor.account.key_from_hexstr(key)
if not salt and not key:
Expand All @@ -146,8 +147,8 @@ def load_accounts(filepath):
continue
try:
accounts[address] = pyzor.account.Account(username, salt, key)
except ValueError, e:
log.warn("account file: invalid line %d: %s", lineno, e)
except ValueError as ex:
log.warn("account file: invalid line %d: %s", lineno, ex)
else:
log.warn("No accounts are setup. All commands will be executed by "
"the anonymous user.")
Expand All @@ -161,8 +162,8 @@ def load_servers(filepath):
servers = []
else:
servers = []
with open(filepath) as f:
for line in f:
with open(filepath) as serverf:
for line in serverf:
line = line.strip()
if re.match("[^#][a-zA-Z0-9.-]+:[0-9]+", line):
address, port = line.rsplit(":", 1)
Expand All @@ -173,10 +174,10 @@ def load_servers(filepath):
servers = [("public.pyzor.org", 24441)]
return servers

# Common configurations

# Common configurations
def setup_logging(log_name, filepath, debug):
"""Setup logging according to the specified options. Return the Logger
"""Setup logging according to the specified options. Return the Logger
object.
"""
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
Expand Down Expand Up @@ -206,6 +207,7 @@ def setup_logging(log_name, filepath, debug):

return logger


def expand_homefiles(homefiles, category, homedir, config):
"""Set the full file path for these configuration files."""
for filename in homefiles:
Expand All @@ -216,5 +218,3 @@ def expand_homefiles(homefiles, category, homedir, config):
if not os.path.isabs(filepath):
filepath = os.path.join(homedir, filepath)
config.set(category, filename, filepath)


9 changes: 7 additions & 2 deletions pyzor/digest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@
HASH = hashlib.sha1
HASH_SIZE = len(HASH(b"").hexdigest())


class HTMLStripper(HTMLParser.HTMLParser):
"""Strip all tags from the HTML."""
def __init__(self, collector):
HTMLParser.HTMLParser.__init__(self)
self.reset()
self.collector = collector

def handle_data(self, data):
"""Keep track of the data."""
data = data.strip()
if data:
self.collector.append(data)


class DataDigester(object):
"""The major workhouse class."""
__slots__ = ['value', 'digest']
Expand Down Expand Up @@ -49,7 +52,9 @@ class DataDigester(object):
# Note that an empty string will always be used to remove whitespace.
unwanted_txt_repl = ''

def __init__(self, msg, spec=digest_spec):
def __init__(self, msg, spec=None):
if spec is None:
spec = digest_spec
self.value = None
self.digest = HASH()

Expand Down Expand Up @@ -128,7 +133,7 @@ def digest_payloads(cls, msg):
errors = "ignore"
if not charset:
charset = "ascii"
elif (charset.lower().replace("_", "-") in ("quopri-codec",
elif (charset.lower().replace("_", "-") in ("quopri-codec",
"quopri", "quoted-printable", "quotedprintable")):
errors = "strict"

Expand Down
2 changes: 0 additions & 2 deletions pyzor/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,3 @@
"mysql": mysql.handle,
"redis": redis_.handle,
}


2 changes: 2 additions & 0 deletions pyzor/engines/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
DBHandle = namedtuple("DBHandle", ["single_threaded", "multi_threaded",
"multi_processing"])


class DatabaseError(Exception):
pass


class Record(object):
"""Prefix conventions used in this class:
r = report (spam)
Expand Down
13 changes: 7 additions & 6 deletions pyzor/engines/gdbm_.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
except ImportError:
_has_gdbm = False

import sys
import time
import logging
import datetime
import threading

from pyzor.engines.common import *
from pyzor.engines.common import Record, DBHandle


class GdbmDBHandle(object):
absolute_source = True
Expand All @@ -35,6 +35,8 @@ class GdbmDBHandle(object):
def __init__(self, fn, mode, max_age=None):
self.max_age = max_age
self.db = gdbm.open(fn, mode)
self.reorganize_timer = None
self.sync_timer = None
self.start_reorganizing()
self.start_syncing()

Expand All @@ -43,14 +45,14 @@ def __iter__(self):
while k != None:
yield k
k = self.db.nextkey(k)

def iteritems(self):
for k in self:
try:
yield k, self._really_getitem(k)
except Exception as e:
self.log.warning("Invalid record %s: %s", k, e)

def items(self):
return list(self.iteritems())

Expand Down Expand Up @@ -158,6 +160,7 @@ def decode_record_1(cls, s):
setattr(r, f, decode(part))
return r


class ThreadedGdbmDBHandle(GdbmDBHandle):
"""Like GdbmDBHandle, but handles multi-threaded access."""

Expand Down Expand Up @@ -187,5 +190,3 @@ def apply_method(self, method, varargs=(), kwargs=None):
handle = DBHandle(single_threaded=GdbmDBHandle,
multi_threaded=ThreadedGdbmDBHandle,
multi_processing=None)


24 changes: 13 additions & 11 deletions pyzor/engines/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ def __init__(self, fn, mode, max_age=None):
self.host, self.user, self.passwd, self.db_name, \
self.table_name = fn.split(",")
self.last_connect_attempt = 0 # We have never connected.
self.reorganize_timer = None
self.reconnect()
self.start_reorganizing()

def _get_new_connection(self):
"""Returns a new db connection."""
db = MySQLdb.connect(host=self.host, user=self.user,
db=self.db_name, passwd=self.passwd)
db=self.db_name, passwd=self.passwd)
db.autocommit(True)
return db

Expand Down Expand Up @@ -89,7 +90,7 @@ def __iter__(self):
break
yield row[0]
c.close()

def iteritems(self):
c = self.db.cursor(cursorclass=MySQLdb.cursors.SSCursor)
c.execute("SELECT digest, r_count, wl_count, r_entered, r_updated, "
Expand All @@ -100,7 +101,7 @@ def iteritems(self):
break
yield row[0], Record(*row[1:])
c.close()

def items(self):
return list(self.iteritems())

Expand All @@ -115,8 +116,8 @@ def __del__(self):
def _safe_call(self, name, method, args):
try:
return method(*args, db=self.db)
except (MySQLdb.Error, AttributeError), e:
self.log.error("%s failed: %s", name, e)
except (MySQLdb.Error, AttributeError) as ex:
self.log.error("%s failed: %s", name, ex)
self.reconnect()
# Retrying just complicates the logic - we don't really care if
# a single query fails (and it's possible that it would fail)
Expand Down Expand Up @@ -198,8 +199,8 @@ def start_reorganizing(self):
self.reorganize_timer.setDaemon(True)
self.reorganize_timer.start()

class ThreadedMySQLDBHandle(MySQLDBHandle):

class ThreadedMySQLDBHandle(MySQLDBHandle):
def __init__(self, fn, mode, max_age=None, bound=None):
self.bound = bound
if self.bound:
Expand All @@ -222,15 +223,15 @@ def _safe_call(self, name, method, args):
db = self._get_connection()
try:
return method(*args, db=db)
except (MySQLdb.Error, AttributeError) as e:
self.log.error("%s failed: %s", name, e)
except (MySQLdb.Error, AttributeError) as ex:
self.log.error("%s failed: %s", name, ex)
if not self.bound:
raise DatabaseError("Database temporarily unavailable.")
try:
# Connection might be timeout, ping and retry
db.ping(True)
return method(*args, db=db)
except (MySQLdb.Error, AttributeError) as e:
except (MySQLdb.Error, AttributeError) as ex:
# attempt a new connection, if we can retry
db = self._reconnect(db)
raise DatabaseError("Database temporarily unavailable.")
Expand Down Expand Up @@ -261,6 +262,7 @@ def __del__(self):
except Queue.Empty:
break


class ProcessMySQLDBHandle(MySQLDBHandle):
def __init__(self, fn, mode, max_age=None):
MySQLDBHandle.__init__(self, fn, mode, max_age=max_age)
Expand All @@ -276,8 +278,8 @@ def _safe_call(self, name, method, args):
try:
db = self._get_new_connection()
return method(*args, db=db)
except (MySQLdb.Error, AttributeError) as e:
self.log.error("%s failed: %s", name, e)
except (MySQLdb.Error, AttributeError) as ex:
self.log.error("%s failed: %s", name, ex)
raise DatabaseError("Database temporarily unavailable.")
finally:
if db is not None:
Expand Down
8 changes: 4 additions & 4 deletions pyzor/engines/redis_.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ def _decode_record(r):
def __iter__(self):
for key in self.db.keys(self._real_key("*")):
yield key.rsplit(".", 1)[-1]

def iteritems(self):
for key in self:
try:
yield key, self[key]
except Exception as e:
self.log.warning("Invalid record %s: %s", key, e)
except Exception as ex:
self.log.warning("Invalid record %s: %s", key, ex)

def items(self):
return list(self.iteritems())
Expand Down Expand Up @@ -109,8 +109,8 @@ def __setitem__(self, key, value):
def __delitem__(self, key):
self.db.delete(self._real_key(key))

class ThreadedRedisDBHandle(RedisDBHandle):

class ThreadedRedisDBHandle(RedisDBHandle):
def __init__(self, fn, mode, max_age=None, bound=None):
RedisDBHandle.__init__(self, fn, mode, max_age=max_age)

Expand Down
12 changes: 6 additions & 6 deletions pyzor/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
import logging
import threading


class Forwarder(object):
"""Forwards digest to remote pyzor servers"""

def __init__(self, forwarding_client, remote_servers,
max_queue_size=10000):
"""
forward_client: a pyzor.client.Client instance to use as
forward_client: a pyzor.client.Client instance to use as
forwarding client
remote_servers: a list of (hostname,port) tuples where digests should
remote_servers: a list of (hostname,port) tuples where digests should
be forwarded to
max_queue_size: max amount of queued digests
"""
Expand Down Expand Up @@ -39,9 +40,9 @@ def _forward_loop(self):
self.forwarding_client.whitelist(digest, server)
else:
self.forwarding_client.report(digest, server)
except Exception as e:
except Exception as ex:
self.log.warn('Forwarding digest %s to %s failed: %s',
digest, server, e)
digest, server, ex)

def queue_forward_request(self, digest, whitelist=False):
"""If forwarding is enabled, insert a digest into the forwarding queue
Expand All @@ -58,8 +59,7 @@ def queue_forward_request(self, digest, whitelist=False):

def start_forwarding(self):
"""start the forwarding thread"""
t = threading.Thread(target=self._forward_loop)
t.start()
threading.Thread(target=self._forward_loop).start()

def stop_forwarding(self):
"""disable forwarding and tell the forwarding thread to end itself"""
Expand Down
Loading

0 comments on commit 3159206

Please sign in to comment.