Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

MUNCTIONAL!

  • Loading branch information...
commit 8d22c32408623becf84ffeedd59d609ef9c15510 1 parent 39f9764
Jacob Kaplan-Moss authored January 27, 2011
111  django_website/aggregator/management/commands/update_feeds.py
@@ -5,6 +5,8 @@
5 5
 import socket
6 6
 import sys
7 7
 import time
  8
+import threading
  9
+import Queue
8 10
 from django.core.management.base import BaseCommand
9 11
 from django_website.aggregator.models import Feed, FeedItem
10 12
 
@@ -14,60 +16,97 @@ class Command(BaseCommand):
14 16
     Universal Feed Parser (http://feedparser.org)
15 17
     """
16 18
     LOCKFILE = "/tmp/update_feeds.lock"
  19
+    
  20
+    option_list = BaseCommand.option_list + (
  21
+        optparse.make_option('-t', '--threads',
  22
+            metavar='NUM',
  23
+            type='int',
  24
+            default=4,
  25
+            help='Number of updater threads (default: 4).'
  26
+        ),
  27
+    )
17 28
 
18 29
     def handle(self, *args, **kwargs):
19 30
         try:
20 31
             lockfile = os.open(self.LOCKFILE, os.O_CREAT | os.O_EXCL)
21 32
         except OSError:
  33
+            print >> sys.stderr, "Lockfile exists (%s). Aborting." % self.LOCKFILE
22 34
             sys.exit(1)
23 35
 
24 36
         try:
25 37
             verbose = kwargs.get('verbosity')
26 38
             socket.setdefaulttimeout(15)
27  
-            self.update_feeds(verbose=verbose)
  39
+            self.update_feeds(verbose=verbose, num_threads=kwargs['threads'])
28 40
         except:
29 41
             sys.exit(1)
30 42
         finally:
31 43
             os.close(lockfile)
32 44
             os.unlink(self.LOCKFILE)
33 45
 
34  
-    def update_feeds(self, verbose=False):
35  
-        total = Feed.objects.filter(is_defunct=False).count()
36  
-        for count, feed in enumerate(Feed.objects.filter(is_defunct=False)):
37  
-            if verbose:
38  
-                print "%s (%d/%d)" % (feed, count+1, total)
39  
-            parsed_feed = feedparser.parse(feed.feed_url)
40  
-            for entry in parsed_feed.entries:
41  
-                title = entry.title.encode(parsed_feed.encoding, "xmlcharrefreplace")
42  
-                guid = entry.get("id", entry.link).encode(parsed_feed.encoding, "xmlcharrefreplace")
43  
-                link = entry.link.encode(parsed_feed.encoding, "xmlcharrefreplace")
  46
+    def update_feeds(self, verbose=False, num_threads=4):
  47
+        feed_queue = Queue.Queue()
  48
+        for feed in Feed.objects.filter(is_defunct=False):
  49
+            feed_queue.put(feed)
44 50
 
45  
-                if not guid:
46  
-                    guid = link
  51
+        threadpool = []
  52
+        for i in range(num_threads):
  53
+            threadpool.append(FeedUpdateWorker(feed_queue, verbose))
  54
+            
  55
+        [t.start() for t in threadpool]
  56
+        [t.join() for t in threadpool]
47 57
 
48  
-                if hasattr(entry, "summary"):
49  
-                    content = entry.summary
50  
-                elif hasattr(entry, "content"):
51  
-                    content = entry.content[0].value
52  
-                elif hasattr(entry, "description"):
53  
-                    content = entry.description
54  
-                else:
55  
-                    content = u""
56  
-                content = content.encode(parsed_feed.encoding, "xmlcharrefreplace")
  58
+class FeedUpdateWorker(threading.Thread):
  59
+    
  60
+    def __init__(self, q, verbose, **kwargs):
  61
+        super(FeedUpdateWorker, self).__init__(**kwargs)
  62
+        self.verbose = verbose
  63
+        self.q = q
  64
+        
  65
+    def run(self):
  66
+        while 1:
  67
+            try:
  68
+                feed = self.q.get_nowait()
  69
+            except Queue.Empty:
  70
+                return
  71
+            self.update_feed(feed)
  72
+            self.q.task_done()
  73
+            
  74
+    def update_feed(self, feed):
  75
+        if self.verbose:
  76
+            print feed
  77
+        
  78
+        parsed_feed = feedparser.parse(feed.feed_url)
  79
+        for entry in parsed_feed.entries:
  80
+            title = entry.title.encode(parsed_feed.encoding, "xmlcharrefreplace")
  81
+            guid = entry.get("id", entry.link).encode(parsed_feed.encoding, "xmlcharrefreplace")
  82
+            link = entry.link.encode(parsed_feed.encoding, "xmlcharrefreplace")
  83
+
  84
+            if not guid:
  85
+                guid = link
57 86
 
58  
-                try:
59  
-                    if entry.has_key('modified_parsed'):
60  
-                        date_modified = datetime.datetime.fromtimestamp(time.mktime(entry.modified_parsed))
61  
-                    elif parsed_feed.feed.has_key('modified_parsed'):
62  
-                        date_modified = datetime.datetime.fromtimestamp(time.mktime(parsed_feed.feed.modified_parsed))
63  
-                    elif parsed_feed.has_key('modified'):
64  
-                        date_modified = datetime.datetime.fromtimestamp(time.mktime(parsed_feed.modified))
65  
-                    else:
66  
-                        date_modified = datetime.datetime.now()
67  
-                except TypeError:
  87
+            if hasattr(entry, "summary"):
  88
+                content = entry.summary
  89
+            elif hasattr(entry, "content"):
  90
+                content = entry.content[0].value
  91
+            elif hasattr(entry, "description"):
  92
+                content = entry.description
  93
+            else:
  94
+                content = u""
  95
+            content = content.encode(parsed_feed.encoding, "xmlcharrefreplace")
  96
+
  97
+            try:
  98
+                if entry.has_key('modified_parsed'):
  99
+                    date_modified = datetime.datetime.fromtimestamp(time.mktime(entry.modified_parsed))
  100
+                elif parsed_feed.feed.has_key('modified_parsed'):
  101
+                    date_modified = datetime.datetime.fromtimestamp(time.mktime(parsed_feed.feed.modified_parsed))
  102
+                elif parsed_feed.has_key('modified'):
  103
+                    date_modified = datetime.datetime.fromtimestamp(time.mktime(parsed_feed.modified))
  104
+                else:
68 105
                     date_modified = datetime.datetime.now()
  106
+            except TypeError:
  107
+                date_modified = datetime.datetime.now()
69 108
 
70  
-                try:
71  
-                    feed.feeditem_set.get(guid=guid)
72  
-                except FeedItem.DoesNotExist:
73  
-                    feed.feeditem_set.create(title=title, link=link, summary=content, guid=guid, date_modified=date_modified)
  109
+            try:
  110
+                feed.feeditem_set.get(guid=guid)
  111
+            except FeedItem.DoesNotExist:
  112
+                feed.feeditem_set.create(title=title, link=link, summary=content, guid=guid, date_modified=date_modified)

0 notes on commit 8d22c32

Please sign in to comment.
Something went wrong with that request. Please try again.