In [1]:
#### Import Necessary Libraries

In [2]:
import requests
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pyspark.sql import SparkSession
import re
import pandas

In [3]:
#### Set Global Variables

In [4]:
# CMS provider-data metastore endpoint queried for the dataset items.
cms_metastore_api_url = 'https://data.cms.gov/provider-data/api/1/metastore/schemas/dataset/items'

# A single timestamp taken at cell execution; it names both output folders so
# the staging and clean directories of one run share the same suffix.
current_datetime = datetime.now()
formatted_date = current_datetime.strftime("%Y_%m_%d_%H_%M_%S")
fs_staging_path = f"staging/{formatted_date}"
fs_clean_path = f"clean/{formatted_date}"

# Flag read by log_message() to decide whether messages are printed.
enable_logs = True

# Spark session used by the CSV cleaning step.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("CSV_Cleaner")
    .getOrCreate()
)

In [5]:
#### Set Global Functions

In [6]:
def log_message(s):
    """Print ``s`` when notebook logging is switched on.

    The module-level ``enable_logs`` flag is read on every call, so toggling
    it in a later cell takes effect immediately.

    Args:
        s: The message (any printable object) to emit.

    Returns:
        None.
    """
    # Plain truthiness check instead of `== True`: any truthy flag value
    # enables logging, which is the idiomatic way to test a boolean switch.
    if enable_logs:
        print(s)

def get_json(url, timeout=30):
    """Fetch ``url`` with an HTTP GET and return the decoded JSON body.

    Args:
        url: Address to request.
        timeout: Seconds passed to ``requests.get`` as its timeout. Without
            one, a stalled server would block the notebook indefinitely.

    Returns:
        The parsed JSON payload when the server answers with status 200,
        otherwise ``None`` (after printing the status code).

    Raises:
        requests.RequestException: Propagated from ``requests.get`` on
            connection problems or when the timeout elapses.
    """
    response = requests.get(url, timeout=timeout)

    if response.status_code == 200:
        return response.json()

    print(f"Failed to retrieve data: {response.status_code}")
    # Explicit so callers can see that a failed request yields None.
    return None
        
def download_csv_from_url(url, timeout=60):
    """Download one file into the run's staging directory.

    The target file name is the last ``/``-separated segment of ``url`` and
    the file is written under ``fs_staging_path``.

    Args:
        url: Download address of the file.
        timeout: Seconds passed to ``requests.get`` as its timeout, so a
            stalled server cannot block a worker thread forever.

    Returns:
        True when the server answered with status 200 and the body was
        written to disk; False when the request failed or returned any
        other status.
    """
    file_name = url.split('/')[-1]
    fs_write_path = f"{fs_staging_path}/{file_name}"

    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as error:
        # Report network errors through the boolean contract instead of
        # letting them surface later from Future.result().
        log_message(f"FAIL: Failed to retrieve data for {file_name}: {error}")
        return False

    if response.status_code != 200:
        log_message(f"FAIL: Failed to retrieve data for {file_name}: {response.status_code}")
        return False

    os.makedirs(fs_staging_path, exist_ok=True)

    # Write-only binary mode is sufficient; the file is never read back here.
    with open(fs_write_path, "wb") as file:
        file.write(response.content)

    log_message(f"SUCCESS: {fs_write_path}")
    return True

def _clean_column_name(column_name):
    """Return ``column_name`` lower-cased, with whitespace runs turned into
    single underscores and every character outside ``[a-zA-Z0-9_]`` removed."""
    lowered = column_name.lower()
    underscored = '_'.join(lowered.split())
    return re.sub(r'[^a-zA-Z0-9_]', '', underscored)


def clean_csv_columns(staging_csv):
    """Rewrite one staged CSV into the clean folder with normalised headers.

    Reads ``{fs_staging_path}/{staging_csv}`` with Spark, renames every
    column via ``_clean_column_name`` and writes the result through pandas
    to ``{fs_clean_path}/{staging_csv}`` without an index column.

    Args:
        staging_csv: File name (not a full path) inside the staging folder.

    Returns:
        None.
    """
    metastore_staging_csv_path = f"{fs_staging_path}/{staging_csv}"

    # Schema inference is disabled on purpose: this step only renames
    # headers, and letting Spark re-type columns can change the values that
    # are written back (e.g. numeric-looking identifiers losing leading
    # zeros) and costs an extra pass over the file.
    df = spark.read.options(header=True, inferSchema=False, delimiter=',').csv(metastore_staging_csv_path)

    renamed_column_names = {
        column_name: _clean_column_name(column_name)
        for column_name in df.schema.names
    }
    # NOTE(review): two source headers that normalise to the same name would
    # collide here; not handled - confirm whether this occurs in the data.

    clean_df = df.withColumnsRenamed(renamed_column_names)

    # Make sure the destination exists even when this function is called
    # without the surrounding cell having created it.
    os.makedirs(fs_clean_path, exist_ok=True)
    clean_df.toPandas().to_csv(f"{fs_clean_path}/{staging_csv}", index=False)
    log_message(f"SUCCESS: {metastore_staging_csv_path}")

In [7]:
#### Extract CSV Urls from JSON

In [8]:
metastore_data = get_json(cms_metastore_api_url)

# get_json() yields None for a non-200 response; stop with a clear message
# instead of an opaque TypeError when iterating over None below.
if metastore_data is None:
    raise RuntimeError(f"Could not load metastore items from {cms_metastore_api_url}")

# Collect the download URL of the first distribution of every dataset.
# Datasets without a distribution or without a downloadURL are skipped (and
# logged) so that no None entry reaches the download workers.
distribution_csv_download_links = []
for dataset in metastore_data:
    distributions = dataset.get('distribution') or []
    download_url = distributions[0].get('downloadURL') if distributions else None

    if download_url:
        distribution_csv_download_links.append(download_url)
    else:
        log_message(f"SKIP: dataset without downloadURL: {dataset.get('identifier')}")

In [9]:
#### Use multithreading to download all CSVs in parallel from the web to the staging folder under a timestamped directory

In [10]:
threadpool_running_processes = []
# URLs whose download failed (previously initialised as a tuple by a stray
# trailing comma).
failed_extractions = []

with ThreadPoolExecutor(max_workers=10) as executor:
    # Remember which URL each future belongs to so failures can be named.
    future_to_url = {
        executor.submit(download_csv_from_url, file_path): file_path
        for file_path in distribution_csv_download_links
    }
    threadpool_running_processes = list(future_to_url)

    for future in as_completed(future_to_url):
        file_path = future_to_url[future]
        try:
            succeeded = future.result()
        except Exception as error:
            # A raising worker counts as one failed download; it must not
            # abort the tally for the remaining files.
            log_message(f"FAIL: {file_path}: {error}")
            succeeded = False

        if not succeeded:
            failed_extractions.append(file_path)


log_message(f"FAILED EXTRACTIONS {len(failed_extractions)}")

SUCCESS: staging/2024_12_04_16_18_29/HH_National_Oct2024.csv
SUCCESS: staging/2024_12_04_16_18_29/DFC_STATE.csv
SUCCESS: staging/2024_12_04_16_18_29/ICH_CAHPS_NATIONAL.csv
SUCCESS: staging/2024_12_04_16_18_29/DFC_NATIONAL.csv
SUCCESS: staging/2024_12_04_16_18_29/Hospice_General-Information_Nov2024.csv
SUCCESS: staging/2024_12_04_16_18_29/ASCQR_OAS_CAHPS_BY_ASC.csv
SUCCESS: staging/2024_12_04_16_18_29/National_CAHPS_Hospice_Survey_Data_Nov2024.csv
SUCCESS: staging/2024_12_04_16_18_29/ASC_National.csv
SUCCESS: staging/2024_12_04_16_18_29/ICH_CAHPS_FACILITY.csv
SUCCESS: staging/2024_12_04_16_18_29/ASCQR_OAS_CAHPS_NATIONAL.csv
SUCCESS: staging/2024_12_04_16_18_29/ASC_State.csv
SUCCESS: staging/2024_12_04_16_18_29/ASCQR_OAS_CAHPS_STATE.csv
SUCCESS: staging/2024_12_04_16_18_29/CJR_Quality_Reporting_January_2024_Production_File.csv
SUCCESS: staging/2024_12_04_16_18_29/ASC_Facility.csv
SUCCESS: staging/2024_12_04_16_18_29/Complications_and_Deaths-National.csv
SUCCESS: staging/2024_12_04_16_18_

In [11]:
#### Use multithreading to transform all staging CSVs and write them to the timestamped clean folder

In [12]:
threadpool_running_processes = []
# File names whose transformation raised an error.
failed_extractions = []

metastore_staging_csvs = os.listdir(fs_staging_path)
os.makedirs(fs_clean_path, exist_ok=True)

with ThreadPoolExecutor(max_workers=5) as executor:
    # Remember which staged file each future belongs to so failures can be named.
    future_to_csv = {
        executor.submit(clean_csv_columns, staging_csv): staging_csv
        for staging_csv in metastore_staging_csvs
    }
    threadpool_running_processes = list(future_to_csv)

    for future in as_completed(future_to_csv):
        staging_csv = future_to_csv[future]
        try:
            # result() re-raises anything the worker raised; without this
            # call a failed transformation would go unnoticed.
            future.result()
        except Exception as error:
            failed_extractions.append(staging_csv)
            log_message(f"FAIL: {fs_staging_path}/{staging_csv}: {error}")

log_message(f"FAILED TRANSFORMATIONS {len(failed_extractions)}")


SUCCESS: staging/2024_12_04_16_18_29/ASCQR_OAS_CAHPS_BY_ASC.csv
SUCCESS: staging/2024_12_04_16_18_29/ASCQR_OAS_CAHPS_NATIONAL.csv
SUCCESS: staging/2024_12_04_16_18_29/ASCQR_OAS_CAHPS_STATE.csv
SUCCESS: staging/2024_12_04_16_18_29/Anesthesiology.csv
SUCCESS: staging/2024_12_04_16_18_29/Advanced_Heart_Failure_and_Transplant_Cardiology.csv
SUCCESS: staging/2024_12_04_16_18_29/Addiction_Medicine.csv
SUCCESS: staging/2024_12_04_16_18_29/Allergy_Immunology.csv
SUCCESS: staging/2024_12_04_16_18_29/ASC_National.csv
SUCCESS: staging/2024_12_04_16_18_29/ASC_State.csv
SUCCESS: staging/2024_12_04_16_18_29/ASC_Facility.csv
SUCCESS: staging/2024_12_04_16_18_29/Audiologist.csv
SUCCESS: staging/2024_12_04_16_18_29/Cardiac_Surgery.csv
SUCCESS: staging/2024_12_04_16_18_29/Cardiology.csv
SUCCESS: staging/2024_12_04_16_18_29/CJR_Quality_Reporting_January_2024_Production_File.csv
SUCCESS: staging/2024_12_04_16_18_29/Certified_Nurse_Midwife.csv
SUCCESS: staging/2024_12_04_16_18_29/Certified_Clinical_Nurse_S