# FDA NTC ETL Pipeline
- API website: https://open.fda.gov/data/downloads/
- Objective: Collect drug information from the FDA NDC (National Drug Code Directory) API

In [1]:
# Requesting the API and getting a status code
import requests
response = requests.get("https://api.fda.gov/drug/ndc.json")
print(response.status_code)

200


In [None]:
import json
import time

limit = 100 # i got this number by testing on Postman
all_results = []

# get the total number of results from the request
if response.status_code == 200:
    data = response.json()
    total_results = data['meta']['results']['total']

    # print out the count
    print(f"Total results: {total_results}")

else:
    print('Failed to fetch data', response.status_code)



# loop through all the data and add to it the 'all_results' open list 
for x in range(0,total_results,limit):
    if x >= total_results:
        break  # this is to prevent the issue of skipping past the results


    site_map = requests.get(f"https://api.fda.gov/drug/ndc.json?limit={limit}&skip={x}")

    if site_map.status_code == 200:
        data = site_map.json()
        if 'results' in data:
            all_results.extend(data['results'])

    else:
        print(f'failed to get data starting at {x} with a status code of {site_map.status_code}')

    # short delay to avoid rate limit
    time.sleep(1) 

# print out the count
print(f"Final total results: {len(all_results)}")

Total results: 128775
failed to get data starting at 25100 with a status code of 400
failed to get data starting at 25200 with a status code of 400
failed to get data starting at 25300 with a status code of 400
failed to get data starting at 25400 with a status code of 400
failed to get data starting at 25500 with a status code of 400
failed to get data starting at 25600 with a status code of 400
failed to get data starting at 25700 with a status code of 400
failed to get data starting at 25800 with a status code of 400
failed to get data starting at 25900 with a status code of 400
failed to get data starting at 26000 with a status code of 400
failed to get data starting at 26100 with a status code of 400
failed to get data starting at 26200 with a status code of 400
failed to get data starting at 26300 with a status code of 400
failed to get data starting at 26400 with a status code of 400
failed to get data starting at 26500 with a status code of 400
failed to get data starting at 26

KeyboardInterrupt: 

In [10]:
# testing purposes
print("First 2 items fetched:")
for x in all_results[:2]:
    print(x)

First 2 items fetched:
{'product_ndc': '82429-126', 'generic_name': 'SULFACETAMIDE SODIUM, SULFUR', 'labeler_name': 'Gabar Health Sciences Corp.', 'brand_name': 'Sodium Sulfacetamide 8% and Sulfur 4% Cleanser', 'active_ingredients': [{'name': 'SULFACETAMIDE SODIUM', 'strength': '80 mg/g'}, {'name': 'SULFUR', 'strength': '40 mg/g'}], 'finished': True, 'packaging': [{'package_ndc': '82429-126-16', 'description': '454 g in 1 BOTTLE, PLASTIC (82429-126-16)', 'marketing_start_date': '20230405', 'sample': False}], 'listing_expiration_date': '20251231', 'openfda': {'manufacturer_name': ['Gabar Health Sciences Corp.'], 'rxcui': ['1010234'], 'spl_set_id': ['c9227154-edd4-4787-944b-4cb5ed901f05'], 'is_original_packager': [True], 'upc': ['0382429126166'], 'unii': ['4NRT660KJQ', '70FD1KFU70']}, 'marketing_category': 'UNAPPROVED DRUG OTHER', 'dosage_form': 'LIQUID', 'spl_id': '08368bc6-272c-a9f2-e063-6394a90a3ab9', 'product_type': 'HUMAN PRESCRIPTION DRUG', 'route': ['TOPICAL'], 'marketing_start_da

In [None]:
import psycopg2
from dotenv import load_dotenv
import os

# load credentials
load_dotenv()
db_user = os.getenv('db_user')
db_password = os.getenv('db_password')

# connect to the postgres database
conn = psycopg2.connect(
    host = 'localhost',
    database = 'postgres',
    user = db_user,
    password = db_password)

cursor = conn.cursor()

In [None]:
# reconnect to the access the new database
conn = psycopg2.connect(
    host = 'localhost',
    database = 'Postgres 16 - Localhost - FDA-NDC-ETL_db',
    user = db_user,
    password = db_password)

cursor = conn.cursor()

In [None]:
# create the table
cursor.execute(
'''
CREATE TABLE IF NOT EXISTS FDA_Drugs_db (
    product_id VARCHAR PRIMARY KEY,
    product_ndc VARCHAR,
    generic_name TEXT,
    labeler_name TEXT,
    brand_name TEXT,
    active_ingredients VARCHAR,  -- This needs to be serialized
    finished BOOLEAN,
    packaging VARCHAR,  -- This needs to be serialized
    listing_expiration_date DATE,
    openfda VARCHAR,
    marketing_category VARCHAR,
    dosage_form VARCHAR,
    spl_id VARCHAR,
    product_type VARCHAR,
    route TEXT[],
    marketing_start_date DATE,
    brand_name_base VARCHAR,
    pharm_class VARCHAR  -- This needs to be serialized
                                        );
'''
)
# Commit
conn.commit()

In [None]:
# Print each items in 'results' as a list for easy viewing and editing
for result in data['results']:
    print("[", end="")
    print(*result, sep=", ", end="")
    print("]")

# Using a manual approach for data extraction and handling to break out each item for SQL insertion instead of relying on pd.json_normalize
for result in data['results']:
    product_ndc = result['product_ndc']
    generic_name = result['generic_name']
    labeler_name = result['labeler_name']
    brand_name = result['brand_name']
    # active_ingredients = result['active_ingredients']  # this is a dictionary
    finished = result['finished']
    # packaging = result['packaging']  - this is a dictionary
    listing_expiration_date = result['listing_expiration_date']
    # openfda = result['openfda'] - this is a dictionary
    marketing_category = result['marketing_category']
    dosage_form = result['dosage_form']
    spl_id = result['spl_id']
    product_type = result['product_type']
    route = result['route']
    marketing_start_date = result['marketing_start_date']
    product_id = result['product_id']
    brand_name_base = result['brand_name_base']
    # pharm_class = result['pharm_class'] - this is a dictionary

In [None]:
# Add data to the table
cursor.execute(
    '''
    INSERT INTO FDA_Drugs_db (
    "product_ndc",
    "generic_name",
    "labeler_name",
    "brand_name",
    "finished",
    "listing_expiration_date",
    "marketing_category",
    "dosage_form",
    "spl_id",
    "product_type",
    "route",
    "marketing_start_date",
    "product_id",
    "brand_name_base"
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT ("product_id") DO NOTHING
    ''', 
    (
        product_ndc,
        generic_name,
        labeler_name,
        brand_name,
        finished,
        listing_expiration_date,
        marketing_category,
        dosage_form,
        spl_id,
        product_type,
        route,
        marketing_start_date,
        product_id,
        brand_name_base
    )
)

# Commit the transaction
conn.commit()

In [None]:
# add data to the table
sql_query = '''
            SELECT * FROM FDA_Drugs_db
            '''
    
# Execute the query
cursor.execute(sql_query)
# Fetch all the results
results = cursor.fetchall()

In [None]:
import pandas as pd

# Add column names to dataframe
column_names = [x[0] for x in cursor.description]
df = pd.DataFrame(results, columns=column_names)

# See results
df