# Gemini in BigQuery

In [None]:
# @title 🥸
# python imports
import json
import os
import re
import ipywidgets as widgets
from IPython.display import display, Markdown

# Services that must be enabled in the project for this notebook to work.
# The commented-out entries are kept for reference only and are not checked.
required_services = [
    'aiplatform.googleapis.com',
    #'cloudaicompanion.googleapis.com',
    #'dataplex.googleapis.com',
    'compute.googleapis.com',
    'dataform.googleapis.com',
    'bigqueryconnection.googleapis.com'
]
# One gcloud filter clause per required service, OR-ed together.
# Joining with the separator avoids appending a trailing " OR " and slicing it
# off again with a magic number.
filter_list = [f"(config.name:{service} AND state:ENABLED)" for service in required_services]
# NOTE: this shadows the builtin `filter`; the name is kept because the
# services check passes it to gcloud as `$filter`.
filter = " OR ".join(filter_list)

# PROJECT
# Pre-fill from the environment when available. Fall back to an empty string so
# the text box shows its placeholder instead of this cell failing with KeyError
# when GOOGLE_CLOUD_PROJECT is not set.
project_ = os.environ.get('GOOGLE_CLOUD_PROJECT', '')
def on_project_change(change):
    """Copy the new widget value into the module-level ``project_``.

    Notifications whose ``type`` is not ``'change'`` or whose ``name`` is not
    ``'value'`` are ignored.
    """
    global project_
    if change['type'] != 'change' or change['name'] != 'value':
        return
    project_ = change['new']

# Editable text box for the project ID; every change notification is routed
# to on_project_change, which keeps `project_` in sync.
project__ = widgets.Text(
    value=project_,
    placeholder='Project ID',
    description='Project ID:',
    disabled=False,
)
project__.observe(on_project_change)
display(project__)

# REGION
# Pre-fill from the environment when available. Fall back to an empty string so
# the text box shows its placeholder instead of this cell failing with KeyError
# when GOOGLE_CLOUD_REGION is not set.
region_ = os.environ.get('GOOGLE_CLOUD_REGION', '')
def on_region_change(change):
    """Copy the new widget value into the module-level ``region_``.

    Notifications whose ``type`` is not ``'change'`` or whose ``name`` is not
    ``'value'`` are ignored.
    """
    global region_
    if change['type'] != 'change' or change['name'] != 'value':
        return
    region_ = change['new']

# Editable text box for the region; every change notification is routed to
# on_region_change, which keeps `region_` in sync.
region__ = widgets.Text(
    value=region_,
    placeholder='Region',
    description='Region :',
    disabled=False,
)
region__.observe(on_region_change)
display(region__)

# DATASET
# Default dataset name; can be changed through the text box.
dataset_ = "demo_ds"
def on_dataset_change(change):
    """Copy the new widget value into the module-level ``dataset_``.

    Notifications whose ``type`` is not ``'change'`` or whose ``name`` is not
    ``'value'`` are ignored.
    """
    global dataset_
    if change['type'] != 'change' or change['name'] != 'value':
        return
    dataset_ = change['new']

# Editable text box for the dataset name; every change notification is routed
# to on_dataset_change, which keeps `dataset_` in sync.
dataset__ = widgets.Text(
    value=dataset_,
    description='Dataset',
    disabled=False,
)
dataset__.observe(on_dataset_change)
display(dataset__)

# CONNECTION
# Default connection name; can be changed through the text box.
connection_ = "my-connection"

def on_connection_change(change):
    """Copy the new widget value into the module-level ``connection_``.

    Notifications whose ``type`` is not ``'change'`` or whose ``name`` is not
    ``'value'`` are ignored.
    """
    global connection_
    if change['type'] != 'change' or change['name'] != 'value':
        return
    connection_ = change['new']

# Editable text box for the connection name; every change notification is
# routed to on_connection_change, which keeps `connection_` in sync.
connection__ = widgets.Text(
    value=connection_,
    description='Connection',
    disabled=False,
)
connection__.observe(on_connection_change)
display(connection__)

# MODEL
# Default name of the BigQuery model; can be changed through the text box.
model_ = 'gemini-flash'

def on_model_change(change):
    """Copy the new widget value into the module-level ``model_``.

    Notifications whose ``type`` is not ``'change'`` or whose ``name`` is not
    ``'value'`` are ignored.
    """
    global model_
    if change['type'] != 'change' or change['name'] != 'value':
        return
    model_ = change['new']

# Editable text box for the model name; every change notification is routed
# to on_model_change, which keeps `model_` in sync.
model__ = widgets.Text(
    value=model_,
    description='Model',
    disabled=False,
)
model__.observe(on_model_change)
display(model__)

# BUCKET
# Default Cloud Storage URI pattern. The bucket name and object path start
# empty and are filled in by update_bucket_info().
bucket_ = "gs://vertexit-golden/videos/*"
bucket_name_ = bucket_file_ = ""

def update_bucket_info(bucket):
    """Split a ``gs://<bucket>/<path>`` URI into the module-level bucket globals.

    When ``bucket`` contains such a URI, ``bucket_`` is set to the full input
    string, ``bucket_name_`` to the part between ``gs://`` and the next ``/``,
    and ``bucket_file_`` to everything after that slash. Input that does not
    match leaves all three globals untouched.
    """
    global bucket_, bucket_name_, bucket_file_
    parsed = re.search("gs://(.*?)/(.*)", bucket)
    if parsed is None:
        return
    bucket_ = bucket
    bucket_name_, bucket_file_ = parsed.groups()

def on_bucket_change(change):
    """Forward a new bucket URI from the widget to ``update_bucket_info``.

    Notifications whose ``type`` is not ``'change'`` or whose ``name`` is not
    ``'value'`` are ignored.
    """
    if change['type'] != 'change' or change['name'] != 'value':
        return
    update_bucket_info(change['new'])

# Editable text box for the Cloud Storage URI; every change notification is
# routed to on_bucket_change.
bucket__ = widgets.Text(
    value=bucket_,
    description='Bucket',
    disabled=False,
)
bucket__.observe(on_bucket_change)
display(bucket__)
# Derive bucket_name_ / bucket_file_ from the default URI once, up front.
update_bucket_info(bucket_)

# get the current user account
result = !gcloud auth list --filter="status:ACTIVE" --format="value(account)"
user_ = widgets.Text(
    value= result.nlstr,
    placeholder='User',
    description='User :',
    disabled=True
)
display(user_)

# service account
saccount_ = 'undefined'
saccount__ = widgets.Text(
    value= saccount_,
    description='SA',
    disabled=True
)
display(saccount__)

def extract_service_account(s):
    """Parse the connection's service account out of captured command output.

    Prints ``s.nlstr`` and searches it for a ``"serviceAccountId": "<value>"``
    pair. The module-level ``saccount_`` is set to the captured value, or to
    ``'unavailable'`` when no such pair is found. The read-only ``saccount__``
    text box is updated in both cases so that it never shows a stale account.

    Args:
        s: Object exposing the command output as a string in ``.nlstr``
            (e.g. the result of an IPython ``!command`` assignment).
    """
    global saccount_
    # Echo the raw output so the connection details stay visible in the cell.
    print(s.nlstr)
    # Match the key/value pair wherever it appears, so both the single-line
    # form and multi-line (pretty-printed) JSON output are recognised.
    found = re.search(r'"serviceAccountId"\s*:\s*"([^"]+)"', s.nlstr)
    saccount_ = found.group(1) if found else 'unavailable'
    saccount__.value = saccount_

def vars_dict():
    """Return a snapshot of the notebook settings keyed by placeholder name.

    The keys are the module-level variable names (``project_``, ``region_``,
    ``connection_``, ``model_``, ``saccount_``, ``dataset_``, ``bucket_``) and
    the values are their current values at call time.
    """
    return dict(
        project_=project_,
        region_=region_,
        connection_=connection_,
        model_=model_,
        saccount_=saccount_,
        dataset_=dataset_,
        bucket_=bucket_,
    )

def cell_magic_wrapper(line, query):
    """Substitute notebook settings into a query, print it, then run it.

    ``query`` is rendered with ``str.format`` using the mapping returned by
    ``vars_dict()``, so ``{project_}``-style placeholders are replaced and any
    literal brace in the query text has to be doubled. The rendered query is
    printed and then handed, together with the unchanged ``line``, to the
    BigQuery ``_cell_magic`` implementation, whose result is returned.
    """
    # Imported lazily so the dependency is only needed when a query cell runs.
    from google.cloud.bigquery.magics.magics import _cell_magic

    substitutions = vars_dict()
    rendered = query.format(**substitutions)
    print(rendered)
    return _cell_magic(line, rendered)

# HACK: register cell_magic_wrapper under the `bigquery` cell-magic name so
# that queries get their variables substituted before they are executed.
ip = get_ipython()
ip.register_magic_function(
    cell_magic_wrapper,
    magic_kind="cell",
    magic_name="bigquery",
)

class StopExecution(Exception):
    """Exception raised to abort the current cell.

    Its ``_render_traceback_`` hook returns no traceback lines; the intent is
    that aborting a cell this way does not print a traceback in the notebook.
    """

    def _render_traceback_(self):
        """Return an empty list of traceback lines."""
        no_lines = []
        return no_lines

In [None]:
# @title Check services [optional]

# get the activated services
result = !gcloud services list --enabled --filter="$filter" --format="json(name)"
json_result = 42
try:
  json_result = json.loads(result.nlstr)
except Exception as e:
  print(result.nlstr)
  raise StopExecution()
activated_services = [re.search('([^\/]+$)',service["name"]).group(0) for service in json_result]
activated_services_map = dict.fromkeys(activated_services,True)
service_map = {name: name in activated_services_map for name in required_services}
for name,enabled in service_map.items():
    print(name.ljust(64, ' '), "🟢" if enabled else "🔴  << PLEASE ACTIVATE BEFORE PROCEEDING")

# Create an external connection
BigQuery needs an external connection to reach the Gemini API. Note that BigQuery distinguishes between multi-regions (e.g. `us` and `eu`) and single regions (e.g. `us-central1` and `europe-west1`).

In [None]:
!bq --project_id="{project_}" --location="{region_}" mk --connection --connection_type=CLOUD_RESOURCE "{connection_}"

In [None]:
# @title 🐞
print(f'!bq --project_id="{project_}" --location="{region_}" mk --connection --connection_type=CLOUD_RESOURCE "{connection_}"')

# Show connection Details

Get more details about the created connection, e.g. the related service account.

In [None]:
# execute the command
result = !bq --project_id="{project_}" --location="{region_}" show --connection "{project_}.{region_}.{connection_}"
extract_service_account(result)

In [None]:
# @title 🐞
print(f'!bq --project_id="{project_}" --location="{region_}" show --connection "{project_}.{region_}.{connection_}"')

# Assign role Vertex AI User to service account

The BigQuery connection created above uses a service account to access the Vertex AI APIs. This is why we have to assign the **Vertex AI User** role (*roles/aiplatform.user*) to it:


In [None]:
!gcloud projects add-iam-policy-binding "{project_}" --role=roles/aiplatform.user --condition="None" --member "serviceAccount:{saccount_}"

In [None]:
# @title 🐞
print(f'!gcloud projects add-iam-policy-binding "{project_}" --role=roles/aiplatform.user --condition="None" --member "serviceAccount:{saccount_}"')

# Create a dataset to store your model [optional]
The dataset is the level at which AI models are stored. Either create a new dataset or use an existing one.

In [None]:
!bq --project_id="{project_}" --location="{region_}" mk --dataset "{project_}:{dataset_}"

In [None]:
# @title 🐞
print(f'!bq --project_id="{project_}" --location="{region_}" mk --dataset "{project_}:{dataset_}"')

# Create the model

In [None]:
%%bigquery
CREATE OR REPLACE MODEL `{project_}.{dataset_}.{model_}`
REMOTE WITH CONNECTION `{project_}.{region_}.{connection_}`
OPTIONS(endpoint = 'gemini-1.5-flash');


In [None]:
# @title 🐞
print(f"""%%bigquery
CREATE OR REPLACE MODEL `{project_}.{dataset_}.{model_}`
REMOTE WITH CONNECTION `{project_}.{region_}.{connection_}`
OPTIONS(endpoint = 'gemini-1.5-flash');""")

# Use Gemini Flash with structured data
The public dataset `bigquery-public-data.bbc_news.fulltext` is located in the region `US`. If your model is located in a different region, please use a different table.

In [None]:
%%bigquery
WITH selected AS (
  SELECT CONCAT('Return a list of sentences in this article that cite a statistic: ', body) AS prompt
  FROM `bigquery-public-data.bbc_news.fulltext` LIMIT 5
)
SELECT ml_generate_text_llm_result
FROM
  ML.GENERATE_TEXT(
    MODEL `{project_}.{dataset_}.{model_}`,
    TABLE selected,
    STRUCT(
      0.2 AS temperature,
      1024 AS max_output_tokens,
      TRUE AS FLATTEN_JSON_OUTPUT)
  );

In [None]:
# @title 🐞
print(f"""%%bigquery
WITH selected AS (
  SELECT CONCAT('Return a list of sentences in this article that cite a statistic: ', body) AS prompt
  FROM `bigquery-public-data.bbc_news.fulltext` LIMIT 5
)
SELECT ml_generate_text_llm_result
FROM
  ML.GENERATE_TEXT(
    MODEL `{project_}.{dataset_}.{model_}`,
    TABLE selected,
    STRUCT(
      0.2 AS temperature,
      1024 AS max_output_tokens,
      TRUE AS FLATTEN_JSON_OUTPUT)
  );""")

# Use Gemini Flash with unstructured data
This example demonstrates how you can use unstructured data such as video, audio, and PDFs in BigQuery. First we have to create an object table in BigQuery. This object table contains the metadata of objects stored in Cloud Storage.

In [None]:
%%bigquery
CREATE OR REPLACE EXTERNAL TABLE `{project_}.{dataset_}.object_table`
WITH CONNECTION `{project_}.{region_}.{connection_}`
OPTIONS(
  object_metadata = 'SIMPLE',
  uris = ['{bucket_}']
);

In [None]:
# @title 🐞
# Debug helper: print the object-table statement with the current settings
# filled in. The URI is quoted, matching the statement that is executed.
print(f"""%%bigquery
CREATE OR REPLACE EXTERNAL TABLE `{project_}.{dataset_}.object_table`
WITH CONNECTION `{project_}.{region_}.{connection_}`
OPTIONS(
  object_metadata = 'SIMPLE',
  uris = ['{bucket_}']
);""")

# Assign role Object Viewer to service account

In [None]:
!gsutil iam ch serviceAccount:{saccount_}:objectViewer gs://{bucket_name_}

In [None]:
# @title 🐞
print(f'!gsutil iam ch serviceAccount:{saccount_}:objectViewer gs://{bucket_name_}')

# Check the content of the object table [optional]

In [None]:
%%bigquery
SELECT * FROM `{project_}.{dataset_}.object_table` LIMIT 5;

In [None]:
%%bigquery
SELECT * FROM EXTERNAL_OBJECT_TRANSFORM(TABLE `{project_}.{dataset_}.object_table`, ['SIGNED_URL']);

In [None]:
# @title 🐞
print(f"""%%bigquery
SELECT * FROM `{project_}.{dataset_}.object_table` LIMIT 5;""")

In [None]:
%%bigquery
SELECT ml_generate_text_llm_result, ml_generate_text_status, signed_url  FROM
ML.GENERATE_TEXT(
  MODEL `{project_}.{dataset_}.{model_}`,
  TABLE `{project_}.{dataset_}.object_table`,
  STRUCT(0.2 AS temperature,
  'Erzeuge eine Zusammenfassung des Videos' AS PROMPT,
  TRUE AS FLATTEN_JSON_OUTPUT)) result
JOIN EXTERNAL_OBJECT_TRANSFORM(
  TABLE `{project_}.{dataset_}.object_table`, ['SIGNED_URL']
) transformed ON result.uri = transformed.uri;

In [None]:
# @title 🐞
# Debug helper: print the video-summary query with the current settings filled
# in. The column list matches the query that is executed.
print(f"""%%bigquery
SELECT ml_generate_text_llm_result, ml_generate_text_status, signed_url  FROM
ML.GENERATE_TEXT(
  MODEL `{project_}.{dataset_}.{model_}`,
  TABLE `{project_}.{dataset_}.object_table`,
  STRUCT(0.2 AS temperature,
  'Erzeuge eine Zusammenfassung des Videos' AS PROMPT,
  TRUE AS FLATTEN_JSON_OUTPUT)) result
JOIN EXTERNAL_OBJECT_TRANSFORM(
  TABLE `{project_}.{dataset_}.object_table`, ['SIGNED_URL']
) transformed ON result.uri = transformed.uri;""")