## Public and private school data

- public schools: https://hifld-geoplatform.opendata.arcgis.com/datasets/public-schools/explore
- private schools: https://hifld-geoplatform.opendata.arcgis.com/datasets/private-schools/explore

### Census data

Income, population, education, and more

In [None]:
import os
import re
import zipfile
import requests
import shutil
import subprocess
import psycopg2
from psycopg2.extras import DictCursor
import pandas as pd
from io import StringIO
from sqlalchemy import create_engine

CENSUS_API_KEY = '5fdf56abf43997adf0d8533a71dea339e4ac5974'
BASE_URL = 'https://api.census.gov/data'
CONN_STR = "postgresql://tommyunger@localhost/work"

In [None]:
def get_metric_details(data_set, variable):
    ENDPOINT = f"{BASE_URL}/{data_set}/variables/{variable}.json"
    response = requests.get(ENDPOINT)

    if response.status_code == 200:
        data = response.json()
        return data
    else:
        return None

def get_or_insert_metric(data_set, census_id):
    check_sql = """SELECT metric_id FROM census.metric WHERE census_id = %s"""
    insert_sql = """
        INSERT INTO census.metric (data_set, census_id, name, details) 
        VALUES (%s, %s, %s, %s)
        RETURNING metric_id;
    """
    with psycopg2.connect(CONN_STR) as conn:
        with conn.cursor() as cursor:
            cursor.execute(check_sql, (census_id,))
            result = cursor.fetchone()
            if result:
                return result[0]
            else:
                dets = get_metric_details(data_set, census_id)
                name = re.sub(r"[^a-z0-9]+", " ", (dets['concept'] + ' ' + dets['label']).lower()).strip()
                details = str(dets)
                cursor.execute(insert_sql, (data_set, census_id, name, details))
                new_metric_id = cursor.fetchone()[0]
                return new_metric_id

def get_states():
    sql = f"""select statefp, stusps, name
            from census.geo_state
            order by 1"""
    with psycopg2.connect(CONN_STR) as conn:
        with conn.cursor(cursor_factory=DictCursor) as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()
            return [dict(row) for row in results]


In [None]:
def download_file(url, dest_folder):
    if not os.path.exists(dest_folder):
        os.makedirs(dest_folder)
    filename = re.sub(r"[^a-z0-9]+", "_", url.lower()) + ".zip"
    filename = os.path.join(dest_folder, filename)
    if os.path.exists(filename):
        return filename
    r = requests.get(url, stream=True)
    with open(filename, 'wb') as f:
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)
    return filename

def unzip_file(file_path, dest_folder):
    with zipfile.ZipFile(file_path, 'r') as zip_ref:
        zip_ref.extractall(dest_folder)
    return dest_folder

def table_exists(table_name, schema_name='public'):
    connection = psycopg2.connect("postgresql://localhost/work")
    cursor = connection.cursor()
    cursor.execute(f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_schema='{schema_name}' and table_name='{table_name}');")
    exists = cursor.fetchone()[0]
    cursor.close()
    connection.close()
    return exists

def shp2pgsql(schema_name, table_name, shp_folder, srid):
    # Assuming only one shapefile in the directory. Modify as needed.
    shp_file = next((f for f in os.listdir(shp_folder) if f.endswith('.shp')), None)
    if shp_file is None:
        raise ValueError('No shapefile found in the directory.')
    shp_file_path = os.path.join(shp_folder, shp_file)
    sql_file_path = os.path.join(shp_folder, "output.sql")
    create_table = '-a'
    if not table_exists(table_name, schema_name):
        create_table = '-c'
    cmd = f'shp2pgsql -s {srid} -D {create_table} {shp_file_path} {schema_name}.{table_name} > {sql_file_path}'
    print(cmd)
    subprocess.run(cmd, shell=True, check=True)
    return sql_file_path


def dataframe_to_postgres(df, schema, table_name):
    conn = create_engine("postgresql://localhost/work")
    df[0:10].to_sql(table_name, schema=schema, con=conn, if_exists='replace', index=False)
    connection = psycopg2.connect("postgresql://localhost/work")
    cursor = connection.cursor()
    cursor.execute(f"truncate table {schema}.{table_name}")
    connection.commit()
    cursor.close()
    connection.close()
    conn = psycopg2.connect("postgresql://localhost/work")
    cur = conn.cursor()
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)
    copy_query = f"COPY {schema}.{table_name} FROM stdin WITH CSV"
    cur.copy_expert(copy_query, buffer)
    conn.commit()
    cur.close()
    conn.close()

def get_column_names(data_set, columns, replace_concept=None):
    column_names = []
    for col in columns:
        vd = get_variable_details(data_set, col)
        col_name = col
        if vd and col not in ['NAME']:
            concept = vd['concept']
            if replace_concept:
                concept = replace_concept
            col_name = re.sub(r" FOR SELECTED [^\(]*", " ", concept) + ' ' + vd['label']
        col_name = re.sub(r"[^a-z0-9]+", " ", col_name.lower()).strip().replace(" ", "_")
        col_name = col_name.replace("_estimate_total", "")
        column_names.append(col_name)
    return column_names

def census_data_to_db(table_name, column_names, data):
    df = pd.DataFrame(data[1:], columns = column_names)
    for col in df.columns:
        if col not in ('name', 'state', 'place'):
            df[col] = df[col].astype(float)
    if 'place' in df.columns and 'state' in df.columns:
        df["place_id"] = df['state'] + df['place']
        df["place_id"] = df['place_id'].astype(int)
    dataframe_to_postgres(df, "census", table_name)

def get_geos(geo_type):
    res = requests.get(f"https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/{geo_type}/")
    for file_name in re.findall(r'href="([^.]+.zip)"', res.text):
        print(f"Process: {file_name}")
        url = f"https://www2.census.gov/geo/tiger/TIGER_RD18/LAYER/{geo_type}/{file_name}"
        downloaded_zip = download_file(url, '.downloaded')
        try:
            unzipped_folder = unzip_file(downloaded_zip, 'unzipped')
        except:
            print(f"deleting bad file: {downloaded_zip}")
            os.remove(downloaded_zip)
            continue
        sql_file_path = shp2pgsql("census", f"geo_{geo_type.lower()}", unzipped_folder, "4326")
        print(f"Load: {sql_file_path}")
        subprocess.run(f"psql -d work -f {sql_file_path}", shell=True, check=True, 
                       stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
        shutil.rmtree(unzipped_folder)


def get_schools(table_name, data_id, from_srid):
    downloaded_zip = download_file(f"https://opendata.arcgis.com/api/v3/datasets/{data_id}/downloads/data?format=shp&spatialRefId={from_srid}&where=1%3D1", 'downloaded')
    if os.path.exists('.unzipped'):
        shutil.rmtree(unzipped_folder)
    unzipped_folder = unzip_file(downloaded_zip, '.unzipped')
    sql_file_path = shp2pgsql("schools", table_name, unzipped_folder, "{from_srid}:4326")
    print(f"Load: {sql_file_path}")
    subprocess.run(f"psql -d work -f {sql_file_path}", shell=True, check=True, 
                   stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
    shutil.rmtree(unzipped_folder)


In [None]:
def append_df_to_table(df, schema, table_name):
    conn = psycopg2.connect(CONN_STR)
    cur = conn.cursor()
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)
    copy_query = f"COPY {schema}.{table_name} FROM stdin WITH CSV"
    cur.copy_expert(copy_query, buffer)
    conn.commit()
    cur.close()
    conn.close()

def get_census_data(data_set, variables, geo):
    for st in get_states():
        print(f"Process state: {st['name']}")
        data_names = variables.split(",")
        ENDPOINT = f"{BASE_URL}/{data_set}?get=NAME,{variables}&for={geo}:*&in=state:{st['statefp']}&key={CENSUS_API_KEY}"
        response = requests.get(ENDPOINT)
        if response.status_code == 200:
            data = response.json()
        else:
            print(f"Error {response.status_code}: {response.text}")
            return None
        df = pd.DataFrame(data[1:], columns = data[0])
        for col_num, col in enumerate(df.columns):
            if col_num >= 1 and col_num <= len(data_names):
                df[col] = df[col].astype(float)
        geo_id_cols = df.columns[len(data_names)+1:len(df.columns)]
        df["geo_id"] = df[geo_id_cols].astype(str).apply(''.join, axis=1)
        for col in data_names:
            df["metric_id"] = get_or_insert_metric(data_set, col)
            new_df = df[["geo_id", "metric_id", col]].copy()
            append_df_to_table(new_df, "census", "metric_data")


In [None]:
get_geos("TRACT")
get_geos("STATE")
get_geos("COUNTY")
get_geos("PLACE")
get_geos("BG")
get_geos("TABBLOCK20") # takes a few hours to download all the data

In [None]:
get_schools("public_school", "87376bdb0cb3490cbda39935626f6604_0", "3857")
get_schools("private_school", "0dfe37d2a68545a699b999804354dacf_0", "4326")

In [None]:
# population by age, sex male
fields = ','.join(['P12_0{:0>2}N'.format(d) for d in range(1, 26)])
data = get_census_data('2020/dec/dhc', fields, 'place')

# population by age, sex female
fields = ','.join(['P12_0{:0>2}N'.format(d) for d in range(26, 50)])
data = get_census_data('2020/dec/dhc', fields, 'place')

# population by age, sex male, white only
fields = ','.join(['P12I_0{:0>2}N'.format(d) for d in range(1, 26)])
data = get_census_data('2020/dec/dhc', fields, 'place')

# population by age, sex female, white only
fields = ','.join(['P12I_0{:0>2}N'.format(d) for d in range(26, 50)])
data = get_census_data('2020/dec/dhc', fields, 'place')

# income data
fields = ','.join(['B19001_0{:0>2}E'.format(d) for d in range(1, 18)])
data = get_census_data('2021/acs/acs5', fields, 'place')

# education
fields = ','.join(['B29002_0{:0>2}E'.format(d) for d in range(1, 9)])
data = get_census_data('2021/acs/acs5', fields, 'place')


In [None]:
# tracts population by sex and age
fields = ','.join(['P12_0{:0>2}N'.format(d) for d in range(1, 26)])
data = get_census_data('2020/dec/dhc', fields, 'tract')

fields = ','.join(['P12_0{:0>2}N'.format(d) for d in range(26, 50)])
data = get_census_data('2020/dec/dhc', fields, 'tract')


In [159]:
data = get_census_data('2020/dec/dhc', ','.join(['H10_001N', 'H10_002N', 'H10_010N']), 'place')

Process state: Alabama
Process state: Alaska
Process state: Arizona
Process state: Arkansas
Process state: California
Process state: Colorado
Process state: Connecticut
Process state: Delaware
Process state: District of Columbia
Process state: Florida
Process state: Georgia
Process state: Hawaii
Process state: Idaho
Process state: Illinois
Process state: Indiana
Process state: Iowa
Process state: Kansas
Process state: Kentucky
Process state: Louisiana
Process state: Maine
Process state: Maryland
Process state: Massachusetts
Process state: Michigan
Process state: Minnesota
Process state: Mississippi
Process state: Missouri
Process state: Montana
Process state: Nebraska
Process state: Nevada
Process state: New Hampshire
Process state: New Jersey
Process state: New Mexico
Process state: New York
Process state: North Carolina
Process state: North Dakota
Process state: Ohio
Process state: Oklahoma
Process state: Oregon
Process state: Pennsylvania
Process state: Rhode Island
Process state: S