In [1]:
import os
import re
import json
import time
import datetime
import random

import requests
from io import BytesIO

import numpy as np
import pandas as pd
from PIL import Image

from Download_CoralNet import *

In [2]:
def get_token(username, password):
    """
    Logs in to CoralNet and builds the headers used for authenticated requests.

    Args:
        username: The CoralNet account username.
        password: The CoralNet account password.

    Returns:
        tuple: The CoralNet token, followed by a headers dictionary that
        carries the token ("Authorization") and the JSON:API content type.

    Raises:
        ValueError: If CoralNet does not answer with a successful response.
    """

    content_type = "application/vnd.api+json"
    auth_endpoint = CORALNET_URL + "/api/token_auth/"
    credentials = json.dumps({"username": username, "password": password})

    # Hand the credentials to CoralNet
    response = requests.post(auth_endpoint,
                             data=credentials,
                             headers={"Content-type": content_type})

    # Bail out early when the credentials were rejected
    if not response.ok:
        raise ValueError(f"ERROR: Could not authenticate\n{response.content}")

    print("NOTE: Successful authentication")

    # The token is returned in the JSON body of the response
    token = json.loads(response.content.decode())['token']

    # Every later request must present the token in its headers
    auth_headers = {"Authorization": f"Token {token}",
                    "Content-type": content_type}

    return token, auth_headers


def in_N_seconds(wait):
    """
    Report what the local wall-clock time will be `wait` seconds from now.

    Args:
    - wait: the number of seconds to add to the current time

    Returns:
    - A string in the format "HH:MM:SS" for the current time plus `wait` seconds
    """
    target = datetime.datetime.now() + datetime.timedelta(seconds=wait)
    return f"{target:%H:%M:%S}"


def check_expiration(url):
    """
    Decides whether a URL is still usable, based on its "Expires" timestamp.

    Args:
    url (str): The URL to check; expected to contain "Expires=<unix time>".

    Returns:
    bool: True when at least 60 seconds remain before the timestamp found in
    the URL; False when less time remains, or when the timestamp could not be
    read (in which case the reason is printed).
    """

    seconds_left = 0

    try:
        # Pull the expiration timestamp out of the URL
        found = re.search(r"Expires=(\d+)", url)

        if found is None:
            raise ValueError(f"ERROR: Could not find expiration timestamp in URL\n{url}")

        # Seconds between now and the expiration
        seconds_left = int(found.group(1)) - int(time.time())

    except Exception as err:
        # Report the problem and fall through with zero seconds remaining
        print(f"{err}")

    # Require a one-minute margin before treating the URL as valid
    return seconds_left >= 60


def sample_points_for_url(url, num_samples=200, method='stratified'):
    """
    Downloads the image behind a URL and generates sample coordinates within it.

    Parameters:
    ----------
    url : str
        The URL of the image. It is first passed to check_expiration(); the
        image is only downloaded when that check succeeds.
    num_samples : int, optional
        The number of samples to generate. Default is 200.
        For 'uniform' and 'stratified' the points are laid out on a square
        grid, so int(sqrt(num_samples)) ** 2 points are actually produced
        (e.g. 196 for the default of 200).
    method : str, optional
        The method to use for generating samples. Valid values are:
        - 'uniform': generates samples on an evenly spaced grid
        - 'random': generates samples using random sampling
        - 'stratified': generates one random sample per grid cell (default)

    Returns:
    -------
    tuple
        A tuple containing three elements:
        - A numpy array of x-coordinates of the generated samples.
        - A numpy array of y-coordinates of the generated samples.
        - A list of dictionaries containing row and column coordinates of the generated samples.

    Raises:
    ------
    ValueError
        If `method` is not one of the supported names.
    Exception
        If check_expiration() reports that the URL is not valid.
    requests.HTTPError
        If the image download returns an error status.
    """

    # Fail loudly on a misspelled method instead of returning zero points
    if method not in ('uniform', 'random', 'stratified'):
        raise ValueError(f"ERROR: Unknown sampling method '{method}'; expected "
                         f"'uniform', 'random' or 'stratified'")

    if not check_expiration(url):
        raise Exception(f"ERROR: URL is expiring soon; skipping.\n{url}")

    # Request the image from AWS; surface HTTP errors before decoding the body
    response = requests.get(url)
    response.raise_for_status()

    # Only the dimensions are needed, so release the image right away
    with Image.open(BytesIO(response.content)) as img:
        width, height = img.size

    x_coordinates = []
    y_coordinates = []

    if method == 'uniform':
        n = int(np.sqrt(num_samples))
        for x in np.linspace(0, width - 1, n):
            for y in np.linspace(0, height - 1, n):
                x_coordinates.append(int(x))
                y_coordinates.append(int(y))

    elif method == 'random':
        for _ in range(num_samples):
            x_coordinates.append(random.randint(0, width - 1))
            y_coordinates.append(random.randint(0, height - 1))

    else:
        # 'stratified': one random point inside every cell of an n x n grid
        n = int(np.sqrt(num_samples))
        x_range = np.linspace(0, width - 1, n + 1)
        y_range = np.linspace(0, height - 1, n + 1)
        for i in range(n):
            for j in range(n):
                x = np.random.uniform(x_range[i], x_range[i + 1])
                y = np.random.uniform(y_range[j], y_range[j + 1])
                x_coordinates.append(int(x))
                y_coordinates.append(int(y))

    # Plain Python ints keep the point list JSON-serializable
    samples = [{'row': row, 'column': column}
               for column, row in zip(x_coordinates, y_coordinates)]

    x = np.array(x_coordinates).astype(int)
    y = np.array(y_coordinates).astype(int)

    return x, y, samples


def check_job_status(response):
    """
    Sends a request to retrieve the completed annotations and returns the status update.

    Parameters:
    ----------
    response : requests.Response
        A Response object returned from a previous request to CoralNet API;
        its 'Location' header identifies the job to poll.

    Returns:
    -------
    tuple
        A tuple containing three elements:
        - current_status: the decoded JSON body of the status response.
        - message: "Completed Job" when the job has finished, a
          "Status: ..." line while it is still in progress, or a
          "CoralNet: ..." line carrying the error detail when the status
          request itself failed.
        - wait: the number of seconds the caller should wait; 1 after a
          successful request, otherwise the first number found in the error
          message, or 30 when the message contains no number.
    """

    # Sends a request to retrieve the completed annotations, obtains status update
    status = requests.get(url=f"https://coralnet.ucsd.edu{response.headers['Location']}",
                          headers={"Authorization": f"Token {CORALNET_TOKEN}"})

    current_status = json.loads(status.content)
    wait = 1

    if status.ok:
        job = current_status['data'][0]
        attributes = job['attributes']

        # A 'status' attribute is only present while the job is in progress
        if 'status' in attributes:
            ids = job['id'].replace(",", "")
            now = time.strftime("%H:%M:%S")

            message = f"Status: {attributes['status']} \tID: {ids} \tTime: {now}"

        else:
            # Completed
            message = "Completed Job"

    else:
        # CoralNet is getting too many requests; report why and back off
        message = f"CoralNet: {current_status['errors'][0]['detail']}"

        # Try to wait the amount of time requested by CoralNet
        match = re.search(r'\d+', message)
        wait = int(match.group()) if match else 30

    return current_status, message, wait


def print_job_status(queue, active, completed, expired):
    """
    Print a one-line summary of how many jobs and images are in each state.

    Args:
    - queue (list): The jobs that are currently queued.
    - active (list): The jobs that are currently active.
    - completed (list): The jobs that have been completed.
    - expired (list): The images that need updated URLs.

    Returns:
    - None
    """
    counts = (len(queue), len(active), len(completed), len(expired))
    print("JOBS: Queued: {} \tActive: {} \tCompleted: {} \tExpired: {}".format(*counts))


def convert_to_csv(response, image_name, output_dir):
    """
    Converts response data into a Pandas DataFrame and saves it as a CSV file.

    Parameters:
    ----------
    response : dict
        A dictionary object containing response data from a server; the points
        are read from response['data'][0]['attributes']['points'].
    image_name : str
        The name of the image file corresponding to the response data.
    output_dir : str
        The directory the CSV is written to. The file is named after the image
        with its extension replaced by ".csv".

    Returns:
    -------
    model_predictions : pandas.DataFrame
        A Pandas DataFrame containing prediction data, where each row represents a single point in the image.
        The columns of the DataFrame include 'image', 'X', 'Y', 'score_*', 'label_id_*', 'label_code_*', and 'label_name_*'.
        The asterisk (*) in the column names represents the index of the classification for each point, starting at 1.
    """

    # Collect one dictionary per point and build the DataFrame once at the
    # end; concatenating inside the loop re-copies every earlier row.
    rows = []

    for point in response['data'][0]['attributes']['points']:

        per_point = {'image': image_name,
                     'X': point['column'],
                     'Y': point['row']}

        for rank, classification in enumerate(point['classifications'], start=1):
            per_point[f'score_{rank}'] = classification['score']
            per_point[f'label_id_{rank}'] = classification['label_id']
            per_point[f'label_code_{rank}'] = classification['label_code']
            per_point[f'label_name_{rank}'] = classification['label_name']

        rows.append(per_point)

    model_predictions = pd.DataFrame(rows)

    # Strip only the final extension so names that contain extra dots do not
    # collapse onto the same output file.
    basename = os.path.splitext(os.path.basename(image_name))[0]
    # Works whether or not output_dir ends with a path separator
    output_file = os.path.join(output_dir, basename + ".csv")
    model_predictions.to_csv(output_file, index=True)

    if os.path.exists(output_file):
        print(f"NOTE: Predictions for {basename} saved successfully")
    else:
        print(f"ERROR: Could not save predictions for {basename}")

    return model_predictions

In [3]:
# Username and password provided by user (read from environment variables)
USERNAME = os.getenv("CORALNET_USERNAME")
PASSWORD = os.getenv("CORALNET_PASSWORD")

try:
    # Verify that the username and password are valid
    CORALNET_TOKEN, HEADERS = get_token(USERNAME, PASSWORD)

except Exception as e:
    print(e)
    # Nothing below can run without a token. SystemExit is raised directly
    # because `sys` is not imported by this file, and the non-zero status
    # marks the run as failed.
    raise SystemExit(1)

# Desired source provided by user
SOURCE_ID = str(4006)

# Set the data root
DATA_ROOT = "C://Users/jordan.pierce/Documents/GitHub/CoralNet_Tools/CoralNet_Data/4006/"
OUTPUT_PREDICTIONS = DATA_ROOT + "predictions/"

# Create a folder to contain predictions and points
os.makedirs(OUTPUT_PREDICTIONS, exist_ok=True)

NOTE: Successful authentication


In [4]:
# Look up the source's model and build the endpoint used to deploy it
# (get_model_meta comes from Download_CoralNet; presumably returns a table
# with a 'Model_ID' column - verify against that module)
metadata = get_model_meta(SOURCE_ID, USERNAME, PASSWORD)
MODEL_ID = metadata['Model_ID'][0]
MODEL_URL = f"{CORALNET_URL}/api/classifier/{MODEL_ID}/deploy/"

# Every image that belongs to the source
images = get_images(SOURCE_ID, USERNAME, PASSWORD)

Downloading Metadata...
Crawling for Images...


In [5]:
# Keep a random subset of three images (sampled without replacement)
images = images.sample(n=3)
images

Unnamed: 0,image_name,image_page,image_url
7,mcr_lter1_fringingreef_pole1-2_qu8_20080415.jpg,https://coralnet.ucsd.edu/image/3370050/view/,https://coralnet-production.s3.amazonaws.com:4...
3,mcr_lter1_fringingreef_pole1-2_qu4_20080415.jpg,https://coralnet.ucsd.edu/image/3370046/view/,https://coralnet-production.s3.amazonaws.com:4...
17,mcr_lter1_fringingreef_pole3-4_qu2_20080415.jpg,https://coralnet.ucsd.edu/image/3370060/view/,https://coralnet-production.s3.amazonaws.com:4...


In [6]:
# Drives every requested image through CoralNet: sample points, queue a job,
# upload while fewer than 5 jobs are active, poll until each job completes,
# save the predictions, and refresh the URL of any image that expired.

# Jobs that are currently queued
queued_jobs = []
queued_images = []
# Jobs that are currently active
active_jobs = []
active_images = []
# Jobs that are completed
completed_jobs = []
completed_images = []
# A list that contains just the images that need updated urls
expired_images = []
# Flag to indicate if all images have been processed
finished = False
# The amount of time to wait before checking the status of a job
patience = 75

# This will continue looping until all images have been processed
while not finished:

    # Print the status of the jobs
    print_job_status(queued_jobs, active_jobs, completed_jobs, expired_images)

    # Looping through each image requested, sample points, add to queue
    for index, row in images.iterrows():
        # If this image has already been sampled, skip it.
        if row['image_name'] in queued_images + active_images + completed_images:
            print(f"Image {row['image_name']} has already been sampled; skipping")
            continue # Skip to the next image within the current for loop

        try:
            # Sample points from image
            x, y, points = sample_points_for_url(row['image_url'], 200)
            print(f"NOTE: Sampled {len(points)} points for {row['image_name']}")
        except Exception:
            # The image url expired, so we need to resample points again later.
            # Record the image only once: a successful upload removes a single
            # entry, so a duplicate would keep the outer loop from finishing.
            if row['image_name'] not in expired_images:
                expired_images.append(row['image_name'])
            print(f"WARNING: Could not sample points; {row['image_name']} expired.")
            continue # Skip to the next image within the current for loop

        # Create a payload for the current image
        payload = {}
        payload['data'] = [{"type": "image",
                            "attributes":
                             {
                                "name": row['image_name'],
                                "url" : row['image_url'],
                                "points": points
                              },
                          }]

        job = {
                "image_name": row['image_name'],
                "model_url": MODEL_URL,
                "data": json.dumps(payload, indent=4),
                "headers": HEADERS
              }

        queued_jobs.append(job)
        queued_images.append(row['image_name'])
        print(f"NOTE: Added {row['image_name']} to queue")

    # Print the status of the jobs
    print_job_status(queued_jobs, active_jobs, completed_jobs, expired_images)

    # Start uploading the queued jobs to CoralNet if there are
    # less than 5 active jobs, and there are more in the queue.
    # If there are no queued jobs, this won't need to be entered.
    while len(active_jobs) < 5 and len(queued_jobs) > 0:

        # Iterate over a snapshot: jobs are removed from queued_jobs inside
        # the loop, and removing from a list while iterating it skips entries.
        for job in list(queued_jobs):
            # Flag to determine if a job needs to be removed from the queue
            remove_from_queue = False

            # Break when active gets to 5
            if len(active_jobs) >= 5:
                print("NOTE: Maximum number of active jobs reached; checking status")
                break # Breaks from both loops, since the while loop condition is met

            # Upload the image and the sampled points to CoralNet
            print(f"NOTE: Attempting to upload {job['image_name']}")
            # Sends the requests to the `source` and in exchange, receive
            # a message telling if it was received correctly.
            response = requests.post(url=job["model_url"],
                                     data=job["data"],
                                     headers=job["headers"])
            if response.ok:
                # If it was received, add to the current active jobs queue
                print(f"NOTE: Successfully uploaded {job['image_name']}")
                active_jobs.append(response)
                active_images.append(job['image_name'])

                # If the image was previously in expired, remove.
                if job['image_name'] in expired_images:
                    expired_images.remove(job['image_name'])
                    print(f"Removed {job['image_name']} from expired")

                # Marked to be removed from the queued jobs list
                remove_from_queue = True
            else:
                # There was an error uploading to CoralNet; get the message
                message = json.loads(response.text)['errors'][0]['detail']
                print(f"CoralNet: {message}")
                if "5 jobs active" in message:
                    print(f"NOTE: Will attempt again at {in_N_seconds(patience)}")
                    time.sleep(patience)

                else:
                    # Assumed that the image has expired; add to expired list
                    # (once only, for the same reason as when sampling fails).
                    print(f"ERROR: Failed to upload {job['image_name']}; added to expired")
                    if job['image_name'] not in expired_images:
                        expired_images.append(job['image_name'])

                    # Marked to be removed from the queued jobs list
                    remove_from_queue = True

            # Only if the job was successfully uploaded, or expired, remove
            # from the queued jobs list. This won't be reached if there were
            # any active jobs from before.
            if remove_from_queue:
                queued_jobs.remove(job)
                queued_images.remove(job['image_name'])
                print(f"NOTE: Removed {job['image_name']} from queue")

    # Check the status of the active jobs
    print_job_status(queued_jobs, active_jobs, completed_jobs, expired_images)

    # Check the status of the active jobs, break when another can be added
    while len(active_jobs) <= 5 and len(active_jobs) != 0:

        # Sleep before checking status again
        print(f"NOTE: Checking status again at {in_N_seconds(patience)}")
        time.sleep(patience)

        # Loop through a snapshot of the active jobs, since finished jobs
        # are removed from the lists inside the loop
        for (job, image_name) in list(zip(active_jobs, active_images)):
            # Check the status of the current job
            current_status, message, wait = check_job_status(job)
            print(message)
            time.sleep(wait)
            # Current job has finished, output the results, remove from queue
            if message == "Completed Job":
                print(f"NOTE: {message} for {image_name}")
                # Convert to csv, and save locally
                convert_to_csv(current_status, image_name, OUTPUT_PREDICTIONS)
                # Add to completed jobs list
                completed_jobs.append(current_status)
                completed_images.append(image_name)
                # Remove from active jobs list
                active_jobs.remove(job)
                active_images.remove(image_name)

        # After checking the current status, break if another can be added
        # Else wait and check the status of the active jobs again.
        if len(active_jobs) < 5 and len(queued_jobs) > 0:
            print(f"NOTE: Active jobs is {len(active_jobs)}; adding another.")
            break

    # If there are no queued jobs, and no active jobs, but there are images in
    # expired, resample points, and add to the queue. This gets just the AWS
    # URL for the expired images and updates the image dataframe.
    if not queued_jobs and not active_jobs and expired_images:
        print("NOTE: Updating expired images' URL")
        # Get the subset of images dataframe containing only the expired
        # images; copy it so the column assignment below does not write to a
        # slice of the original dataframe.
        images = images[images['image_name'].isin(expired_images)].copy()
        new_urls = []
        for i, r in images.iterrows():
            new_urls.append(get_image_url(r['image_page'], USERNAME, PASSWORD))
        # Store the new urls in the subset of images dataframe
        images['image_url'] = new_urls

    # Check to see everything has been completed, breaking the loop
    if not queued_jobs and not active_jobs and not expired_images:
        print("NOTE: All images have been processed; exiting loop.")
        finished = True

JOBS: Queued: 0 	Active: 0 	Completed: 0 	Expired: 0
NOTE: Sampled 196 points for mcr_lter1_fringingreef_pole1-2_qu2_20080415.jpg
NOTE: Added mcr_lter1_fringingreef_pole1-2_qu2_20080415.jpg to queue
NOTE: Sampled 196 points for mcr_lter1_fringingreef_pole2-3_qu7_20080415.jpg
NOTE: Added mcr_lter1_fringingreef_pole2-3_qu7_20080415.jpg to queue
NOTE: Sampled 196 points for mcr_lter1_fringingreef_pole3-4_qu1_20080415.jpg
NOTE: Added mcr_lter1_fringingreef_pole3-4_qu1_20080415.jpg to queue
NOTE: Sampled 196 points for mcr_lter1_fringingreef_pole1-2_qu7_20080415.jpg
NOTE: Added mcr_lter1_fringingreef_pole1-2_qu7_20080415.jpg to queue
NOTE: Sampled 196 points for mcr_lter1_fringingreef_pole4-5_qu2_20080415.jpg
NOTE: Added mcr_lter1_fringingreef_pole4-5_qu2_20080415.jpg to queue
NOTE: Sampled 196 points for mcr_lter1_fringingreef_pole1-2_qu1_20080415.jpg
NOTE: Added mcr_lter1_fringingreef_pole1-2_qu1_20080415.jpg to queue
NOTE: Sampled 196 points for mcr_lter1_fringingreef_pole1-2_qu8_2008041