Skip to content

Commit

Permalink
EnvVars at project/spider level and set for job envVars (#211)
Browse files: browse the repository at this point in the history
* BITMAKER-2845 Allow setting environment variables at spider and project levels (#172)
* Set environment variables at project and spider level
* Adding cronjobs envvars setting
* Save stats from Redis to the DB when stopping a job
* Show STOPPED status in estela-web

---------

Co-authored-by: Raymond Negron <raymond1242@Raymonds-MacBook-Air.local>
Co-authored-by: emegona <mateo@emegona.com>
  • Loading branch information
3 people committed Aug 15, 2023
1 parent 0089993 commit 21d199a
Show file tree
Hide file tree
Showing 36 changed files with 1,674 additions and 313 deletions.
34 changes: 25 additions & 9 deletions estela-api/api/serializers/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from datetime import timedelta

from django.conf import settings
from django.shortcuts import get_object_or_404
from rest_framework import serializers

from api import errors
Expand All @@ -9,7 +7,7 @@
SpiderJobEnvVarSerializer,
SpiderJobTagSerializer,
)
from api.utils import update_stats_from_redis, delete_stats_from_redis
from api.utils import delete_stats_from_redis, update_stats_from_redis
from config.job_manager import job_manager
from core.models import (
DataStatus,
Expand Down Expand Up @@ -58,9 +56,18 @@ def get_spider(self, instance):
return {"sid": instance.spider.sid, "name": instance.spider.name}


class SpiderJobCreateEnvVarSerializer(serializers.Serializer):
    """Input shape for one environment variable supplied when creating a job.

    Unlike a model serializer, ``evid`` is a plain writable integer here, so
    a request may carry the id of an env var alongside its name and value.
    ``name`` and ``value`` are always required; ``masked`` falls back to
    ``False`` when omitted.
    """

    # Optional id of an env var; absent when the caller passes a literal pair.
    evid = serializers.IntegerField(help_text="Env var id.", required=False)
    name = serializers.CharField(help_text="Env var name.", required=True)
    value = serializers.CharField(help_text="Env var value.", required=True)
    masked = serializers.BooleanField(
        help_text="Env var masked.",
        default=False,
        required=False,
    )


class SpiderJobCreateSerializer(serializers.ModelSerializer):
args = SpiderJobArgSerializer(many=True, required=False, help_text="Job arguments.")
env_vars = SpiderJobEnvVarSerializer(
env_vars = SpiderJobCreateEnvVarSerializer(
many=True, required=False, help_text="Job env variables."
)
tags = SpiderJobTagSerializer(many=True, required=False, help_text="Job tags.")
Expand Down Expand Up @@ -99,14 +106,23 @@ def create(self, validated_data):
SpiderJobArg.objects.create(job=job, **arg)

for env_var in env_vars_data:
SpiderJobEnvVar.objects.create(job=job, **env_var)
evid = env_var.get("evid", None)
if not evid:
SpiderJobEnvVar.objects.create(job=job, **env_var)
elif evid > 0:
env = get_object_or_404(SpiderJobEnvVar, evid=evid)
SpiderJobEnvVar.objects.create(
job=job,
name=env.name,
value=env.value,
masked=env.masked,
)

for tag_data in tags_data:
tag, _ = SpiderJobTag.objects.get_or_create(**tag_data)
job.tags.add(tag)

job.save()

return job


Expand Down Expand Up @@ -163,9 +179,9 @@ def update(self, instance, validated_data):
else:
if instance.status == SpiderJob.RUNNING_STATUS:
try:
update_stats_from_redis(instance)
update_stats_from_redis(instance, save_to_database=True)
delete_stats_from_redis(instance)
except:
except Exception:
pass
instance.save()
job_manager.delete_job(instance.name)
Expand Down
2 changes: 1 addition & 1 deletion estela-api/api/serializers/job_specific.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Meta:
class SpiderJobEnvVarSerializer(serializers.ModelSerializer):
class Meta:
model = SpiderJobEnvVar
fields = ("name", "value", "masked")
fields = ("evid", "name", "value", "masked")

def to_representation(self, instance):
ret = super().to_representation(instance)
Expand Down
14 changes: 13 additions & 1 deletion estela-api/api/serializers/project.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from django.contrib.auth.models import User
from rest_framework import serializers

from core.models import DataStatus, Permission, Project, UsageRecord, Activity
from api.serializers.job_specific import SpiderJobEnvVarSerializer
from core.models import Activity, DataStatus, Permission, Project, UsageRecord


class UserDetailSerializer(serializers.ModelSerializer):
Expand Down Expand Up @@ -44,6 +45,9 @@ class ProjectSerializer(serializers.ModelSerializer):
required=False,
help_text="Users with permissions on this project.",
)
env_vars = SpiderJobEnvVarSerializer(
many=True, required=False, help_text="Project env variables."
)
container_image = serializers.CharField(
read_only=True, help_text="Path of the project's container image."
)
Expand All @@ -57,6 +61,7 @@ class Meta:
"framework",
"container_image",
"users",
"env_vars",
"data_status",
"data_expiry_days",
)
Expand Down Expand Up @@ -120,6 +125,9 @@ class ProjectUpdateSerializer(serializers.ModelSerializer):
pid = serializers.UUIDField(
read_only=True, help_text="A UUID identifying this project."
)
name = serializers.CharField(
write_only=True, required=False, help_text="Project name."
)
users = UserDetailSerializer(many=True, required=False, help_text="Affected users.")
email = serializers.EmailField(
write_only=True, required=False, help_text="Email address."
Expand All @@ -139,6 +147,9 @@ class ProjectUpdateSerializer(serializers.ModelSerializer):
required=False,
help_text="Set project framework.",
)
env_vars = SpiderJobEnvVarSerializer(
many=True, required=False, help_text="Project env variables."
)
permission = serializers.ChoiceField(
write_only=True,
choices=PERMISSION_CHOICES,
Expand Down Expand Up @@ -166,6 +177,7 @@ class Meta:
"email",
"action",
"framework",
"env_vars",
"permission",
"data_status",
"data_expiry_days",
Expand Down
37 changes: 31 additions & 6 deletions estela-api/api/serializers/spider.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from rest_framework import serializers

from api import errors
from api.serializers.job_specific import SpiderJobEnvVarSerializer
from api.utils import update_env_vars
from core.models import DataStatus, Spider


Expand All @@ -13,15 +16,29 @@ class SpiderSerializer(serializers.ModelSerializer):
required=True, help_text="Days before data expires."
)

env_vars = SpiderJobEnvVarSerializer(
many=True, required=False, help_text="Spider environment variables."
)

class Meta:
model = Spider
fields = ("sid", "name", "project", "data_status", "data_expiry_days")
fields = (
"sid",
"name",
"project",
"env_vars",
"data_status",
"data_expiry_days",
)


class SpiderUpdateSerializer(serializers.ModelSerializer):
sid = serializers.UUIDField(
read_only=True, help_text="A UUID identifying this spider."
)
env_vars = SpiderJobEnvVarSerializer(
many=True, required=False, help_text="Project environment variables."
)
data_status = serializers.ChoiceField(
choices=DataStatus.HIGH_LEVEL_OPTIONS,
required=False,
Expand All @@ -34,12 +51,20 @@ class SpiderUpdateSerializer(serializers.ModelSerializer):

class Meta:
model = Spider
fields = ("sid", "name", "data_status", "data_expiry_days")
fields = ("sid", "env_vars", "data_status", "data_expiry_days")

def update(self, instance, validated_data):
instance.data_status = validated_data.get("data_status", instance.data_status)
instance.data_expiry_days = validated_data.get(
"data_expiry_days", instance.data_expiry_days
)
data_status = validated_data.get("data_status", None)
data_expiry_days = validated_data.get("data_expiry_days", 1)
env_vars = validated_data.get("env_vars", [])
if data_status:
instance.data_status = data_status
if data_status == DataStatus.PENDING_STATUS and data_expiry_days > 0:
instance.data_expiry_days = data_expiry_days
else:
raise serializers.ValidationError({"error": errors.INVALID_DATA_STATUS})
if "env_vars" in validated_data:
update_env_vars(instance, env_vars, level="spider")

instance.save()
return instance
66 changes: 57 additions & 9 deletions estela-api/api/utils.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,69 @@
import redis
from datetime import timedelta

import redis
from django.conf import settings

from datetime import timedelta
from api import errors
from api.exceptions import DataBaseError
from config.job_manager import spiderdata_db_client
from core.models import SpiderJobEnvVar


def update_env_vars(instance, env_vars, level="project"):
    """Synchronise the env vars stored on ``instance`` with ``env_vars``.

    Each incoming entry is a dict with ``name`` and ``value`` keys and an
    optional ``masked`` key (treated as ``False`` when absent). For every
    entry, the first matching rule applies:

    * an identical stored row already exists -> nothing to do;
    * the value is the ``"__MASKED__"`` placeholder and ``masked`` is true
      -> the stored value is left untouched;
    * the value is the placeholder and ``masked`` is false -> only the
      ``masked`` flag of the stored row(s) with that name is cleared;
    * a stored row with that name exists -> its value and flag are updated;
    * otherwise a new row is created, attached to ``instance`` through the
      foreign key named by ``level``.

    Finally, every stored row whose name is not present in ``env_vars`` is
    deleted.

    Args:
        instance: Object exposing an ``env_vars`` related manager.
        env_vars: Iterable of env var dicts as described above.
        level: ``"project"`` or ``"spider"``; selects the foreign key used
            when creating rows. NOTE(review): any other value creates
            nothing while updates and deletions still run; this matches the
            previous behaviour, confirm it is intended.
    """
    env_vars_instance = instance.env_vars.all()

    # Build both name sets once instead of re-deriving a list per iteration.
    existing_names = {stored.name for stored in env_vars_instance}
    incoming_names = {env_var["name"] for env_var in env_vars}

    for env_var in env_vars:
        name = env_var["name"]
        value = env_var["value"]
        # ``masked`` is optional in the payload; indexing it directly used to
        # raise KeyError when the client omitted it.
        masked = env_var.get("masked", False)

        if env_vars_instance.filter(**env_var).exists():
            continue

        if value == "__MASKED__":
            if masked is True:
                # Placeholder echoed back unchanged: keep the stored secret.
                continue
            if masked is False:
                env_vars_instance.filter(name=name).update(masked=False)
                continue

        if name in existing_names:
            env_vars_instance.filter(name=name).update(value=value, masked=masked)
        elif level in ("project", "spider"):
            # ``level`` doubles as the name of the foreign key field.
            SpiderJobEnvVar.objects.create(**{level: instance}, **env_var)
            # Remember the name so a later entry with the same name updates
            # this row instead of creating a duplicate.
            existing_names.add(name)

    for stored in env_vars_instance:
        if stored.name not in incoming_names:
            stored.delete()

def update_stats_from_redis(job, save_to_database=False):
    """Copy a job's Scrapy stats from Redis onto ``job``, optionally persisting them.

    The stats hash stored under ``scrapy_stats_<job.key>`` is read and decoded
    to text, then ``lifespan``, ``total_response_bytes``, ``item_count`` and
    ``request_count`` are set on ``job`` (missing stats count as zero). The
    job itself is not saved here.

    Args:
        job: Job object exposing ``key``, ``jid`` and ``spider``.
        save_to_database: When true and the hash is non-empty, the full stats
            document is also inserted into the project's ``job_stats``
            collection through ``spiderdata_db_client``.

    Raises:
        DataBaseError: If ``save_to_database`` is requested, stats exist, and
            ``spiderdata_db_client.get_connection()`` is falsy.
    """
    redis_conn = redis.from_url(settings.REDIS_URL)
    raw_stats = redis_conn.hgetall(f"scrapy_stats_{job.key}")
    # The hash comes back with bytes keys and values; decode once up front so
    # every lookup below works on plain strings.
    job_stats = {key.decode(): value.decode() for key, value in raw_stats.items()}

    elapsed_seconds = int(float(job_stats.get("elapsed_time_seconds", "0")))
    job.lifespan = timedelta(seconds=elapsed_seconds)
    job.total_response_bytes = int(job_stats.get("downloader/response_bytes", "0"))
    job.item_count = int(job_stats.get("item_scraped_count", "0"))
    job.request_count = int(job_stats.get("downloader/request_count", "0"))

    if save_to_database and job_stats:
        if not spiderdata_db_client.get_connection():
            raise DataBaseError({"error": errors.UNABLE_CONNECT_DB})

        # Store numeric stats as numbers; anything else stays a string.
        # Only values are reassigned, so iterating ``items()`` is safe.
        for key, value in job_stats.items():
            if value.isdigit():
                job_stats[key] = int(value)
            else:
                try:
                    job_stats[key] = float(value)
                except ValueError:
                    pass

        job_collection_name = "{}-{}-job_stats".format(job.spider.sid, job.jid)
        job_stats["_id"] = job_collection_name
        spiderdata_db_client.insert_one_to_collection(
            str(job.spider.project.pid), "job_stats", job_stats
        )


def delete_stats_from_redis(job):
Expand Down
2 changes: 0 additions & 2 deletions estela-api/api/views/job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from datetime import timedelta

from django.shortcuts import get_object_or_404
from django_filters.rest_framework import DjangoFilterBackend
from drf_yasg import openapi
Expand Down
14 changes: 9 additions & 5 deletions estela-api/api/views/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@
from rest_framework.response import Response

from api import errors
from api.mixins import BaseViewSet, ActionHandlerMixin
from api.serializers.job import ProjectJobSerializer, SpiderJobSerializer
from api.mixins import ActionHandlerMixin, BaseViewSet
from api.serializers.cronjob import ProjectCronJobSerializer, SpiderCronJobSerializer
from api.serializers.job import ProjectJobSerializer, SpiderJobSerializer
from api.serializers.project import (
ActivitySerializer,
ProjectActivitySerializer,
ProjectSerializer,
ProjectUpdateSerializer,
ProjectUsageSerializer,
UsageRecordSerializer,
ProjectActivitySerializer,
ActivitySerializer,
)
from api.utils import update_env_vars
from core.models import (
Activity,
DataStatus,
Permission,
Project,
Expand All @@ -31,7 +32,6 @@
SpiderJob,
UsageRecord,
User,
Activity,
)


Expand Down Expand Up @@ -101,6 +101,7 @@ def update(self, request, *args, **kwargs):
data_expiry_days = serializer.validated_data.pop("data_expiry_days", 0)
description = ""
framework = serializer.validated_data.pop("framework", "")
env_vars = serializer.validated_data.pop("env_vars", None)

if name:
old_name = instance.name
Expand All @@ -111,6 +112,9 @@ def update(self, request, *args, **kwargs):
instance.framework = framework
description = f"changed the framework to {framework}."

if env_vars is not None:
update_env_vars(instance, env_vars, level="project")

user = request.user
is_superuser = user.is_superuser or user.is_staff
if user_email and (is_superuser or user_email != user.email):
Expand Down
35 changes: 35 additions & 0 deletions estela-api/core/migrations/0033_auto_20230814_2238.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Generated by Django 3.1.14 on 2023-08-14 22:38

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):
    """Add nullable ``project`` and ``spider`` foreign keys to ``spiderjobenvvar``.

    Both keys cascade on delete and expose the reverse accessor ``env_vars``
    on their target model.
    """

    dependencies = [
        ("core", "0032_project_framework"),
    ]

    operations = [
        # Link from an env var to core.project.
        migrations.AddField(
            model_name="spiderjobenvvar",
            name="project",
            field=models.ForeignKey(
                to="core.project",
                on_delete=django.db.models.deletion.CASCADE,
                related_name="env_vars",
                null=True,
                help_text="Project pid.",
            ),
        ),
        # Link from an env var to core.spider.
        migrations.AddField(
            model_name="spiderjobenvvar",
            name="spider",
            field=models.ForeignKey(
                to="core.spider",
                on_delete=django.db.models.deletion.CASCADE,
                related_name="env_vars",
                null=True,
                help_text="Spider sid.",
            ),
        ),
    ]
14 changes: 14 additions & 0 deletions estela-api/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,20 @@ class SpiderJobEnvVar(models.Model):
null=True,
help_text="Cron job cjid.",
)
project = models.ForeignKey(
Project,
on_delete=models.CASCADE,
related_name="env_vars",
null=True,
help_text="Project pid.",
)
spider = models.ForeignKey(
Spider,
on_delete=models.CASCADE,
related_name="env_vars",
null=True,
help_text="Spider sid.",
)
name = models.CharField(max_length=1000, help_text="Env variable name.")
value = models.CharField(max_length=1000, help_text="Env variable value.")
masked = models.BooleanField(
Expand Down

0 comments on commit 21d199a

Please sign in to comment.