to-do: improve documentation (docstrings + comments)
  - create readme file
  - create a proper repo structure
  - add sample yaml configs
  - reorder imports

  **less important**

  - add unit tests
  - config validation
  - data input validation
  - improve preprocessing
  - possibly improve the keywords matching
  - follow-up visualization

In [None]:
CONFIG_FILE_NAME = 'config_raif.yaml'
KEYWORDS_FILE_NAME = 'keywords.yaml'
LOG_FILE_NAME = 'categorization_pipeline.log'

In [None]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [None]:
!pip install openpyxl



In [None]:
from typing import Optional, Union, Any, Dict, List
import sys
from io import StringIO


import pandas as pd
import numpy as np
import openpyxl
import logging
import json
import yaml




In [None]:
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.expand_frame_repr', False)


In [None]:

def obtain_index_to_fix(data:pd.DataFrame,
                        config:dict[str, Any],
                        incl_prichozi:bool=False,
                        incl_missing:bool=False,
                        incl_conflicts:bool=False,
                        incl_missing_suggestions:bool=False,
                        ind_list:list[int]=None)->tuple[list[int], dict[str,str]]:
  """
    Heuristic function, which returns ids of records, which should be manually fixed. The options are (each by separate flag):
      - to include all incoming transactions,
      - to include rows with missing categorization,
      - to include those rows for which the category suggestion and original category misalign,
      - to include those rows which do not get any suggestion
      - manual list of ids
  """
  ind = []
  cat_col = config['output_format']['category_col']
  transaction_type = config['output_format']['transaction_type']
  date=config['output_format']['date']
  amount=config['output_format']['amount']

  if incl_prichozi:
    ind += data[data[cat_col].str.contains('příchozí', na=False)].index.tolist() # to-do: make more agnostic
  if incl_missing:
    ind += data[(data[cat_col].isna()) | (data[cat_col]=='')].index.tolist()
  if incl_conflicts:
    ind += data[data.apply(lambda row: str(row[cat_col]) != str(row['suggested_categories']), axis=1)].index.tolist()
  if incl_missing_suggestions:
    ind += data[(data[cat_col].isna()) | (data[cat_col]=='')].index.tolist()
  if ind_list:
    ind += ind_list

  ind = list(set(ind))
  non_matching_data = data.loc[ind].to_dict('index')
  fix_dict = { k:
      f"""{{ '{cat_col}': '{v[cat_col]}'/'{v['suggested_categories']}'}},
            #original: {v[cat_col]}, suggested: {v['suggested_categories']},
            proti: {v['protistrana']}, zprava: {v['zpráva']}, datum: {v[date]} castka: {v[amount]}
      """
      for k, v in non_matching_data.items()
  }
  logging.info(fix_dict)
  #for key, value in non_matching_data.items():
  #  print(f"{key}: {{ '{category_col}': '{value[category_col]}'/'{value['suggested_categories']}'}}, #orginal: {value[category_col]}, suggested: {value['suggested_categories']}, proti: {value['protistrana']}, zprava: {value['zpráva']}, datum: {value[date]} castka: {value[amount]} ")
  #print('}')
  return list(set(ind)), fix_dict

In [None]:
def load_config(file_path:str)->Union[Dict[str, Any], List[Any]]:
  """Loads and parses a YAML configuration file."""
  with open(file_path, 'r', encoding='utf-8') as file:
    return yaml.safe_load(file)

def read_csv(file_name: str, delimiter:str, encoding:Optional[str]=None)->pd.DataFrame:
  return pd.read_csv(f'{file_name}', delimiter=delimiter, encoding=encoding)

def read_write_json(file_name:str, mode:str, payload:dict[str, any]=None, encoding=None)->Optional[dict[str, Any]]:
  if encoding is None:
    encoding='utf-8'

  with open(file_name, mode, encoding=encoding) as f:
    if mode == 'w':
      json.dump(payload, f, ensure_ascii=False, indent=2)
      logging.info(f"Json saved to {file_name}")
    if mode == 'r':
      return json.load(f)
      logging.info(f"Loaded JSON from {file_name}. {fix_dict}")



In [None]:
# data preprocessing
def preprocess_data(df:pd.DataFrame, config:dict[str, Any])->pd.DataFrame:
  data_raw = df.copy(deep=True)
  amount = config['output_format']['amount']
  date = config['output_format']['date']
  transaction_type = config['output_format']['transaction_type']
  category_col = config['output_format']['category_col']
  dte = config['input_format']['dte']
  tran_type = config['input_format']['tran_type']
  am = config['input_format']['am']
  protistrana_ids = config['input_format']['protistrana_ids']
  message = config['input_format']['message']
  sort_by = config['output_format']['sort_by']

  data_raw[amount] = data_raw[am].str.replace(' ', '').str.replace(',', '.').astype(float)

  try:
    # Attempt to parse with the original format
    data_raw[dte] = pd.to_datetime(data_raw[dte], format='%d/%m/%Y %H:%M:%S', errors='raise')
  except ValueError:
    # If the original format fails, try parsing with a format without time
    try:
      data_raw[dte] = pd.to_datetime(data_raw[dte], format='%d/%m/%Y', errors='raise')
    except ValueError:
      # If both formats fail, try with the format '%d.%m.%Y'
      try:
        data_raw[dte] = pd.to_datetime(data_raw[dte], format='%d.%m.%Y', errors='raise')  # Changed format here
      except ValueError:
        # If all formats fail, print the value causing the issue for debugging
        logging.error(f"Problematic value in column '{dte}': {data_raw[d][data_raw[d].apply(lambda x: isinstance(x, str) and not pd.isnull(x))].iloc[0]}")
        raise  # Re-raise the error for visibility

  data_raw.rename(columns={tran_type: transaction_type}, inplace=True)
  data_raw['měsíc'] = data_raw[dte].dt.strftime('%Y%m')
  data_raw[date] = data_raw[dte].dt.strftime('%Y-%m-%d')
  if category_col not in data_raw.columns:
    data_raw[category_col]=None

  for group in [protistrana_ids, message]:
    for ind, item1 in enumerate(protistrana_ids):
      for ind2 in range (ind+1, len(protistrana_ids)):
        data_raw[protistrana_ids[ind2]] = data_raw.apply(lambda row: np.nan if str(row[protistrana_ids[ind2]]) in str(row[item1]) else row[protistrana_ids[ind2]], axis=1)
        data_raw[item1] = data_raw.apply(lambda row: np.nan if str(row[item1]) in str(protistrana_ids[ind2]) else row[item1], axis=1)

  for label, group in zip(['protistrana', 'zpráva'], [protistrana_ids, message]):
    data_raw[label] = data_raw.apply(lambda row: '__'.join([str(row[col]) for col in group if pd.notna(row[col])]), axis=1)

  return data_raw[[date, transaction_type, category_col, 'měsíc']+['protistrana', 'zpráva'] + [amount]]




In [None]:
# Load configs and data
stream_logs = set_up_logger(LOG_FILE_NAME)
# To make this runnable as a standalone cell, let's create dummy files for demonstration
account_config_filename = CONFIG_FILE_NAME
keywords_config_filename = KEYWORDS_FILE_NAME

config = load_config(account_config_filename)
keywords = load_config(keywords_config_filename)

logging.info("Account Configuration:")
logging.info(config)
logging.info("\nKeywords Configuration:")
logging.info(keywords)

df = read_csv(file_name=config['file_name_input'], **config['parsing'])
logging.info("Data loaded:")
logging.info(df)

2025-06-01 17:36:55 [INFO] Account Configuration:
2025-06-01 17:36:55 [INFO] {'file_name_input': 'Pohyby_1015274383_202506011923.csv', 'file_name_output': 'Raif_250101_250531.xlsx', 'fix_file_name': 'records_to_fix', 'parsing': {'delimiter': ';', 'encoding': 'Windows-1250'}, 'input_format': {'cols': ['Datum provedení', 'Datum zaúčtování', 'Číslo protiúčtu', 'Název protiúčtu', 'Typ transakce', 'Zpráva', 'Poznámka', 'Zaúčtovaná částka', 'Název obchodníka', 'Město', 'Vlastní poznámka'], 'dte': 'Datum provedení', 'protistrana_ids': ['Poznámka', 'Zpráva', 'Název obchodníka', 'Město', 'Název protiúčtu', 'Číslo protiúčtu'], 'message': ['Vlastní poznámka'], 'am': 'Zaúčtovaná částka', 'tran_type': 'Typ transakce'}, 'output_format': {'display_groups': ['kategorie plateb'], 'cat_cols': ['protistrana', 'zpráva'], 'amount': 'částka', 'date': 'datum', 'transaction_type': 'Typ transakce', 'category_col': 'Kategorie_plateb', 'sort_by': ['Kategorie_plateb', 'měsíc']}}
2025-06-01 17:36:55 [INFO] 
Keywor

In [None]:
data = preprocess_data(df, config)

In [None]:
data

Unnamed: 0,datum,Typ transakce,Kategorie_plateb,měsíc,protistrana,zpráva,částka
0,2025-05-29,Platba kartou,,202505,LEKARNA U LIDUSKY 01; PRAHA 4 - BRA; CZE__408359XXXXXX8367,,-488.2
1,2025-05-30,Odchozí okamžitá úhrada,,202505,2592685013/3030,,-20750.0
2,2025-05-30,Příchozí platba,,202505,Jan Krejčí__1026704422/5500,,20750.0
3,2025-05-29,Příchozí platba,,202505,Jan Krejčí__1026704422/5500,,1000.0
4,2025-05-29,Odchozí okamžitá úhrada,,202505,1723805018/3030,,-1000.0
5,2025-05-29,Příchozí platba,,202505,Jan Krejčí__1026704422/5500,,1000.0
6,2025-05-24,Platba kartou,,202505,S. C. WESTFIELD CHODOV; PRAHA CHODOV; CZE__ecipo.hu__408359XXXXXX8367,,-2724.3
7,2025-05-24,Platba kartou,,202505,Ansons; Prag; CZE__Praha 4 - Chodov__408359XXXXXX8367,,-4339.0
8,2025-05-24,Příchozí platba,,202505,Jan Krejčí__1026704422/5500,,7000.0
9,2025-05-17,Platba kartou,,202505,Revolut**3268*; Dublin; IRL__408359XXXXXX8367,,-4000.0


In [None]:
data['suggested_categories'] = data.apply(lambda row:
      generate_suggestions(row, config['output_format']['cat_cols'], keywords),
                                          axis=1)
non_matching_ind, fix_dict = obtain_index_to_fix(data=data,
                                                 config=config,
                                                 incl_prichozi=True,
                                                 incl_missing=True,
                                                 incl_conflicts=True,
                                                 incl_missing_suggestions=True)

2025-06-01 17:35:59 [INFO] {0: "{ 'Kategorie_plateb': 'None'/'['zdraví', 'Chrochro']'}, \n            #original: None, suggested: ['zdraví', 'Chrochro'], \n            proti: LEKARNA U LIDUSKY 01; PRAHA 4 - BRA; CZE__408359XXXXXX8367, zprava: , datum: 2025-05-29 castka: -488.2 \n      ", 1: "{ 'Kategorie_plateb': 'None'/'Nezapočítávat'}, \n            #original: None, suggested: Nezapočítávat, \n            proti: 2592685013/3030, zprava: , datum: 2025-05-30 castka: -20750.0 \n      ", 2: "{ 'Kategorie_plateb': 'None'/'Spoření'}, \n            #original: None, suggested: Spoření, \n            proti: Jan Krejčí__1026704422/5500, zprava: , datum: 2025-05-30 castka: 20750.0 \n      ", 3: "{ 'Kategorie_plateb': 'None'/'Spoření'}, \n            #original: None, suggested: Spoření, \n            proti: Jan Krejčí__1026704422/5500, zprava: , datum: 2025-05-29 castka: 1000.0 \n      ", 4: "{ 'Kategorie_plateb': 'None'/'Nezapočítávat'}, \n            #original: None, suggested: Nezapočítávat, 

In [None]:
read_write_json(file_name=config['fix_file_name'], mode='w', payload=fix_dict)

2025-06-01 17:39:53 [INFO] Json saved to records_to_fix


In [None]:
fix_dict = read_write_json(file_name=config['fix_file_name'], mode='r')

In [None]:
fix_dict

{'0': "{ 'Kategorie_plateb': 'None'/'['zdraví', 'Chrochro']'}, \n            #original: None, suggested: ['zdraví', 'Chrochro'], \n            proti: LEKARNA U LIDUSKY 01; PRAHA 4 - BRA; CZE__408359XXXXXX8367, zprava: , datum: 2025-05-29 castka: -488.2 \n      ",
 '1': "{ 'Kategorie_plateb': 'None'/'Nezapočítávat'}, \n            #original: None, suggested: Nezapočítávat, \n            proti: 2592685013/3030, zprava: , datum: 2025-05-30 castka: -20750.0 \n      ",
 '2': "{ 'Kategorie_plateb': 'None'/'Spoření'}, \n            #original: None, suggested: Spoření, \n            proti: Jan Krejčí__1026704422/5500, zprava: , datum: 2025-05-30 castka: 20750.0 \n      ",
 '3': "{ 'Kategorie_plateb': 'None'/'Spoření'}, \n            #original: None, suggested: Spoření, \n            proti: Jan Krejčí__1026704422/5500, zprava: , datum: 2025-05-29 castka: 1000.0 \n      ",
 '4': "{ 'Kategorie_plateb': 'None'/'Nezapočítávat'}, \n            #original: None, suggested: Nezapočítávat, \n           

changes - maxbanka - spoření, buffer taky spoření

VZP
REMESLNA PEKARNA
escape

upravit revolut pro Norsko (odečíst, co jsem tam dostal)


lístky do dovolené - Norsko, Orličky, chata?

In [None]:
# Update the DataFrame based on modifications in diff_dict
for idx, changes in fix_dict.items():
  data.at[idx, config['output_format']['category_col']] = changes['Kategorie_plateb']


TypeError: string indices must be integers, not 'str'

In [None]:
# verify results
data.sort_values(by=config['output_format']['sort_by'])


Unnamed: 0,datum,Typ transakce,Kategorie_plateb,měsíc,protistrana,zpráva,částka,suggested_categories
79,2025-01-28,Odchozí okamžitá úhrada,,202501,2592685013/3030,,-20810.0,Nezapočítávat
80,2025-01-28,Příchozí platba,,202501,Jan Krejčí__1026704422/5500,,16810.0,Spoření
81,2025-01-28,Příchozí okamžitá úhrada,,202501,Jan Krejčí__1723805018/3030,,2000.0,Nezapočítávat
82,2025-01-13,Jednorázová platba,,202501,Alza; Praha; CZE__408359XXXXXX8367,,2542.0,elektronika a sw
83,2025-01-24,Platba kartou,,202501,HelloComp.cz; Praha; CZE__408359XXXXXX8367,,-4590.0,[]
84,2025-01-24,Příchozí platba,,202501,Jan Krejčí__1026704422/5500,,4000.0,Spoření
85,2025-01-24,Jednorázová platba,,202501,8784010018/4000,,-2544.0,Spoření
86,2025-01-23,Příchozí okamžitá úhrada,,202501,vratka kredit__Beachklub Ládví z.s.__2200853015/2010,,2236.0,[]
87,2025-01-23,Příchozí okamžitá úhrada,,202501,Groceries January__Darya Hryhoryeva__2592685013/3030,,1043.0,Nezapočítávat
88,2025-01-20,Trvalá platba,,202501,Životní pojištění__2700/2700,,-735.0,Životní pojištění


In [None]:
# fix second round
#non_matching_ind2 = obtain_index_to_fix(data, False, False, False, False, [])



reshuffle output - měsíc, kategorie plateb, částka, add měna and flag for account

In [None]:
# generate statistics

monthly_expenses = data[data[config['output_format']['category_col']].str.lower()!='nezapočítávat']\
  .groupby(config['output_format']['sort_by'])\
   [config['output_format']['amount']].sum().reset_index()
print(monthly_expenses)

Empty DataFrame
Columns: [Kategorie_plateb, měsíc, částka]
Index: []


In [None]:
# prompt: take original csv, data and last aggregation and save it into excel file each on separate sheet
log_contents = stream_logs.getvalue()
log_df = pd.DataFrame(log_contents.strip().split('\n'), columns=["Log"])

# Create a new Excel workbook
with pd.ExcelWriter(config['file_name_output']) as writer:
  # Write each DataFrame to a different sheet
  df.to_excel(writer, sheet_name='Original Data', index=False)
  data.to_excel(writer, sheet_name='Processed Data', index=False)
  log_df.to_excel(writer, sheet_name="Logs", index=False)
  monthly_expenses.to_excel(writer, sheet_name='Monthly Expenses', index=False)