In [1]:
import pandas as pd
import numpy as np
import os

In [122]:
def parse_sebra_excel_file_to_pandas(excel_file_path):
    def find_organizations_start_row(df):
        org_start_phrases = [
            'По бюджетни организации',
            'ПЛАЩАНИЯ ПО ВТОРОСТЕПЕННИ СИСТЕМИ В СЕБРА '
        ]
        correct = False
        for t in org_start_phrases:
            mask = df.iloc[:, 0] == t
            if mask.sum() == 1:
                correct = True
                break
        assert correct, f'There should be exactly one row containing any of the phrases {org_start_phrases}. File "{excel_file_path}".'
        return mask[mask].index[0]
    
    def find_operations_blocks(df, org_start_row):
        op_start_text = 'Код'
        op_end_text = 'Общо:'
        mask_start = df.iloc[:, 0] == op_start_text
        mask_end = df.iloc[:, 0].str.startswith(op_end_text).fillna(False)
        op_start_index = mask_start[mask_start].index
        op_end_index = mask_end[mask_end].index
        
        if not (len(op_start_index) == len(op_end_index)):
            print(op_start_index, op_end_index)
            display(df)
            
        assert len(op_start_index) == len(op_end_index), f'The number of occurances of "{op_start_text}" is not the same as the number of occurances of "{op_end_text}". File "{excel_file_path}".'
        op_blocks = list(zip(op_start_index, op_end_index))
        op_blocks_dict = {
            'general': [],
            'organizations': []
        }
        for i in range(len(op_blocks)):
            b_start, b_end = op_blocks[i]
            assert b_start < b_end, f'Block start index ({b_start}) should be smaller that the end index ({b_end}). File "{excel_file_path}".'
            assert not df.iloc[b_start: b_end, 0].isna().any(), f'There are empty cells within the operations block between rows ({b_start}, {b_end}). File "{excel_file_path}".'
            if i > 0:
                _, prev_b_end = op_blocks[i - 1]
                assert prev_b_end < b_start, f'Block end index ({prev_b_end}) of the previous block should be smaller that the start index of the next block ({b_start}). File "{excel_file_path}".'
            assert not (b_start < org_start_row and org_start_row < b_end), 'The starting row for organizations shouldn\'t be within an operations block. File "{excel_file_path}".'

            if b_end < org_start_row:
                op_blocks_dict['general'].append((b_start, b_end))
            else:
                op_blocks_dict['organizations'].append((b_start, b_end))

            assert len(op_blocks_dict['general']) == 1, 'There should be exactly one block for general totals. File "{excel_file_path}".'

        return op_blocks_dict

    def get_general_totals(df, general_op_block):
        new_columns = ['Operations Code', 'Operations Description', 'Operations Count', 'Operations Amount (BGN)']
        b_start, b_end = general_op_block
        block_df = df.iloc[b_start + 1: b_end].copy()
        block_df.columns = new_columns
        block_df['Operations Count'] = block_df['Operations Count'].astype('int64')
        block_df['Operations Amount (BGN)'] = block_df['Operations Amount (BGN)'] \
            .astype(str) \
            .str.replace(' ', '') \
            .str.replace(',', '.') \
            .astype('float64')
        block_df = block_df.set_index('Operations Code')

        # check if sums are correct
        sum_row = df.iloc[b_end, :]
        sum_row.index = new_columns
        if isinstance(sum_row['Operations Amount (BGN)'], str):
            sum_row['Operations Amount (BGN)'] = float(
                sum_row['Operations Amount (BGN)'] \
                    .replace(' ', '') \
                    .replace(',', '.')
            )
        if not sum_row['Operations Count'] == block_df['Operations Count'].sum():
                print(f"Warning: The sums of \"Operations Count\" do not match for block ({b_start}, {b_end}). Expected value \"{sum_row['Operations Count']}\". Calculated value \"{block_df['Operations Count'].sum()}\". File \"{excel_file_path}\".")
        if not round(sum_row['Operations Amount (BGN)'], 2) == round(block_df['Operations Amount (BGN)'].sum(), 2):
                print(f"Warning: The sums of \"Operations Amount (BGN)\" do not match for block ({b_start}, {b_end}). Expected value \"{round(sum_row['Operations Amount (BGN)'], 2)}\". Calculated value \"{round(block_df['Operations Amount (BGN)'].sum(), 2)}\". File \"{excel_file_path}\".")

        return block_df

    def get_organization_operations_blocks(df, org_op_blocks):
        new_columns = ['Operations Code', 'Operations Description', 'Operations Count', 'Operations Amount (BGN)']
        org_op_blocks_dfs = []
        for b_start, b_end in org_op_blocks:
            org_df = df.iloc[b_start + 1: b_end, :].copy()
            org_df.columns = new_columns
            org_df['Operations Count'] = org_df['Operations Count'].astype('int64')
            org_df['Operations Amount (BGN)'] = org_df['Operations Amount (BGN)'] \
                .astype(str) \
                .str.replace(' ', '') \
                .str.replace(',', '.') \
                .astype('float64')

            # get organization name and time period
            block_header = df.iloc[b_start - 1]
            org_df['Organization Name'] = block_header[0]
            period = block_header[2].split(' ')
            assert period[0] == 'Период:', 'Field for time period should start with string "Период". File "{excel_file_path}".'
            org_df['Start Date'] = period[1]
            org_df['End Date'] = period[3]

            # check if sums are correct
            sum_row = df.iloc[b_end, :]
            sum_row.index = new_columns
            if isinstance(sum_row['Operations Amount (BGN)'], str):
                sum_row['Operations Amount (BGN)'] = float(
                    sum_row['Operations Amount (BGN)'] \
                        .replace(' ', '') \
                        .replace(',', '.')
                )
            if not sum_row['Operations Count'] == org_df['Operations Count'].sum():
                print(f"Warning: The sums of \"Operations Count\" do not match for block ({b_start}, {b_end}). Expected value \"{sum_row['Operations Count']}\". Calculated value \"{org_df['Operations Count'].sum()}\". File \"{excel_file_path}\".")
            if not round(sum_row['Operations Amount (BGN)'], 2) == round(org_df['Operations Amount (BGN)'].sum(), 2):
                print(f"Warning: The sums of \"Operations Amount (BGN)\" do not match for block ({b_start}, {b_end}). Expected value \"{round(sum_row['Operations Amount (BGN)'], 2)}\". Calculated value \"{round(org_df['Operations Amount (BGN)'].sum(), 2)}\". File \"{excel_file_path}\".")

            org_op_blocks_dfs.append(org_df)
        return pd.concat(org_op_blocks_dfs).reset_index().drop(columns=['index'])

    def check_sums(ops_df, general_block_df):
        ops_totals_df = ops_df.groupby('Operations Code').sum()
        sum_check_df = pd.merge(
            ops_totals_df,
            general_block_df,
            how='left',
            left_on='Operations Code',
            right_index=True
        )
        if not (sum_check_df['Operations Count_x'] == sum_check_df['Operations Count_y']).all():
            print(f'Warning: Some sums do not match for Operations Count. File "{excel_file_path}".')
        if not (sum_check_df['Operations Amount (BGN)_x'].round(2) == sum_check_df['Operations Amount (BGN)_y'].round(2)).all():
            print(f'Warning: Some sums do not match Operations Amount (BGN). File "{excel_file_path}".')
    
    # load Excel file to Pandas
    df = pd.read_excel(excel_file_path)
    
    # get the row that indicates the start of the organizations section
    org_start_row = find_organizations_start_row(df)
    
    # get the start and end rows for all blocks of operations
    op_blocks = find_operations_blocks(df, org_start_row)
    
    # parse the data containing the general totals for all operations in the file
    # (we will use those just to check the sums at the end for correctness)
    general_block_df = get_general_totals(df, op_blocks['general'][0])
    
    # parse all operations by organization in the file
    ops_df = get_organization_operations_blocks(df, op_blocks['organizations'])
    
    # check if the sums for all opearions by organization match the general totals
    check_sums(ops_df, general_block_df)

    # extract the Organization ID from the Name
    ops_df['Organization ID'] = ops_df['Organization Name'].str.findall(r'\((.*?)\)').map(
        lambda x: x[-1].strip() if len(x) > 0 else np.nan
    )
    
    return ops_df

In [123]:
def get_all_excel_files(data_dir):
    folders = [f for f in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, f))]
    excel_files = []
    for fold in folders:
        excel_files += [
            os.path.join(data_dir, fold, f) \
                for f in os.listdir(os.path.join(data_dir, fold)) \
                    if os.path.isfile(os.path.join(data_dir, fold, f)) \
                        and (f.endswith('.xlsx') or f.endswith('.xls'))
        ]
    return sorted(excel_files)

In [124]:
data_dir = './Data/Archive_10052021/MF_SEBRA'
excel_files = get_all_excel_files(data_dir)

In [132]:
dfs = []
for f in excel_files:
    temp_df = parse_sebra_excel_file_to_pandas(f)
    dfs.append(temp_df)
df = pd.concat(dfs)

Int64Index([1, 12, 19, 25, 32, 41], dtype='int64') Int64Index([7, 15, 21, 28, 37, 44, 46], dtype='int64')


Unnamed: 0,ОБЩО ПЛАЩАНИЯ ЗА ДЕНЯ НА МИНИСТЕРСТВО НА ФИНАНСИТЕ (в лева),Unnamed: 1,Период: 31.01.2013 - 31.01.2013,Unnamed: 3
0,,,,
1,Код,Описание,Брой,Сума
2,01 xxxx,"Заплати, възнаграждения и други плащания за пе...",66,140706
3,03 xxxx,Плащания за други удръжки от възнаграждения за...,1,242.19
4,10 xxxx,Издръжка,261,206575
5,50 xxxx,"Плащания за дълготрайни активи, основен ремонт...",1,2460
6,88 xxxx,Средства на разпореждане,3,2159.13
7,Общо:,,332,352142
8,,,,
9,ПЛАЩАНИЯ ПО ВТОРОСТЕПЕННИ СИСТЕМИ В СЕБРА,,,


AssertionError: The number of occurances of "Код" is not the same as the number of occurances of "Общо:". File "./Data/Archive_10052021/MF_SEBRA/2013/SEBRA_MF 31012013.xls".