In [None]:
import csv
import _csv
import re
from typing import Final, TextIO


# Read the whole semicolon-separated, windows-1250 encoded export into memory.
# The `with` block releases the file handle even when decoding or CSV parsing
# raises. `newline=""` is what the csv module documents for files handed to
# csv.reader, so line breaks inside quoted fields are left for the reader to
# interpret.
with open("input.csv", "r", encoding="windows-1250", newline="") as input_file:
    # The annotation is quoted on purpose: module-level annotations are
    # evaluated when the cell runs.
    # NOTE(review): `_csv._reader` looks like a type-stub-only name that is not
    # an attribute of the runtime `_csv` module — confirm on the target Python.
    input_file_reader: Final["_csv._reader"] = csv.reader(input_file, delimiter=";")
    rows: Final[list[list[str]]] = list(input_file_reader)


def is_input_row_header(row: list[str]) -> bool:
    """Tell whether ``row`` is the header row of the input file.

    A header row has exactly 21 cells and its first cell starts with "Data".
    """
    if len(row) != 21:
        return False
    first_cell = row[0]
    return first_cell.startswith("Data")


def is_input_row_data(row: list[str]) -> bool:
    """Tell whether ``row`` is a data row of the input file.

    A data row has exactly 21 cells, its first cell looks like ``YYYY-MM-DD``
    (digits only, no calendar validation) and its ninth cell is not empty.
    """
    if len(row) != 21:
        return False
    if re.match(r"^\d{4}-\d{2}-\d{2}$", row[0]) is None:
        return False
    return row[8] != ""


# First row recognised as a header supplies the column names.
# NOTE(review): next() raises StopIteration when no row qualifies as a header.
header_row: Final[list[str]] = next(filter(is_input_row_header, rows))

# Every row recognised as data, in file order.
data_rows: Final[list[list[str]]] = list(filter(is_input_row_data, rows))


def combine_input_row_header_and_input_row_data_to_entries(
    header_row: list[str], data_rows: list[list[str]]
) -> list[dict[str, str]]:
    """Pair each data row with the header to build one dict per row.

    Args:
        header_row: Column names; they become the dictionary keys.
        data_rows: Rows of cell values, read by position.

    Returns:
        One ``{column name: cell}`` dict per data row, in input order.

    Raises:
        IndexError: If a data row has fewer cells than the header.
    """
    combined: list[dict[str, str]] = []
    for cells in data_rows:
        entry: dict[str, str] = {}
        for position, column_name in enumerate(header_row):
            entry[column_name] = cells[position]
        combined.append(entry)
    return combined


# One dict per data row, keyed by the header's column names.
entries: Final[list[dict[str, str]]] = (
    combine_input_row_header_and_input_row_data_to_entries(
        header_row=header_row, data_rows=data_rows
    )
)

In [None]:
import datetime

class Transaction:
	"""A dated amount, exposed through read-only properties."""

	def __init__(self, date: datetime.date, amount: float):
		"""Store the given date and amount.

		Args:
			date: Date the transaction is attributed to.
			amount: Transaction amount.
		"""
		# Stored under private names; the properties below are the public view.
		self._amount: Final[float] = amount
		self._date: Final[datetime.date] = date

	@property
	def amount(self) -> float:
		"""Amount given at construction."""
		return self._amount

	@property
	def date(self) -> datetime.date:
		"""Date given at construction."""
		return self._date

	def __repr__(self) -> str:
		"""Return a debugging representation showing date and amount."""
		return f"Transaction(date={self.date}, amount={self.amount})"

def entry_to_transaction(entry: dict[str, str]) -> Transaction:
	"""Convert one CSV entry (column name -> cell text) into a Transaction.

	Args:
		entry: Must contain the keys "Waluta",
			"Kwota transakcji (waluta rachunku)", "Data księgowania" and
			"Data transakcji".

	Returns:
		A Transaction whose date is a plain ``datetime.date``.

	Raises:
		ValueError: If the currency is not "PLN", or the amount or date text
			cannot be parsed.
		KeyError: If a required column is missing from ``entry``.
	"""
	currency: Final[str] = entry["Waluta"]
	if currency != "PLN":
		raise ValueError(f"Unsupported currency: {currency}")

	# float() needs a dot as decimal separator; the cell presumably uses a comma.
	amount: Final[float] = float(entry["Kwota transakcji (waluta rachunku)"].replace(",", "."))

	# Use "Data księgowania" unless it is empty, then fall back to "Data transakcji".
	date_text: Final[str] = entry["Data księgowania"] or entry["Data transakcji"]
	# Transaction stores a datetime.date, and per-day merging keys on it, so any
	# time-of-day component is dropped here instead of leaking a datetime through.
	date: Final[datetime.date] = datetime.datetime.fromisoformat(date_text).date()

	return Transaction(date=date, amount=amount)

# Parse every CSV entry into a Transaction, keeping the input order.
transactions: Final[list[Transaction]] = list(map(entry_to_transaction, entries))


def merge_transactions_by_day(transactions: list[Transaction]) -> list[Transaction]:
	"""Collapse transactions that share a date into a single Transaction.

	Args:
		transactions: Transactions to merge, in any order.

	Returns:
		One Transaction per distinct date, whose amount is the sum of that
		date's amounts. Dates appear in order of first occurrence.
	"""
	grouped: Final[dict[datetime.date, list[Transaction]]] = {}
	for item in transactions:
		# setdefault creates the bucket the first time a date is seen.
		grouped.setdefault(item.date, []).append(item)

	return [
		Transaction(date=day, amount=sum(member.amount for member in members))
		for day, members in grouped.items()
	]


# One summed Transaction per distinct date.
merged_transactions: Final[list[Transaction]] = merge_transactions_by_day(
	transactions=transactions
)


In [None]:
class AcumulatedBalance:
	"""A balance value tied to a date, exposed through read-only properties."""

	def __init__(self, date: datetime.date, balance: float):
		"""Store the given date and balance.

		Args:
			date: Date the balance applies to.
			balance: Balance value for that date.
		"""
		# Stored under private names; the properties below are the public view.
		self._balance: Final[float] = balance
		self._date: Final[datetime.date] = date

	@property
	def balance(self) -> float:
		"""Balance given at construction."""
		return self._balance

	@property
	def date(self) -> datetime.date:
		"""Date given at construction."""
		return self._date

	def __repr__(self) -> str:
		"""Return a debugging representation showing date and balance."""
		return f"AcumulatedBalance(date={self.date}, balance={self.balance})"

def calculate_acumulated_balance(transactions: list[Transaction]) -> list[AcumulatedBalance]:
	"""Build the running total of transaction amounts.

	Args:
		transactions: Transactions in the order they should be accumulated;
			this function does not sort them.

	Returns:
		One AcumulatedBalance per transaction, carrying that transaction's date
		and the sum of all amounts up to and including it (starting from 0.0).
	"""
	running_total: float = 0.0
	snapshots: Final[list[AcumulatedBalance]] = []
	for item in transactions:
		running_total += item.amount
		snapshots.append(AcumulatedBalance(date=item.date, balance=running_total))
	return snapshots

# Sort by date first: the running total is accumulated in list order.
acumulated_balance: Final[list[AcumulatedBalance]] = calculate_acumulated_balance(
	transactions=sorted(merged_transactions, key=lambda merged: merged.date)
)


In [None]:
import matplotlib.pyplot as plt  # type: ignore
import matplotlib.dates as mdates  # type: ignore


def draw_acumulated_balance(acumulated_balance: list[AcumulatedBalance]):
    """Plot balance against date as a line chart and show the figure.

    Args:
        acumulated_balance: Points to plot, drawn in the order given.
    """
    # The figure handle is not needed afterwards; only the axes are used.
    _figure, axes = plt.subplots()

    dates = [point.date for point in acumulated_balance]
    balances = [point.balance for point in acumulated_balance]
    axes.plot(dates, balances)

    # Label x ticks as ISO-style dates.
    axes.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
    plt.show()


# Show the balance-over-time chart for the accumulated balance series.
draw_acumulated_balance(acumulated_balance=acumulated_balance)

In [None]:
from sklearn.preprocessing import PolynomialFeatures  # type: ignore
from sklearn.linear_model import LinearRegression  # type: ignore
import numpy as np  # type: ignore


class Model:
    """Polynomial regression of balance against time.

    Dates are mapped to numbers with ``mdates.date2num``, expanded with
    ``PolynomialFeatures`` and fitted with ``LinearRegression``.
    """

    # `data` uses a string annotation so that defining this class does not
    # require AcumulatedBalance to be resolvable at definition time.
    def __init__(self, data: "list[AcumulatedBalance]", degree: int):
        """Fit the regression.

        Args:
            data: Points supplying ``.date`` (input) and ``.balance`` (target).
            degree: Polynomial degree handed to ``PolynomialFeatures``.
        """
        x_np: Final[np.ndarray] = np.array(
            [mdates.date2num(ab.date) for ab in data]
        ).reshape(-1, 1)
        y_np: Final[np.ndarray] = np.array([ab.balance for ab in data])

        self._poly_features: Final[PolynomialFeatures] = PolynomialFeatures(
            degree=degree, include_bias=False
        )
        self._lin_reg: Final[LinearRegression] = LinearRegression()

        x_poly_np: Final[np.ndarray] = self._poly_features.fit_transform(x_np)
        self._lin_reg.fit(x_poly_np, y_np)

    def predict(self, date: datetime.datetime) -> float:
        """Return the balance the fitted model predicts for ``date``."""
        x_np: Final[np.ndarray] = np.array([mdates.date2num(date)]).reshape(-1, 1)
        # transform(), not fit_transform(): the feature expansion was already
        # fitted in __init__ and must not be re-fitted on prediction input.
        x_poly_np: Final[np.ndarray] = self._poly_features.transform(x_np)
        # Convert the NumPy scalar so the return value matches the annotation.
        return float(self._lin_reg.predict(x_poly_np)[0])

    def find_datetime_of_balance(
        self,
        target_balance: float,
        starting_datetime: datetime.datetime,
        max_error: float,
        max_iterations: int = 10_000,
    ) -> datetime.datetime:
        """Search for a moment at which the predicted balance hits a target.

        The search starts at ``starting_datetime`` with a one-day step. It
        moves forward in time while the prediction is below the target and
        backward while it is above. The step doubles when the direction
        repeats and halves when it reverses.

        Args:
            target_balance: Balance to look for.
            starting_datetime: Moment at which the search begins.
            max_error: Largest accepted absolute difference between the
                predicted and the target balance.
            max_iterations: Upper bound on the number of predictions made
                before giving up.

        Returns:
            The first visited moment whose prediction is within ``max_error``
            of ``target_balance``.

        Raises:
            RuntimeError: If the prediction cannot be ordered against the
                target (for example NaN), if the step has shrunk to zero, or
                if ``max_iterations`` is exhausted.
            OverflowError: Propagated from datetime arithmetic when a step
                leaves the range ``datetime`` can represent.
        """
        current_delta = datetime.timedelta(days=1)
        current_datetime = starting_datetime
        last_direction = 0

        for _ in range(max_iterations):
            current_balance = self.predict(current_datetime)
            if abs(current_balance - target_balance) <= max_error:
                return current_datetime

            if current_balance < target_balance:
                direction = 1
            elif current_balance > target_balance:
                direction = -1
            else:
                # Neither below nor above yet not within tolerance: NaN or a
                # negative max_error. Without a direction the search cannot move.
                raise RuntimeError(
                    f"Cannot compare predicted balance {current_balance!r} "
                    f"with target {target_balance!r}"
                )

            if not current_delta:
                # Repeated halving rounded the step down to zero; further
                # iterations would revisit the same moment forever.
                raise RuntimeError(
                    f"Search step shrank to zero near {current_datetime} "
                    f"without reaching balance {target_balance!r}"
                )

            current_datetime += current_delta * direction

            if direction == -last_direction:
                current_delta /= 2
            elif direction == last_direction:
                current_delta *= 2

            last_direction = direction

        raise RuntimeError(
            f"No datetime with balance {target_balance!r} found "
            f"within {max_iterations} iterations"
        )


# Fit a degree-20 polynomial to the accumulated balance series.
model: Final[Model] = Model(
    degree=20,
    data=acumulated_balance,
)

In [None]:


def draw_acumulated_balance_and_polynomial_regression_model(
    acumulated_balance: list[AcumulatedBalance],
    model,
    x_ticks_count: int,
    regression_points_count: int,
    starting_datetime: datetime.datetime = None,
    ending_datetime: datetime.datetime = None,
):
    """Plot the balance series together with the model's fitted curve.

    Args:
        acumulated_balance: Points drawn as the data line.
        model: Object whose ``predict(datetime)`` returns a balance.
        x_ticks_count: Value handed to ``plt.MaxNLocator`` for the x axis.
        regression_points_count: Number of evenly spaced moments at which the
            model is evaluated.
        starting_datetime: Start of the regression curve; ``None`` means
            midnight of the first point's date.
        ending_datetime: End of the regression curve; ``None`` means midnight
            of the last point's date.
    """
    # Resolve the curve's time range, falling back to the data's own extent.
    if starting_datetime is None:
        range_start = datetime.datetime.fromordinal(acumulated_balance[0].date.toordinal())
    else:
        range_start = starting_datetime
    if ending_datetime is None:
        range_end = datetime.datetime.fromordinal(acumulated_balance[-1].date.toordinal())
    else:
        range_end = ending_datetime

    # The figure handle is not needed afterwards; only the axes are used.
    _figure, axes = plt.subplots()
    axes.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
    axes.xaxis.set_major_locator(plt.MaxNLocator(x_ticks_count))

    # Data line.
    dates = [point.date for point in acumulated_balance]
    balances = [point.balance for point in acumulated_balance]
    axes.plot(dates, balances)

    # Regression curve, sampled at evenly spaced matplotlib date numbers.
    sample_numbers = np.linspace(
        mdates.date2num(range_start),
        mdates.date2num(range_end),
        regression_points_count,
    )
    fitted = [model.predict(mdates.num2date(number)) for number in sample_numbers]
    axes.plot(mdates.num2date(sample_numbers), fitted, color="red")

    plt.show()


# Data line plus the regression curve drawn from 2015-01-01 to 2023-12-31.
draw_acumulated_balance_and_polynomial_regression_model(
    acumulated_balance=acumulated_balance,
    model=model,
    starting_datetime=datetime.datetime(2015, 1, 1),
    ending_datetime=datetime.datetime(2023, 12, 31),
    regression_points_count=100,
    x_ticks_count=5,
)

In [None]:

import tabulate

# Table 1: predicted balance on 1 January of every year from 2015 to 2030.
dates_to_predict: Final[list[datetime.datetime]] = [
    datetime.datetime(year, 1, 1) for year in range(2015, 2030 + 1)
]
predicted_balances: Final[list[float]] = [
    round(model.predict(moment), 2) for moment in dates_to_predict
]

print(
    tabulate.tabulate(
        zip(dates_to_predict, predicted_balances),
        headers=["Date", "Predicted balance"],
        tablefmt="github",
    )
)

# Table 2: the date on which the model predicts each of these balances.
balances_to_predict_dates_of: Final[list[float]] = [
    0.0, 1_000.0, 2_000.0, 5_000.0,
    10_000.0, 15_000.0, 20_000.0, 50_000.0,
    100_000.0, 250_000.0, 500_000.0, 750_000.0,
    1_000_000.0,
]
# Each search starts from the current moment, taken afresh per target balance.
predicted_dates: Final[list[datetime.date]] = [
    model.find_datetime_of_balance(
        target_balance=wanted_balance,
        starting_datetime=datetime.datetime.now(),
        max_error=0.004,
    ).date()
    for wanted_balance in balances_to_predict_dates_of
]

print("")

print(
    tabulate.tabulate(
        zip(balances_to_predict_dates_of, predicted_dates),
        headers=["Balance", "Predicted date"],
        tablefmt="github",
    )
)