Skip to content

Commit

Permalink
Cache stats in Redis while jobs are running (#28)
Browse files Browse the repository at this point in the history
* Add RedisStatsCollector
* Add redis to requirements
* Require redis in setup.py
* Move job update to RUNNING to RedisStatsCollector
  • Loading branch information
mgonnav committed Jul 11, 2023
1 parent f166d1f commit 25421fb
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 46 deletions.
111 changes: 70 additions & 41 deletions estela_scrapy/extensions.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,46 @@
import json
import os
from datetime import timedelta
from datetime import datetime

import requests
import redis
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.exporters import PythonItemExporter
from twisted.internet import task

from estela_scrapy.utils import json_serializer, producer

from .utils import json_serializer, update_job

RUNNING_STATUS = "RUNNING"
COMPLETED_STATUS = "COMPLETED"


class ItemStorageExtension:
def __init__(self, stats):
class BaseExtension:
def __init__(self, stats, *args, **kwargs):
self.stats = stats
exporter_kwargs = {"binary": False}
self.exporter = PythonItemExporter(**exporter_kwargs)
self.auth_token = os.getenv("ESTELA_AUTH_TOKEN")
job = os.getenv("ESTELA_SPIDER_JOB")
host = os.getenv("ESTELA_API_HOST")
self.auth_token = os.getenv("ESTELA_AUTH_TOKEN")
self.job_jid, spider_sid, project_pid = job.split(".")
self.job_url = "{}/api/projects/{}/spiders/{}/jobs/{}".format(
host, project_pid, spider_sid, self.job_jid
)

def spider_opened(self, spider):
self.update_job(status=RUNNING_STATUS)

def update_job(
self,
status,
lifespan=timedelta(seconds=0),
total_bytes=0,
item_count=0,
request_count=0,
):
requests.patch(
self.job_url,
data={
"status": status,
"lifespan": lifespan,
"total_response_bytes": total_bytes,
"item_count": item_count,
"request_count": request_count,
},
headers={"Authorization": "Token {}".format(self.auth_token)},
)

class ItemStorageExtension(BaseExtension):
def __init__(self, stats):
super().__init__(stats)
exporter_kwargs = {"binary": False}
self.exporter = PythonItemExporter(**exporter_kwargs)

@classmethod
def from_crawler(cls, crawler):
ext = cls(crawler.stats)
crawler.signals.connect(ext.item_scraped, signals.item_scraped)
crawler.signals.connect(ext.spider_opened, signals.spider_opened)
crawler.signals.connect(ext.spider_closed, signals.spider_closed)
return ext

def item_scraped(self, item):
def item_scraped(self, item, spider):
item = self.exporter.export_item(item)
data = {
"jid": os.getenv("ESTELA_COLLECTION"),
Expand All @@ -65,19 +49,64 @@ def item_scraped(self, item):
}
producer.send("job_items", data)


class RedisStatsCollector(BaseExtension):
def __init__(self, stats):
super().__init__(stats)

redis_url = os.getenv("REDIS_URL")
if not redis_url:
raise NotConfigured("REDIS_URL not found in the settings")
self.redis_conn = redis.from_url(redis_url)

self.stats_key = os.getenv("REDIS_STATS_KEY")
self.interval = float(os.getenv("REDIS_STATS_INTERVAL"))

@classmethod
def from_crawler(cls, crawler):
ext = cls(crawler.stats)

crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)

return ext

def spider_opened(self, spider):
update_job(self.job_url, self.auth_token, status=RUNNING_STATUS)
self.task = task.LoopingCall(self.store_stats, spider)
self.task.start(self.interval)

def spider_closed(self, spider, reason):
spider_stats = self.stats.get_stats()
self.update_job(
if self.task.running:
self.task.stop()

try:
self.redis_conn.delete(self.stats_key)
except Exception:
pass

stats = self.stats.get_stats()
update_job(
self.job_url,
self.auth_token,
status=COMPLETED_STATUS,
lifespan=spider_stats.get("elapsed_time_seconds", 0),
total_bytes=spider_stats.get("downloader/response_bytes", 0),
item_count=spider_stats.get("item_scraped_count", 0),
request_count=spider_stats.get("downloader/request_count", 0),
lifespan=int(stats.get("elapsed_time_seconds", 0)),
total_bytes=stats.get("downloader/response_bytes", 0),
item_count=stats.get("item_scraped_count", 0),
request_count=stats.get("downloader/request_count", 0),
)

parser_stats = json.dumps(spider_stats, default=json_serializer)
parsed_stats = json.dumps(stats, default=json_serializer)
data = {
"jid": os.getenv("ESTELA_SPIDER_JOB"),
"payload": json.loads(parser_stats),
"payload": json.loads(parsed_stats),
}
producer.send("job_stats", data)
producer.send("job_stats", value=data)

def store_stats(self, spider):
stats = self.stats.get_stats()
elapsed_time = int((datetime.now() - stats.get("start_time")).total_seconds())
stats.update({"elapsed_time_seconds": elapsed_time})

parsed_stats = json.dumps(stats, default=json_serializer)
self.redis_conn.hmset(self.stats_key, json.loads(parsed_stats))
1 change: 0 additions & 1 deletion estela_scrapy/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from estela_scrapy.utils import producer, to_standard_str


_stderr = sys.stderr


Expand Down
3 changes: 2 additions & 1 deletion estela_scrapy/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def load_default_settings(settings):
}
spider_middlewares = {}
extensions = {
"estela_scrapy.extensions.ItemStorageExtension": 1000,
"estela_scrapy.extensions.ItemStorageExtension": 999,
"estela_scrapy.extensions.RedisStatsCollector": 1000,
}
settings.get("DOWNLOADER_MIDDLEWARES_BASE").update(downloader_middlewares)
settings.get("EXTENSIONS_BASE").update(extensions)
Expand Down
25 changes: 24 additions & 1 deletion estela_scrapy/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import date, datetime
from datetime import date, datetime, timedelta

import requests
from estela_queue_adapter import get_producer_interface


Expand All @@ -26,4 +27,26 @@ def to_standard_str(text, encoding="utf-8", errors="strict"):
return text.decode(encoding, errors)


def update_job(
job_url,
auth_token,
status,
lifespan=timedelta(seconds=0),
total_bytes=0,
item_count=0,
request_count=0,
):
requests.patch(
job_url,
data={
"status": status,
"lifespan": lifespan,
"total_response_bytes": total_bytes,
"item_count": item_count,
"request_count": request_count,
},
headers={"Authorization": "Token {}".format(auth_token)},
)


producer = get_producer_interface()
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Scrapy>=1.0
requests
black
redis
pytest
pytest-env
git+https://github.com/bitmakerla/estela-queue-adapter.git
8 changes: 6 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#
# This file is autogenerated by pip-compile with python 3.9
# To update, run:
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile requirements.in
#
appdirs==1.4.4
# via black
async-timeout==4.0.2
# via redis
attrs==21.2.0
# via
# automat
Expand Down Expand Up @@ -109,6 +111,8 @@ pytest-env==0.8.1
# via -r requirements.in
queuelib==1.6.1
# via scrapy
redis==4.6.0
# via -r requirements.in
regex==2021.7.6
# via black
requests==2.26.0
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
install_requires=[
"Scrapy>=1.0",
"requests",
"redis",
"estela-queue-adapter @ git+https://github.com/bitmakerla/estela-queue-adapter.git"
],
entry_points={
Expand Down

0 comments on commit 25421fb

Please sign in to comment.