# Image Export Workflow

In [2]:

# Select the kernel observatorio-ipa (Python 3.12.7) at .venv/Scripts/python
import json
import ee
import ee.batch
import ee.data
import logging
import importlib
from pathlib import Path
from datetime import date
from gee_toolbox.gee import assets as toolbox_assets
import sqlite3

from observatorio_ipa.services import connections
from observatorio_ipa.core.workflows import wflows_connections
from observatorio_ipa.core import config
from observatorio_ipa.core.defaults import *
from observatorio_ipa.utils import dates as utils_dates
from observatorio_ipa.services.gee import dates as gee_dates
from observatorio_ipa.services.gee.processes import binary, merge



# Module-level logger for this notebook; its threshold is set to DEBUG so debug records are not filtered at the logger.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

In [3]:
# general settings
# Inputs for the workflow: service-account credentials, the monthly image collection
# that exports are written to, and the AOI / DEM assets used to build the images.
#! JS code shows: users/observatorionieves/Cuencas/Andes # Verified on 2025-06-30
# NOTE(review): the line above presumably refers to the AOI asset used by the original JS code — confirm.

settings = {
    "user": "osn-imageautomation-dev@ee-observatorionieves.iam.gserviceaccount.com",
    # Key file path is relative, so it resolves against the notebook's working directory.
    "service_credentials_file": Path(
        "../secrets/ee-observatorionieves-288939dbc1cf.json"
    ),
    # GEE image collection that receives the monthly exports.
    "monthly_assets_path": Path(
        "projects/ee-observatorionieves/assets/MODIS/Andes_MCDS4S5_Yearly_Monthly"
    ),
    "monthly_image_prefix": "Andes_MCDS4S5_Yearly_Monthly",
    # Months requested for export, as "YYYY-MM" strings.
    "months_list": ["2025-06", "2025-07", "2025-08"],
    "aoi_asset_path": Path("projects/ee-observatorionieves/assets/Modules/Andes"),
    "dem_asset_path": Path(
        "projects/ee-observatorionieves/assets/Modules/DEM_SRTM_reproj_MODIS_463_Andes"
    ),
}

In [4]:
# Connect to Google Earth Engine
# Authenticates with the service-account key file configured in `settings`;
# later cells (e.g. ee.batch.Task.list()) rely on this connection.

runtime_service_account = connections.GoogleServiceAccount(
    settings["service_credentials_file"].as_posix(),
)
connections.connect_to_gee(runtime_service_account)



## Step by Step

In [5]:
from observatorio_ipa.core.workflows.images import monthly_export
# importlib.reload(monthly_export)

### Create Job in DB

In [6]:
from observatorio_ipa.utils import db
# importlib.reload(db)
# SQLite database holding the `jobs` and `exports` tables used below (path relative to the notebook).
db_path = Path("../db/observatorio_ipa.db")
# Sanity check: displays True when the database file is present.
db_path.exists()

True

In [None]:
# Write Job to DB
def create_job(conn):
    """Insert a fresh job row and return its generated id.

    The job is created as RUNNING with its image export, stats export and report
    stages all PENDING. ``created_at`` and ``updated_at`` get the same UTC timestamp.

    Args:
        conn: open DB connection (as yielded by ``db.db``); committing is left to it.

    Returns:
        The id produced by ``db.new_id()`` for the inserted row.
    """
    created = db.utc_now()
    new_job_id = db.new_id()
    created_iso = db.datetime_to_iso(created)

    initial_row = (
        new_job_id,
        "RUNNING",  # job_status
        "PENDING",  # image_export_status
        "PENDING",  # stats_export_status
        "PENDING",  # report_status
        created_iso,  # created_at
        created_iso,  # updated_at
    )
    conn.execute(
        """INSERT INTO jobs (id, job_status, image_export_status, stats_export_status, report_status, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)""",
        initial_row,
    )
    return new_job_id


In [None]:
# Create the job for this run and display the stored row; `job_id` is used by every later cell.
with db.db(db_path) as conn:
    job_id = create_job(conn)
    job = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
    print(dict(job))

{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'RUNNING', 'image_export_status': 'PENDING', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-20T23:49:04.612341+00:00'}


### Make Monthly Export Tasks

In [14]:
from observatorio_ipa.services.gee.exports import ExportTaskList, ExportTask

In [None]:
# Unpack the export inputs from `settings`; asset paths are converted to POSIX strings.
monthly_collection_path = settings["monthly_assets_path"].as_posix() # type: ignore
name_prefix = settings["monthly_image_prefix"]
aoi_path = settings["aoi_asset_path"].as_posix()
dem_path = settings["dem_asset_path"].as_posix()
months_list = settings["months_list"]


In [31]:
# ----- MOCKING EXPORT RESULTS FOR TESTING

# monthly_export_proc determines the monthly images that need to be exported, creates the Snow/Cloud TAC bands and creates the
# Export tasks.
#! export tasks are not being started within the function but probably should be
#! When running in automated mode export plan details will be lost since they are not being written anywhere (db)
try:
    # monthly_export_results = monthly_export.monthly_export_proc(
    #     monthly_collection_path=monthly_collection_path,
    #     aoi_path=aoi_path,
    #     dem_path=dem_path,
    #     name_prefix=name_prefix,
    #     months_list=months_list,
    # )

    # Mocked plan: 2025-05 already exists, 2025-06/07 are being exported and
    # 2025-08 is excluded as the current month.
    image_prefix = monthly_export._fix_name_prefix(name_prefix)
    monthly_export_results = {
        "frequency": "monthly",
        "initial_export_plan": ["2025-05", "2025-06", "2025-07", "2025-08"],
        "images_pending_export": ["2025-06", "2025-07", "2025-08"],
        "images_excluded": [
            {"2025-05": "already exported"},
            {"2025-08": "Current month (Terra)"},
        ],
        "images_to_export": ["2025-06", "2025-07"],
        "export_tasks": ExportTaskList(
            [
                ExportTask(
                    type="image",
                    name=image_prefix + "_2025_05",
                    target="gee",
                    path=monthly_collection_path,
                    task_status="ALREADY_EXISTS",
                ),
                ExportTask(
                    type="image",
                    name=image_prefix + "_2025_06",
                    target="gee",
                    path=monthly_collection_path,
                    task_status="RUNNING",
                ),
                ExportTask(
                    type="image",
                    name=image_prefix + "_2025_07",
                    target="gee",
                    path=monthly_collection_path,
                    task_status="RUNNING",
                ),
                ExportTask(
                    type="image",
                    name=image_prefix + "_2025_08",
                    target="gee",
                    path=monthly_collection_path,
                    task_status="EXCLUDED",
                    error="Current month (Terra)",
                ),
            ]
        ),
    }
except Exception as e:
    print(f"Error occurred: {e}")
    # Mark the job as FAILED in the DB
    with db.db(db_path) as conn:
        conn.execute(
            """UPDATE jobs SET 
                job_status = 'FAILED', 
                error = ?,
                updated_at = ? WHERE id = ?""",
            (str(e), db.datetime_to_iso(db.utc_now()), job_id),
        )
    # No tasks to export in case of error. Later cells read
    # monthly_export_results["export_tasks"], so (re)define it here as well:
    # otherwise they raise NameError on a fresh kernel, or worse, start the
    # tasks left over from a previous successful run of this cell.
    export_tasks = ExportTaskList([])
    monthly_export_results = {"frequency": "monthly", "export_tasks": export_tasks}

In [32]:
# Re-read the job row; it only changes here if the export step above hit its error branch.
with db.db(db_path) as conn:
    job = conn.execute(
        "SELECT * FROM jobs WHERE id = ?", (job_id,)
    ).fetchone()
    print(dict(job))

{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'RUNNING', 'image_export_status': 'PENDING', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-20T23:49:04.612341+00:00'}


In [33]:
# Display the (mocked) export plan and its export tasks.
monthly_export_results

{'frequency': 'monthly',
 'initial_export_plan': ['2025-05', '2025-06', '2025-07', '2025-08'],
 'images_pending_export': ['2025-06', '2025-07', '2025-08'],
 'images_excluded': [{'2025-05': 'already exported'},
  {'2025-08': 'Current month (Terra)'}],
 'images_to_export': ['2025-06', '2025-07'],
 'export_tasks': ExportList(export_tasks=[ExportTask(type=image, name=Andes_MCDS4S5_Yearly_Monthly__2025_05, target=gee, status=EXCLUDED, task_status=ALREADY_EXISTS), ExportTask(type=image, name=Andes_MCDS4S5_Yearly_Monthly__2025_06, target=gee, status=PENDING, task_status=RUNNING), ExportTask(type=image, name=Andes_MCDS4S5_Yearly_Monthly__2025_07, target=gee, status=PENDING, task_status=RUNNING), ExportTask(type=image, name=Andes_MCDS4S5_Yearly_Monthly__2025_08, target=gee, status=EXCLUDED, task_status=EXCLUDED)])}

In [34]:
# Start Export tasks
export_tasks: ExportTaskList = monthly_export_results['export_tasks']
# start_exports() returns a count of tasks per status (see this cell's output,
# e.g. {'PENDING': 2, 'EXCLUDED': 2}), not an ExportTask, so name it accordingly.
export_summary = export_tasks.start_exports()
print(export_summary)
print(export_tasks)


{'PENDING': 2, 'EXCLUDED': 2}
(type=image, name=Andes_MCDS4S5_Yearly_Monthly__2025_05, target=gee, status=EXCLUDED, task_status=ALREADY_EXISTS)
(type=image, name=Andes_MCDS4S5_Yearly_Monthly__2025_06, target=gee, status=PENDING, task_status=RUNNING)
(type=image, name=Andes_MCDS4S5_Yearly_Monthly__2025_07, target=gee, status=PENDING, task_status=RUNNING)
(type=image, name=Andes_MCDS4S5_Yearly_Monthly__2025_08, target=gee, status=EXCLUDED, task_status=EXCLUDED)


### Update Parent Job in DB (skip if DB not set)


In [35]:
# Move image_export_status out of PENDING now that the export plan is known:
# NOT_REQUIRED when there are no export tasks at all, RUNNING otherwise (the list
# also contains excluded / already-existing images, which are stored as COMPLETED rows).
with db.db(db_path) as conn:
    current_job = conn.execute(
        "SELECT * FROM jobs WHERE id = ?", (job_id,)
    ).fetchone()

    if current_job is None:
        # Guard: without this, the lookups below fail with an opaque TypeError.
        print(f"Job {job_id} not found in the database.")

    elif current_job['job_status'] == "FAILED":
        print("Job is already marked as FAILED.")

    elif current_job['image_export_status'] == "PENDING":
        image_export_status = "NOT_REQUIRED" if len(export_tasks) == 0 else "RUNNING"
        print(f"New Image Export Status: {image_export_status}")

        conn.execute(
            "UPDATE jobs SET image_export_status = ?, updated_at = ? WHERE id = ?",
            (image_export_status, db.datetime_to_iso(db.utc_now()), job_id)
        )

    job = conn.execute(
        "SELECT * FROM jobs WHERE id = ?", (job_id,)
    ).fetchone()
    if job is not None:
        print(dict(job))

New Image Export Status: RUNNING
{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'RUNNING', 'image_export_status': 'RUNNING', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-20T23:49:04.806846+00:00'}


### Save export tasks to DB

In [36]:
# The `state` column of `exports` is the *polling* state of a row and is not the
# same thing as ExportTask.status: ExportTask derives its status from task_status,
# while `state` only tells the poller whether the row still needs checking.

DEFAULT_POLLING_INTERVAL_SEC = 15

def _make_db_export_state(export_task:ExportTask) -> str:
    """Translate an ExportTask's status into the polling state stored in ``exports.state``.

    FAILED stays "FAILED"; PENDING and UNKNOWN still need polling and become
    "RUNNING"; every other status (e.g. EXCLUDED) is treated as "COMPLETED".
    """
    status = export_task.status
    if status == "FAILED":
        return "FAILED"
    if status in ("PENDING", "UNKNOWN"):
        return "RUNNING"
    return "COMPLETED"


def add_exportTask_to_db(conn: sqlite3.Connection, job_id: str, export_task: ExportTask):
    """Insert one ExportTask as a row of the ``exports`` table.

    The row is due for polling immediately (``next_check_at`` = now) and uses
    DEFAULT_POLLING_INTERVAL_SEC as its initial polling interval. This function
    does not commit; the caller owns the transaction.

    Args:
        conn: open SQLite connection.
        job_id: id of the parent row in ``jobs``.
        export_task: task to persist. Its ``task`` attribute is None for tasks that
            were never started (e.g. excluded / already-existing images), otherwise
            presumably an ``ee.batch.Task`` — confirm against ExportTask.
    """
    now_iso = db.datetime_to_iso(db.utc_now())
    polling_state = _make_db_export_state(export_task)

    # Store the Earth Engine task *id*, not the task object: sqlite3 cannot bind an
    # arbitrary object, and exportTask_from_db_row rebuilds the task from this id.
    gee_task = export_task.task
    if gee_task is None or isinstance(gee_task, str):
        task_id = gee_task
    else:
        task_id = gee_task.id

    conn.execute(
        """INSERT INTO exports (
            id, job_id, state, type, name, target, path, task_id, 
            task_status, next_check_at, poll_interval_sec, 
            created_at, updated_at)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            export_task.id,
            job_id,
            polling_state,
            export_task.type,
            export_task.name,
            export_task.target,
            export_task.path.as_posix(),
            task_id,
            export_task.task_status,
            now_iso,  # next_check_at: poll as soon as possible
            DEFAULT_POLLING_INTERVAL_SEC,
            now_iso,
            now_iso,
        ),
    )

In [37]:

# Persist every export task of this job (excluded / already-existing ones included).
# On failure the partial inserts are rolled back, so update_job later finds no image
# rows for a RUNNING image stage and flags the job as FAILED, instead of silently
# polling an incomplete task list.
# NOTE(review): rollback only helps if db.db() does not run in autocommit mode — confirm.
if export_tasks:
    with db.db(db_path) as conn:
        inserted = 0
        try:
            for task in export_tasks:
                add_exportTask_to_db(conn, job_id, task)
                inserted += 1
            print(f"Inserted {inserted} export tasks into the database.")
        except Exception as e:
            conn.rollback()
            print(f"Error occurred while inserting export tasks: {e}")
            


Inserted 4 export tasks into the database.


In [38]:
# Quick DB Cleanup

# with db.db(db_path) as conn:
#     conn.execute("DELETE FROM jobs WHERE id IN (SELECT id FROM jobs)")
#     conn.execute("DELETE FROM exports WHERE id IN (SELECT id FROM exports)")
#     remaining_jobs = conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
#     remaining_tasks = conn.execute("SELECT COUNT(*) FROM exports").fetchone()[0]
#     print(f"Remaining jobs after deletion: {remaining_jobs}")
#     print(f"Remaining tasks after deletion: {remaining_tasks}")

#Quick Job Exports Cleanup
# with db.db(db_path) as conn:
#     conn.execute("DELETE FROM exports WHERE job_id = ?", (job_id,))

In [39]:
# List every export row stored for the current job.
with db.db(db_path) as conn:
    bd_task_list = conn.execute("SELECT * FROM exports WHERE job_id = ?", (job_id,)).fetchall()
for task in bd_task_list:
    print(dict(task))

{'id': '7ae7ff71-11fe-4e26-af51-a8cbcae36a7a', 'job_id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'state': 'COMPLETED', 'type': 'image', 'name': 'Andes_MCDS4S5_Yearly_Monthly__2025_05', 'target': 'gee', 'path': 'projects/ee-observatorionieves/assets/MODIS/Andes_MCDS4S5_Yearly_Monthly', 'task_id': None, 'task_status': 'ALREADY_EXISTS', 'error': None, 'next_check_at': '2025-08-20T23:49:04.863191+00:00', 'lease_until': None, 'poll_interval_sec': 15, 'attempts': 0, 'deadline_at': None, 'last_error': None, 'created_at': '2025-08-20T23:49:04.863191+00:00', 'updated_at': '2025-08-20T23:49:04.863191+00:00'}
{'id': '10bcb58f-1444-43a1-8261-804c35d1eeca', 'job_id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'state': 'RUNNING', 'type': 'image', 'name': 'Andes_MCDS4S5_Yearly_Monthly__2025_06', 'target': 'gee', 'path': 'projects/ee-observatorionieves/assets/MODIS/Andes_MCDS4S5_Yearly_Monthly', 'task_id': None, 'task_status': 'RUNNING', 'error': None, 'next_check_at': '2025-08-20T23:49:04.863191+00:00',

### Update Job Status after inserting Image Tasks

In [40]:
def get_state_of_tasks(conn, job_id, type):
    """Return the polling ``state`` of every export of one type belonging to a job.

    Args:
        conn: SQLite connection whose rows are addressable by column name
            (e.g. ``row_factory = sqlite3.Row``).
        job_id: id of the parent job.
        type: export type to filter on ("image" or "table" in this notebook).

    Returns:
        List of state strings in the order SQLite yields the rows (no ORDER BY).
    """
    query = "SELECT state FROM exports WHERE job_id=? AND type=?"
    states = []
    for row in conn.execute(query, (job_id, type)).fetchall():
        states.append(row["state"])
    return states


def update_job(conn, job_id):
    now_iso = db.datetime_to_iso(db.utc_now())

    # Exit if job is not RUNNING - Assumes Nothing to Update
    job = conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
    if job["job_status"] != "RUNNING":
        return # Do Nothing

    # ---------- IMAGE_EXPORT_STATUS ----------
    image_states = get_state_of_tasks(conn, job_id, "image")
    match job["image_export_status"]:
        case "NOT_REQUIRED":
            if not image_states:
                # Normal: No image export required/Expected - Move on to stats assessment
               pass
            elif all(s != "RUNNING" for s in image_states):
                # Not Normal - Export tasks might have been created but failed to update Job Status
                error_message = (
                    "No image exports were expected but export tasks were created."
                )
                conn.execute(
                    """UPDATE jobs SET job_status='FAILED', image_export_status='FAILED',
                    error=?, updated_at=? WHERE id=?""",
                    (error_message, now_iso, job_id),
                )
                return
            else:
                # Not Normal: Same as above but giving time to complete exports before reporting
                return

        case "PENDING":
            if not image_states:
                # Normal - Might still be waiting to create Image Export Tasks
                # ! Add logic for deadline (Pending over x days)
                return

            elif all(s != "RUNNING" for s in image_states):
                # Not Normal - Export tasks might have been created but failed to update Job Status and never changed to "RUNNING"
                error_message = "Cannot verify all exports were created/completed successfully"
                conn.execute(
                    """UPDATE jobs SET job_status='FAILED', image_export_status='FAILED',
                    error=?, updated_at=? WHERE id=?""",
                    (error_message, now_iso, job_id),
                )
                return
            else:
                # Not Normal: Same as above but giving time to complete exports before reporting
                return

        case "RUNNING":
            if not image_states:
                # Not Normal: If image_export_status = RUNNING, at least 1 image export should be present
                error_message = "Image tasks failed to create or could not be saved to DB. Check logs for details."
                conn.execute(
                    """UPDATE jobs SET job_status='FAILED', image_export_status='FAILED', 
                   error=?, updated_at=? WHERE id=?""",
                    (error_message, now_iso, job_id),
                )
                return

            elif all(s != "RUNNING" for s in image_states):
                # Normal: All image export tasks have now completed
                conn.execute(
                    """UPDATE jobs SET image_export_status='COMPLETED', 
                    updated_at=? WHERE id=?""",
                    (now_iso, job_id),
            )
            else:
                # Normal: 1+ exports are still running - No change - keep "RUNNING" state
                return

        case "FAILED":
            if not image_states or all(s != "RUNNING" for s in image_states):
                # Not Normal: Something went wrong somewhere - Unknown error
                error_message = job['error'] or "Unknown error - Something went wrong somewhere - check the logs."
                conn.execute(
                    """UPDATE jobs SET job_status='FAILED',
                    error=?, updated_at=? WHERE id=?""",
                    (error_message, now_iso, job_id),
                )
                return
            else :
                # Not Normal: Something went wrong somewhere - but keep "RUNNING" state until tasks complete
                return
        case "COMPLETED":
            if not image_states or all(s == "RUNNING" for s in image_states):
                # Not Normal: Not expecting to still be 'RUNNING' exports if image_export_status is COMPLETED
                # revert to "RUNNING" stats has not Ran
                if job["stats_export_status"] in ("NOT_REQUIRED", "PENDING",):
                    conn.execute(
                        """UPDATE jobs SET image_export_status='RUNNING', updated_at=? WHERE id=?""",
                        (now_iso, job_id),
                    )
                return
            else:
                # Normal: Continue to stats status assessment
                pass

    # ---------- STATS_EXPORT_STATUS ----------
    # Sanity check in case I missed something above
    if job["image_export_status"] not in ("NOT_REQUIRED", "COMPLETED"):
        return

    table_states = get_state_of_tasks(conn, job_id, "table")
    match job["stats_export_status"]:
        case "NOT_REQUIRED":
            if not table_states:
                # Normal: No table export required/Expected - Shouldn't have come this far though
                conn.execute(
                    """UPDATE jobs SET job_status='COMPLETED', updated_at=? WHERE id=?""",
                    (now_iso, job_id),
                )
                return
            elif all(s != "RUNNING" for s in table_states):
                # Not Normal - Export tasks might have been created but failed to update Job Status
                error_message = (
                    "No Stats exports were expected but export tasks were created."
                )
                conn.execute(
                    """UPDATE jobs SET job_status='FAILED', stats_export_status='FAILED',
                    error=?, updated_at=? WHERE id=?""",
                    (error_message, now_iso, job_id),
                )
                return
            else:
                # Not Normal: Same as above but giving time to complete exports before reporting
                return

        case "PENDING":
            if not table_states:
                # Normal - Might still be waiting to create Stats Export Tasks
                # ! Add logic for deadline (Pending over x days)
                return

            elif all(s != "RUNNING" for s in table_states):
                # Not Normal - Export tasks might have been created but failed to update Job Status and never changed to "RUNNING"
                error_message = (
                    "Cannot verify all exports were created/completed successfully"
                )
                conn.execute(
                    """UPDATE jobs SET job_status='FAILED', stats_export_status='FAILED',
                    error=?, updated_at=? WHERE id=?""",
                    (error_message, now_iso, job_id),
                )
                return
            else:
                # Not Normal: Same as above but giving time to complete exports before reporting
                return

        case "RUNNING":
            if not table_states:
                # Not Normal: If stats_export_status = RUNNING, at least 1 stats export should be present
                error_message = "Stats tasks failed to create or could not be saved to DB. Check logs for details."
                conn.execute(
                    """UPDATE jobs SET job_status='FAILED', stats_export_status='FAILED', 
                    error=?, updated_at=? WHERE id=?""",
                    (error_message, now_iso, job_id),
                )
                return

            elif all(s != "RUNNING" for s in table_states):
                # Normal: All stats export tasks have now completed
                conn.execute(
                    """UPDATE jobs SET job_status='COMPLETED', stats_export_status='COMPLETED', 
                    updated_at=? WHERE id=?""",
                    (now_iso, job_id),
            )
            else:
                # Normal: 1+ exports are still running - No change - keep "RUNNING" state
                return

        case "FAILED":
            if not table_states or all(s != "RUNNING" for s in table_states):
                # Not Normal: Something went wrong somewhere - Unknown error
                error_message = job['error'] or "Unknown error - Something went wrong somewhere - check the logs."
                conn.execute(
                    """UPDATE jobs SET job_status='FAILED',
                    error=?, updated_at=? WHERE id=?""",
                    (error_message, now_iso, job_id),
                )
                return
            else :
                # Not Normal: Something went wrong somewhere - but keep "RUNNING" state until tasks complete
                return
            
        case "COMPLETED":
            if not table_states or all(s == "RUNNING" for s in table_states):
                # Not Normal: Not expecting to still be 'RUNNING' exports if stats_export_status is COMPLETED
                # revert to "RUNNING" stats until all tasks complete if reporting has not gone out
                if job["report_status"] in ("SKIP", "PENDING",):
                    conn.execute(
                        """UPDATE jobs SET stats_export_status='RUNNING', updated_at=? WHERE id=?""",
                        (now_iso, job_id),
                    )
                return
            else:
                # Normal: Continue to reporting status assessment
                pass

In [41]:
# Show the job and a count of its export rows per (type, state), reconcile it once
# with update_job, then show the job row again.
with db.db(db_path) as conn:
    print("---- Before Job update ----")
    job_before = conn.execute(
        "SELECT * FROM jobs WHERE id = ?", (job_id,)
    ).fetchone()
    print(dict(job_before))
    tasks_by_type_status = conn.execute(
        "SELECT type, state, COUNT(*) AS count FROM exports WHERE job_id = ? GROUP BY type, state",
        (job_id,),
    ).fetchall()
    for _task in tasks_by_type_status:
        print(dict(_task))

    print("---- After Job update ----")
    update_job(conn, job_id)
    
    job_after = conn.execute(
        "SELECT * FROM jobs WHERE id = ?", (job_id,)
    ).fetchone()
    print(dict(job_after))

---- Before Job update ----
{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'RUNNING', 'image_export_status': 'RUNNING', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-20T23:49:04.806846+00:00'}
{'type': 'image', 'state': 'COMPLETED', 'count': 2}
{'type': 'image', 'state': 'RUNNING', 'count': 2}
---- After Job update ----
{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'RUNNING', 'image_export_status': 'RUNNING', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-20T23:49:04.806846+00:00'}


In [105]:
# Random Job generator for status testing
import random
from itertools import product
from observatorio_ipa.services.gee.exports import GEE_TASK_VALID_STATUS
def random_task_list_generator(conn, job_id, type):
    """Insert 0 or 1 randomly generated export tasks of ``type`` for ``job_id``.

    Each generated task gets a random ``test_task_<n>`` name and a task_status drawn
    from GEE_TASK_VALID_STATUS, and is written with add_exportTask_to_db.
    """
    task_count = random.randint(0, 1)
    for _ in range(task_count):
        task_name = f"test_task_{random.randint(1, 1000)}"
        fake_status = random.choice(GEE_TASK_VALID_STATUS)
        fake_task = ExportTask(
            type=type,
            name=task_name,
            target="gee",
            path=Path("projects/ee-observatorionieves/assets/MODIS/Andes_MCDS4S5_Yearly_Monthly"),
            task_status=fake_status,
        )
        add_exportTask_to_db(conn, job_id, fake_task)


def random_job_generator():
    """Insert a RUNNING test job with random stage statuses and return its id.

    image_export_status and stats_export_status are drawn together from every
    combination of the five stage statuses; report_status is always PENDING. Up to
    one random image export and one random table export are attached. Writes to the
    database at the global ``db_path``.
    """
    stage_statuses = ("PENDING", "NOT_REQUIRED", "RUNNING", "COMPLETED", "FAILED")
    all_combos = list(product(("RUNNING",), stage_statuses, stage_statuses))
    job_status, image_status, stats_status = random.choice(all_combos)

    with db.db(db_path) as conn:
        new_job_id = db.new_id()
        created_iso = db.datetime_to_iso(db.utc_now())
        conn.execute(
            """INSERT INTO jobs (id, job_status, image_export_status, stats_export_status, report_status, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (
                new_job_id,
                job_status,
                image_status,
                stats_status,
                "PENDING",
                created_iso,
                created_iso,
            ),
        )
        for export_type in ("image", "table"):
            random_task_list_generator(conn, new_job_id, export_type)
    return new_job_id

In [106]:
# Create a random test job, reconcile it, and compare the row before and after.
# update_job is called twice: a call that moves image_export_status to COMPLETED
# still checks the stats stage against the job row it read at its start (and
# exits), so the stats stage is only evaluated on the following call.
random_job_id = random_job_generator()
with db.db(db_path) as conn:
    print("---- Before Job update ----")
    job_before=conn.execute("SELECT * FROM jobs WHERE id = ?", (random_job_id,)).fetchone()
    print(dict(job_before))
    tasks_by_type_status = conn.execute(
        "SELECT type, state, COUNT(*) AS count FROM exports WHERE job_id = ? GROUP BY type, state", (random_job_id,)).fetchall()
    for _task in tasks_by_type_status:
        print(dict(_task))

    print("---- After Job update ----")   
    update_job(conn, random_job_id)
    update_job(conn, random_job_id)
    job_after=conn.execute("SELECT * FROM jobs WHERE id = ?", (random_job_id,)).fetchone()
    print(dict(job_after))

---- Before Job update ----
{'id': 'c479ab75-6af8-4996-8638-4cda42d4002e', 'job_status': 'RUNNING', 'image_export_status': 'NOT_REQUIRED', 'stats_export_status': 'RUNNING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T22:08:40.276249+00:00', 'updated_at': '2025-08-20T22:08:40.276249+00:00'}
---- After Job update ----
{'id': 'c479ab75-6af8-4996-8638-4cda42d4002e', 'job_status': 'FAILED', 'image_export_status': 'NOT_REQUIRED', 'stats_export_status': 'FAILED', 'report_status': 'PENDING', 'email_to': None, 'error': 'Stats tasks failed to create or could not be saved to DB. Check logs for details.', 'created_at': '2025-08-20T22:08:40.276249+00:00', 'updated_at': '2025-08-20T22:08:40.294247+00:00'}


## Polling Jobs

### Check/Update Image Task Status
    

In [42]:
# Identify running jobs

# Limiting to 1 job (the first one returned) for testing only.

with db.db(db_path) as conn:
    running_jobs = conn.execute(
        """SELECT id FROM jobs WHERE job_status = 'RUNNING'"""
    ).fetchall()

    running_job = running_jobs[0] if running_jobs else None
    if running_job:
        running_job_id = running_job['id']
        print(f"Running job found: {running_job_id}")
    else:
        # Reset so later cells don't act on a job id left over from a previous run.
        running_job_id = None
        print("No running jobs found.")



Running job found: eb306128-0eae-4ed8-9ea4-5c21baf1a9ec


In [11]:
import datetime
def now_iso_plus(seconds):
    """Return ``db.utc_now()`` shifted by ``seconds``, formatted with ``db.datetime_to_iso``."""
    return dt_iso_plus(db.utc_now(), seconds)


def dt_iso_plus(dt, seconds):
    """Return datetime ``dt`` shifted by ``seconds``, formatted with ``db.datetime_to_iso``."""
    shifted = dt + datetime.timedelta(seconds=seconds)
    return db.datetime_to_iso(shifted)

In [9]:
# Claim a batch of due export rows that still need polling
LEASE_SECONDS = 60
MAX_BATCH_SIZE = 20 #! Check GEE to see max rate 
def lease_due_tasks(conn):
    """Lease up to MAX_BATCH_SIZE due RUNNING export rows and return them.

    A row is due when its ``next_check_at`` is not in the future and it is not
    currently leased (``lease_until`` is NULL or already expired). Leased rows get
    ``lease_until`` = now + LEASE_SECONDS; the oldest-due rows are leased first.

    Only rows leased by *this* call are returned: the follow-up SELECT matches the
    exact ``lease_until`` value written by the UPDATE, so rows leased earlier (e.g. by
    another worker) are not picked up. The timestamp acts as the lease token, which
    assumes two workers never lease at the exact same microsecond.

    Args:
        conn: SQLite connection.

    Returns:
        The leased rows of ``exports``.
    """
    now = db.utc_now()
    now_iso = db.datetime_to_iso(now)
    # Derived from the same `now` as the due/lease checks, not from a second clock read.
    lease_until_iso = dt_iso_plus(now, LEASE_SECONDS)

    # TIMED_OUT rows are not leased: update_task_status treats them as terminal and
    # returns without advancing next_check_at, so they would be re-leased on every
    # poll and could crowd RUNNING rows out of the LIMIT.
    conn.execute(
        """
        UPDATE exports SET lease_until = ?
        WHERE id IN (
            SELECT id FROM exports
            WHERE state = 'RUNNING'
                AND next_check_at <= ?
                AND (lease_until IS NULL OR lease_until <= ?)
            ORDER BY next_check_at
            LIMIT ?
            )""",
        (lease_until_iso, now_iso, now_iso, MAX_BATCH_SIZE),
    )

    rows = conn.execute(
        "SELECT * FROM exports WHERE lease_until = ?",
        (lease_until_iso,),
    ).fetchall()

    return rows



In [173]:
import ee.batch

In [176]:
# Exploration: list the Earth Engine tasks visible to the connected account.
my_list = ee.batch.Task.list()

In [180]:
# Number of tasks returned by ee.batch.Task.list().
len(my_list)

22

In [179]:
# Inspect one listed task (its repr shows id, type, description and state).
print(my_list[0])

<Task W5VNYYNMKT7RGHI5ZGLMKJ75 EXPORT_IMAGE: Andes_MCDS4S5_Yearly_Monthly_2025_07 (READY)>


In [199]:
# Rebuild a Task handle from a known task id, the same way exportTask_from_db_row
# rebuilds one from a DB row.
my_task = ee.batch.Task(
    task_id="W5VNYYNMKT7RGHI5ZGLMKJ75",
    task_type=ee.batch.Task.Type["EXPORT_IMAGE"],
    state=ee.batch.Task.State["READY"],
)

In [201]:
# A handle built this way prints only its id (compare with the listed task above).
print(my_task)

<Task "W5VNYYNMKT7RGHI5ZGLMKJ75">


In [202]:
# Query the task's current status from Earth Engine using only its id.
my_task.status()

{'state': 'READY',
 'description': 'Andes_MCDS4S5_Yearly_Monthly_2025_07',
 'priority': 100,
 'creation_timestamp_ms': 1755803493025,
 'update_timestamp_ms': 1755803506030,
 'start_timestamp_ms': 0,
 'task_type': 'EXPORT_IMAGE',
 'id': 'W5VNYYNMKT7RGHI5ZGLMKJ75',
 'name': 'projects/ee-observatorionieves/operations/W5VNYYNMKT7RGHI5ZGLMKJ75'}

In [186]:
# Print every member of the ee.batch.Task.State enum (valid GEE task states).
for state in ee.batch.Task.State:
    print(state)


State.UNSUBMITTED
State.READY
State.RUNNING
State.COMPLETED
State.FAILED
State.CANCEL_REQUESTED
State.CANCELLED


In [192]:
# Short alias for the GEE task state enum.
task_states = ee.batch.Task.State


In [198]:
# Enum members can be looked up by name, as exportTask_from_db_row does.
task_states["READY"]

<State.READY: 'READY'>

In [7]:
import random

def random_state_testing() -> str:
    """Return a random status name, or raise ValueError for a simulated invalid state.

    The drawn index deliberately runs up to three positions past the end of the
    candidate tuple, so some calls raise ValueError("Random State doesn't Exist").
    """
    candidates = ("SUBMITTED", "PENDING", "STARTED", "READY", "RUNNING", "COMPLETED", "FAILED", "CANCELED")
    position = random.randint(0, len(candidates) + 2)
    if position >= len(candidates):
        # An out-of-range index stands for an unknown/invalid state.
        raise ValueError("Random State doesn't Exist")
    return candidates[position]


def exportTask_from_db_row(db_row):
    try:
        match db_row["type"]:
            case "image":
                task_type = ee.batch.Task.Type["EXPORT_IMAGE"]
            case "table":
                task_type = ee.batch.Task.Type["EXPORT_TABLE"]
            case _:
                raise ValueError(f"Unknown export type: {db_row['type']}")

        task_state = ee.batch.Task.State[db_row["state"]]
        task = ee.batch.Task(task_id=db_row["task_id"], task_type=task_type, state=task_state)

    except Exception as e:
        task = None

    return ExportTask(
        type=db_row["type"],
        name=db_row["name"],
        target=db_row["target"],
        path=db_row["path"],
        task=task,
        task_status=db_row["task_status"],
        id=db_row["id"]
    )

def update_task_status(conn, db_task):
    """Refresh one export task's remote status and persist the result in ``exports``.

    Flow:

    1. Rows already in a terminal state (``COMPLETED``, ``FAILED``, ``TIMED_OUT``)
       are returned untouched.
    2. Rows whose ``deadline_at`` is in the past are marked ``TIMED_OUT``
       without polling.
    3. Otherwise the row is rebuilt as an ``ExportTask`` and ``query_status()``
       is called. ``_make_db_export_state`` then maps the result to a DB state:

       - ``RUNNING``   -> store the status, grow the poll interval, reschedule.
       - ``COMPLETED`` -> store the status and clear ``last_error``.
       - ``FAILED``    -> store the status with a generic ``last_error``.
       - anything else -> state ``UNKNOWN``, grow the poll interval, reschedule.

    4. If polling raises, ``attempts`` is incremented and the error text is
       stored in ``last_error``. The row is then rescheduled with a grown poll
       interval.

    The function issues ``UPDATE`` statements but never commits. Committing is
    left to the caller (presumably the ``db.db`` context manager — confirm).

    Args:
        conn: open DB connection used for the ``UPDATE exports`` statements.
        db_task: mapping-like ``exports`` row with at least ``id``, ``state``,
            ``poll_interval_sec`` and ``deadline_at``, plus the columns read by
            ``exportTask_from_db_row``.
    """
    # Only poll task status if not terminal. This is a double check in case a
    # task with this status is provided. Done first so no backoff is computed
    # for rows that will never be rescheduled.
    if db_task["state"] in ("COMPLETED", "FAILED", "TIMED_OUT"):
        return

    now = db.utc_now()
    now_iso = db.datetime_to_iso(now)
    # Grown interval used by every branch that reschedules the task.
    next_poll_interval = db.next_backoff(db_task["poll_interval_sec"])
    next_check_at = dt_iso_plus(now, next_poll_interval)

    #! deadline_at is not set anywhere, needs review
    if (
        db_task["deadline_at"]
        and datetime.datetime.fromisoformat(db_task["deadline_at"]) < now
    ):
        print(
            f"Task {db_task['id']} is past its deadline. updating status to TIMED_OUT"
        )

        conn.execute(
            """UPDATE exports SET state = 'TIMED_OUT', updated_at=? WHERE id = ?""",
            (now_iso, db_task["id"]),
        )
        return

    try:
        # Rebuild the task from the DB row and ask GEE for its current status.
        export_task = exportTask_from_db_row(db_task)
        print(export_task)
        export_task.query_status()
        # export_task.task_status= random_state_testing()
        print(export_task)
        new_export_status = export_task.status
        new_task_status = export_task.task_status
        new_state = _make_db_export_state(export_task)
        print(f"New task status: {new_export_status}, New state: {new_state}")

    except Exception as e:
        # Not Normal: Backoff if error in getting status - Try again later.
        # Report the real error instead of a fixed "simulated" message.
        print(f"Error polling status of task {db_task['id']}: {e!r}")
        conn.execute(
            """UPDATE exports SET attempts=attempts+1, poll_interval_sec=?, next_check_at=?, last_error=?, updated_at=? WHERE id = ?""",
            (
                next_poll_interval,
                next_check_at,
                str(e),
                now_iso,
                db_task["id"],
            ),
        )
        return

    if new_state in ["RUNNING"]:
        # Normal: Backoff if task is still running - Try again later
        conn.execute(
            """UPDATE exports SET state = 'RUNNING', task_status=?, poll_interval_sec=?, next_check_at=?, updated_at=? WHERE id = ?""",
            (
                new_task_status,
                next_poll_interval,
                next_check_at,
                now_iso,
                db_task["id"],
            ),
        )
    elif new_state in ["COMPLETED"]:
        # Normal: Task completed successfully
        conn.execute(
            """UPDATE exports SET state = 'COMPLETED', task_status=?, last_error = NULL, updated_at=? WHERE id = ?""",
            (
                new_task_status,
                now_iso,
                db_task["id"],
            ),
        )
    elif new_state in ["FAILED"]:
        # Not Normal: Task failed - Mark as FAILED
        #! Need to find how to get GEE Fail status
        conn.execute(
            """UPDATE exports SET state = 'FAILED', task_status=?, updated_at=?, last_error = ? WHERE id = ?""",
            (new_task_status, now_iso, "GEE failed status", db_task["id"]),
        )
    else:
        # Not Normal: Unknown state - Mark as UNKNOWN and log error.
        # Try again until we hit deadline (NOTE(review): deadline_at is never
        # set, so this retries indefinitely). The grown interval is persisted,
        # as in the other retry branches, so the backoff keeps growing.
        conn.execute(
            """UPDATE exports SET state = 'UNKNOWN', task_status=?, poll_interval_sec=?, next_check_at=?, updated_at=?, last_error = ? WHERE id = ?""",
            (
                new_task_status,
                next_poll_interval,
                next_check_at,
                now_iso,
                f"Unknown state {new_state}",
                db_task["id"],
            ),
        )



In [15]:

# Iterate over each due task: poll every export task returned by lease_due_tasks
# (defined elsewhere; presumably the tasks whose next check is due) and update its
# status, then list all export tasks of RUNNING jobs ordered by next_check_at.
with db.db(db_path) as conn:
    due_tasks = lease_due_tasks(conn)
    print("--------- DUE TASKS ---------")
    for db_task in due_tasks:
        print(
            f"task: {db_task['id']} - {db_task['state']} - {db_task['task_status']} - {db_task['last_error']} - {db_task['next_check_at']} - {db.utc_now()}"
        )
        update_task_status(conn, db_task)
    print()

    print("--------- UPDATED TASKS ---------")
    # Export rows joined with their job; only jobs still RUNNING are listed.
    updated_tasks = conn.execute(
        """
        SELECT a.id AS job_id, b.* 
        FROM jobs AS a 
            JOIN exports AS b ON a.id = b.job_id 
        WHERE a.job_status='RUNNING'
        ORDER BY next_check_at""").fetchall()
    for task in updated_tasks:
        # Negative when the next check is already overdue.
        time_until_next_check = datetime.datetime.fromisoformat(task['next_check_at']) - db.utc_now()
        print(f"job: {task['job_id'][0:7]} task: {task['id'][0:7]} - {task['state']} - {task['task_status']} - {task['last_error']} - {task['next_check_at']} - {db.utc_now()} - {time_until_next_check}")
        

--------- DUE TASKS ---------

--------- UPDATED TASKS ---------
job: d258473 task: c235f5b - FAILED - FAILED - None - 2025-08-21T19:23:57.341910+00:00 - 2025-08-22 01:21:58.636695+00:00 - -1 day, 18:01:58.705215
job: d258473 task: a9d8d06 - FAILED - FAILED - None - 2025-08-21T19:23:57.341910+00:00 - 2025-08-22 01:21:58.636695+00:00 - -1 day, 18:01:58.705215
job: d258473 task: b26d69f - FAILED - FAILED - None - 2025-08-21T19:23:57.341910+00:00 - 2025-08-22 01:21:58.636695+00:00 - -1 day, 18:01:58.705215
job: d258473 task: 057e2f1 - FAILED - FAILED - None - 2025-08-21T19:23:57.341910+00:00 - 2025-08-22 01:21:58.636695+00:00 - -1 day, 18:01:58.705215
job: d258473 task: 73cd90f - FAILED - FAILED - None - 2025-08-21T19:23:57.341910+00:00 - 2025-08-22 01:21:58.636695+00:00 - -1 day, 18:01:58.705215
job: d258473 task: 6f6b38a - FAILED - FAILED - None - 2025-08-21T19:23:57.341910+00:00 - 2025-08-22 01:21:58.636695+00:00 - -1 day, 18:01:58.705215
job: d258473 task: 3a5a62b - FAILED - FAILED - 

In [113]:
# RESET task for testing
# with db.db(db_path) as conn:
#     funky_id = "ab5d7cff-d3d5-44ba-a85f-5fd19b03bf56"
#     conn.execute(
#         "UPDATE exports SET state = 'RUNNING', task_status='RUNNING' WHERE id = ?",
#         (funky_id,),
#     )
#     funky_task = conn.execute("SELECT * FROM exports WHERE id = ?", (funky_id,)).fetchone()
#     print(f"Updated task: {funky_task['id']} - {funky_task['state']} - {funky_task['task_status']} - {funky_task['last_error']}")

### Update Job if Image Exports Completed

In [77]:
def _print_job_details(conn, job_id):
    job = conn.execute(
        "SELECT * FROM jobs WHERE id = ?", (job_id,)
    ).fetchone()
    print(dict(job))
    
    tasks_by_type_status = conn.execute(
        "SELECT type, state, COUNT(*) AS count FROM exports WHERE job_id = ? GROUP BY type, state",
        (job_id,),
    ).fetchall()
    for _task in tasks_by_type_status:
        print(dict(_task))

In [97]:
# For every RUNNING job: print its row and export counts, call update_job (defined
# elsewhere; presumably rolls the export task states up into the job's status
# columns — confirm), then print the details again to show what changed.
with db.db(db_path) as conn:
    jobs = conn.execute("SELECT * FROM jobs WHERE job_status IN ('RUNNING')").fetchall()
    for job in jobs:
        poll_job_id = job['id']
        print(f"##### {poll_job_id} #####")

        print("---- Before Job update ----")
        _print_job_details(conn, poll_job_id)

        update_job(conn, poll_job_id)

        print("---- After Job update ----")
        _print_job_details(conn, poll_job_id)

        print()

##### eb306128-0eae-4ed8-9ea4-5c21baf1a9ec #####
---- Before Job update ----
{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'RUNNING', 'image_export_status': 'RUNNING', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-20T23:49:04.806846+00:00'}
{'type': 'image', 'state': 'COMPLETED', 'count': 3}
{'type': 'image', 'state': 'FAILED', 'count': 1}
---- After Job update ----
{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'RUNNING', 'image_export_status': 'COMPLETED', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-21T00:44:42.558335+00:00'}
{'type': 'image', 'state': 'COMPLETED', 'count': 3}
{'type': 'image', 'state': 'FAILED', 'count': 1}



## Stats Export Workflow 

In [79]:
def dummy_stats_export_tasks(job_id=None):
    """Build two placeholder stats (table) export tasks for a job.

    Stand-in until the real stats export logic exists. Both tasks have
    ``type="table"``, ``target="gee"`` and
    ``path="projects/observatorio-ipa/stats"``, and start with
    ``task_status="RUNNING"``. They are named ``stats_task_<job_id>`` and
    ``stats_task_<job_id>_2``.

    Args:
        job_id: id of the job the tasks belong to. When omitted, the
            notebook-level ``job_id`` variable is used; the previous version
            always read that global implicitly. Pass it explicitly to avoid
            depending on leftover kernel state.

    Returns:
        ExportTaskList: the two dummy tasks.

    Raises:
        NameError: if ``job_id`` is omitted and no notebook-level ``job_id`` exists.
    """
    if job_id is None:
        # Backward compatibility with callers that pass nothing.
        notebook_globals = globals()
        if "job_id" not in notebook_globals:
            raise NameError("name 'job_id' is not defined")
        job_id = notebook_globals["job_id"]

    first_task = ExportTask(
        type="table",
        name=f"stats_task_{job_id}",
        target="gee",
        path="projects/observatorio-ipa/stats",
        task_status="RUNNING",  # Simulating a running task
    )
    second_task = ExportTask(
        type="table",
        name=f"stats_task_{job_id}_2",
        target="gee",
        path="projects/observatorio-ipa/stats",
        task_status="RUNNING",  # Simulating a running task
    )

    return ExportTaskList([first_task, second_task])

In [80]:
def stats_export(conn, job_id):
    """Start the stats export tasks of a job once its image exports are finished.

    Nothing happens unless all of these hold:

    - the job exists;
    - its ``job_status`` is ``RUNNING``;
    - its ``image_export_status`` is no longer ``PENDING``/``RUNNING``;
    - its ``stats_export_status`` is still ``PENDING``.

    When it proceeds, the stats tasks (currently dummies) are inserted with
    ``add_exportTask_to_db``. The job's ``stats_export_status`` then becomes
    ``RUNNING``, or ``COMPLETED`` if no tasks were produced. If creating or
    inserting the tasks raises, the job and its stats export are marked
    ``FAILED`` with the error text.

    Args:
        conn: open DB connection whose rows support access by column name.
        job_id: primary key of the job in ``jobs``.
    """
    job = conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
    if job is None:
        # Unknown job id: nothing to start (previously a TypeError).
        return

    # Stats start only after the image exports have finished (any status other
    # than PENDING/RUNNING, e.g. COMPLETED or FAILED) and only once per job.
    if (
        job["job_status"] != "RUNNING"
        or job["image_export_status"] in ("PENDING", "RUNNING")
        or job["stats_export_status"] != "PENDING"
    ):
        return

    now_iso = db.datetime_to_iso(db.utc_now())

    # Simulate stats export tasks creation - Replace with actual logic to create stats export tasks
    try:
        # NOTE(review): called without this job's id, so the task names come from
        # the notebook-level `job_id`, not from the `job_id` argument here.
        stats_task_list = dummy_stats_export_tasks()
        # Insert stats tasks into the database
        for task in stats_task_list:
            add_exportTask_to_db(conn, job_id, task)
    except Exception as e:
        # NOTE(review): tasks inserted before the failure are not removed here.
        print(f"Error occurred while creating stats export tasks: {e}")
        conn.execute(
            """UPDATE jobs SET job_status='FAILED', stats_export_status='FAILED',
            error=?, updated_at=? WHERE id=?""",
            (str(e), now_iso, job_id),
        )
        return

    # RUNNING while tasks exist; with no tasks there is nothing to wait for.
    new_stats_export_status = "RUNNING" if stats_task_list else "COMPLETED"
    conn.execute(
        """UPDATE jobs SET stats_export_status=?, updated_at=? WHERE id=?""",
        (new_stats_export_status, now_iso, job_id),
    )

In [98]:
# Orchestrate per job: for every RUNNING job, try to start its stats exports
# (stats_export only acts when the image exports are no longer PENDING/RUNNING and
# stats are still PENDING) and print the job details before and after.
with db.db(db_path) as conn:
    jobs = conn.execute("SELECT * FROM jobs WHERE job_status IN ('RUNNING')").fetchall()
    for job in jobs:
        poll_job_id = job["id"]
        print(f"##### {poll_job_id} #####")

        print("---- Before Starting Stats ----")
        _print_job_details(conn, poll_job_id)

        stats_export(conn, poll_job_id)

        print("---- After Starting Stats ----")
        _print_job_details(conn, poll_job_id)
        print()

##### eb306128-0eae-4ed8-9ea4-5c21baf1a9ec #####
---- Before Starting Stats ----
{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'RUNNING', 'image_export_status': 'COMPLETED', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-21T00:44:42.558335+00:00'}
{'type': 'image', 'state': 'COMPLETED', 'count': 3}
{'type': 'image', 'state': 'FAILED', 'count': 1}
---- After Starting Stats ----
{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'RUNNING', 'image_export_status': 'COMPLETED', 'stats_export_status': 'RUNNING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-21T00:45:00.121259+00:00'}
{'type': 'image', 'state': 'COMPLETED', 'count': 3}
{'type': 'image', 'state': 'FAILED', 'count': 1}
{'type': 'table', 'state': 'RUNNING', 'count': 2}



### Check/Update Stats Task Status (Polling)

In [None]:
# Iterate over each due task: poll every export task returned by lease_due_tasks
# (defined elsewhere; now also includes the stats/table tasks) and update its
# status, then list the export tasks of RUNNING jobs ordered by next_check_at.
# NOTE(review): same code as the earlier image-task polling cell; a shared helper
# function would remove the duplication.
with db.db(db_path) as conn:
    due_tasks = lease_due_tasks(conn)
    print("--------- DUE TASKS ---------")
    for db_task in due_tasks:
        print(
            f"task: {db_task['id']} - {db_task['state']} - {db_task['task_status']} - {db_task['last_error']} - {db_task['next_check_at']} - {db.utc_now()}"
        )
        update_task_status(conn, db_task)
    print()

    print("--------- UPDATED TASKS ---------")
    # Export rows joined with their job; only jobs still RUNNING are listed.
    updated_tasks = conn.execute(
        """
        SELECT a.id AS job_id, b.* 
        FROM jobs AS a 
            JOIN exports AS b ON a.id = b.job_id 
        WHERE a.job_status='RUNNING'
        ORDER BY next_check_at"""
    ).fetchall()
    for task in updated_tasks:
        # Negative when the next check is already overdue.
        time_until_next_check = (
            datetime.datetime.fromisoformat(task["next_check_at"]) - db.utc_now()
        )
        print(
            f"job: {task['job_id'][0:7]} task: {task['id'][0:7]} - {task['state']} - {task['task_status']} - {task['last_error']} - {task['next_check_at']} - {db.utc_now()} - {time_until_next_check}"
        )

--------- DUE TASKS ---------
task: a5bfa506-518f-45b5-aac8-c4787aefb5a6 - RUNNING - PENDING - Random State doesn't Exist - 2025-08-21T00:49:49.101634+00:00 - 2025-08-21 00:50:25.734813+00:00
(type=table, name=stats_task_eb306128-0eae-4ed8-9ea4-5c21baf1a9ec, target=gee, status=PENDING, task_status=PENDING)
(type=table, name=stats_task_eb306128-0eae-4ed8-9ea4-5c21baf1a9ec, target=gee, status=COMPLETED, task_status=CANCELED)
New task status: COMPLETED, New state: COMPLETED

--------- UPDATED TASKS ---------
job: eb30612 task: 7ae7ff7 - COMPLETED - ALREADY_EXISTS - None - 2025-08-20T23:49:04.863191+00:00 - 2025-08-21 00:50:25.734813+00:00 - -1 day, 22:58:39.128378
job: eb30612 task: aa6a692 - COMPLETED - EXCLUDED - None - 2025-08-20T23:49:04.863191+00:00 - 2025-08-21 00:50:25.735813+00:00 - -1 day, 22:58:39.127378
job: eb30612 task: 10bcb58 - COMPLETED - COMPLETED - None - 2025-08-20T23:50:33.688377+00:00 - 2025-08-21 00:50:25.735813+00:00 - -1 day, 23:00:07.952564
job: eb30612 task: edbe

### Update Job Status 

In [157]:
# For every RUNNING job: print its row and export counts, call update_job (defined
# elsewhere; presumably rolls the image and stats export states up into the job's
# status columns — confirm), then print the details again to show what changed.
with db.db(db_path) as conn:
    jobs = conn.execute("SELECT * FROM jobs WHERE job_status IN ('RUNNING')").fetchall()
    for job in jobs:
        poll_job_id = job["id"]
        print(f"##### {poll_job_id} #####")

        print("---- Before Job update ----")
        _print_job_details(conn, poll_job_id)

        update_job(conn, poll_job_id)

        print("---- After Job update ----")
        _print_job_details(conn, poll_job_id)

        print()

##### eb306128-0eae-4ed8-9ea4-5c21baf1a9ec #####
---- Before Job update ----
{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'RUNNING', 'image_export_status': 'COMPLETED', 'stats_export_status': 'RUNNING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-21T00:45:00.121259+00:00'}
{'type': 'image', 'state': 'COMPLETED', 'count': 3}
{'type': 'image', 'state': 'FAILED', 'count': 1}
{'type': 'table', 'state': 'COMPLETED', 'count': 1}
{'type': 'table', 'state': 'FAILED', 'count': 1}
---- After Job update ----
{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'COMPLETED', 'image_export_status': 'COMPLETED', 'stats_export_status': 'COMPLETED', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-21T00:53:32.900002+00:00'}
{'type': 'image', 'state': 'COMPLETED', 'count': 3}
{'type': 'image', 'state': 'FAIL

In [158]:
# View the state of every export task across all jobs (any job_status), with the
# time remaining until each task's next check (negative = overdue).
with db.db(db_path) as conn:
    updated_tasks = conn.execute(
        """
        SELECT a.id AS job_id, a.job_status, b.* 
        FROM jobs AS a 
            JOIN exports AS b ON a.id = b.job_id 
        ORDER BY a.id"""
    ).fetchall()
    for task in updated_tasks:
        time_until_next_check = (
            datetime.datetime.fromisoformat(task["next_check_at"]) - db.utc_now()
        )
        print(
            f"job: {task['job_id'][0:7]} status: {task['job_status']} - task: {task['id'][0:7]} - {task['state']} - {task['task_status']} - {task['last_error']} - {time_until_next_check}"
        )

job: eb30612 status: COMPLETED - task: 7ae7ff7 - COMPLETED - ALREADY_EXISTS - None - -1 day, 22:54:55.619581
job: eb30612 status: COMPLETED - task: 10bcb58 - COMPLETED - COMPLETED - None - -1 day, 22:56:24.443768
job: eb30612 status: COMPLETED - task: edbe509 - FAILED - FAILED - GEE failed status - -1 day, 23:47:29.203757
job: eb30612 status: COMPLETED - task: aa6a692 - COMPLETED - EXCLUDED - None - -1 day, 22:54:55.618582
job: eb30612 status: COMPLETED - task: a5bfa50 - COMPLETED - CANCELED - None - -1 day, 23:55:39.857025
job: eb30612 status: COMPLETED - task: 73e99fe - FAILED - FAILED - GEE failed status - -1 day, 23:54:38.367433


In [167]:
def update_job_report(conn, job_id):
    """Generate the report of a finished job and record the outcome in ``report_status``.

    Only a job whose ``job_status`` is ``COMPLETED`` or ``FAILED`` and whose
    ``report_status`` is exactly ``PENDING`` is processed. Any other job prints
    ``skipping.`` and is left unchanged. That includes an unknown ``job_id`` and
    a NULL ``report_status``.

    The "report" is currently a placeholder: the job row and all of its export
    rows, printed as plain dicts. ``report_status`` becomes ``COMPLETED`` when
    this succeeds, and ``FAILED`` if building or printing the report raises.

    Args:
        conn: open DB connection whose rows support ``dict(row)``
            (e.g. a ``sqlite3.Row`` row factory).
        job_id: primary key of the job in ``jobs``.
    """
    job = conn.execute(
        "SELECT * FROM jobs WHERE id=? LIMIT 1", (job_id,)
    ).fetchone()

    # Compare with equality: `x in ("PENDING")` is a substring test on a plain
    # string (and raises TypeError when report_status is NULL).
    if (
        job is None
        or job["job_status"] not in ("COMPLETED", "FAILED")
        or job["report_status"] != "PENDING"
    ):
        print("skipping.")
        return

    print(f"Generating report for job {job_id}...")
    tasks = conn.execute(
        "SELECT * FROM exports WHERE job_id=? ORDER BY type, state", (job_id,)
    ).fetchall()

    try:
        # One dict per export row; dict(list_of_rows) only works for 2-column
        # rows and used to crash here, outside the error handling.
        full_job = {
            "job": dict(job),
            "tasks": [dict(task) for task in tasks],
        }
        print(full_job)  #! Replace with actual report generation logic
        conn.execute(
            "UPDATE jobs SET report_status='COMPLETED', updated_at=? WHERE id=?",
            (db.datetime_to_iso(db.utc_now()), job_id),
        )
    except Exception as e:
        print(f"Error printing full job details: {e}")
        conn.execute(
            "UPDATE jobs SET report_status='FAILED', updated_at=? WHERE id=?",
            (db.datetime_to_iso(db.utc_now()), job_id),
        )

In [171]:
def _print_job(conn, job_id):
    job = conn.execute(
        "SELECT * FROM jobs WHERE id = ?", (job_id,)
    ).fetchone()
    job_dict = dict(job)
    job_dict['id'] = job['id'][0:7]
    print(job_dict)

In [170]:
# Orchestrate per job: for every finished (COMPLETED or FAILED) job, attempt report
# generation via update_job_report and print the (shortened) job row before and after.
with db.db(db_path) as conn:
    jobs = conn.execute("SELECT * FROM jobs WHERE job_status IN ('COMPLETED', 'FAILED')").fetchall()
    for job in jobs:
        poll_job_id = job["id"]
        print(f"##### {poll_job_id} #####")

        print("---- Before Report Generation ----")
        _print_job(conn, poll_job_id)

        update_job_report(conn, poll_job_id)

        print("---- After Report Generation ----")
        _print_job(conn, poll_job_id)

        print()


##### eb306128-0eae-4ed8-9ea4-5c21baf1a9ec #####
---- Before Report Generation ----
{'id': 'eb30612', 'job_status': 'COMPLETED', 'image_export_status': 'COMPLETED', 'stats_export_status': 'COMPLETED', 'report_status': 'COMPLETED', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-21T01:11:09.657679+00:00'}
skipping.
---- After Report Generation ----
{'id': 'eb30612', 'job_status': 'COMPLETED', 'image_export_status': 'COMPLETED', 'stats_export_status': 'COMPLETED', 'report_status': 'COMPLETED', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-21T01:11:09.657679+00:00'}

