In [0]:
from datetime import date
import requests
from pyspark.sql import SparkSession

# Get current date; it is stamped on every row as `refreshed_at`
current_date = date.today()

# Initialize Spark session (Databricks notebooks usually have Spark available by default)
spark = SparkSession.builder.appName("NPI Data").getOrCreate()

# --- Configuration: everything a reader might want to tune ---
# NPI Registry API base URL and query parameters
base_url = "https://npiregistry.cms.hhs.gov/api/"
API_VERSION = "2.1"
# (connect, read) timeouts in seconds. Without a timeout, requests waits forever
# on a stalled connection and the notebook hangs instead of failing.
REQUEST_TIMEOUT = (5, 30)
# Output locations for the extract
PARQUET_PATH = "/mnt/bronze/npi_extract/"
DELTA_TABLE = "databricksdev.default.npi_extract"

params = {
    "version": API_VERSION,
    "state": "CA",
    "city": "Los Angeles",
    "limit": 20,
}


def build_provider_record(result, refreshed_at):
    """Flatten one NPI Registry result object into a row of the extract.

    For ``NPI-1`` records the name comes from ``basic.first_name`` /
    ``basic.last_name``; for every other enumeration type it falls back to the
    authorized official's first and last name.

    Args:
        result: One element of the API response's ``results`` list (a dict).
        refreshed_at: Value stored in the ``refreshed_at`` column.

    Returns:
        dict with keys ``npi_id``, ``first_name``, ``last_name``, ``position``,
        ``organisation_name``, ``last_updated`` and ``refreshed_at``. Fields
        missing from ``basic`` default to ``""``; ``npi_id`` is ``None`` when
        the result has no ``number``.
    """
    basic_info = result.get("basic", {})

    if result.get("enumeration_type", "") == "NPI-1":
        first_key, last_key = "first_name", "last_name"
    else:
        first_key, last_key = (
            "authorized_official_first_name",
            "authorized_official_last_name",
        )

    return {
        "npi_id": result.get("number"),
        "first_name": basic_info.get(first_key, ""),
        "last_name": basic_info.get(last_key, ""),
        "position": basic_info.get("authorized_official_title_or_position", ""),
        "organisation_name": basic_info.get("organization_name", ""),
        "last_updated": basic_info.get("last_updated", ""),
        "refreshed_at": refreshed_at,
    }


# Reset per run so a stale DataFrame left in the kernel by an earlier execution
# is never mistaken for this run's output.
df = None
detailed_results = []
failed_npis = []

# One Session reuses the underlying connection across the search call and all
# per-NPI lookups instead of opening a new connection for each request.
with requests.Session() as session:
    # Make initial API request to get list of NPIs
    response = session.get(base_url, params=params, timeout=REQUEST_TIMEOUT)

    if response.status_code == 200:
        npi_data = response.json()
        npi_list = [result["number"] for result in npi_data.get("results", [])]

        # NOTE(review): the search response may already carry the same fields
        # that the per-NPI lookup returns; if confirmed against the API docs,
        # these extra round-trips can be dropped.
        for npi in npi_list:
            detail_params = {"version": API_VERSION, "number": npi}
            detail_response = session.get(
                base_url, params=detail_params, timeout=REQUEST_TIMEOUT
            )

            if detail_response.status_code != 200:
                # Remember the failure instead of dropping it silently: the
                # outputs below are overwritten, so missing rows must be visible.
                failed_npis.append(npi)
                continue

            detail_data = detail_response.json()
            detailed_results.extend(
                build_provider_record(result, current_date)
                for result in detail_data.get("results", [])
            )

if failed_npis:
    print(f"Skipped {len(failed_npis)} NPI(s) whose detail request failed: {failed_npis}")

if response.status_code != 200:
    print(f"Failed to fetch data: {response.status_code} - {response.text}")
elif detailed_results:
    # Save to Parquet and Delta table if data exists
    df = spark.createDataFrame(detailed_results)
    df.write.format("parquet").mode("overwrite").save(PARQUET_PATH)
    df.write.format("delta").mode("overwrite").saveAsTable(DELTA_TABLE)
else:
    print("No detailed results found.")


In [0]:
# Show the extract only when the previous cell actually built one. If the fetch
# failed or returned no rows, `df` is unbound (or None) and a bare display(df)
# would raise instead of explaining what happened.
if globals().get("df") is not None:
    display(df)
else:
    print("No NPI extract DataFrame to display.")