In [171]:
import os

DB_TRANSACTIONS = os.environ['NOTION_MONEY_TRANSACTIONS']  # Replace with your database ID
DB_CSV = os.environ['NOTION_MONEY_CSV']  # Replace with your database ID
NOTION_API_KEY = os.environ['NOTION_TOKEN']  # Replace with your database ID

In [172]:
from notion_client import Client
from pprint import pprint
import requests
import json
import pandas as pd


headers = {
    "Authorization": "Bearer " + NOTION_API_KEY,
    "Content-Type": "application/json",
    "Notion-Version": "2022-06-28",
}

def get_pages(num_pages=None):
    """
    If num_pages is None, get all pages, otherwise just the defined number.
    """
    url = f"https://api.notion.com/v1/databases/{DB_CSV}/query"

    get_all = num_pages is None
    page_size = 100 if get_all else num_pages

    filter = {
        "filter": {
            "property": "Added",
            "checkbox": {
            "equals": False
            }
        },
    }
    response = requests.post(url, json=filter, headers=headers)

    data = response.json()

    results = data["results"]
    while data["has_more"] and get_all:
        payload = {"page_size": page_size, "start_cursor": data["next_cursor"]}
        url = f"https://api.notion.com/v1/databases/{DB_CSV}/query"
        response = requests.post(url, json=filter, headers=headers)
        data = response.json()
        results.extend(data["results"])

    return results

notion = Client(auth=NOTION_API_KEY)
list_users_response = notion.users.list()

results = get_pages()

In [173]:
list_n26 = []
list_buddy = []

for item in results:
    # url = item["properties"]["CSV"]["files"][0]["file"]["url"]
    try:
        if item["properties"]["Name"]["title"][0]["text"]["content"].find("Buddy") != -1:
            list_buddy.append(
                {   
                    "id": item["id"],
                    "url": item["properties"]["CSV"]["files"][0]["file"]["url"]
                }
            )
        elif item["properties"]["Name"]["title"][0]["text"]["content"].find("N26") != -1:
            list_n26.append(
                {   
                    "id": item["id"],
                    "url": item["properties"]["CSV"]["files"][0]["file"]["url"]
                }
            )
    except:
        print("No File Attached")

In [174]:
def create_item(title, description='', date='', account ='', amount='', category=''):
    if not category:
        category = 'Other'
    if description == 'nan':
        description = ''
    url = f'https://api.notion.com/v1/pages'
    # Prepare the payload for the new task
    payload = {
        'parent': {
            'database_id': f'{DB_TRANSACTIONS}'
        },
        
        'properties': {
            'Name': {
                'title': [{
                    'text': {
                        'content': title
                    }
                }]
            },
            'Description': {
                'rich_text': [{
                    'text': {
                        'content': str(description)
                    }
                }]
            },
            'Transaction Date': {
                'date': {
                    'start': date
                }
            },
            'Account': {
                'select': {
                    'name': account
                }
            },
            'Price': {
                'number': float(amount)
            },
            'Category': {
                "multi_select": [
                    {
                    "name": category
                    },
                    # {
                    # "name": "",
                    # },
                ]
            },
        }
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload))

    if response.status_code == 200:
        print(f"Task '{title}' created successfully.")
        return 1
    else:
        print(f"Error creating task: {response.status_code} {response.text}")
        return 0

# Buddybank.pdf
Tested on Q2 24 with big number of element.

### FD
Add partial sum verification as for Buddybank Lite 

In [239]:
from datetime import datetime
import numpy as np
import time

def buddy_to_notion(df):
    try:
        df["Entrate"] = (df['Entrate'].str.replace('.', '').str.replace(',', '.').astype(float))
    except:
        pass
    
    try:
        df["Uscite"] = (df['Uscite'].str.replace('.', '').str.replace(',', '.').astype(float))
    except:
        pass

    for index, row in df.iterrows():
        val_date_str = row["Valuta"]

        try:
            val_date = datetime.strptime(val_date_str, "%Y-%m-%d")
        except:
            try:
                val_date = datetime.strptime(val_date_str, "%d.%m.%y")
            except:
                print(f"Row is not a valid one, shit")
                continue

        try:
            title = row["Descrizione"].split("*")[2]
        except:
            title = row["Descrizione"]

        account = "Buddybank"
        
        if not np.isnan(row["Entrate"]): 
            amount = row["Entrate"]
        else: 
            amount = - abs(row["Uscite"])
        
        create_item(title=title,
                    date=val_date.strftime("%Y-%m-%d"),
                    account=account,
                    amount=amount)
    else:
            print("Execution Complete")
            return

In [None]:
import tabula

for list_item in list_buddy:
    try: 
        tables = tabula.read_pdf(list_item["url"], pages="all", multiple_tables=True, lattice=True)
        df_temp = pd.concat(tables)

        df_temp = df_temp[1:] #take the data less the header row
        # df_temp.columns = new_header #set the header row as the df header
        df_temp = df_temp[df_temp.Valuta != 'Valuta']
        df_temp.dropna(how='all', axis=0, inplace=True)
        df_temp.dropna(how='all', axis=1, inplace=True)
        df_temp = df_temp[1:] #take the data less the header row
        df_temp = df_temp[:-2].reindex()

        for index, item in df_temp.iterrows():
            if item["Descrizione"].find("EUR") != -1:
                item["Descrizione"] = item["Descrizione"][(item["Descrizione"].find("EUR"))+9:].strip()
                print(item["Descrizione"].strip())
            elif item["Descrizione"].find("AUD") != -1:
                item["Descrizione"] = item["Descrizione"][(item["Descrizione"].find("AUD"))+9:].strip()
            else:
                pass
        
        buddy_to_notion(df_temp)
    except:
        print(f"Error with {item}")
        print("no")
    else:
        notion.pages.update(
            list_item["id"],
            properties={
                "Added": {
                    "checkbox": True
                }
            },
        )

# N26.pdf
Last run in Sept 2024. Verify after usage

In [177]:
def N26_to_notion(df):
    
    for index, rows in df.iterrows():
        date_str = rows["Booking Date"]
        date = datetime.strptime(date_str, "%d.%m.%Y")
        if not create_item( title=f'{rows["Description"]}',
                            date=f'{date.strftime("%Y-%m-%d")}',
                            account=f'{"N26"}',
                            amount=f'{rows["Amount"]}',
                            description='',
                            category=''):
            print("Error in execution, correct the configuration and start again")
            break
    else:
        print("Execution Complete")

In [178]:
header = ['Description', 'Booking Date', 'Amount']

for list_item in list_n26:
    try: 
        tables = tabula.read_pdf(item["url"], pages="all", multiple_tables=False, lattice=False)

        df_temp = tables[0]
        df_temp = pd.DataFrame(df_temp.values[0:], columns=header)
        restore_row = tables[0].columns.to_list()
        df_temp = pd.concat([df_temp, pd.DataFrame([tables[0].columns.to_list()], columns=header)], ignore_index=True, axis=0) # does not save changes to the original dataframe
        df_temp["Amount"] = df_temp["Amount"].str.replace('€','').str.replace('.','').str.replace(',','.').astype(float)
        df_temp = df_temp[df_temp['Amount'].notna()]
        
        N26_to_notion(df_temp)
        
    except:
        print("Brutta madonna")
    else:
        notion.pages.update(
            item["id"],
            properties={
                "Added": {
                    "checkbox": True
                }
            },
        )

# BuddybankMonthly.pdf (Lite)
To be used only when 1 BuddyLite document in list_buddy

### FD
Remove "only1" limit and consider 

In [None]:
import unicodedata
import re
from pypdf import PdfReader
import os

import urllib.request

it_to_eng_dict = {"ottobre" : "Oct"}

df_temp = pd.DataFrame(columns=["Data", "Valuta", "Descrizione", "Uscite", "Entrate"])

url = list_buddy[0]["url"]
page_lines = []

file_Path = 'temp_buddy.pdf'
urllib.request.urlretrieve(url, file_Path)

# To be used ONLY if there's 1 BuddyLite document in list_buddy
reader = PdfReader(file_Path)
os.remove(file_Path)

regex_list = [r"\b\d\d\s", r"\b\d\s", r"\b\d"] # [0, 1] are filters for dates ex. 31 ottobre, 8 ottobre...
skip = False

# Function to remove elements according to their index from a lsit
def remove_elements_at_indices(test_list, idx_list):
    # Base case: if index list is empty, return original list
    if not idx_list:
        return test_list

    # Recursive case: extract first index and recursively process the rest of the list
    first_idx = idx_list[0]
    rest_of_indices = idx_list[1:]
    sub_list = remove_elements_at_indices(test_list, rest_of_indices)

    # Remove element at current index
    sub_list.pop(first_idx)

    return sub_list

for page_index in range(len(reader.pages)):
    page = reader.pages[page_index]
    text = page.extract_text()
    lines = text.splitlines()
    page_lines.extend(lines)

# Removing the page indexes from the list
idx_list = [index for (index, item) in enumerate(page_lines) if re.match(r"(?<!\S)\d(?!\S)(?!.)", item)]
page_lines = remove_elements_at_indices(page_lines, idx_list)

dates_indexes = [index for (index, item) in enumerate(page_lines) if re.match(regex_list[0], item) or re.match(regex_list[1], item)]
# remove header
page_lines = page_lines[dates_indexes[0]:]
# recompute dates indexes
dates_indexes = [index for (index, item) in enumerate(page_lines) if re.match(regex_list[0], item) or re.match(regex_list[1], item)]

line = ""
new_row = ""
buffer = ""

patial_sum = 0.0

for index, line in enumerate(page_lines):

    if index in dates_indexes:
        date = page_lines[index]
        continue
    # check previous row is different
    if (index < len(page_lines)-1 and page_lines[index] == page_lines[index+1]) or skip:
        skip = False
        print(f"Skip at index {index}")
        continue

    if page_lines[index].find(unicodedata.lookup("EURO SIGN")) == -1 and (index not in dates_indexes):
        buffer = buffer + page_lines[index]
        continue

    if page_lines[index].find(unicodedata.lookup("EURO SIGN")) != -1:
        new_row = buffer + page_lines[index]
        buffer = ""

    try:
        money = re.search(r"(-\d+\,?\d*\s\€|\d+\,?\d*\s\€)", new_row)
        descrizione = new_row[:money.start()]
        # magia SO per convertire mee
        val_date_str = re.sub(r'\b'+r'|\b'.join(it_to_eng_dict)+r'\b',lambda m: it_to_eng_dict.get(m.group(), m.group()), date)
        val_date_str = val_date_str + " 2024"
        val_date = datetime.strptime(val_date_str, "%d %b %Y")

        money = float(money.group().replace('€','').replace('.','').replace(',','.'))
        print(f"New row found: {index, date, new_row}")
        if money < 0:
            new_df_row = {"Data": "", "Valuta": val_date.strftime("%Y-%m-%d"), "Descrizione": descrizione, "Uscite": (money), "Entrate": ""}
        else:
            new_df_row = {"Data": "", "Valuta": val_date.strftime("%Y-%m-%d"), "Descrizione": descrizione, "Uscite": "", "Entrate": money}
        dict_to_df = pd.DataFrame([new_df_row])
        df_temp = pd.concat([df_temp, dict_to_df], ignore_index=True)

    except:
        print(f"Exception at row {index}")
        continue

# Dataframe creato con data, descrizione e entrata/uscita


In [None]:
# Not considering last total element:
df_temp["Uscite"] = df_temp["Uscite"].replace(r'^\s*$', np.nan, regex=True)
df_temp["Entrate"] = df_temp["Entrate"].replace(r'^\s*$', np.nan, regex=True)

total_sum = df_temp["Uscite"].iloc[-1]
partial_sum = df_temp["Uscite"][:-1].sum() + df_temp["Entrate"][:-1].sum()

if total_sum != partial_sum:
    print("WARNING: partial sum is different from the total")
    print(f"Partial: {partial_sum}")
    print(f"Total: {total_sum}")


# Format description as for normal buddy
for index, item in df_temp.iterrows():
            if item["Descrizione"].find("EUR") != -1:
                df_temp.iloc[index]["Descrizione"] = item["Descrizione"][(item["Descrizione"].find("EUR"))+9:].strip()
            elif item["Descrizione"].find("AUD") != -1:
                df_temp.iloc[index]["Descrizione"] = item["Descrizione"][(item["Descrizione"].find("AUD"))+9:].strip()
            else:
                pass

buddy_to_notion(df_temp[:-1])
