# This file downloads and loads data for irs-filings

**Post Description:** Download and process IRS data on political organizations (i.e., 527 organizations)

**Post Categories:** Data Engineering | IRS



>Credit: [This repo](https://github.com/sahilchinoy/django-irs-filings) by [Sahil Chinoy](https://sahilchinoy.com/) was very helpful in writing this code.  I changed a lot, but left a lot the same.  The data provided by the IRS has changed significantly since the original code was written, but it was a huge help anyway.  Several chunks of the code below are taken almost verbatim from that repo.

https://forms.irs.gov/app/pod/dataDownload/dataDownload

In [3]:
import os, sys, shutil
import csv, io, zipfile, pickle
import logging
import requests
import pandas as pd 
from IPython.display import clear_output
import sqlite3

from fastcore.all import *
from datetime import datetime
from collections import defaultdict

from pathlib import Path
# Notebook-wide logger: INFO level, written to stdout.
logger = logging.getLogger(name="jupyter")
if not logger.handlers:
    # Only attach a handler when none exists yet, so re-running this cell
    # does not stack up duplicate handlers.
    logger.addHandler(logging.StreamHandler(stream=sys.stdout))
logger.setLevel(logging.INFO)

def def_value():
    """Return a new, empty list on every call."""
    return list()

# Download and Extract

This step is pretty straightforward.  We need to download and unzip a file.  When unzipped there is a single file that is almost a half dozen directories deep so we also clean that up for ease of use.

In [2]:
# Import functions for downloading and unzipping file
from download_utils import unzip_file, download_file

In [31]:
# Flatten the directory tree out
def clean_527(zip_path, extract_path, final_path):
    """Move the extracted data file to ``final_path`` and delete the leftovers.

    The data file sits several directories deep inside the extracted archive.
    It is moved to ``final_path``; afterwards the extraction directory and the
    zip archive are removed.

    Args:
        zip_path: Path of the zip archive (deleted at the end).
        extract_path: Directory the archive was extracted into (deleted).
        final_path: Destination path for the data file.
    """
    logger.info('Cleaning up archive...')
    nested_data_file = f"{extract_path}/var/IRS/data/scripts/pofd/download/FullDataFile.txt"
    shutil.move(nested_data_file, final_path)
    # With the data file moved out, neither the extraction tree nor the
    # archive is needed any more.
    shutil.rmtree(extract_path)
    os.remove(zip_path)
    logger.info(f"FINAL RAW DATA FILE RELATIVE PATH: {final_path}")

In [32]:
# Download, unzip, flatten
def extract_data(
    url,
    zip_path,
    extract_path,
    final_path,
):
    """Download the archive, unzip it, and flatten the result to one file.

    Args:
        url: Location of the zip archive to download.
        zip_path: Where the downloaded archive is written.
        extract_path: Directory the archive is unzipped into.
        final_path: Final location of the extracted data file.
    """
    # Step 1: fetch the archive.
    download_file(url, zip_path)
    # Step 2: unpack it.
    unzip_file(zip_path, extract_path)
    # Step 3: pull the data file out of the nested tree and tidy up.
    clean_527(zip_path, extract_path, final_path)

In [14]:
# Download source and the local working paths, all rooted at ``base_dir``.
url = "http://forms.irs.gov/app/pod/dataDownload/fullData"
base_dir = Path("./data/irs-filings")
zip_path = base_dir / "data.zip"                # downloaded archive
extract_path = base_dir / "unzipped/"           # extraction directory
final_path = base_dir / "raw_FullDataFile.txt"  # flattened raw data file

In [None]:

# Run the download -> unzip -> flatten steps with the URL and paths defined above.
extract_data(
    url, # URL where the data is located on the IRS Website
    zip_path, # The location we will download the data to
    extract_path, # The location we will unzip the file to
    final_path # The final unzipped file location and name
    )

# Clean and Split

Now that we have the file to read in, it gets a bit trickier.  The file is not in a standard format, so we need to parse it row by row.  The file is pipe delimited, but rows have different numbers of fields, and each row belongs to one of several different tables.

In a normal "relational" structure this would be multiple different files and we are going to break it out in that way.

First, we need to load and set up our mappings.  These give us our schema: every type of row we might encounter, along with the names and data types of the fields in that type of row.

In [34]:
def load_mapping_file(fname, record_types=("1", "D", "R", "E", "2", "A", "B")):
    """Read one mapping sheet per record type from an Excel workbook.

    Args:
        fname: Path of the Excel workbook to read.
        record_types: Sheet names to load; one sheet per record type.

    Returns:
        A dict mapping each record type to the DataFrame read from the sheet
        of the same name.
    """
    # Read from the ``fname`` argument. Relying on a module-level path
    # variable instead would ignore the caller's argument and fail when that
    # global is not defined.
    return {
        record_type: pd.read_excel(fname, sheet_name=record_type)
        for record_type in record_types
    }

In [35]:
def build_mappings(mapping_file):
    """Turn the per-record-type mapping sheets into position lookups.

    Args:
        mapping_file: Dict of record type -> DataFrame. Each DataFrame must
            have ``position``, ``model_name`` and ``field_type`` columns.

    Returns:
        A dict of record type -> ``{position: (model_name, field_type)}``.

    Raises:
        ValueError: If a non-empty sheet lacks one of the required columns.
    """
    mappings = {}
    for record_type, sheet in mapping_file.items():
        rows = sheet.values
        if len(rows) == 0:
            # Nothing to map; skip the column lookups for an empty sheet.
            mappings[record_type] = {}
            continue
        # Locate the three columns once per sheet rather than once per row.
        cols = list(sheet.columns)
        position_idx = cols.index('position')
        name_idx = cols.index('model_name')
        type_idx = cols.index('field_type')
        mappings[record_type] = {
            row[position_idx]: (row[name_idx], row[type_idx]) for row in rows
        }
    return mappings

Next we create functions to parse a given row using the mapping file.  It will determine the row type, look up the mapping for that row, and then set the datatype of each cell appropriately.

In [36]:
def clean_cell(cell, cell_type, NULL_TERMS=('N/A', 'NOT APPLICABLE', 'NA', 'NONE', 'NOT APPLICABE', 'NOT APLICABLE', 'N A', 'N-A')):
    """Convert one raw text cell to the Python value for its field type.

    Args:
        cell: Raw cell text.
        cell_type: ``'D'`` for a ``%Y-%m-%d %H:%M:%S`` datetime, ``'I'`` for
            an int, ``'N'`` for a float; anything else is treated as text.
        NULL_TERMS: Upper-cased text values that mean "no value".

    Returns:
        A ``datetime``, ``int`` or ``float`` for typed cells; for text cells
        the upper-cased text truncated to 50 characters. ``None`` when the
        cell is empty or (text cells only) matches one of ``NULL_TERMS``.

    Raises:
        ValueError: If a non-blank typed cell cannot be parsed.
    """
    if cell_type in ('D', 'I', 'N'):
        # A blank typed cell is a missing value, not a parse error; text
        # cells already map the empty string to None below.
        if not cell.strip():
            return None
        if cell_type == 'D':
            return datetime.strptime(cell, '%Y-%m-%d %H:%M:%S')
        if cell_type == 'I':
            return int(cell)
        return float(cell)

    cell = cell.upper()[:50]
    if not cell or cell in NULL_TERMS:
        return None
    return cell

In [37]:
def parse_row(row, mapping):
    """Build a ``{field name: cleaned value}`` dict from one raw row.

    Args:
        row: The raw cells of one row.
        mapping: Lookup of cell position -> ``(field name, field type)``.

    Returns:
        A dict with one cleaned value per mapped field. Cells beyond the
        number of mapped fields are ignored.
    """
    # Never read past the mapped fields, nor past the end of a short row.
    cell_count = min(len(mapping), len(row))
    result = {}
    for position in range(cell_count):
        name, kind = mapping[position]
        result[name] = clean_cell(row[position], kind)
    return result

In [3]:
# Knowing the length of the file lets us know how much we have left to parse
def file_len(fname, encoding='ISO-8859-1'):
    """Return the number of lines in ``fname``.

    Args:
        fname: Path of the file to count.
        encoding: Text encoding used to read the file. The default matches
            the encoding ``process_file`` uses for the raw data file, and it
            can decode any byte sequence, so counting never fails on
            non-UTF-8 bytes.

    Returns:
        The line count; 0 for an empty file.
    """
    with open(fname, encoding=encoding) as f:
        # Counting with sum() yields 0 for an empty file, where a loop
        # variable would be left unbound.
        return sum(1 for _ in f)

In [41]:
def process_file(final_path, mappings, output_path='data/irs-filings/processed_lists.pickle'):
    """Parse the raw pipe-delimited file and pickle its rows grouped by record type.

    Each row's first field selects its record type. Rows whose type is a key
    of ``mappings`` are parsed with ``parse_row`` and appended to that type's
    list. Rows of type ``"H"`` or ``"F"`` are only logged. Rows with no fields
    at all have their row index recorded under ``"error_idxs"``.

    Args:
        final_path: Path of the raw data file.
        mappings: Dict of record type -> field mapping, as produced by
            ``build_mappings``.
        output_path: Where the resulting dict is pickled.
    """
    records = defaultdict(list)
    # Make sure the key exists even when no row fails, so consumers can
    # always read records["error_idxs"].
    records["error_idxs"]
    file_length = file_len(final_path)
    start_time = datetime.now()

    with io.open(final_path, 'r', encoding='ISO-8859-1') as raw_file:
        reader = csv.reader(raw_file, delimiter='|')
        for i, row in enumerate(reader):
            try:
                form_type = str(row[0])
                if form_type in mappings:
                    records[form_type].append(parse_row(row, mappings[form_type]))
                elif form_type in ("H", "F"):
                    # Presumably header/footer rows; logged, not stored.
                    logger.info(row)
            except IndexError:
                # A row with no fields (e.g. a blank line) has no row[0].
                # The previous guard compared the row list with '\n', which
                # is always unequal, so every such row was recorded; that
                # behavior is kept and blank lines are filtered out later.
                records["error_idxs"].append(i)
            if i % 10000 == 0:
                # Periodic progress report with a rough time estimate.
                clear_output(wait=True)
                elapsed = datetime.now() - start_time
                time_per = elapsed / max(i, 1)
                logger.info(f"{i} of {file_length} | {round((i/file_length)*100,2)}% | Elapsed={elapsed} | Time Per={time_per} | Remaining={time_per*(file_length-i)}")

    # Use a context manager so the output file is flushed and closed.
    with open(output_path, "wb") as out_file:
        pickle.dump(dict(records), out_file)

In [42]:
# Raise the csv module's per-field size cap before parsing
# (presumably some fields exceed the default limit).
# NOTE(review): sys.maxsize may raise OverflowError here on platforms whose
# C long is 32-bit (e.g. Windows) - confirm the target platform.
csv.field_size_limit(sys.maxsize)
final_path = (base_dir/'raw_FullDataFile.txt')
# Excel workbook holding one mapping sheet per record type.
mappings_path = Path("DataConfigs/irs-filings/mappings.xlsx")
mappings = build_mappings(load_mapping_file(mappings_path))
# Parse the raw file and pickle the grouped records.
process_file(final_path,mappings)

NameError: name 'build_mappings' is not defined

# Load

In [5]:
# Reload the grouped records that process_file pickled.
# NOTE: unpickling can execute arbitrary code; only load a pickle file that
# this notebook itself produced.
with open('data/irs-filings/processed_lists.pickle', 'rb') as handle:
    records = pickle.load(handle)

In [43]:
# Re-read the raw file and keep the flagged lines that are not simply blank.
errors = L(records['error_idxs'])
# A set gives constant-time membership tests; checking the list for every
# line of the file costs a full scan of the list each time.
error_idx_set = set(errors)
# Read with the same encoding process_file uses for this file.
# NOTE(review): the recorded indexes are csv row numbers; they match file
# line numbers only if no field contains an embedded newline - confirm.
with open("data/irs-filings/raw_FullDataFile.txt", encoding='ISO-8859-1') as fp:
    new_errors = [
        (i, line)
        for i, line in enumerate(fp)
        if i in error_idx_set and line != '\n'
    ]
len(new_errors)

237

In [15]:
# Display the raw data file path as this cell's output.
(base_dir/'raw_FullDataFile.txt')

Path('data/irs-filings/raw_FullDataFile.txt')

In [1]:
# conn = sqlite3.connect('./data/irs-filings/db.sqlite')
# for k in records.keys():
#     if k=='error_idxs': continue
#     print(f"type_{k} writings...")
#     pd.DataFrame(records[k]).to_sql(f"type_{k}",conn,if_exists="replace",index=False)

In [2]:
# pd.read_sql(f"select * from type_A limit 25",conn).head(3)