Skip to content

Commit

Permalink
adds scheduled jobs to load manager;
Browse files Browse the repository at this point in the history
adds better support for manual jobs;
removes 'manual' from job status enum;
removes 'dict' type returns from interface in favor of ORM;
cleans up tests after removal of to_dict;
  • Loading branch information
btylerburton committed Jun 26, 2024
1 parent 8e408cb commit 5c3a452
Show file tree
Hide file tree
Showing 27 changed files with 647 additions and 392 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ clean: ## Cleans docker images
docker compose -p harvest-app down -v --remove-orphans

lint: ## Lints wtih ruff, isort, black
ruff .
ruff check .
isort .
black .

Expand Down
2 changes: 1 addition & 1 deletion app/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class HarvestSourceForm(FlaskForm):
)
frequency = SelectField(
"Frequency",
choices=["Manual", "Daily", "Weekly", "Biweekly", "Monthly"],
choices=["manual", "daily", "weekly", "biweekly", "monthly"],
validators=[DataRequired()],
)
user_requested_frequency = StringField(
Expand Down
90 changes: 46 additions & 44 deletions app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
url_for,
)

from app.scripts.load_manager import schedule_first_job, trigger_manual_job
from database.interface import HarvesterDBInterface

from .forms import HarvestSourceForm, OrganizationForm
Expand Down Expand Up @@ -249,9 +250,9 @@ def add_organization():
@mod.route("/organization/<org_id>", methods=["GET"])
def get_organization(org_id=None):
if org_id:
org = db.get_organization(org_id)
org = db._to_dict(db.get_organization(org_id))
if request.args.get("type") and request.args.get("type") == "json":
return jsonify(org)
return org
else:
return render_template(
"view_data.html",
Expand All @@ -262,21 +263,21 @@ def get_organization(org_id=None):
)
else:
org = db.get_all_organizations()
return org
return db._to_dict(org)


# Edit Org
@mod.route("/organization/edit/<org_id>", methods=["GET", "POST"])
@login_required
def edit_organization(org_id=None):
if org_id:
org = db.get_organization(org_id)
org = db._to_dict(db.get_organization(org_id))
form = OrganizationForm(data=org)
if form.validate_on_submit():
new_org_data = make_new_org_contract(form)
org = db.update_organization(org_id, new_org_data)
if org:
flash(f"Updated org with ID: {org['id']}")
flash(f"Updated org with ID: {org.id}")
else:
flash("Failed to update organization.")
return redirect(f"/organization/{org_id}")
Expand Down Expand Up @@ -316,22 +317,28 @@ def add_harvest_source():
form = HarvestSourceForm()
organizations = db.get_all_organizations()
organization_choices = [
(str(org["id"]), f'{org["name"]} - {org["id"]}') for org in organizations
(str(org.id), f"{org.name} - {org.id}") for org in organizations
]
form.organization_id.choices = organization_choices

if request.is_json:
org = db.add_harvest_source(request.json)
if org:
return jsonify({"message": f"Added new harvest source with ID: {org.id}"})
source = db.add_harvest_source(request.json)
job_message = schedule_first_job(source.id)
if source and job_message:
return jsonify(
{
"message": f"Added new harvest source with ID: {source.id}. {job_message}"
}
)
else:
return jsonify({"error": "Failed to add harvest source."}), 400
else:
if form.validate_on_submit():
new_source = make_new_source_contract(form)
source = db.add_harvest_source(new_source)
if source:
flash(f"Updated source with ID: {source.id}")
job_message = schedule_first_job(source.id)
if source and job_message:
flash(f"Updated source with ID: {source.id}. {job_message}")
else:
flash("Failed to add harvest source.")
return redirect("/")
Expand All @@ -347,11 +354,11 @@ def add_harvest_source():
# View Source
@mod.route("/harvest_source/", methods=["GET"])
@mod.route("/harvest_source/<source_id>", methods=["GET", "POST"])
def get_harvest_source(source_id=None):
def get_harvest_source(source_id: str = None):
if source_id:
source = db.get_harvest_source(source_id)
source = db._to_dict(db.get_harvest_source(source_id))
if request.args.get("type") and request.args.get("type") == "json":
return jsonify(source)
return source
return render_template(
"view_data.html",
data=source,
Expand All @@ -361,24 +368,24 @@ def get_harvest_source(source_id=None):
)
else:
source = db.get_all_harvest_sources()
return source
return db._to_dict(source)


# Edit Source
@mod.route("/harvest_source/edit/<source_id>", methods=["GET", "POST"])
@login_required
def edit_harvest_source(source_id=None):
def edit_harvest_source(source_id: str = None):
if source_id:
source = db.get_harvest_source(source_id)
organizations = db.get_all_organizations()
source = db._to_dict(db.get_harvest_source(source_id))
organizations = db._to_dict(db.get_all_organizations())
organization_choices = [
(str(org["id"]), f'{org["name"]} - {org["id"]}') for org in organizations
]
form = HarvestSourceForm(data=source)
form.organization_id.choices = organization_choices
if form.validate_on_submit():
new_source_data = make_new_source_contract(form)
source = db.update_harvest_source(source_id, new_source_data)
source = db._to_dict(db.update_harvest_source(source_id, new_source_data))
if source:
flash(f"Updated source with ID: {source['id']}")
else:
Expand All @@ -400,7 +407,7 @@ def edit_harvest_source(source_id=None):
return "No harvest sources found for this organization", 404
else:
source = db.get_all_harvest_sources()
return jsonify(source)
return db._to_dict(source)


# Delete Source
Expand All @@ -422,11 +429,8 @@ def delete_harvest_source(source_id):
# Trigger Harvest
@mod.route("/harvest_source/harvest/<source_id>", methods=["GET"])
def trigger_harvest_source(source_id):
job = db.add_harvest_job({"harvest_source_id": source_id, "status": "manual"})
if job:
flash(f"Triggered harvest of source with ID: {source_id}")
else:
flash("Failed to add harvest job.")
message = trigger_manual_job(source_id)
flash(message)
return redirect(f"/harvest_source/{source_id}")


Expand All @@ -448,22 +452,18 @@ def add_harvest_job():
@mod.route("/harvest_job/", methods=["GET"])
@mod.route("/harvest_job/<job_id>", methods=["GET"])
def get_harvest_job(job_id=None):
try:
if job_id:
job = HarvesterDBInterface._to_dict(db.get_harvest_job(job_id))
return jsonify(job) if job else ("Not Found", 404)

source_id = request.args.get("harvest_source_id")
if source_id:
job = db.get_harvest_job_by_source(source_id)
if not job:
return "No harvest jobs found for this harvest source", 404
else:
job = db.get_all_harvest_jobs()
if job_id:
job = db.get_harvest_job(job_id)
return jsonify(job) if job else ("Not Found", 404)

return jsonify(job)
except Exception:
return "Please provide correct job_id or harvest_source_id"
source_id = request.args.get("harvest_source_id")
if source_id:
job = db.get_harvest_job_by_source(source_id)
if not job:
return "No harvest jobs found for this harvest source", 404
else:
job = db.get_all_harvest_jobs()
return db._to_dict(job)


# Update Job
Expand Down Expand Up @@ -547,14 +547,16 @@ def get_all_harvest_record_errors(record_id: str) -> list:

## Harvest Error
# Get error by id
@mod.route("/harvest_error/", methods=["GET"])
@mod.route("/harvest_error/<error_id>", methods=["GET"])
def get_harvest_error(error_id: str) -> dict:
def get_harvest_error(error_id: str = None) -> dict:
# retrieves the given error ( either job or record )
try:
if error_id:
error = db.get_harvest_error(error_id)
return error if error else ("Not Found", 404)
except Exception:
return "Please provide correct error_id"
else:
errors = db.get_all_harvest_errors()
return db._to_dict(errors)


## Test interface, will remove later
Expand Down
129 changes: 91 additions & 38 deletions app/scripts/load_manager.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,123 @@
import logging
import os
from datetime import datetime

from database.interface import HarvesterDBInterface
from harvester.utils import CFHandler
from harvester.lib.cf_handler import CFHandler
from harvester.utils.utils import create_future_date

DATABASE_URI = os.getenv("DATABASE_URI")
CF_API_URL = os.getenv("CF_API_URL")
CF_SERVICE_USER = os.getenv("CF_SERVICE_USER")
CF_SERVICE_AUTH = os.getenv("CF_SERVICE_AUTH")
HARVEST_RUNNER_APP_GUID = os.getenv("HARVEST_RUNNER_APP_GUID")
CF_INSTANCE_INDEX = os.getenv("CF_INSTANCE_INDEX")

LM_MAX_TASKS_COUNT = 3
MAX_TASKS_COUNT = 3

interface = HarvesterDBInterface()

logger = logging.getLogger("harvest_admin")

def create_task(jobId):
return {

def create_cf_handler():
# check for correct env vars to init CFHandler
if not CF_API_URL or not CF_SERVICE_USER or not CF_SERVICE_AUTH:
logger.info("CFHandler is not configured correctly. Check your env vars.")
return
return CFHandler(CF_API_URL, CF_SERVICE_USER, CF_SERVICE_AUTH)


def create_task(jobId, cf_handler=None):
task_contract = {
"app_guuid": HARVEST_RUNNER_APP_GUID,
"command": f"python harvester/harvest.py {jobId}",
"task_id": f"harvest-job-{jobId}",
}


def sort_jobs(jobs):
return sorted(jobs, key=lambda x: x["status"])
if cf_handler is None:
cf_handler = create_cf_handler()

cf_handler.start_task(**task_contract)
updated_job = interface.update_harvest_job(jobId, {"status": "in_progress"})
message = f"Updated job {updated_job.id} to in_progress"
logger.info(message)
return message


def trigger_manual_job(source_id):
source = interface.get_harvest_source(source_id)
jobs_in_progress = interface.get_harvest_jobs_by_filter(
{"harvest_source_id": source.id, "status": "in_progress"}
)
if len(jobs_in_progress):
return (
f"Can't trigger harvest. Job {jobs_in_progress[0].id} already in progress."
)
job_data = interface.add_harvest_job(
{
"harvest_source_id": source.id,
"status": "new",
"date_created": datetime.now(),
}
)
if job_data:
logger.info(f"Created new manual harvest job: for {job_data.harvest_source_id}")
return create_task(job_data.id)


def schedule_first_job(source_id):
future_jobs = interface.get_new_harvest_jobs_by_source_in_future(source_id)
# delete any future scheduled jobs
for job in future_jobs:
interface.delete_harvest_job(job.id)
logger.info(f"Deleted harvest job: {job.id} for source {source_id}")
# then schedule next job
return schedule_next_job(source_id)


def schedule_next_job(source_id):
source = interface.get_harvest_source(source_id)
if source.frequency != "Manual":
# schedule new future job
job_data = interface.add_harvest_job(
{
"harvest_source_id": source.id,
"status": "new",
"date_created": create_future_date(source.frequency),
}
)
message = f"Scheduled new harvest job: for {job_data.harvest_source_id} at {job_data.date_created}" # noqa E501
logger.info(message)
return message
else:
return "No job scheduled for manual source"


def load_manager():
if not CF_API_URL or not CF_SERVICE_USER or not CF_SERVICE_AUTH:
print("CFHandler is not configured correctly. Check your env vars.")
return

cf_handler = CFHandler(CF_API_URL, CF_SERVICE_USER, CF_SERVICE_AUTH)

# confirm CF_INSTANCE_INDEX == 0 or bail
# confirm CF_INSTANCE_INDEX == 0. we don't want multiple instances starting jobs
if os.getenv("CF_INSTANCE_INDEX") != "0":
print("CF_INSTANCE_INDEX is not set or not equal to zero")
logger.info("CF_INSTANCE_INDEX is not set or not equal to zero")
return

# filter harvestjobs by new (automated) & manual
jobs = interface.get_harvest_jobs_by_faceted_filter("status", ["new", "manual"])
cf_handler = create_cf_handler()

# get current list of all tasks
current_tasks = cf_handler.get_all_app_tasks(HARVEST_RUNNER_APP_GUID)
# filter out in_process tasks
running_tasks = cf_handler.get_all_running_tasks(current_tasks)
# get new jobs older than now
jobs = interface.get_new_harvest_jobs_in_past()

# get list of running tasks
running_tasks = cf_handler.get_all_running_app_tasks(HARVEST_RUNNER_APP_GUID)

# confirm tasks < MAX_JOBS_COUNT or bail
if running_tasks > LM_MAX_TASKS_COUNT:
print(f"{running_tasks} running_tasks > LM_MAX_TASKS_COUNT. can't proceed")
if running_tasks > MAX_TASKS_COUNT:
logger.info(
f"{running_tasks} running_tasks > max tasks count ({MAX_TASKS_COUNT})."
)
return
else:
slots = LM_MAX_TASKS_COUNT - running_tasks

# sort jobs by manual first
sorted_jobs = sort_jobs(jobs)

# slice off jobs to invoke
jobs_to_invoke = sorted_jobs[:slots]
slots = MAX_TASKS_COUNT - running_tasks

# invoke cf_task with next job(s)
# then mark that job(s) as running in the DB
print("Load Manager :: Updated Harvest Jobs")
for job in jobs_to_invoke:
task_contract = create_task(job["id"])
cf_handler.start_task(**task_contract)
updated_job = interface.update_harvest_job(job["id"], {"status": "in_progress"})
print(updated_job)
logger.info("Load Manager :: Updated Harvest Jobs")
for job in jobs[:slots]:
create_task(job.id, cf_handler)
schedule_next_job(job.harvest_source_id)
File renamed without changes.
Loading

1 comment on commit 5c3a452

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests Skipped Failures Errors Time
2 0 💤 0 ❌ 0 🔥 7.999s ⏱️

Please sign in to comment.