- Connect to ADLS using a Service Principal

In [0]:
# Notebook parameter: the identity written to the audit table's "modified by" column.
dbutils.widgets.text("Modified_by", "adfls_dev_cld_admin@am.elcompanies.net")
modified_by = dbutils.widgets.get("Modified_by")
# Wrap the value in single quotes so it can be dropped into SQL text as a string literal.
# NOTE(review): no escaping is applied here, so a value containing a single quote would
# break the SQL built from it later in the notebook - confirm the input is trusted.
modified_by = f"'{modified_by}'"
print(modified_by)

In [0]:
%run "../RNDUtils/nb_utility"

- Import required libraries

In [0]:
from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import DocumentAnalysisFeature, AnalyzeResult,ContentFormat

from azure.storage.filedatalake import DataLakeServiceClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col

import logging, json, os
import pandas as pd
from io import StringIO

- Create the AFR Document Intelligence client object and analyze the document

In [0]:
def create_document_intelligence_client(azure_form_rec_key, azure_form_rec_endpoint, api_version):
    '''
    Build a `DocumentIntelligenceClient` from an API key, endpoint and API version.

    The key is wrapped in a single `AzureKeyCredential`, which is then handed to the client.
    Any exception raised while building the credential or the client is caught, printed,
    and reported to the caller as `None`.

    Parameters:
        azure_form_rec_key (str): API key used to build the `AzureKeyCredential`.
        azure_form_rec_endpoint (str): Endpoint URL passed to the client.
        api_version (str): API version passed to the client.

    Returns:
        DocumentIntelligenceClient or None: The client on success; `None` if construction raised.
    '''
    try:
        # Build the credential once and reuse it (previously it was created twice and one copy discarded).
        credential = AzureKeyCredential(azure_form_rec_key)
        return DocumentIntelligenceClient(
            endpoint=azure_form_rec_endpoint,
            credential=credential,
            api_version=api_version
        )
    except Exception as error:
        print(f"An exception occurred while creating Document Intelligence Client: {error}")
        return None


def analyze_intelligence_document(document_intelligence_client, azure_file_path_url):
    '''
    Run the "prebuilt-layout" model on a document through a Document Intelligence client.

    The request asks for the key-value-pairs feature and Markdown-formatted content, is sent
    with content type "application/pdf", and the call blocks until the poller finishes.

    Parameters:
        document_intelligence_client (DocumentIntelligenceClient): Client used to submit the request.
        azure_file_path_url: Value forwarded as `analyze_request`.
            NOTE(review): the name suggests a URL, but the request is sent with
            content_type "application/pdf" - confirm what callers are expected to pass.

    Returns:
        tuple: `(True, result)` where `result` is whatever the poller returns. Errors are not
        caught here, so the flag is never `False`.
    '''
    analysis = document_intelligence_client.begin_analyze_document(
        "prebuilt-layout",
        analyze_request=azure_file_path_url,
        content_type="application/pdf",
        features=[DocumentAnalysisFeature.KEY_VALUE_PAIRS],
        output_content_format=ContentFormat.MARKDOWN
    ).result()
    return True, analysis

- Create the AFR Document Analysis client object and analyze the document

In [0]:
def create_document_analysis_client(azure_form_rec_key, azure_form_rec_endpoint, api_version):
    '''
    Build a `DocumentAnalysisClient` from an API key, endpoint and API version.

    The key is wrapped in a single `AzureKeyCredential`, which is then handed to the client.
    Any exception raised while building the credential or the client is caught, printed,
    and reported to the caller as `None`.

    Parameters:
        azure_form_rec_key (str): API key used to build the `AzureKeyCredential`.
        azure_form_rec_endpoint (str): Endpoint URL passed to the client.
        api_version (str): API version passed to the client.

    Returns:
        DocumentAnalysisClient or None: The client on success; `None` if construction raised.
    '''
    try:
        # Build the credential once and reuse it (previously it was created twice and one copy discarded).
        credential = AzureKeyCredential(azure_form_rec_key)
        return DocumentAnalysisClient(
            endpoint=azure_form_rec_endpoint,
            credential=credential,
            api_version=api_version
        )
    except Exception as error:
        print(f"An exception occurred while creating Document Analysis Client: {error}")
        return None


def analyze_document(document_analysis_client, azure_file_path_url):
    '''
    Run the "prebuilt-layout" model on a document through a Document Analysis client.

    The document is submitted with `begin_analyze_document` and the call blocks until the
    poller finishes.

    Parameters:
        document_analysis_client (DocumentAnalysisClient): Client used to submit the request.
        azure_file_path_url: Value forwarded as the `document` argument. Despite the name,
            the caller in this notebook passes the downloaded PDF content, not a URL.

    Returns:
        tuple: `(True, result)` where `result` is whatever the poller returns. Errors are not
        caught here, so the flag is never `False`.
    '''
    layout_result = document_analysis_client.begin_analyze_document(
        "prebuilt-layout",
        document=azure_file_path_url,
    ).result()
    return True, layout_result


- Create ADLS client object

In [0]:
def get_adls_service_client(adls_account_name, adls_file_system, azure_file_system_key, adls_directory):
    '''
    Open a file system client on an ADLS account and call `create_directory` on it.

    Parameters:
        adls_account_name (str): Storage account name; used to build the `dfs.core.windows.net` URL.
        adls_file_system (str): File system whose client is requested.
        azure_file_system_key (str): Credential passed to `DataLakeServiceClient`.
        adls_directory (str): Directory passed to `create_directory`.

    Returns:
        DataLakeFileSystemClient or None: The file system client, or `None` when any step
        raised (the error is printed, not re-raised).
    '''
    try:
        account_url = f"https://{adls_account_name}.dfs.core.windows.net"
        datalake_service = DataLakeServiceClient(account_url=account_url,
                                                 credential=azure_file_system_key)

        file_system_client = datalake_service.get_file_system_client(adls_file_system)

        # Make sure the working directory is present before handing the client back
        file_system_client.create_directory(adls_directory)
    except Exception as e:
        print(f"An error occurred while connecting to Azure Data Lake Storage: {e}")
        return None
    return file_system_client

    
def save_to_adls(adls_file_system_client, file_name, json_str):
    '''
    Write a JSON string to a file in ADLS, overwriting any existing file.

    Parameters:
        adls_file_system_client (DataLakeFileSystemClient): File system client used to obtain the file client.
        file_name (str): Target path of the file inside the file system.
        json_str (str): Content to upload.

    Returns:
        bool: `True` after a successful upload; `False` when no client was supplied or when
        the upload raised (the error is printed, not re-raised).
    '''
    try:
        # Nothing to upload to without a client
        if not adls_file_system_client:
            return False
        target_file = adls_file_system_client.get_file_client(file_name)
        target_file.upload_data(json_str, overwrite=True)
        return True
    except Exception as e:
        print(f"An error occurred while uploading file to ADLS: {e}")
        return False


- Read Credentials from Config

In [0]:
# Load credentials and table/column naming from the shared utility object.
# NOTE(review): `Utility` is not defined in this notebook - presumably it comes from the
# `%run "../RNDUtils/nb_utility"` cell above; confirm.
utility_obj = Utility()
AFR_Config_dict = utility_obj.AFR_Config
Naming_dict = utility_obj.naming_dict

# Obtain (or reuse) the Spark session used by every SQL/table call below
spark = SparkSession.builder.appName('delta tables').getOrCreate()

# Form Recognizer and ADLS connection settings
AZURE_FORM_REC_KEY = AFR_Config_dict["AZURE_FORM_REC_KEY"]
AZURE_FORM_REC_ENDPOINT = AFR_Config_dict["AZURE_FORM_REC_ENDPOINT"]
ADLS_ACCOUNT_NAME = AFR_Config_dict["ADLS_ACCOUNT_NAME"]
ADLS_FILE_SYSTEM = AFR_Config_dict["ADLS_FILE_SYSTEM"]
ADLS_DIRECTORY = AFR_Config_dict["ADLS_DIRECTORY"]
JSON_RESULT_DIRECTORY = AFR_Config_dict["JSON_RESULT_DIRECTORY"]
ADLS_FILE_SYSTEM_KEY = AFR_Config_dict["ADLS_FILE_SYSTEM_KEY"]
# One API version per client type (Document Intelligence vs. Document Analysis)
api_version_intelligence = AFR_Config_dict["AFR_API_VERSION_Intelligence_document"]
api_version_general = AFR_Config_dict["AFR_API_VERSION_analyse_document"]
# Audit table names
ingest_table_name = Naming_dict["ingest_audit_dimension"]["table_name"]
proc_table_name = Naming_dict["proc_audit_dimension"]["table_name"]
# Column names of the ingest audit table
ingest_table_seq_no = Naming_dict["ingest_audit_dimension"]["columns"]["IA_SEQ_NO"]
ingest_table_version = Naming_dict["ingest_audit_dimension"]["columns"]["IA_VERSION"]
ingest_table_dataset_id = Naming_dict["ingest_audit_dimension"]["columns"]["DATASET_ID"]
ingest_table_report_test_type = Naming_dict["ingest_audit_dimension"]["columns"]["REPORT_TEST_TYPE"]
ingest_table_study_id = Naming_dict["ingest_audit_dimension"]["columns"]["STUDY_ID"]
ingest_table_adls_proc_container_path = Naming_dict["ingest_audit_dimension"]["columns"]["ADLS_PROC_CONTAINER_PATH"]
ingest_table_filename = Naming_dict["ingest_audit_dimension"]["columns"]["FILE_NAME"]
# Column names of the proc audit table
proc_table_seq_no = Naming_dict["proc_audit_dimension"]["columns"]["PA_SEQ_NO"]
proc_table_version = Naming_dict["proc_audit_dimension"]["columns"]["PA_VERSION"]
proc_table_dataset_id  = Naming_dict["proc_audit_dimension"]["columns"]["DATASET_ID"]
proc_table_afr_status = Naming_dict["proc_audit_dimension"]["columns"]["AFR_STATUS"]
proc_table_afr_raw_json_path = Naming_dict["proc_audit_dimension"]["columns"]["AFR_RAW_JSON_PATH"]
proc_table_afr_datetime = Naming_dict["proc_audit_dimension"]["columns"]["AFR_DATETIME"]
proc_table_pa_modified_by = Naming_dict["proc_audit_dimension"]["columns"]["PA_MODIFIED_BY"]
proc_table_pa_modified_date = Naming_dict["proc_audit_dimension"]["columns"]["PA_MODIFIED_DATE"]
# Show which API versions were picked up from config
print(api_version_general, api_version_intelligence)

In [0]:
def read_ingest_audit_dim_df():
    '''
    Select the ingest-audit rows that are candidates for AFR parsing.

    Runs a Spark SQL query against `ingest_table_name` keeping only rows where:
    - FILE_NAME ends with `.pdf` or `.PDF`,
    - REPORT_TEST_TYPE is 'TAC' or 'Consumer Science',
    - STATUS_CODE is 'PROC_COMPLETE'.

    Returns:
        pyspark.sql.dataframe.DataFrame: All columns of the matching rows.
    '''
    ingest_query = f"""
        SELECT * 
        FROM {ingest_table_name}
        WHERE 
            (FILE_NAME LIKE '%.pdf' OR FILE_NAME LIKE '%.PDF') 
            AND 
            (
                REPORT_TEST_TYPE = 'TAC' 
                OR
                REPORT_TEST_TYPE = 'Consumer Science' 
            ) 
            AND
            STATUS_CODE = 'PROC_COMPLETE'
    """
    return spark.sql(ingest_query)

# Example usage
# df = read_ingest_audit_dim_df()
# df.show() 


In [0]:
def update_proc_success_audit_table(seq_no_column1, seq_value1, updates):
    '''
    Update columns of one record in the proc audit table (`proc_table_name`).

    Builds and runs `UPDATE <table> SET col = value, ... WHERE <seq_no_column1> = <seq_value1>`.
    Values are interpolated as-is, so string values must already be quoted SQL literals
    (for example "'Success'").

    Parameters:
        seq_no_column1 (str): Name of the column that identifies the record.
        seq_value1 (str or int): Value of that column for the record to update.
        updates (dict): Mapping of column name -> SQL literal to assign.

    Returns:
        None

    Notes:
        - Errors raised by Spark are caught and printed; nothing is raised to the caller.
        - When `updates` is empty or `seq_value1` is None no statement is run, because the
          generated SQL would be malformed.
    '''
    if not updates:
        print("No columns to update; skipping audit table update.")
        return
    if seq_value1 is None:
        print("No sequence number supplied; skipping audit table update.")
        return

    try:
        # Construct the SET part of the SQL query
        set_clause = ", ".join(f"{column} = {value}" for column, value in updates.items())

        # Construct and execute the SQL query
        spark.sql(f"""UPDATE {proc_table_name} SET
                      {set_clause}
                      WHERE {seq_no_column1} = {seq_value1}
                      """)
        print(f"Successfully updated the columns with values {updates}")
    except Exception as e:
        print(f"An error occurred while updating the audit table: {e}")


In [0]:
def check_proc_audit_table_afr_status(seq_no):
    '''
    Report whether a proc-audit record already has AFR status 'Success'.

    Looks up the record whose sequence-number column (`proc_table_seq_no`) equals `seq_no`
    in `proc_table_name` and inspects its `proc_table_afr_status` column. Only the first
    matching row is fetched.

    Parameters:
        seq_no (int or str): Sequence number of the record to check.

    Returns:
        bool:
            - `True` if the record exists and its AFR status is 'Success'.
            - `False` if the status is anything else or no record matches.

    Notes:
        - Relies on the notebook-level names `spark`, `proc_table_name`, `proc_table_seq_no`
          and `proc_table_afr_status`.
        - Prints a message when a record is found; any status other than 'Success' is
          reported as "New".
    '''
    # Use the configured key column so this lookup agrees with the UPDATE statements,
    # and fetch a single row instead of collecting every match.
    status_row = (
        spark.table(proc_table_name)
        .filter(f"{proc_table_seq_no} = {seq_no}")
        .select(proc_table_afr_status)
        .first()
    )
    if status_row is None:
        return False

    if status_row[proc_table_afr_status] == 'Success':
        print(f"For seq no {seq_no}, AFR status is Already Success!")
        return True

    print(f"For seq no {seq_no}, AFR status is New!")
    return False


In [0]:
def insert_proc_audit_table(new_row):
    '''
    Insert a record into the proc audit table unless its sequence number is already present.

    The sequence number is read from `new_row[0][0]`. If no row in `proc_table_name` has that
    value in the configured key column (`proc_table_seq_no`), the new row is inserted through
    a temporary view named `new_row`; otherwise nothing is written.

    Parameters:
        new_row (list): A list containing one row (itself a list/tuple) whose values follow
            the column order of the proc audit table, with the sequence number first.

    Returns:
        bool: Always `True`, both when the row was inserted and when it already existed.
        Errors from Spark are not caught here and propagate to the caller.

    Notes:
        - Relies on the notebook-level names `spark`, `proc_table_name` and `proc_table_seq_no`.
        - The existence check and the insert are separate statements, so concurrent runs
          could still insert the same sequence number twice.
    '''
    df = spark.table(proc_table_name)
    new_df = spark.createDataFrame(new_row, schema=df.schema)

    # Extract seq_no from the new row
    seq_no = new_row[0][0]

    # One matching row is enough to know the record exists; use the configured key column
    # so this check agrees with the UPDATE statements issued elsewhere.
    existing_record = df.filter(df[proc_table_seq_no] == seq_no).limit(1).collect()

    if existing_record:
        print(f"Record with seq_no {seq_no} already exists. No new record inserted.")
        return True

    new_df.createOrReplaceTempView('new_row')
    spark.sql(f"""INSERT INTO {proc_table_name} SELECT * FROM new_row""")
    print(f"Inserted new record for seq_no {seq_no}")
    return True


In [0]:
def get_seq_no_version_and_study_type(ingest_df, path):
    '''
    Look up the audit identifiers of a file by its full process-layer path.

    Rows of `ingest_df` whose `full_process_layer_path` equals `path` are selected and the
    values of the first such row are returned.

    Parameters:
        ingest_df (pandas.DataFrame): Ingest audit data containing `full_process_layer_path`
            plus the configured sequence-number, version, dataset-id and report-test-type columns.
        path (str): Path to match against `full_process_layer_path`.

    Returns:
        tuple: `(seq_no, file_version_no, dataset_id, study_type)`; the first three are cast
        to `int`, the last is returned unchanged. `(None, None, None, None)` when no row matches.
    '''
    matching_rows = ingest_df[ingest_df["full_process_layer_path"] == path]

    # No row for this path: signal "not found" with four Nones
    if matching_rows.empty:
        return None, None, None, None

    wanted_columns = (
        ingest_table_seq_no,
        ingest_table_version,
        ingest_table_dataset_id,
        ingest_table_report_test_type,
    )
    # Read all four values from the first matching row before converting any of them
    seq_no, file_version_no, dataset_id, study_type = [
        matching_rows[column_name].iloc[0] for column_name in wanted_columns
    ]
    return int(seq_no), int(file_version_no), int(dataset_id), study_type


In [0]:
# get documnet intelligence client object
document_intelligence_client = create_document_intelligence_client(AZURE_FORM_REC_KEY, AZURE_FORM_REC_ENDPOINT, api_version_intelligence)

# Get the Document Analysis client object (None if creation failed)
document_analysis_client = create_document_analysis_client(AZURE_FORM_REC_KEY, AZURE_FORM_REC_ENDPOINT, api_version_general)

In [0]:
def check_study_type(study_type, pdf_data):
    '''
    Run AFR analysis for supported study types and return the result as a JSON string.

    "TAC" and "Consumer Science" documents are both analyzed with `analyze_document` using
    the notebook-level `document_analysis_client`; any other study type is rejected.

    Parameters:
        study_type (str): Study type of the document. Supported: "TAC", "Consumer Science".
        pdf_data: The document forwarded to `analyze_document`. The caller in this notebook
            passes the PDF content downloaded from ADLS.

    Returns:
        tuple: `(afr_status, json_string)` where
            - `afr_status` (bool) is the flag returned by `analyze_document`, or `False`
              for an unsupported study type;
            - `json_string` (str or None) is the analysis result serialized with 4-space
              indentation, or `None` for an unsupported study type.
    '''
    supported_study_types = ("TAC", "Consumer Science")
    if study_type not in supported_study_types:
        return False, None

    afr_status, analysis_result = analyze_document(document_analysis_client, pdf_data)
    return afr_status, json.dumps(analysis_result.to_dict(), indent=4)


In [0]:
from datetime import datetime


def _record_afr_failure(seq_no):
    """Set AFR status 'Failed' (with the current timestamp and modifier) on the proc audit row for seq_no."""
    formatted_datetime = f"'{datetime.now()}'"
    update_proc_success_audit_table(proc_table_seq_no, seq_no, {
        proc_table_afr_status: "'Failed'",
        proc_table_afr_datetime: formatted_datetime,
        proc_table_pa_modified_by: modified_by,
        proc_table_pa_modified_date: formatted_datetime
    })


# get ADLS client object
adls_file_system_client = get_adls_service_client(ADLS_ACCOUNT_NAME,
                                                  ADLS_FILE_SYSTEM,
                                                  ADLS_FILE_SYSTEM_KEY,
                                                  ADLS_DIRECTORY)
if adls_file_system_client is None:
    # get_adls_service_client has already printed the cause; stop with a clear error
    # instead of failing later with an AttributeError on None.
    raise RuntimeError("Could not connect to Azure Data Lake Storage; aborting AFR parsing.")

# read ingest audit table into pandas
ingest_df = read_ingest_audit_dim_df().toPandas()

# Default to an empty list so the processing loop is simply skipped (rather than
# raising NameError) when the ingest table has no qualifying rows.
ingest_file_paths = []
if not ingest_df.empty:
    ingest_df["full_process_layer_path"] = ingest_df[ingest_table_adls_proc_container_path] + "/" + ingest_df[ingest_table_filename]
    ingest_file_paths = ingest_df["full_process_layer_path"].tolist()

    print("Total Files to be ingest : ", len(ingest_file_paths))
else:
    print("Ingest Table is empty!")

# iterate over each candidate file
if ingest_file_paths:
    for file_path in ingest_file_paths:
        if not file_path.lower().endswith(".pdf"):
            continue

        print("##########################")
        print(file_path)

        # Reset per-file state so the except block never acts on values left over
        # from the previous file (or on names that were never assigned).
        seq_no = file_version_no = dataset_id = study_type = None

        try:
            # fetch SEQ NO from Audit
            seq_no, file_version_no, dataset_id, study_type = get_seq_no_version_and_study_type(ingest_df, file_path)

            # skip files whose AFR status is already Success
            if check_proc_audit_table_afr_status(seq_no=seq_no):
                continue

            # Read the file content from ADLS
            file_client = adls_file_system_client.get_file_client(file_path)
            pdf_data = file_client.download_file().readall()

            # Pass to AFR & get result
            afr_status, json_string = check_study_type(study_type, pdf_data)
            print(afr_status, "afr_status")

            if afr_status and json_string:
                # Result file name: "<dataset_id>_<version>_<source file name>.json".
                # Appending ".json" keeps the original extension (and its case) intact.
                source_file_name = file_path.split("/")[-1]
                json_file_name = f"{dataset_id}_{file_version_no}_{source_file_name}.json"
                json_file_path = os.path.join(JSON_RESULT_DIRECTORY, json_file_name)

                save_status = save_to_adls(adls_file_system_client, json_file_path, json_string)
                if save_status:
                    print(f"JSON File Saved ---> {json_file_path}\n")
                else:
                    print(f"JSON File could not be saved ---> {json_file_path}\n")

                # Update Audit table
                if seq_no is not None and save_status:
                    # NOTE(review): `modified_by` already carries surrounding single quotes
                    # (added for SQL use), so the quotes are stored literally in this row - confirm intended.
                    new_row = [[seq_no,file_version_no,dataset_id,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,None,modified_by,modified_by,datetime.now(),datetime.now()]]
                    if insert_proc_audit_table(new_row):
                        # Encode quotes in the path, then wrap it as a SQL string literal
                        quoted_json_path = "'" + json_file_path.replace("'", "%27") + "'"
                        formatted_datetime = f"'{datetime.now()}'"
                        updates = {
                            proc_table_afr_status: "'Success'",
                            proc_table_afr_raw_json_path: quoted_json_path,
                            proc_table_afr_datetime: formatted_datetime,
                            proc_table_pa_modified_by: modified_by,
                            proc_table_pa_modified_date: formatted_datetime
                        }
                        update_proc_success_audit_table(proc_table_seq_no, seq_no, updates)
            else:
                # NOTE(review): a proc audit row is only inserted on the success path above, so
                # this UPDATE affects nothing for files that have never succeeded - confirm intended.
                _record_afr_failure(seq_no)

        except Exception as e:
            print(f"An error occurred while calling AFR parser and storing into JSON: {e}")
            if seq_no and file_version_no:
                _record_afr_failure(seq_no)
            continue

    print("AFR Parsing is Done!")
    

In [0]:
# End the notebook run and hand this message back to the caller (e.g. an orchestrating job).
dbutils.notebook.exit("AFR Parsing is Done!")