## Initial Setup

In [1]:
# Mount Google Drive at /content/drive so that files stored on Drive can be reached for batch processing.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install openai pillow

In [None]:
#API Key
# The key is read from the OPENAI_API_KEY environment variable when it is set; otherwise it is
# requested through a hidden prompt. Either way the secret is never saved inside the notebook.
import os
from getpass import getpass

api_key = os.environ.get("OPENAI_API_KEY") or getpass("OpenAI API key: ")

In [None]:
# Root folder to process. Each immediate sub-folder is handled as one batch
# (build_batch_jsons reads the images from that sub-folder's "outlines" folder).
batch_directory = '/folder/of/images/to/process'
# Optional: a single image used by the "Test Image" section to try the prompt before a full run.
test_image_path = '/optional/test/img/path'
# Set to True when processing interior images, False when processing exterior images.
is_interior = True

In [None]:
# Allowed answers offered to the model for interior images (inserted verbatim into the prompt).
# There is a balancing act in the level of detail of these categories: sometimes fewer options
# work better for GPT.
category_options = "[wall, floor, ceiling, column, beam, window, door, countertop, shelf, equipment, furniture, other]"
material_options = "[concrete, metal, wood, brick, glass, quartz, granite, stone, gypsum, plastic, tile, carpet, other]"

# Allowed answers offered to the model for exterior images.
ext_category_options = "[building, other]"
ext_material_options = "[concrete, metal, wood, brick, glass, stone, plastic, other]"

In [6]:
import openai
import base64
import requests
import json
import os
import os.path
import re
import time

In [7]:
# OpenAI SDK client; the batch cells use it to upload request files and to create/retrieve batches.
client = openai.OpenAI(api_key=api_key)

## Defs

In [8]:
def build_prompt(is_interior:bool, additional_context:str):
  """Assemble the instruction text sent to the model together with the outlined image.

  Args:
    is_interior: True selects the interior wording and the interior category/material
      lists (`category_options`, `material_options`); False selects the exterior
      wording and the `ext_*` lists.
    additional_context: Text inserted verbatim right after the sentence that names
      the kind of image. No separator is added after it.

  Returns:
    One string asking for a plus-separated answer of the form
    {description}+{family}+{category}+{material}.

  NOTE: the fragments are joined with no separator, and several of them do not end
  with a space, so some sentences run together in the final text (for example
  "...constitute them.Your job..."). This is kept exactly as it was.
  """
  if is_interior:
    scene = 'an interior room'
    allowed_categories = category_options
    allowed_materials = material_options
  else:
    scene = 'a building exterior'
    allowed_categories = ext_category_options
    allowed_materials = ext_material_options

  fragments = [
    # Role and task
    "You are an architect and possess deep knowledge of the built world. You understand how buildings go together and the materials that constitute them.",
    "Your job is to help your client identify the parts of their building and what they are made of.",
    f"Attached is an image of {scene}. ",
    f"{additional_context}",
    "There is a single red outline around a segmented region of the image. ",
    # Output format
    "Your response to me will be in the form of a plus-separated (PSV) line of text with the following format: {description}+{family}+{category}+{material} ",
    # Field 1: description
    "First, write a text description, of what the region is enclosing.",
    "Make this as detailed as possible, including the types of things inside the boundary and what materials they are made of. Pay attention to textures as these may provide clues.",
    "Remember that your description should only describe what is inside the outline, not things that are nearby it.",
    "Record this in the {description} field of the PSV. ",
    # Field 2: family
    "Next, based on your text description, determine if this is attached to the building or not. Examples of things that are not attached are furniture and plants.",
    "Record your answer to this as either 'attached' or 'loose' in the {family} field of the PSV.",
    # Field 3: category
    "Next, based on your text description, determine which one of the following categories the text description fits in, ",
    f"only choose one: {allowed_categories}. Do not choose anything that is not on this list.",
    "Record this in the {category} field of the PSV. ",
    # Field 4: material
    "Next, based on your original text description, determine which of the following materials the text description fits in, ",
    f"only choose one: {allowed_materials}. Do not choose anything that is not on this list.",
    "Record this in the {material} field of the PSV. ",
    "Remember, do not include any other words/characters in your response except those that fill in the plus-separate line as described above",
  ]
  return "".join(fragments)

In [None]:
# Encode image
def encode_image(image_path):
  """Read the file at `image_path` and return its bytes as a base64 string (UTF-8 decoded)."""
  with open(image_path, "rb") as handle:
    raw_bytes = handle.read()
  encoded = base64.b64encode(raw_bytes)
  return encoded.decode('utf-8')
  
def classify_image(image_path, is_interior, additional_context="", timeout=120):
  """Send one outlined image to the chat-completions endpoint and return the decoded reply.

  The request is authenticated with the module-level `api_key`.

  Args:
    image_path: Path of the image file to classify.
    is_interior: True for an interior image, False for a building exterior.
    additional_context: Optional extra text inserted into the prompt.
    timeout: Seconds to wait on the HTTP request before `requests` raises a timeout
      error. Pass None to wait indefinitely (the behaviour before this parameter existed).

  Returns:
    The JSON body of the HTTP response, decoded into Python objects. The HTTP status
    is not checked, so an API error payload is returned as-is.
  """
  headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}"
  }

  payload = build_payload(image_path, is_interior, additional_context)

  # Without a timeout a stalled connection would block the notebook cell forever.
  response = requests.post(
    "https://api.openai.com/v1/chat/completions",
    headers=headers,
    json=payload,
    timeout=timeout,
  )

  return response.json()

def build_batch_call_line(custom_id, image_path, is_interior, additional_context=""):
  """Build one request entry (a dict) for a batch input file.

  The entry carries `custom_id` rendered as a string, the POST method, the
  chat-completions URL, and the payload produced by `build_payload` for the image.
  """
  request_id = f"{custom_id}"
  request_body = build_payload(image_path, is_interior, additional_context)
  return {
      "custom_id": request_id,
      "method": "POST",
      "url": "/v1/chat/completions",
      "body": request_body,
  }

def build_payload(image_path, is_interior, additional_context="", model="gpt-4o", max_tokens=500):
  """Build the chat-completions request body for one outlined image.

  Args:
    image_path: Path of the image file; its bytes are embedded as a base64 data URL.
    is_interior: True for an interior image, False for a building exterior.
    additional_context: Optional extra text inserted into the prompt.
    model: Name of the model to request.
    max_tokens: Upper bound on the length of the reply.

  Returns:
    A dict with a single user message holding the text prompt and the image.
  """
  base64_image = encode_image(image_path)
  text_prompt = build_prompt(is_interior, additional_context)

  payload = {
    "model": model,
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": text_prompt,
          },
          {
            "type": "image_url",
            "image_url": {
              # "image/jpeg" is the registered MIME type for JPEG data ("image/jpg" is not).
              "url": f"data:image/jpeg;base64,{base64_image}",
            }
          },
        ],
      },
    ],
    "max_tokens": max_tokens
  }

  return payload

def extract_response_content(complete_response):
  """Return the message text of the first choice in a chat-completions response dict."""
  first_choice = complete_response['choices'][0]
  message = first_choice['message']
  return message['content']

def batch_process(dir, is_interior, additional_context=""):
  """Classify every image returned by `get_ordered_files(dir)`, one request at a time.

  Prints a progress line after each image and returns a dict that maps each image
  path to the response returned by `classify_image` for it.
  """
  image_paths = get_ordered_files(dir)
  total = len(image_paths)

  results = {}
  for position, image_path in enumerate(image_paths, start=1):
    results[image_path] = classify_image(image_path, is_interior, additional_context)
    print(f"Completed image {position} of {total}")

  return results

def get_ordered_files(dir):
  """Return the paths of the ".jpg" files directly inside `dir`, in sorted order.

  Each returned path is `dir` joined with the file name. Ordering is driven by
  `sorting_name`, which yields either an int or a string. The keys are wrapped so
  that int keys sort first (numerically) and string keys follow (alphabetically);
  comparing an int key with a string key directly would raise TypeError.
  """
  files = [
    os.path.join(dir, name)
    for name in os.listdir(dir)
    if name.endswith(".jpg")
  ]

  def _mixed_type_key(path):
    # The leading bool separates the two key types, so an int is never compared to a str.
    key = sorting_name(path)
    return (isinstance(key, str), key)

  files.sort(key=_mixed_type_key)
  return files

def sorting_name(image_path:str):
  """Return the sort key for an image path.

  When the file name (without directory and extension) consists only of decimal
  digits, the key is that number as an int, so "2.jpg" sorts before "10.jpg".
  In every other case, including a path with no extension, the key is the
  unchanged `image_path` string.

  Args:
    image_path: A bare file name or a full path.

  Returns:
    int for numeric file names, otherwise the original string.
  """
  # Look at the file name itself; the directory part must not influence the key.
  stem, extension = os.path.splitext(os.path.basename(image_path))
  if not extension:
    return image_path
  # isdecimal() only accepts characters that int() can convert.
  if not stem.isdecimal():
    return image_path
  return int(stem)

def build_batch_jsons(main_dir, is_interior, start_idx):
  """Write one batch request file (JSONL) per immediate sub-folder of `main_dir`.

  For each sub-folder, the images in its "outlines" folder are turned into request
  lines (the image path doubles as the request's custom_id) and saved to
  "<main_dir>/BatchRequest_<sub-folder>.jsonl", overwriting any existing file.

  Sub-folder names containing "_i0_" or "_i2_" add a sentence to the prompt saying
  that the view faces diagonally upwards or downwards respectively.

  Args:
    main_dir: Folder whose immediate sub-folders are processed.
    is_interior: True for interior images, False for exterior images.
    start_idx: Sub-folders whose index in the sorted list is below this value are
      skipped; use it to resume after a partial run.

  Returns:
    A dict that maps each processed sub-folder name to the path of its JSONL file.
  """
  # Sorted so that an index refers to the same sub-folder on every run; the listing
  # order of the file system is not guaranteed, which would make `start_idx` unreliable.
  subdirs = sorted(next(os.walk(main_dir))[1])
  batch_jsonl_files = {}
  c = len(subdirs)
  for i, subdir in enumerate(subdirs):
    if i < start_idx:
      continue

    additional_context = ""
    if "_i0_" in subdir:
      additional_context = "This view is facing diagonally upwards."
    elif "_i2_" in subdir:
      additional_context = "This view is facing diagonally downwards."

    # Paths are built from `main_dir`, the folder that was actually listed.
    image_dir = f"{main_dir}/{subdir}/outlines"
    json_lines = [
      build_batch_call_line(image, image, is_interior, additional_context)
      for image in get_ordered_files(image_dir)
    ]

    # All lines are built before the file is opened, so a failure while encoding an
    # image does not leave a truncated request file behind.
    path = f"{main_dir}/BatchRequest_{subdir}.jsonl"
    with open(path, "w") as json_file:#"w" option will overwrite anything that is there if it exists.
      for json_obj in json_lines:
        json_file.write(json.dumps(json_obj) + '\n')
    batch_jsonl_files[subdir] = path
    print(f"Saved request jsonl file {i + 1} of {c} at {path}")

  return batch_jsonl_files


## Get Files

In [None]:
# Find the immediate sub-folders of the batch directory, then print the .jpg files
# located directly inside each of them.
# NOTE(review): build_batch_jsons reads images from each sub-folder's "outlines" folder,
# while this cell lists the sub-folder itself - confirm which one is intended here.
subdirs = next(os.walk(batch_directory))[1]
print(subdirs)
for subdir_name in subdirs:
  for image_path in get_ordered_files(f"{batch_directory}/{subdir_name}"):
    print(image_path)

In [None]:
# Number of sub-folders found, followed by their names (one per line).
print(len(subdirs))
for subdir_name in subdirs:
  print(subdir_name)

# Test Image

In [None]:
# Try the prompt on a single image, using the interior/exterior setting from the configuration cell.
test_result = classify_image(test_image_path, is_interior)

In [None]:
# Show which image was tested and the complete response returned for it.
print(test_image_path)
print(test_result)

# Batch JSON file

In [None]:
# Index of the first sub-folder to build a request file for. Keep 0 for a full run. If a
# previous run failed part-way (for example the Colab session got disconnected), raise it so
# that request files are only created from that index of the list onwards.
start_idx = 0
batch_jsonl_files = build_batch_jsons(batch_directory, is_interior, start_idx)

In [None]:
# Number of request files currently registered in batch_jsonl_files.
print(len(batch_jsonl_files))

189


# Alternate workflow in cases of disconnected runtimes
Use this section to restart classification after the previous runtime disconnected. First, make copies of `batch_ids.txt` and of the image classifications CSV under unique names, so that they are not overwritten when you rerun classification. If you have not already done so, also use the previously generated `batch_ids.txt` file to retrieve the old results from OpenAI.

In [None]:
batch_jsonl_files = {} #clear the dictionary if anything is already in it, for this process, or create it new

# How many batches were submitted by the previous run: one batch id per non-blank line.
with open(f"{batch_directory}/batch_ids_batch_1.txt") as previous_batch_ids:
  previously_processed_file_count = sum(1 for line in previous_batch_ids if line.strip())
expected_additional_file_count = len(subdirs) - previously_processed_file_count
print(f"Total batch count: {len(subdirs)}")
print(f"Previously processed batches: {previously_processed_file_count}")
print(f"Expected additional batches: {expected_additional_file_count}")

with open(f"{batch_directory}/ImageClassifications_batch_1.csv") as previous_csv: #just make this match whatever you renamed the previous classifications CSV as.
  csv_string = previous_csv.read()

found_all = True
for subdir_name in subdirs:
  # is this directory already classified? if so, skip it
  # The name is matched as a complete path component ("/name/") of the image paths stored
  # in the CSV. A bare substring test would also treat e.g. "x_1" as done when only "x_10" is.
  if f"/{subdir_name}/" in csv_string:
    continue
  path = f"{batch_directory}/BatchRequest_{subdir_name}.jsonl"
  if not os.path.isfile(path):
    found_all = False
    print(f"ERROR: file not found: {path}")
    continue
  batch_jsonl_files[subdir_name] = path
print("Found all necessary JSONL files" if found_all else "ERROR: not all necessary JSONL files exist")
print(f"Number of batches to process: {len(batch_jsonl_files)}")



# Actually Run the Batch Operation. Costs may add up.

In [None]:
batch_client_objects = {} #information about each asynchronous batch process, like ID, etc.
file_count = len(batch_jsonl_files)
log_path = f"{batch_directory}/batch_ids.txt"#logging the batch ids in case the process gets interrupted or instance is disconnected
open(log_path, "w").close() #clear the file if there is anything in it from before
for i, key in enumerate(batch_jsonl_files):
  #https://platform.openai.com/docs/guides/batch/getting-started
  jsonl_file = batch_jsonl_files[key]
  # Upload inside a `with` block so the local file handle is closed once the upload returns.
  with open(jsonl_file, "rb") as jsonl_handle:
    batch_input_file = client.files.create(
        file = jsonl_handle,
        purpose = "batch"
    )

  batch_input_file_id = batch_input_file.id
  print(f"Preparing to send batch file {i + 1} of {file_count} for asynchronous processing")
  in_progress = False
  # Keep creating a batch from the uploaded file until one gets past validation.
  # NOTE(review): there is no retry limit - an input that keeps failing validation is resubmitted forever.
  while not in_progress:
    batch_obj = client.batches.create(
        input_file_id=batch_input_file_id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
        metadata={
            "description": "batch label process"
        }
    )

    time_inc = 10 #seconds between two status checks
    print(f"Sent batch file {i + 1} of {file_count} for asynchronous processing, validating")
    status = ""
    rtr_btc = None
    # Poll until the batch leaves the "validating" state.
    while not in_progress:
      print("waiting for validation...")
      time.sleep(time_inc)
      rtr_btc = client.batches.retrieve(batch_obj.id)
      status = rtr_btc.status
      if status == "validating":
        continue
      if status == "failed":
        break
      # Any status other than "validating"/"failed" is treated as accepted.
      in_progress = True

    if not in_progress:
      print(f"Failed to send batch file {i + 1} of {file_count} for asynchronous processing, errors: {rtr_btc.errors}, will attempt again...")
    else:
      batch_client_objects[key] = batch_obj
      # Appended right away so the id survives a disconnect that happens later in the loop.
      with open(log_path, "a") as logfile:
        logfile.write(batch_obj.id + "\n")
      print(f"Successfully sent batch file {i + 1} of {file_count} for asynchronous processing, status = {status}")


# Retrieve/check status of request

In [None]:
# Read back the batch ids that were logged when the batches were submitted.
with open(f"{batch_directory}/batch_ids.txt", 'r') as batch_ids_file:
  batch_ids_lines = batch_ids_file.readlines()

incomplete_batches = 0
response_jsonl_paths = []
for id_line in batch_ids_lines:
  batch_id = id_line.strip() #remove newline character
  if not batch_id:
    continue #a blank line is not a batch id
  print(batch_id)
  retrieved_data = client.batches.retrieve(batch_id)
  if retrieved_data.status != "completed":
    incomplete_batches += 1
  # A batch with no output file has nothing to download yet. A batch that is not
  # "completed" but does have an output file is still downloaded.
  if not retrieved_data.output_file_id:
    print(f"no output file available for this batch (status = {retrieved_data.status}), skipping")
    continue
  try:
    retrieved_text = client.files.content(retrieved_data.output_file_id).text
  except Exception as err: #keep going with the other batches, but do not swallow KeyboardInterrupt
    print("error retreiving file, file id is likely invalid")
    print(f"details: {err}")
    continue
  print(retrieved_text)
  path = f"{batch_directory}/BatchRequest_Retrieval_{batch_id}.jsonl"
  with open(path, "w") as json_file:
    json_file.write(retrieved_text)
  response_jsonl_paths.append(path)

print("All batches complete" if incomplete_batches == 0 else f"Not all batches complete (awaiting {incomplete_batches})")

Output hidden; open in https://colab.research.google.com to view.

# Save Results to CSV

In [23]:
def cleanup_text(text):
  """Return `text` with every newline and comma removed (cleans up GPT's responses)."""
  pieces = re.split('\n|,', text)
  return "".join(pieces)

In [None]:
# Turn every downloaded response file into CSV rows:
#   image path, description, family, category, material
csv_rows = []
for path in response_jsonl_paths:
  #read lines from jsonL file as json objects (blank lines are skipped)
  with open(path) as jsonl_file:
    retrieved_json_lines = [json.loads(line) for line in jsonl_file if line.strip()]

  for retrieved_json_line in retrieved_json_lines:
    #first, cull empty/invalid lines
    if "custom_id" not in retrieved_json_line:
      continue
    img_path = retrieved_json_line["custom_id"]
    try:
      response = retrieved_json_line['response']['body']['choices'][0]['message']['content']
      print(response)
      # Split from the right: the last three fields come from fixed lists, so any extra
      # '+' can only belong to the free-text description. Fewer than four fields makes
      # the unpacking fail, and the line is recorded as an error row below.
      desc, fam, cat, mat = (cleanup_text(part) for part in response.rsplit('+', 3))
      csv_rows.append(img_path + "," + desc + "," + fam + "," + cat + "," + mat + "\n")
    except Exception:
      # Commas and newlines are stripped from the dumped line so that the error row
      # keeps the same five-column layout as the regular rows.
      csv_rows.append(img_path + ",ERROR,ERROR,ERROR," + cleanup_text(str(retrieved_json_line)) + "\n")

csv = "".join(csv_rows)
print(csv)
with open(f"{batch_directory}/ImageClassifications.csv", "w") as csv_file: #"w" option will overwrite anything that is there if it exists.
  csv_file.write(csv)