In [None]:
#Black Teacher Archive - School Finder
##Inputs historical school listings and identifies current locations
##https://curiosity.lib.harvard.edu/black-teacher-archive

In [None]:
#Setup - libraries, globals, I/O, references

##Libraries
import pandas as pd
import re
import os
import time
import re
import requests
import random

##Global variables
# Suffixes appended to the input file's stem to name each stage's output CSV
prepped_extension, geonames_extension, hmdb_extension = "_PREPPED", "_GEONAMES", "_HMDB"
geonames_username = "..." #GeoNames username
fuzzy_value = 0.7 #based on the Damerau-Levenshtein

##I/O
input_csv_path = '...'  # Update with the actual path
input_parent_dir = os.path.dirname(input_csv_path)
# Both output folders are created next to the input file
processing_output_dir = os.path.join(input_parent_dir, 'Processing')
hmdb_output_dir = os.path.join(input_parent_dir, 'Output')

# Create each output directory if it is missing, reporting which case applied
for _output_dir in (processing_output_dir, hmdb_output_dir):
    if os.path.exists(_output_dir):
        print(f"Directory already exists: {_output_dir}")
    else:
        os.makedirs(_output_dir)
        print(f"Created directory: {_output_dir}")

# Input filename without directory or extension; shared by all three outputs
_input_stem = os.path.splitext(os.path.basename(input_csv_path))[0]
prepped_output_csv = os.path.join(processing_output_dir, _input_stem + prepped_extension + ".csv")
geonames_output_csv = os.path.join(processing_output_dir, _input_stem + geonames_extension + ".csv")
hmdb_output_csv = os.path.join(hmdb_output_dir, _input_stem + hmdb_extension + ".csv")

print(f"Prepped output CSV path: {prepped_output_csv}")
print(f"GeoNames output CSV path: {geonames_output_csv}")
print(f"HMDB output CSV path: {hmdb_output_csv}")

##References
# Full U.S. state name -> two-letter postal abbreviation (50 states, alphabetical)
state_abbreviations = dict([
    ('Alabama', 'AL'), ('Alaska', 'AK'),
    ('Arizona', 'AZ'), ('Arkansas', 'AR'),
    ('California', 'CA'), ('Colorado', 'CO'),
    ('Connecticut', 'CT'), ('Delaware', 'DE'),
    ('Florida', 'FL'), ('Georgia', 'GA'),
    ('Hawaii', 'HI'), ('Idaho', 'ID'),
    ('Illinois', 'IL'), ('Indiana', 'IN'),
    ('Iowa', 'IA'), ('Kansas', 'KS'),
    ('Kentucky', 'KY'), ('Louisiana', 'LA'),
    ('Maine', 'ME'), ('Maryland', 'MD'),
    ('Massachusetts', 'MA'), ('Michigan', 'MI'),
    ('Minnesota', 'MN'), ('Mississippi', 'MS'),
    ('Missouri', 'MO'), ('Montana', 'MT'),
    ('Nebraska', 'NE'), ('Nevada', 'NV'),
    ('New Hampshire', 'NH'), ('New Jersey', 'NJ'),
    ('New Mexico', 'NM'), ('New York', 'NY'),
    ('North Carolina', 'NC'), ('North Dakota', 'ND'),
    ('Ohio', 'OH'), ('Oklahoma', 'OK'),
    ('Oregon', 'OR'), ('Pennsylvania', 'PA'),
    ('Rhode Island', 'RI'), ('South Carolina', 'SC'),
    ('South Dakota', 'SD'), ('Tennessee', 'TN'),
    ('Texas', 'TX'), ('Utah', 'UT'),
    ('Vermont', 'VT'), ('Virginia', 'VA'),
    ('Washington', 'WA'), ('West Virginia', 'WV'),
    ('Wisconsin', 'WI'), ('Wyoming', 'WY'),
])


In [None]:
#Data Prep
##Takes arbitrary historical table input and outputs standardized table for downstream searches
# U.S. state names made of two words; needed so a filename such as
# "North Carolina 1940.csv" is not truncated to "North".
_MULTI_WORD_STATES = (
    "New Hampshire", "New Jersey", "New Mexico", "New York",
    "North Carolina", "North Dakota", "Rhode Island",
    "South Carolina", "South Dakota", "West Virginia",
)


def _state_from_filename(filename):
    """Return the state name that prefixes ``filename``.

    The extension is dropped first, then the leading whitespace-separated
    word is taken as the state. If the first two words form a known two-word
    state name (case-insensitive), that full name is returned instead.

    Raises:
        ValueError: if the filename contains no words at all.
    """
    stem = os.path.splitext(filename)[0]
    words = stem.split()
    if not words:
        raise ValueError(f"Cannot extract a state name from filename: {filename!r}")
    leading_pair = " ".join(words[:2]).lower()
    for state_name in _MULTI_WORD_STATES:
        if leading_pair == state_name.lower():
            return state_name
    return words[0]


def standardize_csv(input_csv_path, prepped_output_csv):
    """Convert an arbitrary historical school table into the standard template.

    The state is parsed from the input filename (its leading word, or leading
    two words for states such as "New York"). Each template column is filled
    from the first input column whose header matches that column's regular
    expression; template columns with no match are left empty. The result is
    written to ``prepped_output_csv`` without the index.

    Args:
        input_csv_path: Path to the source CSV; its basename must start with
            the state name.
        prepped_output_csv: Path where the standardized CSV is written.
    """
    # Parse the input filename to get the state
    filename = os.path.basename(input_csv_path)
    state = _state_from_filename(filename)
    print(f"DEBUG: Extracted state from filename: {state}", " processing now...")  # Debug log for state name

    # Load the input CSV file into a DataFrame
    df = pd.read_csv(input_csv_path)

    # Define the template columns and corresponding regular expressions for matching
    template_columns = {
        "State": re.compile(r'\bstate\b', re.IGNORECASE),
        "County": re.compile(r'\bcounty\b', re.IGNORECASE),
        "City": re.compile(r'\b(city|location)\b', re.IGNORECASE),
        "School": re.compile(r'\b(school|name\s*of\s*school|school\s*name|institution)\b', re.IGNORECASE),
        "Latitude": re.compile(r'\blatitude\b', re.IGNORECASE),
        "Longitude": re.compile(r'\blongitude\b', re.IGNORECASE),
        "GeoNames Name": re.compile(r'\bgeonames\s*name\b', re.IGNORECASE),
        "GeoNames Feature Code": re.compile(r'\bgeonames\s*feature\s*code\b', re.IGNORECASE),
        "HMDB Name": re.compile(r'\bhmdb\s*name\b', re.IGNORECASE)
    }

    # Build the template on the input's index so every input row is kept even
    # when no input column matches (otherwise the scalar "State" assignment
    # below would broadcast onto zero rows).
    standardized_df = pd.DataFrame(index=df.index, columns=list(template_columns))

    # Copy relevant columns from the input DataFrame to the new DataFrame
    for template_col, regex in template_columns.items():
        if template_col == "State":  # "State" comes from the filename, not the table
            continue
        matching_columns = [col for col in df.columns if regex.search(str(col))]
        if matching_columns:
            standardized_df[template_col] = df[matching_columns[0]]

    # Set the "State" column with the parsed state value for each row
    standardized_df["State"] = state

    # Save the standardized DataFrame to the output CSV file
    standardized_df.to_csv(prepped_output_csv, index=False)
    print(f"Standardized CSV file saved to {prepped_output_csv}")

# Run the data-prep step: reads input_csv_path and writes the standardized
# table to prepped_output_csv.
standardize_csv(input_csv_path, prepped_output_csv)


In [None]:
#GEONAMES Search Script
##Takes schools table and returns lat/long + GeoNames name + feature code (e.g., SCH)

# Timestamp taken when this cell starts running
start_time = time.time()

# Running tally of GeoNames API requests
request_counter = 0


def increment_request_counter():
    """Add one to the module-level ``request_counter`` tally."""
    global request_counter
    request_counter = request_counter + 1

# Shared implementation for the county/city hierarchy checks
def _hierarchy_contains_name(username, geonameId, target_name, timeout=30):
    """Return True if ``target_name`` appears in a place name of the GeoNames hierarchy.

    Requests ``hierarchyJSON`` for ``geonameId`` and does a case-insensitive
    substring match of ``target_name`` against each returned place's ``name``.

    Missing targets (NaN/None) and blank targets return False without issuing
    a request; a blank target must be rejected explicitly because the empty
    string is a substring of every name. Network errors, non-200 responses
    and non-JSON bodies are treated as "not found".
    """
    if pd.isna(target_name):
        return False
    needle = str(target_name).strip().lower()
    if not needle:
        return False

    hierarchy_url = "http://api.geonames.org/hierarchyJSON"
    params = {
        'geonameId': geonameId,
        'username': username
    }
    increment_request_counter()
    try:
        # A timeout keeps one stalled connection from hanging the whole run
        response = requests.get(hierarchy_url, params=params, timeout=timeout)
    except requests.RequestException as exc:
        print(f"GeoNames hierarchy request failed for geonameId {geonameId}: {exc}")
        return False
    if response.status_code != 200:
        return False
    try:
        hierarchy_data = response.json()
    except ValueError:
        return False
    return any(
        needle in (place.get('name') or '').lower()
        for place in hierarchy_data.get('geonames', [])
    )


# Function to check if county is in GeoNames hierarchy
def is_county_in_hierarchy(username, geonameId, target_county):
    """Return True if ``target_county`` matches a place name in the hierarchy of ``geonameId``.

    Args:
        username: GeoNames account name sent with the request.
        geonameId: GeoNames identifier whose hierarchy is fetched.
        target_county: County name to look for; NaN/None/blank returns False.
    """
    return _hierarchy_contains_name(username, geonameId, target_county)


# Function to check if city is in GeoNames hierarchy
def is_city_in_hierarchy(username, geonameId, target_city):
    """Return True if ``target_city`` matches a place name in the hierarchy of ``geonameId``.

    Args:
        username: GeoNames account name sent with the request.
        geonameId: GeoNames identifier whose hierarchy is fetched.
        target_city: City name to look for; NaN/None/blank returns False.
    """
    return _hierarchy_contains_name(username, geonameId, target_city)

# Function to get school information from GeoNames
def get_school_info(username, school, target_county, target_city, state, fuzzy, state_abbreviations):
    """Search GeoNames for a school and return its coordinates.

    The first word of ``school`` is combined in turn with the rest of the
    school's name, "School", "College", "Academy", and the rest of the name
    plus " (historical)". Each query is restricted to the US and to the
    abbreviation that ``state_abbreviations`` gives for ``state``. The first
    result with feature code ``SCH`` whose hierarchy contains the county or
    the city is returned.

    Args:
        username: GeoNames account name.
        school: School name as listed in the source table.
        target_county: County to confirm against the result's hierarchy (may be missing).
        target_city: City to confirm against the result's hierarchy (may be missing).
        state: Full state name used to look up the abbreviation.
        fuzzy: Value passed as the GeoNames ``fuzzy`` search parameter.
        state_abbreviations: Mapping of full state name to abbreviation.

    Returns:
        ``(lat, lng, name, fcode)`` for the first confirmed match, or
        ``(None, None, None, None)`` when nothing matches or ``school`` is
        missing/blank.
    """
    # A missing (NaN) or blank school name cannot be searched
    if not isinstance(school, str) or not school.split():
        return None, None, None, None

    # Extract the first word of the school's name for broader search
    name_parts = school.split()
    base_school_name = name_parts[0]
    school_name_extension = " ".join(name_parts[1:])

    # Educational institution types to include in the search; dict.fromkeys
    # drops repeats (e.g. a school literally named "X School") while keeping order
    institution_types = list(dict.fromkeys(
        [school_name_extension, "School", "College", "Academy", school_name_extension + " (historical)"]
    ))

    # Get state abbreviation (None when the state is not in the mapping)
    state_abbr = state_abbreviations.get(state)

    search_url = "http://api.geonames.org/searchJSON"

    # Attempt searches for each institution type with the base school name
    for institution_type in institution_types:
        print(f"Searching {base_school_name} {institution_type} in {state_abbr}\n")
        search_params = {
            'q': f"{base_school_name} {institution_type}",
            'country': 'US',
            'adminCode1': state_abbr,
            'username': username,
            'fuzzy': fuzzy,
            'maxRows': 100
        }
        increment_request_counter()
        try:
            # A timeout keeps one stalled connection from hanging the whole run
            search_response = requests.get(search_url, params=search_params, timeout=30)
        except requests.RequestException as exc:
            print(f"GeoNames search request failed for {school}: {exc}")
            continue
        if search_response.status_code != 200:
            continue
        try:
            search_data = search_response.json()
        except ValueError:
            continue

        for result in search_data.get('geonames', []):
            # Check the feature code first: it is free, whereas each hierarchy
            # check below costs an extra API request.
            if result.get('fcode', '') != 'SCH':
                continue
            geoname_id = result['geonameId']
            if ((target_county and is_county_in_hierarchy(username, geoname_id, target_county)) or
                    (target_city and is_city_in_hierarchy(username, geoname_id, target_city))):
                return result['lat'], result['lng'], result.get('name', ''), result.get('fcode', '')

    return None, None, None, None

# Load the standardized school table into a DataFrame
df = pd.read_csv(prepped_output_csv)

# Ensure the DataFrame has all the required columns before any of them is written to
required_columns = ["State", "County", "City", "School", "Latitude", "Longitude", "GeoNames Name", "GeoNames Feature Code", "HMDB Name"]
for col in required_columns:
    if col not in df.columns:
        df[col] = ""

# Columns that are entirely empty in the CSV load as float64 (all NaN).
# Cast the text columns to object so string results can be stored without
# relying on pandas' deprecated silent upcasting.
for col in ("GeoNames Name", "GeoNames Feature Code"):
    df[col] = df[col].astype(object)

# Initialize counters for results
county_matches = 0
city_matches = 0
total_matches = 0
total_matches_list = []

for index, row in df.iterrows():
    school = row['School']
    county = row['County'] if 'County' in row else None
    city = row['City'] if 'City' in row else None
    state = row['State']

    # A missing school name loads as NaN (a float); there is nothing to search for
    if not isinstance(school, str) or not school.strip():
        print(f"Skipping row {index}: missing school name")
        continue

    # NaN is truthy, so test for presence explicitly
    has_county = pd.notna(county) and str(county).strip() != ""
    has_city = pd.notna(city) and str(city).strip() != ""

    lat, lng, geoname_name, fcode = get_school_info(geonames_username, school, county, city, state, fuzzy_value, state_abbreviations)
    if lat is not None and lng is not None:
        # Store coordinates as floats so the Latitude/Longitude columns stay numeric
        df.at[index, 'Latitude'] = float(lat)
        df.at[index, 'Longitude'] = float(lng)
        df.at[index, 'GeoNames Name'] = geoname_name
        df.at[index, 'GeoNames Feature Code'] = fcode
        total_matches += 1
        total_matches_list.append(geoname_name)
        if has_county:
            county_matches += 1
        elif has_city:
            city_matches += 1
    else:
        place = county if has_county else (city if has_city else None)
        print(f"No valid results found for {school} in {place}, {state}")

# Save the updated DataFrame back to CSV
df.to_csv(geonames_output_csv, index=False)
print(f"Updated CSV file saved to {geonames_output_csv}.")

# Print the total number of requests made to the GeoNames API
print(f"Total requests made to GeoNames API: {request_counter}")

# Log the time elapsed
elapsed_time = time.time() - start_time
print(f"Time elapsed: {elapsed_time} seconds")

# Log the results
print(f"Total matches: {total_matches}")
for match in total_matches_list:
    print(f"Match found: {match}")


In [None]:
# Libraries
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Convert one coordinate ("35° 46.8′ N", "78° 38′ 18″ W", "-78.64") to signed decimal degrees
def _parse_coordinate(text, positive_letter, negative_letter):
    """Return the signed decimal-degree value of a single coordinate string.

    Accepts one to three numbers, read as degrees, minutes and seconds.
    The sign comes from the hemisphere letter when one is present, otherwise
    from a leading minus sign.

    Raises:
        ValueError: if ``text`` does not contain one to three numbers.
    """
    numbers = re.findall(r"\d+(?:\.\d+)?|\.\d+", text)
    if not 1 <= len(numbers) <= 3:
        raise ValueError(f"Cannot parse coordinate from {text!r}")
    # degrees + minutes/60 + seconds/3600, using however many parts are present
    magnitude = sum(float(value) / divisor for value, divisor in zip(numbers, (1, 60, 3600)))

    upper = text.upper()
    if negative_letter in upper:
        negative = True
    elif positive_letter in upper:
        negative = False
    else:
        negative = text.lstrip().startswith('-')
    return -magnitude if negative else magnitude


# Function to parse latitude and longitude from text
def parse_lat_long(text):
    """Parse a "latitude, longitude" string into signed decimal degrees.

    Each half may be decimal degrees, degrees + minutes, or degrees +
    minutes + seconds, with or without the °/′/″ symbols. South and West
    are returned as negative values.

    Args:
        text: Coordinate pair separated by a comma, e.g. "35° 46.8′ N, 78° 38.3′ W".

    Returns:
        ``(lat, lng)`` as floats.

    Raises:
        ValueError: if the comma separator is missing or either half has no
            parsable numbers.
    """
    parts = text.split(',')
    if len(parts) < 2:
        raise ValueError(f"Expected 'latitude, longitude' but got {text!r}")
    lat = _parse_coordinate(parts[0].strip(), 'N', 'S')
    lng = _parse_coordinate(parts[1].strip(), 'E', 'W')
    return lat, lng

# Function to extract latitude and longitude from the HMDB page
def extract_lat_long(driver):
    """Read the coordinates that follow the "Location." label on the current page.

    Waits up to 10 seconds for the label span, takes the text of the node
    found via its following-sibling XPath, and looks for a
    degrees/minutes pair such as ``35° 46.8′ N, 78° 38.3′ W``.

    Args:
        driver: Selenium WebDriver positioned on an HMDB page.

    Returns:
        ``(lat, lng)`` as produced by ``parse_lat_long``, or ``(None, None)``
        when no coordinates are found or any error occurs.
    """
    coordinate_pattern = r'([-\d.]+°\s*[\d.]+′\s*[NS]),\s*([-\d.]+°\s*[\d.]+′\s*[EW])'
    try:
        label_locator = (By.XPATH, '//*[@id="mainblock"]/article/span[contains(text(), "Location.")]')
        location_label = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(label_locator)
        )
        sibling_node = location_label.find_element(By.XPATH, 'following-sibling::node()')
        location_text = sibling_node.text.strip()
        print(f"Location text: {location_text}")

        found = re.search(coordinate_pattern, location_text)
        if found is None:
            return None, None

        coordinate_text = found.group()
        print(f"Lat/Long text: {coordinate_text}")
        print('\n')
        latitude, longitude = parse_lat_long(coordinate_text)
        return latitude, longitude
    except Exception:
        # Best-effort lookup: every failure (timeout, missing element, parse
        # error) is reported to the caller as "no coordinates".
        return None, None

# True when a filter value is usable (not None/NaN and not blank)
def _has_filter_value(value):
    """Return True if ``value`` holds text worth typing into a search filter.

    Missing cells read from a CSV arrive as NaN, which is truthy; a plain
    ``if value:`` test would therefore send the text "nan" to the form.
    """
    return not pd.isna(value) and str(value).strip() != ""


# Function to perform the HMDB search and extract lat/long
def perform_hmdb_search(driver, keyword, country, state, county, city):
    """Fill in the HMDB search form and return the first matching heading.

    Opens the filter panel, enters the keyword, country and state, adds the
    county and city filters when they are present, and submits the form.
    After a short pause the page's ``h1``/``h2`` headings are scanned for one
    containing ``keyword`` (case-insensitive).

    Args:
        driver: Selenium WebDriver already on the HMDB search page.
        keyword: Text to search for and to match against headings.
        country: Value for the country filter.
        state: Value for the state filter.
        county: Value for the county filter; skipped when missing or blank.
        city: Value for the town filter; skipped when missing or blank.

    Returns:
        ``(heading_text, lat, lng)`` for the first matching heading, with the
        coordinates from ``extract_lat_long``; ``(None, None, None)`` when no
        heading matches.
    """
    wait = WebDriverWait(driver, 3)

    show_filters = wait.until(EC.element_to_be_clickable((By.XPATH, "//a[contains(text(), 'show filters')]")))
    show_filters.click()

    keyword_search_label = wait.until(EC.presence_of_element_located((By.XPATH, "//h4[text()='Keyword Search']")))
    search_box = keyword_search_label.find_element(By.XPATH, "./following-sibling::div//input[@type='text']")
    search_box.send_keys(keyword)

    country_input = wait.until(EC.visibility_of_element_located((By.NAME, "FilterCountry")))
    country_input.send_keys(country)

    state_input = wait.until(EC.visibility_of_element_located((By.NAME, "FilterState")))
    state_input.send_keys(state)

    if _has_filter_value(county):
        county_input = wait.until(EC.visibility_of_element_located((By.NAME, "FilterCounty")))
        county_input.send_keys(str(county).strip())

    if _has_filter_value(city):
        city_input = wait.until(EC.visibility_of_element_located((By.NAME, "FilterTown")))
        city_input.send_keys(str(city).strip())

    search_button = wait.until(EC.element_to_be_clickable((By.ID, "TheButton1")))
    search_button.click()

    time.sleep(2)  # Adjust the sleep time if necessary
    headers = driver.find_elements(By.TAG_NAME, "h1")
    headers += driver.find_elements(By.TAG_NAME, "h2")

    for header in headers:
        if keyword.lower() in header.text.lower():
            print(f"Found match: {header.text}")  # Debugging statement
            lat, lng = extract_lat_long(driver)
            return header.text, lat, lng

    return None, None, None

# Load the GeoNames-stage output into a DataFrame
df = pd.read_csv(geonames_output_csv)

# An entirely empty "HMDB Name" column loads as float64 (all NaN); cast it to
# object so heading text can be stored without relying on pandas' deprecated
# silent upcasting.
if 'HMDB Name' not in df.columns:
    df['HMDB Name'] = ""
df['HMDB Name'] = df['HMDB Name'].astype(object)

# Initialize the Selenium WebDriver
driver = webdriver.Safari()

start_time = time.time()

# try/finally guarantees the browser is closed even if a search raises
try:
    driver.get('https://www.hmdb.org/search.asp')
    wait = WebDriverWait(driver, 3)

    for index, row in df.iterrows():
        school_name = row['School']
        country = 'United States'
        state = row['State']
        county = row['County'] if 'County' in row else None
        city = row['City'] if 'City' in row else None

        # Only rows that GeoNames could not resolve are searched on HMDB
        if not pd.isna(row['GeoNames Name']):
            continue

        # A missing school name loads as NaN (a float); nothing to search for
        if not isinstance(school_name, str) or not school_name.strip():
            print(f"Skipping row {index}: missing school name")
            continue

        # Search on the first two words of the school name
        base_school_name = " ".join(school_name.split()[:2])
        header, lat, lng = perform_hmdb_search(driver, base_school_name, country, state, county, city)
        if header and lat is not None and lng is not None:
            df.at[index, 'HMDB Name'] = header
            df.at[index, 'Latitude'] = lat
            df.at[index, 'Longitude'] = lng
        else:
            # NaN is truthy, so choose the first value that is actually present
            has_county = pd.notna(county) and str(county).strip() != ""
            has_city = pd.notna(city) and str(city).strip() != ""
            place = county if has_county else (city if has_city else None)
            print(f"No matches found for {school_name} in {place}, {state}")
            print('\n')

        # Return to a fresh search form for the next row
        driver.get('https://www.hmdb.org/search.asp')
        time.sleep(2)  # Give some time for the page to load properly
finally:
    driver.quit()

elapsed_time = time.time() - start_time
print(f"Time elapsed: {elapsed_time} seconds")

df.to_csv(hmdb_output_csv, index=False)
print(f"Updated CSV file saved to {hmdb_output_csv}")


In [None]:
#TO DO:
#Find Nearest Address (GeoNames):
##https://www.geonames.org/maps/us-reverse-geocoder.html#findNearestAddress
#Select table image OCR tool ("pre-prep")
#update github README
#bypass Cloudflare verification for HMDB search code block

In [None]:
##Cook 2024
###mncook.net