
PEP8

1 parent 2c8afe6 · commit 2093e1d49506bf5f13f3297dfe3b151141fd7405 · @desbma committed on Oct 18, 2012
Showing with 87 additions and 89 deletions.
  1. +26 −27 ddc_client.py
  2. +61 −62 ddc_server.py
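Taken together, the edits below are pure whitespace fixes of the kind a PEP 8 checker reports: a space after every comma in argument lists and literals (E231), spaces around operators, and two spaces before inline comments (E261). A minimal before/after sketch of those conventions, with made-up names:

```python
import logging

# Before: the style this commit removes (no space after commas,
# none around "+", a single space before the inline comment).
def make_url(server,port):
    return "http://%s:%d" % (server,port) # cramped

# After: the style this commit applies throughout both files.
def make_url_pep8(server, port):
    return "http://%s:%d" % (server, port)  # two spaces before the comment

quiet_level = logging.CRITICAL + 1  # spaces around the operator
```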
ddc_client.py (53 changed lines)
@@ -11,7 +11,7 @@ class DebugLogRecordFactory():
def __init__(self):
self.default_logrecord_factory = logging.getLogRecordFactory()
- def log(self,*args, **kwargs):
+ def log(self, *args, **kwargs):
record = self.default_logrecord_factory(*args, **kwargs)
record.msg = "[CLIENT] %s" % (record.msg)
return record
@@ -23,7 +23,7 @@ class NeedRestartException(Exception):
class InvalidServerResponse(Exception):
- def __init__(self,http_code):
+ def __init__(self, http_code):
self.http_code = http_code
def __str__(self):
@@ -35,8 +35,8 @@ class DistributedCrawlerClient():
CLIENT_VERSION = 1
http_client = httplib2.Http(timeout=10)
- def __init__(self,server,port):
- self.base_url = "http://%s:%d" % (server,port)
+ def __init__(self, server, port):
+ self.base_url = "http://%s:%d" % (server, port)
self.api_base_url = "%s/domains" % (self.base_url)
def start(self,):
@@ -48,8 +48,8 @@ def start(self,):
try:
# see README.md for params description
- response = self.apiRequest({ "version" : str(__class__.CLIENT_VERSION),
- "pc_version" : str(ddc_process.VERSION) }).decode("utf-8")
+ response = self.apiRequest({"version" : str(__class__.CLIENT_VERSION),
+ "pc_version" : str(ddc_process.VERSION)}).decode("utf-8")
# read response
xml_response = xml.etree.ElementTree.fromstring(response)
@@ -61,13 +61,13 @@ def start(self,):
for xml_upgrade in xml_response.findall("upgrades/upgrade"):
type = xml_upgrade.get("type")
version = xml_upgrade.get("version")
- logging.getLogger().info("Upgrading '%s' component to version %s" % (type,version) )
+ logging.getLogger().info("Upgrading '%s' component to version %s" % (type, version))
url = self.base_url + xml_upgrade.get("url")
response, content = __class__.http_client.request(url)
zip_filename = response["content-disposition"].split(";")[1].split("=")[1]
- with open(zip_filename,"r+b") as file_handle:
+ with open(zip_filename, "r+b") as file_handle:
file_handle.write(content)
- archive = zipfile.ZipFile(file_handle,"r")
+ archive = zipfile.ZipFile(file_handle, "r")
archive.extractall()
archive.close()
need_restart = True
@@ -81,12 +81,12 @@ def start(self,):
continue
# check domains
- logging.getLogger().info("Got %d domains to check from server" % (domain_count) )
+ logging.getLogger().info("Got %d domains to check from server" % (domain_count))
spam_domain_indexes = set()
failed_domain_indexes = set()
for (i, xml_domain) in enumerate(xml_domains):
domain = xml_domain.get("name")
- logging.getLogger().debug("Checking domain '%s'" % (domain) )
+ logging.getLogger().debug("Checking domain '%s'" % (domain))
try:
if ddc_process.is_spam(domain):
spam_domain_indexes.add(i)
@@ -95,21 +95,21 @@ def start(self,):
# prepare POST request content
xml_root = xml.etree.ElementTree.Element("ddc")
- xml_domain_list = xml_response.find("domainlist") # reuse the previous XML domain list
+ xml_domain_list = xml_response.find("domainlist") # reuse the previous XML domain list
for (i, xml_domain) in enumerate(xml_domain_list.iterfind("domain")):
if i in failed_domain_indexes:
- xml_domain.set("failed","1")
+ xml_domain.set("failed", "1")
else:
is_spam = (i in spam_domain_indexes)
- xml_domain.set("spam",str(int(is_spam)))
+ xml_domain.set("spam", str(int(is_spam)))
xml_root.append(xml_domain_list)
# send POST request
post_data = xml.etree.ElementTree.tostring(xml_root)
- self.apiRequest( { "version" : str(__class__.CLIENT_VERSION),
- "pc_version" : str(ddc_process.VERSION) },
- True,
- post_data) # we don't care for what the server actually returns here
+ self.apiRequest({"version" : str(__class__.CLIENT_VERSION),
+ "pc_version" : str(ddc_process.VERSION)},
+ True,
+ post_data) # we don't care for what the server actually returns here
except InvalidServerResponse as e:
logging.getLogger().warning(e)
@@ -118,16 +118,15 @@ def start(self,):
logging.getLogger().info("Restarting client")
exit(7)
-
- def apiRequest(self,url_params,post_request=False,post_data=None):
+ def apiRequest(self, url_params, post_request=False, post_data=None):
# construct url
- url = "%s?%s" % (self.api_base_url,urllib.parse.urlencode(url_params))
+ url = "%s?%s" % (self.api_base_url, urllib.parse.urlencode(url_params))
# send request
if post_request:
- logging.getLogger().info("Posting data to '%s'" % (url) )
- response, content = __class__.http_client.request(url,"POST",post_data)
+ logging.getLogger().info("Posting data to '%s'" % (url))
+ response, content = __class__.http_client.request(url, "POST", post_data)
else:
- logging.getLogger().info("Fetching '%s'" % (url) )
+ logging.getLogger().info("Fetching '%s'" % (url))
response, content = __class__.http_client.request(url)
if response.status not in (200, 202):
raise InvalidServerResponse(response.status)
@@ -154,7 +153,7 @@ def apiRequest(self,url_params,post_request=False,post_data=None):
cli_parser.add_argument("-v",
"--verbosity",
action="store",
- choices=("quiet","warning","info","debug"),
+ choices=("quiet", "warning", "info", "debug"),
default="info",
dest="verbosity",
help="Level of output to diplay")
@@ -164,7 +163,7 @@ def apiRequest(self,url_params,post_request=False,post_data=None):
logging.basicConfig(format="%(message)s")
logger = logging.getLogger()
if options.verbosity == "quiet":
- logger.setLevel(logging.CRITICAL+1)
+ logger.setLevel(logging.CRITICAL + 1)
elif options.verbosity == "warning":
logger.setLevel(logging.WARNING)
elif options.verbosity == "info":
@@ -175,5 +174,5 @@ def apiRequest(self,url_params,post_request=False,post_data=None):
logging.setLogRecordFactory(logrecord_factory.log)
# start client
- client = DistributedCrawlerClient(options.server,options.port)
+ client = DistributedCrawlerClient(options.server, options.port)
client.start()
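For reference, the reformatted apiRequest keeps its behaviour unchanged: encode the query parameters with urllib.parse.urlencode, issue a GET or a POST through the shared httplib2.Http instance, and treat any status other than 200 or 202 as an error. A standalone sketch of that flow, with a placeholder endpoint URL:

```python
import urllib.parse

import httplib2  # third-party dependency, as in ddc_client.py

API_BASE_URL = "http://127.0.0.1:8080/domains"  # placeholder, not the real server
http_client = httplib2.Http(timeout=10)

def api_request(url_params, post_request=False, post_data=None):
    url = "%s?%s" % (API_BASE_URL, urllib.parse.urlencode(url_params))
    if post_request:
        response, content = http_client.request(url, "POST", post_data)
    else:
        response, content = http_client.request(url)
    if response.status not in (200, 202):
        raise RuntimeError("unexpected HTTP status %d" % response.status)
    return content

# Example call, mirroring the client's first request:
# content = api_request({"version": "1", "pc_version": "1"})
```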
ddc_server.py (123 changed lines)
@@ -13,7 +13,7 @@ class DebugLogRecordFactory():
def __init__(self):
self.default_logrecord_factory = logging.getLogRecordFactory()
- def log(self,*args, **kwargs):
+ def log(self, *args, **kwargs):
record = self.default_logrecord_factory(*args, **kwargs)
record.msg = "[SERVER] %s" % (record.msg)
return record
@@ -53,47 +53,47 @@ class XmlMessage:
MAX_DOMAIN_LIST_SIZE = 20
- def __init__(self,client_version,page_processor_version):
+ def __init__(self, client_version, page_processor_version):
self.xml = xml.etree.ElementTree.Element("ddc")
# generate upgrade nodes
- xml_upgrade = xml.etree.ElementTree.SubElement(self.xml,"upgrades")
+ xml_upgrade = xml.etree.ElementTree.SubElement(self.xml, "upgrades")
need_upgrade = False
if client_version < DistributedCrawlerServer.LAST_CLIENT_VERSION:
# need to upgrade the client
- xml.etree.ElementTree.SubElement(xml_upgrade,"upgrade",attrib={"type" : "client",
- "url" : "/upgrade?file=client-v%d.zip" % (DistributedCrawlerServer.LAST_CLIENT_VERSION) ,
- "version" : str(DistributedCrawlerServer.LAST_CLIENT_VERSION) } )
+ xml.etree.ElementTree.SubElement(xml_upgrade, "upgrade", attrib={"type" : "client",
+ "url" : "/upgrade?file=client-v%d.zip" % (DistributedCrawlerServer.LAST_CLIENT_VERSION),
+ "version" : str(DistributedCrawlerServer.LAST_CLIENT_VERSION)})
need_upgrade = True
if page_processor_version < DistributedCrawlerServer.LAST_PC_VERSION:
# need to upgrade the page processing component
- xml.etree.ElementTree.SubElement(xml_upgrade,"upgrade",attrib={"type" : "page analysis",
- "url" : "/upgrade?file=page-processor-v%d.zip" % (DistributedCrawlerServer.LAST_PC_VERSION),
- "version" : str(DistributedCrawlerServer.LAST_CLIENT_VERSION) } )
+ xml.etree.ElementTree.SubElement(xml_upgrade, "upgrade", attrib={"type" : "page analysis",
+ "url" : "/upgrade?file=page-processor-v%d.zip" % (DistributedCrawlerServer.LAST_PC_VERSION),
+ "version" : str(DistributedCrawlerServer.LAST_CLIENT_VERSION)})
need_upgrade = True
if not need_upgrade:
# generate domain list nodes
- xml_domain_list = xml.etree.ElementTree.SubElement(self.xml,"domainlist")
- domains_to_send_count = min(len(DistributedCrawlerServer.domains_to_check),__class__.MAX_DOMAIN_LIST_SIZE)
+ xml_domain_list = xml.etree.ElementTree.SubElement(self.xml, "domainlist")
+ domains_to_send_count = min(len(DistributedCrawlerServer.domains_to_check), __class__.MAX_DOMAIN_LIST_SIZE)
for i in range(domains_to_send_count):
- domain = random.choice(DistributedCrawlerServer.domains_to_check) # pick a random domain in the list
- xml.etree.ElementTree.SubElement(xml_domain_list,"domain",attrib={"name":domain})
- logging.getLogger().debug("Picked domain %s to be checked" % (domain) )
+ domain = random.choice(DistributedCrawlerServer.domains_to_check) # pick a random domain in the list
+ xml.etree.ElementTree.SubElement(xml_domain_list, "domain", attrib={"name" : domain})
+ logging.getLogger().debug("Picked domain %s to be checked" % (domain))
# add a signature, so we can detect a malicious client trying to send fake results for different domains
- sig = __class__.getXmlDomainListSig(xml_domain_list,as_bytes=False)[1]
- xml_domain_list.set("sig",sig)
+ sig = __class__.getXmlDomainListSig(xml_domain_list, as_bytes=False)[1]
+ xml_domain_list.set("sig", sig)
if not domains_to_send_count:
logging.getLogger().warning("No more domains to be checked")
def __str__(self):
- return xml.etree.ElementTree.tostring(self.xml,"unicode")
+ return xml.etree.ElementTree.tostring(self.xml, "unicode")
@staticmethod
- def getXmlDomainListSig(xml_domain_list,as_bytes=True,as_string=True):
- hasher = hmac.new(HMAC_KEY,digestmod=hashlib.sha512)
+ def getXmlDomainListSig(xml_domain_list, as_bytes=True, as_string=True):
+ hasher = hmac.new(HMAC_KEY, digestmod=hashlib.sha512)
for domain in xml_domain_list.iterfind("domain"):
hasher.update(domain.get("name").encode("utf-8"))
if as_bytes:
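Judging from the hunks above, the signature helper hashes only the domain names, so the server can later recognise a domain list it handed out without storing every list. A self-contained sketch of the same HMAC computation (the key and domains are made up):

```python
import hashlib
import hmac
import xml.etree.ElementTree

HMAC_KEY = b"not-the-real-key"  # stand-in for the server's secret
xml_domain_list = xml.etree.ElementTree.fromstring(
    '<domainlist><domain name="domain0001.com"/><domain name="domain0002.com"/></domainlist>')

hasher = hmac.new(HMAC_KEY, digestmod=hashlib.sha512)
for domain in xml_domain_list.iterfind("domain"):
    hasher.update(domain.get("name").encode("utf-8"))

sig_bytes, sig_hex = hasher.digest(), hasher.hexdigest()
# The hex form is what goes into the "sig" attribute sent to the client;
# the binary form is what the server keeps in its exclusion list.
```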
@@ -111,21 +111,21 @@ class DistributedCrawlerServer(http.server.HTTPServer):
LAST_CLIENT_VERSION = SERVER_PROTOCOL_VERSION = 1
LAST_PC_VERSION = 1
- KNOWN_CLIENT_VERSIONS = frozenset(range(1,LAST_CLIENT_VERSION+1))
- KNOWN_PC_VERSIONS = frozenset(range(1,LAST_PC_VERSION+1))
+ KNOWN_CLIENT_VERSIONS = frozenset(range(1, LAST_CLIENT_VERSION + 1))
+ KNOWN_PC_VERSIONS = frozenset(range(1, LAST_PC_VERSION + 1))
MIN_ANALYSIS_PER_DOMAIN = 3
- SIGNATURE_BLACKLIST_TIMEOUT_S = 60*60*24*30*3 # 3 month
+ SIGNATURE_BLACKLIST_TIMEOUT_S = 60 * 60 * 24 * 30 * 3 # 3 month
- domains_to_check = [ "domain%04d.com" % (i) for i in range(50) ] # we generate random domains for simulation
- checked_domains = {} # this holds the results as ie: checked_domains["a-domain.com"] = (is_spam, number_of_clients_who_checked_this_domain)
+ domains_to_check = ["domain%04d.com" % (i) for i in range(50)] # we generate random domains for simulation
+ checked_domains = {} # this holds the results as ie: checked_domains["a-domain.com"] = (is_spam, number_of_clients_who_checked_this_domain)
- excluded_sigs = [] # list of temporarily exluded domainlist signatures to prevent client spamming
- excluded_sigs_time = [] # timestamps of the time when each signature has been excluded
+ excluded_sigs = [] # list of temporarily exluded domainlist signatures to prevent client spamming
+ excluded_sigs_time = [] # timestamps of the time when each signature has been excluded
- def __init__(self,port):
- super().__init__(("127.0.0.1",port),RequestHandler)
+ def __init__(self, port):
+ super().__init__(("127.0.0.1", port), RequestHandler)
def start(self):
logging.getLogger().info("DuckDuckGo distributed crawler server v%d started" % (__class__.SERVER_PROTOCOL_VERSION))
@@ -142,41 +142,40 @@ def do_GET(self):
try:
# parse request url & url parameters
parsed_url = urllib.parse.urlsplit(self.path)
- params = urllib.parse.parse_qs(parsed_url.query,keep_blank_values=False,strict_parsing=True)
+ params = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=False, strict_parsing=True)
if parsed_url.path == "/upgrade":
# check query is well formed
- if "file" not in params or \
- not self.isSafeFilename(params["file"][0]): # we check for evil injection here
- raise InvalidRequestException(self.path,self.client_address[0],"Invalid query parameters")
+ if "file" not in params or not self.isSafeFilename(params["file"][0]): # we check for evil injection here
+ raise InvalidRequestException(self.path, self.client_address[0], "Invalid query parameters")
# serve file (might short-circuit that part with an Apache/Nginx URL rediretion directly to the static content)
upgrade_file = params["file"][0]
try:
- with open(upgrade_file,"rb") as file_handle:
+ with open(upgrade_file, "rb") as file_handle:
# send http headers
self.send_response(200)
- self.send_header("Content-Type", "application/zip")
- self.send_header("Content-Length", os.path.getsize(upgrade_file))
- self.send_header("Content-Disposition", "attachement; filename=%s" % (upgrade_file) )
+ self.send_header("Content-Type", "application/zip")
+ self.send_header("Content-Length", os.path.getsize(upgrade_file))
+ self.send_header("Content-Disposition", "attachement; filename=%s" % (upgrade_file))
self.end_headers()
# send file
self.wfile.write(file_handle.read())
except (IOError, OSError):
- raise InvalidRequestException(self.path,self.client_address[0],"Upgrade file '%s' does not exist or is not readable" % (upgrade_file))
+ raise InvalidRequestException(self.path, self.client_address[0], "Upgrade file '%s' does not exist or is not readable" % (upgrade_file))
elif parsed_url.path == "/domains":
# check query is well formed
if not self.validParams(params):
- raise InvalidRequestException(self.path,self.client_address[0],"Invalid query parameters")
+ raise InvalidRequestException(self.path, self.client_address[0], "Invalid query parameters")
# generate xml
- xml_response = str(XmlMessage(int(params["version"][0]),int(params["pc_version"][0])))
+ xml_response = str(XmlMessage(int(params["version"][0]), int(params["pc_version"][0])))
# prepare response
raw_response = xml_response.encode("utf-8")
if "accept-encoding" in self.headers:
- supported_compressions = frozenset(map(lambda x: x.strip(),self.headers["accept-encoding"].split(",")))
+ supported_compressions = frozenset(map(lambda x: x.strip(), self.headers["accept-encoding"].split(",")))
else:
supported_compressions = frozenset()
if "gzip" in supported_compressions:
@@ -205,7 +204,7 @@ def do_GET(self):
else:
# buggy client, crawler, or someone else we don't care about...
- raise InvalidRequestException(self.path,self.client_address[0],"URL not found",404)
+ raise InvalidRequestException(self.path, self.client_address[0], "URL not found", 404)
except InvalidRequestException as e:
logging.getLogger().warning(e)
@@ -223,11 +222,11 @@ def do_POST(self):
if parsed_url.path == "/domains":
# parse url parameters
- params = urllib.parse.parse_qs(parsed_url.query,keep_blank_values=False,strict_parsing=True)
+ params = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=False, strict_parsing=True)
# check query is well formed
if not self.validParams(params):
- raise InvalidRequestException(self.path,self.client_address[0],"Invalid query parameters")
+ raise InvalidRequestException(self.path, self.client_address[0], "Invalid query parameters")
# TODO do version check of the client to decide to ignore it or not
@@ -238,10 +237,10 @@ def do_POST(self):
# check domainlist signature
xml_domainlist = xml_post_data.find("domainlist")
if xml_domainlist is None:
- raise MalformedXmlException(self.path,self.client_address[0])
+ raise MalformedXmlException(self.path, self.client_address[0])
domainlist_sig = XmlMessage.getXmlDomainListSig(xml_domainlist)
if xml_domainlist.get("sig") != domainlist_sig[1]:
- raise PotentiallyMaliciousRequestException(self.path,self.client_address[0],"Invalid signature for domainlist")
+ raise PotentiallyMaliciousRequestException(self.path, self.client_address[0], "Invalid signature for domainlist")
# remove outdated exluded signatures
current_time = int(time.time())
@@ -256,34 +255,34 @@ def do_POST(self):
# sig not in blacklist, all good
pass
else:
- if len(DistributedCrawlerServer.domains_to_check) >= XmlMessage.MAX_DOMAIN_LIST_SIZE: # without this the server will exclude all analysis when there is only a few domains left
+ if len(DistributedCrawlerServer.domains_to_check) >= XmlMessage.MAX_DOMAIN_LIST_SIZE: # without this the server will exclude all analysis when there is only a few domains left
# blacklist the signature for another SIGNATURE_BLACKLIST_TIMEOUT_S
del DistributedCrawlerServer.excluded_sigs[index]
del DistributedCrawlerServer.excluded_sigs_time[index]
DistributedCrawlerServer.excluded_sigs.append(domainlist_sig[0])
DistributedCrawlerServer.excluded_sigs_time.append(current_time)
- raise PotentiallyMaliciousRequestException(self.path,self.client_address[0],"Client is spamming an already sent domainlist")
+ raise PotentiallyMaliciousRequestException(self.path, self.client_address[0], "Client is spamming an already sent domainlist")
# update exluded signature list
- DistributedCrawlerServer.excluded_sigs.append(domainlist_sig[0]) # we store the signature in its binary form for space efficiency (the list will grow huge)
+ DistributedCrawlerServer.excluded_sigs.append(domainlist_sig[0]) # we store the signature in its binary form for space efficiency (the list will grow huge)
DistributedCrawlerServer.excluded_sigs_time.append(current_time)
# read domain analysis results
for xml_domain in xml_post_data.iterfind("domainlist/domain"):
domain = xml_domain.get("name")
if xml_domain.get("failed") == "1":
- logging.getLogger().warning("Client failed to check domain '%s'" % (domain) )
+ logging.getLogger().warning("Client failed to check domain '%s'" % (domain))
# TODO exclude domain if too many clients have fail too check it?
continue
- logging.getLogger().debug("Got client analysis for domain '%s'" % (domain) )
+ logging.getLogger().debug("Got client analysis for domain '%s'" % (domain))
is_spam = (xml_domain.get("spam") == "1")
if domain in DistributedCrawlerServer.checked_domains:
# this domain has already been checked by at least another client
previous_is_spam = DistributedCrawlerServer.checked_domains[domain][0]
- analysis_count = DistributedCrawlerServer.checked_domains[domain][1] +1
+ analysis_count = DistributedCrawlerServer.checked_domains[domain][1] + 1
if (previous_is_spam != is_spam) and (analysis_count > 1):
# differents clients gave different analysis, reset analysis count
- logging.getLogger().warning("Conflicting client analysis for domain '%s'" % (domain) )
+ logging.getLogger().warning("Conflicting client analysis for domain '%s'" % (domain))
analysis_count = 0
else:
if analysis_count >= DistributedCrawlerServer.MIN_ANALYSIS_PER_DOMAIN:
@@ -294,10 +293,10 @@ def do_POST(self):
# ValueError is thrown if the domain is not in the list which can happen if another client has already sent the MIN_ANALYSIS_PER_DOMAIN'th analysis
# => we dont't care
pass
- logging.getLogger().debug("Domain '%s' has has been checked %d times, is_spam=%s" % (domain, analysis_count, is_spam) )
+ logging.getLogger().debug("Domain '%s' has has been checked %d times, is_spam=%s" % (domain, analysis_count, is_spam))
else:
analysis_count = 1
- logging.getLogger().debug("Domain '%s' is checked for the first time, is_spam=%s" % (domain, is_spam) )
+ logging.getLogger().debug("Domain '%s' is checked for the first time, is_spam=%s" % (domain, is_spam))
DistributedCrawlerServer.checked_domains[domain] = (is_spam, analysis_count)
# thanks buddy client!
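The hunk above also shows the voting rule for analysis results: each client report either increments a per-domain counter or, when it contradicts the stored verdict, resets it, and once enough concordant reports arrive the domain is dropped from the work list. A compact sketch of that rule as reconstructed from the visible lines (the helper name is made up, and the removal step is an assumption based on the surrounding comments):

```python
MIN_ANALYSIS_PER_DOMAIN = 3
domains_to_check = ["domain0001.com"]
checked_domains = {}  # domain -> (is_spam, analysis_count)

def record_analysis(domain, is_spam):
    if domain in checked_domains:
        previous_is_spam = checked_domains[domain][0]
        analysis_count = checked_domains[domain][1] + 1
        if (previous_is_spam != is_spam) and (analysis_count > 1):
            analysis_count = 0  # conflicting verdicts: start counting again
        elif analysis_count >= MIN_ANALYSIS_PER_DOMAIN:
            try:
                domains_to_check.remove(domain)  # enough concordant analyses
            except ValueError:
                pass  # already removed after another client's report
    else:
        analysis_count = 1
    checked_domains[domain] = (is_spam, analysis_count)

record_analysis("domain0001.com", True)
```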
@@ -308,7 +307,7 @@ def do_POST(self):
else:
# buggy client, crawler, or someone else we don't care about...
- raise InvalidRequestException(self.path,self.client_address[0],"URL not found",404)
+ raise InvalidRequestException(self.path, self.client_address[0], "URL not found", 404)
except (MalformedXmlException, PotentiallyMaliciousRequestException, InvalidRequestException) as e:
logging.getLogger().warning(e)
@@ -319,12 +318,12 @@ def do_POST(self):
self.send_error(500)
raise
- def log_message(self,format,*args):
+ def log_message(self, format, *args):
# circumvent base HTTP logging and use our custom logger via the logging module (see /usr/local/lib/python3.2/http/server.py in a standard Unix Python3.2 install)
#super().log_message(format,*args)
- logging.getLogger().info("%s - - [%s] %s" % (self.address_string(),self.log_date_time_string(),format%args) )
+ logging.getLogger().info("%s - - [%s] %s" % (self.address_string(), self.log_date_time_string(), format % args))
- def isSafeFilename(self,filename):
+ def isSafeFilename(self, filename):
# ensure a filename has the form XXX.XX, with no slashes, double dots, etc. to protect from injection
safe_chars = frozenset(string.ascii_letters + string.digits + "-")
components = filename.split(".")
@@ -336,13 +335,13 @@ def isSafeFilename(self,filename):
return False
return True
- def validParams(self,params):
+ def validParams(self, params):
if "version" not in params or "pc_version" not in params:
return False
try:
v, v_pc = int(params["version"][0]), int(params["pc_version"][0])
except ValueError:
- return False # integer conversion failed
+ return False # integer conversion failed
return (v in DistributedCrawlerServer.KNOWN_CLIENT_VERSIONS) and (v_pc in DistributedCrawlerServer.KNOWN_PC_VERSIONS)
@@ -360,7 +359,7 @@ def validParams(self,params):
cli_parser.add_argument("-v",
"--verbosity",
action="store",
- choices=("quiet","warning","info","debug"),
+ choices=("quiet", "warning", "info", "debug"),
default="info",
dest="verbosity",
help="Level of output to diplay")
@@ -370,7 +369,7 @@ def validParams(self,params):
logging.basicConfig(format="%(message)s")
logger = logging.getLogger()
if options.verbosity == "quiet":
- logger.setLevel(logging.CRITICAL+1)
+ logger.setLevel(logging.CRITICAL + 1)
elif options.verbosity == "warning":
logger.setLevel(logging.WARNING)
elif options.verbosity == "info":
