fix scrapy#4250: add batch deliveries

BroodingKangaroo committed Apr 10, 2020
1 parent 49357cc commit c5e89dc
Showing 2 changed files with 41 additions and 14 deletions.
54 changes: 40 additions & 14 deletions scrapy/extensions/feedexport.py
@@ -241,6 +241,7 @@ def __init__(self, crawler):

         self.storages = self._load_components('FEED_STORAGES')
         self.exporters = self._load_components('FEED_EXPORTERS')
+        self.storage_batch = self.settings.getint('FEED_STORAGE_BATCH')
         for uri, feed in self.feeds.items():
             if not self._storage_supported(uri):
                 raise NotConfigured
@@ -250,19 +251,7 @@ def __init__(self, crawler):
     def open_spider(self, spider):
         for uri, feed in self.feeds.items():
             uri = uri % self._get_uri_params(spider, feed['uri_params'])
-            storage = self._get_storage(uri)
-            file = storage.open(spider)
-            exporter = self._get_exporter(
-                file=file,
-                format=feed['format'],
-                fields_to_export=feed['fields'],
-                encoding=feed['encoding'],
-                indent=feed['indent'],
-            )
-            slot = _FeedSlot(file, exporter, storage, uri, feed['format'], feed['store_empty'])
-            self.slots.append(slot)
-            if slot.store_empty:
-                slot.start_exporting()
+            self.slots.append(self._start_new_batch(None, uri, feed, spider))

     def close_spider(self, spider):
         deferred_list = []
@@ -285,11 +274,48 @@ def close_spider(self, spider):
             deferred_list.append(d)
         return defer.DeferredList(deferred_list) if deferred_list else None

+    def _start_new_batch(self, previous_batch_slot, uri, feed, spider):
+        """
+        Redirect the output data stream to a new file.
+        Called multiple times if the 'FEED_STORAGE_BATCH' setting is greater than zero.
+        """
+        if previous_batch_slot is not None:
+            previous_batch_slot.exporter.finish_exporting()
+            previous_batch_slot.storage.store(previous_batch_slot.file)
+        storage = self._get_storage(uri)
+        file = storage.open(spider)
+        exporter = self._get_exporter(
+            file=file,
+            format=feed['format'],
+            fields_to_export=feed['fields'],
+            encoding=feed['encoding'],
+            indent=feed['indent']
+        )
+        slot = _FeedSlot(file, exporter, storage, uri, feed['format'], feed['store_empty'])
+        if slot.store_empty:
+            slot.start_exporting()
+        return slot
+
+    def _get_uri_of_partial(self, slot, feed, spider):
+        """Get the URI for a partial batch file using datetime.now().isoformat()."""
+        uri = (slot.uri % self._get_uri_params(spider, feed['uri_params'])).split('.')[0] + '.'
+        uri = uri + datetime.now().isoformat() + '.' + feed['format']
+        return uri
+
     def item_scraped(self, item, spider):
-        for slot in self.slots:
+        slots = []
+        for idx, slot in enumerate(self.slots):
             slot.start_exporting()
             slot.exporter.export_item(item)
             slot.itemcount += 1
+            if self.storage_batch and slot.itemcount % self.storage_batch == 0:
+                uri = self._get_uri_of_partial(slot, self.feeds[slot.uri], spider)
+                slots.append(self._start_new_batch(slot, uri, self.feeds[slot.uri], spider))
+                self.feeds[uri] = self.feeds[slot.uri]
+                self.feeds.pop(slot.uri)
+                self.slots[idx] = None
+        self.slots = [slot for slot in self.slots if slot is not None]
+        self.slots.extend(slots)

     def _load_components(self, setting_prefix):
         conf = without_none_values(self.settings.getwithbase(setting_prefix))
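The change above rolls a feed over to a new delivery once FEED_STORAGE_BATCH items have been exported: the current exporter is finished, its file is stored, and a fresh slot is opened under a timestamped URI. Below is a minimal standalone sketch (not part of the commit) of how _get_uri_of_partial derives those batch URIs, assuming a hypothetical local feed URI such as 'items.json':

from datetime import datetime

def partial_uri(base_uri, fmt):
    # Mirrors _get_uri_of_partial: keep everything before the first dot,
    # then append an ISO timestamp and the feed format as the new extension.
    stem = base_uri.split('.')[0]
    return stem + '.' + datetime.now().isoformat() + '.' + fmt

# Hypothetical usage: a JSON feed rolling over to a new batch file.
print(partial_uri('items.json', 'json'))
# e.g. items.2020-04-10T15:30:12.345678.json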
1 change: 1 addition & 0 deletions scrapy/settings/default_settings.py
@@ -146,6 +146,7 @@
     's3': 'scrapy.extensions.feedexport.S3FeedStorage',
     'ftp': 'scrapy.extensions.feedexport.FTPFeedStorage',
 }
+FEED_STORAGE_BATCH = 0
 FEED_EXPORTERS = {}
 FEED_EXPORTERS_BASE = {
     'json': 'scrapy.exporters.JsonItemExporter',
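FEED_STORAGE_BATCH defaults to 0, which keeps the existing single-delivery behaviour. A project that wants batched deliveries would override it alongside its feed configuration; a hypothetical settings.py snippet (the FEEDS dict is assumed here, matching the multi-feed FeedExporter this patch builds on):

# settings.py -- hypothetical project configuration, not part of this commit
FEEDS = {
    'items.json': {'format': 'json'},
}
# Start a new delivery file after every 1000 exported items; 0 disables batching.
FEED_STORAGE_BATCH = 1000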
