In [1]:
import pymupdf
import re

# global function

In [17]:
def crop_header_footer_pdf_page(page):
    height, width = page.rect.height, page.rect.width
    header = min(0.06 * height, height)  # Ensure header is within bounds
    footer = min(0.065 * height, height)  # Ensure footer is within bounds

    # Ensure the crop box is within the media box
    crop_box = (0, header, width , height - footer)

    if crop_box[1] < crop_box[3] and crop_box[3] <= height:
        page.set_cropbox(crop_box)
    else:
        raise ValueError("Calculated crop box is outside the media box boundaries.")
    return page



def extract_bold_block(lines):
    """
    extract each bold block from the list of lines
    each block start from a line in blod font
    and ends with a line in bold font
    
    
    :param lines: list of lines, each line is a list of spans
    """
    bold_blocks = []
    bold_block = []

    for line in lines:
        if 'bold' in line['font'].lower():
            if bold_block:
                bold_blocks.append(bold_block.copy())
                bold_block = []
            
        bold_block.append(line.copy())

    bold_blocks.append(bold_block.copy())
    return bold_blocks



def extract_question_block(lines):
    """
    extract each question block from the list of lines
    each question start from a line with 'Question X' in blod font
    and ends with a line with 'Question X+1' in bold font
    
    X goes from 1 to 20
    The question number 20 ends with the last line of the last page
    
    
    :param lines: list of lines, each line is a list of spans
    """
    questions = []
    question = []
    question_number = 1

    for line in lines:
        if 'bold' in line['font'].lower():
            if line['text'].lower().startswith(f'question {question_number}'):

                if question:
                    questions.append(question.copy())
                    question = []
                question_number += 1
            
        question.append(line.copy())

    questions.append(question.copy())
    return questions


def build_line(spans):
    output = spans[0].copy()

    fonts = {
    }

    fonts[spans[0]['font']] = len(spans[0]['text'].replace(' ', ''))
    
    for span in spans[1::]:
        output['text'] += span['text']

        fonts[span['font']] = fonts.get(span['font'], 0) + len(span['text'].replace(' ', ''))

    output['font'] = max(fonts, key=fonts.get)
    if len(output['text'].replace(' ', '')) <= 5:
        output['font'] = 'arial'

    return output


def prepross_pdf(pdf, *, number_of_first_pages_to_skip=0, number_of_last_pages_to_skip=0):
    """
    extract each line from the pdf preprocess
    """
    lines = []

    if number_of_first_pages_to_skip < 0 or number_of_last_pages_to_skip < 0:
        raise ValueError("number_of_first_pages_to_skip and number_of_last_pages_to_skip should be positive integers")
    
    if number_of_first_pages_to_skip + number_of_last_pages_to_skip >= len(pdf):
        raise ValueError("number_of_first_pages_to_skip + number_of_last_pages_to_skip should be less than the number of pages in the pdf")
    
    pages = pdf[number_of_first_pages_to_skip:-number_of_last_pages_to_skip]
    if number_of_last_pages_to_skip == 0:
        pages = pdf[number_of_first_pages_to_skip:]
        
    
    for page in pages:
        page = crop_header_footer_pdf_page(page)
        
        for block in page.get_text('dict', flags=11)['blocks']:
            for line in block['lines']:
                lines.append(build_line(line['spans'].copy()))
    
    return lines

                
def make_block_unique(block):
    """
    make a block unique by removing the duplicate lines
    """
    unique_block = []
    for line in block:
        if line not in unique_block:
            unique_block.append(line)

    return unique_block

# extract answers

## func for answers

In [23]:
def extract_question_block_header(block, *, question_no, start_index = 1):
    """
        extract the correction note from a correction block
    """
    line_idx = start_index
    header = ""
    # while line does not start with 'Question_no.1' it's teacher's comment
    while line_idx < len(block) and not block[line_idx]['text'].lower().startswith(f'{question_no}.1'):
        header += block[line_idx]['text'] + ' '
        line_idx += 1

    return header, line_idx

    
def extract_sub_answer(block, *, question_no, start_index, verbose=False):
    """
        extract the sub question from a question block
    """ 
   
    unique_question = block 
    line_idx = start_index
    sub_question_no = 1
    sub_questions = []
    while line_idx < len(unique_question):
        answer = ""
        
        # ignore all empty lines
        while line_idx < len(unique_question) and unique_question[line_idx]['text'].strip() == '':
            line_idx += 1

        # extract everything until the next sub-question
        answer = unique_question[line_idx]['text']
        line_idx += 1
        while line_idx < len(unique_question) and not unique_question[line_idx]['text'].startswith(f'{question_no}.{sub_question_no + 1}'):
            answer += unique_question[line_idx]['text']
            line_idx += 1

        
        # ignore sub-question number (e.g. 1.1, 9.3, 20.2)
        code = ''
        if answer.startswith(f'{question_no}.{sub_question_no}'):
            code = answer[:len(f'{question_no}.{sub_question_no}')]
            answer = answer[len(f'{question_no}.{sub_question_no}')::]
        else:
            print(f'Error: {question_no}.{sub_question_no} not found in {answer}')
            print(answer)
        
        if verbose:
            print(f'\tcode : {code}')
        

        # while next work is not TRUE or FALSE, it's the question
        sub_question = ""
        word_idx = 0
        words = answer.split(' ')

        
        while word_idx < len(words) and not ('true' in words[word_idx].lower().strip()  or 'false' in  words[word_idx].lower().strip() or ((word_idx + 1) < len(words) and 
                                                                                                                                           ('see above' in ' '.join(words[word_idx:word_idx + 2]).lower().strip() or 'see below' in ' '.join(words[word_idx:word_idx + 2]).lower().strip()))):
            sub_question += words[word_idx] + ' '
            word_idx += 1

        if verbose:
            print(f'\tsub_question : {sub_question}')
        
        
        # extract true/false answer
        if verbose:
            print(f'{answer}')
        answer = words[word_idx]
        if "true" in answer.lower():
            answer = "true"
        elif "false" in answer.lower():
            answer = "false"
        elif 'see' in answer.lower():
            answer = "see above"
            word_idx += 1
        else:
            print(f'Error: {answer} is not TRUE or FALSE')
            
        if verbose:
            print(f'\tanswer : {answer}')
        
        
        # the rest is the explanation
        explanation = ' '.join(word for word in words[word_idx + 1::] if word != '')
        if verbose:
            print(f'\texplanation : {explanation}')
            print("\n")

        
        
        sub_questions.append((sub_question_no, sub_question, answer, explanation))
        sub_question_no += 1
    
    return sub_questions 


## main for answer

In [None]:


# pdf = pymupdf.open('../data/EQE/2015_PreEx_questions_en.pdf')
# pdf = pymupdf.open('../data/EQE/2014_PreEx_Questions.pdf')
# pdf = pymupdf.open('../data/EQE/2012_PreEx_answers.pdf')
# pdf = pymupdf.open('../data/EQE/2019_PreEx_answers_EN.pdf.pdf')
# pdf = pymupdf.open('../data/EQE/2022_PreEx_Answers.pdf')
# pdf = pymupdf.open('../data/EQE/2017_PreEx_answers.pdf')
# pdf = pymupdf.open('../data/EQE/2015_PreEx_answers.pdf')

import os
#list all pdf in current dir
folder_path = '../data/EQE/answwer/preprocess/'
pdf_files = [f for f in os.listdir(folder_path) if f.endswith('.pdf')]
print(pdf_files)
# pdf = pymupdf.open('../data/EQE/2014_PreEx_answers.pdf')




def extract_answer_data(pdf, *, verbose = False):
    lines = prepross_pdf(pdf, number_of_first_pages_to_skip=0, number_of_last_pages_to_skip=0)
    
    questions = extract_question_block(lines)
    if verbose:
        print("QUESTION BLOCKS FIND")
        print(len(questions))

    format_answer = []
    for question in questions[1::]:

        # removing duplicate
        unique_question = make_block_unique(question)
        
        
        question_line = unique_question[0]['text']
        if verbose:
            print(f'question : {question_line}')
        

        # make sure to only keep number
        regex_get_only_number = re.compile(r'\d+')
        question_no = regex_get_only_number.findall(question_line.split(' ')[1])
        if len(question_no) != 1:
            print(f'Error: {question_no} is not a valid question number')
            continue
        
        question_no = int(question_no[0])
        if verbose:
            print(f'question_no : {question_no}')


        note, line_idx = extract_question_block_header(unique_question, question_no = question_no, start_index = 1)
        if verbose:
            print(f'note : {note}')
        
        try:
            sub_questions = extract_sub_answer(unique_question, question_no = question_no, start_index = line_idx, verbose = verbose) 
        except Exception as e:
            print(f'Error: {e}')
            continue
        

        format_answer.append((question_no, note, sub_questions))
    
    if verbose:
        print(format_answer)
    return format_answer

# pdf = pymupdf.open(folder_path + pdf_files[8])  
# extract_answer_data(pdf, verbose = True)

for idx, file in enumerate(pdf_files):
    pdf = pymupdf.open(folder_path + file)
    try:
        print(file)
        for quest in extract_answer_data(pdf, verbose = False):
            print(f'Question : {quest[0]}')
            print(f'Note : {quest[1]}')
            for sub_quest in quest[2]:
                print(f'\t{quest[0]} : {sub_quest[0]}')
                print(f'\t\t{sub_quest[1]}')
                print(f'\t\t{sub_quest[2]}')
                print(f'\t\t{sub_quest[3]}')
                print()
                
        print("\n==========================\n")
    except Exception as e:
        print(idx, file)
        print(f'Error: {e}')
        
# extract_answer_data(pdf, verbose = True)

    

# extract Question

## func for questions

In [9]:
def find_questions_from_bold_blocks(blocks_bold):
    questions_blocks  = []
    q_id = 1
    index_first_question = None
    index_last_question = -1 
    for q_idx, block in enumerate(blocks_bold[::]):

        unique_question = make_block_unique(block)

        # get only questions blocks
        if f'question {q_id}' in unique_question[0]['text'].lower():
            if not index_first_question:
                index_first_question = q_idx 

            questions_blocks.append(unique_question)
            q_id += 1
            index_last_question = q_idx

    return questions_blocks, index_first_question, index_last_question
 

 

def extract_sub_question(block, *, question_no, start_index, verbose=False):
    """
        extract the sub question from a question block
    """ 
   
    unique_question = block 
    line_idx = start_index
    sub_question_no = 1
    sub_questions = []
    while line_idx < len(unique_question):
        question= ""
        
        # ignore all empty lines
        while line_idx < len(unique_question) and unique_question[line_idx]['text'].strip() == '':
            line_idx += 1

        # extract everything until the next sub-question
        question = unique_question[line_idx]['text']
        line_idx += 1
        while line_idx < len(unique_question) and not unique_question[line_idx]['text'].startswith(f'{question_no}.{sub_question_no + 1}'):
            question += unique_question[line_idx]['text']
            line_idx += 1

        
        # ignore sub-question number (e.g. 1.1, 9.3, 20.2)
        code = ''
        if question.startswith(f'{question_no}.{sub_question_no}'):
            code = question[:len(f'{question_no}.{sub_question_no}')]
            question = question[len(f'{question_no}.{sub_question_no}')::]
        else:
            print(f'Error: {question_no}.{sub_question_no} not found in {question}')
            print(question)
        
        if verbose:
            print(f'\tcode : {code}')
            print(f'\tquestion : {question}\n')
        
       
        
        sub_questions.append((sub_question_no, question))
        sub_question_no += 1
    
    return sub_questions 


In [89]:

pdf = pymupdf.open('../data/EQE/question/2012_PreEx_questions.pdf')
pdf = pymupdf.open('../data/EQE/question/2014_PreEx_Questions.pdf')
pdf = pymupdf.open('../data/EQE/question/2015_PreEx_questions_en.pdf')
pdf = pymupdf.open('../data/EQE/question/2017_PreEx_questions_EN.pdf')
pdf = pymupdf.open('../data/EQE/question/2022_PreEx_questions_EN.pdf')
pdf = pymupdf.open('../data/EQE/question/2019_PreEx_questions_EN.pdf')
pdf = pymupdf.open('../data/EQE/question/2021_PreEx_questions_EN.pdf')
pdf = pymupdf.open('../data/EQE/question/2016_PreEx_questions_EN.pdf')
# pdf = pymupdf.open('../data/EQE/2014_PreEx_answers.pdf')

def extract_question_data(pdf, *, verbose = False):
    lines = prepross_pdf(pdf)

    
    blocks_bold = extract_bold_block(lines)
    if verbose:
        print("bold BLOCKS FIND")
        print("\n\n\n")


    question_blocks, index_first_question, index_last_question = find_questions_from_bold_blocks(blocks_bold)

    
    
    if verbose:
        print('number of bold blocks')
        print(len(question_blocks))
        print(index_first_question)
        print(index_last_question)

    # ignore everything before first question and after the last question
    blocks_bold = blocks_bold[index_first_question : index_last_question + 1]

    if verbose:
        print('number of useful bold blocks')
        print(len(blocks_bold))
        print(blocks_bold[0][0]['text'])


    format_questions = []
    for unique_question in question_blocks:
        
        # first line is the question
        question_line = unique_question[0]['text']
        if verbose:
            print(f'question : {question_line}')
            
        # get question number and only number
        regex_get_only_number = re.compile(r'\d+')
        question_no = regex_get_only_number.findall(question_line.split(' ')[1])
        if len(question_no) != 1:
            print(f'Error: {question_no} is not a valid question number')
            continue
               
        question_no = int(question_no[0])
        if verbose:
            print(f'question_no : {question_no}')


        
        question_context, line_idx = extract_question_block_header(unique_question, question_no = question_no, start_index = 1)
        if verbose:
            print(f'question_context : {question_context}')


        sub_questions = extract_sub_question(unique_question, question_no = question_no, start_index = line_idx, verbose = verbose)    

        format_questions.append((question_no, question_context, sub_questions))
    
    if verbose:
        print(format_questions)
    # print(len(format_questions))
    return format_questions

extract_question_data(pdf, verbose = True)
# extract_question_data(pdf, verbose = False)
   




bold BLOCKS FIND




number of bold blocks
20
3
33
number of useful bold blocks
31
Question 1 
question : Question 1 
question_no : 1
question_context :   Opposition proceedings are pending against all claims of European patent EP-B. The  only ground of opposition is Article 100(c) EPC. Today, 29 February 2016, the EPO  receives third party observations against EP-B. The submissions of the third party are as  follows:   -  it is explained that prior art document D1 anticipates claim 1 of EP-B; and  -  it is reasoned that the invention in EP-B is not sufficiently disclosed.       For each of the statements 1.1 – 1.4, indicate on the answer sheet whether the  statement is true or false:    
	code : 1.1
	question :  Since third party observations can only be filed concerning issues arising under Articles 52 to 57 EPC, the opposition division will not consider the reasoning concerning lack of sufficiency of disclosure.  

	code : 1.2
	question :  The submissions of the third party will be 

[(1,
  '  Opposition proceedings are pending against all claims of European patent EP-B. The  only ground of opposition is Article 100(c) EPC. Today, 29 February 2016, the EPO  receives third party observations against EP-B. The submissions of the third party are as  follows:   -  it is explained that prior art document D1 anticipates claim 1 of EP-B; and  -  it is reasoned that the invention in EP-B is not sufficiently disclosed.       For each of the statements 1.1 – 1.4, indicate on the answer sheet whether the  statement is true or false:    ',
  [(1,
    ' Since third party observations can only be filed concerning issues arising under Articles 52 to 57 EPC, the opposition division will not consider the reasoning concerning lack of sufficiency of disclosure.  '),
   (2,
    ' The submissions of the third party will be forwarded by the EPO to the patent proprietor.  '),
   (3,
    ' The submissions of the third party will be accessible to the public through file inspection.  '),
  

# Extract couple

In [94]:
import os
import json
os.makedirs('output/EQE', exist_ok=True)

list_pdfs = [
    (
        "../data/EQE/answwer/preprocess/process_2012_PreEx_answers.pdf",
        "../data/EQE/question/2012_PreEx_questions.pdf"
    ),
    (
        "../data/EQE/answwer/preprocess/process_2013_PreEx_answers.pdf",
        "../data/EQE/question/2013_PreEx_questions.pdf"
    ),
    (
        "../data/EQE/answwer/preprocess/process_2014_PreEx_answers.pdf",
        "../data/EQE/question/2014_PreEx_Questions.pdf"
    ),
    (
        "../data/EQE/answwer/preprocess/process_2015_PreEx_answers.pdf",
        "../data/EQE/question/2015_PreEx_questions_en.pdf"
    ),
    (
        "../data/EQE/answwer/preprocess/process_2016_PreEx_answers.pdf",
        "../data/EQE/question/2016_PreEx_questions_EN.pdf"
    ),
    (
        "../data/EQE/answwer/preprocess/process_2017_PreEx_answers.pdf",
        "../data/EQE/question/2017_PreEx_questions_EN.pdf"
    ),
    (
        "../data/EQE/answwer/preprocess/process_2018_PreEx_answers.pdf",
        "../data/EQE/question/2018_PreEx_questions_EN.pdf"
    ),
    (
        "../data/EQE/answwer/preprocess/process_2019_PreEx_answers.pdf",
        "../data/EQE/question/2019_PreEx_questions_EN.pdf"
    ),
    # (
    #     "../data/EQE/answwer/preprocess/process_2019_PreEx_answers.pdf",
    #     "../data/EQE/question/202"
    # ),
    (
        "../data/EQE/answwer/preprocess/process_2021_PreEx_answers.pdf",
        "../data/EQE/question/2021_PreEx_questions_EN.pdf"
    ),
    (
        "../data/EQE/answwer/preprocess/process_2022_PreEx_answers.pdf",
        "../data/EQE/question/2022_PreEx_questions_EN.pdf"
    ),
]
# pdfs = (
#     "../data/EQE/answwer/preprocess/process_2022_PreEx_answers.pdf",
#     "../data/EQE/question/2022_PreEx_questions_EN.pdf"
# )

def extract_data(pdfs):
    ans_pdf = pymupdf.open( pdfs[0])
    que_pdf = pymupdf.open( pdfs[1])
    answer = extract_answer_data(ans_pdf)
    question = extract_question_data(que_pdf)

    output = {
        'exam_type': 'EQE',
        'year': os.path.basename(pdfs[0]).split('_')[1],
        'exam_name': 'PreEx',
        
        'exercices': []
    }

    for q in question:
        q_no = q[0]
        q_context = q[1]
        
        
        q_data = {
            'question_number': str(q_no).strip(),
            'question_type': 'True/False',
            'context': q_context.strip(),
            'questions': []
        }
        
        for s in q[2]:
            s_no = s[0]
            s_context = s[1]
            
            
            s_data = {
                'question_code': str(s_no).strip(),
                'question_text': s_context.strip(),
            }
            
            q_data['questions'].append(s_data)
        
        output['exercices'].append(q_data)

        
    # sort the questions by number
    output['exercices'] = sorted(output['exercices'], key=lambda x: int(x['question_number']))

    # dump the output to a json file
    with open(f"output/EQE/EQE_{output['year']}_{output['exam_name']}.json", 'w') as f:
        json.dump(output, f, indent=4)

    for idx, a in enumerate(answer):
        a_no = a[0]
        a_context = a[1]
        
        current_question = output['exercices'][a_no - 1]
        
        # check if first 10% of note match context
        begin = a_context[:int(len(a_context) * 0.1)].strip()  
        if begin != current_question['context'][:int(len(begin))]:
            current_question['examiner_note'] = a_context.strip()
            
        
        for s in a[2]:
            s_no = s[0]
            question = s[1]
            answer = s[2]
            explanation = s[3]
            
            try:
                current_sub = current_question['questions'][s_no - 1]
            except Exception as e:
                print(f'Error: {e}')
                print(a_no)
                print(output['exercices'])
                print(a)
                print("\n\n")
                break
                raise e
            
            assert current_sub['question_code'] == str(s_no).strip()

            current_sub['answer'] = answer.strip().capitalize()
            current_sub['examiner_note'] = explanation.strip()
            
            current_question['questions'][s_no - 1] = current_sub

        try:
            output['exercices'][a_no - 1] = current_question
        except Exception as e:
            print(f'Error: {e}')
            print(a_no)
            print(output['exercices'])
            print(a)
            print("\n\n")
            raise e
            
    # dump the output to a json file
    with open(f"output/EQE/EQE_{output['year']}_{output['exam_name']}_final.json", 'w') as f:
        json.dump(output, f, indent=4)
                
    
# bug on last quesion of 2016
# extract_data(list_pdfs[-1])

for pdfs in list_pdfs:

    try:
        extract_data(pdfs)
    except Exception as e:
        print(e)
        print(pdfs)

Error: list index out of range
20
[{'question_number': '1', 'question_type': 'True/False', 'context': 'Opposition proceedings are pending against all claims of European patent EP-B. The  only ground of opposition is Article 100(c) EPC. Today, 29 February 2016, the EPO  receives third party observations against EP-B. The submissions of the third party are as  follows:   -  it is explained that prior art document D1 anticipates claim 1 of EP-B; and  -  it is reasoned that the invention in EP-B is not sufficiently disclosed.       For each of the statements 1.1 – 1.4, indicate on the answer sheet whether the  statement is true or false:', 'questions': [{'question_code': '1', 'question_text': 'Since third party observations can only be filed concerning issues arising under Articles 52 to 57 EPC, the opposition division will not consider the reasoning concerning lack of sufficiency of disclosure.', 'answer': 'False', 'examiner_note': ''}, {'question_code': '2', 'question_text': 'The submiss

In [95]:
# for each final pdf, keep only 10 first exercice if they contain 4 questions

folder = 'output/EQE/'
final_files = [f for f in os.listdir(folder) if f.endswith('final.json')]

for file in final_files:
    with open(folder + file, 'r') as f:
        data = json.load(f)
        
        data['exercices'] = data['exercices'][:10]
        for idx, exercice in enumerate(data['exercices']):
            if len(exercice['questions']) != 4:
                #remove the exercice
                data['exercices'].remove(exercice)

        
    # filename without extension
    source_name = file.split('.')[0]

    with open(folder + source_name + '_documentLess.json', 'w') as f:
        json.dump(data, f, indent=4)