
Added domain list and spam analysis XML communication

1 parent adfdda8 commit ff9441b846f92bee207ba41e8a2e70a8ec388d5b @desbma committed Jan 9, 2012
Showing with 150 additions and 20 deletions.
  1. +1 −1 README.md
  2. +53 −11 ddc_client.py
  3. +1 −1 ddc_process.py
  4. +90 −3 ddc_server.py
  5. +5 −4 test.sh → test_1client.sh
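For reference, the full round trip added by this commit (URLs and payloads reconstructed from the code below; domain names are the simulation's dummy values, and the spam flags follow the dummy heuristic in ddc_process.py):

  1. The client polls the server:
     GET http://127.0.0.1:10001/rest?action=getdomains&version=1&pc_version=1
  2. The server answers with up to MAX_DOMAIN_LIST_SIZE (5) domains to check:
     <ddc><domainlist><domain name="domain0007.com" /><domain name="domain0023.com" /></domainlist></ddc>
  3. The client posts its analysis to /rest?action=senddomainsdata&version=1&pc_version=1, with body:
     <ddc><domainlist><domain name="domain0007.com" spam="0" /><domain name="domain0023.com" spam="0" /></domainlist></ddc>
  4. The server records the results and replies 204 (no content); if it had no domains to hand out in step 2, the client sleeps 30s and polls again.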
README.md
@@ -34,7 +34,7 @@ When a client has done its job, it sends the server the same 'domainlist' node
* ddc_client.py : Code for a crawling worker
* ddc_process.py : This file contains the code that simulates the binary component, currently it returns dumb results just to simulate
* ddc_server.py : Code for the server that distributes the crawling work to the clients and gets the result from them
- * test.sh : Bash script to do a small simulation by launching the server and connecting a client to it
+ * test_1client.sh : Bash script to do a small simulation by launching the server and connecting a client to it
## Dependencies
ddc_client.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-import argparse, logging, urllib.parse
+import argparse, logging, time, urllib.parse, xml.etree.ElementTree
import httplib2
import ddc_process
@@ -10,24 +10,66 @@ class DistributedCrawlerClient():
PROTOCOL_VERSION = 1
PROCESSOR_COMPONENT_VERSION = 1
- http_client = httplib2.Http(timeout=10,disable_ssl_certificate_validation=True)
+ http_client = httplib2.Http(timeout=10)
def __init__(self,server,port):
self.base_url = "http://%s:%d/rest" % (server,port)
def start(self,):
- # see README.md for params description
- response = self.request({ 'action' : 'getdomains',
- 'version' : str(self.PROTOCOL_VERSION),
- 'pc_version' : str(self.PROCESSOR_COMPONENT_VERSION) }).decode("utf-8")
- print(response)
+ while True:
+ # see README.md for params description
+ response = self.request({ "action" : "getdomains",
+ "version" : str(self.PROTOCOL_VERSION),
+ "pc_version" : str(self.PROCESSOR_COMPONENT_VERSION) }).decode("utf-8")
- def request(self,params):
+ # read response
+ xml_response = xml.etree.ElementTree.fromstring(response)
+ xml_domains = xml_response.findall("domainlist/domain")
+ domain_count = len(xml_domains)
+
+ # TODO look for an upgrade node and act accordingly
+
+ # if the server has no work for us, take a nap
+ if not domain_count:
+ logging.getLogger().info("Got no domains to check from server, sleeping for 30s...")
+ time.sleep(30)
+ continue
+
+ # check domains
+ logging.getLogger().info("Got %d domains to check from server" % (domain_count) )
+ domains_state = [ False for i in range(domain_count) ]
+ for (i, xml_domain) in enumerate(xml_domains):
+ domain = xml_domain.get("name")
+ logging.getLogger().debug("Checking domain '%s'" % (domain) )
+ domains_state[i] = ddc_process.is_spam(domain)
+ # TODO should add a special XML attribute for when a domain check fails (network, etc.)
+
+ # prepare POST request
+ # TODO we could lazily reuse the previous XML tree
+ xml_root = xml.etree.ElementTree.Element("ddc")
+ xml_domain_list = xml.etree.ElementTree.SubElement(xml_root,"domainlist")
+ for (xml_domain, is_spam) in zip(xml_domains,domains_state):
+ xml.etree.ElementTree.SubElement(xml_domain_list,"domain",attrib={"name" : xml_domain.get("name"),
+ "spam" : str(int(is_spam)) })
+
+ # send POST request
+ post_data = xml.etree.ElementTree.tostring(xml_root)
+ self.request({ "action" : "senddomainsdata",
+ "version" : str(self.PROTOCOL_VERSION),
+ "pc_version" : str(self.PROCESSOR_COMPONENT_VERSION) },
+ True,
+ post_data) # we don't care what the server actually returns here
+
+ def request(self,url_params,post_request=False,post_data=None):
# construct url
- url = "%s?%s" % (self.base_url,urllib.parse.urlencode(params))
+ url = "%s?%s" % (self.base_url,urllib.parse.urlencode(url_params))
# send request
- logging.getLogger().debug("Fetching '%s' ..." % (url) )
- response, content = self.http_client.request(url)
+ if post_request:
+ logging.getLogger().info("Posting data to '%s'" % (url) )
+ response, content = self.http_client.request(url,"POST",post_data)
+ else:
+ logging.getLogger().info("Fetching '%s'" % (url) )
+ response, content = self.http_client.request(url)
return content
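A condensed, standalone sketch of the client's new XML round trip (stub input instead of a live server; the spam heuristic is inlined from ddc_process.py):

import xml.etree.ElementTree

# stub of a server 'getdomains' response
incoming = '<ddc><domainlist><domain name="domain0001.com" /><domain name="domain0002.com" /></domainlist></ddc>'
xml_domains = xml.etree.ElementTree.fromstring(incoming).findall("domainlist/domain")

# build the 'senddomainsdata' POST body, as in start() above
xml_root = xml.etree.ElementTree.Element("ddc")
xml_domain_list = xml.etree.ElementTree.SubElement(xml_root, "domainlist")
for xml_domain in xml_domains:
    name = xml_domain.get("name")
    is_spam = len(name.strip()) % 2 > 0  # dummy check from ddc_process.is_spam
    xml.etree.ElementTree.SubElement(xml_domain_list, "domain",
                                     attrib={"name": name, "spam": str(int(is_spam))})

print(xml.etree.ElementTree.tostring(xml_root, "unicode"))
# <ddc><domainlist><domain name="domain0001.com" spam="0" /><domain name="domain0002.com" spam="0" /></domainlist></ddc>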
ddc_process.py
@@ -3,4 +3,4 @@
def is_spam(domain):
- return len(strip(domain))%2 > 0
+ return len(domain.strip())%2 > 0
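The one-line fix above matters because strip() is a str method, not a builtin, so the old code raised NameError on every call. Comparing both forms on a sample domain (the dummy heuristic flags odd stripped lengths as spam):

len(strip("example.com")) % 2 > 0     # NameError: name 'strip' is not defined
len("example.com".strip()) % 2 > 0    # True: 11 characters, odd, so flagged as spam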
ddc_server.py
@@ -1,15 +1,50 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
-import argparse, gzip, http.server, logging, urllib.parse, xml.etree.ElementTree, zlib
+import argparse, gzip, http.server, logging, random, string, urllib.parse, xml.etree.ElementTree, zlib
class XmlMessage:
+ MAX_DOMAIN_LIST_SIZE = 5
+
def __init__(self,protocol_version,page_processor_version):
self.xml = xml.etree.ElementTree.Element("ddc")
+
# TODO generate upgrade nodes
- # TODO generate domain list nodes
+
+ # generate domain list nodes
+ xml_domain_list = xml.etree.ElementTree.SubElement(self.xml,"domainlist")
+ domains_to_send_count = min(len(DistributedCrawlerServer.unchecked_domains),self.MAX_DOMAIN_LIST_SIZE)
+ for i in range(domains_to_send_count):
+ domain_index = random.randint(0,len(DistributedCrawlerServer.unchecked_domains)-1) # pick a random domain in the list
+ domain = DistributedCrawlerServer.unchecked_domains[domain_index]
+ xml.etree.ElementTree.SubElement(xml_domain_list,"domain",attrib={"name":domain})
+ del DistributedCrawlerServer.unchecked_domains[domain_index]
+ DistributedCrawlerServer.pending_domains.append(domain)
+ logging.getLogger().debug("Picked unchecked domain %s to be checked" % (domain) )
+
+ # TODO enforce redundancy of spam checks: a domain must be checked by at least X clients
+
+ if (domains_to_send_count < self.MAX_DOMAIN_LIST_SIZE) and DistributedCrawlerServer.pending_domains:
+ # if the domain list is not full yet, (re)send pending domains (a client might not have responded)
+ additional_domains_to_send_count = min(len(DistributedCrawlerServer.pending_domains),self.MAX_DOMAIN_LIST_SIZE-domains_to_send_count)
+ for i in range(additional_domains_to_send_count):
+ domain = DistributedCrawlerServer.pending_domains[i] # pick from the front because it's a queue ([i], not [0], otherwise every slot would repeat the first pending domain)
+ xml.etree.ElementTree.SubElement(xml_domain_list,"domain",attrib={"name":domain})
+ logging.getLogger().debug("Picked pending domain %s to be checked" % (domain) )
+ else:
+ additional_domains_to_send_count = 0
+
+ total_domains_to_send_count = domains_to_send_count + additional_domains_to_send_count
+ if total_domains_to_send_count:
+ logging.getLogger().debug("Picked %d domains to be checked" % (total_domains_to_send_count) )
+ else:
+ logging.getLogger().warning("No more domains to be checked")
+
+ # TODO add a key (custom cryptographic hash of the domain list), and check that key when the client responds,
+ # to be sure the client does not check different domains than the ones it has been sent.
+ # NOTE: the hash function needs to be hidden (closed source), and must change frequently so that it cannot be guessed from a large number of hashed domain lists
def __str__(self):
return xml.etree.ElementTree.tostring(self.xml,"unicode")
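One possible shape for the keyed-check TODO above, purely illustrative and not part of this commit (the note says the real function would be closed-source and rotated often); SECRET_KEY and sign_domain_list are hypothetical names:

import hashlib, hmac

SECRET_KEY = b"rotate-me-frequently"  # hypothetical; kept secret and changed often

def sign_domain_list(domains):
    # sign the exact list of domain names handed to a client
    payload = "\n".join(sorted(domains)).encode("utf-8")
    return hmac.new(SECRET_KEY, payload, hashlib.sha256).hexdigest()

# the signature would ride along in the XML, and do_POST() would recompute it
# over the returned list to verify the client checked exactly what it was sent
key = sign_domain_list(["domain0007.com", "domain0023.com"])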
@@ -19,6 +54,10 @@ class DistributedCrawlerServer(http.server.HTTPServer):
LAST_PROTOCOL_VERSION = 1
+ unchecked_domains = [ "domain%04d.com" % (i) for i in range(50) ] # we generate dummy domains for the simulation
+ pending_domains = []
+ checked_domains = {} # this holds the results, e.g. checked_domains["spam-domain.com"] = True
+
def __init__(self,port):
super().__init__(("127.0.0.1",port),RequestHandler)
@@ -51,6 +90,7 @@ def do_GET(self):
params["action"][0] != "getdomains" or \
"version" not in params or \
"pc_version" not in params:
+ logging.getLogger().warning("Invalid query parameters for URL '%s'" % (self.path) )
self.send_error(400)
else:
# generate xml
@@ -73,7 +113,7 @@ def do_GET(self):
else:
compression = "identity"
- # TODO add encryption
+ # TODO add encryption?
# send http headers
self.send_response(200)
@@ -96,6 +136,53 @@ def do_GET(self):
self.send_error(500)
raise
+ def do_POST(self):
+ try:
+ # parse request url
+ parsed_url = urllib.parse.urlsplit(self.path)
+
+ if parsed_url.path == "/rest":
+ # parse url parameters
+ params = urllib.parse.parse_qs(parsed_url.query,keep_blank_values=False,strict_parsing=True)
+
+ # check query is well formed
+ if "action" not in params or \
+ params["action"][0] != "senddomainsdata" or \
+ "version" not in params or \
+ "pc_version" not in params:
+ logging.getLogger().warning("Invalid query parameters for URL '%s'" % (self.path) )
+ self.send_error(400)
+ else:
+ # TODO do version check of the client to decide to ignore it or not
+
+ # read post data
+ post_data = self.rfile.read(int(self.headers["content-length"]))
+ xml_post_data = xml.etree.ElementTree.fromstring(post_data.decode("utf-8"))
+
+ # read domain analysis results
+ for xml_domain in xml_post_data.iterfind("domainlist/domain"):
+ domain = xml_domain.get("name")
+ logging.getLogger().debug("Got client analysis for domain '%s'" % (domain) )
+ if domain not in DistributedCrawlerServer.pending_domains:
+ # this domain has already been checked by another client
+ logging.getLogger().debug("Domain '%s' has already been checked, ignoring new analysis" % (domain) )
+ continue
+ is_spam = (xml_domain.get("spam") == "1")
+ DistributedCrawlerServer.checked_domains[domain] = is_spam
+ DistributedCrawlerServer.pending_domains.remove(domain)
+
+ # thanks buddy client!
+ self.send_response(204) # 204 is like 200 OK, but tells the client to expect no response body
+ self.end_headers()
+
+ else:
+ # buggy client, crawler, or someone else we don't care about...
+ self.send_error(404)
+
+ except:
+ # boom!
+ self.send_error(500)
+ raise
if __name__ == "__main__":
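The server's new bookkeeping moves each domain through three pools: unchecked_domains (never handed out), pending_domains (sent to a client, no answer yet) and checked_domains (verdict recorded). A toy walk-through of that life cycle, mirroring XmlMessage and do_POST() above:

unchecked_domains = ["domain%04d.com" % (i) for i in range(3)]
pending_domains = []
checked_domains = {}

# XmlMessage: pick a domain (a random one in the real code) and hand it to a client
domain = unchecked_domains.pop(0)
pending_domains.append(domain)

# do_POST(): record the client's verdict and settle the domain
checked_domains[domain] = False  # not spam
pending_domains.remove(domain)

assert checked_domains == {"domain0000.com": False} and not pending_domains

test.sh → test_1client.sh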
@@ -4,15 +4,16 @@ cd "$(dirname -- $0)"
# start server
./ddc_server.py -p 10001 &
-server_pid=$?
+server_pid=$!  # $! is the PID of the last background job ($? is only the previous command's exit status)
# wait a bit to be sure the server is ready
sleep 1s
# start client
./ddc_client.py -s 127.0.0.1 -p 10001 &
-client_pid=$?
+client_pid=$!
-# kill server after X s
+# kill both after X s
sleep 10s
-kill $server_pid
+kill $client_pid
+kill $server_pid
