In [None]:
# Import get_db from the db module
from db import get_db

# Import other required dependencies
from flask import request, render_template
from contextlib import closing
import re

# Try to import pgeocode for ZIP distance calculations
try:
    import pgeocode
except ImportError:
    pgeocode = None

# Services Module

This notebook contains all public service listing, filtering, and related logic migrated from `app.py`. It covers:
- `/services` route and filtering
- Provider service listings
- Service ZIP filtering and radius logic
- Service search and display helpers

In [None]:
# Public services listing and filtering
from flask import request, render_template, session, abort, url_for, flash
from contextlib import closing
import re

def get_services(search_query='', filter_type='all', zip_param=''):
    with closing(get_db()) as db:
        cur = db.cursor()
        base_query = (
            "SELECT s.id, s.title, s.description, s.price, s.posted_by, s.provider_id, s.is_certified, "
            "s.certification_proof, p.business_name as provider_business_name, "
            "p.base_zip as provider_base_zip "
            "FROM services s "
            "LEFT JOIN providers p ON s.provider_id = p.id "
            "WHERE s.active = 1"
        )
        params = []
        if search_query:
            base_query += " AND (LOWER(s.title) LIKE LOWER(?) OR LOWER(s.description) LIKE LOWER(?) OR LOWER(s.posted_by) LIKE LOWER(?) OR LOWER(p.business_name) LIKE LOWER(?) OR LOWER(p.first_name) LIKE LOWER(?))"
            search_param = f"%{search_query}%"
            params.extend([search_param, search_param, search_param, search_param, search_param])
        if filter_type == 'certified':
            base_query += " AND s.is_certified = 1"
        elif filter_type == 'best_price':
            base_query += " ORDER BY CASE WHEN s.price LIKE '$%' THEN CAST(SUBSTR(s.price, 2, INSTR(s.price || '-', '-') - 2) AS INTEGER) ELSE 999999 END ASC, s.created_at DESC"
        elif filter_type == 'by_provider':
            base_query += " ORDER BY COALESCE(p.business_name, 'Independent Contributor'), s.title ASC"
        else:
            base_query += " ORDER BY s.created_at DESC"
        cur.execute(base_query, params)
        services_raw = cur.fetchall()
        # ZIP filtering logic omitted for brevity
        services = [dict(row) for row in services_raw]
    return services

# Example Flask route usage:
# @app.route("/services")
# def services():
#     search_query = request.args.get('q', '').strip()
#     filter_type = request.args.get('filter', 'all')
#     zip_param = (request.args.get('zip', '') or '').strip()
#     services = get_services(search_query, filter_type, zip_param)
#     return render_template("services.html", services=services, ...)


In [None]:
# --- /services route logic ---
from flask import request, render_template, session
from contextlib import closing
import re

def services_route():
    search_query = request.args.get('q', '').strip()
    filter_type = request.args.get('filter', 'all')
    zip_param = (request.args.get('zip', '') or '').strip()
    geo_dist = pgeocode.GeoDistance("US") if 'pgeocode' in globals() else None
    with closing(get_db()) as db:
        cur = db.cursor()
        base_query = (
            "SELECT s.id, s.title, s.description, s.price, s.posted_by, s.provider_id, s.is_certified, "
            "s.certification_proof, p.business_name as provider_business_name, "
            "p.base_zip as provider_base_zip "
            "FROM services s "
            "LEFT JOIN providers p ON s.provider_id = p.id "
            "WHERE s.active = 1"
        )
        params = []
        if search_query:
            base_query += " AND (LOWER(s.title) LIKE LOWER(?) OR LOWER(s.description) LIKE LOWER(?) OR LOWER(s.posted_by) LIKE LOWER(?) OR LOWER(p.business_name) LIKE LOWER(?) OR LOWER(p.first_name) LIKE LOWER(?))"
            search_param = f"%{search_query}%"
            params.extend([search_param, search_param, search_param, search_param, search_param])
        if filter_type == 'certified':
            base_query += " AND s.is_certified = 1"
        elif filter_type == 'best_price':
            base_query += " ORDER BY CASE WHEN s.price LIKE '$%' THEN CAST(SUBSTR(s.price, 2, INSTR(s.price || '-', '-') - 2) AS INTEGER) ELSE 999999 END ASC, s.created_at DESC"
        elif filter_type == 'by_provider':
            base_query += " ORDER BY COALESCE(p.business_name, 'Independent Contributor'), s.title ASC"
        else:
            base_query += " ORDER BY s.created_at DESC"
        cur.execute(base_query, params)
        services_raw = cur.fetchall()
        allowed_provider_ids = None
        applied_zip = None
        if re.fullmatch(r"\d{5}", zip_param or ""):
            applied_zip = zip_param
            allowed_provider_ids = set()
            cur.execute("SELECT base_zip FROM profile WHERE id = 1")
            profile_row = cur.fetchone()
            admin_base_zip = (profile_row["base_zip"] if profile_row else None) or ""
            def within_radius(z1, z2, miles):
                if not z1 or not z2 or not re.fullmatch(r"\d{5}", z1) or not re.fullmatch(r"\d{5}", z2):
                    return False
                if geo_dist is None:
                    return z1 == z2
                try:
                    km = geo_dist.query_postal_code(z1, z2)
                    if km is None or (isinstance(km, float) and (km != km)):
                        return False
                    return (float(km) * 0.621371) <= miles
                except Exception:
                    return False
            cur.execute("SELECT provider_id, zip_code, radius_miles FROM provider_zips")
            provider_area_rows = cur.fetchall()
            areas_by_provider = {}
            for r in provider_area_rows:
                areas_by_provider.setdefault(r["provider_id"], []).append((r["zip_code"], r["radius_miles"]))
            cur.execute("SELECT id, base_zip FROM providers WHERE active = 1")
            prov_rows = cur.fetchall()
            if admin_base_zip and (admin_base_zip == applied_zip or within_radius(admin_base_zip, applied_zip, 20)):
                allowed_provider_ids.add(0)
            for pr in prov_rows:
                pid = pr["id"]
                base_zip = (pr["base_zip"] or "").strip()
                match = False
                if base_zip and (base_zip == applied_zip or within_radius(base_zip, applied_zip, 20)):
                    match = True
                if not match:
                    for (z, rm) in areas_by_provider.get(pid, []) or []:
                        if z == applied_zip or within_radius(z, applied_zip, float(rm or 10)):
                            match = True
                            break
                if match:
                    allowed_provider_ids.add(pid)
        services = []
        for service in services_raw:
            if allowed_provider_ids is not None:
                if service["provider_id"] not in allowed_provider_ids:
                    continue
            services.append({
                'id': service['id'],
                'title': service['title'],
                'description': service['description'],
                'price': service['price'],
                'posted_by': service['posted_by'],
                'provider_id': service['provider_id'],
                'is_certified': service['is_certified'],
                'certification_proof': service['certification_proof'],
                'provider_business_name': service['provider_business_name']
            })
        cur.execute("SELECT first_name, business_name FROM profile WHERE id = 1")
        profile_row = cur.fetchone()
        profile = {
            "first_name": profile_row["first_name"] if profile_row else "",
            "business_name": profile_row["business_name"] if profile_row else ""
        }
    return {
        "services": services,
        "profile": profile,
        "search_query": search_query,
        "filter_type": filter_type,
        "applied_zip": applied_zip,
        "title": "All Services"
    }

# --- Provider services listing ---
def provider_services(provider_id):
    with closing(get_db()) as db:
        cur = db.cursor()
        cur.execute(
            "SELECT id, title, description, price, posted_by FROM services WHERE provider_id = ? AND active = 1 ORDER BY created_at DESC",
            (provider_id,)
        )
        services_raw = cur.fetchall()
        services = [dict(row) for row in services_raw]
        if provider_id == 0:
            cur.execute("SELECT first_name, business_name FROM profile WHERE id = 1")
            profile_row = cur.fetchone()
            provider_name = profile_row["business_name"] if profile_row else "Services"
        else:
            cur.execute("SELECT first_name, business_name FROM providers WHERE id = ? AND active = 1", (provider_id,))
            provider_row = cur.fetchone()
            provider_name = provider_row["business_name"] if provider_row else "Provider Services"
        profile = {
            "first_name": profile_row["first_name"] if 'profile_row' in locals() and profile_row else (provider_row["first_name"] if 'provider_row' in locals() and provider_row else ""),
            "business_name": provider_name
        }
    return {"services": services, "profile": profile, "title": f"{provider_name} - Services"}


In [None]:
# --- Main function called by app.py for /services route ---
def get_services_page(request):
    """
    Main function called by app.py for the /services route
    Returns rendered template with services data
    """
    search_query = request.args.get('q', '').strip()
    filter_type = request.args.get('filter', 'all')
    zip_param = (request.args.get('zip', '') or '').strip()
    geo_dist = pgeocode.GeoDistance("US") if pgeocode else None
    
    with closing(get_db()) as db:
        cur = db.cursor()
        base_query = (
            "SELECT s.id, s.title, s.description, s.price, s.posted_by, s.provider_id, s.is_certified, "
            "s.certification_proof, p.business_name as provider_business_name, "
            "p.base_zip as provider_base_zip "
            "FROM services s "
            "LEFT JOIN providers p ON s.provider_id = p.id "
            "WHERE s.active = 1"
        )
        params = []
        
        if search_query:
            base_query += " AND (LOWER(s.title) LIKE LOWER(?) OR LOWER(s.description) LIKE LOWER(?) OR LOWER(s.posted_by) LIKE LOWER(?) OR LOWER(p.business_name) LIKE LOWER(?) OR LOWER(p.first_name) LIKE LOWER(?))"
            search_param = f"%{search_query}%"
            params.extend([search_param] * 5)
        
        if filter_type == 'certified':
            base_query += " AND s.is_certified = 1"
        
        if filter_type == 'best_price':
            base_query += " ORDER BY CASE WHEN s.price LIKE '$%' THEN CAST(SUBSTR(s.price, 2, INSTR(s.price || '-', '-') - 2) AS INTEGER) ELSE 999999 END ASC, s.created_at DESC"
        elif filter_type == 'by_provider':
            base_query += " ORDER BY COALESCE(p.business_name, 'Independent Contributor'), s.title ASC"
        else:
            base_query += " ORDER BY s.created_at DESC"
        
        cur.execute(base_query, params)
        services_raw = cur.fetchall()
        
        # ZIP filtering logic
        allowed_provider_ids = None
        applied_zip = None
        if re.fullmatch(r"\d{5}", zip_param or ""):
            applied_zip = zip_param
            allowed_provider_ids = set()
            
            # Get admin base ZIP
            cur.execute("SELECT base_zip FROM profile WHERE id = 1")
            profile_row = cur.fetchone()
            admin_base_zip = (profile_row["base_zip"] if profile_row else None) or ""
            
            def within_radius(z1, z2, miles):
                if not z1 or not z2 or not re.fullmatch(r"\d{5}", z1) or not re.fullmatch(r"\d{5}", z2):
                    return False
                if geo_dist is None:
                    return z1 == z2
                try:
                    km = geo_dist.query_postal_code(z1, z2)
                    if km is None or (isinstance(km, float) and (km != km)):
                        return False
                    return (float(km) * 0.621371) <= miles
                except Exception:
                    return False
            
            # Get provider service areas
            cur.execute("SELECT provider_id, zip_code, radius_miles FROM provider_zips")
            provider_area_rows = cur.fetchall()
            areas_by_provider = {}
            for r in provider_area_rows:
                areas_by_provider.setdefault(r["provider_id"], []).append((r["zip_code"], r["radius_miles"]))
            
            # Check all providers
            cur.execute("SELECT id, base_zip FROM providers WHERE active = 1")
            prov_rows = cur.fetchall()
            
            # Check admin provider
            if admin_base_zip and (admin_base_zip == applied_zip or within_radius(admin_base_zip, applied_zip, 20)):
                allowed_provider_ids.add(0)
            
            # Check registered providers
            for pr in prov_rows:
                pid = pr["id"]
                base_zip = (pr["base_zip"] or "").strip()
                match = False
                
                if base_zip and (base_zip == applied_zip or within_radius(base_zip, applied_zip, 20)):
                    match = True
                
                if not match:
                    for (z, rm) in areas_by_provider.get(pid, []) or []:
                        if z == applied_zip or within_radius(z, applied_zip, float(rm or 10)):
                            match = True
                            break
                
                if match:
                    allowed_provider_ids.add(pid)
        
        # Filter services by allowed provider IDs
        services = []
        for service in services_raw:
            if allowed_provider_ids is not None:
                if service["provider_id"] not in allowed_provider_ids:
                    continue
            services.append({
                'id': service['id'],
                'title': service['title'],
                'description': service['description'],
                'price': service['price'],
                'posted_by': service['posted_by'],
                'provider_id': service['provider_id'],
                'is_certified': service['is_certified'],
                'certification_proof': service['certification_proof'],
                'provider_business_name': service['provider_business_name']
            })
        
        # Get profile for template
        cur.execute("SELECT first_name, business_name FROM profile WHERE id = 1")
        profile_row = cur.fetchone()
        profile = {
            "first_name": profile_row["first_name"] if profile_row else "",
            "business_name": profile_row["business_name"] if profile_row else "Your Service Provider"
        }
    
    return render_template(
        "services.html",
        services=services,
        profile=profile,
        search_query=search_query,
        filter_type=filter_type,
        applied_zip=applied_zip,
        title="All Services"
    )

In [None]:
# --- Service search logic ---
def search_services(q):
    results = []
    count = 0
    if q:
        tokens = re.findall(r"\w+", q.lower())
        synonyms = {
            "lawn": ["yard", "mow", "mowing", "landscape", "landscaping"],
            "clean": ["cleaning", "housekeeping", "maid"],
            "repair": ["fix", "handyman"],
        }
        expanded = set(tokens)
        for t in tokens:
            for syn in synonyms.get(t, []):
                expanded.add(syn)
        match_expr = " OR ".join(f"{t}*" for t in expanded)
        with closing(get_db()) as db:
            cur = db.cursor()
            try:
                cur.execute(
                    """
                    SELECT s.id, s.title, s.description, s.price, s.posted_by
                    FROM services s
                    JOIN services_fts f ON f.rowid = s.id
                    WHERE s.active = 1 AND f MATCH ?
                    ORDER BY bm25(services_fts) ASC
                    """,
                    (match_expr or None,)
                )
                results = cur.fetchall()
            except Exception:
                ors = []
                params = []
                for t in expanded:
                    like = f"%{t}%"
                    ors.append("title LIKE ? OR description LIKE ?")
                    params.extend([like, like])
                where = "(" + ") OR (".join(ors) + ")" if ors else "1=1"
                cur.execute(
                    f"""
                    SELECT id, title, description, price, posted_by
                    FROM services
                    WHERE active = 1 AND {where}
                    ORDER BY created_at DESC
                    """,
                    params,
                )
                results = cur.fetchall()
            count = len(results)
    return {"results": results, "count": count}


In [None]:
# --- Helper: is_zip_allowed ---
def is_zip_allowed(zip_code):
    if not zip_code or not re.fullmatch(r"\d{5}", zip_code):
        return False
    with closing(get_db()) as db:
        cur = db.cursor()
        cur.execute("SELECT zip, radius_miles FROM zips")
        rows = cur.fetchall()
    if any(r["zip"] == zip_code for r in rows):
        return True
    geo_dist = pgeocode.GeoDistance("US") if 'pgeocode' in globals() else None
    if geo_dist is not None:
        for r in rows:
            try:
                km = geo_dist.query_postal_code(r["zip"], zip_code)
            except Exception:
                km = None
            if km is None or (isinstance(km, float) and (km != km)):
                continue
            miles = float(km) * 0.621371
            if miles <= float(r["radius_miles"]):
                return True
    return False


In [None]:
# --- Helper: get_provider_service_areas ---
def get_provider_service_areas(provider_id):
    with closing(get_db()) as db:
        cur = db.cursor()
        cur.execute("SELECT zip_code, radius_miles FROM provider_zips WHERE provider_id = ? ORDER BY zip_code", (provider_id,))
        service_areas = [dict(row) for row in cur.fetchall()]
    return service_areas
