Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Scheduled Jobs Support to Load Manager #86

Merged
merged 4 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ clean: ## Cleans docker images
docker compose -p harvest-app down -v --remove-orphans

lint: ## Lints with ruff, isort, black
ruff .
ruff check .
isort .
black .

Expand Down
2 changes: 1 addition & 1 deletion app/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class HarvestSourceForm(FlaskForm):
)
frequency = SelectField(
"Frequency",
choices=["Manual", "Daily", "Weekly", "Biweekly", "Monthly"],
choices=["manual", "daily", "weekly", "biweekly", "monthly"],
validators=[DataRequired()],
)
user_requested_frequency = StringField(
Expand Down
90 changes: 46 additions & 44 deletions app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
url_for,
)

from app.scripts.load_manager import schedule_first_job, trigger_manual_job
from database.interface import HarvesterDBInterface

from .forms import HarvestSourceForm, OrganizationForm
Expand Down Expand Up @@ -249,9 +250,9 @@ def add_organization():
@mod.route("/organization/<org_id>", methods=["GET"])
def get_organization(org_id=None):
if org_id:
org = db.get_organization(org_id)
org = db._to_dict(db.get_organization(org_id))
if request.args.get("type") and request.args.get("type") == "json":
return jsonify(org)
return org
else:
return render_template(
"view_data.html",
Expand All @@ -262,21 +263,21 @@ def get_organization(org_id=None):
)
else:
org = db.get_all_organizations()
return org
return db._to_dict(org)


# Edit Org
@mod.route("/organization/edit/<org_id>", methods=["GET", "POST"])
@login_required
def edit_organization(org_id=None):
if org_id:
org = db.get_organization(org_id)
org = db._to_dict(db.get_organization(org_id))
form = OrganizationForm(data=org)
if form.validate_on_submit():
new_org_data = make_new_org_contract(form)
org = db.update_organization(org_id, new_org_data)
if org:
flash(f"Updated org with ID: {org['id']}")
flash(f"Updated org with ID: {org.id}")
else:
flash("Failed to update organization.")
return redirect(f"/organization/{org_id}")
Expand Down Expand Up @@ -316,22 +317,28 @@ def add_harvest_source():
form = HarvestSourceForm()
organizations = db.get_all_organizations()
organization_choices = [
(str(org["id"]), f'{org["name"]} - {org["id"]}') for org in organizations
(str(org.id), f"{org.name} - {org.id}") for org in organizations
]
form.organization_id.choices = organization_choices

if request.is_json:
org = db.add_harvest_source(request.json)
if org:
return jsonify({"message": f"Added new harvest source with ID: {org.id}"})
source = db.add_harvest_source(request.json)
job_message = schedule_first_job(source.id)
if source and job_message:
return jsonify(
{
"message": f"Added new harvest source with ID: {source.id}. {job_message}"
}
)
else:
return jsonify({"error": "Failed to add harvest source."}), 400
else:
if form.validate_on_submit():
new_source = make_new_source_contract(form)
source = db.add_harvest_source(new_source)
if source:
flash(f"Updated source with ID: {source.id}")
job_message = schedule_first_job(source.id)
if source and job_message:
flash(f"Updated source with ID: {source.id}. {job_message}")
else:
flash("Failed to add harvest source.")
return redirect("/")
Expand All @@ -347,11 +354,11 @@ def add_harvest_source():
# View Source
@mod.route("/harvest_source/", methods=["GET"])
@mod.route("/harvest_source/<source_id>", methods=["GET", "POST"])
def get_harvest_source(source_id=None):
def get_harvest_source(source_id: str = None):
if source_id:
source = db.get_harvest_source(source_id)
source = db._to_dict(db.get_harvest_source(source_id))
if request.args.get("type") and request.args.get("type") == "json":
return jsonify(source)
return source
return render_template(
"view_data.html",
data=source,
Expand All @@ -361,24 +368,24 @@ def get_harvest_source(source_id=None):
)
else:
source = db.get_all_harvest_sources()
return source
return db._to_dict(source)


# Edit Source
@mod.route("/harvest_source/edit/<source_id>", methods=["GET", "POST"])
@login_required
def edit_harvest_source(source_id=None):
def edit_harvest_source(source_id: str = None):
if source_id:
source = db.get_harvest_source(source_id)
organizations = db.get_all_organizations()
source = db._to_dict(db.get_harvest_source(source_id))
organizations = db._to_dict(db.get_all_organizations())
organization_choices = [
(str(org["id"]), f'{org["name"]} - {org["id"]}') for org in organizations
]
form = HarvestSourceForm(data=source)
form.organization_id.choices = organization_choices
if form.validate_on_submit():
new_source_data = make_new_source_contract(form)
source = db.update_harvest_source(source_id, new_source_data)
source = db._to_dict(db.update_harvest_source(source_id, new_source_data))
if source:
flash(f"Updated source with ID: {source['id']}")
else:
Expand All @@ -400,7 +407,7 @@ def edit_harvest_source(source_id=None):
return "No harvest sources found for this organization", 404
else:
source = db.get_all_harvest_sources()
return jsonify(source)
return db._to_dict(source)


# Delete Source
Expand All @@ -422,11 +429,8 @@ def delete_harvest_source(source_id):
# Trigger Harvest
@mod.route("/harvest_source/harvest/<source_id>", methods=["GET"])
def trigger_harvest_source(source_id):
job = db.add_harvest_job({"harvest_source_id": source_id, "status": "manual"})
if job:
flash(f"Triggered harvest of source with ID: {source_id}")
else:
flash("Failed to add harvest job.")
message = trigger_manual_job(source_id)
flash(message)
return redirect(f"/harvest_source/{source_id}")


Expand All @@ -448,22 +452,18 @@ def add_harvest_job():
@mod.route("/harvest_job/", methods=["GET"])
@mod.route("/harvest_job/<job_id>", methods=["GET"])
def get_harvest_job(job_id=None):
try:
if job_id:
job = HarvesterDBInterface._to_dict(db.get_harvest_job(job_id))
return jsonify(job) if job else ("Not Found", 404)

source_id = request.args.get("harvest_source_id")
if source_id:
job = db.get_harvest_job_by_source(source_id)
if not job:
return "No harvest jobs found for this harvest source", 404
else:
job = db.get_all_harvest_jobs()
if job_id:
job = db.get_harvest_job(job_id)
return jsonify(job) if job else ("Not Found", 404)

return jsonify(job)
except Exception:
return "Please provide correct job_id or harvest_source_id"
source_id = request.args.get("harvest_source_id")
if source_id:
job = db.get_harvest_job_by_source(source_id)
if not job:
return "No harvest jobs found for this harvest source", 404
else:
job = db.get_all_harvest_jobs()
return db._to_dict(job)


# Update Job
Expand Down Expand Up @@ -547,14 +547,16 @@ def get_all_harvest_record_errors(record_id: str) -> list:

## Harvest Error
# Get error by id
@mod.route("/harvest_error/", methods=["GET"])
@mod.route("/harvest_error/<error_id>", methods=["GET"])
def get_harvest_error(error_id: str) -> dict:
def get_harvest_error(error_id: str = None) -> dict:
# retrieves the given error ( either job or record )
try:
if error_id:
error = db.get_harvest_error(error_id)
return error if error else ("Not Found", 404)
except Exception:
return "Please provide correct error_id"
else:
errors = db.get_all_harvest_errors()
return db._to_dict(errors)


## Test interface, will remove later
Expand Down
133 changes: 93 additions & 40 deletions app/scripts/load_manager.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,123 @@
import logging
import os
from datetime import datetime

from database.interface import HarvesterDBInterface
from harvester.utils import CFHandler
from harvester.lib.cf_handler import CFHandler
from harvester.utils.general_utils import create_future_date

DATABASE_URI = os.getenv("DATABASE_URI")
CF_API_URL = os.getenv("CF_API_URL")
CF_SERVICE_USER = os.getenv("CF_SERVICE_USER")
CF_SERVICE_AUTH = os.getenv("CF_SERVICE_AUTH")
HARVEST_RUNNER_APP_GUID = os.getenv("HARVEST_RUNNER_APP_GUID")
CF_INSTANCE_INDEX = os.getenv("CF_INSTANCE_INDEX")

LM_MAX_TASKS_COUNT = 3
MAX_TASKS_COUNT = 3

interface = HarvesterDBInterface()

logger = logging.getLogger("harvest_admin")

def create_task(jobId):
    """Build the keyword arguments describing the runner task for one job.

    The returned mapping carries the runner app GUID (read from the
    module-level ``HARVEST_RUNNER_APP_GUID``), the shell command that runs
    the harvester for ``jobId``, and a task id derived from ``jobId``.
    """
    harvest_command = f"python harvester/harvest.py {jobId}"
    task_name = f"harvest-job-{jobId}"
    return dict(
        app_guuid=HARVEST_RUNNER_APP_GUID,
        command=harvest_command,
        task_id=task_name,
    )


def sort_jobs(jobs):
    """Return a new list of jobs ordered ascending by their ``"status"`` value.

    The sort is stable, so jobs sharing a status keep their incoming order.
    With plain string statuses, ``"manual"`` sorts ahead of ``"new"``.
    """

    def status_of(job):
        return job["status"]

    ordered = list(jobs)
    ordered.sort(key=status_of)
    return ordered


def load_manager():
def create_cf_handler():
# check for correct env vars to init CFHandler
if not CF_API_URL or not CF_SERVICE_USER or not CF_SERVICE_AUTH:
print("CFHandler is not configured correctly. Check your env vars.")
logger.info("CFHandler is not configured correctly. Check your env vars.")
return
return CFHandler(CF_API_URL, CF_SERVICE_USER, CF_SERVICE_AUTH)

cf_handler = CFHandler(CF_API_URL, CF_SERVICE_USER, CF_SERVICE_AUTH)

# confirm CF_INSTANCE_INDEX == 0 or bail
def create_task(job_id, cf_handler=None):
    """Start the runner task for a harvest job and mark the job in progress.

    Args:
        job_id: ID of the harvest job; interpolated into the task command
            and the task id.
        cf_handler: Optional handler exposing ``start_task``. When omitted,
            one is built with ``create_cf_handler()``.

    Returns:
        A status message string. On success it reports the job that was
        moved to ``in_progress``; otherwise it explains why the task could
        not be started or the job could not be updated.
    """
    task_contract = {
        "app_guuid": HARVEST_RUNNER_APP_GUID,
        "command": f"python harvester/harvest.py {job_id}",
        "task_id": f"harvest-job-{job_id}",
    }
    if cf_handler is None:
        cf_handler = create_cf_handler()
    if cf_handler is None:
        # create_cf_handler() returns None when the CF env vars are missing;
        # report that instead of failing on `None.start_task`.
        message = (
            f"Can't start task for job {job_id}. CFHandler is not configured."
        )
        logger.error(message)
        return message

    cf_handler.start_task(**task_contract)
    updated_job = interface.update_harvest_job(job_id, {"status": "in_progress"})
    if not updated_job:
        # NOTE(review): assumes update_harvest_job returns a falsy value when
        # the update fails (the routes test `if org:` after an update) — confirm.
        message = f"Started task for job {job_id} but failed to update its status."
        logger.error(message)
        return message

    message = f"Updated job {updated_job.id} to in_progress"
    logger.info(message)
    return message


def trigger_manual_job(source_id):
    """Create and immediately start a harvest job for one harvest source.

    Refuses to start a second job while the source already has a job with
    status ``in_progress``.

    Args:
        source_id: ID of the harvest source to harvest.

    Returns:
        A status message string suitable for showing to the user: the result
        of ``create_task`` on success, or an explanation of why no harvest
        was triggered.
    """
    source = interface.get_harvest_source(source_id)
    if source is None:
        # NOTE(review): assumes the interface returns None for an unknown id
        # — confirm against HarvesterDBInterface.
        return f"Can't trigger harvest. Harvest source {source_id} not found."

    jobs_in_progress = interface.get_harvest_jobs_by_filter(
        {"harvest_source_id": source.id, "status": "in_progress"}
    )
    if jobs_in_progress:
        return (
            f"Can't trigger harvest. Job {jobs_in_progress[0].id} already in progress."
        )

    job_data = interface.add_harvest_job(
        {
            "harvest_source_id": source.id,
            "status": "new",
            "date_created": datetime.now(),
        }
    )
    if not job_data:
        # Always hand the caller a message; the route passes the return value
        # straight to flash().
        message = "Failed to add harvest job."
        logger.error(message)
        return message

    logger.info(f"Created new manual harvest job: for {job_data.harvest_source_id}")
    return create_task(job_data.id)


def schedule_first_job(source_id):
    """Reset a source's pending schedule and queue its next job.

    Every job returned by ``get_new_harvest_jobs_by_source_in_future`` for
    the source is deleted (one log line per deletion), then scheduling is
    delegated to ``schedule_next_job``, whose message is returned.
    """
    # Clear out anything already queued for the future before rescheduling.
    for pending in interface.get_new_harvest_jobs_by_source_in_future(source_id):
        interface.delete_harvest_job(pending.id)
        logger.info(f"Deleted harvest job: {pending.id} for source {source_id}")

    next_job_message = schedule_next_job(source_id)
    return next_job_message


def schedule_next_job(source_id):
    """Queue the next scheduled harvest job for a source, unless it is manual.

    Args:
        source_id: ID of the harvest source to schedule.

    Returns:
        A status message string when a job was scheduled or when the source
        is manual. ``None`` when the source could not be loaded or the job
        could not be created, so callers that test the message for
        truthiness treat it as a failure.
    """
    source = interface.get_harvest_source(source_id)
    if source is None:
        # NOTE(review): assumes the interface returns None for an unknown id
        # — confirm against HarvesterDBInterface.
        logger.error(f"Can't schedule job. Harvest source {source_id} not found.")
        return None

    # The source form submits lowercase frequencies ("manual", "daily", ...),
    # so compare case-insensitively; the old check against "Manual" never
    # matched them and scheduled jobs for manual sources.
    if str(source.frequency).lower() == "manual":
        return "No job scheduled for manual source"

    # schedule new future job
    job_data = interface.add_harvest_job(
        {
            "harvest_source_id": source.id,
            "status": "new",
            "date_created": create_future_date(source.frequency),
        }
    )
    if not job_data:
        logger.error(f"Failed to schedule harvest job for source {source_id}")
        return None

    message = f"Scheduled new harvest job: for {job_data.harvest_source_id} at {job_data.date_created}"  # noqa E501
    logger.info(message)
    return message


def load_manager():
# confirm CF_INSTANCE_INDEX == 0. we don't want multiple instances starting jobs
if os.getenv("CF_INSTANCE_INDEX") != "0":
print("CF_INSTANCE_INDEX is not set or not equal to zero")
logger.info("CF_INSTANCE_INDEX is not set or not equal to zero")
return

# filter harvestjobs by new (automated) & manual
jobs = interface.get_harvest_jobs_by_faceted_filter("status", ["new", "manual"])
cf_handler = create_cf_handler()

# get current list of all tasks
current_tasks = cf_handler.get_all_app_tasks(HARVEST_RUNNER_APP_GUID)
# filter out in_process tasks
running_tasks = cf_handler.get_all_running_tasks(current_tasks)
# get new jobs older than now
jobs = interface.get_new_harvest_jobs_in_past()

# get list of running tasks
running_tasks = cf_handler.get_all_running_app_tasks(HARVEST_RUNNER_APP_GUID)

# confirm running tasks < MAX_TASKS_COUNT or bail
if running_tasks > LM_MAX_TASKS_COUNT:
print(f"{running_tasks} running_tasks > LM_MAX_TASKS_COUNT. can't proceed")
if running_tasks >= MAX_TASKS_COUNT:
logger.info(
f"{running_tasks} running_tasks >= max tasks count ({MAX_TASKS_COUNT})."
)
return
else:
slots = LM_MAX_TASKS_COUNT - running_tasks

# sort jobs by manual first
sorted_jobs = sort_jobs(jobs)

# slice off jobs to invoke
jobs_to_invoke = sorted_jobs[:slots]
slots = MAX_TASKS_COUNT - running_tasks

# invoke cf_task with next job(s)
# then mark that job(s) as running in the DB
print("Load Manager :: Updated Harvest Jobs")
for job in jobs_to_invoke:
task_contract = create_task(job["id"])
cf_handler.start_task(**task_contract)
updated_job = interface.update_harvest_job(job["id"], {"status": "in_progress"})
print(updated_job)
logger.info("Load Manager :: Updated Harvest Jobs")
for job in jobs[:slots]:
create_task(job.id, cf_handler)
schedule_next_job(job.harvest_source_id)
File renamed without changes.
Loading
Loading