# Vertex AI, LangChain, and numeric data

---

## Example Scenario

This notebook uses a Facebook dataset of statistics about Facebook posts, such as the post type, when the post was made, and engagement metrics.

The goals are:

- (_Use case 1_) For marketers: Build a new AI-powered hybrid search, where users can describe their questions in simple English text along with regular filters (like post type).


Dataset:
- The dataset for this notebook is `dataset_Facebook2.csv`.

## Overview of the steps

1. Download the dataset and load it into a PostgreSQL table called `fb_stats`, which has a `row_id` primary key and the following columns:

   `page_total_likes`, `post_type`, `category`, `post_month`, `post_weekday`, `post_hour`, `paid`, `lifetime_post_total_reach`, `lifetime_post_total_impressions`, `lifetime_engaged_users`, `lifetime_post_consumers`, `lifetime_post_consumptions`, `lifetime_post_impressions_by_people_who_liked_your_page`, `lifetime_post_reach_by_people_who_like_your_page`, `lifetime_people_have_liked_your_page_engaged_your_post`, `comments`, `likes`, `shares`, `total_interactions`.

2. Convert each row into a plain-English text description, split each description into
   smaller chunks, and generate vector embeddings for each chunk. The vector embeddings are then stored in another PostgreSQL table called `fb_stats_embeddings` using the `pgvector` extension. The `fb_stats_embeddings` table has a foreign key referencing the `fb_stats` table.
3. For a given user query, generate its vector embedding and use `pgvector`
   vector similarity search operators to find the closest matching `fb_stats` rows _after applying the relevant SQL filters._
4. Once the matching rows and their descriptions are found, use the [MapReduceChain](https://python.langchain.com/docs/modules/chains/document/map_reduce) from the LangChain framework to generate a summarized, high-quality context using an LLM (Google PaLM in this case).
5. Finally, pass the context to an LLM prompt to answer the user query. The LLM
   returns a well-formatted, natural-sounding English answer to
   the user.
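Step 3 relies on pgvector's `<=>` operator, which returns the cosine distance between two vectors, so `1 - (embedding <=> query)` (the expression used in the search queries later in this notebook) is their cosine similarity. The sketch below reproduces the same threshold, sort, and limit logic in plain Python on toy 3-dimensional vectors, to make the query's semantics concrete; `cosine_similarity`, `top_matches`, and the toy data are illustrative assumptions, not part of the notebook's pipeline.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return the cosine similarity of two equal-length vectors (1 - cosine distance)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        # Cosine similarity is undefined for zero vectors.
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


def top_matches(
    query: Sequence[float],
    rows: Dict[int, Sequence[float]],
    threshold: float,
    limit: int,
) -> List[Tuple[int, float]]:
    """Mimic `WHERE similarity > threshold ORDER BY similarity DESC LIMIT limit`."""
    scored = [(row_id, cosine_similarity(query, emb)) for row_id, emb in rows.items()]
    kept = [pair for pair in scored if pair[1] > threshold]
    kept.sort(key=lambda pair: pair[1], reverse=True)
    return kept[:limit]


# Toy 3-dimensional "embeddings" keyed by row_id (the real ones have 768 dimensions).
toy_rows = {
    1: [1.0, 0.0, 0.0],
    2: [0.9, 0.1, 0.0],
    3: [0.0, 1.0, 0.0],
}
print(top_matches([1.0, 0.0, 0.0], toy_rows, threshold=0.5, limit=2))
```

Rows 1 and 2 point in almost the same direction as the query and pass the threshold, while row 3 is orthogonal (similarity 0) and is filtered out.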
   
---


## Setup

### Install required packages



In [None]:
# Install dependencies.
# asyncio is part of the Python 3 standard library, so it is not installed from PyPI.
!pip install asyncpg==0.27.0 cloud-sql-python-connector["asyncpg"]==1.2.3
!pip install numpy==1.22.4 pandas==1.5.3
!pip install pgvector==0.1.8
!pip install langchain==0.0.196 transformers==4.30.1
!pip install google-cloud-aiplatform==1.26.0

In [None]:
# Automatically restart kernel after installs so that your environment
# can access the new packages.
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

### Setup Google Cloud environment


In [None]:


# Please fill in these values.
project_id = ""  # @param {type:"string"}
database_password = ""  # @param {type:"string"}
region = ""  # @param {type:"string"}
instance_name = ""  # @param {type:"string"}
database_name = ""  # @param {type:"string"}
database_user = ""  # @param {type:"string"}


# Quick input validations.
assert project_id, "Please provide a Google Cloud project ID"
assert region, "Please provide a Google Cloud region"
assert instance_name, "Please provide the name of your instance"
assert database_name, "Please provide a database name"
assert database_user, "Please provide a database user"
assert database_password, "Please provide a database password"

In [None]:
#@markdown ###Authenticate Account and enable APIs.
# Authenticate gcloud.
from google.colab import auth
auth.authenticate_user()

# Configure gcloud.
!gcloud config set project {project_id}

# Grant Cloud SQL Client role to authenticated user
current_user = !gcloud auth list --filter=status:ACTIVE --format="value(account)"

!gcloud projects add-iam-policy-binding {project_id} \
  --member=user:{current_user[0]} \
  --role="roles/cloudsql.client"


# Enable the Cloud SQL Admin API and the Vertex AI API.
!gcloud services enable sqladmin.googleapis.com
!gcloud services enable aiplatform.googleapis.com

### Setup Cloud SQL instance and PostgreSQL database

In [None]:
#@markdown Create and setup a Cloud SQL PostgreSQL instance, if not done already.
database_version = !gcloud sql instances describe {instance_name} --format="value(databaseVersion)"
if database_version[0].startswith("POSTGRES"):
  print("Found an existing Postgres Cloud SQL Instance!")
else:
  print("Creating new Cloud SQL instance...")
  !gcloud sql instances create {instance_name} --database-version=POSTGRES_15 \
    --region={region} --cpu=1 --memory=4GB --root-password={database_password}

# Create the database, if it does not exist.
out = !gcloud sql databases list --instance={instance_name} --filter="NAME:{database_name}" --format="value(NAME)"
if ''.join(out) == database_name:
  print("Database %s already exists, skipping creation." % database_name)
else:
  !gcloud sql databases create {database_name} --instance={instance_name}

# Create the database user for accessing the database.
!gcloud sql users create {database_user} \
  --instance={instance_name} \
  --password={database_password}

In [None]:
# @markdown Verify that you are able to connect to the database. Executing this block should print the current PostgreSQL server version.

import asyncio
import asyncpg
from google.cloud.sql.connector import Connector


async def main():
    # get current running event loop to be used with Connector
    loop = asyncio.get_running_loop()
    # initialize Connector object as async context manager
    async with Connector(loop=loop) as connector:
        # create connection to Cloud SQL database
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}"
            # ... additional database driver args
        )

        # Query the server version (the `fb_stats` table is not created until later).
        results = await conn.fetch("SELECT version()")
        print(results[0]["version"])

        # close asyncpg connection
        await conn.close()


# Test connection with `asyncio`
await main()  # type: ignore

## Prepare data

### Download and load the dataset in PostgreSQL

In [None]:
# Load the dataset from Colab's /content directory into a pandas DataFrame.

import pandas as pd

DATASET_URL = "/content/dataset_Facebook2.csv"
df = pd.read_csv(DATASET_URL)

# Drop rows with missing values.
df = df.dropna()
df.head(1)
# Rename the columns to snake_case names that match the `fb_stats` table.
df.columns =['page_total_likes','post_type','category','post_month','post_weekday','post_hour','paid','lifetime_post_total_reach','lifetime_post_total_impressions','lifetime_engaged_users','lifetime_post_consumers','lifetime_post_consumptions',
'lifetime_post_impressions_by_people_who_liked_your_page','lifetime_post_reach_by_people_who_like_your_page','lifetime_people_have_liked_your_page_engaged_your_post','comments','likes','shares','total_interactions']


#df['post_type'] = [str(x) for x in df['post_type']]

#df.post_type.describe

In [None]:
# Use the DataFrame index (which may have gaps after `dropna`) as the primary key.
df['row_id'] = df.index
df['row_id']

In [None]:
df

In [None]:
# Create the `fb_stats` table in PostgreSQL (the rows are loaded in the next cell).

import asyncio
import asyncpg
from google.cloud.sql.connector import Connector


async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to Cloud SQL database
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )
        # Uncomment to drop an existing table before re-running this cell.
        # await conn.execute("DROP TABLE IF EXISTS fb_stats CASCADE")

        # Create the `fb_stats` table.
        await conn.execute(
            """CREATE TABLE fb_stats(
                row_id INTEGER PRIMARY KEY,
                page_total_likes NUMERIC,
                post_type VARCHAR,
                category NUMERIC,
                post_month NUMERIC,
                post_weekday NUMERIC,
                post_hour NUMERIC,
                paid NUMERIC(10,4),
                lifetime_post_total_reach NUMERIC,
                lifetime_post_total_impressions NUMERIC,
                lifetime_engaged_users NUMERIC,
                lifetime_post_consumers NUMERIC,
                lifetime_post_consumptions NUMERIC,
                lifetime_post_impressions_by_people_who_liked_your_page NUMERIC,
                lifetime_post_reach_by_people_who_like_your_page NUMERIC,
                lifetime_people_have_liked_your_page_engaged_your_post NUMERIC,
                comments NUMERIC,
                likes NUMERIC(10,4),
                shares NUMERIC(10,4),
                total_interactions NUMERIC)"""
        )
        )


        await conn.close()


# Run the SQL commands now.
await main()  # type: ignore

In [None]:
# Load the dataframe into the `fb_stats` table.
import pandas as pd
import asyncio
import asyncpg
from google.cloud.sql.connector import Connector

async def load():
    # get current running event loop to be used with Connector
    loop = asyncio.get_running_loop()
    # initialize Connector object as async context manager
    async with Connector(loop=loop) as connector:
        # create connection to Cloud SQL database
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}"
            # ... additional database driver args
        )
        # Bulk-copy the DataFrame rows into `fb_stats`, matching columns by name.
        cols = list(df.columns)
        tuples = [tuple(row) for row in df.values]

        await conn.copy_records_to_table(
            "fb_stats", records=tuples, columns=cols, timeout=10
        )

        # close asyncpg connection
        await conn.close()


# Run the load now.
await load()  # type: ignore


In [None]:
# Map a 1-based month number (1 = January) to its English name.
MONTH_NAMES = ['January', 'February', 'March', 'April', 'May', 'June',
               'July', 'August', 'September', 'October', 'November', 'December']

def get_month(month):
  month = int(month)
  return MONTH_NAMES[month - 1] if 1 <= month <= 12 else None

# Map a 1-based weekday number (1 = Sunday) to its English name.
WEEKDAY_NAMES = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']

def get_weekday(day):
  day = int(day)
  return WEEKDAY_NAMES[day - 1] if 1 <= day <= 7 else None

# Combine the column values of one row into a single English description.
def create_string_from_row(df, row_index):
  """Creates a plain-English description of one row in a Pandas DataFrame.

  Args:
    df: The Pandas DataFrame.
    row_index: The index label (the `row_id`) of the row to be processed.

  Returns:
    A string describing each field of the row in a full sentence.
  """
  sentences = [
    f"Page Total likes is {df.page_total_likes[row_index]}.",
    f"The type of Post is {df.post_type[row_index]}.",
    f"The Category is {df.category[row_index]}.",
    f"The Month the Post was made is {get_month(df.post_month[row_index])}.",
    f"Was it a paid Post: {df.paid[row_index] != 0}.",
    f"The Lifetime Post Total Reach is {df.lifetime_post_total_reach[row_index]} viewers.",
    f"The Lifetime Post Total Impressions for this Post is {df.lifetime_post_total_impressions[row_index]}.",
    f"The Lifetime Engaged Users for this Post is {df.lifetime_engaged_users[row_index]}.",
    f"The Lifetime Post Consumers for this Post is {df.lifetime_post_consumers[row_index]}.",
    f"The Lifetime Post Consumptions for this Post is {df.lifetime_post_consumptions[row_index]}.",
    f"The Lifetime Post Impressions by people who have liked your Page is {df.lifetime_post_impressions_by_people_who_liked_your_page[row_index]}.",
    f"The Lifetime Post reach by people who like your Page is {df.lifetime_post_reach_by_people_who_like_your_page[row_index]}.",
    f"The Lifetime People who have liked your Page and engaged with your post is {df.lifetime_people_have_liked_your_page_engaged_your_post[row_index]}.",
    f"The Comment count for this Post is {df.comments[row_index]}.",
    f"The Like count for this Post is {df.likes[row_index]}.",
    f"The Share count for this Post is {df.shares[row_index]}.",
    f"The Total Interactions for this Post is {df.total_interactions[row_index]}.",
  ]
  # Join with spaces so that consecutive sentences do not run together.
  return " ".join(sentences)

if __name__ == "__main__":

  string = create_string_from_row(df, 1)

  print(string)
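Writing one f-string per column by hand is easy to get wrong, for example by reading the wrong column into a sentence. A hedged alternative is to drive the sentences from a mapping of column names to labels. The `FIELD_LABELS` mapping and `describe_row` helper below are hypothetical, cover only a few columns, and take a plain `dict` so the sketch runs without pandas; the example values are made up.

```python
from typing import Dict, Mapping

# Hypothetical mapping from `fb_stats` column names to human-readable labels.
FIELD_LABELS: Dict[str, str] = {
    "page_total_likes": "Page Total Likes",
    "post_type": "Post Type",
    "likes": "Like Count",
    "shares": "Share Count",
    "total_interactions": "Total Interactions",
}


def describe_row(row: Mapping[str, object], labels: Mapping[str, str] = FIELD_LABELS) -> str:
    """Build one sentence per labeled column, skipping columns missing from the row."""
    sentences = [
        f"The {label} for this Post is {row[column]}."
        for column, label in labels.items()
        if column in row
    ]
    return " ".join(sentences)


example_row = {"post_type": "Photo", "likes": 10}  # made-up values
print(describe_row(example_row))
# The Post Type for this Post is Photo. The Like Count for this Post is 10.
```

With the notebook's DataFrame, `describe_row(df.loc[row_id].to_dict())` would produce one sentence per labeled column, and adding a column to the description becomes a one-line change to the mapping.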

## Vector Embeddings

### Generate vector embeddings using a Text Embedding model

Step 1: Split long stats description text into smaller chunks

- In general, the stats descriptions can be longer than what fits into a single API request for generating a vector embedding.

- For example, the Vertex AI text embedding model accepts a maximum of 3,072 input tokens for a single API request.

- Use the `RecursiveCharacterTextSplitter` from the LangChain library to split
the description into smaller chunks of at most 500 characters each.
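To show roughly what a 500-character limit with `"."` as the first separator does, here is a simplified greedy packer: it splits the text into `.`-terminated sentences and packs consecutive sentences into chunks no longer than `chunk_size`. This is an assumption-labeled approximation, not LangChain's exact algorithm (which, among other things, recurses into the next separator when a single piece is still too long); `pack_sentences` is a hypothetical helper and the text is toy data.

```python
from typing import List


def pack_sentences(text: str, chunk_size: int = 500) -> List[str]:
    """Greedily pack '.'-terminated sentences into chunks of at most chunk_size characters.

    Simplified illustration only: a single sentence longer than chunk_size is
    kept whole here, whereas LangChain would try to split it further.
    """
    sentences = [s.strip() + "." for s in text.split(".") if s.strip()]
    chunks: List[str] = []
    current = ""
    for sentence in sentences:
        candidate = f"{current} {sentence}".strip()
        if len(candidate) <= chunk_size:
            current = candidate  # the sentence still fits into the current chunk
        else:
            if current:
                chunks.append(current)  # close the full chunk
            current = sentence  # start a new chunk with this sentence
    if current:
        chunks.append(current)
    return chunks


text = "Page Total likes is 100. The type of Post is Photo. Comments for this Post is 4."
for chunk in pack_sentences(text, chunk_size=60):
    print(len(chunk), chunk)
```

With `chunk_size=60`, the first two sentences (51 characters together) share a chunk and the third sentence starts a new one.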

In [None]:
# Split long text descriptions into smaller chunks that can fit into
# the API request size limit, as expected by the LLM providers.

from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    separators=[".", "\n"],
    chunk_size=500,
    chunk_overlap=0,
    length_function=len,
)
chunked = []
for cnt, (index, row) in enumerate(df.iterrows()):
    row_id = row["row_id"]
    desc = create_string_from_row(df, row_id)
    print(cnt, desc)
    # Split the description and keep each chunk together with its source row_id.
    splits = text_splitter.create_documents([desc])
    for s in splits:
        r = {"row_id": row_id, "content": s.page_content}
        chunked.append(r)

Step 2: Generate vector embedding for each chunk by calling an Embedding Generation service

- The Vertex AI text embedding model is used to generate the vector embeddings; it outputs a 768-dimensional vector for each chunk of text.
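Because the `embedding` column is declared as `vector(768)`, a vector of the wrong length would only fail later, at INSERT time. The sketch below batches the texts the same way as the embedding cell (slices of `batch_size`) and checks every returned vector's length up front. It assumes an embedding callable with the same shape as `embed_documents` (a list of strings in, a list of float lists out); `iter_batches`, `embed_all`, and `fake_embed` are hypothetical helpers, and `fake_embed` returns zero vectors instead of calling Vertex AI.

```python
from typing import Callable, Iterator, List, Sequence

EMBEDDING_DIM = 768  # Output size of the Vertex AI text embedding model, as stated above.


def iter_batches(items: Sequence[str], size: int) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most `size` items, like `chunked[i : i + batch_size]`."""
    for start in range(0, len(items), size):
        yield items[start : start + size]


def embed_all(
    texts: Sequence[str],
    embed_batch: Callable[[Sequence[str]], List[List[float]]],
    batch_size: int = 5,
    dim: int = EMBEDDING_DIM,
) -> List[List[float]]:
    """Embed texts batch by batch and fail fast on a missing or wrongly sized vector."""
    vectors: List[List[float]] = []
    for batch in iter_batches(texts, batch_size):
        response = embed_batch(batch)
        if len(response) != len(batch):
            raise ValueError(f"expected {len(batch)} vectors, got {len(response)}")
        for vector in response:
            if len(vector) != dim:
                raise ValueError(f"expected {dim} dimensions, got {len(vector)}")
        vectors.extend(response)
    return vectors


def fake_embed(batch: Sequence[str]) -> List[List[float]]:
    """Stand-in for `embeddings_service.embed_documents` that returns zero vectors."""
    return [[0.0] * EMBEDDING_DIM for _ in batch]


print(len(embed_all([f"chunk {i}" for i in range(12)], fake_embed)))  # 12
```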



In [None]:
# Generate the vector embeddings for each chunk of text.
# This code snippet may run for a few minutes.

from langchain.embeddings import VertexAIEmbeddings
from google.cloud import aiplatform
import time

aiplatform.init(project=f"{project_id}", location=f"{region}")
embeddings_service = VertexAIEmbeddings()


# Helper function to retry failed API requests with exponential backoff.
# Re-raises the last error if all `max_attempts` attempts fail.
def retry_with_backoff(func, *args, retry_delay=5, backoff_factor=2, **kwargs):
    max_attempts = 10
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print(f"error: {e}")
            if attempt == max_attempts:
                raise
            wait = retry_delay * (backoff_factor**attempt)
            print(f"Retry after waiting for {wait} seconds...")
            time.sleep(wait)


batch_size = 5
for i in range(0, len(chunked), batch_size):
    request = [x["content"] for x in chunked[i : i + batch_size]]
    response = retry_with_backoff(embeddings_service.embed_documents, request)
    # Store the retrieved vector embeddings for each chunk back.
    for x, e in zip(chunked[i : i + batch_size], response):
        x["embedding"] = e

# Store the generated embeddings in a pandas dataframe.
fb_stats_embeddings = pd.DataFrame(chunked)
fb_stats_embeddings.head()
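The wait before the n-th retry is `retry_delay * backoff_factor**n`, so with the defaults in `retry_with_backoff` the waits start at 10 seconds and double each time. The sketch below tabulates that schedule for nine retries (ten attempts in total) so you can judge the worst-case runtime before starting a long embedding job. `backoff_schedule` and its optional `max_delay` cap are hypothetical additions for illustration; the notebook itself does not cap the delay.

```python
from typing import List, Optional


def backoff_schedule(
    retry_delay: float = 5,
    backoff_factor: float = 2,
    retries: int = 9,
    max_delay: Optional[float] = None,
) -> List[float]:
    """Return the wait before retry n, retry_delay * backoff_factor**n, for n = 1..retries.

    `max_delay` is an optional cap on each wait, a common refinement.
    """
    waits = [retry_delay * backoff_factor**n for n in range(1, retries + 1)]
    if max_delay is not None:
        waits = [min(wait, max_delay) for wait in waits]
    return waits


schedule = backoff_schedule()
print(schedule[:4])  # [10, 20, 40, 80]
print(sum(schedule))  # 5110 seconds, roughly 85 minutes in the worst case
print(backoff_schedule(max_delay=60)[:5])  # [10, 20, 40, 60, 60]
```

Because the total grows geometrically, a single persistently failing batch can stall the loop for well over an hour; capping each wait keeps the worst case predictable.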

### Use pgvector to store the generated embeddings within PostgreSQL

- The `pgvector` extension introduces a new `vector` data type.
- **The new `vector` data type allows you to directly save a vector embedding (represented as a NumPy array) through a simple INSERT statement in PostgreSQL!**
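In the asyncpg cells, `register_vector` handles the conversion between NumPy arrays and the `vector` type. pgvector's text representation of a vector is a bracketed, comma-separated list such as `[1,2,3]`, which is what `psql` prints and what a string literal cast with `::vector` accepts, so it is handy for ad-hoc SQL. The helpers below are a hypothetical sketch of that text format and are not part of the `pgvector` Python package.

```python
import math
from typing import Iterable, List


def to_vector_literal(values: Iterable[float]) -> str:
    """Format floats in pgvector's text input format, e.g. '[0.5,1.0,-2.0]'."""
    parts = []
    for value in values:
        number = float(value)
        if not math.isfinite(number):
            # pgvector rejects NaN and infinite elements.
            raise ValueError("vector elements must be finite numbers")
        parts.append(repr(number))
    return "[" + ",".join(parts) + "]"


def from_vector_literal(text: str) -> List[float]:
    """Parse pgvector's bracketed text format back into a list of floats."""
    inner = text.strip()[1:-1]
    return [float(part) for part in inner.split(",")] if inner else []


literal = to_vector_literal([0.5, 1, -2])
print(literal)  # [0.5,1.0,-2.0]
```

For example, `SELECT '[0.5,1.0,-2.0]'::vector <=> '[1,0,0]'::vector;` computes a cosine distance directly in `psql`.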



In [None]:
# Store the generated vector embeddings in a PostgreSQL table.
# This code may run for a few minutes.

import asyncio
import asyncpg
from google.cloud.sql.connector import Connector
import numpy as np
from pgvector.asyncpg import register_vector


async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to Cloud SQL database.
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )

        await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
        await register_vector(conn)

        await conn.execute("DROP TABLE IF EXISTS fb_stats_embeddings")
        # Create the `fb_stats_embeddings` table to store vector embeddings.
        await conn.execute(
            """CREATE TABLE fb_stats_embeddings(
                                row_id INTEGER NOT NULL REFERENCES fb_stats(row_id),
                                content TEXT,
                                embedding vector(768))"""
        )

        # Store all the generated embeddings back into the database.
        for index, row in fb_stats_embeddings.iterrows():
            await conn.execute(
                "INSERT INTO fb_stats_embeddings (row_id, content, embedding) VALUES ($1, $2, $3)",
                row["row_id"],
                row["content"],
                np.array(row["embedding"]),
            )

        await conn.close()


# Run the SQL commands now.
await main()  # type: ignore

### Finding similar row IDs using the pgvector cosine distance operator (not used: the results were inconsistent and too few rows were returned)


In [None]:
#didn't work, not used
#@markdown Enter your question about the Facebook data:
fb_stat = "lifetime_post_impressions_by_people_who_liked_your_page"  # @param {type:"string"}

# Quick input validations.
assert fb_stat, "⚠️ Please input a valid input search text"

from langchain.embeddings import VertexAIEmbeddings
from google.cloud import aiplatform

from pgvector.asyncpg import register_vector
import asyncio
import asyncpg
from google.cloud.sql.connector import Connector

aiplatform.init(project=f"{project_id}", location=f"{region}")

embeddings_service = VertexAIEmbeddings()
# `embed_query` takes a single string and returns one embedding vector.
qe = embeddings_service.embed_query(fb_stat)

matches = []


async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to Cloud SQL database.
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )

        await register_vector(conn)
        similarity_threshold = 0.1
        num_matches = 50

        # Find similar data to the query using cosine similarity search
        # over all vector embeddings. This new feature is provided by `pgvector`.
        results = await conn.fetch(
            """
                            WITH vector_matches AS (
                              SELECT row_id, 1 - (embedding <=> $1) AS similarity
                              FROM fb_stats_embeddings
                              WHERE 1 - (embedding <=> $1) > $2
                              ORDER BY similarity DESC
                              LIMIT $3
                            )
                            SELECT * FROM fb_stats
                            WHERE row_id IN (SELECT row_id FROM vector_matches)
                            """,
            qe,
            similarity_threshold,
            num_matches,
        )

        if len(results) == 0:
            raise Exception("Did not find any results. Adjust the query parameters.")

        for r in results:
            # Collect the row id.
            matches.append(
                {
                    "row_id": r["row_id"],
                }
            )

        await conn.close()


# Run the SQL commands now.
await main()  # type: ignore

# Show the results for similar stats that matched the user query.
matches = pd.DataFrame(matches)
matches.head(5)

## LLMs and LangChain

### *Use case 1*: Building an AI-curated contextual hybrid search

Combine natural language query text with regular relational filters to create a powerful hybrid search.

Example: A marketer wants to use the **AI-powered search interface** to find out which kinds of posts earn the most impressions.

In [None]:
# @markdown Enter the user search query in simple English text.
user_query = "what combination of photos and images will maximize impressions?"  # @param {type:"string"}


# Quick input validations.
assert user_query, "⚠️ Please input a valid input search text"

Step 1: Generate the vector embedding for the user query

In [None]:
from langchain.embeddings import VertexAIEmbeddings
from google.cloud import aiplatform

aiplatform.init(project=f"{project_id}", location=f"{region}")

embeddings_service = VertexAIEmbeddings()

# `embed_query` takes a single string and returns one embedding vector.
qe = embeddings_service.embed_query(user_query)

Step 2: Use `pgvector` to find the stats rows whose description embeddings are most similar to the query embedding.

- The new `pgvector` similarity search operators provide powerful semantics
to combine the vector search operation with regular query filters in a single SQL query.
- **Using pgvector, integrate relational databases with vector search operations**
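The query in the cell below applies only the vector-similarity condition. As a hedged sketch of the "after applying the relevant SQL filters" idea from the overview, the hypothetical helper `build_hybrid_query` adds an optional `post_type` filter as an extra positional parameter, keeps the best-matching chunk per row with `MAX(...)` and `GROUP BY` (a row can have several chunks), and returns the rows in similarity order. It only builds the SQL text and parameter list; the value `"Photo"` is an assumption about the dataset's `post_type` values.

```python
from typing import List, Optional, Tuple


def build_hybrid_query(
    query_embedding: List[float],
    similarity_threshold: float,
    num_matches: int,
    post_type: Optional[str] = None,
) -> Tuple[str, list]:
    """Return (sql, params) for a vector search optionally filtered by post_type."""
    params: list = [query_embedding, similarity_threshold, num_matches]
    filters = []
    if post_type is not None:
        params.append(post_type)
        # Reference the new value by its 1-based position, as asyncpg expects.
        filters.append(f"s.post_type = ${len(params)}")
    where = ("WHERE " + " AND ".join(filters)) if filters else ""
    sql = f"""
WITH vector_matches AS (
  SELECT e.row_id, MAX(1 - (e.embedding <=> $1)) AS similarity
  FROM fb_stats_embeddings e
  JOIN fb_stats s ON s.row_id = e.row_id
  {where}
  GROUP BY e.row_id
  HAVING MAX(1 - (e.embedding <=> $1)) > $2
  ORDER BY similarity DESC
  LIMIT $3
)
SELECT s.*, m.similarity
FROM fb_stats s
JOIN vector_matches m ON m.row_id = s.row_id
ORDER BY m.similarity DESC
"""
    return sql, params


sql, params = build_hybrid_query([0.1, 0.2, 0.3], 0.5, 5, post_type="Photo")
print(params[3:])  # ['Photo']
```

Inside an async function that holds a connection with `register_vector` applied, the call would be `results = await conn.fetch(sql, *params)`. Passing the filter as a parameter rather than formatting it into the SQL string avoids SQL injection from user-supplied filter values.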


In [None]:
from pgvector.asyncpg import register_vector
import asyncio
import asyncpg
from google.cloud.sql.connector import Connector

matches = []


async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to Cloud SQL database.
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )

        await register_vector(conn)
        similarity_threshold = 0.5
        num_matches = 5

        # Find similar stat descriptions to the query using cosine similarity search
        # over all vector embeddings. This new feature is provided by `pgvector`.
        results = await conn.fetch(
            """
                            WITH vector_matches AS (
                              SELECT row_id, 1 - (embedding <=> $1) AS similarity
                              FROM fb_stats_embeddings
                              WHERE 1 - (embedding <=> $1) > $2
                              ORDER BY similarity DESC
                              LIMIT $3
                            )
                            SELECT * FROM fb_stats
                            WHERE row_id IN (SELECT row_id FROM vector_matches)

                            """,
            qe,
            similarity_threshold,
            num_matches,
        )

        if len(results) == 0:
            raise Exception("I don't have a good answer. Try rewording the question.")

        for r in results:
            # Collect the description for all the matched similar stat descriptions.
            matches.append(
                f"""The row id {r["row_id"]}.
                          Its description is below:
                          {r}."""
            )
        await conn.close()


# Run the SQL commands now.
await main()  # type: ignore

# Show the results for similar stats that matched the user query.
#matches

Step 3: Use LangChain to summarize and generate a high-quality prompt to answer the user query

- After finding the similar rows and their fields using `pgvector`, the next step is to use them for generating a prompt input for the LLM model.
- Since the descriptions of the matched rows can be long, together they may not fit within the input payload limit of an LLM.
- The map-reduce summarization chain from the LangChain framework (`load_summarize_chain` with `chain_type="map_reduce"`) is used to generate and combine short summaries of the matched rows.
- The combined summaries are then used to build a high-quality prompt for an input to the LLM model.
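Conceptually, a map-reduce summarization chain calls the LLM once per document (map) and then once more on the concatenated summaries (reduce), so no single call has to see every matched row verbatim. The plain-Python sketch below shows only that data flow: `keep_key_facts` and `stub_answer` are hypothetical stand-ins for LLM calls, the row texts are made up, and LangChain's own map-reduce chain adds details (such as collapsing summaries that are still too long) that are omitted here.

```python
from typing import Callable, List


def map_reduce_summarize(
    docs: List[str],
    question: str,
    summarize: Callable[[str], str],
    answer: Callable[[str, str], str],
) -> str:
    """Map: shorten each matched row on its own. Reduce: answer from the joined summaries."""
    summaries = [summarize(doc) for doc in docs]  # map step: one call per document
    context = "\n".join(summaries)  # combined context, shorter than the raw rows
    return answer(context, question)  # reduce step: a single final call


def keep_key_facts(doc: str) -> str:
    """Stand-in for the map prompt: keep only the row id and impressions sentences."""
    sentences = [s.strip() for s in doc.split(".") if s.strip()]
    return "; ".join(s for s in sentences if s.startswith(("Row", "Impressions")))


def stub_answer(context: str, question: str) -> str:
    """Stand-in for the combine prompt: echo the question and the context it would see."""
    return f"{question}\n{context}"


rows = [
    "Row 1. Type is Photo. Impressions are 500.",
    "Row 2. Type is Status. Impressions are 300.",
]
print(map_reduce_summarize(rows, "Which post type earns more impressions?", keep_key_facts, stub_answer))
```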

In [None]:
# Using LangChain for summarization and efficient context building.

from langchain.chains.summarize import load_summarize_chain
from langchain.docstore.document import Document
from langchain.llms import VertexAI
from langchain import PromptTemplate, LLMChain
from IPython.display import display, Markdown

llm = VertexAI()

map_prompt_template = """
              You will be given a description of one row of the data in the vector embeddings.
              The data in the embeddings are Facebook statistics for Posts.
              Select the right column described in the question and do the analysis.
              You should use SUM(), AVG(), or MEAN() when necessary. Always return the row_id.
              The description will be enclosed in triple backticks (```).
              Using this description only, extract the number for the column referenced and return your answer.
              ```{text}```
              SUMMARY:
              """
map_prompt = PromptTemplate(template=map_prompt_template, input_variables=["text"])

combine_prompt_template = """
                You will be given a question about the Facebook statistics for Posts.
                All information for answering the question is in the description of the matching fb_stats rows,
                enclosed in triple backticks (```). The question is enclosed in double backticks (``).
                Select the right column described in the question and do the analysis.
                You should use SUM(), AVG(), or MEAN() when necessary.
                You should only use the information in the description. Always return the row_id.
                Your answer should include the explanation for your answer. Your answer should be less than 200 words.
                Your answer should be in Markdown in a numbered list format.


                Description:
                ```{text}```


                Question:
                ``{user_query}``


                Answer:
                """
combine_prompt = PromptTemplate(
    template=combine_prompt_template, input_variables=["text", "user_query"]
)

docs = [Document(page_content=t) for t in matches]
chain = load_summarize_chain(
    llm, chain_type="map_reduce", map_prompt=map_prompt, combine_prompt=combine_prompt
)
answer = chain.run(
    {
        "input_documents": docs,
        "user_query": user_query,
    }
)


# Render the Markdown-formatted answer.
display(Markdown(answer))