In [7]:
import requests
import json
import csv
import string
from utils import api_token

In [5]:
# Function to create a session with the API token
def make_session():
    """Create a ``requests.Session`` that sends the API token on every request.

    Returns:
        requests.Session: a session whose default headers include
        ``Authorization: Token <api_token>``, with ``api_token`` taken from
        the ``utils`` import at the top of the notebook.
    """
    # requests.Session() is the documented constructor; the lowercase
    # requests.session() alias is deprecated and kept only for compatibility.
    session = requests.Session()
    session.headers.update({"Authorization": f"Token {api_token}"})
    return session

In [6]:
# Function to fetch data from the API endpoint with pagination handling for a limited number of pages
def fetch_data(url, max_pages, timeout=30):
    """Fetch up to ``max_pages`` pages of results from a paginated endpoint.

    Each response is expected to be a JSON object with a ``results`` list and
    an optional ``next`` key holding the URL of the following page.

    Args:
        url: URL of the first page to request.
        max_pages: maximum number of pages to request.
        timeout: seconds to wait for each request before ``requests`` raises
            a timeout error; without it a stalled connection would block the
            notebook indefinitely.

    Returns:
        list: the concatenated ``results`` entries of every page fetched.
        If a page answers with a non-200 status, the error is printed and
        the data collected so far is returned.

    Side effects:
        Writes the collected data to ``testing_data_fetch.json`` in the
        current working directory (testing aid).
    """
    all_data = []
    page_count = 0

    # The context manager closes the session's pooled connections even if a
    # request or the JSON decoding raises.
    with make_session() as session:
        while url and page_count < max_pages:
            response = session.get(url, timeout=timeout)

            if response.status_code != 200:
                print(f"Error: {response.status_code}, {response.text}")
                break

            data = response.json()
            all_data.extend(data['results'])  # append data from the current page to the list
            page_count += 1

            # 'next' is the key for the next page; a missing or None value
            # makes the loop condition fail and ends pagination.
            url = data.get('next')

    # testing: save data to a json file
    with open('testing_data_fetch.json', 'w') as f:
        json.dump(all_data, f)

    return all_data

In [8]:
# Function to process and flatten the data
# Maps each section key of a disclosure record to (row 'type' label, fields copied into the row).
_DISCLOSURE_SECTIONS = {
    'agreements': ('agreement', (
        'date_raw', 'parties_and_terms', 'redacted',
    )),
    'investments': ('investment', (
        'description', 'redacted',
        'income_during_reporting_period_code', 'income_during_reporting_period_type',
        'gross_value_code', 'gross_value_method',
        'transaction_during_reporting_period', 'transaction_date_raw', 'transaction_date',
        'transaction_value_code', 'transaction_gain_code', 'transaction_partner',
        'has_inferred_values',
    )),
    'non_investment_incomes': ('non_investment_income', (
        'date_raw', 'source_type', 'income_amount', 'redacted',
    )),
    'reimbursements': ('reimbursement', (
        'source', 'date_raw', 'location', 'purpose', 'items_paid_or_provided', 'redacted',
    )),
    'spouse_incomes': ('spouse_income', (
        'source_type', 'date_raw', 'redacted',
    )),
}


def process_data(data):
    """Flatten disclosure records into one row per nested section entry.

    Args:
        data: list of dictionaries (the "results" of the API response). Each
            must have a ``person`` URL string and may have the list-valued
            sections named in ``_DISCLOSURE_SECTIONS``.

    Returns:
        list[dict]: one dictionary per section entry, holding ``person_url``,
        ``person_id``, a ``type`` label and that section's fields. A field
        absent from an entry is stored as ``None``. Rows keep the input order
        of records and, within a record, the section order of
        ``_DISCLOSURE_SECTIONS``.

    Raises:
        KeyError: if a record has no ``person`` key.
    """
    processed_data = []
    for item in data:
        person_url = item['person']
        base_entry = {
            'person_url': person_url,
            # last element of the person url gives ID
            'person_id': person_url.rstrip('/').split('/')[-1],
        }

        for section, (entry_type, fields) in _DISCLOSURE_SECTIONS.items():
            # A section that is missing or None is treated as empty, so one
            # incomplete record cannot abort the whole run.
            for record in item.get(section) or []:
                entry = dict(base_entry)
                entry['type'] = entry_type
                for field in fields:
                    entry[field] = record.get(field)
                processed_data.append(entry)

    return processed_data

In [9]:
# Function to save the data to a CSV file
def save_to_csv(data, filename):
    """Write flattened rows to ``filename`` as a CSV file with a fixed header.

    Args:
        data: iterable of dictionaries. Keys that are not in the column list
            are ignored; columns absent from a row (or set to ``None``) are
            written as empty cells.
        filename: path of the CSV file to create or overwrite.
    """
    fieldnames = [
        'person_id', 'person_url', 'type', 'date_raw', 'parties_and_terms', 'redacted', 'description',
        'income_during_reporting_period_code', 'income_during_reporting_period_type', 'gross_value_code',
        'gross_value_method', 'transaction_during_reporting_period', 'transaction_date_raw', 'transaction_date',
        'transaction_value_code', 'transaction_gain_code', 'transaction_partner', 'has_inferred_values',
        'source_type', 'income_amount', 'source', 'location', 'purpose', 'items_paid_or_provided'
    ]

    # Pin the encoding: free-text columns may hold non-ASCII characters and
    # the platform default encoding is not UTF-8 everywhere.
    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        # extrasaction='ignore' drops unknown keys; missing keys fall back to
        # DictWriter's default restval (''), the same cell None produces.
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(data)

In [10]:
# Fetch and process data for the first MAX_PAGES pages
base_url = "https://www.courtlistener.com/api/rest/v4/financial-disclosures/" # updated the API to v4 from v3
MAX_PAGES = 10  # upper bound on the number of result pages requested from the API
all_data = fetch_data(base_url, max_pages=MAX_PAGES)

# Process the fetched data
processed_data = process_data(all_data)
print("Processing done!")

Processing done!


Processing 10 pages took 14 seconds

In [11]:
# Save the processed data to a CSV file
output_csv = 'fin_disc_v1.csv'
save_to_csv(processed_data, output_csv)

# Print confirmation, interpolating the filename that was actually written so
# the message cannot drift from the real output path
print(f"Data saved to {output_csv}")

Data saved to fin_dis_v1.csv
