# Bloque 1

In [1]:
ALUMNO = 'civica'

# Consulta datos en BigQuery

In [2]:
from google.cloud import bigquery

client = bigquery.Client()

Tomamos solo los datos del centro de distribución 1

In [3]:
sql = """
SELECT * 
FROM `ia-ugr.ecommerce.products`
WHERE distribution_center_id = 1
AND name IS NOT NULL;
"""

In [4]:
products = client.query(sql).to_dataframe()

products.head()

Unnamed: 0,id,cost,category,name,brand,retail_price,department,sku,distribution_center_id
0,15674,3.10625,Plus,Low Profile Dyed Cotton Twill Cap - Navy W39S55D,MG,6.25,Women,63894CE404B8C652915C41EF8B879D20,1
1,15816,3.1773,Plus,Low Profile Dyed Cotton Twill Cap - Putty W39S55D,MG,5.95,Women,151EA8C2D98CE89C2336324C11B1E107,1
2,15981,20.43,Plus,Echo Design Women's Knit Touch Glove And Earbu...,ECHO,45.0,Women,A1C9A80B51FB1A982B13329A1DC58F95,1
3,15917,1.638,Plus,Keds Popcorn Socks in 3 Pack Different Style G...,Keds,3.5,Women,7737A2600285AFE739FC99B6F0E9FD97,1
4,15448,11.858,Plus,ASICS Unisex Adult Thrmopolis LT Beanie,ASICS,22.0,Women,349F259C872C43D1EA241BA414C5B70B,1


# Preparación embeddings

In [5]:
PROJECT_ID = 'ia-ugr'
REGION = "us-central1" 

## Definimos una función para generar chunks de nuestra tabla y evitar errores de memoria:

### Función `query_bigquery_chunks`

Consulta datos de BigQuery en bloques (chunks) y genera DataFrames de pandas. Cada DataFrame añade una columna `text_to_index` con información concatenada de la categoría, nombre, departamento y precio del producto.

#### Argumentos:
- `max_rows` (**int**): Número máximo de filas a consultar.
- `rows_per_chunk` (**int**): Número de filas en cada chunk.
- `start_chunk` (**int**): Índice inicial desde el que comenzar la consulta.

#### Salida:
- **Yields** `pd.DataFrame`: DataFrames con los datos consultados y procesados, generados uno a uno.


In [6]:
import math
from typing import Any, Generator

import pandas as pd
from google.cloud import bigquery

client = bigquery.Client(project=PROJECT_ID)

# LO VAMOS A HACER SOLAMENTE PARA EL CENTRO DE DISTRIBUCIÓN 1
QUERY_TEMPLATE = """
SELECT * 
FROM `ia-ugr.ecommerce.products`
WHERE distribution_center_id = 1
AND name IS NOT NULL
LIMIT {limit}
OFFSET {offset};
"""


def query_bigquery_chunks(
    max_rows: int, rows_per_chunk: int, start_chunk: int = 0
) -> Generator[pd.DataFrame, Any, None]:
    for offset in range(start_chunk, max_rows, rows_per_chunk):
        query = QUERY_TEMPLATE.format(limit=rows_per_chunk, offset=offset)
        query_job = client.query(query)
        rows = query_job.result()
        df = rows.to_dataframe()
#        df["text_to_index"] = df.name

        df["text_to_index"] = "Category: " + df.category + ". " \
        "Name: " + df.name + ". "\
        "Department: " + df.department + ". "\
        "Price: " + df['retail_price'].astype(str)

        df = df[df['text_to_index'].notna()]

        yield df

### Probamos la función generando 1 chunk de nuestra tabla:

In [7]:
# Get a dataframe of 1000 rows for demonstration purposes
df = next(query_bigquery_chunks(max_rows=1000, rows_per_chunk=1000))

# Examine the data
df.head()

Unnamed: 0,id,cost,category,name,brand,retail_price,department,sku,distribution_center_id,text_to_index
0,15674,3.10625,Plus,Low Profile Dyed Cotton Twill Cap - Navy W39S55D,MG,6.25,Women,63894CE404B8C652915C41EF8B879D20,1,Category: Plus. Name: Low Profile Dyed Cotton ...
1,15816,3.1773,Plus,Low Profile Dyed Cotton Twill Cap - Putty W39S55D,MG,5.95,Women,151EA8C2D98CE89C2336324C11B1E107,1,Category: Plus. Name: Low Profile Dyed Cotton ...
2,15981,20.43,Plus,Echo Design Women's Knit Touch Glove And Earbu...,ECHO,45.0,Women,A1C9A80B51FB1A982B13329A1DC58F95,1,Category: Plus. Name: Echo Design Women's Knit...
3,15917,1.638,Plus,Keds Popcorn Socks in 3 Pack Different Style G...,Keds,3.5,Women,7737A2600285AFE739FC99B6F0E9FD97,1,Category: Plus. Name: Keds Popcorn Socks in 3 ...
4,15448,11.858,Plus,ASICS Unisex Adult Thrmopolis LT Beanie,ASICS,22.0,Women,349F259C872C43D1EA241BA414C5B70B,1,Category: Plus. Name: ASICS Unisex Adult Thrmo...


## Definimos una función para generar los embeddings vectoriales:

### Funciones auxiliares:

#### Función `encode_texts_to_embeddings`

Genera embeddings vectoriales para una lista de frases utilizando el modelo de Vertex AI. Devuelve una lista de vectores de embeddings.

##### Argumentos:
- `sentences` (**List[str]**): Lista de frases a convertir en embeddings.

##### Salida:
- **List[Optional[List[float]]]**: Lista de embeddings vectoriales para cada frase, o `None` para frases que causaron excepciones.

*Es necesario habilitar Vertex AI API*

In [8]:
from typing import List, Optional

# Load the "Vertex AI Embeddings for Text" model
from vertexai.preview.language_models import TextEmbeddingModel

model = TextEmbeddingModel.from_pretrained("textembedding-gecko@001")

# Define an embedding method that uses the model
def encode_texts_to_embeddings(sentences: List[str]) -> List[Optional[List[float]]]:
    try:
        embeddings = model.get_embeddings(sentences)
        return [embedding.values for embedding in embeddings]
    except Exception:
        print(f"Entran las sentences: {sentences}")
        print("Es exception")
        return [None for _ in range(len(sentences))]

#### Función `generate_batches`

Genera lotes de frases de un tamaño específico. Según la documentación, cada solicitud puede manejar hasta cinco instancias de texto. Por lo tanto, se generarán lotes de cinco.

##### Argumentos:
- `sentences` (**List[str]**): La lista completa de frases de la cual generar lotes.
- `batch_size` (**int**): El número de frases en cada lote.

##### Salida:
- **Generator[List[str], None, None]**: Un generador que produce lotes de frases, donde cada lote es una lista de frases.


In [9]:
import functools
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Generator, List, Tuple

import numpy as np
from tqdm.auto import tqdm


# Generator function to yield batches of sentences
def generate_batches(
    sentences: List[str], batch_size: int
) -> Generator[List[str], None, None]:
    for i in range(0, len(sentences), batch_size):
        yield sentences[i : i + batch_size]

### Funciones principal para generar los embeddings:

#### Función `encode_text_to_embedding_batched`

Genera los embeddings utilizando llamando a la API por lotes. Esta función procesa los lotes y recupera embeddings de manera paralela, asegurando no superar el límite de llamadas API por segundo. Utiliza las funciones definidas anteriormente.

##### Argumentos:
- `sentences` (**List[str]**): Lista de frases a convertir en embeddings.
- `api_calls_per_second` (**int**): Máximo número de llamadas API permitidas por segundo, default a 10.
- `batch_size` (**int**): Número de frases en cada lote, default a 5.

##### Salida:
- **Tuple[List[bool], np.ndarray]**: Una tupla donde el primer elemento es una lista de booleanos que indica el éxito de la conversión para cada frase, y el segundo elemento es un array de NumPy conteniendo los embeddings generados para las frases exitosas.


In [10]:
def encode_text_to_embedding_batched(
    sentences: List[str], api_calls_per_second: int = 10, batch_size: int = 5
) -> Tuple[List[bool], np.ndarray]:

    embeddings_list: List[List[float]] = []

    # Prepare the batches using a generator
    batches = generate_batches(sentences, batch_size)

    seconds_per_job = 1 / api_calls_per_second

    with ThreadPoolExecutor() as executor:
        futures = []
        for batch in tqdm(
            batches, total=math.ceil(len(sentences) / batch_size), position=0
        ):
            if isinstance(batch, list):
                futures.append(
                    executor.submit(functools.partial(encode_texts_to_embeddings), batch)
                )
            else:
                futures.append(
                    executor.submit(functools.partial(encode_texts_to_embeddings), batch.tolist())
                )
            time.sleep(seconds_per_job)

        for future in futures:
            embeddings_list.extend(future.result())

    is_successful = [
        embedding is not None for sentence, embedding in zip(sentences, embeddings_list)
    ]

    embeddings_list_successful = np.squeeze(
        np.stack([embedding for embedding in embeddings_list if embedding is not None])
    )
    return is_successful, embeddings_list_successful

### Realizamos una prueba utilizando el df generado anteriormente (1 chunk de nuestra tabla):

In [11]:
# Encode a subset of questions for validation
questions = df.text_to_index.tolist()[:500]
is_successful, question_embeddings = encode_text_to_embedding_batched(
    sentences=df.text_to_index.tolist()[:500]
)

# Filter for successfully embedded sentences
questions = np.array(questions)[is_successful]

  0%|          | 0/100 [00:00<?, ?it/s]

In [12]:
DIMENSIONS = len(question_embeddings[0])

print(DIMENSIONS)

768


#### Comprobamos la similitud entre los embeddings generados

Cogemos un ejemplo al azar para ver la similitud que tiene con el resto de items

1. **Selección aleatoria**:
   - Se selecciona un índice aleatorio entre 0 y 99.

2. **Cálculo similitud**:
   - Se calcula la similitud para el item seleccionado en comparación con el resto utilizando el producto escalar (`np.dot`).

3. **Se muestran las 20 mejores coincidencias**:
   - Se emparejan y ordenan los items originales con sus puntajes de similitud.


In [13]:
import random

question_index = random.randint(0, 99)

print(f"Query question = {questions[question_index]}")

# Get similarity scores for each embedding by using dot-product.
scores = np.dot(question_embeddings[question_index], question_embeddings.T)

# Print top 20 matches
for index, (question, score) in enumerate(
    sorted(zip(questions, scores), key=lambda x: x[1], reverse=True)[:20]
):
    print(f"\t{index}: {question}: {score}")

Query question = Category: Plus. Name: SPANXÂ® - In-Power Line Super Higher Power - Nude. Department: Women. Price: 38.0
	0: Category: Plus. Name: SPANXÂ® - In-Power Line Super Higher Power - Nude. Department: Women. Price: 38.0: 0.9999986505895756
	1: Category: Plus. Name: Sag Harbor Women's Slimming Panel Pant. Department: Women. Price: 28.0: 0.8505490673220277
	2: Category: Plus. Name: Velrose Snip-it Long Pant Liner. Department: Women. Price: 19.950000762939453: 0.8500964658424228
	3: Category: Plus. Name: Pamela Mann Plain Stripe Suspender Tights - Available in Medium XL & XXL & XXXL. Department: Women. Price: 17.950000762939453: 0.8376499054346906
	4: Category: Plus. Name: Bra Discs Nipple Covers Style 410x. Department: Women. Price: 21.350000381469727: 0.8360589049746044
	5: Category: Plus. Name: Lamaze Maternity Nursing Camisole. Department: Women. Price: 25.0: 0.82962320846486
	6: Category: Swim. Name: Womens Sexy Tie-Up Soft Spandex Halter Bikini Set. Department: Women. Price

## Generamos los embeddings a partir de varios chunks y los guardamos en formato JSONL

In [14]:
from google.cloud import bigquery

products_count = bigquery.Client().query("SELECT count(*) FROM `ia-ugr.ecommerce.products` WHERE distribution_center_id = 1 AND name IS NOT NULL").to_dataframe()
products_count.head()

Unnamed: 0,f0_
0,3890


In [15]:
import tempfile
from pathlib import Path

# Create temporary file to write embeddings to
embeddings_file_path = Path(tempfile.mkdtemp())

print(f"Embeddings directory: {embeddings_file_path}")

Embeddings directory: /var/tmp/tmpzsh4hrq6


In [16]:
import gc
import json

BQ_NUM_ROWS = 3890
BQ_CHUNK_SIZE = 778
BQ_NUM_CHUNKS = math.ceil(BQ_NUM_ROWS / BQ_CHUNK_SIZE)

START_CHUNK = 0

# Create a rate limit of 300 requests per minute. Adjust this depending on your quota.
API_CALLS_PER_SECOND = 300 / 60
# According to the docs, each request can process 5 instances per request
ITEMS_PER_REQUEST = 10

# Loop through each generated dataframe, convert
for i, df in tqdm(
    enumerate(
        query_bigquery_chunks(
            max_rows=BQ_NUM_ROWS, rows_per_chunk=BQ_CHUNK_SIZE, start_chunk=START_CHUNK
        )
    ),
    total=BQ_NUM_CHUNKS - START_CHUNK,
    position=-1,
    desc="Chunk of rows from BigQuery",
):
    # Create a unique output file for each chunk
    chunk_path = embeddings_file_path.joinpath(
        f"{embeddings_file_path.stem}_{i+START_CHUNK}.json"
    )
    with open(chunk_path, "a") as f:
        id_chunk = df.id

        # Convert batch to embeddings
        is_successful, question_chunk_embeddings = encode_text_to_embedding_batched(
            sentences=df.text_to_index,
            api_calls_per_second=API_CALLS_PER_SECOND,
            batch_size=ITEMS_PER_REQUEST,
        )

        # Append to file
        embeddings_formatted = [
            json.dumps(
                {
                    "id": str(id),
                    "content": str(df["text_to_index"].astype(str)),
                    "embedding": [str(value) for value in embedding],
                }
            )
            + "\n"
            for id, embedding in zip(id_chunk[is_successful], question_chunk_embeddings)
        ]
        
        print("Ejemplo del embedding")
        print(embeddings_formatted)
        
        f.writelines(embeddings_formatted)

        # Delete the DataFrame and any other large data structures
        del df
        gc.collect()

Chunk of rows from BigQuery:   0%|          | 0/5 [00:00<?, ?it/s]

  0%|          | 0/78 [00:00<?, ?it/s]

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



  0%|          | 0/78 [00:00<?, ?it/s]

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



  0%|          | 0/78 [00:00<?, ?it/s]

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



  0%|          | 0/78 [00:00<?, ?it/s]

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



  0%|          | 0/78 [00:00<?, ?it/s]

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



### Creamos un bucket para guardar los embeddings generados

In [17]:
BUCKET_URI = f"gs://bucket-{PROJECT_ID}-{ALUMNO}"

In [18]:
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://bucket-ia-ugr-civica/...


In [19]:
remote_folder = f"{BUCKET_URI}/{embeddings_file_path.stem}/"
! gsutil -m cp -r {embeddings_file_path}/* {remote_folder}

Copying file:///var/tmp/tmpzsh4hrq6/tmpzsh4hrq6_0.json [Content-Type=application/json]...
Copying file:///var/tmp/tmpzsh4hrq6/tmpzsh4hrq6_1.json [Content-Type=application/json]...
Copying file:///var/tmp/tmpzsh4hrq6/tmpzsh4hrq6_2.json [Content-Type=application/json]...
Copying file:///var/tmp/tmpzsh4hrq6/tmpzsh4hrq6_3.json [Content-Type=application/json]...
Copying file:///var/tmp/tmpzsh4hrq6/tmpzsh4hrq6_4.json [Content-Type=application/json]...
\ [5/5 files][ 71.4 MiB/ 71.4 MiB] 100% Done                                    
Operation completed over 5 objects/71.4 MiB.                                     


# Creamos el índice

In [20]:
DISPLAY_NAME = "ecommerce"
DESCRIPTION = "products data"

In [21]:
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

In [None]:
DIMENSIONS = 768

tree_ah_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=DISPLAY_NAME,
    contents_delta_uri=remote_folder,
    dimensions=DIMENSIONS,
    approximate_neighbors_count=150,
    distance_measure_type="DOT_PRODUCT_DISTANCE",
    leaf_node_embedding_count=500,
    leaf_nodes_to_search_percent=80,
    description=DESCRIPTION,
)

Creating MatchingEngineIndex
Create MatchingEngineIndex backing LRO: projects/1043238928011/locations/us-central1/indexes/2793590765341638656/operations/7479610950992003072
MatchingEngineIndex created. Resource name: projects/1043238928011/locations/us-central1/indexes/2793590765341638656
To use this MatchingEngineIndex in another session:
index = aiplatform.MatchingEngineIndex('projects/1043238928011/locations/us-central1/indexes/2793590765341638656')


In [None]:
INDEX_RESOURCE_NAME = tree_ah_index.resource_name
INDEX_RESOURCE_NAME

'projects/1043238928011/locations/us-central1/indexes/2793590765341638656'

In [None]:
tree_ah_index = aiplatform.MatchingEngineIndex(index_name=INDEX_RESOURCE_NAME)

# Creamos el endpoint y lo desplegamos

In [None]:
my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name=DISPLAY_NAME,
    description=DISPLAY_NAME,
    public_endpoint_enabled=True,
)

Creating MatchingEngineIndexEndpoint
Create MatchingEngineIndexEndpoint backing LRO: projects/1043238928011/locations/us-central1/indexEndpoints/6459098649556156416/operations/8589185309185409024
MatchingEngineIndexEndpoint created. Resource name: projects/1043238928011/locations/us-central1/indexEndpoints/6459098649556156416
To use this MatchingEngineIndexEndpoint in another session:
index_endpoint = aiplatform.MatchingEngineIndexEndpoint('projects/1043238928011/locations/us-central1/indexEndpoints/6459098649556156416')


In [None]:
DEPLOYED_INDEX_ID = f"products_data_index_{ALUMNO}"

DEPLOYED_INDEX_ID

'products_data_index_civica'

In [None]:
my_index_endpoint = my_index_endpoint.deploy_index(
    index=tree_ah_index, deployed_index_id=DEPLOYED_INDEX_ID
)

my_index_endpoint.deployed_indexes

Deploying index MatchingEngineIndexEndpoint index_endpoint: projects/1043238928011/locations/us-central1/indexEndpoints/6459098649556156416
Deploy index MatchingEngineIndexEndpoint index_endpoint backing LRO: projects/1043238928011/locations/us-central1/indexEndpoints/6459098649556156416/operations/6115020263898742784
MatchingEngineIndexEndpoint index_endpoint Deployed index. Resource name: projects/1043238928011/locations/us-central1/indexEndpoints/6459098649556156416


[id: "products_data_index_civica"
index: "projects/1043238928011/locations/us-central1/indexes/2793590765341638656"
create_time {
  seconds: 1713509219
  nanos: 305780000
}
index_sync_time {
  seconds: 1713510126
  nanos: 345456000
}
automatic_resources {
  min_replica_count: 2
  max_replica_count: 2
}
deployment_group: "default"
]

## Verificamos que el número de items se corresponde con el número de embeddings

In [None]:
number_of_vectors = sum(
    aiplatform.MatchingEngineIndex(
        deployed_index.index
    )._gca_resource.index_stats.vectors_count
    for deployed_index in my_index_endpoint.deployed_indexes
)

print(f"Expected: {BQ_NUM_ROWS}, Actual: {number_of_vectors}")

Expected: 3890, Actual: 3890


# Borramos el endpoint y el bucket

In [None]:
# import os

# delete_bucket = False

# # Force undeployment of indexes and delete endpoint
# my_index_endpoint.delete(force=True)

# # Delete indexes
# tree_ah_index.delete()

# if delete_bucket or os.getenv("IS_TESTING"):
#     ! gsutil rm -rf {BUCKET_URI}