In [31]:
#conda activate text-analytics

import os
import json as js
import requests, uuid
import re, math
import PyPDF2
import fitz
from datetime import datetime, timedelta
from docx import Document
from docx.enum.text import WD_COLOR_INDEX
from azure.core.credentials import AzureKeyCredential
from azure.ai.textanalytics import TextAnalyticsClient
from azure.ai.translation.document import DocumentTranslationClient
from azure.storage.blob import BlobServiceClient
from azure.storage.blob._shared_access_signature import BlobSharedAccessSignature

#Loads info from config file
#The parsed JSON is kept in the module-level `config` dict; the helper functions
#and the Variables cell further down read their keys, paths and endpoints from it.
with open('./config.json','r') as file:
    config = js.load(file)

<h1>Demo setup </h1>

Before running this demo you need a few things set up:

<h4>Azure Resources</h4>
<ul>
<li> Storage Account: in Blob Storage, create two containers: </li>
<ul>
<li> data </li>
<li> output </li>
</ul>
<li> Azure Text Analytics: https://docs.microsoft.com/en-us/azure/cognitive-services/text-analytics/overview </li>
<li> Azure Translator: https://docs.microsoft.com/en-us/azure/cognitive-services/translator/document-translation/get-started-with-document-translation?WT.mc_id=Portal-Microsoft_Azure_ProjectOxford&tabs=csharp </li>
<ul>
<li>Create a SAS token policy to use Document Translator here: https://docs.microsoft.com/en-us/azure/cognitive-services/translator/document-translation/create-sas-tokens?tabs=Containers</li>
</ul>
</ul>

<h4> Folder Structure </h4>
In the root folder:
<ul>
<li> "data" folder, inside the data folder: </li>
<ul>
<li> "original" folder (where your original pdf files will be)</li>
<li> "output" folder (where the highlighted output docx file will be stored)</li>
<li> "preprocessed" folder (where your docx file will be)</li>
<li> "translated" folder (where you can download your translated docx)</li>
</ul>
<li> "config.json" file, with all the paths and keys </li>
<li> notebook </li>
</ul>

**N.B. Set the container-name variables (<em>CONTAINER_NAME_UP</em> and <em>CONTAINER_NAME_DOWN</em> in the Variables cell) to the names of your containers.**

<h2> Blob helper functions </h2>

Helper functions to run basic operations on Azure Blob Storage.

In [2]:
#Blob helpers

######################################
# Creates connection to blob storage #
######################################
def ConnectToBlobStorage(connectionString):
    """Open a service client for the Azure Storage account.

    Args:
        connectionString: connection string of the storage account.

    Returns:
        The object built by BlobServiceClient.from_connection_string.
    """
    return BlobServiceClient.from_connection_string(connectionString)

###################################
# Lists container in blob storage #
###################################
def GetContainersInStorage(blob_service_client):
    """Return the containers of the storage account.

    Args:
        blob_service_client: service client exposing list_containers().

    Returns:
        Whatever list_containers() yields, handed back untouched.
    """
    return blob_service_client.list_containers()

#####################################
# Lists blobs in specific container #
#####################################
def GetBlobsInContainer(blobClient,containerName):
    """Return the blobs stored in one container.

    Args:
        blobClient: service client exposing get_container_client().
        containerName: name of the container to inspect.

    Returns:
        The result of list_blobs() on that container's client.
    """
    #resolve the container first, then ask it for its blobs
    return blobClient.get_container_client(containerName).list_blobs()

###############################################
# Downloads locally specific blob in storage  #
###############################################
def DownloadBlobLocally(blobServiceClient,containerName,fileName,downloadPath):
    """Download one blob and write it to a local file.

    Args:
        blobServiceClient: service client exposing get_blob_client().
        containerName: container holding the blob.
        fileName: blob name; also used as the local file name.
        downloadPath: folder prefix; it is concatenated with fileName as-is,
            so it must already end with a path separator.

    Returns:
        The local path the blob was written to.
    """
    source_blob = blobServiceClient.get_blob_client(container=containerName,blob=fileName)
    print("Downloading from Azure Storage as blob: " + fileName)
    local_target = downloadPath + fileName
    #the target file is opened before the download starts, as in the original flow
    with open(local_target,"wb") as local_handle:
        payload = source_blob.download_blob().readall()
        local_handle.write(payload)
    print("Blob downloaded in the following folder: "+local_target)
    return local_target

########################################################
# Uploads local file to specific container in storage  #
########################################################
def UploadFileToBlob(blob_service_client,containerName,fileName,filePath,overwrite=False):
    """Upload a local file to a container as a blob.

    Args:
        blob_service_client: service client exposing get_blob_client().
        containerName: destination container.
        fileName: name the blob gets in the container.
        filePath: path of the local file to upload.
        overwrite: forwarded to upload_blob(). The default False keeps the
            previous behaviour (the upload fails if the blob already exists);
            pass True to replace an existing blob, e.g. when re-running the
            notebook on a file that was uploaded before.
    """
    #creates blob client
    blob_client = blob_service_client.get_blob_client(container=containerName,blob=fileName)
    print("Uploading to Azure Storage as blob: " + fileName)
    #uploads file; the handle is closed even if the upload raises
    with open(filePath,"rb") as upload_file:
        blob_client.upload_blob(upload_file,overwrite=overwrite)
    print("Blob uploaded!")

##########################################
# Gets blob url from specific container  #
##########################################
def GetBlobURL(blob_service_client,blob_name,container_name):
    """Return the URL of a blob.

    Args:
        blob_service_client: service client exposing get_blob_client().
        blob_name: name of the blob.
        container_name: container holding the blob.

    Returns:
        The `url` attribute of the blob's client.
    """
    return blob_service_client.get_blob_client(container=container_name,blob=blob_name).url

###########################################
# Creates SAS signature for specific blob #
###########################################
def CreateSASSignature(container_name, blob_name, permissions,expiry):
    """Generate a SAS token for one blob.

    The account name and key are read from the module-level `config`
    ("sa_account_name" and "sa_key").

    Args:
        container_name: container holding the blob.
        blob_name: blob the token is issued for.
        permissions: permission value forwarded to generate_blob(). It used to
            be ignored in favour of a hard-coded "rw"; "rw" is still the
            fallback when nothing (None or "") is passed.
        expiry: expiry forwarded to generate_blob().

    Returns:
        The token returned by generate_blob().
    """
    blob_shared_access_signature = BlobSharedAccessSignature(config["sa_account_name"],config["sa_key"])
    #honour the caller's permissions instead of always granting read+write
    effective_permissions = permissions or "rw"
    sas_token = blob_shared_access_signature.generate_blob(container_name,blob_name,expiry=expiry,permission=effective_permissions)
    return sas_token

<h2> Local helper functions </h2>

Helper functions to run basic operations on local files.

In [3]:
#Local helpers

################################
# Lists files in folder        #
################################
def FilesInFolder(path):
    """Return the names of the entries found in a folder.

    Args:
        path: folder to list.

    Returns:
        The list produced by os.listdir(path).
    """
    return os.listdir(path)

################################
#Extracts text from docx file  #
################################
def ExtractTextFromLocal(docPath):
    """Read a docx file and return the text of its paragraphs.

    Args:
        docPath: path of the docx file to open.

    Returns:
        A list with one string per paragraph, in document order.
    """
    return [paragraph.text for paragraph in Document(docPath).paragraphs]

##############################################
# Highlights entities found in the docx file #
##############################################
def _SplitRunAroundKeyword(paragraph, run, keyword, highlight):
    """Split `run` so each occurrence of `keyword` sits in its own highlighted run."""
    import copy  #local import: the notebook's import cell does not provide `copy`

    #Alternating (text, is_keyword) segments; empty plain pieces are dropped
    segments = []
    for index, piece in enumerate(run.text.split(keyword)):
        if index > 0:
            segments.append((keyword, True))
        if piece:
            segments.append((piece, False))

    #Pristine copy of the run element, taken before the run is modified, so
    #every new run starts from the original formatting
    template = copy.deepcopy(run._r)

    #The existing run keeps the first segment ...
    first_text, first_is_keyword = segments[0]
    run.text = first_text
    if first_is_keyword:
        run.font.highlight_color = highlight

    #... and every further segment becomes a sibling run inserted right after
    anchor = run._r
    for segment_text, is_keyword in segments[1:]:
        new_r = copy.deepcopy(template)
        anchor.addnext(new_r)
        new_run = type(run)(new_r, paragraph)
        new_run.text = segment_text
        if is_keyword:
            new_run.font.highlight_color = highlight
        anchor = new_r


def EntitySearch(document,keyword,dstPath,color):
    """Highlight every occurrence of `keyword` in a docx document and save it.

    The previous implementation cleared each matching run and re-added only
    the keyword, which deleted all the text around it. The run is now split
    into sibling runs so the surrounding text is kept and only the keyword is
    highlighted.

    Only occurrences fully contained in a single run are highlighted; a
    keyword that spans several runs is left untouched.

    Args:
        document: docx document whose paragraphs are searched.
        keyword: text to highlight; an empty keyword highlights nothing.
        dstPath: path the document is saved to (saved on every call).
        color: 'y' for yellow, anything else for green.
    """
    if color == 'y':
        highlight = WD_COLOR_INDEX.YELLOW
    else:
        highlight = WD_COLOR_INDEX.GREEN
    #an empty keyword would make str.split raise ValueError
    if keyword:
        for p in document.paragraphs:
            if keyword not in p.text:
                continue
            #snapshot of the runs: splitting inserts new runs into the paragraph
            for run in list(p.runs):
                if keyword in run.text:
                    _SplitRunAroundKeyword(p, run, keyword, highlight)
    document.save(dstPath)

################################################
# Takes extracted document text and divides it #
# in max n parts to avoid API limits           #
################################################
def ChunkText(text,n):
    """Group the items of `text` into at most `n` space-joined chunks.

    Fixes over the previous version: the last item of `text` is no longer
    dropped (the old bound was len(text)-1), and an empty `text` returns an
    empty list instead of failing on a zero range step.

    Args:
        text: sequence of strings (e.g. paragraphs or lines).
        n: maximum number of chunks to produce; must be at least 1.

    Returns:
        A list of strings. Each chunk is its items each prefixed by a single
        space, so every chunk starts with a space (format kept from the
        original implementation).

    Raises:
        ValueError: if n is smaller than 1.
    """
    if n < 1:
        raise ValueError("n must be at least 1")
    total = len(text)
    if total == 0:
        return []
    #items per chunk, rounded up so that no more than n chunks are produced
    max_size = math.ceil(total/n)
    return [
        "".join(" " + piece for piece in text[start:start+max_size])
        for start in range(0,total,max_size)
    ]

<h2> Text Analytics helper functions </h2>

Helper functions to run basic operations using Azure Text Analytics API.

In [11]:
#Text Analytics helpers

#############################
# Detects document language #
#############################
def LanguageDetection(text_analytics_client,full_text):
    """Run language detection and print the language of the first result.

    Args:
        text_analytics_client: client exposing detect_language().
        full_text: documents forwarded to detect_language() unchanged.

    Returns:
        The complete result of detect_language(), not only the first entry.
    """
    detection_results = text_analytics_client.detect_language(full_text)
    first_language = detection_results[0].primary_language
    print("Language detected: {}".format(first_language))
    return detection_results

############################################
# Translates documents online (from blob)  #
############################################
def DocumentTranslation(transaltor_client,source_url,target_url,language):
    """Start a document translation, wait for it and report each document.

    Args:
        transaltor_client: client exposing begin_translation().
        source_url: source location forwarded to begin_translation().
        target_url: target location forwarded to begin_translation().
        language: target language forwarded to begin_translation().

    Returns:
        The translated_document_url of the first document whose status is
        "Succeeded"; documents after it are not reported. None when no
        document succeeded.
    """
    translation_job = transaltor_client.begin_translation(source_url,target_url,language)
    #result() blocks until the translation job has finished
    for document in translation_job.result():
        print("Document ID: {}".format(document.id))
        print("Document status: {}".format(document.status))
        if document.status != "Succeeded":
            print("Error Code: {}, Message: {}\n".format(document.error.code, document.error.message))
            continue
        print("Source document location: {}".format(document.source_document_url))
        print("Translated document location: {}".format(document.translated_document_url))
        print("Translated to language: {}\n".format(document.translated_to))
        return document.translated_document_url
    return None

#######################################
# Extracts health entities from text  #
#######################################
def TextAnalyticsForHealth(text_analytics_client,text):
    """Run healthcare entity analysis and collect the entity texts.

    Args:
        text_analytics_client: client exposing begin_analyze_healthcare_entities().
        text: documents forwarded to the service unchanged.

    Returns:
        A tuple (docs, entities): `docs` are the result documents that are not
        errors, `entities` the `text` of every entity found in them, in order.
    """
    analysis = text_analytics_client.begin_analyze_healthcare_entities(text).result()
    #keep only the documents the service processed without error
    docs = []
    for analysed_doc in analysis:
        if analysed_doc.is_error:
            continue
        docs.append(analysed_doc)
    print("Healthcare results:")
    #Each entity also exposes normalized_text, category, subcategory, offset,
    #confidence_score, data_sources and assertion, and each document exposes
    #entity_relations, should more detailed output be needed.
    entities = [found.text for analysed_doc in docs for found in analysed_doc.entities]
    return docs,entities

###########################
# Extracts PII from text  #
###########################
def TextAnalyticsPII(text_analytics_client,text,language):
    """Run PII recognition and collect the entity texts.

    Args:
        text_analytics_client: client exposing recognize_pii_entities().
        text: documents forwarded to the service unchanged.
        language: language passed as the `language` keyword.

    Returns:
        A tuple (result, pii_entities): `result` are the response documents
        that are not errors, `pii_entities` the `text` of every entity in them.
    """
    response = text_analytics_client.recognize_pii_entities(text, language=language)
    #drop the documents the service flagged as errors
    result = []
    for pii_doc in response:
        if not pii_doc.is_error:
            result.append(pii_doc)
    #Each document also exposes redacted_text, and each entity category,
    #confidence_score and offset, should more detailed output be needed.
    pii_entities = [found.text for pii_doc in result for found in pii_doc.entities]
    return result,pii_entities

#############################################
# Execute all the steps to: extracts text   #
# from docx, translates it, extracts health #
#    entities, highlights them in docx,     #
#    extracts PII entities from docx        #
#############################################
def TextAnalyticsOnDocs(docs_folder,translated_folder,output_folder,ta_client,tr_client,blob_client,container_name,source_url,target_url):
    """Run the whole pipeline on every file found in `docs_folder`.

    For each file: detect its language, upload it, translate it to English,
    download the translated copy, extract health entities and PII from it and
    highlight both in the translated docx (health entities with 'y', PII with
    'g'), which is saved under `output_folder` with the same file name.

    Args:
        docs_folder: folder with the docx files; concatenated with the file
            name as-is, so it must end with a path separator.
        translated_folder: folder prefix the translated files are downloaded to.
        output_folder: folder prefix the highlighted files are saved to.
        ta_client: Text Analytics client.
        tr_client: document translation client.
        blob_client: blob service client.
        container_name: container the source file is uploaded to.
        source_url: source location handed to the translator.
        target_url: target location handed to the translator.
    """
    for docx_name in FilesInFolder(docs_folder):
        print("File processed {}".format(docx_name))
        local_docx_path = docs_folder + docx_name
        source_paragraphs = ExtractTextFromLocal(local_docx_path)

        # STEP 1 - language detection (the result is only printed by the helper)
        LanguageDetection(ta_client,source_paragraphs)

        # STEP 2 - upload the file and translate it to English
        UploadFileToBlob(blob_client,container_name,docx_name,local_docx_path)
        DocumentTranslation(tr_client,source_url,target_url,"en")

        # STEP 3 - download the translated file and extract health entities
        # (the translated copy is fetched from the "output" container)
        translated_path = DownloadBlobLocally(blob_client,"output",docx_name,translated_folder)
        translated_paragraphs = ExtractTextFromLocal(translated_path)
        health_chunks = ChunkText(translated_paragraphs,10)
        health_docs, _health_texts = TextAnalyticsForHealth(ta_client,health_chunks)

        # STEP 4 - highlight the health entities in the translated docx
        translated_document = Document(translated_path)
        highlighted_path = output_folder + docx_name
        for health_doc in health_docs:
            for health_entity in health_doc.entities:
                print(health_entity.text)
                EntitySearch(translated_document,health_entity.text,highlighted_path,'y')

        # STEP 5 - PII extraction
        pii_chunks = ChunkText(translated_paragraphs,5)
        pii_docs, _pii_texts = TextAnalyticsPII(ta_client,pii_chunks,"en")

        # STEP 6 - highlight the PII entities in the same document
        for pii_doc in pii_docs:
            for pii_entity in pii_doc.entities:
                EntitySearch(translated_document,pii_entity.text,highlighted_path,'g')

<h2>User input functions</h2>

In [5]:
###########################################
#    Print user choice on what kind       #
#    of operation wants to execute        #
###########################################

def UserActivityChoice():
    """Print the menu of available operations and read the user's answer.

    Returns:
        The raw string typed by the user (a menu number or 'exit').
    """
    #One literal per printed line so that the trailing spaces stay visible
    menu_lines = (
        " Hi, what do you whant to do? \n ",
        "        1. Read PDF and extract text",
        "        2. Detect PDF language ",
        "        3. Translate PDF ",
        "        4. Extract entities",
        "        5. Extract PII ",
        "        6. All the above ",
        "        Digit 'exit' to close the prompt \n",
    )
    print("\n".join(menu_lines))
    return input()

#####################################
#    Print user choice on which     #
#    file wants to execute the      #
#    operation on                   #
#####################################
def UserFileChoice(original_folder):
    """List the files of a folder as a numbered menu and read the user's pick.

    The files are numbered from 1; one extra entry, numbered len(files) + 1,
    stands for "All the above".

    Args:
        original_folder: folder whose files are offered.

    Returns:
        A tuple (file_choice, num_docs, files): the raw string typed by the
        user, the number of files listed, and the list of file names.
    """
    files = FilesInFolder(original_folder)
    for position, file_name in enumerate(files, start=1):
        print("{}. {}".format(position,file_name))
    print("{}. All the above \n".format(len(files) + 1))
    file_choice = input()
    return file_choice,len(files),files
    
################################################
#       Reads a pdf file and extract text      #
################################################

def ReadPDFandExtractText(file_path):
    """Extract the text of every page of a PDF.

    Args:
        file_path: path of the PDF handed to PyPDF2.PdfFileReader.

    Returns:
        The text of all pages concatenated and split on newlines (a list of
        strings), or an empty string when the PDF has no pages.
    """
    reader = PyPDF2.PdfFileReader(file_path)
    page_count = reader.getNumPages()
    if page_count == 0:
        print("PDF is empty")
        return ""
    #pages are concatenated without a separator before splitting into lines
    page_texts = [reader.getPage(index).extractText() for index in range(page_count)]
    return "".join(page_texts).split('\n')


<h1> Text Analytics </h1>

<h3> Variables </h3> 

In [20]:
#Helpers
#Folder prefixes and storage URLs, all read from config.json.
#The folder values are concatenated directly with file names elsewhere in the
#notebook, so they are expected to end with a path separator.
DOCS_FOLDER = config["preprocessed_path"]
ORIGINAL_FOLDER =config["pdfs_path"]
OUTPUT_FOLDER = config["output_path"]
TRANSLATED_FOLDER = config["translated_path"]
SOURCE_URL = config["sa_source_url"]
TARGET_URL = config["sa_target_url"]
#Placeholder; overwritten with the number of listed files by the user-input cell
NUM_MAX_DOCS = 0

#Connect to Azure Text Analytics service
ta_credentials = AzureKeyCredential(config["text_analytics_key"])
TEXT_ANALYTICS_CLIENT = TextAnalyticsClient(endpoint=config["text_analytics_endpoint"],credential=ta_credentials)

#Connect to Azure Translator service
tr_credentials = AzureKeyCredential(config["translator_key"])
TRANSLATOR_CLIENT= DocumentTranslationClient(endpoint=config["translator_documents_endpoint"],credential=tr_credentials)

#Blob storage connection
BLOBSERVICECLIENT = ConnectToBlobStorage(config["sa_connectionstring"])
#Hardcoded container name
#CONTAINER_NAME_UP receives the uploads, CONTAINER_NAME_DOWN is where downloads are read from
CONTAINER_NAME_UP = "data"
CONTAINER_NAME_DOWN = "output"

<h3> Prompts for user input </h3>

In [22]:
#Ask which operation to run, then which PDF to run it on.
#NUM_MAX_DOCS is overwritten here with the number of files listed; the
#"All the above" menu entry is numbered NUM_MAX_DOCS + 1.
user_choice = UserActivityChoice()
file_choice,NUM_MAX_DOCS,files = UserFileChoice(ORIGINAL_FOLDER)

 Hi, what do you whant to do? 
 
        1. Read PDF and extract text
        2. Detect PDF language 
        3. Translate PDF 
        4. Extract entities
        5. Extract PII 
        6. All the above 
        Digit 'exit' to close the prompt 

1. LDO_AOPR_1.pdf
2. LDO_AOPR_2.pdf
3. LDO_AUSBO_1.pdf
4. LDO_AUSBO_2.pdf
5. LDO_AUSBO_3.pdf
6. LDO_AUSBO_4.pdf
7. LDO_AUSBO_5.pdf
8. LDO_AVEC_1.pdf
9. LDO_IOR_1.pdf
10. LDO_IOR_2.pdf
11. LDO_IOR_3.pdf
12. VPS_AUSBO_1.pdf
13. VPS_AUSBO_2.pdf
14. VPS_AUSBO_3.pdf
15. VPS_IOR_1.pdf
16. VPS_IOR_2.pdf
17. All the above 



In [16]:
############################################
#          Recap of user choices           # 
############################################
#Files are numbered 1..NUM_MAX_DOCS and "All the above" is NUM_MAX_DOCS + 1,
#so only a choice strictly greater than NUM_MAX_DOCS means "all docs"
#(with >= the last file in the list was reported as "all docs").
if(int(file_choice) > NUM_MAX_DOCS):
    print("You choose: {} on all docs. Starting execution. ".format(user_choice))
else:
    print("You choose: {} on {}. Starting execution. ".format(user_choice,files[int(file_choice)-1]))

#SelectionSwitch(user_choice,file_choice,files)

You choose: 3 on LDO_AUSBO_2.pdf. Starting execution. 


In [23]:
#Parse the menu selection without crashing on 'exit' or any other non-numeric
#answer (int('exit') used to raise before the exit branch could be reached)
user_choice_num = int(user_choice) if user_choice.strip().isdigit() else None
file_choice_num = int(file_choice)

#Files are numbered 1..NUM_MAX_DOCS and "All the above" is NUM_MAX_DOCS + 1.
#Comparing with == NUM_MAX_DOCS treated the last file as "all documents" and
#made the real "All the above" entry index past the end of `files`.
run_on_all_docs = file_choice_num > NUM_MAX_DOCS
selected_file = None if run_on_all_docs else files[file_choice_num-1]

pdf_text = ""
translated_text = ""

if(user_choice_num == 1):
    ##########################
    # Read and extract file  #
    ##########################
    if(run_on_all_docs):
        print("Executing on all documents")
    else:
        print("Reading and extracting text from {}".format(selected_file))
        pdf_text = ReadPDFandExtractText(ORIGINAL_FOLDER+selected_file)
elif(user_choice_num == 2):
    ##########################
    #  Detect PDF language   #
    ##########################
    if(run_on_all_docs):
        print("Executing on all documents")
    else:
        print("Detecting language from {}".format(selected_file))
        #pdf_text is always empty at this point, so the detection used to sit in
        #an unreachable else branch: read the PDF first, then detect
        pdf_text = ReadPDFandExtractText(ORIGINAL_FOLDER+selected_file)
        if(pdf_text):
            language_detected = LanguageDetection(TEXT_ANALYTICS_CLIENT,pdf_text)
elif(user_choice_num == 3):
    ##########################
    #  Translate online PDF  #
    ##########################
    print("Which is the translation language?(it or en)")
    lang_translation = input()
    if(run_on_all_docs):
        print("Executing on all documents")
    else:
        print("Translating {} document".format(selected_file))
        #Uploads document to Azure Blob Storage
        UploadFileToBlob(BLOBSERVICECLIENT,CONTAINER_NAME_UP,selected_file,ORIGINAL_FOLDER+selected_file)
        if(lang_translation == 'en'):
            doc_url = DocumentTranslation(TRANSLATOR_CLIENT,SOURCE_URL,TARGET_URL,lang_translation)
        elif(lang_translation == 'it'):
            #TODO: translation to Italian is not implemented yet
            print("")
elif(user_choice_num == 4):
    ##########################
    #   Extract entities     #
    ##########################
    if(run_on_all_docs):
        print("Executing on all documents")
    else:
        print("Extracting entities from {}".format(selected_file))
        translated_file_path = DownloadBlobLocally(BLOBSERVICECLIENT,CONTAINER_NAME_DOWN,selected_file,TRANSLATED_FOLDER)
        #NOTE(review): the text is read from the ORIGINAL pdf, not from the
        #translated file just downloaded - confirm which one is intended
        translated_text = ReadPDFandExtractText(ORIGINAL_FOLDER+selected_file)
        translated_text_formatted = ChunkText(translated_text,10)
        entities, health_entities = TextAnalyticsForHealth(TEXT_ANALYTICS_CLIENT,translated_text_formatted)
elif(user_choice_num == 5):
    ##########################
    #       Extract PII      #
    ##########################
    if(run_on_all_docs):
        print("Executing on all documents")
    else:
        print("Extracting PII from {}".format(selected_file))
        translated_file_path = DownloadBlobLocally(BLOBSERVICECLIENT,CONTAINER_NAME_DOWN,selected_file,TRANSLATED_FOLDER)
        #NOTE(review): same as above - text comes from the ORIGINAL pdf
        translated_text = ReadPDFandExtractText(ORIGINAL_FOLDER+selected_file)
        #translated_text is always empty on entry, so the PII extraction used to
        #sit in an unreachable else branch: it now runs right after the text is read
        translated_text_pii = ChunkText(translated_text,5)
        pii_results,pii_entities = TextAnalyticsPII(TEXT_ANALYTICS_CLIENT,translated_text_pii,"en")
elif(user_choice_num == 6):
    ##########################
    #      All the above     #
    #     TO BE DONE         #
    ##########################
    if(run_on_all_docs):
        print("Executing on all documents")
    else:
        print("Executing all operation on {}".format(selected_file))
        #Reads the PDF
        pdf_text = ReadPDFandExtractText(ORIGINAL_FOLDER+selected_file)
        #Formats the PDF for extraction
        pdf_text_formatted = ChunkText(pdf_text,10)
        #Detect PDF language
        lang = LanguageDetection(TEXT_ANALYTICS_CLIENT,pdf_text)
        #Translate online PDF
else:
    if(user_choice.strip() == 'exit'):
        print("Closing the prompt")
    else:
        print("No choice selected. {}".format(user_choice))

Extracting entities from LDO_AUSBO_2.pdf
Downloading from Azure Storage as blob: LDO_AUSBO_2.pdf
Blob downloaded in the following folder: C://Users//guscianc//source//repos//TextAnalytics4Health//data//translated//LDO_AUSBO_2.pdf
Healthcare results:


In [32]:
#Highlight PDF
#Marks every health entity found by the "Extract entities" step in the original
#PDF and saves the annotated copy in OUTPUT_FOLDER under the same file name.
#NOTE(review): searchFor / addHighlightAnnot are the legacy camelCase PyMuPDF
#names; newer releases spell them search_for / add_highlight_annot - confirm
#against the installed version.
doc = fitz.open(ORIGINAL_FOLDER+files[file_choice_num-1])
try:
    for entity in health_entities:
        if not entity:
            #nothing to search for
            continue
        for page in doc:
            text_instances = page.searchFor(entity)
            for inst in text_instances:
                highlight = page.addHighlightAnnot(inst)
                highlight.update()

    doc.save(OUTPUT_FOLDER+files[file_choice_num-1],garbage=4,deflate=True,clean=True)
finally:
    #release the file handle even when the search or the save fails
    doc.close()

