In [None]:
from google.colab import drive
# Project folder on Google Drive; later cells build paths relative to it.
root_path = "/content/drive/MyDrive/Diploma-mag"

# Attach Google Drive to the Colab filesystem so `root_path` becomes reachable.
drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
import os
from dotenv import load_dotenv, find_dotenv

from langchain_gigachat import Gigachat
from langchain_gigachat.embeddings import GigaChatEmbeddings


# Locate a .env file and load its entries into the process environment.
load_dotenv(dotenv_path=find_dotenv())

def init_gigachat():
    """Create the GigaChat chat model used for answer generation.

    The authorization key is taken from the ``GIGACHAT_CREDENTIALS``
    environment variable (which may come from the ``.env`` file loaded in this
    cell) so that the secret does not have to live in the notebook. When the
    variable is not set, the original placeholder string is used unchanged.

    Returns:
        A ``Gigachat`` instance configured for the ``GigaChat-Max`` model.
    """
    # NOTE(review): the langchain_gigachat package appears to export `GigaChat`
    # (capital C) — confirm that the `Gigachat` import in this cell resolves.
    credentials = os.getenv("GIGACHAT_CREDENTIALS", "ключ_авторизации")
    return Gigachat(
        credentials=credentials,
        model="GigaChat-Max",
        # NOTE(review): presumably disables TLS certificate checks — confirm acceptable.
        verify_ssl_certs=False,
        # Near-zero temperature; presumably chosen for near-deterministic answers.
        temperature=1e-15,
        timeout=100,
    )


def init_gigachat_embeddings():
    """Create the GigaChat embeddings client used for the text vector store.

    The authorization key is taken from the ``GIGACHAT_CREDENTIALS``
    environment variable; when it is not set, the original placeholder string
    is used unchanged.

    Returns:
        A ``GigaChatEmbeddings`` instance for the personal API scope.
    """
    credentials = os.getenv("GIGACHAT_CREDENTIALS", "ключ_авторизации")
    return GigaChatEmbeddings(
        credentials=credentials,
        # Was written as `scope-"GIGACHAT_API_PER"`: a syntax error with a
        # truncated scope name. The personal-access scope is GIGACHAT_API_PERS.
        scope="GIGACHAT_API_PERS",
        # NOTE(review): presumably disables TLS certificate checks — confirm acceptable.
        verify_ssl_certs=False,
    )

In [None]:
import torch
import open_clip

from PIL import Image
from typing import List, Union
from langchain_core.embeddings import Embeddings
from tqdm import tqdm


class OpenCLIPEmbeddings(Embeddings):
    """LangChain ``Embeddings`` implementation backed by an OpenCLIP model.

    The same model encodes both text (``embed_text`` / ``embed_query`` /
    ``embed_documents``) and images (``embed_image``). Every returned vector is
    L2-normalised and converted to a plain Python list of floats.
    """

    # NOTE(review): the default `pretrained` value is an absolute path on one
    # developer machine; callers on other machines must pass their own path.
    def __init__(self, model_name: str = "ViT-H-14", pretrained: str = "/Users/21451364/Downloads/CLIP-ViT-H-14-laion2B-s32B-b79K/open_clip_model.safetensors"):
        """Load the OpenCLIP model, its image preprocessing and its tokenizer.

        Args:
            model_name: OpenCLIP architecture name.
            pretrained: Value forwarded as ``pretrained`` to
                ``open_clip.create_model_from_pretrained``.
        """
        self.model, self.preprocess = open_clip.create_model_from_pretrained(model_name, pretrained=pretrained)
        self.tokenizer = open_clip.get_tokenizer(model_name)
        # Inference only: switch the module to evaluation mode.
        self.model.eval()

    def _normalize(self, tensor: torch.Tensor) -> List[float]:
        """L2-normalise along the last dimension, drop dim 0 if it has size 1, return a list."""
        return tensor.div(tensor.norm(p=2, dim=-1, keepdim=True)).squeeze(0).tolist()

    def embed_text(self, text: str) -> List[float]:
        """Embed a single text string.

        Args:
            text: Text to tokenize and encode.

        Returns:
            The normalised text embedding.
        """
        tokens = self.tokenizer(text)
        with torch.no_grad():
            embeddings = self.model.encode_text(tokens)
        return self._normalize(embeddings)

    def embed_image(self, uris: List[str]) -> List[List[float]]:
        """Embed images given their file paths.

        Args:
            uris: Paths of the image files to embed.

        Returns:
            One normalised embedding per path, in input order.
        """
        image_features = []
        # No gradients are needed for inference; skipping them avoids building
        # an autograd graph for every image.
        with torch.no_grad():
            for uri in tqdm(uris):
                # Open each file only while it is being preprocessed so file
                # handles are released promptly instead of all staying open.
                with Image.open(uri) as pil_image:
                    preprocessed_image = self.preprocess(pil_image).unsqueeze(0)
                embeddings_tensor = self.model.encode_image(preprocessed_image)
                image_features.append(self._normalize(embeddings_tensor))
        return image_features

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed a list of texts.

        The previous implementation passed each text to ``embed_image``, which
        iterates its argument as a list of file paths and therefore tried to
        open every character of the text as an image.

        Args:
            texts: Texts to embed.

        Returns:
            One normalised embedding per text, in input order.
        """
        return [self.embed_text(text) for text in texts]

    def embed_query(self, text: str) -> List[float]:
        """Embed a query string; identical to ``embed_text``."""
        return self.embed_text(text)

In [None]:
# Labels that are baked into the on-disk vector store directory names.
model_id = "gigachat"
text_embedding_model = "gigaEmbeddings"
image_embeddign_model = "clip"

# Persist directories: one store for text chunks, one for images.
text_vectorstore_dir = f"./data/multimodal_rag/text_{model_id}_vectorstore_{text_embedding_model}"
img_vectorstore_dir = f"./data/multimodal_rag/image_{model_id}_vectorstore_{image_embeddign_model}"

# Model clients: chat LLM, text embeddings, and OpenCLIP embeddings for images.
llm = init_gigachat()
embeddings = init_gigachat_embeddings()
image_embeddigns = OpenCLIPEmbeddings()

In [None]:
from langchain_chroma import Chroma
from chromadb.config import Settings

# Store for text chunks, embedded with `embeddings`.
text_vectorstore = Chroma(
    collection_name="mm_rag_text_gigaembeddings",
    embedding_function=embeddings,
    persist_directory=text_vectorstore_dir,
    client_settings=Settings(anonymized_telemetry=False),
)

# Store for images, embedded with `image_embeddigns`.
# NOTE(review): the collection name says "gigaembeddings" although the
# embedding function is the OpenCLIP one; kept as is so existing persisted
# data is still found.
img_vectorstore = Chroma(
    collection_name="mm_rag_image_gigaembeddings",
    embedding_function=image_embeddigns,
    persist_directory=img_vectorstore_dir,
    client_settings=Settings(anonymized_telemetry=False),
)

In [None]:
import json
# Read the extracted texts and image records.
# The encoding is given explicitly: without it open() uses the platform
# locale, which can mis-decode non-ASCII text on non-UTF-8 systems.
with open("./extracted_data/extracted_texts.json", "r", encoding="utf-8") as f:
    documents = json.load(f)

with open("./extracted_data/extracted_images.json", "r", encoding="utf-8") as f:
    images = json.load(f)

In [None]:
# One PNG path per entry of `images`, numbered by position.
uris = [f"./extracted_data/source_images/image_{j}.png" for j, _ in enumerate(images)]

# Pair every image record with its path so a search hit can be traced back to
# the source page and file.
metadatas = [
    {"page_number": image_record["page_number"], "image_path": image_uri}
    for image_record, image_uri in zip(images, uris)
]

img_vectorstore.add_images(uris=uris, metadatas=metadatas)

In [None]:
import json
# NOTE(review): this cell rebinds `documents` and `images` from files in the
# working directory, replacing any values loaded earlier — confirm which
# source is intended.
# The encoding is given explicitly: without it open() uses the platform
# locale, which can mis-decode non-ASCII text on non-UTF-8 systems.
with open("texts.json", "r", encoding="utf-8") as f:
    documents = json.load(f)

with open("images.json", "r", encoding="utf-8") as f:
    images = json.load(f)

In [None]:
import uuid
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Split every extracted text into chunks of at most 500 characters, no overlap.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)

# Each entry of `documents` provides d["text"] and d["metadata"]["page_number"].
documents_content = [d["text"] for d in documents]
documents_page = [d["metadata"]["page_number"] for d in documents]

# `doc_ids` used to be extended with `+=` before it was ever defined, which
# raised NameError on a fresh kernel; it is initialised here.
doc_ids = []
prepared_text = []
for page_number, document_content in zip(documents_page, documents_content):
    for chunk in text_splitter.split_text(document_content):
        doc_ids.append(str(uuid.uuid4()))
        prepared_text.append(
            Document(page_content=chunk, metadata={"page_number": page_number})
        )

all_chunks = [text.page_content for text in prepared_text]

# Pass the generated ids so the stored chunks are keyed by `doc_ids`
# (previously the ids were generated but never used).
text_vectorstore.add_documents(prepared_text, ids=doc_ids)

In [None]:
from langchain_core.messages import HumanMessage
from langchain.prompts import ChatPromptTemplate
from prompts import QA_PROMPT_SYSTEM, QA_PROMPT_USER

def run_pipeline(question, text_vectorstore, img_vectorstore, llm):
    """Answer ``question`` with retrieved text chunks and, if found, one image.

    Retrieves the 4 most similar text chunks and the single most similar image,
    uploads the image through ``llm.upload_file`` and sends the system prompt
    plus the formatted user prompt (with the uploaded file attached) to ``llm``.

    Args:
        question: The user question.
        text_vectorstore: Store queried for text chunks.
        img_vectorstore: Store queried for an image; hits must carry an
            ``image_path`` metadata entry.
        llm: Chat model that also exposes ``upload_file``.

    Returns:
        The ``content`` of the model response.
    """
    text_hits = text_vectorstore.similarity_search(question, k=4)
    image_hits = img_vectorstore.similarity_search(question, k=1)

    context = "\n\n".join([hit.page_content for hit in text_hits])

    # Attach an image only when retrieval returned one; indexing an empty
    # result list used to raise IndexError.
    message_kwargs = {}
    if image_hits:
        img_path = image_hits[0].metadata["image_path"]
        # Context manager so the file handle is closed after the upload.
        with open(img_path, "rb") as image_file:
            uploaded_file = llm.upload_file(image_file)
        message_kwargs["attachments"] = [uploaded_file.id_]

    user_text = QA_PROMPT_USER.format(context=context, question=question)

    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", QA_PROMPT_SYSTEM),
            HumanMessage(content=user_text, additional_kwargs=message_kwargs),
        ]
    )
    chain = prompt | llm

    # The template is invoked without input variables.
    return chain.invoke({}).content

In [None]:
# Example query; the cell's last expression displays the model's answer.
question = "Как выделить прямоугольную область на изображении в Adobe Photoshop?"
run_pipeline(
    question=question,
    text_vectorstore=text_vectorstore,
    img_vectorstore=img_vectorstore,
    llm=llm,
)

# LLaVA

In [None]:
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q -U transformers bitsandbytes accelerate
# Pinned alternative kept for reference:
#%pip install -q bitsandbytes==0.41.3 accelerate==0.25.0

In [None]:
from transformers import BitsAndBytesConfig, LlavaNextProcessor, LlavaNextForConditionalGeneration
from PIL import Image
import io
import pandas as pd
from typing import Tuple

# 8-bit weight quantization via bitsandbytes.
# `low_cpu_mem_usage` and `use_flash_attention_2` were removed from this call:
# they are not BitsAndBytesConfig options (they belong to `from_pretrained`),
# so passing them here had no effect on how the model is loaded.
quantization_config = BitsAndBytesConfig(load_in_8bit=True)

def format_prompt_with_image(prompt: str) -> str:
    """Wrap ``prompt`` in ``[INST] ... [/INST]`` with a leading ``<image>`` placeholder.

    Args:
        prompt: Instruction text to place after the image placeholder.

    Returns:
        The wrapped prompt string.
    """
    wrapped_prompt = f"[INST] <image>\n{prompt} [/INST]"
    return wrapped_prompt


def get_qa_prompt(model_id:str, system_prompt:str, question: str, context: str, image: Image=None) -> str:
    """Build the question-answering prompt in the chat format matching ``model_id``.

    Args:
        model_id: Model identifier; if it contains ``"vicuna"`` the
            ``USER:`` / ``ASSISTANT:`` format is used, otherwise ``[INST]`` / ``[/INST]``.
        system_prompt: Instruction text placed first in the prompt body.
        question: The user question.
        context: Retrieved context placed before the question.
        image: When truthy, an ``<image>`` placeholder is inserted; otherwise a
            single space takes its place.

    Returns:
        The assembled prompt string.
    """
    if "vicuna" in model_id:
        opener, closer = "USER:", "ASSISTANT:"
    else:   # mistral
        opener, closer = "[INST]", "[/INST]"
    image_slot = "<image>" if image else " "
    return f"{opener}{image_slot}\n{system_prompt}\n{context}\n\nQuestion:\n{question}\n\n{closer}"


def format_output(raw_output, processor: LlavaNextProcessor, prompt: str) -> str:
    """Decode the first generated sequence and remove the echoed prompt from it.

    Args:
        raw_output: Generation result; only its first element is decoded.
        processor: Object whose ``decode`` turns token ids into text.
        prompt: The prompt that was sent to the model.

    Returns:
        The decoded text with the prompt removed and surrounding whitespace stripped.
    """
    decoded_text = processor.decode(raw_output[0], skip_special_tokens=True)
    # The prompt is matched with "<image>" replaced by a single space.
    echoed_prompt = prompt.replace("<image>", " ").strip()
    return decoded_text.replace(echoed_prompt, "").strip()


def get_prompt(task: str, model_id: str, system_prompt: str, text: str, image: Image, question: str) -> str:
    """Return the QA prompt for the given inputs by delegating to ``get_qa_prompt``.

    Args:
        task: Accepted but not used by this function.
        model_id: Model identifier that selects the prompt format.
        system_prompt: Instruction text for the prompt.
        text: Context text inserted into the prompt.
        image: Image forwarded to ``get_qa_prompt``.
        question: The user question.

    Returns:
        The prompt string built by ``get_qa_prompt``.
    """
    return get_qa_prompt(
        model_id=model_id,
        system_prompt=system_prompt,
        question=question,
        context=text,
        image=image,
    )


def llava_call(prompt: str, model: LlavaNextForConditionalGeneration, processor: LlavaNextProcessor, device: str, image: Image=None) -> str:
    """Run one LLaVA generation and return the answer without the echoed prompt.

    Args:
        prompt: Fully formatted prompt text.
        model: Model whose ``generate`` produces the output tokens.
        processor: Processor that builds the model inputs and decodes the output.
        device: Device the processed inputs are moved to.
        image: Optional image passed to the processor together with the prompt.

    Returns:
        The decoded answer, generated with at most 300 new tokens.
    """
    # Text and images are passed by keyword: the positional order of these two
    # arguments differs between transformers releases.
    inputs = processor(text=prompt, images=image, return_tensors="pt").to(device)
    raw_output = model.generate(**inputs, max_new_tokens=300)
    return format_output(raw_output, processor, prompt)


def load_llava_model(model_id: str) -> Tuple[LlavaNextForConditionalGeneration, LlavaNextProcessor]:
    """Load a LLaVA-NeXT model and its processor.

    The model is loaded with the module-level ``quantization_config``,
    ``device_map="auto"``, and ``<root_path>/models`` as its cache directory.

    Args:
        model_id: Identifier passed to ``from_pretrained`` for both objects.

    Returns:
        A ``(model, processor)`` tuple.
    """
    llava_processor = LlavaNextProcessor.from_pretrained(model_id)
    llava_model = LlavaNextForConditionalGeneration.from_pretrained(
        model_id,
        quantization_config=quantization_config,
        device_map="auto",
        cache_dir=root_path + "/models",
    )
    return llava_model, llava_processor

In [None]:
from prompts import QA_PROMPT_SYSTEM, QA_PROMPT_USER

def run_pileline_llava(question, text_vectorstore, img_vectorstore, model, processor, device="cuda", model_id="llava-hf/llava-v1.6-mistral-7b-hf"):
    """Answer ``question`` with LLaVA using retrieved text and, if found, one image.

    Retrieves the 4 most similar text chunks and the single most similar image.
    If an image is found it is opened and passed to the model with an
    image-aware prompt; otherwise a text-only prompt is used.

    Args:
        question: The user question.
        text_vectorstore: Store queried for text chunks.
        img_vectorstore: Store queried for an image; hits must carry an
            ``image_path`` metadata entry.
        model: LLaVA model passed to ``llava_call``.
        processor: Matching processor passed to ``llava_call``.
        device: Device the model inputs are moved to.
        model_id: Identifier used only to pick the prompt format in
            ``get_qa_prompt``; defaults to the previously hard-coded value.

    Returns:
        The answer string produced by ``llava_call``.
    """
    text_content = text_vectorstore.similarity_search(question, k=4)
    context = "\n\n".join([t.page_content for t in text_content])
    image_content = img_vectorstore.similarity_search(question, k=1)

    if image_content:
        img_path = image_content[0].metadata["image_path"]
        # Keep the image open only for the duration of the call so the
        # underlying file handle is released afterwards.
        with Image.open(img_path) as image:
            img_prompt = get_qa_prompt(model_id, QA_PROMPT_SYSTEM, question, context, image)
            return llava_call(img_prompt, model, processor, device, image)

    no_img_prompt = get_qa_prompt(model_id, QA_PROMPT_SYSTEM, question, context)
    return llava_call(no_img_prompt, model, processor, device)



In [None]:
# Example query for the LLaVA pipeline.
question = "What are the possible positions of the manual operator and what colors are associated with each position?"

# Load the model and processor, then switch the model to evaluation mode
# (`eval()` returns the same module, so no rebinding is needed).
model, processor = load_llava_model("llava-hf/llava-v1.6-mistral-7b-hf")
model.eval()

run_pileline_llava(
    question=question,
    text_vectorstore=text_vectorstore,
    img_vectorstore=img_vectorstore,
    model=model,
    processor=processor,
)

# Оценка ответа