In [None]:
# Install the packages
! pip3 install --upgrade google-cloud-aiplatform \
                         google-cloud-bigquery\
                         google-cloud-storage\
                         bigframes\
                         pandas-gbq\
                         db-dtypes

## セットアップ

In [None]:
# Authenticate the user of the current Colab session (google.colab.auth).
from google.colab import auth
auth.authenticate_user()

## 環境変数を定義

In [None]:
# --- BigQuery / project settings ---
# Google Cloud project ID (required).
PROJECT_ID = None  # @param {type:"string"}
# Region used for Vertex AI and the Feature Store.
REGION = "us-central1"  # @param {type:"string"}
# BigQuery dataset ID.
BQ_DATASET_ID = None  # @param {type:"string"}
# BigQuery table ID.
BQ_TABLE_ID = None  # @param {type:"string"}

# --- Feature Store settings ---
# Feature Online Store ID.
FEATURE_ONLINE_STORE_ID = None  # @param {type:"string"}
# Feature View ID.
FEATURE_VIEW_ID = None  # @param {type:"string"}

# --- Schedule settings ---
# The sync schedule is created from this CRON expression.
# If CRON is empty, an immediately scheduled sync job is started instead.
# Default: every day at 08:00 Tokyo time (the cron hour field is 8).
CRON_SCHEDULE = "TZ=Asia/Tokyo 0 8 * * *"  # @param {type:"string"}

# --- Vector search settings ---
# Number of embedding dimensions; presumably must match the embedding model
# whose vectors are uploaded below — verify when switching models.
DIMENSIONS = 768  # @param {type:"integer"}
# Name of the column that holds the embedding.
EMBEDDING_COLUMN = "embedding"  # @param {type:"string"}

# --- Optional settings ---
# Number of embeddings per leaf node (optional).
LEAF_NODE_EMBEDDING_COUNT = 10000  # @param {type:"integer"}
# Columns usable for filtering (optional). Declared "raw" so the form keeps
# this as a Python list instead of rewriting it as a plain string.
FILTER_COLUMNS = ["title"]  # @param {type:"raw"}


## ユーティリティ関数を定義

In [None]:
# Google Cloud AI Platform関連のインポート
from google.cloud import aiplatform
from google.cloud.aiplatform_v1beta1 import (
    FeatureOnlineStoreAdminServiceClient,
    FeatureOnlineStoreServiceClient
)
from google.cloud.aiplatform_v1beta1.types import (
    NearestNeighborQuery,
    feature_online_store as feature_online_store_pb2,
    feature_online_store_admin_service as feature_online_store_admin_service_pb2,
    feature_online_store_service as feature_online_store_service_pb2,
    feature_view as feature_view_pb2
)
from google.protobuf import struct_pb2
from vertexai.language_models import TextEmbeddingModel

# Google Cloud BigQuery関連のインポート
from google.cloud import bigquery

# Google Cloud Storage関連のインポート
from google.cloud import storage

# その他のインポート
import bigframes.pandas as bpd
import random
import base64
import time
import typing


# Initialize the Vertex AI SDK for the configured project and region.
aiplatform.init(project=PROJECT_ID, location=REGION)

# Regional API host, e.g. "us-central1-aiplatform.googleapis.com".
API_ENDPOINT = f"{REGION}-aiplatform.googleapis.com"

# Feature Online Store admin client; used later to trigger and poll the
# feature-view sync.
admin_client = FeatureOnlineStoreAdminServiceClient(
    client_options={"api_endpoint": API_ENDPOINT}
)



# Set BigQuery DataFrames options
bpd.options.bigquery.project = PROJECT_ID
# NOTE(review): hard-coded to the "us" multi-region regardless of REGION —
# confirm it matches the location of BQ_DATASET_ID.
bpd.options.bigquery.location = "us"

def list_gcs_files(bucket_name, prefix):
    """Return the names of objects under ``prefix`` in a GCS bucket, prefix removed.

    Args:
        bucket_name: Name of the GCS bucket (without ``gs://``).
        prefix: Object-name prefix to list and to strip from each result.

    Returns:
        A list of object names relative to ``prefix``.
    """
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=prefix)
    # Strip only the *leading* prefix: str.replace would also delete any later
    # occurrence of the same text inside the object name.
    return [
        blob.name[len(prefix):] if prefix and blob.name.startswith(prefix) else blob.name
        for blob in blobs
    ]

def list_files_from_metadata(metadata_paths):
    """Return image file names listed in metadata text files stored on GCS.

    Each metadata file is downloaded as text. Every non-blank line, with
    surrounding whitespace stripped, is treated as a base name and gets
    ``.jpg`` appended.

    Args:
        metadata_paths: Iterable of ``gs://<bucket>/<object>`` URIs.

    Returns:
        A flat list of ``"<line>.jpg"`` names, in file order then line order.

    Raises:
        ValueError: If a path is not of the form ``gs://<bucket>/<object>``.
    """
    gcs_scheme = "gs://"
    file_list = []
    storage_client = storage.Client()

    for metadata_path in metadata_paths:
        if not metadata_path.startswith(gcs_scheme):
            raise ValueError(f"Expected a gs:// URI, got {metadata_path!r}.")
        bucket_name, _, file_path = metadata_path[len(gcs_scheme):].partition("/")
        if not bucket_name or not file_path:
            raise ValueError(f"Expected gs://<bucket>/<object>, got {metadata_path!r}.")
        blob = storage_client.bucket(bucket_name).blob(file_path)
        content = blob.download_as_text()
        # Skip blank lines so they do not become a bogus ".jpg" entry.
        file_list.extend(
            f"{line.strip()}.jpg" for line in content.splitlines() if line.strip()
        )

    return file_list

def upload_to_gcs(local_file, bucket_name, gcs_path):
    """Copy the file at ``local_file`` to ``gs://<bucket_name>/<gcs_path>``."""
    (
        storage.Client()
        .bucket(bucket_name)
        .blob(gcs_path)
        .upload_from_filename(local_file)
    )

In [None]:
import base64
import time
import typing

from google.cloud import aiplatform
from google.protobuf import struct_pb2


class EmbeddingResponse(typing.NamedTuple):
    """Embeddings returned by EmbeddingPredictionClient.

    A field is ``None`` when the corresponding modality was not embedded.
    """

    # Text embedding vector, or None if no text was embedded.
    text_embedding: typing.Optional[typing.Sequence[float]]
    # Image embedding vector, or None if no image was embedded.
    image_embedding: typing.Optional[typing.Sequence[float]]


def load_image_bytes(image_uri: str) -> typing.Optional[bytes]:
    """Load image bytes from an HTTP(S) URL or a local file path.

    Args:
        image_uri: ``http://`` / ``https://`` URL, or a local filesystem path.

    Returns:
        The raw bytes, or ``None`` if the HTTP request did not return status 200.
    """
    if image_uri.startswith(("http://", "https://")):
        # Imported here so this helper does not depend on a later cell having
        # already imported ``requests``.
        import requests

        # The context manager releases the (streamed) connection when done.
        with requests.get(image_uri, stream=True) as response:
            if response.status_code == 200:
                return response.content
        return None

    # Close the file handle promptly instead of leaking it.
    with open(image_uri, "rb") as image_file:
        return image_file.read()


class EmbeddingPredictionClient:
    """Wrapper around the Vertex AI Prediction Service client for embedding models.

    Holds one PredictionServiceClient bound to a regional API host and builds
    publisher-model resource names from ``project`` and ``location``.
    """

    def __init__(
        self,
        project: str,
        location: str = "us-central1",
        api_regional_endpoint: typing.Optional[str] = None,
    ):
        """Create the underlying prediction client.

        Args:
            project: Google Cloud project ID used in the model resource name.
            location: Region of the publisher model.
            api_regional_endpoint: API host to send requests to. Defaults to
                ``f"{location}-aiplatform.googleapis.com"`` so it follows
                ``location`` (identical to the old default for "us-central1").
        """
        if api_regional_endpoint is None:
            # Derive the host from the location so the two cannot drift apart.
            api_regional_endpoint = f"{location}-aiplatform.googleapis.com"
        client_options = {"api_endpoint": api_regional_endpoint}
        # Created once and reused for every request made through this wrapper.
        self.client = aiplatform.gapic.PredictionServiceClient(
            client_options=client_options
        )
        self.location = location
        self.project = project

    def get_embedding(
        self,
        text: typing.Optional[str] = None,
        image_file: typing.Optional[str] = None,
        model_name: str = "multimodalembedding@001",
    ):
        """Embed text and/or an image with a multimodal publisher model.

        Args:
            text: Text to embed, or ``None``.
            image_file: HTTP(S) URL or local path of an image, or ``None``.
            model_name: Publisher model ID, e.g. ``"multimodalembedding@001"``.

        Returns:
            An ``EmbeddingResponse``; a field is ``None`` for a modality that
            was not requested.

        Raises:
            ValueError: If neither ``text`` nor ``image_file`` is given, or if
                the image could not be loaded.
        """
        if not text and not image_file:
            raise ValueError("At least one of text or image_file must be specified.")

        image_bytes = None
        if image_file:
            image_bytes = load_image_bytes(image_file)
            if not image_bytes:
                # Fail fast rather than sending a request that silently omits the image.
                raise ValueError(f"Could not load image from {image_file!r}.")

        instance = struct_pb2.Struct()
        if text:
            instance.fields["text"].string_value = text
        if image_bytes:
            encoded_content = base64.b64encode(image_bytes).decode("utf-8")
            image_struct = instance.fields["image"].struct_value
            image_struct.fields["bytesBase64Encoded"].string_value = encoded_content

        endpoint = (
            f"projects/{self.project}/locations/{self.location}"
            f"/publishers/google/models/{model_name}"
        )
        response = self.client.predict(endpoint=endpoint, instances=[instance])
        prediction = response.predictions[0]

        text_embedding = list(prediction["textEmbedding"]) if text else None
        image_embedding = list(prediction["imageEmbedding"]) if image_bytes else None

        return EmbeddingResponse(
            text_embedding=text_embedding, image_embedding=image_embedding
        )

    def get_embedding_text_model(
        self,
        text: typing.Optional[str] = None,
        model_name: str = "textembedding-gecko@001",
        task_type: str = "RETRIEVAL_QUERY",
    ):
        """Embed ``text`` with a Vertex AI text-embedding model.

        Prints the length of the resulting vector.

        Args:
            text: Text to embed (required).
            model_name: Text-embedding model ID.
            task_type: NOTE(review): accepted but currently not forwarded to
                the model.

        Returns:
            An ``EmbeddingResponse`` with ``image_embedding=None``.

        Raises:
            ValueError: If ``text`` is empty or ``None``.
        """
        if not text:
            raise ValueError("text must be specified.")
        model = TextEmbeddingModel.from_pretrained(model_name)
        embeddings = model.get_embeddings([text])
        # One input text -> exactly one embedding.
        vector = embeddings[0].values
        print(f"Length of Embedding Vector: {len(vector)}")
        return EmbeddingResponse(text_embedding=vector, image_embedding=None)

In [None]:
import copy
from typing import List, Optional

import numpy as np
import requests
from tenacity import retry, stop_after_attempt

# Use the notebook's configured region/endpoint instead of the class defaults,
# so changing REGION also moves the prediction requests.
client = EmbeddingPredictionClient(
    project=PROJECT_ID, location=REGION, api_regional_endpoint=API_ENDPOINT
)


# Use a retry handler in case of failure
@retry(reraise=True, stop=stop_after_attempt(3))
def encode_texts_to_embeddings_with_retry(text: List[str], model_name: str) -> List[List[float]]:
    """Embed a one-item text batch with the multimodal model (up to 3 attempts).

    Args:
        text: A list containing exactly one string.
        model_name: Publisher model ID passed to ``client.get_embedding``.

    Returns:
        A one-element list holding the text embedding.

    Raises:
        RuntimeError: When the last attempt fails (chained to the real cause).
    """
    assert len(text) == 1

    try:
        return [client.get_embedding(text=text[0], image_file=None, model_name=model_name).text_embedding]
    except Exception as e:
        # Chain the original error so the real cause shows up in tracebacks.
        raise RuntimeError("Error getting embedding.") from e

# Use a retry handler in case of failure
@retry(reraise=True, stop=stop_after_attempt(3))
def encode_texts_to_text_model_embeddings_with_retry(text: List[str], model_name: str) -> List[List[float]]:
    """Embed a one-item text batch with a text-embedding model (up to 3 attempts).

    Args:
        text: A list containing exactly one string.
        model_name: Model ID passed to ``client.get_embedding_text_model``.

    Returns:
        A one-element list holding the text embedding.

    Raises:
        RuntimeError: When the last attempt fails (chained to the real cause).
    """
    assert len(text) == 1

    try:
        return [client.get_embedding_text_model(text=text[0], model_name=model_name).text_embedding]
    except Exception as e:
        # Chain the original error so the real cause shows up in tracebacks.
        raise RuntimeError("Error getting embedding.") from e

def encode_texts_to_embeddings(text: List[str], model_name: str) -> List[Optional[List[float]]]:
    """Best-effort multimodal text embedding.

    Returns the embeddings, or one ``None`` per input if every retry failed.
    """
    try:
        return encode_texts_to_embeddings_with_retry(text=text, model_name=model_name)
    except Exception as ex:
        # Report the failure (as the image wrapper does) instead of swallowing
        # it silently; the None entries are filtered out later.
        print(ex)
        return [None for _ in range(len(text))]

def encode_texts_to_text_model_embeddings(text: List[str], model_name: str) -> List[Optional[List[float]]]:
    """Best-effort text-model embedding.

    Returns the embeddings, or one ``None`` per input if every retry failed.
    """
    try:
        return encode_texts_to_text_model_embeddings_with_retry(text=text, model_name=model_name)
    except Exception as ex:
        # Report the failure (as the image wrapper does) instead of swallowing
        # it silently; the None entries are filtered out later.
        print(ex)
        return [None for _ in range(len(text))]


@retry(reraise=True, stop=stop_after_attempt(3))
def encode_images_to_embeddings_with_retry(image_uris: List[str], model_name: str) -> List[List[float]]:
    """Embed a one-item image batch with the multimodal model (up to 3 attempts).

    Args:
        image_uris: A list containing exactly one image URL or local path.
        model_name: Publisher model ID passed to ``client.get_embedding``.

    Returns:
        A one-element list holding the image embedding.

    Raises:
        RuntimeError: When the last attempt fails (chained to the real cause).
    """
    assert len(image_uris) == 1

    try:
        return [
            client.get_embedding(text=None, image_file=image_uris[0], model_name=model_name).image_embedding
        ]
    except Exception as ex:
        # Chain the original error so the real cause shows up in tracebacks.
        raise RuntimeError("Error getting embedding.") from ex


def encode_images_to_embeddings(image_uris: List[str], model_name: str) -> List[Optional[List[float]]]:
    """Best-effort wrapper around the retrying image encoder.

    On any error, the exception is printed and one ``None`` is returned per URI.
    """
    try:
        embeddings = encode_images_to_embeddings_with_retry(image_uris=image_uris, model_name=model_name)
    except Exception as err:
        print(err)
        embeddings = [None] * len(image_uris)
    return embeddings

In [None]:
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Generator, List

from tqdm.auto import tqdm


def generate_batches(
    inputs: List[str], batch_size: int
) -> Generator[List[str], None, None]:
    """Yield consecutive slices of ``inputs``, each holding at most ``batch_size`` items."""
    offsets = range(0, len(inputs), batch_size)
    yield from (inputs[offset : offset + batch_size] for offset in offsets)


# Throttle: approximate number of items submitted to the embedding API per second.
API_IMAGES_PER_SECOND = 2


def encode_to_embeddings_chunked(
    process_function: Callable[[List[str], str], List[Optional[List[float]]]],
    items: List[str],
    model_name: str,
    batch_size: int = 1,
) -> List[Optional[List[float]]]:
    """Encode ``items`` into embeddings, batch by batch, using ``process_function``.

    Batches are submitted to a thread pool. The loop sleeps between submissions
    to stay near ``API_IMAGES_PER_SECOND`` items per second.

    Args:
        process_function: Called as ``process_function(batch, model_name)``; it
            must return one embedding (or ``None``) per element of ``batch``.
        items: Inputs (texts or image paths) to encode.
        model_name: Model ID forwarded to ``process_function``.
        batch_size: Number of items per call to ``process_function``.

    Returns:
        The concatenated per-batch results, in the same order as ``items``.
    """
    embeddings_list: List[Optional[List[float]]] = []

    # Prepare the batches using a generator
    batches = generate_batches(items, batch_size)
    # Ceiling division so a final partial batch is counted in the progress bar.
    num_batches = (len(items) + batch_size - 1) // batch_size

    seconds_per_job = batch_size / API_IMAGES_PER_SECOND

    with ThreadPoolExecutor() as executor:
        futures = []
        for batch in tqdm(batches, total=num_batches, position=0):
            futures.append(executor.submit(process_function, batch, model_name))
            time.sleep(seconds_per_job)

        # Collect in submission order so the output stays aligned with ``items``.
        for future in futures:
            embeddings_list.extend(future.result())
    return embeddings_list

## データ準備

In [None]:
# Images to embed. These are relative paths: load_image_bytes opens any
# non-HTTP path as a local file, so they must exist in the working directory.
selected_paths = [
    "chinsuko_3.jpg",
    "okinawa-soba-3.jpg",
    "umibudo_3.jpeg",
]

# Description of each product (same order as selected_paths).
descriptions = [
    "ちんすこうは沖縄の伝統的な焼き菓子で、サクサクとした食感と優しい甘さが魅力です。主な材料は小麦粉、砂糖、ラードで、これらを練り合わせて焼き上げます。元々は王族や貴族のための菓子でしたが、今では沖縄のお土産として広く親しまれています。そのバリエーションは豊富で、プレーンタイプから、黒糖や塩、紅芋など様々なフレーバーが楽しめます。軽い食感と独特の風味が、お茶請けやおやつに最適です。",
    "沖縄そばは、日本の沖縄県独特の麺料理です。中太のちぢれた麺は小麦粉を主原料とし、コシがありつるつるとした食感が特徴です。通常、豚肉やかまぼこ、ネギといった具材がトッピングされ、煮込んだ豚の骨から取ったコクのあるスープで味わいます。沖縄そばは、そのシンプルながら深い味わいで、地元民はもちろんのこと観光客にも愛されています。伝統的な沖縄の食文化の一環として、多くの食堂や専門店で提供されています。",
    "海ぶどうは沖縄の海岸近くで育つ海藻の一種で、小さな透明な粒がブドウのように連なっていることからその名がつきました。プチプチとした食感と、海のミネラルを感じる独特の風味が特徴です。生で食べることが多く、ポン酢や醤油で味付けして楽しまれます。海ぶどうは、沖縄料理の中でも特に珍しい食材であり、観光客には新鮮な体験として人気があります。栄養価が高く、健康的な食品としても注目されています。",
]

# Sales amount for each product (same order as selected_paths).
sales_amounts = [200000000, 1000000000, 100000000]

# Number of units sold for each product (same order as selected_paths).
number_of_sales_list = [133333, 1250000, 142857]

# These lists become parallel DataFrame columns, so they must line up.
assert (
    len(selected_paths) == len(descriptions) == len(sales_amounts) == len(number_of_sales_list)
), "Per-product lists must all have the same length."

## ベクトル抽出（画像から（マルチモーダル））

In [None]:
%%time
# Encode the product images with the multimodal embedding model.
image_embeddings = encode_to_embeddings_chunked(
    process_function=encode_images_to_embeddings, items=selected_paths, model_name="multimodalembedding@001"
)

# Keep only non-None embeddings, remembering which inputs they came from.
kept_image_pairs = [
    (index, embedding)
    for index, embedding in enumerate(image_embeddings)
    if embedding is not None
]
dropped_image_count = len(image_embeddings) - len(kept_image_pairs)
if not kept_image_pairs:
    # zip(*[]) would fail with an opaque unpacking error; explain instead.
    raise RuntimeError("No image embeddings were produced; see the errors printed above.")
if dropped_image_count:
    print(f"Warning: {dropped_image_count} image(s) failed to embed and were dropped.")
indexes_to_keep, image_embeddings = zip(*kept_image_pairs)

print(f"Processed {len(indexes_to_keep)} embeddings successfully")

## ベクトル抽出（文章から（マルチモーダル））

In [None]:
%%time
# Encode the product descriptions with the multimodal embedding model.
text_embeddings = encode_to_embeddings_chunked(
    process_function=encode_texts_to_embeddings, items=descriptions, model_name="multimodalembedding@001"
)

# Keep only non-None embeddings, remembering which inputs they came from.
kept_text_pairs = [
    (index, embedding)
    for index, embedding in enumerate(text_embeddings)
    if embedding is not None
]
dropped_text_count = len(text_embeddings) - len(kept_text_pairs)
if not kept_text_pairs:
    # zip(*[]) would fail with an opaque unpacking error; explain instead.
    raise RuntimeError("No text embeddings were produced by the multimodal model.")
if dropped_text_count:
    print(f"Warning: {dropped_text_count} description(s) failed to embed and were dropped.")
indexes_to_keep, text_embeddings = zip(*kept_text_pairs)

print(f"Processed {len(indexes_to_keep)} embeddings successfully")

## ベクトル抽出（文章から（テキストエンベディング））

In [None]:
%%time
# Encode the product descriptions with the multilingual text-embedding model.
# This overwrites `text_embeddings` from the multimodal cell; the DataFrame
# uploaded to BigQuery uses `text_embeddings`, i.e. this version.
text_embeddings = encode_to_embeddings_chunked(
    process_function=encode_texts_to_text_model_embeddings, items=descriptions, model_name="textembedding-gecko-multilingual@001"
)

# Keep only non-None embeddings, remembering which inputs they came from.
kept_text_model_pairs = [
    (index, embedding)
    for index, embedding in enumerate(text_embeddings)
    if embedding is not None
]
dropped_text_model_count = len(text_embeddings) - len(kept_text_model_pairs)
if not kept_text_model_pairs:
    # zip(*[]) would fail with an opaque unpacking error; explain instead.
    raise RuntimeError("No text embeddings were produced by the text-embedding model.")
if dropped_text_model_count:
    print(f"Warning: {dropped_text_model_count} description(s) failed to embed and were dropped.")
indexes_to_keep, text_embeddings = zip(*kept_text_model_pairs)

print(f"Processed {len(indexes_to_keep)} embeddings successfully")

## BigQuery にアップロード

In [None]:
import pandas as pd
from pandas_gbq import to_gbq


def extract_name(path):
    """Return a short product name for an image path.

    If ``path`` has a directory component, the name of its immediate parent
    directory is returned (``"gs://b/chinsuko/3.jpg"`` -> ``"chinsuko"``).
    A bare file name has no parent directory, so its name without the
    extension is used instead (``"chinsuko_3.jpg"`` -> ``"chinsuko_3"``).
    """
    import os

    parts = path.split('/')
    if len(parts) >= 2:
        return parts[-2]
    return os.path.splitext(parts[-1])[0]

df = pd.DataFrame(
    {
        'title': [extract_name(path) for path in selected_paths],
        # Text-model embeddings; use image_embeddings here to index images instead.
        'embedding': text_embeddings,
        'image_path': selected_paths,
        'description': descriptions,
        'sales_amount': sales_amounts,
        'number_of_sales': number_of_sales_list,
    }
)
df

In [None]:
# Destination table as "<dataset>.<table>"; the project is passed separately to to_gbq.
BQ_TABLE_ID_FQN = f"{BQ_DATASET_ID}.{BQ_TABLE_ID}"

# Explicit table schema; `embedding` is a REPEATED FLOAT64 (array of floats) column.
table_schema=[
    {'name': 'title', 'type': 'STRING'},
    {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},
    {'name': 'image_path', 'type': 'STRING'},
    {'name': 'description', 'type': 'STRING'},
    {'name': 'sales_amount', 'type': 'INT64'},
    {'name': 'number_of_sales', 'type': 'INT64'},
]

# Load the DataFrame.
# NOTE(review): if_exists='append' adds rows on every run, so re-running this
# cell duplicates the products in the table — use 'replace' if it should be idempotent.
to_gbq(df, BQ_TABLE_ID_FQN, project_id=PROJECT_ID, if_exists='append', table_schema=table_schema)

## 手動同期ジョブを実行

BigQuery のソーステーブルの内容を Feature Store のオンラインストアへ手動で同期します。

In [None]:
# Trigger an on-demand sync of the feature view. The returned
# `sync_response.feature_view_sync` is polled in the next cell.
sync_response = admin_client.sync_feature_view(
    feature_view=f"projects/{PROJECT_ID}/locations/{REGION}/featureOnlineStores/{FEATURE_ONLINE_STORE_ID}/featureViews/{FEATURE_VIEW_ID}"
)

In [None]:
import time

# Poll the sync every 30 seconds until it reports an end time.
while True:
    feature_view_sync = admin_client.get_feature_view_sync(
        name=sync_response.feature_view_sync
    )
    sync_finished = feature_view_sync.run_time.end_time.seconds > 0
    if not sync_finished:
        print("Sync ongoing, waiting for 30 seconds.")
        time.sleep(30)
        continue
    status = "Succeed" if feature_view_sync.final_status.code == 0 else "Failed"
    print(f"Sync {status} for {feature_view_sync.name}.")
    # Give the job a little extra time to shut down cleanly.
    time.sleep(30)
    break