In [148]:
import pandas as pd
from bs4 import BeautifulSoup
import requests
from utils import generate_filename
import time
from all_continuous_nhanes_schemas import table_schemas
import sqlfluff
import re

In [4]:
# Fetch a single NHANES documentation page so its HTML structure can be
# inspected interactively in the cells below.
url = 'https://wwwn.cdc.gov/Nchs/Nhanes/2017-2018/AUX_J.htm'

# A timeout stops the kernel from hanging forever on an unresponsive server,
# and raise_for_status() surfaces HTTP errors instead of parsing an error page.
r = requests.get(url, timeout=30)
r.raise_for_status()

soup = BeautifulSoup(r.text, 'lxml')


In [2]:
# Load the table names to process, one per line, from bq_tables.txt.
with open('bq_tables.txt', 'r') as f:
    # Strip first: raw lines still carry their trailing newline, so comparing
    # them against a bare table name would never match.
    tables = [line.strip() for line in f]

# Drop blank lines, '---' separator rows and the metadata table.
tables = [t for t in tables if t and '---' not in t and t != 'nhanes_file_metadata']

In [6]:
# tables

In [8]:
# table_schema = table_schemas[table_name]
# table_schema
# column_type_dict = {d['name']:d['type'] for d in table_schema}

In [18]:
# Peek at the column-name -> column-type mapping of the current table.
# NOTE(review): the assignment in the cell above is commented out, so on a
# fresh kernel this name is only bound by the generation loop further down;
# this cell therefore depends on out-of-order execution.
column_type_dict

{'SEQN': 'INTEGER',
 'WTSA2YR': 'FLOAT',
 'URXUNI': 'FLOAT',
 'URDUNILC': 'FLOAT',
 'WTSAPRP': 'FLOAT',
 'filename': 'STRING',
 'survey': 'STRING',
 'survey_type': 'STRING',
 'start_year': 'INTEGER',
 'end_year': 'INTEGER',
 'last_updated': 'TIMESTAMP',
 'published_date': 'DATE',
 'parquet_filename': 'STRING',
 'data_file_url': 'STRING',
 'doc_file_url': 'STRING',
 'dataset': 'STRING'}

In [155]:
# sources_yml_seed = """version: 2
# sources: 
#   - name: all_continuous
#     schema: nhanes
#     tables: 
# """

# with open("./dbt_generations/sources.yml",'w') as f: 
#             f.write(sources_yml_seed)

# Generate the dbt artifacts for every table listed in `tables`:
#   * bronze/schema.yml      - one documented model entry per table
#   * staging/stg_<table>.sql - plain pass-through of the raw BigQuery table
#   * bronze/<table>.sql      - SELECT that renames/decodes each variable using
#                               the codebook scraped from the table's doc pages
#
# Relies on `tables` and `table_schemas` being defined earlier in the notebook.

# Start schema.yml from scratch; each table appends its own entry below.
with open(f"../../dbt/models/all_continuous/bronze/schema.yml",'w') as f:
    f.write(f"""version: 2

models:
""")

for table_name in tables:
    table_schema = table_schemas[table_name]
    column_type_dict = {d['name']:d['type'] for d in table_schema}

    try:
        data_df = pd.read_gbq(
            f"""SELECT DISTINCT doc_file_url, start_year, end_year
            FROM nhanes.{table_name}
            WHERE doc_file_url IS NOT NULL
            ORDER BY start_year DESC
            """,
            project_id="nhanes-genai",
            dialect="standard",
        )

    except Exception as ex:
        print(ex)
        print(table_name)
        # Without the doc URLs nothing can be generated for this table. Carrying
        # on would either fail on an undefined `data_df` or silently reuse the
        # previous table's URLs, so skip to the next table instead.
        continue

    variable_definitions = []
    seen_variable_names = set()   # variables already defined from a newer doc page
    used_codes = set()            # aliases already handed out for this table
    component_description = ""

    # URLs come back newest survey cycle first, so the newest definition of a
    # variable wins and older pages only contribute variables not seen yet.
    for url in data_df['doc_file_url'].tolist():
        time.sleep(1)  # be polite to the documentation server
        try:
            r = requests.get(url, timeout=60)
        except requests.exceptions.RequestException as e:
            # requests raises its own exception hierarchy; the builtin
            # ConnectionError does not catch it.
            print(f"{e} for {url}")
            continue
        soup = BeautifulSoup(r.text, 'lxml')

        codebook = soup.find('div',id='Codebook')
        if not codebook:
            print(f"Unable to find codebook for {table_name} in {url}")
            continue

        if not component_description:
            sections = soup.find('div',id="Sections")
            first_paragraph = sections.find('p') if sections else None
            if first_paragraph:
                component_description = re.sub(r'[^A-Za-z0-9 ,.-]+', '',first_paragraph.text)

        for variable_section in codebook.find_all('div',{'class':'pagebreak'}):
            heading = variable_section.find('h3')
            if heading is None or not heading.get('id'):
                # No usable variable name in this section.
                continue

            variable_name = heading['id'].upper()
            if variable_name in seen_variable_names:
                # Already defined from a newer cycle. Re-processing it would only
                # produce a suffixed duplicate alias that has no SQL behind it.
                continue
            seen_variable_names.add(variable_name)

            variable_definition = {'name': variable_name}
            targets = []  # collected across all dt/dd pairs of this section

            for term, detail in zip(variable_section.find_all("dt"),variable_section.find_all("dd")):
                if 'Label' in term.text:
                    label = detail.text.strip()
                    code = generate_filename(label,'')

                    # Two variables with the same label would collide as column aliases.
                    if code in used_codes:
                        code = code + "_" + variable_name

                    # Prefix aliases that would otherwise start with a digit.
                    if code and code[0].isdigit():
                        code = table_name.split('_')[-1][:3] + '_' + code

                    variable_definition['label'] = label
                    variable_definition['code'] = code

                if 'Text' in term.text or 'Instructions' in term.text:
                    variable_definition['description'] = re.sub(r'[^A-Za-z0-9 ,.-]+', '',detail.text.strip())

                if 'Target' in term.text:
                    targets.append(" ".join(detail.text.strip().split()))

            variable_definition['targets'] = targets

            # SQL is only generated for variables that exist as columns in the table.
            if variable_name in column_type_dict:
                column_type = column_type_dict[variable_name]
                table = variable_section.find('table',{'class':'values'})

                if table:
                    headers = [th.text.strip() for th in table.find_all('th')]
                    data = [
                        [td.text.strip() for td in tr.find_all('td')]
                        for tr in table.find_all('tr')[1:]
                    ]
                    table_df = pd.DataFrame(columns = headers,data=data)

                    codes = table_df['Code or Value'].tolist()
                    value_descriptions = table_df['Value Description'].tolist()

                    # Classify the variable from the shape of its value table.
                    has_missing_row = '.' in codes
                    has_range = 'Range of Values' in value_descriptions

                    if has_missing_row and not has_range:
                        data_type = 'integer_codes'
                    elif has_missing_row and len(table_df) <= 3:
                        if all([c.isdigit() or c == '.' for c in codes]):
                            data_type = 'integer_codes'
                        else:
                            data_type = 'float'
                    elif has_missing_row:
                        # A range plus extra coded rows (refused / don't know / ...).
                        data_type = 'float_remove_missing'
                    elif any(['blank' in c for c in codes]):
                        data_type = 'string'
                    else:
                        data_type = 'unknown'

                    variable_definition['data_type'] = 'float' if data_type == 'float_remove_missing' else data_type

                    # Every categorical branch below yields STRING and every float
                    # branch yields FLOAT64, so the ELSE cast follows from data_type.
                    # Deriving it here keeps it from being unbound, or left over
                    # from the previous variable, when no branch assigns it.
                    output_data_type = 'FLOAT64' if data_type in ('float', 'float_remove_missing') else 'STRING'

                    sql_case_statement = """CASE
"""
                    # Only the code and its description are needed, so the number
                    # of other columns in the value table does not matter.
                    for code, value in zip(codes, value_descriptions):
                        value = value.replace("'","")
                        if code != '.' and 'blank' not in code and 'No Lab Specimen' not in value:
                            if data_type == 'integer_codes':
                                if column_type == 'FLOAT':
                                    sql_case_statement += f"WHEN SAFE_CAST(ROUND(SAFE_CAST({variable_name} AS FLOAT64),0) AS INT64) = SAFE_CAST(ROUND(SAFE_CAST({code} AS FLOAT64),0) AS INT64) THEN '{value}' -- categorize numeric values\n"
                                elif column_type == 'INTEGER':
                                    sql_case_statement += f"WHEN SAFE_CAST(ROUND(SAFE_CAST({variable_name} AS FLOAT64),0) AS INT64) = SAFE_CAST({code} AS INT64) THEN '{value}' -- categorize numeric values\n"
                                else: ## DATA TYPE IS STRING
                                    if "'" in code:
                                        sql_case_statement += "WHEN REPLACE(REPLACE({},'.0',''),\"\'\",\"\") = '{}' THEN '{}' -- categorize string values \n".format(variable_name,code.replace("'",""),value)
                                    else:
                                        sql_case_statement += f"WHEN SAFE_CAST(SAFE_CAST(ROUND(SAFE_CAST({variable_name} AS FLOAT64),0) AS INT64) AS STRING) = '{code}' THEN '{value}' -- categorize string values \n"
                            elif data_type == 'float':
                                # Numeric columns need no per-row handling; the ELSE cast covers them.
                                if column_type not in ('INTEGER','FLOAT'):
                                    sql_case_statement += f"WHEN {variable_name} IS NOT NULL THEN SAFE_CAST({variable_name} AS FLOAT64) -- correct wrong data types for numerical data \n"
                            elif data_type == 'float_remove_missing':
                                if column_type not in ('INTEGER','FLOAT') and 'Range of Values' in value:
                                    max_value = float(code.split(' to ')[-1])
                                    sql_case_statement += f"WHEN SAFE_CAST({variable_name} AS FLOAT64) > {max_value + 11} THEN NULL -- remove missing, dont know, categories in float field  \n"
                            else:
                                if column_type == 'STRING':
                                    if "'" in code:
                                        sql_case_statement += "WHEN REPLACE(REPLACE({},'.0',''),\"\'\",\"\") = '{}' THEN '{}' -- categorize string values \n".format(variable_name,code.replace("'",""),value)
                                    else:
                                        sql_case_statement += f"WHEN REPLACE({variable_name},'.0','') = '{code}' THEN '{value}' -- categorize string values \n"
                                else:
                                    sql_case_statement += f"WHEN SAFE_CAST({variable_name} AS FLOAT64) = SAFE_CAST({code} AS FLOAT64) THEN '{value}' \n"
                        elif 'No Lab Specimen' in value:
                            sql_case_statement += f"WHEN {variable_name} = {code} THEN NULL --remove no lab specimen samples from data \n"
                        else:
                            sql_case_statement += f"WHEN {variable_name} IS NULL THEN NULL \n"

                    sql_case_statement += f'ELSE SAFE_CAST({variable_name} AS {output_data_type}) \n'

                    # Fall back to the raw variable name when no label could be decoded.
                    column_alias = variable_definition.get('code') or variable_name
                    variable_definition['sql'] = sql_case_statement + f" END as {column_alias}, \n"
                elif variable_definition.get('code',None):
                    variable_definition['sql'] = f"{variable_name} as {variable_definition['code']}, -- could not identify transformation logic \n"
                else:
                    variable_definition['sql'] = f"{variable_name} as {variable_name}, -- could not automatically decode name of variable or transformation logic \n"

            used_codes.add(variable_definition.get('code'))
            variable_definitions.append(variable_definition)

    stg_alias = f"stg_{table_name}"
    sql = """SELECT
"""

    for d in variable_definitions:
        if d.get('sql',''):
            sql += d.get('sql','') + '\n'

    sql += """start_year,
end_year,
last_updated,
published_date,
parquet_filename,
data_file_url,
doc_file_url,
dataset,
"""

    # The quadruple braces survive .format() as the literal dbt `{{ ref(...) }}` call.
    sql += """ FROM {{{{ ref('{}') }}}}

/* 
Docs utilized to generate this SQL can be found at:
{}
*/
""".format(stg_alias,"\n".join(data_df['doc_file_url'].tolist()))

    with open(f"../../dbt/models/all_continuous/bronze/schema.yml",'a') as f:
        f.write(f"""
  - name: {table_name}
    description: {component_description}
    columns:
""")
        for d in variable_definitions:
            if d.get('code',None) and d.get('description',None):
                f.write(f"""
      - name: {d['code']}
        description: {d['description']}
""")

    with open(f"../../dbt/models/all_continuous/staging/{stg_alias}.sql",'w') as f:
        f.write(f"SELECT * FROM nhanes.{table_name}")

    with open(f"../../dbt/models/all_continuous/bronze/{table_name}.sql",'w') as f:
        f.write(sql)

    print(f"Finished SQL generation for {table_name}")
    # else:
    #     print(f"No codebook available for {table_name}")

Finished SQL generation for acculturation_questionnaire
Finished SQL generation for albumin_creatinine_urine_laboratory
Finished SQL generation for alcohol_use_questionnaire
Finished SQL generation for alpha_1_acid_glycoprotein_serum_surplus_laboratory
Finished SQL generation for arsenic_total_urine_laboratory
Finished SQL generation for arsenics_speciated_urine_laboratory
Finished SQL generation for audiometry_acoustic_reflex_examination
Finished SQL generation for audiometry_examination
Finished SQL generation for audiometry_questionnaire
Finished SQL generation for audiometry_wideband_reflectance_examination
Finished SQL generation for blood_pressure_cholesterol_questionnaire
Finished SQL generation for body_measures_examination
Finished SQL generation for cardiovascular_health_questionnaire
Finished SQL generation for cholesterol_high_density_lipoprotein_hdl_laboratory
Finished SQL generation for cholesterol_low_density_lipoproteins_ldl_triglycerides_laboratory
Finished SQL generat

In [None]:
# url

In [None]:
# url

In [115]:
# Tabulate the scraped variable definitions for inspection. `variable_definitions`
# is rebuilt for every table inside the generation loop, so this reflects
# whichever table that loop handled last.
var_df = pd.DataFrame(variable_definitions)

In [116]:
# Display the variable-definition frame built in the previous cell.
var_df

Unnamed: 0,name,targets,label,code,description,sql,data_type
0,SEQN,[Both males and females 3 YEARS - 150 YEARS],Respondent sequence number,respondent_sequence_number,Respondent sequence number.,"SEQN as respondent_sequence_number, -- could n...",
1,URXUMA,[Both males and females 3 YEARS - 150 YEARS],"Albumin, urine (ug/mL)",albumin_urine_ug_ml,"Albumin, urine (ug/mL)",CASE\n WHEN URXUMA IS NULL THEN NULL \n...,float
2,URXUMS,[Both males and females 3 YEARS - 150 YEARS],"Albumin, urine (mg/L)",albumin_urine_mg_l,"Albumin, urine (mg/L)",CASE\n WHEN URXUMS IS NOT NULL THEN SAF...,float
3,URDUMALC,[Both males and females 3 YEARS - 150 YEARS],"Albumin, urine comment code",albumin_urine_comment_code,"Albumin, urine comment code",CASE\n WHEN SAFE_CAST(SAFE_CAST(REPLACE...,float_category
4,URXUCR,[Both males and females 3 YEARS - 150 YEARS],"Creatinine, urine (mg/dL)",creatinine_urine_mg_dl,"Creatinine, urine (mg/dL)",CASE\n WHEN URXUCR IS NULL THEN NULL \n...,float
5,URXCRS,[Both males and females 3 YEARS - 150 YEARS],"Creatinine, urine (umol/L)",creatinine_urine_umol_l,"Creatinine, urine (umol/L)",CASE\n WHEN URXCRS IS NOT NULL THEN SAF...,float
6,URDUCRLC,[Both males and females 3 YEARS - 150 YEARS],"Creatinine, urine comment code",creatinine_urine_comment_code,"Creatinine, urine comment code",CASE\n WHEN SAFE_CAST(SAFE_CAST(REPLACE...,float_category
7,URDACT,[Both males and females 3 YEARS - 150 YEARS],Albumin creatinine ratio (mg/g),albumin_creatinine_ratio_mg_g,Albumin creatinine ratio (mg/g),CASE\n WHEN URDACT IS NOT NULL THEN SAF...,float


In [30]:
# List the generated column aliases. A definition only gains a 'code' key when
# its codebook section contains a "Label" entry, so use .get() rather than
# indexing to avoid a KeyError on unlabeled variables (they show up as None).
[definition.get('code') for definition in variable_definitions]

['respondent_sequence_number',
 'status_of_an_audio_exam',
 'comment_code_for_an_audio_exam',
 'have_ear_tube_right_or_left_ear',
 'had_cold_runny_nose_earache_today',
 'had_cold_runny_nose_earache_yesterday',
 'had_cold',
 'had_runny_nose',
 'earache_right_ear',
 'earache_left_ear',
 'earache_both_ears',
 'listened_to_loud_sound_today',
 'listened_to_loud_sound_yesterday',
 'hours_since_loud_sound_ended',
 'listened_to_music_with_earphones_today',
 'listened_to_music_w_earphones_yesterday',
 'hours_since_stopped_listening',
 'self_reported_better_ear',
 'have_ear_tube_right_or_left_ear',
 'had_cold_sinus_or_earache_last_24_hrs',
 'had_cold_last_24_hrs',
 'had_sinus_problem_last_24_hours',
 'earache_last_24_hours_right',
 'earache_last_24_hours_left',
 'earache_last_24_hours_both',
 'exposed_to_loud_noise_last_24_hrs',
 'hours_since_noise_ended',
 'listened_to_music_w_earphone_last_24_hrs',
 'hours_since_music_ended',
 'self_reported_better_ear',
 'normal_otoscopy_left_ear',
 'excessiv

In [24]:
# soup.find('div',id='Codebook')

In [80]:

    
# Show the bronze-model SQL held in `sql`, i.e. the text assembled for the
# last table handled by the generation loop.
print(sql)

 SELECT
SEQN as respondent_sequence_number,
CASE
WHEN AUAEXSTS = 1 THEN 'Complete' 
WHEN AUAEXSTS = 2 THEN 'Partial' 
WHEN AUAEXSTS = 3 THEN 'Not done' 
WHEN AUAEXSTS IS NULL THEN NULL 
ELSE NULL 
 END as status_of_an_audio_exam,
CASE
WHEN AUAEXCMT = 2 THEN 'SP refusal' 
WHEN AUAEXCMT = 3 THEN 'No time' 
WHEN AUAEXCMT = 4 THEN 'Physical limitation' 
WHEN AUAEXCMT = 5 THEN 'Communication problem' 
WHEN AUAEXCMT = 6 THEN 'Equipment failure' 
WHEN AUAEXCMT = 7 THEN 'SP ill/emergency' 
WHEN AUAEXCMT = 14 THEN 'Interrupted' 
WHEN AUAEXCMT = 51 THEN 'SP unable to comply' 
WHEN AUAEXCMT = 56 THEN 'Came late/left early' 
WHEN AUAEXCMT = 72 THEN 'Error (technician/software/supply)' 
WHEN AUAEXCMT = 99 THEN 'Other, specify' 
WHEN AUAEXCMT = 122 THEN 'Language barrier' 
WHEN AUAEXCMT = 150 THEN 'SP with parent SP' 
WHEN AUAEXCMT IS NULL THEN NULL 
ELSE NULL 
 END as comment_code_for_an_audio_exam,
CASE
WHEN AUQ011 = 1 THEN 'Yes, right ear' 
WHEN AUQ011 = 2 THEN 'Yes, left ear' 
WHEN AUQ011 = 3 TH