Skip to content

Commit 5957952

Browse files
authored
Cache stats in Redis while jobs are running (#206)
* BITMAKER-1980: Save real-time stats in Redis (#144)
* Retrieve job stats from Redis if the job is running
* Catch Exception rather than PyMongo exceptions
* Add unless-stopped restart policy to docker services
* Save stats from Redis when job is manually stopped
1 parent 5d4c8dc commit 5957952

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+183
-90
lines changed

database_adapters/db_adapters.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pymongo
44
from bson.objectid import ObjectId
5-
from pymongo.errors import ConnectionFailure, PyMongoError
5+
from pymongo.errors import ConnectionFailure
66

77

88
class InsertionResponse:
@@ -163,7 +163,7 @@ def insert_one_to_unique_collection(self, database_name, collection_name, item):
163163
item, {"$set": item}, upsert=True
164164
)
165165
response = InsertionResponse(True)
166-
except PyMongoError as ex:
166+
except Exception as ex:
167167
response = InsertionResponse(False, ex)
168168
finally:
169169
return response
@@ -173,7 +173,7 @@ def insert_one_to_collection(self, database_name, collection_name, item):
173173
try:
174174
self.client[database_name][collection_name].insert_one(item)
175175
response = InsertionResponse(True)
176-
except PyMongoError as ex:
176+
except Exception as ex:
177177
response = InsertionResponse(False, ex)
178178
finally:
179179
return response
@@ -187,7 +187,7 @@ def insert_many_to_collection(
187187
items, ordered=ordered
188188
)
189189
response = InsertionResponse(True)
190-
except PyMongoError as ex:
190+
except Exception as ex:
191191
response = InsertionResponse(False, ex, need_upsert=True)
192192
finally:
193193
return response

docs/estela/installation/helm-variables.md

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ All the queue platform variables should be written as children of the _<QUEUE\_P
8585
{: .highlight }
8686
> Refer to the [estela Queue Adapter](./../queueing.html#estela-queue-adapter) documentation to fill in any additional variables needed for the selected queue platform.
8787
88+
#### Redis Stats
89+
* _<REDIS\_URL>_ (Required): The connection URL to the Redis instance.
90+
* _<REDIS\_STATS\_INTERVAL>_ (Required): The interval, in seconds, at which the job stats are updated.
91+
8892
### estela API variables
8993

9094
#### Database
@@ -176,26 +180,26 @@ All the queue platform variables should be written as children of the _<QUEUE\_P
176180
to `"True"`.
177181

178182
* _<WORKER\_POOL>_ (Optional): Number of worker threads per consumer, it must be an integer.
179-
If the variable is left blank, the default value is 10.
183+
If the variable is left blank, the default value is `10`.
180184

181185
* _<HEARTBEAT\_TICK>_ (Optional): Number of seconds between heartbeat inspections, it must
182-
be an integer. If the variable is left blank, the default value is 300.
186+
be an integer. If the variable is left blank, the default value is `300`.
183187

184188
* _<QUEUE\_BASE\_TIMEOUT>_ (Optional): Minimum number of seconds a worker thread can wait
185189
for an item to be available in the internal item queue, it must be an integer. If the
186-
variable is left blank, the default value is 5.
190+
variable is left blank, the default value is `5`.
187191

188192
* _<QUEUE\_MAX\_TIMEOUT>_ (Optional): Maximum number of seconds a worker thread can wait
189193
for an item to be available in the internal item queue, it must be an integer. If the
190-
variable is left blank, the default value is 300.
194+
variable is left blank, the default value is `300`.
191195

192196
* _<BATCH\_SIZE\_THRESHOLD>_ (Optional): Size threshold in bytes of the data batch to be
193-
inserted, it must be an integer. If the variable is left blank, the default value is 4096.
197+
inserted, it must be an integer. If the variable is left blank, the default value is `4096`.
194198

195199
* _<INSERT\_TIME\_THRESHOLD>_ (Optional): Time threshold in seconds of the insertion of
196200
consecutive items belonging to the same batch of data, it must be an integer. If the
197-
variable is left blank, the default value is 5.
201+
variable is left blank, the default value is `5`.
198202

199203
* _<ACTIVITY\_TIME\_THRESHOLD>_ (Optional): Time threshold in seconds of the activity time
200204
of an Inserter object before being cleaned up, it must be an integer. If the variable is
201-
left blank, the default value is 600.
205+
left blank, the default value is `600`.

estela-api/api/filters.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from django_filters import rest_framework as filters
22

3-
from core.models import SpiderJob, SpiderCronJob
3+
from core.models import SpiderCronJob, SpiderJob
44

55

66
class SpiderJobFilter(filters.FilterSet):

estela-api/api/mixins.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from rest_framework.pagination import PageNumberPagination
55
from rest_framework.permissions import IsAuthenticated
66

7-
from api.permissions import IsProjectUser, IsAdminOrReadOnly
7+
from api.permissions import IsAdminOrReadOnly, IsProjectUser
88
from core.models import Notification, Activity
99

1010

estela-api/api/permissions.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from rest_framework.permissions import BasePermission, SAFE_METHODS
2-
from core.models import Project, Permission
32
from django.contrib.auth.models import User
43

4+
from core.models import Project, Permission
5+
56

67
class IsProjectUser(BasePermission):
78
def has_permission(self, request, view):

estela-api/api/serializers/deploy.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
from rest_framework import serializers
22

3-
from core.models import Deploy, Spider
4-
from api.serializers.spider import SpiderSerializer
53
from api.serializers.project import UserDetailSerializer
4+
from api.serializers.spider import SpiderSerializer
5+
from core.models import Deploy, Spider
66

77

88
class DeploySerializer(serializers.ModelSerializer):

estela-api/api/serializers/job.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
from datetime import timedelta
2+
3+
from django.conf import settings
14
from rest_framework import serializers
25

36
from api import errors
@@ -6,6 +9,7 @@
69
SpiderJobEnvVarSerializer,
710
SpiderJobTagSerializer,
811
)
12+
from api.utils import update_stats_from_redis, delete_stats_from_redis
913
from config.job_manager import job_manager
1014
from core.models import (
1115
DataStatus,
@@ -157,6 +161,13 @@ def update(self, instance, validated_data):
157161
}
158162
)
159163
else:
164+
if instance.status == SpiderJob.RUNNING_STATUS:
165+
try:
166+
update_stats_from_redis(instance)
167+
delete_stats_from_redis(instance)
168+
except:
169+
pass
170+
instance.save()
160171
job_manager.delete_job(instance.name)
161172
instance.status = status
162173

estela-api/api/utils.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import redis
2+
3+
from django.conf import settings
4+
5+
from datetime import timedelta
6+
7+
8+
def update_stats_from_redis(job):
9+
redis_conn = redis.from_url(settings.REDIS_URL)
10+
job_stats = redis_conn.hgetall(f"scrapy_stats_{job.key}")
11+
job.lifespan = timedelta(
12+
seconds=int(float(job_stats.get(b"elapsed_time_seconds", b"0").decode()))
13+
)
14+
job.total_response_bytes = int(
15+
job_stats.get(b"downloader/response_bytes", b"0").decode()
16+
)
17+
job.item_count = int(job_stats.get(b"item_scraped_count", b"0").decode())
18+
job.request_count = int(job_stats.get(b"downloader/request_count", b"0").decode())
19+
20+
21+
def delete_stats_from_redis(job):
22+
redis_conn = redis.from_url(settings.REDIS_URL)
23+
try:
24+
redis_conn.delete(f"scrapy_stats_{job.key}")
25+
except:
26+
pass

estela-api/api/views/deploy.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
from django.shortcuts import get_object_or_404
22
from drf_yasg.utils import swagger_auto_schema
3-
from rest_framework import viewsets, status
3+
from rest_framework import status, viewsets
4+
from rest_framework.exceptions import APIException, ParseError, PermissionDenied
45
from rest_framework.response import Response
5-
from rest_framework.exceptions import ParseError, APIException, PermissionDenied
66

7-
from api.mixins import BaseViewSet, ActionHandlerMixin
7+
from api import errors
8+
from api.mixins import ActionHandlerMixin, BaseViewSet
89
from api.serializers.deploy import (
9-
DeploySerializer,
1010
DeployCreateSerializer,
11+
DeploySerializer,
1112
DeployUpdateSerializer,
1213
)
13-
14-
from api import errors
14+
from config.job_manager import credentials
1515
from core.models import Deploy, Project
1616
from core.views import launch_deploy_job
17-
from config.job_manager import credentials
1817

1918

2019
class DeployViewSet(

estela-api/api/views/job.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
1+
from datetime import timedelta
2+
13
from django.shortcuts import get_object_or_404
24
from django_filters.rest_framework import DjangoFilterBackend
3-
from drf_yasg.utils import swagger_auto_schema
45
from drf_yasg import openapi
6+
from drf_yasg.utils import swagger_auto_schema
57
from rest_framework import mixins, status
6-
from rest_framework.response import Response
78
from rest_framework.exceptions import ParseError
9+
from rest_framework.response import Response
810

911
from api.filters import SpiderJobFilter
1012
from api.mixins import BaseViewSet, ActionHandlerMixin
1113
from api.serializers.job import (
12-
SpiderJobSerializer,
1314
SpiderJobCreateSerializer,
15+
SpiderJobSerializer,
1416
SpiderJobUpdateSerializer,
1517
)
18+
from api.utils import update_stats_from_redis
1619
from config.job_manager import job_manager
17-
from core.models import DataStatus, Spider, SpiderJob, Project
20+
from core.models import DataStatus, Project, Spider, SpiderJob
1821

1922

2023
class SpiderJobViewSet(
@@ -169,3 +172,13 @@ def update(self, request, *args, **kwargs):
169172

170173
headers = self.get_success_headers(serializer.data)
171174
return Response(serializer.data, status=status.HTTP_200_OK, headers=headers)
175+
176+
def retrieve(self, request, *args, jid=None, **kwargs):
177+
job = get_object_or_404(self.queryset, jid=jid)
178+
if job.status == SpiderJob.RUNNING_STATUS:
179+
update_stats_from_redis(job)
180+
181+
serializer = SpiderJobSerializer(job)
182+
183+
headers = self.get_success_headers(serializer.data)
184+
return Response(serializer.data, status=status.HTTP_200_OK, headers=headers)

estela-api/api/views/job_data.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import redis
2+
13
from django.conf import settings
24
from drf_yasg import openapi
35
from drf_yasg.utils import swagger_auto_schema
@@ -25,6 +27,10 @@ class JobDataViewSet(
2527
DEFAULT_PAGINATION_SIZE = 50
2628
JOB_DATA_TYPES = ["items", "requests", "logs", "stats"]
2729

30+
def __init__(self, *args, **kwargs):
31+
super().__init__(*args, **kwargs)
32+
self.redis_conn = redis.from_url(settings.REDIS_URL)
33+
2834
def get_parameters(self, request):
2935
page = int(request.query_params.get("page", 1))
3036
data_type = request.query_params.get("type", "items")
@@ -98,10 +104,18 @@ def list(self, request, *args, **kwargs):
98104
count = spiderdata_db_client.get_estimated_document_count(
99105
kwargs["pid"], job_collection_name
100106
)
107+
101108
if data_type == "stats":
102-
result = spiderdata_db_client.get_job_stats(
103-
kwargs["pid"], job_collection_name
104-
)
109+
if job.status == SpiderJob.RUNNING_STATUS:
110+
job_stats = self.redis_conn.hgetall(f"scrapy_stats_{job.key}")
111+
parsed_job_stats = {
112+
key.decode(): value.decode() for key, value in job_stats.items()
113+
}
114+
result = [parsed_job_stats]
115+
else:
116+
result = spiderdata_db_client.get_job_stats(
117+
kwargs["pid"], job_collection_name
118+
)
105119
elif request.META["HTTP_USER_AGENT"].startswith("estela-cli/"):
106120
chunk_size = max(
107121
1,
@@ -223,19 +237,10 @@ def delete(self, request, *args, **kwargs):
223237
data_type = request.query_params.get("type")
224238
if not spiderdata_db_client.get_connection():
225239
raise DataBaseError({"error": errors.UNABLE_CONNECT_DB})
226-
if (
227-
job.cronjob is not None
228-
and job.cronjob.unique_collection
229-
and data_type == "items"
230-
):
231-
job_collection_name = "{}-scj{}-job_{}".format(
232-
kwargs["sid"], job.cronjob.cjid, data_type
233-
)
234-
else:
235-
job_collection_name = "{}-{}-job_{}".format(
236-
kwargs["sid"], kwargs["jid"], data_type
237-
)
238240

241+
job_collection_name = self.get_collection_name(
242+
job, data_type, kwargs["sid"], kwargs["jid"]
243+
)
239244
count = spiderdata_db_client.delete_collection_data(
240245
kwargs["pid"], job_collection_name
241246
)

estela-api/api/views/project.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from datetime import datetime, timedelta
2-
from django.shortcuts import get_object_or_404
32

43
from django.core.paginator import Paginator
4+
from django.shortcuts import get_object_or_404
55
from drf_yasg import openapi
66
from drf_yasg.utils import swagger_auto_schema
77
from rest_framework import status, viewsets
@@ -13,6 +13,7 @@
1313
from api.mixins import BaseViewSet, ActionHandlerMixin
1414
from api.serializers.job import ProjectJobSerializer, SpiderJobSerializer
1515
from api.serializers.cronjob import ProjectCronJobSerializer, SpiderCronJobSerializer
16+
from api.serializers.job import ProjectJobSerializer, SpiderJobSerializer
1617
from api.serializers.project import (
1718
ProjectSerializer,
1819
ProjectUpdateSerializer,

estela-api/build_project/build.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
1-
import os
2-
import sys
3-
import json
4-
import docker
51
import base64
6-
import requests
2+
import json
73
import logging
8-
4+
import os
5+
import sys
96
from zipfile import ZipFile
107

8+
import docker
9+
import requests
1110
from django.conf import settings
1211

13-
1412
sys.path.append("/home/estela/estela-api")
1513
os.environ["DJANGO_SETTINGS_MODULE"] = "config.settings.base"
1614

estela-api/config/job_manager.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
from engines.config import JobManager
2-
from credentials.config import Credentials
3-
from django.conf import settings
41
from database_adapters.db_adapters import get_database_interface
2+
from django.conf import settings
3+
4+
from credentials.config import Credentials
5+
from engines.config import JobManager
56

67
credentials = Credentials(plataform=settings.CREDENTIALS)
78

estela-api/config/settings/base.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@
1010
https://docs.djangoproject.com/en/3.1/ref/settings/
1111
"""
1212

13+
from pathlib import Path
1314
from urllib.parse import urlparse
15+
1416
import environ
15-
from pathlib import Path
1617

1718
from estela_queue_adapter import get_queue_env_vars
1819

@@ -38,6 +39,8 @@
3839
DJANGO_EXTERNAL_APPS=(str, ""),
3940
EXTERNAL_APP_KEYS=(str, "dummy"),
4041
EXTERNAL_MIDDLEWARES=(str, ""),
42+
REDIS_URL=(str, "redis://redis"),
43+
REDIS_STATS_INTERVAL=(str, "5.0"),
4144
CORS_ORIGIN_WHITELIST=(str, "http://127.0.0.1:3000"),
4245
AWS_ACCESS_KEY_ID=(str, "dummy"),
4346
AWS_SECRET_ACCESS_KEY=(str, "dummy"),
@@ -232,6 +235,11 @@
232235
# Queue settings
233236
QUEUE_PARAMS = get_queue_env_vars()
234237

238+
# Redis settings for job stats
239+
240+
REDIS_URL = env("REDIS_URL")
241+
REDIS_STATS_INTERVAL = env("REDIS_STATS_INTERVAL")
242+
235243

236244
# Cluster settings
237245
MULTI_NODE_MODE = False

estela-api/config/storage_backends.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from storages.backends.s3boto3 import S3Boto3Storage
2+
23
from config.settings import prod
34

45

0 commit comments

Comments
 (0)