In [4]:
# System Libs
import csv
from collections import defaultdict
from datetime import datetime
import io
from pathlib import Path
import pandas as pd
from core import (extract_file_from_zip,
                   download_file,
                   clean_extraction_directory)

from dagster import asset


In [5]:

@asset
def raw_527_data():
    """
    Fetch the IRS 527 full-data archive and stage it for processing.

    The zip is downloaded from the IRS endpoint into ``output_data/irs_527``,
    extracted into an ``unzipped`` sub-directory, and then the zip path,
    extraction directory and target file path are handed to
    ``clean_extraction_directory``.

    Returns:
    - str: The path ``output_data/irs_527/raw_FullDataFile.txt`` as a string.
    """
    source_url = 'http://forms.irs.gov/app/pod/dataDownload/fullData'
    work_dir = Path("output_data/irs_527")

    # All artefacts live under the same working directory.
    archive = work_dir / 'data.zip'
    unzip_dir = work_dir / 'unzipped/'
    staged_file = work_dir / 'raw_FullDataFile.txt'

    download_file(source_url, archive)
    extract_file_from_zip(archive, unzip_dir)
    # NOTE(review): presumably moves the extracted file to staged_file and
    # removes the leftovers -- confirm against core.clean_extraction_directory.
    clean_extraction_directory(archive, unzip_dir, staged_file)

    return str(staged_file)


In [6]:

@asset
def data_dictionary():
    """
    Build the field mappings for every 527 record type from the mappings workbook.

    One sheet per record type is read from the Excel file; each sheet must
    provide the columns ``position``, ``model_name`` and ``field_type``.

    Returns:
    - dict: ``{record_type: {position: (model_name, field_type)}}``.

    Raises:
    - ValueError: If a sheet lacks one of the required columns.
    """
    # NOTE(review): the path is relative to the working directory; an earlier
    # variant pointed at "input_data/IRS527/mappings.xlsx".
    workbook_path = Path("../../input_data/IRS527/mappings.xlsx")
    required_columns = ['position', 'model_name', 'field_type']

    # Read every sheet up front, keyed by record type.
    sheets = {}
    for sheet_name in ["1", "D", "R", "E", "2", "A", "B"]:
        sheets[sheet_name] = pd.read_excel(workbook_path, sheet_name=sheet_name)

    combined_mappings = {}
    for record_type, frame in sheets.items():
        # Refuse to continue when the sheet does not have the expected layout.
        if any(col not in frame.columns for col in required_columns):
            raise ValueError(f"Missing one of the required columns in record type {record_type}: {required_columns}")

        # position -> (model field name, field type code)
        field_map = {}
        for _, entry in frame.iterrows():
            field_map[entry['position']] = (entry['model_name'], entry['field_type'])
        combined_mappings[record_type] = field_map

    return combined_mappings



In [82]:
def clean_cell(cell, cell_type):
    """
    Cleans and converts a cell to the correct type based on cell_type.

    Parameters:
    - cell (str): Raw cell text.
    - cell_type (str): 'D' for date/time, 'I' for integer, 'N' for float;
      any other value is handled as text.

    Returns:
    - datetime for 'D', int for 'I', float for 'N'; ``None`` when such a cell
      is blank.
    - For text: the upper-cased value truncated to 50 characters, or ``None``
      when it equals one of the known "not applicable" spellings.

    Raises:
    - ValueError: If a non-blank 'D', 'I' or 'N' cell cannot be converted.
    """
    null_terms = ('N/A', 'NOT APPLICABLE', 'NA', 'NONE', 'NOT APPLICABE', 'NOT APLICABLE', 'N A', 'N-A')
    # Full timestamps and compact YYYYMMDD dates are both accepted.
    date_formats = ('%Y-%m-%d %H:%M:%S', '%Y%m%d')

    if cell_type == 'D':
        text = cell.strip()
        if not text:
            return None
        for date_format in date_formats:
            try:
                return datetime.strptime(text, date_format)
            except ValueError:
                continue
        raise ValueError(f"Unrecognized date value: {cell!r}")

    if cell_type in ('I', 'N'):
        # A blank numeric cell means "no value"; int('')/float('') would raise.
        if not cell.strip():
            return None
        return int(cell) if cell_type == 'I' else float(cell)

    # Text: normalise case first, then cap the length, then map null spellings.
    text = cell.upper()[:50]
    return None if text in null_terms else text



In [57]:

def parse_row(row, mapping):
    """
    Turn one row (a list of cells) into a ``{field_name: value}`` dictionary.

    ``mapping`` maps a cell position to a ``(field_name, field_type)`` pair.
    Cells at positions missing from the mapping are ignored when empty and
    rejected otherwise.

    Raises:
    - KeyError: If a non-empty cell sits at a position the mapping does not know.
    - Exception: Any conversion error is re-raised after printing the row.
    """
    parsed_row = {}
    for position, raw_value in enumerate(row):
        try:
            field_spec = mapping[position]
            converted = clean_cell(raw_value, field_spec[1])
            parsed_row[field_spec[0]] = converted
        except KeyError:
            # Unmapped trailing blanks are tolerated; real data is not.
            if raw_value != '':
                raise KeyError(f"Unknown field position in row {row}: {position}")
        except Exception as error:
            # Show the offending row before propagating the failure.
            print(row)
            raise error
    return parsed_row


In [58]:

def process_row(row_number, row, mappings, records):
    """
    Route one row: print 'H'/'F' rows, parse every other row and store it.

    Parameters:
    - row_number: Index of the row (currently unused).
    - row (list): Cells of the row; the first cell is the record type.
    - mappings (dict): Field mappings keyed by record type.
    - records: Mapping of record type to the list that collects parsed rows.
    """
    form_type = str(row[0])

    if form_type in ('H', 'F'):
        # 'H'/'F' rows carry no mapped fields; they are only echoed.
        print(row)
        return

    parsed = parse_row(row, mappings[form_type])
    records[form_type].append(parsed)



In [83]:
def fix_malformed(line):
    """
    Repair the known fields in a raw line whose opening quote is never closed.

    Each known broken fragment is swapped for its corrected form; lines without
    any of the fragments are returned unchanged.
    """
    repairs = (
        ('|"I Factor|', '|"I Factor"|'),
        ('''|"N/A'|''', '''|"N/A"|'''),
        ('''|"522 Highland Avenue |''', '''|"522 Highland Avenue"|'''),
        ('''|"AV-TECH INDUSTRIES|''', '''|"AV-TECH INDUSTRIES"|'''),
        ('''|"c/o Moxie Innovative|''', '''|"c/o Moxie Innovative"|'''),
    )

    fixed = line
    for broken, repaired in repairs:
        # str.replace is a no-op when the fragment is absent.
        fixed = fixed.replace(broken, repaired)
    return fixed

In [84]:
def clean_527_data(raw_527_data: str, data_dictionary: dict):
    """
    Parse the pipe-delimited 527 data file into lists of records per record type.

    A record may be spread over several physical lines, so each row is held
    back until the next row that starts a new record (or the end of the file)
    shows that it is complete. A row whose first cell is neither a known
    record type nor 'H'/'F' is treated as a continuation: its first cell is
    appended to the last cell of the pending row and its remaining cells are
    added after it.

    Parameters:
    - raw_527_data (str): Path of the raw data file (read as ISO-8859-1).
    - data_dictionary (dict): Field mappings keyed by record type.

    Returns:
    - defaultdict(list): Parsed rows grouped by record type.

    Raises:
    - ValueError: If a continuation line appears before any record has started.
    """
    records = defaultdict(list)
    pending_row = None
    pending_row_number = None

    with io.open(raw_527_data, 'r', encoding='ISO-8859-1') as raw_file:
        reader = csv.reader(map(fix_malformed, raw_file), delimiter='|')
        for row_number, row in enumerate(reader):
            if not row:
                continue

            if row[0] in data_dictionary or row[0] in ('H', 'F'):
                # A new record starts, so the pending one is complete. Flush
                # it before replacing it; otherwise the record preceding an
                # 'H'/'F' row would be lost.
                if pending_row is not None:
                    process_row(pending_row_number, pending_row, data_dictionary, records)
                pending_row, pending_row_number = row, row_number
            elif pending_row is None:
                raise ValueError(
                    f"Row {row_number} is a continuation line but no record precedes it: {row}"
                )
            else:
                # Continuation of the pending record (line break inside a field).
                pending_row = pending_row[:-1] + [pending_row[-1] + row[0]] + row[1:]

    # The final row has no successor to trigger its processing.
    if pending_row is not None:
        process_row(pending_row_number, pending_row, data_dictionary, records)

    return records


In [85]:
# NOTE(review): raw_fpath and data_mappings are assigned in cells that appear
# further down this notebook, so this cell fails on a fresh top-to-bottom run.
_ = clean_527_data(raw_fpath, data_mappings)

['H', '20231104', '0316', 'F', '']


KeyboardInterrupt: 

In [86]:
# Demonstrates that a compact YYYYMMDD string does not match the timestamp
# format; this call raises ValueError.
datetime.strptime('20230512', '%Y-%m-%d %H:%M:%S')

ValueError: time data '20230512' does not match format '%Y-%m-%d %H:%M:%S'

In [12]:
# Run the download/extract step and keep the returned raw file path.
raw_fpath = raw_527_data()

In [13]:
# Load the per-record-type field mappings from the mappings workbook.
data_mappings = data_dictionary()

In [55]:
# Parse a compact YYYYMMDD date string into a datetime.
dte = '20130306'
datetime.strptime(dte, '%Y%m%d')

datetime.datetime(2013, 3, 6, 0, 0)

In [67]:
# Sample row of record type '1', used below to inspect the field mapping.
# NOTE(review): contains what looks like a real name and e-mail address --
# consider redacting before sharing the notebook.
row = ['1', '8871', '9616076', '1', '0', '0', '462200647', 'Committee to Elect Joseph Estes For Palmdale Water District ', '36055 43rd St East', '', 'Palmdale', 'CA', '93552', '6235', 'jestesforpwd@gmail.com', '20130306', 'Elsie Estes', '36055 43rd St East', '', 'Palmdale', 'CA', '93552', '6235', 'Elsie Estes', '36055 43rd St East', '', 'Palmdale', 'CA', '93552', '6235', '36055 43rd St East', '', 'Palmdale', 'CA', '93552', '6235', '1', 'CA', '1', 'Candidate Running For Palmdale Water District. ', 'Accepting Campaign and Making Campaign Contributions.', '20130306', '2013-03-06 16:32:00', '1', '1']

In [70]:
# Scratch check: concatenating two cells yields a single joined string
# (compare the continuation-line merge in clean_527_data).
[row[-1] + row[1]]

['18871']

In [66]:
# Pair every sample cell with the (field name, field type) entry of record
# type '1' to eyeball how the mapping lines up with the data.
for cell_value, field_spec in zip(row, data_mappings['1'].values()):
    print((field_spec, cell_value))

(('record_type', 'N'), '1')
(('form_type', 'N'), '8871')
(('form_id_number', 'N'), '9616076')
(('initial_report_indicator', 'N'), '1')
(('amended_report_indicator', 'N'), '0')
(('final_report_indicator', 'N'), '0')
(('ein', 'C'), '462200647')
(('organization_name', 'C'), 'Committee to Elect Joseph Estes For Palmdale Water District ')
(('mailing_address_1', 'C'), '36055 43rd St East')
(('mailing_address_2', 'C'), '')
(('mailing_address_city', 'C'), 'Palmdale')
(('mailing_address_state', 'C'), 'CA')
(('mailing_address_zip_code', 'C'), '93552')
(('mailing_address_zip_ext', 'C'), '6235')
(('e_mail_address', 'C'), 'jestesforpwd@gmail.com')
(('established_date', 'C'), '20130306')
(('custodian_name', 'C'), 'Elsie Estes')
(('custodian_address_1', 'C'), '36055 43rd St East')
(('custodian_address_2', 'C'), '')
(('custodian_address_city', 'C'), 'Palmdale')
(('custodian_address_state', 'C'), 'CA')
(('custodian_address_zip_code', 'C'), '93552')
(('custodian_address_zip_ext', 'C'), '6235')
(('contac