In [1]:
!git clone https://github.com/NickCrews/fech-sources
!cd fech-sources && git checkout 781671732c5ffd411d374cdb46401c28ff761507 && cd ..

Cloning into 'fech-sources'...
remote: Enumerating objects: 373, done.[K
remote: Counting objects: 100% (154/154), done.[K
remote: Compressing objects: 100% (91/91), done.[K
remote: Total 373 (delta 72), reused 114 (delta 62), pack-reused 219[K
Receiving objects: 100% (373/373), 194.15 KiB | 1.64 MiB/s, done.
Resolving deltas: 100% (208/208), done.
Note: switching to '781671732c5ffd411d374cdb46401c28ff761507'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by switching back to a branch.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -c with the switch command. Example:

  git switch -c <new-branch-name>

Or undo this operation with:

  git switch -

Turn off this advice by setting config variable advice.detachedHead to false

HEAD is now at 7816717 Fix F57.csv


In [2]:
from pathlib import Path
import json

import csv
from typing import NamedTuple

from collections import Counter

csvs = Path("fech-sources/").glob("*.csv")
csvs = list(csvs)
print(f"Found {len(csvs)} CSVs")
csvs[:5]

Found 58 CSVs


[PosixPath('fech-sources/F3Z2.csv'),
 PosixPath('fech-sources/F76.csv'),
 PosixPath('fech-sources/F3Z1.csv'),
 PosixPath('fech-sources/F99.csv'),
 PosixPath('fech-sources/TEXT.csv')]

In [3]:
class Field(NamedTuple):
    # The canonical name of the field
    canonical_name: str
    # The raw name of the field in the CSV
    name_raw: str


VersionPattern = str
Schema = list[Field]


def make_schema_lookup(p: Path) -> dict[VersionPattern, Schema]:
    """Load a CSV into a lookup table form version -> schema."""
    p = Path(p)
    r = csv.reader(open(p))

    # See for an example https://github.com/dwillis/fech-sources/blob/master/F3.csv
    # The file looks like:
    #
    # canonical,^8.4|8.3|8.2|8.1|8.0|7.0|6.4,,^6.3|6.2|6.1,,^5.3|5.2|5.1|5.0,,^3.0,
    # form_type,1,FORM TYPE,1,FORM TYPE,1,FORM TYPE,1,FORM TYPE
    # filer_committee_id_number,2,FILER COMMITTEE ID NUMBER,2,FILER COMMITTEE ID NUMBER,2,FILER COMMITTEE ID NUMBER,2,FILER COMMITTEE ID NUMBER
    # committee_name,3,COMMITTEE NAME,3,COMMITTEE NAME,3,COMMITTEE NAME,3,COMMITTEE NAME
    # change_of_address,4,CHANGE OF ADDRESS,4,CHANGE OF ADDRESS,9,CHG OF ADDRESS,9,CHG OF ADDRESS
    #
    # Where the first columns is the canonical name of the field, and the rest are
    # regexes that match the filing versions
    first_row = next(r)
    # Every other column, starting with the second, is a regex
    patterns = first_row[1::2]

    pattern_to_fields = {p: [] for p in patterns}
    for i, row in enumerate(r):
        # First column is the canonical name for that field
        # Then the rest of the columns are pairs, one for each pattern:
        # The first element of each pair is the column position in the source file,
        # and the second is name of the field for that version of the form.
        canonical_name = row[0]
        positions = row[1::2]
        names = row[2::2]
        if len(positions) != len(patterns):
            raise ValueError(
                f"Expected {len(patterns)} positions, but found {positions} in row {i}"
            )
        if len(names) != len(patterns):
            raise ValueError(
                f"Expected {len(patterns)} names, but found {names} in row {i}"
            )
        for pattern, pos, name in zip(patterns, positions, names, strict=True):
            if not pos:
                # This field doesn't exist in this version of the form
                continue
            try:
                p = int(pos)
            except ValueError:
                raise ValueError(f"Expected a position, but found {pos} in row {i}")
            pattern_to_fields[pattern].append((p, canonical_name.strip(), name.strip()))

    result = {}
    for pattern, fields in pattern_to_fields.items():
        try:
            s = _sort_fields(fields)
        except ValueError as e:
            raise ValueError(f"Error in fields for pattern {pattern}") from e
        result[pattern] = s
    return result


def _sort_fields(fields: list[tuple]):
    s = sorted(fields, key=lambda x: x[0])
    positions = [pos for pos, canonical_name, name in s]
    # expected = list(range(len(positions)))
    # if positions != expected:
    #     print(s)
    #     print(positions)
    #     print(expected)
    #     print(set(positions) - set(expected))
    #     print(set(expected) - set(positions))
    #     raise ValueError("Positions are not sequential")
    c = Counter(positions)
    for pos, count in c.items():
        if count > 1:
            raise ValueError(f"Position {pos} has {count} appearances")

    s = [(canonical_name, name) for pos, canonical_name, name in s]
    return s

In [4]:
form_name_to_schemas = {}
for i, p in enumerate(csvs):
    print(f"Loading {i}/{len(csvs)}")
    form_name = p.stem
    try:
        lookup = make_schema_lookup(p)
    except ValueError as e:
        raise ValueError(f"Error parsing {p}") from e
    form_name_to_schemas[form_name] = lookup

form_name_to_schemas

Loading 0/58
Loading 1/58
Loading 2/58
Loading 3/58
Loading 4/58
Loading 5/58
Loading 6/58
Loading 7/58
Loading 8/58
Loading 9/58
Loading 10/58
Loading 11/58
Loading 12/58
Loading 13/58
Loading 14/58
Loading 15/58
Loading 16/58
Loading 17/58
Loading 18/58
Loading 19/58
Loading 20/58
Loading 21/58
Loading 22/58
Loading 23/58
Loading 24/58
Loading 25/58
Loading 26/58
Loading 27/58
Loading 28/58
Loading 29/58
Loading 30/58
Loading 31/58
Loading 32/58
Loading 33/58
Loading 34/58
Loading 35/58
Loading 36/58
Loading 37/58
Loading 38/58
Loading 39/58
Loading 40/58
Loading 41/58
Loading 42/58
Loading 43/58
Loading 44/58
Loading 45/58
Loading 46/58
Loading 47/58
Loading 48/58
Loading 49/58
Loading 50/58
Loading 51/58
Loading 52/58
Loading 53/58
Loading 54/58
Loading 55/58
Loading 56/58
Loading 57/58


{'F3Z2': {'^8.4|8.3|8.2': [('form_type', 'FORM TYPE'),
   ('filer_committee_id_number', 'FILER COMMITTEE ID NUMBER'),
   ('principal_committee_name', 'PRINCIPAL COMMITTEE NAME'),
   ('coverage_from_date', 'COVERAGE FROM DATE'),
   ('coverage_through_date', 'COVERAGE THROUGH DATE'),
   ('col_a_net_contributions', '6(c) Net Contributions'),
   ('col_a_net_operating_expenditures', '7(c) Net Operating Expenditures'),
   ('col_a_debts_to', '9 Debts and Obligations Owed TO the Committee'),
   ('col_a_debts_by', '10 Debts and Obligations Owed BY the Committee'),
   ('col_a_individual_contributions',
    '11(a) Contributions from Individuals/Persons Other Than Political Committees'),
   ('col_a_political_party_contributions',
    '11(b) Contributions from Political Party Committees'),
   ('col_a_pac_contributions',
    '11(c) Contributions from Other Political Committees'),
   ('col_a_candidate_contributions', '11(d) Contributions from the Candidate'),
   ('col_a_total_contributions', '11(e) T

In [6]:
json.dump(form_name_to_schemas, open("form_schemas.json", "w"), indent=2)

In [None]:
# Now generate a rust file with the schemas
# in a lookup table

rust_path = Path("../crates/feco3/src/schemas.rs")