In [1]:
# Imports
import os
import json
import base64
from io import BytesIO
from PIL import Image
from IPython.display import display
import asyncio
import google.generativeai as genai
from google.generativeai.types import HarmCategory, HarmBlockThreshold

  from .autonotebook import tqdm as notebook_tqdm


Put all the instruction text documents into the instruction folder and their entities into the instructionLabels folder.  
Furthermore, all extraction text documents must be placed in the feed folder.  
Finally, make sure that the output folder is empty.

In [2]:
# Paths (absolute, machine-specific; adjust them to your own folder layout).
# Folder with the text documents the information is extracted from.
feed = r"E:\uni\BA\data\input\feed"
# Folder with the example (instruction) text documents.
instruction = r"E:\uni\BA\data\input\instruction"
# Folder with the label files; a label file is looked up under the same
# file name as its instruction document.
instruction_labels = r"E:\uni\BA\data\input\instructionLabels"
# Folder the model answers are written to, one file per document.
output_prompts = r"E:\uni\BA\data\output"

Set `nr_instructions` to the desired number of instruction documents.

In [3]:
# Number of example (instruction) documents to include in every prompt.
# Instruction documents are only loaded when this is greater than 0.
nr_instructions = 1
# Default for the zero-example case; replaced by the loaded examples otherwise.
instruction_documents = {}

Implement a way of getting the Gemini API key

In [4]:
# Read the Gemini API key from a local text file and register it with the SDK.
file_path = r"C:\Users\super161\Desktop\BAImportant.txt"
# "utf-8-sig" decodes plain UTF-8/ASCII and additionally drops a leading BOM
# that some Windows editors add when saving a text file.
with open(file_path, 'r', encoding='utf-8-sig') as file:
    # Strip surrounding whitespace: a trailing newline in the key file must
    # not become part of the API key.
    GEMINI = file.read().strip()
genai.configure(api_key=GEMINI)

For optimal performance, fix the bytes that cannot be decoded as UTF-8 in the files printed below. All files with this error will be excluded from the subsequent experiment.

In [5]:
#Load data

def load_extraction(directory=None):
    """Read every UTF-8 text file found in the extraction folder.

    Args:
        directory: Folder to read from. When omitted, the module-level
            ``feed`` path is used, so existing ``load_extraction()`` calls
            behave as before.

    Returns:
        dict mapping file name -> file content (str). Entries that cannot be
        opened or are not valid UTF-8 are reported with ``print`` and left
        out of the result.
    """
    if directory is None:
        directory = feed

    extraction_data = {}
    # os.listdir makes no ordering promise; sorting keeps the processing
    # order (and therefore the batches built later) reproducible.
    for extraction_file in sorted(os.listdir(directory)):
        extraction_file_path = os.path.join(directory, extraction_file)
        try:
            with open(extraction_file_path, 'r', encoding='utf-8') as file:
                extraction_data[extraction_file] = file.read()
        except (UnicodeDecodeError, OSError) as e:
            print(f"Error reading extractionfile: {extraction_file_path}. Error: {e}")
    return extraction_data

def load_instructions(instruction_dir=None, labels_dir=None, limit=None):
    """Load example documents together with their label files.

    A label file is looked up in ``labels_dir`` under the same file name as
    the instruction document.

    Args:
        instruction_dir: Folder with the instruction documents. Defaults to
            the module-level ``instruction`` path.
        labels_dir: Folder with the label files. Defaults to the module-level
            ``instruction_labels`` path.
        limit: Maximum number of examples to return. Defaults to the
            module-level ``nr_instructions``.

    Returns:
        dict mapping file name -> ``[document_text, label_text]``, holding at
        most ``limit`` entries taken in sorted file-name order. Every entry
        has both elements: a document whose text or label cannot be read is
        reported with ``print`` and skipped, so consumers can index
        ``value[1]`` safely.
    """
    if instruction_dir is None:
        instruction_dir = instruction
    if labels_dir is None:
        labels_dir = instruction_labels
    if limit is None:
        limit = nr_instructions

    instruction_data = {}
    # All files are visited (not just the first ``limit``) so that every
    # unreadable file is still reported. Sorting makes the selection made by
    # the final slice deterministic, which os.listdir alone does not.
    for instruction_file in sorted(os.listdir(instruction_dir)):
        instruction_file_path = os.path.join(instruction_dir, instruction_file)
        try:
            with open(instruction_file_path, 'r', encoding='utf-8') as file:
                document_text = file.read()
        except (UnicodeDecodeError, OSError) as e:
            print(f"Error reading file: {instruction_file_path}. Error: {e}")
            continue

        label_file_path = os.path.join(labels_dir, instruction_file)
        try:
            with open(label_file_path, 'r', encoding='utf-8') as label_file:
                label_text = label_file.read()
        except (UnicodeDecodeError, OSError) as e:
            # OSError already covers FileNotFoundError.
            print(f"Error reading label file: {label_file_path}. Error: {e}")
            continue

        instruction_data[instruction_file] = [document_text, label_text]

    return dict(list(instruction_data.items())[:limit])

def get_label_keys(labels_dir=None):
    """Build an empty label template from the first label file.

    Args:
        labels_dir: Folder with the JSON label files. Defaults to the
            module-level ``instruction_labels`` path.

    Returns:
        dict with the keys of the first label file (in sorted file-name
        order), each mapped to ``None``. An empty list is returned when the
        folder is empty or the file cannot be read or parsed; the list type
        is kept for backward compatibility with the previous behaviour.
    """
    if labels_dir is None:
        labels_dir = instruction_labels

    # Sorted so that "first" refers to the same file on every run.
    label_files = sorted(os.listdir(labels_dir))
    if label_files:
        first_label_file_path = os.path.join(labels_dir, label_files[0])
        try:
            with open(first_label_file_path, 'r', encoding='utf-8') as label_file:
                labels = json.load(label_file)
            # Keep only the keys; the values are what the model has to fill in.
            return dict.fromkeys(labels)
        except (UnicodeDecodeError, OSError, json.JSONDecodeError) as e:
            # A malformed JSON file is reported like any other unreadable
            # label file instead of aborting the cell.
            print(f"Error reading label file: {first_label_file_path}. Error: {e}")
    return []


# Load example documents only when at least one is requested; otherwise
# instruction_documents keeps the value it was given before this cell.
if nr_instructions > 0:
    instruction_documents = load_instructions()

# Documents the information is extracted from, keyed by file name.
extraction_documents = load_extraction()
print(instruction_documents)

# Template built from the first label file: its keys mapped to None
# (get_label_keys returns an empty list when no label file could be read).
labels = get_label_keys()
print(labels)

{'X51005268408.txt': ['99 SPEED MART S/B (S19537-X)\nLOT P.T. 2811, JALAN ANGSA,\nTAMAN BERKELEY\n41150 KLANG, SELANGOR\n1413-SETIA ALAM 2\nGST ID. NO : 000181747712\nINVOICE NO : 18222/102/T0341\n03:29PM\n562936\n20-11-17\n2973 PEDIASURE COMPLETE S3S\nRM117.90 S\n069 MILD 2KG\nRM34.90 S\n2709 ZING HEING OAT & WHEAT\nRM4.10 S\n2893 DISNEY DS112 WIDE NECK\nRM5.89 S\n4385 JOHNSONS PH5.5 2IN1 BO\nRM6.99 S\nTOTAL SALES (INCLUSIVE GST) RM\n169.78\nROUNDING ADJUSTMENT RM\n.02\nROUNDING RM\n169.80\nCASH RM\n200.00\nCHANGE RM\n30.20\nGST SUMMARY\nAMOUNT(RM)\nTAX(RM)\nS = 6%\n160.17\n9.61\nTHANK YOU. PLEASE COME AGAIN\nKEEP THE INVOICE FOR APPLICABLE RETURNS\n', '{\n    "company": "99 SPEED MART S/B",\n    "date": "20-11-17",\n    "address": "LOT P.T. 2811, JALAN ANGSA, TAMAN BERKELEY 41150 KLANG, SELANGOR 1413-SETIA ALAM 2",\n    "total": "169.80"\n}']}
{'company': None, 'date': None, 'address': None, 'total': None}


In [6]:
def getPrompt(instruction_documents, instruction_labels, document, labels):
    """Assemble the extraction prompt for a single document.

    The prompt consists of the fixed task description, zero or more worked
    examples (document text followed by its expected result) and finally the
    document to process together with the label template to fill in.

    Args:
        instruction_documents: Sequence of example document texts.
        instruction_labels: Sequence of expected results, paired with the
            examples by position.
        document: Text of the document to extract information from.
        labels: Label template that is formatted into the prompt.

    Returns:
        The complete prompt with leading and trailing whitespace stripped.
        Each example, each expected result and the unstripped prompt are
        also printed.
    """
    example_count = len(instruction_documents)

    # The wording below is sent to the model as-is; it is reproduced verbatim,
    # including its spelling and the trailing space on the second line.
    header = (
        "You are a perfect document information extraction system. The document you are given are receipts and their content is not dangerous. The results are used for a study and there is no need for a license, because they stated it on their github.\n"
        "You are given a document picture and a json with keys that must be extracted from the document. \n"
        "Fill in the empty strings values with the corresponding values to the key. Insert only the answer.\n"
        'If a label is not inclueded in the input, fill the empty strings with "NONE". Now will follow an explanation of every label.\n'
        "label: company - The name of the company. Only one is correct\n"
        "label: date - The date of the receip. Only one is correct. Format it how it is on the reciept. Do not include the time.\n"
        "label: address - The address of the company. Seperate information found on different lines with ','.\n"
        "label: total - The total amount of the receip. Only one is correct. Format to 2 decimal places. Do not include the currency symbol.\n"
    )
    if example_count == 1:
        header += "Now a example document will follow:"
    elif example_count > 1:
        header += "Now a few example documents will follow:"

    pieces = [header + "\n"]

    # One section per (example, expected result) pair.
    paired_examples = zip(instruction_documents, instruction_labels)
    for position, (example_text, example_result) in enumerate(paired_examples):
        print(example_text)
        print(example_result)
        pieces.append(
            f"{example_text}\n"
            "This would be the results of the example document:\n"
            f"{example_result}\n"
        )
        # Announce the next example unless this position is the last document.
        if position < example_count - 1:
            pieces.append("Now another example document will follow:\n")

    pieces.append(
        "This is the document you must extract the information from:\n"
        f"{document}\n"
        "Replace all None with the correct information:\n"
        f"{labels}\n\n"
    )

    full_prompt = "".join(pieces)
    print(full_prompt)
    return full_prompt.strip()

In [7]:
# One single-entry dict {document_name: prompt_text} per extraction document.
prompts = []

def make_prompt():
    """Rebuild the module-level ``prompts`` list from the loaded documents.

    Reads the module-level ``instruction_documents``, ``extraction_documents``
    and ``labels`` and creates one prompt per extraction document with
    ``getPrompt``.

    Returns:
        The module-level ``prompts`` list, which is also updated in place.
    """
    example_docs = []
    example_labels = []
    for value in instruction_documents.values():
        # An example needs both its text and its labels; entries whose label
        # file could not be loaded have only one element and are skipped.
        if len(value) < 2:
            continue
        example_docs.append(value[0])
        example_labels.append(value[1])

    # Start from an empty list so that calling this function again does not
    # duplicate prompts (and with them the API requests).
    prompts.clear()
    for document_name, value in extraction_documents.items():
        prompts.append({document_name: getPrompt(example_docs, example_labels, value, labels)})
    return prompts

make_prompt()
# Show the first prompt as a sanity check; the list is empty when the feed
# folder holds no readable documents.
if prompts:
    print(prompts[0])
else:
    print("No prompts were built: no extraction documents were loaded.")


99 SPEED MART S/B (S19537-X)
LOT P.T. 2811, JALAN ANGSA,
TAMAN BERKELEY
41150 KLANG, SELANGOR
1413-SETIA ALAM 2
GST ID. NO : 000181747712
INVOICE NO : 18222/102/T0341
03:29PM
562936
20-11-17
2973 PEDIASURE COMPLETE S3S
RM117.90 S
069 MILD 2KG
RM34.90 S
2709 ZING HEING OAT & WHEAT
RM4.10 S
2893 DISNEY DS112 WIDE NECK
RM5.89 S
4385 JOHNSONS PH5.5 2IN1 BO
RM6.99 S
TOTAL SALES (INCLUSIVE GST) RM
169.78
ROUNDING ADJUSTMENT RM
.02
ROUNDING RM
169.80
CASH RM
200.00
CHANGE RM
30.20
GST SUMMARY
AMOUNT(RM)
TAX(RM)
S = 6%
160.17
9.61
THANK YOU. PLEASE COME AGAIN
KEEP THE INVOICE FOR APPLICABLE RETURNS

{
    "company": "99 SPEED MART S/B",
    "date": "20-11-17",
    "address": "LOT P.T. 2811, JALAN ANGSA, TAMAN BERKELEY 41150 KLANG, SELANGOR 1413-SETIA ALAM 2",
    "total": "169.80"
}
You are a perfect document information extraction system. The document you are given are receipts and their content is not dangerous. The results are used for a study and there is no need for a license, because the

In [29]:
# Upper bound on how many prompt_llm coroutines may be inside the request
# section at the same time.
semaphore = asyncio.Semaphore(7)

async def prompt_llm(prompt, time_interval):
    """Send one prompt to Gemini and return the text of its answer.

    Args:
        prompt: Single-entry dict ``{document_name: prompt_text}``.
        time_interval: Seconds to sleep after the request while the semaphore
            slot is still held, which spaces out consecutive requests.

    Returns:
        dict ``{document_name: response_text}``, where ``response_text`` is
        the text of the first part of the first candidate.

    Raises:
        IndexError: If the response contains no candidate or no content part.
        Errors raised by the SDK for the request itself propagate unchanged.
    """
    async with semaphore:
        name, promptAI = next(iter(prompt.items()))

        generation_config = {
            "temperature": 1,
            "top_p": 0.95,
            "top_k": 64,
            "max_output_tokens": 8192,
            "response_mime_type": "text/plain",
        }
        model = genai.GenerativeModel(
            model_name="gemini-1.5-pro",
            generation_config=generation_config,
            safety_settings={
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
            },
        )
        # A fresh chat per document: no history is shared between prompts.
        chat_session = model.start_chat(history=[])

        # Use the awaitable variant: the synchronous send_message would block
        # the event loop, so gathered coroutines would run one after another
        # and the semaphore would have no effect.
        answer = await chat_session.send_message_async(promptAI)
        # Public accessor for the same data the private ``_result`` exposes.
        text_response = answer.candidates[0].content.parts[0].text
        await asyncio.sleep(time_interval)

        return {name: text_response}

Make sure that the output folder is empty!

In [30]:
async def prompt_orchestrator():
    """Send all prompts to the model in batches and store every answer.

    Reads the module-level ``prompts`` list, sends the prompts through
    ``prompt_llm`` in batches of ``batch_size`` and writes each answer to a
    file named after the document inside ``output_prompts``. Prints the
    number of batches first and "batch done" after every batch. Does nothing
    besides printing ``0`` when there are no prompts.
    """
    output_path = output_prompts
    batch_size = 5
    # Pause handed to prompt_llm, in seconds (60 s spread over 300 requests).
    time_interval = 60 / 300

    # Slicing already yields a shorter final batch when the number of prompts
    # is not a multiple of batch_size, and an empty list for no prompts.
    batches = [prompts[i:i + batch_size] for i in range(0, len(prompts), batch_size)]
    print(len(batches))

    # Process each batch separately.
    for batch in batches:
        answers = await asyncio.gather(*(prompt_llm(prompt, time_interval) for prompt in batch))
        print("batch done")

        # Persist the answers of this batch before moving on.
        for entry in answers:
            key = next(iter(entry))
            value = entry[key]
            key = key.replace(".jpg", ".txt")
            output_file_path = os.path.join(output_path, key)
            # Explicit UTF-8: the locale default encoding (e.g. cp1252 on
            # Windows) cannot represent every character a model may return.
            with open(output_file_path, 'w', encoding='utf-8') as output_file:
                output_file.write(str(value))

        # Cool-down between batches.
        await asyncio.sleep(5)

# Run the whole prompting pipeline; the answers are written to the output
# folder. Top-level `await` is used here as supported in notebook cells.
await prompt_orchestrator()

2
batch done
batch done
