In [18]:
from imutils.perspective import four_point_transform
from pdf2image import convert_from_path
from imutils import contours
from dynamsoft_barcode_reader_bundle import *
import numpy as np
import imutils
import cv2
import time
import os
import json

# Path of the JSON settings file opened by read_config().
CONFIG = "./config/config.json"


In [19]:
def read_config():
    """Read the "config" section of the JSON file at CONFIG.

    Returns:
        A 7-tuple holding, in order, the values stored under the keys
        "multithread_count", "in_folder", "out_folder", "out_file",
        "mark_scheme", "dpi" and "questions".

    Raises:
        KeyError: if "config" or any of the keys above is missing.
    """
    with open(CONFIG) as handle:
        settings = json.load(handle)["config"]

    wanted_keys = (
        "multithread_count",
        "in_folder",
        "out_folder",
        "out_file",
        "mark_scheme",
        "dpi",
        "questions",
    )
    return tuple(settings[key] for key in wanted_keys)

# Unpack the settings once into module-level constants used by the cells below.
MULTITHREAD, IN_FOLDER_PATH, OUT_FOLDER_PATH, OUT_FILE, MARK_IN, DPI, QUESTIONS = read_config()

First, we read every file in the input folder (the `in_folder` setting in the config file) and convert each page into a grayscale NumPy array.

In [20]:
def get_all_pages():
    """Load every document in IN_FOLDER_PATH as a list of grayscale images.

    Files whose name ends in ".pdf" (case-insensitive) are rasterised with
    pdf2image at DPI using MULTITHREAD worker threads; every page becomes one
    array. Any other file whose name does not start with "." is read with
    OpenCV as a single grayscale image. Files OpenCV cannot decode are
    reported and skipped instead of being stored as None.

    Returns:
        list of numpy.ndarray, one entry per page, in sorted file-name order.
    """
    all_pages = []

    start_time = time.time()

    # os.listdir gives no ordering guarantee; sort so page order is reproducible.
    for file_name in sorted(os.listdir(IN_FOLDER_PATH)):
        # os.path.join works whether or not IN_FOLDER_PATH ends with a separator.
        path = os.path.join(IN_FOLDER_PATH, file_name)

        if file_name.lower().endswith(".pdf"):
            pdf_pages = convert_from_path(path, dpi=DPI, thread_count=MULTITHREAD, grayscale=True)
            # Convert page by page: wrapping the whole list in np.array() fails
            # when the pages of a PDF do not all share the same size.
            all_pages.extend(np.array(pdf_page) for pdf_page in pdf_pages)
        elif not file_name.startswith("."):
            page = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
            if page is None:
                # imread signals failure by returning None rather than raising.
                print(f"Skipping unreadable file: {path}")
                continue
            all_pages.append(page)

    end_time = time.time()
    time_taken = end_time - start_time

    print(f'PDF to image:\n{len(all_pages)} pages read at dpi {DPI}\n{time_taken:.2f} seconds taken\n\n')

    return all_pages

# Load all input pages once; the cells below reuse this list.
pages = get_all_pages()

PDF to image:
53 pages read at dpi 150
5.19 seconds taken




In [21]:
def findBarcode(pages):
    """Decode one barcode per page using the Dynamsoft capture router.

    Only the top half of each page (minus its last pixel column) is scanned.
    For every page the text of the first decoded barcode, with its first four
    characters removed, is appended to the result; pages without a decoded
    barcode contribute the placeholder "no_barcode_detected".

    Args:
        pages: iterable of 2-D image arrays.

    Returns:
        list of str, one per page. The list is empty when licence
        initialisation fails (the failure is printed).
    """
    barcodes = []

    # NOTE(review): a licence key is embedded in the notebook. It is kept as a
    # fallback so existing runs keep working, but it can be overridden through
    # the DYNAMSOFT_LICENSE environment variable and should eventually be
    # removed from source control.
    license_key = os.environ.get(
        "DYNAMSOFT_LICENSE",
        "t0068lQAAAFOzmd4LuzieTZiIUJB0/zbVbun10frmeJpQglgX+5yRwXCYNHWoiRKMDkQca2pt1eyBAy2gFBJenqGlz3YtFSs=;t0069lQAAAGfEQ04ThQCnPBRkXD1c0coSrEPVKx9aOqX6/DXjXjiOn2aEn8WpF1mLMT68yR7TFTH3RaOzyu+2vWwipTEgbBNg;t0069lQAAAAaaOX6utHM4pTlZ7t0wGDRL1b1LRHdbYLNvacxR50R9pR2+3ocPFfJPLSv6yppCVuJoadTgopabM2/YugPMnQdg",
    )

    errorCode, errorMsg = LicenseManager.init_license(license_key)
    if errorCode != EnumErrorCode.EC_OK and errorCode != EnumErrorCode.EC_LICENSE_CACHE_USED:
        # The original line was a bare tuple expression, so nothing was shown.
        print("License initialization failed: ErrorCode:", errorCode, ", ErrorString:", errorMsg)
        return barcodes

    cvr = CaptureVisionRouter()

    for page in pages:
        height = page.shape[0]
        top_half = page[:height // 2, :-1]

        result = cvr.capture(top_half)

        if result.get_error_code() != EnumErrorCode.EC_OK:
            print("Error:", result.get_error_code(), result.get_error_string())
        barcode_result = result.get_decoded_barcodes_result()

        # get_items() is indexed like a sequence below, so test it for
        # emptiness; comparing it with 0 never matched and an empty result
        # then failed on items[0].
        items = barcode_result.get_items() if barcode_result is not None else None
        if not items:
            barcodes.append("no_barcode_detected")
        else:
            sn = items[0].get_text()
            barcodes.append(sn[4:])

    return barcodes

# Barcode text (or the "no_barcode_detected" placeholder) for the loaded pages.
barcodes = findBarcode(pages)

In [22]:
def findContour(pages):
    """Locate the four-cornered outline on each page and warp it flat.

    The lower two thirds of every page are Otsu-thresholded (inverted),
    blurred and edge-detected. The largest external contour that approximates
    to four points is passed to four_point_transform to obtain a top-down
    view of the thresholded image.

    If no four-point contour exists, the unwarped thresholded region is used
    instead so the output stays aligned one-to-one with `pages`; a message is
    printed for that page.

    Args:
        pages: iterable of 2-D grayscale image arrays.

    Returns:
        list of numpy.ndarray, one binary image per input page.
    """
    cropped_pages = []
    for index, temp_page in enumerate(pages):
        height = temp_page.shape[0]
        # Skip the top third of the sheet; only the rest is searched.
        page = temp_page[height // 3:, :]
        thresh = cv2.threshold(page, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)[1]

        blurred = cv2.GaussianBlur(thresh, (5, 5), 0)

        edged = cv2.Canny(blurred, 75, 200)

        cnts = cv2.findContours(edged.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        cnts = imutils.grab_contours(cnts)
        docCnt = None

        # Walk the contours from largest to smallest area and keep the first
        # one whose polygonal approximation has exactly four corners.
        for c in sorted(cnts, key=cv2.contourArea, reverse=True):
            peri = cv2.arcLength(c, True)
            approx = cv2.approxPolyDP(c, 0.02 * peri, True)

            if len(approx) == 4:
                docCnt = approx
                break

        if docCnt is None:
            # Previously this fell through to docCnt.reshape and raised
            # AttributeError, aborting the whole batch.
            print(f"No four-cornered outline found on page {index}; using the unwarped image")
            paper = thresh
        else:
            paper = four_point_transform(thresh, docCnt.reshape(4, 2))

        cropped_pages.append(paper.copy())
    return cropped_pages


# Thresholded, perspective-corrected image for each loaded page.
cropped_pages = findContour(pages)

In [23]:
# Translates the per-question result computed in get_answer_by_third into its
# output code: -1 (no bubble counted as filled) -> "O", -2 (more than one
# bubble filled) -> "X", and bubble positions 0-4 -> "A"-"E".
CODE_MAP = {-2: "X", -1: "O", 0: "A", 1: "B", 2: "C", 3: "D", 4: "E"}

def crop_by_third(page):
    """Split an image into three side-by-side vertical strips.

    Column boundaries are placed at k * width // 3 for k = 0..3, so when the
    width is not a multiple of three the later strips absorb the remainder.

    Args:
        page: 2-D (or higher) array whose second axis is the width.

    Returns:
        list of three array slices of `page`, ordered left to right.
    """
    width = page.shape[1]
    bounds = [k * width // 3 for k in range(4)]
    return [page[:, left:right] for left, right in zip(bounds, bounds[1:])]


def get_answer_by_third(third, bubbles):
    """Read the marked answers from one vertical strip of an answer sheet.

    External contours of `third` are kept as bubbles when their bounding box
    is at least 20x20 pixels, has an aspect ratio strictly between 0.8 and
    1.2, and the contour's circularity (4*pi*area / perimeter**2) exceeds 0.3.

    Args:
        third: single-channel binary image of the strip.
        bubbles: exact number of bubbles expected in the strip.

    Returns:
        None when the number of detected bubbles differs from `bubbles`.
        Otherwise a list with one CODE_MAP value per group of five bubbles
        (groups taken top to bottom, bubbles within a group left to right):
        the letter of the single bubble more than 80% filled, "O" when none
        is, and "X" when more than one is.
    """
    found = cv2.findContours(third.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    candidates = imutils.grab_contours(found)

    bubble_cnts = []
    for cnt in candidates:
        _, _, w, h = cv2.boundingRect(cnt)
        aspect = w / float(h)

        # Reject anything too small or not roughly square.
        if w < 20 or h < 20 or aspect <= 0.8 or aspect >= 1.2:
            continue

        area = cv2.contourArea(cnt)
        perimeter = cv2.arcLength(cnt, True)
        if perimeter <= 0:
            continue

        if (4 * np.pi * area) / (perimeter ** 2) > 0.3:
            bubble_cnts.append(cnt)

    if len(bubble_cnts) != bubbles:
        return None

    bubble_cnts = contours.sort_contours(bubble_cnts, method="top-to-bottom")[0]

    answer = []
    for start in range(0, len(bubble_cnts), 5):
        # Default ordering of sort_contours puts this row's bubbles left to right.
        row = contours.sort_contours(bubble_cnts[start:start + 5])[0]
        filled = -1

        for position, bubble in enumerate(row):
            # Count the white pixels of `third` that fall inside this bubble.
            mask = np.zeros(third.shape, dtype="uint8")
            cv2.drawContours(mask, [bubble], -1, 255, -1)
            inside = cv2.bitwise_and(third, third, mask=mask)
            total = cv2.countNonZero(inside)

            if total / cv2.contourArea(bubble) > 0.8:
                if filled == -1:
                    filled = position
                else:
                    # A second filled bubble invalidates the question.
                    filled = -2
        answer.append(CODE_MAP[filled])

    return answer



def get_answers(barcodes, pages, full_pages):
    """Extract the answers of every contestant from the cropped pages.

    Each cropped page is split into three strips; the first two are expected
    to hold 50 bubbles and the last one 25. If any strip cannot be read, the
    failure is printed, the original full page is saved to OUT_FOLDER_PATH as
    "<key>.jpg", and that contestant's entry is set to None.

    Args:
        barcodes: identifiers, one per page.
        pages: cropped binary images, aligned with `barcodes`.
        full_pages: original page images, aligned with `barcodes`.

    Returns:
        dict mapping each identifier to its list of answer codes, or to None
        when the scan failed. A repeated identifier (for example several
        pages yielding "no_barcode_detected") gets "_2", "_3", ... appended
        so that later pages do not overwrite earlier ones.
    """
    fail_count = 1
    all_answer = {}

    for barcode, page, full_page in zip(barcodes, pages, full_pages):
        # Keep duplicate identifiers apart instead of silently replacing the
        # earlier result (and its saved failure image).
        key = barcode
        suffix = 2
        while key in all_answer:
            key = f"{barcode}_{suffix}"
            suffix += 1

        answer = []

        for i, third in enumerate(crop_by_third(page)):
            temp_answer = get_answer_by_third(third, 25 if i == 2 else 50)
            if temp_answer:
                answer.extend(temp_answer)
            else:
                print(f"Error no. {fail_count} on {key}, proceeding to next contestant")
                # os.path.join works with or without a trailing separator.
                cv2.imwrite(os.path.join(OUT_FOLDER_PATH, f"{key}.jpg"), full_page)
                fail_count += 1
                answer = None
                break

        all_answer[key] = answer

    return all_answer
            
    
# Per-contestant answers keyed by barcode; read later by main() when grading.
answers = get_answers(barcodes, cropped_pages, pages)

Error no. 1 on O-022-009, proceeding to next contestant
Error no. 2 on O-022-046, proceeding to next contestant
Error no. 3 on O-022-072, proceeding to next contestant
Error no. 4 on O-022-075, proceeding to next contestant
Error no. 5 on O-022-054, proceeding to next contestant
Error no. 6 on O-022-081, proceeding to next contestant
Error no. 7 on O-022-078, proceeding to next contestant
Error no. 8 on O-022-057, proceeding to next contestant
Error no. 9 on no_barcode_detected, proceeding to next contestant
Error no. 10 on O-022-019, proceeding to next contestant
Error no. 11 on O-022-063, proceeding to next contestant
Error no. 12 on O-022-086, proceeding to next contestant
Error no. 13 on O-022-020, proceeding to next contestant
Error no. 14 on O-022-058, proceeding to next contestant
Error no. 15 on O-022-012, proceeding to next contestant
Error no. 16 on O-022-064, proceeding to next contestant
Error no. 17 on O-022-065, proceeding to next contestant
Error no. 18 on O-022-042, pro

In [24]:
def read_json():
    """Load the mark scheme file at MARK_IN and return its "answer_sheet" entry."""
    with open(MARK_IN) as scheme_file:
        return json.load(scheme_file)["answer_sheet"]

def grade_students(mark_scheme, answers, questions=None):
    """Grade every contestant and render the result as CSV rows.

    Args:
        mark_scheme: dict with one entry per question under the keys
            "question_1", "question_2", ...; each entry provides "answer",
            "correct", "incorrect" and "no_answer".
        answers: dict mapping a contestant id to a list of answer codes, or
            to a falsy value when the scan failed.
        questions: number of questions to grade; defaults to the module-level
            QUESTIONS setting.

    Returns:
        str with one newline-terminated line per contestant:
        "<id>,<answer 1>,...,<answer n>,<mark>,<tiebreaker>". The tiebreaker
        is the sum of the 1-based numbers of the correctly answered
        questions. A failed scan gives "<id>,scan failed" followed by enough
        empty fields to match the column count of a graded row.
    """
    if questions is None:
        questions = QUESTIONS

    rows = []
    for student_id, student_answers in answers.items():
        if student_answers:
            mark = 0
            tiebreaker = 0
            cells = [f"{student_id}"]
            for i in range(questions):
                student_answer = student_answers[i]
                scheme = mark_scheme[f"question_{i+1}"]
                cells.append(f"{student_answer}")
                if student_answer == "O":
                    mark += scheme["no_answer"]
                elif student_answer == scheme["answer"]:
                    mark += scheme["correct"]
                    tiebreaker += i + 1
                else:
                    mark += scheme["incorrect"]
            cells.append(f"{mark}")
            cells.append(f"{tiebreaker}")
            rows.append(",".join(cells) + "\n")
        else:
            # A graded row has questions + 3 fields; "id" and "scan failed"
            # fill two of them, so questions + 1 commas pad the rest. This was
            # hard-coded as 26, which is only right for 25 questions.
            rows.append(f"{student_id},scan failed" + "," * (questions + 1) + "\n")

    return "".join(rows)

def save_csv(content):
    """Write the results CSV to OUT_FILE inside OUT_FOLDER_PATH.

    The file starts with the header "id,1,2,...,QUESTIONS,mark,tiebreaker"
    followed by `content` exactly as given.

    Args:
        content: str of already formatted, newline-terminated CSV rows.
    """
    # os.path.join gives the same path as plain concatenation when the folder
    # ends with a separator and a correct one when it does not.
    out_path = os.path.join(OUT_FOLDER_PATH, OUT_FILE)
    with open(out_path, "w") as f:
        columns = ["id"] + [str(i + 1) for i in range(QUESTIONS)] + ["mark", "tiebreaker"]
        f.write(",".join(columns) + "\n")
        f.write(content)
    
def main():
    """Grade the module-level `answers` against the mark scheme and save the CSV."""
    graded_rows = grade_students(read_json(), answers)
    save_csv(graded_rows)
    print("Scan complete.")
# Run grading and write the results file.
main()



Scan complete.
