This code must be run locally, never on Google Colab, because there are functions that don't work properly at this platform.

In [None]:
# When running it, change the directoy to that where this file is in.
directory  = "/home/merlin/coding/ic/createDataset"

In [None]:
# Google Drive folder IDs:
dataID =
dataWithResults =

In [2]:
import pandas as pd

from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from pydrive.files import GoogleDriveFile

import pdfplumber
import re

import os
import csv
import glob

In [None]:
# Verifying Google Drive API authorization. For this is necessary a Google account. To avoid using personal accounts, 
# there is a specific account to this, that should be logged in, when asked, to script work properly.
# Email:
# Password:

gAuth = GoogleAuth()
gAuth.LocalWebserverAuth()
gDrive = GoogleDrive(gAuth)

In [None]:
def verifyCsvExistence() -> GoogleDriveFile | None:
    '''
    Searches on project Google Drive a CSV with exam results. Returns the file if
    it exists, otherwise None.
    '''

    # Get a list of files in the 'dataID' folder.
    files = gDrive.Lists({'q':  "\'" + dataID + "\' in parents and trashed=false" }).GetList()
    
    for f in files:
        # If a CSV file with exam results already exists, downloading it.
        if f['title'] == "examResults.csv":
            f.GetContentFile("oldExamResults.csv")
            return f
    
    return None

def appendRegisteredPacients(registeredPacients: list):
    '''
    Adds all the already registered pacients to a list. 
    '''

    # Searching the CSV with these old pacients.
    csvFile = glob.glob("oldExamResults*")[0]

    # Adding the pacient ID to the list.
    with open(csvFile, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            pacient = row['ID']
            registeredPacients.append(pacient)

def extractTextFromPdf(pdfFilename: str) -> str:
    '''
    Uses PdfPlumber library to transform PDF text information in a string.
    '''

    text = ""

    with pdfplumber.open(pdfFilename) as pdf:
        for page in pdf.pages:
            text += page.extract_text()

    return text

def getExamResults(examResultsList: list, pacients: list, registeredPacients: list):
    '''
    Adds to 'examResultsList' all the new pacients exam results.
    '''

    # Iterating through the pacients.
    for pacient in pacients:

        # Ignoring already registered pacients.
        if pacient['title'] in registeredPacients:
            continue

        pacientID = pacient['id']

        # Dictionary containing pacient ID, laboratorial exams names (keys) and its results (value).
        examResult = {"ID": pacient['title']}

        # Get a list of files in the pacient folder.
        files = gDrive.ListFile({'q':  "\'" + pacientID + "\' in parents and trashed=false" }).GetList()

        # Iterating through pacient files.
        for f in files:

            # When find the exam result, downloading it, extracting its data, then erasing the file.
            if f['title'] == "laudo.pdf":
                f.GetContentFile(f['title'])

                text = extractTextFromPdf('laudo.pdf')
                examResultsList.append(getexamResults(text, examResult))

                os.remove(directory + "/laudo.pdf")
    
def extractNameFromText(text: str, offset: int) -> str:
    '''
    Extracts the name of laboratorial exam from a string.
    '''

    name = ""

    # Getting the exam name backwards.
    while text[offset] != "\n":
        name += text[offset]
        offset -= 1

    # Inverting the name and returning it.
    return name[::-1]

def extractNumberFromText(text: str, offset: int) -> tuple[str | int, int]:
    '''
    Extracts the number of the exam result from a string.
    '''

    number = ""
    ignoredChars = 0

    # Ignoring non numeric information.
    while not text[offset + ignoredChars].isnumeric() and text[offset + ignoredChars] != '\n':
        ignoredChars += 1

    # If all the data from this exam is textual, returning a flag tuple.
    if text[offset + ignoredChars] == '\n':
        return [-1, -1]

    # Extracting the numberic result.
    offset += ignoredChars
    while text[offset].isnumeric() or text[offset] == "," or text[offset] == ".":
        number = number + text[offset]
        offset += 1

    # Adjusting the floating point from a possible ',' to '.'.
    number = number.replace(".", "")
    number = number.replace(",", ".")

    return [number, ignoredChars]

def extractUnitFromText(text: str, offset: int) -> str:
    '''
    Extracts the unit of measurement used in the exam result from a string.
    '''

    unit = ""
    
    while text[offset].isalnum() or text[offset] == "/" or text[offset] == "%":
        unit = unit + text[offset]
        offset += 1

    return unit

def getHemogram(text: str, examResults: dict[str, float]):
    '''
    In the exam results PDF, the Hemogram has a different pattern, so it needs
    a special treatment when extracting its data.
    '''

    # All type of information collected by hemogram.
    hemogramExams = ["ERITRÓCITOS", "HEMOGLOBINA", "HEMATÓCRITO", "V.C.M.", "H.C.M.", "C.H.C.M.", "R.D.W.", 
                       "LEUCÓCITOS", "BASÓFILOS","EOSINÓFILOS", "NEUTRÓFILOS", "Promielócitos", "Mielócitos",
                        "Metamielócitos", "Bastonetes", "Segmentados", "LINFÓCITOS", "Linfócitos reativos", 
                        "MONÓCITOS", "PLASMÓCITOS","BLASTOS", "PLAQUETAS", "V.P.M."]

    for exam in hemogramExams:
        offset = 0

        # Adjusting offset in the needed cases.
        if exam in hemogramExams[8:21]:
            offset += 2
        
        # Getting the numeric result.
        resultado = extractNumberFromText(text, text.find(exam) + len(exam) + 1 + offset)

        # Getting the unit of measurement.
        if exam in hemogramExams[7:23]:
            unit = "n°/mm³"
        else:
            unit = extractUnitFromText(text, text.find(exam) + len(exam) + len(resultado[0]) + resultado[1] + 2)

        # Updating the dictionary containing the exam results.
        examResults[exam + " " + "[{}]".format(unit)] = float(resultado[0])

def getGlycatedHemoglobin(text: str, examResults: dict[str, float]):
    '''
    As the Hemoglobin in the above function, the Glycated Hemoglobin has a
    different PDF pattern.
    '''

    # All type of information collected by glycated hemogram.
    exams = ["HbA1c", "HbA1a", "HbA1b", "HbF", "HbLA1c", "HbA0", "Glicemia média estimada"]

    for exame in exams:

        # Getting the numeric result.
        resultado = extractNumberFromText(text, text.find(exame) + len(exame))

        # Getting the unit of measurement.
        if exame != exams[6]:
            unit = "%"
        else:
            unit = "mg/dL"
        
        # Updating the dictionary containing the exam results.
        examResults[exame + " " + "[{}]".format(unit)] = float(resultado[0])  

def getOtherResults(text: str, examResults: dict[str, float]):
    '''
    Gets the data of other exams that are not covered on Hemogram.
    '''
    
    # Extracting Glycated Hemoglobin data if it exists.
    if text.find("HEMOGLOBINA GLICADA") != -1:
        getGlycatedHemoglobin(text, examResults)

    # Extracting the exams results not collected by Hemogram nor Glycated Hemoglobin.
    for exame in re.finditer("Resultado :", text):
        nomeExame = extractNameFromText(text, exame.start() - 2)
        resultado = extractNumberFromText(text, exame.start() + 12)

        # Verifying if the data is useful to us.
        if resultado == [-1, -1]:
            return
        
        unit = extractUnitFromText(text, exame.start() + len(resultado[0]) + resultado[1] + 13)

        # Updating the dictionary containing the exam results.
        examResults[nomeExame + " " + "[{}]".format(unit)] = float(resultado[0])

def getexamResults(text: str, examResults: dict[str, float]) -> dict[str, float]:
    '''
    Returns a dictionary containing the exam names (keys) and its results (values).
    '''

    if text.find("HEMOGRAMA") != -1:
        getHemogram(text, examResults)
    getOtherResults(text, examResults)
    
    return examResults

def getHeaderNames(examResultsList: list) -> list:
    '''
    Returns a list with the laboratorial exams names.
    '''

    headerNames = []

    # Iterating through each person exam results.
    for examResults in examResultsList:

        # For each exam...
        for name in examResults.keys():

            # If it is a new type of exam, add its name on 'headerNames'.
            if not name in headerNames:
                headerNames.append(name)

    return headerNames

def writeNewResults(headerNames: list, examResultsList: list):
    '''
    Writes the new pacients data to the "newExamResults.csv" file.
    '''
    
    with open("newExamResults.csv", mode='w') as resultsFile:
        resultsWriter = csv.DictWriter(resultsFile, fieldnames=headerNames)
        resultsWriter.writeheader()

        data = {}
        for examResult in examResultsList:
            
            for exam in headerNames:

                # If the pacient did the 'exam', putting its result in the
                # data. Else putting a ''.
                try:
                    data[exam] = examResult[exam]
                except:
                    data[exam] = ""

            resultsWriter.writerow(data)
            data = {}

def mergeCSV():
    '''
    Merges the data in "oldExamResults.csv" with that in "newExamResults.csv",
    creating "examResults.csv".
    '''

    oldData = pd.read_csv(directory + "oldExamResults.csv")
    newData = pd.read_csv(directory + "newExamResults.csv")

    oldColumns = set(oldData.columns)
    newColumns = set(newData.columns)
    sameColumns = list(oldColumns & newColumns)

    allData = pd.merge(oldData, newData, on=sameColumns, how="outer")
    allData.to_csv(directory + "examResults.csv", index=False)

    os.remove(directory + "newExamResults.csv")
    os.remove(directory + "oldExamResults.csv")

def uploadExamResults(oldCsv: GoogleDriveFile | None):
    '''
    Uploads the exam results on project Google Drive.
    '''

    # Searching the file locally.
    csvFile = glob.glob("examResults*")[0]

    with open(csvFile, "r") as f:
        fileName = os.path.basename(f.name)
        filePath = os.path.join(directory, fileName)

        # Uploading the file on Google Drive.
        driveFile = oldCsv
        if oldCsv == None:
            driveFile = gDrive.CreateFile({
                'parents': [{'id': dataID}],
                'title': "examResults.csv"
            })
        driveFile.SetContentFile(filePath)
        driveFile.Upload()

        os.remove(filePath)

In [None]:
registeredPacients = []
oldDataExistsFlag = False

oldCsv = verifyCsvExistence()

# If there is an old CSV, adding the already registered pacients to the list.
if oldCsv != None:
    oldDataExistsFlag = True
    appendRegisteredPacients(registeredPacients)   

# Get a list of pacients that already have their exam results.
pacients = gDrive.ListFile({'q':  "\'" + dataWithResults + "\' in parents and trashed=false" }).GetList()

# List that will store the exam results of new pacients.
examResultsList = []

getExamResults(examResultsList, pacients, registeredPacients)

# Getting the laboratorial exams names.
headerNames = getHeaderNames(examResultsList)

# If there are new pacients...
if examResultsList != []:

    # Writing the new pacient data to "newExamResults.csv".
    writeNewResults(headerNames, examResultsList)

    # If there is old data, merging the CSVs with old and new data on "examResults.csv".
    if oldDataExistsFlag:
        mergeCSV()
    # Otherwise only renaming the file.
    else:
        os.rename("newExamResults.csv", "examResults.csv")

    uploadExamResults(oldCsv)
# Otherwise only deleting the CSV.
else:
    os.remove(directory + "oldExamResults.csv")