In [1]:
# USE_CLOUDSQL = False
USE_CLOUDSQL = True


project_id = "imrenagi-gemini-experiment" #change this to your project id
region = "us-central1" 
gemini_embedding_model = "text-embedding-004"

if not USE_CLOUDSQL:
    # use pgvector docker image for local development
    database_password = "pyconapac"
    database_name = "pyconapac"
    database_user = "pyconapac"
    database_host = "localhost"
else:
    # use cloudsql credential if you want to use cloudsql
    instance_name="pyconapac-demo"
    database_password = 'testing'
    database_name = 'testing'
    database_user = 'testing'


assert database_name, "⚠️ Please provide a database name"
assert database_user, "⚠️ Please provide a database user"
assert database_password, "⚠️ Please provide a database password"

In [2]:
if USE_CLOUDSQL:
    # get the ip address of the cloudsql instance
    ip_addresses = !gcloud sql instances describe {instance_name} --format="value(ipAddresses[0].ipAddress)"
    database_host = ip_addresses[0]

In [3]:
db_conn_string = f"postgres://{database_user}:{database_password}@{database_host}:5432/{database_name}"
db_conn_string

'postgres://testing:testing@35.232.5.157:5432/testing'

In [4]:
import vertexai
vertexai.init(project=project_id, location=region)

from langchain_google_vertexai import VertexAIEmbeddings
embeddings_service = VertexAIEmbeddings(model_name=gemini_embedding_model)

In [5]:
%%writefile lib/pg_retriever.py

from typing import List

from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever

from langchain_google_vertexai import VertexAIEmbeddings

import psycopg2
from pgvector.psycopg2 import register_vector

class CourseContentRetriever(BaseRetriever):
    """Retriever to find relevant course content based on the
    query provided."""

    embeddings_service: VertexAIEmbeddings    
    similarity_threshold: float
    num_matches: int
    conn_str: str

    def _get_relevant_documents(
            self, query: str, *, run_manager: CallbackManagerForRetrieverRun
        ) -> List[Document]:
        conn = psycopg2.connect(self.conn_str)
        register_vector(conn)

        qe = self.embeddings_service.embed_query(query)

        with conn.cursor() as cur:
            cur.execute(
                """
                        WITH vector_matches AS (
                        SELECT id, content, 1 - (embedding <=> %s::vector) AS similarity
                        FROM course_content_embeddings
                        WHERE 1 - (embedding <=> %s::vector) > %s
                        ORDER BY similarity DESC
                        LIMIT %s
                        )
                        SELECT cc.id as id, cc.title as title, 
                            vm.content as content, 
                            vm.similarity as similarity 
                        FROM course_contents cc
                        LEFT JOIN vector_matches vm ON cc.id = vm.id;
                """,
                (qe, qe, self.similarity_threshold, self.num_matches)
            )
            results = cur.fetchall()

        conn.close()

        if not results:
            return []
        
        return [
            Document(
                page_content=r[2],
                metadata={
                    "id": r[0],
                    "title": r[1],
                    "similarity": r[3],
                }
            ) for r in results if r[2] is not None
        ]

Overwriting lib/pg_retriever.py


In [6]:
from lib.pg_retriever import CourseContentRetriever

retriever = CourseContentRetriever(embeddings_service=embeddings_service, conn_str=db_conn_string, similarity_threshold=0.1, num_matches=10)
retriever.invoke("what is strategy for creating forgot password", run_manager=None)

[Document(metadata={'id': 2, 'title': 'Forgot Password Cheat Sheet', 'similarity': 0.626802716961831}, page_content="1. Generate a token to the user and attach it in the URL query string.\n2. Send this token to the user via email.\n   - Don't rely on the [Host](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Host) header while creating the reset URLs to avoid [Host Header Injection](https://owasp.org/www-project-web-security-testing-guide/stable/4-Web_Application_Security_Testing/07-Input_Validation_Testing/17-Testing_for_Host_Header_Injection) attacks. The URL should be either be hard-coded, or should be validated against a list of trusted domains.\n   - Ensure that the URL is using HTTPS.\n3. The user receives the email, and browses to the URL with the attached token.\n   - Ensure that the reset password page adds the [Referrer Policy](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Referrer-Policy) tag with the `noreferrer` value in order to avoid [referrer leaka

In [9]:
from typing import List

from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever

from langchain_google_vertexai import VertexAIEmbeddings

import asyncpg
import asyncio
from pgvector.asyncpg import register_vector

class CourseContentRetriever(BaseRetriever):
    """Retriever to find relevant course content based on the
    query provided."""

    embeddings_service: VertexAIEmbeddings    
    similarity_threshold: float
    num_matches: int
    conn_str: str

    async def _aget_relevant_documents(
            self, query: str, *, run_manager: CallbackManagerForRetrieverRun
        ) -> List[Document]:
        conn = await asyncpg.connect(self.conn_str)
        await register_vector(conn)

        qe = await self.embeddings_service.aembed_query(query)

        results = await conn.fetch(
            """
            WITH vector_matches AS (
            SELECT id, content, 1 - (embedding <=> $1::vector) AS similarity
            FROM course_content_embeddings
            WHERE 1 - (embedding <=> $1::vector) > $2
            ORDER BY similarity DESC
            LIMIT $3
            )
            SELECT cc.id as id, cc.title as title, 
                vm.content as content, 
                vm.similarity as similarity 
            FROM course_contents cc
            LEFT JOIN vector_matches vm ON cc.id = vm.id;
            """,
            qe, self.similarity_threshold, self.num_matches
        )

        await conn.close()

        if not results:
            return []
        
        return [
            Document(
                page_content=r['content'],
                metadata={
                    "id": r['id'],
                    "title": r['title'],
                    "similarity": r['similarity'],
                }
            ) for r in results if r['content'] is not None
        ]

    def _get_relevant_documents(
            self, query: str, *, run_manager: CallbackManagerForRetrieverRun
        ) -> List[Document]:
        return asyncio.run(self._aget_relevant_documents(query, run_manager=run_manager))

In [15]:
async def test():
    retriever = CourseContentRetriever(embeddings_service=embeddings_service, conn_str=db_conn_string, similarity_threshold=0.1, num_matches=10)
    retriever.invoke("what is strategy for creating forgot password", run_manager=None)

await test()

RuntimeError: asyncio.run() cannot be called from a running event loop