## SQLAlchemy and vector search (2 points)

### SQLAlchemy setup

For defining the table, we will use Python and the [SQLAlchemy
framework](https://www.sqlalchemy.org/).

A good practice is to build the database URL using the SQLAlchemy library. This
option is much more readable, safer, and easier to maintain than using the
connection string directly.

In [1]:
from sqlalchemy.engine import URL

db_url = URL.create(
    drivername="postgresql+psycopg",
    username="postgres",
    password="password",
    host="localhost",
    port=5555,
    database="similarity_search_service_db",
)
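
To illustrate why a hand-written connection string is more fragile, here is a
minimal standard-library sketch. The password below is hypothetical, chosen
because it contains characters that are reserved in URLs. With `URL.create()`
each component is passed as a separate argument, so this escaping does not have
to be done by hand.

```python
from urllib.parse import quote_plus, urlsplit

# hypothetical password containing URL-reserved characters
password = "p@ss:word/1"
base = "localhost:5555/similarity_search_service_db"

# naive string formatting: the "@" and "/" in the password corrupt the URL
naive_url = f"postgresql+psycopg://postgres:{password}@{base}"

# manual fix: percent-encode the password before embedding it
escaped_url = f"postgresql+psycopg://postgres:{quote_plus(password)}@{base}"

naive_host = urlsplit(naive_url).hostname
escaped_host = urlsplit(escaped_url).hostname
print(naive_host)    # not "localhost": the URL was split at the wrong "@"
print(escaped_host)  # localhost
```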

Tables in SQLAlchemy are defined using a class-based design. Typically, an
application defines a single `Base` class, from which concrete tables inherit.
They are defined much like Pydantic models, i.e. with attributes and types.
However, here we also need to assign concrete database types. To integrate it
with vector search, we will also use the `pgvector` library.

In [2]:
from typing import List
from pgvector.sqlalchemy import Vector
from sqlalchemy import Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


# Create the base class for the table definition
class Base(DeclarativeBase):
    __abstract__ = True


# Create the table definition
class Images(Base):
    __tablename__ = "images"
    VECTOR_LENGTH = 512

    # primary key
    id: Mapped[int] = mapped_column(Integer, primary_key=True)

    # image path - we will use it to store the path to the image file, after
    # similarity search we can use it to retrieve the image and display it
    image_path: Mapped[str] = mapped_column(String(256))

    # image embedding - we will store the image embedding in this column; it
    # is a list of 512 floats, i.e. the output of the sentence transformer
    # model
    image_embedding: Mapped[List[float]] = mapped_column(Vector(VECTOR_LENGTH))

To actually connect to the database, interact with it and run queries, we use
an engine object. It is created with the `create_engine()` function.

In [4]:
from sqlalchemy import create_engine, text

engine = create_engine(db_url)

with engine.connect() as conn:
    conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
    conn.commit()

Now we can create the table in the database.

In [5]:
Base.metadata.create_all(engine)

### SQLAlchemy queries

For testing, we need to insert some data into the table. The code below does
this; its key step is creating the `Images` object from the provided data.

In [6]:
import sqlalchemy
import numpy as np

from sqlalchemy import select
from sqlalchemy.orm import Session


# reusable function to insert data into the table
def insert_image(engine: sqlalchemy.Engine, image_path: str, image_embedding: list[float]):
    with Session(engine) as session:
        # create the image object
        image = Images(image_path=image_path, image_embedding=image_embedding)
        # add the image object to the session
        session.add(image)
        # commit the transaction
        session.commit()


# find the k images closest to the given image by cosine distance (lowest
# distance first); the given image itself is included if it is in the table
def find_k_images(engine: sqlalchemy.Engine, k: int, original_image: Images) -> sqlalchemy.engine.Result:
    with Session(engine) as session:
        # execution_options={"prebuffer_rows": True} buffers all rows up
        # front, so the result can still be read after the session is closed
        result = session.execute(
            (select(Images).order_by(Images.image_embedding.cosine_distance(original_image.image_embedding)).limit(k)),
            execution_options={"prebuffer_rows": True},
        )
        return result


# insert some data into the table
N = 100
for i in range(N):
    image_path = f"image_{i}.jpg"
    image_embedding = np.random.rand(512).tolist()
    insert_image(engine, image_path, image_embedding)

# select first image from the table
with Session(engine) as session:
    image = session.query(Images).first()

# find the 10 most similar images to the first image
k = 10
similar_images = find_k_images(engine, k, image).fetchall()

In [7]:
for img in similar_images:
    print(img[0].image_path)

image_0.jpg
image_2.jpg
image_83.jpg
image_30.jpg
image_47.jpg
image_60.jpg
image_20.jpg
image_70.jpg
image_69.jpg
image_77.jpg
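
The first result is presumably the query image itself, since its distance to its
own embedding is zero. As a rough illustration of what the
`ORDER BY ... LIMIT k` query computes, here is a pure-Python sketch. It assumes
cosine distance is defined as one minus cosine similarity, as in pgvector; the
helper names and the tiny 2-dimensional vectors are hypothetical.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Return 1 - cosine similarity of two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def top_k(query: list[float], rows: list[tuple[str, list[float]]], k: int) -> list[str]:
    """Return the paths of the k rows closest to the query vector."""
    ranked = sorted(rows, key=lambda row: cosine_distance(query, row[1]))
    return [path for path, _ in ranked[:k]]


# hypothetical toy data: (image_path, embedding)
rows = [
    ("image_0.jpg", [1.0, 0.0]),
    ("image_1.jpg", [0.0, 1.0]),
    ("image_2.jpg", [1.0, 1.0]),
]

print(top_k(rows[0][1], rows, k=2))  # ['image_0.jpg', 'image_2.jpg']
```

Note that embeddings drawn from `np.random.rand` carry no meaning, so beyond the
query image itself the ranking above is essentially arbitrary.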


## Filtering

For more advanced filtering, we need an actual dataset with rich metadata to
filter by. For this, we will use the [Steam Games
Dataset](https://huggingface.co/datasets/FronkonGames/steam-games-dataset). It
is hosted on the HuggingFace Hub, and we can download it with:

In [8]:
from datasets import load_dataset

dataset = load_dataset("FronkonGames/steam-games-dataset")

# get columns names and types
columns = dataset["train"].features
print(columns)

columns_to_keep = ["Name", "Windows", "Linux", "Mac", "About the game", "Supported languages", "Price"]

N = 40_000
dataset = dataset["train"].select_columns(columns_to_keep).select(range(N))

{'AppID': Value('int64'), 'Name': Value('string'), 'Release date': Value('string'), 'Estimated owners': Value('string'), 'Peak CCU': Value('int64'), 'Required age': Value('int64'), 'Price': Value('float64'), 'DLC count': Value('int64'), 'About the game': Value('string'), 'Supported languages': Value('string'), 'Full audio languages': Value('string'), 'Reviews': Value('string'), 'Header image': Value('string'), 'Website': Value('string'), 'Support url': Value('string'), 'Support email': Value('string'), 'Windows': Value('bool'), 'Mac': Value('bool'), 'Linux': Value('bool'), 'Metacritic score': Value('int64'), 'Metacritic url': Value('string'), 'User score': Value('int64'), 'Positive': Value('int64'), 'Negative': Value('int64'), 'Score rank': Value('float64'), 'Achievements': Value('int64'), 'Recommendations': Value('int64'), 'Notes': Value('string'), 'Average playtime forever': Value('int64'), 'Average playtime two weeks': Value('int64'), 'Median playtime forever': Value('int64'), 'Medi

We will use the following columns:
* `Name`
* `About the game`
* `Price`
* `Windows`, `Linux`, `Mac` - the platforms on which the game is available; note
  that there is a separate boolean field for each platform

For vector search, we can use the `About the game` column, which is an arbitrary
text description. A great model for this purpose is
`distiluse-base-multilingual-cased-v2` from [Sentence
Transformers](https://sbert.net/). This model is a multilingual text
transformer, so it will also work well for game descriptions in languages other
than English.

The `distiluse-base-multilingual-cased-v2` model supports over 50 languages. It
is a knowledge-distilled version of the multilingual Universal Sentence Encoder.
The earlier v1 model supports only 15 languages. While v2 covers a wider range
of languages, its performance is reported to be a bit lower than that of v1 on
those 15 shared languages. Read the paper if you're interested:
https://arxiv.org/pdf/2004.09813.

When looking at
https://huggingface.co/sentence-transformers/distiluse-base-multilingual-cased-v2,
one can see that it produces 512-dimensional embeddings. This is a value we need
to declare in the table definition as vector length.

In [9]:
from sqlalchemy import Integer, Float, Boolean


class Games(Base):
    __tablename__ = "games"
    __table_args__ = {"extend_existing": True}

    # the vector size produced by the model taken from documentation
    # https://huggingface.co/sentence-transformers/distiluse-base-multilingual-cased-v2
    VECTOR_LENGTH = 512

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(256))
    description: Mapped[str] = mapped_column(String(4096))
    windows: Mapped[bool] = mapped_column(Boolean)
    linux: Mapped[bool] = mapped_column(Boolean)
    mac: Mapped[bool] = mapped_column(Boolean)
    price: Mapped[float] = mapped_column(Float)
    game_description_embedding: Mapped[List[float]] = mapped_column(Vector(VECTOR_LENGTH))


Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

Let's prepare a function that generates embeddings for the game descriptions.

In [10]:
from sentence_transformers import SentenceTransformer


checkpoint = "distiluse-base-multilingual-cased-v2"
model = SentenceTransformer(checkpoint)


def generate_embeddings(text: str) -> list[float]:
    # encode() returns a NumPy array; convert it to a plain list of floats
    return model.encode(text).tolist()

Now let's prepare the function that will insert the data into the table.

In [11]:
from tqdm import tqdm


def insert_games(engine, dataset):
    with tqdm(total=len(dataset)) as pbar:
        for row in dataset:
            game_description = row["About the game"] or ""
            name, windows, linux, mac, price = row["Name"], row["Windows"], row["Linux"], row["Mac"], row["Price"]
            # NOTE: this truthiness check keeps only rows where every value is
            # truthy, so it also skips free games (price 0) and games that are
            # not available on all three platforms
            if name and windows and linux and mac and price and game_description:
                # embed only the rows that are actually inserted
                game = Games(
                    name=name,
                    description=game_description[0:4096],
                    windows=windows,
                    linux=linux,
                    mac=mac,
                    price=price,
                    game_description_embedding=generate_embeddings(game_description),
                )
                with Session(engine) as session:
                    session.add(game)
                    session.commit()
            pbar.update(1)

Now we can insert the data into the table.

In [12]:
insert_games(engine, dataset)

100%|██████████| 40000/40000 [08:37<00:00, 77.29it/s] 
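
The insert took several minutes here, partly because each row is embedded and
committed on its own. One possible speed-up is to process rows in batches. The
helper below is a hypothetical sketch (the name `chunked` and the batch size are
assumptions, not part of the code above) that shows only the batching logic.

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    iterator = iter(items)
    # islice pulls up to `size` items; an empty list ends the loop
    while batch := list(islice(iterator, size)):
        yield batch


# toy usage: 5 rows in batches of 2
batches = list(chunked(range(5), 2))
print(batches)  # [[0, 1], [2, 3], [4]]
```

Each batch could then be embedded with a single `model.encode(list_of_texts)`
call and stored with one `session.add_all(...)` followed by one
`session.commit()`.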


Next, we write a function that finds the game most similar to a given
description, optionally restricted by the given filtering criteria.

In [None]:
from typing import Optional


def find_game(
    engine: sqlalchemy.Engine,
    game_description: str,
    windows: Optional[bool] = None,
    linux: Optional[bool] = None,
    mac: Optional[bool] = None,
    price: Optional[float] = None,
) -> Optional[Games]:
    with Session(engine) as session:
        game_embedding = generate_embeddings(game_description)  # generate game embedding

        query = select(Games)

        # apply only the filters that were actually provided
        if price is not None:
            query = query.where(Games.price <= price)
        if windows:
            query = query.where(Games.windows)
        if linux:
            query = query.where(Games.linux)
        if mac:
            query = query.where(Games.mac)

        # rank the remaining games by cosine distance and keep the closest one
        query = query.order_by(Games.game_description_embedding.cosine_distance(game_embedding)).limit(1)

        result = session.execute(query, execution_options={"prebuffer_rows": True})
        game = result.scalars().first()

        return game

Our first vector search service is ready to use! Let's check it out.

In [14]:
game = find_game(engine, "This is a game about a hero who saves the world", price=10)
print(f"Game: {game.name}")
print(f"Description: {game.description}")

game = find_game(engine, game_description="Home decorating", price=20)
print(f"Game: {game.name}")
print(f"Description: {game.description}")

Game: Ultimate Spider Hero
Description: Ultimate Spider Hero game was designed for real heroes! Your mission is to help poor residents of the Metropolis and to save them from the terrible monsters. Move forward to fight your enemies and try not to fall! Features: Simple and addictive gameplay Nice graphics Awesome Ultimate Spider Hero Countless Steam achievements for you to collect! Compatibility with multiple major platforms (Windows, Mac, Linux, SteamOS) Make your way through the endless labyrinths of long, confusing city streets together with your favorite hero from countless movies and cartoons! Although this may look simple enough, things are not as easy as they seem. You will have to learn how to cling into houses properly using your web, otherwise you will fall to your demise. If you manage to do so - you will become a real superhero, armed with elusiveness, agility and speed and the ability to tirelessly swing across the rooftops and between the huge skyscrapers this urban land

Let's change the filtering requirements:

In [15]:
game = find_game(engine, game_description="Home decorating", mac=True, price=5)
print(f"Game: {game.name}")
print(f"Description: {game.description}")

Game: 3D PUZZLE - Old House
Description: Collect a 3D puzzle, transferring things to the right places to create a beautiful house. You need to go to the item, take it by pressing the left mouse button and take the item to the desired location marked in green. If you brought the correct item, it will snap into place and you will receive leaderboard points and achievements for this. Collect as much substance as possible as quickly as possible to get more points for the leaderboard. If you brought the wrong item, you can throw it away, it will return to the starting location so that you can pick it up again.


As you can see, making the criteria stricter can change the results. This is
the consequence of a few things: how attribute filtering narrows the set of
candidates, how the model interprets the similarity of descriptions, and how
those two things interact. A major advantage of this approach is its overall
simplicity.
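
The interaction between filtering and ranking can be sketched in pure Python.
This is a hypothetical in-memory stand-in for the SQL query, with made-up games
and 2-dimensional vectors in place of real embeddings: the filters first narrow
the candidates, and only then is the closest one chosen. Comparing against
`None` rather than relying on truthiness lets a caller pass `max_price=0` to ask
for free games.

```python
import math
from typing import Optional


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Return 1 - cosine similarity of two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return 1.0 - dot / (math.hypot(*a) * math.hypot(*b))


# hypothetical rows standing in for the `games` table
games = [
    {"name": "Cozy Home Designer", "price": 15.0, "mac": False, "embedding": [0.9, 0.1]},
    {"name": "Puzzle House", "price": 4.0, "mac": True, "embedding": [0.6, 0.4]},
    {"name": "Space Shooter", "price": 0.0, "mac": True, "embedding": [0.0, 1.0]},
]


def find_closest(
    query: list[float],
    max_price: Optional[float] = None,
    mac: Optional[bool] = None,
) -> Optional[str]:
    """Filter the candidates first, then return the name of the closest one."""
    candidates = games
    if max_price is not None:
        candidates = [g for g in candidates if g["price"] <= max_price]
    if mac:
        candidates = [g for g in candidates if g["mac"]]
    if not candidates:
        return None
    best = min(candidates, key=lambda g: cosine_distance(query, g["embedding"]))
    return best["name"]


query = [1.0, 0.0]  # stands in for the embedding of a text query
print(find_closest(query, max_price=20))           # Cozy Home Designer
print(find_closest(query, max_price=5, mac=True))  # Puzzle House
print(find_closest(query, max_price=0))            # Space Shooter
```

The best overall match is excluded as soon as a filter rules it out, which is
why a stricter query can return a noticeably less similar game.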

## Retrieval-Augmented Generation (RAG) service (1 point)

### Vector database setup

In this part of the laboratory, we will build a RAG service. It enhances the LLM
text generation capabilities with context and information drawn from a knowledge
base. Relevant textual information is found with vector search and appended to
the prompt, resulting in fewer hallucinations and more precise, relevant answers.

In such cases, we don't really need any additional capabilities like attribute
filtering, ACID, JOINs or other Postgres-related advantages. Thus, we will use
[Milvus](https://milvus.io/), a typical example of a vector database. To
generate embeddings, we will use the [Silver Retriever
model](https://huggingface.co/ipipan/silver-retriever-base-v1.1), loaded with
Sentence Transformers. It is based on the HerBERT model for the Polish language,
and fine-tuned for retrieval.

Let's connect to the database. Milvus provides its own `pymilvus` library.


In [16]:
from pymilvus import MilvusClient

host = "localhost"
port = "19530"

# MilvusClient takes the server address as a single URI
milvus_client = MilvusClient(uri=f"http://{host}:{port}")

Vector databases work quite similarly to document databases such as MongoDB.
Instead of a table, we define a **collection** with a specific **schema**, but
conceptually the two are similar. For each element, we store an ID, the text,
and its embedding.

In [17]:
from pymilvus import FieldSchema, DataType, CollectionSchema

VECTOR_LENGTH = 768  # check the dimensionality for Silver Retriever Base (v1.1) model

id_field = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, description="Primary id")
text_field = FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=4096, description="Page text")
embedding_field = FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=VECTOR_LENGTH, description="Embedded text")

fields = [id_field, text_field, embedding_field]

schema = CollectionSchema(fields=fields, auto_id=True, enable_dynamic_field=True, description="RAG Texts collection")

To create a collection with the given schema:

In [18]:
COLLECTION_NAME = "rag_texts_and_embeddings"

milvus_client.create_collection(collection_name=COLLECTION_NAME, schema=schema)

index_params = milvus_client.prepare_index_params()

index_params.add_index(
    field_name="embedding",
    index_type="HNSW",
    metric_type="L2",
    params={"M": 4, "efConstruction": 64},  # lower values for speed
)

milvus_client.create_index(collection_name=COLLECTION_NAME, index_params=index_params)

# list the existing collections
print(milvus_client.list_collections())

# describe our collection
print(milvus_client.describe_collection(COLLECTION_NAME))

['rag_texts_and_embeddings']
{'collection_name': 'rag_texts_and_embeddings', 'auto_id': True, 'num_shards': 1, 'description': 'RAG Texts collection', 'fields': [{'field_id': 100, 'name': 'id', 'description': 'Primary id', 'type': <DataType.INT64: 5>, 'params': {}, 'auto_id': True, 'is_primary': True}, {'field_id': 101, 'name': 'text', 'description': 'Page text', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 4096}}, {'field_id': 102, 'name': 'embedding', 'description': 'Embedded text', 'type': <DataType.FLOAT_VECTOR: 101>, 'params': {'dim': 768}}], 'functions': [], 'aliases': [], 'collection_id': 461898704957473071, 'consistency_level': 2, 'properties': {}, 'num_partitions': 1, 'enable_dynamic_field': True, 'created_timestamp': 461898722575122437}
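
The index above uses the `L2` (Euclidean) metric, while the Postgres part ranked
results by cosine distance. For vectors normalized to unit length the two give
the same ranking, because the squared L2 distance equals twice the cosine
distance. The sketch below checks this identity numerically on hypothetical
2-dimensional vectors; whether a given model's embeddings are normalized is an
assumption that has to be verified separately.

```python
import math


def normalize(v: list[float]) -> list[float]:
    """Scale a non-zero vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def squared_l2(a: list[float], b: list[float]) -> float:
    """Return the squared Euclidean distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Return 1 - cosine similarity of two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


# hypothetical vectors, normalized to unit length
a = normalize([3.0, 4.0])
b = normalize([1.0, 2.0])

lhs = squared_l2(a, b)
rhs = 2.0 * cosine_distance(a, b)
print(math.isclose(lhs, rhs))  # True
```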


Now we are able to insert documents into our database. RAG is most useful when
information is very specialized, niche, or otherwise likely unknown to the
model. Let's start with ["IAB POLSKA Przewodnik po sztucznej
inteligencji"](https://www.iab.org.pl/wp-content/uploads/2024/04/Przewodnik-po-sztucznej-inteligencji-2024_IAB-Polska.pdf).
This part is inspired by [SpeakLeash](https://www.speakleash.org/) and one of
their projects,
[Bielik-how-to-start](https://github.com/speakleash/Bielik-how-to-start?tab=readme-ov-file),
specifically the [Bielik_2_(4_bit)_RAG
example](https://colab.research.google.com/drive/1ZdYsJxLVo9fW75uonXE5PCt8MBgvyktA?authuser=1).
Bielik is an open Polish LLM, and you can also explore other tutorials for its
usage. Let's define some constants for a start:

In [19]:
# define data source and destination
## URL from which the document will be downloaded
pdf_url = "https://www.iab.org.pl/wp-content/uploads/2024/04/Przewodnik-po-sztucznej-inteligencji-2024_IAB-Polska.pdf"

## local file name of the downloaded document
file_name = "Przewodnik-po-sztucznej-inteligencji-2024_IAB-Polska.pdf"

## local file name of the processed document (text of each page)
file_json = "Przewodnik-po-sztucznej-inteligencji-2024_IAB-Polska.json"

## local file name of the embedded pages of the document
embeddings_json = "Przewodnik-po-sztucznej-inteligencji-2024_IAB-Polska-Embeddings.json"

## local directory in which all of the above files are stored
data_dir = "./data"

Let's download the document into the `data_dir` directory:

In [20]:
# download data
import os
import requests


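def download_pdf_data(pdf_url: str, file_name: str) -> None:
    # make sure the target directory exists before writing to it
    os.makedirs(data_dir, exist_ok=True)
    response = requests.get(pdf_url, stream=True, timeout=60)
    # fail early on HTTP errors instead of saving an error page as a PDF
    response.raise_for_status()
    with open(os.path.join(data_dir, file_name), "wb") as file:
        for block in response.iter_content(chunk_size=1024):
            if block:
                file.write(block)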


download_pdf_data(pdf_url, file_name)

This is a lot of text, and in RAG we need to add specific fragments to the
prompt. To keep things simple, and the number of vectors not too large, we will
treat each page as a separate **chunk** to vectorize and search for. Below, we
split the document into pages and save each page's text into a JSON file in the
format `{"page_num": page_number, "text": text_of_the_page}`.

In [21]:
# prepare data

import fitz
import json


def extract_pdf_text(file_name, file_json):
    document = fitz.open(os.path.join(data_dir, file_name))
    pages = []

    for page_num in range(len(document)):
        page = document.load_page(page_num)
        page_text = page.get_text()
        pages.append({"page_num": page_num, "text": page_text})

    with open(os.path.join(data_dir, file_json), "w") as file:
        json.dump(pages, file, indent=4, ensure_ascii=False)


extract_pdf_text(file_name, file_json)
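
Whole pages are a coarse chunking unit, and a long page could exceed the
`max_length=4096` limit of the `text` field. A common alternative is fixed-size
chunks with a small overlap, so that a sentence cut at one boundary still
appears intact in a neighbouring chunk. The function below is a hypothetical
sketch (its name and default sizes are assumptions) that counts characters for
simplicity; if the database limit is counted in bytes, Polish diacritics would
need extra headroom.

```python
def split_into_chunks(text: str, max_chars: int = 1000, overlap: int = 100) -> list[str]:
    """Split text into chunks of at most max_chars, overlapping by `overlap`."""
    if max_chars <= 0 or not 0 <= overlap < max_chars:
        raise ValueError("need max_chars > 0 and 0 <= overlap < max_chars")

    step = max_chars - overlap
    chunks: list[str] = []
    start = 0
    while start < len(text):
        chunks.append(text[start : start + max_chars])
        # stop once the current chunk already reaches the end of the text
        if start + max_chars >= len(text):
            break
        start += step
    return chunks


# toy usage: chunks of 4 characters that share 1 character with the previous one
chunks = split_into_chunks("abcdefghij", max_chars=4, overlap=1)
print(chunks)  # ['abcd', 'defg', 'ghij']
```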

Now we have texts, but we need vectors. We will use the model to embed the text
of each page and save the result in our collection in Milvus. This is very easy
if we first prepare a single JSON file with all the data. Its format is
`{"page_num": page_num, "embedding": embedded_text}`.

In [None]:
# vectorize data

import torch
import numpy as np
from sentence_transformers import SentenceTransformer


def generate_embeddings(file_json, embeddings_json, model):  # noqa: F811
    with open(os.path.join(data_dir, file_json), "r") as file:
        data = json.load(file)

    # embed the text of all pages in a single batched call
    pages = [page["text"] for page in data]
    embeddings = model.encode(pages)

    # store each embedding as a plain list, keyed by its page index
    embeddings_paginated = [
        {"page_num": page_num, "embedding": embedding.tolist()} for page_num, embedding in enumerate(embeddings)
    ]

    with open(os.path.join(data_dir, embeddings_json), "w") as file:
        json.dump(embeddings_paginated, file, indent=4, ensure_ascii=False)


model_name = "ipipan/silver-retriever-base-v1.1"
device = "cuda" if torch.cuda.is_available() else "cpu"
model = SentenceTransformer(model_name, device=device)
generate_embeddings(file_json, embeddings_json, model)

Now we can easily insert the data into Milvus:

In [23]:
def insert_embeddings(file_json, embeddings_json, client=milvus_client):
    with (
        open(os.path.join(data_dir, file_json), "r") as t_f,
        open(os.path.join(data_dir, embeddings_json), "r") as e_f,
    ):
        text_data, embedding_data = json.load(t_f), json.load(e_f)

    # both files list the pages in the same order, so we can pair them up;
    # the primary key is generated by Milvus (auto_id=True)
    rows = [
        {"text": page["text"], "embedding": page_embedding["embedding"]}
        for page, page_embedding in zip(text_data, embedding_data)
    ]

    client.insert(collection_name=COLLECTION_NAME, data=rows)


insert_embeddings(file_json, embeddings_json)

# load inserted data into memory
milvus_client.load_collection("rag_texts_and_embeddings")

Now let's do some semantic search!

In [24]:
# search
def search(model, query, client=milvus_client):
    embedded_query = model.encode(query).tolist()
    result = client.search(
        collection_name="rag_texts_and_embeddings",
        data=[embedded_query],
        limit=1,
        search_params={"metric_type": "L2"},
        output_fields=["text"],
    )
    return result


result = search(model, query="Czym jest sztuczna inteligencja")
print(result[0][0]["entity"]["text"])

Historia powstania
sztucznej inteligencji
7
W języku potocznym „sztuczny" oznacza to, co
jest 
wytworem 
mającym 
naśladować 
coś
naturalnego. W takim znaczeniu używamy
terminu ,,sztuczny'', gdy mówimy o sztucznym
lodowisku lub oku. Sztuczna inteligencja byłaby
czymś (programem, maszyną) symulującym
inteligencję naturalną, ludzką.
Sztuczna inteligencja (AI) to obszar informatyki,
który skupia się na tworzeniu programów
komputerowych zdolnych do wykonywania
zadań, które wymagają ludzkiej inteligencji. 
Te zadania obejmują rozpoznawanie wzorców,
rozumienie języka naturalnego, podejmowanie
decyzji, uczenie się, planowanie i wiele innych.
Głównym celem AI jest stworzenie systemów,
które są zdolne do myślenia i podejmowania
decyzji na sposób przypominający ludzki.
Historia sztucznej inteligencji sięga lat 50. 
XX wieku, kiedy to powstały pierwsze koncepcje
i modele tego, co mogłoby stać się sztuczną
inteligencją. Jednym z pionierów był Alan
Turing, który sformułował test Turinga, mający
na 

However, **this is not yet RAG!** This is just searching through our
embeddings, without any LLM or generation. Many companies rely on external LLMs
used via API, due to easy setup, good scalability, and low cost. We will follow
this trend here and use the Google Gemini API to generate an answer with RAG.

Let's prepare the function that will call the Google API and generate our
response.

In [27]:
from google import genai

GEMINI_KEY = os.getenv("GEMINI_API_KEY")
gemini_client = genai.Client(api_key=GEMINI_KEY)

MODEL = "gemini-2.0-flash"


def generate_response(prompt: str):
    try:
        # Send request to Gemini 2.0 Flash API and get the response
        response = gemini_client.models.generate_content(
            model=MODEL,
            contents=prompt,
        )
        return response.text
    except Exception as e:
        print(f"Error generating response: {e}")
        return None

Now we can fully integrate everything into a RAG system. Fill in the function
below that will augment the prompt with knowledge from Milvus, and then use the
LLM to generate an answer based on that context.

In [28]:
def build_prompt(context: str, query: str) -> str:
    prompt = f"""You will be given a Question and a Context to help answer it.

Your process must be:

1. First, analyze the Context and extract the key pieces of information directly
   relevant to the Question.

2. Second, use only these extracted facts to synthesize a final answer.

3. If no relevant information is found in the Context, respond with: "The
   provided context does not contain information to answer this question."

Context: {context}

Question: {query}

Answer:
"""

    return prompt


def rag(model, query: str) -> str:
    # having all prepared functions, you can combine them together and try to build your own RAG!
    ctx = search(model, query)[0][0]["entity"]["text"]
    prompt = build_prompt(ctx, query)
    return generate_response(prompt)

In [29]:
print(rag(model, "Jak wyglądają przepisy związane z generacją deepfake'ów?"))

1.  **Key Information Extraction:**

*   AI systems can be used to create digital content that is not real (deepfake).
*   Deepfakes are often created using someone's image (e.g., appearance, voice).
*   Deepfakes are to be defined in the AI Act, but existing laws apply, especially regarding personal rights.
*   Personal rights protect human integrity and are inalienable.
*   Image (appearance, voice) is a personal right (Article 23 of the Civil Code).
*   Using someone's image requires permission.
*   Distributing an image without permission (e.g., in a deepfake) violates the right to that image and can result in civil liability (e.g., compensation).
*   Deepfakes can violate other personal rights (e.g., good name) or constitute a crime (e.g., defamation, Article 212 §1 or §2 of the Criminal Code).
*   There are limited exceptions for disseminating an image without permission, such as for a publicly known person in connection with their professional duties (Article 81 of the Copyright

In [30]:
print(rag(model, "Czy dzisiaj będzie padać?"))

The provided context does not contain information to answer this question.
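
The `rag()` function above passes only the single best page to the LLM. If
`search()` were called with a larger `limit`, several pages could be combined
into one context. The sketch below is hypothetical: `build_context`, the
separator and the character budget are assumptions, and the sample `hits` only
mimic the nested `["entity"]["text"]` layout of the search results used above.

```python
def build_context(hits: list[dict], max_chars: int = 6000, separator: str = "\n\n---\n\n") -> str:
    """Join the texts of search hits, best first, within a character budget."""
    parts: list[str] = []
    used = 0
    for hit in hits:
        text = hit["entity"]["text"].strip()
        cost = len(text) + (len(separator) if parts else 0)
        # skip empty pages and pages that would exceed the budget
        if not text or used + cost > max_chars:
            continue
        parts.append(text)
        used += cost
    return separator.join(parts)


# hypothetical hits in the same shape as the entries of one search result
hits = [
    {"entity": {"text": "Page about deepfakes."}},
    {"entity": {"text": "   "}},
    {"entity": {"text": "Page about personal rights."}},
]

context = build_context(hits)
print(context)
```

The resulting string could be passed to `build_prompt()` in place of the single
page. A budget is useful because every extra page makes the prompt longer.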

