# API Gateway Demo

In [None]:
# Name of the S3 bucket the demo files are uploaded to (passed to populate_bucket).
uploads_bucket_name = "ai-description-dev-nt-isngd-381491992967-uploads"
# Base URL of the API Gateway; each helper appends its endpoint path to it.
api_url = "https://3jkdst30r0.execute-api.us-east-1.amazonaws.com/dev/"
# Base job name; the "_bias" and "_metadata" job names are derived from it.
job_name = "job_001"

## Populate Bucket

In [None]:
import os
import json

import boto3

def populate_bucket(bucket_name: str, image_fpath: str) -> tuple[str, str, str]:
    """Upload a demo image plus its metadata and context files to S3.

    Three objects are written to ``bucket_name``:

    * ``images/<filename>`` - the image file found at ``image_fpath``
    * ``metadata/<stem>_metadata.json`` - a small hard-coded JSON document
    * ``contexts/<stem>_context.txt`` - a hard-coded plain-text context string

    Args:
        bucket_name: Name of the destination S3 bucket.
        image_fpath: Local path of the image to upload.

    Returns:
        The S3 URIs of the image, metadata and context objects, in that order.
    """
    s3_client = boto3.client("s3")

    image_filename = os.path.basename(image_fpath)
    # The metadata and context objects are named after the image minus its extension.
    image_stem = os.path.splitext(image_filename)[0]

    # Upload the image file itself.
    image_key = f"images/{image_filename}"
    s3_client.upload_file(image_fpath, bucket_name, image_key)
    image_s3_uri = f"s3://{bucket_name}/{image_key}"

    # Upload the original metadata as a JSON document.
    original_metadata = {
        "title": "foo",
        "description": "offensive image",  # key was previously misspelled "desription"
    }
    original_metadata_key = f"metadata/{image_stem}_metadata.json"
    s3_client.put_object(
        Body=json.dumps(original_metadata),
        Bucket=bucket_name,
        Key=original_metadata_key,
    )
    original_metadata_s3_uri = f"s3://{bucket_name}/{original_metadata_key}"

    # Upload the context as plain text (it is not JSON, hence the .txt key).
    context_str = "This document is old"
    context_key = f"contexts/{image_stem}_context.txt"
    s3_client.put_object(Body=context_str, Bucket=bucket_name, Key=context_key)
    context_s3_uri = f"s3://{bucket_name}/{context_key}"

    return image_s3_uri, original_metadata_s3_uri, context_s3_uri

# Upload the sample image and its companion metadata/context files, keeping the
# resulting S3 URIs so they can be referenced when creating jobs.
image_s3_uri, original_metadata_s3_uri, context_s3_uri = populate_bucket(
    bucket_name=uploads_bucket_name,
    image_fpath="../projects/research/research_data/offensive_1.jpg",
)

## Create Jobs

In [None]:
import json

import requests


def create_dummy_job(
    api_url: str,
    job_name: str,
    job_type: str,
    original_metadata_s3_uri: str,
    context_s3_uri: str,
    image_s3_uri: str,
    timeout: float = 30.0,
) -> None:
    """POST a two-work demo job to the ``create_job`` endpoint and print the outcome.

    The job contains a "short" work with a single image and a "long" work that
    repeats the same image four times; both share the same context and
    original-metadata objects.

    Args:
        api_url: Base URL of the API Gateway (a trailing slash is tolerated).
        job_name: Name of the job; also used as the prefix of the work IDs.
        job_type: Job type string sent to the API (the demo uses "bias" and "metadata").
        original_metadata_s3_uri: S3 URI of the original metadata JSON.
        context_s3_uri: S3 URI of the context text file.
        image_s3_uri: S3 URI of the image used by every work.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        None. The response, or a description of the failure, is printed.
    """
    endpoint = f"{api_url.rstrip('/')}/create_job"
    headers = {"Content-Type": "application/json"}

    works = [
        {
            "work_id": f"{job_name}_short_work",
            "image_s3_uris": [image_s3_uri],
            "context_s3_uri": context_s3_uri,
            "original_metadata_s3_uri": original_metadata_s3_uri,
        },
        {
            "work_id": f"{job_name}_long_work",
            "image_s3_uris": [image_s3_uri, image_s3_uri, image_s3_uri, image_s3_uri],
            "context_s3_uri": context_s3_uri,
            "original_metadata_s3_uri": original_metadata_s3_uri,
        },
    ]
    request_body = {"job_name": job_name, "job_type": job_type, "works": works}

    try:
        # requests applies no timeout by default, so without one a stalled
        # connection would block the notebook cell indefinitely.
        response = requests.post(
            endpoint,
            data=json.dumps(request_body),
            headers=headers,
            timeout=timeout,
        )

        if response.status_code == 200:
            data = response.json()
            print("API Response:", data)
        else:
            print(f"Error: API request failed with status code {response.status_code}")
            print("Response:", response.text)

    # ValueError covers a non-JSON 200 body on requests releases whose JSON
    # decode error is not a RequestException subclass.
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"An error occurred: {e}")


# Arguments that are identical for both demo jobs.
shared_job_args = {
    "api_url": api_url,
    "original_metadata_s3_uri": original_metadata_s3_uri,
    "context_s3_uri": context_s3_uri,
    "image_s3_uri": image_s3_uri,
}

# Create the bias-analysis job.
bias_job_name = f"{job_name}_bias"
create_dummy_job(job_name=bias_job_name, job_type="bias", **shared_job_args)

# Create the metadata-generation job.
metadata_job_name = f"{job_name}_metadata"
create_dummy_job(job_name=metadata_job_name, job_type="metadata", **shared_job_args)

## Check Job Progress

In [None]:
import logging

import requests


def get_job_progress(api_url: str, job_name: str, timeout: float = 30.0):
    """Query the job_progress endpoint with the given job_name.

    Args:
        api_url (str): The base URL of your API Gateway (a trailing slash is tolerated).
        job_name (str): The name of the job to query.
        timeout (float): Seconds to wait for the server before giving up.

    Returns:
        dict: The JSON response from the API for a 200 or 404 status, or None
        if the request failed, the body was not valid JSON, or any other
        status code was returned.
    """
    endpoint = f"{api_url.rstrip('/')}/job_progress"
    params = {"job_name": job_name}

    try:
        # requests applies no timeout by default; pass one so a stalled
        # connection cannot hang the caller forever.
        response = requests.get(endpoint, params=params, timeout=timeout)

        if response.status_code == 200:
            return response.json()
        if response.status_code == 404:
            logging.info("No data found for job_name: %s", job_name)
            return response.json()

        # Raises HTTPError for 4xx/5xx responses, which is handled below.
        response.raise_for_status()

    # ValueError covers an undecodable JSON body on requests releases whose
    # JSON decode error is not a RequestException subclass.
    except (requests.RequestException, ValueError) as e:
        logging.exception("An error occurred: %s", e)

    # Reached after a logged error or an unexpected non-error status code.
    return None


# get_job_progress returns None when the request fails (the error is logged),
# so fall back to an empty dict rather than crashing on `.items()`.
bias_progress = get_job_progress(
    api_url=api_url,
    job_name=bias_job_name,
)
for key, val in (bias_progress or {}).items():
    print(f"{key}: {val}")

metadata_progress = get_job_progress(
    api_url=api_url,
    job_name=metadata_job_name,
)
for key, val in (metadata_progress or {}).items():
    print(f"{key}: {val}")

## Get Job Results

In [None]:
import logging

import requests


def get_job_results(api_url: str, job_name: str, work_id: str, timeout: float = 30.0):
    """
    Query the results endpoint with the given job_name and work_id.

    Args:
        api_url (str): The base URL of your API Gateway (a trailing slash is tolerated).
        job_name (str): The name of the job to query.
        work_id (str): The ID of the work within the job.
        timeout (float): Seconds to wait for the server before giving up.

    Returns:
        The value stored under the "item" key of the JSON response when the
        status code is 200, or None for any other non-error status code.

    Raises:
        Exception: If the API answers 404 for this job_name/work_id pair.
        requests.HTTPError: If the API answers with any other 4xx/5xx status.
        requests.RequestException: If the request itself fails (connection
            error, timeout, ...).
    """
    endpoint = f"{api_url.rstrip('/')}/results"
    params = {"job_name": job_name, "work_id": work_id}

    # requests applies no timeout by default; pass one so a stalled
    # connection cannot hang the caller forever.
    response = requests.get(endpoint, params=params, timeout=timeout)

    if response.status_code == 200:
        return response.json()["item"]

    if response.status_code == 404:
        msg = f"No data found for job_name={job_name} and work_id={work_id}"
        # logging.exception is only meaningful inside an except block (it
        # appends the active traceback); no exception is active here, so log
        # at error level instead.
        logging.error(msg)
        raise Exception(msg)

    # Raises HTTPError for the remaining 4xx/5xx responses.
    response.raise_for_status()
    return None


# Print the stored results of every work that is ready for review, bias job
# first and metadata job second. Iterating over whatever is actually listed
# (instead of indexing [0] and [1]) avoids a KeyError/IndexError when a job has
# not finished yet or its progress could not be fetched.
for results_job_name, progress in (
    (bias_job_name, bias_progress),
    (metadata_job_name, metadata_progress),
):
    for ready_work_id in (progress or {}).get("READY_FOR_REVIEW", []):
        item = get_job_results(api_url=api_url, job_name=results_job_name, work_id=ready_work_id)
        for key, val in item.items():
            print(f"{key}: {val}")

## Update Job Results

In [None]:
import json

import requests


def update_job_results(api_url: str, job_name: str, work_id: str, timeout: float = 30.0) -> None:
    """POST an update to the ``results`` endpoint marking a work as REVIEWED.

    Args:
        api_url: Base URL of the API Gateway (a trailing slash is tolerated).
        job_name: Name of the job that owns the work.
        work_id: ID of the work whose ``work_status`` is set to "REVIEWED".
        timeout: Seconds to wait for the server before giving up.

    Returns:
        None. The response, or a description of the failure, is printed.
    """
    endpoint = f"{api_url.rstrip('/')}/results"
    headers = {"Content-Type": "application/json"}

    updated_fields = {"work_status": "REVIEWED"}
    request_body = {"job_name": job_name, "work_id": work_id, "updated_fields": updated_fields}

    try:
        # requests applies no timeout by default, so without one a stalled
        # connection would block the notebook cell indefinitely.
        response = requests.post(
            endpoint,
            data=json.dumps(request_body),
            headers=headers,
            timeout=timeout,
        )

        if response.status_code == 200:
            data = response.json()
            print("API Response:", data)
        else:
            print(f"Error: API request failed with status code {response.status_code}")
            print("Response:", response.text)

    # ValueError covers a non-JSON 200 body on requests releases whose JSON
    # decode error is not a RequestException subclass.
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"An error occurred: {e}")


# The work ID comes from the bias job's progress, so the update must target
# that job (bias_job_name); no job is ever created under the bare job_name.
update_job_results(
    api_url=api_url,
    job_name=bias_job_name,
    work_id=bias_progress["READY_FOR_REVIEW"][0],
)