# PGCView Image Prediction Pipeline

In [1]:
# Mount Google Drive into the Colab filesystem at /content/drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Work from the MyDrive folder under the /content/drive mount point,
# so relative paths used later in the notebook resolve there
%cd /content/drive/MyDrive/

/content/drive/MyDrive


# Library Imports and Endpoint Configuration
Do not modify these cells; just run them.

In [20]:
# Imports
import requests
import cv2
import polars as pl
import pandas as pd
import numpy as np
from tqdm import tqdm
import asyncio
import aiohttp
import time
import glob
import csv

In [10]:
# Set endpoint URLs for the pgcview.org v1 API.
# STATUS_ENDPOINT is queried with GET; the two prediction endpoints
# receive an uploaded image file via POST.
STATUS_ENDPOINT = 'https://pgcview.org/api/v1/status'
MARKER_ENDPOINT = 'https://pgcview.org/api/v1/predict_markers'
PGC_ENDPOINT = 'https://pgcview.org/api/v1/predict_pgc'


In [5]:
def invoke_endpoints(path, **kwargs):
  """
  Invoke the marker and pgc prediction endpoints and return the results.

  Args:
    path: Path of the image file uploaded to both endpoints.
    **kwargs: Extra keyword arguments forwarded unchanged to both
      `requests.post` calls.

  Returns:
    A tuple `(filename, marker_data, pgc_data)` where `filename` is the
    'filename' field of the marker response, `marker_data` is a dict with
    numpy arrays under 'coordinates' and 'classes', and `pgc_data` is a
    numpy array built from the 'data' field of the pgc response.
  """
  # Make the marker model request; the context manager guarantees the file
  # handle is closed even if the request raises
  with open(path, 'rb') as image_file:
    marker_response = requests.post(MARKER_ENDPOINT, files={'file': image_file}, **kwargs)

  # Reopen the image so the second upload starts reading from the first byte
  with open(path, 'rb') as image_file:
    pgc_response = requests.post(PGC_ENDPOINT, files={'file': image_file}, **kwargs)

  # Decode each response body once instead of re-parsing it per field
  marker_json = marker_response.json()
  pgc_json = pgc_response.json()

  # Format responses
  filename = marker_json['filename']
  marker_data = {
      'coordinates': np.array(marker_json['data']['coordinates']),
      'classes': np.array(marker_json['data']['classes'])
  }
  pgc_data = np.array(pgc_json['data'])

  return filename, marker_data, pgc_data


In [7]:
# Get server status
try:
  # requests applies no timeout by default; without one this cell can hang
  # indefinitely when the server does not answer
  response = requests.get(STATUS_ENDPOINT, timeout=10)
  print(response.json())
except Exception as e:
  # Best-effort check: report the failure (connection error, timeout,
  # non-JSON body, ...) without stopping the notebook
  print(e)

Expecting value: line 1 column 1 (char 0)


In [None]:
# Define async API calls
async def fetch_markers(session, url):
    """
    POST to `url` through `session`, echo the decoded JSON body to stdout,
    and return it.
    """
    async with session.post(url) as reply:
        # Decode the body while the response is still open
        payload = await reply.json()
        print(payload)
        return payload

async def write_to_csv(data, key):
    """
    Append `data[key]` as a single-column row to 'status_results.csv'
    in the current working directory.
    """
    # Append mode keeps rows written by earlier calls
    with open('status_results.csv', mode='a', newline='') as csv_file:
        csv.writer(csv_file).writerow([data[key]])

async def call_api_n_times(url, key, images=None, n=10):
    """
    Call `url` `n` times in sequence and append `result[key]` from each
    JSON response to the CSV file written by `write_to_csv`.

    Args:
        url: Endpoint passed to `fetch_markers` on every iteration.
        key: Key looked up in each decoded response by `write_to_csv`.
        images: Not used by this function; accepted (and optional) so that
            callers passing it positionally keep working.
        n: Number of sequential requests to make. Defaults to 10.
    """
    async with aiohttp.ClientSession() as session:
        for _ in range(n):
            # fetch_markers is the fetch helper defined alongside this function.
            # NOTE(review): it issues a POST - confirm the target endpoint
            # accepts POST before relying on the results.
            result = await fetch_markers(session, url)
            await write_to_csv(result, key)

# Example usage: Call the API 10 times and store the results of 'status_key' in a CSV file
url = 'https://pgcview.org/api/v1/status'
key = 'server_name'  # Replace with the actual key from the API response you want to save

# Run the async loop to make the API calls and write to CSV
await call_api_n_times(url, key)


In [None]:
# Define Async API calls

# Maximum number of prediction requests allowed in flight at the same time
CONCURRENCY_LIMIT = 4
# Shared semaphore: hit_prediction_endpoints acquires it around each endpoint
# request, so the limit applies across all images processed concurrently
semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)

async def invoke_prediction_endpoint(endpoint, session, files, path):
  """
  Send `files` as the POST body to `endpoint` using `session` and return
  the decoded JSON reply. `path` is only used in the progress message.
  """
  pending_request = session.post(endpoint, data=files)
  async with pending_request as reply:
    # Log which image is being sent to which endpoint, then decode the body
    print(f'calling {endpoint} with {path}')
    return await reply.json()


async def hit_prediction_endpoints(image_path):
  """
  POST an image binary stream to the marker and pgc prediction endpoints
  and return the parsed predictions.

  Args:
    image_path: Path of the image file to upload.

  Returns:
    A tuple `(coordinates, classes, pgc_array)` of numpy arrays built from
    the marker response's ['data']['coordinates'] and ['data']['classes']
    fields and from the pgc response's ['data'] field.
  """
  # Read the image once, before any network resources are opened; the same
  # bytes are uploaded to both endpoints
  with open(image_path, 'rb') as f:
    file_content = f.read()

  async with aiohttp.ClientSession() as session:
    # Call MARKER_ENDPOINT; the shared semaphore caps how many requests are
    # in flight across all concurrently processed images
    async with semaphore:
      marker_response = await invoke_prediction_endpoint(
          endpoint=MARKER_ENDPOINT,
          session=session,
          files={'file': file_content},
          path=image_path
      )
    coordinates = np.array(marker_response['data']['coordinates'])
    classes = np.array(marker_response['data']['classes'])

    # Release the decoded JSON before the next request is made
    del marker_response

    # Call PGC_ENDPOINT
    async with semaphore:
      pgc_response = await invoke_prediction_endpoint(
          endpoint=PGC_ENDPOINT,
          session=session,
          files={'file': file_content},
          path=image_path
      )
    pgc_array = np.array(pgc_response['data'])

    del pgc_response

  return coordinates, classes, pgc_array




async def main(images):
    """
    Run hit_prediction_endpoints for every path in `images` concurrently
    and return the list of per-image results.
    """
    # One coroutine per image; gather schedules them all and collects their results
    pending = (hit_prediction_endpoints(image_path) for image_path in images)
    return await asyncio.gather(*pending)

# def start_async_tasks(images):
#     # Instead of asyncio.run(), we use asyncio.create_task() or ensure_future()
#     loop = asyncio.get_event_loop()
#     loop.create_task(main(images))

# Function to time the async execution
# async def time_async_execution(images, endpoint):
#     start_time = time.time()
#     await main(images)
#     elapsed_time = time.time() - start_time
#     print(f"Execution time: {elapsed_time:.4f} seconds")

# Example usage
images = ['IMG_5291.PNG']*10
images = glob.glob('test_api_images/*')
# PGC_ENDPOINT = 'https://pgcview.org/api/v1/predict_pgc'  # Replace with your actual endpoint URL

# Run the async function to send the image and get the response
# await send_image_and_get_response(image_path, MARKER_ENDPOINT)
# await time_async_execution(images, PGC_ENDPOINT)

await main(images)