In [29]:
import ast
import os
import base64
import glob
import json
import requests
import re
import pdfplumber
import pandas as pd
import time
from collections import OrderedDict
from itertools import compress
from pathlib import Path
from PIL import Image
from pdf2image import convert_from_path
from dotenv import load_dotenv

# Load variables from a .env file into the process environment; override=True lets
# values from the file replace variables that are already set. OPENAI_API_KEY is
# read from the environment further down in this notebook.
load_dotenv(override=True)

True

In [2]:
# Helper Functions
def encode_image(image_path):
    """Read the file at ``image_path`` and return its contents as base64 text.

    Args:
        image_path: Path of the file to read; it is opened in binary mode.

    Returns:
        The base64 encoding of the file's bytes, decoded to ``str`` as UTF-8.
    """
    with open(image_path, "rb") as image_file:
        raw_bytes = image_file.read()
    encoded_bytes = base64.b64encode(raw_bytes)
    return encoded_bytes.decode("utf-8")

def extract_json_from_markdown(markdown_string):
    """Parse the JSON payload of a (possibly markdown-fenced) reply into a list.

    The payload is located as follows:
      1. If the text contains a complete fenced block (``` or ```json ... ```),
         the contents of the first such block are used; any prose before or
         after the fence is ignored.
      2. Otherwise a dangling opening and/or closing fence is removed and the
         remaining text is used as-is.

    Args:
        markdown_string: Text containing a JSON document, optionally wrapped
            in a markdown code fence.

    Returns:
        The decoded JSON value if it is a list, otherwise a one-element list
        wrapping the decoded value.

    Raises:
        json.JSONDecodeError: If the located payload is not valid JSON.
    """
    text = markdown_string.strip()

    # str.strip('```json') would strip any of the characters `, j, s, o, n from
    # both ends (mangling payloads such as null), so the fence is matched explicitly.
    fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", text, flags=re.DOTALL | re.IGNORECASE)
    if fenced:
        json_string = fenced.group(1)
    else:
        # No complete fence (e.g. the closing fence is missing): drop whichever
        # fence marker is present at either end.
        json_string = re.sub(r"^```(?:json)?\s*|\s*```$", "", text, flags=re.IGNORECASE)

    data = json.loads(json_string)

    return data if isinstance(data, list) else [data]

def extract_tuple(text):
    """Find a tuple of single lower-case letters written inside ``text``.

    Recognised forms are Python-style tuple literals whose elements are single
    quoted lower-case letters, e.g. ``('a', 'b', 'c')``, ``('a',)``, ``('a')``
    and ``()``. Either quote character is accepted and whitespace around the
    elements and commas is ignored.

    Search priority (first hit wins):
      1. a tuple with two or more letters,
      2. a tuple with exactly one letter,
      3. an empty tuple.

    Args:
        text: The string to search.

    Returns:
        A tuple of the letters found, ``()`` for an empty tuple, or ``None``
        when no tuple is present in ``text``.
    """
    quoted_letter = r"""(?:'[a-z]'|"[a-z]")"""
    separator = r"\s*,\s*"

    # Any number of elements >= 2 (the previous pattern stopped at exactly two,
    # so a reply such as ('a', 'b', 'c') was not recognised).
    multi_pattern = rf"\(\s*{quoted_letter}(?:{separator}{quoted_letter})+\s*,?\s*\)"
    single_pattern = rf"\(\s*{quoted_letter}\s*,?\s*\)"
    empty_pattern = r"\(\s*\)"

    match = re.search(multi_pattern, text) or re.search(single_pattern, text)
    if match:
        # Pull the letters out of the matched literal, in order
        return tuple(re.findall(r"""['"]([a-z])['"]""", match.group(0)))

    if re.search(empty_pattern, text):
        return ()

    return None

def merge_dict_on_common_keys(dicts):
    """Merge a list of dicts into one dict, key by key.

    Keys are taken from the first dict, in its order, and only keys present in
    every dict are merged. For each such key the values are de-duplicated in
    first-seen order; a single distinct value is kept as-is, several distinct
    values are concatenated with no separator.

    Args:
        dicts: A list of dicts. Values must be hashable; values that differ
            between records must be strings because they are joined.

    Returns:
        ``{}`` for an empty list, the sole dict itself (not a copy) for a
        one-element list, otherwise a new merged dict.

    Raises:
        TypeError: If differing values for a key are not all strings, or a
            value is unhashable.
    """
    # Nothing to merge: previously this fell through to dicts[0] and raised IndexError
    if not dicts:
        return {}

    if len(dicts) == 1:
        return dicts[0]

    comb_dict = {}
    for key in dicts[0]:
        # Skip keys that are not shared by every record instead of raising KeyError
        if not all(key in record for record in dicts):
            continue

        dedup_vals = list(OrderedDict.fromkeys(record[key] for record in dicts))

        if len(dedup_vals) == 1:
            comb_dict[key] = dedup_vals[0]
        else:
            comb_dict[key] = "".join(dedup_vals)

    return comb_dict

def dict_reorder(input_dict, keys_order):
    """Build an ``OrderedDict`` holding ``input_dict``'s values in a given key order.

    Args:
        input_dict: Mapping to read the values from.
        keys_order: Iterable of keys, in the order they should appear.

    Returns:
        An ``OrderedDict`` containing exactly the keys listed in ``keys_order``.

    Raises:
        KeyError: If a key in ``keys_order`` is missing from ``input_dict``.
    """
    return OrderedDict((key, input_dict[key]) for key in keys_order)

def json_to_markdown(json_list):
    """
    Render a list of dictionaries as a markdown document.

    Each dictionary starts with a line of 40 dashes; every key becomes a
    level-2 heading followed by its value. Non-string values are converted
    with ``str`` and every ``#`` in a value is backslash-escaped.

    Args:
      json_list: A list of dictionaries representing JSON objects.

    Returns:
      A string containing the markdown document (empty for an empty list).
    """
    separator = f"{'-'*40}\n\n"
    chunks = []
    for item in json_list:
        chunks.append(separator)
        for key, value in item.items():
            text = value if isinstance(value, str) else str(value)
            # Keep literal '#' characters from being read as markdown headings
            escaped_value = text.replace("#", "\\#")
            chunks.append(f"## {key}\n{escaped_value}\n\n")
    return "".join(chunks)
  

In [49]:
# Prompts
# Each constant below is sent verbatim as the text part of a chat request,
# next to one page image. The literals are left untouched on purpose: any
# change to their wording changes what the model is asked.

# Asks for the question-part letters on a page, as a tuple (empty tuple if none).
extract_question_parts_prompt = """
You are given a page from an exam paper. 
Extract the question parts mentioned in the page. 
Every question number will have a lower case letter denoting the question part, they will be enclosed in brackets.
Read the page and return in the letters of the questions as a tuple.

If no question parts are detected, return an empty tuple. Return nothing but the tuple.
"""

# Asks for printed question text plus handwritten answer text as a list of JSON objects.
# NOTE(review): not referenced by the cells visible in this notebook, which use
# mcq_prompt and extract_handwriting_prompt instead - confirm whether it is still needed.
extract_question_answer_prompt = """
You are given a page from an exam paper. Your role is to extract the following details from the page:
1. Printed Question Text
2. Handwritten Answer Text

Obey the following format
[{
"question_text": str,
"answer_text": str
},
{
"question_text": str,
"answer_text": str
}]

If there are multiple questions, report all of them in a list of jsons obeying the format specified.
If there is no information about the fields requested, return the value of those fields to be an empty string. 
Do not invent anything.
"""

# Asks only for the handwritten text, as a single JSON object with "answer_text".
# Used for pages on which no question parts were detected.
extract_handwriting_prompt = """
You are given a page from an exam paper. Your role is to extract the following details from the page:
1. Handwritten Text

Obey the following format

{
"answer_text": str
}

If there is no information about the fields requested, return the value of those fields to be an empty string. 
Do not invent anything.
"""

# Asks for question text and answer text as a list of JSON objects; for multiple
# choice questions only the selected option letter is requested as the answer.
# Unlike the prompts above, this one is stripped of leading/trailing whitespace.
mcq_prompt = """
You are given a page from an exam paper. Your role is to extract the following details from the page:
1. Printed Question Text
2. Answer Text

Obey the following format
[{
"question_text": str,
"answer_text": str
},
{
"question_text": str,
"answer_text": str
}]

Here are some guidelines that you should follow:
1. There are some multiple choice questions on this page, for these questions, extract only the question text and report only the letter for the selected options.
2. If there are multiple questions, report all of them in a list of jsons obeying the format specified.
3. If there is no information about the fields requested, return the value of those fields to be an empty string. 
4. Do not invent anything.
""".strip()

In [4]:
# Model configuration: the chat model named in every request payload below
model_name = "gpt-4o"

# The OpenAI API key is read from the environment (None when the variable is unset)
api_key = os.environ.get("OPENAI_API_KEY")

# HTTP headers shared by every API request in this notebook
headers = dict()
headers["Content-Type"] = "application/json"
headers["Authorization"] = "Bearer {}".format(api_key)

In [73]:
# Run parameters: together they select the input PDF and name the output folders/files
candidate_id = 5002                      # candidate whose script is processed
subject_id = "edexcel_business_studies"  # sub-folder name under the data/output directories

In [74]:
# Load PDF: ./data/<subject_id>/<candidate_id>.pdf is converted into `images`,
# which is sliced by page index and saved page by page further down
pdfpath = Path("./data") / subject_id

images = convert_from_path(pdfpath.joinpath(f"{candidate_id}.pdf"))

In [75]:
# Subset to pages that have questions: keep indices 1 through 21 of the converted
# script (index 0 and anything from index 22 onwards are dropped)
question_page_range = slice(1, 22)
question_images = images[question_page_range]

In [76]:
# Save a subset of pages
image_savedir = Path(f"./saved_imgs/{subject_id}/{candidate_id}")
image_savedir.mkdir(parents=True, exist_ok=True)

# One PNG per question page, named by its position within question_images.
# A plain loop is used because saving is a side effect: a list comprehension
# here only built a throwaway list of None that the notebook then echoed.
for idx, image in enumerate(question_images):
    image.save(image_savedir / f"image_page{idx}.png", 'png')

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

In [77]:
# List the PNG files in the save directory. The sort is lexicographic on the
# path string, so "image_page10.png" comes before "image_page2.png".
img_paths = sorted(glob.glob(f"{image_savedir}/*.png"))
img_paths

['saved_imgs/edexcel_business_studies/5002/image_page0.png',
 'saved_imgs/edexcel_business_studies/5002/image_page1.png',
 'saved_imgs/edexcel_business_studies/5002/image_page10.png',
 'saved_imgs/edexcel_business_studies/5002/image_page11.png',
 'saved_imgs/edexcel_business_studies/5002/image_page12.png',
 'saved_imgs/edexcel_business_studies/5002/image_page13.png',
 'saved_imgs/edexcel_business_studies/5002/image_page14.png',
 'saved_imgs/edexcel_business_studies/5002/image_page15.png',
 'saved_imgs/edexcel_business_studies/5002/image_page16.png',
 'saved_imgs/edexcel_business_studies/5002/image_page17.png',
 'saved_imgs/edexcel_business_studies/5002/image_page18.png',
 'saved_imgs/edexcel_business_studies/5002/image_page19.png',
 'saved_imgs/edexcel_business_studies/5002/image_page2.png',
 'saved_imgs/edexcel_business_studies/5002/image_page20.png',
 'saved_imgs/edexcel_business_studies/5002/image_page3.png',
 'saved_imgs/edexcel_business_studies/5002/image_page4.png',
 'saved_imgs/

In [78]:
# Recover each page index from its file name ("image_page<N>.png").
# Only the file name stem is inspected, so dots or other separators anywhere
# in the directory part of the path cannot corrupt the parsed number.
page_num = [int(Path(i).stem[len("image_page"):]) for i in img_paths]

# Base64-encode every page image, in the same order as img_paths
encoded_imgs = [encode_image(v) for v in img_paths]

print(page_num)
print(img_paths)

[0, 1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 2, 20, 3, 4, 5, 6, 7, 8, 9]
['saved_imgs/edexcel_business_studies/5002/image_page0.png', 'saved_imgs/edexcel_business_studies/5002/image_page1.png', 'saved_imgs/edexcel_business_studies/5002/image_page10.png', 'saved_imgs/edexcel_business_studies/5002/image_page11.png', 'saved_imgs/edexcel_business_studies/5002/image_page12.png', 'saved_imgs/edexcel_business_studies/5002/image_page13.png', 'saved_imgs/edexcel_business_studies/5002/image_page14.png', 'saved_imgs/edexcel_business_studies/5002/image_page15.png', 'saved_imgs/edexcel_business_studies/5002/image_page16.png', 'saved_imgs/edexcel_business_studies/5002/image_page17.png', 'saved_imgs/edexcel_business_studies/5002/image_page18.png', 'saved_imgs/edexcel_business_studies/5002/image_page19.png', 'saved_imgs/edexcel_business_studies/5002/image_page2.png', 'saved_imgs/edexcel_business_studies/5002/image_page20.png', 'saved_imgs/edexcel_business_studies/5002/image_page3.png', 'saved_imgs/e

In [79]:
# Make a DataFrame: one row per page image, ordered by numeric page number
# (the file listing itself is in lexicographic order) with a fresh 0..n-1 index
img_df = (
    pd.DataFrame({"page_number": page_num, "encoded_image": encoded_imgs})
    .sort_values(by='page_number')
    .reset_index(drop=True)
)
img_df

Unnamed: 0,page_number,encoded_image
0,0,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...
1,1,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...
2,2,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...
3,3,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...
4,4,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...
5,5,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...
6,6,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...
7,7,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...
8,8,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...
9,9,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...


In [80]:
# Add Question Parts
# For every page, ask the model which question-part letters appear on it and
# collect the raw text replies (one per row of img_df, in row order).
extracted_tuples = []

for _, row in img_df.iterrows():

  print("Processing Page:\n", row["page_number"])

  payload = {
    "model": model_name,
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": extract_question_parts_prompt
          },
          {
            "type": "image_url",
            "image_url": {
              # The pages were saved and re-read as PNG files, so the data URL
              # declares PNG (it previously claimed JPEG).
              "url": f"data:image/png;base64,{row['encoded_image']}"
            }
          }
        ]
      }
    ],
    "max_tokens": 800
  }

  # timeout: do not let one stalled request hang the whole notebook
  response = requests.post(
    "https://api.openai.com/v1/chat/completions",
    headers=headers,
    json=payload,
    timeout=120,
  )
  # Surface HTTP errors (bad key, rate limit, ...) as such, instead of as a
  # KeyError on the missing "choices" field below
  response.raise_for_status()
  response_content = response.json()["choices"][0]["message"]["content"]

  # Pause between requests
  time.sleep(1)

  extracted_tuples.append(response_content)

Processing Page:
 0
Processing Page:
 1
Processing Page:
 2
Processing Page:
 3
Processing Page:
 4
Processing Page:
 5
Processing Page:
 6
Processing Page:
 7
Processing Page:
 8
Processing Page:
 9
Processing Page:
 10
Processing Page:
 11
Processing Page:
 12
Processing Page:
 13
Processing Page:
 14
Processing Page:
 15
Processing Page:
 16
Processing Page:
 17
Processing Page:
 18
Processing Page:
 19
Processing Page:
 20


In [81]:
# Check if the outputs are all tuples
# (manual check: this only prints the raw model replies for inspection,
# nothing is validated programmatically here)
print(extracted_tuples)

["('a', 'b')", "('c', 'd')", "('a', 'b', 'c')", "('d', 'e')", "('a', 'b')", "('c', 'd')", "('e',)", "('a',)", "('b',)", "('a',)", "('b',)", "('c',)", "('a', 'b')", "('c',)", '()', "('a', 'b')", "('c',)", "('d',)", '()', "('e',)", '()']


In [82]:
# Augment DataFrame
def _parse_question_parts(raw_reply):
    """Turn one raw model reply into a tuple of question-part labels.

    The first parenthesised literal in the reply is evaluated, so text or a
    code fence wrapped around the tuple does not break parsing. A reply with
    no parentheses is evaluated as a whole. A parenthesised single value such
    as "('a')" evaluates to a plain string and is wrapped into a 1-tuple.

    Raises whatever ast.literal_eval raises when the reply is not a literal.
    """
    found = re.search(r"\(.*?\)", raw_reply, flags=re.DOTALL)
    parsed = ast.literal_eval(found.group(0) if found else raw_reply.strip())
    if isinstance(parsed, (list, tuple)):
        return tuple(parsed)
    return (parsed,)


img_df["question_parts"] = [_parse_question_parts(i) for i in extracted_tuples]
# True on pages where a part "a" appears
img_df['contains_a'] = img_df['question_parts'].apply(lambda parts: 'a' in parts)
img_df['number_of_question_parts'] = img_df['question_parts'].apply(len)

img_df

Unnamed: 0,page_number,encoded_image,question_parts,contains_a,number_of_question_parts
0,0,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a, b)",True,2
1,1,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(c, d)",False,2
2,2,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a, b, c)",True,3
3,3,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(d, e)",False,2
4,4,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a, b)",True,2
5,5,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(c, d)",False,2
6,6,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(e,)",False,1
7,7,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a,)",True,1
8,8,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(b,)",False,1
9,9,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a,)",True,1


In [83]:
# Add a Question Number - presumes all pages are in correct order
# The number is a running count of pages on which part "a" appears: it goes up
# by one on each such page and is carried over unchanged on the pages between.
# Pages before the first "a" page therefore get 0.
q_num = img_df["contains_a"].astype(int).cumsum().tolist()

img_df["question_number"] = q_num

img_df

Unnamed: 0,page_number,encoded_image,question_parts,contains_a,number_of_question_parts,question_number
0,0,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a, b)",True,2,1
1,1,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(c, d)",False,2,1
2,2,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a, b, c)",True,3,2
3,3,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(d, e)",False,2,2
4,4,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a, b)",True,2,3
5,5,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(c, d)",False,2,3
6,6,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(e,)",False,1,3
7,7,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a,)",True,1,4
8,8,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(b,)",False,1,4
9,9,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a,)",True,1,5


In [84]:
# Extract Question and Answer Text from page
# For every page, ask the model for the question/answer text and collect the
# raw text replies (one per row of img_df, in row order).
extracted_questions = []
for _, row in img_df.iterrows():

  print("Processing Page Number:\n", row['page_number'])

  # Pages without any detected question part get the handwriting-only prompt;
  # all other pages get the question + answer prompt.
  if row['number_of_question_parts'] == 0:
    prompt = extract_handwriting_prompt
  else:
    prompt = mcq_prompt

  payload = {
    "model": model_name,
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": prompt
          },
          {
            "type": "image_url",
            "image_url": {
              # The pages were saved and re-read as PNG files, so the data URL
              # declares PNG (it previously claimed JPEG).
              "url": f"data:image/png;base64,{row['encoded_image']}",
              "detail": "auto"
            }
          }
        ]
      }
    ],
    "max_tokens": 1000
  }

  # timeout: do not let one stalled request hang the whole notebook
  response = requests.post(
    "https://api.openai.com/v1/chat/completions",
    headers=headers,
    json=payload,
    timeout=120,
  )
  # Surface HTTP errors (bad key, rate limit, ...) as such, instead of as a
  # KeyError on the missing "choices" field below
  response.raise_for_status()
  response_content = response.json()["choices"][0]["message"]["content"]

  # Pause between requests
  time.sleep(1)

  extracted_questions.append(response_content)

Processing Page Number:
 0
Processing Page Number:
 1
Processing Page Number:
 2
Processing Page Number:
 3
Processing Page Number:
 4
Processing Page Number:
 5
Processing Page Number:
 6
Processing Page Number:
 7
Processing Page Number:
 8
Processing Page Number:
 9
Processing Page Number:
 10
Processing Page Number:
 11
Processing Page Number:
 12
Processing Page Number:
 13
Processing Page Number:
 14
Processing Page Number:
 15
Processing Page Number:
 16
Processing Page Number:
 17
Processing Page Number:
 18
Processing Page Number:
 19
Processing Page Number:
 20


In [85]:
# Decode every raw model reply into a list of dicts; each page's list is stored
# in its row of the new "extracted_text" column
img_df["extracted_text"] = list(map(extract_json_from_markdown, extracted_questions))
img_df

Unnamed: 0,page_number,encoded_image,question_parts,contains_a,number_of_question_parts,question_number,extracted_text
0,0,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a, b)",True,2,1,[{'question_text': 'Which one of the following...
1,1,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(c, d)",False,2,1,[{'question_text': 'Explain one advantage of s...
2,2,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a, b, c)",True,3,2,[{'question_text': 'Which two of the following...
3,3,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(d, e)",False,2,2,[{'question_text': 'Explain one way a small bu...
4,4,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a, b)",True,2,3,[{'question_text': 'Which one of the following...
5,5,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(c, d)",False,2,3,[{'question_text': 'Explain one disadvantage t...
6,6,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(e,)",False,1,3,[{'question_text': 'Discuss the impact on a sm...
7,7,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a,)",True,1,4,[{'question_text': 'Outline one way that finan...
8,8,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(b,)",False,1,4,[{'question_text': 'Analyse the impact of non-...
9,9,iVBORw0KGgoAAAANSUhEUgAAEfMAABlfCAIAAAAvdKTrAA...,"(a,)",True,1,5,[{'question_text': 'Using the information in T...


In [86]:
# Collect Questions and Answers
# For each question number: label every extracted record with its question
# number and part, then merge records that share the same part (an answer that
# continues on a following page) into one entry of all_questions_lst.
all_questions_lst = []

# sorted(): iteration order of a set is not something to rely on for output order
for q_num in sorted(set(img_df.question_number)):

    pages_for_question = img_df[img_df.question_number == q_num]

    # One part label per expected record, in page order. A page with no
    # detected parts is treated as a continuation page and inherits the most
    # recently resolved label. This also covers several continuation pages in
    # a row; if there is no earlier label, '' is used rather than wrapping
    # round to the last label.
    augmented_question_parts = []
    for page_parts in pages_for_question.question_parts.to_list():
        if len(page_parts) == 0:
            previous_part = augmented_question_parts[-1] if augmented_question_parts else ''
            augmented_question_parts.append(previous_part)
        else:
            augmented_question_parts.extend(page_parts)

    # Flatten the per-page record lists (the dicts themselves are shared with
    # img_df.extracted_text and are annotated in place below)
    extracted_questions_by_part = [
        record
        for page_records in pages_for_question.extracted_text
        for record in page_records
    ]

    # Labels are assigned by position, so the counts must agree
    if len(extracted_questions_by_part) != len(augmented_question_parts):
        raise ValueError(
            f"Question {q_num}: {len(extracted_questions_by_part)} extracted records "
            f"but {len(augmented_question_parts)} question parts; cannot align them"
        )

    # Combine question number and question parts
    for record, question_part in zip(extracted_questions_by_part, augmented_question_parts):
        record["question_part"] = question_part
        record["question_number"] = q_num
        # Records without question text (or with a falsy value) get an empty string
        if not record.get("question_text"):
            record["question_text"] = ""

    # Align Questions and Answers: one merged entry per part, in first-seen order
    unique_parts = list(OrderedDict.fromkeys(record["question_part"] for record in extracted_questions_by_part))

    for question_part in unique_parts:
        question_set = [
            record for record in extracted_questions_by_part
            if record["question_part"] == question_part
        ]
        all_questions_lst.append(merge_dict_on_common_keys(question_set))
    

In [87]:
# Put every record's fields in a fixed order for display and for the markdown export
output_key_order = ["question_number", "question_part", "question_text", "answer_text"]
all_questions_ordered_lst = [
    dict_reorder(record, keys_order=output_key_order)
    for record in all_questions_lst
]
all_questions_ordered_lst

[{'question_text': 'Which one of the following could market mapping be used for?',
  'answer_text': 'C',
  'question_part': 'a',
  'question_number': 1},
 {'question_text': 'Which one of the following is an impact of a decrease in interest rates?',
  'answer_text': 'B',
  'question_part': 'b',
  'question_number': 1},
 {'question_text': 'Explain one advantage of starting a business as a private limited company.',
  'answer_text': 'One advantage of starting a business as a private limited company is that the business will have money and will not be able to struggle and may already have customers.',
  'question_part': 'c',
  'question_number': 1},
 {'question_text': 'Explain one way the internet may affect where a small business chooses to locate.',
  'answer_text': 'The internet may rate it based on its location. If it’s in a bad area it will be rated badly. If it’s in a good area it will be rated good. The internet would also help a business to choose location depending on an area wher

In [88]:
# Serialise the collected questions with 4-space indentation.
# NOTE(review): this writes all_questions_lst, not the key-ordered
# all_questions_ordered_lst built just above - confirm that is intended.
json_object = json.dumps(all_questions_lst, indent=4)

json_savedir = Path(f"./extracted_questions_answers/{subject_id}")
json_savedir.mkdir(parents=True, exist_ok=True)

# Write <candidate_id>_extracted_questions.json into the output folder
json_outpath = json_savedir / f"{candidate_id}_extracted_questions.json"
with json_outpath.open("w") as outfile:
    outfile.write(json_object)

In [None]:
# Convert to markdown
markdown_doc = json_to_markdown(all_questions_ordered_lst)

# Print or write the markdown document to a file
print(markdown_doc)

# Write it to <candidate_id>_extracted_questions.md in the output folder.
# The encoding is set explicitly: the extracted answers contain non-ASCII
# characters (e.g. typographic apostrophes), and the platform default encoding
# is not guaranteed to be able to represent them.
with open(json_savedir / f"{candidate_id}_extracted_questions.md", "w", encoding="utf-8") as f:
  f.write(markdown_doc)