In [6]:
import polars as pl
import re
from zipfile import ZipFile
import requests
from io import BytesIO
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime

In [4]:
# Creating a function to generate urls for the zip files containing the data
def generate_urls(zip_file_names=None, url_path=None):
    """Build the download URL for each archived dialysis-facility zip file.

    Every URL has the form ``{url_path}{year}/{zip_file_name}``, where the
    four-digit year is taken from the ``_MM_YYYY.zip`` suffix of the file name.

    Args:
        zip_file_names: Optional iterable of zip file names. When omitted, the
            eight archive snapshots listed below (2017-2024) are used.
        url_path: Optional base URL, including the trailing slash. When
            omitted, the CMS "Dialysis facilities" archive folder is used.

    Returns:
        list[str]: One URL per file name, in input order. File names whose
        suffix does not match ``_MM_YYYY.zip`` are skipped and a warning is
        printed for each.
    """
    if zip_file_names is None:
        zip_file_names = [
            "dialysis_facilities_07_2024.zip", "dialysis_facilities_10_2023.zip",
            "dialysis_facilities_11_2022.zip", "dialysis_facilities_04_2021.zip",
            "dialysis_facilities_archive_10_2020.zip", "dfc_Revised_FlatFiles_archive_10_2019.zip",
            "dfc_Revised_Flatfiles_archive_10_2018.zip", "dfc_Revised_Flatfiles_archive_10_2017.zip"
        ]
    if url_path is None:
        url_path = "https://data.cms.gov/provider-data/sites/default/files/archive/Dialysis%20facilities/"

    urls = []
    for zip_file_name in zip_file_names:
        # Only the year is needed to build the URL; the month is still
        # required to be present so the accepted suffix is unchanged.
        match = re.search(r'_\d{2}_(\d{4})\.zip$', zip_file_name)
        if match:
            year = match.group(1)
            urls.append(f"{url_path}{year}/{zip_file_name}")
        else:
            print(f"Warning: Could not extract date from {zip_file_name}")

    return urls

def clean_column_name(name):
    """Normalise a raw column header.

    Characters other than ASCII letters, digits and whitespace are dropped,
    the result is lower-cased, and each space character becomes an
    underscore. Other whitespace (tabs, newlines) is left as-is.
    """
    alnum_and_whitespace = re.sub(r'[^a-zA-Z0-9\s]', '', name)
    # Splitting on a single space and re-joining swaps every ' ' for '_'.
    return '_'.join(alnum_and_whitespace.lower().split(' '))

def standardize_column_names(df):
    """Rename every column of ``df`` to its cleaned form.

    Each header is passed through ``clean_column_name``. Cleaned names that
    are known spellings of the provider identifier are all mapped to
    ``provider_number``; every other column keeps its cleaned name.

    Returns:
        The result of ``df.rename`` with the original-to-new name mapping.
    """
    provider_aliases = {
        "provider": "provider_number",
        "provider_number": "provider_number",
        "cms_certification_number_ccn": "provider_number"
    }

    cleaned_by_original = {col: clean_column_name(col) for col in df.columns}
    renames = {
        original: provider_aliases.get(cleaned, cleaned)
        for original, cleaned in cleaned_by_original.items()
    }

    return df.rename(renames)
def read_csv_from_zip(zip_file, csv_name):
    """Load one CSV member of an open zip archive as a standardised DataFrame.

    Args:
        zip_file: Open archive object providing ``read(name)``.
        csv_name: Name of the member to read.

    Returns:
        The DataFrame with standardised column names, or ``None`` when the
        member is missing (``KeyError``) or any other error occurs. In both
        failure cases a message is printed instead of raising.
    """
    try:
        frame = pl.read_csv(zip_file.read(csv_name), infer_schema_length=10000)
        standardized = standardize_column_names(frame)
    except KeyError:
        print(f"Warning: {csv_name} not found in zip file")
        return None
    except Exception as e:
        print(f"Error reading {csv_name}: {e}")
        return None
    return standardized

def _collect_frames(zip_file, candidate_names, year, month):
    """Read each candidate CSV found in the archive and stack them into one frame.

    Every frame that loads gets literal ``year`` and ``month`` columns before
    stacking. Returns ``None`` when none of the candidates could be read.
    """
    frames = []
    for csv_name in candidate_names:
        df = read_csv_from_zip(zip_file, csv_name)
        if df is not None:
            frames.append(
                df.with_columns(
                    pl.lit(year).alias('year'),
                    pl.lit(month).alias('month')
                )
            )

    if not frames:
        return None
    if len(frames) == 1:
        return frames[0]
    # Diagonal concat keeps the union of columns when the files differ.
    return pl.concat(frames, how="diagonal")


def read_dialysis_data(url, timeout=60):
    """Download one archive zip and load its facility and CAHPS CSV files.

    The year is parsed from the ``/YYYY/`` path segment of ``url`` and the
    month from its ``_MM_YYYY.zip`` suffix; both are added as columns to every
    frame that is loaded. Several known file names are tried for each dataset
    because the member names differ between archive snapshots.

    Args:
        url: Full URL of the zip archive.
        timeout: Timeout passed to ``requests.get``, so a stalled connection
            raises instead of blocking forever.

    Returns:
        tuple: ``(facility_df, cahps_df)``. Either element is ``None`` when
        none of its candidate files could be read. ``(None, None)`` is
        returned, after printing a message, when the download or the
        processing fails.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()

        year = re.search(r'/(\d{4})/', url).group(1)
        month = re.search(r'_(\d{2})_\d{4}\.zip$', url).group(1)

        facility_files = [
            "DFC_SOCRATA_FAC_DATA.csv",
            "DFC_FACILITY.csv",
            "23ew-n7w9.csv"  # facility data from 2021
        ]

        cahps_files = [
            "ICH_CAHPS_FACILITY.csv",
            "DFC_SOCRATA_ICH CAHPS_FAC_DATA.csv",
            "DFC_SOCRATA_ICH_CAHPS_FAC_DATA.csv",
            "59mq-zhts.csv"  # CAHPS data from 2021
        ]

        # The context manager closes the archive even if reading raises.
        with ZipFile(BytesIO(response.content)) as zip_file:
            facility_df = _collect_frames(zip_file, facility_files, year, month)
            cahps_df = _collect_frames(zip_file, cahps_files, year, month)

        return facility_df, cahps_df

    except requests.RequestException as e:
        print(f"Error downloading {url}: {e}")
    except Exception as e:
        print(f"Error processing {url}: {e}")

    return None, None

def cast_columns_to_string(df):
    """Return ``df`` with every column cast to ``pl.Utf8`` under its existing name."""
    return df.select(pl.all().cast(pl.Utf8))

def load_all_dialysis_data():
    """Load every archive snapshot and combine it into two all-string frames.

    For each URL from ``generate_urls`` the facility and CAHPS frames are
    read, cast to strings and collected; frames that came back as ``None``
    are skipped. The collected frames are then stacked with a diagonal concat.

    Returns:
        tuple: ``(combined_facility_df, combined_cahps_df)``.
    """
    facility_frames, cahps_frames = [], []

    for url in generate_urls():
        print(f"Processing {url}")
        facility_df, cahps_df = read_dialysis_data(url)
        for frame, bucket in ((facility_df, facility_frames), (cahps_df, cahps_frames)):
            if frame is not None:
                bucket.append(cast_columns_to_string(frame))

    combined_facility_df = pl.concat(facility_frames, how="diagonal")
    combined_cahps_df = pl.concat(cahps_frames, how="diagonal")
    return combined_facility_df, combined_cahps_df

combined_facility_df, combined_cahps_df = load_all_dialysis_data()

# Summary of each combined frame: shape and column names.
print("\nCombined Facility Data:", f"Shape: {combined_facility_df.shape}", sep="\n")
print("Columns:", combined_facility_df.columns)

print("\nCombined CAHPS Data:", f"Shape: {combined_cahps_df.shape}", sep="\n")
print("Columns:", combined_cahps_df.columns)

# Preview of the first rows of each combined frame.
print("\nFirst few rows of Combined Facility Data:", combined_facility_df.head(), sep="\n")

print("\nFirst few rows of Combined CAHPS Data:", combined_cahps_df.head(), sep="\n")

Processing https://data.cms.gov/provider-data/sites/default/files/archive/Dialysis%20facilities/2024/dialysis_facilities_07_2024.zip
Processing https://data.cms.gov/provider-data/sites/default/files/archive/Dialysis%20facilities/2023/dialysis_facilities_10_2023.zip
Processing https://data.cms.gov/provider-data/sites/default/files/archive/Dialysis%20facilities/2022/dialysis_facilities_11_2022.zip
Processing https://data.cms.gov/provider-data/sites/default/files/archive/Dialysis%20facilities/2021/dialysis_facilities_04_2021.zip
Processing https://data.cms.gov/provider-data/sites/default/files/archive/Dialysis%20facilities/2020/dialysis_facilities_archive_10_2020.zip
Processing https://data.cms.gov/provider-data/sites/default/files/archive/Dialysis%20facilities/2019/dfc_Revised_FlatFiles_archive_10_2019.zip
Processing https://data.cms.gov/provider-data/sites/default/files/archive/Dialysis%20facilities/2018/dfc_Revised_Flatfiles_archive_10_2018.zip
Processing https://data.cms.gov/provider-

In [7]:
# Distinct values of the 'year' column in each combined frame, sorted.
facility_years = combined_facility_df['year'].unique().sort()
cahps_years = combined_cahps_df['year'].unique().sort()

print("Years represented in combined_facility_df:", facility_years, sep="\n")
print("\nYears represented in combined_cahps_df:", cahps_years, sep="\n")

Years represented in combined_facility_df:
shape: (8,)
Series: 'year' [str]
[
	"2017"
	"2018"
	"2019"
	"2020"
	"2021"
	"2022"
	"2023"
	"2024"
]

Years represented in combined_cahps_df:
shape: (8,)
Series: 'year' [str]
[
	"2017"
	"2018"
	"2019"
	"2020"
	"2021"
	"2022"
	"2023"
	"2024"
]


In [8]:
# Run timestamp (YYYYMMDD_HHMMSS). It is not embedded in the output paths below.
timestamp = f"{datetime.now():%Y%m%d_%H%M%S}"

# Fixed output paths (no timestamp), so each run replaces the prior version.
cms_output_path = '../../../003_data/001_raw-data/2017-2024_national_cms_dialysis-facility_data.parquet'
cahps_output_path = '../../../003_data/001_raw-data/2017-2024_national_cms_dialysis-facility_cahps-data.parquet'

# Function to save Polars DataFrame as Parquet with metadata
def save_polars_parquet_with_metadata(df, output_path, description):
    """Write a Polars DataFrame to a Snappy-compressed Parquet file with metadata.

    The frame is converted to an Arrow table. Schema metadata already on that
    table is kept, and ``created_at``, ``description``, ``version`` and
    ``cleaning_steps`` entries are added, replacing any same-named keys.

    Args:
        df: Polars DataFrame to save.
        output_path: Destination passed to ``pq.write_table``.
        description: Text stored under the ``description`` metadata key.

    Raises:
        Exception: Any failure is reported with a printed message and then
            re-raised unchanged.
    """
    try:
        table = df.to_arrow()

        # Start from whatever schema metadata the Arrow table already carries.
        existing_metadata = table.schema.metadata or {}

        provenance = {
            b'created_at': str(datetime.now()).encode('utf-8'),
            b'description': description.encode('utf-8'),
            b'version': b'1.0',
            b'cleaning_steps': b'''
                1. Generated URLs for CMS dialysis facility data zip files from 2017 to 2024.
                2. For each URL:
                   a. Downloaded and extracted CSV files from zip archives.
                   b. Standardized column names, including mapping provider column variations to 'provider_number'.
                   c. Added 'year' and 'month' columns based on the data source.
                   d. Concatenated facility and CAHPS data separately.
                3. Cast all columns to string type for consistency.
                4. Combined data from all years into single facility and CAHPS datasets.
            '''
        }

        # Later keys win, so the provenance entries override existing ones.
        merged_metadata = {**existing_metadata, **provenance}

        pq.write_table(
            table.replace_schema_metadata(merged_metadata),
            output_path,
            compression='snappy',
        )

        print(f"Data saved to {output_path}")
    except Exception as e:
        print(f"Error saving data: {e}")
        raise

# Write both combined frames to their Parquet output paths.
save_polars_parquet_with_metadata(
    df=combined_facility_df,
    output_path=cms_output_path,
    description="Merged CMS dialysis facility data",
)
save_polars_parquet_with_metadata(
    df=combined_cahps_df,
    output_path=cahps_output_path,
    description="Merged CMS dialysis facility CAHPS data",
)

Data saved to ../../../003_data/001_raw-data/2017-2024_national_cms_dialysis-facility_data.parquet
Data saved to ../../../003_data/001_raw-data/2017-2024_national_cms_dialysis-facility_cahps-data.parquet
