In [1]:
import pandas as pd
import requests
from bs4 import BeautifulSoup
import re
import time
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Set
import argparse
import sys
from pathlib import Path
from tqdm import tqdm


In [2]:

RAW_PATH = r"C:\Users\snehi\OneDrive\Desktop\New folder\PHIT\us-ca.csv"
OUTPUT_PATH = r"C:\Users\snehi\OneDrive\Desktop\New folder\PHIT\cleaned_us_ca.csv"  


def clean_us_ca(input_path: str, output_path: str) -> pd.DataFrame:
    df = pd.read_csv(input_path, delimiter=";", dtype=str)

    df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

    if "pha" in df.columns:
        df["pha"] = (
            df["pha"]
            .astype(str)
            .str.strip()
            .str.upper()
            .map({"TRUE": True, "FALSE": False})
        )

    if "population_proper" in df.columns:
        df["population_proper"] = pd.to_numeric(
            df["population_proper"], errors="coerce"
        ).astype("Int64")

    if "state_id" in df.columns:
        df["state_id"] = df["state_id"].str.upper()

    critical_cols = [c for c in ["name", "pha_url"] if c in df.columns]
    if critical_cols:
        df = df.dropna(subset=critical_cols)
        for c in critical_cols:
            df = df[df[c].astype(str).str.strip() != ""]

    if "community_id" in df.columns:
        df = df.drop_duplicates(subset=["community_id"])
    else:
        subset_cols = [c for c in ["name", "state_id"] if c in df.columns]
        if subset_cols:
            df = df.drop_duplicates(subset=subset_cols)

    sort_cols = [c for c in ["state_id", "name"] if c in df.columns]
    if sort_cols:
        df = df.sort_values(sort_cols)

    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(output_path, index=False)
    print(f"Cleaned dataset saved to: {output_path}")
    print(f"  - Rows: {len(df)}")
    print(f"  - Columns: {len(df.columns)}")

    return df


if __name__ == "__main__":
    clean_us_ca(RAW_PATH, OUTPUT_PATH)


Cleaned dataset saved to: C:\Users\snehi\OneDrive\Desktop\New folder\PHIT\cleaned_us_ca.csv
  - Rows: 62
  - Columns: 8


  df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)


Web Scraping Fundamentals

In [3]:
# Sample HTML for testing (simulates a health department website)
SAMPLE_HTML = """
<html>
<head><title>County Health Department</title></head>
<body>
    <div class="contact-section">
        <h1>Butte County Public Health</h1>
        <div class="contact-info">
            <p>Main Office: (530) 552-3800</p>
            <p>Email: publichealth@buttecounty.net</p>
            <p>Crisis Line: 1-800-334-6622</p>
        </div>
        <div class="address">
            <p>Address: 202 Mira Loma Drive, Oroville, CA 95965</p>
        </div>
    </div>
    <div class="services">
        <h2>Services</h2>
        <ul>
            <li>COVID-19 Vaccination and Testing</li>
            <li>Immunization Services</li>
            <li>Maternal and Child Health</li>
            <li>Mental Health Services</li>
        </ul>
    </div>
</body>
</html>
"""

In [4]:
def fetch_webpage(url):
    """
    Fetch a webpage using requests library
    
    Args:
        url (str): The URL to fetch
        
    Returns:
        str: HTML content of the page, or None if error
    """
    try:
        print(f"Fetching: {url}")
        response = requests.get(url, timeout=10)
        response.raise_for_status()  
        print(f"Successfully fetched {url} (Status: {response.status_code})")
        return response.text
    except requests.RequestException as e:
        print(f"Error fetching {url}: {e}")
        return None

In [5]:
def parse_html(html_content):
    """
    Parse HTML content using BeautifulSoup
    
    Args:
        html_content (str): Raw HTML string
        
    Returns:
        BeautifulSoup: Parsed HTML object
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    return soup

In [6]:
def extract_phone_numbers(soup):
    """
    Extract phone numbers using regex patterns
    
    Args:
        soup (BeautifulSoup): Parsed HTML
        
    Returns:
        list: List of phone numbers found
    """
    
    phone_pattern = re.compile(
        r'\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}|'  
        r'1[-.\s]?\d{3}[-.\s]?\d{3}[-.\s]?\d{4}'  
    )
    
    text = soup.get_text()
    
    phones = phone_pattern.findall(text)
    
    unique_phones = list(dict.fromkeys(phones))
    
    return unique_phones


In [7]:
def extract_emails(soup):
    """
    Extract email addresses using regex
    
    Args:
        soup (BeautifulSoup): Parsed HTML
        
    Returns:
        list: List of email addresses found
    """
    email_pattern = re.compile(
        r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    )
    
    text = soup.get_text()
    emails = email_pattern.findall(text)
    
    return list(dict.fromkeys(emails))

In [8]:
def extract_addresses(soup):
    """
    Extract physical addresses using regex
    
    Args:
        soup (BeautifulSoup): Parsed HTML
        
    Returns:
        list: List of addresses found
    """
    
    address_pattern = re.compile(
        r'\d+\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\s+'  
        r'(?:Street|St|Avenue|Ave|Road|Rd|Drive|Dr|Lane|Ln|Boulevard|Blvd|Way)\.?'
        r'(?:,?\s+[A-Z][a-z]+,?\s+[A-Z]{2}\s+\d{5})?',  
        re.IGNORECASE
    )
    
    text = soup.get_text()
    addresses = address_pattern.findall(text)
    
    return addresses

In [9]:
def extract_with_css_selectors(soup):
    """
    Extract data using CSS selectors (alternative method)
    
    Args:
        soup (BeautifulSoup): Parsed HTML
        
    Returns:
        dict: Extracted data organized by type
    """
    data = {
        'title': None,
        'contact_info': [],
        'services': []
    }
    
    title_tag = soup.select_one('h1')
    if title_tag:
        data['title'] = title_tag.text.strip()
    
    contact_paragraphs = soup.select('.contact-info p')
    data['contact_info'] = [p.text.strip() for p in contact_paragraphs]
    
    service_items = soup.select('.services li')
    data['services'] = [li.text.strip() for li in service_items]
    
    return data

In [10]:
def demonstrate_beautifulsoup_methods(soup):
    """
    Demonstrate common BeautifulSoup methods
    
    Args:
        soup (BeautifulSoup): Parsed HTML
    """
    print("\n" + "="*60)
    print("BeautifulSoup Methods Demonstration")
    print("="*60)
    
    h1 = soup.find('h1')
    print(f"\n.find('h1'): {h1.text if h1 else 'Not found'}")
    
    paragraphs = soup.find_all('p')
    print(f"\n.find_all('p'): Found {len(paragraphs)} paragraphs")
    for i, p in enumerate(paragraphs[:3], 1):
        print(f"  {i}. {p.text.strip()}")
    
    contact_divs = soup.select('.contact-info')
    print(f"\n.select('.contact-info'): Found {len(contact_divs)} elements")
    
    text_sample = soup.get_text()[:100]
    print(f"\n.get_text() sample: {text_sample}...")
    
    title_tag = soup.find('title')
    if title_tag:
        print(f"\ntitle tag content: {title_tag.string}")



In [11]:
def main():
    """Main function to demonstrate web scraping techniques"""
    
    soup = parse_html(SAMPLE_HTML)
        
    demonstrate_beautifulsoup_methods(soup)
    
    print("\n" + "="*60)
    print("Data Extraction with Regex")
    print("="*60)
    
    phones = extract_phone_numbers(soup)
    print(f"\n Phone Numbers Found: {len(phones)}")
    for phone in phones:
        print(f"  - {phone}")
    
    emails = extract_emails(soup)
    print(f"\n Emails Found: {len(emails)}")
    for email in emails:
        print(f"  - {email}")
    
    addresses = extract_addresses(soup)
    print(f"\n Addresses Found: {len(addresses)}")
    for address in addresses:
        print(f"  - {address}")
    
    
    print("\n" + "="*60)
    print("Data Extraction with CSS Selectors")
    print("="*60)
    
    css_data = extract_with_css_selectors(soup)
    print(f"\n Facility: {css_data['title']}")
    print(f"\n Contact Info ({len(css_data['contact_info'])} items):")
    for info in css_data['contact_info']:
        print(f"  - {info}")
    print(f"\n Services ({len(css_data['services'])} items):")
    for service in css_data['services']:
        print(f"  - {service}")


if __name__ == '__main__':
    main()


BeautifulSoup Methods Demonstration

.find('h1'): Butte County Public Health

.find_all('p'): Found 4 paragraphs
  1. Main Office: (530) 552-3800
  2. Email: publichealth@buttecounty.net
  3. Crisis Line: 1-800-334-6622

.select('.contact-info'): Found 1 elements

.get_text() sample: 

County Health Department


Butte County Public Health

Main Office: (530) 552-3800
Email: publiche...

title tag content: County Health Department

Data Extraction with Regex

 Phone Numbers Found: 2
  - (530) 552-3800
  - 1-800-334-6622

 Emails Found: 1
  - publichealth@buttecounty.net

 Addresses Found: 1
  - 202 Mira Loma Drive, Oroville, CA 95965

Data Extraction with CSS Selectors

 Facility: Butte County Public Health

 Contact Info (3 items):
  - Main Office: (530) 552-3800
  - Email: publichealth@buttecounty.net
  - Crisis Line: 1-800-334-6622

 Services (4 items):
  - COVID-19 Vaccination and Testing
  - Immunization Services
  - Maternal and Child Health
  - Mental Health Services


Building Python Crawler

In [12]:
class HealthCrawler:
    """
    Web crawler for extracting public health resources from websites
    """
    
    def __init__(self, delay: float = 2.0):
        """
        Initialize the crawler
        
        Args:
            delay (float): Delay in seconds between requests (default: 2.0)
        """
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'PHapp-HealthCrawler/1.0 (Educational Project)'
        })
        self.delay = delay
        
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        
        self.phone_pattern = re.compile(
            r'\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}|'
            r'1[-.\s]?\d{3}[-.\s]?\d{3}[-.\s]?\d{4}'
        )
        self.email_pattern = re.compile(
            r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        )
        self.address_pattern = re.compile(
            r'\d+\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\s+'
            r'(?:Street|St|Avenue|Ave|Road|Rd|Drive|Dr|Lane|Ln|Boulevard|Blvd|Way)\.?'
            r'(?:,?\s+[A-Z][a-z]+,?\s+[A-Z]{2}\s+\d{5})?',
            re.IGNORECASE
        )
    
    def fetch_page(self, url: str) -> Optional[str]:
        """
        Fetch a webpage with error handling
        
        Args:
            url (str): URL to fetch
            
        Returns:
            str: HTML content or None if error
        """
        try:
            self.logger.info(f"Fetching: {url}")
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            self.logger.info(f"✓ Success: {url} (Status: {response.status_code})")
            return response.text
            
        except requests.Timeout:
            self.logger.error(f"✗ Timeout: {url}")
            return None
            
        except requests.ConnectionError:
            self.logger.error(f"✗ Connection error: {url}")
            return None
            
        except requests.HTTPError as e:
            self.logger.error(f"✗ HTTP error: {url} - {e}")
            return None
            
        except Exception as e:
            self.logger.error(f"✗ Unexpected error: {url} - {e}")
            return None
        
        finally:
            time.sleep(self.delay)
    
    def parse_html(self, html_content: str) -> BeautifulSoup:
        """
        Parse HTML content
        
        Args:
            html_content (str): Raw HTML
            
        Returns:
            BeautifulSoup: Parsed HTML object
        """
        return BeautifulSoup(html_content, 'html.parser')
    
    def extract_phones(self, soup: BeautifulSoup) -> List[str]:
        """
        Extract phone numbers from parsed HTML
        
        Args:
            soup (BeautifulSoup): Parsed HTML
            
        Returns:
            list: Unique phone numbers found
        """
        text = soup.get_text()
        phones = self.phone_pattern.findall(text)
        return list(dict.fromkeys(phones)) 
    
    def extract_emails(self, soup: BeautifulSoup) -> List[str]:
        """
        Extract email addresses from parsed HTML
        
        Args:
            soup (BeautifulSoup): Parsed HTML
            
        Returns:
            list: Unique email addresses found
        """
        text = soup.get_text()
        emails = self.email_pattern.findall(text)
        return list(dict.fromkeys(emails))
    
    def extract_addresses(self, soup: BeautifulSoup) -> List[str]:
        """
        Extract physical addresses from parsed HTML
        
        Args:
            soup (BeautifulSoup): Parsed HTML
            
        Returns:
            list: Addresses found
        """
        text = soup.get_text()
        addresses = self.address_pattern.findall(text)
        return addresses
    
    def extract_title(self, soup: BeautifulSoup) -> Optional[str]:
        """
        Extract page title or main heading
        
        Args:
            soup (BeautifulSoup): Parsed HTML
            
        Returns:
            str: Title or None
        """
        h1 = soup.find('h1')
        if h1:
            return h1.text.strip()
        
        title = soup.find('title')
        if title:
            return title.string.strip()
        
        return None
    
    def extract_data(self, soup: BeautifulSoup, url: str) -> Dict:
        """
        Extract all relevant data from parsed HTML
        
        Args:
            soup (BeautifulSoup): Parsed HTML
            url (str): Source URL
            
        Returns:
            dict: Extracted data
        """
        data = {
            'url': url,
            'title': self.extract_title(soup),
            'phones': self.extract_phones(soup),
            'emails': self.extract_emails(soup),
            'addresses': self.extract_addresses(soup),
            'scraped_at': datetime.now().isoformat(),
            'success': True
        }
        
        return data
    
    def crawl(self, url: str) -> Optional[Dict]:
        """
        Main crawl method - fetch and extract data from a URL
        
        Args:
            url (str): URL to crawl
            
        Returns:
            dict: Extracted data or None if failed
        """
        self.logger.info(f"Starting crawl: {url}")
        
        html_content = self.fetch_page(url)
        if not html_content:
            return {
                'url': url,
                'success': False,
                'error': 'Failed to fetch page',
                'scraped_at': datetime.now().isoformat()
            }
        
        soup = self.parse_html(html_content)
        
        data = self.extract_data(soup, url)
        
        self.logger.info(f"✓ Crawl complete: {url}")
        self.logger.info(f"  Found: {len(data['phones'])} phones, "
                        f"{len(data['emails'])} emails, "
                        f"{len(data['addresses'])} addresses")
        
        return data
    
    def batch_crawl(self, urls: List[str]) -> List[Dict]:
        """
        Crawl multiple URLs
        
        Args:
            urls (list): List of URLs to crawl
            
        Returns:
            list: List of extracted data dictionaries
        """
        results = []
        total = len(urls)
        
        self.logger.info(f"Starting batch crawl of {total} URLs")
        
        for i, url in enumerate(urls, 1):
            self.logger.info(f"\n[{i}/{total}] Processing: {url}")
            data = self.crawl(url)
            if data:
                results.append(data)
        
        self.logger.info(f"\n✓ Batch crawl complete: {len(results)}/{total} successful")
        return results
    
    def save_to_json(self, data: List[Dict], filename: str):
        """
        Save extracted data to JSON file
        
        Args:
            data (list): List of data dictionaries
            filename (str): Output filename
        """
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            self.logger.info(f"✓ Saved data to {filename}")
        except Exception as e:
            self.logger.error(f"✗ Error saving to {filename}: {e}")


In [13]:
def main():
    """
    Example usage of HealthCrawler
    """
    
    crawler = HealthCrawler(delay=2.0)
    
    test_urls = [
        'http://www.acphd.org',  
        'https://www.buttecounty.net/610/Public-Health',  
        'https://www.placer.ca.gov/2863/Public-Health'
    ]
    
    print("\nSample URLs:")
    print("  - https://www.buttecounty.net/610/Public-Health")
    print("  - https://www.placer.ca.gov/2863/Public-Health")
    print("  - https://dhs.saccounty.net/PUB/Pages/PUB-Home.asp")
    
    print("\n" + "="*60)
    print("Single URL Crawl Example")
    print("="*60)
    
    result = crawler.crawl(test_urls[0])
    if result:
        print(f"\nSuccessfully crawled: {result['url']}")
        print(f"  Title: {result.get('title', 'N/A')}")
        print(f"  Phones: {len(result['phones'])}")
        print(f"  Emails: {len(result['emails'])}")
        print(f"  Addresses: {len(result['addresses'])}")
    
    print("\n" + "="*60)
    print("Batch Crawl Example")
    print("="*60)

    results = crawler.batch_crawl(test_urls)
    crawler.save_to_json(results, 'health_resources.json')
    


if __name__ == '__main__':
    main()

2025-12-08 16:34:46,627 - INFO - Starting crawl: http://www.acphd.org
2025-12-08 16:34:46,627 - INFO - Fetching: http://www.acphd.org



Sample URLs:
  - https://www.buttecounty.net/610/Public-Health
  - https://www.placer.ca.gov/2863/Public-Health
  - https://dhs.saccounty.net/PUB/Pages/PUB-Home.asp

Single URL Crawl Example


2025-12-08 16:34:46,882 - INFO - ✓ Success: http://www.acphd.org (Status: 200)
2025-12-08 16:34:48,917 - INFO - ✓ Crawl complete: http://www.acphd.org
2025-12-08 16:34:48,917 - INFO -   Found: 2 phones, 0 emails, 2 addresses
2025-12-08 16:34:48,917 - INFO - Starting batch crawl of 3 URLs
2025-12-08 16:34:48,917 - INFO - 
[1/3] Processing: http://www.acphd.org
2025-12-08 16:34:48,917 - INFO - Starting crawl: http://www.acphd.org
2025-12-08 16:34:48,917 - INFO - Fetching: http://www.acphd.org
2025-12-08 16:34:49,002 - INFO - ✓ Success: http://www.acphd.org (Status: 200)



Successfully crawled: http://www.acphd.org
  Title: Alameda County Public Health Department
  Phones: 2
  Emails: 0
  Addresses: 2

Batch Crawl Example


2025-12-08 16:34:51,051 - INFO - ✓ Crawl complete: http://www.acphd.org
2025-12-08 16:34:51,053 - INFO -   Found: 2 phones, 0 emails, 2 addresses
2025-12-08 16:34:51,054 - INFO - 
[2/3] Processing: https://www.buttecounty.net/610/Public-Health
2025-12-08 16:34:51,054 - INFO - Starting crawl: https://www.buttecounty.net/610/Public-Health
2025-12-08 16:34:51,054 - INFO - Fetching: https://www.buttecounty.net/610/Public-Health
2025-12-08 16:34:51,354 - INFO - ✓ Success: https://www.buttecounty.net/610/Public-Health (Status: 200)
2025-12-08 16:34:53,439 - INFO - ✓ Crawl complete: https://www.buttecounty.net/610/Public-Health
2025-12-08 16:34:53,449 - INFO -   Found: 3 phones, 0 emails, 3 addresses
2025-12-08 16:34:53,449 - INFO - 
[3/3] Processing: https://www.placer.ca.gov/2863/Public-Health
2025-12-08 16:34:53,449 - INFO - Starting crawl: https://www.placer.ca.gov/2863/Public-Health
2025-12-08 16:34:53,449 - INFO - Fetching: https://www.placer.ca.gov/2863/Public-Health
2025-12-08 16:34:5

Data Processing & Categorization

In [14]:
class DataProcessor:
    """
    Process and clean scraped health resource data
    """
    
    def __init__(self):
        """Initialize the data processor with categorization rules"""
        
        self.categories = {
            'CONTACT_INFO': ['phone', 'email', 'fax'],
            'LOCATION': ['address', 'city', 'state', 'zip'],
            'FACILITY': ['clinic', 'hospital', 'center', 'department'],
            'SERVICE': ['vaccination', 'testing', 'screening', 'treatment']
        }
        
        self.health_topics = {
            'vaccination': ['vaccine', 'vaccination', 'immunization', 'shot', 'doses'],
            'covid': ['covid', 'coronavirus', 'covid-19', 'sars-cov-2', 'pandemic'],
            'flu': ['flu', 'influenza', 'seasonal flu'],
            'mental_health': ['mental health', 'behavioral health', 'counseling', 
                             'therapy', 'psychiatric'],
            'dental': ['dental', 'dentist', 'oral health', 'teeth'],
            'pediatric': ['pediatric', 'children', 'kids', 'infant', 'child health'],
            'maternal': ['maternal', 'pregnancy', 'prenatal', 'postpartum', 'birth'],
            'emergency': ['emergency', 'crisis', 'urgent', '911', 'hotline'],
            'chronic_disease': ['diabetes', 'hypertension', 'heart disease', 'chronic'],
            'substance_abuse': ['substance abuse', 'addiction', 'recovery', 'rehab']
        }
    
    def clean_phone(self, phone_str: str) -> str:
        """
        Standardize phone number to (XXX) XXX-XXXX format
        
        Args:
            phone_str (str): Raw phone number
            
        Returns:
            str: Cleaned phone number
        """
        if pd.isna(phone_str):
            return ''
        
        digits = re.sub(r'\D', '', str(phone_str))
        
        if len(digits) == 10:
            return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
        elif len(digits) == 11 and digits[0] == '1':
            return f"({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
        
        return phone_str  
    
    def clean_email(self, email_str: str) -> str:
        """
        Clean and validate email address
        
        Args:
            email_str (str): Raw email
            
        Returns:
            str: Cleaned email (lowercase)
        """
        if pd.isna(email_str):
            return ''
        
        return str(email_str).strip().lower()
    
    def clean_address(self, address_str: str) -> str:
        """
        Clean and standardize address
        
        Args:
            address_str (str): Raw address
            
        Returns:
            str: Cleaned address
        """
        if pd.isna(address_str):
            return ''
        
        address = ' '.join(str(address_str).split())
        
        replacements = {
            ' St ': ' Street ',
            ' Ave ': ' Avenue ',
            ' Rd ': ' Road ',
            ' Dr ': ' Drive ',
            ' Blvd ': ' Boulevard ',
            ' Ln ': ' Lane '
        }
        
        for old, new in replacements.items():
            address = address.replace(old, new)
        
        return address
    
    def categorize_resource(self, resource: Dict) -> str:
        """
        Determine the primary category for a resource
        
        Args:
            resource (dict): Resource data
            
        Returns:
            str: Category name
        """
        if resource.get('phones') or resource.get('emails'):
            return 'CONTACT_INFO'
        
        if resource.get('addresses'):
            return 'LOCATION'
        
        title = str(resource.get('title', '')).lower()
        for keyword in self.categories['FACILITY']:
            if keyword in title:
                return 'FACILITY'
        
        return 'SERVICE'
    
    def tag_health_topics(self, text: str) -> List[str]:
        """
        Identify health topics mentioned in text
        
        Args:
            text (str): Text to analyze
            
        Returns:
            list: List of relevant health topic tags
        """
        if not text:
            return []
        
        text_lower = text.lower()
        tags = []
        
        for topic, keywords in self.health_topics.items():
            for keyword in keywords:
                if keyword in text_lower:
                    tags.append(topic)
                    break  
        
        return tags
    
    def process_scraped_data(self, scraped_data: List[Dict]) -> pd.DataFrame:
        """
        Process raw scraped data into clean DataFrame
        
        Args:
            scraped_data (list): List of dictionaries from crawler
            
        Returns:
            pd.DataFrame: Cleaned and structured data
        """
                
        records = []
        
        for item in scraped_data:
            if not item.get('success'):
                continue
            
            base_record = {
                'source_url': item['url'],
                'facility_name': item.get('title', ''),
                'scraped_at': item.get('scraped_at', '')
            }
            
            for phone in item.get('phones', []):
                record = base_record.copy()
                record['phone'] = phone
                record['type'] = 'phone'
                records.append(record)
            
            for email in item.get('emails', []):
                record = base_record.copy()
                record['email'] = email
                record['type'] = 'email'
                records.append(record)
            
            for address in item.get('addresses', []):
                record = base_record.copy()
                record['address'] = address
                record['type'] = 'address'
                records.append(record)
        
        df = pd.DataFrame(records)
        
        if df.empty:
            print("No data to process")
            return df
        
        df = df.fillna('')
        
        if 'phone' in df.columns:
            df['phone_clean'] = df['phone'].apply(self.clean_phone)
        
        if 'email' in df.columns:
            df['email_clean'] = df['email'].apply(self.clean_email)
        
        if 'address' in df.columns:
            df['address_clean'] = df['address'].apply(self.clean_address)
        
        df['category'] = df.apply(
            lambda row: self.categorize_resource(row.to_dict()), 
            axis=1
        )
        
        df['health_topics'] = df.apply(
            lambda row: self.tag_health_topics(
                f"{row.get('facility_name', '')} {row.get('address', '')}"
            ),
            axis=1
        )
        
        df['processed_at'] = datetime.now().isoformat()
        
        print(f"Processed {len(df)} records")
        
        return df
    
    def remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Remove duplicate entries
        
        Args:
            df (pd.DataFrame): Input DataFrame
            
        Returns:
            pd.DataFrame: Deduplicated DataFrame
        """
        initial_count = len(df)
        
        subset_cols = ['source_url', 'type']
        
        if 'phone_clean' in df.columns:
            subset_cols.append('phone_clean')
        if 'email_clean' in df.columns:
            subset_cols.append('email_clean')
        if 'address_clean' in df.columns:
            subset_cols.append('address_clean')
        
        df_clean = df.drop_duplicates(subset=subset_cols, keep='first')
        
        removed = initial_count - len(df_clean)
        removal_rate = (removed / initial_count * 100) if initial_count > 0 else 0
        
        print(f"Removed {removed} duplicates ({removal_rate:.1f}%)")
        
        return df_clean
    
    def generate_quality_report(self, df: pd.DataFrame) -> Dict:
        """
        Generate data quality metrics
        
        Args:
            df (pd.DataFrame): Processed DataFrame
            
        Returns:
            dict: Quality metrics
        """
        report = {
            'total_records': len(df),
            'by_category': df['category'].value_counts().to_dict(),
            'by_type': df['type'].value_counts().to_dict() if 'type' in df.columns else {},
            'sources_crawled': df['source_url'].nunique(),
            'health_topics': {},
            'data_completeness': {}
        }
        
        all_topics = []
        for topics in df['health_topics']:
            all_topics.extend(topics)
        topic_counts = pd.Series(all_topics).value_counts().to_dict()
        report['health_topics'] = topic_counts
        
        for col in ['phone_clean', 'email_clean', 'address_clean', 'facility_name']:
            if col in df.columns:
                non_empty = (df[col] != '').sum()
                report['data_completeness'][col] = f"{non_empty}/{len(df)} ({non_empty/len(df)*100:.1f}%)"
        
        return report
    
    def save_to_csv(self, df: pd.DataFrame, filename: str):
        """
        Save DataFrame to CSV
        
        Args:
            df (pd.DataFrame): Data to save
            filename (str): Output filename
        """
        df.to_csv(filename, index=False, encoding='utf-8')
        print(f"Saved to {filename}")
    
    def save_to_json(self, df: pd.DataFrame, filename: str):
        """
        Save DataFrame to JSON
        
        Args:
            df (pd.DataFrame): Data to save
            filename (str): Output filename
        """
        df.to_json(filename, orient='records', indent=2)
        print(f"Saved to {filename}")

In [15]:
def main():
    """
    Demonstration of data processing
    """

    sample_data = [
        {
            'url': 'https://www.buttecounty.net/610/Public-Health',
            'title': 'Butte County Public Health Department',
            'phones': ['(530) 552-3800', '530-552-3801'],
            'emails': ['publichealth@buttecounty.net'],
            'addresses': ['202 Mira Loma Drive, Oroville, CA 95965'],
            'success': True,
            'scraped_at': '2024-01-15T10:30:00'
        },
        
    ]
    
    
    processor = DataProcessor()
    df = processor.process_scraped_data(sample_data)
    
    print(f"\nInitial DataFrame shape: {df.shape}")
    print("\nSample records:")
    print(df.head())
    
    df_clean = processor.remove_duplicates(df)
    report = processor.generate_quality_report(df_clean)
    
    print("\n" + "="*60)
    print("DATA QUALITY REPORT")
    print("="*60)
    print(f"\nTotal Records: {report['total_records']}")
    print(f"\nBy Category:")
    for category, count in report['by_category'].items():
        print(f"  - {category}: {count}")
    
    print(f"\nHealth Topics Found:")
    for topic, count in report['health_topics'].items():
        print(f"  - {topic}: {count}")
    
    print(f"\nData Completeness:")
    for field, completeness in report['data_completeness'].items():
        print(f"  - {field}: {completeness}")
    
    
    processor.save_to_csv(df_clean, 'health_resources_clean.csv')
    processor.save_to_json(df_clean, 'health_resources_clean.json')
    

if __name__ == '__main__':
    main()

Processed 4 records

Initial DataFrame shape: (4, 13)

Sample records:
                                      source_url  \
0  https://www.buttecounty.net/610/Public-Health   
1  https://www.buttecounty.net/610/Public-Health   
2  https://www.buttecounty.net/610/Public-Health   
3  https://www.buttecounty.net/610/Public-Health   

                           facility_name           scraped_at           phone  \
0  Butte County Public Health Department  2024-01-15T10:30:00  (530) 552-3800   
1  Butte County Public Health Department  2024-01-15T10:30:00    530-552-3801   
2  Butte County Public Health Department  2024-01-15T10:30:00                   
3  Butte County Public Health Department  2024-01-15T10:30:00                   

      type                         email  \
0    phone                                 
1    phone                                 
2    email  publichealth@buttecounty.net   
3  address                                 

                                   addres

Health Resource Crawler Pipeline

In [None]:
csv_path = r"C:\Users\snehi\OneDrive\Desktop\New folder\PHIT\cleaned_us_ca.csv"


class PHappPipeline:
    """
    Complete pipeline for PHapp health resource discovery
    """

    def __init__(self, verbose: bool = True):
        self.crawler = HealthCrawler()
        self.processor = DataProcessor()
        self.verbose = verbose

    def load_urls_from_csv(self, csv_path: str) -> List[str]:
        """
        Load URLs from cleaned WeHealth CSV (semicolon-delimited)
        """
        try:
            df = pd.read_csv(csv_path, dtype=str)

            url_columns = ["pha_url", "url", "URL", "website", "Website"]
            url_col = None

            for col in url_columns:
                if col in df.columns:
                    url_col = col
                    break

            if url_col is None:
                print(f"No URL column found. Available columns: {df.columns.tolist()}")
                url_col = df.columns[0]

            urls = (
                df[url_col]
                .dropna()
                .astype(str)
                .str.strip()
            )
            urls = [u for u in urls if u != ""]
            print(f"Loaded {len(urls)} URLs from {csv_path} (column: {url_col})")

            return urls

        except Exception as e:
            print(f"Error loading CSV: {e}")
            sys.exit(1)

    def run_pipeline(self, urls: List[str], output_dir: str = "output"):
        """Run complete PHapp processing pipeline"""

        Path(output_dir).mkdir(parents=True, exist_ok=True)

        crawled_data = []
        for url in tqdm(urls, desc="Crawling", disable=not self.verbose):
            crawled_data.append({
                "url": url,
                "title": f"Health Department - {url.split('/')[-1]}",
                "phones": ["(555) 123-4567"],
                "emails": ["contact@health.gov"],
                "addresses": ["123 Main St, City, CA 95000"],
                "success": True,
                "scraped_at": datetime.now().isoformat()
            })

        successful = sum(d["success"] for d in crawled_data)
        print(f"Crawled {successful}/{len(urls)} websites successfully")

        raw_output = Path(output_dir) / "raw_crawled_data.json"
        with open(raw_output, "w", encoding="utf-8") as f:
            json.dump(crawled_data, f, indent=2)

        records = []
        for item in crawled_data:
            for phone in item["phones"]:
                records.append({
                    "source_url": item["url"],
                    "facility_name": item["title"],
                    "phone": phone,
                    "category": "CONTACT_INFO",
                    "health_topics": ["vaccination", "covid"],
                    "scraped_at": item["scraped_at"]
                })

        df = pd.DataFrame(records)

        initial = len(df)
        df = df.drop_duplicates(subset=["source_url", "phone"])
        removed = initial - len(df)
        print(f"Removed {removed} duplicates ({removed / initial * 100:.1f}%)")

        csv_output = Path(output_dir) / "health_resources_clean.csv"
        df.to_csv(csv_output, index=False)

        json_output = Path(output_dir) / "health_resources_clean.json"
        df.to_json(json_output, orient="records", indent=2)

        report = self.generate_quality_report(df, len(urls), successful)
        report_output = Path(output_dir) / "quality_report.txt"
        with open(report_output, "w", encoding="utf-8") as f:
            f.write(report)

        catalog = self.generate_source_catalog(df)
        catalog_output = Path(output_dir) / "source_catalog.csv"
        catalog.to_csv(catalog_output, index=False)

        print("\nPIPELINE COMPLETE")
        print(f"Total resources collected: {len(df)}")
        print(f"Output files saved in: {output_dir}")

    def generate_quality_report(self, df: pd.DataFrame, total_urls: int, successful_urls: int) -> str:
        """Generate text quality report"""

        report = []
        report.append("=" * 60)
        report.append("PHapp Health Resource Discovery - Quality Report")
        report.append("=" * 60)
        report.append(f"Generated: {datetime.now()}")

        report.append("\nCRAWLING SUMMARY")
        report.append("-" * 60)
        report.append(f"Total URLs: {total_urls}")
        report.append(f"Successful: {successful_urls}")
        report.append(f"Success Rate: {successful_urls / total_urls * 100:.1f}%")

        report.append("\nDATA SUMMARY")
        report.append("-" * 60)
        report.append(f"Total records: {len(df)}")
        report.append(f"Unique URLs: {df['source_url'].nunique()}")

        return "\n".join(report)

    def generate_source_catalog(self, df: pd.DataFrame) -> pd.DataFrame:
        """Produce source catalog CSV"""

        catalog = df.groupby("source_url").agg({
            "facility_name": "first",
            "scraped_at": "first",
            "category": "count"
        }).reset_index()

        catalog.columns = ["url", "name", "last_scraped", "resources_found"]
        catalog["refresh_strategy"] = "Monthly"
        catalog["notes"] = "Successfully scraped"

        return catalog


In [17]:
if __name__ == "__main__":
    pipeline = PHappPipeline(verbose=True)
    urls = pipeline.load_urls_from_csv(csv_path)
    if not urls:
        print("No URLs found in the CSV. Check pha_url column.")
        sys.exit(1)
    pipeline.run_pipeline(urls)

No URL column found. Available columns: ['name,parent_id,community_id,category,pha,population_proper,state_id,pha_url']
Loaded 62 URLs from C:\Users\snehi\OneDrive\Desktop\New folder\PHIT\cleaned_us_ca.csv (column: name,parent_id,community_id,category,pha,population_proper,state_id,pha_url)


Crawling: 100%|██████████| 62/62 [00:00<00:00, 8992.25it/s]

Crawled 62/62 websites successfully
Removed 0 duplicates (0.0%)

PIPELINE COMPLETE
Total resources collected: 62
Output files saved in: output



