In [1]:
from collections import namedtuple
from getpass import getpass
from urllib.parse import quote

from bs4 import BeautifulSoup as soup
from splinter import Browser
from sqlalchemy import create_engine
from webdriver_manager.chrome import ChromeDriverManager

In [2]:
# Set up Splinter
# ChromeDriverManager().install() supplies the chromedriver path that is
# handed to Splinter through the 'executable_path' keyword.
# headless=False is passed, so the Chrome session is not run in headless mode.
executable_path = {'executable_path': ChromeDriverManager().install()}
browser = Browser('chrome', **executable_path, headless=False)



Current google-chrome version is 91.0.4472
Get LATEST driver version for 91.0.4472
Driver [C:\Users\elizk\.wdm\drivers\chromedriver\win32\91.0.4472.101\chromedriver.exe] found in cache


In [3]:
# Visit coding scheme page
# (the HTML codebook published under the 2019 BRFSS annual data on cdc.gov)
url = 'https://www.cdc.gov/brfss/annual_data/2019/pdf/codebook19_llcp-v2-508.HTML'
browser.visit(url)

In [4]:
# Parse the HTML
# browser.html is the markup of the page currently loaded in the browser;
# it is parsed with the 'html.parser' backend into a BeautifulSoup tree.
html = browser.html
html_soup = soup(html, 'html.parser')

In [11]:


# Lightweight record types for the scraped codebook data.
# HeaderInfo: the three fields pulled from a table's header cell.
HeaderInfo = namedtuple('HeaderInfo', 'label name question')
# RowInfo: one body row, i.e. a value and the label attached to it.
RowInfo = namedtuple('RowInfo', 'value value_label')
# TableInfo: a header record together with that table's list of body rows.
TableInfo = namedtuple('TableInfo', 'header_info body_info')

def get_value(header_line):
    """Return the cleaned text that follows the first ':' in a header line.

    Non-breaking spaces are turned into ordinary spaces and surrounding
    whitespace is removed. When the line contains no ':', the whole line
    is cleaned and returned.
    """
    # maxsplit=1 keeps any later colons inside the returned text; [-1] is
    # the entire line when there is no colon at all.
    after_colon = header_line.split(':', 1)[-1]
    return after_colon.replace(u'\xa0', u' ').strip()

def get_header_info(thead):
    """Build a HeaderInfo from the first cell of a table's header row.

    The child nodes at positions 0, 12 and 16 of that cell are read as the
    variable label, the SAS variable name and the question text
    respectively; each one is passed through get_value.
    """
    first_cell = thead.find('tr').find('td')
    nodes = first_cell.contents
    # Fixed positions of the wanted lines among the cell's child nodes.
    label_idx, name_idx, question_idx = 0, 12, 16
    return HeaderInfo(
        get_value(nodes[label_idx]),
        get_value(nodes[name_idx]),
        get_value(nodes[question_idx]),
    )
    
# For every matched codebook table, collect its header metadata together
# with the (value, value label) pair found in each body row.

tables_uncut = html_soup.find_all('table', class_='table')

table_list = []
# The first matched table is skipped (the slice starts at index 1).
for codebook_table in tables_uncut[1:]:
    table_body = codebook_table.find('tbody')
    table_header = codebook_table.find('thead')
    header_info = get_header_info(table_header)
    # First <td> of a row is the value, second is its label.
    rows = [
        RowInfo(cells[0].get_text(separator=' '), cells[1].get_text(separator=' '))
        for cells in (tr.find_all('td') for tr in table_body.find_all('tr'))
    ]
    table_list.append(TableInfo(header_info, rows))
    
  

In [14]:
# Spot-check one parsed entry (index 22) to eyeball its header and rows.
table_list[22]

TableInfo(header_info=HeaderInfo(label='Correct Phone Number?', name='CTELNUM1', question='Is this     (phone number)     ?'), body_info=[RowInfo(value='1', value_label='Yes—Go to CP.03, CELLFON5'), RowInfo(value='BLANK', value_label='Not asked or Missing Notes: QSTVER < = 20')])

In [17]:

# Prompt for the database password at run time so it is never stored in the notebook.
password = getpass('Enter database password')
# Percent-encode the password before embedding it in the connection URL:
# characters such as '@', '/' or ':' would otherwise be parsed as URL structure.
db = create_engine(f"postgresql://postgres:{quote(password, safe='')}@localhost:5432/BRFSSAnalysis")

Enter database password········


In [35]:
# Rebuild the schema from scratch. IF EXISTS keeps this cell runnable on a
# fresh database where the tables have not been created yet.
db.execute("DROP TABLE IF EXISTS user_answers, question_values, question_info")
# Tables created below:
#   question_info   - one row per question, unique on var_name
#   user_answers    - one value per (user_id, question_id), references question_info
#   question_values - allowed values per question, references question_info
db.execute("""
CREATE TABLE question_info (
    id SERIAL,
    var_name VARCHAR(8) NOT NULL,
    label TEXT NOT NULL,
    text TEXT NOT NULL,
    PRIMARY KEY (id),
	UNIQUE (var_name)
);

CREATE TABLE user_answers (
    id SERIAL,
    user_id INT NOT NULL,
    question_id INT NOT NULL,
    val INT NOT NULL,
	FOREIGN KEY (question_id) REFERENCES question_info (id),
    PRIMARY KEY (id),
	UNIQUE (user_id, question_id)
);

CREATE TABLE question_values (
    id SERIAL,
    question_id INT  NOT NULL,
    label TEXT  NOT NULL,
    value NUMERIC,
    value_end NUMERIC, -- if NULL, not relevant
    FOREIGN KEY (question_id) REFERENCES question_info (id),
    PRIMARY KEY (id),
    UNIQUE (question_id, value)
);
            """)

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x2d378c56108>

In [37]:

# Marker strings used in the value column for rows that carry no numeric answer.
QUESTION_VALUE_HIDDEN = 'HIDDEN'
QUESTION_VALUE_BLANK = 'BLANK'


# One row destined for the question_values table.
QuestionValueEntry = namedtuple('QuestionValueEntry', ['question_id', 'label', 'value', 'value_end'])
def generate_question_values_for_insert(question_id, row_info):
    """Translate one scraped row into a QuestionValueEntry.

    Args:
        question_id: id of the question the row belongs to.
        row_info: object exposing ``value`` (str) and ``value_label``.

    Returns:
        QuestionValueEntry where
        * a "start - end" value yields integer ``value`` and ``value_end``;
        * 'HIDDEN' or 'BLANK' yields ``value`` and ``value_end`` of None;
        * anything else yields the whitespace-stripped text as ``value``
          and ``value_end`` of None.

    Raises:
        AssertionError: the value contains '-' but does not split into
            exactly two parts.
        ValueError: a range bound is not an integer.
    """
    # Strip once up front so padded cell text (e.g. ' BLANK ') is classified
    # the same way as its clean form in every branch, not only for ranges.
    raw_value = row_info.value.strip()

    if '-' in raw_value:
        # range of values
        range_vals = [v.strip() for v in raw_value.split('-')]
        assert len(range_vals) == 2, f'expected "start - end", got {row_info.value!r}'
        start, end = (int(v) for v in range_vals)
        return QuestionValueEntry(question_id, row_info.value_label, start, end)

    if raw_value in (QUESTION_VALUE_HIDDEN, QUESTION_VALUE_BLANK):
        # There is no answer to look at, correspond to NULL
        return QuestionValueEntry(question_id, row_info.value_label, None, None)

    return QuestionValueEntry(question_id, row_info.value_label, raw_value, None)

# Empty all three tables so the load below starts from a clean slate.
# NOTE(review): each db.execute call here is issued on its own, not inside an
# explicit transaction — confirm a partial load on failure is acceptable.
db.execute("""TRUNCATE user_answers, question_values, question_info""")

for table in table_list:
    # Insert question
    result = db.execute("""INSERT INTO question_info(var_name, label, text)
            VALUES (%s, %s, %s) RETURNING id""", (table.header_info.name, table.header_info.label, table.header_info.question))
    # RETURNING id hands back the id of the row just inserted; it is used as
    # question_id for the question_values rows inserted below.
    question_id = result.fetchone()[0]

    # Insert possible question values
    # NOTE(review): prev_label is assigned but not read in the visible lines — confirm it is still needed.
    prev_label = None
    for row_info in table.body_info:
        entry = generate_question_values_for_insert(question_id, row_info)
        db.execute("""INSERT INTO question_values(question_id, label, value, value_end) VALUES (%s, %s, %s, %s)""", (entry.question_id, entry.label, entry.value, entry.value_end))