In [1]:
import os
from io import StringIO
from urllib.parse import quote
from zipfile import ZipFile

import pandas as pd
import requests
from sqlalchemy import create_engine

Read the database connection details from the repository's `secrets` and create the `engine` that will be used to connect to the database.

In [2]:
# Connection settings are read from environment variables.
host = os.getenv("DATABASE_HOST")
user = os.getenv("DATABASE_USERNAME")
passwd = os.getenv("DATABASE_PASSWORD")
db = os.getenv("DATABASE")

# Percent-encode the credentials: characters such as '@', ':' or '/' inside a
# username or password would otherwise be read as URL delimiters and corrupt
# the connection string. `safe=''` also encodes '/', and `str()` keeps the
# previous rendering ('None') when a variable is unset.
safe_user = quote(str(user), safe='')
safe_passwd = quote(str(passwd), safe='')

engine = create_engine(
    f'mysql+mysqlconnector://{safe_user}:{safe_passwd}@{host}/{db}',
    echo=False,
    # Extra keyword arguments handed to the database driver on connect.
    connect_args={'ssl_ca': '/etc/ssl/certs/ca-certificates.crt'}
    )

Download the zip file that contains the data from the CDC's website to the `../data/` directory.

In [3]:
zip_path = '../data/LLCP2019ASC.zip'

# Download only when the archive is not already on disk.
if not os.path.exists(zip_path):
    url = 'https://www.cdc.gov/brfss/annual_data/2019/files/LLCP2019ASC.zip'
    os.makedirs(os.path.dirname(zip_path), exist_ok=True)

    # Stream into a temporary name first: if the download fails half-way, the
    # final path is never created, so the `exists` guard above cannot mistake a
    # truncated file for a complete one on the next run.
    partial_path = zip_path + '.part'
    with requests.get(url, stream=True, timeout=60) as response:
        # Fail loudly on 4xx/5xx instead of saving an error page as the zip.
        response.raise_for_status()
        with open(partial_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    f.write(chunk)
    os.replace(partial_path, zip_path)

    # Delete the `response` variable to avoid memory issues.
    del response

Read the zip file and extract each line of the file into a list of strings to later convert it to a dataframe.

In [4]:
# Read the first member of the archive line by line. The `with` blocks close
# both the archive and the member handle even if decoding fails, and iterating
# the handle avoids materialising a second, bytes-only copy of every line.
with ZipFile('../data/LLCP2019ASC.zip', 'r') as asc_zip:
    file_in_zip = asc_zip.namelist()[0]
    with asc_zip.open(file_in_zip) as member:
        data_rows = [line.decode('utf-8') for line in member]

Delete the `asc_zip` and `file_in_zip` variables to avoid memory issues.

In [5]:
# Drop the archive handle and the member name; neither is needed from here on.
del asc_zip, file_in_zip

Convert the list of strings into a dataframe and show the head and tail of the dataframe to understand its structure.

In [6]:
# One column named 'raw', one fixed-width record per row.
data = pd.DataFrame({'raw': data_rows})
data

Unnamed: 0,raw
0,01 0101182019 11002019000001 ...
1,01 0101132019 11002019000002 ...
2,01 0101182019 11002019000003 ...
3,01 0101182019 12002019000004 ...
4,01 0101042019 11002019000005 ...
...,...
418263,72 0903152020 11002019006029 ...
418264,72 0903082020 11002019006030 ...
418265,72 0903102020 11002019006031 ...
418266,72 0903062020 11002019006032 ...


Once we've seen how the data would be structured in a dataframe, delete the `data` variable to avoid memory issues.

In [7]:
# The preview frame is no longer needed.
del data

#### Retrieve the metadata from the MySQL database

Query the MySQL database to retrieve the layout table, which describes the structure of the columns. 
The layout will be used to perform an action similar to MS Excel's "Text to Columns": the raw data (strings) in the dataframe will be split into multiple columns.

_Note:_ An `if` condition checks whether the query has already been executed and its result saved as a `.json` file; if so, the JSON file is read instead. This avoids exhausting the number of free queries per month provided by the MySQL database provider.

In [8]:
layout_path = '../data/layout.json'

# Query the database only when no cached copy of the table exists on disk.
if not os.path.exists(layout_path):
    df = pd.read_sql('SELECT * FROM layout', engine, index_col='Code')
    df.to_json(layout_path, orient='table')

layout = pd.read_json(layout_path, orient='table')
layout.head()

Unnamed: 0_level_0,Start,Length
Code,Unnamed: 1_level_1,Unnamed: 2_level_1
_STATE,1,2
FMONTH,17,2
IDATE,19,8
IMONTH,19,2
IDAY,21,2


Query the MySQL database to retrieve the `catalog` table. The `Value` column in this table can be used to find which columns contain `float` values. This information will be used to convert the columns to the correct data type.

In [9]:
catalog_path = '../data/catalog.json'

# Query the database only when no cached copy of the table exists on disk.
if not os.path.exists(catalog_path):
    df = pd.read_sql('SELECT * FROM catalog', engine, index_col=['Code', 'Value'])
    df.to_json(catalog_path, orient='table')

catalog = pd.read_json(catalog_path, orient='table')
catalog.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Value Label,Frequency,Percentage,Weighted Percentage
Code,Value,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
_STATE,1,Alabama,7052,1.69,1.51
_STATE,2,Alaska,2977,0.71,0.22
_STATE,4,Arizona,8941,2.14,2.24
_STATE,5,Arkansas,5359,1.28,0.92
_STATE,6,California,11613,2.78,12.22


Make a list of columns that contain `float` values.

In [10]:
# Collect every code that has at least one value label mentioning decimals.
# The catalog is indexed by (Code, Value), so the code is the first element of
# each index key. The duplicate check must compare that code, not the whole
# key: comparing the tuple against a list of strings never matches, which
# would let a code with several "decimal" labels be added more than once.
float_cols = []
for key, label in catalog['Value Label'].items():
    code = key[0]
    if ' decimal ' in str(label).lower() and code not in float_cols:
        float_cols.append(code)

del catalog
print(float_cols)

['_STSTR', '_STRWT', '_RAWRAKE', '_WT2RAKE', '_CLLCPWT', '_LLCPWT2', 'HTM4', 'WTKG3', '_BMI5', '_BMI5CAT', '_RFBMI5', 'METVL11_', 'METVL21_', 'MAXVO21_', 'FC601_', 'PAFREQ1_', 'PAFREQ2_', 'STRFREQ_', 'FTJUDA2_', 'FRUTDA2_', 'GRENDA1_', 'FRNCHDA_', 'VEGEDA2_', '_FRUTSU1', '_VEGESU1']


#### Build the dataframe from the list of rows

Extract the `colspecs` from the `layout` table. It will be used to split the strings in the dataframe into multiple columns.

In [11]:
# `Start` is 1-based; convert each field to a 0-based, half-open
# (begin, end) pair.
starts = layout['Start'] - 1
colspecs = list(zip(starts, starts + layout['Length']))

# Column names are the layout codes, in layout order.
names = layout.index.get_level_values(0).tolist()

Split the raw data into multiple columns.

In [12]:
size = 10000  # rows handled per batch
chunks = []

# Parse the fixed-width records one batch at a time and stitch the pieces
# together afterwards.
for start in range(0, len(data_rows), size):
    buffer = StringIO(''.join(data_rows[start:start + size]))
    parsed = pd.read_fwf(buffer, names=names, colspecs=colspecs, header=None)
    parsed.fillna(pd.NA, inplace=True)
    chunks.append(parsed)

del data_rows
data = pd.concat(chunks, ignore_index=True)
data

: 

In [None]:
# Cast each column batch by batch: codes listed in `float_cols` become
# float64, everything else becomes the nullable Int64 dtype.
for col in data.columns:
    target_dtype = 'float64' if col in float_cols else 'Int64'
    chunks = [
        data[col][pos:pos + size].astype(target_dtype)
        for pos in range(0, len(data[col]), size)
    ]
    data[col] = pd.concat(chunks, ignore_index=True)

#### Upload the processed data to the database

Upload the dataframe that resulted from splitting the raw data into different named columns to the MySQL database. The list of codes can be passed as an index.