In [1]:
import requests
import json
import csv

# NPI Registry API endpoint.
api_url = "https://npiregistry.cms.hhs.gov/api/"

# Query parameters (adjust limit and skip for pagination). Blank values are
# sent as empty filters.
params = {
    "number": "",
    "enumeration_type": "",
    "taxonomy_description": "",
    "name_purpose": "",
    "first_name": "",
    "use_first_name_alias": "",
    "last_name": "",
    "organization_name": "",
    "address_purpose": "",
    "city": "Topeka",
    "state": "KS",  # two-letter state code
    "postal_code": "",
    "country_code": "",
    "limit": 200,
    "skip": 0,
    "pretty": "on",
    "version": "2.1"
}

# Output path and column order. Every column name appears exactly once; a
# repeated name would produce a duplicated header column in the CSV.
csv_filename = "npi_registry_results.csv"
fieldnames = [
    "number", "enumeration_type", "enumeration_date", "taxonomy_description",
    "first_name", "last_name", "organization_name", "address", "address_purpose",
    "city", "state", "postal_code", "country_code", "telephone_number",
    "fax_number", "taxonomy_code", "taxonomy_group", "license_no",
]


def _first_entry(entries):
    """Return the first dict of a list-valued field, or {} when it is missing or empty."""
    return entries[0] if entries else {}


def _npi_result_to_row(result):
    """Flatten one NPI Registry result into a dict keyed by ``fieldnames``.

    Provider fields come from ``basic``; address fields from the first entry of
    ``addresses``; taxonomy fields from the first entry of ``taxonomies``.
    Missing values become empty strings.
    """
    basic = result.get("basic") or {}
    address = _first_entry(result.get("addresses"))
    taxonomy = _first_entry(result.get("taxonomies"))
    return {
        "number": result.get("number", ""),
        "enumeration_type": result.get("enumeration_type", ""),
        # enumeration_date is part of `basic`, not the top level of a result.
        "enumeration_date": basic.get("enumeration_date", ""),
        "taxonomy_description": taxonomy.get("desc", ""),
        "first_name": basic.get("first_name", ""),
        "last_name": basic.get("last_name", ""),
        "organization_name": basic.get("organization_name", ""),
        "address": address.get("address_1", ""),
        # address_purpose is an attribute of each address entry.
        "address_purpose": address.get("address_purpose", ""),
        "city": address.get("city", ""),
        "state": address.get("state", ""),
        "postal_code": address.get("postal_code", ""),
        "country_code": address.get("country_code", ""),
        "telephone_number": address.get("telephone_number", ""),
        "fax_number": address.get("fax_number", ""),
        "taxonomy_code": taxonomy.get("code", ""),
        "taxonomy_group": taxonomy.get("taxonomy_group", ""),
        "license_no": taxonomy.get("license", ""),
    }


try:
    # Make the API request with a timeout so a stalled server cannot hang the cell.
    response = requests.get(api_url, params=params, timeout=10)

    if response.status_code == 200:
        data = response.json()
        results = data.get("results", [])

        if results:
            csv_data = [_npi_result_to_row(result) for result in results]

            with open(csv_filename, mode='w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(csv_data)

            print(f"CSV file '{csv_filename}' has been successfully created.")
        else:
            print("No results found.")

    else:
        print(f"Failed to retrieve data. Status code: {response.status_code}")
        print(f"Error message: {response.text}")

except requests.exceptions.RequestException as e:
    print(f"An error occurred: {e}")


CSV file 'npi_registry_results.csv' has been successfully created.


In [2]:
import requests
import zipfile
import os
from io import BytesIO
from datetime import datetime

def get_quarter_and_abbr(month):
    """Map a calendar month to the first month of its quarter.

    Returns a ``(name, abbreviation)`` pair such as ``("april", "APR")``.
    Months 1-3, 4-6 and 7-9 map to January, April and July; every other
    value falls through to the October quarter.
    """
    quarter_starts = (
        ("january", "JAN"),
        ("april", "APR"),
        ("july", "JUL"),
    )
    for index, pair in enumerate(quarter_starts):
        first_month = 3 * index + 1
        if month in range(first_month, first_month + 3):
            return pair
    return "october", "OCT"

def get_quarterly_zip_url(year=None, month=None):
    """Build the CMS alpha-numeric HCPCS quarterly ZIP URL.

    ``year`` and ``month`` fall back to today's values when omitted (or falsy).
    The month is reduced to its quarter name via ``get_quarter_and_abbr``.
    """
    today = datetime.now()
    if not year:
        year = today.year
    if not month:
        month = today.month

    quarter_name = get_quarter_and_abbr(month)[0]
    return f"https://www.cms.gov/files/zip/{quarter_name}-{year}-alpha-numeric-hcpcs-file.zip"

def list_files_in_zip(zip_url, timeout=60):
    """Download the ZIP at ``zip_url``, print its member names, and return them.

    Args:
        zip_url: URL of the ZIP archive.
        timeout: seconds to wait on the server before giving up. Without one,
            ``requests`` waits indefinitely.

    Returns:
        List of member names. Returns ``[]`` on a non-200 status, a request
        error, or a payload that is not a valid ZIP.
    """
    try:
        # The whole archive is buffered in memory, so streaming is unnecessary;
        # `with` releases the connection once the body has been read.
        with requests.get(zip_url, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"Failed to download the file. Status code: {response.status_code}")
                print(f"Error message: {response.text}")
                return []
            payload = response.content

        with zipfile.ZipFile(BytesIO(payload), 'r') as z:
            names = z.namelist()

        print("Files in the ZIP archive:")
        for name in names:
            print(name)
        return names

    except requests.exceptions.RequestException as e:
        print(f"An error occurred during the request: {e}")
        return []
    except zipfile.BadZipFile:
        print("The downloaded file is not a valid ZIP file.")
        return []

def download_and_extract_specific_file(zip_url, filename_prefix, extract_to, timeout=60):
    """Download a ZIP and extract the first .xls/.xlsx member matching a prefix.

    Args:
        zip_url: URL of the ZIP archive.
        filename_prefix: required start of the member name (e.g. "NOC codes_JUL 2024").
        extract_to: destination directory.
        timeout: seconds to wait on the server before giving up.

    Returns:
        Path of the extracted file, or ``None`` if the download failed, the
        payload was not a ZIP, or no matching member exists. Failures are printed.
    """
    try:
        # Buffer the body and release the connection before parsing.
        with requests.get(zip_url, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"Failed to download the file. Status code: {response.status_code}")
                print(f"Error message: {response.text}")
                return None
            payload = response.content

        with zipfile.ZipFile(BytesIO(payload), 'r') as z:
            file_to_extract = next(
                (
                    name for name in z.namelist()
                    if name.startswith(filename_prefix) and name.endswith(('.xls', '.xlsx'))
                ),
                None,
            )
            if file_to_extract is None:
                print(f"'{filename_prefix}' not found in the ZIP file with either .xls or .xlsx extension.")
                return None
            extracted_path = z.extract(file_to_extract, extract_to)

        print(f"'{file_to_extract}' has been extracted successfully to '{extract_to}'.")
        return extracted_path

    except requests.exceptions.RequestException as e:
        print(f"An error occurred during the request: {e}")
    except zipfile.BadZipFile:
        print("The downloaded file is not a valid ZIP file.")
    return None

def get_noc_codes_filename_prefix(year, quarter_abbr):
    """Return the NOC codes member-name prefix, e.g. ``"NOC codes_JUL 2024"``.

    The month abbreviation and the year are separated by a single space.
    """
    return "NOC codes_{} {}".format(quarter_abbr, year)

if __name__ == "__main__":
    # Today's year and month select the quarterly HCPCS release.
    now = datetime.now()
    year = now.year
    month = now.month

    # Only the quarter abbreviation (e.g. "JUL") is needed, for the NOC file prefix.
    quarter_abbr = get_quarter_and_abbr(month)[1]

    # URL of this quarter's alpha-numeric HCPCS ZIP.
    zip_url = get_quarterly_zip_url(year, month)

    # List archive members; an empty list means the download/parse failed or the ZIP is empty.
    zip_file_list = list_files_in_zip(zip_url)

    # Expected NOC codes member prefix (space between month and year).
    noc_filename_prefix = get_noc_codes_filename_prefix(year, quarter_abbr)

    # Extraction directory; set NOC_EXTRACT_PATH to override without editing the notebook.
    extract_path = os.environ.get(
        "NOC_EXTRACT_PATH",
        "/root/projects/portfolio_projects/data_engineering/healthcare-etl-data-pipeline/data_extraction/files",
    )
    os.makedirs(extract_path, exist_ok=True)

    if zip_file_list:
        # This downloads the archive again; skip it when the listing already failed.
        download_and_extract_specific_file(zip_url, noc_filename_prefix, extract_path)
    else:
        print("Skipping extraction: the quarterly archive could not be listed.")


Files in the ZIP archive:
HCPC2024_JUL_ANWEB_Transaction Report_v3.xlsx
HCPC2024_JUL_ANWEB_v3.txt
HCPC2024_JUL_ANWEB_v3.xlsx
HCPC2024_JUL_Corrections on V3.xlsx
HCPC2024_recordlayout.txt
NOC codes_JUL 2024.xlsx
proc_notes_JUL2024.txt
'NOC codes_JUL 2024.xlsx' has been extracted successfully to '/root/projects/portfolio_projects/data_engineering/healthcare-etl-data-pipeline/data_extraction/files'.


In [1]:
import requests

# Base URL format for the ICD files
BASE_URL = "https://www.cms.gov/files/document/valid-icd-{}-list.xlsx"

def check_version_exists(version, timeout=10):
    """Check whether the CMS valid-ICD list for ``version`` is published.

    Args:
        version: ICD version number (e.g. 10).
        timeout: seconds to wait on the server before giving up.

    Returns:
        ``(True, url)`` when the final response is HTTP 200, else ``(False, None)``.

    Raises:
        requests.exceptions.RequestException: on network errors or timeout.
    """
    url = BASE_URL.format(version)
    # requests.head does not follow redirects by default, so a moved file would
    # otherwise look missing.
    response = requests.head(url, allow_redirects=True, timeout=timeout)
    if response.status_code == 200:
        return True, url
    return False, None

def get_latest_icd_version():
    """Find the highest consecutively published ICD version, starting at ICD-10.

    Probes ICD-10, ICD-11, ... with ``check_version_exists`` and stops at the
    first missing version.

    Returns:
        ``(latest_version, url)`` for the last version found. Returns
        ``(None, None)`` when even ICD-10 is unavailable; that path used to
        raise UnboundLocalError.
    """
    version = 10  # Start checking from ICD-10
    latest_version = None

    while True:
        exists, url = check_version_exists(version)
        if not exists:
            break
        latest_version = version
        print(f"ICD-{latest_version} is available at {url}")
        version += 1  # Check the next version

    if latest_version is None:
        print("No valid ICD versions found.")
        return None, None

    print(f"ICD-{version} not found. Latest available version is ICD-{latest_version}.")
    return latest_version, BASE_URL.format(latest_version)

def download_icd_file(icd_url, timeout=30):
    """Download the ICD workbook at ``icd_url`` into the current working directory.

    The local file name is the last path segment of the URL. A non-200 status
    or a request error is printed rather than raised.

    Args:
        icd_url: URL of the workbook.
        timeout: seconds to wait on the server before giving up.
    """
    try:
        # `with` closes the streamed connection even if writing fails midway.
        with requests.get(icd_url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"Failed to download the file. Status code: {response.status_code}")
                return

            # Extract the filename from the URL
            file_name = icd_url.split('/')[-1]

            # Stream to disk in 8 KiB chunks instead of holding the whole body in memory.
            with open(file_name, 'wb') as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)

        print(f"File '{file_name}' downloaded successfully.")

    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    # Resolve the newest published ICD list: its version number and download URL.
    latest_version, latest_icd_url = get_latest_icd_version()

    # A falsy URL means nothing was found, so there is nothing to fetch.
    if latest_icd_url:
        download_icd_file(latest_icd_url)


ICD-10 is available at https://www.cms.gov/files/document/valid-icd-10-list.xlsx
ICD-11 not found. Latest available version is ICD-10.
File 'valid-icd-10-list.xlsx' downloaded successfully.


ICD-10 is available at https://www.cms.gov/files/document/valid-icd-10-list.xlsx
ICD-11 not found. Latest available version is ICD-10.
File 'valid-icd-10-list.xlsx' downloaded successfully.


In [13]:
import requests
import csv

# Base API URL
BASE_API_URL = "https://api.fda.gov/drug/ndc.json"

def fetch_fda_data(base_url, query, limit=1000, max_records=10000, timeout=30):
    """Page through an openFDA endpoint and collect up to ``max_records`` results.

    Args:
        base_url: endpoint URL (e.g. ``BASE_API_URL``).
        query: ``search`` expression, inserted into the URL verbatim, so it
            must already be URL-safe (e.g. ``"finished:true"``).
        limit: page size per request.
        max_records: upper bound on the number of records returned.
        timeout: per-request timeout in seconds.

    Returns:
        List of result dicts, at most ``max_records`` long. A non-200 status,
        a request error, or an empty page ends paging; any records gathered
        so far are still returned.
    """
    all_data = []
    skip = 0

    while len(all_data) < max_records:
        # Request only what is still needed so the final page does not overshoot.
        page_size = min(limit, max_records - len(all_data))
        url = f"{base_url}?search={query}&limit={page_size}&skip={skip}"
        print(f"Fetching data from: {url}")

        try:
            response = requests.get(url, timeout=timeout)
            if response.status_code != 200:
                print(f"Failed to retrieve data. Status code: {response.status_code}")
                break
            batch = response.json().get("results") or []
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")
            break

        if not batch:
            print("No more data to fetch.")
            break

        all_data.extend(batch)
        print(f"Fetched {len(batch)} records, Total so far: {len(all_data)}")
        skip += len(batch)

        # A short page means the result set is exhausted; avoid one more round trip.
        if len(batch) < page_size:
            break

    return all_data[:max_records]

def save_data_to_csv(data, file_path):
    """Flatten openFDA NDC product records into a CSV file.

    Each product expands into one row per (active ingredient, package) pair.
    A product that lacks either list still yields a row, with those columns
    blank, instead of being dropped. List-valued fields
    (``openfda.manufacturer_name``, ``route``) are joined with commas.

    Args:
        data: iterable of openFDA NDC product dicts.
        file_path: destination CSV path (overwritten).

    File errors are printed, not raised.
    """
    header = [
        "product_ndc", "generic_name", "labeler_name", "brand_name",
        "active_ingredient_name", "active_ingredient_strength", "finished",
        "package_ndc", "package_description", "marketing_start_date",
        "listing_expiration_date", "manufacturer_name", "marketing_category",
        "dosage_form", "product_type", "route"
    ]
    try:
        with open(file_path, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(header)

            for item in data:
                # Product-level values are shared by every row this product expands into.
                manufacturer_name = ','.join((item.get('openfda') or {}).get('manufacturer_name', []))
                route = ','.join(item.get('route', []))

                # `or [{}]` keeps products with no ingredients/packaging as a single row.
                for active in item.get('active_ingredients') or [{}]:
                    for pkg in item.get('packaging') or [{}]:
                        writer.writerow([
                            item.get('product_ndc', ''),
                            item.get('generic_name', ''),
                            item.get('labeler_name', ''),
                            item.get('brand_name', ''),
                            active.get('name', ''),
                            active.get('strength', ''),
                            item.get('finished', ''),
                            pkg.get('package_ndc', ''),
                            pkg.get('description', ''),
                            pkg.get('marketing_start_date', ''),
                            item.get('listing_expiration_date', ''),
                            manufacturer_name,
                            item.get('marketing_category', ''),
                            item.get('dosage_form', ''),
                            item.get('product_type', ''),
                            route,
                        ])

        print(f"Data saved to {file_path}")
    except OSError as e:
        print(f"Error saving data to file: {e}")

if __name__ == "__main__":
    import os  # this cell does not import os at the top

    # openFDA search expression selecting finished drug products.
    query = "finished:true"

    # Fetch up to 10,000 records in pages of 1,000.
    fda_data = fetch_fda_data(BASE_API_URL, query, limit=1000, max_records=10000)

    # If data was successfully retrieved, save it to a CSV file
    if fda_data:
        file_path = "/root/projects/portfolio_projects/data_engineering/healthcare-etl-data-pipeline/data_extraction/files/fda_ndc_raw.csv"
        # Create the output directory so open() does not fail on a fresh checkout.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        save_data_to_csv(fda_data, file_path)


Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=0
Fetched 1000 records, Total so far: 1000
Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=1000
Fetched 1000 records, Total so far: 2000
Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=2000
Fetched 1000 records, Total so far: 3000
Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=3000
Fetched 1000 records, Total so far: 4000
Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=4000
Fetched 1000 records, Total so far: 5000
Data saved to /root/projects/portfolio_projects/data_engineering/healthcare-etl-data-pipeline/data_extraction/files/ndc_finished_drug_products.csv


In [5]:
import os
import requests
import psycopg2
from psycopg2 import sql

BASE_URL = "https://www.cms.gov/files/document/valid-icd-{}-list.xlsx"

# PostgreSQL connection settings. Each value can be overridden through the
# standard libpq environment variable; the literals are local-development
# fallbacks only, so set PGPASSWORD instead of relying on the default.
db_params = {
    'dbname': os.environ.get('PGDATABASE', 'health_data'),
    'user': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD', 'postgres'),
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': os.environ.get('PGPORT', '5432')
}

def check_version_exists(version, timeout=10):
    """Probe whether the CMS valid-ICD list for ``version`` is published.

    Args:
        version: ICD version number (e.g. 10).
        timeout: seconds to wait on the server before giving up.

    Returns:
        ``(exists, url)``: ``exists`` is True when the final response is HTTP
        200. ``url`` is returned in both cases.

    Raises:
        requests.exceptions.RequestException: on network errors or timeout.
    """
    url = BASE_URL.format(version)
    # requests.head does not follow redirects by default; follow them so a
    # relocated file is not reported as missing.
    response = requests.head(url, allow_redirects=True, timeout=timeout)
    return response.status_code == 200, url

def get_latest_icd_version():
    """Return ``(version, url)`` for the newest consecutively published ICD list.

    Versions are probed upward from 10 via ``check_version_exists`` until one
    is missing. If even ICD-10 is missing, a notice is printed and
    ``(None, None)`` is returned.
    """
    newest = None
    candidate = 10

    while True:
        found, url = check_version_exists(candidate)
        if not found:
            break
        newest = candidate
        print(f"ICD-{newest} is available at {url}")
        candidate += 1

    if newest is None:
        print("No valid ICD versions found.")
        return None, None

    print(f"ICD-{candidate} not found. Latest available version is ICD-{newest}.")
    return newest, BASE_URL.format(newest)

def insert_icd_metadata_to_db(version, file_name, url):
    """Insert one ICD file's metadata row into the ``icd_files`` table.

    Creates the table if it does not exist and inserts
    ``(version, file_name, file_url)``. It then prints every stored row for
    ``version`` as a confirmation. Each call appends a row (the table has no
    uniqueness constraint), so re-running records duplicates.

    Args:
        version: ICD version number.
        file_name: name of the ICD workbook.
        url: source URL of the workbook.

    Database errors, including a failed connection, are printed, not raised.
    """
    # Initialised before `try` so cleanup works even when connect() fails;
    # otherwise `finally` would raise UnboundLocalError and hide the real error.
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(**db_params)
        cursor = conn.cursor()

        create_table_query = """
        CREATE TABLE IF NOT EXISTS icd_files (
            id SERIAL PRIMARY KEY,
            version INT,
            file_name VARCHAR(255),
            file_url TEXT
        );
        """
        cursor.execute(create_table_query)
        conn.commit()

        insert_query = """
        INSERT INTO icd_files (version, file_name, file_url)
        VALUES (%s, %s, %s);
        """
        cursor.execute(insert_query, (version, file_name, url))
        conn.commit()

        print(f"Metadata for ICD-{version} saved to database.")

        # Confirm the insertion
        cursor.execute("SELECT * FROM icd_files WHERE version = %s;", (version,))
        records = cursor.fetchall()
        print("Current records in the table:", records)

    except psycopg2.DatabaseError as db_error:
        # Closing the connection below without commit discards any pending transaction.
        print(f"Database error: {db_error}")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

def download_icd_file(icd_url, version, timeout=30):
    """Check that the ICD file at ``icd_url`` is reachable and record its metadata.

    Despite the name, the file body is not saved to disk. On HTTP 200, the
    last path segment of the URL is used as the file name and passed, with
    ``version``, to ``insert_icd_metadata_to_db``. Non-200 responses and
    request errors are printed.

    Args:
        icd_url: URL of the ICD workbook.
        version: ICD version number to record.
        timeout: seconds to wait on the server before giving up.
    """
    try:
        # stream=True avoids downloading the body; `with` releases the connection.
        with requests.get(icd_url, stream=True, timeout=timeout) as response:
            status = response.status_code

        if status == 200:
            file_name = icd_url.split('/')[-1]
            insert_icd_metadata_to_db(version, file_name, icd_url)
        else:
            print(f"Failed to download the file. Status code: {status}")

    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    # Resolve the newest ICD list, then record its metadata only if one was found.
    latest_version, latest_icd_url = get_latest_icd_version()

    if latest_icd_url:
        download_icd_file(latest_icd_url, latest_version)


ICD-10 is available at https://www.cms.gov/files/document/valid-icd-10-list.xlsx
ICD-11 not found. Latest available version is ICD-10.
Metadata for ICD-10 saved to database.
Current records in the table: [(1, 10, 'valid-icd-10-list.xlsx', 'https://www.cms.gov/files/document/valid-icd-10-list.xlsx')]


In [2]:
import csv

# Sanity check: report every record of the ICD codes CSV that does not have
# the expected number of columns.
ICD_CODES_CSV = '/root/projects/portfolio_projects/data_engineering/healthcare-etl-data-pipeline/icd_codes_data.csv'
EXPECTED_COLUMNS = 3

# newline='' lets the csv module handle quoted fields containing line breaks;
# an explicit encoding avoids depending on the platform locale.
with open(ICD_CODES_CSV, 'r', newline='', encoding='utf-8') as file:
    reader = csv.reader(file)
    # line_number counts CSV records, which equals physical lines unless a
    # quoted field spans several lines.
    for line_number, row in enumerate(reader, start=1):
        if len(row) != EXPECTED_COLUMNS:
            print(f"Line {line_number} has {len(row)} columns: {row}")


In [2]:
import requests
import psycopg2
import csv

import os  # this cell does not import os at the top

# Base API URL
BASE_API_URL = "https://api.fda.gov/drug/ndc.json"

# PostgreSQL connection settings. Each value can be overridden through the
# standard libpq environment variable; the literals are local-development
# fallbacks only, so set PGPASSWORD instead of relying on the default.
db_params = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'dbname': os.environ.get('PGDATABASE', 'postgres'),
    'user': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD', 'postgres'),
    # Unqualified table names resolve against the `private` schema.
    'options': '-c search_path=private'
}

def fetch_fda_data(base_url, query, limit=1000, max_records=10000, timeout=30):
    """Page through an openFDA endpoint and collect up to ``max_records`` results.

    Args:
        base_url: endpoint URL (e.g. ``BASE_API_URL``).
        query: ``search`` expression, inserted into the URL verbatim, so it
            must already be URL-safe (e.g. ``"finished:true"``).
        limit: page size per request.
        max_records: upper bound on the number of records returned.
        timeout: per-request timeout in seconds.

    Returns:
        List of result dicts, at most ``max_records`` long. A non-200 status,
        a request error, or an empty page ends paging; any records gathered
        so far are still returned.
    """
    all_data = []
    skip = 0

    while len(all_data) < max_records:
        # Request only what is still needed so the final page does not overshoot.
        page_size = min(limit, max_records - len(all_data))
        url = f"{base_url}?search={query}&limit={page_size}&skip={skip}"
        print(f"Fetching data from: {url}")

        try:
            response = requests.get(url, timeout=timeout)
            if response.status_code != 200:
                print(f"Failed to retrieve data. Status code: {response.status_code}")
                break
            batch = response.json().get("results") or []
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")
            break

        if not batch:
            print("No more data to fetch.")
            break

        all_data.extend(batch)
        print(f"Fetched {len(batch)} records, Total so far: {len(all_data)}")
        skip += len(batch)

        # A short page means the result set is exhausted; avoid one more round trip.
        if len(batch) < page_size:
            break

    return all_data[:max_records]

def save_data_to_postgres(data, db_params):
    """Flatten openFDA NDC product records and insert them into ``fda_ndc_raw``.

    The table is created if missing. Each product expands into one row per
    (active ingredient, package) pair. A product lacking either list still
    yields a row, with those columns blank or NULL, instead of being dropped.
    Empty-string dates are stored as NULL so they cannot fail date parsing
    and abort the insert.

    Args:
        data: iterable of openFDA NDC product dicts.
        db_params: keyword arguments for ``psycopg2.connect``.

    All rows are committed in one transaction. Any exception is printed (not
    raised), and nothing is committed in that case.
    """
    # Initialised before `try` so cleanup works even when connect() fails.
    conn = None
    cursor = None
    try:
        # Establish a connection to PostgreSQL
        conn = psycopg2.connect(**db_params)
        cursor = conn.cursor()

        # Create table if it doesn't exist
        create_table_query = '''
        CREATE TABLE IF NOT EXISTS fda_ndc_raw (
            product_ndc VARCHAR(50),
            generic_name VARCHAR(255),
            labeler_name VARCHAR(255),
            brand_name VARCHAR(255),
            active_ingredient_name VARCHAR(255),
            active_ingredient_strength VARCHAR(255),
            finished BOOLEAN,
            package_ndc VARCHAR(50),
            package_description TEXT,
            marketing_start_date DATE,
            listing_expiration_date DATE,
            manufacturer_name VARCHAR(255),
            marketing_category VARCHAR(255),
            dosage_form VARCHAR(100),
            product_type VARCHAR(100),
            route VARCHAR(255)
        );
        '''
        cursor.execute(create_table_query)

        insert_query = '''
        INSERT INTO fda_ndc_raw (
            product_ndc, generic_name, labeler_name, brand_name, active_ingredient_name,
            active_ingredient_strength, finished, package_ndc, package_description, 
            marketing_start_date, listing_expiration_date, manufacturer_name, 
            marketing_category, dosage_form, product_type, route
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        '''

        rows = []
        for item in data:
            # Product-level values are shared by every row this product expands into.
            product_ndc = item.get('product_ndc', '')
            generic_name = item.get('generic_name', '')
            labeler_name = item.get('labeler_name', '')
            brand_name = item.get('brand_name', '')
            finished = item.get('finished', False)
            listing_expiration_date = item.get('listing_expiration_date') or None
            marketing_category = item.get('marketing_category', '')
            dosage_form = item.get('dosage_form', '')
            product_type = item.get('product_type', '')
            manufacturer_name = ','.join((item.get('openfda') or {}).get('manufacturer_name', []))
            route = ','.join(item.get('route', []))

            # `or [{}]` keeps products with no ingredients/packaging as a single row.
            for active in item.get('active_ingredients') or [{}]:
                for pkg in item.get('packaging') or [{}]:
                    rows.append((
                        product_ndc, generic_name, labeler_name, brand_name,
                        active.get('name', ''), active.get('strength', ''), finished,
                        pkg.get('package_ndc', ''), pkg.get('description', ''),
                        pkg.get('marketing_start_date') or None,
                        listing_expiration_date, manufacturer_name, marketing_category,
                        dosage_form, product_type, route
                    ))

        cursor.executemany(insert_query, rows)

        # Commit the transaction
        conn.commit()
        print("Data saved to PostgreSQL successfully.")

    except Exception as e:
        print(f"Error saving data to PostgreSQL: {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

if __name__ == "__main__":
    # openFDA search expression selecting finished drug products.
    query = "finished:true"

    # Pull up to 10,000 records in 1,000-record pages.
    fda_data = fetch_fda_data(BASE_API_URL, query, limit=1000, max_records=10000)

    # Load into PostgreSQL only when the fetch returned something.
    if fda_data:
        save_data_to_postgres(fda_data, db_params)


Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=0
Fetched 1000 records, Total so far: 1000
Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=1000
Fetched 1000 records, Total so far: 2000
Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=2000
Fetched 1000 records, Total so far: 3000
Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=3000
Fetched 1000 records, Total so far: 4000
Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=4000
Fetched 1000 records, Total so far: 5000
Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=5000
Fetched 1000 records, Total so far: 6000
Fetching data from: https://api.fda.gov/drug/ndc.json?search=finished:true&limit=1000&skip=6000
Fetched 1000 records, Total so far: 7000
Fetching data from: https://api.fda.gov/drug

In [3]:
import requests
import csv
import json
from google.cloud import storage
import os

# Point the Google Cloud client at the service-account key file, unless
# credentials were already configured in the environment (setdefault never
# overwrites an existing value).
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/root/projects/portfolio_projects/data_engineering/healthcare-etl-data-pipeline/gcp/healthcare-etl-data-project-0499b67cb8ee.json",
)


# Define the base URL of the API
api_url = "https://npiregistry.cms.hhs.gov/api/"

# Set parameters for API requests
params = {
    "number": "",
    "enumeration_type": "",
    "taxonomy_description": "",
    "name_purpose": "",
    "first_name": "",
    "use_first_name_alias": "",
    "last_name": "",
    "organization_name": "",
    "address_purpose": "",
    "city": "Topeka",
    "state": "KS",
    "postal_code": "",
    "country_code": "",
    "limit": 200,
    "skip": 0,
    "pretty": "on",
    "version": "2.1"
}

def fetch_npi_data(params):
    """Query the NPI Registry (module-level ``api_url``) and return its ``results`` list.

    Uses a 10-second timeout. A non-200 status or any requests error is
    printed and yields ``[]``.
    """
    try:
        response = requests.get(api_url, params=params, timeout=10)
        if response.status_code != 200:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
            return []
        return response.json().get("results", [])
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return []

def save_to_gcs(bucket_name, file_name, data, content_type='application/json'):
    """Upload ``data`` to ``gs://<bucket_name>/<file_name>``.

    Args:
        bucket_name: existing GCS bucket; it is looked up first, so a missing
            bucket surfaces as an error.
        file_name: object name inside the bucket.
        data: payload uploaded via ``upload_from_string``.
        content_type: MIME type stored on the object. Any value is passed
            through; an unrecognised type used to skip the upload while still
            reporting success.

    Errors are printed, not raised.
    """
    try:
        # Initialize the Google Cloud Storage client
        client = storage.Client()
        bucket = client.get_bucket(bucket_name)
        blob = bucket.blob(file_name)

        # Upload data to GCS
        blob.upload_from_string(data, content_type=content_type)

        print(f"File {file_name} successfully saved to GCS bucket {bucket_name}")
    except Exception as e:
        print(f"Failed to save file to GCS: {e}")

def write_data_to_csv(data):
    """Flatten NPI Registry results into CSV text and return it as a string.

    One row per result. Provider fields come from ``basic``; address fields
    from the first entry of ``addresses``; taxonomy fields from the first
    entry of ``taxonomies``. Missing or empty sections produce blank cells.
    Lines end with ``"\\n"``. An empty ``data`` yields only the header line.

    The CSV is built in memory. The previous version wrote through an unclosed
    ``temp.csv`` handle and read it back before it was flushed, which could
    truncate the output.
    """
    import io  # this cell does not import io at the top

    fieldnames = [
        'number', 'enumeration_type', 'enumeration_date', 'first_name',
        'last_name', 'organization_name', 'address', 'city', 'state',
        'postal_code', 'country_code', 'telephone_number', 'taxonomy_code',
        'taxonomy_group', 'taxonomy_desc', 'license_no',
    ]

    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator='\n')
    writer.writeheader()

    for result in data:
        basic = result.get("basic") or {}
        # `or [{}]` also covers present-but-empty lists, which used to raise IndexError.
        address = (result.get("addresses") or [{}])[0]
        taxonomy = (result.get("taxonomies") or [{}])[0]
        writer.writerow({
            'number': result.get("number", ""),
            'enumeration_type': result.get("enumeration_type", ""),
            'enumeration_date': basic.get("enumeration_date", ""),
            'first_name': basic.get("first_name", ""),
            'last_name': basic.get("last_name", ""),
            'organization_name': basic.get("organization_name", ""),
            'address': address.get("address_1", ""),
            'city': address.get("city", ""),
            'state': address.get("state", ""),
            'postal_code': address.get("postal_code", ""),
            'country_code': address.get("country_code", ""),
            'telephone_number': address.get("telephone_number", ""),
            'taxonomy_code': taxonomy.get("code", ""),
            'taxonomy_group': taxonomy.get("taxonomy_group", ""),
            'taxonomy_desc': taxonomy.get("desc", ""),
            'license_no': taxonomy.get("license", ""),
        })

    return buffer.getvalue()

def main():
    """Fetch NPI results and upload them to GCS, first as JSON, then as CSV.

    Reads the module-level ``params`` and ``bucket_name`` and delegates to
    ``fetch_npi_data``, ``write_data_to_csv`` and ``save_to_gcs`` defined
    elsewhere in this notebook. Prints a message and uploads nothing when the
    API returns no results.
    """
    results = fetch_npi_data(params)
    if not results:
        print("No results found.")
        return

    # Payloads are built lazily so each one is produced right before its upload,
    # keeping the JSON upload strictly ahead of the CSV build.
    uploads = (
        ('npi_registry_raw.json', lambda: json.dumps(results, indent=4), 'application/json'),
        ('npi_registry_raw.csv', lambda: write_data_to_csv(results), 'text/csv'),
    )
    for target_name, build_payload, mime_type in uploads:
        save_to_gcs(bucket_name, target_name, build_payload(), content_type=mime_type)

if __name__ == "__main__":
    main()


File npi_registry_raw.json successfully saved to GCS bucket health_data_pipeline
File npi_registry_raw.csv successfully saved to GCS bucket health_data_pipeline


In [None]:
import requests
import psycopg2
from psycopg2 import sql

import os

# Database connection details. Each value can be overridden with the standard
# libpq environment variable (PGHOST, PGDATABASE, PGUSER, PGPASSWORD) so real
# credentials never need to be written into the notebook; the fallbacks are
# local-development defaults.
db_params = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'dbname': os.environ.get('PGDATABASE', 'postgres'),
    'user': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD', 'postgres'),
    # Session option: unqualified table names resolve to the "private" schema.
    'options': '-c search_path=private'
}

# Define the base URL of the API
api_url = "https://npiregistry.cms.hhs.gov/api/"

# Query parameters for the NPI Registry API. Empty-string entries are sent as
# empty query parameters; this cell filters only by city and state. Only one
# page is fetched: nothing in this cell advances `skip`.
params = {
    "number": "",
    "enumeration_type": "",
    "taxonomy_description": "",
    "name_purpose": "",
    "first_name": "",
    "use_first_name_alias": "",
    "last_name": "",
    "organization_name": "",
    "address_purpose": "",
    "city": "Topeka",
    "state": "KS",
    "postal_code": "",
    "country_code": "",
    "limit": 200,
    "skip": 0,
    "pretty": "on",
    "version": "2.1"
}

def create_npi_table():
    """Create the ``npi_registry_raw`` table in PostgreSQL if it does not exist.

    Connects with the module-level ``db_params``. Database errors (including a
    failed connection) are printed rather than raised, and the connection is
    always closed.
    """
    create_table_query = """
    CREATE TABLE IF NOT EXISTS npi_registry_raw (
        number VARCHAR(50),
        enumeration_type VARCHAR(50),
        enumeration_date DATE,
        taxonomy_description VARCHAR(255),
        first_name VARCHAR(50),
        last_name VARCHAR(50),
        organization_name VARCHAR(255),
        address VARCHAR(255),
        address_purpose VARCHAR(255),
        city VARCHAR(50),
        state VARCHAR(2),
        postal_code VARCHAR(20),
        country_code VARCHAR(2),
        telephone_number VARCHAR(20),
        fax_number VARCHAR(20),
        taxonomy_code VARCHAR(50),
        taxonomy_group VARCHAR(255),
        taxonomy_desc VARCHAR(255),
        license_no VARCHAR(50)
    );
    """
    # Bound before the try so the finally clause is safe even when connect() fails;
    # otherwise an UnboundLocalError would hide the real database error.
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn.cursor() as cursor:
            cursor.execute(create_table_query)
        conn.commit()
        print("Table npi_registry_raw created or already exists.")
    except psycopg2.DatabaseError as db_error:
        print(f"Database error: {db_error}")
    finally:
        if conn is not None:
            conn.close()

def fetch_npi_data(params):
    """Query the NPI Registry API and return its ``results`` list.

    Args:
        params (dict): query-string parameters sent to the module-level ``api_url``.

    Returns:
        list: the response's ``results`` entries, or ``[]`` when the key is
        absent, the HTTP status is not 200, or a requests error occurs (the
        error or status is printed).
    """
    try:
        reply = requests.get(api_url, params=params, timeout=10)
        if reply.status_code != 200:
            print(f"Failed to retrieve data. Status code: {reply.status_code}")
            return []
        payload = reply.json()
        return payload.get("results", [])
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return []

def insert_npi_data(results):
    """Insert NPI API result records into the ``npi_registry_raw`` table.

    One row is written per record, taken from its top-level fields, its
    ``basic`` section, and the FIRST entry of its ``addresses`` and
    ``taxonomies`` lists (missing or empty lists yield blanks). All rows are
    committed in a single transaction.

    Args:
        results (list[dict]): records from the NPI Registry API ``results``
            array, as returned by ``fetch_npi_data``.

    Database errors (including a failed connection) are printed rather than
    raised, and the connection is always closed.
    """
    insert_query = """
    INSERT INTO npi_registry_raw (
        number, enumeration_type, enumeration_date, taxonomy_description,
        first_name, last_name, organization_name, address, address_purpose,
        city, state, postal_code, country_code, telephone_number, fax_number,
        taxonomy_code, taxonomy_group, taxonomy_desc, license_no
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """

    def _first(items):
        """Return items[0], or {} when the list is missing, None, or empty."""
        return items[0] if items else {}

    rows = []
    for result in results:
        basic = result.get("basic") or {}
        address = _first(result.get("addresses"))
        taxonomy = _first(result.get("taxonomies"))
        rows.append((
            result.get("number", ""),
            result.get("enumeration_type", ""),
            basic.get("enumeration_date", None),  # None -> NULL for the DATE column
            # NOTE(review): records may not carry a top-level "taxonomy_description"
            # (it is a query parameter); this is likely always "" -- confirm.
            result.get("taxonomy_description", ""),
            basic.get("first_name", ""),
            basic.get("last_name", ""),
            basic.get("organization_name", ""),
            address.get("address_1", ""),
            # address_purpose belongs to each address entry, not to "basic".
            address.get("address_purpose", ""),
            address.get("city", ""),
            address.get("state", ""),
            address.get("postal_code", ""),
            address.get("country_code", ""),
            address.get("telephone_number", ""),
            address.get("fax_number", ""),
            taxonomy.get("code", ""),
            taxonomy.get("taxonomy_group", ""),
            taxonomy.get("desc", ""),
            taxonomy.get("license", "")
        ))

    # Bound before the try so the finally clause is safe even when connect() fails.
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn.cursor() as cursor:
            cursor.executemany(insert_query, rows)
        conn.commit()
        print("Data successfully saved to the PostgreSQL database.")
    except psycopg2.DatabaseError as db_error:
        print(f"Database error: {db_error}")
    finally:
        if conn is not None:
            conn.close()

def extract_npi_data():
    """Ensure the NPI table exists, fetch results for ``params``, and store them.

    Prints a message instead of inserting when the API returns nothing.
    """
    create_npi_table()
    fetched = fetch_npi_data(params)
    if not fetched:
        print("No results found.")
        return
    insert_npi_data(fetched)

if __name__ == "__main__":
    extract_npi_data()

In [None]:
import os
import requests
import pandas as pd
import psycopg2
from psycopg2 import sql

# Base URL format for the ICD files; "{}" is filled with the ICD version number (e.g. 10).
BASE_URL = "https://www.cms.gov/files/document/valid-icd-{}-list.xlsx"

# Database connection details. Each value can be overridden with the standard
# libpq environment variable (PGHOST, PGDATABASE, PGUSER, PGPASSWORD) so real
# credentials never need to be written into the notebook; the fallbacks are
# local-development defaults.
db_params = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'dbname': os.environ.get('PGDATABASE', 'postgres'),
    'user': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD', 'postgres'),
    # Session option: unqualified table names resolve to the "private" schema.
    'options': '-c search_path=private'
}

def check_version_exists(version, timeout=10):
    """Check whether the CMS valid-ICD list for ``version`` can be downloaded.

    Args:
        version (int): ICD version number substituted into ``BASE_URL``.
        timeout (float): seconds to wait for the server before giving up.

    Returns:
        tuple[bool, str]: whether a HEAD request ended with HTTP 200 (after
        following redirects), and the URL that was probed.

    Raises:
        requests.exceptions.RequestException: on network failure or timeout;
        not caught here.
    """
    url = BASE_URL.format(version)
    # requests.head() does not follow redirects by default (unlike get()), so a
    # redirected file would report 301/302 and wrongly look "missing".
    response = requests.head(url, allow_redirects=True, timeout=timeout)
    return response.status_code == 200, url

def get_latest_icd_version():
    """Probe ICD-10, ICD-11, ... in order until a version is not available.

    Each available version is printed as it is found.

    Returns:
        tuple: ``(version, url)`` for the highest consecutive version found
        starting at 10, or ``(None, None)`` when ICD-10 itself is unavailable.
    """
    newest = None
    candidate = 10  # probing starts at ICD-10

    while True:
        found, probed_url = check_version_exists(candidate)
        if not found:
            break
        newest = candidate
        print(f"ICD-{newest} is available at {probed_url}")
        candidate += 1

    if newest is None:
        print("No valid ICD versions found.")
        return None, None

    print(f"ICD-{candidate} not found. Latest available version is ICD-{newest}.")
    return newest, BASE_URL.format(newest)

def insert_icd_data_to_db(df):
    """Create ``icd_codes_raw`` if needed and append every row of ``df`` to it.

    Args:
        df (pandas.DataFrame): the CMS valid-ICD sheet. It must contain ``CODE``
            and ``NF EXCL`` columns, plus one column whose header starts with
            ``SHORT DESCRIPTION`` and one starting with ``LONG DESCRIPTION``.
            Those description headers carry a fiscal-year suffix such as
            ``(VALID ICD-10 FY2024)``, so they are matched by prefix rather than
            by exact name.

    Raises:
        KeyError: if a required column is missing from ``df``.

    Missing cells (NaN) are stored as SQL NULL. Database errors (including a
    failed connection) are printed rather than raised, and the connection is
    always closed. NOTE: rows are appended, so running this twice loads
    duplicate codes.
    """
    create_table_query = """
    CREATE TABLE IF NOT EXISTS icd_codes_raw (
        id SERIAL PRIMARY KEY,
        code VARCHAR(10),
        short_description TEXT,
        long_description TEXT,
        nf_excl TEXT
    );
    """
    insert_query = """
    INSERT INTO icd_codes_raw (code, short_description, long_description, nf_excl)
    VALUES (%s, %s, %s, %s);
    """

    def _column_with_prefix(prefix):
        """Return the first column of ``df`` whose header starts with ``prefix``."""
        for column in df.columns:
            if str(column).strip().upper().startswith(prefix):
                return column
        raise KeyError(f"No column starting with {prefix!r}; found {list(df.columns)}")

    def _sql_value(value):
        """Map pandas missing markers to None (SQL NULL); stringify everything else."""
        return None if pd.isna(value) else str(value)

    # Resolve columns before touching the database so a malformed sheet fails fast.
    columns = ['CODE', _column_with_prefix('SHORT DESCRIPTION'),
               _column_with_prefix('LONG DESCRIPTION'), 'NF EXCL']
    rows = [
        tuple(_sql_value(value) for value in record)
        for record in df[columns].itertuples(index=False, name=None)
    ]

    # Bound before the try so the finally clause is safe even when connect() fails.
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn.cursor() as cursor:
            cursor.execute(create_table_query)
            conn.commit()
            cursor.executemany(insert_query, rows)
        conn.commit()
        print("ICD data saved to database.")

    except psycopg2.DatabaseError as db_error:
        print(f"Database error: {db_error}")
    finally:
        if conn is not None:
            conn.close()

def download_and_process_icd_file(icd_url, timeout=60):
    """Download the ICD workbook at ``icd_url`` and load it into the database.

    The workbook is fetched once and parsed from memory with pandas. Its first
    rows are printed, and it is then passed to ``insert_icd_data_to_db``.

    Args:
        icd_url (str): URL of the CMS valid-ICD ``.xlsx`` file.
        timeout (float): seconds to wait on the HTTP request before giving up.

    HTTP, network, and parsing/loading errors are printed rather than raised.
    """
    from io import BytesIO  # local import keeps this cell runnable on its own

    try:
        response = requests.get(icd_url, timeout=timeout)
        if response.status_code == 200:
            # Parse the bytes already in hand; passing the URL to read_excel
            # would make pandas download the same file a second time.
            df = pd.read_excel(BytesIO(response.content))

            # Debug: Print the first few rows of the DataFrame
            print("DataFrame contents:")
            print(df.head())

            # Insert the data into the database
            insert_icd_data_to_db(df)
        else:
            print(f"Failed to download the file. Status code: {response.status_code}")

    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
    except Exception as e:
        print(f"Failed to process the Excel file: {e}")

def extract_icd_data():
    """Locate the newest CMS valid-ICD list and load it into PostgreSQL.

    Nothing is downloaded when ``get_latest_icd_version`` finds no URL.
    """
    _version, icd_url = get_latest_icd_version()
    if not icd_url:
        return
    download_and_process_icd_file(icd_url)

if __name__ == "__main__":
    extract_icd_data()

In [None]:
import requests
import zipfile
import os
from io import BytesIO
from datetime import datetime
import pandas as pd
from sqlalchemy import create_engine

# PostgreSQL connection parameters. Each value can be overridden with the
# standard libpq environment variable (PGHOST, PGDATABASE, PGUSER, PGPASSWORD,
# PGPORT) so real credentials never need to be written into the notebook; the
# fallbacks are local-development defaults.
db_params = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'dbname': os.environ.get('PGDATABASE', 'postgres'),
    'user': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD', 'postgres'),
    'port': os.environ.get('PGPORT', '5432'),
    # Session option: unqualified table names resolve to the "private" schema.
    'options': '-c search_path=private'
}

def get_quarter_and_abbr(month):
    """Map a calendar month to the quarter's starting-month name and abbreviation.

    Months 1-3 give ``("january", "JAN")``, 4-6 give ``("april", "APR")`` and
    7-9 give ``("july", "JUL")``; any other value, including out-of-range
    months, falls through to ``("october", "OCT")``.
    """
    quarter_table = (
        ((1, 2, 3), ("january", "JAN")),
        ((4, 5, 6), ("april", "APR")),
        ((7, 8, 9), ("july", "JUL")),
    )
    for months_in_quarter, names in quarter_table:
        if month in months_in_quarter:
            return names
    return ("october", "OCT")

def get_quarterly_zip_url(year=None, month=None):
    """Build the CMS quarterly alpha-numeric HCPCS ZIP URL.

    Args:
        year (int, optional): four-digit year; a falsy value means the current year.
        month (int, optional): calendar month used to pick the quarter; a falsy
            value means the current month.

    Returns:
        str: URL of the form ``.../{quarter}-{year}-alpha-numeric-hcpcs-file.zip``,
        where ``quarter`` comes from ``get_quarter_and_abbr``.
    """
    today = datetime.now()
    if not year:
        year = today.year
    if not month:
        month = today.month

    quarter_name = get_quarter_and_abbr(month)[0]
    return f"https://www.cms.gov/files/zip/{quarter_name}-{year}-alpha-numeric-hcpcs-file.zip"

def list_files_in_zip(zip_url, timeout=60):
    """Download the ZIP archive at ``zip_url``, print its member names, and return them.

    Args:
        zip_url (str): URL of the ZIP archive.
        timeout (float): seconds to wait for the server before giving up. Without
            a timeout, requests can block indefinitely on a stalled connection.

    Returns:
        list[str]: archive member names in archive order, or ``[]`` on a non-200
        status, a requests error, or a corrupt archive (each reported via print).
    """
    try:
        # The whole body is needed to read the ZIP central directory, so
        # streaming the response would bring no benefit.
        response = requests.get(zip_url, timeout=timeout)
        if response.status_code != 200:
            print(f"Failed to download the file. Status code: {response.status_code}")
            return []

        with zipfile.ZipFile(BytesIO(response.content), 'r') as archive:
            names = archive.namelist()

    except requests.exceptions.RequestException as e:
        print(f"An error occurred during the request: {e}")
        return []
    except zipfile.BadZipFile:
        print("The downloaded file is not a valid ZIP file.")
        return []

    print("Files in the ZIP archive:")
    for name in names:
        print(name)
    return names

def download_and_save_to_postgres(zip_url, filename_prefix, timeout=60):
    """Download a ZIP archive, extract the NOC workbook, and load it into PostgreSQL.

    The first archive member whose name starts with ``filename_prefix`` and ends
    in ``.xls`` or ``.xlsx`` is read with pandas. Its contents then REPLACE the
    ``noc_codes_raw`` table (``if_exists='replace'``).

    Args:
        zip_url (str): URL of the quarterly HCPCS ZIP archive.
        filename_prefix (str): expected name prefix of the NOC workbook.
        timeout (float): seconds to wait on the HTTP request before giving up.

    All failures (HTTP status, network, bad archive, missing member, parsing,
    database) are printed rather than raised.
    """
    from urllib.parse import quote  # local import keeps this cell runnable on its own

    try:
        response = requests.get(zip_url, timeout=timeout)
        if response.status_code != 200:
            print(f"Failed to download the file. Status code: {response.status_code}")
            return

        with zipfile.ZipFile(BytesIO(response.content), 'r') as z:
            file_to_extract = next(
                (name for name in z.namelist()
                 if name.startswith(filename_prefix) and name.endswith(('.xls', '.xlsx'))),
                None,
            )
            if file_to_extract is None:
                print(f"'{filename_prefix}' not found in the ZIP file with either .xls or .xlsx extension.")
                return

            print(f"Extracting '{file_to_extract}' and saving data to PostgreSQL...")
            with z.open(file_to_extract) as extracted_file:
                df = pd.read_excel(extracted_file)

        # Percent-encode user, password and options so characters such as '@',
        # ':' or '/' (or the space and '=' in options) cannot corrupt the URL.
        engine = create_engine(
            "postgresql://{user}:{password}@{host}:{port}/{dbname}?options={options}".format(
                user=quote(db_params["user"], safe=""),
                password=quote(db_params["password"], safe=""),
                host=db_params["host"],
                port=db_params["port"],
                dbname=db_params["dbname"],
                options=quote(db_params["options"], safe=""),
            )
        )
        try:
            df.to_sql('noc_codes_raw', con=engine, if_exists='replace', index=False)
        finally:
            engine.dispose()  # release pooled connections held by this engine

        print(f"'{file_to_extract}' has been saved successfully to the PostgreSQL database.")

    except requests.exceptions.RequestException as e:
        print(f"An error occurred during the request: {e}")
    except zipfile.BadZipFile:
        print("The downloaded file is not a valid ZIP file.")
    except Exception as e:
        print(f"An error occurred: {e}")

def get_noc_codes_filename_prefix(year, quarter_abbr):
    """Return the NOC-codes workbook name prefix, e.g. ``"NOC codes_JAN 2025"``.

    The quarter abbreviation and the year are separated by a single space.
    """
    return "NOC codes_{} {}".format(quarter_abbr, year)

def extract_hcpcs_data():
    """Load the current quarter's NOC codes from the CMS HCPCS ZIP into PostgreSQL.

    The archive listing is printed first (its return value is not used) and the
    NOC workbook is then downloaded and loaded.
    """
    today = datetime.now()

    _quarter_name, quarter_abbr = get_quarter_and_abbr(today.month)
    zip_url = get_quarterly_zip_url(today.year, today.month)

    list_files_in_zip(zip_url)  # printed listing only; helps diagnose a missing file
    noc_prefix = get_noc_codes_filename_prefix(today.year, quarter_abbr)

    download_and_save_to_postgres(zip_url, noc_prefix)

if __name__ == "__main__":
    extract_hcpcs_data()