<a href="https://colab.research.google.com/github/Eugenetyx/AgenticAidemo/blob/main/Copy_of_AgenticAI_Demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install streamlit pandas plotly openpyxl PyPDF2 python-dateutil requests pyngrok

Collecting streamlit
  Downloading streamlit-1.46.1-py3-none-any.whl.metadata (9.0 kB)
Collecting PyPDF2
  Downloading pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Collecting pyngrok
  Downloading pyngrok-7.2.11-py3-none-any.whl.metadata (9.4 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.46.1-py3-none-any.whl (10.1 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 10.1/10.1 MB 73.0 MB/s eta 0:00:00
Downloading pypdf2-3.0.1-py3-none-any.whl (232 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 232.6/232.6 kB 21.1 MB/s eta 0:00:00
Downloading pyngrok-7.2.11-py3-

In [2]:
%%writefile main.py
#!/usr/bin/env python3
"""
Property Management System
Main application entry point
"""

import sqlite3
import sys
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import json

class DatabaseManager:
    """Owns the SQLite database file and hands out configured connections.

    Constructing an instance immediately (re)builds the database: every
    table is dropped, recreated and filled with the sample rows. Any data
    previously stored at ``db_path`` is therefore discarded.
    """

    def __init__(self, db_path: str = 'database.db'):
        """Remember ``db_path`` and rebuild the schema and sample data there."""
        self.db_path = db_path
        self.init_database()

    def get_connection(self):
        """Open a new connection with foreign keys enabled.

        Returns:
            A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects
            (index- and name-addressable). The caller must close it.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            # Foreign-key enforcement is off by default in SQLite and is a
            # per-connection setting, so it has to be switched on every time.
            conn.execute("PRAGMA foreign_keys = ON;")
        except sqlite3.Error:
            # Do not leak the handle if the file is not a usable database.
            conn.close()
            raise
        conn.row_factory = sqlite3.Row  # Enable dict-like access
        return conn

    def init_database(self):
        """Drop and recreate every table, then insert the sample data.

        The connection is closed even when one of the scripts fails, so a
        broken schema or seed statement cannot leak the database handle.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()

            # Execute the schema. Tables are dropped children-first so the
            # foreign-key constraints never block a DROP.
            # NOTE(review): service_tickets.raised_by is declared NOT NULL but
            # also ON DELETE SET NULL; those two clauses contradict each other
            # and the delete action would violate the NOT NULL constraint.
            # Left unchanged here -- confirm the intended behaviour.
            cursor.executescript("""
        -- drop old tables if they exist
        DROP TABLE IF EXISTS ticket_conversations;
        DROP TABLE IF EXISTS ticket_comments;
        DROP TABLE IF EXISTS service_tickets;
        DROP TABLE IF EXISTS payments;
        DROP TABLE IF EXISTS leases;
        DROP TABLE IF EXISTS units;
        DROP TABLE IF EXISTS agents;
        DROP TABLE IF EXISTS properties;
        DROP TABLE IF EXISTS tenants;

        -- tenants
        CREATE TABLE tenants (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp       DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours')),
            first_name      TEXT    NOT NULL,
            last_name       TEXT    NOT NULL,
            email           TEXT    UNIQUE NOT NULL,
            phone           TEXT,
            date_of_birth   TEXT,
            created_at      DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours'))
        );

        -- properties
        CREATE TABLE properties (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp       DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours')),
            name            TEXT    NOT NULL,
            address_line1   TEXT    NOT NULL,
            address_line2   TEXT,
            city            TEXT    NOT NULL,
            state           TEXT,
            postal_code     TEXT,
            country         TEXT    NOT NULL,
            created_at      DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours'))
        );

        -- units
        CREATE TABLE units (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp       DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours')),
            property_id     INTEGER NOT NULL REFERENCES properties(id) ON DELETE CASCADE,
            unit_number     TEXT    NOT NULL,
            floor           TEXT,
            bedrooms        INTEGER,
            bathrooms       REAL,
            square_feet     INTEGER,
            status          TEXT    DEFAULT 'available',
            created_at      DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours'))
        );

        -- leases
        CREATE TABLE leases (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp       DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours')),
            tenant_id       INTEGER NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
            unit_id         INTEGER NOT NULL REFERENCES units(id) ON DELETE CASCADE,
            start_date      DATETIME NOT NULL,
            end_date        DATETIME NOT NULL,
            rent_amount     REAL    NOT NULL,
            security_deposit REAL,
            status          TEXT    DEFAULT 'active',
            created_at      DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours'))
        );

        -- agents
        CREATE TABLE agents (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp       DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours')),
            first_name      TEXT,
            last_name       TEXT,
            role            TEXT,
            email           TEXT    UNIQUE,
            phone           TEXT,
            created_at      DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours'))
        );

        -- service tickets
        CREATE TABLE service_tickets (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp       DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours')),
            lease_id        INTEGER NOT NULL REFERENCES leases(id) ON DELETE CASCADE,
            raised_by       INTEGER NOT NULL REFERENCES tenants(id) ON DELETE SET NULL,
            assigned_to     INTEGER REFERENCES agents(id) ON DELETE SET NULL,
            category        TEXT    NOT NULL,
            subcategory     TEXT,
            description     TEXT    NOT NULL,
            status          TEXT    DEFAULT 'open',
            priority        TEXT    DEFAULT 'normal',
            created_at      DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours')),
            updated_at      DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours'))
        );

        -- ticket comments
        CREATE TABLE ticket_comments (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp       DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours')),
            ticket_id       INTEGER NOT NULL REFERENCES service_tickets(id) ON DELETE CASCADE,
            author_id       INTEGER NOT NULL,
            author_type     TEXT    NOT NULL,
            comment_text    TEXT    NOT NULL,
            created_at      DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours'))
        );

        -- payments
        CREATE TABLE payments (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp       DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours')),
            lease_id        INTEGER NOT NULL REFERENCES leases(id) ON DELETE CASCADE,
            payment_type    TEXT    NOT NULL,
            billing_period  TEXT,
            due_date        DATETIME,
            amount          REAL    NOT NULL,
            method          TEXT,
            paid_on         DATETIME,
            reference_number TEXT,
            created_at      DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours'))
        );

        -- ticket conversations
        CREATE TABLE ticket_conversations (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp       DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours')),
            ticket_id       INTEGER NOT NULL REFERENCES service_tickets(id) ON DELETE CASCADE,
            author_type     TEXT    NOT NULL,
            author_id       INTEGER NOT NULL,
            message_text    TEXT    NOT NULL,
            sent_at         DATETIME NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now','+8 hours'))
        );
        """)

            # Insert sample data (parents before children so every foreign
            # key already has a row to point at).
            cursor.executescript("""
        -- Tenants
        INSERT INTO tenants (first_name, last_name, email, phone, date_of_birth) VALUES
        ('Alice', 'Tan',   'alice.tan@example.com',   '012-3456789', '1990-04-15'),
        ('Brian', 'Lee',   'brian.lee@example.com',   '013-2345678', '1985-11-22'),
        ('Clara', 'Wong',  'clara.wong@example.com',  '014-1234567', '1992-07-09'),
        ('David','Chong',  'david.chong@example.com', '015-9876543', '1988-02-28'),
        ('Elaine','Ng',    'elaine.ng@example.com',   '016-8765432', '1995-12-05');

        -- Properties
        INSERT INTO properties (name, address_line1, address_line2, city, state, postal_code, country) VALUES
        ('Sunset Apartments',  '10 Jalan Bukit',     NULL,            'Petaling Jaya', 'Selangor','46000','Malaysia'),
        ('Ocean View Condos',  '25 Jalan Pantai',    'Block B, Unit 12','Penang',      'Penang',  '10470','Malaysia'),
        ('Hilltop Villas',     '5 Jalan Bukit Tinggi',NULL,            'Kuala Lumpur','KL','50450','Malaysia'),
        ('Lakewood Homes',     '88 Lake Drive',       NULL,            'Ipoh',        'Perak',  '30000','Malaysia'),
        ('Riverdale Towers',   '123 Riverside Rd',    'Tower A, 15th Fl','Kuantan',   'Pahang', '25000','Malaysia');

        -- Units
        INSERT INTO units (property_id, unit_number, floor, bedrooms, bathrooms, square_feet) VALUES
        (1, 'A-01',  '1', 2, 1.5,  850),
        (1, 'B-03',  '2', 3, 2.0, 1200),
        (2, 'B-12', '12', 1, 1.0,  600),
        (3, 'V-05',  '1', 4, 3.0, 2000),
        (5, 'T-150','15', 2, 2.0,  900);

        -- Leases
        INSERT INTO leases (tenant_id, unit_id, start_date, end_date, rent_amount, security_deposit) VALUES
        (1, 1, '2025-01-01','2025-12-31', 1500.00, 1500.00),
        (2, 2, '2025-03-15','2026-03-14', 2100.00, 2100.00),
        (3, 3, '2025-05-01','2026-04-30',  800.00,  800.00),
        (4, 4, '2025-02-01','2025-07-31', 3200.00, 3200.00),
        (5, 5, '2025-06-01','2026-05-31', 1200.00, 1200.00);

        -- Agents
        INSERT INTO agents (first_name, last_name, role, email, phone) VALUES
        ('Farah','Iskandar','Manager',   'farah.iskandar@example.com','017-1234567'),
        ('Gavin','Lim',     'Technician','gavin.lim@example.com',    '018-2345678'),
        ('Han',  'Yeo',     'Clerk',     'han.yeo@example.com',      '019-3456789'),
        ('Irene','Chew',    'Supervisor','irene.chew@example.com',   '010-4567890'),
        ('Jamal','Omar',    'Technician','jamal.omar@example.com',   '011-5678901');

        -- Service Tickets
        INSERT INTO service_tickets (lease_id, raised_by, assigned_to, category, subcategory, description, priority) VALUES
        (1, 1, 2, 'Maintenance','Plumbing',   'Leaking faucet in kitchen',       'high'),
        (2, 2, 3, 'Billing',    'Invoice',    'Dispute on last month''s invoice', 'normal'),
        (3, 3, NULL,'Maintenance','Electrical','Living room light flickering',   'high'),
        (4, 4, 5, 'Inquiries',  'General',    'How to renew lease online?',      'low'),
        (5, 5, 4, 'Maintenance','Painting',   'Wall paint peeling in bedroom',   'normal');

        -- Ticket Comments
        INSERT INTO ticket_comments (ticket_id, author_id, author_type, comment_text) VALUES
        (1, 2, 'agent',  'I have scheduled a plumber for tomorrow.'),
        (1, 1, 'tenant', 'Thanks—please let me know the time.'),
        (2, 3, 'agent',  'Please provide the invoice number.'),
        (3, 3, 'agent',  'Electrician will arrive this afternoon.'),
        (4, 5, 'agent',  'You can renew via our website under "My Account."');

        -- Payments
        INSERT INTO payments (lease_id, payment_type, billing_period, due_date, amount, method, paid_on, reference_number) VALUES
        (1, 'rent',       '2025-06','2025-06-05',1500.00,'bank_transfer','2025-06-03 10:15:00','BTX123456'),
        (2, 'electricity','2025-05','2025-05-20', 120.50,'credit_card',  '2025-05-18 14:22:00','CC987654'),
        (3, 'water',      '2025-05','2025-05-25',  45.75,'bank_transfer', NULL,                NULL),
        (4, 'rent',       '2025-06','2025-06-01',3200.00,'credit_card',  '2025-05-30 09:00:00','CC112233'),
        (5, 'rent',       '2025-06','2025-06-07',1200.00,'bank_transfer','2025-06-06 11:45:00','BTX778899');

        -- Ticket Conversations
        INSERT INTO ticket_conversations (ticket_id, author_type, author_id, message_text) VALUES
        (1, 'agent',  2, 'Plumber ETA: 9am tomorrow.'),
        (1, 'tenant', 1, 'Okay, I''ll be home by then.'),
        (2, 'agent',  3, 'Awaiting invoice details.'),
        (5, 'agent',  4, 'Painting crew scheduled Friday.'),
        (3, 'agent',  3, 'Electric switch replaced.');
        """)

            conn.commit()
        finally:
            conn.close()
        print("Database initialized successfully!")

class PropertyManager:
    """Main business logic for property management.

    Every public method opens its own short-lived connection through
    ``self.db.get_connection()`` and closes it before returning, including
    when the statement raises.
    """

    def __init__(self):
        """Create the backing ``DatabaseManager`` with its default path."""
        self.db = DatabaseManager()

    # INTERNAL HELPERS
    def _fetch_all(self, sql: str, params: Tuple = ()) -> List[Dict]:
        """Run a SELECT and return every row as a plain dict."""
        conn = self.db.get_connection()
        try:
            return [dict(row) for row in conn.execute(sql, params).fetchall()]
        finally:
            conn.close()

    def _fetch_one(self, sql: str, params: Tuple = ()) -> Optional[Dict]:
        """Run a SELECT and return the first row as a dict, or None."""
        conn = self.db.get_connection()
        try:
            row = conn.execute(sql, params).fetchone()
            return dict(row) if row else None
        finally:
            conn.close()

    def _execute(self, sql: str, params: Tuple = ()) -> int:
        """Run one write statement, commit it and return ``lastrowid``."""
        conn = self.db.get_connection()
        try:
            cursor = conn.execute(sql, params)
            conn.commit()
            return cursor.lastrowid
        finally:
            # Closing without a commit discards the pending transaction, so a
            # failed statement leaves nothing half-written.
            conn.close()

    # TENANT OPERATIONS
    def get_all_tenants(self) -> List[Dict]:
        """Get all tenants ordered by last name, then first name."""
        return self._fetch_all(
            "SELECT * FROM tenants ORDER BY last_name, first_name"
        )

    def get_tenant_by_id(self, tenant_id: int) -> Optional[Dict]:
        """Get the tenant with the given id, or None when there is none."""
        return self._fetch_one(
            "SELECT * FROM tenants WHERE id = ?", (tenant_id,)
        )

    def add_tenant(self, first_name: str, last_name: str, email: str,
                   phone: Optional[str] = None,
                   date_of_birth: Optional[str] = None) -> int:
        """Insert a new tenant and return its row id.

        Raises:
            sqlite3.IntegrityError: if the insert violates a table constraint.
        """
        return self._execute("""
            INSERT INTO tenants (first_name, last_name, email, phone, date_of_birth)
            VALUES (?, ?, ?, ?, ?)
        """, (first_name, last_name, email, phone, date_of_birth))

    # PROPERTY OPERATIONS
    def get_all_properties(self) -> List[Dict]:
        """Get all properties, each with a ``unit_count`` column."""
        return self._fetch_all("""
            SELECT p.*, COUNT(u.id) as unit_count
            FROM properties p
            LEFT JOIN units u ON p.id = u.property_id
            GROUP BY p.id
            ORDER BY p.name
        """)

    def get_units_by_property(self, property_id: int) -> List[Dict]:
        """Get all units for a property.

        Each row also carries ``current_status`` ('occupied' when an active
        lease exists, otherwise the unit's stored status) and ``tenant_name``
        (NULL when there is no active lease).
        """
        return self._fetch_all("""
            SELECT u.*,
                   CASE WHEN l.id IS NOT NULL THEN 'occupied' ELSE u.status END as current_status,
                   t.first_name || ' ' || t.last_name as tenant_name
            FROM units u
            LEFT JOIN leases l ON u.id = l.unit_id AND l.status = 'active'
            LEFT JOIN tenants t ON l.tenant_id = t.id
            WHERE u.property_id = ?
            ORDER BY u.unit_number
        """, (property_id,))

    # LEASE OPERATIONS
    def get_active_leases(self) -> List[Dict]:
        """Get all active leases with tenant and unit info, soonest end first."""
        return self._fetch_all("""
            SELECT l.*,
                   t.first_name || ' ' || t.last_name as tenant_name,
                   t.email as tenant_email,
                   p.name as property_name,
                   u.unit_number
            FROM leases l
            JOIN tenants t ON l.tenant_id = t.id
            JOIN units u ON l.unit_id = u.id
            JOIN properties p ON u.property_id = p.id
            WHERE l.status = 'active'
            ORDER BY l.end_date
        """)

    def get_expiring_leases(self, days_ahead: int = 30) -> List[Dict]:
        """Get active leases whose end date is on or before today + ``days_ahead``.

        There is no lower bound, so active leases whose end date has already
        passed are returned as well.
        """
        cutoff = datetime.now() + timedelta(days=days_ahead)
        return self._fetch_all("""
            SELECT l.*,
                   t.first_name || ' ' || t.last_name as tenant_name,
                   t.email as tenant_email,
                   p.name as property_name,
                   u.unit_number
            FROM leases l
            JOIN tenants t ON l.tenant_id = t.id
            JOIN units u ON l.unit_id = u.id
            JOIN properties p ON u.property_id = p.id
            WHERE l.status = 'active' AND l.end_date <= ?
            ORDER BY l.end_date
        """, (cutoff.strftime('%Y-%m-%d'),))

    # PAYMENT OPERATIONS
    def get_pending_payments(self) -> List[Dict]:
        """Get all payments with no ``paid_on`` value, earliest due date first."""
        return self._fetch_all("""
            SELECT p.*,
                   t.first_name || ' ' || t.last_name as tenant_name,
                   t.email as tenant_email,
                   pr.name as property_name,
                   u.unit_number
            FROM payments p
            JOIN leases l ON p.lease_id = l.id
            JOIN tenants t ON l.tenant_id = t.id
            JOIN units u ON l.unit_id = u.id
            JOIN properties pr ON u.property_id = pr.id
            WHERE p.paid_on IS NULL
            ORDER BY p.due_date
        """)

    def mark_payment_paid(self, payment_id: int, method: str,
                          reference: Optional[str] = None):
        """Stamp a payment as paid now (UTC+8) with the given method/reference."""
        self._execute("""
            UPDATE payments
            SET paid_on = strftime('%Y-%m-%d %H:%M:%S','now','+8 hours'),
                method = ?,
                reference_number = ?
            WHERE id = ?
        """, (method, reference, payment_id))

    # SERVICE TICKET OPERATIONS
    def get_open_tickets(self) -> List[Dict]:
        """Get all unresolved service tickets, most urgent first.

        'assigned' counts as unresolved because that is the status
        ``assign_ticket`` writes. Priority is ranked explicitly
        (urgent > high > normal > low, unknown values rank as normal);
        ordering by the raw text would sort 'high' after 'low'.
        """
        return self._fetch_all("""
            SELECT st.*,
                   t.first_name || ' ' || t.last_name as tenant_name,
                   a.first_name || ' ' || a.last_name as agent_name,
                   p.name as property_name,
                   u.unit_number
            FROM service_tickets st
            JOIN leases l ON st.lease_id = l.id
            JOIN tenants t ON l.tenant_id = t.id
            JOIN units u ON l.unit_id = u.id
            JOIN properties p ON u.property_id = p.id
            LEFT JOIN agents a ON st.assigned_to = a.id
            WHERE st.status IN ('open', 'assigned', 'in_progress')
            ORDER BY CASE st.priority
                         WHEN 'urgent' THEN 4
                         WHEN 'high'   THEN 3
                         WHEN 'normal' THEN 2
                         WHEN 'low'    THEN 1
                         ELSE 2
                     END DESC,
                     st.created_at
        """)

    def create_service_ticket(self, lease_id: int, raised_by: int, category: str,
                             description: str, priority: str = 'normal',
                             subcategory: Optional[str] = None) -> int:
        """Insert a new service ticket and return its row id."""
        return self._execute("""
            INSERT INTO service_tickets (lease_id, raised_by, category, subcategory, description, priority)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (lease_id, raised_by, category, subcategory, description, priority))

    def assign_ticket(self, ticket_id: int, agent_id: int):
        """Assign a ticket to an agent and mark it 'assigned'."""
        self._execute("""
            UPDATE service_tickets
            SET assigned_to = ?,
                status = 'assigned',
                updated_at = strftime('%Y-%m-%d %H:%M:%S','now','+8 hours')
            WHERE id = ?
        """, (agent_id, ticket_id))

    # REPORTING
    def get_financial_summary(self, month: Optional[str] = None) -> Dict:
        """Get the financial summary for one month of due dates.

        Args:
            month: Month in ``YYYY-MM`` format. When omitted or empty, the
                current month in UTC+8 is used, matching the timestamps the
                rest of the system writes.

        Returns:
            Dict with ``total_collected``, ``total_pending``,
            ``payments_received`` and ``payments_pending``.
        """
        # The month is bound as a parameter rather than formatted into the
        # statement, so a crafted value cannot alter the query.
        summary = self._fetch_one("""
            SELECT
                COALESCE(SUM(CASE WHEN p.paid_on IS NOT NULL THEN p.amount ELSE 0 END), 0) as total_collected,
                COALESCE(SUM(CASE WHEN p.paid_on IS NULL THEN p.amount ELSE 0 END), 0) as total_pending,
                COUNT(CASE WHEN p.paid_on IS NOT NULL THEN 1 END) as payments_received,
                COUNT(CASE WHEN p.paid_on IS NULL THEN 1 END) as payments_pending
            FROM payments p
            WHERE strftime('%Y-%m', p.due_date) =
                  COALESCE(?, strftime('%Y-%m', 'now', '+8 hours'))
        """, (month or None,))
        return summary


def main():
    """Start the Property Management System.

    Builds a ``PropertyManager`` (which sets up its database) and prints a
    progress message before and after.
    """
    print("Initializing Property Management System...")
    # Constructed only for its set-up side effects; the instance is not used.
    PropertyManager()
    print("System ready!")


if __name__ == "__main__":
    main()

Writing main.py


In [3]:
%%writefile models.py
"""
Data models and utility classes for Property Management System
"""

import json
from dataclasses import asdict, dataclass, fields
from datetime import date, datetime
from typing import Any, Dict, List, Optional

@dataclass
class Tenant:
    """Tenant data model; fields mirror the columns of the tenants table."""
    id: Optional[int] = None
    first_name: str = ""
    last_name: str = ""
    email: str = ""
    phone: Optional[str] = None
    date_of_birth: Optional[str] = None
    created_at: Optional[str] = None
    timestamp: Optional[str] = None

    @property
    def full_name(self) -> str:
        """First and last name joined by a space, with outer whitespace trimmed."""
        return f"{self.first_name} {self.last_name}".strip()

    def to_dict(self) -> Dict:
        """Return the fields as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> 'Tenant':
        """Build a Tenant from a mapping, ignoring keys that are not fields.

        Query rows often carry extra joined columns; passing them straight to
        the constructor would raise ``TypeError``.
        """
        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})

@dataclass
class Property:
    """Property data model; fields mirror the columns of the properties table."""
    id: Optional[int] = None
    name: str = ""
    address_line1: str = ""
    address_line2: Optional[str] = None
    city: str = ""
    state: Optional[str] = None
    postal_code: Optional[str] = None
    country: str = ""
    created_at: Optional[str] = None
    timestamp: Optional[str] = None

    @property
    def full_address(self) -> str:
        """Comma-separated address built from the non-empty address parts.

        The country is not part of the result.
        """
        parts = [
            self.address_line1,
            self.address_line2,
            self.city,
            self.state,
            self.postal_code,
        ]
        return ", ".join(filter(None, parts))

    def to_dict(self) -> Dict:
        """Return the fields as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> 'Property':
        """Build a Property from a mapping, ignoring keys that are not fields.

        Query rows may carry extra computed columns (for example a unit
        count); passing them to the constructor would raise ``TypeError``.
        """
        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})

@dataclass
class Unit:
    """Unit data model; fields mirror the columns of the units table."""
    id: Optional[int] = None
    property_id: int = 0
    unit_number: str = ""
    floor: Optional[str] = None
    bedrooms: Optional[int] = None
    bathrooms: Optional[float] = None
    square_feet: Optional[int] = None
    status: str = "available"
    created_at: Optional[str] = None
    timestamp: Optional[str] = None

    @property
    def description(self) -> str:
        """Short summary such as ``'2BR / 1.5BA / 850sqft'``.

        Parts whose value is missing or zero are left out; the result is an
        empty string when none are set.
        """
        parts = []
        if self.bedrooms:
            parts.append(f"{self.bedrooms}BR")
        if self.bathrooms:
            parts.append(f"{self.bathrooms}BA")
        if self.square_feet:
            parts.append(f"{self.square_feet}sqft")
        return " / ".join(parts)

    def to_dict(self) -> Dict:
        """Return the fields as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> 'Unit':
        """Build a Unit from a mapping, ignoring keys that are not fields.

        Query rows may carry extra joined columns (for example a tenant
        name); passing them to the constructor would raise ``TypeError``.
        """
        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})

@dataclass
class Lease:
    """Lease data model; fields mirror the columns of the leases table."""
    id: Optional[int] = None
    tenant_id: int = 0
    unit_id: int = 0
    start_date: str = ""
    end_date: str = ""
    rent_amount: float = 0.0
    security_deposit: Optional[float] = None
    status: str = "active"
    created_at: Optional[str] = None
    timestamp: Optional[str] = None

    @property
    def is_active(self) -> bool:
        """True when the lease status is 'active'."""
        return self.status == "active"

    @property
    def days_until_expiry(self) -> Optional[int]:
        """Days from today to ``end_date`` (negative once it has passed).

        Returns None when ``end_date`` is empty or does not start with a
        ``YYYY-MM-DD`` date. A trailing time component is ignored.
        """
        if not self.end_date:
            return None
        try:
            # Only the first ten characters are parsed so that values such as
            # '2025-12-31 00:00:00' are accepted too.
            end_date = datetime.strptime(self.end_date[:10], '%Y-%m-%d').date()
        except ValueError:
            return None
        return (end_date - date.today()).days

    def is_expiring_soon(self, days: int = 30) -> bool:
        """Return True when the lease ends within the next ``days`` days.

        This is a regular method rather than a property: a property cannot
        receive the ``days`` argument, and calling its boolean result raises
        ``TypeError``. Leases that already ended, or whose end date cannot be
        parsed, are not considered expiring.
        """
        days_left = self.days_until_expiry
        return days_left is not None and 0 <= days_left <= days

    def to_dict(self) -> Dict:
        """Return the fields as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> 'Lease':
        """Build a Lease from a mapping, ignoring keys that are not fields.

        Query rows may carry extra joined columns (tenant name, property
        name, ...); passing them to the constructor would raise ``TypeError``.
        """
        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})

@dataclass
class Agent:
    """Agent data model; fields mirror the columns of the agents table."""
    id: Optional[int] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    role: Optional[str] = None
    email: Optional[str] = None
    phone: Optional[str] = None
    created_at: Optional[str] = None
    timestamp: Optional[str] = None

    @property
    def full_name(self) -> str:
        """Whichever of first/last name are set, or 'Unknown Agent' if neither."""
        present = [part for part in (self.first_name, self.last_name) if part]
        return " ".join(present) or "Unknown Agent"

    def to_dict(self) -> Dict:
        """Return the fields as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> 'Agent':
        """Build an Agent from a mapping, ignoring keys that are not fields.

        Extra keys would otherwise make the constructor raise ``TypeError``.
        """
        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})

@dataclass
class ServiceTicket:
    """Service ticket data model; fields mirror the service_tickets table."""
    id: Optional[int] = None
    lease_id: int = 0
    raised_by: int = 0
    assigned_to: Optional[int] = None
    category: str = ""
    subcategory: Optional[str] = None
    description: str = ""
    status: str = "open"
    priority: str = "normal"
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
    timestamp: Optional[str] = None

    @property
    def is_open(self) -> bool:
        """True while the status is 'open', 'assigned' or 'in_progress'."""
        return self.status in ('open', 'assigned', 'in_progress')

    @property
    def priority_level(self) -> int:
        """Numeric rank of the priority: low=1, normal=2, high=3, urgent=4.

        Unrecognised priority values rank as 2 (normal).
        """
        priority_map = {'low': 1, 'normal': 2, 'high': 3, 'urgent': 4}
        return priority_map.get(self.priority, 2)

    def to_dict(self) -> Dict:
        """Return the fields as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> 'ServiceTicket':
        """Build a ServiceTicket from a mapping, ignoring non-field keys.

        Query rows may carry extra joined columns (tenant name, agent name,
        ...); passing them to the constructor would raise ``TypeError``.
        """
        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})

@dataclass
class Payment:
    """Payment data model; fields mirror the columns of the payments table."""
    id: Optional[int] = None
    lease_id: int = 0
    payment_type: str = ""
    billing_period: Optional[str] = None
    due_date: Optional[str] = None
    amount: float = 0.0
    method: Optional[str] = None
    paid_on: Optional[str] = None
    reference_number: Optional[str] = None
    created_at: Optional[str] = None
    timestamp: Optional[str] = None

    def _due_date_value(self) -> Optional[date]:
        """Parse ``due_date`` into a date, or None when missing or malformed."""
        if not self.due_date:
            return None
        try:
            # Only the first ten characters are parsed so a trailing time
            # component (e.g. '2025-06-05 00:00:00') does not break parsing.
            return datetime.strptime(self.due_date[:10], '%Y-%m-%d').date()
        except ValueError:
            return None

    @property
    def is_paid(self) -> bool:
        """True when ``paid_on`` has a value."""
        return self.paid_on is not None

    @property
    def is_overdue(self) -> bool:
        """True when the payment is unpaid and its due date is before today.

        Payments without a parseable due date are never overdue.
        """
        if self.is_paid:
            return False
        due = self._due_date_value()
        return due is not None and date.today() > due

    @property
    def days_overdue(self) -> Optional[int]:
        """Days elapsed since the due date, or None when not overdue."""
        if not self.is_overdue:
            return None
        return (date.today() - self._due_date_value()).days

    def to_dict(self) -> Dict:
        """Return the fields as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> 'Payment':
        """Build a Payment from a mapping, ignoring keys that are not fields.

        Query rows may carry extra joined columns (tenant name, property
        name, ...); passing them to the constructor would raise ``TypeError``.
        """
        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})

@dataclass
class TicketComment:
    """Ticket comment data model; fields mirror the ticket_comments table."""
    id: Optional[int] = None
    ticket_id: int = 0
    author_id: int = 0
    author_type: str = ""
    comment_text: str = ""
    created_at: Optional[str] = None
    timestamp: Optional[str] = None

    def to_dict(self) -> Dict:
        """Return the fields as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> 'TicketComment':
        """Build a TicketComment from a mapping, ignoring non-field keys.

        Extra keys would otherwise make the constructor raise ``TypeError``.
        """
        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})

@dataclass
class TicketConversation:
    """Ticket conversation data model; fields mirror ticket_conversations."""
    id: Optional[int] = None
    ticket_id: int = 0
    author_type: str = ""
    author_id: int = 0
    message_text: str = ""
    sent_at: Optional[str] = None
    timestamp: Optional[str] = None

    def to_dict(self) -> Dict:
        """Return the fields as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> 'TicketConversation':
        """Build a TicketConversation from a mapping, ignoring non-field keys.

        Extra keys would otherwise make the constructor raise ``TypeError``.
        """
        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})

class DataValidator:
    """Data validation utilities.

    The validators return a bool; a value of the wrong type for the field
    being checked is reported as invalid (False) rather than raising.
    """

    @staticmethod
    def validate_email(email: str) -> bool:
        """Basic email validation (local part, '@', domain, 2+ letter TLD)."""
        import re
        if not isinstance(email, str):
            return False
        # fullmatch is used because '$' with re.match also matches just
        # before a trailing newline, which would let 'a@b.co\n' through.
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        return re.fullmatch(pattern, email) is not None

    @staticmethod
    def validate_phone(phone: str) -> bool:
        """Basic Malaysian phone number validation.

        Accepts 01X-XXXXXXX, 01XXXXXXXX, +601X-XXXXXXX and +601XXXXXXXX,
        where the part after the 01X prefix is 7 or 8 digits.
        """
        import re
        if not isinstance(phone, str):
            return False
        # Optional '+6' country prefix, the 01X mobile prefix, an optional
        # hyphen, then 7-8 digits.
        pattern = r'(?:\+6)?01[0-9]-?[0-9]{7,8}'
        return re.fullmatch(pattern, phone) is not None

    @staticmethod
    def validate_date(date_str: str, format_str: str = '%Y-%m-%d') -> bool:
        """Return True when ``date_str`` parses with ``format_str``."""
        try:
            datetime.strptime(date_str, format_str)
        except (ValueError, TypeError):
            # TypeError covers non-string input such as None.
            return False
        return True

    @staticmethod
    def validate_amount(amount: Any) -> bool:
        """Validate a monetary amount: convertible to a finite float >= 0."""
        import math
        try:
            float_amount = float(amount)
        except (ValueError, TypeError, OverflowError):
            return False
        # NaN and infinity convert without error but are not usable amounts.
        return math.isfinite(float_amount) and float_amount >= 0

    @staticmethod
    def validate_postal_code(postal_code: str, country: str = 'Malaysia') -> bool:
        """Validate postal code based on country.

        Malaysian codes must be exactly five ASCII digits; for any other
        country the code only needs at least three non-blank characters.
        """
        import re
        if not isinstance(postal_code, str):
            return False
        if country.lower() == 'malaysia':
            # [0-9] instead of \d: \d also matches non-ASCII digit characters.
            return re.fullmatch(r'[0-9]{5}', postal_code) is not None
        # Generic validation - at least 3 characters
        return len(postal_code.strip()) >= 3

    @staticmethod
    def validate_unit_number(unit_number: str) -> bool:
        """Validate unit number: non-blank and at most 20 characters.

        The character set is not restricted, so hyphenated numbers such as
        'A-01' are accepted.
        """
        if not isinstance(unit_number, str):
            return False
        trimmed = unit_number.strip()
        return bool(trimmed) and len(trimmed) <= 20

class ReportGenerator:
    """Generate various reports.

    Every generator is a static method that takes lists of record mappings
    and returns a plain dict. Each report carries a ``report_date`` string
    (``%Y-%m-%d %H:%M:%S``) taken from ``datetime.now()``.
    """

    @staticmethod
    def _timestamp() -> str:
        """Return the current time formatted for the ``report_date`` field."""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    @staticmethod
    def generate_lease_expiry_report(leases: List[Dict], days_ahead: int = 30) -> Dict:
        """Generate lease expiry report.

        Args:
            leases: Lease records; each is passed to ``Lease.from_dict``.
            days_ahead: Window forwarded to ``Lease.is_expiring_soon``.

        Returns:
            Dict with ``report_date``, ``period_days``, ``total_expiring``,
            ``total_rent_at_risk`` (sum of ``rent_amount`` over the expiring
            leases) and ``leases`` (the original records that are expiring).
        """
        expiring_leases = []
        total_rent_at_risk = 0.0

        for lease_data in leases:
            lease = Lease.from_dict(lease_data)
            if lease.is_expiring_soon(days_ahead):
                expiring_leases.append(lease_data)
                total_rent_at_risk += lease.rent_amount

        return {
            'report_date': ReportGenerator._timestamp(),
            'period_days': days_ahead,
            'total_expiring': len(expiring_leases),
            'total_rent_at_risk': total_rent_at_risk,
            'leases': expiring_leases
        }

    @staticmethod
    def generate_payment_summary(payments: List[Dict], period: str = 'current_month') -> Dict:
        """Generate payment summary report.

        Args:
            payments: Payment records; each is passed to
                ``Payment.from_dict``.
            period: Label echoed back in the report. It is not used to
                filter ``payments``.

        Returns:
            Dict with ``report_date``, ``period``, ``summary`` (collected /
            pending / overdue totals and ``collection_rate`` in percent),
            ``counts`` and ``details`` (the records in each bucket).
        """
        buckets = {'paid': [], 'pending': [], 'overdue': []}
        totals = {'paid': 0.0, 'pending': 0.0, 'overdue': 0.0}

        for payment_data in payments:
            payment = Payment.from_dict(payment_data)

            # Paid wins over overdue; whatever is neither counts as pending.
            if payment.is_paid:
                state = 'paid'
            elif payment.is_overdue:
                state = 'overdue'
            else:
                state = 'pending'

            buckets[state].append(payment_data)
            totals[state] += payment.amount

        grand_total = totals['paid'] + totals['pending'] + totals['overdue']
        # Guard against division by zero when nothing was billed.
        collection_rate = (totals['paid'] / grand_total * 100) if grand_total > 0 else 0

        return {
            'report_date': ReportGenerator._timestamp(),
            'period': period,
            'summary': {
                'total_collected': totals['paid'],
                'total_pending': totals['pending'],
                'total_overdue': totals['overdue'],
                'collection_rate': collection_rate
            },
            'counts': {
                'paid': len(buckets['paid']),
                'pending': len(buckets['pending']),
                'overdue': len(buckets['overdue'])
            },
            'details': {
                'paid_payments': buckets['paid'],
                'pending_payments': buckets['pending'],
                'overdue_payments': buckets['overdue']
            }
        }

    @staticmethod
    def generate_occupancy_report(properties: List[Dict], units: List[Dict], leases: List[Dict]) -> Dict:
        """Generate property occupancy report.

        A unit counts as occupied when at least one lease with
        ``status == 'active'`` references its ``id``.

        Args:
            properties: Records with ``id`` and ``name``.
            units: Records with ``id`` and ``property_id``.
            leases: Records with ``unit_id`` and ``status``.

        Returns:
            Dict with ``report_date``, overall totals, the overall occupancy
            rate in percent, and a per-property breakdown under
            ``properties`` (in the order of the ``properties`` argument).
        """
        from collections import defaultdict

        # Index the inputs once so the per-property work is a lookup rather
        # than a rescan of every unit and every lease.
        occupied_unit_ids = {
            lease['unit_id'] for lease in leases if lease['status'] == 'active'
        }
        units_by_property = defaultdict(list)
        for unit in units:
            units_by_property[unit['property_id']].append(unit)

        occupancy_data = []
        for property_data in properties:
            # .get() avoids inserting empty entries for unit-less properties.
            property_units = units_by_property.get(property_data['id'], [])
            total_units = len(property_units)
            occupied_units = sum(
                1 for unit in property_units if unit['id'] in occupied_unit_ids
            )
            occupancy_rate = (occupied_units / total_units * 100) if total_units > 0 else 0

            occupancy_data.append({
                'property_id': property_data['id'],
                'property_name': property_data['name'],
                'total_units': total_units,
                'occupied_units': occupied_units,
                'vacant_units': total_units - occupied_units,
                'occupancy_rate': occupancy_rate
            })

        # Overall statistics
        total_units_all = sum(p['total_units'] for p in occupancy_data)
        total_occupied_all = sum(p['occupied_units'] for p in occupancy_data)
        overall_occupancy = (total_occupied_all / total_units_all * 100) if total_units_all > 0 else 0

        return {
            'report_date': ReportGenerator._timestamp(),
            'overall_occupancy_rate': overall_occupancy,
            'total_units': total_units_all,
            'total_occupied': total_occupied_all,
            'total_vacant': total_units_all - total_occupied_all,
            'properties': occupancy_data
        }

    @staticmethod
    def generate_service_ticket_report(tickets: List[Dict]) -> Dict:
        """Generate service ticket analysis report.

        Args:
            tickets: Ticket dicts. Missing ``priority``, ``category`` and
                ``status`` default to ``'normal'``, ``'Unknown'`` and
                ``'open'`` respectively.

        Returns:
            Dict with ``report_date``, ``total_tickets``, a ``summary`` of
            counts by priority / category / status, and a list of
            ``recommendations``. For an empty input the summary is ``{}``
            and the recommendations are ``[]``.
        """
        if not tickets:
            return {
                'report_date': ReportGenerator._timestamp(),
                'total_tickets': 0,
                'summary': {},
                'recommendations': []
            }

        from collections import Counter

        # Converted back to plain dicts so the report stays a simple,
        # JSON-friendly structure.
        priorities = dict(Counter(t.get('priority', 'normal') for t in tickets))
        categories = dict(Counter(t.get('category', 'Unknown') for t in tickets))
        statuses = dict(Counter(t.get('status', 'open') for t in tickets))

        # Generate recommendations
        recommendations = []
        urgent_count = priorities.get('urgent', 0)
        if urgent_count > 0:
            recommendations.append(f"Address {urgent_count} urgent tickets immediately")
        if priorities.get('high', 0) > 2:
            recommendations.append("Consider additional maintenance staff for high priority tickets")
        # More than 60% of all tickets are maintenance requests.
        if categories.get('Maintenance', 0) > len(tickets) * 0.6:
            recommendations.append("Review preventive maintenance schedule")

        return {
            'report_date': ReportGenerator._timestamp(),
            'total_tickets': len(tickets),
            'summary': {
                'by_priority': priorities,
                'by_category': categories,
                'by_status': statuses
            },
            'recommendations': recommendations
        }

    @staticmethod
    def generate_tenant_demographics_report(tenants: List[Dict]) -> Dict:
        """Generate tenant demographics report.

        Args:
            tenants: Tenant dicts; ``date_of_birth``, ``email`` and ``phone``
                are read with ``.get`` and may be absent.

        Returns:
            Dict with ``report_date``, ``total_tenants`` and ``demographics``
            holding ``age_groups`` and ``contact_completeness`` counters.
            For an empty input ``demographics`` is ``{}``.
        """
        if not tenants:
            return {
                'report_date': ReportGenerator._timestamp(),
                'total_tenants': 0,
                'demographics': {}
            }

        age_groups = {'18-25': 0, '26-35': 0, '36-45': 0, '46-55': 0, '55+': 0, 'Unknown': 0}
        contact_completeness = {'email_only': 0, 'phone_only': 0, 'both': 0, 'neither': 0}

        for tenant in tenants:
            # Age analysis
            age = None
            if tenant.get('date_of_birth'):
                age = Utils.calculate_age(tenant['date_of_birth'])

            # A falsy age (None from a failed parse, or 0) is counted as
            # Unknown. Note the first bucket has no lower bound, so any
            # non-zero age up to 25 lands in '18-25'.
            if not age:
                age_groups['Unknown'] += 1
            elif age <= 25:
                age_groups['18-25'] += 1
            elif age <= 35:
                age_groups['26-35'] += 1
            elif age <= 45:
                age_groups['36-45'] += 1
            elif age <= 55:
                age_groups['46-55'] += 1
            else:
                age_groups['55+'] += 1

            # Contact completeness
            has_email = bool(tenant.get('email'))
            has_phone = bool(tenant.get('phone'))

            if has_email and has_phone:
                contact_completeness['both'] += 1
            elif has_email:
                contact_completeness['email_only'] += 1
            elif has_phone:
                contact_completeness['phone_only'] += 1
            else:
                contact_completeness['neither'] += 1

        return {
            'report_date': ReportGenerator._timestamp(),
            'total_tenants': len(tenants),
            'demographics': {
                'age_groups': age_groups,
                'contact_completeness': contact_completeness
            }
        }

class Utils:
    """General utility functions.

    A namespace of stateless helpers (formatting, date arithmetic, JSON
    import/export, string handling); every method is a ``@staticmethod``.
    """

    @staticmethod
    def format_currency(amount: float, currency: str = 'RM') -> str:
        """Format ``amount`` as ``"<currency> 1,234.56"`` (two decimals)."""
        return f"{currency} {amount:,.2f}"

    @staticmethod
    def format_date(date_str: str, input_format: str = '%Y-%m-%d', output_format: str = '%d/%m/%Y') -> str:
        """Re-format a date string.

        Args:
            date_str: Date text expected to match ``input_format``.
            input_format: ``strptime`` format of the input.
            output_format: ``strftime`` format of the result.

        Returns:
            The re-formatted date, or ``date_str`` unchanged when it does
            not match ``input_format``.
        """
        try:
            date_obj = datetime.strptime(date_str, input_format)
            return date_obj.strftime(output_format)
        except ValueError:
            return date_str

    @staticmethod
    def calculate_age(birth_date: str) -> Optional[int]:
        """Calculate age in whole years from a ``YYYY-MM-DD`` birth date.

        Returns:
            The age as of ``date.today()``, or None when ``birth_date``
            cannot be parsed (including non-string input such as None).
        """
        try:
            birth = datetime.strptime(birth_date, '%Y-%m-%d').date()
        except (ValueError, TypeError):
            return None
        today = date.today()
        # Subtract one year when this year's birthday has not happened yet
        # (the tuple comparison yields a bool, i.e. 0 or 1).
        return today.year - birth.year - ((today.month, today.day) < (birth.month, birth.day))

    @staticmethod
    def export_to_json(data: Any, filename: str) -> bool:
        """Export data to a UTF-8 JSON file.

        Values the JSON encoder cannot serialise are written via ``str()``
        (``default=str``).

        Returns:
            True on success; False after printing the error on any failure.
        """
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False, default=str)
            return True
        except Exception as e:
            print(f"Error exporting to JSON: {e}")
            return False

    @staticmethod
    def import_from_json(filename: str) -> Optional[Any]:
        """Import data from a UTF-8 JSON file.

        Returns:
            The decoded JSON value, or None after printing the error on any
            failure (missing file, invalid JSON, ...).
        """
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"Error importing from JSON: {e}")
            return None

    @staticmethod
    def calculate_lease_duration(start_date: str, end_date: str) -> Optional[int]:
        """Calculate lease duration in days between two ``YYYY-MM-DD`` dates.

        Returns:
            ``end - start`` in days (negative when ``end_date`` is earlier),
            or None when either value cannot be parsed.
        """
        try:
            start = datetime.strptime(start_date, '%Y-%m-%d').date()
            end = datetime.strptime(end_date, '%Y-%m-%d').date()
        except (ValueError, TypeError):
            return None
        return (end - start).days

    @staticmethod
    def generate_reference_number(prefix: str = "REF", length: int = 6) -> str:
        """Generate ``prefix`` followed by ``length`` random decimal digits.

        Uses the ``random`` module, so the result is neither guaranteed
        unique nor suitable as a security token.
        """
        import random
        import string
        suffix = ''.join(random.choices(string.digits, k=length))
        return f"{prefix}{suffix}"

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Sanitize filename for safe file operations.

        Replaces each of ``< > : " / \\ | ? *`` with ``_``, strips leading
        and trailing spaces and dots, then truncates to 200 characters.
        """
        import re
        # Remove or replace invalid characters
        sanitized = re.sub(r'[<>:"/\\|?*]', '_', filename)
        # Remove leading/trailing spaces and dots
        sanitized = sanitized.strip(' .')
        # Limit length (slicing is a no-op for shorter names)
        return sanitized[:200]

    @staticmethod
    def parse_address(address_string: str) -> Dict[str, str]:
        """Parse a comma-separated address string into components.

        The first four comma-separated parts are mapped, in order, to
        ``address_line1``, ``city``, ``state`` and ``country``. The
        ``address_line2`` and ``postal_code`` keys are always present but
        are never filled by this simple parser.
        """
        # Simple address parsing - can be enhanced
        parts = [part.strip() for part in address_string.split(',')]

        result = {
            'address_line1': '',
            'address_line2': '',
            'city': '',
            'state': '',
            'postal_code': '',
            'country': ''
        }

        # zip() stops at the shorter sequence, so missing parts stay ''.
        for key, value in zip(('address_line1', 'city', 'state', 'country'), parts):
            result[key] = value

        return result

    @staticmethod
    def mask_sensitive_data(data: str, mask_char: str = '*', show_last: int = 4) -> str:
        """Mask all but the last ``show_last`` characters of ``data``.

        Args:
            data: Text to mask.
            mask_char: Replacement for each hidden character.
            show_last: Number of trailing characters left readable; zero or
                a negative value hides everything.

        Returns:
            The masked text. Empty input, or input no longer than
            ``show_last``, is returned unchanged.
        """
        show_last = max(show_last, 0)
        if not data or len(data) <= show_last:
            return data

        # Slice from an explicit index: data[-0:] would be the whole string,
        # which leaked the full value when show_last was 0.
        hidden = len(data) - show_last
        return mask_char * hidden + data[hidden:]

    @staticmethod
    def calculate_business_days(start_date: str, end_date: str) -> Optional[int]:
        """Count Monday-Friday days from ``start_date`` to ``end_date``.

        Both endpoints are included. Public holidays are not considered.

        Returns:
            The count (0 when ``end_date`` precedes ``start_date``), or None
            when either value cannot be parsed as ``YYYY-MM-DD``.
        """
        try:
            start = datetime.strptime(start_date, '%Y-%m-%d').date()
            end = datetime.strptime(end_date, '%Y-%m-%d').date()
        except (ValueError, TypeError):
            return None

        business_days = 0
        current_date = start

        while current_date <= end:
            if current_date.weekday() < 5:  # Monday = 0, Sunday = 6
                business_days += 1
            current_date += timedelta(days=1)

        return business_days

class DataAnalyzer:
    """Advanced data analysis utilities.

    Stateless helpers that aggregate lists of payment, lease and ticket
    dicts into summary dicts.
    """

    @staticmethod
    def calculate_revenue_trends(payments: List[Dict], months: int = 12) -> Dict:
        """Calculate revenue trends over the most recent ``months`` months.

        Only payments with a truthy ``paid_on`` in ``%Y-%m-%d %H:%M:%S``
        format are counted; entries whose ``paid_on`` cannot be parsed are
        skipped. "Most recent" refers to the latest months that have at
        least one counted payment, not to calendar months before today.

        Args:
            payments: Payment dicts with ``paid_on`` and ``amount``.
            months: Maximum number of months to report; zero or a negative
                value yields an empty report.

        Returns:
            Dict with ``monthly_revenue`` and ``monthly_counts`` keyed by
            ``YYYY-MM`` (ascending), plus a ``trend_analysis`` summary.
        """
        from collections import defaultdict

        monthly_revenue = defaultdict(float)
        monthly_counts = defaultdict(int)

        for payment in payments:
            paid_on = payment.get('paid_on')
            if not paid_on:
                continue
            try:
                payment_date = datetime.strptime(paid_on, '%Y-%m-%d %H:%M:%S')
            except (ValueError, TypeError):
                # Unparseable or non-string timestamp: ignore this payment.
                continue
            month_key = payment_date.strftime('%Y-%m')
            monthly_revenue[month_key] += payment['amount']
            monthly_counts[month_key] += 1

        # Sort by month and get last N months. The explicit guard matters:
        # a slice of [-0:] would return every month instead of none.
        all_months = sorted(monthly_revenue)
        sorted_months = all_months[-months:] if months > 0 else []

        def revenue_of(month: str) -> float:
            return monthly_revenue[month]

        return {
            'monthly_revenue': {month: monthly_revenue[month] for month in sorted_months},
            'monthly_counts': {month: monthly_counts[month] for month in sorted_months},
            'trend_analysis': {
                'total_months': len(sorted_months),
                'average_monthly_revenue': sum(map(revenue_of, sorted_months)) / len(sorted_months) if sorted_months else 0,
                'highest_month': max(sorted_months, key=revenue_of) if sorted_months else None,
                'lowest_month': min(sorted_months, key=revenue_of) if sorted_months else None
            }
        }

    @staticmethod
    def analyze_tenant_retention(leases: List[Dict]) -> Dict:
        """Analyze tenant retention patterns.

        Leases without a truthy ``tenant_id`` are ignored.

        Returns:
            Dict with the number of tenants holding exactly one lease
            (``single_lease``), more than one (``multiple_leases``), the
            largest lease count for any tenant, and ``retention_rate``: the
            percentage of tenants with more than one lease.
        """
        from collections import Counter

        tenant_lease_counts = Counter(
            lease['tenant_id'] for lease in leases if lease.get('tenant_id')
        )

        retention_stats = {
            'single_lease': 0,
            'multiple_leases': 0,
            'max_leases_per_tenant': 0,
            'retention_rate': 0.0
        }

        if tenant_lease_counts:
            counts = list(tenant_lease_counts.values())
            retention_stats['single_lease'] = sum(1 for count in counts if count == 1)
            retention_stats['multiple_leases'] = sum(1 for count in counts if count > 1)
            retention_stats['max_leases_per_tenant'] = max(counts)
            retention_stats['retention_rate'] = (retention_stats['multiple_leases'] / len(counts)) * 100

        return retention_stats

    @staticmethod
    def identify_maintenance_patterns(tickets: List[Dict]) -> Dict:
        """Identify patterns in maintenance requests.

        Args:
            tickets: Ticket dicts. Missing ``category`` / ``property_name``
                default to ``'Unknown'``; ``created_at`` is optional and
                expected as ``%Y-%m-%d %H:%M:%S``.

        Returns:
            Dict with ``category_frequency`` (category -> count),
            ``property_issues`` (property -> category -> count),
            ``seasonal_patterns`` (two-digit month -> count, over tickets
            with a parseable ``created_at``) and ``recommendations``.
        """
        category_frequency = {}
        property_issues = {}
        seasonal_patterns = {}

        for ticket in tickets:
            # Category analysis
            category = ticket.get('category', 'Unknown')
            category_frequency[category] = category_frequency.get(category, 0) + 1

            # Property analysis
            property_name = ticket.get('property_name', 'Unknown')
            issues = property_issues.setdefault(property_name, {})
            issues[category] = issues.get(category, 0) + 1

            # Seasonal analysis (if created_at is available)
            created_at = ticket.get('created_at')
            if created_at:
                try:
                    created_date = datetime.strptime(created_at, '%Y-%m-%d %H:%M:%S')
                except (ValueError, TypeError):
                    continue
                month = created_date.strftime('%m')
                seasonal_patterns[month] = seasonal_patterns.get(month, 0) + 1

        return {
            'category_frequency': category_frequency,
            'property_issues': property_issues,
            'seasonal_patterns': seasonal_patterns,
            'recommendations': DataAnalyzer._generate_maintenance_recommendations(category_frequency, property_issues)
        }

    @staticmethod
    def _generate_maintenance_recommendations(category_freq: Dict, property_issues: Dict) -> List[str]:
        """Generate maintenance recommendations based on patterns.

        Args:
            category_freq: Category -> ticket count.
            property_issues: Property name -> (category -> ticket count).

        Returns:
            Recommendation strings: one for the most frequent category when
            it accounts for more than half of all tickets, and one for each
            property that has tickets in more than three distinct
            categories.
        """
        recommendations = []

        if category_freq:
            most_common = max(category_freq, key=category_freq.get)
            # Compare against the total number of tickets; comparing against
            # the number of distinct categories fired for evenly spread
            # tickets as well.
            total_tickets = sum(category_freq.values())
            if category_freq[most_common] > total_tickets * 0.5:
                recommendations.append(f"Focus on preventive maintenance for {most_common} issues")

        # Property-specific recommendations
        for property_name, issues in property_issues.items():
            if len(issues) > 3:
                recommendations.append(f"Review maintenance protocols for {property_name}")

        return recommendations

Writing models.py


In [4]:
%%writefile config.py
"""
Configuration settings for Property Management System
"""

import os
from datetime import timedelta

class Config:
    """Shared settings for every environment.

    Holds file locations (resolved relative to this module's directory),
    application metadata, the allowed values for the domain enumerations,
    and the date/currency display conventions.
    """

    # --- Storage locations -------------------------------------------------
    DATABASE_PATH = os.path.join(os.path.dirname(__file__), 'database.db')
    DATABASE_BACKUP_PATH = os.path.join(os.path.dirname(__file__), 'backups')

    # --- Application metadata ----------------------------------------------
    APP_NAME = "Property Management System"
    APP_VERSION = "1.0.0"
    TIMEZONE = "Asia/Kuala_Lumpur"

    # --- Business rules (warning windows, in days) -------------------------
    DEFAULT_LEASE_EXPIRY_WARNING_DAYS = 30
    DEFAULT_PAYMENT_DUE_WARNING_DAYS = 7

    # --- Allowed values for service tickets --------------------------------
    TICKET_PRIORITIES = ['low', 'normal', 'high', 'urgent']
    TICKET_CATEGORIES = ['Maintenance', 'Billing', 'Inquiries', 'Complaints', 'Emergency']

    # --- Allowed payment methods -------------------------------------------
    PAYMENT_METHODS = ['bank_transfer', 'credit_card', 'debit_card', 'cash', 'check']

    # --- Allowed unit statuses ---------------------------------------------
    UNIT_STATUSES = ['available', 'occupied', 'maintenance', 'renovation']

    # --- Allowed lease statuses --------------------------------------------
    LEASE_STATUSES = ['active', 'expired', 'terminated', 'pending']

    # --- Allowed service ticket statuses -----------------------------------
    TICKET_STATUSES = ['open', 'assigned', 'in_progress', 'resolved', 'closed']

    # --- Allowed agent roles -----------------------------------------------
    AGENT_ROLES = ['Manager', 'Supervisor', 'Technician', 'Clerk', 'Administrator']

    # --- Currency ------------------------------------------------------------
    CURRENCY_SYMBOL = 'RM'
    CURRENCY_CODE = 'MYR'

    # --- Date formats: storage first, then display ---------------------------
    DATE_FORMAT = '%Y-%m-%d'
    DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    DISPLAY_DATE_FORMAT = '%d/%m/%Y'
    DISPLAY_DATETIME_FORMAT = '%d/%m/%Y %H:%M'

    # --- Reporting -----------------------------------------------------------
    REPORTS_PATH = os.path.join(os.path.dirname(__file__), 'reports')

    @classmethod
    def ensure_directories(cls):
        """Create the backup and reports directories if they are missing."""
        for required_dir in (cls.DATABASE_BACKUP_PATH, cls.REPORTS_PATH):
            os.makedirs(required_dir, exist_ok=True)

class DevelopmentConfig(Config):
    """Settings for local development: debug on, testing off."""
    TESTING = False
    DEBUG = True

class ProductionConfig(Config):
    """Settings for production: both debug and testing off."""
    TESTING = False
    DEBUG = False

class TestingConfig(Config):
    """Settings for test runs: debug and testing on, in-memory database."""
    # Overrides the file-based path from Config with an in-memory database.
    DATABASE_PATH = ':memory:'
    TESTING = True
    DEBUG = True

# Lookup table: environment name -> configuration class.
# 'default' resolves to the development settings.
config = dict(
    development=DevelopmentConfig,
    production=ProductionConfig,
    testing=TestingConfig,
    default=DevelopmentConfig,
)

Writing config.py


In [13]:
"""
Streamlit Property Management System Dashboard
Interactive web interface for querying property data and document analysis
"""

import streamlit as st
import pandas as pd
import sqlite3
import json
import io
import os
import sys
import tempfile
import re
from datetime import datetime, timedelta
import plotly.express as px
import plotly.graph_objects as go
from typing import Dict, List, Any, Optional

# Import our existing modules (the project-local main.py, models.py and
# config.py). If any of them cannot be imported, show an error in the page
# and halt the script instead of failing later with a NameError.
try:
    from main import PropertyManager, DatabaseManager
    from models import Utils, ReportGenerator, DataValidator, DataAnalyzer
    from config import Config
except ImportError:
    st.error("Please ensure main.py, models.py, and config.py are in the same directory")
    st.stop()

# Page configuration: browser tab title and icon, wide layout, and the
# sidebar expanded on first load.
st.set_page_config(
    page_title="Property Management System",
    page_icon="🏢",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Custom CSS for better styling. The stylesheet is injected as raw HTML
# (unsafe_allow_html=True). It defines the .main-header class used by the
# page title plus card/message helper classes, and restyles the tab bar.
# NOTE(review): selectors such as .css-1d391kg and .sidebar .sidebar-content
# look tied to Streamlit's generated markup — confirm they still match the
# installed Streamlit version.
st.markdown("""
<style>
    .main-header {
        font-size: 2.5rem;
        color: #1f77b4;
        text-align: center;
        margin-bottom: 2rem;
    }
    .metric-card {
        background-color: #f0f2f6;
        padding: 1rem;
        border-radius: 0.5rem;
        border-left: 4px solid #1f77b4;
    }
    .success-message {
        background-color: #d4edda;
        color: #155724;
        padding: 0.75rem;
        border-radius: 0.25rem;
        border: 1px solid #c3e6cb;
    }
    .error-message {
        background-color: #f8d7da;
        color: #721c24;
        padding: 0.75rem;
        border-radius: 0.25rem;
        border: 1px solid #f5c6cb;
    }
    .sidebar .sidebar-content {
        background-color: #f8f9fa;
    }
    .stAlert {
        margin-top: 1rem;
    }
    .css-1d391kg {
        padding: 1rem;
    }
    .stTabs [data-baseweb="tab-list"] {
        gap: 24px;
    }
    .stTabs [data-baseweb="tab"] {
        height: 50px;
        padding: 0px 24px;
        background-color: #f0f2f6;
        border-radius: 8px 8px 0px 0px;
    }
    .stTabs [aria-selected="true"] {
        background-color: #1f77b4;
        color: white;
    }
</style>
""", unsafe_allow_html=True)

class StreamlitPropertyApp:
    """Main Streamlit application class"""

    def __init__(self) -> None:
        """Seed the session-state defaults, then try to open the database."""
        # Remains None when PropertyManager() fails in initialize_database();
        # callers check st.session_state.db_connected before using it.
        self.pm = None
        self.initialize_session_state()
        self.initialize_database()

    def initialize_session_state(self):
        """Seed ``st.session_state`` with a default for every key not yet set.

        Keys that already hold a value are left untouched.
        """
        defaults = {
            'api_key': "",
            'uploaded_files': [],
            'query_history': [],
            'data_cache': {},
            'chat_messages': [],
            'selected_property': None,
            'selected_tenant': None,
            'theme_mode': "light",
        }
        for key, default_value in defaults.items():
            if key not in st.session_state:
                st.session_state[key] = default_value

    def initialize_database(self):
        """Create the PropertyManager and record whether that succeeded.

        On failure the error is shown in the page and ``self.pm`` keeps its
        previous value; ``st.session_state.db_connected`` reflects the
        outcome either way.
        """
        connected = False
        try:
            self.pm = PropertyManager()
            connected = True
        except Exception as exc:
            st.error(f"Database connection failed: {exc}")
        st.session_state.db_connected = connected

    def render_sidebar(self) -> None:
        """Render sidebar with configuration options.

        Sections, in render order: API key and provider, document upload,
        database status with quick counts, display preferences (currency and
        date format), and maintenance buttons. Widget values are written to
        ``st.session_state`` under ``api_key``, ``api_provider``,
        ``uploaded_files``, ``currency`` and ``date_format``.
        """
        st.sidebar.title("🔧 Configuration")

        # API Key input
        st.sidebar.subheader("🔑 API Configuration")
        api_key = st.sidebar.text_input(
            "Enter API Key (Optional)",
            value=st.session_state.api_key,
            type="password",
            help="Enter your OpenAI, Claude, or other AI API key for enhanced features"
        )
        st.session_state.api_key = api_key

        # The provider selector is only shown (and api_provider only set)
        # once a non-empty key has been entered.
        if api_key:
            st.sidebar.success("✅ API Key configured")

            # API Provider selection
            api_provider = st.sidebar.selectbox(
                "Select AI Provider:",
                ["OpenAI", "Anthropic Claude", "Custom"],
                help="Choose your AI service provider"
            )
            st.session_state.api_provider = api_provider

        # File upload section
        st.sidebar.subheader("📁 Document Upload")
        uploaded_files = st.sidebar.file_uploader(
            "Upload documents for analysis",
            accept_multiple_files=True,
            type=['csv', 'xlsx', 'json', 'txt', 'pdf'],
            help="Upload CSV, Excel, JSON, text or PDF files"
        )

        # Session state is only overwritten when the uploader returns a
        # non-empty value, so a previously stored list is kept otherwise.
        if uploaded_files:
            st.session_state.uploaded_files = uploaded_files
            st.sidebar.success(f"✅ {len(uploaded_files)} file(s) uploaded")

        # Database status and operations
        st.sidebar.subheader("🗄️ Database Management")
        if st.session_state.get('db_connected', False):
            st.sidebar.success("✅ Database Connected")

            # Quick stats: queried from the PropertyManager on each render;
            # any failure is reported in the sidebar instead of raising.
            try:
                tenants = self.pm.get_all_tenants()
                properties = self.pm.get_all_properties()
                leases = self.pm.get_active_leases()

                st.sidebar.metric("Tenants", len(tenants))
                st.sidebar.metric("Properties", len(properties))
                st.sidebar.metric("Active Leases", len(leases))

            except Exception as e:
                st.sidebar.error(f"Error loading stats: {e}")
        else:
            st.sidebar.error("❌ Database Not Connected")
            if st.sidebar.button("🔄 Reconnect Database"):
                self.initialize_database()

        # System preferences
        st.sidebar.subheader("⚙️ Preferences")

        # Currency setting
        currency = st.sidebar.selectbox(
            "Currency:",
            ["RM (Malaysian Ringgit)", "USD", "EUR", "SGD"],
            index=0
        )
        # Keep only the first whitespace-separated token, e.g. "RM".
        st.session_state.currency = currency.split()[0]

        # Date format
        date_format = st.sidebar.selectbox(
            "Date Format:",
            ["DD/MM/YYYY", "MM/DD/YYYY", "YYYY-MM-DD"],
            index=0
        )
        st.session_state.date_format = date_format

        # Clear cache and reset
        st.sidebar.subheader("🧹 System Maintenance")
        col1, col2 = st.sidebar.columns(2)
        # The buttons are created with st.button inside each column's
        # context manager rather than through st.sidebar directly.
        # NOTE(review): clear_cache() and reset_application() are defined
        # outside this part of the file — confirm what they reset.
        with col1:
            if st.button("🗑️ Clear Cache"):
                self.clear_cache()
                st.success("Cache cleared!")
        with col2:
            if st.button("🔄 Reset All"):
                self.reset_application()

    def render_main_header(self):
        """Render the page title, a one-line status bar and the main tabs.

        The status bar shows "System Online" with tenant and property counts
        when ``st.session_state['db_connected']`` is truthy, otherwise
        "System Offline".

        Returns:
            The five tab containers created by ``st.tabs``, in order:
            Dashboard, Query Interface, Data Management, Analytics,
            AI Assistant.
        """
        # Logo and title, placed in the wider middle column of a 1:2:1 row;
        # the two outer columns are only spacers.
        _, title_col, _ = st.columns([1, 2, 1])
        with title_col:
            st.markdown('<h1 class="main-header">🏢 Property Management System</h1>', unsafe_allow_html=True)

        # Status bar
        if st.session_state.get('db_connected', False):
            status_text = "🟢 System Online"
            try:
                tenants_count = len(self.pm.get_all_tenants())
                properties_count = len(self.pm.get_all_properties())
                status_text += f" | {tenants_count} Tenants | {properties_count} Properties"
            except Exception:
                # The counts are optional decoration: if a query fails the
                # bar simply shows "System Online" without them. Catching
                # Exception (not a bare except) lets BaseException-derived
                # signals such as KeyboardInterrupt propagate.
                pass
        else:
            status_text = "🔴 System Offline"

        st.markdown(f"<div style='text-align: center; color: #666; margin-bottom: 1rem;'>{status_text}</div>", unsafe_allow_html=True)

        # Quick navigation tabs
        tab1, tab2, tab3, tab4, tab5 = st.tabs([
            "📊 Dashboard",
            "🔍 Query Interface",
            "📋 Data Management",
            "📈 Analytics",
            "💬 AI Assistant"
        ])

        return tab1, tab2, tab3, tab4, tab5

    def render_dashboard(self, tab):
          """Render main dashboard"""
          with tab:
              if not st.session_state.get('db_connected', False):
                  st.error("Database not connected. Please check your database configuration.")
                  return

              # Dashboard header with refresh option
              col1, col2 = st.columns([3, 1])
              with col1:
                  st.subheader("📊 System Overview")
              with col2:
                  if st.button("🔄 Refresh Dashboard"):
                      st.rerun()

              # Key metrics row
              try:
                  # Get all data
                  tenants = self.pm.get_all_tenants()
                  properties = self.pm.get_all_properties()
                  active_leases = self.pm.get_active_leases()
                  pending_payments = self.pm.get_pending_payments()
                  open_tickets = self.pm.get_open_tickets()
                  financial_summary = self.pm.get_financial_summary()

                  # Main metrics
                  col1, col2, col3, col4, col5 = st.columns(5)

                  with col1:
                      st.metric(
                          "👥 Total Tenants",
                          len(tenants),
                          help="Number of registered tenants"
                      )

                  with col2:
                      st.metric(
                          "🏠 Properties",
                          len(properties),
                          help="Total properties managed"
                      )

                  with col3:
                      st.metric(
                          "📋 Active Leases",
                          len(active_leases),
                          help="Currently active lease agreements"
                      )

                  with col4:
                      total_pending = sum(p['amount'] for p in pending_payments)
                      st.metric(
                          "💳 Pending Payments",
                          f"RM {total_pending:,.2f}",
                          help="Total amount in pending payments"
                      )

                  with col5:
                      st.metric(
                          "🎫 Open Tickets",
                          len(open_tickets),
                          help="Active service tickets"
                      )

                  st.divider()

                  # Financial overview section
                  st.subheader("💰 Financial Overview")

                  col1, col2, col3, col4 = st.columns(4)

                  with col1:
                      collected = financial_summary.get('total_collected', 0)
                      st.metric(
                          "Monthly Collected",
                          f"RM {collected:,.2f}",
                          help="Total payments received this month"
                      )

                  with col2:
                      pending = financial_summary.get('total_pending', 0)
                      st.metric(
                          "Monthly Pending",
                          f"RM {pending:,.2f}",
                          help="Total payments pending this month"
                      )

                  with col3:
                      total_amount = collected + pending
                      if total_amount > 0:
                          collection_rate = (collected / total_amount) * 100
                          delta_color = "normal" if collection_rate >= 80 else "inverse"
                          st.metric(
                              "Collection Rate",
                              f"{collection_rate:.1f}%",
                              delta=f"Target: 85%",
                              delta_color=delta_color,
                              help="Percentage of payments collected"
                          )
                      else:
                          st.metric("Collection Rate", "0%")

                  with col4:
                      overdue_payments = [p for p in pending_payments if self.is_payment_overdue(p)]
                      overdue_amount = sum(p['amount'] for p in overdue_payments)
                      st.metric(
                          "Overdue Amount",
                          f"RM {overdue_amount:,.2f}",
                          delta=f"{len(overdue_payments)} payments",
                          delta_color="inverse" if overdue_amount > 0 else "normal",
                          help="Total overdue payment amount"
                      )

                  # Charts section
                  st.subheader("📈 Visual Analytics")

                  col1, col2 = st.columns(2)

                  with col1:
                      # Financial summary pie chart
                      if collected > 0 or pending > 0:
                          financial_data = pd.DataFrame({
                              'Category': ['Collected', 'Pending'],
                              'Amount': [collected, pending],
                              'Color': ['#2E8B57', '#FF6B6B']
                          })

                          fig_financial = px.pie(
                              financial_data,
                              values='Amount',
                              names='Category',
                              title="💰 Payment Status Distribution",
                              color='Category',
                              color_discrete_map={'Collected': '#2E8B57', 'Pending': '#FF6B6B'}
                          )
                          fig_financial.update_layout(height=400)
                          st.plotly_chart(fig_financial, use_container_width=True)
                      else:
                          st.info("No financial data available for this period")

                  with col2:
                      # Service tickets by priority
                      if open_tickets:
                          ticket_priorities = {}
                          for ticket in open_tickets:
                              priority = ticket.get('priority', 'normal')
                              ticket_priorities[priority] = ticket_priorities.get(priority, 0) + 1

                          # Create color mapping for priorities
                          priority_colors = {
                              'urgent': '#FF4444',
                              'high': '#FF8800',
                              'normal': '#4CAF50',
                              'low': '#2196F3'
                          }

                          priority_df = pd.DataFrame(
                              list(ticket_priorities.items()),
                              columns=['Priority', 'Count']
                          )

                          fig_tickets = px.bar(
                              priority_df,
                              x='Priority',
                              y='Count',
                              title="🎫 Service Tickets by Priority",
                              color='Priority',
                              color_discrete_map=priority_colors
                          )
                          fig_tickets.update_layout(height=400)
                          st.plotly_chart(fig_tickets, use_container_width=True)
                      else:
                          st.info("No open service tickets")

                  # Property occupancy overview
                  st.subheader("🏢 Property Occupancy Overview")

                  occupancy_data = []
                  for prop in properties:
                      units = self.pm.get_units_by_property(prop['id'])
                      total_units = len(units)
                      occupied_units = sum(1 for unit in units if unit.get('current_status') == 'occupied')
                      occupancy_rate = (occupied_units / total_units * 100) if total_units > 0 else 0

                      occupancy_data.append({
                          'Property': prop['name'],
                          'Total Units': total_units,
                          'Occupied': occupied_units,
                          'Vacant': total_units - occupied_units,
                          'Occupancy Rate': occupancy_rate
                      })

                  if occupancy_data:
                      occupancy_df = pd.DataFrame(occupancy_data)

                      col1, col2 = st.columns(2)

                      with col1:
                          # Occupancy rate bar chart
                          fig_occupancy = px.bar(
                              occupancy_df,
                              x='Property',
                              y='Occupancy Rate',
                              title="Occupancy Rate by Property",
                              color='Occupancy Rate',
                              color_continuous_scale='RdYlGn'
                          )
                          fig_occupancy.update_layout(height=400)
                          st.plotly_chart(fig_occupancy, use_container_width=True)

                      with col2:
                          # Unit status breakdown
                          fig_units = px.bar(
                              occupancy_df,
                              x='Property',
                              y=['Occupied', 'Vacant'],
                              title="Unit Status by Property",
                              color_discrete_map={'Occupied': '#4CAF50', 'Vacant': '#FFC107'}
                          )
                          fig_units.update_layout(height=400)
                          st.plotly_chart(fig_units, use_container_width=True)

                  # Quick actions section
                  st.subheader("⚡ Quick Actions")

                  col1, col2, col3, col4 = st.columns(4)

                  with col1:
                      if st.button("➕ Add New Tenant", use_container_width=True):
                          self.show_add_tenant_form()

                  with col2:
                      if st.button("🏠 Add New Property", use_container_width=True):
                          st.info("Property addition feature coming soon!")

                  with col3:
                      if st.button("📋 Create Lease", use_container_width=True):
                          st.info("Lease creation feature coming soon!")

                  with col4:
                      if st.button("🎫 Create Service Ticket", use_container_width=True):
                          st.info("Service ticket creation feature coming soon!")

                  # Alerts and notifications
                  self.show_dashboard_alerts(pending_payments, active_leases, open_tickets)

              except Exception as e:
                  st.error(f"Error loading dashbodef render_query_interface(self, tab)")
          """Render natural language query interface"""
          with tab:
              st.subheader("🔍 Natural Language Query Interface")

              # Query input section
              st.write("Ask questions about your property data in natural language:")

              col1, col2 = st.columns([4, 1])

              with col1:
                  query = st.text_input(
                      "Enter your question:",
                      placeholder="e.g., 'Show me all tenants with pending payments' or 'Which properties have the highest occupancy?'",
                      help="Type your question in natural language",
                      key="main_query_input"
                  )

              with col2:
                  search_button = st.button("🔍 Search", type="primary", use_container_width=True)

              # Process query if button clicked
              if search_button and query:
                  self.process_natural_language_query(query)
              elif search_button and not query:
                  st.warning("Please enter a query")

              # Quick query suggestions
              st.subheader("💡 Quick Query Examples")

              # Organize examples by category
              col1, col2, col3 = st.columns(3)

              with col1:
                  st.write("**📊 Financial Queries**")
                  if st.button("💰 Show Financial Summary", key="fin_summary"):
                      self.show_financial_summary()

                  if st.button("💳 Pending Payments", key="pending_pay"):
                      self.show_pending_payments()

                  if st.button("📈 Revenue Analysis", key="revenue_analysis"):
                      st.info("Revenue analysis feature coming soon!")

              with col2:
                  st.write("**🏠 Property Queries**")
                  if st.button("🏠 List All Properties", key="all_props"):
                      self.show_all_properties()

                  if st.button("📊 Occupancy Analysis", key="occupancy"):
                      self.show_property_occupancy()

                  if st.button("🏢 Unit Details", key="unit_details"):
                      st.info("Unit details feature coming soon!")

              with col3:
                  st.write("**👥 Tenant & Lease Queries**")
                  if st.button("👥 Show All Tenants", key="all_tenants"):
                      self.show_all_tenants()

                  if st.button("⚠️ Expiring Leases", key="expiring_leases"):
                      self.show_expiring_leases()

                  if st.button("🎫 Open Service Tickets", key="open_tickets"):
                      self.show_open_tickets()

              # Query history section
              if st.session_state.query_history:
                  st.subheader("📜 Query History")

                  # Show last 10 queries
                  for i, (timestamp, query_text, result_type) in enumerate(reversed(st.session_state.query_history[-10:])):
                      with st.expander(f"🕒 {timestamp} - {query_text[:60]}{'...' if len(query_text) > 60 else ''}"):
                          col1, col2 = st.columns([3, 1])

                          with col1:
                              st.write(f"**Query:** {query_text}")
                              st.write(f"**Result Type:** {result_type}")
                              st.write(f"**Time:** {timestamp}")

                          with col2:
                              if st.button("🔄 Re-run", key=f"rerun_{i}"):
                                  self.process_natural_language_query(query_text)

                  # Clear history option
                  if st.button("🗑️ Clear Query History"):
                      st.session_state.query_history = []
                      st.success("Query history cleared!")

    def process_natural_language_query(self, query: str):
        """Route a free-text question to the matching display method.

        Matching is by keyword on the lower-cased query. Topics are tried in
        a fixed order (tenants, properties, payments, leases, tickets/service,
        general finance) and the first match wins. A query handled without an
        exception is appended to ``st.session_state.query_history`` as a
        ``(timestamp, query, result_type)`` tuple; on failure the error is shown
        with ``st.error`` and nothing is recorded.

        Args:
            query: The question exactly as the user typed it.
        """
        text = query.lower()
        asked_at = datetime.now().strftime("%H:%M:%S")

        def mentions(*keywords):
            # Substring test against the lower-cased query.
            return any(keyword in text for keyword in keywords)

        try:
            # Each branch picks a (handler, label) pair. The conditional
            # expressions only touch the attribute that is actually chosen.
            if mentions('tenant', 'tenants'):
                handler, label = (
                    (self.show_tenants_with_pending_payments, "Tenants with Pending Payments")
                    if mentions('pending', 'payment')
                    else (self.show_all_tenants, "All Tenants")
                )
            elif mentions('property', 'properties'):
                handler, label = (
                    (self.show_property_occupancy, "Property Occupancy")
                    if mentions('occupancy', 'occupied')
                    else (self.show_all_properties, "All Properties")
                )
            elif mentions('payment', 'payments'):
                handler, label = (
                    (self.show_pending_payments, "Pending Payments")
                    if mentions('pending')
                    else (self.show_financial_summary, "Financial Summary")
                )
            elif mentions('lease', 'leases'):
                handler, label = (
                    (self.show_expiring_leases, "Expiring Leases")
                    if mentions('expir')
                    else (self.show_active_leases, "Active Leases")
                )
            elif mentions('ticket', 'tickets', 'service'):
                handler, label = self.show_open_tickets, "Service Tickets"
            elif mentions('financial', 'money', 'revenue', 'income'):
                handler, label = self.show_financial_summary, "Financial Summary"
            else:
                handler, label = None, "Unrecognized Query"

            if handler is None:
                st.info("I couldn't understand your query. Please try using keywords like 'tenants', 'properties', 'payments', 'leases', or 'tickets'.")
            else:
                handler()

            # Recorded only after the handler finished without raising.
            st.session_state.query_history.append((asked_at, query, label))

        except Exception as e:
            st.error(f"Error processing query: {e}")

    def show_all_tenants(self):
        """Show every tenant in a table with a CSV download button.

        Tenants come from ``self.pm.get_all_tenants()``. An empty result shows
        an informational message instead; any exception is reported with
        ``st.error``.
        """
        st.subheader("👥 All Tenants")
        try:
            tenant_rows = self.pm.get_all_tenants()
            if not tenant_rows:
                st.info("No tenants found")
                return

            tenant_table = pd.DataFrame(tenant_rows)
            st.dataframe(tenant_table, use_container_width=True)

            # Offer the same table as a timestamped CSV export.
            csv_payload = tenant_table.to_csv(index=False)
            export_name = f"tenants_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            st.download_button(
                label="📥 Download as CSV",
                data=csv_payload,
                file_name=export_name,
                mime="text/csv"
            )
        except Exception as e:
            st.error(f"Error loading tenants: {e}")

    def show_all_properties(self):
        """Show every property in a table, then its units in an expander each.

        Properties come from ``self.pm.get_all_properties()``; each entry is
        read by its ``'name'``, ``'unit_count'`` and ``'id'`` keys. Units are
        fetched per property with ``self.pm.get_units_by_property``. An empty
        property list shows an informational message; any exception is reported
        with ``st.error``.
        """
        st.subheader("🏠 All Properties")
        try:
            property_rows = self.pm.get_all_properties()
            if not property_rows:
                st.info("No properties found")
                return

            st.dataframe(pd.DataFrame(property_rows), use_container_width=True)

            # One collapsible section per property listing its units.
            st.subheader("🏢 Units by Property")
            for entry in property_rows:
                heading = f"{entry['name']} - {entry['unit_count']} units"
                with st.expander(heading):
                    unit_rows = self.pm.get_units_by_property(entry['id'])
                    if not unit_rows:
                        # Nothing to tabulate; leave the expander empty.
                        continue
                    st.dataframe(pd.DataFrame(unit_rows), use_container_width=True)
        except Exception as e:
            st.error(f"Error loading properties: {e}")

    def show_financial_summary(self):
        """Display the financial summary metrics and the collection rate.

        Reads ``total_collected``, ``total_pending``, ``payments_received`` and
        ``payments_pending`` from ``self.pm.get_financial_summary()`` and shows
        them as four metrics. When collected plus pending is positive, a
        progress bar and the collection-rate percentage follow. Missing or
        ``None`` values are treated as 0. Any exception is reported with
        ``st.error``.
        """
        st.subheader("💰 Financial Summary")
        try:
            summary = self.pm.get_financial_summary() or {}

            # dict.get's default only applies to a *missing* key. A key that is
            # present with value None would break the ``:,.2f`` formatting and
            # the addition below, so coalesce explicitly.
            # NOTE(review): None presumably arises from SQL aggregates over zero
            # rows -- confirm against get_financial_summary.
            collected = summary.get('total_collected') or 0
            pending = summary.get('total_pending') or 0
            received_count = summary.get('payments_received') or 0
            pending_count = summary.get('payments_pending') or 0

            col1, col2, col3, col4 = st.columns(4)

            with col1:
                st.metric(
                    "Total Collected",
                    f"RM {collected:,.2f}",
                    help="Total payments received this month"
                )

            with col2:
                st.metric(
                    "Total Pending",
                    f"RM {pending:,.2f}",
                    help="Total payments pending this month"
                )

            with col3:
                st.metric(
                    "Payments Received",
                    received_count,
                    help="Number of payments received"
                )

            with col4:
                st.metric(
                    "Payments Pending",
                    pending_count,
                    help="Number of payments pending"
                )

            # Collection rate: share of (collected + pending) already collected.
            total_amount = collected + pending
            if total_amount > 0:
                collection_rate = (collected / total_amount) * 100
                # st.progress rejects floats outside 0.0-1.0; clamp so unusual
                # data (e.g. a negative pending total) cannot raise here.
                st.progress(min(max(collection_rate / 100, 0.0), 1.0))
                st.write(f"**Collection Rate:** {collection_rate:.1f}%")

        except Exception as e:
            st.error(f"Error loading financial summary: {e}")

    # def show_pending_payments(self):
    #     """Display pending payments"""
    #     st.subheader("💳 Pending Payments")
    #     try:
    #         payments = self.pm.get_pending_payments()
    #         if payments:
    #             df = pd.DataFrame(payments)

    #             # Add overdue indicator
    #             df['due_date'] = pd.to_datetime(df['due_date'])
    #             df['days_overdue'] = (datetime.now() - df['due_date']).dt.days
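    #             df['status'] = ...
    # NOTE: the disabled draft above is truncated; the active
    # show_pending_payments below supersedes it. The text that had been fused
    # onto its last line is the tail of the dashboard error handler:
    #     st.error(f"Error loading dashboard data: {e}")
    #     st.info("Please check your database connection and try refreshing the page.")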

    def show_pending_payments(self):
        """Display pending payments with an overdue indicator.

        Payments come from ``self.pm.get_pending_payments()``. Each row's
        ``due_date`` is parsed to a datetime, ``days_overdue`` is the whole
        number of days between now and that date, and ``status`` is set to
        "Overdue" when that number is positive and "On Time" otherwise. An
        empty result shows an informational message; any exception is reported
        with ``st.error`` plus a hint to check the database connection.
        """
        st.subheader("💳 Pending Payments")
        try:
            payments = self.pm.get_pending_payments()
            if payments:
                df = pd.DataFrame(payments)

                # Add overdue indicator
                df['due_date'] = pd.to_datetime(df['due_date'])
                df['days_overdue'] = (datetime.now() - df['due_date']).dt.days
                # A due date of today or later yields days_overdue <= 0, which
                # counts as on time.
                df['status'] = df['days_overdue'].apply(lambda x: "Overdue" if x > 0 else "On Time")

                st.dataframe(df, use_container_width=True)
            else:
                st.info("No pending payments found.")
        except Exception as e:
            st.error(f"Error loading payment data: {e}")
            st.info("Please check your database connection and try refreshing the page.")




2025-07-01 01:52:22.820 
  command:

    streamlit run /usr/local/lib/python3.11/dist-packages/colab_kernel_launcher.py [ARGUMENTS]
