In [1]:
import os
import pandas as pd
import xlrd
import datetime as dt
import yaml
import sys

In [2]:
# Read the pipeline configuration (file locations, pivot table names, maps)
# from catalog.yaml; text mode for reading is open()'s default.
with open('catalog.yaml', encoding='UTF-8') as file:
    catalog = yaml.safe_load(file)


In [3]:
class PivotTableParser():
    '''
    Turns the pivot tables of an Excel workbook into parquet files.

    Everything is driven by the ``catalog`` dictionary. The sections read by
    this class are:
      - file_parameters: path, filename, format, worksheet, rows, columns,
        out_path, pivot_tables, pivot_table_reference
      - xls_parameters: row, values
      - maps: columns, uf
      - output_file_parameters: path, file_name
    '''

    def __init__(self, catalog):
        '''
        Stores the catalog and derives the source workbook location from it.

        catalog: dict holding at least a 'file_parameters' section with the
                 keys 'path', 'filename', 'format' and 'worksheet'.
        '''
        self.catalog = catalog
        file_parameters = self.catalog['file_parameters']
        path = file_parameters['path']
        filename = file_parameters['filename']
        file_format = file_parameters['format']
        self.file = filename + file_format
        self.worksheet = file_parameters['worksheet']
        # Path WITHOUT extension: the methods below append '.xls' themselves
        self.fullpath = os.path.join(path, filename)

    def _intermediate_path(self, pivot_table):
        '''
        Location of the intermediary workbook of a pivot table:
        out_path + pivot table name + '.xls' (plain concatenation, so
        out_path must already end with a separator).
        '''
        return self.catalog['file_parameters']['out_path'] + pivot_table + '.xls'

    def get_pivot_table_data(self, pivot_table):
        '''
        The only function that is not set up in a linux environment. Opens the xls file with win32com
        and uses the selected pivot table cache to create a new sheet with a pivot table that has the
        granularity needed. The resulting file will be saved with the name of the chosen pivot table.

        pivot_table: name of the pivot table inside the configured worksheet.

        Excel is always closed again, even when one of the steps fails.
        '''
        print('# Creating intermediary .xls file')

        # Imported here because win32com is only available on Windows
        import win32com.client

        # Read the configuration first so a missing key fails before Excel is started
        xls_parameters = self.catalog['xls_parameters']
        rows = self.catalog['file_parameters']['rows']
        columns = self.catalog['file_parameters']['columns']

        xlApp = win32com.client.Dispatch('Excel.Application')
        try:
            wb = xlApp.Workbooks.Open(self.fullpath + '.xls')
            try:
                # Retrieving PivotCache. The COM object gets its own name so the
                # 'pivot_table' argument (the table NAME) is still intact for SaveAs.
                source_table = wb.Worksheets(self.worksheet).PivotTables(pivot_table)
                pivot_cache = source_table.PivotCache()

                # Creating new pivot table
                new_ws = wb.Worksheets.Add()
                new_pivot_table = new_ws.PivotTables().Add(PivotCache=pivot_cache, TableDestination=new_ws.Range('A1'))

                # enumerate gives every row field its own position, even if a name repeats
                for position, r in enumerate(rows, start=1):
                    field = new_pivot_table.PivotFields(r)
                    field.Orientation = xls_parameters['row']
                    field.Position = position
                    field.Subtotals = [False]*12

                for c in columns:
                    field = new_pivot_table.PivotFields(c)
                    field.Orientation = xls_parameters['values']
                    field.Subtotals = [False]*12

                # Save the workbook
                # NOTE(review): this is a relative name, resolved by Excel itself; confirm the
                # resulting location matches file_parameters.out_path used by the parsers.
                wb.SaveAs(f'{pivot_table}.xls')
            finally:
                wb.Close(SaveChanges=False)
        finally:
            xlApp.Quit()

    def pivot_table_parsing_xls(self, pivot_table):
        '''
        Opens the intermediary file and extracts the pivot table as a pandas dataframe
        using xlrd. Also does some preliminary treating, replacing missing volume
        values with 0 and doing a ffill on the other columns.

        pivot_table: name of the pivot table (also the intermediary file name).
        Returns a DataFrame with one row per sheet row below the header.
        '''
        print('# Extracting data from intermediate file')
        # Open workbook
        wb = xlrd.open_workbook(self._intermediate_path(pivot_table))
        sheet = wb.sheet_by_index(0)

        # Needed variables
        column_names = [sheet.cell_value(0, col_num) for col_num in range(sheet.ncols)]
        last_col = sheet.ncols - 1
        data_cache = {} # Last value seen per column, used to forward-fill empty cells
        data = [] # Will become DataFrame

        # Loop through rows to populate 'data' (row 0 is the header)
        for row_num in range(1, sheet.nrows):
            row_data = {}
            for col_num, name in enumerate(column_names):
                if sheet.cell_type(row_num, col_num) != xlrd.XL_CELL_EMPTY:
                    data_cache[name] = sheet.cell_value(row_num, col_num)
                elif col_num == last_col:
                    # An empty cell in the last column is stored as 0 instead of forward-filled
                    data_cache[name] = 0
                row_data[name] = data_cache[name]
            data.append(row_data)

        return pd.DataFrame(data)

    def pivot_table_parsing_xlsx(self, pivot_table):
        '''
        Opens the intermediary file and extracts the pivot table as a pandas dataframe
        using pandas. Also does some preliminary treating, replacing missing volume
        values with 0 and doing a ffill on the other columns.

        pivot_table: name of the pivot table (also the intermediary file name).
        The file is read from the same out_path location as pivot_table_parsing_xls.
        '''
        print('# Extracting data from intermediate file')
        df = pd.read_excel(self._intermediate_path(pivot_table))
        df['Total'] = df['Total'].fillna(0)
        return df.ffill()

    def data_treating(self, df):
        '''
        Does all the modifications needed on the dataframe so it fits the
        desired schema. The resulting dataframe is already the final output.

        df: dataframe produced by one of the pivot_table_parsing_* methods.
            It is not modified; the work is done on a copy.
        Returns a dataframe with the columns
        year_month, uf, product, unit, volume, created_at.
        '''
        df = df.copy()
        df.columns = df.columns.str.lower().str.replace('í','i')

        # Rename columns
        df = df.rename(columns = self.catalog['maps']['columns'])

        # Year column: drop the 'Total Soma...' rows, keep the first 4 characters as the year
        df['year'] = df['year'].astype('str')
        df = df.loc[~df['year'].str.startswith('Total Soma')].copy()
        df['year'] = df['year'].str[:4].astype('int')

        # Month column: last three characters of 'dados'
        df['month'] = df['dados'].str[-3:]

        # Creating a map for converting months into numbers (order of first appearance)
        # This works because excel pivot tables sort months in an ascending pattern
        month_labels = df['month'].unique()
        month_map = {label: number for number, label in enumerate(month_labels, start=1)}
        df['month'] = df['month'].map(month_map)

        # First day of every (year, month) pair
        df['year_month'] = pd.to_datetime(df[['year', 'month']].assign(day=1))

        # UF column
        df['uf'] = df['uf'].replace(self.catalog['maps']['uf'])

        # Timestamp
        df['created_at'] = dt.datetime.now()

        return df[['year_month','uf','product','unit','volume','created_at']]

    def consistency_check(self, pivot_table_df, pivot_table):
        '''
        Runs a consistency check on the output table. Opens the original file with xlrd,
        gets the cell values of the desired pivot table and converts into a DataFrame that can
        be merged with our output table and checked if the values are the same.

        pivot_table_df: output of data_treating.
        pivot_table: name of the pivot table being checked.

        Returns a boolean that triggers the creation of the output file. When the
        values do not match the process is terminated through sys.exit().
        Raises ValueError when the pivot table cannot be located in the sheet.
        '''
        print (f'# Running consistency check on table {pivot_table}')

        # Use the instance's own path instead of relying on a global variable
        wb = xlrd.open_workbook(self.fullpath + '.xls')
        sheet = wb.sheet_by_index(0)

        # Needed variables
        skip = 9 # Determines the distance from the pivot table trigger to the actual data
        n_months = 12 # Number of data rows read below the header row
        pivot_table_found = False
        n_row = None # Header row of the table
        n_col = None # Column just before the first empty header cell
        pivot_trigger = self.catalog['file_parameters']['pivot_table_reference'][pivot_table]

        print('# Looking for the desired pivot table')
        # Iterating through the xls file to find the range of the table
        for row_num in range(sheet.nrows):

            # Count down the rows between the trigger and the header row
            if pivot_table_found and skip > 0:
                skip -= 1
            for col_num in range(1,sheet.ncols):
                cell = sheet.cell_value(row_num, col_num)
                if cell == pivot_trigger and not pivot_table_found:
                    print('# Table found!')
                    pivot_table_found = True
                    n_row = row_num + skip
                elif skip == 0 and cell == '':
                    # First empty cell once the header row has been reached
                    n_col = col_num - 1
                    break
            if n_col is not None:
                break

        if not pivot_table_found:
            raise ValueError(f'Pivot table reference {pivot_trigger!r} was not found in the sheet')
        if n_col is None:
            raise ValueError(f'Could not find the end of the header row of pivot table {pivot_table!r}')

        print('# Extracting table data')

        # Iterates again to get the table
        column_names = [sheet.cell_value(n_row, col) for col in range (1,n_col)]
        data = [
            {name: sheet.cell_value(row_num, col_num) for col_num, name in enumerate(column_names, start=1)}
            for row_num in range(n_row+1, n_row+1+n_months)
        ]
        df = pd.DataFrame(data)
        df_melt = pd.melt(df, id_vars = column_names[0], value_vars = column_names[1:]).fillna(0)
        df_melt['value'] = df_melt['value'].replace('',0)

        # Subsequent treating to get a mergeable dataframe
        month_labels = df_melt['Dados'].unique()
        month_map = {label: number for number, label in enumerate(month_labels, start=1)}
        # Month number as a zero-padded two character string
        df_melt['Dados'] = df_melt['Dados'].map(month_map).map(lambda month: str(month).zfill(2))
        df_melt['variable'] = df_melt['variable'].astype('int').astype('str')
        df_melt['year_month'] = pd.to_datetime(
            df_melt['variable'] + '_' + df_melt['Dados'] + '_01', format = '%Y_%m_%d'
        )

        print('# Comparing total values for every year and month')
        # Merging dataframes
        grouped_df = pivot_table_df.groupby(['year_month']).agg({'volume' : 'sum'}).reset_index(level = ['year_month'])
        grouped_df = grouped_df.merge(df_melt[['year_month','value']], on = 'year_month')
        grouped_df['check'] = grouped_df.apply(lambda row: round(row.volume - row.value,2) == 0.00, axis = 1)

        if grouped_df.check.all():
            print('# Values are consistent\n')
        else:
            print('# Not all values are consistent. Please check the following months:')
            print(grouped_df.loc[grouped_df.check == False])
            print('# System will exit')
            sys.exit()
        return grouped_df.check.all()

    def pipeline_out(self):
        '''
        Pipeline that does all steps of the extraction after the creation of the intermediate file.

        For every pivot table in the catalog: parse the intermediary file (xlrd first,
        pandas as fallback), treat the data, run the consistency check and save the
        result as parquet.
        '''
        output_parameters = self.catalog['output_file_parameters']
        output_path = output_parameters['path']
        pivot_tables = self.catalog['file_parameters']['pivot_tables']
        for pivot_table in pivot_tables:
            try:
                print('# Trying to extract data via .xls')
                df = self.pivot_table_parsing_xls(pivot_table)
                print('# Data extracted successfully\n')
            except Exception as error:
                # Only ordinary errors trigger the fallback; KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                print('# Method failed. Doing extraction via .xlsx')
                print(f'# Reason: {error!r}')
                df = self.pivot_table_parsing_xlsx(pivot_table)
                print('# Data extracted successfully\n')
            df = self.data_treating(df)
            check = self.consistency_check(df, pivot_table)
            if check:
                print('# Saving data...')
                output_filename = output_parameters['file_name'][pivot_table]
                output_fullpath = output_path + output_filename + '.parquet'
                df.to_parquet(f'{output_fullpath}', index = False)
                print(f'# {pivot_table} saved as {output_filename}.parquet\n')


In [4]:
# Instantiate the parser with the catalog loaded from catalog.yaml.
parser = PivotTableParser(catalog)

In [5]:
# Run the extraction for every pivot table listed in the catalog: parse the
# intermediary workbook, treat the data, check it against the source workbook
# and write the parquet output. get_pivot_table_data is not called here, so
# the intermediary .xls files must already exist.
parser.pipeline_out()

# Trying to extract data via .xls
# Extracting data from intermediate file
# Data extracted successfully

# Running consistency check on table Tabela dinâmica1
# Looking for the desired pivot table
# Table found!
# Extracting table data
# Comparing total values for every year and month
# Values are consistent

# Saving data...
# Tabela dinâmica1 saved as oil_sales.parquet

# Trying to extract data via .xls
# Extracting data from intermediate file
# Data extracted successfully

# Running consistency check on table Tabela dinâmica3
# Looking for the desired pivot table
# Table found!
# Extracting table data
# Comparing total values for every year and month
# Values are consistent

# Saving data...
# Tabela dinâmica3 saved as diesel_sales.parquet

