In [None]:
#!/usr/bin/env python3
"""
RideHail Pro - Self-healing Matcher Server (Full 700+ lines)
"""
# -------------- Imports --------------
import firebase_admin
from firebase_admin import credentials, firestore
from google.api_core.exceptions import ResourceExhausted, DeadlineExceeded, ServiceUnavailable, GoogleAPICallError

from math import radians, sin, cos, sqrt, atan2
from datetime import datetime, timedelta
from threading import Thread, Event, Lock
import traceback
import os
import sys
import time
import random
import logging
from typing import Dict, Any, Optional, List, Tuple

# -------------- Logging --------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(threadName)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger("ridehail")

# -------------- Config --------------
PASSENGER_DB_CREDENTIALS = 'passenger-ride-app-firebase-adminsdk-fbsvc-1061e4a556.json'
DRIVER_DB_CREDENTIALS = 'rider-ba88e-firebase-adminsdk-fbsvc-57d40ed3f7.json'

MAX_MATCH_DISTANCE_KM = 10.0
MAX_DESTINATION_DEVIATION_KM = 5.0
DYNAMIC_PICKUP_THRESHOLD_KM = 0.05

MATCH_THROTTLE_SECONDS = 3
LISTENER_BACKOFF_BASE = 10
FIRESTORE_QUERY_RETRIES = 4
FIRESTORE_RETRY_BACKOFF = 6
CLEANUP_INTERVAL_SECONDS = 3600
PROPOSAL_EXPIRY_SECONDS = 3600

RIDER_COLLECTION_NAME = 'riders'
PROPOSAL_COLLECTION_NAME = 'driver_proposals'
PUBLIC_RIDE_REQUESTS_COLLECTION = 'public_ride_requests'

STATUS_PENDING = 'pending'
STATUS_PROPOSED = 'proposed_to_driver'
STATUS_ACCEPTED = 'accepted'
STATUS_ARRIVED_AT_PICKUP = 'arrived_at_pickup'
STATUS_PICKED_UP = 'picked_up'
STATUS_ON_WAY = 'on_way'
STATUS_COMPLETED = 'completed'
STATUS_CANCELLED = 'cancelled'

RIDER_AVAILABLE = 'available'
RIDER_ON_TRIP = 'on_trip'
RIDER_OFFLINE = 'offline'

PROPOSAL_PENDING = 'pending_acceptance'
PROPOSAL_ACCEPTED = 'accepted'
PROPOSAL_REJECTED = 'rejected'

MAX_PROPOSALS_PER_RIDE = 5
MAX_RIDERS_TO_CHECK = 200

# -------------- Globals --------------
db_passenger = None
db_driver = None
available_riders_cache: Dict[str, Dict[str, Any]] = {}
available_riders_lock = Lock()
last_match_time_lock = Lock()
last_match_time: Dict[str, float] = {}
shutdown_event = Event()
metrics = {'matches_attempted':0,'proposals_sent':0,'finalizations':0,'listener_restarts':0,'quota_errors':0}

# -------------- Utilities --------------
def safe_now():
    return datetime.utcnow()

def haversine_km(lat1, lon1, lat2, lon2) -> float:
    R = 6371.0
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat/2)**2 + cos(radians(lat1))*cos(radians(lat2))*sin(dlon/2)**2
    c = 2*atan2(sqrt(a), sqrt(1-a))
    return R*c

def generate_otp() -> str:
    return str(random.randint(1000,9999)).zfill(4)

def calculate_fare(distance_km: float, ride_type: str) -> float:
    base_fare, rate_per_km = 30.0, 8.0
    if ride_type=='Premium': base_fare, rate_per_km=50.0,12.0
    elif ride_type=='SUV': base_fare, rate_per_km=70.0,15.0
    elif ride_type=='Electric': base_fare, rate_per_km=35.0,7.0
    return base_fare + (distance_km*rate_per_km)

# -------------- Firestore retry decorator --------------
def firestore_retry(retries=FIRESTORE_QUERY_RETRIES, backoff=FIRESTORE_RETRY_BACKOFF):
    def decorator(func):
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(1,retries+1):
                try:
                    return func(*args, **kwargs)
                except (ResourceExhausted, DeadlineExceeded, ServiceUnavailable) as e:
                    last_exc = e
                    metrics['quota_errors']+=1
                    wait = backoff*attempt
                    logger.warning(f"[FirestoreRetry] {type(e).__name__} attempt {attempt}/{retries}, sleep {wait}s")
                    time.sleep(wait)
                except GoogleAPICallError as e:
                    last_exc = e
                    wait = backoff*attempt
                    logger.warning(f"[FirestoreRetry] GoogleAPICallError {type(e).__name__} attempt {attempt}/{retries}, sleep {wait}s")
                    time.sleep(wait)
                except Exception as e:
                    last_exc = e
                    logger.exception("[FirestoreRetry] Unexpected error")
                    time.sleep(backoff)
            logger.error(f"[FirestoreRetry] All retries failed for {func.__name__}")
            raise last_exc
        return wrapper
    return decorator

# -------------- Firebase init --------------
def initialize_firebase_app(cred_path:str, name:str):
    if not os.path.exists(cred_path):
        logger.error(f"Firebase credential file not found: {cred_path}")
        return None
    try:
        cert = credentials.Certificate(cred_path)
        try:
            app = firebase_admin.get_app(name)
            logger.info(f"Firebase app '{name}' already initialized.")
        except ValueError:
            app = firebase_admin.initialize_app(cert, name=name)
            logger.info(f"Firebase app '{name}' initialized.")
        return firestore.client(app)
    except Exception as e:
        logger.exception(f"Failed to initialize Firebase '{name}': {e}")
        return None

# -------------- Rider cache --------------
def update_rider_cache_from_doc(doc_snapshot):
    doc = doc_snapshot
    data = doc.to_dict() or {}
    rider_id = doc.id
    status = data.get('status', RIDER_OFFLINE)
    with available_riders_lock:
        if status==RIDER_AVAILABLE and data.get('currentLocation'):
            available_riders_cache[rider_id] = {
                'id': rider_id,
                'currentLocation': data.get('currentLocation'),
                'riderName': data.get('riderName',data.get('name','Rider')),
                'vehicleType': data.get('vehicleType',''),
                'rating': data.get('rating',5.0),
                'total_rides': data.get('total_rides',0),
                'vehicle_number': data.get('vehicle_number'),
                'vehicle_model': data.get('vehicle_model'),
                'vehicle_color': data.get('vehicle_color'),
                'status': status,
                'updatedAt': data.get('lastActive')
            }
        else:
            available_riders_cache.pop(rider_id,None)

# -------------- Matching --------------
def is_destination_overlap_ok(driver_dest, passenger_dest) -> bool:
    if not driver_dest or not passenger_dest: return True
    d = haversine_km(driver_dest.latitude, driver_dest.longitude, passenger_dest.latitude, passenger_dest.longitude)
    return d<=MAX_DESTINATION_DEVIATION_KM

def compute_match_score(distance_meters:float,rating:float,passenger_pref:str,rider_vehicle_type:str,total_rides:int) -> float:
    distance_score = max(0.0,1.0-(distance_meters/10000.0))
    rating_score = max(0.0,min(1.0,rating/5.0))
    experience = min(1.0,total_rides/100.0)
    preference_score = 1.0 if (passenger_pref=='Any' or rider_vehicle_type==passenger_pref) else 0.5
    return distance_score*0.4 + rating_score*0.3 + experience*0.2 + preference_score*0.1

@firestore_retry()
def safe_add_proposal(collection_ref,proposal_data:Dict[str,Any])->str:
    doc_ref = collection_ref.document()
    doc_ref.set(proposal_data)
    return doc_ref.id

def throttle_match(ride_id:str)->bool:
    now = time.time()
    with last_match_time_lock:
        last = last_match_time.get(ride_id,0)
        if now - last < MATCH_THROTTLE_SECONDS: return False
        last_match_time[ride_id]=now
        return True

def match_driver(pdb, ddb, ride_snapshot):
    if shutdown_event.is_set(): return
    ride_id = ride_snapshot.id
    try:
        ride_data = ride_snapshot.to_dict() or {}
        if not ride_data or ride_data.get('status')!=STATUS_PENDING: return
        if not throttle_match(ride_id): return

        pickup = ride_data.get('pickupLocation')
        dest = ride_data.get('destinationLocation')
        if not pickup or not dest: return
        passenger_pref = ride_data.get('vehiclePreference','Any')

        with available_riders_lock:
            cached_items = list(available_riders_cache.items())

        candidates=[]
        checked=0
        for rider_id,rdata in cached_items:
            if checked>=MAX_RIDERS_TO_CHECK: break
            checked+=1
            loc = rdata.get('currentLocation')
            if not loc: continue
            try: d_km=haversine_km(loc.latitude,loc.longitude,pickup.latitude,pickup.longitude)
            except: continue
            if d_km>MAX_MATCH_DISTANCE_KM: continue
            driver_dest_point = None
            driver_dest = rdata.get('currentRouteEnd')
            if isinstance(driver_dest,firestore.GeoPoint): driver_dest_point = driver_dest
            elif isinstance(driver_dest,dict) and 'latitude' in driver_dest: driver_dest_point = firestore.GeoPoint(driver_dest['latitude'],driver_dest['longitude'])
            if driver_dest_point and not is_destination_overlap_ok(driver_dest_point,dest): continue
            r_vehicle_type = rdata.get('vehicleType','')
            if passenger_pref!='Any' and passenger_pref.lower() not in str(r_vehicle_type).lower(): continue
            rating = float(rdata.get('rating',5.0))
            total_rides = int(rdata.get('total_rides',0))
            score = compute_match_score(d_km*1000.0,rating,passenger_pref,str(r_vehicle_type),total_rides)
            if score<0.6: continue
            candidates.append((rider_id,rdata,d_km,score))

        to_propose=candidates[:MAX_PROPOSALS_PER_RIDE]

        proposals_created=[]
        for rider_id,rdata,dist_km,score in to_propose:
            ride_distance_km=haversine_km(pickup.latitude,pickup.longitude,dest.latitude,dest.longitude)
            fare = calculate_fare(ride_distance_km, ride_data.get('rideType','Standard'))
            otp = generate_otp()
            proposal_doc = {
                'original_request_id':ride_id,
                'riderUid':rider_id,
                'riderName':rdata.get('riderName','Rider'),
                'riderPhone':rdata.get('phone','N/A'),
                'riderRating':rdata.get('rating',5.0),
                'vehicleNumber':rdata.get('vehicle_number'),
                'vehicleModel':rdata.get('vehicle_model'),
                'vehicleColor':rdata.get('vehicle_color'),
                'vehicleType':rdata.get('vehicleType'),
                'totalRides':rdata.get('total_rides',0),
                'passengerId':ride_data.get('passengerId'),
                'passengerName':ride_data.get('passengerName','Passenger'),
                'passengerPhone':ride_data.get('passengerPhone','N/A'),
                'pickupLocation':pickup,
                'destinationLocation':dest,
                'pickupAddress':ride_data.get('pickupAddress','Pickup Location'),
                'destinationAddress':ride_data.get('destinationAddress','Destination Location'),
                'fareAmount':fare,
                'paymentMethod':ride_data.get('paymentMethod','Cash'),
                'rideType':ride_data.get('rideType','Standard'),
                'passengerRating':ride_data.get('passengerRating',5.0),
                'estimatedDistance':f"{ride_distance_km:.1f} km",
                'estimatedDuration':f"{(ride_distance_km/40*60):.0f} min",
                'specialRequests':ride_data.get('specialRequests','None'),
                'vehiclePreference':ride_data.get('vehiclePreference','Any'),
                'luggageCount':ride_data.get('luggageCount',0),
                'passengerCount':ride_data.get('passengerCount',1),
                'status':PROPOSAL_PENDING,
                'requestTimestamp':firestore.SERVER_TIMESTAMP,
                'distanceToPickup':dist_km*1000.0,
                'match_score':score,
                'priority_level':1 if score>=0.8 else 2 if score>=0.6 else 3,
                'otp':otp,
                'otpVerified':False,
                'createdAt':firestore.SERVER_TIMESTAMP,
                'proposalExpiresAt':firestore.SERVER_TIMESTAMP
            }
            try:
                proposal_id = safe_add_proposal(db_driver.collection(PROPOSAL_COLLECTION_NAME),proposal_doc)
                proposals_created.append((proposal_id,rider_id))
                metrics['proposals_sent']+=1
            except: logger.exception(f"[Match] Failed proposal for rider {rider_id}")

        metrics['matches_attempted']+=1

        if proposals_created:
            try:
                pdb.collection(PUBLIC_RIDE_REQUESTS_COLLECTION).document(ride_id).update({
                    'status':STATUS_PROPOSED,
                    'proposedAt':firestore.SERVER_TIMESTAMP,
                    'proposalCount':len(proposals_created)
                })
            except: logger.exception(f"[Match] Failed to update ride {ride_id} to proposed")
    except: logger.exception(f"[Match] Exception in matching ride {ride_id}")

# -------------- Finalization --------------
@firestore_retry()
def finalize_match(pdb, ddb, proposal_data:Dict[str,Any]):
    try:
        ride_id = proposal_data['original_request_id']
        rider_id = proposal_data['riderUid']
        proposal_id = proposal_data.get('proposal_id') or proposal_data.get('id') or None

        rider_doc = ddb.collection(RIDER_COLLECTION_NAME).document(rider_id).get()
        rider_data = rider_doc.to_dict() or {}
        rider_loc = rider_data.get('currentLocation')

        ride_doc = pdb.collection(PUBLIC_RIDE_REQUESTS_COLLECTION).document(ride_id).get()
        ride_data = ride_doc.to_dict() or {}
        pickup_loc = ride_data.get('pickupLocation')

        arrived = False
        if rider_loc and pickup_loc:
            distance_km = haversine_km(rider_loc.latitude,rider_loc.longitude,pickup_loc.latitude,pickup_loc.longitude)
            if distance_km<=DYNAMIC_PICKUP_THRESHOLD_KM: arrived=True

        ride_status = STATUS_ARRIVED_AT_PICKUP if arrived else STATUS_ACCEPTED
        pdb.collection(PUBLIC_RIDE_REQUESTS_COLLECTION).document(ride_id).update({
            'status':ride_status,
            'riderUid':rider_id,
            'riderName':proposal_data.get('riderName','Rider'),
            'riderPhone':proposal_data.get('riderPhone','N/A'),
            'riderRating':proposal_data.get('riderRating',5.0),
            'vehicleNumber':proposal_data.get('vehicleNumber'),
            'vehicleModel':proposal_data.get('vehicleModel'),
            'vehicleColor':proposal_data.get('vehicleColor'),
            'vehicleType':proposal_data.get('vehicleType'),
            'totalRides':proposal_data.get('totalRides',0),
            'matched_at':firestore.SERVER_TIMESTAMP
        })

        ddb.collection(RIDER_COLLECTION_NAME).document(rider_id).update({
            'status':RIDER_ON_TRIP,
            'current_passenger_id':proposal_data.get('passengerId'),
            'current_ride_id':ride_id,
            'lastActive':firestore.SERVER_TIMESTAMP
        })

        if proposal_id:
            ddb.collection(PROPOSAL_COLLECTION_NAME).document(proposal_id).update({
                'status':PROPOSAL_ACCEPTED,
                'acceptedTimestamp':firestore.SERVER_TIMESTAMP
            })

        q = ddb.collection(PROPOSAL_COLLECTION_NAME).where('original_request_id','==',ride_id)
        docs = q.get()
        for doc in docs:
            if doc.id != proposal_id:
                try:
                    doc.reference.update({'status':PROPOSAL_REJECTED,'rejectedTimestamp':firestore.SERVER_TIMESTAMP})
                except: pass

        metrics['finalizations']+=1
        logger.info(f"[Finalize] Ride {ride_id} finalized with rider {rider_id}, arrived={arrived}")
    except: logger.exception(f"[Finalize] Error finalizing ride {ride_id}")

# -------------- Listeners --------------
def rider_listener():
    def on_snapshot(docs, changes, read_time):
        for doc in docs: update_rider_cache_from_doc(doc)
    while not shutdown_event.is_set():
        try:
            q = db_driver.collection(RIDER_COLLECTION_NAME)
            q.on_snapshot(on_snapshot)
            while not shutdown_event.is_set(): time.sleep(1)
        except: 
            metrics['listener_restarts']+=1
            logger.exception("[Listener] Rider listener crashed, restarting...")
            time.sleep(LISTENER_BACKOFF_BASE)

def ride_listener():
    def on_snapshot(docs, changes, read_time):
        for doc in docs: match_driver(db_passenger,db_driver,doc)
    while not shutdown_event.is_set():
        try:
            q = db_passenger.collection(PUBLIC_RIDE_REQUESTS_COLLECTION).where('status','==',STATUS_PENDING)
            q.on_snapshot(on_snapshot)
            while not shutdown_event.is_set(): time.sleep(1)
        except:
            metrics['listener_restarts']+=1
            logger.exception("[Listener] Ride listener crashed, restarting...")
            time.sleep(LISTENER_BACKOFF_BASE)

# -------------- Cleanup --------------
def cleanup_stale_proposals():
    while not shutdown_event.is_set():
        try:
            expiry_time = datetime.utcnow() - timedelta(seconds=PROPOSAL_EXPIRY_SECONDS)
            q = db_driver.collection(PROPOSAL_COLLECTION_NAME).where('createdAt','<',expiry_time)
            stale_docs = q.get()
            for doc in stale_docs:
                try: doc.reference.update({'status':PROPOSAL_REJECTED})
                except: pass
            time.sleep(CLEANUP_INTERVAL_SECONDS)
        except: time.sleep(60)

# -------------- Main --------------
def main():
    global db_passenger, db_driver
    db_passenger = initialize_firebase_app(PASSENGER_DB_CREDENTIALS,'passenger_app')
    db_driver = initialize_firebase_app(DRIVER_DB_CREDENTIALS,'driver_app')
    if not db_passenger or not db_driver:
        logger.error("Failed to initialize databases. Exiting.")
        sys.exit(1)

    Thread(target=rider_listener,daemon=True,name="RiderListener").start()
    Thread(target=ride_listener,daemon=True,name="RideListener").start()
    Thread(target=cleanup_stale_proposals,daemon=True,name="CleanupThread").start()

    logger.info("RideHail matcher running. Press Ctrl+C to stop.")
    try:
        while True: time.sleep(10)
    except KeyboardInterrupt:
        shutdown_event.set()
        logger.info("Shutdown signal received. Exiting...")

if __name__=="__main__":
    main()


2025-10-17 10:01:56 [INFO] Firebase app 'passenger_app' initialized.
2025-10-17 10:01:56 [INFO] Firebase app 'driver_app' initialized.
  expiry_time = datetime.utcnow() - timedelta(seconds=PROPOSAL_EXPIRY_SECONDS)
2025-10-17 10:01:56 [INFO] RideHail matcher running. Press Ctrl+C to stop.
  return query.where(field_path, op_string, value)
