In [None]:
import os
import uuid

import pandas as pd
import requests
import xmltodict

BASE_URL = 'https://www.ecfr.gov'
TITLES_PATH = '/api/versioner/v1/titles.json'
REQUEST_DATE = '2025-02-10'

In [None]:
def get_content_dict(title):
    """Download the full XML of one title and parse it into a dict.

    The document is requested from
    ``{BASE_URL}/api/versioner/v1/full/{REQUEST_DATE}/title-{title}.xml``.

    Args:
        title: Title number, interpolated into the URL as-is.

    Returns:
        The result of ``xmltodict.parse`` applied to the response body.

    Raises:
        ValueError: If the server answers with any status other than 200.
            The message names the title, the status code and the URL.
    """
    url = f'{BASE_URL}/api/versioner/v1/full/{REQUEST_DATE}/title-{title}.xml'
    response = requests.get(url)

    if response.status_code != 200:
        # Include the status and URL so a failed fetch can be diagnosed from
        # the traceback alone (the message used to stop after the comma).
        raise ValueError(
            f'Failed to fetch content for title {title}, '
            f'status code {response.status_code} ({url})'
        )

    return xmltodict.parse(response.text)


In [None]:
import json


def cache_content_dict(title, content_dict):
    """Write a parsed title to ``./data/content-{title}.json``.

    The ``./data`` directory is created when it does not exist yet, so the
    first run on a fresh checkout does not fail with ``FileNotFoundError``.

    Args:
        title: Title number, used to build the cache file name.
        content_dict: JSON-serialisable dict to store.
    """
    cache_path = f'./data/content-{title}.json'
    # Serialise before opening the file: if the dict cannot be encoded, no
    # empty or truncated cache file is left behind for a later read to trip on.
    content_json = json.dumps(content_dict)
    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    with open(cache_path, 'w') as f:
        f.write(content_json)

def read_content_dict(title):
    """Load a previously cached title from ``./data/content-{title}.json``.

    Args:
        title: Title number, used to build the cache file name.

    Returns:
        The decoded JSON content, or ``None`` when the cache file does not
        exist.
    """
    cache_path = f'./data/content-{title}.json'
    try:
        cache_file = open(cache_path, 'r')
    except FileNotFoundError:
        # Cache miss: the caller decides whether to fetch.
        return None
    with cache_file:
        return json.load(cache_file)

# Ask the API for the list of titles.
url = f'{BASE_URL}{TITLES_PATH}'
response = requests.get(url)

if response.status_code != 200:
    raise RuntimeError(f'Failed to get titles: {response.status_code}')

parsed = response.json()

# Maps title number -> the 'ECFR' entry of that title's parsed content.
content_by_title = {}

for title_obj in parsed['titles']:
    title_id = title_obj['number']
    print(f'Fetching content for title {title_obj["number"]}')
    if title_id in (35, 7):
        continue  # skip title 35 b/c it doesn't exist, skip 7 b/c the API doesn't respond well to 7

    # Use the on-disk cache when possible; download and cache only on a miss.
    content_dict = read_content_dict(title_id)
    if content_dict is None:
        content_dict = get_content_dict(title_id)
        cache_content_dict(title_id, content_dict)

    content_by_title[title_id] = content_dict['ECFR']

print(f"Key in content_by_title: {content_by_title.keys()}")
    

In [None]:

def traverse_and_extract(data, extracted):
    """Flatten a nested dict tree into ``extracted`` and return the node's id.

    Every dict gets a fresh UUID4 string under ``"id"``. Keys whose name
    starts with ``"DIV"`` are treated as child containers: their value (a
    single item or a list of items) is replaced by a list holding the result
    of this function for each item. All other keys are copied unchanged.

    Children are appended to ``extracted`` before their parent.

    Args:
        data: Node to process.
        extracted: List that receives one flattened dict per visited dict.

    Returns:
        The generated id when ``data`` is a dict; otherwise ``data`` itself.
    """
    if not isinstance(data, dict):
        return data  # leaves (strings, None, ...) pass through untouched

    obj_id = str(uuid.uuid4())
    flat = {"id": obj_id}
    child_keys = []
    for key, value in data.items():
        if str(key).startswith("DIV"):
            child_keys.append(key)
        else:
            flat[key] = value

    # DIV fields are added after the plain fields, each as a list of child ids.
    for key in child_keys:
        children = data[key] if isinstance(data[key], list) else [data[key]]
        flat[key] = [traverse_and_extract(child, extracted) for child in children]

    extracted.append(flat)
    return obj_id

def transform(parsed_data):
    """Flatten ``parsed_data`` with ``traverse_and_extract``.

    Returns:
        A dict with ``"root_id"`` (the value returned for the top-level node)
        and ``"objects"`` (the list of flattened nodes).
    """
    objects = []
    # The traversal fills `objects` as a side effect, so run it first.
    top_id = traverse_and_extract(parsed_data, objects)
    result = {"root_id": top_id}
    result["objects"] = objects
    return result

def clean_p_tags(data):
    """Flatten the value of a "P" field into a single string.

    * ``str``: returned unchanged.
    * ``dict``: the ``"#text"`` value (if present) immediately followed by
      the ``"I"`` value (only when that value is a string); other keys are
      ignored.
    * ``list``: every item flattened recursively, joined with single spaces.
    * any other falsy value (e.g. ``None``): the empty string.

    Raises:
        ValueError: For any other (truthy) value.
    """
    if isinstance(data, str):
        return data

    if isinstance(data, dict):
        pieces = []
        if "#text" in data:
            pieces.append(data["#text"])
        italic = data.get("I")
        if isinstance(italic, str):
            pieces.append(italic)
        return "".join(pieces)

    if isinstance(data, list):
        return " ".join(map(clean_p_tags, data))

    if data:
        raise ValueError(f'Unexpected data type: {type(data)}')
    return ""

def join_p_tags(data):
    """Replace every "P" value in a nested structure with its flattened text.

    Dicts and lists are walked recursively. Whenever a dict has a ``"P"``
    key, its value is replaced in place by ``clean_p_tags(value)``; every
    other dict value and every list item is descended into. Anything that is
    neither a dict nor a list is left alone.

    Args:
        data: Structure to process; modified in place.

    Returns:
        The same ``data`` object.
    """
    if isinstance(data, dict):
        for key in data:
            if key == "P":
                # clean_p_tags either returns a str or raises, so the result
                # is stored directly; the former dict branch could never run
                # (and would have joined the dict's keys if it had).
                data[key] = clean_p_tags(data[key])
            else:
                join_p_tags(data[key])
    elif isinstance(data, list):
        for item in data:
            join_p_tags(item)

    return data

def clean_div_names(data):
    """Collapse every ``DIV*`` key of each object into a single ``"DIV"`` key.

    An object with exactly one ``DIV*`` key simply has that key renamed; the
    value is kept as it is. An object with several ``DIV*`` keys gets their
    values concatenated, in key order, into one list. Renaming them one after
    another would overwrite ``"DIV"`` each time and keep only the last key's
    children.

    Args:
        data: Iterable of dicts; each dict is modified in place.

    Returns:
        The same ``data`` object.
    """
    for obj in data:
        div_keys = [k for k in obj if k.startswith("DIV")]
        if not div_keys:
            continue

        if len(div_keys) == 1:
            obj["DIV"] = obj.pop(div_keys[0])
            continue

        # Several child levels on one object: merge them instead of letting
        # the last one win.
        merged = []
        for k in div_keys:
            value = obj.pop(k)
            if isinstance(value, list):
                merged.extend(value)
            else:
                merged.append(value)
        obj["DIV"] = merged

    return data

def extract_dates(data):
    """Copy ``obj["Volume"]["@AMDDATE"]`` to ``obj["AMMEND_DATE"]``.

    Only objects that have a ``"Volume"`` key are touched; the objects are
    modified in place.

    Returns:
        The same ``data`` object.
    """
    for record in data:
        if "Volume" not in record:
            continue
        volume = record["Volume"]
        record["AMMEND_DATE"] = volume["@AMDDATE"]

    return data

# Flatten every title and collect the resulting objects in one list.
all_objects = []
for content in content_by_title.values():
    objects = transform(content)
    print(f'Count of local objects is {len(objects["objects"])}')

    # Each stage edits the object list in place and hands the same list back.
    p_tagged = join_p_tags(objects["objects"])
    cleaned_divs = clean_div_names(p_tagged)
    extracted_dates = extract_dates(cleaned_divs)

    all_objects += extracted_dates

print(f'Count of all objects is {len(all_objects)}')

In [None]:
# prune items: keep only the whitelisted fields of every extracted object
whitelist_columns = ["id", "@TYPE", "@VOLUME", "@N", "HEAD", "P", "DIV", "AMMEND_DATE"]
pruned = [
    {k: v for k, v in obj.items() if k in whitelist_columns}
    for obj in all_objects
]

df_items = pd.DataFrame(pruned).rename(
    columns={"P": "contents", "@N": "n", "@VOLUME": "volume", "@TYPE": "type", "HEAD": "head", "DIV": "children_ids"}
)

# replace NaN with empty string, except for children_ids, make that an empty list
for text_column in ("contents", "n", "volume", "type", "head"):
    df_items[text_column] = df_items[text_column].fillna("")
df_items["children_ids"] = df_items["children_ids"].apply(
    lambda ids: ids if isinstance(ids, list) else []
)

# index on id
df_items = df_items.set_index("id")

df_items

In [None]:
# List the distinct values found in the "type" column.
df_items["type"].unique()

In [None]:
# Display only the rows whose "type" is TITLE.
df_items[df_items["type"] == "TITLE"]

In [None]:
# Work on a copy so the add_*_id passes below, which write into the frame
# they are given, leave df_items itself untouched.
df_items_transformed = df_items.copy()

# Level names, presumably ordered from outermost to innermost.
# NOTE(review): `hierarchy` is not referenced by any code visible in this
# notebook — confirm whether it is still needed.
hierarchy = ["TITLE", "CHAPTER", "SUBCHAP", "PART", "SUBPART", "SECTION"]

def add_title_id(df):
    """Write each TITLE row's "n" into the "title" column of its children.

    Only the rows listed in the TITLE row's "children_ids" (its immediate
    children) are written. ``df`` is modified in place; nothing is returned.
    """
    for _, record in df.iterrows():
        if record["type"] != "TITLE":
            continue
        title_number = record["n"]
        child_ids = record["children_ids"]
        for child in child_ids:
            df.at[child, "title"] = title_number

def add_chapter_id(df):
    """Write each CHAPTER row's "n" into the "chapter" column of its children.

    The chapter's own "title" value is propagated to the same children. Only
    the rows listed in "children_ids" are written. ``df`` is modified in
    place; nothing is returned.
    """
    for _, record in df.iterrows():
        if record["type"] != "CHAPTER":
            continue
        chapter_number = record["n"]
        child_ids = record["children_ids"]
        for child in child_ids:
            df.at[child, "chapter"] = chapter_number
            df.at[child, "title"] = record["title"]

def add_subchapter_id(df):
    """Write each SUBCHAP row's "n" into the "subchapter" column of its children.

    The subchapter's own "chapter" and "title" values are propagated to the
    same children. Only the rows listed in "children_ids" are written.
    ``df`` is modified in place; nothing is returned.
    """
    for _, record in df.iterrows():
        if record["type"] != "SUBCHAP":
            continue
        subchapter_number = record["n"]
        child_ids = record["children_ids"]
        for child in child_ids:
            df.at[child, "subchapter"] = subchapter_number
            df.at[child, "chapter"] = record["chapter"]
            df.at[child, "title"] = record["title"]

def add_part_id(df):
    """Write each PART row's "n" into the "part" column of its children.

    The part's own "subchapter" (empty string when the row has no such
    field), "chapter" and "title" values are propagated to the same
    children. Only the rows listed in "children_ids" are written. ``df`` is
    modified in place; nothing is returned.
    """
    for _, record in df.iterrows():
        if record["type"] != "PART":
            continue
        part_number = record["n"]
        child_ids = record["children_ids"]
        for child in child_ids:
            df.at[child, "part"] = part_number
            df.at[child, "subchapter"] = record.get("subchapter", "")
            df.at[child, "chapter"] = record["chapter"]
            df.at[child, "title"] = record["title"]

def add_subpart_id(df):
    """Write each SUBPART row's "n" into the "subpart" column of its children.

    The subpart's own "part", "subchapter" (empty string when the row has no
    such field), "chapter" and "title" values are propagated to the same
    children. Only the rows listed in "children_ids" are written. ``df`` is
    modified in place; nothing is returned.
    """
    for _, record in df.iterrows():
        if record["type"] != "SUBPART":
            continue
        subpart_number = record["n"]
        child_ids = record["children_ids"]
        for child in child_ids:
            df.at[child, "subpart"] = subpart_number
            df.at[child, "part"] = record["part"]
            df.at[child, "subchapter"] = record.get("subchapter", "")
            df.at[child, "chapter"] = record["chapter"]
            df.at[child, "title"] = record["title"]

# Run the passes from the top level down: each pass reads identifiers that
# the pass before it wrote onto the rows it is now visiting.
for annotate in (add_title_id, add_chapter_id, add_subchapter_id, add_part_id, add_subpart_id):
    annotate(df_items_transformed)

# display all where type is SECTION
is_section = df_items_transformed["type"] == "SECTION"
df_content_sections = df_items_transformed[is_section]
df_content_sections

In [None]:
# Write the SECTION rows to CSV; with to_csv's defaults the id index is
# written as the first column.
df_content_sections.to_csv("data/ecfr_contents.csv")