<font size="5">**Extract information from image for Confluence Data**</font>

Гипотеза данной работы: поскольку ассистенты ChatGPT при загрузке данных по факту построены на RAG с размером чанка 800 и перекрытием 400, предлагается извлекать информацию из изображений отдельных страниц PDF-файлов с целью формирования полных знаний.


<font size="5">Chart 1. Загрузка файлов из директории и выполнение целевого препроцессинга изображения</font>

In [None]:
import os 
import io 
from PyPDF2 import PdfReader
from docx import Document
from pdf2image import convert_from_path
from PIL import Image
from docx2pdf import convert

from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas

In [2]:
def save_texts_to_pdf(texts, output_pdf_path):
    """Write a sequence of text blocks into a letter-sized PDF.

    Each block is split on newlines and every resulting line is drawn at a
    fixed left margin. Once the cursor drops below the bottom margin a new
    page is started and the cursor returns to the top.

    Args:
        texts: Iterable of strings to render, in order.
        output_pdf_path: Path of the PDF file to create.
    """
    left_margin = 50
    bottom_margin = 50
    line_step = 15

    pdf = canvas.Canvas(output_pdf_path, pagesize=letter)
    _, page_height = letter
    top_of_page = page_height - 50
    cursor_y = top_of_page

    for block in texts:
        for row in block.split('\n'):
            pdf.drawString(left_margin, cursor_y, row)
            cursor_y -= line_step
            if cursor_y >= bottom_margin:
                continue
            # Out of vertical space: finish this page and start at the top again.
            pdf.showPage()
            cursor_y = top_of_page
    pdf.save()

In [17]:
def extract_image_from_file(file, path_dir, processed_dir, image_processing_function=None):
    """Render a PDF or DOCX page by page, process each page image and save the texts.

    PDF files are rasterized directly. DOCX files are first converted to a
    temporary PDF inside ``processed_dir``; that temporary file is removed
    afterwards, even when processing fails. Files with any other extension
    are ignored.

    The output name is built by slicing off the actual extension rather than
    with ``str.replace('.docx', ...)``. The extension check is
    case-insensitive, so for a name such as ``REPORT.DOCX`` the replace was a
    no-op: the temporary and the final path were identical and the closing
    ``os.remove`` would delete the freshly written result.

    Args:
        file: File name relative to ``path_dir``.
        path_dir: Directory containing the source file.
        processed_dir: Directory that receives the generated PDF.
        image_processing_function: Optional callable taking one page image and
            returning its text. When omitted, one empty string is stored per
            page.
    """
    def collect_page_texts(pdf_path):
        # One entry per rendered page, in page order.
        images = convert_from_path(pdf_path)
        if image_processing_function:
            return [image_processing_function(img) for img in images]
        return [''] * len(images)

    file_name = file.lower()
    source_path = os.path.join(path_dir, file)

    if file_name.endswith('.pdf'):
        texts = collect_page_texts(source_path)
        save_texts_to_pdf(texts, os.path.join(processed_dir, file))
    elif file_name.endswith('.docx'):
        # Strip the extension by length so '.DOCX' / '.Docx' are handled too.
        stem = file[:-len('.docx')]
        pdf_temp_path = os.path.join(processed_dir, stem + '_temp.pdf')
        output_pdf_path = os.path.join(processed_dir, stem + '.pdf')
        try:
            # Convert DOCX to PDF, then treat it like any other PDF.
            convert(source_path, pdf_temp_path)
            texts = collect_page_texts(pdf_temp_path)
            save_texts_to_pdf(texts, output_pdf_path)
        finally:
            # Never leave the intermediate PDF behind, whatever happened above.
            if os.path.exists(pdf_temp_path):
                os.remove(pdf_temp_path)

In [18]:
# test example function
def my_image_processing_function(img):
    """Serialize a page image to PNG and return the raw bytes as a latin-1 string.

    Placeholder processor for exercising the pipeline: it performs no text
    recognition, it only turns the encoded image bytes into a ``str``.

    Args:
        img: Object exposing ``save(fp, format=...)``.

    Returns:
        The PNG byte stream decoded with the ``latin1`` codec.
    """
    with io.BytesIO() as png_stream:
        img.save(png_stream, format='PNG')
        return png_stream.getvalue().decode('latin1')

In [19]:
def preprocessing_document(path_dir, image_processing_function):
    """Run ``extract_image_from_file`` over every entry of ``path_dir``.

    Results are written to the ``processed`` sub-directory of ``path_dir``,
    which is created when it does not exist yet.

    Args:
        path_dir: Directory holding the source documents.
        image_processing_function: Callable forwarded to
            ``extract_image_from_file`` for each entry.
    """
    # The listing is taken before the output folder may be created.
    entries = os.listdir(path_dir)

    processed_dir = os.path.join(path_dir, 'processed')
    if not os.path.exists(processed_dir):
        os.makedirs(processed_dir)

    for entry in entries:
        extract_image_from_file(
            entry,
            path_dir,
            processed_dir,
            image_processing_function=image_processing_function,
        )
        


In [None]:
# Run the pipeline on the "data_confluence" folder with the test page processor.
# NOTE(review): later cells use "./data/data_confluence" - confirm which path is intended.
preprocessing_document("data_confluence", image_processing_function=my_image_processing_function)

<font size="5">Chart 2. Взаимодействие с OpenAI API для анализа изображения</font>

**Асинхронная версия**

In [22]:
import asyncio
import base64
import logging
import io 
from io import BytesIO
import os

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from dotenv import load_dotenv
from pdf2image import convert_from_path
from PyPDF2 import PdfReader
from docx import Document
from PIL import Image
from docx2pdf import convert

from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas

import nest_asyncio
# Apply the nest_asyncio patch before any asyncio.run() call below.
# NOTE(review): presumably needed because the notebook already runs an event loop - confirm.
nest_asyncio.apply()

# INFO-level logging with a "LEVEL-timestamp - message" layout.
logging.basicConfig(level=logging.INFO, format='%(levelname)s-%(asctime)s - %(message)s')

# Load variables from a .env file into the process environment.
load_dotenv()

# NOTE(review): image_processing_function reads OPENAI_API_KEY from the environment
# itself; this variable is not referenced by the code visible in this notebook.
api_key = os.getenv("OPENAI_API_KEY")

In [8]:
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas

In [9]:
def save_texts_to_pdf(texts, output_pdf_path):
    """Write a sequence of text blocks into a letter-sized PDF using DejaVuSans.

    Each block is split on newlines and every resulting line is drawn at a
    fixed left margin. Once the cursor drops below the bottom margin a new
    page is started, the font is selected again and the cursor returns to the
    top.

    Args:
        texts: Iterable of strings to render, in order.
        output_pdf_path: Path of the PDF file to create.
    """
    font_name, font_size = 'DejaVuSans', 12
    left_margin = 50
    bottom_margin = 50
    line_step = 15

    # A TrueType font is registered so that Russian text does not hit encoding issues.
    pdfmetrics.registerFont(TTFont(font_name, 'DejaVuSans.ttf'))

    pdf = canvas.Canvas(output_pdf_path, pagesize=letter)
    pdf.setFont(font_name, font_size)
    _, page_height = letter
    top_of_page = page_height - 50
    cursor_y = top_of_page

    for block in texts:
        for row in block.split('\n'):
            pdf.drawString(left_margin, cursor_y, row)
            cursor_y -= line_step
            if cursor_y >= bottom_margin:
                continue
            # Out of vertical space: new page, re-select the font, back to the top.
            pdf.showPage()
            pdf.setFont(font_name, font_size)
            cursor_y = top_of_page
    pdf.save()

In [10]:
# Function to encode the image
def encode_image(image):
    """Return ``image`` encoded as a base64 JPEG string.

    Images whose mode cannot be stored as JPEG (for example ``RGBA``, ``LA``
    or palette ``P``) are converted to ``RGB`` first; without that step the
    JPEG save raises instead of producing data. Images that expose no
    ``mode`` attribute are saved unchanged.

    Args:
        image: PIL-style image exposing ``save(fp, format=...)`` and, when a
            conversion is needed, ``convert(mode)``.

    Returns:
        The JPEG bytes encoded as base64 and decoded to a UTF-8 ``str``.
    """
    from io import BytesIO

    # Modes the JPEG writer accepts as-is; anything else goes through RGB.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    if getattr(image, "mode", "RGB") not in jpeg_modes:
        image = image.convert("RGB")

    buffered = BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")
  

async def image_processing_function(images, semaphore, file_name):
    """Send every page image to the chat model and collect the extracted texts.

    One request is issued per image; the number of requests in flight is
    bounded by ``semaphore``. Each answer is logged, and a progress line is
    logged after every 50 completed requests and after the last one.

    Args:
        images: Page images accepted by ``encode_image``.
        semaphore: Async context manager limiting concurrent requests.
        file_name: Name used only in the progress log messages.

    Returns:
        List with one model answer per image, in the order of ``images``.
    """
    llm = ChatOpenAI(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini",
        max_retries=3,
        timeout=None,
        max_tokens=None,
    )

    extraction_prompt = """
<System>
You are an assistant for extracting data from images, which are just pages of doc or pdf documents.
This is a very important role, and the quality of the data depends entirely on your actions, and therefore the result of all the work. In this case, you are working with the documentation of an internal product based on OpenText.

<Instructions>
You need to perform the following, but do not mention these steps or the algorithm in your response. Only provide the final extracted data.

1. Extract the text from the page that is not linked to any graphic elements. It is very important not to shorten the code.
2. Extract data from the graphical elements using the following template:
<
[Entity_1 ("Entity_name_1") -> location1] - functional_description_1 and related_to_other_entities_1
[Entity_2 ("Entity_name_2") -> location2] - functional_description_2 and related_to_other_entities_2
...
[Entity_n ("Entity_name_n") -> location_n] - functional_description_n
>
After that, extract brief information by indirectly answering questions like "What is depicted here?" and "What functionality is being performed?".

3. Combine the data obtained in steps 1 and 2.

<Final Instruction>
Provide only the combined extracted data. Do not include any step numbers, explanations, or descriptions of your process in your response.
    """

    page_count = len(images)
    finished = 0

    def build_messages(encoded_page):
        # Single user turn carrying the instructions and the page as a data URL.
        return [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": extraction_prompt},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_page}"}}
                ]
            }
        ]

    async def process_image(page_image):
        nonlocal finished
        async with semaphore:  # at most as many requests as the semaphore allows
            messages = build_messages(encode_image(page_image))
            reply = await llm.ainvoke(input=messages, temperature=0.2)
            text = reply.content
            logging.info(text)
            finished += 1

            at_checkpoint = finished % 50 == 0
            if at_checkpoint or finished == page_count:
                logging.info(f"File '{file_name}': complete {finished}/{page_count} query.")

            return text

    return await asyncio.gather(*[process_image(page) for page in images])

In [11]:
async def extract_image_from_file(file, path_dir, processed_dir, image_processing_function=None, semaphore=None):
    """Render a PDF or DOCX into page images, process them and save the texts as a PDF.

    PDF files are rasterized directly. DOCX files are first converted to a
    temporary PDF inside ``processed_dir``; that temporary file is removed
    afterwards, even when processing fails. Files with any other extension
    are ignored.

    The output name is built by slicing off the actual extension rather than
    with ``str.replace('.docx', ...)``. The extension check is
    case-insensitive, so for a name such as ``REPORT.DOCX`` the replace was a
    no-op: the temporary and the final path were identical and the closing
    ``os.remove`` would delete the freshly written result.

    Args:
        file: File name relative to ``path_dir``.
        path_dir: Directory containing the source file.
        processed_dir: Directory that receives the generated PDF.
        image_processing_function: Optional coroutine function called as
            ``image_processing_function(images, semaphore, file)`` and
            returning one text per page. When omitted, one empty string is
            stored per page.
        semaphore: Forwarded unchanged to ``image_processing_function``.
    """
    async def collect_page_texts(pdf_path):
        # One entry per rendered page, in page order.
        images = convert_from_path(pdf_path)
        if image_processing_function:
            return list(await image_processing_function(images, semaphore, file))
        return [''] * len(images)

    file_name = file.lower()
    source_path = os.path.join(path_dir, file)

    if file_name.endswith('.pdf'):
        texts = await collect_page_texts(source_path)
        save_texts_to_pdf(texts, os.path.join(processed_dir, file))
    elif file_name.endswith('.docx'):
        # Strip the extension by length so '.DOCX' / '.Docx' are handled too.
        stem = file[:-len('.docx')]
        pdf_temp_path = os.path.join(processed_dir, stem + '_temp.pdf')
        output_pdf_path = os.path.join(processed_dir, stem + '.pdf')
        try:
            # Convert DOCX to PDF, then treat it like any other PDF.
            convert(source_path, pdf_temp_path)
            texts = await collect_page_texts(pdf_temp_path)
            save_texts_to_pdf(texts, output_pdf_path)
        finally:
            # Never leave the intermediate PDF behind, whatever happened above.
            if os.path.exists(pdf_temp_path):
                os.remove(pdf_temp_path)

In [12]:
async def preprocessing_document(path_dir, image_processing_function=image_processing_function):
    """Process all ``.pdf`` and ``.docx`` files of ``path_dir`` concurrently.

    Every matching file is handed to ``extract_image_from_file``; results go
    to the ``processed`` sub-directory, which is created when missing. All
    files share one semaphore with 30 slots.

    Args:
        path_dir: Directory holding the source documents.
        image_processing_function: Coroutine function forwarded to
            ``extract_image_from_file``.
    """
    supported_suffixes = ('.pdf', '.docx')
    path_files = [name for name in os.listdir(path_dir) if name.endswith(supported_suffixes)]

    processed_dir = os.path.join(path_dir, 'processed')
    if not os.path.exists(processed_dir):
        os.makedirs(processed_dir)

    total_files = len(path_files)
    finished = 0
    limiter = asyncio.Semaphore(30)

    async def process_file(file):
        nonlocal finished
        await extract_image_from_file(
            file,
            path_dir,
            processed_dir,
            image_processing_function=image_processing_function,
            semaphore=limiter,
        )
        # Logged once the file is done, together with the running count.
        finished += 1
        logging.info(f"Processing file: {file} ({finished}/{total_files})")

    await asyncio.gather(*[process_file(name) for name in path_files])

    logging.info("All complete all files from documents.")

In [None]:
# preprocessing definition folder
def start_processing():
    """Run the asynchronous preprocessing over ``./data/data_confluence``."""
    pipeline = preprocessing_document(
        "./data/data_confluence",
        image_processing_function=image_processing_function,
    )
    asyncio.run(pipeline)


if __name__ == "__main__":
    start_processing()


<font size="5">Chart 3. Объединение всех pdf по confluence в одну
</font>

In [13]:
import os
from PyPDF2 import PdfMerger

def merde_pdfs(path_folder, output_pdf_path):
    """Merge every ``.pdf`` file found directly in ``path_folder`` into one PDF.

    Files are appended in sorted file-name order. The merger is closed even
    when reading a source or writing the result fails. If the output file
    itself lives inside ``path_folder`` (for example from a previous run) it
    is skipped, so an earlier result is never merged into the new one.

    Args:
        path_folder: Directory containing the PDFs to merge.
        output_pdf_path: Path of the merged PDF to write.
    """
    output_abs = os.path.abspath(output_pdf_path)
    pdf_files = sorted(
        name
        for name in os.listdir(path_folder)
        if name.endswith(".pdf")
        and os.path.abspath(os.path.join(path_folder, name)) != output_abs
    )

    merger = PdfMerger()
    try:
        for pdf in pdf_files:
            with open(os.path.join(path_folder, pdf), "rb") as source:
                merger.append(source)

        with open(output_pdf_path, "wb") as target:
            merger.write(target)
    finally:
        # Release the merger's resources on both the success and the error path.
        merger.close()


# Correctly spelled alias; the original misspelled name stays for existing callers.
merge_pdfs = merde_pdfs

In [None]:
if __name__ == "__main__":
    # Folder holding the per-document PDFs written by the preprocessing step.
    path_folder = "./data/data_confluence/processed"
    # Destination of the single merged PDF.
    output_pdf_path = "./data/merge_pdf_confluence.pdf"

    merde_pdfs(path_folder, output_pdf_path)
    print(f"Merged pdfs and save in {output_pdf_path}")

<font size="5">**Extract information from Gitlab repository data**</font>

После выполнения сбора информации с confluence требуется также загрузить в ассистента информацию из gitlab репозиториев проекта chat-ai. 

Для этого мы полностью загрузим все данные из Confluence в instructions (промпт ассистента), в секцию additional data, чтобы попробовать сформировать полную документацию для кода на основе архитектуры и прочих данных из Confluence.




In [None]:
import os
import logging
import asyncio
from tqdm import tqdm
from utils import OpenAIChatAssistant
from dotenv import load_dotenv
import json  

# Load variables from a .env file into the process environment.
load_dotenv()

# INFO-level logging; all functions in this cell log through this module logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Maximum content size for sending (256000 characters).
# Files whose text is longer are skipped by process_file.
MAX_CONTENT_LENGTH = 256000

# Function to process a single file
async def process_file(idx, file_path, semaphore, assistant_id, total_files, output_file_path, lock, processed_files, state_file_path, max_retries=3):
    """Send one repository file to the assistant and append the answer to the output file.

    The file is skipped when it is already listed in ``processed_files``.
    Otherwise up to ``max_retries`` attempts are made, sleeping ``2 ** attempt``
    seconds after a failed attempt. The file is recorded as processed when the
    answer was written, when its content is too large (locally measured, or
    reported by an error containing ``string_above_max_length``), and also
    when every attempt failed.

    Args:
        idx: Zero-based position of the file, used for log messages.
        file_path: Path of the file to read.
        semaphore: Async context manager bounding concurrent executions.
        assistant_id: Identifier passed to ``OpenAIChatAssistant``.
        total_files: Total number of files, used for log messages.
        output_file_path: Text file the responses are appended to.
        lock: Async lock guarding the output file and the state file.
        processed_files: Mutable set of already handled file paths.
        state_file_path: JSON file the set is persisted to.
        max_retries: Maximum number of attempts.
    """
    def remember_file():
        # Must be called while holding ``lock``: updates the set and persists it.
        processed_files.add(file_path)
        save_processed_files(processed_files, state_file_path)

    async with semaphore:
        # Nothing to do for files handled in an earlier run or batch.
        if file_path in processed_files:
            logger.info(f"File already processed, skipping: {file_path}")
            return

        for attempt in range(max_retries):
            try:
                logger.info(f"Processing file {idx + 1}/{total_files}, attempt {attempt + 1}")
                assistant = OpenAIChatAssistant(assistant_id=assistant_id, api_key=os.getenv("OPENAI_API_KEY"))
                await assistant.initialize()

                # Undecodable bytes are dropped rather than failing the read.
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as source:
                    source_text = source.read()

                if len(source_text) > MAX_CONTENT_LENGTH:
                    logger.info(f"File too large and will be skipped: {file_path}")
                    async with lock:
                        remember_file()
                    return

                reply = await assistant.send_message(source_text)
                async with lock:
                    with open(output_file_path, 'a', encoding='utf-8') as sink:
                        sink.write(f"File: {file_path}\nResponse:\n{reply}\n\n")
                    remember_file()
                return
            except Exception as exc:
                if "string_above_max_length" not in str(exc):
                    # Any other failure: log it, back off exponentially, try again.
                    logger.error(f"Error processing file {file_path}, attempt {attempt + 1}: {exc}")
                    await asyncio.sleep(2 ** attempt)
                    continue

                # The service rejected the content as too long: skip for good.
                logger.info(f"File too large and will be skipped: {file_path}")
                async with lock:
                    remember_file()
                return

        # All attempts failed; the file is still recorded so it is not retried.
        logger.error(f"Failed to process file {file_path} after {max_retries} attempts.")
        async with lock:
            remember_file()
        return

def load_processed_files(state_file_path):
    """Return the set of file paths stored in the JSON state file.

    Args:
        state_file_path: Path of the JSON file holding a list of paths.

    Returns:
        A set built from the stored list, or an empty set when the file does
        not exist.
    """
    if not os.path.exists(state_file_path):
        return set()

    with open(state_file_path, 'r', encoding='utf-8') as state_file:
        recorded = json.load(state_file)
    return set(recorded)

def save_processed_files(processed_files, state_file_path):
    """Persist the processed file paths as a JSON list, overwriting the state file.

    Args:
        processed_files: Iterable of file paths to store.
        state_file_path: Path of the JSON file to write.
    """
    with open(state_file_path, mode='w', encoding='utf-8') as state_file:
        entries = [*processed_files]
        json.dump(entries, state_file)

async def main():
    """Generate documentation for every file of every repository under ``./data/data_gitlab``.

    The assistant id is taken from the ``OPENAI_ASST`` environment variable;
    when it is missing a new assistant is created and its id is stored in
    ``os.environ``. Each repository is walked recursively and its files are
    processed in batches of 50 with at most 5 concurrent ``process_file``
    tasks per batch, followed by a 10 second pause. Responses are appended to
    ``./data/<repo>.txt`` and progress is persisted in
    ``./data/processed_files.json``.
    """
    assistant_id = os.getenv("OPENAI_ASST")

    if assistant_id:
        logger.info(f"Using existing Assistant ID: {assistant_id}")
    else:
        name = "Coding assistant for chat-ai developer"
        text_for_instructions = "Documentation from Confluence..."
        template = """
        <Context>
        You are a developer assistant to help with development on the Chat-ai platform.
        Your current task is to expand the documentation of already written modules, as well as to compile their descriptions.
        At the same time, you must fully save the input code and return a response consisting of the source code and detailed documentation with comments.
        Temperature = 0.1

        <Additional information>

        Below is the documentation from confluence, it will allow you to more fully do the documentation and description of the module.

        {text_for_instructions}

        <Final instructions>
        Your current task is to expand the documentation of already written modules, as well as to compile their descriptions.
        At the same time, you must fully save the input code and return a response consisting of the source code and detailed documentation with comments.
        """
        prompt = template.format(text_for_instructions=text_for_instructions)

        # No assistant configured yet: create one.
        # NOTE(review): create_assistant is called without await, unlike
        # initialize/send_message elsewhere - confirm it is synchronous.
        assistant = OpenAIChatAssistant(assistant_id=None, api_key=os.getenv("OPENAI_API_KEY"))
        assistant.create_assistant(
            name=name,
            instructions=prompt,
            tools=[
                {"type": "code_interpreter"},
                {"type": "file_search"}
            ],
            model="gpt-4o-mini"
        )
        assistant_id = assistant.assistant_id
        model = assistant.model
        logger.info(f"Id Assistant: {assistant_id}")
        os.environ["OPENAI_ASST"] = assistant_id
        logger.info(f"Model: {model}")

    # Input repositories and output location.
    data_gitlab_path = "./data/data_gitlab"
    output_data_path = "./data"

    # State file remembering which files were already handled.
    state_file_path = os.path.join(output_data_path, "processed_files.json")
    processed_files = load_processed_files(state_file_path)

    batch_size = 50
    max_concurrent_tasks = 5

    # Every sub-directory of the data folder is treated as one repository.
    repos = [
        entry
        for entry in os.listdir(data_gitlab_path)
        if os.path.isdir(os.path.join(data_gitlab_path, entry))
    ]

    for repo in repos:
        repo_path = os.path.join(data_gitlab_path, repo)
        logger.info(f"Processing repository: {repo}")

        # Flat list of all files below the repository root, in walk order.
        file_paths = [
            os.path.join(root, file_name)
            for root, _dirs, file_names in os.walk(repo_path)
            for file_name in file_names
        ]
        total_files = len(file_paths)

        output_file_path = os.path.join(output_data_path, f"{repo}.txt")

        # One lock per repository guards the output file and the state file.
        lock = asyncio.Lock()

        for batch_start in range(0, total_files, batch_size):
            batch_end = min(batch_start + batch_size, total_files)
            logger.info(f"Starting processing batch of files from {batch_start + 1} to {batch_end}")

            semaphore = asyncio.Semaphore(max_concurrent_tasks)
            tasks = [
                asyncio.create_task(
                    process_file(idx, path, semaphore, assistant_id, total_files, output_file_path, lock, processed_files, state_file_path)
                )
                for idx, path in enumerate(file_paths[batch_start:batch_end], start=batch_start)
            ]

            # Advance the progress bar as individual tasks finish.
            with tqdm(total=len(tasks), desc=f"Processing batch of files in {repo}", unit="file") as pbar:
                for finished_task in asyncio.as_completed(tasks):
                    await finished_task
                    pbar.update(1)

            logger.info(f"Finished processing batch of files from {batch_start + 1} to {batch_end}")

            # Pause between batches.
            await asyncio.sleep(10)

        logger.info(f"Finished processing repository: {repo}")

# Entry point: run the GitLab documentation pipeline via asyncio.run.
if __name__ == "__main__":
    asyncio.run(main())
