In [67]:
import os
import configparser
import tqdm
import warnings
import pandas as pd
import geopandas as gpd
from shapely import wkt
import numpy as np
from scipy.stats import chi2_contingency
warnings.filterwarnings('ignore')

In [68]:
# All paths are resolved relative to the directory the notebook is run from.
BASE_DIR = os.getcwd()

# ConfigParser.read() skips files it cannot find, so a missing
# script_config.ini leaves CONFIG empty rather than raising.
CONFIG = configparser.ConfigParser()
config_file = os.path.join(BASE_DIR, 'script_config.ini')
CONFIG.read(config_file)

# <working dir>/../data holds the inputs; results live next to it.
BASE_PATH = os.path.abspath(os.path.join(BASE_DIR, '..', 'data'))
DATA_RAW = os.path.join(BASE_PATH, 'raw')
DATA_RESULTS = os.path.join(BASE_PATH, '..', 'results')

In [69]:
# Mortality counts by county, sex and file year (see the preview below).
data_path = os.path.join(
    DATA_RESULTS, 'processed', 'sex_cdc_pulmonary_data.csv'
)
df = pd.read_csv(data_path)
df.head()

Unnamed: 0,state,county_name,state_fips,res_countyfips,res_statefips,sex,mort_count,fileyear,filetype
0,Alabama,Autauga,1,1,AL,F,1,2005,US
1,Alabama,Autauga,1,1,AL,M,1,2005,US
2,Alabama,Autauga,1,1,AL,F,2,2006,US
3,Alabama,Autauga,1,1,AL,M,3,2006,US
4,Alabama,Autauga,1,1,AL,F,2,2007,US


Only select records from the year 2004 onwards.

In [70]:
# Keep file years from 2004 onwards; .copy() detaches the result from the
# frame that was loaded above.
from_2004_onwards = df['fileyear'] >= 2004
df = df[from_2004_onwards].copy()

Sum the mortality count for all the years.

## 1. Test for Sex Independence

In [71]:
# Collapse the yearly rows: one row per county and sex, with mort_count
# summed over every remaining file year.
sex_group_keys = ['state', 'county_name', 'state_fips',
                  'res_countyfips', 'res_statefips', 'sex']
df = df.groupby(sex_group_keys, as_index = False).agg({'mort_count': 'sum'})

We want to pivot our DataFrame so that the values of the "sex" column become separate wide columns ("F" and "M"), with the summed "mort_count" values appearing under those columns.

In [72]:
# Long -> wide: one row per county, one column per sex ('F', 'M').
county_keys = ['state', 'county_name', 'state_fips',
               'res_countyfips', 'res_statefips']
sex_wide = df.pivot_table(index = county_keys, columns = 'sex',
                          values = 'mort_count', aggfunc = 'sum')
df = sex_wide.reset_index()
df = df[['state', 'county_name', 'res_statefips', 'F', 'M']]

#### Perform Chi-Square

In [73]:
# Work on an explicit copy: assigning into a column selection of another
# frame otherwise raises a SettingWithCopyWarning (hidden here by the
# global warnings filter).
# NOTE(review): 'state' is dropped at this point, so counties that share a
# name across states end up with identical labels in the exported table.
df = df[['county_name', 'F', 'M']].copy()

# A missing count means no recorded deaths for that sex in that county.
cols_to_fill = ['F', 'M']
df[cols_to_fill] = df[cols_to_fill].fillna(0)

# Keep only counties with at least one death recorded for both sexes.
df = df[(df['F'] != 0) & (df['M'] != 0)]
df.head(5)

sex,county_name,F,M
0,Autauga,20.0,13.0
1,Baldwin,71.0,42.0
2,Barbour,18.0,10.0
3,Bibb,9.0,16.0
4,Blount,22.0,16.0


In [74]:
def sex_chi_square(df, county_col = 'county_name', 
                   male_col = 'M', female_col = 'F',
                   fillna_with = 0, order = ('M','F')):
    """
    Run a chi-square test of independence between county and sex on
    mortality counts and export the per-cell contributions.

    The per-cell table is also written to
    ``<DATA_RESULTS>/stat_test/sex_chi_test.csv`` (``DATA_RESULTS`` is the
    module-level results path); the folder is created when missing.

    Parameters
    ----------
    df : dataframe
        Dataframe with one row per county and columns 
        for Male and Female counts
    county_col : string
        Column name with county names
    male_col : string
        Column name with the male counts
    female_col : string
        Column name with the female counts
    fillna_with : number
        Value substituted for missing or non-numeric counts
    order : sequence of strings
        Column names that form the contingency table columns, in the
        order they should appear. Only ``county_col``, ``male_col`` and
        ``female_col`` are kept from ``df``, so every entry must be one
        of the count column names.

    Returns
    -------
    details_df : dataframe
        DataFrame with one row per "County, Gender" label and the
        columns observed, expected and chi_value ((O-E)^2/E).
    chi2_summary : dictionary
        Dictionary with chi2, p_value, dof

    Raises
    ------
    ValueError
        Propagated from ``chi2_contingency`` when the table is unusable
        (for example when no rows remain after all-zero rows are dropped).
    """

    df2 = df[[county_col, male_col, female_col]].copy()

    # Ensure numeric and replace missing values
    for c in (male_col, female_col):

        df2[c] = pd.to_numeric(df2[c], errors = 
            "coerce").fillna(fillna_with).astype(float)

    table_cols = list(order)

    # A county whose counts are all zero has an expected frequency of zero,
    # which makes chi2_contingency raise; such rows carry no information.
    all_zero = (df2[table_cols] == 0).all(axis = 1)
    df2 = df2.loc[~all_zero]

    # Build contingency table (rows = counties, cols = categories in 'order')
    contingency = df2[table_cols].to_numpy()
    counties = df2[county_col].astype(str).tolist()

    # Run chi-square
    chi2, p, dof, expected = chi2_contingency(contingency)

    O = contingency
    E = expected

    # Per-cell (O-E)^2 / E: divide only where E is non-zero, use 0 where
    # both O and E are zero, and leave NaN where only E is zero.
    zero_E = (E == 0)
    term = np.full(O.shape, np.nan, dtype = float)
    term[~zero_E] = (O[~zero_E] - E[~zero_E])**2 / E[~zero_E]
    term[zero_E & (O == 0)] = 0.0

    # Long-format result table: counties vary slowest, categories fastest,
    # which matches the row-major order of ravel().
    labels = [f"{county}, {cat}" for county in counties for cat in table_cols]
    details_df = pd.DataFrame({'county_name': labels,
                               'observed': O.ravel(),
                               'expected': E.ravel(),
                               'chi_value': term.ravel()},
                              columns = ['county_name', 'observed', 
                                         'expected', 'chi_value'])

    chi2_summary = {"chi2": float(chi2), "p_value": float(p), "dof": int(dof)}

    # Make sure the output folder exists before writing.
    folder_out = os.path.join(DATA_RESULTS, 'stat_test')
    os.makedirs(folder_out, exist_ok = True)
    filename = 'sex_chi_test.csv'
    path_out = os.path.join(folder_out, filename)
    details_df.to_csv(path_out, index = False)

    return details_df, chi2_summary

In [75]:
# Run the sex test; `details` holds the per-cell table and `sex_summary`
# the chi2 / p_value / dof dictionary.
details, sex_summary = sex_chi_square(
    df,
    county_col = 'county_name',
    male_col = 'M',
    female_col = 'F',
    fillna_with = 0,
    order = ('M', 'F'),
)

## 2. Test for Age Independence

In [76]:
# Mortality counts broken down by age category.
age_path = os.path.join(
    DATA_RESULTS, 'processed', 'age_cdc_pulmonary_data.csv'
)
df1 = pd.read_csv(age_path)

In [77]:
# One row per county and age category, with mort_count summed.
age_group_keys = ['state', 'county_name', 'state_fips',
                  'res_countyfips', 'res_statefips', 'age_cat']
df1 = df1.groupby(age_group_keys, as_index = False).agg({'mort_count': 'sum'})

In [78]:
# List the age labels exactly as stored; the later cells select columns by
# these exact strings (note the leading space in ' 9 years or below').
pd.unique(df1['age_cat'])

array(['30 - 49 years', '70 years or above', '10 - 29 years',
       ' 9 years or below'], dtype=object)

In [79]:
# Long -> wide: one row per county, one column per age category.
age_county_keys = ['state', 'county_name', 'state_fips',
                   'res_countyfips', 'res_statefips']
age_wide = df1.pivot_table(index = age_county_keys, columns = 'age_cat',
                           values = 'mort_count', aggfunc = 'sum')
df1 = age_wide.reset_index()
df1 = df1[['state', 'county_name', 'res_statefips', '30 - 49 years',
           '70 years or above', '10 - 29 years', ' 9 years or below']]

In [80]:
def age_chi_square(df, county_col = "county_name",
                     age_group_cols = ['30 - 49 years', 
                     '70 years or above', '10 - 29 years', 
                     ' 9 years or below'],
                     fillna_with = 0):
    """
    Run a chi-square test of independence between county and age group
    on mortality counts and export the per-cell contributions.

    The per-cell table is also written to
    ``<DATA_RESULTS>/stat_test/age_test.csv`` (``DATA_RESULTS`` is the
    module-level results path); the folder is created when missing.

    Parameters
    ----------
    df : dataframe
        Dataframe with one row per county and one count column per
        age group
    county_col : string
        Column name with county names
    age_group_cols : sequence of strings
        Names of the age-group count columns, in the order they should
        appear in the contingency table. Any sequence (list, tuple) is
        accepted; the argument is never modified.
    fillna_with : number
        Value substituted for missing or non-numeric counts

    Returns
    -------
    details_df : dataframe
        DataFrame with one row per "County, Age group" label and the
        columns county_name, age_group, observed, expected and
        chi_value ((O-E)^2/E).
    chi2_summary : dictionary
        Dictionary with chi2, p_value, dof

    Raises
    ------
    ValueError
        Propagated from ``chi2_contingency`` when the table is unusable
        (for example when no rows remain after all-zero rows are dropped).
    """

    # Copy into a list so tuples work and the default list is never shared.
    cols = list(age_group_cols)

    df2 = df[[county_col] + cols].copy()

    for col in cols:

        df2[col] = pd.to_numeric(df2[col], errors = 
                "coerce").fillna(fillna_with).astype(float)

    # A county whose counts are all zero has an expected frequency of zero,
    # which makes chi2_contingency raise; such rows carry no information.
    all_zero = (df2[cols] == 0).all(axis = 1)
    df2 = df2.loc[~all_zero]

    contingency = df2[cols].to_numpy()
    counties = df2[county_col].astype(str).tolist()

    chi2, p, dof, expected = chi2_contingency(contingency)

    O = contingency
    E = expected

    # Per-cell (O-E)^2 / E: divide only where E is non-zero, use 0 where
    # both O and E are zero, and leave NaN where only E is zero.
    zero_E = (E == 0)
    chi_matrix = np.full(O.shape, np.nan, dtype = float)
    chi_matrix[~zero_E] = (O[~zero_E] - E[~zero_E])**2 / E[~zero_E]
    chi_matrix[zero_E & (O == 0)] = 0.0

    # Long-format table: counties vary slowest, age groups fastest, which
    # matches the row-major order of ravel().
    details_df = pd.DataFrame({
        "county_name": [f"{county}, {age}" for county in counties for age in cols],
        "age_group": cols * len(counties),
        "observed": O.ravel(),
        "expected": E.ravel(),
        "chi_value": chi_matrix.ravel()},
        columns=["county_name", "age_group", "observed", "expected", "chi_value"])

    chi2_summary = {"chi2": float(chi2), "p_value": float(p), "dof": int(dof)}

    # Make sure the output folder exists before writing.
    folder_out = os.path.join(DATA_RESULTS, 'stat_test')
    os.makedirs(folder_out, exist_ok = True)
    filename = 'age_test.csv'
    path_out = os.path.join(folder_out, filename)
    details_df.to_csv(path_out, index = False)

    return details_df, chi2_summary

In [81]:
# Run the age test with the default age-group columns; `details` is
# rebound to the age per-cell table.
details, age_summary = age_chi_square(df = df1)

## 3. Test for Race Independence

In [82]:
# Mortality counts broken down by race.
race_path = os.path.join(
    DATA_RESULTS, 'processed', 'race_cdc_pulmonary_data.csv'
)
df2 = pd.read_csv(race_path)

In [83]:
# Sum mort_count per county and race, then spread the race categories
# into one column each.
race_county_keys = ['state', 'county_name', 'state_fips',
                    'res_countyfips', 'res_statefips']
df2 = df2.groupby(race_county_keys + ['race_recode3'],
                  as_index = False).agg({'mort_count': 'sum'})
race_wide = df2.pivot_table(index = race_county_keys,
                            columns = 'race_recode3', values = 'mort_count',
                            aggfunc = 'sum')
df2 = race_wide.reset_index()

In [84]:
# Only the 'Black' and 'White' columns are carried into the test.
race_keep_cols = ['county_name', 'res_statefips', 'Black', 'White']
df2 = df2.loc[:, race_keep_cols]

In [85]:
def chi2_by_race(df, county_col = "county_name",
                 race_group_cols = ["Black", "White"],
                 fillna_with = 0):
    """
    Run a chi-square test of independence between county and race group
    on mortality counts and export the per-cell contributions.

    The per-cell table is also written to
    ``<DATA_RESULTS>/stat_test/race_test.csv`` (``DATA_RESULTS`` is the
    module-level results path); the folder is created when missing.

    Parameters
    ----------
    df : dataframe
        Dataframe with one row per county and one count column per
        race group
    county_col : string
        Column name with county names
    race_group_cols : sequence of strings
        Names of the race-group count columns, in the order they should
        appear in the contingency table. Any sequence (list, tuple) is
        accepted; the argument is never modified.
    fillna_with : number
        Value substituted for missing or non-numeric counts in the rows
        that are kept

    Returns
    -------
    details_df : dataframe
        DataFrame with one row per "County, Race group" label and the
        columns county_name, race_group, observed, expected and
        chi_value ((O-E)^2/E).
    chi2_summary : dictionary
        Dictionary with chi2, p_value, dof

    Raises
    ------
    ValueError
        Propagated from ``chi2_contingency`` when the table is unusable
        (for example when every row is dropped by the zero-total filter).
    """

    # Copy into a list so tuples work and the default list is never shared.
    cols = list(race_group_cols)

    df2 = df[[county_col] + cols].copy()

    # Coerce to numbers BEFORE filtering, so counts stored as strings are
    # summed as numbers rather than breaking the row total.
    for col in cols:

        df2[col] = pd.to_numeric(df2[col], errors = "coerce")

    # Drop counties with no recorded deaths in any group (missing values
    # are skipped by the sum); .copy() keeps the assignments below from
    # writing into a filtered view.
    df2 = df2[df2[cols].sum(axis=1) > 0].copy()

    for col in cols:

        df2[col] = df2[col].fillna(fillna_with).astype(float)

    contingency = df2[cols].to_numpy()
    counties = df2[county_col].astype(str).tolist()

    chi2, p, dof, expected = chi2_contingency(contingency)

    O = contingency
    E = expected

    # Per-cell (O-E)^2 / E: divide only where E is non-zero, use 0 where
    # both O and E are zero, and leave NaN where only E is zero.
    zero_E = (E == 0)
    chi_matrix = np.full(O.shape, np.nan, dtype=float)
    chi_matrix[~zero_E] = (O[~zero_E] - E[~zero_E])**2 / E[~zero_E]
    chi_matrix[zero_E & (O == 0)] = 0.0

    # Long-format table: counties vary slowest, race groups fastest, which
    # matches the row-major order of ravel().
    details_df = pd.DataFrame({
        "county_name": [f"{county}, {race}" for county in counties for race in cols],
        "race_group": cols * len(counties),
        "observed": O.ravel(),
        "expected": E.ravel(),
        "chi_value": chi_matrix.ravel()},
        columns = ["county_name", "race_group", "observed", "expected", "chi_value"])

    chi2_summary = {"chi2": float(chi2), "p_value": float(p), "dof": int(dof)}

    # Make sure the output folder exists before writing.
    folder_out = os.path.join(DATA_RESULTS, 'stat_test')
    os.makedirs(folder_out, exist_ok = True)
    filename = 'race_test.csv'
    path_out = os.path.join(folder_out, filename)
    details_df.to_csv(path_out, index = False)

    return details_df, chi2_summary

In [86]:
# Run the race test with the default 'Black' / 'White' columns; `details`
# is rebound to the race per-cell table.
details, race_summary = chi2_by_race(df = df2)

In [87]:
# One summary row per tested variable: the label followed by that test's
# chi2, p_value and dof.
summary_by_variable = {"gender": sex_summary,
                       "age": age_summary,
                       "race": race_summary}
all_summaries = [{"det_variable": variable, **stats}
                 for variable, stats in summary_by_variable.items()]

df_results = pd.DataFrame(all_summaries)

# Written next to the per-variable detail tables.
folder_out = os.path.join(DATA_RESULTS, 'stat_test')
filename = 'all_chi_test.csv'
path_out = os.path.join(folder_out, filename)
df_results.to_csv(path_out, index = False)