Main Program

Task 1 Requirement Recap
1. Read financial_transactions.csv from your specified directory.
2. Parse each row into a dictionary.
3. Convert date strings to datetime objects.
4. Make debit amounts negative.
5. Skip invalid rows and log errors to errors.txt.
Print the number of loaded transactions.

In [None]:
import csv
from datetime import datetime
import os
import pandas as pd

def load_transactions(
    filename='./financial_transactions.csv',
    errorlog = 'errors.txt'
):
    transactions = []
    error_lines = []
    
    try:
        with open(filename, mode='r', newline='', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            for row_num, row in enumerate(reader, start=2):  # start=2 to account for header line
                try:
                    # Parse and validate fields
                    transaction_id = int(row['transaction_id'])
                    date = datetime.strptime(row['date'], '%Y-%m-%d')
                    customer_id = int(row['customer_id'])
                    amount = float(row['amount'])
                    if row['type'] not in ['credit', 'debit', 'transfer']:
                        raise ValueError(f"Invalid transaction type: {row['type']}")
                    description = row['description'].strip()

                    # Adjust amount if debit
                    if row['type'] == 'debit':
                        amount = -abs(amount)

                    transaction = {
                        'transaction_id': transaction_id,
                        'date': date,
                        'customer_id': customer_id,
                        'amount': amount,
                        'type': row['type'],
                        'description': description
                    }

                    transactions.append(transaction)

                except Exception as e:
                    error_lines.append(f"Row {row_num}: {e} | Raw: {row}")

    except FileNotFoundError:
        print(f"Error: File not found at path: {filename}")
        return []

    # Log errors to errors.txt in same directory
    if error_lines:
        error_log_path = os.path.join(os.path.dirname(filename), error_log)
        with open(error_log_path, 'w', encoding='utf-8') as f:
            for line in error_lines:
                f.write(line + '\n')
        print(f"{len(error_lines)} invalid row(s) skipped. See {error_log_path} for details.")

    print(f"{len(transactions)} valid transaction(s) loaded.")
    return transactions

if __name__ == "__main__":
    transactions = load_transactions()
    for t in transactions[:3]:  # Show the first 3 for preview
        print(t)

100000 valid transaction(s) loaded.
{'transaction_id': 1, 'date': datetime.datetime(2020, 10, 26, 0, 0), 'customer_id': 926, 'amount': 6478.39, 'type': 'credit', 'description': 'Expect series shake art again our.'}
{'transaction_id': 2, 'date': datetime.datetime(2020, 1, 8, 0, 0), 'customer_id': 466, 'amount': 1255.95, 'type': 'credit', 'description': 'Each left similar likely coach take.'}
{'transaction_id': 3, 'date': datetime.datetime(2019, 9, 2, 0, 0), 'customer_id': 110, 'amount': -7969.68, 'type': 'debit', 'description': 'Direction wife job pull determine leader move college.'}


Task 2:  
1. add_transaction(transactions) – Prompts user to input and append a new transaction.
2. view_transactions(transactions, filter_type=None) – Displays transactions, optionally filtered by type.


In [4]:
from datetime import datetime

def add_transaction(transactions):
    """Add a new transaction from user input."""
    try:
        date_str = input("Enter date (YYYY-MM-DD): ").strip()
        date = datetime.strptime(date_str, "%Y-%m-%d")

        existing_ids = list(set([t['customer_id'] for t in transactions]))
        print(f"Existing Customer IDs: {sorted(existing_ids)}")
        customer_id = int(input("Enter customer ID: ").strip())

        amount = float(input("Enter amount: ").strip())

        trans_type = input("Enter type (credit/debit/transfer): ").strip().lower()
        if trans_type not in ['credit', 'debit', 'transfer']:
            print("Invalid type. Must be credit, debit, or transfer.")
            return

        description = input("Enter description: ").strip()

        # Make amount negative for debits
        if trans_type == 'debit':
            amount = -abs(amount)

        new_id = max([t['transaction_id'] for t in transactions], default=0) + 1
        new_transaction = {
            'transaction_id': new_id,
            'date': date,
            'customer_id': customer_id,
            'amount': amount,
            'type': trans_type,
            'description': description
        }

        transactions.append(new_transaction)
        print("Transaction added!\n")

    except ValueError as e:
        print(f"Error adding transaction: {e}")


def view_transactions(transactions, filter_type=None):
    """Display transactions in a formatted table, optionally filtered by type."""
    if filter_type:
        filter_type = filter_type.lower()
        transactions = [t for t in transactions if t['type'] == filter_type]

    print(f"{'ID':<4} | {'Date':<12} | {'Customer':<9} | {'Amount':<10} | {'Type':<8} | Description")
    print("-" * 80)

    for t in transactions:
        date_str = t['date'].strftime('%b %d, %Y')
        print(f"{t['transaction_id']:<4} | {date_str:<12} | {t['customer_id']:<9} | "
              f"{t['amount']:<10.2f} | {t['type']:<8} | {t['description'][:40]}")



Task 3
1. Update: Edit one or more fields of a selected transaction.
2. Delete: Remove a transaction after confirmation.
3. Add validations, cancel options, and clear prompts.

In [5]:
def display_transaction_summary(transactions):
    """Display transactions with index numbers for selection."""
    print("\nTransactions:")
    for idx, t in enumerate(transactions, start=1):
        date_str = t['date'].strftime('%b %d, %Y')
        print(f"{idx}. {date_str} | {t['customer_id']} | {t['amount']:.2f} | {t['type']} | {t['description'][:40]}")


def update_transaction(transactions):
    """Update a transaction’s details."""
    if not transactions:
        print("No transactions to update.")
        return

    display_transaction_summary(transactions)

    try:
        choice = int(input("Select transaction to update (number): "))
        if choice < 1 or choice > len(transactions):
            print("Invalid selection.")
            return

        t = transactions[choice - 1]
        print("Leave any field blank to keep it unchanged.")

        new_date = input(f"New date ({t['date'].strftime('%Y-%m-%d')}): ").strip()
        new_customer = input(f"New customer ID ({t['customer_id']}): ").strip()
        new_amount = input(f"New amount ({t['amount']}): ").strip()
        new_type = input(f"New type ({t['type']}): ").strip().lower()
        new_desc = input(f"New description ({t['description']}): ").strip()

        # Update fields if not blank
        if new_date:
            try:
                t['date'] = datetime.strptime(new_date, '%Y-%m-%d')
            except ValueError:
                print("Invalid date. Keeping original.")
        if new_customer:
            t['customer_id'] = int(new_customer)
        if new_amount:
            amt = float(new_amount)
            t['amount'] = -abs(amt) if t['type'] == 'debit' else amt
        if new_type in ['credit', 'debit', 'transfer']:
            t['type'] = new_type
            if t['type'] == 'debit':
                t['amount'] = -abs(t['amount'])
            else:
                t['amount'] = abs(t['amount'])
        elif new_type:
            print("Invalid type. Keeping original.")
        if new_desc:
            t['description'] = new_desc

        print("Transaction updated!")

    except Exception as e:
        print(f"Error: {e}")


def delete_transaction(transactions):
    """Delete a transaction after confirmation."""
    if not transactions:
        print("No transactions to delete.")
        return

    display_transaction_summary(transactions)

    try:
        choice = int(input("Select transaction to delete (number): "))
        if choice < 1 or choice > len(transactions):
            print("Invalid selection.")
            return

        t = transactions[choice - 1]
        print(f"Selected: {t['date'].strftime('%b %d, %Y')} | {t['customer_id']} | {t['amount']} | {t['type']} | {t['description'][:40]}")
        confirm = input("Are you sure you want to delete this transaction? (yes/no): ").strip().lower()

        if confirm == 'yes':
            del transactions[choice - 1]
            print("Transaction deleted.")
        else:
            print("Deletion cancelled.")

    except Exception as e:
        print(f"Error: {e}")


Task 4:  
1. Calculate total credits, debits, transfers.
2. Show net balance.
3. Summarize by type.
4. Find the customer with the highest total debit.
5. Show percentage by transaction type.
6. Analyze 2022-only transactions.
7. Save analysis to analysis.txt.

In [6]:
def analyze_finances(transactions, only_2022=False, save_to_file=False, filename='analysis.txt'):
    """Analyze transactions and optionally filter by year or save to file."""

    if only_2022:
        filtered = [t for t in transactions if t['date'].year == 2022]
        print("Analyzing 2022-only transactions...")
    else:
        filtered = transactions

    totals = {'credit': 0.0, 'debit': 0.0, 'transfer': 0.0}
    customer_debits = {}

    for t in filtered:
        t_type = t['type']
        amt = t['amount']
        totals[t_type] += abs(amt)

        if t_type == 'debit':
            cust_id = t['customer_id']
            customer_debits[cust_id] = customer_debits.get(cust_id, 0.0) + abs(amt)

    net_balance = totals['credit'] - totals['debit']
    total_all = sum(totals.values())

    # Determine top debit customer
    top_debit_cust = max(customer_debits.items(), key=lambda x: x[1], default=(None, 0))

    report_lines = [
        "📊 Financial Summary",
        "---------------------",
        f"Total Credits  : ${totals['credit']:.2f}",
        f"Total Debits   : ${totals['debit']:.2f}",
        f"Total Transfers: ${totals['transfer']:.2f}",
        f"Net Balance    : ${net_balance:.2f}",
        "",
        "By Type:",
    ]
    for t_type, amount in totals.items():
        percent = (amount / total_all) * 100 if total_all > 0 else 0
        report_lines.append(f"  {t_type.capitalize():<8}: ${amount:.2f} ({percent:.1f}%)")

    report_lines += [
        "",
        f"Customer with highest total debits: {top_debit_cust[0]} (${top_debit_cust[1]:.2f})"
    ]

    if save_to_file:
        try:
            with open(filename, 'w') as f:
                f.write('\n'.join(report_lines))
            print(f"Report saved to {filename}")
        except Exception as e:
            print(f"Failed to save report: {e}")
    else:
        print("\n".join(report_lines))


Task 5:  Save Transactions and Generate TExt Reports
1. Save updated transactions back to financial_transactions.csv.
2. Use csv.DictWriter with proper field formatting.
3. Generate a timestamped financial report (e.g., report_20250601.txt).
4. Include transaction date range in the report.
5. Back up the original CSV file.
6. Handle file write errors.

In [11]:
import csv
import shutil
from datetime import datetime

def save_transactions(transactions, filename='financial_transactions.csv'):
    """Save all transactions back to a CSV file."""
    # Backup original file
    backup_filename = filename.replace('.csv', '_backup.csv')
    try:
        shutil.copy(filename, backup_filename)
        print(f"Backup saved to {backup_filename}")
    except FileNotFoundError:
        print("No original file found to back up (this may be your first save).")
    
    # Save new data
    try:
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            fieldnames = ['transaction_id', 'date', 'customer_id', 'amount', 'type', 'description']
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for t in transactions:
                writer.writerow({
                    'transaction_id': t['transaction_id'],
                    'date': t['date'].strftime('%Y-%m-%d'),
                    'customer_id': t['customer_id'],
                    'amount': t['amount'],
                    'type': t['type'],
                    'description': t['description']
                })
        print(f"Transactions saved to {filename}")
    except Exception as e:
        print(f"Error saving transactions: {e}")


Generate a timestamped report

In [7]:
def generate_report(transactions):
    """Generate a financial summary report and save to a timestamped .txt file."""
    if not transactions:
        print("No transactions to report.")
        return

    dates = [t['date'] for t in transactions]
    min_date, max_date = min(dates), max(dates)
    date_range = f"{min_date.strftime('%b %d, %Y')} to {max_date.strftime('%b %d, %Y')}"

    # Reuse analysis logic
    from io import StringIO
    report_buffer = StringIO()
    totals = {'credit': 0.0, 'debit': 0.0, 'transfer': 0.0}
    customer_debits = {}

    for t in transactions:
        t_type = t['type']
        amt = t['amount']
        totals[t_type] += abs(amt)
        if t_type == 'debit':
            cid = t['customer_id']
            customer_debits[cid] = customer_debits.get(cid, 0.0) + abs(amt)

    net = totals['credit'] - totals['debit']
    total_all = sum(totals.values())
    top_debit = max(customer_debits.items(), key=lambda x: x[1], default=(None, 0))

    report_buffer.write("📄 Smart Personal Finance Report\n")
    report_buffer.write("--------------------------------\n")
    report_buffer.write(f"Date Range: {date_range}\n\n")
    report_buffer.write(f"Total Credits  : ${totals['credit']:.2f}\n")
    report_buffer.write(f"Total Debits   : ${totals['debit']:.2f}\n")
    report_buffer.write(f"Total Transfers: ${totals['transfer']:.2f}\n")
    report_buffer.write(f"Net Balance    : ${net:.2f}\n\n")
    report_buffer.write("Breakdown by Type:\n")
    for k, v in totals.items():
        percent = (v / total_all * 100) if total_all > 0 else 0
        report_buffer.write(f"  {k.title():<8}: ${v:.2f} ({percent:.1f}%)\n")
    report_buffer.write(f"\nTop Debit Customer: {top_debit[0]} (${top_debit[1]:.2f})\n")

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    report_filename = f"report_{timestamp}.txt"

    try:
        with open(report_filename, 'w', encoding='utf-8') as file:
            file.write(report_buffer.getvalue())
        print(f"Report saved as {report_filename}")
    except Exception as e:
        print(f"Error writing report: {e}")


Menu for the project

In [None]:
def main():
    transactions = []
    totals = {}
    net_balance = 0.0

    csv_file_path = './financial_transactions.csv'
    report_folder = './output/'

    while True:
        print("\n📊 Smart Personal Finance Analyzer")
        print("1. Load Transactions from CSV")
        print("2. Add Transaction")
        print("3. View Transactions")
        print("4. Update Transaction")
        print("5. Delete Transaction")
        print("6. Analyze Finances")
        print("7. Save Transactions to CSV")
        print("8. Generate Report")
        print("9. Exit")

        choice = input("Select an option (1–9): ").strip()

        if choice == '1':
            transactions = load_transactions(csv_file_path)

        elif choice == '2':
            if transactions:
                add_transaction(transactions)
            else:
                print(" Please load transactions first (option 1).")

        elif choice == '3':
            if transactions:
                view_transactions(transactions)
            else:
                print(" Please load transactions first.")

        elif choice == '4':
            if transactions:
                update_transaction(transactions)
            else:
                print(" Please load transactions first.")

        elif choice == '5':
            if transactions:
                delete_transaction(transactions)
            else:
                print(" Please load transactions first.")

        elif choice == '6':
            if transactions:
                totals, net_balance = analyze_finances(transactions)
            else:
                print(" Please load transactions first.")

        elif choice == '7':
            if transactions:
                save_transactions(transactions, csv_file_path)
            else:
                print(" No transactions to save.")

        elif choice == '8':
            if transactions and totals:
                generate_report(transactions, totals, net_balance, report_folder)
            else:
                print(" Please run analysis first (option 6).")

        elif choice == '9':
            print(" Exiting program. Goodbye!")
            break

        else:
            print(" Invalid option. Please enter a number from 1 to 9.")

if __name__ == "__main__":
    main()



📊 Smart Personal Finance Analyzer
1. Load Transactions from CSV
2. Add Transaction
3. View Transactions
4. Update Transaction
5. Delete Transaction
6. Analyze Finances
7. Save Transactions to CSV
8. Generate Report
9. Exit
