In [None]:
import logging
from pathlib import Path
import re

import pandas as pd
import pdfplumber

In [None]:
# Service-account key file used for the Google connection; the relative path is
# resolved against the notebook's working directory.
credentials_path = Path('../credentials/cool-plasma-452619-v4-feb20b70d461.json')
# Folder that is scanned for downloaded account statements.
downloads_path = Path.home() / 'Downloads'

In [None]:
def get_all_account_statement_files(downloads_path: Path) -> list[Path]:
    """Get all the account statement files from the downloads folder.

    Args:
        downloads_path: Directory to scan (sub-directories are not searched).

    Returns:
        Paths of the regular files whose name contains 'Kontoauszug', sorted so
        that the result is the same on every run (``Path.iterdir`` yields
        entries in arbitrary order).

    Raises:
        OSError: If ``downloads_path`` cannot be listed, e.g. because it does
            not exist.
    """
    return sorted(
        file
        for file in downloads_path.iterdir()
        if 'Kontoauszug' in file.name and file.is_file()
    )

In [None]:
# Collect the account statement files found in the downloads folder and display them.
files = get_all_account_statement_files(downloads_path)
files

In [None]:
# Entries produced by ``Path.iterdir`` already include ``downloads_path``, so the
# first statement is used as-is; joining it onto ``downloads_path`` again would
# duplicate the directory whenever that path is relative.
if not files:
    raise FileNotFoundError(f'No account statement found in {downloads_path}')
file_path = files[0]
print(file_path)

In [None]:
def extract_text_from_pdf(pdf_path: Path) -> str:
    """Extract the text of every page of a PDF file.

    Args:
        pdf_path: Location of the PDF to read.

    Returns:
        The page texts joined by newlines. An empty string if the file does
        not exist. If reading fails part-way, the error is logged and the text
        of the pages read up to that point is returned.
    """
    if not pdf_path.exists():
        logging.error(f'File could not be found: {pdf_path}')
        return ''

    page_texts = []

    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                # NOTE(review): `extraction_mode` looks like a pypdf option; confirm
                # that pdfplumber honours it (its documented switch is `layout=True`).
                page_text = page.extract_text(extraction_mode='layout')
                # Skip pages that yield no text (None or '').
                if page_text:
                    page_texts.append(page_text)
    except Exception as e:
        # Best effort: report the problem and keep the pages read so far.
        logging.error(f'An unexpected error occurred while reading {pdf_path}: {e}')

    # Pages are separated by a newline so that the last line of one page is not
    # fused with the first line of the next when the text is split into lines.
    return '\n'.join(page_texts)

In [None]:
# Read the selected statement and split its text into individual lines.
full_pdf_text = extract_text_from_pdf(file_path)
lines = full_pdf_text.split('\n')

In [None]:
# Display the complete extracted text for inspection.
full_pdf_text

In [None]:
# Display the line at index 6 of the extracted text.
lines[6]

In [None]:
def extract_balance_from_line(line: str) -> float:
    """Extract the balance from a line of text.

    The balance is taken from the second-to-last whitespace-separated token,
    which is expected to use '.' as thousands separator and ',' as decimal
    separator (e.g. '1.234,56').

    Args:
        line: Statement line that contains a balance.

    Returns:
        The parsed amount, or 0.0 if the line has fewer than two tokens or the
        token is not a number (the failure is logged).
    """
    # NOTE(review): the last token is ignored, so the returned amount carries no
    # sign; confirm whether a trailing marker should make the balance negative.
    try:
        # split() without an argument collapses runs of whitespace and ignores
        # leading/trailing blanks, so padded lines still yield the right token.
        balance_str = line.split()[-2]
        balance_str = balance_str.replace('.', '').replace(',', '.')
        return float(balance_str)
    except (IndexError, ValueError) as e:
        logging.error(
            f'An error occurred when trying to extract the balance of a line: {line}. Fehler: {e}'
        )
        return 0.0
    

# def get_balance_of_account(lines: list, balance_type: str) -> tuple:
#     """Get a balance (old or new) of an account."""
#     balance_line = next((line for line in lines if balance_type in line), None)

#     if balance_line:
#         balance_float = extract_balance_from_line(balance_line)
#         balance_idx = lines.index(balance_line)
#         return balance_float, balance_idx
#     else:
#         logging.error(f'{balance_type} not found.')
#         return 0.0, -1

def get_balance_of_account(lines: list, balance_type: str) -> list:
    """Find the balance entries of one kind (old or new) in a statement.

    Every line that contains ``balance_type`` is parsed into a
    ``(balance, line_index)`` pair.

    Args:
        lines: All text lines of the statement.
        balance_type: Marker text to look for in each line.

    Returns:
        - for 'alter Kontostand': the first pair found,
        - for 'neuer Kontostand': the last pair found,
        - for any other ``balance_type``: the list of all pairs,
        - an empty list (after logging an error) when no line matches.
    """
    matches = [
        (extract_balance_from_line(text), position)
        for position, text in enumerate(lines)
        if balance_type in text
    ]

    if not matches:
        logging.error(f'{balance_type} not found.')
        return matches

    # The closing balance is the last occurrence, the opening balance the first.
    if balance_type == 'neuer Kontostand':
        return matches[-1]
    if balance_type == 'alter Kontostand':
        return matches[0]
    return matches



In [None]:
def get_all_transactions(
    lines: list, old_balance_idx: int, new_balance_idx: int
) -> list:
    """Extract all transactions from an account statement between two index markers.

    Args:
        lines: All text lines of the statement.
        old_balance_idx: Index of the line after which the transactions start
            (the line itself is excluded).
        new_balance_idx: Index of the line before which the transactions end
            (the line itself is excluded).

    Returns:
        One list of strings per transaction. Element 0 is the line that starts
        the transaction, element 1 the line that follows it; when more lines
        belong to the transaction, element 2 is every line after element 0
        concatenated without a separator. Groups that start with 'Übertrag'
        are dropped.
    """
    transactions_part = lines[old_balance_idx + 1 : new_balance_idx]
    # Only the line count is logged; the statement content itself is not dumped.
    logging.debug('Scanning %d statement lines for transactions', len(transactions_part))

    pattern_transaction_start = re.compile(r'\d{2}\.\d{2}\. \d{2}\.\d{2}\.')
    pattern_transaction_start_alt = re.compile(r'Übertrag')

    transactions = []
    current_transaction = []

    for line in transactions_part:
        # A line that starts with 'Übertrag' or with two 'dd.mm.' dates opens a new group.
        if pattern_transaction_start_alt.match(line) or pattern_transaction_start.match(
            line
        ):
            transactions.append(current_transaction)
            current_transaction = []
        current_transaction.append(line)

    transactions.append(current_transaction)  # Append the last group

    # The first group holds whatever precedes the first start line (possibly
    # nothing at all), so it is never a transaction.
    transactions = transactions[1:]

    # Filter out groups that start with 'Übertrag'
    transactions = [
        txn for txn in transactions if not pattern_transaction_start_alt.match(txn[0])
    ]

    for txn in transactions:
        if len(txn) > 2:
            # NOTE(review): the join starts at index 1, so the line kept as
            # element 1 is repeated at the start of element 2; confirm whether
            # txn[2:] was intended.
            txn[2] = ''.join(txn[1:])
            del txn[3:]

    return transactions

In [None]:
# Locate the opening ('alter Kontostand') and closing ('neuer Kontostand') balance
# lines. Each result is a (balance, line_index) pair; an empty list comes back when
# the marker is missing, in which case the indexing below raises IndexError.
results_old = get_balance_of_account(lines, 'alter Kontostand')
results_new = get_balance_of_account(lines, 'neuer Kontostand')
print(results_new)
print(results_old)

# Element 1 of each pair is the line index that bounds the transaction section.
all_transactions = get_all_transactions(lines, results_old[1], results_new[1])
print(f'Anzahl der Transaktionen: {len(all_transactions)}')

In [None]:
from googleapiclient.discovery import build
import gspread
from gspread import Client, Spreadsheet, Worksheet
from oauth2client.service_account import ServiceAccountCredentials


# Scopes handed to ServiceAccountCredentials when the Google connection is set up.
SCOPE_GOOGLE_DRIVE = [
    'https://spreadsheets.google.com/feeds',
    'https://www.googleapis.com/auth/drive',
    'https://www.googleapis.com/auth/drive.file',
]


def set_up_google_connection(credentials_path=None):
    """Create authorised Google clients from a service-account key file.

    Args:
        credentials_path: Path to the service-account JSON key file. Required
            despite the ``None`` default.

    Returns:
        A ``(client, service)`` tuple: the gspread client and the Drive v3
        service object, both built from the same credentials.

    Raises:
        ValueError: If ``credentials_path`` is ``None``.
    """
    if credentials_path is None:
        raise ValueError('Credential path is required')

    service_account_creds = ServiceAccountCredentials.from_json_keyfile_name(
        credentials_path, SCOPE_GOOGLE_DRIVE
    )

    # Tuple elements are evaluated left to right: gspread client first, then Drive.
    return (
        gspread.authorize(service_account_creds),
        build('drive', 'v3', credentials=service_account_creds),
    )

In [None]:
# Authorise with the configured key file; yields the gspread client and the Drive service.
client, service = set_up_google_connection(credentials_path)

In [None]:
# Open the target spreadsheet by its key.
spreadsheet = client.open_by_key("1OnrW1foE-1lOtgfxBv2Y5qqJSDnW4hiYeLpScjgFKxM")
# First worksheet of the spreadsheet (not referenced again in the cells shown here).
sheet = spreadsheet.sheet1

# Worksheets that hold the incomes ('Einnahmen') and the expenses ('Ausgaben').
sheet_incomes = spreadsheet.worksheet('Einnahmen')
sheet_expenses = spreadsheet.worksheet('Ausgaben')

# Load every cell of both worksheets; the header row is still ordinary data here.
df_expenses = pd.DataFrame(sheet_expenses.get_all_values())
df_incomes = pd.DataFrame(sheet_incomes.get_all_values())

In [None]:
# Promote the first row of each table to column labels and drop it from the data.
# NOTE: this cell is not idempotent - running it again promotes the next data row.
df_expenses.columns = df_expenses.iloc[0]
df_expenses = df_expenses[1:].reset_index(drop=True)

df_incomes.columns = df_incomes.iloc[0]
df_incomes = df_incomes[1:].reset_index(drop=True)

# Lookup from transaction type to the table that receives its rows.
gsheets = {'Expense': df_expenses, 'Income': df_incomes}

In [None]:
# Display the income table.
gsheets['Income']

In [None]:
# Display the expense table.
gsheets['Expense']

In [None]:
def check_income_or_expense(transaction: list[str]) -> str:
    """Classify a transaction as income or expense based on its first line.

    The decision is made from the last character of the first line: 'S' means
    expense, 'H' means income. Trailing whitespace (including a stray carriage
    return) is ignored so that padded lines are still classified.

    Args:
        transaction: Lines of one transaction; only element 0 is inspected.

    Returns:
        'Expense', 'Income', or 'Unknown' when the transaction is empty or its
        first line ends with neither marker.
    """
    if not transaction:
        return 'Unknown'

    line = transaction[0].rstrip()
    if line.endswith('S'):
        return 'Expense'
    if line.endswith('H'):
        return 'Income'
    return 'Unknown'


def get_transaction_value(transaction: list) -> float:
    """Get the value of the transaction.

    The amount is taken from the second-to-last whitespace-separated token of
    the first line, which is expected to use '.' as thousands separator and
    ',' as decimal separator (e.g. '1.234,56').

    Args:
        transaction: Lines of one transaction; only element 0 is inspected.

    Returns:
        The parsed amount, without a sign.

    Raises:
        IndexError: If the transaction is empty or its first line has fewer
            than two tokens.
        ValueError: If the token is not a number.
    """
    # split() without an argument collapses runs of whitespace and ignores
    # leading/trailing blanks, so padded lines still yield the right token.
    value = transaction[0].split()[-2]
    return float(value.replace('.', '').replace(',', '.'))


def open_gsheet_from_file_name(
    client: Client, acc_num: str, secrets: dict
) -> Spreadsheet:
    """Open the spreadsheet that is configured for an account number.

    Args:
        client: Client used to open the spreadsheet by key.
        acc_num: Value compared against each entry's 'account_statement_id'.
        secrets: Mapping whose 'objects' entry maps ids to dictionaries with
            the keys 'account_statement_id' and 'spreadsheet_id'.

    Returns:
        The spreadsheet of the first entry whose 'account_statement_id'
        equals ``acc_num``.

    Raises:
        ValueError: If no entry matches ``acc_num``.
    """
    for entry in secrets['objects'].values():
        if entry['account_statement_id'] != acc_num:
            continue
        return client.open_by_key(entry['spreadsheet_id'])

    raise ValueError(f'No object found for account number: {acc_num}')



def get_first_information_about_transaction(
    transaction: list, gsheets: dict
) -> tuple[str, pd.DataFrame, str, float, str]:
    """Return transaction type, DataFrame, name, value, and month.

    Args:
        transaction: Lines of one transaction. Element 0 is the line that is
            classified and parsed; element 1, when present, supplies the name.
        gsheets: Mapping with the keys 'Expense' and 'Income' holding the
            table for each transaction type.

    Returns:
        A tuple ``(transaction_type, df, name, transaction_value, month)``:
        the type ('Expense' or 'Income'), the matching table from ``gsheets``,
        the stripped second line (or 'Monatsabschluss Bank' when the
        transaction has a single line), the parsed amount, and the text
        between the first and second '.' of the first line.

    Raises:
        ValueError: If the transaction is neither an expense nor an income.
    """
    logging.info(f'Checking transaction {transaction}...')
    transaction_type = check_income_or_expense(transaction)

    df = (
        gsheets['Expense']
        if transaction_type == 'Expense'
        else gsheets['Income']
        if transaction_type == 'Income'
        else None
    )

    if df is None:
        raise ValueError(f'Unknown transaction type: {transaction_type}')

    transaction_value = get_transaction_value(transaction)
    name = transaction[1].strip() if len(transaction) > 1 else 'Monatsabschluss Bank'
    # For a line starting with 'dd.mm.' this is the 'mm' part.
    month = transaction[0].split('.')[1]

    return transaction_type, df, name, transaction_value, month



def add_new_row(
    df: pd.DataFrame,
    name: str,
    month: str,
    transaction_value: float,
    transaction_type: str,
    sheets: dict,
    general_account: bool = False,
) -> None:
    """Add a new transaction row to the DataFrame.

    The passed ``df`` is not modified; an extended copy is stored in
    ``sheets`` instead.

    Args:
        df: Table the row is appended to.
        name: Value written into the first column of the new row.
        month: Label of the column that receives ``transaction_value``.
        transaction_value: Amount of the transaction.
        transaction_type: 'Expense' stores the result under
            ``sheets['Expense']``; any other value stores it under
            ``sheets['Income']``.
        sheets: Mapping that receives the extended table.
        general_account: When true, expense rows get 'Nein' in the 'Umlegbar'
            column.
    """
    # The new row carries the name in the first column and is empty otherwise.
    new_row_data = [name] + [None] * (len(df.columns) - 1)
    new_row = pd.DataFrame([new_row_data], columns=df.columns)

    df = pd.concat([df, new_row], ignore_index=True)
    new_row_index = df.index[-1]

    # NOTE(review): if `month` is not an existing column label, pandas adds a
    # new column here; confirm the sheet headers match the 'mm' strings.
    df.loc[new_row_index, month] = transaction_value

    if general_account and transaction_type == 'Expense':
        df.loc[new_row_index, 'Umlegbar'] = 'Nein'

    if transaction_type == 'Income' and 'Mieter' in df.columns:
        df.loc[new_row_index, 'Mieter'] = 'Nein'

    if transaction_type == 'Expense':
        sheets['Expense'] = df
    else:
        sheets['Income'] = df

In [None]:
# Append every parsed transaction to the table that matches its type.
# NOTE: running this cell again appends the same rows a second time.
for transaction in all_transactions:
    transaction_type, df, name, transaction_value, month = (
        get_first_information_about_transaction(transaction, gsheets)
    )

    add_new_row(
        df,
        name,
        month,
        transaction_value,
        transaction_type,
        gsheets,
        general_account=True,
    )

In [None]:
# Display the income table currently held in `gsheets`.
gsheets['Income']

In [None]:
# Display the expense table currently held in `gsheets`.
gsheets['Expense']

In [None]:
from gspread_dataframe import set_with_dataframe

def update_google_sheet(sheet: Worksheet, df: pd.DataFrame) -> None:
    """Write a DataFrame into a worksheet below its existing header row.

    Args:
        sheet: Worksheet that receives the data.
        df: Table to write; neither its index nor its column labels are
            written.
    """
    write_options = {
        'row': 2,  # first data row; row 1 keeps the existing headers
        'col': 1,  # column A
        'include_index': False,
        'include_column_header': False,
        'resize': False,  # do not resize the worksheet to the DataFrame
    }
    set_with_dataframe(sheet, df, **write_options)

In [None]:
# Write both in-memory tables back to their worksheets, starting below the header row.
update_google_sheet(sheet_expenses, gsheets['Expense'])
update_google_sheet(sheet_incomes, gsheets['Income'])
print('✅ All changes saved to Google Sheets!')