# generate assets



```markdown
# BigQuery & Gemini: Generating and Analyzing Multimodal Data

This notebook demonstrates how to use Vertex AI's generative models to create a multimodal dataset, store it in Google Cloud Storage, and then analyze it using BigQuery and Gemini.
```


```markdown
## 1. Setup and Installation

First, let's install the necessary Python libraries for interacting with Google Cloud services.
```

In [1]:
%pip install --upgrade --user google-cloud-aiplatform google-cloud-storage google-cloud-bigquery



In [1]:
%pip install --upgrade --user google-cloud-aiplatform google-cloud-storage google-cloud-bigquery google-cloud-texttospeech



In [6]:
%pip install --upgrade google-cloud-aiplatform[generative_models]




```markdown
Next, please fill in your Google Cloud project details and other configuration values below.
```

In [3]:
# Easy test configuration - to be deleted later.
import os
import sys

# Your Google Cloud project ID
PROJECT_ID = "geminienterprise-485114"
# The region for your resources
LOCATION = "us-central1"
# Your Google Cloud Storage bucket name (no 'gs://' prefix and no surrounding
# whitespace: a stray leading space makes both the bucket name and every
# gs:// URI built from it invalid).
GCS_BUCKET = "meetupmarch"
# Your BigQuery dataset name
# NOTE(review): later "easy version" cells still reference BIGQUERY_DATASET;
# it is left commented out here as in the original - confirm whether it
# should be restored or those cells switched to DATASET_ID.
# BIGQUERY_DATASET = "your_bigquery_dataset"

# Authenticate with Google Cloud (only needed inside Colab).
# Checking sys.modules matches the authentication cell below and does not
# depend on get_ipython() being defined.
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()

# Initialize Vertex AI
import vertexai
vertexai.init(project=PROJECT_ID, location=LOCATION)

In [2]:
# --- AUTHENTICATION ---
# Inside Colab the user must sign in interactively (a pop-up asks for
# credentials and permissions). Outside Colab this cell does nothing.
import sys

_in_colab = "google.colab" in sys.modules
if _in_colab:
    from google.colab import auth
    auth.authenticate_user()

In [7]:
# --- CONFIGURATION AND IMPORTS ---
import os
import json
import vertexai

# CORRECTED IMPORTS: Import each client library on its own line.
from google.cloud import bigquery
from google.cloud import storage
from google.cloud import texttospeech
from vertexai.preview.vision_models import ImageGenerationModel
from vertexai.preview.generative_models import GenerativeModel


In [33]:
import base64

from IPython.display import Audio
import google.auth
import google.auth.transport.requests
import requests

In [52]:
# Your Google Cloud project ID
PROJECT_ID = "geminienterprise-485114"

# The region for your resources
LOCATION = "us-central1"

# Your Google Cloud Storage bucket name (no 'gs://' prefix)
GCS_BUCKET_NAME = "meetupmarch"

# Derived names for our BigQuery resources
DATASET_ID = "generative_assets_dataset"
TABLE_ID = "assets_metadata"

# --- INITIALIZE CLIENTS ---
# Initialize Vertex AI SDK and other clients with your project details.
# After authentication, these clients will have the necessary permissions.
vertexai.init(project=PROJECT_ID, location=LOCATION)

storage_client = storage.Client(project=PROJECT_ID)
bq_client = bigquery.Client(project=PROJECT_ID)
# TextToSpeechClient has no `project` argument, so the project that API usage
# is attributed to is set through the quota project client option.
# NOTE(review): the speech-generation output recorded in this notebook shows a
# 403 SERVICE_DISABLED for a numeric consumer project; presumably that is the
# default quota project of the notebook credentials - confirm. The
# Text-to-Speech API must also be enabled in PROJECT_ID.
tts_client = texttospeech.TextToSpeechClient(
    client_options={"quota_project_id": PROJECT_ID}
)

print(f"Project: {PROJECT_ID}, Location: {LOCATION}")
print("Vertex AI and other Google Cloud clients initialized successfully.")



Project: geminienterprise-485114, Location: us-central1
Vertex AI and other Google Cloud clients initialized successfully.


In [9]:
# --- PREPARE GCS BUCKET ---
# Reuse the bucket when it is already there; otherwise create it in LOCATION.
bucket = storage_client.bucket(GCS_BUCKET_NAME)
if bucket.exists():
    print(f"Bucket '{GCS_BUCKET_NAME}' already exists.")
else:
    bucket.create(location=LOCATION)
    print(f"Bucket '{GCS_BUCKET_NAME}' created.")

Bucket 'meetupmarch' already exists.


In [10]:
# BigQuery Dataset
from google.cloud.exceptions import NotFound

# Build the reference directly; Client.dataset() is deprecated.
dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID)
try:
    bq_client.get_dataset(dataset_ref)
    print(f"Dataset '{DATASET_ID}' already exists.")
except NotFound:
    # Only a missing dataset triggers creation; permission or network errors
    # now surface instead of being mistaken for "not found".
    dataset = bigquery.Dataset(dataset_ref)
    # Create the dataset in LOCATION so it lives in the same region as the
    # BigQuery connection used for the remote model later in the notebook.
    # NOTE(review): a dataset created by an earlier run keeps its original
    # location - confirm it matches LOCATION or recreate it.
    dataset.location = LOCATION
    bq_client.create_dataset(dataset)
    print(f"Dataset '{DATASET_ID}' created.")


Dataset 'generative_assets_dataset' created.


In [11]:
# Accumulator for one metadata record (a dict) per generated asset.
# Re-running this cell empties it.
all_metadata = list()

```markdown
## 2. Data Generation with Vertex AI

Now, let's generate some multimodal data using different Vertex AI models.
```

```markdown
### 2.1 Generate an Image with Imagen - 1 image
```

In [7]:
import vertexai
from vertexai.vision_models import ImageGenerationModel

# TODO: Specify your project ID and location
# vertexai.init(project="your-project-id", location="your-location")

# Prompt and destination for the single test image.
image_prompt = "a futuristic banana-shaped spaceship flying through a nebula"
generated_image_path = "generated_image.png"

# Load Imagen and request an image for the prompt.
image_model = ImageGenerationModel.from_pretrained("imagen-3.0-generate-002")
response = image_model.generate_images(prompt=image_prompt)

# The response behaves like a list of images; keep the first one.
response[0].save(generated_image_path)

print(f"Image generated and saved as {generated_image_path}")

Image generated and saved as generated_image.png


```markdown
### 2.1 Generate Images with Imagen - 10 images
```

In [12]:
print("--- Starting Image Generation (Imagen 3) ---")
# Single source of truth for the model name (used to load the model and in metadata).
IMAGEN_MODEL_NAME = "imagen-3.0-generate-002"
image_model = ImageGenerationModel.from_pretrained(IMAGEN_MODEL_NAME)
local_image_dir = "generated_images"
os.makedirs(local_image_dir, exist_ok=True)

image_prompts = [
    "A state-of-the-art chemical manufacturing plant at sunset, with clean energy sources visible.",
    "Macro shot of a new, sustainable consumer goods product made from plant-based materials.",
    "A team of engineers in a modern factory reviewing data on a holographic display.",
    "Futuristic robotic arms assembling a complex piece of machinery with precision.",
    "A digital twin of a manufacturing facility, showing real-time operational data streams.",
    "An aerial view of a smart warehouse with autonomous forklifts and delivery drones.",
    "A scientist in a lab coat examining a beaker with a glowing liquid.",
    "High-end cosmetic products arranged in a minimalist, elegant composition.",
    "A cross-section of an advanced engine, showing intricate inner workings.",
    "A beautiful landscape shot of a factory that blends seamlessly with nature.",
]

# Progress messages follow the real list length instead of a hard-coded 10.
total_images = len(image_prompts)

for i, prompt in enumerate(image_prompts):
    asset_id = f"image_{i}"
    local_filename = f"{local_image_dir}/{asset_id}.png"
    gcs_blob_name = f"images/{asset_id}.png"

    print(f"Generating image {i+1}/{total_images} with prompt: '{prompt[:50]}...'")
    response = image_model.generate_images(prompt=prompt)
    # The response can come back without any image; indexing [0] would then
    # raise IndexError and abort the whole batch.
    if not response.images:
        print(f"WARNING: No image returned for prompt {i+1}; skipping.")
        continue
    response[0].save(local_filename)

    # Upload to GCS
    blob = bucket.blob(gcs_blob_name)
    blob.upload_from_filename(local_filename)
    gcs_uri = f"gs://{GCS_BUCKET_NAME}/{gcs_blob_name}"

    # Store metadata. Drop any record left by a previous run of this cell so
    # re-running does not produce duplicate rows for the same asset.
    all_metadata[:] = [m for m in all_metadata if m.get("asset_id") != asset_id]
    all_metadata.append({
        "asset_id": asset_id,
        "asset_type": "image",
        "prompt": prompt,
        "gcs_uri": gcs_uri,
        "model_used": IMAGEN_MODEL_NAME
    })
    print(f"Image {i+1} saved and uploaded to {gcs_uri}")

print("--- Image Generation Complete ---")


--- Starting Image Generation (Imagen 3) ---




Generating image 1/10 with prompt: 'A state-of-the-art chemical manufacturing plant at...'




Image 1 saved and uploaded to gs://meetupmarch/images/image_0.png
Generating image 2/10 with prompt: 'Macro shot of a new, sustainable consumer goods pr...'
Image 2 saved and uploaded to gs://meetupmarch/images/image_1.png
Generating image 3/10 with prompt: 'A team of engineers in a modern factory reviewing ...'
Image 3 saved and uploaded to gs://meetupmarch/images/image_2.png
Generating image 4/10 with prompt: 'Futuristic robotic arms assembling a complex piece...'
Image 4 saved and uploaded to gs://meetupmarch/images/image_3.png
Generating image 5/10 with prompt: 'A digital twin of a manufacturing facility, showin...'
Image 5 saved and uploaded to gs://meetupmarch/images/image_4.png
Generating image 6/10 with prompt: 'An aerial view of a smart warehouse with autonomou...'
Image 6 saved and uploaded to gs://meetupmarch/images/image_5.png
Generating image 7/10 with prompt: 'A scientist in a lab coat examining a beaker with ...'
Image 7 saved and uploaded to gs://meetupmarch/images/imag

```markdown
### 2.2 Generate Music with Lyria
```

In [40]:
# We need these libraries for making direct HTTP requests and handling authentication.
import requests
import google.auth
import google.auth.transport.requests
import base64
import json
import os

# --- Helper Functions Exactly as in the Notebook ---

def send_request_to_google_api(api_endpoint, data=None, timeout=300):
    """
    POST a JSON payload to a Google API endpoint using default credentials.

    Args:
        api_endpoint: Full URL of the endpoint to call.
        data: JSON-serializable request body (optional).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely and a stalled connection
            would hang the notebook cell.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If no response arrives within `timeout` seconds.
    """
    # Obtain application-default credentials and refresh them to get a token.
    creds, _ = google.auth.default()
    creds.refresh(google.auth.transport.requests.Request())
    headers = {
        "Authorization": f"Bearer {creds.token}",
        "Content-Type": "application/json",
    }
    response = requests.post(api_endpoint, headers=headers, json=data, timeout=timeout)
    response.raise_for_status()
    return response.json()

def generate_music(api_endpoint, request: dict):
    """
    Send a single music-generation request and return its predictions.

    The request dict becomes the only entry of "instances", with empty
    "parameters"; the "predictions" field of the JSON reply is returned.
    """
    payload = {"instances": [request], "parameters": {}}
    return send_request_to_google_api(api_endpoint, payload)["predictions"]

# --- Define the Model URL ---
# Build the endpoint from LOCATION so the regional host and the resource path
# stay in sync with the rest of the notebook configuration.
music_model = (
    f"https://{LOCATION}-aiplatform.googleapis.com/v1/projects/{PROJECT_ID}"
    f"/locations/{LOCATION}/publishers/google/models/lyria-002:predict"
)
local_music_dir = "generated_music"
os.makedirs(local_music_dir, exist_ok=True)

print("Setup complete. You can now run the cells below to generate each song individually.")


Setup complete. You can now run the cells below to generate each song individually.


In [41]:
# Simple version - to be deleted.
# Descriptive prompt chosen to avoid safety filters.
prompt = "An uplifting and motivational corporate anthem for a quarterly results presentation, featuring bright piano, a steady electronic beat, and subtle synth pads."

try:
    print("Generating Song 1...")
    predictions = generate_music(music_model, {"prompt": prompt, "duration_secs": 20})

    # Each prediction carries base64-encoded audio; decode it and write it out.
    for pred in predictions:
        audio_data = base64.b64decode(dict(pred)["bytesBase64Encoded"])
        with open(f"{local_music_dir}/music_1.wav", "wb") as f:
            f.write(audio_data)
        print("SUCCESS: Song 1 saved as music_1.wav")
except Exception as e:
    print(f"ERROR generating Song 1: {e}")


Generating Song 1...
SUCCESS: Song 1 saved as music_1.wav


In [44]:
# --- Define Song Details ---
song_number = 1
prompt = "An uplifting and motivational corporate anthem for a quarterly results presentation, featuring bright piano, a steady electronic beat, and subtle synth pads."
asset_id = f"music_{song_number}"
local_filename = f"{local_music_dir}/{asset_id}.wav"
gcs_blob_name = f"music/{asset_id}.wav"

try:
    print(f"Generating Song {song_number} using the correct method...")

    predictions = generate_music(
        music_model,
        {"prompt": prompt, "duration_secs": 20}
    )
    # An empty result used to pass silently; report it through the handler below.
    if not predictions:
        raise ValueError("the API returned no predictions")

    # One clip is stored per song: every prediction maps to the same file name
    # and asset_id, so extra predictions would only overwrite the file and
    # duplicate the metadata row.
    audio_data = base64.b64decode(dict(predictions[0])["bytesBase64Encoded"])

    # --- Save locally ---
    with open(local_filename, "wb") as f:
        f.write(audio_data)
    print(f"SUCCESS: Song {song_number} saved locally.")

    # --- Upload to Google Cloud Storage ---
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    blob = bucket.blob(gcs_blob_name)
    blob.upload_from_filename(local_filename)
    print(f"SUCCESS: Song {song_number} uploaded to GCS.")

    # --- Store metadata for BigQuery ---
    gcs_uri = f"gs://{GCS_BUCKET_NAME}/{gcs_blob_name}"
    # Replace any record from a previous run so re-running does not duplicate rows.
    all_metadata[:] = [m for m in all_metadata if m.get("asset_id") != asset_id]
    all_metadata.append({
        "asset_id": asset_id,
        "asset_type": "music",
        "prompt": prompt,
        "gcs_uri": gcs_uri,
        "model_used": "lyria-002-direct-api"
    })
    print(f"SUCCESS: Metadata for Song {song_number} stored.")

except Exception as e:
    print(f"ERROR generating Song {song_number}: {e}")



Generating Song 1 using the correct method...
SUCCESS: Song 1 saved locally.
SUCCESS: Song 1 uploaded to GCS.
SUCCESS: Metadata for Song 1 stored.


In [45]:
# --- Define Song Details ---
song_number = 2
prompt = "A minimal, ambient electronic track for a technology product showcase, calm and focused."
asset_id = f"music_{song_number}"
local_filename = f"{local_music_dir}/{asset_id}.wav"
gcs_blob_name = f"music/{asset_id}.wav"

try:
    print(f"Generating Song {song_number} using the correct method...")

    predictions = generate_music(
        music_model,
        {"prompt": prompt, "duration_secs": 20}
    )
    # An empty result used to pass silently; report it through the handler below.
    if not predictions:
        raise ValueError("the API returned no predictions")

    # One clip is stored per song: every prediction maps to the same file name
    # and asset_id, so extra predictions would only overwrite the file and
    # duplicate the metadata row.
    audio_data = base64.b64decode(dict(predictions[0])["bytesBase64Encoded"])

    # --- Save locally ---
    with open(local_filename, "wb") as f:
        f.write(audio_data)
    print(f"SUCCESS: Song {song_number} saved locally.")

    # --- Upload to Google Cloud Storage ---
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    blob = bucket.blob(gcs_blob_name)
    blob.upload_from_filename(local_filename)
    print(f"SUCCESS: Song {song_number} uploaded to GCS.")

    # --- Store metadata for BigQuery ---
    gcs_uri = f"gs://{GCS_BUCKET_NAME}/{gcs_blob_name}"
    # Replace any record from a previous run so re-running does not duplicate rows.
    all_metadata[:] = [m for m in all_metadata if m.get("asset_id") != asset_id]
    all_metadata.append({
        "asset_id": asset_id,
        "asset_type": "music",
        "prompt": prompt,
        "gcs_uri": gcs_uri,
        "model_used": "lyria-002-direct-api"
    })
    print(f"SUCCESS: Metadata for Song {song_number} stored.")

except Exception as e:
    print(f"ERROR generating Song {song_number}: {e}")



Generating Song 2 using the correct method...
SUCCESS: Song 2 saved locally.
SUCCESS: Song 2 uploaded to GCS.
SUCCESS: Metadata for Song 2 stored.


In [46]:
# --- Define Song Details ---
song_number = 3
prompt = "A powerful, driving industrial beat with synth elements, suggesting innovation and power."
asset_id = f"music_{song_number}"
local_filename = f"{local_music_dir}/{asset_id}.wav"
gcs_blob_name = f"music/{asset_id}.wav"

try:
    print(f"Generating Song {song_number} using the correct method...")

    predictions = generate_music(
        music_model,
        {"prompt": prompt, "duration_secs": 20}
    )
    # An empty result used to pass silently; report it through the handler below.
    if not predictions:
        raise ValueError("the API returned no predictions")

    # One clip is stored per song: every prediction maps to the same file name
    # and asset_id, so extra predictions would only overwrite the file and
    # duplicate the metadata row.
    audio_data = base64.b64decode(dict(predictions[0])["bytesBase64Encoded"])

    # --- Save locally ---
    with open(local_filename, "wb") as f:
        f.write(audio_data)
    print(f"SUCCESS: Song {song_number} saved locally.")

    # --- Upload to Google Cloud Storage ---
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    blob = bucket.blob(gcs_blob_name)
    blob.upload_from_filename(local_filename)
    print(f"SUCCESS: Song {song_number} uploaded to GCS.")

    # --- Store metadata for BigQuery ---
    gcs_uri = f"gs://{GCS_BUCKET_NAME}/{gcs_blob_name}"
    # Replace any record from a previous run so re-running does not duplicate rows.
    all_metadata[:] = [m for m in all_metadata if m.get("asset_id") != asset_id]
    all_metadata.append({
        "asset_id": asset_id,
        "asset_type": "music",
        "prompt": prompt,
        "gcs_uri": gcs_uri,
        "model_used": "lyria-002-direct-api"
    })
    print(f"SUCCESS: Metadata for Song {song_number} stored.")

except Exception as e:
    print(f"ERROR generating Song {song_number}: {e}")



Generating Song 3 using the correct method...
SUCCESS: Song 3 saved locally.
SUCCESS: Song 3 uploaded to GCS.
SUCCESS: Metadata for Song 3 stored.


In [47]:
# --- Define Song Details ---
song_number = 4
prompt = "An atmospheric and thoughtful soundscape for a documentary about sustainable manufacturing."
asset_id = f"music_{song_number}"
local_filename = f"{local_music_dir}/{asset_id}.wav"
gcs_blob_name = f"music/{asset_id}.wav"

try:
    print(f"Generating Song {song_number} using the correct method...")

    predictions = generate_music(
        music_model,
        {"prompt": prompt, "duration_secs": 20}
    )
    # An empty result used to pass silently; report it through the handler below.
    if not predictions:
        raise ValueError("the API returned no predictions")

    # One clip is stored per song: every prediction maps to the same file name
    # and asset_id, so extra predictions would only overwrite the file and
    # duplicate the metadata row.
    audio_data = base64.b64decode(dict(predictions[0])["bytesBase64Encoded"])

    # --- Save locally ---
    with open(local_filename, "wb") as f:
        f.write(audio_data)
    print(f"SUCCESS: Song {song_number} saved locally.")

    # --- Upload to Google Cloud Storage ---
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    blob = bucket.blob(gcs_blob_name)
    blob.upload_from_filename(local_filename)
    print(f"SUCCESS: Song {song_number} uploaded to GCS.")

    # --- Store metadata for BigQuery ---
    gcs_uri = f"gs://{GCS_BUCKET_NAME}/{gcs_blob_name}"
    # Replace any record from a previous run so re-running does not duplicate rows.
    all_metadata[:] = [m for m in all_metadata if m.get("asset_id") != asset_id]
    all_metadata.append({
        "asset_id": asset_id,
        "asset_type": "music",
        "prompt": prompt,
        "gcs_uri": gcs_uri,
        "model_used": "lyria-002-direct-api"
    })
    print(f"SUCCESS: Metadata for Song {song_number} stored.")

except Exception as e:
    print(f"ERROR generating Song {song_number}: {e}")



Generating Song 4 using the correct method...
SUCCESS: Song 4 saved locally.
SUCCESS: Song 4 uploaded to GCS.
SUCCESS: Metadata for Song 4 stored.


```markdown
### 2.3 Generate Speech with Cloud Text-to-Speech
```

In [None]:
# generate 1 file only test
from google.cloud import texttospeech

# TextToSpeechClient has no `project` argument; the quota project option makes
# API usage count against PROJECT_ID.
# NOTE(review): the batch speech cell in this notebook recorded a 403
# SERVICE_DISABLED for a different, numeric consumer project; presumably that
# is the default quota project of the notebook credentials - confirm.
client = texttospeech.TextToSpeechClient(
    client_options={"quota_project_id": PROJECT_ID}
)

synthesis_input = texttospeech.SynthesisInput(text="Hello, this is a test of the Gemini Text-to-Speech API.")
voice = texttospeech.VoiceSelectionParams(language_code="en-US", ssml_gender=texttospeech.SsmlVoiceGender.NEUTRAL)
audio_config = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3)

response = client.synthesize_speech(input=synthesis_input, voice=voice, audio_config=audio_config)

# Write the returned audio bytes, then report once the file is closed.
with open("generated_speech.mp3", "wb") as out:
    out.write(response.audio_content)
print('Audio content written to file "generated_speech.mp3"')

In [53]:
import os
from google.cloud import texttospeech
import google.auth

# --- Pin the project for this session ---
# GOOGLE_CLOUD_PROJECT is kept for any library that reads the default project
# from the environment. The output recorded for this cell shows it did not
# stop the 403 SERVICE_DISABLED, which named a different consumer project.
print(f"Setting environment variable GOOGLE_CLOUD_PROJECT to '{PROJECT_ID}'...")
os.environ['GOOGLE_CLOUD_PROJECT'] = PROJECT_ID

# Re-create the client with an explicit quota project so Text-to-Speech usage
# is attributed to PROJECT_ID.
# NOTE(review): presumably the numeric project in the recorded 403 is the
# default quota project of the notebook credentials - confirm. The
# Text-to-Speech API must also be enabled in PROJECT_ID.
print("Re-initializing Text-to-Speech client...")
tts_client = texttospeech.TextToSpeechClient(
    client_options={"quota_project_id": PROJECT_ID}
)
print("Client re-initialized.")


print("\n--- Starting Speech Generation (Text-to-Speech) ---")
local_speech_dir = "generated_speech"
os.makedirs(local_speech_dir, exist_ok=True)

speech_texts = [
    "Quarterly production targets have been exceeded by fifteen percent.",
    "Safety protocol update: All personnel must attend the mandatory briefing on Friday.",
    "The new supply chain optimization model is now live across all regions.",
    "Alert: Unscheduled maintenance is required for assembly line three.",
    "Our commitment to sustainable manufacturing has reduced our carbon footprint by 20% year-over-year.",
    "The next shareholder meeting will be held on July 25th to discuss Q2 earnings.",
    "Innovation in materials science is key to developing our next generation of products.",
    "Customer feedback indicates a 95% satisfaction rate with our new service portal.",
    "We are projecting a 10% growth in the consumer goods sector for the upcoming fiscal year.",
    "Emergency shutdown procedures for the chemical processing unit have been initiated. This is a drill.",
]

# Voice and audio settings are identical for every clip, so build them once.
voice = texttospeech.VoiceSelectionParams(language_code="en-US", ssml_gender=texttospeech.SsmlVoiceGender.NEUTRAL)
audio_config = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3)

# Progress messages follow the real list length instead of a hard-coded 10.
total_speeches = len(speech_texts)

for i, text in enumerate(speech_texts):
    asset_id = f"speech_{i}"
    local_filename = f"{local_speech_dir}/{asset_id}.mp3"
    gcs_blob_name = f"speech/{asset_id}.mp3"

    print(f"\nGenerating speech {i+1}/{total_speeches} for text: '{text[:50]}...'")

    try:
        synthesis_input = texttospeech.SynthesisInput(text=text)
        response = tts_client.synthesize_speech(input=synthesis_input, voice=voice, audio_config=audio_config)

        with open(local_filename, "wb") as out:
            out.write(response.audio_content)
        print(f"SUCCESS: Speech {i+1} saved locally.")

        bucket = storage_client.bucket(GCS_BUCKET_NAME)
        blob = bucket.blob(gcs_blob_name)
        blob.upload_from_filename(local_filename)
        gcs_uri = f"gs://{GCS_BUCKET_NAME}/{gcs_blob_name}"
        print(f"SUCCESS: Speech {i+1} uploaded to GCS.")

        # Replace any record from a previous run so re-running does not duplicate rows.
        all_metadata[:] = [m for m in all_metadata if m.get("asset_id") != asset_id]
        all_metadata.append({
            "asset_id": asset_id,
            "asset_type": "speech",
            "prompt": text,
            "gcs_uri": gcs_uri,
            "model_used": "google-text-to-speech"
        })
        print(f"SUCCESS: Metadata for speech {i+1} stored.")

    except Exception as e:
        # Best effort: report the failure and continue with the next text.
        print(f"ERROR generating speech for prompt {i+1}: {e}")
        print("If this error persists, the issue is with the notebook environment's core authentication and may require restarting the runtime or contacting support.")
        print("Skipping this entry.")

print("\n--- Speech Generation Complete ---")


Setting environment variable GOOGLE_CLOUD_PROJECT to 'geminienterprise-485114'...
Re-initializing Text-to-Speech client...
Client re-initialized.

--- Starting Speech Generation (Text-to-Speech) ---

Generating speech 1/10 for text: 'Quarterly production targets have been exceeded by...'
ERROR generating speech for prompt 1: 403 Cloud Text-to-Speech API has not been used in project 522309567947 before or it is disabled. Enable it by visiting https://console.developers.google.com/apis/api/texttospeech.googleapis.com/overview?project=522309567947 then retry. If you enabled this API recently, wait a few minutes for the action to propagate to our systems and retry. [reason: "SERVICE_DISABLED"
domain: "googleapis.com"
metadata {
  key: "service"
  value: "texttospeech.googleapis.com"
}
metadata {
  key: "serviceTitle"
  value: "Cloud Text-to-Speech API"
}
metadata {
  key: "containerInfo"
  value: "522309567947"
}
metadata {
  key: "consumer"
  value: "projects/522309567947"
}
metadata {
  

```markdown
## 3. Upload to Google Cloud Storage
```

In [None]:
# easy version
from google.cloud import storage

# Bind the client to the configured project instead of relying on ambient defaults.
storage_client = storage.Client(project=PROJECT_ID)
# Strip stray whitespace so a mistyped bucket name does not produce an invalid
# bucket reference.
bucket = storage_client.bucket(GCS_BUCKET.strip())

def upload_to_gcs(filename):
    """
    Upload a local file to the bucket under the same name.

    Args:
        filename: Path of the local file; also used as the object name.

    Returns:
        The gs:// URI of the uploaded object, built from the name of the
        bucket the file was actually uploaded to.
    """
    blob = bucket.blob(filename)
    blob.upload_from_filename(filename)
    return f"gs://{bucket.name}/{filename}"

image_gcs_uri = upload_to_gcs("generated_image.png")
# NOTE(review): no visible cell writes "generated_music_prompt.txt"; confirm
# where this file comes from, otherwise this upload fails.
music_gcs_uri = upload_to_gcs("generated_music_prompt.txt")
speech_gcs_uri = upload_to_gcs("generated_speech.mp3")

print(f"Image URI: {image_gcs_uri}")
print(f"Music URI: {music_gcs_uri}")
print(f"Speech URI: {speech_gcs_uri}")

# BigQuery Metadata Ingestion

In [None]:
print("\n--- Creating and Uploading Metadata File ---")
local_metadata_filename = "metadata.jsonl"
gcs_metadata_blob_name = "metadata/assets.jsonl"

# One JSON object per line (JSONL), one line per generated asset.
with open(local_metadata_filename, "w") as f:
    f.writelines(json.dumps(item) + "\n" for item in all_metadata)

# Look the bucket up again (as the generation cells do) so the upload target
# always matches the GCS_BUCKET_NAME used in the URI below, even if another
# cell rebound the global `bucket`.
bucket = storage_client.bucket(GCS_BUCKET_NAME)
blob = bucket.blob(gcs_metadata_blob_name)
blob.upload_from_filename(local_metadata_filename)
metadata_gcs_uri = f"gs://{GCS_BUCKET_NAME}/{gcs_metadata_blob_name}"

print(f"Metadata file uploaded to {metadata_gcs_uri}")


# Create BigQuery External Table

In [None]:
print("\n--- Creating BigQuery External Table ---")

table_ref = dataset_ref.table(TABLE_ID)

# Define the schema for the external table
schema = [
    bigquery.SchemaField("asset_id", "STRING"),
    bigquery.SchemaField("asset_type", "STRING"),
    bigquery.SchemaField("prompt", "STRING"),
    bigquery.SchemaField("gcs_uri", "STRING"),
    bigquery.SchemaField("model_used", "STRING"),
]

# The metadata file is JSON Lines; the API source format for that is
# NEWLINE_DELIMITED_JSON ("JSON" is not an accepted source format).
external_config = bigquery.ExternalConfig("NEWLINE_DELIMITED_JSON")
external_config.source_uris = [metadata_gcs_uri]
external_config.schema = schema
# For JSONL, autodetect often works well, but explicit schema is safer.
# external_config.autodetect = True

# Start from a clean slate: drop a previous definition if there is one.
try:
    bq_client.delete_table(table_ref, not_found_ok=True)
    # not_found_ok=True means this also succeeds when no table existed, so the
    # message does not claim a deletion took place.
    print(f"Ensured no previous table '{TABLE_ID}' remains.")
except Exception as e:
    # Best effort: report and let create_table below surface any real conflict.
    print(f"Could not delete existing table '{TABLE_ID}': {e}")

table = bigquery.Table(table_ref)
table.external_data_configuration = external_config
table = bq_client.create_table(table)

print(f"External table '{table.project}.{table.dataset_id}.{table.table_id}' created successfully.")


# Analyze Data with BigQuery and Gemini

Before running the SQL: You'll need a reference to a Gemini model in BigQuery. If you don't have one, you can create it with this DDL command in BigQuery:

In [None]:
# Creates (or replaces) a BigQuery remote model in DATASET_ID by running a
# CREATE OR REPLACE MODEL statement through bq_client.
print("\n--- Creating BigQuery Remote Model ---")

# --- CONFIGURATION for BigQuery Connection ---
# !!! IMPORTANT !!!
# Please replace this with the actual ID of your BigQuery connection.
# The connection must be created in the same location as your dataset (e.g., 'us-central1').
CONNECTION_NAME = "your-bq-connection-to-vertex-ai" # e.g., "bq-vertex-connection"

# Define the SQL query to create the remote model.
# NOTE(review): the model is named gemini_pro_vision_model, but the OPTIONS
# clause uses remote_service_type = 'CLOUD_AI_LARGE_LANGUAGE_MODEL_V1' rather
# than an ENDPOINT naming a Gemini model - confirm against the BigQuery ML
# remote-model documentation that this targets the intended model.
sql_create_model = f"""
CREATE OR REPLACE MODEL `{PROJECT_ID}.{DATASET_ID}.gemini_pro_vision_model`
REMOTE WITH CONNECTION `{PROJECT_ID}.{LOCATION}.{CONNECTION_NAME}`
OPTIONS (remote_service_type = 'CLOUD_AI_LARGE_LANGUAGE_MODEL_V1');
"""

print("Executing query to create or replace BigQuery remote model...")
# Execute the query using the BigQuery client
query_job = bq_client.query(sql_create_model)
query_job.result()  # Wait for the job to complete

print(f"BigQuery remote model `{DATASET_ID}.gemini_pro_vision_model` created or replaced successfully.")



In [None]:
# Runs ML.GENERATE_TEXT over the image rows of the metadata table and prints
# the result as a markdown table.
print("\n--- Analyzing Images with Gemini in BigQuery ---")

# The fully qualified ID of the table we created earlier
full_table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

# This query selects only the image assets, then passes their GCS URI and
# the original prompt to the Gemini model for analysis.
# NOTE(review): points to verify against the ML.GENERATE_TEXT reference:
#   - the query sets flatten_json_output = TRUE yet reads the JSON column
#     ml_generate_text_result; presumably the flattened output exposes a
#     different column name - confirm;
#   - the input rows hold gcs_uri as a plain STRING; presumably the model
#     only receives text and cannot see the image unless an object table is
#     used - confirm;
#   - both the subquery and the STRUCT define a field named `prompt`.
sql_analyze = f"""
SELECT
    prompt,
    gcs_uri,
    ml_generate_text_result['predictions'][0]['content'] AS gemini_analysis
FROM
    ML.GENERATE_TEXT(
        MODEL `{PROJECT_ID}.{DATASET_ID}.gemini_pro_vision_model`,
        (
            -- Subquery to select only the images and their URIs
            SELECT
                prompt,
                gcs_uri
            FROM
                `{full_table_id}`
            WHERE
                asset_type = 'image'
        ),
        STRUCT(
            -- This is the prompt for the LLM analysis itself
            'Describe this image in detail based on its content. Also, comment on how well it matches the original user prompt.' AS prompt,
            TRUE AS flatten_json_output
        )
    );
"""

print("Executing analysis query... This may take a few moments.")
# Execute the query and load the results into a pandas DataFrame
df = bq_client.query(sql_analyze).to_dataframe()

# Display the results as a markdown table
print("\n--- Gemini Vision Analysis Results ---")
print(df.to_markdown(index=False))



```markdown
## 4. Create BigQuery Table

Now we'll create a JSONL file with metadata about our generated assets and upload it to GCS. Then we'll create a BigQuery external table that points to this metadata file.
```

In [None]:
import json

# One record per asset produced by the "easy version" cells.
# NOTE(review): `music_prompt` is not defined in any visible cell - confirm
# where it comes from before running this cell.
metadata = [
    {"prompt": image_prompt, "gcs_uri": image_gcs_uri, "type": "image"},
    {"prompt": music_prompt, "gcs_uri": music_gcs_uri, "type": "music"},
    {"prompt": "Hello, this is a test of the Gemini Text-to-Speech API.", "gcs_uri": speech_gcs_uri, "type": "speech"}
]

# Serialize as JSON Lines: one JSON object per line.
with open("metadata.jsonl", "w") as f:
    f.writelines(json.dumps(record) + "\n" for record in metadata)

# Upload the file and keep its URI for the external table definition.
metadata_gcs_uri = upload_to_gcs("metadata.jsonl")
print(f"Metadata GCS URI: {metadata_gcs_uri}")

In [None]:
from google.cloud import bigquery

from google.cloud.exceptions import NotFound

# Bind the client to the configured project instead of relying on ambient defaults.
bq_client = bigquery.Client(project=PROJECT_ID)

# NOTE(review): BIGQUERY_DATASET only appears commented out in the visible
# configuration cell - confirm it is defined before running this cell.
dataset_id = f"{PROJECT_ID}.{BIGQUERY_DATASET}"
try:
    bq_client.get_dataset(dataset_id)  # Make an API request.
    print(f"Dataset {dataset_id} already exists.")
except NotFound:
    # Only a missing dataset triggers creation; other failures (permissions,
    # network) now surface instead of being mistaken for "not found".
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = LOCATION
    dataset = bq_client.create_dataset(dataset, timeout=30)
    print(f"Created dataset {PROJECT_ID}.{dataset.dataset_id}")

# External table over the JSONL metadata file in GCS.
table_id = f"{dataset_id}.multimodal_assets"
schema = [
    bigquery.SchemaField("prompt", "STRING"),
    bigquery.SchemaField("gcs_uri", "STRING"),
    bigquery.SchemaField("type", "STRING"),
]

external_config = bigquery.ExternalConfig("NEWLINE_DELIMITED_JSON")
external_config.source_uris = [metadata_gcs_uri]
external_config.schema = schema
table = bigquery.Table(table_id, schema=schema)
table.external_data_configuration = external_config
# exists_ok=True keeps an already existing table definition untouched.
table = bq_client.create_table(table, exists_ok=True)

print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

```markdown
## 5. Analyze Data with BigQuery and Gemini

Finally, we'll create a remote model in BigQuery that points to the Gemini Pro Vision model. This will allow us to analyze the images directly from BigQuery using SQL.
```

In [None]:
# Creates (or replaces) a BigQuery remote model through bq_client.
# This step assumes you have a BigQuery connection to Vertex AI set up.
# See: https://cloud.google.com/bigquery/docs/create-cloud-resource-connection
CONNECTION_NAME = "your-bq-connection-to-vertex-ai"

# NOTE(review): BIGQUERY_DATASET only appears commented out in the visible
# configuration cell - confirm it is defined. Also confirm that
# remote_service_type = 'CLOUD_AI_LARGE_LANGUAGE_MODEL_V1' targets the
# intended Gemini model rather than requiring an ENDPOINT option.
sql_create_model = f"""
CREATE OR REPLACE MODEL `{PROJECT_ID}.{BIGQUERY_DATASET}.gemini_vision_model`
REMOTE WITH CONNECTION `{PROJECT_ID}.{LOCATION}.{CONNECTION_NAME}`
OPTIONS (remote_service_type = 'CLOUD_AI_LARGE_LANGUAGE_MODEL_V1');
"""

query_job = bq_client.query(sql_create_model)
query_job.result()  # Wait for the job to complete
print("BigQuery remote model created.")

In [None]:
# Runs ML.GENERATE_TEXT over the image rows of `table_id` and prints the
# result as a markdown table.
# NOTE(review): points to verify against the ML.GENERATE_TEXT reference:
#   - flatten_json_output is TRUE while the query reads the JSON column
#     ml_generate_text_result; presumably the flattened output uses a
#     different column name - confirm;
#   - gcs_uri is passed as a plain STRING column; presumably the model cannot
#     see the image itself without an object table - confirm;
#   - BIGQUERY_DATASET only appears commented out in the visible config cell.
sql_analyze = f"""
SELECT
    prompt,
    gcs_uri,
    ml_generate_text_result['predictions'][0]['content'] AS gemini_analysis
FROM
    ML.GENERATE_TEXT(
        MODEL `{PROJECT_ID}.{BIGQUERY_DATASET}.gemini_vision_model`,
        (SELECT prompt, gcs_uri FROM `{table_id}` WHERE type = 'image'),
        STRUCT('Analyze the following image:' AS prompt, TRUE AS flatten_json_output)
    );
"""

# Execute the query and load the rows into a pandas DataFrame.
df = bq_client.query(sql_analyze).to_dataframe()
print(df.to_markdown())