This file will be used to get accounting entries from the bookkeeping program. Initially, the program will be used to retrieve entries from the most-used bookkeeping program in Denmark (+85% market share) - Visma E-conomic

In [19]:
import os
import asyncio
import aiohttp
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv

In [20]:
# This starts by loading the environment variables from the .env file.
# The program asserts that the environment variables are set and not placeholders.

load_dotenv()

app_secret_token = os.getenv('X_AppSecretToken')
agreement_grant_token = os.getenv('X_AgreementGrantToken')

assert app_secret_token is not None, "X_AppSecretToken is missing. Please add it to your .env file."
assert app_secret_token != "ADD-APPSECRETTOKEN-HERE", "X_AppSecretToken is a placeholder. Please add a valid token."
assert agreement_grant_token is not None, "X_AgreementGrantToken is missing. Please add it to your .env file."
assert agreement_grant_token != "ADD-AGREEMENTGRANTTOKEN-HERE", "X_AgreementGrantToken is a placeholder. Please add a valid token."

# This sets the ULR and headers for the API request.
# The current implementation is to define the URL and headers as global variables (to simplify).
base_url = "https://restapi.e-conomic.com"
headers = {
    'X-AppSecretToken': app_secret_token,
    'X-AgreementGrantToken': agreement_grant_token,
    'Content-Type': "application/json"
}

In [21]:
class EconomicAPI:
    def __init__(self, base_url, headers, concurrency=100):
        self.base_url = base_url
        self.headers = headers
        self.concurrency = concurrency
        self.semaphore = asyncio.Semaphore(self.concurrency)
    
    # To retrieve this data, we need address different endpoints in the API.
    # For each accounting entry, we need to supply the API with an account number and accounting year.
    # Hence, or API will need to be able to retrieve a list of accounts and accounting years.
    # Then we can use this data to retrieve the accounting entries.
    # I'm abstracting it away in the class to simplify usage while deconstructing the API to simplify dev.

    # First, I'll define a function to fetch data from the API.
    # This function will be used to fetch data from different endpoints.
    async def fetch_page(self, session, url, params=None):
        async with session.get(url, headers=self.headers, params=params) as response:
            response.raise_for_status() # This raises an exception for bad status codes (4xx, 5xx).
            return await response.json()

    # The API uses 0-based indexing for pages.
    # So I start at page 0 and increment until there are no more pages.
    async def fetch_pages(self, session, url):
        all_pages = []
        current_page_id = 1
        params = {'pageSize': 1000} # Defaults to 1,000, the maximum number of items per page.
        while True:
            params['skipPages'] = current_page_id - 1
            data = await self.fetch_page(session, url, params)
            collection = data.get('collection', [])
            if not collection:
                break
            all_pages.extend(collection)
            current_page_id += 1
        return all_pages

    # This function retrieves the chart of accounts.
    # The pagination and fetching logic is abstracted away in the fetch_pages function.
    async def get_accounts(self, session):
        url = f"{self.base_url}/accounts"
        accounts_collection = await self.fetch_pages(session, url)
        print(f"Found {len(accounts_collection)} accounts.")
        return accounts_collection

    # This function retrieves the accounting years.
    async def get_accounting_years(self, session):
        url = f"{self.base_url}/accounting-years"
        years_collection = await self.fetch_pages(session, url)
        years = [y['year'] for y in years_collection]
        print(f"Found {len(years)} accounting years: {years}")
        return years

    # This function retrieves the accounting entries for a specific account and year.
    async def get_entries(self, session, account_number, accounting_year):
        url = f"{self.base_url}/accounts/{account_number}/accounting-years/{accounting_year}/entries"
        entries_collection = await self.fetch_pages(session, url)
        entries = []
        for entry in entries_collection:
            entries.append({
                'account_number': str(account_number),
                'amount_in_base_currency': entry.get('amountInBaseCurrency', 'N/A'),
                'date': entry.get('date', 'N/A'),
                'entry_number': str(entry.get('entryNumber', 'N/A')),
                'text': str(entry.get('text', 'N/A'))
            })
        return entries

    # I've created this wrapper which limits the number of concurrent requests and handles exceptions.
    # E-conomic does not have a rate limit, but urge fair use of the API.
    async def bounded_get_entries(self, session, account_number, accounting_year):
        async with self.semaphore:
            try:
                return await self.get_entries(session, account_number, accounting_year)
            except Exception as e:
                print(f"Error fetching entries for account {account_number}, year {accounting_year}: {e}")
                return None

    # This is the final function that fetches all entries for all accounts and years.
    # I'll go through it step by step
    async def fetch_all_entries(self):
        async with aiohttp.ClientSession() as session: # This opens a connection to the server and keeps it open for the duration of the session.
            accounts_collection = await self.get_accounts(session) # This fetches the accounts.
            accounting_years = await self.get_accounting_years(session) # This fetches the accounting years.
            tasks = []
            for account in accounts_collection:
                for year in accounting_years: # This loops through the accounts and years to fetch the entries.
                    tasks.append(
                        asyncio.create_task(self.bounded_get_entries(session, account['accountNumber'], year))
                    ) # This creates a task for each account and year, thus establishing a sort of queue to fetch multiple entries at the same time.
            all_entries = await asyncio.gather(*tasks, return_exceptions=True)
            flat_entries = []
            for result in all_entries:
                if isinstance(result, Exception):
                    print(f"Task resulted in exception: {result}")
                elif result is not None:
                    flat_entries.extend(result) # This takes a list of lists and turns it into a single list.
            print(f"Total entries fetched: {len(flat_entries)}") # Manual debugging statement.
            return flat_entries

# Users can initialize instances of the API with different headers (the base_url, in this instance will likely not change).
# They can use a single function to fetch all entries for all accounts and years.
api = EconomicAPI(base_url, headers)
entries = await api.fetch_all_entries()

Found 308 accounts.
Found 9 accounting years: ['2018', '2019', '2020', '2021', '2022', '2023', '2024', '2025', '2026']
Total entries fetched: 75166


In [22]:
# I'll now write the data to an Excel file in the "data" directory.
# This is done as a sort of caching mechanism to avoid fetching the data again.
entries_df = pd.DataFrame(entries)
timestamp = datetime.now().strftime("%Y%m%d")
file_path = f"data/accounting_entries_{timestamp}.xlsx"
entries_df.to_excel(file_path, index=False)

In [25]:
# This is a very simple flux analysis that prints the difference and percentage change for the specified month.
# You can imagine that they could add more types of analyses in the future.
class FinancialAnalysis:
    def __init__(self, file_path):
        self.file_path = file_path
        self.entries_df = pd.read_excel(self.file_path)
        self.entries_df['date'] = pd.to_datetime(self.entries_df['date'])

    def analyze_flux(self, month_t):
        entries_month_t = self.entries_df[self.entries_df['date'].dt.month == month_t]
        entries_month_t_minus_1 = self.entries_df[self.entries_df['date'].dt.month == month_t - 1]
        entries_month_t_grouped = entries_month_t.groupby('account_number')['amount_in_base_currency'].sum()
        entries_month_t_minus_1_grouped = entries_month_t_minus_1.groupby('account_number')['amount_in_base_currency'].sum()
        merged_entries = pd.merge(
            entries_month_t_grouped, 
            entries_month_t_minus_1_grouped, 
            on='account_number', 
            how='outer', 
            suffixes=('_month_t', '_month_t_minus_1')
        )
        merged_entries.fillna(0, inplace=True)
        merged_entries['difference'] = merged_entries['amount_in_base_currency_month_t'] - merged_entries['amount_in_base_currency_month_t_minus_1']
        merged_entries['percentage_change'] = (
            merged_entries['difference'] / merged_entries['amount_in_base_currency_month_t_minus_1']
        ) * 100
        merged_entries.sort_values(by='percentage_change', ascending=False, inplace=True)
        
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        file_path = f"data/flux_analysis_{timestamp}.xlsx"
        merged_entries.to_excel(file_path)
        print(f"Flux Analysis for month {month_t} has been saved to {file_path}.")

# Users can initialize instances of the analysis with different accounting entries files.
# They can use a single function to analyze the flux for a specific month.
analysis = FinancialAnalysis(file_path)
analysis.analyze_flux(6) # This analyzes the flux for the month of June (6)


Flux Analysis for month 6 has been saved to data/flux_analysis_20241215224904.xlsx.
