In [1]:
from azureml.core import Workspace, Dataset, Datastore

import os

# Azure ML workspace coordinates. Each can be overridden with an environment variable so the
# notebook can target another workspace without editing code; the defaults are the values
# this notebook was written against.
subscription_id = os.environ.get('FEC_AZURE_SUBSCRIPTION_ID', 'f48a2553-c966-4d06-8faa-c5096da10254')
resource_group = os.environ.get('FEC_AZURE_RESOURCE_GROUP', 'rg-fecdata')
workspace_name = os.environ.get('FEC_AZUREML_WORKSPACE', 'fecaml')

workspace = Workspace(
    subscription_id=subscription_id,
    resource_group=resource_group,
    workspace_name=workspace_name,
)

# Datastore holding the raw daily FEC zips (read by the processing loop below).
datastore = workspace.datastores['fecrawzips']

# FEC file-format definitions: download the registered dataset and keep the local path of
# its first file, which is parsed as JSON by the processing loop.
metadata_dataset = Dataset.get_by_name(workspace, name='fecfileformats')
local_metadata_dataset = metadata_dataset.download()[0]

# Datastore that receives the converted parquet files.
output_datastore = workspace.datastores['fecparquetoutputs']

In [2]:
# This is used in a notebook - will eventually be pushed into git as a notebook.

import datetime
from os.path import join, dirname
from os import makedirs
import tempfile
from pyarrow import csv, parquet, string
import ntpath


# Column delimiter of the intermediate per-line-type files that ParquetConverter later reads
# back with pyarrow. Any occurrence of it inside a field value is replaced with a space before
# rows are written, so it can never appear inside a field.
output_delimiter = '\t'


class SchemaHandler(object):
    """Look up FEC line-type schemas by file format version and cache header rows.

    ``definitions`` maps ``'v' + <file version>`` to a mapping of line-type key -> column
    names (presumably the 'fecfileformats' JSON loaded by the notebook — confirm).
    """

    # Bare 'H1'..'H7' line types are looked up with an 'S' prefix ('SH1'..'SH7').
    # The original note called this a workaround for a "weird data bug".
    _BARE_H_LINETYPES = frozenset(['H1', 'H2', 'H3', 'H4', 'H5', 'H6', 'H7'])

    def __init__(self, definitions):
        self.feclookup = definitions
        # file_version -> (clean line type -> encoded, delimiter-joined header row)
        self.schema_cache : dict[str, dict[str, bytes]] = {}

    def get_schema(self, version, linetype):
        """Return ``(matched_key, columns)`` for the longest known prefix of ``linetype``.

        Matching is case-insensitive. If no prefix of a non-empty ``linetype`` is defined
        for ``version``, ``(upper-cased linetype, {})`` is returned; callers treat an empty
        schema as "no schema".

        :raises KeyError: if ``'v' + version`` is not present in the definitions.
        :raises Exception: if ``linetype`` is empty.
        """
        linetype = linetype.upper()
        if linetype in self._BARE_H_LINETYPES:
            linetype = 'S' + linetype

        versioned_formdata = self.feclookup['v' + version]

        if not linetype:
            raise Exception("Could not match linetype {0} on version {1}".format(linetype, version))

        # Try the longest prefix first, e.g. 'SA11AI' -> 'SA11AI', 'SA11A', 'SA11', ...
        # Stop before the empty prefix so '' is never treated as a match.
        for end in range(len(linetype), 0, -1):
            candidate = linetype[:end]
            if candidate in versioned_formdata:
                return candidate, versioned_formdata[candidate]

        return linetype, dict()

    def get_schema_string(self, fileversion, clean_linetype):
        """Return the encoded header row for ``clean_linetype`` in ``fileversion``.

        The header is the definition's columns wrapped with the columns the parser adds:
        ``clean_linetype`` and ``upload_date`` first, ``filename`` last. The special
        ``'error'`` line type gets a fixed five-column header. Results are cached per
        ``(fileversion, clean_linetype)``.
        """
        version_cache = self.schema_cache.setdefault(fileversion, {})
        if clean_linetype not in version_cache:
            if clean_linetype == 'error':
                columns = ['clean_linetype', 'upload_date', 'linetype', 'error', 'filename']
            else:
                _, schema = self.get_schema(fileversion, clean_linetype)
                columns = ['clean_linetype', 'upload_date', *schema, 'filename']
            version_cache[clean_linetype] = output_delimiter.join(columns).encode()
        return version_cache[clean_linetype]


class LowMemoryFecFileParser(object):
    """
    Given a file from the FEC, apply correct definitions.

    Lines are streamed one at a time: each is matched to a schema, padded or truncated to
    the schema width, wrapped with the added columns and forwarded to ``line_aggregator``.
    """

    def __init__(self, schema_handler, upload_date, line_aggregator):
        self.schema_handler = schema_handler
        self.upload_date = upload_date
        self.line_aggregator = line_aggregator
        # Kept for backward compatibility. The header cache is actually populated on the
        # schema handler, so alias it instead of keeping a second, always-empty dict.
        self.schema_cache = getattr(schema_handler, 'schema_cache', {})

    def processfile(self, filehandle, filename):
        """
        Parse every line of an open FEC file and forward each row to the line aggregator.

        The first line must be an ``HDR`` record; its third field is the file format version
        used for every schema lookup. Lines between ``[BEGINTEXT]`` and ``[ENDTEXT]`` are
        skipped. Lines whose line type has no schema are written as ``'error'`` rows.

        :param filehandle: text-mode file object positioned at the start of the file.
        :param filename: path of the file; only its basename is written to the output.
        :raises Exception: if the first line is not an ``HDR`` record.
        """
        field_sep = chr(28)  # ASCII FS character; separates fields in these files
        header_fields = filehandle.readline().replace('"', '').strip().split(field_sep)
        if header_fields[0] != "HDR":
            raise Exception("Failed to parse: HDR expected on first line")

        fileversion = header_fields[2].strip()
        # Same value for normal and error rows, so output never contains local temp paths.
        base_filename = ntpath.basename(filename)

        in_comment = False
        for raw_line in filehandle:
            text = raw_line.strip().replace('"', '')

            if not text:
                continue

            if text == '[BEGINTEXT]':
                in_comment = True
                continue
            if in_comment:
                if text == '[ENDTEXT]':
                    in_comment = False
                continue

            # The output delimiter must not appear inside a field, so blank it out.
            fields = [f.replace(output_delimiter, ' ') for f in text.split(field_sep)]
            linetype = fields[0]
            clean_linetype, schema = self.schema_handler.get_schema(fileversion, linetype)

            if schema:
                # Force the row to exactly the schema width (truncate extras, pad with '').
                width = len(schema)
                fields = fields[:width] + [''] * (width - len(fields))
                row = [clean_linetype, self.upload_date] + fields + [base_filename]
            else:
                clean_linetype = 'error'
                print(f"Error row: {fields}")
                row = [clean_linetype, self.upload_date, linetype, "NoSchema", base_filename]

            self.write_line(clean_linetype, output_delimiter.join(row), fileversion)

    def summarize_schema_cache(self):
        """Print ``file_version, line_type, column_count`` for every cached header row."""
        for file_version, cache in self.schema_handler.schema_cache.items():
            for line_type, header in cache.items():
                column_count = len(header.decode().split(output_delimiter))
                print(f"{file_version}, {line_type}, {column_count}")

    def write_line(self, clean_linetype : str, line : str, fileversion : str):
        """Forward one delimited row (no trailing newline) to the line aggregator."""
        self.line_aggregator.write(clean_linetype, line, fileversion)

    def finalize(self):
        """Flush everything buffered by the line aggregator."""
        self.line_aggregator.finalize()


class LineAggregator(object):
    """Buffer output rows in one temporary file per (file version, line type).

    Each buffer starts with the header row from ``schema_handler.get_schema_string``. When a
    buffer grows past ``file_size_limit`` bytes it is handed to ``converter_handler.convert``
    and closed; a new buffer is opened lazily on the next row of that type (so no header-only
    buffers are converted). ``finalize`` converts and closes every remaining buffer.
    """

    def __init__(self, schema_handler, dateprefix, converter_handler, file_size_limit=1024 * 1024 * 200):
        self.schema_handler = schema_handler
        self.dateprefix = dateprefix  # NOTE(review): stored but not used by this class
        # file version -> (line type -> open binary temp file)
        self.file_pointers : dict[str, dict] = {}
        # file version -> (line type -> bytes written). Keyed exactly like file_pointers so
        # buffers of different file versions never share (and reset) a size counter.
        self.file_sizes : dict[str, dict[str, int]] = {}
        self.converter_handler = converter_handler
        self.file_size_limit = file_size_limit

    def write(self, clean_linetype : str, line : str, file_version: str):
        """Append ``line`` (without trailing newline) to the buffer for the pair."""
        file_pointer = self._get_file(file_version, clean_linetype)
        if file_pointer is None:
            schema_str = self.schema_handler.get_schema_string(file_version, clean_linetype)
            file_pointer = self._set_file(file_version, clean_linetype, schema_str)

        version_sizes = self.file_sizes[file_version]
        version_sizes[clean_linetype] += file_pointer.write((line + '\n').encode())

        if version_sizes[clean_linetype] > self.file_size_limit:
            self._convert_and_close(file_version, clean_linetype)

    def _get_file(self, file_version : str, clean_linetype : str):
        """Return the open buffer for the pair, or None if there is none."""
        version_pointers = self.file_pointers.get(file_version)
        if not version_pointers:
            return None
        return version_pointers.get(clean_linetype)

    def _set_file(self, fileversion : str, clean_linetype : str, schema_str : bytes):
        """Open a new temp buffer for the pair, write the header row, and return it.

        Any buffer already registered for the pair is closed and replaced (not converted).
        """
        version_pointers = self.file_pointers.setdefault(fileversion, {})
        previous = version_pointers.pop(clean_linetype, None)
        if previous is not None:
            previous.close()

        local_file_handle = tempfile.TemporaryFile()
        size = local_file_handle.write(schema_str)
        size += local_file_handle.write(b'\n')
        version_pointers[clean_linetype] = local_file_handle
        self.file_sizes.setdefault(fileversion, {})[clean_linetype] = size
        return local_file_handle

    def _convert_and_close(self, fileversion : str, clean_linetype : str):
        """Convert one buffer, then close it and drop it from the bookkeeping."""
        file_pointer = self.file_pointers[fileversion].pop(clean_linetype)
        self.file_sizes[fileversion].pop(clean_linetype, None)
        try:
            # original_schema does not contain our added columns; the converter uses it to
            # fix the types of the definition columns.
            _, original_schema = self.schema_handler.get_schema(fileversion, clean_linetype)
            self.converter_handler.convert(clean_linetype, file_pointer, original_schema)
        finally:
            file_pointer.close()

    def finalize(self):
        """Convert every remaining buffer, then close all buffers and reset state."""
        try:
            for fileversion, pointers in self.file_pointers.items():
                for clean_linetype, file_pointer in pointers.items():
                    _, original_schema = self.schema_handler.get_schema(fileversion, clean_linetype)
                    self.converter_handler.convert(clean_linetype, file_pointer, original_schema)
        finally:
            for pointers in self.file_pointers.values():
                for file_pointer in pointers.values():
                    file_pointer.close()
            self.file_pointers = {}
            self.file_sizes = {}


class ParquetConverter(object):
    """Convert buffered delimited files into snappy parquet files under ``root_folder``.

    Output files are written to ``<root_folder>/<line_type>/<date_pattern>_<n>.snappy.parquet``.
    """

    def __init__(self, root_folder, date_pattern, emergency_dump_path='/tmp/emergency_dump.csv') -> None:
        self.root_folder = root_folder
        self.date_pattern = date_pattern
        # Where the raw input is copied when parsing fails (overwritten on each failure).
        self.emergency_dump_path = emergency_dump_path
        self.counter = 0  # sequence number shared by all line types
        self.files = {}  # line type => list of parquet filenames written

    def convert(self, line_type: str, file_pointer, original_schema):
        """Convert a delimited file to parquet.

        :param line_type: clean line type; used as the output sub-folder name.
        :param file_pointer: binary file object containing a header row followed by rows
            delimited with ``output_delimiter``.
        :param original_schema: definition column names; each is read as a string column.
            Columns not listed (the ones the parser adds) are type-inferred by pyarrow.
        :returns: path of the parquet file written.
        :raises: whatever ``pyarrow.csv.read_csv`` raises, after copying the raw input to
            ``self.emergency_dump_path`` for inspection.
        """
        file_pointer.flush()
        file_pointer.seek(0)

        # Pin definition columns to string so their type never depends on inference.
        column_types = {col: string() for col in original_schema}

        try:
            table = csv.read_csv(
                file_pointer,
                parse_options=csv.ParseOptions(delimiter=output_delimiter),
                convert_options=csv.ConvertOptions(column_types=column_types)
            )
        except Exception:
            print(f"Failed on {line_type}. Dumping")
            file_pointer.seek(0)
            with open(self.emergency_dump_path, 'wb') as dump_fh:
                dump_fh.write(file_pointer.read())
            raise

        local_filename = join(self.root_folder, line_type, f'{self.date_pattern}_{self.counter}.snappy.parquet')
        self.counter += 1
        makedirs(dirname(local_filename), exist_ok=True)
        with open(local_filename, 'wb') as out_fh:
            parquet.write_table(table, out_fh, flavor='spark')

        self.files.setdefault(line_type, []).append(local_filename)
        return local_filename

    def consolidate(self):
        """For each line type, consolidate the folder to a single parquet file and remove nulls.

        TODO: not implemented yet; currently a no-op. ``self.files`` lists the per-line-type
        outputs this would need to merge.
        """
        pass



def build_parser(fec_definitions, parquet_root, date_pattern):
    """Assemble the parsing pipeline for one batch of FEC files.

    Rows flow parser -> LineAggregator -> ParquetConverter, which writes parquet files under
    ``parquet_root`` named with ``date_pattern``. Every row produced by the returned parser
    carries the same upload timestamp (UTC, taken when this function is called).
    """
    upload_timestamp = str(datetime.datetime.utcnow())
    handler = SchemaHandler(fec_definitions)
    aggregator = LineAggregator(
        handler,
        date_pattern,
        ParquetConverter(parquet_root, date_pattern),
    )
    return LowMemoryFecFileParser(handler, upload_timestamp, aggregator)


def upload(local_path, remote_path='amluploads/electronic/'):
    """Upload the local directory ``local_path`` to ``remote_path`` on ``output_datastore``.

    Remote files with the same names are overwritten.

    :param local_path: local directory to upload.
    :param remote_path: target path on the datastore; defaults to the electronic-filings folder.
    """
    print(f'Uploading {local_path} to {remote_path}')
    output_datastore.upload(local_path, remote_path, overwrite=True, show_progress=False)


def get_tempdir(reason):
    """Create a ``TemporaryDirectory``, log its path together with ``reason``, and return it.

    The caller is responsible for calling ``cleanup()`` on the returned object.
    """
    directory = tempfile.TemporaryDirectory()
    print("Got {} for {}".format(directory.name, reason))
    return directory

def process_file(files, datepattern, unzip_tempdir, definitions=None):
    """Parse a day's raw FEC files, convert them to parquet and upload the result.

    :param files: file names relative to ``unzip_tempdir.name``.
    :param datepattern: date string (e.g. '20210420') used to name the parquet outputs.
    :param unzip_tempdir: TemporaryDirectory holding the extracted raw files.
    :param definitions: FEC format definitions. Defaults to the notebook-global
        ``fec_definitions`` for backward compatibility; pass it explicitly to avoid
        depending on hidden notebook state.
    """
    # Resolve before creating any temp dir so a missing global cannot leak a directory.
    if definitions is None:
        definitions = fec_definitions

    parquet_folder = get_tempdir("parquet_folder")
    parser = build_parser(definitions, parquet_folder.name, datepattern)

    print(f"Starting to operate on {len(files)} files")
    try:
        for rawfilename in files:
            # TODO: run in blocks of 100 files, or otherwise watch for full disk. On full/100
            # then run convert/upload then cleanup/recreate your temp space.
            filename = join(unzip_tempdir.name, rawfilename)
            # errors='ignore' silently drops undecodable bytes; the encoding is the platform
            # default. NOTE(review): confirm that is acceptable for FEC files.
            with open(filename, 'r', errors='ignore') as fh:
                parser.processfile(fh, filename)
        parser.finalize()
        upload(parquet_folder.name)
    finally:
        print("Cleaning up")
        parquet_folder.cleanup()
    

In [3]:
from os import listdir
import json

import zipfile
import tempfile
import datetime

# Days to process: START_DATE inclusive to END_DATE exclusive, one FEC daily zip per day.
START_DATE = datetime.datetime(2021, 4, 20)
END_DATE = datetime.datetime(2021, 5, 1)

# The format definitions are the same for every day, so load them once (and close the file).
# `fec_definitions` is read as a global by process_file().
with open(local_metadata_dataset, 'r') as metadata_fh:
    fec_definitions = json.load(metadata_fh)

dt = START_DATE
while dt < END_DATE:
    datepattern = datetime.datetime.strftime(dt, '%Y%m%d')
    print(f"Working on {datepattern}")

    ds = Dataset.File.from_files((datastore, f'electronic/{datepattern}.zip'))
    downloaded_files = ds.download()

    unzip_tempdir = get_tempdir("rawdownload")
    try:
        # Pass the path (not an open file) so ZipFile opens and closes the handle itself.
        with zipfile.ZipFile(downloaded_files[0]) as zipObj:
            zipObj.extractall(unzip_tempdir.name)

        files = listdir(unzip_tempdir.name)
        process_file(files, datepattern, unzip_tempdir)
    finally:
        unzip_tempdir.cleanup()

    dt += datetime.timedelta(days=1)


Working on 20210420
Got /tmp/tmpxwhg2kln for rawdownload
Got /tmp/tmpau8y4345 for parquet_folder
Starting to operate on 614 files
Uploading /tmp/tmpau8y4345 to amluploads/electronic/
Cleaning up
Working on 20210421
Got /tmp/tmpqmqke7d_ for rawdownload
Got /tmp/tmp82m3vlh9 for parquet_folder
Starting to operate on 170 files
Uploading /tmp/tmp82m3vlh9 to amluploads/electronic/
Cleaning up
Working on 20210422
Got /tmp/tmpx4ukmhlh for rawdownload
Got /tmp/tmptnz4y8nz for parquet_folder
Starting to operate on 109 files
Uploading /tmp/tmptnz4y8nz to amluploads/electronic/
Cleaning up
Working on 20210423
Got /tmp/tmp0ehlslkb for rawdownload
Got /tmp/tmpt2rv4l7g for parquet_folder
Starting to operate on 95 files
Uploading /tmp/tmpt2rv4l7g to amluploads/electronic/
Cleaning up
Working on 20210424
Got /tmp/tmpjjgpuody for rawdownload
Got /tmp/tmptdlkb8ms for parquet_folder
Starting to operate on 33 files
Uploading /tmp/tmptdlkb8ms to amluploads/electronic/
Cleaning up
Working on 20210425
Got /tm