### Importing the Relevant Libraries

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DateType, TimestampType


In [None]:
import re
import pandas as pd
import numpy as np
import PyPDF2
from PyPDF2 import PdfReader
from pdfminer.high_level import extract_text as fallback_text_extraction
import warnings
import os
from datetime import datetime, timedelta
import calendar
import shutil



current_day = datetime.today().date()
#current_day = datetime(2023, 12, 21)#: For debugging files for different dates

### Getting the Folder Path from the FolderPath widget

In [None]:
folder_path = dbutils.widgets.get("FolderPath")

#### A function that tells us when other functions are called and when the functions are done running

In [None]:
def function_name_decorator(func):
    def wrapper(*args, **kwargs):
        print("\n")
        print(f"FUNCTION CALLED: {func.__name__}")
        result = func(*args, **kwargs)
        print(f"END OF FUNCTION CALL: {func.__name__}")
        return result

    return wrapper

#### A function for converting the filepath gotten from the widget to one that can be read using pandas library

In [None]:
@function_name_decorator
def _convert_dbfs_file_path(file_path):
    print("Converting dbfs file path to a readable file path")
    if file_path.startswith("dbfs:"):
        proper_file_path = file_path.replace("dbfs:", "/dbfs")
        print(f"Filepath created: {proper_file_path}")
        return proper_file_path
    
    else:
        print("Not a dbfs filepath")



#### Functions used for reading in the PDF files

In [None]:
@function_name_decorator
def _read_pdf(pdf_name):
    print(f"Reading the pdf: {pdf_name}")
    list_of_pages = []
    try:
        reader = PdfReader(pdf_name)
        for page in reader.pages:
            list_of_pages.append(page.extract_text())
            
    except Exception as exc:
        print("An error occurred. The exception is given below:")
        print(exc)
        list_of_pages.append(fallback_text_extraction(pdf_name))
        
    except FileNotFoundError as fnfe:
        print(f"FILE NOT FOUND!\n{fnfe}")
        
    
    return list_of_pages



def _extract_useful_text(list_of_pages, useful_text, *reports_of_interest):
    print(f"Extracting the only the useful pages from the selected pdf file.\nThe useful pages contain: {reports_of_interest}")
    useful_text_dict = {}
    for report_id in reports_of_interest:
        useful_text_dict[report_id] = []
        for text in list_of_pages:
            if report_id in text:
                useful_text_dict[report_id].append(text)
        
    for report_id in useful_text_dict.keys():
        useful_text.append("\n".join(useful_text_dict[report_id]))
    
    num_required_reports = len(reports_of_interest)
    num_extracted_pages = len(useful_text)
    print(f"{num_extracted_pages} pages have been extracted: ")
    
    if num_extracted_pages > num_required_reports:
        warnings.warn("\n\nIncomplete extraction! 1 or more extra reporting ids extracted from pdf\n")
        print(f"{num_extracted_pages} pages were extracted instead of {num_required_reports}\n")
        
    if num_extracted_pages < num_required_reports:
        warnings.warn("\n\nIncomplete extraction! 1 or more reporting ids missing from pdf\n")
        print(f"{num_extracted_pages} pages were extracted instead of {num_required_reports}\n")
                
            

            
@function_name_decorator
def _validate_extracted_text(useful_text, *reports_of_interest):
    print(f"Checking if all the extracted pages required were gotten. That is: {reports_of_interest}")
    check_list = list(reports_of_interest).copy()
    
    for report_id in reports_of_interest:
        for text in useful_text:
            if report_id in text:
                check_list.remove(report_id)
                   
    if len(check_list) == 0:
        print("All required report_ids successfully extracted")
        return True
    
    else:
        warnings.warn(f"\n\nThe following report_id(s) was/were not found\n: {check_list}\n")
        return False

    

#### Functions for creating the dataframes used for the rest of the transformation

In [None]:
@function_name_decorator
def _column_list_maker(number_of_columns):
    print("This function creates a list of column names such as 'col1', 'col2', and so on from the the number of columns")
    list_of_columns = []
    
    for col_number in range(number_of_columns):
        list_of_columns.append(f"col{col_number}")
    
    return list_of_columns


@function_name_decorator
def _optimal_df_col_names(rows):
    print("This function determines the uptimal number of columns required to generate the dataframe.")
    
    max_col_number = 7
    optimal_col_names = []
    while max_col_number < np.inf:
        column_names = _column_list_maker(max_col_number)
        
        try:
            df = pd.DataFrame(rows, columns = column_names)
            
            optimal_col_names = column_names.copy()
            
            print(f"{max_col_number} columns is the optimal number of columns! Creating the DataFrame...")
            
            break
            
        except (AssertionError, ValueError) as e:
            print(f"{max_col_number} columns is not the optimal number of columns! Trying {max_col_number + 1} columns next!")
            max_col_number += 1
        
        if max_col_number > 20:
            print(f"There are no reasonable number of columns!")
            break
            
        
    
    return optimal_col_names




@function_name_decorator
def _dframe_maker(text_list, *reports_of_interest):
    print(f"This function creates the Dataframes for each report of interest: {reports_of_interest}")
    df_dict = {}
    for report_id in reports_of_interest:
        df_dict[report_id] = ""
    for text in text_list:
        # Create a DataFrame using the provided column names
        #column_names = ['col0', 'col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7', 'col8', 'col9', 'col10']

        # Preprocess the data to create a list of lists representing rows
        rows = []
        for line in text.split('\n'):
            if line.strip():
                row = line.split()
                rows.append(row)
        print(f"The number of rows is {len(rows)}")
    
        # Convert the list of lists to a DataFrame
        column_names = _optimal_df_col_names(rows)
        df = pd.DataFrame(rows, columns=column_names)
        
        for report_id in reports_of_interest:
            if report_id in text:
                print(f"report_id = {report_id}")
                df_dict[report_id] = df
                
        
        #print(df)
        
    return df_dict


#### Function for extracting the current month from the folderpath from the widget

In [None]:

months_list = ["january", "february", "march", "april", "june", "july", "august", "september", "october", "november", "december"]

#folder_path = dbutils.widgets.get("FolderPath")

@function_name_decorator
def extract_month_from_folderpath(folder_path, months_list):
    print("This function is used to get the month value from the folder path otherwise, it uses the current month")
    current_month = ""
    if os.path.basename(folder_path).lower() in months_list:
        current_month = os.path.basename(folder_path)

    else:
        warnings.warn("Month value not on file name(s). Using current month as Default")
        today = datetime.today()
        month_num = today.month
        current_month = calendar.month_name[month_num]

    return current_month


#### Function for getting the folderpaths for each day within the main FolderPath provided by the widget

In [None]:
@function_name_decorator
def get_folder_paths(folder_path):
    print("This function pools all the folder paths for all the days within the month folder")
    try:
        day_folder_paths = [folder.path for folder in dbutils.fs.ls(folder_path)]

    except  (dbutils.dbutils.FileNotFoundError, dbutils.Exception.AttributeError) as e:
        print(f"File not found because the folder is empty or another error occurred:\nPython error: {e}")
        dbutils.notebook.exit(f"File not found: {e}")
        day_folder_paths = []

    return day_folder_paths


#### Function for getting the exact file paths within the folderpaths for each day

In [None]:
@function_name_decorator
def get_report_pdf_filepaths(folder_path, day_folder_paths, current_month):
    print("This function is used to get all the report pdf files that are required for a particular day.")
    file_dict = {}
    days_list = []
    report_pdfs = []
    
    for num in range(len(day_folder_paths)):
        days_list.append(day_folder_paths[num].partition(current_month)[-1].split("/")[1])

    for day in days_list:
        file_dict[day] = folder_path + "/" + day

    report_folders = {}
    for key in file_dict.keys():
        for file in dbutils.fs.ls(file_dict[key]):
            #if "chipper" not in file.name.lower() or "business" not in file.name.lower() or "paga" not in file.name.lower() or "virtual" not in file.name.lower():
                
            report_folders[key] = file.path

    for path in report_folders.values():
        path_list = []
        path_list.extend(dbutils.fs.ls(path))

        path_list_copy = path_list.copy()

        for path in path_list_copy:
            report_pdfs.append(path.path)

    pdf_file_paths = [pdf_file for pdf_file in report_pdfs if pdf_file.endswith('.pdf') or pdf_file.endswith('.pdf')]
    pdf_file_paths.sort(key=lambda pdf_file: dbutils.fs.ls(pdf_file)[-1].modificationTime, reverse=True)

    return pdf_file_paths



#### Function for deriving the dates of the files we are interested in

In [None]:
@function_name_decorator
def get_desired_dates():
    print("This function gets all the desired dates based on what day it is. Every other weekday picks the date before that day")
    print("But on Tuesday, dates for Saturday, Sunday, and Monday are selected")
    print("For Monday, Friday's date is selected")
    desired_dates = []
    weekday = current_day.weekday()
    
    if weekday == 1:
        for days_ago in range(1, 4):
            desired_dates.append((current_day - timedelta(days=days_ago)).strftime('%d-%m-%Y')) #From Tuesday, pick Sat,Sun,Mon
    else:
        desired_dates.append((current_day - timedelta(days=1)).strftime('%d-%m-%Y'))  # Pick the day before
        
    print(f"The files to be transformed have the following desired date(s):")
    for num, date in enumerate(desired_dates):
        print(f"File {num}: {date}")
        
    return desired_dates


#### Function for extracting the desired files

In [None]:
@function_name_decorator
def get_desired_files(pdf_file_paths, desired_dates, desired_date_formats):
    print("This function is for selecting the desired files based on the desired dates")
    
    latest_file_paths = {}

    for file_path in pdf_file_paths:
        file_name = os.path.basename(file_path)
        #print(file_name)
        #date_match = re.search(r'(\d{1,2})[-_]?([A-Za-z]{3,})[-_]?(\d{4})', file_name)
        date_match = re.search(r'(20\d{2})(0[1-9]|1[0-2])(0[1-9]|[1-2][0-9]|3[01])[-_]?', file_name)
        if date_match:
            day = date_match.group(3)
            month = date_match.group(2)
            year = date_match.group(1)
            date_string = f"{day}-{month}-{year}"
            print(date_string)
        else:
            print(f"No date found in file name: {file_name}")
            continue
        
        latest_file_paths[date_string] = []
        for date_format in desired_date_formats:
            try:
                file_date = datetime.strptime(date_string, date_format).strftime('%d-%m-%Y')
                if file_date in desired_dates: #or file_date == current_day.strftime('%d-%m-%Y'):  # Also include files with the current date
                    latest_file_paths[date_string].append(file_path)
                    print(f"File found for the desired date: {file_date}")
                    print(file_path)

                else:
                    latest_file_paths.pop(date_string)
                    
            except ValueError:
                continue
    print("The files we want are given below:")
    for date in latest_file_paths.keys():
        print(f"{date}: {latest_file_paths[date]}")
        
    return latest_file_paths


#### Function for getting the country name from the filepath

In [None]:
@function_name_decorator
def _get_country_name(file_path):
    print("This function gets the country name from the file path")
    country_name = file_path.split("AFRICAN COUNTRIES/")[1].split("/")[0].strip()
    return country_name


In [None]:
@function_name_decorator
def initialize_vss_dicts():
    print("This function initializes the dictionaries for storing extracted target values. The dictionaries are for VSS-110, VSS-120, VSS-210")
    
    vss110_dict = {
    
    "NET_SETTLEMENT":[],
    "TOTAL_REIMBURSEMENT": [],
    "TOTAL_CHARGE": [],
    "DATE": []
    
    }

    vss120_dict = {
    
    "CLEARING_AMOUNT": [],      
    }

    vss210_dict = {
    "OPTIONAL_FEE": []
    }

    return vss110_dict, vss120_dict, vss210_dict


In [None]:
@function_name_decorator
def _create_dataframes_from_pdf(pdf_files):
    print("This function is used to create the dataframes for each of VSS-110, VSS-120, VSS-210")
    no_files = False

    df_dict = {}
    if pdf_files:
        #temp_dir = tempfile.mkdtemp()

        useful_text = []
        for file_path in pdf_files:
            proper_file_path = _convert_dbfs_file_path(file_path)

            list_of_pages = _read_pdf(proper_file_path)

            _extract_useful_text(list_of_pages, useful_text, "VSS-110", "VSS-120", "VSS-210")

            _validate_extracted_text(useful_text, "VSS-110", "VSS-120", "VSS-210")

            df_dict = _dframe_maker(useful_text, "VSS-110", "VSS-120", "VSS-210")

    else:
        no_files = True
        df_dict["VSS-110"] = ""
        df_dict["VSS-120"] = ""
        df_dict["VSS-210"] = ""

    return df_dict


In [None]:
@function_name_decorator
def proc_date(dframe, date_string):
    
    df = dframe.copy()
    df_columns = list(df.columns)
    mask_sum = []
    col_labels = []
    idx_list = []
    for col in dframe.columns:
        proc_mask = df[col].fillna("-").str.lower().str.startswith("proc")
        date_mask = df[col].fillna("-").str.lower().str.startswith("date")
        
        try:
            if sum(proc_mask) > 0:
                idx_list.append(df[proc_mask].index.values[0])
                mask_sum.append(sum(proc_mask))
                col_labels.append(col)

            if sum(date_mask) > 0:
                mask_sum.append(sum(date_mask))
                col_labels.append(col)
                break

        except (TypeError, IndexError) as e:
            print(e)
            continue
        
    try:
        col_label0 = df_columns.index(col_labels[0])
    except IndexError as e:
        print(e)
        print(f"Using the date value of the file name: {date_string}")
        return date_string
    try:    
        col_label1 = df_columns.index(col_labels[1])
    except IndexError as e:
        print(e)
        print(f"Using the date value of the file name: {date_string}")
        return date_string
            
    if col_label0 + 1 == col_label1:
        #date_label_index = df_columns.index(col_index[1])
        date_col = df_columns[col_label1 + 1]
    else:
        print("After the else")
        print(f"Using the date value of the file name: {date_string}")
        return date_string
    
    return df.at[idx_list[0], date_col]


In [None]:
@function_name_decorator
def _convert_db_cr(number):
    print("This function is used to convert Debits with 'db' attached to them to Negative numbers, and Credits to Positive numbers")
    if not isinstance (number, str):
        return number

    if "db" in number.lower():
        new_num = "-" + number.lower().replace("db", "")
        return new_num
        
    elif "cr" in number.lower():
        new_num = number.lower().replace("cr", "")
        return new_num
        
    else:
        return number

#proc_date(df)

@function_name_decorator
def _convert_string_numbers(num):
    print("This function is used to convert string numbers into float numbers")
    num = str(num)
    num = num.replace(",", "")
    
    try:
        num = float(num)
        
    except ValueError as e:
        print(f"The string value you are trying to convert doesn't contain any numbers\nPython Error:{e.with_traceback()}")
    
    else:
        return num
    
    return num



@function_name_decorator
def _convert_number_strings(number):
    print("This function is used to convert numbers back to strings")
    number_str = str(number)
    
    partitioned_num = number_str.partition(".")
    print(partitioned_num)
    if len(partitioned_num[0])<= 3:
        if len(partitioned_num[-1]) == 2:
            return number_str      
        elif len(partitioned_num[-1]) == 1:
            return number_str + "0"     
        
        elif len(partitioned_num[-1]) > 2:
            result = str((lambda x: round(float("0." + x), 2))(partitioned_num[-1]))[-2:]
            new_result = partitioned_num[0] + "."+result
            #print(new_result)
            if "." in new_result.partition(".")[-1]:
                #print("this became true")
                new_result = new_result.partition(".")[0] + new_result.partition(".")[-1]
                return new_result
            
            else: 
                return new_result
        
        else:
            partitioned_num[0] + "." + "00"
            
    length = len(partitioned_num[0])

    if length <= 3:
        return number_str  # No need for commas

    formatted = []
    for i in range(length):
        if i > 0 and (length - i) % 3 == 0:
            formatted.append(',')
        formatted.append(number_str[i])
    
    if len(partitioned_num[-1]) == 2:
        result = ''.join(formatted) + "." + partitioned_num[-1]
        
    elif len(partitioned_num[-1]) == 1:
        result = ''.join(formatted) + "." + partitioned_num[-1] + "0"
        
    elif len(partitioned_num[-1]) > 2:
        result = ''.join(formatted) + "." + str((lambda x: round(float("0." + x), 2))(partitioned_num[-1]))[-2:]
        
    else:
        result = ''.join(formatted) + "." + partitioned_num[-1] + "00"
        
    return result


In [None]:
@function_name_decorator
def net_settlement(df):
    print("This function is used for extracting the Net Settlement from the dataframe generated.")
    net_filter = df["col0"].str.lower() == "net"
    settlement_filter = df["col1"].str.lower() == "settlement"
    amount_filter = df["col2"].str.lower() == "amount"
    
    try:
        result = df.loc[(net_filter) & (settlement_filter) & (amount_filter), "col5"]
        raw_result = 0
    
        
        raw_result = result.values[0]

    except (IndexError, Exception) as e:
        print(f"Null values/empty space in total_reimbursement column.\nAlso, it could be truncated and/or incomplete pdf file.\nPython error:{e}")
    
    net_settlement = _convert_db_cr(raw_result)
    
    return net_settlement


In [None]:
@function_name_decorator
def _reimbursement_calculator(df, reimbursement_type):
    print("This function checks if the reimbursement value is the correct value via calculation")
    if reimbursement_type.lower() == "credit":
        value_col = "col3"
    elif reimbursement_type.lower() == "debit":
        value_col = "col4"
    
    raw_result = 0
    try:
        total_filter = df["col0"].str.lower() == "total"
        reimbursement_filter = df["col1"].str.lower() == "reimbursement"
        fees_filter = df["col2"].str.lower() == "fees"

        result = df.loc[(total_filter) & (reimbursement_filter) &(fees_filter), value_col]
    #print(result)
    
        raw_result = result.values[0]
    except IndexError as e:
        print(f"Please ensure you use VSS-110 pdf file if not you get index error. \nAlso, it could be truncated and/or incomplete pdf file")
        print(e)

    except KeyError as e:
        print(f"There is most likely no information in the dataframe.\nPython error:{e}")
    
    raw_result = _convert_db_cr(raw_result)
    raw_result = _convert_string_numbers(raw_result)
    
    try:
        raw_result = float(raw_result)

    except (ValueError, Exception) as e:
        print(e)
        return raw_result

    return raw_result
    

#reimbursement_calculator(df, "credit")


@function_name_decorator
def _charges_calculator(df, charges_type):
    print("This function calculates the charges")
    if charges_type.lower() == "credit":
        value_col = "col3"
    elif charges_type.lower() == "debit":
        value_col = "col4"
    
    try:
        total_filter = df["col0"].str.lower() == "total"
        
        visa_filter = df["col1"].str.lower() == "visa"
        
        charges_filter = df["col2"].str.lower() == "charges"
        
        result = df.loc[(total_filter) & (visa_filter) &(charges_filter), value_col]

    except (IndexError, Exception) as e:
        print(f"Total and/or visa and/or charges missing from pdf file.\nPdf file might be truncated/incomplete.\nPython error: {e}")
    #print(result)
    raw_result = 0

    try:
        raw_result = result.values[0]
    except IndexError as e:
        print(f"Please ensure you use VSS-110 pdf file if not you get index error. \nAlso, it could be truncated and/or incomplete pdf file")
        print(e)
    
    raw_result = _convert_db_cr(raw_result)
    raw_result = _convert_string_numbers(raw_result)
    
    try:
        raw_result = float(raw_result)

    except (ValueError, Exception) as e:
        print(e)
        return raw_result

    return raw_result
    



@function_name_decorator    
def _totals(df):
    print("This function adds up the total reimbursement and charges")
    #print(f"adding up {total_visa_charge_credit(df)} and {total_reimbursement_credit(df)}")
    total_reimbursement = _reimbursement_calculator(df, "credit") +  _charges_calculator(df, "credit")
    total_charge = _reimbursement_calculator(df, "debit") + _charges_calculator(df, "debit")
    
    return total_reimbursement, total_charge
    


#totals(df_dict["VSS-110"])
#vss110_dict

In [None]:
@function_name_decorator
def _total_issuer_interchange(df):
    print("This function helps to extract the total issuer interchange.")
    try:
    
        total_filter = df["col0"].str.lower() == "total"
        
        issuer_filter = df["col1"].str.lower() == "issuer"
        
        interchange_filter = df["col2"].str.lower() == "interchange"
        
        result = df.loc[(total_filter) & (issuer_filter) & (interchange_filter), "col4"]

    except (IndexError, Exception) as e:
        print(f"Total and/or issuer and/or interchange missing from pdf file.\nPdf file might be truncated/incomplete\n.Python error:{e}")
    #print(result)

    except (KeyError) as e:
        print(f"There is most likely no file. Python error is: {e}")
    
    try:
        if result.isnull().values[0]:
            warnings.warn("Null values/empty space in total_reimbursement column.")
            return 0
    except IndexError as e:
        print(f"Please ensure you use VSS-120 pdf file if not you get index error")
        print(e)
        return 0

    raw_result = result.values[0]
    
    total_issuer_interchange_val = _convert_db_cr(raw_result)
    #raw_result = convert_string_numbers(raw_result)


    #total_issuer_interchange_val = convert_db_cr(raw_result)
    
    
    return total_issuer_interchange_val

#total_issuer_interchange(df_dict["VSS-120"])

In [None]:
def _total_currency_conversion(df):
    print("This function helps to calculate the total currency conversion")
    try:
        total_filter = df["col0"].str.lower() == "total"
        currency_filter = df["col1"].str.lower() == "currency"
        conversion_filter = df["col2"].str.lower() == "conversion"
        fees_filter = df["col3"].str.lower() == "fees"

    except (KeyError, Exception) as e:
        print(e)

    try:
        title_row = df.loc[(total_filter) & (currency_filter) & (conversion_filter), :].index[0]
        value_row = title_row + 1

    except (IndexError, Exception) as e:
        print(f"Total and/or currency and/or conversion are not present in the pdf file\nPdf File might be truncated/incomplete.\nPython error:{e}")
    
    try:
        result = df.iloc[value_row, :].dropna().to_list()[-1]
    
    except (IndexError, Exception) as e:
        print(e)
        result = 0
    
    total_currency_conversion = _convert_db_cr(result)
    
    return total_currency_conversion


In [None]:
@function_name_decorator
def _select_parameters_vss110(df_dict, vss110_dict, date_string):
    print("This function uses the other functions extract Total reimbursement, Total charge, Net settlement, and Date")
    df = df_dict["VSS-110"]
    #display(df)
    if isinstance(df, str):
        return vss110_dict
    
    
    total_reimbursement, total_charge = _totals(df)
    total_reimbursement = round(total_reimbursement, 2) 
    total_reimbursement = _convert_number_strings(total_reimbursement)
    
    total_charge = round(total_charge, 2)
    total_charge = _convert_number_strings(total_charge)
    
    vss110_dict["TOTAL_REIMBURSEMENT"].append(total_reimbursement)
    vss110_dict["TOTAL_CHARGE"].append(total_charge)
    
    
    vss110_dict["NET_SETTLEMENT"].append(net_settlement(df))
    
    vss110_dict["DATE"].append(proc_date(df, date_string))
    
    
    return vss110_dict

In [None]:
def _select_parameters_vss120(df_dict, vss120_dict):
    print("This function is used to extract the Clearing Amount")
    df = df_dict["VSS-120"]
    #display(df)
    if isinstance(df, str):
        return vss120_dict
    
    total_issuer_interchange_val = _total_issuer_interchange(df)
    
    vss120_dict["CLEARING_AMOUNT"].append(_total_issuer_interchange(df))

    return vss120_dict

In [None]:
def _select_parameters_vss210(df_dict, vss210_dict):
    df = df_dict["VSS-210"]
    #display(df)
    if isinstance(df, str):
        return vss210_dict
    
    total_currency_conversion_val = _total_currency_conversion(df)
    vss210_dict["OPTIONAL_FEE"].append(_total_currency_conversion(df))

    return vss210_dict

#### Function for doing extra column additions like Date and Date Modified 

In [None]:
@function_name_decorator
def _perform_final_df_configuration(dframe, date_string):
    print("This function performs additional data cleaning and adds Transaction type and Date modified to the dataframe")
    
    final_df = dframe.copy()
    country_name = _get_country_name(folder_path)
    print(country_name)
    final_df["TRANSACTION_TYPE"] = [country_name] * len(final_df)

    #CurrentDate = dbutils.widgets.get("CurrentDate")
    CurrentDate = datetime.now()
    final_df['Date_Modified'] = [CurrentDate] * len(final_df)

    final_df = final_df[['NET_SETTLEMENT', 'TOTAL_REIMBURSEMENT', 'TOTAL_CHARGE','CLEARING_AMOUNT', 'OPTIONAL_FEE', 'TRANSACTION_TYPE', 'DATE', 'Date_Modified']]

    final_df["DATE"] = pd.to_datetime(final_df["DATE"])
    final_df['Date_Modified'] = pd.to_datetime(final_df['Date_Modified'], format='%m/%d/%Y %I:%M:%S %p')

    final_df.loc[:, ['NET_SETTLEMENT', 'TOTAL_REIMBURSEMENT', 'TOTAL_CHARGE','CLEARING_AMOUNT', 'OPTIONAL_FEE']] = final_df[['NET_SETTLEMENT', 'TOTAL_REIMBURSEMENT', 'TOTAL_CHARGE','CLEARING_AMOUNT', 'OPTIONAL_FEE']].applymap(_convert_string_numbers)

    print(f"The final structure of the dataframe for the following date {date_string} is given below:")
    return final_df


In [None]:

def perform_transformations(latest_file_paths, desired_dates):
    if len(latest_file_paths) == 0:
        warnings.warn(f"File(s) not found for desired dates: {desired_dates}.\nNotebook closed")
        dbutils.notebook.exit(f"File(s) not found for desired dates: {desired_dates}.\nNotebook closed")

    else:
        print("Performing the transformations for the files that came on the following days:")
        for date_string in latest_file_paths.keys():
            print(date_string)
            
        combined_dataframes_dict = {}
        for date_string in latest_file_paths.keys():
            print(f"File transformation for file on date: {date_string}")
            combined_dataframes_dict[date_string] = []
            pdf_files = latest_file_paths[date_string]
            
            df_dict = _create_dataframes_from_pdf(pdf_files)
            vss110_dict, vss120_dict, vss210_dict = initialize_vss_dicts()
            print(vss120_dict)
            vss110_dict = _select_parameters_vss110(df_dict, vss110_dict, date_string)

            vss120_dict = _select_parameters_vss120(df_dict, vss120_dict)

            vss210_dict = _select_parameters_vss210(df_dict, vss210_dict)
            
            vss110_df = pd.DataFrame(vss110_dict)
            vss120_df = pd.DataFrame(vss120_dict)
            print(vss120_dict)
            vss210_df = pd.DataFrame(vss210_dict)
            
            final_df = pd.concat([vss110_df, vss120_df, vss210_df], axis = 1)
            print(final_df)
            
            final_df = _perform_final_df_configuration(final_df, date_string)
            
            combined_dataframes_dict[date_string].append(final_df)
            print("\n\n\n")
        return combined_dataframes_dict



In [None]:
def _define_schema():
    schema = StructType([
        StructField("NET_SETTLEMENT", FloatType(), True),
        StructField("TOTAL_REIMBURSEMENT", FloatType(), True),
        StructField("TOTAL_CHARGE", FloatType(), True),
        StructField("CLEARING_AMOUNT", FloatType(), True),
        StructField("OPTIONAL_FEE", FloatType(), True),
        StructField("TRANSACTION_TYPE", StringType(), True),
        StructField("DATE", DateType(), True),
        StructField("Date_Modified", TimestampType(), True)  # <-- corrected line
    ])
    
    return schema

In [None]:

def create_spark_dataframe(combined_final_df):
    schema = _define_schema()
    database_table_dict = {}
    for date in combined_final_df.keys():
        print(f"\nThe final Dataframe for date: {date} is:")
        final_df = combined_final_df[date][0]
        final_result = spark.createDataFrame(final_df, schema=schema)
        database_table_dict[date] = final_result
        display(final_result)
    return final_result, database_table_dict

In [None]:
def _define_database_parameters():
    jdbcHostname = dbutils.secrets.get(scope = "azurekv-rpa1scope", key = "Hostname")
    jdbcPort = dbutils.secrets.get(scope = "azurekv-rpa1scope", key = "port")
    jdbcDatabase = dbutils.secrets.get(scope = "azurekv-rpa1scope", key = "database")
    jdbcUsername = dbutils.secrets.get(scope = "azurekv-rpa1scope", key = "username")
    jdbcPassword = dbutils.secrets.get(scope = "azurekv-rpa1scope", key = "jdbcpwd")
    table ="Prepaid_Affliate_Report"
    jdbcDriver = dbutils.secrets.get(scope = "azurekv-rpa1scope", key = "jdbcdriver")
    jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database = {jdbcDatabase}"
    
    return jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword, table, jdbcDriver, jdbcUrl