In [None]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Environment setup

Make sure to edit all the cells marked as `TODO`.

In [None]:
!pip install --upgrade google-cloud-bigquery google-generativeai

In [None]:
import sys

if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()



In [None]:
# TODO(developer): Set project_id to your project's ID.
PROJECT_ID = "" # @param {type:"string"}

# TODO(developer): Set dataset_id to the ID of the dataset that the table belongs to.
DATASET_ID = "" # @param {type:"string"}

table_tracks = f"`{PROJECT_ID}.{DATASET_ID}.tracks_view`"  # Fully qualified table name
table_observations = f"`{PROJECT_ID}.{DATASET_ID}.observations_view`"
table_urls = f"`{PROJECT_ID}.{DATASET_ID}.urls_view`"

In [None]:
print(table_tracks)
print(table_observations)
print(table_urls)

`imagery-insights-sandbox.road_signs_dcr.tracks_view`
`imagery-insights-sandbox.road_signs_dcr.observations_view`
`imagery-insights-sandbox.road_signs_dcr.urls_view`


In [None]:
MODEL_ID = "gemini-2.0-flash"  # @param {type: "string"}
LOCATION = "us-central1"  # @param {type: "string"}

In [None]:
import os

if not PROJECT_ID or PROJECT_ID == "your-project-id":
  PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

if not LOCATION:
  LOCATION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")

In [None]:
from google.cloud import bigquery
from google import genai

vertex_ai_client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)
bq_client = bigquery.Client()

# Let's fetch the data from BigQuery

This code snippet constructs a series of temporary SQL tables to join track data, observation details, and associated URLs, ultimately producing a final table containing combined information ordered by track ID.

It prepares and organizes data for analysis by linking related track, observation, and URL information.

In [None]:
# Create a temporary table with a sample of 5 tracks.
sql_create_temp_tracks = f"""
CREATE OR REPLACE TEMP TABLE TempTracks AS
SELECT
  *
FROM
  {table_tracks};
"""

# Create a temporary table with (trackId, observationId) pairs and Google Maps URLs.
sql_create_track_observation_pairs = """
CREATE OR REPLACE TEMP TABLE TempTrackObservationPairs AS
SELECT
  t.trackId,
  observationId,
  ST_GeogPoint(ST_X(geometry), ST_Y(geometry)) AS geographic_point,
  FORMAT('https://maps.google.com/?q=%f,%f', ST_Y(t.geometry), ST_X(t.geometry)) AS google_maps_url
FROM
  TempTracks AS t
CROSS JOIN
  UNNEST(t.observationIds) AS observationId;
"""

# Create a temporary table with observation metadata (timestamp, bbox).
sql_create_observation_metadata = f"""
CREATE OR REPLACE TEMP TABLE TempObservationMetadata AS
SELECT
  observationId,
  captureTimestamp,
  bbox
FROM
  {table_observations};
  -- WHERE observationId IN (SELECT observationId FROM TempTrackObservationPairs) -- Optional
"""

# Create a temporary table joining track/observation pairs, URLs, and metadata.
sql_create_joined_data = f"""
CREATE OR REPLACE TEMP TABLE TempJoinedData AS
SELECT
  top.trackId,
  top.observationId,
  top.geographic_point,
  top.google_maps_url,
  u.signedUrl,
  om.captureTimestamp,
  om.bbox
FROM
  TempTrackObservationPairs AS top
INNER JOIN
  {table_urls} AS u
  ON top.observationId = u.observationId
INNER JOIN
  TempObservationMetadata AS om
  ON top.observationId = om.observationId;
"""

# Select all columns from the joined data, ordered by trackId.
sql_final_select = """
SELECT * FROM TempJoinedData ORDER BY trackId;
"""

# --- Optional Cleanup (Uncomment if needed) ---
# sql_cleanup = """
# DROP TABLE TempTracks;
# DROP TABLE TempTrackObservationPairs;
# DROP TABLE TempObservationMetadata;
# DROP TABLE TempJoinedData;
# """

In [None]:
# Execute each query part
query_jobs = []

merged_queries = f"""
{sql_create_temp_tracks}
{sql_create_track_observation_pairs}
{sql_create_observation_metadata}
{sql_create_joined_data}
{sql_final_select}
"""
# add sql_cleanup  # Uncomment if you want to explicitly drop the tables

query_job = bq_client.query(merged_queries)
query_jobs.append(query_job)  # Store the job object
query_job.result()  # Wait for the query to complete
print(f"Query executed successfully!")

# Get results from the final query (SELECT * FROM TempJoinedData)
results = query_jobs[-1].result()  # Get results from last query job

Query executed successfully!


# Data and image classification
Let's process the results and anlyze the road sign images using Gemini (or any other model).

In [None]:
# Import necessary libraries
import typing # Standard library
import os    # Standard library
import json  # For parsing JSON responses
import io    # For handling byte streams (e.g., for image data)

# Define placeholder classes in the global scope
class GenaiPlaceholder:
    """Placeholder for the 'genai' module if it's not found or is malformed."""
    Client = type('Client', (object,), {})
    # The 'models' attribute on a client instance would be an object,
    # so we don't need a GenerativeModel class placeholder at this level
    # if we are using client.models.generate_content.
    # However, 'GenerativeModel' might be a type hint or used elsewhere, so keep a basic one.
    GenerativeModel = type('GenerativeModel', (object,), {})
    models = type('ModelsServicePlaceholder', (object,), {'generate_content': lambda **kwargs: None})() # Placeholder for client.models

    @staticmethod
    def _Part_from_text_static(text): return None
    @staticmethod
    def _Part_from_uri_static(uri, mime_type): return None
    @staticmethod
    def _Part_from_bytes_static(data, mime_type): return None
    Part = type('Part', (object,), {'from_text': _Part_from_text_static, 'from_uri': _Part_from_uri_static, 'from_bytes': _Part_from_bytes_static})
    Content = type('Content', (object,), {})
    Tool = type('Tool', (object,), {})
    Retrieval = type('Retrieval', (object,), {})
    VertexRagStore = type('VertexRagStore', (object,), {})
    GenerationConfig = type('GenerationConfig', (object,), {})
    GenerateContentResponse = type('GenerateContentResponse', (object,), {})
    types = type('TypesPlaceholderSubmodule', (object,), {
        'Part': Part, 'Content': Content, 'Tool': Tool, 'Retrieval': Retrieval,
        'VertexRagStore': VertexRagStore, 'GenerationConfig': GenerationConfig,
        'GenerateContentResponse': GenerateContentResponse
    })

class TypesPlaceholder:
    """Placeholder for a separate 'types' module if needed."""
    Part = GenaiPlaceholder.Part
    Content = GenaiPlaceholder.Content
    Tool = GenaiPlaceholder.Tool
    Retrieval = GenaiPlaceholder.Retrieval
    VertexRagStore = GenaiPlaceholder.VertexRagStore
    GenerationConfig = GenaiPlaceholder.GenerationConfig
    GenerateContentResponse = GenaiPlaceholder.GenerateContentResponse

# Initialize module variables
genai_module = GenaiPlaceholder()
gg_types = TypesPlaceholder()
_libraries_loaded_successfully = False

# --- Diagnostic Step 1: Check 'google.cloud.aiplatform' import ---
_google_cloud_aiplatform_imported_successfully = False
print("--- Diagnostic: Checking 'google.cloud.aiplatform' import ---")
try:
    import google.cloud.aiplatform
    _google_cloud_aiplatform_imported_successfully = True
    print(f"Successfully imported 'google.cloud.aiplatform'. Path: {getattr(google.cloud.aiplatform, '__file__', 'N/A')}")
except Exception as e_aip:
    print(f"WARNING: Could not import 'google.cloud.aiplatform'. This is needed for VertexAI client. Error: {e_aip}")
print("--------------------------------------------------------------")

# --- Attempt to import and configure 'genai' and its 'types' ---
print("\n--- Attempting to configure 'genai' (for Client & client.models) and 'types' (for Part) ---")

# Attempt 1: User's preferred pattern 'from google import genai' and 'from google.genai import types'
try:
    from google import genai as attempt1_genai
    print(f"Attempt 1: 'from google import genai' successful. Path: {getattr(attempt1_genai, '__file__', 'N/A')}")

    from google.genai import types as attempt1_types
    print(f"Attempt 1: 'from google.genai import types' successful. Path: {getattr(attempt1_types, '__file__', 'N/A')}")

    has_client = hasattr(attempt1_genai, 'Client')
    # Check if the client instance would have a 'models' attribute with 'generate_content'
    # This is a bit indirect; we assume if Client exists, its instance will have .models.generate_content
    has_models_with_generate_content = hasattr(attempt1_genai, 'Client') # Simplified check, real check on client instance later
    has_part_in_types = hasattr(attempt1_types, 'Part')
    has_generationconfig_in_types = hasattr(attempt1_types, 'GenerationConfig') # Check another key type

    if has_client and has_models_with_generate_content and has_part_in_types and has_generationconfig_in_types:
        print("INFO: Attempt 1 ('from google import genai' and 'from google.genai import types') provides necessary attributes ('Client', 'models' structure anticipated, 'types.Part', 'types.GenerationConfig'). Using this setup.")
        genai_module = attempt1_genai
        gg_types = attempt1_types
        _libraries_loaded_successfully = True
    else:
        print("WARNING: Attempt 1 ('from google import genai' and 'from google.genai import types') did not provide a complete setup:")
        if not has_client: print("  - Missing 'genai.Client'")
        if not has_models_with_generate_content: print("  - 'genai.Client' might not yield '.models.generate_content' (anticipated)")
        if not has_part_in_types: print("  - Missing 'types.Part'")
        if not has_generationconfig_in_types: print("  - Missing 'types.GenerationConfig'")
        if not _libraries_loaded_successfully: print("  Will try fallback 'import google.generativeai'.")

except (ImportError, Exception) as e_attempt1:
    print(f"Attempt 1: Failed or error during 'from google import genai' or 'from google.genai import types'. Error: {e_attempt1}")
    print("  Will try fallback 'import google.generativeai'.")

# Attempt 2: Try 'import google.generativeai' if Attempt 1 failed
if not _libraries_loaded_successfully:
    try:
        import google.generativeai as attempt2_genai
        print(f"Attempt 2: 'import google.generativeai' successful. Path: {getattr(attempt2_genai, '__file__', 'N/A')}")

        # For this import, types are usually directly under google.generativeai.types
        attempt2_types_candidate = None
        if hasattr(attempt2_genai, 'types') and hasattr(attempt2_genai.types, 'Part'):
            attempt2_types_candidate = attempt2_genai.types
            print("  INFO: Found 'types.Part' via 'google.generativeai.types'.")
        else: # Try separate import if not on main module or Part is missing
            try:
                from google.generativeai import types as attempt2_separate_types
                print(f"  Attempted separate import of 'google.generativeai.types'. Path: {getattr(attempt2_separate_types, '__file__', 'N/A')}")
                if hasattr(attempt2_separate_types, 'Part'):
                    attempt2_types_candidate = attempt2_separate_types
                    print("  INFO: Found 'Part' in separately imported 'google.generativeai.types'.")
                else: print("  WARNING: Separately imported 'google.generativeai.types' is MISSING 'Part'.")
            except (ImportError, Exception) as e_sep_types:
                print(f"  WARNING: Failed to import 'google.generativeai.types' separately. Error: {e_sep_types}")

        has_client = hasattr(attempt2_genai, 'Client')
        has_models_with_generate_content = hasattr(attempt2_genai, 'Client') # Simplified check
        has_part_in_types = attempt2_types_candidate and hasattr(attempt2_types_candidate, 'Part')
        has_generationconfig_in_types = attempt2_types_candidate and hasattr(attempt2_types_candidate, 'GenerationConfig')

        if has_client and has_models_with_generate_content and has_part_in_types and has_generationconfig_in_types:
            print("INFO: Attempt 2 ('import google.generativeai') provides 'Client', 'models' structure anticipated, and 'types' with 'Part'/'GenerationConfig'. Using this setup.")
            genai_module = attempt2_genai
            gg_types = attempt2_types_candidate
            _libraries_loaded_successfully = True
        else:
            print("ERROR: Attempt 2 ('import google.generativeai') also did not provide a complete setup:")
            if not has_client: print("  - Missing 'Client'")
            if not has_models_with_generate_content: print("  - 'Client' might not yield '.models.generate_content' (anticipated)")
            if not has_part_in_types: print("  - Could not find 'types.Part'")
            if not has_generationconfig_in_types: print("  - Could not find 'types.GenerationConfig'")

    except (ImportError, Exception) as e_attempt2:
        print(f"Attempt 2: Failed or error during 'import google.generativeai'. Error: {e_attempt2}")

if not _libraries_loaded_successfully:
    print("\n######################################################################################")
    print("FINAL ERROR: Could not establish a working setup for 'genai.Client' (with '.models' attribute) and 'types.Part'.")
    if not _google_cloud_aiplatform_imported_successfully:
        print("      This might be related to 'google.cloud.aiplatform' not being available for the genai library.")
    print("      RECOMMENDATION: Ensure 'google-generativeai' (>=0.5.0 for Vertex) and 'google-cloud-aiplatform' are correctly installed and compatible.")
    print("      Run 'pip install --upgrade google-generativeai google-cloud-aiplatform' in your Colab environment and RESTART THE RUNTIME.")
    print("      The script will use placeholder objects, and API calls will not function correctly.")
    print("######################################################################################")
    genai_module = GenaiPlaceholder()
    gg_types = TypesPlaceholder()

print("--------------------------------------------------------------")

# --- Configuration ---
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID", "imagery-insights-sandbox")
GCP_LOCATION = os.getenv("GCP_LOCATION", "us-central1")

VISION_MODEL_NAME_LAMP = "gemini-pro-vision"
RAG_MODEL_NAME_LAMP = "gemini-1.5-pro-preview-0409"
RAG_CORPUS_PATH = f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/ragCorpora/6917529027641081856"
DEFAULT_ROAD_SIGN_MODEL = "gemini-1.5-flash-latest"

# --- Helper Function to Initialize Client ---
def initialize_genai_client():
    if not _libraries_loaded_successfully:
        print("GenAI library not properly loaded. Cannot initialize client.")
        return None
    try:
        client = genai_module.Client(
            vertexai=True,
            project=GCP_PROJECT_ID,
            location=GCP_LOCATION,
        )
        print(f"DEBUG: Type of initialized client object: {type(client)}")
        print(f"DEBUG: Attributes of initialized client object: {dir(client)}")
        if not hasattr(client, 'models') or not hasattr(client.models, 'generate_content'):
            print("ERROR: Initialized client does NOT have '.models.generate_content()' method as expected!")
            return None # Critical failure if this specific invocation pattern is targeted
        print(f"GenAI Client Initialized and Configured for Vertex AI (project '{GCP_PROJECT_ID}', location '{GCP_LOCATION}').")
        return client
    except Exception as e:
        print(f"Error initializing GenAI client: {e}")
        return None

# --- Function for Step 1: Lamp Image Analysis ---
def get_lamp_description_from_image(client: typing.Optional['genai_module.Client'], gcs_url: str) -> typing.Optional[str]:
    print(f"--- Step 1 (Lamp): Analyzing image {gcs_url} ---")
    if not _libraries_loaded_successfully or not client:
        print("Error: GenAI client/types not properly loaded for lamp image analysis.")
        return None
    try:
        prompt_text_part = gg_types.Part.from_text(text="""Analyze the provided image.
Instructions:
1. Determine if the image clearly shows a utility pole or a street lamp.
2. If it shows a street lamp, provide a detailed description focusing on:
   - Overall shape and structure, Lamp head type and shape, Arm type, Pole material and color, Any other key identifying features.
3. If it is clearly a utility pole (carrying wires, transformers, etc.) and not a street lamp, respond ONLY with: "UTILITY_POLE".
4. If the image does not clearly show either, or is ambiguous, respond ONLY with: "UNCLEAR_IMAGE".
""")
        image_part = gg_types.Part.from_uri(uri=gcs_url, mime_type="image/jpeg")

        current_generation_config = gg_types.GenerationConfig(temperature=0.2)
        print(f"Attempting to generate content for lamp vision model: {VISION_MODEL_NAME_LAMP}")
        response = client.models.generate_content(
            model=f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/publishers/google/models/{VISION_MODEL_NAME_LAMP}", # Vertex model path
            contents=[prompt_text_part, image_part],
            generation_config=current_generation_config
        )
        print("Lamp vision content generation complete.")

        if not hasattr(response, 'candidates') or not response.candidates or \
           not hasattr(response.candidates[0], 'content') or not response.candidates[0].content or \
           not hasattr(response.candidates[0].content, 'parts') or not response.candidates[0].content.parts:
            print("Error: Lamp vision model response is empty or malformed.")
            return None

        # Assuming response.text is available for this invocation pattern
        description = ""
        if response.candidates[0].content.parts:
            for part in response.candidates[0].content.parts:
                if hasattr(part, 'text'):
                    description += part.text
        description = description.strip()

        if not description and hasattr(response, 'text'): # Fallback if parts don't have text but response object does
            description = response.text.strip()

        print(f"Lamp image analysis result: {description}")
        if description in ["UTILITY_POLE", "UNCLEAR_IMAGE"]:
            print("Image identified as utility pole or unclear. Skipping lamp classification.")
            return None
        elif "error" in description.lower():
            print(f"Lamp vision model returned an error message: {description}")
            return None
        return description
    except AttributeError as ae:
        error_detail = str(ae)
        print(f"AttributeError during lamp image analysis (Step 1): {error_detail}.")
        return None
    except Exception as e:
        print(f"An unexpected error occurred during lamp image analysis (Step 1): {e}")
        return None

# --- Function for Step 2: Lamp RAG Classification ---
def classify_lamp_with_rag(client: typing.Optional['genai_module.Client'], lamp_description: str) -> typing.Optional[str]:
    print(f"\n--- Step 2 (Lamp): Classifying lamp using RAG ---")
    if not _libraries_loaded_successfully or not client:
        print("Error: GenAI client/types not properly loaded for lamp RAG classification.")
        return None
    try:
        rag_prompt_text_part = gg_types.Part.from_text(text=f"Lamp Description: {lamp_description}\n...")
        si_text1 = "You are a lighting expert..."
        system_instruction_part = gg_types.Part.from_text(text=si_text1) # This becomes part of contents for client.models.generate_content

        # For client.models.generate_content, system_instruction is a top-level param
        # Contents should be a list of Content objects or dicts
        contents_for_rag = [
            # gg_types.Content(parts=[system_instruction_part]), # System instruction can be a top-level param
            gg_types.Content(role="user", parts=[rag_prompt_text_part])
        ]

        tools_for_rag = [gg_types.Tool(retrieval=gg_types.Retrieval(source=gg_types.VertexRagStore(rag_corpora=[RAG_CORPUS_PATH], similarity_top_k=10)))]
        rag_generation_config = gg_types.GenerationConfig(temperature=0.3, top_p=0.95, max_output_tokens=512)
        safety_settings_rag = [{"category": c, "threshold": "BLOCK_MEDIUM_AND_ABOVE"} for c in ["HARM_CATEGORY_HARASSMENT", "HARM_CATEGORY_HATE_SPEECH", "HARM_CATEGORY_SEXUALLY_EXPLICIT", "HARM_CATEGORY_DANGEROUS_CONTENT"]]

        print(f"Attempting to generate content for lamp RAG model: {RAG_MODEL_NAME_LAMP}")
        response = client.models.generate_content(
            model=f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/publishers/google/models/{RAG_MODEL_NAME_LAMP}", # Vertex model path
            contents=contents_for_rag,
            system_instruction=system_instruction_part, # Pass system instruction here
            tools=tools_for_rag,
            safety_settings=safety_settings_rag,
            generation_config=rag_generation_config
        )
        print("Lamp RAG content generation complete.")

        if not hasattr(response, 'candidates') or not response.candidates or \
           not hasattr(response.candidates[0], 'content') or not response.candidates[0].content or \
           not hasattr(response.candidates[0].content, 'parts') or not response.candidates[0].content.parts:
            print("Error: Lamp RAG model response is empty or malformed.")
            if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason:
                print(f"Prompt blocked due to: {response.prompt_feedback.block_reason_message}")
            return None

        final_result = ""
        if response.candidates[0].content.parts:
            for part in response.candidates[0].content.parts:
                if hasattr(part, 'text'):
                    final_result += part.text
        final_result = final_result.strip()

        if not final_result and hasattr(response, 'text'): # Fallback
            final_result = response.text.strip()

        print(f"Lamp RAG Classification result: {final_result}")
        return final_result
    except AttributeError as ae:
        error_detail = str(ae)
        print(f"AttributeError during lamp RAG classification (Step 2): {error_detail}.")
        return None
    except Exception as e:
        print(f"An unexpected error occurred during lamp RAG classification (Step 2): {e}")
        return None

# --- Main Lamp Orchestration Function ---
def analyze_and_classify_lamp(client: typing.Optional['genai_module.Client'], gcs_url: str):
    print(f"\nStarting lamp analysis for image: {gcs_url}")
    if not client:
        print("Lamp analysis: GenAI client is not provided or not initialized. Exiting lamp analysis.")
        return
    lamp_description = get_lamp_description_from_image(client, gcs_url)
    if lamp_description:
        classification_result = classify_lamp_with_rag(client, lamp_description)
        if classification_result: print(f"\n=== Final Lamp Classification for {gcs_url} ===\n{classification_result}\n===============================================")
        else: print(f"\nFailed to get lamp classification for {gcs_url} after obtaining description.")
    else: print(f"\nNo valid lamp description obtained for {gcs_url}. Cannot proceed to lamp classification.")

# --- Function for Road Sign Classification ---
def classify_road_sign(client: typing.Optional['genai_module.Client'], image_data: bytes, model_name: str = DEFAULT_ROAD_SIGN_MODEL) -> dict:
    print(f"\n--- Classifying road sign using model: {model_name} ---")
    if not _libraries_loaded_successfully or not client:
        return {"error": "Client/Types not loaded", "details": "GenAI library (Client, client.models or types.Part) not available."}
    if not isinstance(image_data, bytes): return {"error": "Invalid input type", "details": "image_data must be bytes."}
    try:
        prompt_text = """Classify the road sign in this image into one of the following categories:
                      * Stop, * Yield, * Speed Limit, * Pedestrian Crossing, * No Parking, * Turn, * Do not enter, * Street name, * Other
                    Provide the category and a description in JSON format.
                    Provide a field called "sign_quality" (Good, Fair, Poor, Critical, Other).
                    If image quality is poor, include notes in "image_quality_notes".
                    Return as a json object. Example:
                    ```json
                    {
                        "category": "Stop", "sign_quality": "Good",
                        "description": "A red octagonal stop sign is clearly visible.",
                        "image_quality_notes": "image is clear"
                    }
                    ```"""
        image_part = gg_types.Part.from_bytes(data=image_data, mime_type="image/jpeg")

        # Construct contents list:
        # The prompt should be a Part if it's multi-turn or complex, or just a string.
        # For client.models.generate_content, contents is a list of Content objects or dicts.
        # Each Content object has a 'parts' list.
        contents_for_road_sign = [
            gg_types.Content(parts=[gg_types.Part.from_text(prompt_text), image_part])
        ]

        print(f"Attempting to generate content for road sign model: {model_name}")
        # Use client.models.generate_content
        response: 'gg_types.GenerateContentResponse' = client.models.generate_content(
            model=f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/publishers/google/models/{model_name}", # Vertex model path
            contents=contents_for_road_sign
        )
        print("Road sign classification content generation complete.")

        if not hasattr(response, 'candidates') or not response.candidates or \
           not hasattr(response.candidates[0], 'content') or not response.candidates[0].content or \
           not hasattr(response.candidates[0].content, 'parts') or not response.candidates[0].content.parts:
            return {"error": "Model response empty", "details": "No content parts in response."}

        json_text = ""
        if response.candidates[0].content.parts:
            for part in response.candidates[0].content.parts:
                if hasattr(part, 'text'):
                    json_text += part.text
        json_text = json_text.strip()

        if not json_text and hasattr(response, 'text'): # Fallback
             json_text = response.text.strip()

        print(f"Raw response text from road sign model: {json_text[:200]}...")

        if json_text.startswith("```json"): json_text = json_text[7:]
        if json_text.endswith("```"): json_text = json_text[:-3]

        result_dict = json.loads(json_text.strip())
        print(f"Parsed road sign classification: {result_dict}")
        return result_dict
    except AttributeError as ae:
        error_detail = str(ae)
        print(f"AttributeError during road sign classification: {error_detail}.")
        return {"error": "AttributeError", "details": error_detail}
    except json.JSONDecodeError as je:
        return {"error": "JSONDecodeError", "details": str(je), "raw_response": json_text} # Return the text we tried to parse
    except Exception as e:
        return {"error": "Unexpected error", "details": str(e)}

# --- Example Usage ---
if __name__ == "__main__":
    if not _libraries_loaded_successfully:
        print("\nCannot run examples: Critical libraries (GenAI Client with .models or Types.Part) failed to load.")
        print("RECOMMENDATION: Run 'pip install --upgrade google-generativeai google-cloud-aiplatform' in your Colab environment and RESTART THE RUNTIME.")
    else:
        print("\nLibraries loaded. Initializing client for examples...")
        master_client = initialize_genai_client()

        if master_client:
            print("\n--- Running Lamp Analysis Examples (Commented Out) ---")
            # analyze_and_classify_lamp(master_client, "gs://utility-poles/o1:0010f9db7600bf7de3cd2874d8ae421c_00250082.jpg")
            print("\n\n--- Running Road Sign Classification Example ---")
            dummy_jpeg_bytes = (
                b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x08\x06\x06\x07\x06\x05\x08\x07\x07\x07\t\t\x08\n\x0c\x14\r\x0c\x0b\x0b\x0c\x19\x12\x13\x0f\x14\x1d\x1a\x11\x11\x18!\x1e\x18\x1a\x1d(%\x1e\x1f#\x1c\x1c #%)\x17\x1f\x1f+('
                b'\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\x1f\xff\xc9\x00\x0b\x08\x00\x01\x00\x01\x01\x01\x11\x00\xff\xda\x00\x08\x01\x01\x00\x00?\x00\xd2\xcf \xff\xd9'
            )
            print("Classifying dummy road sign image...")
            # Note: The 'client' parameter to classify_road_sign is the initialized vertex_ai_client
            classification_result = classify_road_sign(master_client, dummy_jpeg_bytes, model_name=DEFAULT_ROAD_SIGN_MODEL)

            print("\n=== Road Sign Classification Result ===")
            if "error" in classification_result:
                print(f"Error: {classification_result['error']}\nDetails: {classification_result.get('details', 'N/A')}")
                if "raw_response" in classification_result: print(f"Raw Model Response: {classification_result['raw_response'][:500]}...")
            else:
                print(json.dumps(classification_result, indent=4))
            print("======================================")
        else:
            print("\nGenAI client initialization failed. Cannot run examples.")
    print("\nProcessing finished.")



# Insert image data int Bigquery

We may want to store the image classification reponse in a BQ table.

In [None]:


from google.cloud import bigquery
from typing import Dict, Any

#PROJECT_ID = "imagery-insights-sandbox"  # Replace with your project ID
ANALYSIS_DATASET_ID = "imagery_analysis"  # @param {type: "string"}

def insert_image_report_to_bq(image_report: Dict[str, Any]) -> None:
    """
    Inserts a single image report dictionary into a BigQuery table.

    Args:
        image_report: A dictionary representing the image report.
    """
    ANALYSIS_RESULTS_TABLE = "road_sign_data"  # @param {type: "string"}

    client = bigquery.Client(project=PROJECT_ID)
    table_ref = client.dataset(ANALYSIS_DATASET_ID).table(ANALYSIS_RESULTS_TABLE)

    try:
        errors = client.insert_rows_json(table_ref, [image_report])  # Corrected line
        if errors:
            print(f"Error inserting image report: {errors}")
        else:
            print("Image report inserted successfully.")
    except Exception as e:
        print(f"An error occurred during BigQuery insertion: {e}")


# Create the data element

We want to combine results from the original query along with image analysis results in a single dict object.

With this object we can either

1.   Insert in BQ
2.   Just visualize the data as pandas dataframe
3.   export as json/csv



In [None]:
import requests
import PIL.Image
import importlib
import io
from typing import List, Dict, Any
from decimal import Decimal
import datetime
importlib.reload(datetime)

def process_image_reports(results: List[Any]) -> List[Dict[str, Any]]:
    """
    Processes a list of results, retrieves images from signed URLs, classifies them, and inserts into BigQuery.
    """
    all_reports: List[Dict[str, Any]] = []
    for image_counter, row in enumerate(results, 0):
        track_id: Any = getattr(row, "trackId", None)
        observation_id: Any = getattr(row, "observationId", None)
        geographic_point: Any = getattr(row, "geographic_point", None)
        maps_url: Any = getattr(row, "google_maps_url", None)
        capture_timestamp: Any = getattr(row, "captureTimestamp", None)
        bbox: Any = getattr(row, "bbox", None)
        signed_url: str = getattr(row, "signedUrl", "")

        print(f"Processing image {image_counter}: trackId={track_id}, observationId={observation_id}")
        print(f"Processing image {image_counter}: timestamp={capture_timestamp}, observationId={observation_id}")

        try:
            response = requests.get(signed_url, stream=True, timeout=10)
            response.raise_for_status()
            image_data: bytes = response.content

            gemini_response: dict = classify_road_sign(image_data)

            print(f"The classification type is ##### {gemini_response}")

            if "error" in gemini_response:
                print(f"  Error classifying image {image_counter}: {gemini_response['error']}")
                category = None
                description = None
                image_quality_notes = None
                sign_quality = None
            else:
                category = gemini_response.get("category")
                description = gemini_response.get("description")
                image_quality_notes = gemini_response.get("image_quality_notes")
                sign_quality = gemini_response.get("sign_quality")

            # Convert captureTimestamp to ISO 8601 string
            capture_datetime_str = None
            if capture_timestamp:
                if isinstance(capture_timestamp, datetime.datetime):
                    capture_datetime_str = capture_timestamp.isoformat()
                else:
                    print(f"Warning: Unexpected captureTimestamp type: {type(capture_timestamp)}")
                    capture_datetime_str = None

            image_report: Dict[str, Any] = {
                "trackId": str(track_id),
                "observationId": str(observation_id),
                "geographic_point": str(geographic_point),
                "mapsURL": str(maps_url),
                "captureTimestamp": capture_datetime_str,  # Use ISO 8601 string
                "signedUrl": str(signed_url),
                "category": category,
                "description": description,
                "image_quality_notes": image_quality_notes,
                "sign_quality": sign_quality,
            }

            # Convert bbox to JSON string
            bbox_data = bbox
            if bbox_data:
                def convert_decimal(obj):
                    if isinstance(obj, Decimal):
                        return float(obj)
                    raise TypeError

                image_report['bbox'] = json.dumps(bbox_data, default=convert_decimal)

            all_reports.append(image_report)
            print(f"image_report to be inserted: {image_report}")
            insert_image_report_to_bq(image_report)

        except requests.exceptions.RequestException as e:
            print(f"  Error processing image {image_counter}: {e}")
        except PIL.UnidentifiedImageError as e:
            print(f"  Error processing image {image_counter}: Invalid image data - {e}")
        except Exception as e:
            print(f"  Error processing image {image_counter}: {e}")

    return all_reports


# Analyzie the images with Gemini

In [None]:
all_reports = process_image_reports(results)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  The raw response is: candidates=[Candidate(content=Content(parts=[Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=None, inline_data=None, text='```json\n{\n  "category": "No Parking",\n  "sign_quality": "Good",\n  "description": "The sign indicates a \'no parking\' zone specifically for a bus stop, with a 5 minute parking limit allowed from 8 AM to 4 PM for driver to leave vehicle.",\n  "image_quality_notes": "image is clear"\n}\n```')], role='model'), citation_metadata=None, finish_message=None, token_count=None, avg_logprobs=-0.43740690322149367, finish_reason='STOP', grounding_metadata=None, index=None, logprobs_result=None, safety_ratings=None)] model_version='gemini-2.0-flash' prompt_feedback=None usage_metadata=GenerateContentResponseUsageMetadata(cached_content_token_count=None, candidates_token_count=84, prompt_token_

# Visualize the dataset

In [None]:
import pandas as pd
df_all_reports = pd.DataFrame(all_reports)
print(f"The size of all_reports inside the pandas is: {len(all_reports)}")
#print(df_all_reports)
df_all_reports

In [None]:
# (OPTIONAL)
# You can export the results of the analysis
# df_all_reports.to_csv("road_sign_classification_results.csv", index=False)