# Selectividad Exams Parser

## Libraries import

### Required packages for OCR processing:
**Ubuntu based:**
- tesseract-ocr
- tesseract-ocr-spa
- poppler-utils

**Archlinux based:**
- tesseract
- tesseract-data-spa

In [1]:
# !pip install pdf2image
# !pip install pytesseract

In [2]:
from pdf2image import convert_from_path
import pytesseract
import pandas as pd
from pathlib import Path
from multiprocessing import Pool
import os
import requests

## Functions

### Telegram bot

Function to generate messages to a telegram bot

In [3]:
# Replace with your bot's token and chat ID
from bot_credentials import *

def send_telegram_message(message):
    url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
    params = {"chat_id": CHAT_ID, "text": message}
    response = requests.get(url, params=params)
    return response.json()

### *pdf_to_text*: parse pdf pages into images

Extract the text from pdf files, page by page

In [4]:
def pdf_to_text(file_path):
    images = convert_from_path(file_path)[1:]
    # parse each page
    pages = []
    for i, page in enumerate(images):
        pages.append(pytesseract.image_to_string(page, lang='spa'))
    return pages

### *extract_statement*: extract statement text from page text

Extract the statement from a page in text format

In [5]:
def extract_statement(page):
    # split the content into lines
    text = page.split('\n')
    # list to store the processed lines
    final_text = []
    # start from the 4th line
    for line in text[4:]:
        # stop at RESOLUCION
        if line.startswith('RESOLUCIÓN') or line.startswith('SOCIALES'):
            final_text.append(line.strip())
            break
        # add the line to final_text
        else:
            if not line == '':
                final_text.append(line.strip())
    # join the list into a single string
    final_text = '\n'.join(final_text)

    return final_text

### *extract_exam_details*: extract exam datails from statement

Extract the exam details, like subject, year, exam and exercise number.

In [6]:
def extract_exam_details(statement):
    # get last line with exam details
    lines = [i for i in statement.split('\n') if i != '']
    exam_details = lines[-1].lower()
   
    # SOCIALES Il. 2017 JUNIO. EJERCICIO 2. OPCIÓN A
    if exam_details.startswith('sociales'):
        exam_details = exam_details.replace('sociales il', 'MATES_CCSS')
    # split the words by ' '
    exam_details = exam_details.split(' ')
    # strip whitespaces
    exam_details = [i.replace('.', '')
                    .replace(',', '').strip()
                    for i in exam_details]

    # exam dictionary
    details = ['subject', 'year', 'exam', 'exercise']
    exam_dict = dict.fromkeys(details)
    exam_dict = {k:None for k in details}

    ## fill dictionary with values
    
    # subject
    if exam_details[0] == 'sociales':
        exam_dict['subject'] = 'Mates CCSS'
        # drop 2nd element of exam_details list
        del exam_details[1]
    else:
        exam_dict['subject'] = exam_details[0]
    # year
    exam_dict['year'] = int(exam_details[1])

    # parse the exam string
    if exam_details[2].startswith('reserva'):
        exam_dict['exam'] = ' '.join(exam_details[2:4]).title()
        exam_index_start = 4
    else:
        exam_dict['exam'] = exam_details[2].title()
        exam_index_start = 3
    
    exam_dict['exercise'] = ' '.join(exam_details[exam_index_start:]).title()

    
    return exam_dict

### Function to time the time elapsed in the processing stage

In [7]:
import time
def what_time():
    now = time.localtime()
    time_formated = time.strftime("%H:%M:%S on %Y-%m-%d ", now)
    return now, time_formated

def elapsed_time_minutes(start, end):
    start_seconds = time.mktime(start)
    end_seconds = time.mktime(end)
    return (end_seconds - start_seconds) / 60

### *generate_error_log*: generate a error log and add it to errors.log file

In [31]:
def generate_error_log(file, page, error_type, statement = None, details = None):
    log = f'Error processing {error_type}, in {file.name} at page {page+2}\n'
    
    with open('errors.log', 'a') as f:
        f.write(log)
        f.write('*' * 10 + '\n')
        if statement:
            f.write(f'{statement}\n')
        elif details:
            f.write(f'{details}\n')
        f.write('*' * 10 + '\n')
    
    print(log)

### Testing for Mates CCSS

In [9]:
# file = Path('./pdf_files/Funciones CCSS/2020 - Funciones CCSS.pdf')
# pages = pdf_to_text(file)

In [10]:
# s = [extract_statement(i) for i in pages]

In [11]:
# for index, statement in enumerate(s):
#     try:
#         extract_exam_details(statement)
#     except:
#         print(f'page: {index + 2}') # page number (index)
#         print(statement)
#         break

In [12]:
# extract_exam_details(s[3])

## Process the pdf files

In [32]:
def process_file(file):
    exercises_dict = {}
    topic = file.stem.split(' - ')[-1]   
    
    # lists to store the values
    subjects = []
    years = []
    exams = []
    exercises = []
    statements = []
    pages = pdf_to_text(file)

    for index, page in enumerate(pages):
        try:
            statement = extract_statement(page)
        # error with extracting statement
        except:
            generate_error_log(file = file, page = index, error_type= 'statement', statement=page)
            continue
        try:
            details = extract_exam_details(statement)
            subjects.append(details['subject'])
            years.append(details['year'])
            exams.append(details['exam'])
            exercises.append(details['exercise'])
            statements.append(statement)
        # error with extracting details
        except:
            generate_error_log(file = file, page = index, error_type= 'details', details = statement)
            continue
        
    print(f'Success parsing file: {file.stem}')


    # generate the key and content for the exercises dictionary
    key = details['subject'] + ' ' + str(details['year']) + ' ' + topic
    exercises_dict[key] = {
        'subject' : subjects,
        'year' : years,
        'topic' : [topic] * len(subjects),
        'exam' : exams,
        'exercise' : exercises,
        'statement' : statements
    }

    return exercises_dict

## Save the parsed content into a dataframe

In [14]:
def create_content_dict():
    # create the dict from keys
    keys = ['subject', 'year', 'topic', 'exam', 'exercise', 'statement']
    #content = dict.fromkeys(exercises_dict[first_processed_file].keys())
    content = dict.fromkeys(keys)
    # fill the dict with empty lists
    for key in content.keys():
        content[key] = []

    return content

In [15]:
folders = sorted( [i for i in Path.iterdir(Path('pdf_files'))] )

In [16]:
def process_folder(folder):
    df = pd.DataFrame()
    files = sorted( [i for i in Path.iterdir(folder)
                        if i.suffix == '.pdf'] )
    for file in files:
        try:
            exercises_dict = process_file(file)
            # each year contains a dictionary with keys containing lists of values
            for year, dict_ in exercises_dict.items():
                df = pd.concat([df, pd.DataFrame(dict_)], axis = 0)

        except:
            pass
        
    # correct wrong subjects
    df.subject = df.subject.apply(lambda x: 'Química'
                                    if x.endswith('mica') else x)
    df.to_csv(f'./csv/exercises_{folder.stem}.csv', index=False)

In [47]:
def add_row(df, row):
    return pd.concat([df, pd.DataFrame(row, columns = df.columns, index = [0])], axis = 0)

In [48]:
'''
a) Calcule los valores de a y b para que la función f ( x )  
derivable en el punto de abscisa x  1
b) Para a  1 y b  2 , estudie su monotonía y determine las ecuaciones de sus asíntotas, si
existen.
SOCIALES II. 2016. JUNIO. EJERCICIO 2. OPCIÓN A
'''
row = {'subject' : 'Mates CCSS',
       'year' : 2016,
       'topic' : 'Funciones CCSS',
       'exam' : 'Junio',
       'exercise' : 'Ejercicio 2, Opción A',
       'statement' : '''a) Calcule los valores de a y b para que la función f (x) sea derivable en el punto de abscisa x = 1
b) Para a = 1 y b = 2 , estudie su monotonía y determine las ecuaciones de sus asíntotas, si existen'''
}

In [50]:
df = add_row(df, row)

In [51]:
df.to_csv(f'./csv/exercises_Funciones CCSS.csv', index=False)

### Process pdf files

In [None]:
start, start_formatted = what_time()
send_telegram_message(f'Parsing process started at: {start_formatted}')

for folder in folders:
    try:
        process_folder(folder)
        print(f'Success processing folder: {folder.stem}')
        send_telegram_message(f'Success processing folder: {folder.stem}')
    except Exception as e:
        send_telegram_message(f'Error processing folder: {folder.stem}' \
                              f'\n{e}')
end, end_formatted = what_time()

send_telegram_message(f'Parsing process finished at: {end_formatted}'\
                      f'\nTime elapsed in minutes: {elapsed_time_minutes(start, end)}')
send_telegram_message('✅ Your files have been processed!')


In [18]:
# # Create a pool of worker processes (adjust number as needed)
# with Pool(processes=os.cpu_count()) as pool:
#     pool.map(process_folder, folders)

Create the pandas dataframe

In [19]:
csv_files = [i for i in Path.iterdir(Path('./csv'))
             if i.suffix == '.csv' and not i.stem.startswith('all')]

In [None]:
csv_files

In [20]:
df = pd.read_csv(csv_files[0])

In [None]:
# df = pd.read_csv(csv_files[0])
# for file in csv_files[1:]:
#     df = pd.concat([df, pd.read_csv(file)], axis = 0)
#     #df.to_csv('./csv/all_exercises.csv', index=False)

# df.sample(10)

In [21]:
df.isna().sum()

subject      0
year         0
topic        0
exam         0
exercise     0
statement    0
dtype: int64

In [30]:
df[~df.year.isna()]

Unnamed: 0,subject,year,topic,exam,exercise,statement
0,MATES_CCSS,2001,Funciones CCSS,Junio,"Ejercicio 2, Opción A",Calcule las funciones derivadas de las siguien...
1,MATES_CCSS,2001,Funciones CCSS,Reserva 1,"Ejercicio 2, Opción A","beneficio diario, en euros, será: B(x) =-—10x?..."
2,MATES_CCSS,2001,Funciones CCSS,Reserva 2,"Ejercicio 2, Opción B",El consumo de luz (en miles de pesetas) de una...
3,Mates CCSS,2001,Funciones CCSS,Reserva 3,"Ejercicio 2, Opción B",Determine los valores que han de tomar “a” y “...
4,MATES_CCSS,2001,Funciones CCSS,Reserva 4,"Ejercicio 2, Opción B",El estudio de la rentabilidad de una empresa r...
...,...,...,...,...,...,...
160,MATES_CCSS,2018,Funciones CCSS,Reserva 3,"Ejercicio 2, Opción B",Se considera la función f(x)= * si —-1<x<0\nx+...
161,MATES_CCSS,2018,Funciones CCSS,Reserva 4,"Ejercicio 2, Opción A",: 0% si x<3\nSe considera la función f(x)=\n=x...
162,MATES_CCSS,2018,Funciones CCSS,Reserva 4,"Ejercicio 2, Opción B","3 1) 2\nDe"" (x?-5)? =e+D""\nFx) ( ) g(x) In(r7+..."
163,MATES_CCSS,2018,Funciones CCSS,Septiembre,"Ejercicio 2, Opción A","El consumo de cereales en una ciudad, en miles..."


In [None]:
df.shape

In [23]:
df.subject.unique()

array(['MATES_CCSS', 'Mates CCSS'], dtype=object)

In [29]:
df.exercise.unique()

array(['Ejercicio 2, Opción A', 'Ejercicio 2, Opción B'], dtype=object)

In [25]:
df.exercise = df.exercise.apply(lambda x: x.replace('Opcióon', 'Opción').replace('Opcion', 'Opción'))

In [27]:
def add_comma(string):
    if len(string.split(' ')) == 4 and ',' not in string:
        words = string.split(' ')
        return f'{words[0]} {words[1]}, {words[2]} {words[3]}'
    else:
        return string

In [28]:
df.exercise = df.exercise.apply(add_comma)

In [None]:
df.to_csv(f'./csv/all_exercises_{df.subject.unique()[0]}.csv', index=False)