
Make autoupdate operational and testable + test scripts fixes + fixed other bugs
desbma committed Jan 22, 2012
1 parent 493bbfb commit 2d1c9f29ff4b767e83502709b89953f31419661c
Showing with 172 additions and 85 deletions.
  1. +76 −48 ddc_client.py
  2. +40 −32 ddc_server.py
  3. +7 −5 test_1client.sh
  4. +49 −0 test_upgrade.sh
ddc_client.py
@@ -1,68 +1,96 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-import argparse, logging, time, urllib.parse, xml.etree.ElementTree
+import argparse, logging, time, urllib.parse, xml.etree.ElementTree, zipfile
import httplib2
import ddc_process
+class NeedRestartException(Exception):
+ pass
+
+
class DistributedCrawlerClient():
CLIENT_VERSION = 1
http_client = httplib2.Http(timeout=10)
def __init__(self,server,port):
- self.base_url = "http://%s:%d/rest" % (server,port)
+ self.base_url = "http://%s:%d" % (server,port)
+ self.api_base_url = "%s/rest" % (self.base_url)
def start(self,):
logging.getLogger().info("DuckDuckGo distributed crawler client v%d started" % (__class__.CLIENT_VERSION))
-
- while True:
- # see README.md for params description
- response = self.request({ "action" : "getdomains",
- "version" : str(__class__.CLIENT_VERSION),
- "pc_version" : str(ddc_process.VERSION) }).decode("utf-8")
-
- # read response
- xml_response = xml.etree.ElementTree.fromstring(response)
- xml_domains = xml_response.findall("domainlist/domain")
- domain_count = len(xml_domains)
-
- # TODO look for an upgrade node and act accordingly
-
- # if the server has no work for us, take a nap
- if not domain_count:
- logging.getLogger().info("Got no domains to check from server, sleeping for 30s...")
- time.sleep(30)
- continue
-
- # check domains
- logging.getLogger().info("Got %d domains to check from server" % (domain_count) )
- domains_state = [ False for i in range(domain_count) ]
- for (i, xml_domain) in enumerate(xml_domains):
- domain = xml_domain.get("name")
- logging.getLogger().debug("Checking domain '%s'" % (domain) )
- domains_state[i] = ddc_process.is_spam(domain)
- # TODO should add a special XML attribute for when a domain check fails (network, etc.)
-
- # prepare POST request content
- xml_root = xml.etree.ElementTree.Element("ddc")
- xml_domain_list = xml_response.find("domainlist") # reuse the previous XML domain list
- for (xml_domain, is_spam) in zip(xml_domain_list.iterfind("domain"),domains_state):
- xml_domain.set("spam",str(int(is_spam)))
- xml_root.append(xml_domain_list)
-
- # send POST request
- post_data = xml.etree.ElementTree.tostring(xml_root)
- self.request({ "action" : "senddomainsdata",
- "version" : str(__class__.CLIENT_VERSION),
- "pc_version" : str(ddc_process.VERSION) },
- True,
- post_data) # we don't care for what the server actually returns here
-
- def request(self,url_params,post_request=False,post_data=None):
+ logging.getLogger().info("Page analysis component v%d loaded" % (ddc_process.VERSION))
+
+ try:
+ while True:
+ # see README.md for params description
+ response = self.api_request({ "action" : "getdomains",
+ "version" : str(__class__.CLIENT_VERSION),
+ "pc_version" : str(ddc_process.VERSION) }).decode("utf-8")
+
+ # read response
+ xml_response = xml.etree.ElementTree.fromstring(response)
+ xml_domains = xml_response.findall("domainlist/domain")
+ domain_count = len(xml_domains)
+
+ # upgrade components if necessary
+ need_restart = False
+ for xml_upgrade in xml_response.findall("upgrades/upgrade"):
+ upgrade_type = xml_upgrade.get("type") # component identifier ("client" or "page analysis")
+ version = xml_upgrade.get("version")
+ logging.getLogger().info("Upgrading '%s' component to version %s" % (upgrade_type,version) )
+ url = self.base_url + xml_upgrade.get("url")
+ response, content = __class__.http_client.request(url)
+ zip_filename = response["content-disposition"].split(";")[1].split("=")[1]
+ with open(zip_filename,"w+b") as file_handle: # "w+b": create or truncate the archive file before writing ("r+b" fails if it does not exist yet)
+ file_handle.write(content)
+ archive = zipfile.ZipFile(file_handle,"r")
+ archive.extractall()
+ archive.close()
+ need_restart = True
+ if need_restart:
+ raise NeedRestartException()
+
+ # if the server has no work for us, take a nap
+ if not domain_count:
+ logging.getLogger().info("Got no domains to check from server, sleeping for 30s...")
+ time.sleep(30)
+ continue
+
+ # check domains
+ logging.getLogger().info("Got %d domains to check from server" % (domain_count) )
+ domains_state = [ False for i in range(domain_count) ]
+ for (i, xml_domain) in enumerate(xml_domains):
+ domain = xml_domain.get("name")
+ logging.getLogger().debug("Checking domain '%s'" % (domain) )
+ domains_state[i] = ddc_process.is_spam(domain)
+ # TODO should add a special XML attribute for when a domain check fails (network, etc.)
+
+ # prepare POST request content
+ xml_root = xml.etree.ElementTree.Element("ddc")
+ xml_domain_list = xml_response.find("domainlist") # reuse the previous XML domain list
+ for (xml_domain, is_spam) in zip(xml_domain_list.iterfind("domain"),domains_state):
+ xml_domain.set("spam",str(int(is_spam)))
+ xml_root.append(xml_domain_list)
+
+ # send POST request
+ post_data = xml.etree.ElementTree.tostring(xml_root)
+ self.api_request( { "action" : "senddomainsdata",
+ "version" : str(__class__.CLIENT_VERSION),
+ "pc_version" : str(ddc_process.VERSION) },
+ True,
+ post_data) # we don't care for what the server actually returns here
+
+ except NeedRestartException:
+ logging.getLogger().info("Restarting client")
+ exit(7)
+
+
+ def api_request(self,url_params,post_request=False,post_data=None):
# construct url
- url = "%s?%s" % (self.base_url,urllib.parse.urlencode(url_params))
+ url = "%s?%s" % (self.api_base_url,urllib.parse.urlencode(url_params))
# send request
if post_request:
logging.getLogger().info("Posting data to '%s'" % (url) )
ddc_server.py
@@ -2,7 +2,6 @@
# -*- coding: utf-8 -*-
import argparse, gzip, hashlib, http.server, logging, os.path, random, string, time, urllib.parse, xml.etree.ElementTree, zlib
-import ddc_process # just to get the version
class InvalidRequestException(Exception):
@@ -32,29 +31,35 @@ def __init__(self,client_version,page_processor_version):
# generate upgrade nodes
xml_upgrade = xml.etree.ElementTree.SubElement(self.xml,"upgrades")
+ need_upgrade = False
if client_version < DistributedCrawlerServer.LAST_CLIENT_VERSION:
# need to upgrade the client
- xml.etree.ElementTree.SubElement(xml_upgrade,"upgrade",attrib={"type" : "client",
- "url" : "/upgrade?file=client-v%d.zip" % (DistributedCrawlerServer.LAST_CLIENT_VERSION) })
- if page_processor_version < ddc_process.VERSION:
+ xml.etree.ElementTree.SubElement(xml_upgrade,"upgrade",attrib={"type" : "client",
+ "url" : "/upgrade?file=client-v%d.zip" % (DistributedCrawlerServer.LAST_CLIENT_VERSION) ,
+ "version" : str(DistributedCrawlerServer.LAST_CLIENT_VERSION) } )
+ need_upgrade = True
+ if page_processor_version < DistributedCrawlerServer.LAST_PC_VERSION:
# need to upgrade the page processing component
- xml.etree.ElementTree.SubElement(xml_upgrade,"upgrade",attrib={"type" : "client",
- "url" : "/upgrade?file=page-processor-v%d.zip" % (ddc_process.VERSION) })
-
- # generate domain list nodes
- xml_domain_list = xml.etree.ElementTree.SubElement(self.xml,"domainlist")
- domains_to_send_count = min(len(DistributedCrawlerServer.domains_to_check),__class__.MAX_DOMAIN_LIST_SIZE)
- for i in range(domains_to_send_count):
- domain = random.choice(DistributedCrawlerServer.domains_to_check) # pick a random domain in the list
- xml.etree.ElementTree.SubElement(xml_domain_list,"domain",attrib={"name":domain})
- logging.getLogger().debug("Picked domain %s to be checked" % (domain) )
-
- # add a signature, so we can detect a malicious client trying to send fake results for different domains
- sig = __class__.getXmlDomainListSig(xml_domain_list,as_bytes=False)[1]
- xml_domain_list.set("sig",sig)
-
- if not domains_to_send_count:
- logging.getLogger().warning("No more domains to be checked")
+ xml.etree.ElementTree.SubElement(xml_upgrade,"upgrade",attrib={"type" : "page analysis",
+ "url" : "/upgrade?file=page-processor-v%d.zip" % (DistributedCrawlerServer.LAST_PC_VERSION),
+ "version" : str(DistributedCrawlerServer.LAST_CLIENT_VERSION) } )
+ need_upgrade = True
+
+ if not need_upgrade:
+ # generate domain list nodes
+ xml_domain_list = xml.etree.ElementTree.SubElement(self.xml,"domainlist")
+ domains_to_send_count = min(len(DistributedCrawlerServer.domains_to_check),__class__.MAX_DOMAIN_LIST_SIZE)
+ for i in range(domains_to_send_count):
+ domain = random.choice(DistributedCrawlerServer.domains_to_check) # pick a random domain in the list
+ xml.etree.ElementTree.SubElement(xml_domain_list,"domain",attrib={"name":domain})
+ logging.getLogger().debug("Picked domain %s to be checked" % (domain) )
+
+ # add a signature, so we can detect a malicious client trying to send fake results for different domains
+ sig = __class__.getXmlDomainListSig(xml_domain_list,as_bytes=False)[1]
+ xml_domain_list.set("sig",sig)
+
+ if not domains_to_send_count:
+ logging.getLogger().warning("No more domains to be checked")
def __str__(self):
return xml.etree.ElementTree.tostring(self.xml,"unicode")
@@ -81,6 +86,7 @@ def getXmlDomainListSig(xml_domain_list,as_bytes=True,as_string=True):
class DistributedCrawlerServer(http.server.HTTPServer):
LAST_CLIENT_VERSION = SERVER_PROTOCOL_VERSION = 1
+ LAST_PC_VERSION = 1
MIN_ANALYSIS_PER_DOMAIN = 3
SIGNATURE_BLACKLIST_TIMEOUT_S = 60*60*24*30*3 # 3 months
@@ -114,21 +120,22 @@ def do_GET(self):
if parsed_url.path == "/upgrade":
# check query is well formed
if "file" not in params or \
- not self.isSafeFilename(params["files"][0]): # we check for evil injection here
+ not self.isSafeFilename(params["file"][0]): # we check for evil injection here
raise InvalidRequestException(self.path,self.client_address[0],"Invalid query parameters")
# serve file (might short-circuit that part with an Apache/Nginx URL redirection directly to the static content)
- upgrade_file = params["files"][0]
+ upgrade_file = params["file"][0]
try:
with open(upgrade_file,"rb") as file_handle:
# send http headers
self.send_response(200)
- self.send_header("Content-Type", "application/zip")
- self.send_header("Content-Length", file_size)
+ self.send_header("Content-Type", "application/zip")
+ self.send_header("Content-Length", os.path.getsize(upgrade_file))
+ self.send_header("Content-Disposition", "attachement; filename=%s" % (upgrade_file) )
self.end_headers()
# send file
self.wfile.write(file_handle.read())
- except IOError:
+ except (IOError, OSError):
raise InvalidRequestException(self.path,self.client_address[0],"Upgrade file '%s' does not exist or is not readable" % (upgrade_file))
elif parsed_url.path == "/rest":
@@ -229,12 +236,13 @@ def do_POST(self):
# sig not in blacklist, all good
pass
else:
- # blacklist the signature for another SIGNATURE_BLACKLIST_TIMEOUT_S
- del DistributedCrawlerServer.excluded_sigs[index]
- del DistributedCrawlerServer.excluded_sigs_time[index]
- DistributedCrawlerServer.excluded_sigs.append(domainlist_sig[0])
- DistributedCrawlerServer.excluded_sigs_time.append(current_time)
- raise PotentiallyMaliciousRequestException(self.path,self.client_address[0],"Client is spamming an already sent domainlist",204)
+ if len(domains_to_check) >= XmlMessage.MAX_DOMAIN_LIST_SIZE: # skip blacklisting when only a few domains are left, otherwise the server would reject every remaining analysis
+ # blacklist the signature for another SIGNATURE_BLACKLIST_TIMEOUT_S
+ del DistributedCrawlerServer.excluded_sigs[index]
+ del DistributedCrawlerServer.excluded_sigs_time[index]
+ DistributedCrawlerServer.excluded_sigs.append(domainlist_sig[0])
+ DistributedCrawlerServer.excluded_sigs_time.append(current_time)
+ raise PotentiallyMaliciousRequestException(self.path,self.client_address[0],"Client is spamming an already sent domainlist",204)
# update excluded signature list
DistributedCrawlerServer.excluded_sigs.append(domainlist_sig[0]) # we store the signature in its binary form for space efficiency (the list will grow huge)
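With these changes, a getdomains response to a stale client carries an <upgrades> block and no domain list. A quick sketch of the XML the server now generates in that case (version number and URL are example values):

import xml.etree.ElementTree as ET

xml_root = ET.Element("ddc")
xml_upgrades = ET.SubElement(xml_root, "upgrades")
# one <upgrade> node per stale component, mirroring the attributes set above
ET.SubElement(xml_upgrades, "upgrade", attrib={"type" : "client",
                                               "url" : "/upgrade?file=client-v2.zip",
                                               "version" : "2"})
print(ET.tostring(xml_root, "unicode"))
# <ddc><upgrades><upgrade type="client" url="/upgrade?file=client-v2.zip" version="2" /></upgrades></ddc>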
test_1client.sh
@@ -1,19 +1,21 @@
#!/bin/bash
+KILL_AFTER=${1:-'10s'}
+
cd "$(dirname -- $0)"
# start server
./ddc_server.py -p 10001 &
-server_job_id=$?
+server_job_id=$!
# wait a bit to be sure the server is ready
sleep 1s
# start client
-./ddc_client.py -s 127.0.0.1 -p 10001 &
-client_job_id=$?
+./ddc_client.py -s 127.0.0.1 -p 10001 &
+client_job_id=$!
# kill both after X s
-sleep 10s
+sleep $KILL_AFTER
kill $client_job_id
kill $server_job_id
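The $? to $! change is the substance of this fix: $? holds the exit status of the last command, while $! holds the PID of the last background job, which is what kill needs. For comparison, a rough Python equivalent of this test script, where the Popen handles play the role of $! (paths and timings as in the script):

import subprocess, time

# start server and client as child processes; the handles are our "$!"
server = subprocess.Popen(["./ddc_server.py", "-p", "10001"])
time.sleep(1)  # give the server a moment to start listening
client = subprocess.Popen(["./ddc_client.py", "-s", "127.0.0.1", "-p", "10001"])

time.sleep(10)  # let them exchange a few batches of domains
client.terminate()
server.terminate()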
test_upgrade.sh
@@ -0,0 +1,49 @@
+#!/bin/bash
+
+cd "$(dirname -- $0)"
+
+TMP_DIR=$(mktemp -d /tmp/$(basename -- $0).XXXXXXXXXX)
+
+# generate fake version change
+original_client_version=$(grep 'CLIENT_VERSION =' ddc_client.py | cut -d '=' -f 2)
+original_client_version=${original_client_version#*' '}
+original_page_processor_version=$(grep 'VERSION =' ddc_process.py | cut -d '=' -f 2)
+original_page_processor_version=${original_page_processor_version#*' '}
+new_client_version=$(( $original_client_version + 1 ))
+new_page_processor_version=$(( $original_page_processor_version + 1 ))
+sed -i "s/^\(.*LAST_CLIENT_VERSION = SERVER_PROTOCOL_VERSION = \).*$/\1$new_client_version/" ddc_server.py
+sed -i "s/^\(.*LAST_PC_VERSION = \).*$/\1$new_page_processor_version/" ddc_server.py
+
+# create upgrade archives
+cp ddc_client.py ddc_process.py $TMP_DIR/
+sed -i "s/^\(.*CLIENT_VERSION = \).*$/\1$new_client_version/" "$TMP_DIR/ddc_client.py"
+sed -i "s/^\(.*VERSION = \).*$/\1$new_page_processor_version/" "$TMP_DIR/ddc_process.py"
+zip -qj "client-v${new_client_version}.zip" "$TMP_DIR/ddc_client.py"
+zip -qj "page-processor-v${new_page_processor_version}.zip" "$TMP_DIR/ddc_process.py"
+
+# start server
+./ddc_server.py -p 10001 &
+server_job_id=$!
+
+# wait a bit to be sure the server is ready
+sleep 1s
+
+# start client
+./ddc_client.py -s 127.0.0.1 -p 10001   # 1st run: fetches the upgrades, then exits with code 7
+./ddc_client.py -s 127.0.0.1 -p 10001 & # 2nd run: the upgraded client, kept running in background
+client_job_id=$!
+
+# kill both after X s
+sleep 5s
+kill $client_job_id
+kill $server_job_id
+
+# restore original versions
+sed -i "s/^\(.*LAST_CLIENT_VERSION = SERVER_PROTOCOL_VERSION = \).*$/\1$original_client_version/" ddc_server.py
+sed -i "s/^\(.*LAST_PC_VERSION = \).*$/\1$original_page_processor_version/" ddc_server.py
+sed -i "s/^\(.*CLIENT_VERSION = \).*$/\1$original_client_version/" ddc_client.py
+sed -i "s/^\(.*VERSION = \).*$/\1$original_page_processor_version/" ddc_process.py
+
+# cleanup
+rm -R "$TMP_DIR"
+rm "client-v$new_client_version.zip" "page-processor-v$new_page_processor_version.zip"
