In [6]:
import sqlalchemy
from sqlalchemy.engine import URL

In [7]:
db_url = URL.create(
    drivername="postgresql+psycopg",
    username="postgres",
    password="password",
    host="localhost",
    port="5555",
    database="similarity_search_service_db",
)

In [8]:
from pgvector.sqlalchemy import Vector
from sqlalchemy import Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


# Create the base class for the table definition
class Base(DeclarativeBase):
    __abstract__ = True


# Create the table definition
class Images(Base):
    __tablename__ = "images"
    VECTOR_LENGTH = 512

    # primary key
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # image path - we will use it to store the path to the image file, after similarity search we can use it to retrieve the image and display it
    image_path: Mapped[str] = mapped_column(String(256))
    # image embedding - we will store the image embedding in this column, the image embedding is a list of 512 floats this is the output of the sentence transformer model
    image_embedding: Mapped[list[float]] = mapped_column(Vector(VECTOR_LENGTH))

In [9]:
from sqlalchemy import create_engine

engine = create_engine(db_url)


In [10]:
Base.metadata.create_all(engine)

In [11]:
import numpy as np
from sqlalchemy import select
from sqlalchemy.orm import Session


# reusable function to insert data into the table
def insert_image(
    engine: sqlalchemy.Engine, image_path: str, image_embedding: list[float]
):
    with Session(engine) as session:
        # create the image object
        image = Images(image_path=image_path, image_embedding=image_embedding)
        # add the image object to the session
        session.add(image)
        # commit the transaction
        session.commit()


# insert some data into the table
N = 100
for i in range(N):
    image_path = f"image_{i}.jpg"
    image_embedding = np.random.rand(512).tolist()
    insert_image(engine, image_path, image_embedding)

# select first image from the table
with Session(engine) as session:
    image = session.query(Images).first()


# calculate the cosine similarity between the first image and the K rest of the images, order the images by the similarity score
def find_k_images(
    engine: sqlalchemy.Engine, k: int, orginal_image: Images
) -> list[Images]:
    with Session(engine) as session:
        # execution_options={"prebuffer_rows": True} is used to prebuffer the rows, this is useful when we want to fetch the rows in chunks and return them after session is closed
        result = session.execute(
            select(Images)
            .order_by(
                Images.image_embedding.cosine_distance(orginal_image.image_embedding)
            )
            .limit(k),
            execution_options={"prebuffer_rows": True},
        )
        return result


# find the 10 most similar images to the first image
k = 10
similar_images = find_k_images(engine, k, image)

In [12]:
# find the images with the similarity score greater than 0.9
def find_images_with_similarity_score_greater_than(
    engine: sqlalchemy.Engine, similarity_score: float, orginal_image: Images
) -> list[Images]:
    with Session(engine) as session:
        result = session.execute(
            select(Images).filter(
                Images.image_embedding.cosine_similarity(orginal_image.image_embedding)
                > similarity_score
            ),
            execution_options={"prebuffer_rows": True},
        )
        return result

In [13]:
from datasets import load_dataset

dataset = load_dataset("FronkonGames/steam-games-dataset")

# get columns names and types
columns = dataset["train"].features
print(columns)

columns_to_keep = [
    "Name",
    "Windows",
    "Linux",
    "Mac",
    "About the game",
    "Supported languages",
    "Price",
]

N = 40000
dataset = dataset["train"].select_columns(columns_to_keep).select(range(N))

{'AppID': Value('int64'), 'Name': Value('string'), 'Release date': Value('string'), 'Estimated owners': Value('string'), 'Peak CCU': Value('int64'), 'Required age': Value('int64'), 'Price': Value('float64'), 'DLC count': Value('int64'), 'About the game': Value('string'), 'Supported languages': Value('string'), 'Full audio languages': Value('string'), 'Reviews': Value('string'), 'Header image': Value('string'), 'Website': Value('string'), 'Support url': Value('string'), 'Support email': Value('string'), 'Windows': Value('bool'), 'Mac': Value('bool'), 'Linux': Value('bool'), 'Metacritic score': Value('int64'), 'Metacritic url': Value('string'), 'User score': Value('int64'), 'Positive': Value('int64'), 'Negative': Value('int64'), 'Score rank': Value('float64'), 'Achievements': Value('int64'), 'Recommendations': Value('int64'), 'Notes': Value('string'), 'Average playtime forever': Value('int64'), 'Average playtime two weeks': Value('int64'), 'Median playtime forever': Value('int64'), 'Medi

In [14]:
from sqlalchemy import Boolean, Float, Integer


class Games(Base):
    __tablename__ = "games"
    __table_args__ = {"extend_existing": True}

    # the vector size produced by the model taken from documentation https://huggingface.co/sentence-transformers/distiluse-base-multilingual-cased-v2
    VECTOR_LENGTH = 512

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(256))
    description: Mapped[str] = mapped_column(String(4096))
    windows: Mapped[bool] = mapped_column(Boolean)
    linux: Mapped[bool] = mapped_column(Boolean)
    mac: Mapped[bool] = mapped_column(Boolean)
    price: Mapped[float] = mapped_column(Float)
    game_description_embedding: Mapped[list[float]] = mapped_column(
        Vector(VECTOR_LENGTH)
    )


Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

In [15]:
from sentence_transformers import SentenceTransformer

checkpoint = "distiluse-base-multilingual-cased-v2"
model = SentenceTransformer(checkpoint)


def generate_embeddings(text: str) -> list[float]:
    return model.encode(text)

In [16]:
from tqdm import tqdm


def insert_games(engine, dataset):
    with tqdm(total=len(dataset)) as pbar:
        for i, game in enumerate(dataset):
            game_description = game["About the game"] or ""
            game_embedding = generate_embeddings(game_description)
            name, windows, linux, mac, price = (
                game["Name"],
                game["Windows"],
                game["Linux"],
                game["Mac"],
                game["Price"],
            )
            if name and windows and linux and mac and price and game_description:
                game = Games(
                    name=game["Name"],
                    description=game_description[0:4096],
                    windows=game["Windows"],
                    linux=game["Linux"],
                    mac=game["Mac"],
                    price=game["Price"],
                    game_description_embedding=game_embedding,
                )
                with Session(engine) as session:
                    session.add(game)
                    session.commit()
            pbar.update(1)

In [17]:
insert_games(engine, dataset)

100%|██████████| 40000/40000 [07:16<00:00, 91.62it/s] 


In [18]:
from typing import Optional


def find_game(
    engine: sqlalchemy.Engine,
    game_description: str,
    windows: Optional[bool] = None,
    linux: Optional[bool] = None,
    mac: Optional[bool] = None,
    price: Optional[int] = None,
):
    with Session(engine) as session:
        game_embedding = generate_embeddings(game_description)

        query = select(Games).order_by(
            Games.game_description_embedding.cosine_distance(game_embedding)
        )

        if price:
            query = query.filter(Games.price <= price)
        if windows:
            query = query.filter(Games.windows == True)
        if linux:
            query = query.filter(Games.linux == True)
        if mac:
            query = query.filter(Games.mac == True)

        result = session.execute(query, execution_options={"prebuffer_rows": True})
        game = result.scalars().first()

        return game

In [19]:
game = find_game(engine, "This is a game about a hero who saves the world", price=10)
print(f"Game: {game.name}")
print(f"Description: {game.description}")

game = find_game(engine, game_description="Home decorating", price=20)
print(f"Game: {game.name}")
print(f"Description: {game.description}")

Game: Ultimate Spider Hero
Description: Ultimate Spider Hero game was designed for real heroes! Your mission is to help poor residents of the Metropolis and to save them from the terrible monsters. Move forward to fight your enemies and try not to fall! Features: Simple and addictive gameplay Nice graphics Awesome Ultimate Spider Hero Countless Steam achievements for you to collect! Compatibility with multiple major platforms (Windows, Mac, Linux, SteamOS) Make your way through the endless labyrinths of long, confusing city streets together with your favorite hero from countless movies and cartoons! Although this may look simple enough, things are not as easy as they seem. You will have to learn how to cling into houses properly using your web, otherwise you will fall to your demise. If you manage to do so - you will become a real superhero, armed with elusiveness, agility and speed and the ability to tirelessly swing across the rooftops and between the huge skyscrapers this urban land

In [20]:
game = find_game(engine, game_description="city building", mac=True, price=5)
print(f"Game: {game.name}")
print(f"Description: {game.description}")

Game: Megapolis
Description: Build your own city and make heaps of money in this addictive strategy game. Construct beautiful and functional city zones with plenty of modern blocks and keep your residents happy by providing all services they need, and they will reward you with a tidy profit. It’s up to you how to play - you can work your way to the top in the comprehensive campaign mode or you can create and play your own custom scenarios. Build dozens of houses, structures and other buildings. Trophies and awards available for the meticulous gamer! Construct beautiful and functional city zones. Build dozens of houses, structures and other buildings. Win trophies and awards in 24 unique campaign scenarios. Play your own custom scenarios. Reach up to 22 achievements.


In [21]:
from pymilvus import MilvusClient

host = "localhost"
port = "19530"

milvus_client = MilvusClient(host=host, port=port)

In [22]:
from pymilvus import CollectionSchema, DataType, FieldSchema

VECTOR_LENGTH = 768

id_field = FieldSchema(
    name="id", dtype=DataType.INT64, is_primary=True, description="Primary id"
)
text = FieldSchema(
    name="text", dtype=DataType.VARCHAR, max_length=4096, description="Page text"
)
embedding_text = FieldSchema(
    "embedding",
    dtype=DataType.FLOAT_VECTOR,
    dim=VECTOR_LENGTH,
    description="Embedded text",
)

fields = [id_field, text, embedding_text]

schema = CollectionSchema(
    fields=fields,
    auto_id=True,
    enable_dynamic_field=True,
    description="RAG Texts collection",
)

In [None]:
COLLECTION_NAME = "rag_texts_and_embeddings"

milvus_client.create_collection(collection_name=COLLECTION_NAME, schema=schema)

index_params = milvus_client.prepare_index_params()

index_params.add_index(
    field_name="embedding",
    index_type="HNSW",
    metric_type="L2",
    params={"M": 4, "efConstruction": 64},  # lower values for speed
)

milvus_client.create_index(collection_name=COLLECTION_NAME, index_params=index_params)

# checkout our collection
print(milvus_client.list_collections())

# describe our collection
print(milvus_client.describe_collection(COLLECTION_NAME))

In [24]:
# define data source and destination
## the document origin destination from which document will be downloaded
pdf_url = "https://www.iab.org.pl/wp-content/uploads/2024/04/Przewodnik-po-sztucznej-inteligencji-2024_IAB-Polska.pdf"

## local destination of the document
file_name = "Przewodnik-po-sztucznej-inteligencji-2024_IAB-Polska.pdf"

## local destination of the processed document
file_json = "Przewodnik-po-sztucznej-inteligencji-2024_IAB-Polska.json"

## local destination of the embedded pages of the document
embeddings_json = "Przewodnik-po-sztucznej-inteligencji-2024_IAB-Polska-Embeddings.json"

## local destination of all above local required files
data_dir = "./data"

In [25]:
# download data
import os

import requests


def download_pdf_data(pdf_url: str, file_name: str) -> None:
    response = requests.get(pdf_url, stream=True)
    with open(os.path.join(data_dir, file_name), "wb") as file:
        for block in response.iter_content(chunk_size=1024):
            if block:
                file.write(block)


download_pdf_data(pdf_url, file_name)

In [26]:
# prepare data

import json

import fitz


def extract_pdf_text(file_name, file_json):
    document = fitz.open(os.path.join(data_dir, file_name))
    pages = []

    for page_num in range(len(document)):
        page = document.load_page(page_num)
        page_text = page.get_text()
        pages.append({"page_num": page_num, "text": page_text})

    with open(os.path.join(data_dir, file_json), "w") as file:
        json.dump(pages, file, indent=4, ensure_ascii=False)


extract_pdf_text(file_name, file_json)

In [27]:
# vectorize data

import numpy as np
import torch
from sentence_transformers import SentenceTransformer


def generate_embeddings(file_json, embeddings_json, model):
    pages = []
    with open(os.path.join(data_dir, file_json), "r") as file:
        data = json.load(file)

    for page in data:
        pages.append(page["text"])

    embeddings = model.encode(pages)

    embeddings_paginated = []
    for page_num in range(len(embeddings)):
        embeddings_paginated.append(
            {"page_num": page_num, "embedding": embeddings[page_num].tolist()}
        )

    with open(os.path.join(data_dir, embeddings_json), "w") as file:
        json.dump(embeddings_paginated, file, indent=4, ensure_ascii=False)


model_name = "ipipan/silver-retriever-base-v1.1"
device = "cuda" if torch.cuda.is_available() else "cpu"
model = SentenceTransformer(model_name, device=device)
generate_embeddings(file_json, embeddings_json, model)

'(ReadTimeoutError("HTTPSConnectionPool(host='huggingface.co', port=443): Read timed out. (read timeout=10)"), '(Request ID: 8d81d701-235c-4fe8-8a3d-e5e297a51093)')' thrown while requesting HEAD https://huggingface.co/ipipan/silver-retriever-base-v1.1/resolve/main/./modules.json
Retrying in 1s [Retry 1/5].


In [28]:
def insert_embeddings(file_json, embeddings_json, client=milvus_client):
    rows = []
    with (
        open(os.path.join(data_dir, file_json), "r") as t_f,
        open(os.path.join(data_dir, embeddings_json), "r") as e_f,
    ):
        text_data, embedding_data = json.load(t_f), json.load(e_f)
        text_data = list(map(lambda d: d["text"], text_data))
        embedding_data = list(map(lambda d: d["embedding"], embedding_data))

        for page, (text, embedding) in enumerate(zip(text_data, embedding_data)):
            rows.append({"text": text, "embedding": embedding})

    client.insert(collection_name="rag_texts_and_embeddings", data=rows)


insert_embeddings(file_json, embeddings_json)

# load inserted data into memory
milvus_client.load_collection("rag_texts_and_embeddings")

In [29]:
# search
def search(model, query, client=milvus_client):
    embedded_query = model.encode(query).tolist()
    result = client.search(
        collection_name="rag_texts_and_embeddings",
        data=[embedded_query],
        limit=1,
        search_params={"metric_type": "L2"},
        output_fields=["text"],
    )
    return result


result = search(model, query="Czym jest sztuczna inteligencja")

In [30]:
from dotenv import load_dotenv

load_dotenv()

True

In [31]:
from google import genai

GEMINI_KEY = os.getenv("GEMINI_API_KEY")
gemini_client = genai.Client(api_key=GEMINI_KEY)

MODEL = "gemini-2.0-flash"


def generate_response(prompt: str):
    try:
        # Send request to Gemini 2.0 Flash API and get the response
        response = gemini_client.models.generate_content(
            model=MODEL,
            contents=prompt,
        )
        return response.text
    except Exception as e:
        print(f"Error generating response: {e}")
        return None

In [32]:
def build_prompt(context: str, query: str) -> str:
    prompt = f"""
        You are a helpful assistant whose function is to answer Machine Learning related questions.
        Here is information information similar to user's question retrieved from the knowledge base: {context}

        User asked you the following question:
        {query}
        """
    return prompt


def rag(model, query: str) -> str:
    vector_search_result = search(model, query)
    prompt = build_prompt(vector_search_result, query)
    response = generate_response(prompt)
    return response

In [33]:
rag(model, "czym jest sztuczna inteligencja?")

'Sztuczna inteligencja (AI) to obszar informatyki, który skupia się na tworzeniu programów komputerowych zdolnych do wykonywania zadań, które wymagają ludzkiej inteligencji. Te zadania obejmują rozpoznawanie wzorców, rozumienie języka naturalnego, podejmowanie decyzji, uczenie się, planowanie i wiele innych. Głównym celem AI jest stworzenie systemów, które są zdolne do myślenia i podejmowania decyzji na sposób przypominający ludzki.\n'