In [None]:
# Install the PDF dependencies; the pymupdf package provides the `fitz` module imported below.
# NOTE(review): PyPDF2 is not imported by any cell visible in this notebook — confirm it is still needed.
!pip install PyPDF2
!pip install pymupdf

##  HyperParameters

In [None]:
## Hyperparameters

# Name of the index that the later cells create and write documents into
index_name = 'dishwasher'

# Name of the embedding model endpoint; it is empty here and is passed as-is to the predictor constructor
eb_endpoint = ''

# Embedding vector dimension (used as the `dimension` of the knn_vector field); usually you can keep the default
v_dimension = 1024

# Folder containing the document files to be processed and ingested
folder_path = '../docs/dishwasher/'

# Number of records already imported into the same index_name; it offsets the document ids.
# Usually keep it as 0 when creating a new index.
before_import = 0

# # The number of pages for one chunk
# # Note that the token count of one chunk should be smaller than the maximum input tokens of the embedding model
# num_chunk_for_one_paragraph = 7

# Zoom factor used when rendering PDF pages to images
resolution = 1.0

# The number of overlapping pages between chunks
# NOTE(review): not referenced by any code visible in this notebook — confirm whether it is still used
overlap_pages = 1

## Function Definition

In [None]:
import fitz
import base64
from PIL import Image
import io
import os

import numpy as np
import matplotlib.pyplot as plt

def pdf_to_base64_images(pdf_file_path, img_dir='test_img', show_imgs=False):
    """Render every page of a PDF to a PNG image and return the pages base64-encoded.

    Args:
        pdf_file_path: Path of the PDF file to open.
        img_dir: Currently unused; kept so existing callers that pass it keep working.
        show_imgs: When true, each rendered page is also displayed with matplotlib.

    Returns:
        list[str]: One base64 string (PNG bytes, decoded as UTF-8 text) per page,
        in page order.

    Notes:
        The zoom factor is read from the module-level ``resolution`` variable.
    """
    # The same zoom matrix applies to every page, so build it once.
    zoom = fitz.Matrix(resolution, resolution)
    base64_images = []

    # The context manager closes the document even if rendering a page fails;
    # previously the file handle was never released.
    with fitz.open(pdf_file_path) as pdf_file:
        for page in pdf_file:
            pix = page.get_pixmap(matrix=zoom)
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)

            if show_imgs:
                plt.imshow(np.asarray(img))
                plt.axis('off')
                plt.show()

            # Serialize the page as PNG in memory, then base64-encode the bytes.
            png_buffer = io.BytesIO()
            img.save(png_buffer, format='PNG')
            base64_images.append(base64.b64encode(png_buffer.getvalue()).decode('utf-8'))

    return base64_images

# 使用示例
# pdf_file_path = "docs/quectel/kb-faq/Wi-Fi_BT模块支持情况.pdf"
# xx = pdf_to_base64_images(pdf_file_path)

In [None]:
import logging
import boto3

from tqdm import tqdm
import os
import json

from botocore.exceptions import ClientError

import pandas as pd


# Module-level logger; INFO and above are emitted.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


from botocore.exceptions import ClientError
import time

import sagemaker
from sagemaker.huggingface import HuggingFaceModel

# Predictor bound to the embedding endpoint named by `eb_endpoint`; used by get_vector().
hfp = sagemaker.huggingface.model.HuggingFacePredictor(eb_endpoint)

# Bedrock runtime client and the model id used for the multimodal page transcription.
bedrock_runtime = boto3.client(service_name='bedrock-runtime')
MODEL_ID = 'anthropic.claude-3-sonnet-20240229-v1:0'
# MODEL_ID = 'anthropic.claude-3-haiku-20240307-v1:0'

# System prompt sent with every page passed to the model by read_doc().
# NOTE(review): the `{context}` placeholder is never substituted by any code visible in this
# notebook (read_doc passes this string unchanged), so the model receives the literal text
# "{context}" — confirm whether it should be filled with the previous page's content.
system_prompt = """
You are a document organizer of bicycle company and your task is to extract useful information from images.Please refer to the format of the content of the previous page to extract text information from the image on the current page.
If the content on this page contains tables or maintenance process, please organize the tables or maintenance process into json format. 
If the content of this page is a table and has no header, use the header of the previous page. 
If the content of this page is a maintenance process and does not specify the specific maintenance object, use the maintenance object on the previous page.

<previous page content>
{context}
</previous page content>
No preface, just output the content directly.
"""



def run_multi_modal_prompt(bedrock_runtime, model_id, messages, max_tokens, system_prompt=None):
    """
    Send a multimodal messages request to a model and return the decoded reply.

    Args:
        bedrock_runtime: Client object exposing ``invoke_model(body=..., modelId=...)``.
        model_id (str): Identifier of the model to invoke.
        messages (list): Messages payload placed under the ``messages`` key.
        max_tokens (int): Maximum number of tokens to generate.
        system_prompt (str | None): System prompt; a generic default is used when None.
    Returns:
        The JSON-decoded response body (whatever ``json.loads`` yields for it).
    """
    # Fall back to a generic system prompt only when the caller supplied none.
    effective_system = 'You are AI assistant' if system_prompt is None else system_prompt

    request_payload = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "system": effective_system,
        "messages": messages,
        "temperature": 0.01,
        "stop_sequences": ["</output>"],
    }
    body = json.dumps(request_payload)

    # Time the invocation together with reading and decoding the response body.
    started_at = time.time()
    raw_response = bedrock_runtime.invoke_model(body=body, modelId=model_id)
    response_body = json.loads(raw_response.get('body').read())
    finished_at = time.time()
    print("Invoke Cost: ", finished_at - started_at)

    return response_body



def _collect_image_paths(input_image_paths):
    """Expand a file path, a directory path, or an iterable of file paths into a list of paths."""
    # Local import: `Path` is not imported at file level.
    from pathlib import Path

    if isinstance(input_image_paths, (str, os.PathLike)):
        target = Path(input_image_paths)
        if target.is_file():
            return [target]
        if target.is_dir():
            # Sorted so that the image order sent to the model is deterministic.
            return sorted(entry for entry in target.iterdir() if entry.is_file())
        # Neither an existing file nor a directory: no images.
        return []
    return [Path(entry) for entry in input_image_paths]


def _guess_image_media_type(image_b64, default="image/jpeg"):
    """Infer the image media type from the magic bytes at the start of base64 image data."""
    try:
        # 16 base64 characters decode to 12 bytes, enough for every signature checked below.
        header = base64.b64decode(image_b64[:16])
    except (ValueError, TypeError):
        return default
    if header.startswith(b'\x89PNG\r\n\x1a\n'):
        return "image/png"
    if header.startswith(b'\xff\xd8\xff'):
        return "image/jpeg"
    if header[:6] in (b'GIF87a', b'GIF89a'):
        return "image/gif"
    if header[:4] == b'RIFF' and header[8:12] == b'WEBP':
        return "image/webp"
    return default


def Sonnet(input_text, input_image_paths=None, input_images=None, max_tokens=4000, system_prompt='你是一个图片阅读助手，请尽可能用中文详细得描述图片中的内容', model_id=MODEL_ID):
    """
    Send a text prompt plus zero or more images to the model and return its text reply.

    Args:
        input_text: The prompt text, appended after the images in the user message.
        input_image_paths: A single image file path, a directory of image files, or a
            list of image file paths. Takes precedence over ``input_images``.
        input_images: A list of base64-encoded images, used only when
            ``input_image_paths`` is None.
        max_tokens: Maximum number of tokens to generate.
        system_prompt: System prompt forwarded to the model.
        model_id: Identifier of the model to invoke.

    Returns:
        str: The text of the first content item of the response, with any
        ``<output>`` tag removed.

    Notes:
        The media type of each image is detected from its leading bytes instead of
        always being declared as JPEG; data that is not recognised is still sent
        as ``image/jpeg``.
    """
    if input_image_paths is not None:
        content_images = []
        for image_path in _collect_image_paths(input_image_paths):
            with open(image_path, "rb") as image_file:
                content_images.append(base64.b64encode(image_file.read()).decode('utf8'))
    elif input_images is not None:
        content_images = input_images
    else:
        content_images = []

    content = [
        {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": _guess_image_media_type(content_image),
                "data": content_image,
            },
        }
        for content_image in content_images
    ]
    content.append({"type": "text", "text": input_text})

    messages = [{"role": "user", "content": content}]

    response = run_multi_modal_prompt(
        bedrock_runtime, model_id, messages, max_tokens, system_prompt)
    return response['content'][0]['text'].replace('<output>','')

def get_title(path):
    """Return the file name of ``path`` without its directory and without its last extension."""
    without_extension, _extension = os.path.splitext(path)
    return os.path.basename(without_extension)


def read_doc(path):
    """Transcribe each page of a PDF with the multimodal model and tabulate the results.

    Every page image is sent to ``Sonnet`` together with the previous page's
    transcription (wrapped in ``<previous page content>`` tags in the user text).

    Args:
        path: Path of the PDF file.

    Returns:
        tuple: ``(df, texts)`` where ``df`` has one row per page with the columns
        ``title``, ``paragraph``, ``sentence`` and ``image_base64`` (``paragraph``
        and ``sentence`` hold the same text), and ``texts`` is the list of page
        transcriptions in page order.
    """
    # NOTE(review): `system_prompt` is forwarded unchanged, so its `{context}`
    # placeholder is not filled in here — confirm whether that is intended.
    page_images = pdf_to_base64_images(path)
    page_texts = []
    returned_texts = []
    kept_images = []
    previous_text = ''
    for page_image in tqdm(page_images):
        prompt_text = '<previous page content>' + previous_text + '</previous page content>'
        previous_text = Sonnet(
            input_text=prompt_text,
            input_image_paths=None,
            input_images=[page_image],
            max_tokens=4096,
            system_prompt=system_prompt,
            model_id=MODEL_ID,
        )
        print(previous_text)
        page_texts.append(previous_text)
        kept_images.append(page_image)
        returned_texts.append(previous_text)

    titles = [get_title(path)] * len(page_texts)
    df = pd.DataFrame({
        'title': titles,
        'paragraph': page_texts,
        'sentence': page_texts,
        'image_base64': kept_images,
    })
    return df, returned_texts

def get_vector(input_text):
    """Embed one text with the embedding endpoint.

    Args:
        input_text: The text to embed.

    Returns:
        The first element of the endpoint's prediction for ``[input_text]``, or
        ``False`` when the request fails. Callers that stringify the result will
        get ``"False"`` in that case, which is not valid JSON.
    """
    try:
        return hfp.predict({'inputs':[input_text]})[0]
    except Exception as exc:
        # `Exception` rather than a bare `except`, so Ctrl-C / SystemExit still
        # stop a long ingest run; the failure detail is logged instead of dropped.
        print("embedding failed")
        logger.warning("embedding request failed: %r", exc)
        return False


def embbeding(df):
    """Add a ``sentence_vector`` column holding the stringified embedding of each sentence.

    Args:
        df: DataFrame with a ``sentence`` column. It is modified in place.

    Returns:
        The same DataFrame, with ``sentence_vector`` set to ``str(get_vector(sentence))``
        for every row. An empty DataFrame is returned with an empty column.
    """
    total = len(df)
    vectors = []
    # Columns are addressed by name rather than by position, and the unused
    # title embedding (one wasted endpoint call per document, and an IndexError
    # on an empty frame) is no longer computed.
    for done, sentence in enumerate(df['sentence'], start=1):
        vectors.append(str(get_vector(sentence)))
        # Report the number of rows completed (1-based) so the last line reads "n out of n".
        print('\r embbeding %i out of %i finished'%(done, total), end='')
    df['sentence_vector'] = vectors
    return df

## Main

In [None]:
# Create index

import requests

# ==============OpenSearch Related=====================
# The cluster endpoint and the basic-auth credentials are both read from Secrets Manager.
sm_client = boto3.client('secretsmanager')

# Secret 'opensearch-host-url': JSON document whose 'host' entry is the cluster endpoint,
# for example: my-test-domain.us-east-1.es.amazonaws.com/
master_user = sm_client.get_secret_value(SecretId='opensearch-host-url')['SecretString']
data = json.loads(master_user)
es_host_name = data.get('host')
# Make sure the endpoint ends with a slash so paths can be appended directly.
host = es_host_name if es_host_name[-1] == '/' else es_host_name + '/'
region = boto3.Session().region_name  # e.g. cn-north-1

# Secret 'opensearch-master-user': JSON document with 'username' and 'password'.
master_user = sm_client.get_secret_value(SecretId='opensearch-master-user')['SecretString']
data = json.loads(master_user)
username = data.get('username')
password = data.get('password')
awsauth = (username, password)
url = host + '_bulk'
headers = {"Content-Type": "application/json"}

# Index settings and field mappings sent when the index is created.
payloads = {
    "settings": {
        "index": {
            "knn": True,
            "knn.algo_param.ef_search": 100,
        }
    },
    "mappings": {
        "properties": {
            "sentence_vector": {
                "type": "knn_vector",
                "dimension": v_dimension,
                "method": {
                    "name": "hnsw",
                    "space_type": "l2",
                    "engine": "nmslib",
                    "parameters": {
                        "ef_construction": 256,
                        "m": 128,
                    },
                },
            },
            "sentence": {"type": "text"},
            "paragraph": {"type": "text"},
            "image_base64": {"type": "text"},
            "metadata": {
                "properties": {
                    "page": {"type": "long"},
                    "source": {"type": "text"},
                }
            },
        }
    },
}

# Create Index
r = requests.put(host + index_name, auth=awsauth, headers=headers, json=payloads)
print(r.text)

def import_data(df, id_start=0, before_import=0):
    """Send the rows of ``df`` to the bulk endpoint as index actions.

    Columns are read by position: 0 = source/title, 1 = paragraph, 2 = sentence,
    3 = base64 image, 4 = sentence vector stored as a JSON string.

    Args:
        df: Rows to import (typically a slice of a document's DataFrame).
        id_start: Page number of the first row of ``df`` within its document.
        before_import: Offset added to the page number to form the document ``_id``.

    Returns:
        The HTTP response of the bulk request, or ``None`` when ``df`` is empty
        (no request is sent in that case).

    Raises:
        json.JSONDecodeError: If a stored sentence vector is not valid JSON.
    """
    row_count = len(df)
    if row_count == 0:
        # An empty bulk body is rejected by the server, so skip the request entirely.
        return None

    lines = []
    for offset in range(row_count):
        page = id_start + offset
        action = {"index": {"_index": index_name, "_id": str(page + before_import)}}
        document = {
            "metadata": {
                "source": str(df.iloc[offset, 0]),
                "page": page,
            },
            "image_base64": str(df.iloc[offset, 3]),
            "paragraph": str(df.iloc[offset, 1]),
            "sentence": str(df.iloc[offset, 2]),
            "sentence_vector": json.loads(df.iloc[offset, 4]),
        }
        lines.append(json.dumps(action, ensure_ascii=False) + "\n")
        lines.append(json.dumps(document, ensure_ascii=False) + "\n")
    payloads = ''.join(lines)

    r = requests.post(url, auth=awsauth, headers=headers, data=payloads.encode())

    # Surface failures instead of dropping the response: a bulk call can return
    # HTTP 200 while individual items failed, which is signalled by "errors": true.
    if not r.ok:
        logger.error("bulk import failed with HTTP %s: %s", r.status_code, r.text[:500])
    else:
        try:
            has_item_errors = bool(r.json().get('errors'))
        except (ValueError, AttributeError):
            has_item_errors = False
        if has_item_errors:
            logger.error("bulk import reported item errors: %s", r.text[:500])
    return r
    # print(r.text)



In [None]:
#==============Main Preprocess Data and Import===============
# For every file in `folder_path`: transcribe its pages, embed them, and import
# them into the index in batches. Files that raise are recorded in `failed_files`.

# Number of rows sent per bulk request.
# NOTE: this name shadows the built-in `slice`; it is kept because other cells may read it.
slice = 1
df_all = []
names = os.listdir(folder_path)
print(folder_path)
# before_import = 0
failed_files = []
for j, name in enumerate(names):
    print(name)
    # Skip hidden files such as .DS_Store or .ipynb_checkpoints.
    if name[0] == '.':
        continue
    try:
        df, _ = read_doc(os.path.join(folder_path, name))
        df = embbeding(df)
        df_all.append(df)
        # Step through the rows in batches. The previous range(len(df)//slice+1)
        # produced a trailing empty batch whenever len(df) was a multiple of
        # `slice` (always, with slice = 1), i.e. an empty bulk request per file.
        total_batches = (len(df) + slice - 1) // slice
        for i, start in enumerate(range(0, len(df), slice)):
            import_data(df[start:start + slice], start, before_import)
            print('\r import %i out of %i finished'%(i + 1, total_batches), end='')
        # Keep document ids unique across files.
        before_import += len(df)
        print(' file %i out of %i finished'%(j+1, len(names)))
    except Exception as ex:
        failed_files.append(name)
        print(f"=================Exception================={ex}")
print(before_import)
# Report the files that could not be processed so they are not silently lost.
if failed_files:
    print('failed files:', failed_files)