# NIH RePORTER API v2 - Exploration Notebook

This notebook documents the exploration of the NIH RePORTER API v2 to understand the complete response schema and design the data pipeline.

## Objectives

1. Test API connectivity (the requests in this notebook send no credentials, so there is no authentication step to exercise)
2. Discover all available fields in the response
3. Understand nested structures and data types
4. Test pagination and rate limiting
5. Document field mappings for pipeline design

In [None]:
import json
import time
from typing import Dict, List

import pandas as pd
import requests

## 1. API Configuration

In [None]:
# API Configuration
API_BASE_URL = 'https://api.reporter.nih.gov'
API_ENDPOINT = '/v2/projects/search'
RATE_LIMIT_DELAY = 1.0  # seconds between requests

## 2. Basic API Test - Small Query

In [None]:
# Test with a very small query to get complete field structure
test_payload = {
    'criteria': {
        'fiscal_years': [2024],
        'agencies': ['NCI']  # Narrow down to get faster response
    },
    'offset': 0,
    'limit': 5  # Just 5 records to see structure
}

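response = requests.post(
    f'{API_BASE_URL}{API_ENDPOINT}',
    json=test_payload,
    headers={'Content-Type': 'application/json'},
    timeout=30  # avoid hanging the notebook if the API does not respond
)

print(f"Status Code: {response.status_code}")
response.raise_for_status()  # fail here on an HTTP error rather than on .json() below
data = response.json()
print(f"Total available records: {data['meta']['total']}")
print(f"Records returned: {len(data['results'])}")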

## 3. Inspect Complete Response Structure

In [None]:
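# Look at the first record to see all fields.
# Always define sample_record so later cells do not raise NameError on an empty result.
sample_record = data['results'][0] if data['results'] else {}
if sample_record:
    print("Complete field structure:")
    print(json.dumps(sample_record, indent=2))
else:
    print("No records returned; cells that inspect sample_record will print nothing.")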

## 4. Extract All Top-Level Field Names

In [None]:
def get_all_fields(records: List[Dict]) -> Dict[str, str]:
    """
    Extract all unique field names and the type name of their first non-null value.
    """
    all_fields = {}

    for record in records:
        for key, value in record.items():
            # Prefer a concrete type over NoneType when a later record has a value
            if key not in all_fields or (all_fields[key] == 'NoneType' and value is not None):
                all_fields[key] = type(value).__name__

    return all_fields

field_types = get_all_fields(data['results'])
print("\nAll top-level fields found:")
for field, dtype in sorted(field_types.items()):
    print(f"  {field}: {dtype}")

## 5. Inspect Nested Structures

In [None]:
# Organization structure
if 'organization' in sample_record:
    print("Organization fields:")
    print(json.dumps(sample_record['organization'], indent=2))

# Principal Investigators
if 'principal_investigators' in sample_record:
    print("\nPrincipal Investigators structure:")
    if sample_record['principal_investigators']:
        print(json.dumps(sample_record['principal_investigators'][0], indent=2))

# Program Officers
if 'program_officers' in sample_record:
    print("\nProgram Officers structure:")
    if sample_record['program_officers']:
        print(json.dumps(sample_record['program_officers'][0], indent=2))

# Study Section
if 'full_study_section' in sample_record:
    print("\nStudy Section structure:")
    print(json.dumps(sample_record['full_study_section'], indent=2))

# Agency IC Admin
if 'agency_ic_admin' in sample_record:
    print("\nAgency IC Admin structure:")
    print(json.dumps(sample_record['agency_ic_admin'], indent=2))

# Publications
if 'publications' in sample_record:
    print("\nPublications structure:")
    if sample_record['publications']:
        print(json.dumps(sample_record['publications'][0], indent=2))

## 6. Test Pagination

In [None]:
# Test pagination with different offsets
def test_pagination(fiscal_year: int, limit: int = 10):
    """
    Test pagination by fetching multiple pages.
    """
    results = []
    total = None
    
    # Derive the three offsets from limit so pages neither overlap nor skip records
    for offset in range(0, 3 * limit, limit):
        payload = {
            'criteria': {'fiscal_years': [fiscal_year]},
            'offset': offset,
            'limit': limit
        }
        
        response = requests.post(
            f'{API_BASE_URL}{API_ENDPOINT}',
            json=payload,
            headers={'Content-Type': 'application/json'}
        )
        
        data = response.json()
        if total is None:
            total = data['meta']['total']
        
        print(f"Offset {offset}: Retrieved {len(data['results'])} records")
        results.extend(data['results'])
        
        # Rate limiting
        time.sleep(RATE_LIMIT_DELAY)
    
    print(f"\nTotal available: {total}")
    print(f"Total retrieved: {len(results)}")
    return results

pagination_test = test_pagination(2024, limit=10)

## 7. Test Different Query Criteria

In [None]:
# Test with different criteria to see field variations
test_queries = [
    {
        'name': 'By IC',
        'criteria': {
            'fiscal_years': [2024],
            'agencies': ['NCI']
        }
    },
    {
        'name': 'By Organization',
        'criteria': {
            'fiscal_years': [2024],
            'org_names': ['HARVARD']
        }
    },
    {
        'name': 'Active Projects',  # the criteria below filter on active status, not publications
        'criteria': {
            'fiscal_years': [2024],
            'include_active_projects': True
        }
    }
]

for query in test_queries:
    payload = {
        'criteria': query['criteria'],
        'offset': 0,
        'limit': 5
    }
    
    response = requests.post(
        f'{API_BASE_URL}{API_ENDPOINT}',
        json=payload,
        headers={'Content-Type': 'application/json'}
    )
    
    data = response.json()
    print(f"\n{query['name']}: {data['meta']['total']} total records")
    
    time.sleep(RATE_LIMIT_DELAY)

## 8. Field Coverage Analysis

In [None]:
# Analyze field coverage across multiple records
def analyze_field_coverage(records: List[Dict]) -> pd.DataFrame:
    """
    Analyze which fields are populated across records.
    """
    field_stats = {}
    total_records = len(records)

    for record in records:
        for key, value in record.items():
            stats = field_stats.setdefault(
                key, {'count': 0, 'null_count': 0, 'type': 'NoneType'}
            )

            # Treat None and empty strings, lists, and dicts as missing
            if value is None or value == '' or value == [] or value == {}:
                stats['null_count'] += 1
            else:
                stats['count'] += 1
                if stats['type'] == 'NoneType':
                    stats['type'] = type(value).__name__  # first populated value wins

    if not field_stats:
        # No records: return an empty frame instead of dividing by zero
        return pd.DataFrame(columns=['count', 'null_count', 'type', 'coverage_%'])

    # Convert to DataFrame
    df = pd.DataFrame.from_dict(field_stats, orient='index')
    df['coverage_%'] = (df['count'] / total_records * 100).round(2)
    df = df.sort_values('coverage_%', ascending=False)

    return df

# Fetch more records for better coverage analysis
large_payload = {
    'criteria': {'fiscal_years': [2024]},
    'offset': 0,
    'limit': 100
}

response = requests.post(
    f'{API_BASE_URL}{API_ENDPOINT}',
    json=large_payload,
    headers={'Content-Type': 'application/json'}
)

large_data = response.json()
coverage_df = analyze_field_coverage(large_data['results'])

print(f"Field Coverage Analysis ({len(large_data['results'])} records):")
print(coverage_df)

## 9. Generate Field Mapping Documentation

In [None]:
def generate_field_documentation(records: List[Dict]) -> str:
    """
    Generate markdown documentation of all fields.
    """
    all_fields = {}

    def extract_fields(obj, prefix=''):
        # Walk one dict and record dotted paths; list-of-dict fields get a [] suffix
        if not isinstance(obj, dict):
            return
        for key, value in obj.items():
            field_name = f"{prefix}.{key}" if prefix else key
            # Keep an already-seen concrete type rather than overwriting it with NoneType
            if value is not None or field_name not in all_fields:
                all_fields[field_name] = type(value).__name__

            if isinstance(value, dict):
                extract_fields(value, field_name)
            elif isinstance(value, list) and value and isinstance(value[0], dict):
                extract_fields(value[0], f"{field_name}[]")

    for record in records:
        extract_fields(record)

    # Generate markdown
    md = "# NIH RePORTER API Fields\n\n"
    md += "| Field Name | Data Type |\n"
    md += "|------------|-----------|\n"

    for field, dtype in sorted(all_fields.items()):
        md += f"| `{field}` | {dtype} |\n"

    return md

field_docs = generate_field_documentation(large_data['results'])
print(field_docs)

## 10. Test Rate Limiting Behavior

In [None]:
# Time sequential requests spaced RATE_LIMIT_DELAY apart and record their status codes
def test_rate_limiting(num_requests: int = 5):
    """
    Test API rate limiting behavior.
    """
    payload = {
        'criteria': {'fiscal_years': [2024]},
        'offset': 0,
        'limit': 1
    }
    
    results = []
    
    for i in range(num_requests):
        start_time = time.perf_counter()  # monotonic clock, better suited to timing

        response = requests.post(
            f'{API_BASE_URL}{API_ENDPOINT}',
            json=payload,
            headers={'Content-Type': 'application/json'},
            timeout=30
        )

        elapsed = time.perf_counter() - start_time
        
        results.append({
            'request': i + 1,
            'status_code': response.status_code,
            'elapsed_time': f"{elapsed:.3f}s"
        })
        
        print(f"Request {i+1}: Status {response.status_code}, Time {elapsed:.3f}s")
        
        # Wait before next request
        time.sleep(RATE_LIMIT_DELAY)
    
    return pd.DataFrame(results)

# Uncomment to test (will make multiple API calls)
# rate_limit_df = test_rate_limiting(5)
# print(rate_limit_df)

## 11. Summary and Recommendations

Based on this exploration:

### Key Findings

1. **API Response Structure**: 
   - Meta object with total count
   - Results array with project records
   - Consistent JSON structure

2. **Pagination**:
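   - Max 500 records per request (`limit`)
   - Max offset of 14,999
   - A query matching more than 15,000 records cannot be paged through in full; split it with narrower criteria (for example by IC or fiscal year)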

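3. **Rate Limiting**:
   - Keep a delay of at least 1 second between requests (`RATE_LIMIT_DELAY`)
   - Expect HTTP 429 if the limit is exceeded; the test in section 10 is commented out by default, so run it to confirm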

4. **Field Coverage**:
   - 100+ unique fields across nested structures
   - Some fields have sparse data (publications, clinical trials)
   - Array fields require special handling
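
The pagination limits above can be turned into a small planning helper. The sketch below is illustrative only: `plan_offsets` and `needs_refinement` are hypothetical names, and it assumes that a maximum offset of 14,999 means only the first 15,000 records of any query are reachable.

```python
from typing import List

MAX_LIMIT = 500        # per-request cap from the findings above
MAX_OFFSET = 14_999    # highest allowed offset from the findings above


def plan_offsets(total: int, limit: int = MAX_LIMIT) -> List[int]:
    """Return the offsets needed to page through the reachable part of a query."""
    if not 1 <= limit <= MAX_LIMIT:
        raise ValueError(f"limit must be between 1 and {MAX_LIMIT}")
    # Assumption: records beyond offset 14,999 cannot be reached by paging
    reachable = min(total, MAX_OFFSET + 1)
    return list(range(0, reachable, limit))


def needs_refinement(total: int) -> bool:
    """True when a query matches more records than paging alone can reach."""
    return total > MAX_OFFSET + 1
```

A pipeline could check `needs_refinement(data['meta']['total'])` after the first request and, when it is true, split the criteria before fetching the remaining pages.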

### Pipeline Design Recommendations

1. **Bronze Layer**: Store complete raw JSON to ensure no data loss
2. **Silver Layer**: Create normalized tables for:
   - dim_projects
   - dim_organizations
   - dim_personnel
   - fact_funding
   - bridge_publications
   - bridge_clinical_trials
3. **Incremental Processing**: Use fiscal_year partitioning
4. **Error Handling**: Implement retry logic for rate limits
5. **Monitoring**: Track API response times and data volumes
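
As one possible shape for the retry logic in recommendation 4, the sketch below retries on HTTP 429 with exponential backoff. It is a minimal illustration, not the pipeline's implementation: `call_with_retry` is a hypothetical helper, the attempt count and delays are assumed values, and the request is passed in as a callable so the helper works with any object exposing `status_code`.

```python
import time
from typing import Any, Callable


def call_with_retry(
    send: Callable[[], Any],
    max_attempts: int = 4,          # assumed value, tune for the pipeline
    base_delay: float = 1.0,        # assumed value, matches the 1 second guideline
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call send() until the response is not a 429, backing off between attempts."""
    for attempt in range(1, max_attempts + 1):
        response = send()
        if response.status_code != 429:
            return response
        if attempt < max_attempts:
            # Delay doubles on each rate-limited attempt: 1s, 2s, 4s, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError(f"Still rate limited after {max_attempts} attempts")
```

With the names used in this notebook, a call might look like `call_with_retry(lambda: requests.post(f'{API_BASE_URL}{API_ENDPOINT}', json=payload, timeout=30))`. Non-429 errors are returned to the caller unchanged, so `raise_for_status()` still needs to be called on the result.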