In [1]:
from pypdf import PdfReader
from datetime import datetime
import pandas as pd
import os
import re
import math

In [2]:
TRANS_DF_TYPE = {'Expense Date': datetime.date,
                 'Transaction Desc': object,
                 'Amount': float}

FOLDER = "Data/Divyaman Statements/"

In [5]:
def extract_trans_data_from_page(page, df=None):

    AMOUNT_REGEX = r"[-]*\$[\d,]*.\d+"

    text = page.extract_text()
    text_list = text.splitlines()
    
    trans_start_index = None
    index = 0
    
    #Find the index from where the transaction details begin.
    while ((trans_start_index is None) and (index < len(text_list))):
            
        if text_list[index].strip() == 'Date Transaction details Amount Balance':

            trans_start_index = index + 1
            
        index += 1

    if trans_start_index is None:
        raise Exception('No transaction data on this page.')

    #Get the list of transactions. These start from index 'trans_start_index'.
    index = -1
    text_sub_list = text_list[trans_start_index:]

    look_for_amount = False
    
    #Loop through each transaction.
    for element in text_sub_list:

        #Look for transaction amount.
        if look_for_amount == True:

            match = re.search(AMOUNT_REGEX, element)

            if match is not None:
                amount_start_index = match.span()[0]
                amount_end_index = match.span()[1]
                trans_amt_text = element[amount_start_index:amount_end_index].strip()
                trans_amount = float(trans_amt_text.strip().replace('$', '').replace(',', ''))
                look_for_amount = False
                
                new_trans_row = {'Expense Date': date,
                                 'Transaction Desc': trans_desc,
                                 'Amount': trans_amount}
                #We are only interested in debit transactions.
                if trans_amount < 0:
                    df.loc[len(df)] = new_trans_row
                
            else:
                continue

        else:
        
            #Extract date - First 11 characters.
            date_str = element[0:11]
            
            try:
                date_obj = datetime.strptime(date_str, '%d %b %Y')
                date = date_obj.strftime("%d-%b-%Y")
    
                #If we get to this point (without an exception), we have successfully extracted the date.
                #Now, let's extract the remaining information.
                remaining_str = element[11:]

                #Does the remaining string contain amount information?
                match = re.search(AMOUNT_REGEX, remaining_str)

                if match is not None:
                    amount_start_index = match.span()[0]
                    amount_end_index = match.span()[1]
                    trans_desc = remaining_str[0:amount_start_index].strip()
                    trans_amt_text = remaining_str[amount_start_index:amount_end_index].strip()
                    trans_amount = float(trans_amt_text.strip().replace('$', '').replace(',', ''))
                    look_for_amount = False
                    
                    new_trans_row = {'Expense Date': date,
                                     'Transaction Desc': trans_desc,
                                     'Amount': trans_amount}

                    #We are only interested in debit transactions.
                    if trans_amount < 0:
                        df.loc[len(df)] = new_trans_row

                else:
                    trans_desc = remaining_str
                    look_for_amount = True
                
            except ValueError as err:
                continue

    return df

In [7]:
def extract_data_from_statement(file_path, df, verbose = False):

    reader = PdfReader(file_path)
    num_of_pages = len(reader.pages)
    
    prev_bal = None
    for page_index in range(num_of_pages):
        if verbose == True:
            print(f'Reading page index {page_index}')
        try:
            df = extract_trans_data_from_page(reader.pages[page_index], df)
        except Exception as e:
            if verbose == True:
                print(e)
        page_index += 1

    return df

In [9]:
def extract_data_from_folder(folder=FOLDER, verbose = False):

    df = pd.DataFrame(columns=TRANS_DF_TYPE.keys())
    
    file_list = os.listdir(folder)

    for file_name in file_list:
        if verbose == True:
            print(f'Reading file {file_name}')
            
        file_path = folder + file_name
        df = extract_data_from_statement(file_path, df)

        if verbose == True:
            print('\n')

    df['Expense Date'] = pd.to_datetime(df['Expense Date'], format='%d-%b-%Y')
    
    df = df.sort_values(by = ['Expense Date'], ascending = True)

    return df

In [17]:
def categorize_expenses(expenses_df):
    
    num_expenses = expenses_df.shape[0]
    expenses_df['Expense Category ID'] = ''
    expenses_df['Expense Sub Category ID'] = ''

    cat_df = pd.read_excel('Data/Expense Categorization.xlsx', sheet_name='Expense Category')
    
    sub_cat_df = pd.read_excel('Data/Expense Categorization.xlsx', sheet_name='Sub-category')
    sub_cat_df['Expense Sub Category ID'] = sub_cat_df['Expense Sub Category ID'].astype(str)
    print(sub_cat_df)
    
    cat_sub_cat_asgn_df = pd.read_excel('Data/Expense Categorization.xlsx', sheet_name='Category - Sub-category')
    cat_sub_cat_asgn_df['Expense Category ID'] = cat_sub_cat_asgn_df['Expense Category ID'].astype(str)
    cat_sub_cat_asgn_df['Expense Sub Category ID'] = cat_sub_cat_asgn_df['Expense Sub Category ID'].astype(str)
    
    sub_cat_keywords_df = pd.read_excel('Data/Expense Categorization.xlsx', sheet_name='Sub-category keywords')
    sub_cat_keywords_df['Expense Sub Category ID'] = sub_cat_keywords_df['Expense Sub Category ID'].astype(str)

    exp_cat_override_df = pd.read_excel('Data/Expense Categorization Override.xlsx', sheet_name='Expense Category Override')

    for index, expenses_row in expenses_df.iterrows():

        cat_override_row = None
        
        #Check if this expense has a categorization override.
        cat_override_df = exp_cat_override_df[(exp_cat_override_df['Expense Date'] ==  expenses_row['Expense Date']) & (exp_cat_override_df['Expense Date'] ==  expenses_row['Expense Date']) & (exp_cat_override_df['Amount'] ==  expenses_row['Amount'])]
        if cat_override_df.shape[0] > 0:
            cat_override_row =  cat_override_df.iloc[0]
            
        if cat_override_row is not None:
            expenses_df.at[index, 'Expense Category ID'] = cat_override_row['Expense Category ID']
            expenses_df.at[index, 'Expense Sub Category ID'] = cat_override_row['Expense Sub Category ID']

        else:
            
            trans_desc = expenses_row['Transaction Desc'].strip().lower()
    
            cat_id = ''
            sub_cat_id = ''
            
            #Check the keyword that exists in the transaction description.
            for _, sub_cat_keyword_row in sub_cat_keywords_df.iterrows():
                keyword = sub_cat_keyword_row['Keyword'].strip().lower()
                
                if (trans_desc.find(keyword) != -1):
                    
                    sub_cat_id = sub_cat_keyword_row['Expense Sub Category ID'].strip().lower()
                    expenses_df.at[index, 'Expense Sub Category ID'] = sub_cat_id
                    
                    cat_id = cat_sub_cat_asgn_df[cat_sub_cat_asgn_df['Expense Sub Category ID'] == sub_cat_id].iloc[0]['Expense Category ID']                    
                    expenses_df.at[index, 'Expense Category ID'] = cat_id
                    
                    break
    
            #If sub-category ID is not determined, assign the default sub-category ID.
            if len(sub_cat_id) == 0:
                default_sub_cat_id = cat_sub_cat_asgn_df[cat_sub_cat_asgn_df['Default'] == 'X'].iloc[0]['Expense Sub Category ID']
                expenses_df.at[index, 'Expense Sub Category ID'] = default_sub_cat_id
    
                cat_id = cat_sub_cat_asgn_df[cat_sub_cat_asgn_df['Expense Sub Category ID'] == default_sub_cat_id].iloc[0]['Expense Category ID']                    
                expenses_df.at[index, 'Expense Category ID'] = cat_id
        
    return expenses_df        

In [19]:
expenses_df = extract_data_from_folder(FOLDER)
expenses_df = categorize_expenses(expenses_df)

   Expense Sub Category ID                         Description
0                        1                               Coles
1                        2                      Tales of India
2                        3                              Mobile
3                        4             Electricity + Hot Water
4                        5                               Water
5                        6                                 Gas
6                        7                            Internet
7                        8                                Taxi
8                        9                    Public Transport
9                       10               Ordered Meal Expenses
10                      11                              Events
11                      12                            Coursera
12                      13                          Monash Uni
13                      14                           Linked-In
14                      15                         Trad