In [1]:
import os
import random
import re
import time
from contextlib import redirect_stdout
from urllib.parse import urlencode, parse_qsl, urlparse, urlunparse
from urllib.parse import urljoin

import bs4 
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup

In [2]:
# Check that the NCES school-search page is reachable; the status code is the
# cell's displayed result. The timeout keeps the cell from blocking forever if
# the server never answers.
response = requests.get("https://nces.ed.gov/ccd/schoolsearch/", timeout=30)
response.status_code

200

In [None]:
# Fixed example of a relative school_detail.asp link, used to build one sample
# detail-page URL for interactive inspection.
href = "school_detail.asp?Search=1&InstName=ALDEN+INTERMEDIATE+SCHOOL&State=36&County=Erie&SchoolType=1&SchoolType=2&SchoolType=3&SchoolType=4&SpecificSchlTypes=all&IncGrade=-1&LoGrade=-1&HiGrade=-1&ID=360255006572"
base_url = "https://nces.ed.gov/ccd/schoolsearch/school_list.asp"
# Resolve the relative link against the results page. Plain concatenation would
# yield ".../school_list.aspschool_detail.asp?...", which is not the detail page.
detail_url = urljoin(base_url, href)


In [4]:
# Download the sample detail page and parse it so its markup can be explored.
# The timeout keeps the cell from blocking forever on an unresponsive server.
detail_response = requests.get(detail_url, timeout=30)
detail_soup = BeautifulSoup(detail_response.content, 'html.parser')


In [5]:
def get_school_detail_url(school_name, county, address=''):
    """
    Find the NCES detail-page URL for a school by trying progressively looser searches.

    Searches are issued against the NCES school search (state fixed to 36,
    New York) in this order, stopping at the first one that yields a
    school_detail.asp link:

        1. the full name
        2. the first two words of the name (when that differs from the full name)
        3. the first word of the name together with the street address
        4. the street address alone
        5. the first word of the name alone

    Name-based attempts are skipped when the name is blank, and address-based
    attempts are skipped when the address is blank or not a string (e.g. a NaN
    read from a CSV). With nothing to search for, no request is made.

    Parameters:
        school_name (str): Cleaned school name to search for.
        county (str): County value sent with every search.
        address (str): Optional street address used by the address-based attempts.

    Returns:
        str or None: Absolute URL of the first matching detail page, or None
        when every attempt fails (two diagnostic lines are printed in that case).

    Raises:
        requests.RequestException: If a search request fails or times out.
    """
    search_url = "https://nces.ed.gov/ccd/schoolsearch/school_list.asp"
    detail_prefix = "https://nces.ed.gov/ccd/schoolsearch/"
    school_types = ("1", "2", "3", "4")

    def try_search(name, addr=''):
        """Run one search; return the absolute URL of the first detail link, or None."""
        fields = [
            ("Search", "1"),
            ("InstName", name),
            ("Address", addr),
            ("State", "36"),        # New York
            ("County", county),
            ("SpecificSchlTypes", "all"),
            ("IncGrade", "-1"),
            ("LoGrade", "-1"),
            ("HiGrade", "-1"),
        ]
        # Blank fields are left out of the query; SchoolType is sent once per
        # value, after the other fields.
        query = [(key, value) for key, value in fields if value]
        query.extend(("SchoolType", school_type) for school_type in school_types)

        # requests.get opens and closes its own session, so nothing is leaked.
        response = requests.get(search_url, params=query, timeout=30)
        soup = BeautifulSoup(response.text, "html.parser")

        link_tag = soup.find('a', href=lambda x: x and "school_detail.asp" in x)
        if link_tag:
            return detail_prefix + link_tag['href']
        return None

    name = school_name.strip() if isinstance(school_name, str) else ''
    street = address.strip() if isinstance(address, str) else ''
    words = name.split()
    first_word = words[0] if words else ''
    first_two_words = ' '.join(words[:2])
    first_word_differs = bool(first_word) and first_word.lower() != name.lower()

    # (name, address) pairs in the order they are tried.
    attempts = []
    if name:
        attempts.append((name, ''))
    if len(first_two_words) > 2 and first_two_words != name:
        attempts.append((first_two_words, ''))
    if street and first_word_differs:
        attempts.append((first_word, street))
    if street:
        attempts.append(('', street))
    if first_word_differs:
        attempts.append((first_word, ''))

    for query_name, query_address in attempts:
        detail_url = try_search(query_name, query_address)
        if detail_url:
            return detail_url

    print(f"No detail link found for {school_name} in {county}")
    print(f"Address of school : {address}")
    return None


In [6]:


def get_free_reduced_lunch_total(html):
    """
    Number for 'Free and reduced-price lunch eligible total' from the school detail page HTML.

    Every `b` element whose text contains the label is inspected in document
    order; the value is the text node that directly follows the element.

    Parameters:
        html (str): HTML content of the school detail page.

    Returns:
        str or None: The number as a stripped string if found, otherwise None.
    """
    soup = BeautifulSoup(html, 'html.parser')
    label_part = "Free and reduced-price lunch eligible total"

    for b in soup.find_all('b'):
        if label_part not in b.get_text():
            continue
        value = b.next_sibling
        # The sibling can be another element instead of text; calling .strip()
        # on an element fails, so only non-empty text nodes are accepted.
        if isinstance(value, str) and value:
            return value.strip()
    return None

def get_total_count(html):
    """
    Number for 'Total Students' from the school detail page HTML.

    Every `b` element whose text contains the label is inspected in document
    order; the value is the text node that directly follows the element.

    Parameters:
        html (str): HTML content of the school detail page.

    Returns:
        str or None: The number as a stripped string if found, otherwise None.
    """
    soup = BeautifulSoup(html, 'html.parser')
    label_part = "Total Students"

    for b in soup.find_all('b'):
        if label_part not in b.get_text():
            continue
        value = b.next_sibling
        # The sibling can be another element instead of text; calling .strip()
        # on an element fails, so only non-empty text nodes are accepted.
        if isinstance(value, str) and value:
            return value.strip()
    return None

def get_demographics(html):
    """
    Enrollment counts by race/ethnicity from the school detail page HTML.

    The table following the 'Enrollment by Race/Ethnicity:' label is read: the
    first row supplies the group names and the second row the counts (the first
    cell of each row is skipped). Counts that are not integers are skipped with
    a printed message.

    Parameters:
        html (str): HTML content of the school detail page.

    Returns:
        dict: Mapping of group name to integer count. Empty when the label, the
        table, or the count row is missing from the page.
    """
    soup = BeautifulSoup(html, 'html.parser')
    label = soup.find('strong', string="Enrollment by Race/Ethnicity:")
    table = label.find_next("table") if label is not None else None
    rows = table.find_all("tr") if table is not None else []

    # Pages without the section (or with a truncated table) yield no data
    # instead of raising AttributeError / IndexError.
    if len(rows) < 2:
        print("Enrollment by Race/Ethnicity table not found")
        return {}

    headers = rows[0].find_all('td')[1:]
    ethnicities = [''.join(header.stripped_strings) for header in headers]
    num_cells = rows[1].find_all('td')[1:]

    demographics = {}
    for header, cell in zip(ethnicities, num_cells):
        text = cell.get_text(strip=True).replace(',', '')
        try:
            demographics[header] = int(text)
        except ValueError:
            print(f"Skipped invalid demographic count: '{text}' for ethnicity '{header}'")
    return demographics


def get_stud_teacher_ratio(html):
    """
    Value shown for 'Student/Teacher Ratio:' on the school detail page HTML.

    Every `b` element whose text contains the label is inspected in document
    order; the value is the text node that directly follows the element.

    Parameters:
        html (str): HTML content of the school detail page.

    Returns:
        str or None: The ratio as a stripped string if found, otherwise None.
    """
    soup = BeautifulSoup(html, 'html.parser')
    label_parts = "Student/Teacher Ratio:"

    for b in soup.find_all('b'):
        if label_parts not in b.get_text():
            continue
        value = b.next_sibling
        # The sibling can be another element instead of text; calling .strip()
        # on an element fails, so only non-empty text nodes are accepted.
        # (The raw value is no longer printed: it was debugging output.)
        if isinstance(value, str) and value:
            return value.strip()
    return None

def school_name(name):
    """
    Normalise a raw school name into a search-friendly form.

    Steps, in order:
        * drop parenthesised text and a leading 'Dr.' prefix
        * expand a leading 'JF' / 'MLK' / 'RFK' abbreviation
        * drop the multi-token suffixes 'H S' and 'ES-JS-HS'
        * drop hyphens, free-standing single letters and 'PREK'
        * split off anything after the first comma as extra information
        * pull a leading 'PS <number>' out of the name
        * drop the first school-type abbreviation found (ES, HS, MS, JHS,
          K-12, ELEM, SHS, PRIMARY) and all periods

    Parameters:
        name (str): Raw school name.

    Returns:
        tuple: (main_name, extra_info, ps_number) where main_name is the
        cleaned name, extra_info is the text after the first comma ('' when
        there is none) and ps_number is the integer after a leading 'PS', or
        None. A non-string input yields ('', '', None).
    """
    if not isinstance(name, str):
        return '', '', None

    # Parenthesised text is replaced by a space so the words on either side
    # are not fused together; surplus spaces are collapsed further down.
    name = re.sub(r'\s*\(.*?\)\s*', ' ', name)
    name = re.sub(r'^\s*Dr\.?\s+', '', name, flags=re.IGNORECASE).strip()

    abbreviation_map = {
        'JF': 'John F',
        'MLK': 'Martin Luther King',
        'RFK': 'Robert F Kennedy',
    }

    # Replace abbreviation only if name starts with abbreviation
    for abbr, full in abbreviation_map.items():
        if name.upper().startswith(abbr + ' '):
            name = re.sub(r'^' + abbr + r'\b', full, name, flags=re.IGNORECASE)
            break

    # These suffixes contain a space or hyphens, so they have to go before the
    # hyphen and single-letter clean-up below would break them apart.
    for suffix in (r'\bH S\b', r'\bES-JS-HS\b'):
        name = re.sub(suffix, '', name, flags=re.IGNORECASE)

    name = name.replace('-', '')
    # Remove single letters that are surrounded by whitespace on both sides
    name = re.sub(r'(?<=\s)[A-Za-z](?=\s)', '', name)
    name = re.sub(r'\bPREK\b', '', name, flags=re.IGNORECASE)
    # Collapse multiple spaces to one and strip
    name = re.sub(r'\s{2,}', ' ', name).strip()

    main_name, _, extra_info = name.partition(',')
    main_name = main_name.strip()
    extra_info = extra_info.strip()

    ps_match = re.match(r'PS\s*(\d+)', main_name, re.IGNORECASE)
    ps_number = int(ps_match.group(1)) if ps_match else None
    if ps_match:
        main_name = main_name[ps_match.end():].strip()

    # School-type abbreviations, checked in this order; only the first one
    # found is removed. Hyphens are already gone, so 'K-12' appears as 'K12'.
    type_patterns = (
        r'\bES\b',        # Elementary School
        r'\bHS\b',        # High School
        r'\bMS\b',        # Middle School
        r'\bJHS\b',       # Junior High School
        r'\bK-?12\b',     # K-12 School
        r'\bELEM\b',      # Elementary School
        r'\bSHS\b',       # High School
        r'\bPRIMARY\b',   # Elementary School
    )
    for pattern in type_patterns:
        if re.search(pattern, main_name, flags=re.IGNORECASE):
            main_name = re.sub(pattern, '', main_name, flags=re.IGNORECASE)
            break

    main_name = main_name.replace('.', '')
    main_name = re.sub(r'\s{2,}', ' ', main_name).strip()

    return main_name, extra_info, ps_number


In [7]:
# Read the lead-testing CSV from the working directory into `school`.
lead_testing_csv = "Lead_Testing_in_Schools.csv"
school = pd.read_csv(lead_testing_csv)

In [8]:
# Display the column labels of the loaded frame.
school.columns

Index(['Compliance Period', 'School District', 'School', 'County',
       'Type of Organization', 'BEDS Code', 'School Website',
       'Number of Outlets that Require Sampling',
       'Number of Outlets Sampled 2023', 'Number of Outlets Sampled 2024',
       'Number of Outlets Sampled 2025', 'Sampling Complete',
       'Number of Outlets, Result ≤ 5 ppb',
       'Number of Outlets, Result > 5 ppb', 'All Results Received',
       'Out of Service or Addressed', 'Remediation Status', 'School Street',
       'School City', 'School State', 'School ZIP Code', 'Date Survey Updated',
       'County Location', 'Location'],
      dtype='object')

In [9]:
# Display the distinct county spellings present in the data.
school['County'].unique()

array(['Steuben', 'Oneida', 'Chenango', 'Orleans', 'Erie', 'Jefferson',
       'Allegany', 'Cattaraugus', 'Suffolk', 'Washington', 'Dutchess',
       'Rensselaer', 'Nassau', 'Onondaga', 'Lewis', 'Clinton', 'Albany',
       'Westchester', 'Warren', 'Essex', 'St.Lawrence', 'Putnam',
       'Monroe', 'Chautauqua', 'Franklin', 'Greene', 'Montgomery',
       'Madison', 'Tioga', 'Oswego', 'Delaware', 'Broome', 'Orange',
       'Cortland', 'Saratoga', 'Herkimer', 'Tompkins', 'Schenectady',
       'Yates', 'Ontario', 'Rockland', 'Otsego', 'Sullivan', 'Wayne',
       'Columbia', 'Schoharie', 'Fulton', 'Chemung', 'Hamilton', 'Ulster',
       'Wyoming', 'Niagara', 'Queens', 'Richmond', 'Bronx', 'Manhattan',
       'Man', 'Kings', 'Schuyler', 'Genesee', 'Seneca', 'Cayuga',
       'New York', 'Livingston'], dtype=object)

In [10]:
# Work on an independent copy so that `school` stays exactly as loaded.
school_info = school.copy(deep=True)

In [11]:
# Run every raw name through school_name() and spread the three returned
# values over three new columns, in the order the function returns them.
parsed_names = school_info['School'].apply(lambda raw_name: pd.Series(school_name(raw_name)))
school_info[['Cleaned_school_name', 'Type', 'PS_No']] = parsed_names

In [12]:
# Result columns, created as NaN in this order; the scraping loop fills them
# in one school at a time.
nces_columns = [
    'Free_lunch',
    'Total_students',
    'Ratio_free',
    'Student_teacher_ratio',
    'American Indian/Alaska Native',
    'Asian',
    'Black',
    'Hispanic',
    'White',
    'Native Hawaiian/Pacific Islander',
    'Two or More Races',
]
for column in nces_columns:
    school_info[column] = np.nan

In [None]:
# Scrape one batch of schools (positions start..end inclusive), write the
# filled-in rows to a CSV and the diagnostics to a log file.
start = 2300
end   = 2399

log_filename = f"schools_data_{start}_{end}_log.txt"
csv_filename = f"schools_data_{start}_{end}.csv"

request_timeout = 30  # seconds allowed for each detail-page download

# County spellings in the CSV that are rewritten before searching.
county_aliases = {
    'St.Lawrence': 'St. Lawrence',
    'Manhattan':   'New York County',
    # NOTE(review): 'Man' and 'New York' also occur in school.County.unique();
    # they are assumed to denote the same county as 'Manhattan' -- confirm.
    'Man':         'New York County',
    'New York':    'New York County',
}


def _parse_count(text):
    """Turn a scraped count such as '1,234' into an int; NaN when missing or not numeric."""
    if not text:
        return np.nan
    try:
        return int(text.replace(",", ""))
    except ValueError:
        return np.nan


log_path = os.path.join(os.getcwd(), log_filename)
# Everything printed inside this block goes to the log file. UTF-8 is set
# explicitly so names with non-ASCII characters can always be written.
with open(log_path, "w", encoding="utf-8") as log_file, redirect_stdout(log_file):

    for idx, row in school_info.iloc[start:end+1].iterrows():
        # Not named `school_name`: that would overwrite the cleaning function
        # of the same name and break a re-run of the name-parsing cell.
        cleaned_name  = row['Cleaned_school_name']
        county        = county_aliases.get(row['County'], row['County'])
        school_street = row['School Street']
        if not isinstance(school_street, str):
            school_street = ''  # missing street (NaN) must not be searched as text

        try:
            detail_url  = get_school_detail_url(cleaned_name, county, school_street)
            detail_resp = requests.get(detail_url, timeout=request_timeout) if detail_url else None
        except requests.RequestException as exc:
            # One unreachable page must not abort the batch before the CSV is written.
            print(f"Request failed for {cleaned_name}: {exc}")
        else:
            if detail_resp is None:
                print(f"Detail URL not found for {cleaned_name}")
            elif detail_resp.status_code != 200:
                print(f"Failed to get detail page for {cleaned_name}")
            else:
                page_html = detail_resp.text

                # Parsed independently so one unusable figure does not discard the other.
                lunch_val = _parse_count(get_free_reduced_lunch_total(page_html))
                total_val = _parse_count(get_total_count(page_html))
                has_total = not np.isnan(total_val) and total_val != 0

                school_info.at[idx, 'Free_lunch']     = lunch_val
                school_info.at[idx, 'Total_students'] = total_val

                if has_total and not np.isnan(lunch_val):
                    school_info.at[idx, 'Ratio_free'] = lunch_val / total_val

                stud_teacher_ratio = get_stud_teacher_ratio(page_html)
                if stud_teacher_ratio:
                    try:
                        stud_teacher_ratio_float = float(stud_teacher_ratio)
                    except (ValueError, TypeError):
                        stud_teacher_ratio_float = np.nan
                    school_info.at[idx, 'Student_teacher_ratio'] = stud_teacher_ratio_float

                try:
                    demographics = get_demographics(page_html)
                except (AttributeError, IndexError):
                    # Raised when the page has no race/ethnicity table to read.
                    print(f"No race/ethnicity table found for {cleaned_name}")
                    demographics = {}

                # Each group is stored as a share of total enrollment.
                if demographics and has_total:
                    for key, count in demographics.items():
                        if key in school_info.columns:
                            school_info.at[idx, key] = count / total_val

        # Short random pause between schools.
        time.sleep(random.uniform(0.02, 0.04))

if 'Type' in school_info.columns:
    school_info = school_info.drop('Type', axis=1)

output_path = os.path.join(os.getcwd(), csv_filename)
school_info.iloc[start:end+1].to_csv(output_path, index=False)

# Printed after stdout is restored so the messages show up in the notebook
# rather than inside the log file they refer to.
print(f"\nSaved processed schools to {csv_filename}")
print(f"Logs saved to {log_filename}")
