In [None]:
from enum import Enum

class PGAI_Mode(Enum):
    Disabled = 1 # never use pgai to access ai, use python ollama module instead
    Sql      = 2 # use SQL to access pgai functions
    Function = 3 # define postgres stored functions to access pgai functions

class LLM_Mode(Enum):
    LocalOllama = 1 # use local Ollama server
    OpenAI      = 2 # use Openai through pgai
    AnCo        = 3 # use Anthropic for chat and Cohere for embedding

class CFG:
  def __init__(self):

    ##### IMPORTANT - PLEASE READ
    #####
    ##### Set 'LLM_MODE' and 'PGAI_MODE' according to Enum options above to
    ##### select mode.
    #####
    ##### Collab Pro (High-RAM mode) strongly recommended
    #####
    ##### NOTE: If you choose PGAI_Mode.Disabled, you MUST use
    #####       LLM_Mode.LocalOllama

    self.LLM_MODE = LLM_Mode.OpenAI
    self.PGAI_MODE = PGAI_Mode.Function

    self.VECTOR_TYPE = "halfvec"
    self.MAX_DB_EMBEDDING_SIZE = 4000 if self.VECTOR_TYPE == "halfvec" else 2000

    if self.LLM_MODE == LLM_Mode.OpenAI:
      # https://platform.openai.com/docs/overview
      self.DEBUG_PGAI_EMBEDDING = False
      assert self.PGAI_MODE != PGAI_Mode.Disabled, "PGAI_MODE should not be Disabled in {self.LLM_MODE}"
      # https://openai.com/api/pricing/
      self.CHAT_MODEL = "gpt-4o-mini"
      self.CHAT_CONTEXT_SIZE = 8192 * 4
      self.EMBEDDING_MODEL = "text-embedding-3-large"
      self.EMBEDDING_CONTEXT_SIZE = 8192
      self.set_EMBEDDING_SIZE(raw=3072)
      self.CHUNK_SIZE = 4096
      self.NUM_OF_CHUNKS = 5
      self.CHAT_API_KEY = "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"
      self.EMBEDDING_API_KEY = self.CHAT_API_KEY
    elif self.LLM_MODE == LLM_Mode.AnCo:
      # https://console.anthropic.com/
      self.DEBUG_PGAI_EMBEDDING = False
      assert self.PGAI_MODE != PGAI_Mode.Disabled, "PGAI_MODE should not be Disabled in {self.LLM_MODE}"
      # https://docs.anthropic.com/en/docs/about-claude/models
      self.CHAT_MODEL = "claude-3-5-sonnet-20240620"
      self.CHAT_CONTEXT_SIZE = 8192 * 4
      # https://dashboard.cohere.com/
      self.EMBEDDING_MODEL = "embed-english-v3.0"
      self.EMBEDDING_CONTEXT_SIZE = 512
      self.set_EMBEDDING_SIZE(raw=1024)
      self.CHUNK_SIZE = 1024
      self.NUM_OF_CHUNKS = 15
      self.CHAT_API_KEY = "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"
      self.EMBEDDING_API_KEY = "KsFiuKf6ugoGNFpwEzSUXZO3tfehWmAtMCoQG5EN"
    else:
      self.DEBUG_PGAI_EMBEDDING = False
      self.CHAT_MODEL = "llama3.2:3b" #"llama3.1"
      self.EMBEDDING_MODEL = "nomic-embed-text"
      self.CHAT_CONTEXT_SIZE = None
      self.EMBEDDING_CONTEXT_SIZE = None
      self.RAW_EMBEDDING_SIZE = None
      self.EMBEDDING_SIZE = None
      self.CHUNK_SIZE = 2048
      self.NUM_OF_CHUNKS = 5
      self.CHAT_API_KEY = ""
      self.EMBEDDING_API_KEY = ""

    self.TEST_VECTOR_DB_NAME = "testvector_db"
    self.FULL_ACCESS_USER = "full_access_user"
    self.FULL_ACCESS_PASSWORD = "full_access_password"
    self.LIMITED_ACCESS_USER = "limited_access_user"
    self.LIMITED_ACCESS_PASSWORD = "new_password2"

    self.FULL_ACCESS_DB = f"postgresql://{self.FULL_ACCESS_USER}:{self.FULL_ACCESS_PASSWORD}@localhost/{self.TEST_VECTOR_DB_NAME}"
    self.LIMITED_ACCESS_DB = f"postgresql://{self.LIMITED_ACCESS_USER}:{self.LIMITED_ACCESS_PASSWORD}@localhost/{self.TEST_VECTOR_DB_NAME}"
    self.DB_USED_BY_VECTOR_STORE = self.LIMITED_ACCESS_DB if self.PGAI_MODE == PGAI_Mode.Function else self.FULL_ACCESS_DB

  def set_VECTOR_TYPE(self, vector_type):
    self.VECTOR_TYPE = vector_type


  def set_CHAT_CONTEXT_SIZE(self, context_size):
    self.CHAT_CONTEXT_SIZE = context_size


  def set_EMBEDDING_CONTEXT_SIZE(self, context_size):
    self.EMBEDDING_CONTEXT_SIZE = context_size


  def set_EMBEDDING_SIZE(self, raw):
    self.RAW_EMBEDDING_SIZE = raw
    self.EMBEDDING_SIZE = min(self.RAW_EMBEDDING_SIZE, self.MAX_DB_EMBEDDING_SIZE)

g_cfg = CFG()

In [None]:
# detect GPU/TPU. This notebook requires GPU/TPU to run in LLM_Mode.LocalOllama mode
import tensorflow as tf

# Check for TPU
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    tpu_available = True
    print("TPU is availalbe.")
except:
    tpu_available = False

In [None]:
# Check for GPU
gpu_available = tf.config.list_physical_devices('GPU')
if gpu_available:
    print("GPU is availalbe.")

if g_cfg.LLM_MODE == LLM_Mode.LocalOllama:
    # require GPU or TPU
    assert tpu_available or gpu_available, "Error: No GPU or TPU found. Please enable a GPU (e.g. T4) runtime in Colab."

else:
  if tpu_available or gpu_available:
    print("#############################################################")
    print(f"# GPU/TPU detected even {g_cfg.LLM_MODE} doesn't need it #####")
    print("#############################################################")
  assert len(g_cfg.CHAT_API_KEY) > 0 and len(g_cfg.EMBEDDING_API_KEY) > 0, "API_KEY should not be empty in {self.LLM_MODE}"

In [None]:
# install and launch xterm for debug purpose
!pip -qq install colab-xterm
%load_ext colabxterm
!echo sudo -u postgres psql -d {g_cfg.TEST_VECTOR_DB_NAME}
!echo sudo -u postgres psql -d {g_cfg.FULL_ACCESS_DB}
!echo sudo -u postgres psql -d {g_cfg.LIMITED_ACCESS_DB}
%xterm

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/115.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.6/115.6 kB[0m [31m6.9 MB/s[0m eta [36m0:00:00[0m
[?25hsudo -u postgres psql -d testvector_db
sudo -u postgres psql -d postgresql://full_access_user:full_access_password@localhost/testvector_db
sudo -u postgres psql -d postgresql://limited_access_user:new_password2@localhost/testvector_db


Launching Xterm...

<IPython.core.display.Javascript object>

In [None]:
# install postgres
!apt install gnupg postgresql-common apt-transport-https lsb-release wget
# add apt.postgresql.org to sources.list.d
!/usr/share/postgresql-common/pgdg/apt.postgresql.org.sh -y
!apt install postgresql-16 postgresql-contrib-16 postgresql-plpython3-16
# install postgres development env
!sudo apt install postgresql-server-dev-16

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
lsb-release is already the newest version (11.1.0ubuntu4).
lsb-release set to manually installed.
gnupg is already the newest version (2.2.27-3ubuntu2.1).
gnupg set to manually installed.
wget is already the newest version (1.21.2-2ubuntu1.1).
The following additional packages will be installed:
  libcommon-sense-perl libjson-perl libjson-xs-perl libtypes-serialiser-perl logrotate netbase
  postgresql-client-common ssl-cert
Suggested packages:
  bsd-mailx | mailx
The following NEW packages will be installed:
  apt-transport-https libcommon-sense-perl libjson-perl libjson-xs-perl libtypes-serialiser-perl
  logrotate netbase postgresql-client-common postgresql-common ssl-cert
0 upgraded, 10 newly installed, 0 to remove and 49 not upgraded.
Need to get 486 kB of archives.
After this operation, 1,886 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/

In [None]:
# add timescaledb package repo to apt source list
!echo "deb https://packagecloud.io/timescale/timescaledb/ubuntu/ $(lsb_release -c -s) main" | sudo tee /etc/apt/sources.list.d/timescaledb.list
# add timescaledb gpg key
!wget --quiet -O - https://packagecloud.io/timescale/timescaledb/gpgkey | sudo gpg --dearmor -o /etc/apt/trusted.gpg.d/timescaledb.gpg
# install timescaledb extension
!sudo apt update
!sudo apt install timescaledb-2-postgresql-16 postgresql-client-16


deb https://packagecloud.io/timescale/timescaledb/ubuntu/ jammy main
Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:2 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Ign:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:5 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:6 https://r2u.stat.illinois.edu/ubuntu jammy Release
Hit:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:9 http://apt.postgresql.org/pub/repos/apt jammy-pgdg InRelease
Hit:10 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:11 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:12 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:13 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:8 https://packagecloud.io/timescale/timescaledb/ubuntu jammy InRelease [29.2 kB]
Get:15 h

In [None]:
# compile and install pgvector extension
!git clone --depth 1 --branch v0.7.4 https://github.com/pgvector/pgvector.git
!cd pgvector && make -j 4 && sudo make install

Cloning into 'pgvector'...
remote: Enumerating objects: 154, done.[K
remote: Counting objects: 100% (154/154), done.[K
remote: Compressing objects: 100% (104/104), done.[K
remote: Total 154 (delta 75), reused 66 (delta 46), pack-reused 0 (from 0)[K
Receiving objects: 100% (154/154), 141.12 KiB | 11.76 MiB/s, done.
Resolving deltas: 100% (75/75), done.
Note: switching to '103ac50f1a90b47a72003e8e8628a55ec372f202'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by switching back to a branch.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -c with the switch command. Example:

  git switch -c <new-branch-name>

Or undo this operation with:

  git switch -

Turn off this advice by setting config variable advice.detachedHead to false

gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-a

In [None]:
# NOTE: This may take several minutes, depending on your Colab runtime type

# install Rust and PostgreSQL extensions in Rust
![ ! -e "$HOME/.cargo/env" ] && curl https://sh.rustup.rs -sSf | sh -s -- -y
!. "$HOME/.cargo/env" && CARGO_BUILD_JOBS=4 cargo install --version 0.11.4 --locked cargo-pgrx && CARGO_BUILD_JOBS=4 cargo pgrx init --pg16 pg_config

[1minfo:[0m downloading installer
[0m[1minfo: [0mprofile set to 'default'
[0m[1minfo: [0mdefault host triple is x86_64-unknown-linux-gnu
[0m[1minfo: [0msyncing channel updates for 'stable-x86_64-unknown-linux-gnu'
[0m[1minfo: [0mlatest update on 2024-09-05, rust version 1.81.0 (eeb90cda1 2024-09-04)
[0m[1minfo: [0mdownloading component 'cargo'
[0m[1minfo: [0mdownloading component 'clippy'
[0m[1minfo: [0mdownloading component 'rust-docs'
[0m[1minfo: [0mdownloading component 'rust-std'
[0m[1minfo: [0mdownloading component 'rustc'
[0m[1minfo: [0mdownloading component 'rustfmt'
[0m[1minfo: [0minstalling component 'cargo'
[0m[1minfo: [0minstalling component 'clippy'
[0m[1minfo: [0minstalling component 'rust-docs'
 15.9 MiB /  15.9 MiB (100 %)   5.9 MiB/s in  2s ETA:  0s
[0m[1minfo: [0minstalling component 'rust-std'
 26.8 MiB /  26.8 MiB (100 %)   9.8 MiB/s in  4s ETA:  0s
[0m[1minfo: [0minstalling component 'rustc'
 66.9 MiB /  66.9 MiB (100 %

In [None]:
# compile and install pgvectorscale
!git clone --depth 1 --branch 0.3.0 https://github.com/timescale/pgvectorscale
!. "$HOME/.cargo/env" && cd pgvectorscale/pgvectorscale && RUSTFLAGS="-C target-feature=+avx2,+fma" CARGO_BUILD_JOBS=4 cargo pgrx install --release

Cloning into 'pgvectorscale'...
remote: Enumerating objects: 79, done.[K
remote: Counting objects: 100% (79/79), done.[K
remote: Compressing objects: 100% (69/69), done.[K
remote: Total 79 (delta 3), reused 43 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (79/79), 81.07 KiB | 13.51 MiB/s, done.
Resolving deltas: 100% (3/3), done.
Note: switching to '04e1d1309b2e1c70e1e979c49d71b3f499cf5853'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by switching back to a branch.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -c with the switch command. Example:

  git switch -c <new-branch-name>

Or undo this operation with:

  git switch -

Turn off this advice by setting config variable advice.detachedHead to false

[32m[1m       Using[0m[39m [37m[1mPgConfig("pg16")[0m[39m and 

In [None]:
# install pgai
!rm -fr pgai
stable_pgai_tag = "a4ae9fb4482fb51433aedc931cf51c2c906e2aba"
!git clone --branch main https://github.com/timescale/pgai.git
!cd pgai && git checkout {stable_pgai_tag} && make -j 4 && sudo make install

Cloning into 'pgai'...
remote: Enumerating objects: 1981, done.[K
remote: Counting objects:   0% (1/1038)[Kremote: Counting objects:   1% (11/1038)[Kremote: Counting objects:   2% (21/1038)[Kremote: Counting objects:   3% (32/1038)[Kremote: Counting objects:   4% (42/1038)[Kremote: Counting objects:   5% (52/1038)[Kremote: Counting objects:   6% (63/1038)[Kremote: Counting objects:   7% (73/1038)[Kremote: Counting objects:   8% (84/1038)[Kremote: Counting objects:   9% (94/1038)[Kremote: Counting objects:  10% (104/1038)[Kremote: Counting objects:  11% (115/1038)[Kremote: Counting objects:  12% (125/1038)[Kremote: Counting objects:  13% (135/1038)[Kremote: Counting objects:  14% (146/1038)[Kremote: Counting objects:  15% (156/1038)[Kremote: Counting objects:  16% (167/1038)[Kremote: Counting objects:  17% (177/1038)[Kremote: Counting objects:  18% (187/1038)[Kremote: Counting objects:  19% (198/1038)[Kremote: Counting objects:  20% (208/1038)[K

In [None]:
# configure and tune timescaledb databases
!sudo timescaledb-tune -yes
# start postgresql service
!sudo service postgresql restart

[37;1mUsing postgresql.conf at this path:
[0m/etc/postgresql/16/main/postgresql.conf

[37;1mWriting backup to:
[0m/tmp/timescaledb_tune.backup202409220823

[37;1mshared_preload_libraries needs to be updated
[0m[37;1mCurrent:
[0m#shared_preload_libraries = ''
[37;1mRecommended:
[0mshared_preload_libraries = 'timescaledb'
[32;1msuccess: [0mshared_preload_libraries will be updated

[37;1mRecommendations based on 50.99 GB of available memory and 8 CPUs for PostgreSQL 16
[0m
[37;1mMemory settings recommendations
[0m[37;1mCurrent:
[0mshared_buffers = 128MB
#effective_cache_size = 4GB
#maintenance_work_mem = 64MB
#work_mem = 4MB
[37;1mRecommended:
[0mshared_buffers = 13054MB
effective_cache_size = 39163MB
maintenance_work_mem = 2047MB
work_mem = 16709kB
[32;1msuccess: [0mmemory settings will be updated

[37;1mParallelism settings recommendations
[0m[37;1mCurrent:
[0m[31;1mmissing: [0mtimescaledb.max_background_workers
#max_worker_processes = 8
#max_parallel_workers

In [None]:
# drop database
!sudo -u postgres dropdb -f {g_cfg.TEST_VECTOR_DB_NAME}

dropdb: error: database removal failed: ERROR:  database "testvector_db" does not exist


In [None]:
# create user testvectoruser and database testvectordb
!sudo -u postgres createuser {g_cfg.FULL_ACCESS_USER}
!sudo -u postgres createdb {g_cfg.TEST_VECTOR_DB_NAME}

# change password and grant privileges
cmd_change_password = f'"ALTER USER {g_cfg.FULL_ACCESS_USER} WITH ENCRYPTED PASSWORD \'{g_cfg.FULL_ACCESS_PASSWORD}\';"'
cmd_grant_privileges = f'"GRANT ALL PRIVILEGES ON DATABASE {g_cfg.TEST_VECTOR_DB_NAME} TO {g_cfg.FULL_ACCESS_USER};"'
cmd_grant_schema_public = f'"GRANT ALL ON SCHEMA public TO {g_cfg.FULL_ACCESS_USER};"'
cmd_grant_schema_ai = f'"GRANT ALL ON SCHEMA ai TO {g_cfg.FULL_ACCESS_USER};"'
cmd_grant_schema_ai_func = f'"GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA ai TO {g_cfg.FULL_ACCESS_USER};"'

!sudo -u postgres psql -c {cmd_change_password}
!sudo -u postgres psql -c {cmd_grant_privileges}
!sudo -u postgres psql -d {g_cfg.TEST_VECTOR_DB_NAME} -c {cmd_grant_schema_public}

ALTER ROLE
GRANT
GRANT


In [None]:
# create timescaledb and vectorscale extensions in the testvectordb
cmd_create_extensions = f'"CREATE EXTENSION IF NOT EXISTS timescaledb; CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE; CREATE EXTENSION IF NOT EXISTS ai CASCADE;"'
!sudo -u postgres psql -d {g_cfg.TEST_VECTOR_DB_NAME} -c {cmd_create_extensions}
!sudo -u postgres psql -d {g_cfg.TEST_VECTOR_DB_NAME} -c {cmd_grant_schema_ai}
!sudo -u postgres psql -d {g_cfg.TEST_VECTOR_DB_NAME} -c {cmd_grant_schema_ai_func}

# check extensions have been created
!timeout 1 psql {g_cfg.FULL_ACCESS_DB} -c "SELECT * FROM pg_extension;"

NOTICE:  installing required extension "vector"
NOTICE:  installing required extension "plpython3u"
CREATE EXTENSION
CREATE EXTENSION
CREATE EXTENSION
GRANT
GRANT
  oid  |   extname   | extowner | extnamespace | extrelocatable | extversion |                                                                                 extconfig                                                                                 |                                                              extcondition                                                               
-------+-------------+----------+--------------+----------------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------
 13545 | plpgsql     |       10 |           11 | f           

In [None]:
!pip -qq install psycopg2 pgvector
import json
import math
import psycopg2
import subprocess
import threading

In [None]:
if g_cfg.LLM_MODE == LLM_Mode.LocalOllama:

    # install ollama
    !curl -fsSL https://ollama.com/install.sh | sh

if g_cfg.LLM_MODE == LLM_Mode.LocalOllama:

    # Start a ollama service in the background, i.e. ollama serve
    commands = [
        ["bash", "-c", "nohup ollama serve > /dev/null 2>&1 &"],
    ]
    for c in commands:
        print(c)
        process = subprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()
        print(stdout.decode())
        print(stderr.decode())

    # sleep a couple of seconds to wait ollama starts
    !sleep 2

    # load models
    !ollama pull {g_cfg.CHAT_MODEL}
    !ollama pull {g_cfg.EMBEDDING_MODEL}

    # show models info
    # to get more model info
    # !curl http://localhost:11434/api/show -d '{"name": "llama3"}'
    !ollama show {g_cfg.CHAT_MODEL}
    !ollama show {g_cfg.EMBEDDING_MODEL}

    # get model context length
    output = get_ipython().getoutput(f'ollama show {g_cfg.CHAT_MODEL}' + '| grep "context length" | awk \'{print $3}\'')
    g_cfg.set_CHAT_CONTEXT_SIZE(int(output[0]))
    output = get_ipython().getoutput(f'ollama show {g_cfg.EMBEDDING_MODEL}' + '| grep "context length" | awk \'{print $3}\'')
    g_cfg.set_EMBEDDING_CONTEXT_SIZE = (int(output[0]))
    print(f"chat model: {g_cfg.CHAT_MODEL}, context length: {g_cfg.CHAT_CONTEXT_SIZE}")
    # get model embedding size
    output = get_ipython().getoutput(f'ollama show {g_cfg.EMBEDDING_MODEL}' + '| grep "embedding length" | awk \'{print $3}\'')
    g_cfg.set_EMBEDDING_SIZE(raw=int(output[0]))

    print(f"chat model: {g_cfg.CHAT_MODEL}, context length: {g_cfg.CHAT_CONTEXT_SIZE}")
    print(f"embedding model: {g_cfg.EMBEDDING_MODEL}, embedding length: {g_cfg.EMBEDDING_SIZE}/{g_cfg.RAW_EMBEDDING_SIZE}, context length: {g_cfg.EMBEDDING_CONTEXT_SIZE}")


In [None]:
# install dependencies
!pip -qq install langchain
!pip -qq install langchain-core
!pip -qq install langchain-community

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/50.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.4/50.4 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.0 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m53.4 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/399.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m399.9/399.9 kB[0m [31m31.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m290.2/290.2 kB[0m [31m24.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m141.9/141.9 kB[0m [31m11.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
if g_cfg.LLM_MODE == LLM_Mode.LocalOllama:
    from langchain_community.llms import Ollama

    class Ollama_Client:
        def __init__(self, model, max_model_context_length):
          self.model = model
          self.max_model_context_length = max_model_context_length
          self.ollama_client = Ollama(model=model,
                                      temperature=0.5)


        def _determine_context_length(self, question, context):
          min_context_length = 1024
          max_context_length = self.max_model_context_length
          context_length = (len(context) + len(question)) // 4 + 1024
          context_length = max(context_length, min_context_length)
          context_length = min(context_length, self.max_model_context_length)
          return context_length


        def ask_question(self, question, context):
          context = f"========= Begin of Document =========\n{context}\n\n========= End of Document =========\n"
          question = f"Please review the document very carefully then answer the following question step by step:\n{question}\n"
          messages = [
                  {"role": "system", "content": "You are an expert."},
                  {"role": "assistant", "content": context},
                  {"role": "user", "content": question},
                  ]
          context_length = self._determine_context_length(context, question)
          print(f"set context length to {context_length}")
          response = self.ollama_client.invoke(input=messages,
                                              num_ctx=context_length)

          return response


    # test request to ollama
    g_ollama_client = Ollama_Client(model=g_cfg.CHAT_MODEL,
                                    max_model_context_length=g_cfg.CHAT_CONTEXT_SIZE)

    answer = g_ollama_client.ask_question(question="How many cat in Tom's house?",
                                          context="Tom has a big house. He has 2 male cats, 4 female cats and 1 cat too small to be identified.")
    print(answer)

In [None]:
# Commented out IPython magic to ensure Python compatibility.
class Vector_DB:
    _first_instance = None
    _lock = threading.Lock()  # A class-level lock
    _content_table = None
    _embedding_table = None
    _embedding_size = None
    _chat_model = None
    _embedding_model = None
    _vector_type = None
    _llm_mode = None
    _func_embed = None
    _func_chat_complete = None

    def __init__(self, database,
                 chat_model, embedding_model, embedding_size,
                 pgai_mode, vector_type,
                 llm_mode, chat_api_key, embedding_api_key):
      with type(self)._lock:  # Acquire the lock before checking or modifying _first_instance
        if type(self)._first_instance is None:
          print("First time init VectorDB")
          type(self)._content_table = f"content_tb"
          type(self)._embedding_table = f"embedding_{embedding_size}_tb"
          type(self)._embedding_size = embedding_size
          type(self)._chat_model = chat_model
          type(self)._embedding_model = embedding_model
          type(self)._vector_type = vector_type
          type(self)._llm_mode = llm_mode
          type(self)._chat_api_key = chat_api_key
          type(self)._embedding_api_key = embedding_api_key

          if llm_mode == LLM_Mode.OpenAI:
            type(self)._func_embed = "openai_embed"
            type(self)._func_chat_complete = "openai_chat_complete"
            type(self)._func_embed_options = ", _api_key => api_key_in "
            type(self)._func_chat_complete_options = type(self)._func_embed_options
            type(self)._sql_embed_options = ", _api_key => %s "
            type(self)._sql_chat_complete_options = type(self)._sql_embed_options
            type(self)._input_options = ", api_key_in TEXT"
          elif llm_mode == LLM_Mode.AnCo:
            type(self)._func_embed = "cohere_embed"
            type(self)._func_chat_complete = "anthropic_generate"
            type(self)._func_embed_options = ", _api_key => api_key_in, _input_type => 'search_document' "
            type(self)._func_chat_complete_options = ", _api_key => api_key_in "
            type(self)._sql_embed_options = ", _api_key => %s , _input_type => 'search_document' "
            type(self)._sql_chat_complete_options = ", _api_key => %s "
            type(self)._input_options = ", api_key_in TEXT"
          elif llm_mode == LLM_Mode.LocalOllama:
            type(self)._func_embed = "ollama_embed"
            type(self)._func_chat_complete = "ollama_chat_complete"
            type(self)._func_embed_options = ""
            type(self)._func_chat_complete_options = ""
            type(self)._sql_embed_options = ""
            type(self)._sql_chat_complete_options = ""
            type(self)._input_options = ", dummpy_key TEXT"
          else:
            assert False, f"llm_mode: {llm_mode}"

          print("connect to", database)
          self._conn = psycopg2.connect(database)
          self._initilization()
          self._conn.autocommit = False
          self._verify_extensions()
          type(self)._first_instance = self
        else:
          print("connect to", database)
          self._conn = psycopg2.connect(database)

        self._pgai_mode = pgai_mode


    @staticmethod
    def _get_num_list_from_num_records_for_embedding_index(num_records):
        num_lists = num_records / 1000
        if num_lists < 10:
          num_lists = 10
        if num_records > 1000000:
          num_lists = math.sqrt(num_records)
        return math.ceil(num_lists)


    def _initilization(self):
      num_records = 100 # for demo purpose
      num_lists = self._get_num_list_from_num_records_for_embedding_index(num_records)

      ########### tables ##########
      # sql to create 2 tables
      # one stores main content
      # one stores embeddings
      sqls = [
        # sql to create 2 tables
        # one stores main content
        # one stores embeddings
        ("sql_create_tables",
         f"""
            CREATE TABLE IF NOT EXISTS {self._content_table} (
              id BIGSERIAL PRIMARY KEY,
              source TEXT NOT NULL,
              contents TEXT NOT NULL,
              metadata JSONB)
            ;
            CREATE TABLE IF NOT EXISTS {self._embedding_table} (
              id BIGINT PRIMARY KEY,
              embedding {self._vector_type}({self._embedding_size}) NOT NULL,
              CONSTRAINT fk_{self._embedding_table}_id
              FOREIGN KEY (id) REFERENCES {self._content_table}(id))
            ;
          """
        ),

        ########### indices ##########
        # sql to create 3 indices
        # one for source
        # one for keyword search
        # one for similarity search
        ("sql_create_indices",
         f"""
            CREATE INDEX IF NOT EXISTS {self._content_table}_source_idx
              ON {self._content_table} (source)
            ;

            -- https://www.postgresql.org/docs/current/textsearch-tables.html
            CREATE INDEX IF NOT EXISTS {self._content_table}_keyword_idx
              ON {self._content_table} USING GIN
              (to_tsvector('english', contents))
            ;

            -- https://github.com/pgvector/pgvector#indexing
            CREATE INDEX IF NOT EXISTS {self._embedding_table}_embedding_idx
              ON {self._embedding_table} USING ivfflat (embedding {self._vector_type}_cosine_ops) WITH (lists = {num_lists})
            ;
          """
        ),

        ########### functions ##########
        # https://www.cybertec-postgresql.com/en/abusing-security-definer-functions/
        ("create_function_check_source_already_in_db",
          f"""
              CREATE OR REPLACE FUNCTION check_source_already_in_db(
                  source_val TEXT
              ) RETURNS boolean
              LANGUAGE plpgsql SECURITY DEFINER
              AS $$
              BEGIN
                  RETURN (SELECT EXISTS (
                      SELECT 1
                      FROM {self._content_table}
                      WHERE source = source_val
                  ));
              END;
              $$;
            """
        ),
        ("create_function_fetch_all_documents_sources",
          f"""
              CREATE OR REPLACE FUNCTION fetch_all_documents_sources(
              ) RETURNS TABLE (source TEXT)
              LANGUAGE plpgsql SECURITY DEFINER
              AS $$
              BEGIN
                  RETURN QUERY
                  SELECT DISTINCT ct.source
                  FROM {self._content_table} ct;
              END;
              $$;
          """
        ),
        ("create_function_add_document_and_embedding",
          f"""
              CREATE OR REPLACE FUNCTION add_document_and_embedding(
                  IN source_value TEXT,
                  IN contents_value TEXT,
                  IN metadata_value JSONB
                  {self._input_options}
              ) RETURNS TABLE (idr BIGINT, embr {self._vector_type}({self._embedding_size}))
              LANGUAGE plpgsql SECURITY DEFINER
              VOLATILE
              AS $$
              BEGIN
                  -- Perform the insertion and embedding within a CTE structure
                  WITH inserted AS (
                      -- Insert into the content table and return the id
                      INSERT INTO {self._content_table} (source, contents, metadata)
                      VALUES (source_value, contents_value, metadata_value)
                      RETURNING {self._content_table}.id
                  ), embed AS (
                      -- Generate the embedding using the ai function
                      SELECT inserted.id, subvector(ai.{self._func_embed}('{self._embedding_model}',
                          contents_value {self._func_embed_options}), 1, {self._embedding_size}) AS embed2
                      FROM inserted
                  )
                  -- Insert into the embedding table using the id and the generated embedding
                  INSERT INTO {self._embedding_table} (id, embedding)
                  SELECT embed.id, subvector(embed.embed2, 1, {self._embedding_size})
                  FROM embed
                  RETURNING id, embedding into STRICT idr, embr;
                  RETURN QUERY
                  SELECT idr, embr;
              END;
              $$;
            """
        ),
        ("create_function_fetch_documents_by_query",
          f"""
              CREATE OR REPLACE FUNCTION fetch_documents_by_query(
                  query_value TEXT,
                  limit_value INT
                  {self._input_options}
              ) RETURNS TABLE (id BIGINT, contents TEXT)
              LANGUAGE plpgsql SECURITY DEFINER
              AS $$
              BEGIN
                  -- Return the nearest embeddings based on the model and embedding value
                  RETURN QUERY
                  SELECT ct.id, ct.contents
                  FROM {self._content_table} ct
                  JOIN {self._embedding_table} et ON ct.id = et.id
                  ORDER BY et.embedding <=> subvector(ai.{self._func_embed}('{self._embedding_model}',
                    query_value {self._func_embed_options})::{self._vector_type}, 1, {self._embedding_size})
                  LIMIT limit_value;
              END;
              $$;
            """
        ),
        ("create_function_fetch_documents_by_embedding",
          f"""
              CREATE OR REPLACE FUNCTION fetch_documents_by_embedding(
                  subvec FLOAT[],
                  limit_val INT
              ) RETURNS TABLE (id BIGINT, contents TEXT)
              LANGUAGE plpgsql SECURITY DEFINER
              AS $$
              BEGIN
                  RETURN QUERY
                  SELECT ct.id, ct.contents
                  FROM {self._content_table} ct
                  JOIN {self._embedding_table} AS et ON ct.id = et.id
                  ORDER BY et.embedding <=> subvector(subvec::{self._vector_type}, 1, {self._embedding_size})
                  LIMIT limit_val;
              END;
              $$;
            """
        ),
        ("create_function_fetch_documents_by_keywords",
          f"""
              CREATE OR REPLACE FUNCTION fetch_documents_by_keywords(
                  search_text TEXT,
                  limit_count INT
              ) RETURNS TABLE(id BIGINT, contents TEXT)
              LANGUAGE plpgsql SECURITY DEFINER
              AS $$
              BEGIN
                  RETURN QUERY
                  SELECT ct.id, ct.contents
                  FROM {self._content_table} ct,
                      plainto_tsquery('english', search_text) query
                  WHERE to_tsvector('english', ct.contents) @@ query
                  ORDER BY ts_rank_cd(to_tsvector('english', ct.contents), query) DESC
                  LIMIT limit_count;
              END;
              $$;
            """
        ),
        ("create_function_chat_complete",
          f"""
              CREATE OR REPLACE FUNCTION chat_complete(
                  question TEXT,
                  ctx_content TEXT
                  {self._input_options}
              )
              RETURNS TEXT
              LANGUAGE plpgsql SECURITY DEFINER
              AS $$
              BEGIN
                  -- Return the result of the chat completion as pretty-printed JSON
                  RETURN jsonb_pretty(
                      ai.{self._func_chat_complete}(
                          '{self._chat_model}',
                          jsonb_build_array(
                              -- jsonb_build_object('role', 'system', 'content', 'You are a helpful assistant'),
                              jsonb_build_object('role', 'user', 'content', question),
                              jsonb_build_object('role', 'assistant', 'content', ctx_content)
                          )
                          -- _options => jsonb_build_object('temperature', 0.6)
                          {self._func_chat_complete_options}
                      )
                  );
              END;
              $$;
            """
        )
      ]

      try:
        cursor = self._conn.cursor()

        for (description, sql) in sqls:
          print(f"{description}\n{sql}")
          cursor.execute(sql)

        self._conn.commit()

      except Exception as e:
        print(f"Error creating tables/indices/functions: {e}")
        self._conn.rollback()
        raise

      finally:
        cursor.close()


    def _verify_extensions(self):
      # check extensions have been created
      try:
        cursor = self._conn.cursor()
        cursor.execute("SELECT * FROM pg_extension")
        ex = cursor.fetchall()

      except Exception as e:
        print(f"Error verifying extensions: {e}")

      finally:
        cursor.close()

      print(f"Extensions: {ex}")
      assert len(ex) >= 4, f"Extensions: {ex}"


    def check_source_already_in_db(self, source):
      if self._pgai_mode == PGAI_Mode.Function:
        sql = f"SELECT * FROM check_source_already_in_db(%s)"
      else:
        sql = f"""
                SELECT EXISTS (SELECT 1 FROM {self._content_table}
                  WHERE source = (%s)) AS record_exists
                ;
              """
      try:
        cursor = self._conn.cursor()
        cursor.execute(sql, [source])
        result = cursor.fetchone()

      except Exception as e:
        print(f"Error checking source: {e}")

      finally:
        cursor.close()

      return True if result[0] else False


    def fetch_all_documents_sources(self):
      if self._pgai_mode == PGAI_Mode.Function:
        sql = f"SELECT * FROM fetch_all_documents_sources()"
      else:
        sql = f"""
                SELECT distinct(source) FROM {self._content_table}
                ;
              """
      try:
        cursor = self._conn.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()

      except Exception as e:
        print(f"Error fetch all source: {e}")

      finally:
        cursor.close()

      return [r[0] for r in result if "://" in r[0]]


    def add_documents_and_embeddings(self, documents, embeddings, source):
      sz = len(documents)

      if embeddings:
        assert sz == len(embeddings), f"documents: {sz}, embeddings: {len(embeddings)}"

      if self._pgai_mode == PGAI_Mode.Function:
        sql = f"SELECT * FROM add_document_and_embedding(%s, %s, %s, %s)"
      elif self._pgai_mode == PGAI_Mode.Sql:
        sql = f"""
              WITH inserted AS (
                  -- Insert into the content  and return the id
                  INSERT INTO {self._content_table} (source, contents, metadata)
                  VALUES (%s, %s, %s)
                  RETURNING id
              ), embed AS (
                  -- Use the id from the previous insertion and get the embedding
                  SELECT inserted.id, subvector(ai.{self._func_embed}('{self._embedding_model}',
#                     %s {self._sql_embed_options}), 1, {self._embedding_size}) AS embed2
                  FROM inserted
              )
                -- Insert the embedding into the embedding table using the id from the first insert
                INSERT INTO {self._embedding_table} (id, embedding)
                SELECT embed.id, embed.embed2
                FROM embed
                RETURNING id, embedding
              ;
              """
      else:
        assert embeddings is not None and len(embeddings) > 0, f"embeddings: {embeddings}"
        sql = f"""
              WITH inserted AS (
                  INSERT INTO {self._content_table} (source, contents, metadata)
                  VALUES (%s, %s, %s)
                  RETURNING id
                )
                INSERT INTO {self._embedding_table} (id, embedding)
                SELECT id, subvector((%s::{self._vector_type}), 1, {self._embedding_size}) FROM inserted
                RETURNING id, embedding
              ;
              """
      # print(sql)

      try:
        cursor = self._conn.cursor()

        for i in range(sz):
          if self._pgai_mode == PGAI_Mode.Function:
            params = (source, documents[i].page_content, "{}", self._embedding_api_key)
          elif self._pgai_mode == PGAI_Mode.Sql:
            if self._llm_mode == LLM_Mode.LocalOllama:
              params = (source, documents[i].page_content, "{}", documents[i].page_content)
            else:
              params = (source, documents[i].page_content, "{}", documents[i].page_content, self._embedding_api_key)
          else:
            params = (source, documents[i].page_content, "{}", embeddings[i][0:self._embedding_size])

          cursor.execute(sql, params)
          res = cursor.fetchall()[0]
          embd = json.loads(res[1])
          print(f"inserted {source} chunk_id={res[0]} chunk_len={len(documents[i].page_content)}")
          print(f" {self._pgai_mode} add: (len={len(embd)}) {embd[:5]}")
          if embeddings:
            print(f" extl embedding: (len={len(embeddings[i])}) {embeddings[i][:5]}")

        self._conn.commit()

      except Exception as e:
        print(f"Error inserting document: {e}")
        self._conn.rollback()
        return

      finally:
        cursor.close()

      return


    def fetch_documents_by_query(self, query, limit):
      if self._pgai_mode == PGAI_Mode.Function:
        sql = f"SELECT * FROM fetch_documents_by_query(%s, %s, %s)"
        # named paremeter must be the last one
        params = (query, limit, self._embedding_api_key, )
      elif self._pgai_mode == PGAI_Mode.Sql:
        sql = f"""
                SELECT ct.id, ct.contents
                  FROM {self._content_table} ct
                  JOIN {self._embedding_table} et ON ct.id = et.id
                  ORDER BY et.embedding <=> subvector(ai.{self._func_embed}('{self._embedding_model}',
#                     %s {self._sql_embed_options})::{self._vector_type}, 1, {self._embedding_size})
                  LIMIT (%s)
                ;
              """
        if self._llm_mode == LLM_Mode.OpenAI or self._llm_mode == LLM_Mode.AnCo:
          params = (query, self._embedding_api_key, limit, )
        else:
          params = (query, limit, )
      else:
        assert False, f"self._pgai_mode"

      # print(sql)
      try:
        cursor = self._conn.cursor()
        cursor.execute(sql, params)
        result = cursor.fetchall()
        print(f" {self._pgai_mode} query result(limit={limit}): {len(result)} {result}")
      except Exception as e:
        print(f"Error fetch_documents_by_query: {e}")
      finally:
        cursor.close()

      return result


    def fetch_documents_by_embedding(self, query_embedding, limit):
        if self._pgai_mode == PGAI_Mode.Function:
          sql = f"SELECT * FROM fetch_documents_by_embedding(%s, %s)"
        else:
          sql = f"""
                  SELECT ct.id, ct.contents
                    FROM {self._content_table} ct
                    JOIN {self._embedding_table} et ON ct.id = et.id
                    ORDER BY et.embedding <=> subvector(%s::{self._vector_type}, 1, {self._embedding_size})
                    LIMIT %s
                  ;
                """
        # print(sql)
        try:
          cursor = self._conn.cursor()
          cursor.execute(sql, (query_embedding, limit, ))
          result = cursor.fetchall()
          # print(f"by_embeddings result: {len(result)} {result})"
        except Exception as e:
          print(f"Error fetch_documents_by_embedding: {e}")
        finally:
          cursor.close()

        return result


    def fetch_documents_by_keywords(self, query, limit):
      if self._pgai_mode == PGAI_Mode.Function:
        sql = f"SELECT * FROM fetch_documents_by_keywords(%s, %s)"
      else:
        sql = f"""
                SELECT id, contents FROM {self._content_table},
                  plainto_tsquery('english', (%s)) query
                  WHERE to_tsvector('english', contents) @@ query
                  ORDER BY ts_rank_cd(to_tsvector('english', contents), query)
                  DESC LIMIT %s
                ;
              """

      # print(sql)
      try:
        cursor = self._conn.cursor()
        cursor.execute(sql, (query, limit, ))
        result = cursor.fetchall()
        print(f"by_keywords result(limit={limit}): {len(result)} {result}")
      except Exception as e:
        print(f"Error fetch_documents_by_keywords: {e}")
      finally:
        cursor.close()

      return result


    def chat_complete(self, query, context):
      if self._pgai_mode == PGAI_Mode.Function:
        sql = f"SELECT * FROM chat_complete(%s, %s, %s)"
        params = (query, context, self._chat_api_key,)
      elif self._pgai_mode == PGAI_Mode.Sql:
        sql = f"""
                SELECT jsonb_pretty(
                  ai.{self._func_chat_complete}('{self._chat_model}',
                    jsonb_build_array(
                      -- jsonb_build_object('role', 'system', 'content', 'you are a helpful assistant'),
                      jsonb_build_object('role', 'user', 'content', (%s)),
                      jsonb_build_object('role', 'assistant', 'content', (%s)))
                    -- _options=> jsonb_build_object('temperature', 0.6)
                    {self._sql_chat_complete_options}
                  )
                );
              """
        if self._llm_mode == LLM_Mode.LocalOllama:
          params = (query, context,)
        else:
          params = (query, context, self._chat_api_key,)
      else:
        assert False, f"self._pgai_mode"

      # print(sql)
      # print(params)

      try:
        cursor = self._conn.cursor()
        cursor.execute(sql, params)
        result = cursor.fetchall()[0]
        # print(result)
        result = json.loads(result[0])
        if self._llm_mode == LLM_Mode.OpenAI:
          result = result["choices"][0]["message"]["content"]
        elif self._llm_mode == LLM_Mode.AnCo:
          result = result["content"][0]["text"]
        else:
          result = result["message"]["content"]
      except Exception as e:
        print(f"Error chat_complete: {e}")
      finally:
        cursor.close()

      return result

In [None]:
# Init the Database, create the tables, indices, functions as a normal user
# that has ai, public schema access.

Vector_DB(database=g_cfg.FULL_ACCESS_DB,
                  chat_model=g_cfg.CHAT_MODEL,
                  embedding_model=g_cfg.EMBEDDING_MODEL,
                  embedding_size=g_cfg.EMBEDDING_SIZE,
                  pgai_mode=g_cfg.PGAI_MODE,
                  vector_type=g_cfg.VECTOR_TYPE,
                  llm_mode=g_cfg.LLM_MODE,
                  chat_api_key=g_cfg.CHAT_API_KEY,
                  embedding_api_key=g_cfg.EMBEDDING_API_KEY)

# create LIMITED_ACCESS_USER
!sudo -u postgres createuser {g_cfg.LIMITED_ACCESS_USER}
# change password and grant privileges
cmd_change_password = f'"ALTER USER {g_cfg.LIMITED_ACCESS_USER} WITH ENCRYPTED PASSWORD \'{g_cfg.LIMITED_ACCESS_PASSWORD}\';"'
!sudo -u postgres psql -c {cmd_change_password}

# grand execute on DB functions that just create to g_cfg.LIMITED_ACCESS_USER,
# this limited access user can only access data and pgai functions through these functions.
cmd_grand_functions_privileges = f"""
    DO $$
    DECLARE
        row record;
    BEGIN
        FOR row IN SELECT proname, oid FROM pg_proc
                  WHERE proowner = (SELECT oid FROM pg_roles WHERE rolname = '{g_cfg.FULL_ACCESS_USER}' )
                    AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')
        LOOP
            EXECUTE format('GRANT EXECUTE ON FUNCTION public.%I TO {g_cfg.LIMITED_ACCESS_USER}', row.proname);
        END LOOP;
    END;
    $$;
    """

# very hacky way to feed the cmd to psql, because bash cannot handle $$ '' "" well
with open("tmp938.txt", "w") as text_file:
    text_file.write(cmd_grand_functions_privileges)
!sudo -u postgres psql -d {g_cfg.TEST_VECTOR_DB_NAME} --file=tmp938.txt

!pip -qq install langchain
!pip -qq install ollama

import ollama
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OllamaEmbeddings

First time init VectorDB
connect to postgresql://full_access_user:full_access_password@localhost/testvector_db
sql_create_tables

            CREATE TABLE IF NOT EXISTS content_tb (
              id BIGSERIAL PRIMARY KEY,
              source TEXT NOT NULL,
              contents TEXT NOT NULL,
              metadata JSONB)
            ;
            CREATE TABLE IF NOT EXISTS embedding_3072_tb (
              id BIGINT PRIMARY KEY,
              embedding halfvec(3072) NOT NULL,
              CONSTRAINT fk_embedding_3072_tb_id
              FOREIGN KEY (id) REFERENCES content_tb(id))
            ;
          
sql_create_indices

            CREATE INDEX IF NOT EXISTS content_tb_source_idx
              ON content_tb (source)
            ;

            -- https://www.postgresql.org/docs/current/textsearch-tables.html
            CREATE INDEX IF NOT EXISTS content_tb_keyword_idx
              ON content_tb USING GIN
              (to_tsvector('english', contents))
            ;

         

In [None]:
class Vector_Store:
    def __init__(self, database, chat_model,
                 embedding_model, embedding_size,
                 max_context_size, chunk_size,
                 pgai_mode, vector_type,
                 llm_mode, chat_api_key, embedding_api_key):
      self._db = Vector_DB(database,
                           chat_model=chat_model,
                           embedding_model=embedding_model,
                           embedding_size=embedding_size,
                           pgai_mode=pgai_mode,
                           vector_type=vector_type,
                           llm_mode=llm_mode,
                           chat_api_key=chat_api_key,
                           embedding_api_key=embedding_api_key)
      self._embedding_model = embedding_model
      self._embedding_service = None
      # self._embedding_service = OllamaEmbeddings(model=embedding_model,
      #                                            num_ctx=min(chunk_size // 4,
      #                                                       max_context_size))
      self._embedding_service2 = ollama.embeddings
      self._text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size,
                                                           chunk_overlap=chunk_size // 10)
      self._pgai_mode = pgai_mode


    def _get_embedding(self, prompt):
      return self._embedding_service2(model=self._embedding_model,
                              prompt=prompt)['embedding']

    def _create_documents_tables_and_indices_if_not_exists(self):
      self._db._create_documents_tables_and_indices_if_not_exists()


    def document_is_in_db(self, source):
      return self._db.check_source_already_in_db(source)


    def fetch_all_documents_sources(self):
      return self._db.fetch_all_documents_sources()


    def add_documents_and_embeddings(self, docs, source):
      if self.document_is_in_db(source):
        print(f"{source} already in db")
        return 0

      for d in docs:
        lines = (line.strip() for line in d.page_content.splitlines())
        # break multi-headlines into a line each
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        # drop blank lines
        text = '\n'.join(chunk for chunk in chunks if chunk)
        d.page_content = text

      splits = self._text_splitter.split_documents(docs)

      if self._pgai_mode == PGAI_Mode.Disabled or g_cfg.DEBUG_PGAI_EMBEDDING:
        # embeddings = self._embedding_service.embed_documents(splits)
        embeddings = [self._get_embedding(d.page_content) for d in splits]
      else:
        embeddings = None

      self._db.add_documents_and_embeddings(splits, embeddings, source)
      return len(splits)


    def fetch_documents_by_query(self, query, limit):
      if self._pgai_mode == PGAI_Mode.Disabled:
        # query_embedding = self._embedding_service.embed_query(query)
        query_embedding = self._get_embedding(query)
        # print(f"query_embedding: (len={len(query_embeddings)}) {query_embeddings[:5]}")
        return self._fetch_documents_by_embedding(query_embedding, limit)
      else:
        return self._db.fetch_documents_by_query(query, limit)


    def fetch_documents_by_query_keywords(self, query, limit):
      return self._db.fetch_documents_by_keywords(query, limit)


    def chat_complete(self, query, context):
      return self._db.chat_complete(query=query, context=context)


    def _fetch_documents_by_embedding(self, query_embeddings, limit):
      return self._db.fetch_documents_by_embedding(query_embeddings, limit)


In [None]:
g_vectorstore = Vector_Store(database=g_cfg.DB_USED_BY_VECTOR_STORE,
                             chat_model=g_cfg.CHAT_MODEL,
                             embedding_model=g_cfg.EMBEDDING_MODEL,
                             embedding_size=g_cfg.EMBEDDING_SIZE,
                             max_context_size=g_cfg.EMBEDDING_CONTEXT_SIZE,
                             chunk_size=g_cfg.CHUNK_SIZE,
                             pgai_mode=g_cfg.PGAI_MODE,
                             vector_type=g_cfg.VECTOR_TYPE,
                             llm_mode=g_cfg.LLM_MODE,
                             chat_api_key=g_cfg.CHAT_API_KEY,
                             embedding_api_key=g_cfg.EMBEDDING_API_KEY)

connect to postgresql://limited_access_user:new_password2@localhost/testvector_db


In [None]:
# functions to load url
import requests
from langchain.docstore.document import Document
from langchain_community.document_loaders import WebBaseLoader

def get_wiki_by_title(page_title, source):
    URL = "https://en.wikipedia.org/w/api.php"
    params = {
        'action': 'query',
        'format': 'json',
        'titles': page_title,
        'prop': 'extracts',
        'explaintext': True,  # Extracts the text in plain text format
    }

    response = requests.get(URL, params=params)
    data = response.json()
    page = next(iter(data['query']['pages'].values()))  # Get the first page item
    print(page)
    d = Document(page_content=page['extract'], metadata={"source": source}) if 'extract' in page else None
    return [d] if d else []


def load_url(url):
    splitted = url.split("en.wikipedia.org/wiki/", 2)
    if len(splitted) >= 2:
      docs = get_wiki_by_title(splitted[1], url)
    else:
      loader = WebBaseLoader(url)
      docs = loader.load()

    return docs

!pip -qq install gradio
import gradio as gr



[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.1/18.1 MB[0m [31m76.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m318.7/318.7 kB[0m [31m22.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m94.6/94.6 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.8/10.8 MB[0m [31m82.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.8/62.8 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m71.4/71.4 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m130.2/130.2 kB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# Define the RAG setup
def rag_chain(question, use_context=True, total_chunk_num=g_cfg.NUM_OF_CHUNKS):
    if not use_context:
      prompt = f"\nPlease answer the following question step by step:\n{question}\n"
      formatted_ctx_chat = ""
      ctx_debug = "No context used"
    else:
      # 1/3 from keyword search
      keyword_chunk_num = total_chunk_num // 3
      docs_keyword = g_vectorstore.fetch_documents_by_query_keywords(question, keyword_chunk_num)
      docs_keyword.reverse()
      # the rest from embedding simularity search
      left = total_chunk_num - len(docs_keyword)
      docs_embd = g_vectorstore.fetch_documents_by_query(question, left)
      docs_embd.reverse()
      docs_keyword = [doc[1] for doc in docs_keyword]
      docs_embd = [doc[1] for doc in docs_embd]
      len_keyword = 0
      len_embd = 0
      for d in docs_keyword:
        len_keyword += len(d)
      for d in docs_embd:
        len_embd += len(d)
      ctx_chat = "\n\n".join(doc for doc in (docs_embd + docs_keyword))
      ctx_debug = f"fetched chunks: keyword={len(docs_keyword)}/{len_keyword} embedding={len(docs_embd)}/{len_embd}\n"
      ctx_debug += "\n\n=================\n\n".join(doc for doc in docs_embd + docs_keyword)
      formatted_ctx_chat = f"========= Here is context info =========\n{ctx_chat}\n\n========= End of context info ========="
      prompt = f"\nPlease review the above context very very carefully then answer the following question step by step:\n{question}"

    if g_cfg.PGAI_MODE == PGAI_Mode.Disabled:
      return g_ollama_client.ask_question(question=prompt, context=formatted_ctx_chat), ctx_debug
    else:
      return g_vectorstore.chat_complete(query=prompt, context=formatted_ctx_chat), ctx_debug

In [None]:
def load_urls(urls):
  if type(urls) == str:
    urls = urls.replace("\n", ",").split(",")

  urls = [url.strip() for url in urls]
  urls = [url for url in urls if url]
  for url in urls:
    if g_vectorstore.document_is_in_db(url):
      print(f"{url} already in db")
      continue;
    docs = load_url(url)
    if docs:
      g_vectorstore.add_documents_and_embeddings(docs, url)
  loaded = g_vectorstore.fetch_all_documents_sources()

  return "\n".join(loaded)

with gr.Blocks() as demo:
  with gr.Tab("Load from URLs"):
    with gr.Row():
      with gr.Column():
        url_output = gr.Textbox(label="URLs of content already in Content Store",
                                lines=12)
      with gr.Column():
        url_input = gr.Textbox(label="Wiki URLs to be loaded to Content Store",
                              lines=9,
                              value="https://en.wikipedia.org/wiki/World_War_II\nhttps://en.wikipedia.org/wiki/World_War_I")
        submit_url_button = gr.Button(value="Load URLs")

    submit_url_button.click(load_urls, inputs=[url_input], outputs=[url_output])

  with gr.Tab("Ask Question"):
    with gr.Column():
      question_input = gr.Textbox(lines=2, label="Type your question here.", value="How many lives were lost in World War Two?")
      with gr.Row():
        use_context_checkbox = gr.Checkbox(label="Fetch context from Content Store", value=True)
        number_chunks = gr.Number(value=g_cfg.NUM_OF_CHUNKS, label="Enter number of chunks to fetch from DB", maximum=20, minimum=0)
        submit_button = gr.Button(value="Submit")

    with gr.Column():
      answer_output = gr.Textbox(lines=10, label="Answer")
      context_output = gr.Textbox(lines=10, label="Context fetched from Content Store")
    submit_button.click(rag_chain, inputs=[question_input, use_context_checkbox, number_chunks], outputs=[answer_output, context_output])


In [None]:
def test_load_urls():
  urls = [
    "https://en.wikipedia.org/wiki/World_War_II",
    "https://en.wikipedia.org/wiki/World_War_I",
    #"https://en.wikipedia.org/wiki/Pacific_War",
    #"https://en.wikipedia.org/wiki/American_Civil_War",
    #"https://en.wikipedia.org/wiki/Vietnam_War",
    #"https://en.wikipedia.org/wiki/Korean_War",
    #"https://en.wikipedia.org/wiki/Gulf_War",
    #"https://en.wikipedia.org/wiki/Soviet%E2%80%93Afghan_War"
  ]
  """ urls = [
    "https://en.wikipedia.org/wiki/Cultural_depictions_of_dogs",
    #"https://en.wikipedia.org/wiki/Cultural_depictions_of_cats",
    #"https://en.wikipedia.org/wiki/Pigs_in_culture",
    #"https://en.wikipedia.org/wiki/Computer_mouse",
    #"https://en.wikipedia.org/wiki/Austin,_Texas"
  ]"""
  load_urls(urls)

def test_fetch_documents():
  result_k = g_vectorstore.fetch_documents_by_query_keywords("when did it start?", 10)
  print(f"keyword fetch: len={len(result_k)} result={result_k}")
  result_e = g_vectorstore.fetch_documents_by_query("Cultural depictions of dogs", 10)
  print(f"embedding fetch: len={len(result_e)}, result={result_e}")

def test_rag():
  question = "Why people love dogs?"
  use_context = True
  num_of_chunks = g_cfg.NUM_OF_CHUNKS
  answer = rag_chain(question, use_context, num_of_chunks)
  print("\n\n===================\n\n\context:\n\n", answer[1])
  print("\n\n===================\n\n\nanswer:\n\n", answer[0])

In [None]:

test_load_urls()
test_fetch_documents()

{'pageid': 32927, 'ns': 0, 'title': 'World War II', 'extract': 'World War II or the Second World War (1 September 1939 – 2 September 1945) was a global conflict between two coalitions: the Allies and the Axis powers. Nearly all the world\'s countries—including all the great powers—participated, with many investing all available economic, industrial, and scientific capabilities in pursuit of total war, blurring the distinction between military and civilian resources. Tanks and aircraft played major roles, with the latter enabling the strategic bombing of population centres and delivery of the only two nuclear weapons ever used in war. World War II was the deadliest conflict in history, resulting in 70 to 85 million fatalities, more than half of which were civilians. Millions died in genocides, including the Holocaust of European Jews, and by massacres, starvation, and disease. Following the Allied powers\' victory, Germany, Austria, Japan, and Korea were occupied, and war crimes tribuna

In [None]:
def test_pgai_generate():
  conn = psycopg2.connect(g_cfg.DB_USED_BY_VECTOR_STORE)
  cursor = conn.cursor()

  if g_cfg.LLM_MODE == LLM_Mode.LocalOllama:
    cmd = f"SELECT ai.ollama_generate('{g_cfg.CHAT_MODEL}', 'what is the typical weather like in Champaign, IL in October.');"

    #
    try:
      cursor.execute(cmd)
      result = cursor.fetchall()
      print(result)
      reply = result[0][0]
      print(reply)
      print("\ndone:", reply["done"],
            "\ndone_reason:", reply["done_reason"],
            "\nmodel:", reply["model"],
            "\nresponse:", reply["response"],
            "\nprompt_eval_count:", reply["prompt_eval_count"],
            "\neval_count:", reply["eval_count"],
            "\ntotal_duration:", reply["total_duration"])
    except psycopg2.errors.InsufficientPrivilege as e:
      print(f"Permission denied: {e}", "As expected, continue...")
      conn.rollback()

    cursor.execute(cmd)
    result = cursor.fetchall()
    print(result)
    reply = result[0][0]
    print(reply)
    print("\ndone:", reply["done"],
          "\ndone_reason:", reply["done_reason"],
          "\nmodel:", reply["model"],
          "\nresponse:", reply["response"],
          "\nprompt_eval_count:", reply["prompt_eval_count"],
          "\neval_count:", reply["eval_count"],
          "\ntotal_duration:", reply["total_duration"])
  else:
    if g_cfg.LLM_MODE == LLM_Mode.OpenAI:
      func_chat = "openai_chat_complete"
    elif g_cfg.LLM_MODE == LLM_Mode.AnCo:
      func_chat = "anthropic_generate"
    else:
      assert False, f"g_cfg.LLM_MODE"

    cmd = f"""
          SELECT jsonb_pretty(
            ai.{func_chat}(
              '{g_cfg.CHAT_MODEL}',
            jsonb_build_array(
              -- jsonb_build_object('role', 'system', 'content', 'you are a helpful assistant'),
              jsonb_build_object('role', 'user', 'content', 'what is the typical weather like in Champaign, IL in Sept.')
              )
            , _api_key => %s)
          );
          """
    print(cmd)
    caught_exception = False
    try:
      cursor.execute(cmd, (g_cfg.CHAT_API_KEY, ))
      result = cursor.fetchall()[0][0]
      result = json.loads(result)
      if g_cfg.LLM_MODE == LLM_Mode.OpenAI:
        print(result["choices"][0]["message"]["content"])
      else:
        print(result["content"][0]["text"])
    except psycopg2.errors.InsufficientPrivilege as e:
      print(f"Permission denied: {e}", "As expected, continue...")
      conn.rollback()
      caught_exception = True

    if g_cfg.PGAI_MODE == PGAI_Mode.Function:
      assert caught_exception == True, "should caught InsufficientPrivilege"
    else:
      assert caught_exception == False, "should not caught InsufficientPrivilege"

def test_embedding():
  query = "the purple elephant sits on a red mushroom"
  conn = psycopg2.connect(g_cfg.DB_USED_BY_VECTOR_STORE)
  cursor = conn.cursor()

  if g_cfg.LLM_MODE == LLM_Mode.LocalOllama:
      func_embed = "ollama_embed"
      sql_embed_options = ''
      test_db_params = ("", query, "{}", query, )
      # test Ollama call
      embedding_service = OllamaEmbeddings(model=g_cfg.EMBEDDING_MODEL)
      embd_python = embedding_service.embed_query(query)
      print(f"embedding by Ollama python module: len={len(embd_python)}\n{embd_python[:5]}")

      # ollama module
      response = ollama.embeddings(model=g_cfg.EMBEDDING_MODEL, prompt=query)
      print(f"embedding by ollama module: len={len(response['embedding'])}\n{response['embedding'][:5]}")
  else:
      if g_cfg.LLM_MODE == LLM_Mode.OpenAI:
        func_embed = "openai_embed"
        sql_embed_options = f', _api_key => %s'
      elif g_cfg.LLM_MODE == LLM_Mode.AnCo:
        func_embed = "cohere_embed"
        sql_embed_options = f", _api_key => %s , _input_type => 'search_document'"
      else:
        assert False, f"g_cfg.LLM_MODE"
      test_db_params = ("", query, "{}", query, g_cfg.EMBEDDING_API_KEY, )

  # test direct pgai call
  caught_exception = False
  try:
    if g_cfg.LLM_MODE == LLM_Mode.LocalOllama:
      cmd = f"SELECT ai.{func_embed}('{g_cfg.EMBEDDING_MODEL}', '{query}');"
      print(f"test direct pgai call, sql: {cmd}")
      cursor.execute(cmd)
    else:
      cmd = f"SELECT ai.{func_embed}('{g_cfg.EMBEDDING_MODEL}', '{query}' {sql_embed_options})"
      print(f"test direct pgai call, sql: {cmd}")
      cursor.execute(cmd, (g_cfg.EMBEDDING_API_KEY, ))

    result = cursor.fetchall()
    embd_pgai = json.loads(result[0][0])
    print(f"pgai command: {cmd}")
    print(f"embedding by pgai: len={len(embd_pgai)}\n{embd_pgai[:5]}")

  except psycopg2.errors.InsufficientPrivilege as e:
    print(f"Permission denied: {e}", "As expected, continue...")
    conn.rollback()
    caught_exception = True

  if g_cfg.PGAI_MODE == PGAI_Mode.Function:
    assert caught_exception == True, "should caught InsufficientPrivilege"
  else:
    assert caught_exception == False, "should not caught InsufficientPrivilege"

  caught_exception = False
  # test db sql
  sql = f"""
      WITH inserted AS (
          -- Insert into the content_tb and return the id
          INSERT INTO content_tb (source, contents, metadata)
          VALUES (%s, %s, %s)
          RETURNING id
      ), embed AS (
          -- Use the id from the previous insertion and get the embedding
          SELECT inserted.id, ai.{func_embed}('{g_cfg.EMBEDDING_MODEL}', %s {sql_embed_options}) AS embed2
          FROM inserted
      )
      -- Insert the embedding into the embedding table using the id from the first insert
      INSERT INTO embedding_{g_cfg.EMBEDDING_SIZE}_tb (id, embedding)
      SELECT embed.id, subvector(embed.embed2, 1, {g_cfg.EMBEDDING_SIZE})
      FROM embed
      RETURNING id, embedding;
      """
  print(f"test db, sql: {sql}")
  try:
    cursor.execute(sql, test_db_params)
    result = cursor.fetchall()
    embd_db = json.loads(result[0][1])
    print(f"embedding by DB sql: len={len(embd_db)}\n{embd_db[:5]} chunk_id = {result[0][0]}")
  except psycopg2.errors.InsufficientPrivilege as e:
    print(f"Permission denied: {e}", "As expected, continue...")
    conn.rollback()
    caught_exception = True

  if g_cfg.PGAI_MODE == PGAI_Mode.Function:
    assert caught_exception == True, "should caught InsufficientPrivilege"
  else:
    assert caught_exception == False, "should not caught InsufficientPrivilege"

  # test db function
  sql = f"SELECT * from add_document_and_embedding(%s, %s, %s, %s);"
  print(f"test db function, sql: {sql}")
  cursor.execute(sql, ('source', query, '{}', g_cfg.EMBEDDING_API_KEY))
  result = cursor.fetchall()
  embd_db = json.loads(result[0][1])
  print(f"embedding by DB func: len={len(embd_db)}\n{embd_db[:5]} chunk_id = {result[0][0]}")

  ### test fetch
  conn.commit()
  print("test fetch by embedding", type(embd_db), len(embd_db))
  a = g_vectorstore._fetch_documents_by_embedding(embd_db[0:g_cfg.EMBEDDING_SIZE], 10)
  print(a)

  print("test fetch by query")
  a = g_vectorstore.fetch_documents_by_query(query, 10)
  print(a)

In [None]:
test_embedding()

test_pgai_generate()

test_rag()

test direct pgai call, sql: SELECT ai.openai_embed('text-embedding-3-large', 'the purple elephant sits on a red mushroom' , _api_key => %s)
Permission denied: permission denied for schema ai
LINE 1: SELECT ai.openai_embed('text-embedding-3-large', 'the purple...
               ^
 As expected, continue...
test db, sql: 
      WITH inserted AS (
          -- Insert into the content_tb and return the id
          INSERT INTO content_tb (source, contents, metadata)
          VALUES (%s, %s, %s)
          RETURNING id
      ), embed AS (
          -- Use the id from the previous insertion and get the embedding
          SELECT inserted.id, ai.openai_embed('text-embedding-3-large', %s , _api_key => %s) AS embed2
          FROM inserted
      )
      -- Insert the embedding into the embedding table using the id from the first insert
      INSERT INTO embedding_3072_tb (id, embedding)
      SELECT embed.id, subvector(embed.embed2, 1, 3072)
      FROM embed
      RETURNING id, embedding;
      

In [None]:
demo.launch()

Setting queue=True in a Colab notebook requires sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Running on public URL: https://eaad3a7051bb3ebfd9.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


