<a href="https://colab.research.google.com/github/erokemwa/Data-Science/blob/main/Water.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
import json
import secrets
from datetime import datetime, timedelta
from bcrypt import hashpw, gensalt, checkpw
from collections import defaultdict

class User:
    def __init__(self, username, password, pin):
        self.username = username
        self.password = hashpw(password.encode(), gensalt())
        self.pin = hashpw(pin.encode(), gensalt())
        self.balance = 0.0
        self.consumption_history = []
        self.billing_history = []
        self.pin_attempts = 0
        self.last_login = None

class BillingRecord:
    def __init__(self, consumption, amount, due_date):
        self.consumption = consumption  # in cubic meters
        self.amount = amount
        self.due_date = due_date
        self.payments = []
        self.penalties = 0.0

class WaterBillingSystem:
    TIERED_RATES = {
        0: 2.50,    # 0-10 m³
        10: 3.75,   # 11-30 m³
        30: 5.00    # 30+ m³
    }
    PENALTY_RATE = 0.05  # 5% monthly penalty
    DUE_DAYS = 15

    def __init__(self):
        self.users = {}
        self.current_user = None
        self.login_attempts = defaultdict(int)
        self.load_data()
        self.admin_password = hashpw("secure_admin123".encode(), gensalt())

    # ----------------------
    # Data Persistence
    # ----------------------
    def save_data(self):
        data = {
            "users": {
                uname: {
                    "password": u.password.decode(),
                    "pin": u.pin.decode(),
                    "balance": u.balance,
                    "consumption_history": u.consumption_history,
                    "billing_history": [
                        {
                            "consumption": br.consumption,
                            "amount": br.amount,
                            "due_date": br.due_date.isoformat(),
                            "payments": br.payments,
                            "penalties": br.penalties
                        } for br in u.billing_history
                    ],
                    "pin_attempts": u.pin_attempts
                } for uname, u in self.users.items()
            }
        }
        with open("billing_data.json", "w") as f:
            json.dump(data, f, indent=2)

    def load_data(self):
        try:
            with open("billing_data.json") as f:
                data = json.load(f)
                for uname, udata in data["users"].items():
                    user = User(uname, "temp", "temp")
                    user.password = udata["password"].encode()
                    user.pin = udata["pin"].encode()
                    user.balance = udata["balance"]
                    user.consumption_history = udata["consumption_history"]
                    user.billing_history = [
                        BillingRecord(
                            br["consumption"],
                            br["amount"],
                            datetime.fromisoformat(br["due_date"])
                        ) for br in udata["billing_history"]
                    ]
                    self.users[uname] = user
        except FileNotFoundError:
            pass

    # ----------------------
    # Core Functionality
    # ----------------------
    def calculate_bill(self, consumption):
        amount = 0
        remaining = consumption

        tiers = sorted(self.TIERED_RATES.keys(), reverse=True)
        for tier in tiers:
            if consumption > tier:
                usage = remaining - tier
                amount += usage * self.TIERED_RATES[tier]
                remaining = tier

        return round(amount, 2)

    def generate_bill(self, user):
        if not user.consumption_history:
            return

        last_reading = user.consumption_history[-1]
        consumption = last_reading["current"] - last_reading.get("previous", 0)
        amount = self.calculate_bill(consumption)

        due_date = datetime.now() + timedelta(days=self.DUE_DAYS)
        new_bill = BillingRecord(consumption, amount, due_date)
        user.billing_history.append(new_bill)
        print(f"New bill generated: ${amount} due {due_date.strftime('%Y-%m-%d')}")


    # ----------------------
    # Enhanced Security
    # ----------------------
    def admin_login(self):
        password = input("Enter admin password: ")
        if not checkpw(password.encode(), self.admin_password):
            print("Invalid admin credentials")
            return False
        return True

    def check_penalties(self, user):
        today = datetime.now()
        for bill in user.billing_history:
            if today > bill.due_date and bill.amount > 0:
                penalty = bill.amount * self.PENALTY_RATE
                bill.penalties += penalty
                bill.amount += penalty
                print(f"Applied late penalty of ${penalty} to bill due {bill.due_date}")

    def record_consumption(self):
        """Records the user's current water consumption."""
        user = self.users[self.current_user]
        try:
            current_reading = float(input("Enter current meter reading: "))
            previous_reading = user.consumption_history[-1]["current"] if user.consumption_history else 0
            user.consumption_history.append({"current": current_reading, "previous": previous_reading, "date": datetime.now().isoformat()})
            print("Consumption recorded successfully.")
            self.save_data()  # Save the updated data
        except ValueError:
            print("Invalid reading. Please enter a number.")
        except IndexError:
            user.consumption_history.append({"current": current_reading, "previous": 0, "date": datetime.now().isoformat()})
            print("Consumption recorded successfully.")
            self.save_data()

    def show_history(self):
        """Displays the user's transaction history."""
        user = self.users[self.current_user]
        print("\nTransaction History:")
        if not user.billing_history:
            print("No transactions found.")
            return

        for bill in user.billing_history:
            print(f"Consumption: {bill.consumption}m³, Amount: ${bill.amount}, Due Date: {bill.due_date.strftime('%Y-%m-%d')}")
            for payment in bill.payments:
                print(f"  - Payment: ${payment['amount']} on {payment['date']}")
            if bill.penalties:
                print(f"  - Penalty: ${bill.penalties}")

    def change_pin(self):
        """Allows the user to change their PIN."""
        user = self.users[self.current_user]
        old_pin = input("Enter your current PIN: ")
        if not checkpw(old_pin.encode(), user.pin):
            print("Incorrect PIN.")
            return

        new_pin = input("Enter your new PIN: ")
        user.pin = hashpw(new_pin.encode(), gensalt())
        print("PIN changed successfully.")
        self.save_data()


    # ----------------------
    # User Interface
    # ----------------------
    def user_dashboard(self):
        while True:
            print("\nUser Dashboard")
            print("1. View/Pay Bills")
            print("2. Record Consumption")
            print("3. Transaction History")
            print("4. Change PIN")
            print("5. Logout")

            choice = input("Select option: ")
            if choice == '1':
                self.handle_bills()
            elif choice == '2':
                self.record_consumption()
            elif choice == '3':
                self.show_history()
            elif choice == '4':
                self.change_pin()
            elif choice == '5':
                self.current_user = None
                break
            else:
                print("Invalid choice")

    def admin_dashboard(self):
        while True:
            print("\nAdmin Dashboard")
            print("1. View All Users")
            print("2. Generate Reports")
            print("3. Adjust Rates")
            print("4. System Maintenance")
            print("5. Return to Main")

            choice = input("Select option: ")
            if choice == '1':
                self.list_users()
            elif choice == '5':
                break

    def handle_bills(self):
        user = self.users[self.current_user]
        self.check_penalties(user)

        for i, bill in enumerate(user.billing_history):
            status = "PAID" if bill.amount <= 0 else f"Due {bill.due_date.strftime('%Y-%m-%d')}"
            print(f"{i+1}. {bill.consumption}m³: ${bill.amount} {status}")

        bill_choice = input("Select bill to pay (or 'back'): ")
        if bill_choice == 'back':
            return

        try:
            idx = int(bill_choice) - 1
            selected_bill = user.billing_history[idx]
            amount = float(input(f"Enter payment amount (max ${selected_bill.amount}): "))

            if amount > user.balance:
                print("Insufficient funds")
            elif amount > selected_bill.amount:
                print("Cannot overpay bill")
            else:
                user.balance -= amount
                selected_bill.amount -= amount
                selected_bill.payments.append({
                    "date": datetime.now().isoformat(),
                    "amount": amount
                })
                print(f"Payment of ${amount} processed")
                self.save_data()
        except (ValueError, IndexError):
            print("Invalid selection")

    def register_user(self):
        """Registers a new user in the system."""
        username = input("Enter username: ")
        if username in self.users:
            print("Username already exists")
            return

        password = input("Enter password: ")
        pin = input("Enter PIN: ")

        new_user = User(username, password, pin)
        self.users[username] = new_user
        print(f"User '{username}' registered successfully")
        self.save_data()

    def user_login(self):
        """Handles user login and authentication."""
        username = input("Enter username: ")
        if username not in self.users:
            print("Invalid username")
            return

        password = input("Enter password: ")
        if not checkpw(password.encode(), self.users[username].password):
            print("Incorrect password")
            return

        self.current_user = username
        print(f"Welcome, {username}!")
        self.user_dashboard()

    # ----------------------
    # Main Execution
    # ----------------------
    def main_menu(self):
        while True:
            print("\nWater Billing System")
            print("1. User Login")
            print("2. Register")
            print("3. Admin Login")
            print("4. Exit")

            choice = input("Select option: ")
            if choice == '1':
                self.user_login()
            elif choice == '2':
                self.register_user()
            elif choice == '3':
                if self.admin_login():
                    self.admin_dashboard()
            elif choice == '4':
                self.save_data()
                print("System shutdown")
                break

if __name__ == "__main__":
    system = WaterBillingSystem()
    system.main_menu()


Water Billing System
1. User Login
2. Register
3. Admin Login
4. Exit
Select option: 1
Enter username: 1
Invalid username

Water Billing System
1. User Login
2. Register
3. Admin Login
4. Exit
Select option: 1
Enter username: Eric
Enter password: 1234
Welcome, Eric!

User Dashboard
1. View/Pay Bills
2. Record Consumption
3. Transaction History
4. Change PIN
5. Logout
Select option: 4
Enter your current PIN: 1234
Enter your new PIN: 4321
PIN changed successfully.

User Dashboard
1. View/Pay Bills
2. Record Consumption
3. Transaction History
4. Change PIN
5. Logout
Select option: 1
Select bill to pay (or 'back'): 1
Invalid selection

User Dashboard
1. View/Pay Bills
2. Record Consumption
3. Transaction History
4. Change PIN
5. Logout
Select option: 2
Enter current meter reading: 7890
Consumption recorded successfully.

User Dashboard
1. View/Pay Bills
2. Record Consumption
3. Transaction History
4. Change PIN
5. Logout
Select option: 1
Select bill to pay (or 'back'): back

User Dashboar

In [11]:
import json
from datetime import datetime, timedelta
from bcrypt import hashpw, gensalt, checkpw
from collections import defaultdict
from dateutil.relativedelta import relativedelta

class User:
    def __init__(self, username, password, pin):
        self.username = username
        self.password = hashpw(password.encode(), gensalt())
        self.pin = hashpw(pin.encode(), gensalt())
        self.balance = 0.0
        self.consumption_history = []
        self.billing_history = []
        self.pin_attempts = 0
        self.last_login = None

class BillingRecord:
    def __init__(self, consumption, amount, due_date):
        self.consumption = consumption
        self.amount = amount
        self.due_date = due_date
        self.payments = []
        self.penalties = 0.0
        self.penalty_months_applied = 0

class WaterBillingSystem:
    TIERED_RATES = {
        0: 2.50,
        10: 3.75,
        30: 5.00
    }
    PENALTY_RATE = 0.05
    DUE_DAYS = 15

    def __init__(self):
        self.users = {}
        self.current_user = None
        self.login_attempts = defaultdict(int)
        self.load_data()
        self.admin_password = hashpw("secure_admin123".encode(), gensalt())

    def save_data(self):
        data = {
            "users": {
                uname: {
                    "password": u.password.decode(),
                    "pin": u.pin.decode(),
                    "balance": u.balance,
                    "consumption_history": u.consumption_history,
                    "billing_history": [
                        {
                            "consumption": br.consumption,
                            "amount": br.amount,
                            "due_date": br.due_date.isoformat(),
                            "payments": br.payments,
                            "penalties": br.penalties,
                            "penalty_months_applied": br.penalty_months_applied
                        } for br in u.billing_history
                    ],
                    "pin_attempts": u.pin_attempts
                } for uname, u in self.users.items()
            }
        }
        with open("billing_data.json", "w") as f:
            json.dump(data, f, indent=2)

    def load_data(self):
        try:
            with open("billing_data.json") as f:
                data = json.load(f)
                for uname, udata in data["users"].items():
                    user = User(uname, "temp", "temp")
                    user.password = udata["password"].encode()
                    user.pin = udata["pin"].encode()
                    user.balance = udata["balance"]
                    user.consumption_history = udata["consumption_history"]
                    user.billing_history = []
                    for br_data in udata["billing_history"]:
                        due_date = datetime.fromisoformat(br_data["due_date"])
                        br = BillingRecord(br_data["consumption"], br_data["amount"], due_date)
                        br.payments = br_data["payments"]
                        br.penalties = br_data["penalties"]
                        br.penalty_months_applied = br_data.get("penalty_months_applied", 0)
                        user.billing_history.append(br)
                    self.users[uname] = user
        except FileNotFoundError:
            pass

    def calculate_bill(self, consumption):
        amount = 0
        remaining = consumption
        tiers = sorted(self.TIERED_RATES.keys(), reverse=True)
        for tier in tiers:
            if consumption > tier:
                usage = remaining - tier
                amount += usage * self.TIERED_RATES[tier]
                remaining = tier
        return round(amount, 2)

    def generate_bill(self, user):
        if not user.consumption_history:
            return
        last_reading = user.consumption_history[-1]
        consumption = last_reading["current"] - last_reading.get("previous", 0)
        amount = self.calculate_bill(consumption)
        due_date = datetime.now() + timedelta(days=self.DUE_DAYS)
        new_bill = BillingRecord(consumption, amount, due_date)
        user.billing_history.append(new_bill)
        print(f"New bill generated: ${amount} due {due_date.strftime('%Y-%m-%d')}")

    def check_penalties(self, user):
        today = datetime.now()
        for bill in user.billing_history:
            if bill.amount <= 0:
                continue
            if today <= bill.due_date:
                continue
            due_date = bill.due_date
            months_overdue = (today.year - due_date.year) * 12 + (today.month - due_date.month)
            if today.day > due_date.day:
                months_overdue += 1
            months_overdue -= bill.penalty_months_applied
            if months_overdue > 0:
                penalty = bill.amount * self.PENALTY_RATE * months_overdue
                bill.penalties += penalty
                bill.amount += penalty
                bill.penalty_months_applied += months_overdue
                print(f"Applied {months_overdue} month(s) penalty: ${penalty} to bill due {due_date.date()}")

    def record_consumption(self):
        user = self.users[self.current_user]
        try:
            current_reading = float(input("Enter current meter reading: "))
            previous_reading = user.consumption_history[-1]["current"] if user.consumption_history else 0
            user.consumption_history.append({
                "current": current_reading,
                "previous": previous_reading,
                "date": datetime.now().isoformat()
            })
            self.generate_bill(user)
            self.save_data()
            print("Consumption recorded and bill generated.")
        except ValueError:
            print("Invalid reading. Please enter a number.")
        except IndexError:
            user.consumption_history.append({
                "current": current_reading,
                "previous": 0,
                "date": datetime.now().isoformat()
            })
            self.generate_bill(user)
            self.save_data()
            print("Consumption recorded and bill generated.")

    def add_funds(self):
        user = self.users[self.current_user]
        try:
            amount = float(input("Enter amount to add: "))
            if amount <= 0:
                print("Amount must be positive.")
                return
            user.balance += amount
            print(f"Added ${amount:.2f}. New balance: ${user.balance:.2f}")
            self.save_data()
        except ValueError:
            print("Invalid amount.")

    def show_history(self):
        user = self.users[self.current_user]
        print("\nTransaction History:")
        if not user.billing_history:
            print("No transactions found.")
            return
        for bill in user.billing_history:
            status = "PAID" if bill.amount <= 0 else f"Due {bill.due_date.strftime('%Y-%m-%d')}"
            print(f"Bill: {bill.consumption}m³, Amount: ${bill.amount:.2f}, {status}")
            for payment in bill.payments:
                print(f"  Paid ${payment['amount']:.2f} on {payment['date']}")
            if bill.penalties:
                print(f"  Penalties: ${bill.penalties:.2f}")

    def change_pin(self):
        user = self.users[self.current_user]
        old_pin = input("Enter current PIN: ")
        if not checkpw(old_pin.encode(), user.pin):
            print("Incorrect PIN.")
            return
        new_pin = input("Enter new PIN: ")
        user.pin = hashpw(new_pin.encode(), gensalt())
        print("PIN changed successfully.")
        self.save_data()

    def user_dashboard(self):
        while True:
            print("\nUser Dashboard")
            print("1. View/Pay Bills")
            print("2. Record Consumption")
            print("3. Transaction History")
            print("4. Add Funds")
            print("5. Change PIN")
            print("6. Logout")
            choice = input("Select option: ")
            if choice == '1':
                self.handle_bills()
            elif choice == '2':
                self.record_consumption()
            elif choice == '3':
                self.show_history()
            elif choice == '4':
                self.add_funds()
            elif choice == '5':
                self.change_pin()
            elif choice == '6':
                self.current_user = None
                break
            else:
                print("Invalid choice")

    def admin_dashboard(self):
        while True:
            print("\nAdmin Dashboard")
            print("1. View All Users")
            print("2. Generate Reports")
            print("3. Adjust Rates")
            print("4. System Maintenance")
            print("5. Return to Main")
            choice = input("Select option: ")
            if choice == '1':
                self.list_users()
            elif choice == '2':
                self.generate_reports()
            elif choice == '3':
                self.adjust_rates()
            elif choice == '5':
                break

    def handle_bills(self):
        user = self.users[self.current_user]
        self.check_penalties(user)
        for i, bill in enumerate(user.billing_history):
            status = "PAID" if bill.amount <= 0 else f"Due {bill.due_date.strftime('%Y-%m-%d')}"
            print(f"{i+1}. {bill.consumption}m³: ${bill.amount:.2f} {status}")
        bill_choice = input("Select bill to pay (or 'back'): ")
        if bill_choice.lower() == 'back':
            return
        try:
            idx = int(bill_choice) - 1
            selected_bill = user.billing_history[idx]
            if selected_bill.amount <= 0:
                print("This bill is already paid.")
                return
            amount = float(input(f"Enter payment amount (max ${selected_bill.amount:.2f}): "))
            if amount > user.balance:
                print("Insufficient funds.")
            elif amount > selected_bill.amount:
                print("Cannot overpay. Enter amount up to the due amount.")
            else:
                user.balance -= amount
                selected_bill.amount -= amount
                selected_bill.payments.append({
                    "date": datetime.now().isoformat(),
                    "amount": amount
                })
                print(f"Payment of ${amount:.2f} processed. Remaining balance: ${user.balance:.2f}")
                self.save_data()
        except (ValueError, IndexError):
            print("Invalid selection.")

    def list_users(self):
        print("\nRegistered Users:")
        for username in self.users:
            print(f"- {username}")

    def generate_reports(self):
        total_outstanding = 0.0
        total_paid = 0.0
        total_penalties = 0.0
        for user in self.users.values():
            for bill in user.billing_history:
                total_outstanding += bill.amount
                total_paid += sum(p['amount'] for p in bill.payments)
                total_penalties += bill.penalties
        print("\nSystem Report:")
        print(f"Total Users: {len(self.users)}")
        print(f"Total Outstanding: ${total_outstanding:.2f}")
        print(f"Total Paid: ${total_paid:.2f}")
        print(f"Total Penalties: ${total_penalties:.2f}")

    def adjust_rates(self):
        print("\nAdjust Rates")
        print(f"Current Tiered Rates: {self.TIERED_RATES}")
        print(f"Current Penalty Rate: {self.PENALTY_RATE * 100}%")
        print("1. Change Tiered Rates\n2. Change Penalty Rate\n3. Back")
        choice = input("Select option: ")
        if choice == '1':
            new_rates = {}
            for tier in sorted(self.TIERED_RATES.keys()):
                rate = float(input(f"Enter rate for tier {tier}+ m³: "))
                new_rates[tier] = rate
            self.TIERED_RATES = new_rates
            print("Tiered rates updated.")
        elif choice == '2':
            new_rate = float(input("Enter new penalty rate (%): ")) / 100
            self.PENALTY_RATE = new_rate
            print(f"Penalty rate updated to {new_rate*100}%.")
        elif choice == '3':
            return
        else:
            print("Invalid choice.")
        self.save_data()

    def register_user(self):
        username = input("Enter username: ")
        if username in self.users:
            print("Username exists.")
            return
        password = input("Enter password: ")
        pin = input("Enter PIN: ")
        self.users[username] = User(username, password, pin)
        print(f"User {username} registered.")
        self.save_data()

    def user_login(self):
        username = input("Username: ")
        if username not in self.users:
            print("User not found.")
            return
        password = input("Password: ")
        user = self.users[username]
        if not checkpw(password.encode(), user.password):
            print("Incorrect password.")
            return
        self.current_user = username
        self.user_dashboard()

    def admin_login(self):
        password = input("Admin password: ")
        if checkpw(password.encode(), self.admin_password):
            self.admin_dashboard()
        else:
            print("Invalid admin password.")

    def main_menu(self):
        while True:
            print("\nWater Billing System")
            print("1. User Login")
            print("2. Register")
            print("3. Admin Login")
            print("4. Exit")
            choice = input("Select option: ")
            if choice == '1':
                self.user_login()
            elif choice == '2':
                self.register_user()
            elif choice == '3':
                self.admin_login()
            elif choice == '4':
                self.save_data()
                print("Goodbye!")
                break

if __name__ == "__main__":
    system = WaterBillingSystem()
    system.main_menu()


Water Billing System
1. User Login
2. Register
3. Admin Login
4. Exit
Select option: 4
Goodbye!
