<a href="https://colab.research.google.com/github/DevJGraham/vlm-image-to-description-generator/blob/main/google_vision_api.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installs and Imports

In [None]:
!pip install -U google-cloud-vision google-auth google-auth-oauthlib -q

In [18]:
from google.cloud import vision
from google.cloud import storage

import google.auth
from google.colab import drive

from PIL import Image, ImageOps

import os
import re
import io

# Cloud Vision Setup: Drive Mount & GCP Auth in Colab
> Set `DIR` to your notebook's Drive path and `PROJECT_ID` to your GCP project ID before running

In [12]:
DIR = 'path-to-directory-in-drive'
PROJECT_ID = 'project-id'

In [None]:
drive.mount('/content/drive')

%cd "$DIR"

# Sign in to Google so the Cloud SDK in this VM can act as your user
from google.colab import auth
auth.authenticate_user()

# Point the CLI at the correct project
!gcloud config set project "$PROJECT_ID"

# Log in to your Google account to obtain an authorization code; the CLI exchanges it for a short-lived access token and a long-term refresh token that can refresh the access token when it expires
!gcloud auth application-default login

# Attach a quota/billing project to those user ADC credentials
!gcloud auth application-default set-quota-project "$PROJECT_ID"

# Enable Vision API for the active CLI project
!gcloud services enable vision.googleapis.com

# Make sure GOOGLE_APPLICATION_CREDENTIALS is unset so that the user ADC created above is used
# (i.e. Python libs actually use ADC, not a hard-coded service account key)

print("GOOGLE_APPLICATION_CREDENTIALS set?", "GOOGLE_APPLICATION_CREDENTIALS" in os.environ)
os.environ.pop("GOOGLE_APPLICATION_CREDENTIALS", None)

# Prints the active project and the account for the CLI to verify that the previous steps worked
!gcloud config list

# Fetches the default credentials and the project that the client will use
creds, proj = google.auth.default()
print("ADC project seen by client:", proj) # proj should be the project that we are working with

# Cloud Vision API Pipeline




In [5]:
# Cloud Storage client; the cells below use it to get the bucket and list/download image blobs
storage_client = storage.Client()

In [6]:
# # This is needed to access the Google Cloud Storage bucket (already done for this code above)
# from google.colab import auth
# auth.authenticate_user()

In [21]:
class Cloud_Vision_Api:
  """Thin wrapper around a Cloud Vision client that returns plain lists of strings.

  Each `get_*` method sends raw image bytes to one Vision feature and returns
  only the human-readable strings from the response. If the response carries a
  per-image error message, a `RuntimeError` is raised instead of silently
  returning an empty list.
  """

  def __init__(self, client):
    """Store the client used for every request.

    Args:
      client: Object exposing `label_detection`, `document_text_detection`
        and `object_localization` (e.g. `vision.ImageAnnotatorClient()`).
    """
    self.client = client

  @staticmethod
  def _raise_on_error(response, feature):
    """Raise RuntimeError when `response.error.message` is non-empty."""
    # getattr keeps this tolerant of response objects that have no `error` field
    message = getattr(getattr(response, 'error', None), 'message', '')
    if message:
      raise RuntimeError(f"Cloud Vision {feature} request failed: {message}")

  def get_labels(self, image_bytes):
    """Return the description of every label annotation for the image.

    Args:
      image_bytes: Encoded image content passed to `vision.Image(content=...)`.

    Returns:
      list[str]: One description per entry in `label_annotations`.

    Raises:
      RuntimeError: If the response reports an error message.
    """
    image = vision.Image(content=image_bytes)
    label_output = self.client.label_detection(image=image)
    self._raise_on_error(label_output, 'label_detection')

    return [label.description for label in label_output.label_annotations]

  def get_text(self, image_bytes):
    """Return the description of every text annotation for the image.

    In this notebook's saved output the first element is the full detected
    text (newline-separated) and the remaining elements are the individual
    pieces, so callers that only want one of the two should slice accordingly.

    Args:
      image_bytes: Encoded image content passed to `vision.Image(content=...)`.

    Returns:
      list[str]: One description per entry in `text_annotations`.

    Raises:
      RuntimeError: If the response reports an error message.
    """
    image = vision.Image(content=image_bytes)
    txt_output = self.client.document_text_detection(image=image)
    self._raise_on_error(txt_output, 'document_text_detection')

    return [txt.description for txt in txt_output.text_annotations]

  def get_objects(self, image_bytes):
    """Return the name of every localized object annotation for the image.

    Args:
      image_bytes: Encoded image content passed to `vision.Image(content=...)`.

    Returns:
      list[str]: One name per entry in `localized_object_annotations`.

    Raises:
      RuntimeError: If the response reports an error message.
    """
    image = vision.Image(content=image_bytes)
    object_output = self.client.object_localization(image=image)
    self._raise_on_error(object_output, 'object_localization')

    return [obj.name for obj in object_output.localized_object_annotations]

In [22]:
# Maximum side length (in pixels) used when resizing for each API feature
FEATURE_SIZES = {
    "text": 1024,
    "labels_objects": 640,
}

def resize_for_api_feature(byte_data, api_feature):
    """Downscale an encoded image for the given API feature and return JPEG bytes.

    Args:
        byte_data: Encoded image bytes readable by PIL.
        api_feature: Key into FEATURE_SIZES; it is stringified, stripped and
            lower-cased before lookup.

    Returns:
        bytes: The image re-encoded as JPEG (quality 90) after EXIF-based
        transposition, conversion to RGB and a LANCZOS thumbnail bounded by
        the feature's target size.

    Raises:
        ValueError: If `api_feature` does not map to a known feature size.
    """
    feature_key = str(api_feature).strip().lower()
    max_side = FEATURE_SIZES.get(feature_key)
    if max_side is None:
        raise ValueError(f"Unknown api_feature '{api_feature}'. Use one of: text or labels_objects")

    source_stream = io.BytesIO(byte_data)
    with Image.open(source_stream) as opened:
        # Apply the EXIF orientation (phone images), then force RGB
        upright_rgb = ImageOps.exif_transpose(opened).convert("RGB")
        # thumbnail() resizes in place to fit within the bounding box
        upright_rgb.thumbnail((max_side, max_side), Image.Resampling.LANCZOS)
        jpeg_buffer = io.BytesIO()
        upright_rgb.save(jpeg_buffer, format="JPEG", quality=90)
        return jpeg_buffer.getvalue()

In [23]:
# Create Image Annotator Client (no explicit credentials are passed to the constructor)
client = vision.ImageAnnotatorClient()

# Create Cloud Vision api object passing in the client; its get_labels/get_text/get_objects
# methods are what the pipeline cell calls
api = Cloud_Vision_Api(client)

In [24]:
VALID_EXTS = (".jpg", ".jpeg", ".png") # Add more as needed

def parse_image_path(name, pattern=None):
  """Extract the item number and image index from a blob name.

  Args:
    name: Blob name, e.g. "<auction>/<item>_<idx>.<ext>".
    pattern: Optional compiled regex with named groups `item` and `idx`.
      When omitted, the module-level `regex_capture` (compiled in the cell
      that lists the bucket) is used, which keeps existing one-argument
      calls working.

  Returns:
    tuple[int, int] | None: `(item, idx)` when the name has an extension in
    VALID_EXTS (case-insensitive) and matches the pattern, otherwise None.
  """
  if not name.lower().endswith(VALID_EXTS):
    return None

  if pattern is None:
    # Resolved at call time, so the global only needs to exist before the first call
    pattern = regex_capture

  path = pattern.match(name)
  if not path:
    return None
  return int(path.group('item')), int(path.group('idx'))

In [25]:
bucket_name = 'auction-images-bucket'
bucket = storage_client.bucket(bucket_name)
auction = 'aypt2025-13'
prefix = auction + '/'

# Testing on a small sample rather than all 60 items: only blobs whose file name
# (the part after the auction prefix) starts with this string are processed.
# Derived from `prefix` below so the filter keeps working if `auction` changes.
SAMPLE_ITEM_PREFIX = '1'

# Blob names are expected to look like "<auction>/<item>_<idx>.<ext>"
regex_capture = re.compile(rf"^{re.escape(prefix)}(?P<item>\d+)_(?P<idx>\d+)\.[^/\.]+$")

items = {} # Dictionary of items that will be filled dynamically
for blob in bucket.list_blobs(prefix=prefix):
  if not blob.name.startswith(prefix + SAMPLE_ITEM_PREFIX):
    continue
  # Validate path
  parsed = parse_image_path(blob.name)
  if not parsed:
    continue # If the path is invalid, skip to the next path
  item, idx = parsed
  image_bytes = blob.download_as_bytes()

  item_data = items.setdefault(item, {})
  text_data = item_data.setdefault('text', []) # The first time that the item is created, make 'text' have an empty list

  # Resize to recommended size for ocr extraction
  text_bytes = resize_for_api_feature(image_bytes, 'text')

  # Append text data for each image
  text = api.get_text(text_bytes)
  text_data.extend(text)

  if idx == 1:
    # Resize to recommended size for labels and objects
    labels_objects_bytes = resize_for_api_feature(image_bytes, 'labels_objects')

    # Add labels, object, and thumbnail path for the first image of the item
    labels = api.get_labels(labels_objects_bytes)
    objects = api.get_objects(labels_objects_bytes)

    item_data['labels'] = labels
    item_data['objects'] = objects
    item_data['thumbnail_path'] = blob.name

# Final structure: {auction: {item: {'text': [...], 'labels': [...], 'objects': [...], 'thumbnail_path': str}}}
aypt2025_13_data = {auction: items}

In [27]:
aypt2025_13_data

{'aypt2025-13': {10: {'text': ['USA\nQUARANTEL\nTOOLS\nARMSTRONG\nRONG\n57\nARMSTRONG',
    'USA',
    'QUARANTEL',
    'TOOLS',
    'ARMSTRONG',
    'RONG',
    '57',
    'ARMSTRONG',
    'MAR\nSA ARMSTRON\nTOOK\nSUARANTEE\nSA ARMSTRONG\nEE TOOLS',
    'MAR',
    'SA',
    'ARMSTRON',
    'TOOK',
    'SUARANTEE',
    'SA',
    'ARMSTRONG',
    'EE',
    'TOOLS',
    '33-894',
    '33-894',
    '39.4 ARMSTRONG',
    '39.4',
    'ARMSTRONG',
    'E\nTHE\nE GUARAN\nTHE USA ARMSTR\nGUARANTEE TOOLS\nTHE USA ARMSTRONG LIE\nGUARANTEE TOOLS MAD\nHE USA ARMSTRONG LIFES\nJARANTEE TOOLS MADE\nSA ARMSTRONG LIFETI\nANTEE TOOLS MADE I\nMSTRONG LIFETIM\nIFETIME\nIN TH\nOLS MADE INT',
    'E',
    'THE',
    'E',
    'GUARAN',
    'THE',
    'USA',
    'ARMSTR',
    'GUARANTEE',
    'TOOLS',
    'THE',
    'USA',
    'ARMSTRONG',
    'LIE',
    'GUARANTEE',
    'TOOLS',
    'MAD',
    'HE',
    'USA',
    'ARMSTRONG',
    'LIFES',
    'JARANTEE',
    'TOOLS',
    'MADE',
    'SA',
    'ARMSTRONG',
  

# Workflow with dummy data
This section builds a dummy dictionary with the real thumbnail paths but placeholder values for `labels`, `objects`, and `text`, so the rest of the workflow can be exercised without calling the Vision API.

In [None]:
VALID_EXTS = (".jpg", ".jpeg", ".png") # Add more as needed

def parse_image_path(name, pattern=None):
  """Extract the item number and image index from a blob name.

  Args:
    name: Blob name, e.g. "<auction>/<item>_<idx>.<ext>".
    pattern: Optional compiled regex with named groups `item` and `idx`.
      When omitted, the module-level `regex_capture` (compiled in the cell
      that lists the bucket) is used, which keeps existing one-argument
      calls working.

  Returns:
    tuple[int, int] | None: `(item, idx)` when the name has an extension in
    VALID_EXTS (case-insensitive) and matches the pattern, otherwise None.
  """
  if not name.lower().endswith(VALID_EXTS):
    return None

  if pattern is None:
    # Resolved at call time, so the global only needs to exist before the first call
    pattern = regex_capture

  path = pattern.match(name)
  if not path:
    return None
  return int(path.group('item')), int(path.group('idx'))

In [None]:
# Same bucket / prefix setup as the real pipeline; no image bytes are downloaded here
bucket_name = 'auction-images-bucket'
auction = 'aypt2025-13'
prefix = auction + '/'
bucket = storage_client.bucket(bucket_name)

# Matches "<auction>/<item>_<idx>.<ext>" and captures the two numbers
regex_capture = re.compile(
    rf"^{re.escape(prefix)}(?P<item>\d+)_(?P<idx>\d+)\.[^/\.]+$"
)

items = {} # item number -> collected (placeholder) data
for blob in bucket.list_blobs(prefix=prefix):
  parsed = parse_image_path(blob.name)
  if parsed:
    item, idx = parsed
  else:
    # Report and skip anything that is not a recognised image path
    print(f"Didn't parse {blob.name}")
    continue

  # Create the per-item dict and its 'text' list on first sight of the item
  item_data = items.setdefault(item, {})
  text_data = item_data.setdefault('text', [])

  if idx == 1:
    # Placeholder values stand in for the label/object API calls
    labels = ['sample', 'labels']
    objects = ['sample', 'objects']
    item_data.update(labels=labels, objects=objects, thumbnail_path=blob.name)

  # Placeholder value stands in for the text API call; one entry per image
  text = [f'sample_text_{idx}']
  text_data.extend(text)

aypt2025_13_data = {auction: items}

Didn't parse aypt2025-13/
Didn't parse aypt2025-13/.DS_Store


In [None]:
aypt2025_13_data

{'aypt2025-13': {10: {'text': ['sample_text_1',
    'sample_text_2',
    'sample_text_3',
    'sample_text_4',
    'sample_text_5'],
   'labels': ['sample', 'labels'],
   'objects': ['sample', 'objects'],
   'thumbnail_path': 'aypt2025-13/10_1.jpg'},
  11: {'text': ['sample_text_1',
    'sample_text_2',
    'sample_text_3',
    'sample_text_4'],
   'labels': ['sample', 'labels'],
   'objects': ['sample', 'objects'],
   'thumbnail_path': 'aypt2025-13/11_1.jpg'},
  12: {'text': ['sample_text_1',
    'sample_text_2',
    'sample_text_3',
    'sample_text_4'],
   'labels': ['sample', 'labels'],
   'objects': ['sample', 'objects'],
   'thumbnail_path': 'aypt2025-13/12_1.jpg'},
  13: {'text': ['sample_text_1',
    'sample_text_2',
    'sample_text_3',
    'sample_text_4',
    'sample_text_5',
    'sample_text_6'],
   'labels': ['sample', 'labels'],
   'objects': ['sample', 'objects'],
   'thumbnail_path': 'aypt2025-13/13_1.jpg'},
  14: {'text': ['sample_text_1',
    'sample_text_2',
    'sam

In [None]:
# Scratch cell: print the names of blobs under the auction prefix that start with "31_1"
bucket_name = "auction-images-bucket"

# Get bucket reference
bucket = storage_client.bucket(bucket_name)
# NOTE: here `auction` includes the trailing slash, unlike the earlier cells where
# `auction` is the bare name and `prefix` carries the slash
auction = 'aypt2025-13/' # used as prefix

# Iterate over the blobs (files) under the prefix and print only the matching ones
item_list = []  # not used anywhere in the visible cells
for blob in bucket.list_blobs(prefix=auction):
  if blob.name.startswith(auction + '31_1'):
    print(blob.name)

    # Earlier experiment, left disabled:
    # if blob.name.endswith('.jpg'):
    #   # if blob.name.startswith('aypt2025-31_1'):
    #     print(f"Processing: {blob.name}")
    #   # byte_data = blob.download_as_bytes()

aypt2025-13/31_1.jpg


# End Test Workflow