In [0]:
import os
import requests
import pandas as pd
import re
import smtplib
import json
import sys
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime



In [0]:
%sql
-- Run-history table: insert_metadata appends one row each time a CSV is downloaded.
-- Modified_date stores the dataset's `modified` value from the CMS API; process_csv
-- compares it against the API on later runs to decide whether to re-download the file.
CREATE TABLE IF NOT EXISTS default.job_log (
    file_name STRING NOT NULL,
    url STRING NOT NULL,
    download_timestamp timestamp NOT NULL,
    Modified_date date NOT NULL
)

In [0]:
# Configuration
# CMS provider-data metastore endpoint; fetch_datasets() reads its JSON item list and filters it by theme.
API_URL = "https://data.cms.gov/provider-data/api/1/metastore/schemas/dataset/items"
# Local directory for downloaded CSVs. It is relative, so it resolves against the driver's working
# directory. Keep the trailing slash: process_csv builds paths as DATA_DIR + file name.
DATA_DIR = "data/"

In [0]:
# Run-history lookup used to decide which datasets need (re)downloading
def read_metadata():
    """Return the whole run-history table as a Spark DataFrame.

    Each row describes one past download: file name, source URL, download time,
    and the dataset's modified date at that time.
    """
    query = "select * from default.job_log"
    return spark.sql(query)

# Fetch datasets related to 'Hospitals'
def fetch_datasets(theme="Hospitals", timeout=60):
    """Fetch the CMS catalog and keep the items whose first theme equals ``theme``.

    Args:
    - theme: Theme name compared with the first entry of each item's ``theme``
      list. Defaults to "Hospitals", the previously hard-coded filter.
    - timeout: Seconds passed to ``requests.get`` so a stalled API call fails
      instead of hanging the notebook.

    Returns:
    - List of catalog items (dicts), in API order, whose first theme matches.

    Raises:
    - requests.HTTPError if the API returns an error status.
    """
    response = requests.get(API_URL, timeout=timeout)
    response.raise_for_status()  # Ensure we got a successful response
    data = response.json()

    filtered_data = []
    for item in data:
        # A missing/None or empty theme list would otherwise raise TypeError or
        # IndexError and abort the whole run; such items simply do not match.
        themes = item.get('theme') or []
        if themes and themes[0] == theme:
            filtered_data.append(item)
    return filtered_data

# Convert column names to snake_case
def to_snake_case(column_name):
    """Normalize a CSV header into a snake_case column name.

    Punctuation becomes a separator. Runs of whitespace and/or underscores
    collapse to a single underscore. The result is lower-cased, and leading
    and trailing underscores are stripped. Examples: "Measure - Start Date" ->
    "measure_start_date", "Score (%)" -> "score", "Footnote__Text" ->
    "footnote_text".

    Args:
    - column_name: Original header label; non-string labels are passed through str().

    Returns:
    - The snake_case name. It is "" if the label contains no word characters.
    """
    # Replace special characters (anything that is not a word char or whitespace) with a space
    name = re.sub(r"[^\w\s]", " ", str(column_name))
    # Replace runs of spaces and/or underscores with a single underscore; "_" is in the
    # class so that "a__b" and "a _ b" both become "a_b", as intended.
    name = re.sub(r"[\s_]+", "_", name)
    # Convert to lowercase and drop separators left at either end
    return name.lower().strip("_")
   
# Write metadata to track run history and modified timestamps
def _sql_string_literal(value):
    """Render ``value`` as a single-quoted Spark SQL string literal.

    Backslashes and single quotes are backslash-escaped, which is how Spark SQL
    escapes characters inside string literals (with the default
    spark.sql.parser.escapedStringLiterals=false).
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def insert_metadata(csv_filename, csv_url, last_modified, table_name="job_log"):
    """
    Insert metadata (csv_filename, csv_url, download_timestamp, last_modified) into a table.

    Args:
    - csv_filename: Name of the CSV file.
    - csv_url: URL to the CSV file.
    - last_modified: Last modified date of the file (a 'YYYY-MM-DD' string is expected).
    - table_name: The table to insert into (default "job_log"). It is interpolated as an
      identifier, so it must come from trusted code, never from external data.
      NOTE(review): read_metadata() reads "default.job_log"; the unqualified default here
      refers to the same table only when the current database is `default`.

    Returns:
    - A message containing the executed INSERT statement.
    """
    # Get current timestamp for download timestamp (driver-local clock)
    download_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Escape every value: file names and URLs come from the remote API, and a stray
    # quote or backslash would otherwise break (or inject into) the statement.
    values = ", ".join(
        _sql_string_literal(value)
        for value in (csv_filename, csv_url, download_timestamp, last_modified)
    )

    insert_query = f"""
    INSERT INTO {table_name} (file_name, url, download_timestamp, Modified_date)
    VALUES ({values})
    """

    # Execute the dynamic SQL query using spark.sql()
    spark.sql(insert_query)

    return (f"Insert metadata executed successfully: {insert_query}")

In [0]:
def _latest_recorded_modified_date(csv_filename):
    """Return the Modified_date ('YYYY-MM-DD') of the newest job_log row for csv_filename, or None if there is none."""
    metadata_df = read_metadata()
    latest_row = (
        metadata_df
        .filter(metadata_df.file_name == csv_filename)
        # insert_metadata appends a new row on every download, so a file can have
        # several rows. Take the most recent one rather than an arbitrary first row,
        # otherwise a stale date can trigger a re-download on every run.
        .orderBy("download_timestamp", ascending=False)
        .first()  # single Spark action; None when no row matches
    )
    if latest_row is None:
        return None
    return latest_row['Modified_date'].strftime('%Y-%m-%d')


def _download_and_process(csv_url, csv_filename, last_modified, timeout=60):
    """Download csv_url into DATA_DIR, load it with snake_case columns, record the run in job_log, and return the Spark DataFrame."""
    print(f"Downloading and processing {csv_url}...")
    # The timeout bounds connect/read stalls so one hung download cannot block the pool forever.
    response = requests.get(csv_url, timeout=timeout)
    response.raise_for_status()

    # Save the CSV
    os.makedirs(DATA_DIR, exist_ok=True)
    local_path = os.path.join(DATA_DIR, csv_filename)
    with open(local_path, 'wb') as file:
        file.write(response.content)

    # Process the CSV into DataFrame.
    # NOTE(review): "column_name" does not look like a real CMS column; this dtype
    # mapping is probably a leftover placeholder — confirm.
    df = pd.read_csv(local_path, dtype={"column_name": str}, low_memory=False)

    # Convert column names to snake_case
    df.columns = [to_snake_case(col) for col in df.columns]
    spark_df = spark.createDataFrame(df)
    # NOTE(review): spark_df is not written to any table in this notebook; presumably
    # persisting it is still TODO. Only the run metadata is recorded.
    insert_metadata(csv_filename, csv_url, last_modified)
    return spark_df


def process_csv(dataset):
    """Download and load one catalog dataset if it is new or its modified date changed.

    Compares the dataset's ``modified`` value with the Modified_date of the most
    recent job_log row for the same file name. If there is no row, or the dates
    differ, the CSV is downloaded into DATA_DIR and read with pandas. Its columns
    are converted to snake_case, it is converted to a Spark DataFrame, and a
    job_log row is inserted. Otherwise the dataset is skipped.

    Args:
    - dataset: One catalog item from fetch_datasets(); the downloadURL of its
      first distribution is used.

    Returns:
    - The CSV file name (basename of the download URL).
    """
    csv_url = dataset['distribution'][0]['downloadURL']
    csv_filename = os.path.basename(csv_url)
    # NOTE(review): when 'modified' is missing this is '', which will likely fail the
    # NOT NULL date column of job_log on insert — confirm how such items should be handled.
    last_modified = dataset.get('modified', '')

    recorded_modified = _latest_recorded_modified_date(csv_filename)
    if recorded_modified is None:
        print(f"CSV file '{csv_filename}' not found in the DataFrame.")
        _download_and_process(csv_url, csv_filename, last_modified)
    elif recorded_modified != last_modified:
        print(f"The last modified date for {csv_filename} is different. Expected: {last_modified}, Found: {recorded_modified}")
        _download_and_process(csv_url, csv_filename, last_modified)
    else:
        print(f"Dataset {csv_filename} not modified, skipping download process....")
    return csv_filename
  


In [0]:
# Main function to download and process datasets
def main(max_workers=None):
    """Fetch the Hospitals datasets and process each one in parallel.

    Args:
    - max_workers: Thread-pool size passed to ThreadPoolExecutor; None keeps
      the executor's default.

    Returns:
    - List of CSV file names returned by process_csv, in dataset order.
    """
    datasets = fetch_datasets()

    # executor.map yields results in input order; consuming it with list() waits
    # for every task and re-raises a worker's exception here instead of losing it.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(process_csv, datasets))


if __name__ == "__main__":
    main()

CSV file 'ASCQR_OAS_CAHPS_NATIONAL.csv' not found in the DataFrame.
Downloading and processing https://data.cms.gov/provider-data/sites/default/files/resources/65a38069cc48376e9f519a3609424537_1729022709/ASCQR_OAS_CAHPS_NATIONAL.csv...
CSV file 'ASCQR_OAS_CAHPS_STATE.csv' not found in the DataFrame.
Downloading and processing https://data.cms.gov/provider-data/sites/default/files/resources/f086b4b7fc9d628f08e70ade47dff3c5_1729022711/ASCQR_OAS_CAHPS_STATE.csv...
CSV file 'ASC_State.csv' not found in the DataFrame.
Downloading and processing https://data.cms.gov/provider-data/sites/default/files/resources/d3aea2dfaeae85c44017441b184aeb6d_1729022710/ASC_State.csv...
CSV file 'ASC_Facility.csv' not found in the DataFrame.
Downloading and processing https://data.cms.gov/provider-data/sites/default/files/resources/dd03994fc93e296bb0297f1cd43cc987_1729022707/ASC_Facility.csv...
CSV file 'ASC_National.csv' not found in the DataFrame.
Downloading and processing https://data.cms.gov/provider-dat

In [0]:
%sql
-- Inspect the run-history rows written by insert_metadata.
-- NOTE(review): unqualified name; it matches default.job_log only when the current database is `default`.
select * from job_log
-- Uncomment to reset run history. process_csv treats files with no job_log row as new,
-- so every dataset would be re-downloaded on the next run.
--truncate table job_log

file_name,url,download_timestamp,Modified_date
FY2021_Distribution_of_Net_Change_in_Base_Op_DRG_Payment_Amt.csv,https://data.cms.gov/provider-data/sites/default/files/resources/1896b4292d6cbc089d0b0bb6d8e6ebe7_1720656688/FY2021_Distribution_of_Net_Change_in_Base_Op_DRG_Payment_Amt.csv,2025-01-10T04:31:20.000+0000,2023-01-06
FY_2024_Hospital_Readmissions_Reduction_Program_Hospital.csv,https://data.cms.gov/provider-data/sites/default/files/resources/bfd0c8c38e221fa2045de81691a6e300_1720656704/FY_2024_Hospital_Readmissions_Reduction_Program_Hospital.csv,2025-01-10T04:32:42.000+0000,2024-01-08
PCH_Complications_Unplanned_Hospital_Visits_HOSPITAL.csv,https://data.cms.gov/provider-data/sites/default/files/resources/bc386a698d7a9b8d09bf37a73371a899_1729022729/PCH_Complications_Unplanned_Hospital_Visits_HOSPITAL.csv,2025-01-10T04:32:09.000+0000,2024-10-10
PCH_Complications_Unplanned_Hospital_Visits_NATIONAL.csv,https://data.cms.gov/provider-data/sites/default/files/resources/6a91b2a437ea47e61cd34c975dd54729_1729022730/PCH_Complications_Unplanned_Hospital_Visits_NATIONAL.csv,2025-01-10T04:32:18.000+0000,2024-10-10
CJR_Quality_Reporting_January_2024_Production_File.csv,https://data.cms.gov/provider-data/sites/default/files/resources/739d54d012ec9a7113d06d2f1d29e9c2_1720656684/CJR_Quality_Reporting_January_2024_Production_File.csv,2025-01-10T04:31:04.000+0000,2024-01-08
Veterans_Health_Administration_Provider_Level_Data.csv,https://data.cms.gov/provider-data/sites/default/files/resources/9cea8696850b78e02559183bd6071fd6_1729022762/Veterans_Health_Administration_Provider_Level_Data.csv,2025-01-10T04:34:05.000+0000,2024-10-10
Medicare_Hospital_Spending_Per_Patient-National.csv,https://data.cms.gov/provider-data/sites/default/files/resources/ff15a8451b83ebc70b85a5c0e870a751_1729022747/Medicare_Hospital_Spending_Per_Patient-National.csv,2025-01-10T04:33:18.000+0000,2024-10-10
Medicare_Hospital_Spending_Per_Patient-Hospital.csv,https://data.cms.gov/provider-data/sites/default/files/resources/69874ce604586980ac088283c1b35095_1729022746/Medicare_Hospital_Spending_Per_Patient-Hospital.csv,2025-01-10T04:33:08.000+0000,2024-10-10
PCH_HEALTHCARE_ASSOCIATED_INFECTIONS_HOSPITAL.csv,https://data.cms.gov/provider-data/sites/default/files/resources/d24fd9fa17ad5673de9cf6220c1c7a5a_1729022732/PCH_HEALTHCARE_ASSOCIATED_INFECTIONS_HOSPITAL.csv,2025-01-10T04:32:26.000+0000,2024-10-10
Medicare_Hospital_Spending_Per_Patient-State.csv,https://data.cms.gov/provider-data/sites/default/files/resources/b53c8e7d24557eb1e898557e4c7d7484_1729022747/Medicare_Hospital_Spending_Per_Patient-State.csv,2025-01-10T04:33:18.000+0000,2024-10-10


In [0]:
%sh

# Show the driver's working directory. The relative DATA_DIR ("data/") presumably resolves
# against it, since the downloaded CSVs are listed under /databricks/driver/data below.
pwd
# List the CSVs saved by process_csv.
cd /databricks/driver/data
ls

/databricks/driver
ASCQR_OAS_CAHPS_BY_ASC.csv
ASCQR_OAS_CAHPS_NATIONAL.csv
ASCQR_OAS_CAHPS_STATE.csv
ASC_Facility.csv
ASC_National.csv
ASC_State.csv
CJR_Quality_Reporting_January_2024_Production_File.csv
CMS_PSI_6_decimal_file.csv
Complications_and_Deaths-Hospital.csv
Complications_and_Deaths-National.csv
Complications_and_Deaths-State.csv
Data_Updates_October_2024.csv
FY2021_Distribution_of_Net_Change_in_Base_Op_DRG_Payment_Amt.csv
FY2021_Net_Change_in_Base_Op_DRG_Payment_Amt.csv
FY2021_Percent_Change_in_Medicare_Payments.csv
FY2021_Value_Based_Incentive_Payment_Amount.csv
FY_2024_HAC_Reduction_Program_Hospital.csv
FY_2024_Hospital_Readmissions_Reduction_Program_Hospital.csv
Footnote_Crosswalk.csv
HCAHPS-Hospital.csv
HCAHPS-National.csv
HCAHPS-State.csv
HOSPITAL_QUARTERLY_MSPB_6_DECIMALS.csv
Health_Equity_Hospital.csv
Health_Equity_National.csv
Health_Equity_State.csv
Healthcare_Associated_Infections-Hospital.csv
Healthcare_Associated_Infections-National.csv
Healthcare_Associated_Infe