In [0]:
from datetime import datetime, timezone
from time import sleep

import requests

In [0]:
# PARAMS
# Endpoint of the NPI registry API that the fetch cell queries.
base_url = "https://npiregistry.cms.hhs.gov/api/"

# Run timestamp stamped onto every extracted row as `refreshed_at`.
# `datetime.utcnow()` is deprecated since Python 3.12, so take an aware UTC
# "now" and drop the tzinfo to keep the same naive ISO-8601 string as before
# (no "+00:00" suffix).
current_date = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

# Query-string parameters for the initial search request.
params = {
    "version": "2.1",
    "state": "CA",
    "city": "Los Angeles",
    "limit": 20,
}

In [0]:
# Call NPI API
# Upper bound, in seconds, on how long a single registry request may take.
# `requests` applies no timeout by default, so a stalled connection would
# otherwise hang the notebook indefinitely.
REQUEST_TIMEOUT_SECONDS = 30


def _get_json(query):
    """GET ``base_url`` with ``query`` and decode the JSON body.

    Args:
        query: Mapping sent as the request's query-string parameters.

    Returns:
        A ``(payload, error)`` tuple. On success ``payload`` is the decoded
        JSON object and ``error`` is ``None``. On failure ``payload`` is
        ``None`` and ``error`` is a short description: a transport error, a
        non-200 status rendered as ``"<status> - <body>"``, or a body that is
        not a JSON object.
    """
    try:
        reply = requests.get(base_url, params=query, timeout=REQUEST_TIMEOUT_SECONDS)
    except requests.RequestException as exc:
        return None, f"request error: {exc}"

    if reply.status_code != 200:
        return None, f"{reply.status_code} - {reply.text}"

    try:
        payload = reply.json()
    except ValueError as exc:
        # Covers both json.JSONDecodeError and requests' own JSONDecodeError.
        return None, f"invalid JSON: {exc}"

    if not isinstance(payload, dict):
        return None, "unexpected JSON payload (not an object)"
    return payload, None


def _to_record(result):
    """Flatten one entry of a response's ``results`` list into an output row.

    Args:
        result: Dict for a single registry entry.

    Returns:
        Dict with the keys ``npi_id``, ``first_name``, ``last_name``,
        ``position``, ``organisation_name``, ``last_updated`` and
        ``refreshed_at``. Missing ``basic`` fields default to ``""``.
    """
    basic_info = result.get("basic", {})

    # "NPI-1" entries carry the name directly; every other enumeration type
    # falls back to the authorized official's name.
    if result.get("enumeration_type", "") == "NPI-1":
        first_key, last_key = "first_name", "last_name"
    else:
        first_key = "authorized_official_first_name"
        last_key = "authorized_official_last_name"

    # Define Schema
    return {
        "npi_id": result.get("number"),
        "first_name": basic_info.get(first_key, ""),
        "last_name": basic_info.get(last_key, ""),
        "position": basic_info.get("authorized_official_title_or_position", ""),
        "organisation_name": basic_info.get("organization_name", ""),
        "last_updated": basic_info.get("last_updated", ""),
        "refreshed_at": current_date,
    }


npi_data, fetch_error = _get_json(params)

if fetch_error is None:
    # Skip entries without a number: `requests` omits None-valued params, so
    # such a lookup would go out with no `number` filter at all.
    npi_list = [
        result.get("number")
        for result in (npi_data.get("results") or [])
        if result.get("number") is not None
    ]

    detailed_results = []

    for npi in npi_list:
        detail_data, detail_error = _get_json({"version": "2.1", "number": npi})

        if detail_error is not None:
            # One failed lookup must not discard the records already collected:
            # report it, pause briefly, and move on to the next NPI.
            print(f"Failed to fetch details for NPI {npi}: {detail_error}")
            sleep(1)
            continue

        detailed_results.extend(
            _to_record(result) for result in (detail_data.get("results") or [])
        )

    # Write and save data
    if detailed_results:
        df = spark.createDataFrame(detailed_results)

        adls_path = "abfss://bronze@databricksdevetl.dfs.core.windows.net/npi_extract/"

        # Save as Parquet to external storage
        df.write.format("parquet").mode("overwrite").save(adls_path)

        # Save as Delta table
        df.write.format("delta").mode("overwrite").saveAsTable("databricks_dev.bronze.npi_extract")

        print("Data saved successfully.")
    else:
        print("No detailed results found.")
else:
    print(f"Initial fetch failed: {fetch_error}")

In [0]:
%sql
-- Show every row of the Delta table written by the extract cell.
SELECT *
FROM databricks_dev.bronze.npi_extract;