In [1]:
import os
import time
from typing import Dict
from typing import Optional

import pandas as pd
import requests
import yaml

In [2]:
# Ingest up to 200 NPIs per State:City pair to figure out how many can possibly service dementia patients (essentially a lead list).
# I used one major city in every state.

In [3]:
# Read the request configuration from YAML; safe_load avoids executing
# arbitrary tags from the file.
with open("state_city.yml") as config_file:
    data = yaml.safe_load(config_file)

# Mapping handed to the collection loop, one entry per state.
state_city_request_dict = data["state_city_request_dict"]

In [4]:
# Shared, module-level error log: one {"STATE", "CITY", "ERROR"} dict is
# appended for every request that fails or returns an unusable response.
error_list = []

def get_flatten_payload(state_in: str, city_in: str, timeout: float = 30.0) -> Optional[pd.DataFrame]:
    """
    Fetches NPI registry data for a given state and city, and returns it as a DataFrame. The api requires at least two inputs.

    Any failure (network/HTTP error, timeout, non-JSON body, or a JSON body
    without a 'results' field) is recorded in the module-level ``error_list``
    and reported to the caller as ``None`` instead of raising.

    Args:
        state_in (str): The 2-letter abbreviation of the state (e.g., 'CA' for California).
        city_in (str): The name of the city, with spaces replaced by '+'. Api Requirement.
        timeout (float): Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook indefinitely.

    Returns:
        pd.DataFrame: A DataFrame built from the 'results' list of the NPI registry response.
        None: If an error occurs during the request or the response is invalid.
    """

    url = f'https://npiregistry.cms.hhs.gov/api/?number=&enumeration_type=&taxonomy_description=&first_name=&last_name=&organization_name=&address_purpose=&city={city_in}&state={state_in}&postal_code=&country_code=&limit=200&skip=&version=2.1'

    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON on requests versions
        # where the JSON decode error is not a RequestException subclass.
        error_list.append({"STATE": state_in, "CITY": city_in, "ERROR": str(e)})
        return None

    # A successful HTTP response can still lack 'results' (for example an
    # API-level error payload); treat that as an invalid response rather
    # than letting a KeyError escape.
    results = payload.get('results') if isinstance(payload, dict) else None
    if results is None:
        error_list.append({
            "STATE": state_in,
            "CITY": city_in,
            "ERROR": f"No 'results' in response: {str(payload)[:500]}",
        })
        return None

    return pd.DataFrame(results)

In [5]:
def execute_data_collection(state_city_dict: Dict[str, str]) -> None:
    """
    Collects data for multiple state-city pairs, combines the results into a single DataFrame,
    and appends the data to CSV files. Logs the errors encountered during this call.

    Only errors added to the module-level ``error_list`` while this call runs are
    written, so calling the function repeatedly does not duplicate earlier errors
    in the log. ``error_list`` itself is left untouched for later inspection.

    Args:
        state_city_dict (Dict[str, str]): A dictionary where keys are state abbreviations (e.g., 'CA')
                                          and values are city names (e.g., 'Los+Angeles').

    Returns:
        None: Results are appended to CSV files under 'bronze/'; for now the csv is
        considered the 'table' in a warehouse.
    """
    output_dir = 'bronze'
    data_path = 'bronze/nppes_extract_json_columns.csv'
    # NOTE(review): the file name is misspelled ("erorr"). It is kept as-is so that
    # existing logs keep growing in the same file; rename deliberately if desired.
    error_path = 'bronze/erorr_log_manual.csv'

    # Remember where the shared error log stood so only this call's errors are written.
    errors_before = len(error_list)

    data_frames = []
    for state, city in state_city_dict.items():
        df = get_flatten_payload(state, city)
        if df is not None:
            data_frames.append(df)

    new_errors = error_list[errors_before:]

    if data_frames or new_errors:
        # to_csv does not create missing parent directories.
        os.makedirs(output_dir, exist_ok=True)

    if data_frames:
        combined_df = pd.concat(data_frames, ignore_index=True)
        combined_df['ETLRUNTIME'] = pd.Timestamp.now().date()  # important for incremental building
        # Write the header only when the file is being created by this append.
        combined_df.to_csv(data_path, mode='a', header=not os.path.isfile(data_path), index=False)
        print("Appended data to nppes_extract_json_columns.csv")

    if new_errors:
        error_df = pd.DataFrame(new_errors)
        # The header check must look at the same file that is written to.
        error_df.to_csv(error_path, mode='a', header=not os.path.isfile(error_path), index=False)
        print(f"Appended errors to {error_path}")


In [6]:
# Drive the collection one state at a time so each run appends that state's
# rows (and any errors) before moving on.
for state, city in state_city_request_dict.items():
    single_state_request = {state: city}
    execute_data_collection(single_state_request)

    print(f"{state} completed, waiting till 61 seconds later for next state to process")
    # NPPES has a 200 requests per minute limit; pausing slightly longer than
    # one minute between states is a deliberate safety margin against being blocked.
    time.sleep(61)

Appended data to nppes_extract_json_columns.csv
AL completed, waiting till 61 seconds later for next state to process
